TaskRunners::TaskRunners(std::string label,
fml::RefPtr<fml::TaskRunner> platform,
fml::RefPtr<fml::TaskRunner> gpu,
fml::RefPtr<fml::TaskRunner> ui,
fml::RefPtr<fml::TaskRunner> io)
: label_(std::move(label)),
platform_(std::move(platform)),
gpu_(std::move(gpu)),
ui_(std::move(ui)),
io_(std::move(io)) {}
/// The collection of all the threads used by the engine.
ThreadHost
enum Type {
Platform = 1 << 0,
UI = 1 << 1,
GPU = 1 << 2,
IO = 1 << 3,
};
std::unique_ptr<fml::Thread> platform_thread;
std::unique_ptr<fml::Thread> ui_thread;
std::unique_ptr<fml::Thread> gpu_thread;
std::unique_ptr<fml::Thread> io_thread;
ThreadHost(std::string name_prefix, uint64_t type_mask);
ThreadHost::ThreadHost(std::string name_prefix, uint64_t mask) {
if (mask & ThreadHost::Type::Platform) {
platform_thread = std::make_unique<fml::Thread>(name_prefix + ".platform");
}
if (mask & ThreadHost::Type::UI) {
ui_thread = std::make_unique<fml::Thread>(name_prefix + ".ui");
}
if (mask & ThreadHost::Type::GPU) {
gpu_thread = std::make_unique<fml::Thread>(name_prefix + ".gpu");
}
if (mask & ThreadHost::Type::IO) {
io_thread = std::make_unique<fml::Thread>(name_prefix + ".io");
}
}
Thread
std::unique_ptr<std::thread> thread_;
fml::RefPtr<fml::TaskRunner> task_runner_;
Thread::Thread(const std::string& name) : joined_(false) {
fml::AutoResetWaitableEvent latch;
fml::RefPtr<fml::TaskRunner> runner;
thread_ = std::make_unique<std::thread>([&latch, &runner, name]() -> void {
SetCurrentThreadName(name);
fml::MessageLoop::EnsureInitializedForCurrentThread();
auto& loop = MessageLoop::GetCurrent();
runner = loop.GetTaskRunner();
latch.Signal();
loop.Run();
});
latch.Wait();
task_runner_ = runner;
}
MessageLoop
fml::RefPtr<MessageLoopImpl> loop_;
fml::RefPtr<fml::TaskRunner> task_runner_;
FML_THREAD_LOCAL ThreadLocalUniquePtr<MessageLoop> tls_message_loop;
void MessageLoop::EnsureInitializedForCurrentThread() {
if (tls_message_loop.get() != nullptr) {
// Already initialized.
return;
}
tls_message_loop.reset(new MessageLoop());
}
MessageLoop::MessageLoop()
: loop_(MessageLoopImpl::Create()),
task_runner_(fml::MakeRefCounted<fml::TaskRunner>(loop_)) {
}
MessageLoop& MessageLoop::GetCurrent() {
auto* loop = tls_message_loop.get();
return *loop;
}
void MessageLoop::Run() {
loop_->DoRun();
}
//TaskRunner
fml::RefPtr<MessageLoopImpl> loop_;
TaskRunner::TaskRunner(fml::RefPtr<MessageLoopImpl> loop)
: loop_(std::move(loop)) {}
MessageLoopImpl
// Exposed for the embedder shell which allows clients to poll for events
// instead of dedicating a thread to the message loop.
friend class MessageLoop;
fml::RefPtr<MessageLoopTaskQueues> task_queue_;
TaskQueueId queue_id_;
fml::RefPtr<MessageLoopImpl> MessageLoopImpl::Create() {
#if OS_MACOSX
return fml::MakeRefCounted<MessageLoopDarwin>();
#elif OS_ANDROID
return fml::MakeRefCounted<MessageLoopAndroid>();
#elif OS_FUCHSIA
return fml::MakeRefCounted<MessageLoopFuchsia>();
#elif OS_LINUX
return fml::MakeRefCounted<MessageLoopLinux>();
#elif OS_WIN
return fml::MakeRefCounted<MessageLoopWin>();
#else
return nullptr;
#endif
}
void MessageLoopImpl::DoRun() {
if (terminated_) {
// Message loops may be run only once.
return;
}
// Allow the implementation to do its thing.
Run();//main
// The loop may have been implicitly terminated. This can happen if the
// implementation supports termination via platform specific APIs or just
// error conditions. Set the terminated flag manually.
terminated_ = true;
// The message loop is shutting down. Check if there are expired tasks. This
// is the last chance for expired tasks to be serviced. Make sure the
// terminated flag is already set so we don't accrue additional tasks now.
RunExpiredTasksNow();
// When the message loop is in the process of shutting down, pending tasks
// should be destructed on the message loop's thread. We have just returned
// from the implementations |Run| method which we know is on the correct
// thread. Drop all pending tasks on the floor.
task_queue_->DisposeTasks(queue_id_);
}
MessageLoopAndroid
fml::UniqueObject<ALooper*, UniqueLooperTraits> looper_;
fml::UniqueFD timer_fd_;
MessageLoopAndroid::MessageLoopAndroid()
: looper_(AcquireLooperForThread()),
timer_fd_(::timerfd_create(kClockType, TFD_NONBLOCK | TFD_CLOEXEC)),
running_(false) {
static const int kWakeEvents = ALOOPER_EVENT_INPUT;
ALooper_callbackFunc read_event_fd = [](int, int events, void* data) -> int {//在epoll_wait被唤醒之后会回调本callback
if (events & kWakeEvents) {
reinterpret_cast<MessageLoopAndroid*>(data)->OnEventFired();
}
return 1; // continue receiving callbacks
};
int add_result = ::ALooper_addFd(looper_.get(), // looper
timer_fd_.get(), // fd
ALOOPER_POLL_CALLBACK, // ident
kWakeEvents, // events
read_event_fd, // callback
this // baton
);
/*此处是通过android中的ndk工具实现loop消息机制,对于1.ui, 1.gpu, 1.io线程会创建native的loop,对于main线程会复用Android原生的native loop。
ALooper_forThread:获取当前线程的loop,对应Looper::getForThread(),通过该线程key向TLS来查询是否存在已创建的c++层的loop
ALooper_prepare:创建新的loop,对应Looper::prepare(),在TLS中记录着该线程为key,loop为value的数据。
ALooper_acquire: 获取loop的引用,对应looper->incStrong(),也就是将引用计数加1;*/
static ALooper* AcquireLooperForThread() {
ALooper* looper = ALooper_forThread();
if (looper == nullptr) {
// No looper has been configured for the current thread. Create one and
// return the same.
looper = ALooper_prepare(0);
}
// The thread already has a looper. Acquire a reference to the same and return
// it.
ALooper_acquire(looper);
return looper;
}
//frameworks/base/native/android/looper.cpp
struct ALooper;//存在的价值是不希望调用者直接调用Looper的方法,只能调用Alooper开头的指定方法,提供封装性
/**
* ALooper
*
* A looper is the state tracking an event loop for a thread.
* Loopers do not define event structures or other such things; rather
* they are a lower-level facility to attach one or more discrete objects
* listening for an event. An "event" here is simply data available on
* a file descriptor: each attached object has an associated file descriptor,
* and waiting for "events" means (internally) polling on all of these file
* descriptors until one or more of them have data available.
*
* A thread can have only one ALooper associated with it.
*/
typedef struct ALooper ALooper;
32static inline ALooper* Looper_to_ALooper(Looper* looper) {
33 return reinterpret_cast<ALooper*>(looper);
34}
ALooper* ALooper_forThread() {
return Looper_to_ALooper(Looper::getForThread().get());
}
ALooper* ALooper_prepare(int opts) {
return Looper_to_ALooper(Looper::prepare(opts).get());
}
void ALooper_acquire(ALooper* looper) {
ALooper_to_Looper(looper)->incStrong((void*)ALooper_acquire);
}
void ALooper_release(ALooper* looper) {
ALooper_to_Looper(looper)->decStrong((void*)ALooper_acquire);
}
78int ALooper_addFd(ALooper* looper, int fd, int ident, int events,
79 ALooper_callbackFunc callback, void* data) {
80 return ALooper_to_Looper(looper)->addFd(fd, ident, events, callback, data);
81}
82
83int ALooper_removeFd(ALooper* looper, int fd) {
84 return ALooper_to_Looper(looper)->removeFd(fd);
85}
void MessageLoopAndroid::Run() {
FML_DCHECK(looper_.get() == ALooper_forThread());
running_ = true;
while (running_) {
int result = ::ALooper_pollOnce(-1, // infinite timeout
nullptr, // out fd,
nullptr, // out events,
nullptr // out data
);
if (result == ALOOPER_POLL_TIMEOUT || result == ALOOPER_POLL_ERROR) {
// This handles the case where the loop is terminated using ALooper APIs.
running_ = false;
}
}
}
52int ALooper_pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
53 sp<Looper> looper = Looper::getForThread();
54 if (looper == NULL) {
55 ALOGE("ALooper_pollOnce: No looper for this thread!");
56 return ALOOPER_POLL_ERROR;
57 }
58
59 IPCThreadState::self()->flushCommands();
60 return looper->pollOnce(timeoutMillis, outFd, outEvents, outData);
61}
void TaskRunner::PostTask(const fml::closure& task) {
loop_->PostTask(task, fml::TimePoint::Now());
}
void TaskRunner::PostTaskForTime(const fml::closure& task,
fml::TimePoint target_time) {
loop_->PostTask(task, target_time);
}
void TaskRunner::PostDelayedTask(const fml::closure& task,
fml::TimeDelta delay) {
loop_->PostTask(task, fml::TimePoint::Now() + delay);
}
void MessageLoopImpl::PostTask(const fml::closure& task,
fml::TimePoint target_time) {
task_queue_->RegisterTask(queue_id_, task, target_time);
}
//MessageLoopTaskQueues
void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id,
const fml::closure& task,
fml::TimePoint target_time) {
std::scoped_lock queue_lock(GetMutex(queue_id));
size_t order = order_++;
const auto& queue_entry = queue_entries_[queue_id];
queue_entry->delayed_tasks.push({order, task, target_time});
TaskQueueId loop_to_wake = queue_id;
if (queue_entry->subsumed_by != _kUnmerged) {
loop_to_wake = queue_entry->subsumed_by;
}
WakeUpUnlocked(loop_to_wake,
queue_entry->delayed_tasks.top().GetTargetTime());
}
void MessageLoopTaskQueues::WakeUpUnlocked(TaskQueueId queue_id,
fml::TimePoint time) const {
if (queue_entries_.at(queue_id)->wakeable) {
queue_entries_.at(queue_id)->wakeable->WakeUp(time);
}
}
void MessageLoopAndroid::WakeUp(fml::TimePoint time_point) {
bool result = TimerRearm(timer_fd_.get(), time_point);
FML_DCHECK(result);
}
//TimerRearm
bool TimerRearm(int fd, fml::TimePoint time_point) {
uint64_t nano_secs = time_point.ToEpochDelta().ToNanoseconds();
// "0" will disarm the timer, desired behavior is to immediately
// trigger the timer.
if (nano_secs < 1) {
nano_secs = 1;
}
struct itimerspec spec = {};
spec.it_value.tv_sec = (time_t)(nano_secs / NSEC_PER_SEC);
spec.it_value.tv_nsec = nano_secs % NSEC_PER_SEC;
spec.it_interval = spec.it_value; // single expiry.
int result = ::timerfd_settime(fd, TFD_TIMER_ABSTIME, &spec, nullptr);
return result == 0;
}
int timerfd_settime(int ufc,
int flags,
const struct itimerspec* utmr,
struct itimerspec* otmr) {
return syscall(__NR_timerfd_settime, ufc, flags, utmr, otmr);//通过系统调用__NR_timerfd_settime来设置定时唤醒。
}
void MessageLoopAndroid::OnEventFired() {
if (TimerDrain(timer_fd_.get())) {
RunExpiredTasksNow();
}}
void MessageLoopImpl::RunExpiredTasksNow() {
FlushTasks(FlushType::kAll);
}
void MessageLoopImpl::FlushTasks(FlushType type) {
TRACE_EVENT0("fml", "MessageLoop::FlushTasks");
std::vector<fml::closure> invocations;
//遍历整个延迟任务队列,将时间已到期的任务加入invocations
task_queue_->GetTasksToRunNow(queue_id_, type, invocations);
for (const auto& invocation : invocations) {
invocation();//执行postTask时设置的回调闭包,main
std::vector<fml::closure> observers =
task_queue_->GetObserversToNotify(queue_id_);
for (const auto& observer : observers) {//queue的observers被逐个调用
observer();
}
}
}
ThreadHost初始化
[-> flutter/shell/common/thread_host.cc]
ThreadHost::ThreadHost(std::string name_prefix, uint64_t mask) {
if (mask & ThreadHost::Type::Platform) {
platform_thread = std::make_unique<fml::Thread>(name_prefix + ".platform");
}
if (mask & ThreadHost::Type::UI) {
//创建线程 [见小节2.2]
ui_thread = std::make_unique<fml::Thread>(name_prefix + ".ui");
}
if (mask & ThreadHost::Type::GPU) {
gpu_thread = std::make_unique<fml::Thread>(name_prefix + ".gpu");
}
if (mask & ThreadHost::Type::IO) {
io_thread = std::make_unique<fml::Thread>(name_prefix + ".io");
}
}
TaskRunner初始化
[-> flutter/fml/task_runner.cc]
TaskRunner::TaskRunner(fml::RefPtr<MessageLoopImpl> loop)
: loop_(std::move(loop)) {}
Flutter引擎启动过程,会创建UI/GPU/IO这3个线程,并且会为每个线程依次创建MessageLoop对象,启动后处于epoll_wait等待状态。对于Flutter的消息机制跟Android原生的消息机制有很多相似之处,都有消息(或者任务)、消息队列以及Looper,有一点不同的是Android有一个Handler类,用于发送消息以及执行回调方法,相对应Flutter中有着相近功能的便是TaskRunner。
上图是从源码中提炼而来的任务处理流程,比官方流程图更容易理解一些复杂流程的时序问题,后续会专门讲解个中原由。Flutter的任务队列处理机制跟Android的消息队列处理相通,只不过Flutter分为Task和MicroTask两种类型,引擎和Dart虚拟机的事件以及Future都属于Task,Dart层执行scheduleMicrotask()所产生的属于Microtask。
每次Flutter引擎在消费任务时调用FlushTasks()方法,遍历整个延迟任务队列delayed_tasks_,将已到期的任务加入task队列,然后开始处理任务。
可简单理解为先处理完所有的Microtask,然后再处理Task。因为scheduleMicrotask()方法的调用自身就处于一个Task,执行完当前的task,也就意味着马上执行该Microtask。
了解了其工作机制,再来看看这4个Task Runner的具体工作内容。