Async in Dart (3) ตอบค่าเป็นกระแสข้อมูลด้วย Stream

ในการเขียนโปรแกรมทั่วๆ ไปเรามักจะส่งข้อมูลกันในรูปแบบของ sync เช่น

int getNumber() {
    return 1;
}

การเรียกใช้งานก็ง่ายๆ แบบนี้

print(getNumber());

แต่ในบทก่อนๆ เราเคยสอนเรื่องของ Generator ไปแล้ว นั่นคือการที่ส่งข้อมูลแบบหลายๆ ตัวกลับมาในรูปแบบของ sync*

ทบทวนแบบเร็วๆ! คือแทนที่จะตอบข้อมูลกลับแค่ครั้งเดียวด้วย return ก็เปลี่ยนเป็น yield แทน (yield ตอบได้หลายครั้งมากกว่า return)

Iterable<int> getNumbers() sync* {
    yield 1;
    yield 2;
    yield 3;
}

เวลารับข้อมูลจากฟังก์ชันพวกนี้ไปใช้ ก็เลยต้องใช้การวนลูปนั่นเอง

for(var num in getNumbers()) {
    print(num);
}

ถ้าสรุปเป็นรูป ก็จะได้แบบข้างล่างนี่

แต่ทั้ง 2 วิธีนี้มันเป็นการส่งข้อมูลแบบ Synchronous คือโค้ดยังทำงานเรียงๆ กันตามลำดับ และโค้ดทั้งหมดจะไม่ถูกแตกเป็นส่วนๆ เข้า Event Loop แบบ Asynchronous

ดังนั้นต่อไป... เรามาดู 2 กรณีนี้ แต่อยู่ในรูปของ Asynchronous กันบ้างดีกว่า!


ในบทที่แล้วเราพูดถึง Future กันไปแล้ว ซึ่งมันคือการเปลี่ยนจาก sync ให้กลายเป็น async นั่นเอง

แปลว่าเรายังขาดอยู่หนึ่งตัว คือ async* นั่นเอง

Single Value Zero or more Values
Sync int Iterable
Async Future Stream

สรุปง่ายๆ คือ Stream ก็คือ Iterable ที่อยู่ในรูปของ Asynchronous นั่นเอง

วิธีการดึงค่าออกมาจาก Stream

การใช้งาน Stream ก็คล้ายๆ กับ Future นั่นแหละ แต่มีรายละเอียดเยอะกว่า เพราะมันตอบได้หลายค่ามากกว่า Future ยังไงล่ะ

ขอสมมุติว่าเรามีฟังก์ชันหนึ่ง ที่ตอบค่าเป็น Stream ชื่อ getNumberStream() นะ

รอฟังค่าเรื่อยๆ ด้วย listen

สำหรับ Future เราจะใช้คำสั่ง then() ในการรอค่า แต่ Stream จะใช้ listen() แทน

Stream<int> numberStream = getNumberStream();

var subscription = numberStream.listen((num){
    print('Receive: $num');
});

listen() จะตอบกลับเป็น subscription ซึ่งเดี๋ยวจะพูดถึงอีกที
แน่นอนว่าเวลาเอาไปรัน ผลที่ได้อาจจะเป็นแบบนี้

output:
Receive: 1
Receive: 2
Receive: 3
Receive: ...

ก็คือเราได้รับตัวเลขหลายๆ ตัวนั่นเอง แต่อาจจะได้ตอนไหนก็ไม่รู้ แบบนี้

time            │     │    │
0|             1sec.  │    │
1| Receive: 1  ─┘    2sec. │
2| Receive: 2  ───────┘    │
3|                        4sec.
4| Receive: 3  ────────────┘
5|

สำหรับตัว listen() นั้นมี options เสริมให้เราใช้งานได้หลายตัว คือ

  • onError: เมื่อเกิด error ให้ทำอะไร
  • onDone: เมื่อได้รับข้อมูลครบทุกตัวแล้วให้ทำอะไร (ทำงานเมื่อ Stream ตอบข้อมูลครบหมดทุกตัวแล้ว มันจะเรียก onDone ให้ทำงาน)
  • cancelOnError: เป็นตัวกำหนดว่าถ้าเกิด error ขึ้นแล้ว (ในกรณีที่ข้อมูลยังส่งกลับมาไม่ครบทุกตัว) จะเลือกที่จะหยุดการทำงานของ Stream ไปเลย หรือจะยังให้ทำงานต่อก็ได้
var subscription = numberStream.listen(
    (num){
        print('ได้รับข้อมูล $num');
    }, 
    onError: (err){
        print('มีข้อผิดพลาดเกิดขึ้นล่ะ $err');
    },
    onDone: (){
        print('ได้รับข้อมูลครบแล้วจ้า');
    },
    cancelOnError: false,
);

ลดรูป Stream ด้วย await

เช่นเดียวกับตอน Future นะ คือเราสามารถเปลี่ยน listen() ให้ไปอยู่ในรูปแบบการเขียนแบบ Synchronous ได้ แต่ก็จะไม่ได้ตรงๆ แบบ Future นะเพราะเราต้องรับข้อมูลด้วยลูป

Stream<int> numberStream = getNumberStream();

await for(var number in numberStream) {
    print('Receive: $number');
}

Broadcast Stream

ตามปกติแล้วการใช้งาน Stream จะมีการ listen() ได้แค่ครั้งเดียวเท่านั้น

เมื่อเรา subscription ไปครั้งหนึ่งแล้ว ถ้าจะ subscription ซึ่งกับ Stream ตัวเดิมมันจะเกิด Exception ขึ้นนะ!!

Stream<int> numberStream = getNumberStream();

var subscription1 = numberStream.listen((num){
    print('Receive: $num');
});

//second subscribe -> Exception!!
var subscription2 = numberStream.listen((num){
    print('Receive: $num');
});

วิธีการแก้ (ถ้าอยาก subscription หลายครั้งจริงๆ) ให้เปลี่ยน Stream ตัวนั้นให้เป็นสิ่งที่เรียกว่า "Broadcast Stream" แทน

วิธีเปลี่ยนก็ง่ายๆ คือใช้ get property ที่ชื่อว่า asBroadcastStream แบบนี้

Stream<int> numberStream = getNumberStream().asBroadcastStream;

var subscription1 = numberStream.listen((num){
    print('First receive:  $num');
});

var subscription2 = numberStream.listen((num){
    print('Second receive: $num');
});

ทีนี้ ถ้า Stream ของเรามีการส่งตัวเลข [1 เมื่อผ่านไป1วินาที] และ [3 เมื่อผ่านไป3วินาที] ก็จะได้ผลลัพธ์แบบนี้ (สังเกตดูว่าเราได้ข้อมูลตัวละ 2 ครั้ง เพราะมี subscription 2 ตัวนั่นเอง)

time                   │     │
0|                   1sec.   │
1| First receive:  1  ─┤     │
 | First receive:  1  ─┘    3sec.
2|                           │
3| Second receive: 3  ───────┤
 | Second receive: 3  ───────┘
4|

Subscription

กลับมาที่สิ่งที่เราพูดค้างไว้ นั่นคือตอบที่เราสั่ง listen() เราจะได้สิ่งที่เรียกว่า subscription กลับมา

เพราะ Stream คือ "กระแสข้อมูล" ที่ไหลมาเรื่อยๆ มาตอนไหนก็ไม่รู้ เลยมี subscription เอาไว้ควบคุมกระแสนั้นอีกที

//หยุดการรับข้อมูล
subscription.pause();

//กลับรับข้อมูลใหม่
subscription.resume();

//แคนเซิล Stream ตัวนั้น
subscription.cancel();

แต่มีข้อควรระวังอย่างนึง คือการ pause() ไม่ใช้การไม่รับข้อมูลในจังหวะนั้น แต่เป็นการหยุดรับข้อมูลชั่วคราวเท่านั้น แปลว่า "ข้อมูลไม่ได้หายไปนะ มันแค่เข้าคิวรอเรา resume() อีกทีนั่นเอง!"

ลองดูตัวอย่างข้างล่างประกอบ ขอสมมุติว่าเรามี stream อยู่หนึ่งตัวที่จะส่งตัวเลขกลับมาเรื่อยๆ ทุกๆ 1 วินาที [1, 2, 3, 4, 5, ...] แบบนี้นะ

var subscription = stream.listen((x){
    print('Receive: $x');
});
Future.delayed(Duration(seconds: 3), (){
    subscription.pause();
});
Future.delayed(Duration(seconds: 6), (){
    subscription.resume();
});
Future.delayed(Duration(seconds: 8), (){
    subscription.cancel();
});

แล้วเราก็ตั้งค่า (โดยใช้ Future.delayed) ให้มัน..

  • pause: เมื่อผ่านไป 3 วินาที
  • resume: เมื่อผ่านไป 6 วินาที (จากตอนเริ่มโปรแกรม)
  • cancel: เมื่อผ่านไป 8 วินาที (จากตอนเริ่มโปรแกรม)

เราอาจจะคิดว่าระหว่างวินาทีที่ 3-6 เราจะไม่ได้รับข้อมูลช่วงนั้น (คือข้อมูล [4,5] หายไปเลย)

แต่ไม่ใช่นะ! เพราะเลข [4,5] นั้นยังอยู่นะ แค่มันเข้าคิวรอที่จะออกมาอยู่

เมื่อเรา resume ในวินาทีที่ 6 มันก็จะออกมาทีเดียวหมดเลย [4,5,6]

ดูอธิบายด้วย timeline ข้างล่างนี่น่าจะเข้าใจมากกว่า

time  
1| Receive: 1
2| Receive: 2
3| Receive: 3
4|            ├─ waiting until (6)
5|            │
6| Receive: 4 ┐
 | Receive: 5 ├─ 3 values in 1 sec.
 | Receive: 6 ┘
7| Receive: 7
8| Receive: 8
9|

มาลองสร้าง Stream กันบ้าง

การสร้าง Stream ทำได้หลายวิธีมากๆ แต่แบบง่ายที่สุดคือใช้วิธีแบบ Generator

แปล Iterable เป็น Stream

เราเคยสอนการสร้าง Generator ไปแล้วในบท Dart 105: มันวนซ้ำได้นะ! มาสร้าง Generator และใช้งาน Iterable กันเถอะ แต่ตอนนั้นเราเขียนมันในรูปแบบของ sync

อย่างที่พูดไปตอนต้นว่า Stream ก็คือ Iterable ที่อยู่ในรูปของ Asynchronous เราจะมาแสดงให้เห็นกัน

ขอเปิดด้วยโค้ดแบบ Iterable ในรูปของ Synchronous

โจทย์คือ... เราต้องการสร้างฟังก์ชันสำหรับดึง Data ออกมาจำนวนหนึ่งตั้งแต่ from ถึง to

Data fetchOneData(int id){
    ...
}

Iterable<int> getData(int from, int to) sync* {
    var dataList = [];
    for(var i=from; i<=to; i++){
        dataList.add(fetchOneData(i))
    }
    return dataList;
}

(ฟังก์ชัน fetchOneData() ทำอะไรไม่ต้องสนใจนะ ไม่ใช่ประเด็นของเรื่อนี้)

แต่เพื่อ performance ที่ดี เลยเราไม่โหลดข้อมูลในครั้งเดียว แต่เลือกที่จะเขียนมันในรูปแบบ Generator แทน ก็จะได้โค้ดแบบนี้..

Data fetchOneData(int id){
    ...
}

Iterable<int> getData(int from, int to) sync* {
    for(var i=from; i<=to; i++){
        yield fetchOneData(i);
    }
}

เวลาเราจะใช้งาน ก็เอาไปวนลูปได้ธรรมดาๆ แบบนี้

var dataList = getData(1, 10);

for(var data in dataList) {
    print(data);
}

ขึ้นต่อไปคือ แล้วถ้าข้อมูลของเราไม่ได้มาแบบ sync ล่ะ?

ถ้าฟังก์ชัน fetchOneData() ใช้เวลาโหลดข้อมูลนานขึ้น เราคงต้องเปลี่ยนมันเป็น Future แบบนี้

Future<Data> fetchData(int id) {
    ...
}

เมื่อเป็นแบบนี้ เราก็มีปัญหาซะแล้ว!

เพราะฟังก์ชัน getData() ที่เป็น sync ไม่สามารถเรียก fetchOneData() ที่เป็น async ได้ล่ะ!!

วิธีการแก้ก็คือเปลี่ยนฟังก์ชัน fetchOneData() ให้หลายเป็น async ยังไงล่ะ

"ด้วยการเปลี่ยน Iterable ให้กลายเป็น Stream"

สิ่งที่เราต้องทำ มีอยู่ 3 อย่าง

  1. จากเดิมที่รีเทิร์นค่าเป็น Iterable เราต้องเปลี่ยนมันเป็น Stream แทน
  2. จากเดิมที่ฟังก์ชันเป็นชนิด sync* เราต้องเปลี่ยนมันเป็น async*
  3. จากเดิมที่เรา yield ค่าทันทีได้เลย แต่เมื่อมันเป็น Future แล้วเราก็ต้องสั่งให้มันรอด้วย await
Stream<Data> getData(int from, int to) async* {
    for(var id=from; id<=to; id++) {
        yield await fetchOneData(id);
    }
}

void main() {
    fetchAllData(1, 10).listen(print);
}

จะเห็นว่า Iterable นั้นแปลงเป็น Stream ได้ง่ายๆ เลย

ลองมาดูกันอีกตัวอย่างกัน

int sumIt(List<int> items) {
    int sum = 0;
    for(var item in items) {
        sum += item;
    }
    return sum;
}

เรามีฟังก์ชัน sumIt() สำหรับหาผลรวมทั้งหมดใน List แต่ถ้าลิสต์ตัวนี้ไม่ได้ค่ามาแบบ sync ล่ะ? จะทำยังไง?

คราวนี้ให้สังเกตว่า parameter นั้นรับ List (หรือก็คือ Iterable นั่นแหละนะ) เข้ามา แล้วมันรีเทิร์นค่าแบบสเกล่าธรรมดา (คือ int ธรรมดาๆ นี่แหละ)

สำหรับเคสนี้ สิ่งที่เราต้องทำก็คือ

  1. เปลี่ยนค่าแบบสเกล่าธรรมดาให้กลายเป็น Future
  2. เปลี่ยนให้ฟังก์ชันเป็น async (ระวัง! ไม่ได้เปลี่ยนเป็น async* นะ เพราะมันไม่ได้รีเทิร์น Stream)
  3. จุดที่เป็นปัญหาคือ for loop ที่วนค่าตัว Stream .. ให้เราเติม await ลงไปข้างหน้าเป็นอันจบ
Future<int> sumIt(Stream<int> items) async {
    int sum = 0;

    await for(var item in items) {
        sum += item;
    }

    return sum;
}

เรื่องสุดท้ายคือในบางกรณี เราอาจจะมีฟังก์ชัน Stream ที่เรียกใช้งาน Stream อีกตัวก็เป็นได้นะ

Stream<Data> getFirstStream() async* {
    ...
}

Stream<Data> getSecondStream() async* {
    yield getFirstStream(); //Not Work!!
}

วิธีการนี้โค้ดอาจคอมไพล์ได้ แต่เราจะไม่ได้ข้อมูลอะไรเลย (ถ้าใครยังไม่เข้าใจว่าทำไมไม่เวิร์ค ลองกลับไปอ่านบทที่เราพูดถึง Generator อีกทีนะ)

วิธีการแก้ก็เหมือนกับฟังก์ชัน sync* นั่นแหละ คือเราจะต้องวนลูปทีละตัว

Stream<Data> getFirstStream() async* {
    ...
}

Stream<Data> getSecondStream() async* {
    await for(var s in getFirstStream()){
        yield s;
    }
}

หรือแบบง่ายกว่าคือการใช้ yield* ก็ได้

Stream<Data> getFirstStream() async* {
    ...
}

Stream<Data> getSecondStream() async* {
   yield* getFirstStream(); //Ok!!
}

นอกจากวิธีสร้างแบบ Generator ที่เราใช้แนวคิดเดียวกับ Generator ใน sync แล้วยังมีอีกวิธีหนึ่งที่ให้เราคุมการไหลของข้อมูลใน Stream ได้คล่องตัวขึ้น นั่นคือใช้ StreamController

ซึ่งเราจะพูดถึงกันในบทต่อไปนะ (พร้อมกับเก็บตกเนื้อหาเกี่ยวกับ Stream กันอีกนิดหน่อยด้วย)

12 Total Views 3 Views Today
Ta

Ta

สิ่งมีชีวิตตัวอ้วนๆ กลมๆ เคลื่อนที่ไปไหนโดยการกลิ้ง .. ถนัดการดำรงชีวิตโดยไม่โดนแสงแดด
ปัจจุบันเป็น Senior Software Engineer อยู่ที่ Centrillion Technology
งานอดิเรกคือ เขียนโปรแกรม อ่านหนังสือ เขียนบทความ วาดรูป และ เล่นแบดมินตัน

You may also like...