Stream

Stream/BLoC

https://www.didierboelens.com/2018/08/reactive-programming-streams-bloc/

Flutter中如何利用StreamBuilder和BLoC来控制Widget状态

Why use RxDart and how we can use with BLoC Pattern in Flutter?

image

What is a Stream?

Introduction

In order to easily visualize the notion of Stream, simply consider a pipe with 2 ends, only one allowing to insert something into it. When you insert something into the pipe, it flows inside the pipe and goes out by the other end.

In Flutter,

  • the pipe is called a Stream
  • to control the Stream, we usually(*) use a StreamController
  • to insert something into the Stream, the StreamController exposes the “entrance", called a StreamSink, accessible via the sink property
  • the way out of the Stream, is exposed by the StreamController via the stream property

How do I know that something is conveyed by a Stream?

When you need to be notified that something is conveyed by a Stream, you simply need to listen to the stream property of the StreamController.

When you define a listener, you receive a StreamSubscription object. This is via that StreamSubscription object that you will be notified that something happens at the level of the Stream.

As soon as there is at least one active listener, the Stream starts generating events to notify the active StreamSubscription object(s) each time:

  • some data goes out from the stream,
  • when some error has been sent to the stream,
  • when the stream is closed.

The StreamSubscription object also allows you to:

  • stop listening,
  • pause,
  • resume.

Is a Stream only a simple pipe?

No, a Stream also allows to process the data that flows inside it before it goes out.

To control the processing of the data inside a Stream, we use a StreamTransformer, which is nothing but

  • a function that “captures” the data that flows inside the Stream
  • does something with the data
  • the outcome of this transformation is also a Stream

You will directly understand from this statement that it is very possible to use several StreamTransformers in sequence.

A StreamTransformer may be used to do any type of processing, such as, e.g.:

  • filtering: to filter the data based on any type of condition,
  • regrouping: to regroup data,
  • modification: to apply any type of modification to the data,
  • inject data to other streams,
  • buffering,
  • processing: do any kind of action/operation based on the data,

Types of Streams

There are 2 types of Streams.

Single-subscription Streams

This type of Stream only allows a single listener during the whole lifetime of that Stream.

It is not possible to listen twice on such Stream, even after the first subscription has been canceled.

Broadcast Streams

This second type of Stream allows any number of listeners.

It is possible to add a listener to a Broadcast Stream at any moment. The new listener will receive the events, as of the moment it starts listening to the Stream.

Important note about the Resources

It is a very good practice to always release the resources which are no longer necessary.

This statement applies to:

  • StreamSubscription - when you no longer need to listen to a stream, cancel the subscription;
  • StreamController - when you no longer need a StreamController, close it;
  • the same applies to RxDart Subjects, when you no longer need a BehaviourSubject, a PublishSubject…, close it.

Usage

StreamController

import 'dart:async';

void main() {
  // 初始化一个单订阅的Stream controller
  final StreamController ctrl = StreamController();
  
  // 初始化一个监听
  final StreamSubscription subscription = ctrl.stream.listen((data) => print('$data'));

  // 往Stream中添加数据
  ctrl.sink.add('my name');
  ctrl.sink.add(1234);
  ctrl.sink.add({'a': 'element A', 'b': 'element B'});
  ctrl.sink.add(123.45);
  
  // StreamController用完后需要释放
  ctrl.close();
}

StreamBuilder

StreamBuilder其实是一个StatefulWidget,它通过监听Stream,发现有数据输出时,自动重建,调用builder方法。

StreamBuilder<T>(
    key: ...可选...
    stream: ...需要监听的stream...
    initialData: ...初始数据,否则为空...
    builder: (BuildContext context, AsyncSnapshot<T> snapshot){
        if (snapshot.hasData){
            return ...基于snapshot.hasData返回的控件
        }
        return ...没有数据的时候返回的控件
    },
)

下面是一个模仿官方自带demo“计数器”的一个例子,使用了StreamBuilder,而不需要任何setState:


import 'dart:async';
import 'package:flutter/material.dart';

class CounterPage extends StatefulWidget {
  @override
  _CounterPageState createState() => _CounterPageState();
}

class _CounterPageState extends State<CounterPage> {
  int _counter = 0;
  final StreamController<int> _streamController = StreamController<int>();

  @override
  void dispose(){
    _streamController.close();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: Text('Stream version of the Counter App')),
      body: Center(
        child: StreamBuilder<int>(  // 监听Stream,每次值改变的时候,更新Text中的内容
          stream: _streamController.stream,
          initialData: _counter,
          builder: (BuildContext context, AsyncSnapshot<int> snapshot){
            return Text('You hit me: ${snapshot.data} times');
          }
        ),
      ),
      floatingActionButton: FloatingActionButton(
        child: const Icon(Icons.add),
        onPressed: (){
          // 每次点击按钮,更加_counter的值,同时通过Sink将它发送给Stream;
          // 每注入一个值,都会引起StreamBuilder的监听,StreamBuilder重建并刷新counter
          _streamController.sink.add(++_counter);
        },
      ),
    );
  }
}

Source

classDiagram类结构

classDiagram

class EventSink~T~ {
 add(T event)void
 addError(Object error, [StackTrace? stackTrace])void
 close()void
}

class StreamConsumer~S~ {
addStream(Stream<S> stream)Future
close()Future
}

class StreamSink~T~ {
 close()Future
 Future done;
}

class StreamController~T~ {
 Stream<T> stream
 StreamSink<T> sink
}

class Stream~T~ {
   listen(void onData(T event)?, Function? onError, void onDone()?, bool? cancelOnError)StreamSubscription~T~
}

class _StreamControllerBase~T~ {
 
}

class _BroadcastStreamController~T~ {
 Function onListen
 Function onCancel
 Stream<T> stream => _BroadcastStream<T>(this);

}

class _AsyncBroadcastStreamController~T~ {
 _sendData(T data)void
 _sendError(Object error, StackTrace stackTrace)void
 _sendDone()void
}

class _StreamImpl~T~ {
 listen(void onData(T event)?, ...)StreamSubscription~T~
}

class _ControllerStream~T~ {
  _StreamControllerLifecycle<T> _controller;
 _createSubscription(void onData(T event)?, ...)StreamSubscription~T~
}
class _BroadcastStream~T~ {
 isBroadcast => true;
}

StreamSink<|--StreamController
EventSink<|--StreamSink
StreamConsumer<|--StreamSink
StreamController<|--_StreamControllerBase

_StreamControllerBase<|--_BroadcastStreamController
_BroadcastStreamController<|--_AsyncBroadcastStreamController
Stream<|--_StreamImpl
_StreamImpl<|--_ControllerStream
_ControllerStream<|--_BroadcastStream

class _ForwardingStream~S, T~ {
Stream<S> _source
listen(void onData(T event)?, ...)StreamSubscription~T~
_handleData(S data)void
}

class _MapStream~S, T~ {
_Transformation<S, T> _transform
_handleData(S inputEvent, _EventSink<T> sink)void
}
Stream<|--_ForwardingStream
_ForwardingStream<|--_MapStream


class StreamView~T~ {
Stream<T> _stream;
}

Stream<|--StreamView

class Subject~T~ {
StreamController<T> _controller
}
StreamController<|--Subject
StreamView<|--Subject
classDiagram
class StreamSubscription~T~ {
   onData(void handleData(T data)?)void
   onError(Function? handleError)void
   onDone(void handleDone()?)void
   resume()void
   pause([Future<void>? resumeSignal])void
}
class _EventDispatch~T~ {
  _sendData(T data)void
  _sendError(Object error, StackTrace stackTrace)void
  _sendDone()void
}
class _BufferingStreamSubscription~T~ {
  _DataHandler<T> _onData;
   Function _onError;
  _DoneHandler _onDone;
}

class _EventSink~T~ {
  _add(T data)void
  _addError(Object error, StackTrace stackTrace)void
  _close()void
}

class _ForwardingStreamSubscription~S, T~ {
_ForwardingStream<S, T> _stream;
StreamSubscription<S>? _subscription;
_add(T data)void
}

StreamSubscription<|--_BufferingStreamSubscription
_EventSink<|--_BufferingStreamSubscription
_EventDispatch<|--_BufferingStreamSubscription
_BufferingStreamSubscription<|--_ForwardingStreamSubscription
graph LR
Stream.listen-->|_subscribe, call|onListen_Callback

StreamController.broadcast

  /// The [onListen] callback is called when the first listener is subscribed,
  /// and the [onCancel] is called when there are no longer any active listeners.
  /// If a listener is added again later, after the [onCancel] was called,
  /// the [onListen] will be called again.
factory StreamController.broadcast(
    {void onListen()?, void onCancel()?, bool sync = false}) {
  return sync
      ? _SyncBroadcastStreamController<T>(onListen, onCancel)
      : _AsyncBroadcastStreamController<T>(onListen, onCancel);
}

Stream.map

Stream<S> map<S>(S convert(T event)) {
  return new _MapStream<T, S>(this, convert);
}

原理总结

graph LR
subgraph 事件订阅
subscribe("类似RxJava设计,subscribe/listen时持有上游Stream对象进行订阅,并包裹observer/subcription")
end
subscribe-->consume
subgraph 事件消费
consume("在消费事件时能够提前拦截并按照操作符的含义进行操作,之后转发给下游OnData")
end

RxJava核心架构图如下:

图片

_MapStream<S, T>

/// A stream pipe that converts data events before passing them on.
class _MapStream<S, T> extends _ForwardingStream<S, T> {
  final _Transformation<S, T> _transform;

  _MapStream(Stream<S> source, T transform(S event))
      : this._transform = transform,
        super(source);

  void _handleData(S inputEvent, _EventSink<T> sink) {
    T outputEvent;
    try {
      outputEvent = _transform(inputEvent);
    } catch (e, s) {
      _addErrorWithReplacement(sink, e, s);
      return;
    }
    sink._add(outputEvent);
  }
}
_ForwardingStream
abstract class _ForwardingStream<S, T> extends Stream<T> {
  final Stream<S> _source;

  _ForwardingStream(this._source);//_source是上游Stream对象,用来进行订阅

  bool get isBroadcast => _source.isBroadcast;

  StreamSubscription<T> listen(void onData(T value)?,
      {Function? onError, void onDone()?, bool? cancelOnError}) {//开始事件订阅过程
    return _createSubscription(onData, onError, onDone, cancelOnError ?? false);
  }

  StreamSubscription<T> _createSubscription(void onData(T data)?,
      Function? onError, void onDone()?, bool cancelOnError) {
    return new _ForwardingStreamSubscription<S, T>(//main
        this, onData, onError, onDone, cancelOnError);
  }
  
    // Override the following methods in subclasses to change the behavior.
  void _handleData(S data, _EventSink<T> sink);

  void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) {
    sink._addError(error, stackTrace);
  }

  void _handleDone(_EventSink<T> sink) {
    sink._close();
  }
}

_ForwardingStreamSubscription

/// Abstract superclass for subscriptions that forward to other subscriptions.
class _ForwardingStreamSubscription<S, T>
    extends _BufferingStreamSubscription<T> {
  final _ForwardingStream<S, T> _stream;

  StreamSubscription<S>? _subscription;

  _ForwardingStreamSubscription(this._stream, void onData(T data)?,
      Function? onError, void onDone()?, bool cancelOnError)
      : super(onData, onError, onDone, cancelOnError) {//传入父类的是onData,用于transform后进行_EventSink的_add听通知
    _subscription = _stream._source//用上游Stream对象进行订阅
        .listen(_handleData, onError: _handleError, onDone: _handleDone);//将_handleData方法作为onData传入,方便回调时拦截
  }
  
    void _add(T data) {//开始消费事件过程
    if (_isClosed) return;
    super._add(data);
  }
  
    // Methods used as listener on source subscription.
  void _handleData(S data) {
    _stream._handleData(data, this);//转发到_MapStream._handleData,类似RxJava的设计,在subscribe/listen时配置observer/subcription,进而在消费事件时能够提前拦截并按照操作符的含义进行操作
  }

  void _handleError(error, StackTrace stackTrace) {
    _stream._handleError(error, stackTrace, this);
  }

  void _handleDone() {
    _stream._handleDone(this);
  }
}
class _BufferingStreamSubscription<T>
    implements StreamSubscription<T>, _EventSink<T>, _EventDispatch<T> {
  
    _BufferingStreamSubscription(void onData(T data)?, Function? onError,
      void onDone()?, bool cancelOnError)
      : this.zoned(Zone.current, onData, onError, onDone, cancelOnError);

  _BufferingStreamSubscription.zoned(this._zone, void onData(T data)?,
      Function? onError, void onDone()?, bool cancelOnError)
      : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0),
        _onData = _registerDataHandler<T>(_zone, onData),
        _onError = _registerErrorHandler(_zone, onError),
        _onDone = _registerDoneHandler(_zone, onDone);


  void _add(T data) {
    if (_canFire) {
      _sendData(data);
    } else {
      _addPending(new _DelayedData<T>(data));
    }
  }
  
  void _sendData(T data) {
    _zone.runUnaryGuarded(_onData, data);//执行_onData配置的方法,也就是构造方法传入的onData
  }
  
static void Function(T) _registerDataHandler<T>(
    Zone zone, void Function(T)? handleData) {
  return zone.registerUnaryCallback<void, T>(handleData ?? _nullDataHandler);
}