// Copyright (c) 2011, NetEase Inc. All rights reserved.
//
// Author: wrt(guangguang)
// Date: 2011/06/09
//
// This file trys to implement a cross flatform message loop,
// the mechanism of which is from the Google Chrome project.

#include "base/framework/message_loop.h"

#include <assert.h>
#include "base/memory/lazy_instance.h"
#include "base/thread/thread_local.h"

namespace nbase
{

LazyInstance<ThreadLocalPointer<MessageLoop> > g_lazy_ptr;

MessageLoop::MessageLoop()
	: type_(kDefaultMessageLoop),
	  state_(NULL),
#if defined(OS_WIN)
	  os_modal_loop_(false),
#endif // OS_WIN
	  nestable_tasks_allowed_(true),
	  next_delayed_task_sequence_num_(0)
{
	// 一个线程内不能存在两个或以上MessageLoop
	assert(g_lazy_ptr.Pointer()->Get() == NULL);
	// 默认消息循环
	if (type_ == kDefaultMessageLoop)
		pump_.reset(new DefaultMessagePump);
	g_lazy_ptr.Pointer()->Set(this);

	message_loop_proxy_.reset(new MessageLoopProxy, &MessageLoopProxyTraits::Destruct);
	message_loop_proxy_->target_message_loop_ = this;
}

MessageLoop::~MessageLoop()
{
	bool has_work = false;

	// 清理未处理的任务可能导致生成新的任务,
	// 这里通过有限次的循环尝试清理这些新生成的任务
	for (int i = 0; i < 100; i++)
	{
		DeletePendingTasks();
		ReloadWorkQueue();
		has_work = DeletePendingTasks();
		if (!has_work)
			break;
	}

	assert(!has_work);

	PreDestruct();

	message_loop_proxy_->WillDestroyCurrentMessageLoop();
	message_loop_proxy_ = nullptr;

	g_lazy_ptr.Pointer()->Set(NULL);
}

MessageLoop* MessageLoop::current()
{
	return g_lazy_ptr.Pointer()->Get();
}

#if defined(OS_WIN)

UIMessageLoop* MessageLoop::ToUIMessageLoop()
{
	if (type_ == kUIMessageLoop)
		return reinterpret_cast<UIMessageLoop *>(this);
	return NULL;
}

IOMessageLoop* MessageLoop::ToIOMessageLoop()
{
	if (type_ == kIOMessageLoop)
		return reinterpret_cast<IOMessageLoop *>(this);
	return NULL;
}

void MessageLoop::RunWithDispatcher(Dispatcher *dispatcher)
{
	assert(this == current());
	AutoRunState state(this);
	state_->dispatcher = dispatcher;
	RunInternal();
}

#elif defined(OS_POSIX)
	
IOMessageLoop* MessageLoop::ToIOMessageLoop()
{
	if (type_ == kIOMessageLoop)
		return reinterpret_cast<IOMessageLoop *>(this);
	return NULL;
}
	
#endif // OS_WIN

void MessageLoop::Run()
{
	assert(this == current());
	AutoRunState state(this);
	RunInternal();
}

void MessageLoop::RunAllPending()
{
	assert(this == current());
	AutoRunState state(this);
	state_->quit_received = true;  // Means run until we would otherwise block.
	RunInternal();
}

void MessageLoop::RunInternal()
{
	assert(this == current());

#if defined(OS_WIN)
	if (state_->dispatcher && type() == kUIMessageLoop)
	{
		static_cast<WinUIMessagePump *>(pump_.get())->
			RunWithDispatcher(this, state_->dispatcher);
		return;
	}
#endif

	pump_->Run(this);
}

bool MessageLoop::DeletePendingTasks()
{
	bool has_work = false;
	while (!work_queue_.empty())
	{
		PendingTask task = work_queue_.front();
		work_queue_.pop();
		if (!task.delayed_run_time.is_null())
			AddToDelayedWorkQueue(task);
	}

	while (!deferred_non_nestable_work_queue_.empty())
		deferred_non_nestable_work_queue_.pop();

	has_work = !delayed_work_queue_.empty();
	while (!delayed_work_queue_.empty())
		delayed_work_queue_.pop();

	return has_work;
}

void MessageLoop::Quit()
{
	if (state_)
		state_->quit_received = true;
}

void MessageLoop::QuitNow()
{
	if (pump_)
		pump_->Quit();
}

void MessageLoop::PostTask(const StdClosure &task)
{
	PendingTask pending_task(task);
	AddToIncomingQueue(pending_task);
}

void MessageLoop::PostDelayedTask(const StdClosure &task, TimeDelta delay)
{
	PendingTask pending_task(task,
		TimeTicks::Now() + delay,
		true);
	AddToIncomingQueue(pending_task);
}

void MessageLoop::PostNonNestableTask(const StdClosure &task)
{
	PendingTask pending_task(task,
		TimeTicks(),
		false);
	AddToIncomingQueue(pending_task);
}

void MessageLoop::PostNonNestableDelayedTask(const StdClosure &task, TimeDelta delay)
{
	PendingTask pending_task(task,
		TimeTicks::Now() + delay,
		false);
	AddToIncomingQueue(pending_task);
}

TimeTicks MessageLoop::EvalDelayedRuntime(int64_t delay_ms)
{
	TimeTicks delayed_run_time;
	if (delay_ms > 0)
		delayed_run_time = TimeTicks::Now() + TimeDelta::FromMilliseconds(delay_ms);
	return delayed_run_time;
}

void MessageLoop::AddToIncomingQueue(const PendingTask &task)
{
	// 本方法可能会在另一个线程中被执行,所以必须线程安全
	std::shared_ptr<MessagePump> pump;
	{
		NAutoLock lock(&incoming_queue_lock_);
		bool was_empty = incoming_queue_.empty();
		incoming_queue_.push(task);
		if (!was_empty)
			return;
		// 因为这函数可能是间接地在另一个线程中被调用的,
		// 此时MessageLoop中可能正有任务在运行,
		// 这些任务中可能包含销毁MessageLoop的任务,
		// 为了保证对MessageLoop中的MessagePump引用有效,
		// 这里需要用到引用指针
		pump = pump_;
	}
	pump->ScheduleWork();
}

void MessageLoop::AddToDelayedWorkQueue(const PendingTask &task)
{
	PendingTask new_task(task);
	new_task.sequence_num = next_delayed_task_sequence_num_++;
	delayed_work_queue_.push(new_task);
}

void MessageLoop::ReloadWorkQueue()
{
	if (!work_queue_.empty())
		return;

	{
		NAutoLock lock(&incoming_queue_lock_);
		if (incoming_queue_.empty())
			return;
		// 常数时间交换内存
		work_queue_.Swap(&incoming_queue_);
	}
}

bool MessageLoop::DeferOrRunPendingTask(const PendingTask &task)
{
	// 任务符合立即执行的条件,那么执行之
	if (task.nestable || state_->run_depth == 1)
	{
		RunTask(task);
		return true;
	}
	// 不可嵌套任务,需要缓存之直到在最顶层MessageLoop中执行
	deferred_non_nestable_work_queue_.push(task);
	return false;
}

void MessageLoop::RunTask(const PendingTask &task)
{
	assert(nestable_tasks_allowed_);

	// 考虑到最坏情况下,任务可能是不可重入的,
	// 所以暂时禁用嵌套任务

	nestable_tasks_allowed_ = false;
	PendingTask pending_task = task;
	PreProcessTask();
	pending_task.Run();
	PostPrecessTask();
	nestable_tasks_allowed_ = true;
}

bool MessageLoop::DoWork()
{
	// 任务当前是否允许被执行
	if (!nestable_tasks_allowed_)
		return false;

	for (;;)
	{
		// 先从incoming队列取任务
		ReloadWorkQueue();
		if (work_queue_.empty())
			break;

		// 一次性处理work队列中的所有任务
		do
		{
			PendingTask task = work_queue_.front();
			work_queue_.pop();
			if (!task.delayed_run_time.is_null())
			{
				// 加入到定时任务队列
				AddToDelayedWorkQueue(task);
				// 如果加入的新任务是将被最先执行的,那么需要重新调度
				if (delayed_work_queue_.top().sequence_num == task.sequence_num)
					pump_->ScheduleDelayedWork(task.delayed_run_time);
			}
			else
			{
				if (DeferOrRunPendingTask(task))
					return true;
			}
		} while (!work_queue_.empty());
	}

	return false;
}

bool MessageLoop::DoDelayedWork(nbase::TimeTicks* next_delayed_work_time)
{
	if (!nestable_tasks_allowed_ || delayed_work_queue_.empty())
	{
		*next_delayed_work_time = recent_tick_ = TimeTicks();
		return false;
	}

	// recent_tick_记录最近一次调用TimeTick::Now时的时间,
	// 它不能代替TimeTick::Now,它是只是一个过去的Now的缓存,
	// 用来最大限度减少对TimeTick::Now的调用。
	// recent_tick_用来进行第一轮判断,要进行精确判断需要更新它为真正的TimeTick::Now

	TimeTicks next_run_time = delayed_work_queue_.top().delayed_run_time;
	if (next_run_time > recent_tick_)
	{
		// 可能是recent_tick_的不精确性引起,需要更新之
		recent_tick_ = TimeTicks::Now();
		if (next_run_time > recent_tick_)
		{
			// 真的是一个将来才需要被运行的任务,留到将来运行
			*next_delayed_work_time = next_run_time;
			return false;
		}
	}

	// 这个定时任务运行时刻已到,运行之
	PendingTask task = delayed_work_queue_.top();
	delayed_work_queue_.pop();

	if (!delayed_work_queue_.empty())
		*next_delayed_work_time = delayed_work_queue_.top().delayed_run_time;

	return DeferOrRunPendingTask(task);
}

bool MessageLoop::ProcessNextDelayedNonNestableTask()
{
	// 嵌套任务?
	if (state_->run_depth != 1)
		return false;

	if (deferred_non_nestable_work_queue_.empty())
		return false;

	PendingTask task = deferred_non_nestable_work_queue_.front();
	deferred_non_nestable_work_queue_.pop();
	RunTask(task);
	return true;
}

bool MessageLoop::DoIdleWork()
{
	// 进入Idle状态后,先尝试执行被缓存着的非嵌套任务
	if (ProcessNextDelayedNonNestableTask())
		return true;

	// 检查退出标记
	if (state_->quit_received)
		pump_->Quit();

	return false;
}

void MessageLoop::SetNestableTasksAllowed(bool allowed)
{
	if (nestable_tasks_allowed_ != allowed)
	{
		nestable_tasks_allowed_ = allowed;
		if (!nestable_tasks_allowed_)
			return;
		pump_->ScheduleWork();
	}
}

void MessageLoop::AddDestructionObserver(DestructionObserver *observer)
{
	assert(this == current());
	destruction_observers_.AddObserver(observer);
}

void MessageLoop::RemoveDestructionObserver(DestructionObserver *observer)
{
	assert(this == current());
	destruction_observers_.RemoveObserver(observer);
}

void MessageLoop::AddTaskObserver(TaskObserver *observer)
{
	assert(this == current());
	task_observers_.AddObserver(observer);
}

void MessageLoop::RemoveTaskObserver(TaskObserver *observer)
{
	assert(this == current());
	task_observers_.RemoveObserver(observer);
}

void MessageLoop::PreDestruct()
{
	size_t index = 0;
	DestructionObserver* observer;
	AutoLazyEraser lazy_eraser(&destruction_observers_);
	while (index < destruction_observers_.GetObserverCount())
	{
		observer = destruction_observers_.GetObserver(index++);
		if (observer == NULL)
			continue;
		observer->PreDestroyCurrentMessageLoop();
	}
}

void MessageLoop::PreProcessTask()
{
	size_t index = 0;
	TaskObserver* observer;
	AutoLazyEraser lazy_eraser(&task_observers_);
	while (index < task_observers_.GetObserverCount())
	{
		observer = task_observers_.GetObserver(index++);
		if (observer == NULL)
			continue;
		observer->PreProcessTask();
	}
	// NOT compact the observer list, here
}

void MessageLoop::PostPrecessTask()
{
	size_t index = 0;
	TaskObserver* observer;
	AutoLazyEraser lazy_eraser(&task_observers_);
	while (index < task_observers_.GetObserverCount())
	{
		observer = task_observers_.GetObserver(index++);
		if (observer == NULL)
			continue;
		observer->PostProcessTask();
	}
}

// the AutoRunState class
MessageLoop::AutoRunState::AutoRunState(MessageLoop* loop) : loop_(loop)
{
	// Make the loop reference us.
	previous_state_ = loop_->state_;
	if (previous_state_) {
		run_depth = previous_state_->run_depth + 1;
	} else {
		run_depth = 1;
	}
	loop_->state_ = this;

	// Initialize the other fields:
	quit_received = false;
#if defined(OS_WIN)
	dispatcher = NULL;
#endif
}

MessageLoop::AutoRunState::~AutoRunState()
{
	loop_->state_ = previous_state_;
}

MessageLoop::PendingTask::PendingTask(const StdClosure &task)
	: std_task(task), nestable(true), sequence_num(0)
{

}

MessageLoop::PendingTask::PendingTask(const StdClosure &task, TimeTicks delayed_run_time, bool nestable) :
	std_task(task), delayed_run_time(delayed_run_time),
	nestable(nestable), sequence_num(0)
{

}

MessageLoop::PendingTask::~PendingTask()
{

}

bool MessageLoop::PendingTask::operator<(const MessageLoop::PendingTask& other) const
{
	if (delayed_run_time > other.delayed_run_time)
		return true;
	if (delayed_run_time < other.delayed_run_time)
		return false;
	return sequence_num > other.sequence_num;
}

#if defined(OS_WIN)

// the UIMessageLoop class
UIMessageLoop::UIMessageLoop()
{
	pump_.reset(new UIMessagePump);
	type_ = kUIMessageLoop;
}

void UIMessageLoop::AddUIObserver(UIObserver* observer)
{
	assert(this == current());
	static_cast<UIMessagePump *>(pump())->AddObserver(observer);
}

void UIMessageLoop::RemoveUIObserver(UIObserver *observer)
{
	assert(this == current());
	static_cast<UIMessagePump *>(pump())->RemoveObserver(observer);
}
	
#endif // OS_WIN
	
// the IOMessageLoop class
IOMessageLoop::IOMessageLoop()
{
	pump_.reset(new IOMessagePump);
	type_ = kIOMessageLoop;
}

void IOMessageLoop::AddIOObserver(IOObserver* observer)
{
	assert(this == current());
	static_cast<IOMessagePump *>(pump())->AddObserver(observer);
}

void IOMessageLoop::RemoveIOObserver(IOObserver *observer)
{
	assert(this == current());
	static_cast<IOMessagePump *>(pump())->RemoveObserver(observer);
}

#if defined(OS_WIN)
void IOMessageLoop::RegisterIOHandler(HANDLE file_handle, IOHandler *handler)
{
	assert(this == current());
	static_cast<IOMessagePump *>(pump())->RegisterIOHandler(file_handle, handler);
}

bool IOMessageLoop::WaitForIOCompletion(DWORD timeout, IOHandler *handler)
{
	assert(this == current());
	return static_cast<IOMessagePump *>(pump())->WaitForIOCompletion(timeout, handler);
}

#elif defined(OS_POSIX)
bool IOMessageLoop::WatchFileDescriptor(int fd,
					                    bool persistent,
						                Mode mode,
						                FileDescriptorWatcher *controller,
						                Watcher *delegate)
{
	assert(this == current());
	return static_cast<IOMessagePump *>(pump())()->WatchFileDescriptor(
		fd, persistent, static_cast<LibeventMessagePump::Mode>(mode), controller, delegate);
}
#endif


} // namespace nbase