no message

master
caiyuzheng 2021-03-15 23:07:33 +08:00
parent f571e39106
commit 7a62c0f824
4 changed files with 80 additions and 16 deletions

View File

@ -4,9 +4,14 @@ using namespace std;
#include "thread_usage.h" #include "thread_usage.h"
#include "threadpool.h" #include "threadpool.h"
extern "C"{
#include <time.h>
}
int main(){ int main(){
std::cout<<"test start"<<endl; std::cout<<"test start"<<endl;
try{ try{
std::cout<<"cpu count is "<<CoreCount()<<std::endl;
TestThreadPool(); TestThreadPool();
}catch( std::exception e){ }catch( std::exception e){
std::cout<<"exception"<<e.what(); std::cout<<"exception"<<e.what();

View File

@ -142,21 +142,29 @@ int TestRValue()
std::cout << "The contents of the vector are \"" << v[0]; std::cout << "The contents of the vector are \"" << v[0];
} }
int global = 0;
class TestTask : public general::Task class TestTask : public general::Task
{ {
public: public:
void Run() void Run()
{ {
std::cout<<"testwefwefwfwe"<<std::endl; std::cout<<"test"<<std::endl;
} }
}; };
// 42973.6
// 42942.3
int TestThreadPool() int TestThreadPool()
{ {
//run code
TestTask *t = new TestTask; TestTask *t = new TestTask;
general::ThreadPool pool(10); general::ThreadPool pool(12);
pool.Start(); pool.Start();
pool.AddTask(t); auto t1 = std::chrono::steady_clock::now();
// pool.Stop(); for(int i = 0;i < 200000;i++)
pool.AddTask(t);
pool.StopAll();
auto t2 = std::chrono::steady_clock::now();
double dr_ms=std::chrono::duration<double,std::milli>(t2-t1).count();
std::cout<<"count is "<<dr_ms<<std::endl;
getchar(); getchar();
} }

View File

@ -1,5 +1,19 @@
#include "threadpool.h" #include "threadpool.h"
// 参考于https://www.cnblogs.com/bigosprite/p/11071462.html // 参考于https://www.cnblogs.com/bigosprite/p/11071462.html
unsigned int CoreCount()
{
unsigned count = 1; // 至少一个
#if defined (LINUX)
count = sysconf(_SC_NPROCESSORS_CONF);
#elif defined (WINDOWS)
SYSTEM_INFO si;
GetSystemInfo(&si);
count = si.dwNumberOfProcessors;
#endif
return count;
}
namespace general{ namespace general{
CThreadPool::CThreadPool(int num) CThreadPool::CThreadPool(int num)
@ -9,15 +23,23 @@ namespace general{
if (num < 2){ if (num < 2){
mThreadCnt = 2; mThreadCnt = 2;
} }
mStoping = false;
} }
int CThreadPool::AddTask(Task *t){ int CThreadPool::AddTask(Task *t){
if ( nullptr == t){ if ( nullptr == t){
return -1; return -1;
} }
std::unique_lock<std::mutex> lck(this->mMutex); if(!mStoping){
this->mTasks.push(t); {
mCd.notify_one(); std::unique_lock<std::mutex> lck(this->mMutex);
this->mTasks.push(t);
}
mQueueAvaliableCondition.notify_one();
return 0;
}else{
return -2;
}
} }
void CThreadPool::Start(){ void CThreadPool::Start(){
@ -33,9 +55,12 @@ namespace general{
std::unique_lock<std::mutex> lk(this->mMutex); std::unique_lock<std::mutex> lk(this->mMutex);
while (mTasks.empty() && mStarted) while (mTasks.empty() && mStarted)
{ {
mCd.wait(lk); mQueueAvaliableCondition.wait(lk);
}
if(!mStarted){
return nullptr;
} }
Task *ret = this->mTasks.front(); Task *ret = this->mTasks.front();
this->mTasks.pop(); this->mTasks.pop();
return ret; return ret;
@ -43,11 +68,10 @@ namespace general{
void CThreadPool::Process(int id) void CThreadPool::Process(int id)
{ {
std::cout << "thread id " << id << " started " << std::endl; // std::cout << "thread id " << id << " started " << std::endl;
while (mStarted) while (mStarted)
{ {
Task *task = PopTask(); Task *task = PopTask();
std::cout<<"wake up "<<std::endl;
if (nullptr == task) if (nullptr == task)
{ {
continue; continue;
@ -64,7 +88,7 @@ namespace general{
{ {
std::lock_guard<std::mutex> lk(mMutex); std::lock_guard<std::mutex> lk(mMutex);
mStarted = false; mStarted = false;
mCd.notify_all(); mQueueAvaliableCondition.notify_all();
} }
for (auto &th : mThreads) for (auto &th : mThreads)
@ -73,19 +97,34 @@ namespace general{
} }
mStarted = false; mStarted = false;
} }
void CThreadPool::StopAll()
{
{
while(this->mTasks.size() > 0) ;
std::lock_guard<std::mutex> lk(mMutex);
mStarted = false;
mQueueAvaliableCondition.notify_all();
}
for (auto &th : mThreads)
{
th->join();
}
mStarted = false;
}
bool CThreadPool::Started(){ bool CThreadPool::Started(){
return this->mStarted; return this->mStarted;
} }
CThreadPool::~CThreadPool(){ CThreadPool::~CThreadPool(){
std::cout << "desruction" << std::endl;
if (this->mStarted) if (this->mStarted)
{ {
std::cout << "desruction" << std::endl;
this->mStarted = false;
{ {
std::lock_guard<std::mutex> lk(mMutex); std::lock_guard<std::mutex> lk(mMutex);
mStarted = false; mStarted = false;
mCd.notify_all(); mQueueAvaliableCondition.notify_all();
} }
for (size_t i = 0; i != this->mThreads.size(); ++i) for (size_t i = 0; i != this->mThreads.size(); ++i)

View File

@ -10,6 +10,16 @@
#include <utility> #include <utility>
#include "threadpool.h" #include "threadpool.h"
extern "C"{
#if !defined (_WIN32) && !defined (_WIN64)
#define LINUX
#include <sysconf.h>
#else
#define WINDOWS
#include <windows.h>
#endif
unsigned int CoreCount();
}
namespace general{ namespace general{
class CThreadPool; class CThreadPool;
enum PRIORITY enum PRIORITY
@ -58,16 +68,18 @@ namespace general{
void Start(); void Start();
void Stop(); void Stop();
bool Started(); bool Started();
void StopAll();
private: private:
CThreadPool(); CThreadPool();
void Process(int id); void Process(int id);
uint32_t mThreadCnt; uint32_t mThreadCnt;
bool mStarted; bool mStarted;
bool mStoping;
Task *PopTask(); Task *PopTask();
std::vector<std::thread*> mThreads; std::vector<std::thread*> mThreads;
std::queue<Task *> mTasks; std::queue<Task *> mTasks;
std::mutex mMutex; std::mutex mMutex;
std::condition_variable mCd; std::condition_variable mQueueAvaliableCondition;
}ThreadPool; }ThreadPool;
} }