EventChannel

技术依赖

graph TB
EventChannel-->|baseOn|MethodChannel
EventChannel-->|baseOn|Stream

核心原理

sequenceDiagram

rect rgb(199, 237, 204)

Dart->>Dart: stream.listen
activate Dart
Dart->>Platform: methodChannel.invokeMethod(listen)
deactivate Dart

activate Platform

Platform->>Platform: onListen
activate Platform
Platform->>Platform: handler.onListen(EventSink)
deactivate Platform
deactivate Platform

end

rect rgb(199, 237, 204)

Platform->>Platform: eventSink.success
activate Platform
Platform->>Dart: messenger.send
deactivate Platform

activate Dart
Dart->>Dart: StreamController.add(T event)
deactivate Dart

end

Usage

Dart

StreamBuilder Use

Demo 参考:/Users/qianpianpian/git/flutter/samples/platform_channels/lib/src/event_channel_demo.dart

使用StreamBuilder包裹stream

child: StreamBuilder<AccelerometerReadings>(
  stream: Accelerometer.readings,//stream
  builder: (context, snapshot) {
    if (snapshot.hasError) {
      return Text((snapshot.error as PlatformException).message);
    } else if (snapshot.hasData) {
      return Column(
        mainAxisAlignment: MainAxisAlignment.center,
        children: [
          Text(
            'x axis: ' + snapshot.data.x.toStringAsFixed(3),
            style: textStyle,
          ),
          Text(
            'y axis: ' + snapshot.data.y.toStringAsFixed(3),
            style: textStyle,
          ),
          Text(
            'z axis: ' + snapshot.data.z.toStringAsFixed(3),
            style: textStyle,
          )
        ],
      );
    }
/// Bridges value changes of the native Accelerometer sensor to Dart through an
/// [EventChannel].
///
/// Consumers read the [readings] getter to obtain a stream of
/// [AccelerometerReadings].
class Accelerometer {
  static const EventChannel _eventChannel = EventChannel('eventChannelDemo');

  /// A stream of [AccelerometerReadings] built from the channel's broadcast
  /// stream.
  ///
  /// Each incoming event is indexed at positions 0, 1 and 2, and every entry is
  /// cast to `double` before being handed to [AccelerometerReadings].
  static Stream<AccelerometerReadings> get readings {
    final Stream<dynamic> rawEvents = _eventChannel.receiveBroadcastStream();
    return rawEvents.map((dynamic event) {
      final first = event[0] as double;
      final second = event[1] as double;
      final third = event[2] as double;
      return AccelerometerReadings(first, second, third);
    });
  }
}

Direct Use

// Subscribe to the channel's broadcast stream directly and print each event.
_eventChannel.receiveBroadcastStream().listen((dynamic event) {
  print("NativeNetworkApi it = ${event}");
});

Native

// Create the "eventChannelDemo" channel on the engine's Dart executor and attach an
// AccelerometerStreamHandler to it as the channel's stream handler.
EventChannel(flutterEngine.dartExecutor, "eventChannelDemo")
        .setStreamHandler(AccelerometerStreamHandler(sensorManger, accelerometerSensor))
  
/**
 * Relays accelerometer sensor events to an [EventChannel.EventSink].
 *
 * The class is both the channel's [EventChannel.StreamHandler] and the sensor's
 * [SensorEventListener]: it registers itself with the [SensorManager] when a
 * listener attaches and unregisters when the stream is cancelled.
 *
 * @param sManager sensor manager used to register and unregister this listener.
 * @param s sensor whose events are forwarded.
 */
class AccelerometerStreamHandler(sManager: SensorManager, s: Sensor) : EventChannel.StreamHandler, SensorEventListener {
    private val sensorManager: SensorManager = sManager
    private val accelerometerSensor: Sensor = s

    // Assigned in onListen before this object is registered as a sensor listener.
    private lateinit var eventSink: EventChannel.EventSink

    /** Stores the sink and starts listening to the sensor; does nothing when [events] is null. */
    override fun onListen(arguments: Any?, events: EventChannel.EventSink?) {
        val sink = events ?: return
        // Key step: keep the sink so sensor callbacks can push values through it.
        eventSink = sink
        sensorManager.registerListener(this, accelerometerSensor, SensorManager.SENSOR_DELAY_UI)
    }

    /** Stops listening to the sensor. */
    override fun onCancel(arguments: Any?) {
        sensorManager.unregisterListener(this)
    }

    /** Accuracy changes are ignored. */
    override fun onAccuracyChanged(sensor: Sensor?, accuracy: Int) = Unit

    /** Sends the first three sensor values as a list through the sink; ignores null events. */
    override fun onSensorChanged(sensorEvent: SensorEvent?) {
        val event = sensorEvent ?: return
        val readings = event.values
        val axisValues = listOf(readings[0], readings[1], readings[2])
        // Key step: emit the values as a success event on the stream.
        eventSink.success(axisValues)
    }
}
// Attach a stream handler to the channel named by networkEventChannel, using the Dart executor
// of the engine looked up in FlutterEngineCache under FlutterBoost.ENGINE_ID.
EventChannel(FlutterEngineCache.getInstance().get(FlutterBoost.ENGINE_ID)?.dartExecutor, networkEventChannel)
        .setStreamHandler(object : EventChannel.StreamHandler {
            override fun onListen(arguments: Any?, events: EventChannel.EventSink?) {
               eventSink = events // key step: keep the sink so events can be sent through it later
            }

            override fun onCancel(arguments: Any?) {
               // Drop the stored sink once the stream is cancelled.
               eventSink = null
            }
        })

// Install a NativeNetworkApi implementation whose request() pushes four mock resources
// through the stored event sink, in order. Nothing is sent while the sink is null.
Messages.NativeNetworkApi.setup(
    FlutterEngineCache.getInstance().get(FlutterBoost.ENGINE_ID)?.dartExecutor,
    object : Messages.NativeNetworkApi {
        override fun request(arg: Messages.RequestParams?, result: Messages.Result<Messages.Resource>?) {
            val mockNames = listOf(
                "testEventChannel1",
                "testEventChannel2",
                "testEventChannel3",
                "testEventChannel4"
            )
            // Key step: every mock resource is emitted as a success event on the stream.
            mockNames.forEach { mockName ->
                eventSink?.success(mockNetworkResource(mockName).toMap())
            }
        }
    }
)

NativeSide

/** Excerpt of the platform-side EventChannel showing only its fields. */
public final class EventChannel {
  /** Messenger on which the stream handler is registered and through which events are sent. */
  private final BinaryMessenger messenger;
  /** Channel name passed to the messenger when registering handlers and sending events. */
  private final String name;
  /** Codec used to decode incoming method calls and encode success/error envelopes. */
  private final MethodCodec codec;
  
}
/** Creates a channel with the given messenger and name, using {@code StandardMethodCodec.INSTANCE}. */
public EventChannel(BinaryMessenger messenger, String name) {
  this(messenger, name, StandardMethodCodec.INSTANCE);
}
/** Creates a channel that stores the given messenger, name and codec. */
public EventChannel(BinaryMessenger messenger, String name, MethodCodec codec) {
  this.messenger = messenger;
  this.name = name;
  this.codec = codec;
}

EventChannel.setStreamHandler

/**
 * Registers a stream handler on this channel.
 *
 * <p>A non-null handler is wrapped in an {@code IncomingStreamRequestHandler} and installed as
 * the messenger's message handler for this channel's name; a null handler installs null instead.
 *
 * @param handler the stream handler, or null.
 */
@UiThread
public void setStreamHandler(final StreamHandler handler) {
  if (handler == null) {
    messenger.setMessageHandler(name, null);
    return;
  }
  messenger.setMessageHandler(name, new IncomingStreamRequestHandler(handler));
}

IncomingStreamRequestHandler.onMessage

private final class IncomingStreamRequestHandler implements BinaryMessageHandler {
  private final StreamHandler handler;
  private final AtomicReference<EventSink> activeSink = new AtomicReference<>(null);

  /** Stores the stream handler that the listen and cancel requests are forwarded to. */
  IncomingStreamRequestHandler(StreamHandler handler) {
    this.handler = handler;
  }

  /**
   * Decodes an incoming message as a method call and dispatches it by method name:
   * "listen" goes to onListen, "cancel" goes to onCancel, anything else is answered with null.
   */
  @Override
  public void onMessage(ByteBuffer message, final BinaryReply reply) {
    final MethodCall call = codec.decodeMethodCall(message);
    switch (call.method) {
      case "listen":
        onListen(call.arguments, reply);
        break;
      case "cancel":
        onCancel(call.arguments, reply);
        break;
      default:
        reply.reply(null);
        break;
    }
  }
/**
 * Handles a "listen" request: installs a fresh sink as the active one, cancels any stream that
 * was still open, then asks the handler to start listening and replies with the outcome.
 *
 * @param arguments arguments of the incoming "listen" call, passed through to the handler.
 * @param callback reply channel that receives a success or error envelope.
 */
private void onListen(Object arguments, BinaryReply callback) {
  final EventSink freshSink = new EventSinkImplementation();
  final EventSink previousSink = activeSink.getAndSet(freshSink);
  final boolean streamAlreadyOpen = previousSink != null;
  if (streamAlreadyOpen) {
    // onListen can arrive repeatedly (for example during hot restart), so the
    // previous stream is closed with an onCancel before the new one is opened.
    try {
      handler.onCancel(null);
    } catch (RuntimeException cancelFailure) {
      Log.e(TAG + name, "Failed to close existing event stream", cancelFailure);
    }
  }
  try {
    handler.onListen(arguments, freshSink);
    callback.reply(codec.encodeSuccessEnvelope(null));
  } catch (RuntimeException listenFailure) {
    // Opening failed: clear the active sink and report the error to the caller.
    activeSink.set(null);
    Log.e(TAG + name, "Failed to open event stream", listenFailure);
    callback.reply(codec.encodeErrorEnvelope("error", listenFailure.getMessage(), null));
  }
}
private void onCancel(Object arguments, BinaryReply callback) {
  final EventSink oldSink = activeSink.getAndSet(null);//cancel之后会将activeSink设置为null,后续不能在使用之前的sink进行发送,发送前校验
  if (oldSink != null) {
    try {
      handler.onCancel(arguments);
      callback.reply(codec.encodeSuccessEnvelope(null));
public final class EventChannel {

private final class IncomingStreamRequestHandler implements BinaryMessageHandler {

private final class EventSinkImplementation implements EventSink {
  final AtomicBoolean hasEnded = new AtomicBoolean(false);

  /**
   * Sends {@code event} as a success envelope on the channel.
   *
   * <p>Nothing is sent once this sink has ended or when it is no longer the active sink.
   */
  @Override
  @UiThread
  public void success(Object event) {
    final boolean stillActive = !hasEnded.get() && activeSink.get() == this;
    if (stillActive) {
      EventChannel.this.messenger.send(name, codec.encodeSuccessEnvelope(event));
    }
  }

  /**
   * Sends an error envelope built from the given code, message and details on the channel.
   *
   * <p>Nothing is sent once this sink has ended or when it is no longer the active sink.
   */
  @Override
  @UiThread
  public void error(String errorCode, String errorMessage, Object errorDetails) {
    // Check before sending that this sink is still the latest active one; a stale sink sends nothing.
    final boolean stillActive = !hasEnded.get() && activeSink.get() == this;
    if (stillActive) {
      EventChannel.this.messenger.send(
          name, codec.encodeErrorEnvelope(errorCode, errorMessage, errorDetails));
    }
  }

DartSide

class EventChannel {
  /// The logical channel on which communication happens, not null.
  final String name;

  /// The message codec used by this channel, not null.
  final MethodCodec codec;

  /// The messenger used by this channel to send platform messages, not null.
  BinaryMessenger get binaryMessenger => _binaryMessenger ?? defaultBinaryMessenger;
  final BinaryMessenger? _binaryMessenger;

// Example of constructing a channel: a const EventChannel named
// 'platform_channel_events/connectivity'.
final _eventChannel =
    const EventChannel('platform_channel_events/connectivity');

EventChannel.receiveBroadcastStream

/// Returns a broadcast stream of events arriving on this channel.
///
/// When the stream gains a listener, a message handler is installed on
/// [binaryMessenger] for [name] and a 'listen' method call carrying [arguments]
/// is invoked. When the stream is cancelled, the handler is removed and a
/// 'cancel' method call is invoked.
Stream<dynamic> receiveBroadcastStream([ dynamic arguments ]) {
  final MethodChannel platformChannel = MethodChannel(name, codec);
  late StreamController<dynamic> eventController;

  // A null message closes the stream; otherwise the envelope is decoded into an
  // event, and a PlatformException raised while decoding becomes a stream error.
  Future<ByteData?> handlePlatformMessage(ByteData? envelope) async {
    if (envelope == null) {
      eventController.close();
      return null;
    }
    try {
      eventController.add(codec.decodeEnvelope(envelope));
    } on PlatformException catch (error) {
      eventController.addError(error);
    }
    return null;
  }

  eventController = StreamController<dynamic>.broadcast(
    onListen: () async {
      binaryMessenger.setMessageHandler(name, handlePlatformMessage);
      await platformChannel.invokeMethod<void>('listen', arguments);
    },
    onCancel: () async {
      binaryMessenger.setMessageHandler(name, null);
      await platformChannel.invokeMethod<void>('cancel', arguments);
    },
  );
  return eventController.stream;
}