Async in Dart (4) ควบคุมข้อมูลในstreamอย่างเหนือชั้นด้วย StreamController

ในบทที่แล้วเราสอนการสร้าง Stream แบบง่ายๆ ไปแล้ว แต่ในบางครั้งเราต้องการควบคุมข้อมูลใน Stream แบบคัสตอมมากๆ ซึ่งข้อมูลอาจจะเข้ามาจากหลายทางมากๆ การที่เรามีแค่ yield จากฟังก์ชันเดียวอาจจะไม่เพียงพอ

ดังนั้นเลยเป็นที่มาของคลาสที่ชื่อว่า StreamController ซึ่งเป็นคลาสที่เอาไว้ควบคุม Stream อีกที

StreamController

การสร้าง Stream แบบธรรมดาก็จะเป็นประมาณนี้

Stream<int> getNumberStream() async* {
    ...
}

Stream<int> numberStream = getNumberStream();

ส่วนใหญ่เราจะสร้าง Stream จากฟังก์ชันประเภท async* ซึ่งภายในฟังก์ชันก็จะมีการ yield ข้อมูลกลับมา

ทีนี้ ถ้าเราจะเปลี่ยนไปใช้ StreamController แทน ก็จะเป็นแบบนี้

StreamController<int> controller = StreamController<int>();

นั่นคือ จากปกติที่เราจะสร้างข้อมูลในฟังก์ชันเท่านั้น การสร้างด้วย StreamController คือเราไม่ต้องกำหนดว่า Stream ตัวนี้จะสร้างอะไรเลย (แค่ new ตัวคอนโทรเลอร์ขึ้นมาเฉยๆ ก็พอแล้ว)

StreamController ไม่ใช่ Stream นะ ไม่สามารถเอาไปใช้แทน Stream ได้ แต่สามารถดึง Stream ออกมาจากตัวมันได้

ให้คิดง่ายๆ ว่ามันคือ Controller ที่ถือ Stream เอาไว้ข้างในอีกทีก็ได้

สำหรับอ็อบเจค StreamController ที่เราสร้างขึ้นมา จะมี Stream อยู่ข้างใน เราสามารถดึงออกมาใช้งานได้ด้วยการสั่ง .stream ตรงๆ เลย

Stream stream = controller.stream;
stream.listen((value) {
    print('value: $value');
});

//หรือ

controller.stream.listen((value) {
    print('value: $value');
});

แล้วข้อมูลจะมาจากไหน? ใช้คำสั่ง add ยังไงล่ะ

StreamController นั้นไม่ได้เป็นฟังก์ชัน ดังนั้นมันจะไม่มีโลจิคการสร้างข้อมูลอะไรอยู่กับตัวมันทั้งนั้น!

StreamController เป็นแค่ตัวกลางสำหรับใส่ข้อมูลเข้า Stream เท่านั้น

import 'dart:async';

var controller = StreamController<int>();

void main() {
    controller.stream.listen((x) {
        print(x);
    });

    ...
}

void addNumberToStream(int x) {
    controller.add(x);
}

จากโค้ดด้านบน เรามีการสร้าง StreamController ขึ้นมา 1 ตัว แล้วจัดการ listen() มันใน main (listen ไว้ก่อน ยังไม่มีข้อมูลอะไรเข้ามาทั้งนั้น)

ทีนี้ เราก็ไปสร้างฟังก์ชันอีกตัวหนึ่งชื่อ addNumberToStream(int x) ซึ่งจะมีการเพิ่มค่าเข้า Stream ผ่านคำสั่ง add()

เมื่อฟังก์ชัน addNumberToStream(int x) ถูกเรียก StreamController ก็จะส่งค่านั้นไปให้ listener ที่รอค่าอยู่นั่นเอง

เราสามารถส่งข้อมูลกี่ครั้งก็ได้ผ่านคำสั่ง add()

ส่วนใหญ่ use case ที่เราต้องใช้ StreamController แทนการสร้างฟังก์ชัน async* แบบธรรมดาจะเป็นการเขียนโปรแกรมแบบ Event-Driving เพราะเราไม่รู้ว่าข้อมูลจะเข้ามาเมื่อไหร่ ตอนไหนที่ผู้ใช้จะกดปุ่ม อะไรแบบนั้น เราเลยสร้างฟังก์ชันเตรียมไว้ก่อนไม่ได้ ทำได้แค่ให้ controller คอยส่งข้อมูลมาให้เมื่อเกิด Event เท่านั้น

คำเตือน! StreamController มักจะไม่รู้จุดสิ้นสุด!

การใช้งาน Stream ธรรมดา เราสามารถเปลี่ยนจากการใช้ callback ด้วย listen() ไปใช้ await เพื่อเปลี่ยนโค้ดให้อยู่ในรูปแบบ sync แทนได้

Stream<int> numberStream = getNumberStream();

await for(var number in numberStream) {
    print('Receive: $number');
}

ซึ่งมีข้อควรระวังในการใช้ await คือ Stream ตัวนั้นจะต้องเป็น "Finite Stream" หรือสตรีมที่รู้ว่ามีจุดสิ้นสุดแน่ๆ !

ไม่งั้นโค้ดของเราจะถูก await บล็อกการทำงานอยู่ที่ลูปนั้น ไปต่อไม่ได้ เช่น

import 'dart:async';

var controller = StreamController<void>();

void main() {

    print('start');

    await for(var _ in controller.stream) {
        print('clicked!');
    }

    print('end');

    ...
}

void onClick() {
    controller.add(null);
}

ในกรณีแบบนี้ โค้ดจะถูกบล็อกอยู่ที่ลูป (โปรแกรมจะไม่หลุดไปยังคำสั่งปริ้น end เลย) เพราะไม่มีการปิดสตรีม

ดังนั้นถ้าฟังก์ชันเรามีคำสั่งให้ทำอะไรบ้างอย่างต่อไป เราไม่ควรเขียนมันในรูป await นะ ใช้ listen() จะดีกว่า

ยังมี addError และ close อีกนะ

นอกจากการ add ข้อมูลเข้า Stream แล้วเรายังสามารถสั่งอีก 2 คำสั่งกับคอรโทรเลอร์ได้ คือการโยน Exception หรือสั่งปิด Stream นั่นเอง

import 'dart:async';

var controller = StreamController<int>();

void main() {
    controller.stream.listen(
        (x) {
            print(x);
        },
        onDone: () {
            print('จบแล้วจ้า!');
        },
        onError: (e){
            print('error is $e');
        },
        cancelOnError: false,
    );

    print('start');
    controller.add(1);
    controller.add(2);
    controller.add(3);
    controller.addError(Exception('ตู้มมม'));
    controller.add(4);
    controller.add(5);
    controller.close();
    print('end');
}

output:

start
end
1
2
3
error is Exception: ตู้มมม
4
5
จบแล้วจ้า!

การสั่ง addError ไม่ได้ทำให้ Stream หยุดทำงานนะ ไม่งั้นต้องเพิ่มอ็อบชัน cancelOnError: true ลงไปด้วย (ถ้าไม่เซ็ต ดีฟอลต์จะเป็น false คือถึงเจอ Exception ฉันก็ไม่หยุดนะ)

ข้อสังเกตอีกอย่างคือ listen() นั้นเป็น Isolates นะ มันจะปริ้น start และ end (อยู่ใน Isolates ของ ฟังก์ชัน main) ก่อน

ใครงงว่า Isolates คืออะไร ย้อนกลับไปอ่านบทความแรกของซีรีส์นี้อีกทีนะ

สุดท้าย การใช้งาน StreamController นั้นมีอีกเรื่องที่ต้องระวัง คือ...

Stream อาจจะไม่ได้ถูก subscription ในทันที

ลองดูตัวอย่างโค้ดนี้

Stream<int> counter1(int maxCount) {
    var controller = StreamController<int>();
    int counter = 0;

    Timer.periodic(Duration(seconds: 1), (Timer timer) {
        counter++;
        controller.add(counter);
        if (counter >= maxCount) {
            timer.cancel();
            controller.close();
        }
    });

    return controller.stream;
}

เราตั้งให้ฟังก์ชัน counter1 ทำการ add ตัวเลขเข้าไปเรื่อยๆ ทุกๆ 1 วินาที (ฟังก์ชัน Timer.periodic() เอาไว้สำหรับสั่งงานที่ทำงานซ้ๆ กันทุกๆ ช่วงเวลาหนึ่ง คล้ายๆ กับ setInterval ใน JavaScript)

เวลาใช้งาน ก็ใช้ listen() ตามปกติหรือใช้ for loop แล้วเติม await ลงไปก็ได้

void main() {
    //start!
    var counterStream = counter1(5);

    //listen
    counterStream.listen((n){
        print(n);
    });
}

แบบนี้ถ้าเอาไปรัน ก็จะได้ output เป็นเลข 1, 2, 3, 4, 5 ตัวใน 5 วินาที ... อันนี้มาตราฐาน ตามคอมมอนเซ้นส์ ไม่มีอะไรแปลก

แต่ถ้าเรามีการสั่ง delay ก่อนที่เราจะ listen() แบบนี้

void main() {
    //start!
    var counterStream = counter1(5);

    //waiting 2 sec.
    await Future.delayed(const Duration(seconds: 2));

    //listen
    counterStream.listen((n){
        print(n);
    });
}

คำถามคือถ้าเขียนโค้ดแบบนี้ เราคาดหวังว่า output จะเป็นยังไง?

  • โปรแกรมก็น่าจะหยุดรอ 2 วินาที
  • แล้วก็ค่อยๆ ปริ้นเลข 1, 2, 3, 4, 5 ทีละวิ

แต่มันไม่ใช่แบบนั้นน่ะสิ!

การทำงานของโปรแกรมคือหลังจาก delay 2 วินาทีแล้ว มันจะทำการปริ้นเลข 1, 2 ออกมาในทันที (ดูรูปข้างล่างประกอบนะ)

สำหรับการใช้งาน Stream นั้นมันจะทำงานทันที แม้ว่าจะยังไม่มีใครมารอ listen มันเลยก็ตาม

แบบในเคสนี้ เราเรียกฟังก์ชัน counter1() ให้ทำงาน แล้วทำการ delay มันซะ 2 วินาทีแล้วค่อย listen แต่ตัว counter นั้นดันทำงานก่อนไปแล้ว ไม่ได้เริ่มทำงานตอนที่เรา listen

แต่ถ้าเราต้องการให้มันทำงานเมื่อสั่ง listen จังหวะนั้นเลย (แบบ counter2 ในรูปข้างบน) เราจะต้องเซ็ตค่าตอนสร้าง StreamController

ในการสร้าง StreamController เราสามารถกำหนดอ็อบชันเพิ่มได้คือ

  • onListen
  • onPause
  • onResume
  • onCancel

วิธีการก็คือเราจะต้องสร้าง Stream ที่ตอนแรกยังไม่เริ่มนับ counter แล้วสั่งให้มันเริ่มนับตอน onListen นั่นเอง

Stream<int> counter2(int maxCount) {
    StreamController<int> controller;
    Timer timer;
    int counter = 0;

    //ฟังก์ชันสำหรับเริ่มรับเลข
    void startTimer() {
        timer = Timer.periodic(
            Duration(seconds: 1), 
            (_) {
                counter++;
                controller.add(counter);
                if (counter == maxCount) {
                    timer.cancel();
                    controller.close();
                }
            },
        );
    }

    //ฟังก์ชันสำหรับหยุดนับเลข
    void stopTimer() {
        if (timer != null) {
            timer.cancel();
            timer = null;
        }
    }

    controller = StreamController<int>(

        //เริ่มทำงาน startTimer() เมื่อ listen นะ
        onListen: startTimer,

        onPause: stopTimer,
        onResume: startTimer,
        onCancel: stopTimer,
    );

    return controller.stream;
}
12 Total Views 3 Views Today
Ta

Ta

สิ่งมีชีวิตตัวอ้วนๆ กลมๆ เคลื่อนที่ไปไหนโดยการกลิ้ง .. ถนัดการดำรงชีวิตโดยไม่โดนแสงแดด
ปัจจุบันเป็น Senior Software Engineer อยู่ที่ Centrillion Technology
งานอดิเรกคือ เขียนโปรแกรม อ่านหนังสือ เขียนบทความ วาดรูป และ เล่นแบดมินตัน

You may also like...