Project

General

Profile

Statistics
| Branch: | Revision:

root / src / sim / cthreadpool.cc @ 3e29b8a0

History | View | Annotate | Download (5.16 KB)

1 01873262 Georg Kunz
//=========================================================================
2
//  CTHREADPOOL.CC - part of
3
//
4
//                  OMNeT++/OMNEST
5
//           Discrete System Simulation in C++
6
//
7
//
8
//   Member functions of
9
//    cThreadPool : A pool of worker threads. This class only implements
10
//                  creation and shutdown of the threads. The actual
11
//                  event processing is implemented in derived classes.
12
//
13
//  Author: Georg Kunz
14
//
15
//=========================================================================
16
17
/*--------------------------------------------------------------*
18
  Copyright (C) 2010 Georg Kunz
19

20
  This file is distributed WITHOUT ANY WARRANTY. See the file
21
  `license' for details on this and other legal matters.
22
 *--------------------------------------------------------------*/
23
24
#include <iostream>
25
#include <stdlib.h>
26
#include <cstring>
27
#include <cstdlib>
28
29
#include <unistd.h>
30
#include <sys/signal.h>
31
32
#include "cthreadpool.h"
33
#include "cmessage.h"
34
#include "cconfiguration.h"
35
#include "casyncmodule.h"
36
#include "cdefaultlist.h"
37
#include "cbarriermessage.h"
38
#include "sysdep.h"
39
#include "regmacros.h"
40
#include "cconfigoption.h"
41
42
//#define TIMING_DEBUG
43
44
#ifdef TIMING_DEBUG
45
#include <time.h>
46
#endif
47
48
USING_NAMESPACE
49
50
Register_GlobalConfigOption(CFG_IN_THREADPOOL_CPU_ID_OFFSET, "cpu-id-offset", CFG_INT, "0", "Offset for Thread Affinity. (When running multiple instances of Horizon at once)");
51
Register_PerRunConfigOption(CFGID_THREADPOOL_SIZE, "thread-pool-size", CFG_INT, "5", "Number of Threads in Threadpool");
52
Register_GlobalConfigOption(CFGID_SIMTIME_SCALE, "simtime-scale", CFG_INT, "-12", "Sets the scale exponent, and thus the resolution of time for the 64-bit fixed-point simulation time representation. Accepted values are -18..0; for example, -6 selects microsecond resolution. -12 means picosecond resolution, with a maximum simtime of ~110 days.");
53
54
55
static __thread cThreadLocalData* localData = NULL;
56
57
58
cThreadPool::cThreadPool() :
59
                    shutdownDone(false)
60
{
61
}
62
63
64
cThreadPool::~cThreadPool()
65
{
66
}
67
68
69
static void* startWorker(void* arg)
70
                {
71
72
        cThreadPool* pool = (cThreadPool*)arg;
73
        pool->worker();
74
        return NULL;
75
                }
76
77
78
void cThreadPool::activate()
79
{
80
        //
81
        // Get configuration data.
82
        //
83
84
        numThreads = ev.getConfig()->getAsInt(CFGID_THREADPOOL_SIZE);
85
        unsigned cpuCount = (unsigned)sysconf(_SC_NPROCESSORS_ONLN);
86
87
        if (numThreads >= cpuCount) {
88
                std::cerr << "WARNING: The requested number of " << numThreads
89
                                << " worker "
90
                                << "threads plus the scheduler thread  exceeds the number of "
91
                                << cpuCount << " physical CPUs. Falling back to " << cpuCount
92
                                - 1 << " worker threads." << std::endl;
93
                numThreads = cpuCount - 1;
94
                sleep(1);
95
96
        }
97
98
        unsigned cpuIdOffset = ev.getConfig()->getAsInt(CFG_IN_THREADPOOL_CPU_ID_OFFSET);
99
        if (numThreads + cpuIdOffset >= cpuCount)
100
        {
101
                throw cRuntimeError("Mis-configured CPU-id offset: exceeding number of"
102
                                " physical CPUs");
103
        }
104
105
        //
106
        // create worker threads
107
        //
108
        std::cout << "starting " << numThreads << " threads" << std::endl;
109
110
        workerIDs = new pthread_t[numThreads];
111
        for (uint i = 0; i < numThreads; i++)
112
        {
113
                int err = pthread_create(&workerIDs[i], NULL, &startWorker, this);
114
                if (err == 0)
115
                {
116
                        std::cout << "created thread " << i << std::endl;
117
                        thread_set_affinity(workerIDs[i], i+1 + cpuIdOffset);
118
                }
119
                else
120
                {
121
                        throw cRuntimeError(this, "Unable to create worker thread: %s", strerror(err));
122
                }
123
                thread_set_affinity(pthread_self(), 0 + cpuIdOffset);
124
        }
125
}
126
127
128
void cThreadPool::shutdown()
129
{
130
        //
131
        // check if we already shut down the thread pool
132
        //
133
        if (shutdownDone)
134
                return;
135
136
        //
137
        // wait for threads to exit.
138
        //
139
        for (uint i = 0; i < numThreads; i++)
140
        {
141
                int err = pthread_cancel(workerIDs[i]);
142
                if (err != 0)
143
                        throw cRuntimeError(this, "Unable to cancel worker thread: %s", strerror(err));
144
145
                std::cout << "waiting for thread " << i << std::endl;
146
147
                err = pthread_join(workerIDs[i], NULL);
148
                if (err != 0)
149
                        throw cRuntimeError(this, "Unable to join worker thread: %s", strerror(err));
150
        }
151
        delete[] workerIDs;
152
        shutdownDone = true;
153
}
154
155
156
cThreadLocalData* cThreadPool::getLocalData()
157
{
158
        if (localData == NULL)
159
        {
160
                localData = new cThreadLocalData();
161
        }
162
        return localData;
163
}
164
165
166
void cThreadPool::setDefaultOwner(cDefaultList* list)
167
{
168
        cThreadLocalData* data = cThreadPool::getLocalData();
169
        data->setDefaultOwner(list);
170
}
171
172
173
cDefaultList* cThreadPool::getDefaultOwner()
174
{
175
        cThreadLocalData* data = cThreadPool::getLocalData();
176
        cDefaultList* list = data->getDefaultOwner();
177
        if (list == NULL)
178
        {
179
                data->setDefaultOwner(&defaultList);
180
                return &defaultList;
181
        }
182
        else
183
        {
184
                return list;
185
        }
186
}
187
188
189
void cThreadPool::setSimTime(simtime_t t)
190
{
191
192
        cThreadLocalData* data = cThreadPool::getLocalData();
193
        data->setSimTime(t);
194
}
195
196
197
simtime_t cThreadPool::getSimTime()
198
{
199
        return cThreadPool::getLocalData()->getSimTime();
200
}
201
202
203
cComponent* cThreadPool::getContext()
204
{
205
        return cThreadPool::getLocalData()->getContextModule();
206
}
207
208
209
void cThreadPool::setContext(cComponent* mod)
210
{
211
        cThreadLocalData* data = cThreadPool::getLocalData();
212
        data->setContextModule(mod);
213
}
214
215
216
// -------------------------------------
217
218
219
cThreadLocalData::cThreadLocalData() :
220
                                    defaultOwner(NULL)
221
{
222
223
}
224
225
226
cThreadLocalData::~cThreadLocalData()
227
{
228
        // nothing to do (yet)
229
}
230
NAMESPACE_END