nim_duilib/base/framework/win_io_message_pump.cpp

241 lines
6.0 KiB
C++
Raw Normal View History

2019-04-19 17:19:57 +08:00
// Copyright (c) 2011, NetEase Inc. All rights reserved.
//
// Author: wrt(guangguang)
// Date: 2011/6/8
//
// a implemention of a Windows specific message pump for I/O,
// the mechanism of which is from the Google Chrome project
#include "base/framework/win_io_message_pump.h"
#if defined(OS_WIN)
#include <assert.h>
#include <algorithm>
namespace nbase
{
WinIOMessagePump::WinIOMessagePump()
{
port_.Reset(::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1));
assert(port_.Valid());
}
void WinIOMessagePump::ScheduleWork()
{
if (::InterlockedExchange(&have_work_, 1))
return; // Someone else continued the pumping.
// Make sure the MessagePump does some work for us.
BOOL ret = ::PostQueuedCompletionStatus(port_, 0,
reinterpret_cast<ULONG_PTR>(this),
reinterpret_cast<OVERLAPPED*>(this));
assert(ret);
}
void WinIOMessagePump::ScheduleDelayedWork(const TimeTicks& delayed_work_time)
{
// We know that we can't be blocked right now since this method can only be
// called on the same thread as Run, so we only need to update our record of
// how long to sleep when we do sleep.
delayed_work_time_ = delayed_work_time;
}
void WinIOMessagePump::RegisterIOHandler(HANDLE file_handle, IOHandler *handler)
{
ULONG_PTR key = reinterpret_cast<ULONG_PTR>(handler);
HANDLE port = ::CreateIoCompletionPort(file_handle, port_, key, 1);
assert(port);
}
void WinIOMessagePump::DoRunLoop()
{
for (;;)
{
// If we do any work, we may create more messages etc., and more work may
// possibly be waiting in another task group. When we (for example)
// WaitForIOCompletion(), there is a good chance there are still more
// messages waiting. On the other hand, when any of these methods return
// having done no work, then it is pretty unlikely that calling them
// again quickly will find any work to do. Finally, if they all say they
// had no work, then it is a good time to consider sleeping (waiting) for
// more work.
bool more_work_is_plausible = state_->delegate->DoWork();
if (state_->should_quit)
break;
more_work_is_plausible |= WaitForIOCompletion(0, NULL);
if (state_->should_quit)
break;
more_work_is_plausible |=
state_->delegate->DoDelayedWork(&delayed_work_time_);
if (state_->should_quit)
break;
if (more_work_is_plausible)
continue;
more_work_is_plausible = state_->delegate->DoIdleWork();
if (state_->should_quit)
break;
if (more_work_is_plausible)
continue;
WaitForWork(); // Wait (sleep) until we have work to do again.
}
}
// Wait until IO completes, up to the time needed by the timer manager to fire
// the next set of timers.
void WinIOMessagePump::WaitForWork()
{
// We do not support nested IO message loops. This is to avoid messy
// recursion problems.
assert(state_->run_depth == 1);
int64_t timeout = GetCurrentDelay();
DWORD wait_time;
if (timeout < 0) // Negative value means no timers waiting.
wait_time = INFINITE;
else if (timeout > 0xfffffffe)
wait_time = 0xfffffffe;
else
wait_time = static_cast<DWORD>(timeout);
WaitForIOCompletion(wait_time, NULL);
}
bool WinIOMessagePump::WaitForIOCompletion(DWORD timeout, IOHandler *filter)
{
IOItem item;
if (completed_io_.empty() || !MatchCompletedIOItem(filter, &item))
{
// We have to ask the system for another IO completion.
if (!GetIOItem(timeout, &item))
return false;
if (ProcessInternalIOItem(item))
return true;
}
if (item.context->handler)
{
if (filter && item.handler != filter)
{
// Save this item for later
completed_io_.push_back(item);
}
else
{
assert(item.context->handler == item.handler);
PreProcessIOEvent();
item.handler->OnIOCompleted(item.context, item.bytes_transfered, item.error);
PostProcessIOEvent();
}
}
else
{
// The handler must be gone by now, just cleanup the mess.
delete item.context;
}
return true;
}
// Asks the OS for another IO completion result.
bool WinIOMessagePump::GetIOItem(DWORD timeout, IOItem* item)
{
memset(item, 0, sizeof(*item));
ULONG_PTR key = NULL;
OVERLAPPED* overlapped = NULL;
if (!::GetQueuedCompletionStatus(port_.Get(),
&item->bytes_transfered,
&key,
&overlapped,
timeout))
{
if (!overlapped)
return false; // Nothing in the queue.
item->error = GetLastError();
item->bytes_transfered = 0;
}
item->handler = reinterpret_cast<IOHandler*>(key);
item->context = reinterpret_cast<IOContext*>(overlapped);
return true;
}
bool WinIOMessagePump::ProcessInternalIOItem(const IOItem& item)
{
if (this == reinterpret_cast<WinIOMessagePump*>(item.context) &&
this == reinterpret_cast<WinIOMessagePump*>(item.handler))
{
// This is our internal completion.
assert(!item.bytes_transfered);
::InterlockedExchange(&have_work_, 0);
return true;
}
return false;
}
// Returns a completion item that was previously received.
bool WinIOMessagePump::MatchCompletedIOItem(IOHandler* filter, IOItem* item)
{
assert(!completed_io_.empty());
for (std::list<IOItem>::iterator it = completed_io_.begin(); it != completed_io_.end(); ++it)
{
if (!filter || it->handler == filter)
{
*item = *it;
completed_io_.erase(it);
return true;
}
}
return false;
}
void WinIOMessagePump::AddObserver(IOObserver *observer)
{
observers_.AddObserver(observer);
}
void WinIOMessagePump::RemoveObserver(IOObserver *observer)
{
observers_.RemoveObserver(observer);
}
void WinIOMessagePump::PreProcessIOEvent()
{
AutoLazyEraser lazy_eraser(&observers_);
size_t index = 0;
IOObserver* observer;
while (index < observers_.GetObserverCount())
{
observer = observers_.GetObserver(index++);
if (observer == NULL)
continue;
observer->PreProcessIOEvent();
}
}
void WinIOMessagePump::PostProcessIOEvent()
{
AutoLazyEraser lazy_eraser(&observers_);
size_t index = 0;
IOObserver* observer;
while (index < observers_.GetObserverCount())
{
observer = observers_.GetObserver(index++);
if (observer == NULL)
continue;
observer->PostProcessIOEvent();
}
}
} // namespace nbase
#endif // OS_WIN