https://www.didierboelens.com/2018/08/reactive-programming-streams-bloc/
Flutter中如何利用StreamBuilder和BLoC来控制Widget状态
Why use RxDart and how we can use with BLoC Pattern in Flutter?
To visualize the notion of a Stream, simply picture a pipe with two ends, only one of which allows you to insert something into it. When you insert something into the pipe, it flows through the pipe and comes out at the other end.
In Flutter,
When you need to be notified that something is conveyed by a Stream, you simply need to listen to the stream property of the StreamController.
When you define a listener, you receive a StreamSubscription object. It is via that StreamSubscription object that you will be notified that something happens at the level of the Stream.
As soon as there is at least one active listener, the Stream starts generating events to notify the active StreamSubscription object(s) each time:
The StreamSubscription object also allows you to:
Is a Stream just a simple pipe? No: a Stream also allows you to process the data that flows inside it before it goes out.
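To control the processing of the data inside a Stream, we use a StreamTransformer, which is nothing but a function that captures the data flowing inside the Stream, does something with it, and outputs the result as a Stream as well.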
You will directly understand from this statement that it is very possible to use several StreamTransformers in sequence.
A StreamTransformer may be used to do any type of processing, such as, e.g.:
There are 2 types of Streams.
This type of Stream only allows a single listener during the whole lifetime of that Stream.
It is not possible to listen twice to such a Stream, even after the first subscription has been canceled.
This second type of Stream allows any number of listeners.
It is possible to add a listener to a Broadcast Stream at any moment. The new listener will receive the events, as of the moment it starts listening to the Stream.
It is a very good practice to always release the resources which are no longer necessary.
This statement applies to:
import 'dart:async';
/// Demonstrates a single-subscription [StreamController]: one listener prints
/// every value pushed through the sink, whatever its runtime type.
void main() {
  // Create a single-subscription stream controller.
  final StreamController controller = StreamController();

  // Register the listener; it prints each event it receives.
  final StreamSubscription listener = controller.stream.listen((data) {
    print('$data');
  });

  // Push values of several different types into the stream.
  controller.sink
    ..add('my name')
    ..add(1234)
    ..add({'a': 'element A', 'b': 'element B'})
    ..add(123.45);

  // Release the StreamController once it is no longer needed.
  controller.close();
}
StreamBuilder其实是一个StatefulWidget,它通过监听Stream,发现有数据输出时,自动重建,调用builder方法。
StreamBuilder<T>(
key: ...可选...
stream: ...需要监听的stream...
initialData: ...初始数据,否则为空...
builder: (BuildContext context, AsyncSnapshot<T> snapshot){
if (snapshot.hasData){
return ...基于snapshot.data返回的控件
}
return ...没有数据的时候返回的控件
},
)
下面是一个模仿官方自带demo“计数器”的一个例子,使用了StreamBuilder,而不需要任何setState:
import 'dart:async';
import 'package:flutter/material.dart';
/// Counter demo page backed by [_CounterPageState].
class CounterPage extends StatefulWidget {
  /// Creates the state object that holds the counter and its stream.
  @override
  _CounterPageState createState() {
    return _CounterPageState();
  }
}
/// State for [CounterPage]: keeps the current count and publishes every new
/// value on a stream that a [StreamBuilder] listens to.
class _CounterPageState extends State<CounterPage> {
  /// Number of times the button has been pressed so far.
  int _counter = 0;

  /// Carries each new counter value to the [StreamBuilder] created in [build].
  final StreamController<int> _streamController = StreamController<int>();

  @override
  void dispose() {
    // Close the controller before the State itself is disposed.
    _streamController.close();
    super.dispose();
  }

  /// Increments the counter and sends the new value to the stream via the sink.
  ///
  /// Every injected value is observed by the StreamBuilder, which rebuilds and
  /// refreshes the displayed counter.
  void _onAddPressed() {
    _counter += 1;
    _streamController.sink.add(_counter);
  }

  /// Builds the label from the latest snapshot delivered by the stream.
  Widget _buildCounterLabel(BuildContext context, AsyncSnapshot<int> snapshot) {
    return Text('You hit me: ${snapshot.data} times');
  }

  @override
  Widget build(BuildContext context) {
    // Listens to the stream; the Text content is updated on every new value.
    final counterLabel = StreamBuilder<int>(
      stream: _streamController.stream,
      initialData: _counter,
      builder: _buildCounterLabel,
    );

    final addButton = FloatingActionButton(
      child: const Icon(Icons.add),
      onPressed: _onAddPressed,
    );

    return Scaffold(
      appBar: AppBar(title: Text('Stream version of the Counter App')),
      body: Center(child: counterLabel),
      floatingActionButton: addButton,
    );
  }
}
classDiagram
class EventSink~T~ {
add(T event)void
addError(Object error, [StackTrace? stackTrace])void
close()void
}
class StreamConsumer~S~ {
addStream(Stream<S> stream)Future
close()Future
}
class StreamSink~T~ {
close()Future
Future done;
}
class StreamController~T~ {
Stream<T> stream
StreamSink<T> sink
}
class Stream~T~ {
listen(void onData(T event)?, Function? onError, void onDone()?, bool? cancelOnError)StreamSubscription~T~
}
class _StreamControllerBase~T~ {
}
class _BroadcastStreamController~T~ {
Function onListen
Function onCancel
Stream<T> stream => _BroadcastStream<T>(this);
}
class _AsyncBroadcastStreamController~T~ {
_sendData(T data)void
_sendError(Object error, StackTrace stackTrace)void
_sendDone()void
}
class _StreamImpl~T~ {
listen(void onData(T event)?, ...)StreamSubscription~T~
}
class _ControllerStream~T~ {
_StreamControllerLifecycle<T> _controller;
_createSubscription(void onData(T event)?, ...)StreamSubscription~T~
}
class _BroadcastStream~T~ {
isBroadcast => true;
}
StreamSink<|--StreamController
EventSink<|--StreamSink
StreamConsumer<|--StreamSink
StreamController<|--_StreamControllerBase
_StreamControllerBase<|--_BroadcastStreamController
_BroadcastStreamController<|--_AsyncBroadcastStreamController
Stream<|--_StreamImpl
_StreamImpl<|--_ControllerStream
_ControllerStream<|--_BroadcastStream
class _ForwardingStream~S, T~ {
Stream<S> _source
listen(void onData(T event)?, ...)StreamSubscription~T~
_handleData(S data)void
}
class _MapStream~S, T~ {
_Transformation<S, T> _transform
_handleData(S inputEvent, _EventSink<T> sink)void
}
Stream<|--_ForwardingStream
_ForwardingStream<|--_MapStream
class StreamView~T~ {
Stream<T> _stream;
}
Stream<|--StreamView
class Subject~T~ {
StreamController<T> _controller
}
StreamController<|--Subject
StreamView<|--Subject
classDiagram
class StreamSubscription~T~ {
onData(void handleData(T data)?)void
onError(Function? handleError)void
onDone(void handleDone()?)void
resume()void
pause([Future<void>? resumeSignal])void
}
class _EventDispatch~T~ {
_sendData(T data)void
_sendError(Object error, StackTrace stackTrace)void
_sendDone()void
}
class _BufferingStreamSubscription~T~ {
_DataHandler<T> _onData;
Function _onError;
_DoneHandler _onDone;
}
class _EventSink~T~ {
_add(T data)void
_addError(Object error, StackTrace stackTrace)void
_close()void
}
class _ForwardingStreamSubscription~S, T~ {
_ForwardingStream<S, T> _stream;
StreamSubscription<S>? _subscription;
_add(T data)void
}
StreamSubscription<|--_BufferingStreamSubscription
_EventSink<|--_BufferingStreamSubscription
_EventDispatch<|--_BufferingStreamSubscription
_BufferingStreamSubscription<|--_ForwardingStreamSubscription
graph LR
Stream.listen-->|_subscribe, call|onListen_Callback
/// Creates a broadcast stream controller.
///
/// [onListen] is invoked when the first listener subscribes, and [onCancel]
/// once there are no longer any active listeners. Should a listener be added
/// again after [onCancel] has run, [onListen] is invoked again.
///
/// When [sync] is true a [_SyncBroadcastStreamController] is returned;
/// otherwise (the default) an [_AsyncBroadcastStreamController] is returned.
factory StreamController.broadcast(
    {void onListen()?, void onCancel()?, bool sync = false}) {
  if (sync) {
    return _SyncBroadcastStreamController<T>(onListen, onCancel);
  }
  return _AsyncBroadcastStreamController<T>(onListen, onCancel);
}
/// Returns a [_MapStream] that wraps this stream and applies [convert] to its
/// data events.
Stream<S> map<S>(S convert(T event)) => _MapStream<T, S>(this, convert);
graph LR
subgraph 事件订阅
subscribe("类似RxJava设计,subscribe/listen时持有上游Stream对象进行订阅,并包裹observer/subscription")
end
subscribe-->consume
subgraph 事件消费
consume("在消费事件时能够提前拦截并按照操作符的含义进行操作,之后转发给下游OnData")
end
RxJava核心架构图如下:
/// A stream pipe that converts data events before passing them on.
///
/// Every data event from the source is run through the conversion function
/// and the result is added to the downstream sink. If the conversion throws,
/// the error is handed to [_addErrorWithReplacement] instead.
class _MapStream<S, T> extends _ForwardingStream<S, T> {
  /// Conversion applied to each incoming data event.
  final _Transformation<S, T> _transform;

  _MapStream(Stream<S> source, T transform(S event))
      : _transform = transform,
        super(source);

  /// Converts [inputEvent] and adds the converted value to [sink].
  @override
  void _handleData(S inputEvent, _EventSink<T> sink) {
    T converted;
    try {
      converted = _transform(inputEvent);
    } catch (error, stackTrace) {
      _addErrorWithReplacement(sink, error, stackTrace);
      return;
    }
    // Kept outside the try block: only failures of the conversion itself are
    // caught above.
    sink._add(converted);
  }
}
/// Base class for streams that listen to an upstream source and forward its
/// events, letting subclasses intercept them through the `_handle*` hooks.
abstract class _ForwardingStream<S, T> extends Stream<T> {
  /// The upstream stream; it is the one actually subscribed to.
  final Stream<S> _source;

  _ForwardingStream(this._source);

  @override
  bool get isBroadcast => _source.isBroadcast;

  /// Starts the subscription process.
  ///
  /// A missing [cancelOnError] is treated as `false`.
  @override
  StreamSubscription<T> listen(void onData(T value)?,
      {Function? onError, void onDone()?, bool? cancelOnError}) {
    final shouldCancelOnError = cancelOnError ?? false;
    return _createSubscription(onData, onError, onDone, shouldCancelOnError);
  }

  /// Wraps the downstream callbacks in a [_ForwardingStreamSubscription] tied
  /// to this stream.
  StreamSubscription<T> _createSubscription(void onData(T data)?,
          Function? onError, void onDone()?, bool cancelOnError) =>
      _ForwardingStreamSubscription<S, T>(
          this, onData, onError, onDone, cancelOnError);

  // Override the following methods in subclasses to change the behavior.

  /// Handles one upstream data event; subclasses decide what reaches [sink].
  void _handleData(S data, _EventSink<T> sink);

  /// Forwards an upstream error to [sink] unchanged.
  void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) {
    sink._addError(error, stackTrace);
  }

  /// Closes [sink] when the upstream source is done.
  void _handleDone(_EventSink<T> sink) => sink._close();
}
/// Subscription that listens to the source of a [_ForwardingStream] and routes
/// every upstream event through that stream's `_handle*` hooks.
class _ForwardingStreamSubscription<S, T>
    extends _BufferingStreamSubscription<T> {
  /// The forwarding stream whose hooks process the upstream events.
  final _ForwardingStream<S, T> _stream;

  /// Subscription on the upstream source, assigned in the constructor body.
  StreamSubscription<S>? _subscription;

  /// Passes the downstream callbacks to the superclass and subscribes to the
  /// upstream source.
  ///
  /// The superclass receives [onData], which is what [_add] ends up notifying
  /// once an event has been transformed. The upstream source is listened to
  /// with this object's own `_handleData`, so each event can be intercepted
  /// when it is called back.
  _ForwardingStreamSubscription(this._stream, void onData(T data)?,
      Function? onError, void onDone()?, bool cancelOnError)
      : super(onData, onError, onDone, cancelOnError) {
    final upstream = _stream._source;
    _subscription = upstream.listen(_handleData,
        onError: _handleError, onDone: _handleDone);
  }

  /// Starts the event-consumption step; events are dropped once closed.
  @override
  void _add(T data) {
    if (!_isClosed) {
      super._add(data);
    }
  }

  // Methods used as listener on source subscription.

  /// Forwards the upstream event to the stream's `_handleData` hook (for
  /// example `_MapStream._handleData`), with this subscription as the sink.
  ///
  /// Similar to RxJava's design: the observer/subscription is configured at
  /// subscribe/listen time, so events can be intercepted and processed
  /// according to the operator's meaning when they are consumed.
  void _handleData(S data) => _stream._handleData(data, this);

  void _handleError(error, StackTrace stackTrace) =>
      _stream._handleError(error, stackTrace, this);

  void _handleDone() => _stream._handleDone(this);
}
class _BufferingStreamSubscription<T>
implements StreamSubscription<T>, _EventSink<T>, _EventDispatch<T> {
/// Creates a subscription bound to [Zone.current].
///
/// Redirects to [_BufferingStreamSubscription.zoned], passing the handlers
/// and [cancelOnError] through unchanged.
_BufferingStreamSubscription(void onData(T data)?, Function? onError,
void onDone()?, bool cancelOnError)
: this.zoned(Zone.current, onData, onError, onDone, cancelOnError);
/// Creates a subscription whose handlers are registered against [_zone].
///
/// [cancelOnError] is recorded in `_state` as `_STATE_CANCEL_ON_ERROR`, or
/// `0` when it is false. [onData], [onError] and [onDone] are each passed
/// through the matching `_register*Handler` helper together with the zone,
/// and the results are stored as `_onData`, `_onError` and `_onDone`.
_BufferingStreamSubscription.zoned(this._zone, void onData(T data)?,
Function? onError, void onDone()?, bool cancelOnError)
: _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0),
_onData = _registerDataHandler<T>(_zone, onData),
_onError = _registerErrorHandler(_zone, onError),
_onDone = _registerDoneHandler(_zone, onDone);
/// Sends [data] right away when `_canFire` is true; otherwise queues it as a
/// pending [_DelayedData] event.
void _add(T data) {
  if (!_canFire) {
    _addPending(_DelayedData<T>(data));
    return;
  }
  _sendData(data);
}
/// Runs the handler stored in `_onData` (the `onData` callback passed to the
/// constructor) with [data], through the zone's `runUnaryGuarded`.
void _sendData(T data) => _zone.runUnaryGuarded(_onData, data);
/// Registers [handleData] with [zone] as a unary callback and returns the
/// registered function.
///
/// A null [handleData] is replaced by `_nullDataHandler` before registration.
static void Function(T) _registerDataHandler<T>(
    Zone zone, void Function(T)? handleData) {
  final void Function(T) handler = handleData ?? _nullDataHandler;
  return zone.registerUnaryCallback<void, T>(handler);
}