MessageLoop

androidshellholder图解

/// Bundles the four task runners used by the engine under a common label.
///
/// Every argument is taken by value and moved into the member of the same
/// name.
///
/// @param label     Stored in label_.
/// @param platform  Task runner stored in platform_.
/// @param gpu       Task runner stored in gpu_.
/// @param ui        Task runner stored in ui_.
/// @param io        Task runner stored in io_.
TaskRunners::TaskRunners(std::string label,
                         fml::RefPtr<fml::TaskRunner> platform,
                         fml::RefPtr<fml::TaskRunner> gpu,
                         fml::RefPtr<fml::TaskRunner> ui,
                         fml::RefPtr<fml::TaskRunner> io)
    : label_(std::move(label)),
      platform_(std::move(platform)),
      gpu_(std::move(gpu)),
      ui_(std::move(ui)),
      io_(std::move(io)) {}

ThreadHost::ThreadHost

/// The collection of all the threads used by the engine.
ThreadHost

  /// Bit flags naming the threads a ThreadHost can create. Each enumerator is
  /// a distinct bit, so several can be OR-ed together into the mask that the
  /// ThreadHost constructor tests with `mask & Type::X`.
  enum Type {
    Platform = 1 << 0,
    UI = 1 << 1,
    GPU = 1 << 2,
    IO = 1 << 3,
  };

  // Threads owned by this host. The constructor populates a pointer only when
  // the matching Type bit is present in its mask argument.
  std::unique_ptr<fml::Thread> platform_thread;
  std::unique_ptr<fml::Thread> ui_thread;
  std::unique_ptr<fml::Thread> gpu_thread;
  std::unique_ptr<fml::Thread> io_thread;

ThreadHost(std::string name_prefix, uint64_t type_mask);

/// Creates the subset of engine threads selected by `mask`.
///
/// @param name_prefix  Prefix of every thread name; the suffixes ".platform",
///                     ".ui", ".gpu" and ".io" are appended to it.
/// @param mask         Bitwise OR of ThreadHost::Type values. A thread whose
///                     bit is absent is not created.
ThreadHost::ThreadHost(std::string name_prefix, uint64_t mask) {
  // Creates one thread into `slot` when its flag is set in the mask.
  const auto create_if_requested = [&](ThreadHost::Type flag,
                                       const char* suffix,
                                       std::unique_ptr<fml::Thread>& slot) {
    if (mask & flag) {
      slot = std::make_unique<fml::Thread>(name_prefix + suffix);
    }
  };

  // Same creation order as the flags are declared: platform, ui, gpu, io.
  create_if_requested(ThreadHost::Type::Platform, ".platform", platform_thread);
  create_if_requested(ThreadHost::Type::UI, ".ui", ui_thread);
  create_if_requested(ThreadHost::Type::GPU, ".gpu", gpu_thread);
  create_if_requested(ThreadHost::Type::IO, ".io", io_thread);
}

Thread::Thread

Thread

  // The underlying OS thread, created by the constructor.
  std::unique_ptr<std::thread> thread_;
  // Task runner of the message loop that runs on thread_; assigned by the
  // constructor once the new thread has published it.
  fml::RefPtr<fml::TaskRunner> task_runner_;
  
  // Starts a new thread that owns a message loop and blocks until that loop's
  // task runner is available, which is then stored in task_runner_.
  //
  // @param name  Name given to the new thread; copied into the thread lambda.
  Thread::Thread(const std::string& name) : joined_(false) {
    fml::AutoResetWaitableEvent latch;
    fml::RefPtr<fml::TaskRunner> runner;
    // latch and runner are captured by reference. That is only valid because
    // this constructor waits on the latch below, and the lambda does not touch
    // either local after calling latch.Signal().
    thread_ = std::make_unique<std::thread>([&latch, &runner, name]() -> void {
      SetCurrentThreadName(name);
      // Create this thread's message loop, then publish its task runner.
      fml::MessageLoop::EnsureInitializedForCurrentThread();
      auto& loop = MessageLoop::GetCurrent();
      runner = loop.GetTaskRunner();
      latch.Signal();
      // Thread body: run the message loop.
      loop.Run();
    });
    // Wait for the new thread to finish writing runner before reading it.
    latch.Wait();
    task_runner_ = runner;
}

tls_message_loop.reset(new MessageLoop());

MessageLoop

  // Platform-specific loop implementation obtained from
  // MessageLoopImpl::Create() in the constructor.
  fml::RefPtr<MessageLoopImpl> loop_;
  // Task runner constructed from loop_ in the constructor.
  fml::RefPtr<fml::TaskRunner> task_runner_;

// Slot holding the calling thread's MessageLoop. It is filled by
// EnsureInitializedForCurrentThread() and read by GetCurrent().
// NOTE(review): FML_THREAD_LOCAL presumably expands to a thread-local storage
// specifier; its definition is not part of this excerpt.
FML_THREAD_LOCAL ThreadLocalUniquePtr<MessageLoop> tls_message_loop;

/// Makes sure the calling thread has a MessageLoop stored in
/// tls_message_loop. Does nothing when one is already present.
void MessageLoop::EnsureInitializedForCurrentThread() {
  // Create the loop lazily on first use; an existing loop is left untouched.
  if (tls_message_loop.get() == nullptr) {
    tls_message_loop.reset(new MessageLoop());
  }
}

/// Builds a message loop: creates the platform-specific implementation and a
/// TaskRunner that wraps it.
///
/// task_runner_ is initialized from loop_, so loop_ must be initialized first.
/// NOTE(review): this relies on loop_ being declared before task_runner_ in the
/// class; confirm against the header.
MessageLoop::MessageLoop()
    : loop_(MessageLoopImpl::Create()),
      task_runner_(fml::MakeRefCounted<fml::TaskRunner>(loop_)) {
}

/// Returns the MessageLoop stored in tls_message_loop for the calling thread.
///
/// The stored pointer is dereferenced without a null check, so
/// EnsureInitializedForCurrentThread() must have been called on this thread
/// beforehand.
MessageLoop& MessageLoop::GetCurrent() {
  return *tls_message_loop.get();
}

/// Runs the loop by delegating to the implementation's DoRun().
void MessageLoop::Run() {
  loop_->DoRun();
}

TaskRunner::TaskRunner

//TaskRunner
  
fml::RefPtr<MessageLoopImpl> loop_;

/// Creates a task runner that posts to the given loop implementation.
///
/// @param loop  Loop implementation; moved into loop_.
TaskRunner::TaskRunner(fml::RefPtr<MessageLoopImpl> loop)
    : loop_(std::move(loop)) {}

MessageLoopImpl::Create

MessageLoopImpl
  
  // Exposed for the embedder shell which allows clients to poll for events
  // instead of dedicating a thread to the message loop.
  friend class MessageLoop;

// Task-queue collection that this loop registers tasks with and flushes from.
fml::RefPtr<MessageLoopTaskQueues> task_queue_;
// Identifies this loop's own queue in every call made on task_queue_.
TaskQueueId queue_id_;

/// Factory for the platform-specific message loop implementation.
///
/// The concrete type is chosen at compile time from the OS_* macros.
///
/// @return A new implementation for the target OS, or nullptr when none of the
///         listed platforms matches.
fml::RefPtr<MessageLoopImpl> MessageLoopImpl::Create() {
#if OS_MACOSX
  return fml::MakeRefCounted<MessageLoopDarwin>();
#elif OS_ANDROID
  return fml::MakeRefCounted<MessageLoopAndroid>();
#elif OS_FUCHSIA
  return fml::MakeRefCounted<MessageLoopFuchsia>();
#elif OS_LINUX
  return fml::MakeRefCounted<MessageLoopLinux>();
#elif OS_WIN
  return fml::MakeRefCounted<MessageLoopWin>();
#else
  return nullptr;
#endif
}

/// Runs the platform loop once and then performs shutdown housekeeping.
///
/// Sequence: refuse to run if already terminated, call the implementation's
/// Run(), mark the loop terminated, flush tasks one last time, and finally
/// dispose of whatever is still pending in this loop's queue.
void MessageLoopImpl::DoRun() {
  if (terminated_) {
    // Message loops may be run only once.
    return;
  }

  // Allow the implementation to do its thing.
  Run();  // Core step: the platform-specific loop.

  // The loop may have been implicitly terminated. This can happen if the
  // implementation supports termination via platform specific APIs or just
  // error conditions. Set the terminated flag manually.
  terminated_ = true;

  // The message loop is shutting down. Check if there are expired tasks. This
  // is the last chance for expired tasks to be serviced. Make sure the
  // terminated flag is already set so we don't accrue additional tasks now.
  RunExpiredTasksNow();

  // When the message loop is in the process of shutting down, pending tasks
  // should be destructed on the message loop's thread. We have just returned
  // from the implementations |Run| method which we know is on the correct
  // thread. Drop all pending tasks on the floor.
  task_queue_->DisposeTasks(queue_id_);
}

MessageLoopAndroid::Run

MessageLoopAndroid
  
  // The ALooper returned by AcquireLooperForThread() in the constructor.
  fml::UniqueObject<ALooper*, UniqueLooperTraits> looper_;
  // timerfd registered with looper_; WakeUp() re-arms it to schedule a wake-up.
  fml::UniqueFD timer_fd_;

// Sets up the Android message loop: obtains the thread's ALooper, creates a
// timerfd with TFD_NONBLOCK | TFD_CLOEXEC, and registers that fd with the
// looper so that timer expiry calls OnEventFired().
// NOTE(review): this excerpt stops after the ALooper_addFd call; the
// constructor's closing brace and any check of add_result are not shown.
MessageLoopAndroid::MessageLoopAndroid()
    : looper_(AcquireLooperForThread()),
      timer_fd_(::timerfd_create(kClockType, TFD_NONBLOCK | TFD_CLOEXEC)),
      running_(false) {

  // Only ALOOPER_EVENT_INPUT events on the timer fd are of interest.
  static const int kWakeEvents = ALOOPER_EVENT_INPUT;

  // Looper callback for the timer fd. Per the original note, it is invoked
  // after the looper's epoll_wait wakes up. `data` is the `this` pointer
  // handed to ALooper_addFd below as the baton.
  ALooper_callbackFunc read_event_fd = [](int, int events, void* data) -> int {
    if (events & kWakeEvents) {
      reinterpret_cast<MessageLoopAndroid*>(data)->OnEventFired();
    }
    return 1;  // continue receiving callbacks
  };

  int add_result = ::ALooper_addFd(looper_.get(),          // looper
                                   timer_fd_.get(),        // fd
                                   ALOOPER_POLL_CALLBACK,  // ident
                                   kWakeEvents,            // events
                                   read_event_fd,          // callback
                                   this                    // baton
  );
/* The loop mechanism here is built on the Android NDK looper API. For the
 * 1.ui, 1.gpu and 1.io threads a new native looper is created; for the main
 * thread the looper that Android already set up is reused.
 *
 * ALooper_forThread: returns the current thread's looper. Maps to
 *   Looper::getForThread(), which looks in TLS for a C++ looper already
 *   created for this thread.
 * ALooper_prepare: creates a new looper. Maps to Looper::prepare(), which
 *   records the looper in TLS for the thread.
 * ALooper_acquire: takes a reference to the looper. Maps to
 *   looper->incStrong(), i.e. increments the reference count. */
static ALooper* AcquireLooperForThread() {
  ALooper* thread_looper = ALooper_forThread();

  // No looper has been configured for this thread yet: prepare one.
  if (!thread_looper) {
    thread_looper = ALooper_prepare(0);
  }

  // Existing or freshly prepared, take a reference before handing it out.
  ALooper_acquire(thread_looper);
  return thread_looper;
}
//frameworks/base/native/android/looper.cpp
struct ALooper;  // Deliberately opaque: callers cannot invoke Looper methods directly and must use the ALooper_* functions, which keeps Looper encapsulated.
/**
 * ALooper
 *
 * A looper is the state tracking an event loop for a thread.
 * Loopers do not define event structures or other such things; rather
 * they are a lower-level facility to attach one or more discrete objects
 * listening for an event.  An "event" here is simply data available on
 * a file descriptor: each attached object has an associated file descriptor,
 * and waiting for "events" means (internally) polling on all of these file
 * descriptors until one or more of them have data available.
 *
 * A thread can have only one ALooper associated with it.
 */
typedef struct ALooper ALooper;

32static inline ALooper* Looper_to_ALooper(Looper* looper) {
33    return reinterpret_cast<ALooper*>(looper);
34}

/// Returns the result of Looper::getForThread() as an ALooper handle.
/// AcquireLooperForThread() treats a nullptr result as "no looper yet".
ALooper* ALooper_forThread() {
    return Looper_to_ALooper(Looper::getForThread().get());
}

/// Forwards to Looper::prepare(opts) and returns the result as an ALooper
/// handle.
///
/// @param opts  Options passed through unchanged to Looper::prepare().
ALooper* ALooper_prepare(int opts) {
    return Looper_to_ALooper(Looper::prepare(opts).get());
}

/// Takes a strong reference on the looper via incStrong(). The address of
/// ALooper_acquire is passed as the reference id.
void ALooper_acquire(ALooper* looper) {
    ALooper_to_Looper(looper)->incStrong((void*)ALooper_acquire);
}

/// Drops a strong reference on the looper via decStrong().
///
/// The id passed is ALooper_acquire, not ALooper_release. That is the same id
/// ALooper_acquire() passes to incStrong(); presumably the two ids are meant
/// to match, so do not "correct" it without checking.
void ALooper_release(ALooper* looper) {
    ALooper_to_Looper(looper)->decStrong((void*)ALooper_acquire);
}

78int ALooper_addFd(ALooper* looper, int fd, int ident, int events,
79        ALooper_callbackFunc callback, void* data) {
80    return ALooper_to_Looper(looper)->addFd(fd, ident, events, callback, data);
81}
82
83int ALooper_removeFd(ALooper* looper, int fd) {
84    return ALooper_to_Looper(looper)->removeFd(fd);
85}
/// Polls the thread's ALooper until running_ becomes false.
///
/// Each iteration waits in ALooper_pollOnce with no timeout. Registered fd
/// callbacks run from inside that call. The loop stops itself when the poll
/// reports a timeout or an error, and otherwise keeps going for as long as
/// running_ stays true.
void MessageLoopAndroid::Run() {
  FML_DCHECK(looper_.get() == ALooper_forThread());

  running_ = true;
  while (running_) {
    const int poll_status = ::ALooper_pollOnce(-1,       // infinite timeout
                                               nullptr,  // out fd
                                               nullptr,  // out events
                                               nullptr   // out data
    );
    switch (poll_status) {
      case ALOOPER_POLL_TIMEOUT:
      case ALOOPER_POLL_ERROR:
        // This handles the case where the loop is terminated using ALooper
        // APIs.
        running_ = false;
        break;
      default:
        break;
    }
  }
}
52int ALooper_pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
53    sp<Looper> looper = Looper::getForThread();
54    if (looper == NULL) {
55        ALOGE("ALooper_pollOnce: No looper for this thread!");
56        return ALOOPER_POLL_ERROR;
57    }
58
59    IPCThreadState::self()->flushCommands();
60    return looper->pollOnce(timeoutMillis, outFd, outEvents, outData);
61}

TaskRunner::PostTask

/// Posts a task whose target time is the current time.
///
/// @param task  Closure forwarded to the loop's PostTask().
void TaskRunner::PostTask(const fml::closure& task) {
  loop_->PostTask(task, fml::TimePoint::Now());
}

/// Posts a task with an explicit target time.
///
/// @param task         Closure forwarded to the loop's PostTask().
/// @param target_time  Target time forwarded unchanged.
void TaskRunner::PostTaskForTime(const fml::closure& task,
                                 fml::TimePoint target_time) {
  loop_->PostTask(task, target_time);
}

/// Posts a task whose target time is `delay` after the current time.
///
/// @param task   Closure forwarded to the loop's PostTask().
/// @param delay  Offset added to fml::TimePoint::Now() to form the target time.
void TaskRunner::PostDelayedTask(const fml::closure& task,
                                 fml::TimeDelta delay) {
  const auto target_time = fml::TimePoint::Now() + delay;
  loop_->PostTask(task, target_time);
}

MessageLoopImpl::PostTask

/// Registers the task with the shared task queues under this loop's queue id.
///
/// @param task         Closure to register.
/// @param target_time  Target time stored with the task.
void MessageLoopImpl::PostTask(const fml::closure& task,
                               fml::TimePoint target_time) {
  task_queue_->RegisterTask(queue_id_, task, target_time);
}

MessageLoopTaskQueues::RegisterTask

//MessageLoopTaskQueues
/// Adds a task to the given queue and requests a wake-up for the loop that
/// services it.
///
/// @param queue_id     Queue that receives the task.
/// @param task         Closure to store.
/// @param target_time  Target time stored with the task.
void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id,
                                         const fml::closure& task,
                                         fml::TimePoint target_time) {
  // Hold this queue's mutex for the whole registration.
  std::scoped_lock queue_lock(GetMutex(queue_id));

  // order_ increases with every registration, so each task carries a sequence
  // number recording insertion order.
  size_t order = order_++;
  const auto& queue_entry = queue_entries_[queue_id];
  queue_entry->delayed_tasks.push({order, task, target_time});
  // If this queue has been subsumed by another one, wake that owner instead.
  TaskQueueId loop_to_wake = queue_id;
  if (queue_entry->subsumed_by != _kUnmerged) {
    loop_to_wake = queue_entry->subsumed_by;
  }
  // Wake for the task now at the top of delayed_tasks, which is not
  // necessarily the one just pushed.
  WakeUpUnlocked(loop_to_wake,
                 queue_entry->delayed_tasks.top().GetTargetTime());
}

/// Asks the wakeable attached to `queue_id`, if any, to wake up at `time`.
///
/// @param queue_id  Queue whose wakeable is notified; looked up with at().
/// @param time      Time passed to the wakeable's WakeUp().
void MessageLoopTaskQueues::WakeUpUnlocked(TaskQueueId queue_id,
                                           fml::TimePoint time) const {
  const auto& entry = queue_entries_.at(queue_id);
  // A queue without a wakeable has nothing to notify.
  if (!entry->wakeable) {
    return;
  }
  entry->wakeable->WakeUp(time);
}
/// Schedules a wake-up of the looper by re-arming the timerfd to fire at
/// `time_point`.
///
/// A failed re-arm is reported only through FML_DCHECK.
void MessageLoopAndroid::WakeUp(fml::TimePoint time_point) {
  bool result = TimerRearm(timer_fd_.get(), time_point);
  FML_DCHECK(result);
}

//TimerRearm
/// Arms the timerfd `fd` to expire at the absolute time `time_point`.
///
/// @param fd          Timer file descriptor to program.
/// @param time_point  Expiry time; its epoch delta in nanoseconds is used as
///                    an absolute timer value (TFD_TIMER_ABSTIME).
/// @return true when timerfd_settime() reports success (returns 0).
bool TimerRearm(int fd, fml::TimePoint time_point) {
  uint64_t expiry_ns = time_point.ToEpochDelta().ToNanoseconds();

  // A value of "0" would disarm the timer, whereas the desired behavior is to
  // trigger it immediately, so bump zero up to the smallest non-zero value.
  if (expiry_ns == 0) {
    expiry_ns = 1;
  }

  struct itimerspec timer_spec = {};
  timer_spec.it_value.tv_sec = static_cast<time_t>(expiry_ns / NSEC_PER_SEC);
  timer_spec.it_value.tv_nsec = expiry_ns % NSEC_PER_SEC;
  // NOTE(review): the original comment here reads "single expiry", but a
  // non-zero it_interval requests a repeating timer per timerfd_settime(2).
  // Behavior is kept as-is; confirm whether it_interval should stay zero.
  timer_spec.it_interval = timer_spec.it_value;

  return ::timerfd_settime(fd, TFD_TIMER_ABSTIME, &timer_spec, nullptr) == 0;
}

/// Thin wrapper that issues the __NR_timerfd_settime system call directly.
///
/// @param ufc    Timer file descriptor.
/// @param flags  Flags passed through to the system call.
/// @param utmr   New timer setting.
/// @param otmr   Receives the previous setting; callers in this file pass
///               nullptr.
/// @return The value returned by syscall().
int timerfd_settime(int ufc,
                    int flags,
                    const struct itimerspec* utmr,
                    struct itimerspec* otmr) {
  return syscall(__NR_timerfd_settime, ufc, flags, utmr, otmr);  // Programs the timed wake-up through the __NR_timerfd_settime system call.
}

looper唤醒之后的回调处理

/// Handles a wake-up on the timer fd. It is called from the looper callback
/// registered in the constructor. Expired tasks run only when draining the
/// timer fd succeeds.
void MessageLoopAndroid::OnEventFired() {
  if (!TimerDrain(timer_fd_.get())) {
    return;
  }
  RunExpiredTasksNow();
}
/// Flushes this loop's queue with FlushType::kAll.
void MessageLoopImpl::RunExpiredTasksNow() {
  FlushTasks(FlushType::kAll);
}

/// Runs the tasks that are due on this loop's queue and notifies the queue's
/// observers after every task.
///
/// @param type  Flush mode forwarded to GetTasksToRunNow().
void MessageLoopImpl::FlushTasks(FlushType type) {
  TRACE_EVENT0("fml", "MessageLoop::FlushTasks");
  std::vector<fml::closure> invocations;

  // Per the original note: walks the whole delayed-task queue and appends the
  // tasks whose time has come to invocations. (GetTasksToRunNow itself is not
  // shown in this excerpt.)
  task_queue_->GetTasksToRunNow(queue_id_, type, invocations);

  for (const auto& invocation : invocations) {
    invocation();  // Core step: run the closure that was passed to PostTask.
    // The observer list is fetched again after every task.
    std::vector<fml::closure> observers =
        task_queue_->GetObserversToNotify(queue_id_);
    for (const auto& observer : observers) {  // Call each observer in turn.
      observer();
    }
  }
}

其他

ThreadHost初始化

[-> flutter/shell/common/thread_host.cc]

/// Creates the subset of engine threads selected by `mask`.
///
/// @param name_prefix  Prefix of every thread name; ".platform", ".ui", ".gpu"
///                     or ".io" is appended to it.
/// @param mask         Bitwise OR of ThreadHost::Type values. A thread whose
///                     bit is absent is not created.
ThreadHost::ThreadHost(std::string name_prefix, uint64_t mask) {
  if (mask & ThreadHost::Type::Platform) {
    platform_thread = std::make_unique<fml::Thread>(name_prefix + ".platform");
  }

  if (mask & ThreadHost::Type::UI) {
    // Create the thread [see section 2.2 of the original notes].
    ui_thread = std::make_unique<fml::Thread>(name_prefix + ".ui");
  }

  if (mask & ThreadHost::Type::GPU) {
    gpu_thread = std::make_unique<fml::Thread>(name_prefix + ".gpu");
  }

  if (mask & ThreadHost::Type::IO) {
    io_thread = std::make_unique<fml::Thread>(name_prefix + ".io");
  }
}

TaskRunner初始化

[-> flutter/fml/task_runner.cc]

/// Creates a task runner that posts to the given loop implementation.
///
/// @param loop  Loop implementation; moved into loop_.
TaskRunner::TaskRunner(fml::RefPtr<MessageLoopImpl> loop)
    : loop_(std::move(loop)) {}

Flutter引擎启动过程中,会创建UI/GPU/IO这3个线程,并且会为每个线程依次创建MessageLoop对象,启动后处于epoll_wait等待状态。Flutter的消息机制跟Android原生的消息机制有很多相似之处,都有消息(或者任务)、消息队列以及Looper;有一点不同的是,Android有一个Handler类,用于发送消息以及执行回调方法,而Flutter中与之功能相近的便是TaskRunner。

(此处原有一张任务处理流程图,图片未能保留)

上图是从源码中提炼而来的任务处理流程,比官方流程图更容易理解一些复杂流程的时序问题,后续会专门讲解个中原由。Flutter的任务队列处理机制跟Android的消息队列处理相通,只不过Flutter分为Task和MicroTask两种类型,引擎和Dart虚拟机的事件以及Future都属于Task,Dart层执行scheduleMicrotask()所产生的属于Microtask。

每次Flutter引擎在消费任务时调用FlushTasks()方法,遍历整个延迟任务队列delayed_tasks_,将已到期的任务加入task队列,然后开始处理任务。

  • Step 1: 检查task,当task队列不为空,先执行一个task;
  • Step 2: 检查microTask,当microTask不为空,则执行microTask;不断循环Step 2 直到microTask队列为空,再回到执行Step 1;

可简单理解为先处理完所有的Microtask,然后再处理Task。因为scheduleMicrotask()方法的调用自身就处于一个Task,执行完当前的task,也就意味着马上执行该Microtask。

了解了其工作机制,再来看看这4个Task Runner的具体工作内容。

  • Platform Task Runner:运行在Android或者iOS的主线程。尽管阻塞该线程并不会影响Flutter渲染管道,但仍建议不要在平台线程执行耗时操作,否则可能触发watchdog来结束该应用。比如Android、iOS都是使用平台线程来传递用户输入事件,一旦平台线程被阻塞则会引起手势事件丢失。
  • UI Task Runner: 运行在ui线程,比如1.ui,用于引擎执行root isolate中的所有Dart代码,执行渲染与处理Vsync信号,将widget转换生成Layer Tree。除了渲染之外,还有处理Native Plugins消息、Timers、Microtasks等工作;
  • GPU Task Runner:运行在gpu线程,比如1.gpu,用于将Layer Tree转换为具体GPU指令,执行设备GPU相关的skia调用,转换相应平台的绘制方式,比如OpenGL, vulkan, metal等。每一帧的绘制需要UI Runner和GPU Runner配合完成,任何一个环节延迟都可能导致掉帧;
  • IO Task Runner:运行在io线程,比如1.io,前3个Task Runner都不允许执行耗时操作,该Runner用于将图片从磁盘读取出来,解压转换为GPU可识别的格式后,再上传给GPU线程。为了能访问GPU,IO Runner跟GPU Runner的Context在同一个ShareGroup。比如ui.image通过异步调用让IO Runner来异步加载图片,该线程不能执行其他耗时操作,否则可能会影响图片加载的性能。

深入理解Flutter消息机制

深入理解Flutter异步Future机制

深入理解Flutter的Isolate创建过程