engine: try to find optimal number of engine threads

Signed-off-by: Thorsten Liebig <Thorsten.Liebig@gmx.de>
This commit is contained in:
Thorsten Liebig 2023-01-06 20:01:07 +01:00
parent 63c5fe561d
commit 6673aefd70
9 changed files with 122 additions and 36 deletions

View File

@ -47,6 +47,8 @@ public:
//! Return the value of numTS.
virtual unsigned int GetNumberOfTimesteps()
{
	return numTS;
}
//! Hook taking a speed value; this default implementation intentionally does nothing.
virtual void NextInterval(float curr_speed)
{
}
//these access functions must be overloaded by any new engine using a different storage model
//! Read the entry volt[n][x][y][z].
inline virtual FDTD_FLOAT GetVolt( unsigned int n, unsigned int x, unsigned int y, unsigned int z ) const
{
	return volt[n][x][y][z];
}
//! Read the entry volt[n][pos[0]][pos[1]][pos[2]].
inline virtual FDTD_FLOAT GetVolt( unsigned int n, const unsigned int pos[3] ) const
{
	const unsigned int ix = pos[0];
	const unsigned int iy = pos[1];
	const unsigned int iz = pos[2];
	return volt[n][ix][iy][iz];
}

View File

@ -55,6 +55,12 @@ Engine_Multithread::Engine_Multithread(const Operator_Multithread* op) : ENGINE_
m_IterateBarrier = 0;
m_startBarrier = 0;
m_stopBarrier = 0;
m_thread_group = 0;
m_max_numThreads = boost::thread::hardware_concurrency();
m_numThreads = 0;
m_last_speed = 0;
m_opt_speed = false;
m_stopThreads = true;
#ifdef ENABLE_DEBUG_TIME
m_MPI_Barrier = 0;
@ -92,27 +98,91 @@ void Engine_Multithread::setNumThreads( unsigned int numThreads )
// Initialize the engine: run the base-class Init(), choose the initial thread count and
// hand it to changeNumThreads().
// NOTE(review): this listing appears to show pre- and post-commit lines together; the bare
// "m_numThreads = boost::thread::hardware_concurrency();" statement directly below the first
// "if" looks like the removed pre-commit line -- confirm against the real file.
void Engine_Multithread::Init()
{
m_stopThreads = true;
m_opt_speed = false;
ENGINE_MULTITHREAD_BASE::Init();
// initialize threads
m_stopThreads = false;
// a thread count of 0 switches on the thread-count search (m_opt_speed), which starts with one thread
if (m_numThreads == 0)
m_numThreads = boost::thread::hardware_concurrency();
{
m_opt_speed = true;
m_numThreads = 1;
}
// an explicit request is limited to m_max_numThreads
else if (m_numThreads > m_max_numThreads)
m_numThreads = m_max_numThreads;
#ifdef MPI_SUPPORT
m_MPI_Barrier = 0;
#endif
// create the barriers and worker threads for the chosen thread count
this->changeNumThreads(m_numThreads);
}
//! Stop and join the worker threads, free the barriers and the thread group, then reset the base engine.
void Engine_Multithread::Reset()
{
	// stop flag already set: nothing to shut down here, only the base class is reset
	if (m_stopThreads)
	{
		ENGINE_MULTITHREAD_BASE::Reset();
		return;
	}

	// extensions must not interfere with the thread shutdown
	ClearExtensions();

	// release the workers for one last single-iteration cycle, raise the stop flag
	// and meet them again at the stop barrier before joining
	m_iterTS = 1;
	m_startBarrier->wait();
	m_stopThreads = true;
	m_stopBarrier->wait();
	m_thread_group->join_all();

	// free the synchronization objects and the thread group, clearing each pointer
	delete m_IterateBarrier;
	m_IterateBarrier = 0;

	delete m_startBarrier;
	m_startBarrier = 0;

	delete m_stopBarrier;
	m_stopBarrier = 0;

	delete m_thread_group;
	m_thread_group = 0;

	ENGINE_MULTITHREAD_BASE::Reset();
}
// Replace the current set of worker threads: interrupt and join an existing thread group,
// store the new thread count, recreate the three barriers and start one worker per thread.
// Finally every engine extension is told the thread count via SetNumberOfThreads().
// NOTE(review): this listing contains a diff hunk marker inside the loop body (lines are elided
// there) and seems to show removed and added lines together (the duplicated "cout" statement,
// "m_thread_group.add_thread" next to "m_thread_group->add_thread") -- confirm against the real file.
void Engine_Multithread::changeNumThreads(unsigned int numThreads)
{
// shut down the previous workers before the thread group object is deleted
if (m_thread_group!=0)
{
m_thread_group->interrupt_all();
//m_stopThreads = true;
//m_startBarrier->wait(); // start the threads
m_thread_group->join_all(); // wait for termination
delete m_thread_group;
m_thread_group = 0;
//m_stopThreads = false;
}
m_numThreads = numThreads;
if (g_settings.GetVerboseLevel()>0)
cout << "Multithreaded engine using " << m_numThreads << " threads. Utilization: (";
vector<unsigned int> m_Start_Lines;
vector<unsigned int> m_Stop_Lines;
// CalcStartStopLines takes the thread count by non-const reference, so m_numThreads
// may presumably be adjusted by this call -- verify in Operator_Multithread
m_Op_MT->CalcStartStopLines( m_numThreads, m_Start_Lines, m_Stop_Lines );
if (g_settings.GetVerboseLevel()>0)
cout << "Multithreaded engine using " << m_numThreads << " threads. Utilization: (";
// the barriers are sized for the thread count, so they are rebuilt on every change
if (m_IterateBarrier!=0)
delete m_IterateBarrier;
// make sure all threads are waiting
m_IterateBarrier = new boost::barrier(m_numThreads); // numThread workers
if (m_startBarrier!=0)
delete m_startBarrier;
m_startBarrier = new boost::barrier(m_numThreads+1); // numThread workers + 1 controller
if (m_stopBarrier!=0)
delete m_stopBarrier;
m_stopBarrier = new boost::barrier(m_numThreads+1); // numThread workers + 1 controller
#ifdef MPI_SUPPORT
m_MPI_Barrier = 0;
#endif
m_thread_group = new boost::thread_group();
// one worker per entry of the start/stop line tables
for (unsigned int n=0; n<m_numThreads; n++)
{
unsigned int start = m_Start_Lines.at(n);
@ -130,50 +200,44 @@ void Engine_Multithread::Init()
cout << stop-start+1 << ";";
// NS_Engine_Multithread::DBG().cout() << "###DEBUG## Thread " << n << ": start=" << start << " stop=" << stop << " stop_h=" << stop_h << std::endl;
boost::thread *t = new boost::thread( NS_Engine_Multithread::thread(this,start,stop,stop_h,n) );
m_thread_group.add_thread( t );
m_thread_group->add_thread( t );
}
// propagate the thread count to all engine extensions
for (size_t n=0; n<m_Eng_exts.size(); ++n)
m_Eng_exts.at(n)->SetNumberOfThreads(m_numThreads);
}
// Stop and join the worker threads, delete the three barriers and reset the base engine.
// NOTE(review): this copy accesses m_thread_group with "." (object member) and looks like the
// pre-commit version of Reset() shown by the diff -- confirm it is no longer part of the real file.
void Engine_Multithread::Reset()
{
if (!m_stopThreads) // prevent multiple invocations
{
ClearExtensions(); //prevent extensions from interfering with thread reset...
// stop the threads
//NS_Engine_Multithread::DBG().cout() << "stopping all threads" << endl;
// one last cycle of a single iteration; the stop flag is raised while the workers run it
m_iterTS = 1;
m_startBarrier->wait(); // start the threads
m_stopThreads = true;
m_stopBarrier->wait(); // wait for the threads to finish
m_thread_group.join_all(); // wait for termination
// free the barriers and clear the pointers
delete m_IterateBarrier;
m_IterateBarrier = 0;
delete m_startBarrier;
m_startBarrier = 0;
delete m_stopBarrier;
m_stopBarrier = 0;
}
ENGINE_MULTITHREAD_BASE::Reset();
}
//! Let the worker threads run iterTS iterations and block until they are done; always returns true.
bool Engine_Multithread::IterateTS(unsigned int iterTS)
{
	// publish the amount of work before the workers are released
	m_iterTS = iterTS;

	// first rendezvous: releases the workers waiting at the start barrier
	m_startBarrier->wait();

	// second rendezvous: passed once the workers have finished their <iterTS> time steps
	m_stopBarrier->wait();

	return true;
}
//! Per-interval hook: while the thread-count search is active, adapt the number of worker threads to the measured speed.
/*!
  The base-class hook is always forwarded. Afterwards, and only while m_opt_speed is set:
  - if \a curr_speed is lower than the speed stored before the last increase, one thread is removed,
    the result is reported and the search ends;
  - otherwise, as long as m_max_numThreads is not reached, the speed is stored and one thread is added.

  \param curr_speed speed value of the interval that just finished
*/
void Engine_Multithread::NextInterval(float curr_speed)
{
	ENGINE_MULTITHREAD_BASE::NextInterval(curr_speed);

	// search finished or never requested
	if (!m_opt_speed)
		return;

	if (curr_speed<m_last_speed)
	{
		// The previous configuration was faster: step back by one thread.
		// Never go below one worker: m_last_speed is not reset when a search is restarted,
		// so this branch can be reached with a single thread, and changeNumThreads(0)
		// would try to build barriers for zero participants.
		if (m_numThreads>1)
			this->changeNumThreads(m_numThreads-1);
		cout << "Multithreaded Engine: Best performance found using " << m_numThreads << " threads." << std::endl;
		m_opt_speed = false;
	}
	else if (m_numThreads<m_max_numThreads)
	{
		// still getting faster (or equal): remember this speed and try one more thread
		m_last_speed = curr_speed;
		this->changeNumThreads(m_numThreads+1);
	}
}
void Engine_Multithread::DoPreVoltageUpdates(int threadID)
{
//execute extensions in reverse order -> highest priority gets access to the voltages last
@ -269,6 +333,11 @@ void thread::operator()()
m_enginePtr->m_startBarrier->wait();
//cout << "Thread " << boost::this_thread::get_id() << " waiting... started." << endl;
if (m_enginePtr->m_stopThreads)
{
return;
}
DEBUG_TIME( Timer timer1 );
for (unsigned int iter=0; iter<m_enginePtr->m_iterTS; ++iter)

View File

@ -90,6 +90,7 @@ public:
virtual void setNumThreads( unsigned int numThreads );
virtual void Init();
virtual void Reset();
//! Per-interval hook: while the thread-count search (m_opt_speed) is active, add or remove a worker thread depending on \a curr_speed
virtual void NextInterval(float curr_speed);
//! Iterate \a iterTS number of timesteps
virtual bool IterateTS(unsigned int iterTS);
@ -104,13 +105,17 @@ public:
protected:
Engine_Multithread(const Operator_Multithread* op);
void changeNumThreads(unsigned int numThreads);
const Operator_Multithread* m_Op_MT;
boost::thread_group m_thread_group;
boost::thread_group *m_thread_group;
boost::barrier *m_startBarrier, *m_stopBarrier;
boost::barrier *m_IterateBarrier;
volatile unsigned int m_iterTS;
unsigned int m_numThreads; //!< number of worker threads
unsigned int m_max_numThreads; //!< max. number of worker threads
volatile bool m_stopThreads;
bool m_opt_speed; //!< true while NextInterval() is still searching for the fastest thread count (enabled by Init() when the requested count is 0)
float m_last_speed; //!< speed stored by NextInterval() right before it last increased the thread count
#ifdef MPI_SUPPORT
/*! Workaround needed for subgridding scheme... (see Engine_CylinderMultiGrid)

View File

@ -46,7 +46,7 @@ Operator_Cylinder::~Operator_Cylinder()
// Create the engine via Engine_Cylinder::New(), store it in m_Engine and return it.
// NOTE(review): the two consecutive assignments look like the pre- and post-commit versions of the
// same line shown together; presumably only the m_orig_numThreads variant is live -- confirm.
Engine* Operator_Cylinder::CreateEngine()
{
//! create a special cylindrical-engine
m_Engine = Engine_Cylinder::New(this, m_numThreads);
m_Engine = Engine_Cylinder::New(this, m_orig_numThreads);
return m_Engine;
}

View File

@ -51,7 +51,7 @@ Operator_CylinderMultiGrid* Operator_CylinderMultiGrid::New(vector<double> Split
// Create the engine via Engine_CylinderMultiGrid::New(), store it in m_Engine and return it.
// NOTE(review): the two consecutive assignments look like the pre- and post-commit versions of the
// same line shown together; presumably only the m_orig_numThreads variant is live -- confirm.
Engine* Operator_CylinderMultiGrid::CreateEngine()
{
m_Engine = Engine_CylinderMultiGrid::New(this,m_numThreads);
m_Engine = Engine_CylinderMultiGrid::New(this, m_orig_numThreads);
return m_Engine;
}

View File

@ -36,11 +36,12 @@ Operator_Multithread::~Operator_Multithread()
//! Store the requested thread count in both m_numThreads and m_orig_numThreads.
void Operator_Multithread::setNumThreads( unsigned int numThreads )
{
	m_orig_numThreads = numThreads;
	m_numThreads = m_orig_numThreads;
}
// Create the engine via Engine_Multithread::New(), store it in m_Engine and return it.
// NOTE(review): the two consecutive assignments look like the pre- and post-commit versions of the
// same line shown together; presumably only the m_orig_numThreads variant is live -- confirm.
Engine* Operator_Multithread::CreateEngine()
{
m_Engine = Engine_Multithread::New(this,m_numThreads);
m_Engine = Engine_Multithread::New(this, m_orig_numThreads);
return m_Engine;
}
@ -106,7 +107,7 @@ void Operator_Multithread::CalcStartStopLines(unsigned int &numThreads, vector<u
int Operator_Multithread::CalcECOperator( DebugFlags debugFlags )
{
if (m_numThreads == 0)
if ((m_numThreads == 0) || (m_numThreads > boost::thread::hardware_concurrency()))
m_numThreads = boost::thread::hardware_concurrency();
vector<unsigned int> m_Start_Lines;

View File

@ -64,6 +64,7 @@ protected:
boost::thread_group m_thread_group;
unsigned int m_numThreads; // number of worker threads
unsigned int m_orig_numThreads; // thread count exactly as passed to setNumThreads(); this is the value CreateEngine() hands to the engine
//! Calculate the start/stop lines for the multithreading operator and engine.
/*!

View File

@ -245,6 +245,13 @@ bool openEMS::parseCommandLineArgument( const char *argv )
return false;
}
//! Set the number of engine threads.
/*!
  Negative values and values above boost::thread::hardware_concurrency() are replaced
  by the hardware concurrency; every other value is stored unchanged.
*/
void openEMS::SetNumberOfThreads(int val)
{
	const unsigned int hw_threads = boost::thread::hardware_concurrency();

	// a negative request, or one beyond what the hardware reports, falls back to the hardware value
	const bool use_hw_threads = (val<0) || (static_cast<unsigned int>(val)>hw_threads);
	if (use_hw_threads)
		val = static_cast<int>(hw_threads);

	m_engine_numThreads = val;
}
string openEMS::GetExtLibsInfo(string prefix)
{
stringstream str;
@ -1203,6 +1210,7 @@ void openEMS::RunFDTD()
if (m_DumpStats)
DumpRunStatistics(__OPENEMS_RUN_STAT_FILE__, t_run, currTS, speed, currE);
FDTD_Eng->NextInterval(speed);
}
}
if ((change>endCrit) && (FDTD_Op->GetExcitationSignal()->GetExciteType()==0))

View File

@ -76,7 +76,7 @@ public:
//! Store \a val in m_TS_fac.
void SetTimeStepFactor(double val)
{
	m_TS_fac = val;
}
//! Store \a val in m_maxTime.
void SetMaxTime(double val)
{
	m_maxTime = val;
}
// NOTE(review): the inline unsigned-int setter and the int declaration below look like the pre- and
// post-commit versions of the same member shown together -- confirm which one the header really contains.
void SetNumberOfThreads(unsigned int val) {m_engine_numThreads = val;}
//! Set the number of engine threads; negative values and values above boost::thread::hardware_concurrency() are replaced by the hardware concurrency
void SetNumberOfThreads(int val);
//! Set the DebugMat flag.
void DebugMaterial()
{
	DebugMat = true;
}
//! Set the DebugOp flag.
void DebugOperator()
{
	DebugOp = true;
}