graph TB
EventChannel-->|baseOn|MethodChannel
EventChannel-->|baseOn|Stream
sequenceDiagram
rect rgb(199, 237, 204)
Dart->>Dart: stream.listen
activate Dart
Dart->>Platform: methodChannel.invokeMethod(listen)
deactivate Dart
activate Platform
Platform->>Platform: onListen
activate Platform
Platform->>Platform: handler.onListen(EventSink)
deactivate Platform
deactivate Platform
end
rect rgb(199, 237, 204)
Platform->>Platform: eventSink.success
activate Platform
Platform->>Dart: messenger.send
deactivate Platform
activate Dart
Dart->>Dart: StreamController.add(T event)
deactivate Dart
end
Demo 参考:`/Users/qianpianpian/git/flutter/samples/platform_channels/lib/src/event_channel_demo.dart`(即 flutter/samples 仓库中的 platform_channels 示例)。
在 Dart 端使用 StreamBuilder 包裹 stream,根据 stream 的最新快照(snapshot)构建 UI:
// Builds the accelerometer UI from the latest snapshot of the readings stream.
child: StreamBuilder<AccelerometerReadings>(
// Event source: the stream exposed by the Accelerometer.readings getter.
stream: Accelerometer.readings,//stream
builder: (context, snapshot) {
if (snapshot.hasError) {
// The error is cast to PlatformException; any other error type makes the cast throw.
return Text((snapshot.error as PlatformException).message);
} else if (snapshot.hasData) {
// Show each axis of the latest reading, formatted to three decimal places.
return Column(
mainAxisAlignment: MainAxisAlignment.center,
children: [
Text(
'x axis: ' + snapshot.data.x.toStringAsFixed(3),
style: textStyle,
),
Text(
'y axis: ' + snapshot.data.y.toStringAsFixed(3),
style: textStyle,
),
Text(
'z axis: ' + snapshot.data.z.toStringAsFixed(3),
style: textStyle,
)
],
);
}
// NOTE(review): the excerpt stops here; the no-data branch and the closing
// of the builder / StreamBuilder call are not shown.
/// Dart-side entry point for accelerometer values delivered by the native
/// side over an [EventChannel].
///
/// Use the [readings] getter to obtain a stream of [AccelerometerReadings].
class Accelerometer {
  static const EventChannel _eventChannel = EventChannel('eventChannelDemo');

  /// Stream of [AccelerometerReadings] built from the channel's broadcast
  /// stream.
  ///
  /// Every raw event is indexed at positions 0, 1 and 2, and each item is
  /// cast to `double` before being passed, in that order, to the
  /// [AccelerometerReadings] constructor.
  static Stream<AccelerometerReadings> get readings {
    final Stream<dynamic> rawEvents = _eventChannel.receiveBroadcastStream();
    return rawEvents.map((dynamic event) {
      final double first = event[0] as double;
      final double second = event[1] as double;
      final double third = event[2] as double;
      return AccelerometerReadings(first, second, third);
    });
  }
}
// Subscribe to the channel's broadcast stream and print every event received.
_eventChannel.receiveBroadcastStream().listen(
  (event) => print("NativeNetworkApi it = ${event}"),
);
// Create the "eventChannelDemo" channel on the engine's Dart executor, then attach
// the accelerometer handler that serves the stream requests arriving on it.
val accelerometerChannel = EventChannel(flutterEngine.dartExecutor, "eventChannelDemo")
accelerometerChannel.setStreamHandler(
    AccelerometerStreamHandler(sensorManger, accelerometerSensor)
)
/**
 * Forwards accelerometer readings to Flutter through an [EventChannel].
 *
 * The class is both the channel's [EventChannel.StreamHandler] and the
 * [SensorEventListener]: it registers itself with the sensor manager when the
 * Dart side starts listening and unregisters when the Dart side cancels.
 *
 * @param sManager sensor manager used to register and unregister this listener.
 * @param s sensor whose readings are forwarded to the event sink.
 */
class AccelerometerStreamHandler(sManager: SensorManager, s: Sensor) :
    EventChannel.StreamHandler, SensorEventListener {
    private val sensorManager: SensorManager = sManager
    private val accelerometerSensor: Sensor = s

    // Null whenever no stream is active. A nullable property replaces the former
    // `lateinit var` so the "no listener" state is explicit and the sink is not
    // retained after the stream has been cancelled.
    private var eventSink: EventChannel.EventSink? = null

    /**
     * Stores the sink and starts sensor delivery at [SensorManager.SENSOR_DELAY_UI].
     * Does nothing when [events] is null.
     */
    override fun onListen(arguments: Any?, events: EventChannel.EventSink?) {
        val sink = events ?: return
        eventSink = sink
        sensorManager.registerListener(this, accelerometerSensor, SensorManager.SENSOR_DELAY_UI)
    }

    /** Stops sensor delivery and drops the sink of the cancelled stream. */
    override fun onCancel(arguments: Any?) {
        sensorManager.unregisterListener(this)
        eventSink = null
    }

    /** Accuracy changes are intentionally ignored. */
    override fun onAccuracyChanged(sensor: Sensor?, accuracy: Int) {}

    /**
     * Sends the first three sensor values to the active sink as a list.
     * Events arriving while no sink is active are dropped.
     */
    override fun onSensorChanged(sensorEvent: SensorEvent?) {
        val values = sensorEvent?.values ?: return
        // NOTE(review): the original marked this call "main" — presumably it must run
        // on the platform main thread; confirm the sensor callback thread.
        eventSink?.success(listOf(values[0], values[1], values[2]))
    }
}
// Resolve the messenger before building the channel. `?.dartExecutor` evaluates to null
// when no engine is cached under FlutterBoost.ENGINE_ID, and a channel built on a null
// messenger fails later with a bare NullPointerException when the handler is installed.
// Failing here names the actual cause instead.
val networkChannelMessenger = checkNotNull(
    FlutterEngineCache.getInstance().get(FlutterBoost.ENGINE_ID)?.dartExecutor
) { "No cached FlutterEngine for FlutterBoost.ENGINE_ID; cannot register $networkEventChannel" }

EventChannel(networkChannelMessenger, networkEventChannel)
    .setStreamHandler(object : EventChannel.StreamHandler {
        // Keep the sink handed over for the new stream so other code can push events.
        // NOTE(review): the original marked this assignment "main" — presumably the
        // callback runs on the platform main thread; confirm.
        override fun onListen(arguments: Any?, events: EventChannel.EventSink?) {
            eventSink = events
        }

        // Drop the sink once the stream is cancelled so later pushes become no-ops.
        override fun onCancel(arguments: Any?) {
            eventSink = null
        }
    })
// Install the NativeNetworkApi implementation on the messenger of the engine cached
// under FlutterBoost.ENGINE_ID (null is passed through when no such engine is cached).
val nativeNetworkMessenger = FlutterEngineCache.getInstance().get(FlutterBoost.ENGINE_ID)?.dartExecutor
Messages.NativeNetworkApi.setup(nativeNetworkMessenger, object : Messages.NativeNetworkApi {
    // Every request pushes four mock resources, in order, through the event sink;
    // nothing is sent while the sink is null.
    // NOTE(review): `result` is never completed in this block — confirm the Dart
    // caller does not wait for a reply. The original marked the first push "main",
    // presumably meaning the pushes must happen on the main thread.
    override fun request(arg: Messages.RequestParams?, result: Messages.Result<Messages.Resource>?) {
        listOf(
            "testEventChannel1",
            "testEventChannel2",
            "testEventChannel3",
            "testEventChannel4",
        ).forEach { label ->
            eventSink?.success(mockNetworkResource(label).toMap())
        }
    }
})
// Excerpt of the platform-side EventChannel: its state, constructors and the
// method that installs a stream handler.
public final class EventChannel {
// Messenger the channel's message handler is registered on.
private final BinaryMessenger messenger;
// Channel name passed to the messenger when registering the handler.
private final String name;
// Codec handed to the three-argument constructor; two-argument construction
// uses StandardMethodCodec.INSTANCE.
private final MethodCodec codec;
}
// Convenience constructor: same as the three-argument form with StandardMethodCodec.INSTANCE.
public EventChannel(BinaryMessenger messenger, String name) {
this(messenger, name, StandardMethodCodec.INSTANCE);
}
// Stores the messenger, channel name and codec as given; no validation is performed here.
public EventChannel(BinaryMessenger messenger, String name, MethodCodec codec) {
this.messenger = messenger;
this.name = name;
this.codec = codec;
}
// Registers a message handler wrapping `handler` under this channel's name.
// Passing null registers a null message handler instead.
// `messenger` is dereferenced without a null check.
@UiThread
public void setStreamHandler(final StreamHandler handler) {
messenger.setMessageHandler(
name, handler == null ? null : new IncomingStreamRequestHandler(handler));
}
// Decodes incoming "listen" / "cancel" method calls and forwards them to the
// registered StreamHandler, tracking which EventSink is currently active.
private final class IncomingStreamRequestHandler implements BinaryMessageHandler {
private final StreamHandler handler;
// Sink of the currently open stream, or null when no stream is open.
private final AtomicReference<EventSink> activeSink = new AtomicReference<>(null);
IncomingStreamRequestHandler(StreamHandler handler) {
this.handler = handler;
}
// Dispatches on the decoded method name; any method other than "listen" or
// "cancel" is answered with a null reply.
@Override
public void onMessage(ByteBuffer message, final BinaryReply reply) {
final MethodCall call = codec.decodeMethodCall(message);
if (call.method.equals("listen")) {
onListen(call.arguments, reply);
} else if (call.method.equals("cancel")) {
onCancel(call.arguments, reply);
} else {
reply.reply(null);
}
}
// Opens a new stream: installs a fresh sink as the active one, cancels the
// previous stream if one was still open, then calls the handler's onListen.
private void onListen(Object arguments, BinaryReply callback) {
final EventSink eventSink = new EventSinkImplementation();
final EventSink oldSink = activeSink.getAndSet(eventSink);
if (oldSink != null) {
// Repeated calls to onListen may happen during hot restart.
// We separate them with a call to onCancel.
try {
handler.onCancel(null);
} catch (RuntimeException e) {
// A failure while closing the old stream is logged and does not stop the new listen.
Log.e(TAG + name, "Failed to close existing event stream", e);
}
}
try {
handler.onListen(arguments, eventSink);
callback.reply(codec.encodeSuccessEnvelope(null));
} catch (RuntimeException e) {
// The handler rejected the stream: clear the active sink and reply with an error envelope.
activeSink.set(null);
Log.e(TAG + name, "Failed to open event stream", e);
callback.reply(codec.encodeErrorEnvelope("error", e.getMessage(), null));
}
}
// Closes the current stream, if any, and forwards the cancel to the handler.
private void onCancel(Object arguments, BinaryReply callback) {
// Cancelling resets activeSink to null, so the previous sink can no longer be
// used to send: the sink's send methods compare themselves against activeSink first.
final EventSink oldSink = activeSink.getAndSet(null);
if (oldSink != null) {
try {
handler.onCancel(arguments);
callback.reply(codec.encodeSuccessEnvelope(null));
// Excerpt showing only the nesting EventChannel > IncomingStreamRequestHandler >
// EventSinkImplementation; the enclosing classes are not closed in this excerpt.
public final class EventChannel {
private final class IncomingStreamRequestHandler implements BinaryMessageHandler {
// Sink handed to the StreamHandler; sends events only while it is the active sink.
private final class EventSinkImplementation implements EventSink {
// Checked before every send; the code that sets it to true is not part of this excerpt.
final AtomicBoolean hasEnded = new AtomicBoolean(false);
// Sends `event` as a success envelope on the channel, unless this sink has
// ended or is no longer the active sink.
@Override
@UiThread
public void success(Object event) {
if (hasEnded.get() || activeSink.get() != this) {
return;
}
EventChannel.this.messenger.send(name, codec.encodeSuccessEnvelope(event));
}
// Sends an error envelope on the channel under the same guard as success().
@Override
@UiThread
public void error(String errorCode, String errorMessage, Object errorDetails) {
// Before sending, verify this sink is still the active one; if it is not, send nothing.
if (hasEnded.get() || activeSink.get() != this) {
return;
}
EventChannel.this.messenger.send(
name, codec.encodeErrorEnvelope(errorCode, errorMessage, errorDetails));
}
// Excerpt of the Dart-side EventChannel; the class body is not closed in this excerpt.
class EventChannel {
/// The logical channel on which communication happens, not null.
final String name;
/// The message codec used by this channel, not null.
final MethodCodec codec;
/// The messenger used by this channel to send platform messages, not null.
BinaryMessenger get binaryMessenger => _binaryMessenger ?? defaultBinaryMessenger;
// Optional messenger override; the getter above falls back to
// defaultBinaryMessenger when this is null.
final BinaryMessenger? _binaryMessenger;
// NOTE(review): the next declaration looks like a usage example pasted into
// the class excerpt rather than a member of EventChannel — confirm.
final _eventChannel =
const EventChannel('platform_channel_events/connectivity');
/// Returns the stream of a broadcast controller whose callbacks drive the
/// platform side: the listen callback installs a message handler for this
/// channel and invokes the platform method 'listen'; the cancel callback
/// removes the handler and invokes 'cancel'. [arguments] is passed to both.
Stream<dynamic> receiveBroadcastStream([ dynamic arguments ]) {
// The listen / cancel requests go through a MethodChannel with the same name and codec.
final MethodChannel methodChannel = MethodChannel(name, codec);
// Declared `late` because the callbacks below refer to the controller.
late StreamController<dynamic> controller;
controller = StreamController<dynamic>.broadcast(
onListen: () async {
// Install the handler before invoking 'listen' below.
binaryMessenger.setMessageHandler(name, (ByteData? reply) async {
if (reply == null) {
// A null message closes the stream.
controller.close();
} else {
try {
controller.add(codec.decodeEnvelope(reply));
} on PlatformException catch (e) {
// A PlatformException thrown while decoding is delivered as a stream error.
controller.addError(e);
}
}
return null;
});
await methodChannel.invokeMethod<void>('listen', arguments);
},
onCancel: () async {
// Stop receiving messages first, then tell the platform side to cancel.
binaryMessenger.setMessageHandler(name, null);
await methodChannel.invokeMethod<void>('cancel', arguments);
});
return controller.stream;
}