القنوات Channels
بسم الله الرحمن الرحيم
السلام عليكم ورحمة الله وبركاته
في هذا الدرس سنتعرف على التيارات Streams وعلى أنواعهم وبالاخص النوع المسمى بالقنوات Channels.
ماذا ستقرأ
- مقدمة في التيارات Streams.
- التيار الحار والتيار البارد.
- ماهي القنوات.
- لماذا القنوات.
- آلية مشاركة القنوات للعناصر.
- آلية عمل القنوات.
- كيف نتعامل مع القنوات.
- مثال برمجي لإرسال العناصر من خلال قناة من كروتين الى آخر.
- أنواع القنوات.
- النوع الأول الـ Unbuffered) Rendezvous).
- النوع الثاني الـ Conflated.
- النوع الثالث الـ Buffered.
- النوع الرابع الـ Unlimited.
- أنماط القنوات.
- النمط الأول: المنتجين Producers.
- النمط الثاني: خطوط الأنابيب Pipelines.
- النمط الثالث: Fan-out.
- النمط الرابع: Fan-in.
- النمط الخامس: Ticker.
مقدمة في التيارات Streams
بعض الاحيان يكفينا إرسال طلب واحد سواء كان الغاية منه الحصول على نتيجة ما أو عدم الحصول. مثل تحميل ملف من سيرفر بالانترنت, عرض صورة, أمر تخزين أو قراءة في قواعد البيانات, وما الى ذلك.
هذا الأمر رأيناه في الدروس السابقة. فمثلاً نستطيع جعل دالة تأجيلية أو كروتين يقوم بتنفيذ أمر واحد بدون الحاجة الى رجوع نتيجة ما, والتحكم في هذا الأمر من خلال البناء launch والعنصر Job. أو عكس ذلك فإذا, اردنا الحصول على نتيجة مرجعة, فنستطيع ذلك من خلال البناء async والعنصر Deferred الذي يسمح بإرجاع ناتج.
ولكن ماذا لو اردنا الحصول على أكثر من نتيجة بطلب واحد؟ هنا تكمن ميزة التيارات (بما يعرف بالـ Streams) فمن خلال إرسال طلب واحد نستطيع الحصول على نتائج كثيرة. هذا الشئ يعتبر ذو منفعة كبيرة في التعامل مع الكثير من الأشياء مثل نظام الملاحة (بما يعرف بالـ GPS). فمن خلال إرسال طلب واحد نستطيع تلقي المعطيات أكثر من مرة بدون ان نعمل تحديث لذلك الطلب. كذلك ينطبق هذا الأمر على الكثير من حالات الطلب من السيرفرات ففي بعض الاحيان نحتاج الى تدفقات كثيرة من البيانات بطلب واحد فقط.
عنصر الـ Deferred يسمح لنا بنقل قيمة واحدة بين الكروتينات, بينما القناة Channel تسمح لنا بنقل أكثر من قيمة واحدة بين الكروتينات.
التيار الحار والتيار البارد
من حسن الحظ أن مكتبة الـ Kotlin Coroutines توفر لنا نوعين من التيارات streams منها ما يسمى بـ القناة Channel وآخر يسمى بـ التدفق Flow.
ويكمن الاختلاف بينهما في أن القناة Channel تعتبر تيار حار Hot Stream, وهذا يعني أنها سوف تقوم بإرسال النتائج لتلبية طلبنا بشكل مستمر سواء كنا منصتين لها أم لا. أما التدفق Flow فيعتبر تيار بارد Cold Stream, وهذا يعني بإنه سوف يقوم بإرسال النتائج لتلبية طلبنا فقط إذا كنا منصتين له. وفي العادة فإن استخدام القنوات يكون مكلفاً نوعاً ما, ويقتصر على الأنواع البدائية.
ماهي القنوات؟
تعتبر القنوات Channels احدى الاستراتيجيات من التيارات Streams. وهي تمثل طريقة لنقل العناصر (خاصة البدائية) بين الكروتينات.
لماذا القنوات؟
كما تم الاشارة لهذه النقطه سابقاً, نقوم بإستخدام القنوات حينما نريد الحصول على نتائج كثيرة من خلال طلب واحد. أو بشكل تفصيلي نقوم بإستخدام القنوات عندما نريد نقل أكثر من عنصر بين الكروتينات.
آلية مشاركة القنوات للعناصر
في الخيوط الحاسوبية Thread يتم مشاركة العناصر من خلال مشاركة الذاكرة بين خيط وآخر, وهذا يسبب ظهور مشاكل مثل سباق السرعة والقفل الميت وما الى ذلك اخطاء التزامن (الـ Concurrency) الشائعة. بينما الكروتينات Coroutines تستخدم شئ نستطيع تشبيهه بانه خط أنابيب بينها لنقل الذاكرة من كروتين الى آخر. إي إنها تقوم بنقل العنصر الذي يمثل البيانات وليس مشاركته بينها.
آلية عمل القنوات
ليكون لديك قناة يجب أن يكون لديك أكثر من كروتين. فلنفرض أن لديك كروتين بإسم A وكروتين آخر بإسم B. الان تستطيع إنشاء قناة تربط كلاً منهم.
وعندما يكون الكروتين A يريد ارسال بيان (بيان Datum وهو مفرد جمع بيانات Data) فإنها اولاً تعلن عن ذلك وتتحول حالة الكروتين A من نشطة إلى مؤجلة وهي تنتظر أحد أن يأخذ ذلك البيان. أما في الجهه المقابله من القناة, يعلن الكروتين B أنه مستعدة لإستقبال البيان, وهنا يصبح مؤجل. ولحسن الحظ أن الكروتين A مستعد لإرسال له البيان, فيأخذه وينتهي تأجيل الكروتينان A و B ويباشرون أعمالهم. ويجب الانتباه الى وجوب اغلاق القناة في نهاية المهمة والإ سيؤدي إلى تأجيل الكروتينات إلى وقت وقت لا نهائي.
كيف نتعامل مع القنوات
الأساس هو الواجهة المسمية بالـ Channel ونستطيع التعامل وإنشاء القنوات من خلالها. يوجد بعض من العناصر المصاحبة Companion Objects والتي من خلالها نستطيع تحديد نوع القناة. وكذلك عناصر موروثة Inherited Properties للإستعلام عن حالة القناة مثل isEmpty واخرى. ايضاً يوجد بعض من الدوال الموروثة Inherited Functions مثل close لغلق القناة و send لإرسال العناصر في القناة و receive لإستقبال العناصر. وما يهم هو وجود العديد من الدوال التأجيلية Extension Functions للتعامل مع القناة وعناصرها. للإطلاع على جميع الدوال راجع المصادر.
مثال برمجي
سنقوم بإنشاء قناة ثم سنقوم بإنشاء كروتين يرسل عناصر الى كروتين آخر من خلال هذه القناة, كالتالي:
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
fun main() {
// Channel <--- 1
val channel: Channel<Char> = Channel<Char>()
// Coroutine A <--- 2
GlobalScope.launch(Dispatchers.Default) {
val name = "Mohammad"
for (char: Char in name) {
channel.send(char)
}
// Close the Channel <--- 3
channel.close()
}
// Coroutine B <--- 4
GlobalScope.launch(Dispatchers.IO) {
for (char: Char in channel) {
println(c)
}
}
Thread.sleep(500)
}
- في ١ قمنا بإنشاء قناة افتراضية.
- في ٢ قمنا بإنشاء كروتين A. وتكمن وضيفته في إرسال العناصر من خلال هذه القناة.
- في ٣ قمنا بغلق القناة.
- في ٤ قمنا بإنشاء كروتين B. وتكمن وضيفته في إستقبال العناصر من خلال هذه القناة.
- يجب الانتباه الى نقطه مهمه هنا وهي: ان العناصر بعد استهلاكهم (إي استقبالهم) فإنها تزول من القناة.
أنواع القنوات
النوع الأول الـ Unbuffered) Rendezvous)
هذا النوع هو الافتراضي, ويقوم بإرسال العناصر حين يلتقيان الكروتينات مع بعضهم البعض (كـ الموعد). وآلية عمله كما تم شرحها في الفقرة السابقة. يتميز هذا النوع بعدم وجود Buffer لذلك نرى الكروتينات تتأجل كلما تعلن عن ارسال او استقبال بيان.
مثال برمجي لإنشائها:
val rendezvousChannel: Channel<Int> = Channel<Int>(Channel.RENDEZVOUS)
// Or
val rendezvousChannel: Channel<Int> = Channel<Int>()
- نوع القناة هنا سواء تم تحديدة ام لا فانه يعتبر افتراضياً.
- لاحظ نوع القناة Int إي انها سوف تقوم بإرسال عناصر من نوع ارقام Int.
النوع الثاني الـ Conflated
هذا النوع لدية Buffer يسع لبيان واحد. إذا كان الكروتين المستقبل B لا يستطيع مجارات الكروتين A فإن A سيقوم بمسح البيان القديم التي يمسك به واستبداله بالجديد. وهذا يعني أن الكروتين المرسل A لن يتم تأجيل عمله. أي سيقوم بإرسال البيانات واحداً تلو الآخر. وعندما يصبح الكروتين B جاهز لاستلام البيان فإنه سيستلم الجديد طبعاً, وهذا يعني أن B ستؤجل. إي أن الكروتين A لا يتأجل بل يرسل بياناً تلو الاخر, اما الكروتين B فإنه يتأجل عندما يريد استلام بيان.
مثال برمجي لإنشئه:
val conflatedChannel: Channel<Int> = Channel<Int>(Channel.CONFLATED)
النوع الثالث الـ Buffered
هذا النوع يستطيع صنع Buffer بإي عدد يريد المبرمج. ويتمثل هذا الـ Buffer في متسلسلة Array. عندما تكون ممتلئة بالبيانات يؤجل عمل الكروتين المرسل A. وفي الجهه المقابله الكروتين المستقبل B سيتأجل عند خلو ذلك الـ Buffer.
مثال برمجي لإنشئه:
val bufferedChannel: Channel<Int> = Channel<Int>(Channel.BUFFERED) // 64 Capacity.
// Or
val bufferedChannel: Channel<Int> = Channel<Int>(4)
- تستطيع تحديد افتراضياً لهذا النوع والذي سوف يعطيك مساحة بحجم ٦٤. وكذلك تستطيع تحديد حجم المساحة يدوياً اذا اردت ذلك.
النوع الرابع الـ Unlimited
مشابه للنوع السابق, ما يميزه انه يملك Buffer غير محدود وربما يقوم بملئ الذاكرة و تعطل البرنامج برمي الخطئ OutOfMemoryExecpction.
مثال برمجي لإنشئه:
val unlimitedChannel: Channel<Int> = Channel<Int>(Channel.UNLIMITED) // Int.MAX_VALUE capacity.
- هذا النوع يعطيك مساحة بحجم أكبر Int.
أنماط القنوات
والأن بعد ان عرفنا انواع القنوات التي تقدمهم مكتبة الـ Coroutine نستطيع بناء مانشاء من أنماط لحل المشاكل التي تواجهنا. وهنا في هذا القسم بعض من أشهر هذه الانماط.
النمط الأول: المنتجين Producers
وهو أن نقوم بجعل كروتين واحد يقوم بإنتاج بعض من العناصر. ومن ثم نقوم بإستقبال هذه العناصر أينما نريد. ولإختصار هذا الشئ يوجد بناء Builder بإسم produce.
مثال برمجي:
نستطيع إنشاءه بإستخدام البناء produce كالتالي:
fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
for (x in 1..5) send(x * x)
}
- لاحظ إنشاء دالة اضافية بإسم produceSquares الى الفئه CoroutineScope.
- لاحظ ارجاع النوع ReceiveChannel وهو عبارة عن interface مخصصه بهذا الامر, وتحديد نوع البيانات عددي Int.
- لاحظ استخدام البناء produce.
- الدالة ستقوم بإرسال حصيلة ضرب الاعداد من ١ الى ٥ في القناة.
ثم نقوم بإستخدامه بالشكل التالي:
fun main() = runBlocking {
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
}
- لاحظ إنشاء عنصر بإسم squares عددي من نوع ReceiveChannel (بشكل ضمني).
- لاحظ استخدام الدالة consumeEach لإستهلاك البيانات العددية.
النمط الثاني: خطوط الأنابيب Pipelines
وهو أن نقوم بجعل كروتين واحد يقوم بإنتاج قيم لانهائية, ومن ثم يقوم كروتين آخر أو عدة كروتينات بإستقبال هذه القيم ومعالجتها.
مثال برمجي:
كروتين A يقوم بإنتاج الارقام بشكل لانهائي:
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++)
}
- عبارة عن إنشاء منتج produce من نوع عددي Int.
كروتين B آخر يقوم بإستهلاك هذه الارقام, ثم اجراء عليها عمليه حسابية:
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
for (x in numbers) send(x * x)
}
- إنشاء منتج produce وجعل نوع الارجاع ReceiveChannel حاملة نوع البيانات العددية Int.
وتتم عملية الربط بين الاثنين كما بالشكل التالي:
fun main() = runBlocking {
val numbers = produceNumbers()
val squares = square(numbers)
repeat(5) {
println(squares.receive())
}
println("Done!")
coroutineContext.cancelChildren()
}
يوجد هنالك مثال آخر في هذا Prime numbers with pipeline.
النمط الثالث: Fan-out
وهو أن يقوم العديد من الكروتينات بالاستقبال من نفس القناة, ومن ثم توزيع العمل على انفسهم.
مثال برمجي:
إنشاء منتج كما جاء سابقاً, ينتج اعداد Int كل ثانية:
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1 // start from 1
while (true) {
send(x++) // produce next
delay(100) // wait 0.1s
}
}
إنشاء كروتين مغلف في دالة اضافية, تستقبل عدد وقناه من نوع عددي:
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
والاستخدام كالتالي:
fun main() = runBlocking<Unit> {
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // cancel producer coroutine and thus kill them all
}
- لاحظ إنشاء منتج producer من الدالة السابقة.
- لاحظ استخدام الدالة repeat لإنشاء ٥ كروتينات من الدالة السابقة. وهكذا سيقومون بإستقبال ماسيتم ارساله من المنتج.
النمط الرابع: Fan-in
وهو أن يقوم العديد من الكروتينات بالارسال الى نفس القناة.
مثال برمجي:
إنشاء دالة تأجيلية تقوم بإستقبال قناة من نوع String ومتغير من نوع String وعدد رقمي يمثل الوقت.
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
- لفهم هذه الدالة:
- تقوم بإستقبال قناة.
- و نص.
- و وقت.
- وتقوم بإرسال النص الى القناة التي استقبلتها مع مراعات الوقت.
ويكون استخدامها على الشكل التالي:
fun main() = runBlocking {
val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) {
println(channel.receive())
}
coroutineContext.cancelChildren()
}
- ننشئ قناة من نوع String.
- نستخدم البناء launch لمناداة دالتنا التأجيلية التي قمنا بعملها سابقاً.
- نزودها بالقناة التي إنشئناها و ونص ومقدار الزمن.
- نقوم بتكرار امر الاستقبال ٦ مرات.
- ولاننسى بإلغاء جميع الكروتينات التي تم إنشائهم في هذا النطاق.
وهكذا قمنا بعمل العديد من الكروتينات (يتمثلون في دالتنا التأجيلية) وجعلناهم يرسلون الى نفس القناة.
النمط الخامس: Ticker
وهو نمط يقوم على إنشاء قناة من نوع rendezvous وظيفتها إنتاج النوع Unit في كل عملية delay تتم بعد استهلاك عناصر من القناة. وتعتبر قناة مساعدة.
في النهاية تعرفنا على التيارات والتيار الحار القنوات, في الدرس القادم سنتعرف على التيار البارد والذي يعتبر من أكثر الاشياء المستخدمه!
المصادر
محتوى الدورة
مقدمة | |
---|---|
1 | مقدمة |
الروتينات التعاونية | |
1 | نظرة عامة على الـ Coroutines |
2 | نظرة خاصة على الـ Coroutines في لغة الكوتلن |
3 | إنشاء الكروتين Coroutine |
مكونات الروتينات التعاونية | |
1 | النطاقات الـ Scopes |
2 | البنائين الـ Builders |
3 | الموزعين الـ Dispatchers |
الدوال التأجيلية | |
1 | الدوال التأجيلية Suspended Functions |
التيارات Streams | |
1 | القنوات Channels الدرس الحالي |
2 | التدفق Flow |
الكلمات الدليلية
عن الدرس
0 إعجاب |
1 متابع |
0 مشاركة |
1365 مشاهدات |
منذ 4 سنوات |
التعليقات (0)
لايوجد لديك حساب في عالم البرمجة؟
تحب تنضم لعالم البرمجة؟ وتنشئ عالمك الخاص، تنشر المقالات، الدورات، تشارك المبرمجين وتساعد الآخرين، اشترك الآن بخطوات يسيرة !