//=========================================================================
//  CSPINNINGTHREADPOOL.CC - part of
//
//                  OMNeT++/OMNEST
//           Discrete System Simulation in C++
//
//
//   Member functions of
//    cSpinningThreadPool : implements a pool of worker threads
//
//  Author: Georg Kunz
//
//=========================================================================

/*--------------------------------------------------------------*
 Copyright (C) 2010 Georg Kunz

 This file is distributed WITHOUT ANY WARRANTY. See the file
 `license' for details on this and other legal matters.
 *--------------------------------------------------------------*/
#include <string.h>
#include <iostream>     // std::cout/std::endl are used in shutdown()

#include "cspinningthreadpool.h"
#include "cmessage.h"
#include "cconfiguration.h"
#include "casyncmodule.h"
#include "cconfigoption.h"
#ifndef NOBARRIER
#        include "cbarriermessage.h"
#endif

inline int operator <=(cSpinningThreadPool::threadState_t& a, cSpinningThreadPool::threadState_t& b)
{
    return (a.barrierTime < b.barrierTime) ? 1 :
           (a.barrierTime > b.barrierTime) ? 0 :
            a.priority <= b.priority;
}

inline int operator <(cSpinningThreadPool::threadState_t& a, cMessage& b)
{
    return (a.barrierTime < b.getArrivalTime()) ? 1 :
           (a.barrierTime > b.getArrivalTime()) ? 0 :
            a.priority < b.getSchedulingPriority();
}
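
//
// These helpers order a pending worker slot against another slot or against a
// queued message, first by time and then by scheduling priority, which
// essentially mirrors the ordering used for events in the message queue.
// "threadState < message" therefore means the worker's pending barrier has to
// be resolved before that message may be dispatched. Note that barrierTime and
// priority appear to be filled in only on the NOBARRIER path of insertTask()
// below.
//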

cSpinningThreadPool::cSpinningThreadPool() :
    useMainThread(false)
{
    AO_store(&cancel, 0);
}

cSpinningThreadPool::~cSpinningThreadPool()
{
}

/**
 * Cleanup handler for thread cancellation. Used to unlock the mutex which
 * was locked by the thread before the call to pthread_cond_wait.
 */
void threadCleanup(void* arg)
{
}
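
//
// Note: in this spinning variant the workers never block on a condition
// variable, so there is no mutex to release and the handler body stays empty.
// It is presumably kept only so that the common thread setup code can install
// the same cleanup handler for all pool implementations.
//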

void cSpinningThreadPool::doOneEvent(cMessage* msg)
{
    //
    // NOTE: possible race condition with de-register module which sets vect[...]
    //       to NULL. However, shouldn't be a problem since vect is ever increasing
    //
    cAsyncModule* mod = (cAsyncModule*) simulation.vect[msg->getArrivalModuleId()];
    cThreadPool::setDefaultOwner(mod);

#ifndef NOBARRIER
    cBarrierMessage* barrier = msg->getBarrier();
#endif

    //
    // call the module...
    //
    mod->callHandleAsyncMessage(msg);

    //
    // indicate that the thread has finished
    //
    mod->unsetBusy();

    //
    // ... and lift the barrier
    // NOTE: this is actually a bit too early since the main thread might now
    //       continue, but still finds the flag of this thread set to busy
    //
#ifndef NOBARRIER
    barrier->signal();
#endif
}
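
//
// Sketch of the per-event hand-off this function participates in (an
// illustration of the code in this file, not additional API):
//
//   dispatcher (insertTask)             worker (worker/doOneEvent)
//   -----------------------             --------------------------
//   find idle slot (msg == 0)
//   AO_store_write(slot.msg, msg)  -->  spin until slot.msg != 0
//                                       doOneEvent(msg):
//                                         run handler, unsetBusy(),
//                                         barrier->signal()
//   waitAtBarrier()/barrierEmpty() <--  AO_store_write(slot.msg, 0)
//
// AO_store_write/AO_load_read come from libatomic_ops and include a write/read
// memory barrier, which is what the slot hand-off relies on for ordering.
//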

void cSpinningThreadPool::worker()
{
    //
    // Initialize the simulation time. Not possible in the constructor because
    // OMNeT++ needs to set the scaling exponent first.
    //
    cThreadPool::getLocalData()->setSimTime(0.0);

    int threadId = 0;
    AO_t event;

    //
    // identify own ID
    //
    pthread_t id = pthread_self();
    for (unsigned int i = 0; i < numThreads; i++)
    {
        if (pthread_equal(id, workerIDs[i]))
        {
            threadId = i;
            break;
        }
    }

    //
    // start working
    //
    while (true)
    {
        //
        // wait for task
        //
        while ((event = AO_load_read(&threadStates[threadId * SPACING].msg)) == 0)
        {
            __asm__ ("pause");
        }

        if (AO_load_read(&cancel) == 1)
            break;

        //
        // collect required data
        //
        cMessage* msg = (cMessage*) event;

        doOneEvent(msg);

        //
        // signal completion
        //
        AO_store_write(&threadStates[threadId * SPACING].msg, 0);
    }
}
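
//
// The msg slot of each threadState_t acts as a tiny one-entry mailbox:
//   0                    - worker is idle / has finished its last event
//   pointer to cMessage  - event published by insertTask(), worker is busy
//   1                    - dummy value written by shutdown() after the cancel
//                          flag is set, used only to release the spin loop
// The worker clears the slot back to 0 once doOneEvent() returns, which is
// what waitAtBarrier() and barrierEmpty() poll for.
//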

#define BARRIERVALID(i) (AO_load_read(&threadStates[i*SPACING].msg) != 0)

// Waits until there is no pseudo barrier message at the beginning of the message queue.
int cSpinningThreadPool::waitAtBarrier(int barrierMin, cMessageHeap* msgQueue)
{
    //
    // no need to block if there is no event left
    //
    cMessage* msg = msgQueue->peekFirst();
    if (!msg)
        return barrierMin;

    //
    // block at the current (valid) barrier if needed
    //
    if (barrierMin >= 0)
    {
        if (threadStates[barrierMin*SPACING] < *msg)
            while (BARRIERVALID(barrierMin)) {__asm__ ("pause");}
        else
            return barrierMin;
    }

    do
    {
        //
        // find next barrier
        //
        barrierMin = -1;
        for (unsigned int i = 0; i < numThreads; i++)
        {
            //
            // If the barrier we found is in use and we have either no valid min
            // pointer before or the one found now is smaller:
            //
            if (BARRIERVALID(i) && (barrierMin < 0 || threadStates[i*SPACING] <= threadStates[barrierMin*SPACING]))
            {
                // Then update the min pointer to the actually found item:
                barrierMin = i;
            }
        }

        //
        // Apparently, we have passed a barrier. If we had been waiting at that barrier
        // for a longer time, a new message might have arrived in the meantime. For this
        // reason we fetch the queue head again.
        // Note: we only fetch a new message if a barrier has become outdated, i.e. an
        // event handler has finished.
        //
        msg = msgQueue->peekFirst();
        if (!msg)
            return barrierMin;

        //
        // wait at the next valid barrier if needed, or return the new barrier pointer
        //
        if (barrierMin >= 0 && threadStates[barrierMin*SPACING] < *msg)
            while (BARRIERVALID(barrierMin)) {__asm__ ("pause");}
        else
            return barrierMin;

    } while (true);
}
#undef BARRIERVALID
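
//
// Contract of waitAtBarrier(): barrierMin is the index of the worker whose
// pending event forms the earliest barrier, or -1 if unknown. The function
// spins until the head of msgQueue is no longer behind any active barrier and
// returns an updated barrierMin hint for the next call (or the unchanged hint
// if the queue is empty).
//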

bool cSpinningThreadPool::isBeforeBarrier(int barrierMin, cMessage* msg)
{
    return (barrierMin >= 0 && threadStates[barrierMin*SPACING] < *msg);
}

Register_PerRunConfigOption(CFGID_SPINNING_THREADPOOL_THREAD_POOL_SIZE, "thread-pool-size", CFG_INT, "5", "Number of worker threads");
Register_PerRunConfigOption(CFGID_SPINNING_THREADPOOL_USE_MAIN_THREAD, "use-main-thread", CFG_BOOL, "false", "Use the main thread as a worker thread as well");

void cSpinningThreadPool::activate()
{
    //
    // get configuration
    //
    useMainThread = ev.getConfig()->getAsBool(CFGID_SPINNING_THREADPOOL_USE_MAIN_THREAD);
    int numThreads = ev.getConfig()->getAsInt(CFGID_SPINNING_THREADPOOL_THREAD_POOL_SIZE);

    if (!useMainThread && numThreads == 0)
    {
        throw cRuntimeError("Thread pool size of 0 workers requested while "
                "use-main-thread==false. No one would do any work. Exiting");
    }

    threadStates = new threadState_t[numThreads * SPACING];

    for (int i = 0; i < numThreads; i++)
        memset(&threadStates[i*SPACING], 0, sizeof(threadState_t));

    cThreadPool::activate();
}
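
//
// threadStates is over-allocated by a factor of SPACING and indexed as
// threadStates[i * SPACING]; the intent is presumably to place each worker's
// slot on its own cache line so that the spinning loads of one thread do not
// cause false sharing with the stores of another.
//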

void cSpinningThreadPool::shutdown()
{
    if (shutdownDone)
        return;

    //
    // we use "dummy" events to shut down the threads:
    //  i) wait for the thread to finish its current task (if any)
    // ii) write the dummy value (1) into its task slot to release the thread again
    // The threads check the cancel flag after they have been released.
    //
    AO_store_write(&cancel, 1);
    for (unsigned int i = 0; i < numThreads; i++)
    {
        std::cout << "waiting for thread " << i << std::endl;
        while (AO_load_read(&threadStates[i * SPACING].msg) != 0)
        {
            __asm__ ("pause");
        }
        AO_store_write(&threadStates[i * SPACING].msg, 1);
        pthread_join(workerIDs[i], NULL);
    }

    //
    // yes, we delete workerIDs here although it was allocated in the
    // superclass. Maybe dirty, but simpler.
    //
    delete[] workerIDs;
    delete[] threadStates;
    shutdownDone = true;
}

int cSpinningThreadPool::insertTask(cMessage* msg, simtime_t duration, int barrierMin)
{
    int threadId;
    if (useMainThread)
        threadId = handleTaskWorkersAndMain(msg);
    else
        threadId = handleTaskWorkersOnly(msg);

    //
    // Did we assign it to a worker thread (or did we handle it ourselves)?
    //
    if (threadId >= 0)
    {
#ifdef NOBARRIER
        threadStates[threadId * SPACING].barrierTime = msg->getArrivalTime() + duration;
        threadStates[threadId * SPACING].priority = msg->getSchedulingPriority();
#endif
        AO_store_write(&threadStates[threadId * SPACING].msg, (size_t) msg);
    }
    else
    {
        //
        // The event was handled inline by the main thread, so no new barrier was
        // set and the current min pointer remains valid. (This also avoids
        // indexing threadStates with -1 below.)
        //
        return barrierMin;
    }

    //
    // If barrierMin==threadId, we reused the CPU that holds the currently active
    // barrier. Thus we are not sure what to set the min pointer to, and we
    // invalidate it. This causes waitAtBarrier() to search for a new one.
    //
    if (barrierMin == threadId)
        return -1;

    //
    // If we have either no valid min pointer before or the barrier just set is smaller:
    //
    if (barrierMin < 0 || threadStates[threadId * SPACING] <= threadStates[barrierMin * SPACING])
        //
        // Then update the min pointer to the actually set item:
        //
        return threadId;
    else
        return barrierMin;
}
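
//
// On the NOBARRIER path the barrierTime/priority fields are filled in before
// the message pointer is published with AO_store_write(); since that store
// carries a write barrier, a concurrent waitAtBarrier() that observes a
// non-zero msg slot also observes a consistent barrier time. The return value
// is the new "earliest barrier" hint handed back to the caller for the next
// waitAtBarrier()/isBeforeBarrier() call.
//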

int cSpinningThreadPool::handleTaskWorkersOnly(cMessage* msg)
{
    //
    // cycle through threads and find an idling one
    //
    unsigned int i = 0;
    while (AO_load_read(&threadStates[i * SPACING].msg) != 0)
    {
        i++;
        i %= numThreads;
    }

    return i;
}

int cSpinningThreadPool::handleTaskWorkersAndMain(cMessage *msg)
{
    //
    // cycle through threads and find an idling one
    //
    for (unsigned int i = 0; i < numThreads; i++)
    {
        if (AO_load_read(&threadStates[i * SPACING].msg) == 0)
        {
            //
            // assign message to this worker
            //
            return i;
        }
    }

    //
    // all threads busy, do it yourself
    // NOTE: if we have to handle a highly complex event here, we might block
    //       while we cannot pipe new events to the workers
    //
    doOneEvent(msg);
    return -1;
}
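
//
// The two dispatch strategies differ in how they handle a fully busy pool:
// handleTaskWorkersOnly() spins in the dispatching thread until some worker
// slot frees up, while handleTaskWorkersAndMain() executes the event inline
// and reports -1 so insertTask() knows that no new worker barrier was created.
//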

bool cSpinningThreadPool::barrierEmpty()
{
    // TODO: replace with global counter? -> cache contention?
    for (unsigned int i = 0; i < numThreads; i++)
        if (AO_load_read(&threadStates[i * SPACING].msg) != 0)
            return false;
    return true;
}
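
//
// Illustration only: a rough sketch (hypothetical names, not part of this
// file) of how a sequential scheduler might drive the pool using the
// functions above.
//
//     int barrierMin = -1;
//     while (cMessage *msg = msgQueue->peekFirst())
//     {
//         // block until no pending worker event must run before msg
//         barrierMin = pool->waitAtBarrier(barrierMin, msgQueue);
//         msg = msgQueue->removeFirst();
//         barrierMin = pool->insertTask(msg, lookaheadFor(msg), barrierMin);
//     }
//     while (!pool->barrierEmpty())
//         ;   // drain outstanding events before finishing
//     pool->shutdown();
//
// Here lookaheadFor() merely stands in for however the scheduler obtains the
// duration argument passed to insertTask().
//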