//=========================================================================
//  CSPINNINGTHREADPOOL.CC - part of
//
//                  OMNeT++/OMNEST
//           Discrete System Simulation in C++
//
//
//   Member functions of
//    cSpinningThreadPool : implements a pool of worker threads
//
//  Author: Georg Kunz
//
//=========================================================================

/*--------------------------------------------------------------*
 Copyright (C) 2010 Georg Kunz

 This file is distributed WITHOUT ANY WARRANTY. See the file
 `license' for details on this and other legal matters.
 *--------------------------------------------------------------*/
#include <string.h>

#include "cspinningthreadpool.h"
#include "cmessage.h"
#include "cconfiguration.h"
#include "casyncmodule.h"
#include "cconfigoption.h"
#ifndef NOBARRIER
#    include "cbarriermessage.h"
#endif

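//
// Ordering helpers: compare a worker's pending barrier state against another
// worker's state or against a queued cMessage, first by (barrier) time and
// then by scheduling priority. This is assumed to mirror the event ordering
// used by the simulation's message queue.
//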
inline int operator <=(cSpinningThreadPool::threadState_t& a, cSpinningThreadPool::threadState_t& b)
{
    return (a.barrierTime < b.barrierTime) ? 1 :
           (a.barrierTime > b.barrierTime) ? 0 :
            a.priority <= b.priority;
}

inline int operator <(cSpinningThreadPool::threadState_t& a, cMessage& b)
{
    return (a.barrierTime < b.getArrivalTime()) ? 1 :
           (a.barrierTime > b.getArrivalTime()) ? 0 :
            a.priority < b.getSchedulingPriority();
}

cSpinningThreadPool::cSpinningThreadPool() :
    useMainThread(false)
{
    AO_store(&cancel, 0);
}

cSpinningThreadPool::~cSpinningThreadPool()
{
}

/**
 * Cleanup handler for thread cancellation. Used to unlock the mutex which
 * was locked by the thread before the call to pthread_cond_wait.
 */
void threadCleanup(void* arg)
{
}

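//
// Executes a single event on behalf of a worker: looks up the destination
// module, runs its asynchronous handler, clears the module's busy flag and
// (unless NOBARRIER is set) lifts the barrier associated with the message.
//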
void cSpinningThreadPool::doOneEvent(cMessage* msg)
{
    //
    // NOTE: possible race condition with module deregistration, which sets
    //       vect[...] to NULL. However, this shouldn't be a problem since
    //       vect only ever grows.
    //
    cAsyncModule* mod = (cAsyncModule*) simulation.vect[msg->getArrivalModuleId()];
    cThreadPool::setDefaultOwner(mod);

#ifndef NOBARRIER
    cBarrierMessage* barrier = msg->getBarrier();
#endif

    //
    // call the module...
    //
    mod->callHandleAsyncMessage(msg);

    //
    // indicate that the thread has finished
    //
    mod->unsetBusy();

    //
    // ... and lift the barrier
    // NOTE: this is actually a bit too early since the main thread might now
    //       continue, but still finds the flag of this thread set to busy
    //
#ifndef NOBARRIER
    barrier->signal();
#endif
}

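//
// Worker loop. Each worker spins on its own slot in threadStates[]: a value
// of 0 means "idle", a non-zero value is the cMessage* to execute (or the
// dummy value 1 used during shutdown). The SPACING factor is assumed to pad
// the per-thread slots far enough apart to keep them on separate cache lines
// and avoid false sharing between spinning workers.
//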
void cSpinningThreadPool::worker()
{
    // Initialize the thread-local simulation time. This cannot be done in the
    // constructor because OMNeT++ needs to set the scaling exponent first.
    cThreadPool::getLocalData()->setSimTime(0.0);

    int threadId = 0;
    AO_t event;

    //
    // identify own ID
    //
    pthread_t id = pthread_self();
    for (unsigned int i = 0; i < numThreads; i++)
    {
        if (pthread_equal(id, workerIDs[i]))
        {
            threadId = i;
            break;
        }
    }

    //
    // start working
    //
    while (true)
    {
        //
        // wait for task
        //
        while ((event = AO_load_read(&threadStates[threadId * SPACING].msg)) == 0)
        {
            __asm__ ("pause");
        }

        if (AO_load_read(&cancel) == 1)
            break;

        //
        // collect required data
        //
        cMessage* msg = (cMessage*) event;

        doOneEvent(msg);

        //
        // signal completion
        //
        AO_store_write(&threadStates[threadId * SPACING].msg, 0);
    }
}

#define BARRIERVALID(i) (AO_load_read(&threadStates[i*SPACING].msg) != 0)

// Waits until there is no pseudo barrier message at the head of the message queue.
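//
// barrierMin is the index of the worker whose barrier currently blocks the
// main thread, or -1 if unknown. The function spins until that barrier (and
// any earlier one found while rescanning) no longer precedes the next queued
// message, and returns the index of the worker holding the earliest still
// active barrier, or -1 if none is active.
//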
int cSpinningThreadPool::waitAtBarrier(int barrierMin, cMessageHeap* msgQueue)
{
    //
    // no need to block if there is no event left
    //
    cMessage* msg = msgQueue->peekFirst();
    if (!msg)
        return barrierMin;

    //
    // block at the current (valid) barrier if needed
    //
    if (barrierMin >= 0)
    {
        if (threadStates[barrierMin*SPACING] < *msg)
            while (BARRIERVALID(barrierMin)) {__asm__ ("pause");}
        else
            return barrierMin;
    }

    do
    {
        //
        // find next barrier
        //
        barrierMin = -1;
        for (unsigned int i = 0; i < numThreads; i++)
        {
            //
            // If the barrier we found is in use and we have either no valid min
            // pointer before or the one found now is smaller:
            //
            if (BARRIERVALID(i) && (barrierMin < 0 || threadStates[i*SPACING] <= threadStates[barrierMin*SPACING]))
            {
                // Then update the min pointer to the newly found item:
                barrierMin = i;
            }
        }

        //
        // Apparently, we have passed a barrier. If we waited at that barrier for a
        // longer time, a new message might have arrived in the meantime. For this
        // reason we peek at the queue again.
        // Note: We only re-read the queue if a barrier has become outdated, i.e. an
        // event handler has finished.
        //
        cMessage* msg = msgQueue->peekFirst();
        if (!msg)
            return barrierMin;

        //
        // wait at the next valid barrier if needed, or return the new barrier pointer
        //
        if (barrierMin >= 0 && threadStates[barrierMin*SPACING] < *msg)
            while ((BARRIERVALID(barrierMin))) {__asm__ ("pause");}
        else
            return barrierMin;

    } while (true);
}

#undef BARRIERVALID

Register_PerRunConfigOption(CFGID_SPINNING_THREADPOOL_THREAD_POOL_SIZE, "thread-pool-size", CFG_INT, "5", "Number of worker threads");
Register_PerRunConfigOption(CFGID_SPINNING_THREADPOOL_USE_MAIN_THREAD, "use-main-thread", CFG_BOOL, "false", "Also use the main thread as a worker thread");

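//
// Example configuration (hypothetical omnetpp.ini excerpt) selecting a pool
// of four workers and letting the main thread execute events as well:
//
//   thread-pool-size = 4
//   use-main-thread = true
//
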
void cSpinningThreadPool::activate()
{
    //
    // get configuration
    //
    useMainThread = ev.getConfig()->getAsBool(CFGID_SPINNING_THREADPOOL_USE_MAIN_THREAD);
    int numThreads = ev.getConfig()->getAsInt(CFGID_SPINNING_THREADPOOL_THREAD_POOL_SIZE);

    if (!useMainThread && numThreads == 0)
    {
        throw cRuntimeError("Thread pool size of 0 workers requested while "
                "use-main-thread==false. No one would do any work. Exiting.");
    }

    threadStates = new threadState_t[numThreads * SPACING];

    for (int i = 0; i < numThreads; i++)
        memset(&threadStates[i*SPACING], 0, sizeof(threadState_t));

    cThreadPool::activate();
}

void cSpinningThreadPool::shutdown()
{
    if (shutdownDone)
        return;

    //
    // We use "dummy" events to shut down the threads:
    //  i) wait for the thread to finish its current task (if any)
    // ii) set the dummy value (1) as its task to release the thread again
    // The threads check the cancel flag after they have been released.
    //
    AO_store_write(&cancel, 1);
    for (unsigned int i = 0; i < numThreads; i++)
    {
        std::cout << "waiting for thread " << i << std::endl;
        while (AO_load_read(&threadStates[i * SPACING].msg) != 0)
        {
            __asm__ ("pause");
        }
        AO_store_write(&threadStates[i * SPACING].msg, 1);
        pthread_join(workerIDs[i], NULL);
    }

    //
    // Yes, we delete workerIDs here although it was allocated in the super
    // class. Maybe dirty, but simpler.
    //
    delete[] workerIDs;
    delete[] threadStates;
    shutdownDone = true;
}

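//
// Dispatches msg to an idle worker (or, when the main thread helps out and all
// workers are busy, executes it inline). The return value tracks which worker
// holds the earliest active barrier: it is the updated "min pointer", or -1
// when the caller should let waitAtBarrier() search for a new one.
//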
int cSpinningThreadPool::insertTask(cMessage* msg, simtime_t duration, int barrierMin)
{
    int threadId;
    if (useMainThread)
        threadId = handleTaskWorkersAndMain(msg);
    else
        threadId = handleTaskWorkersOnly(msg);

    //
    // Did we assign it to a worker thread (or did we handle it ourselves)?
    //
    if (threadId >= 0)
    {
#ifdef NOBARRIER
        threadStates[threadId * SPACING].barrierTime = msg->getArrivalTime() + duration;
        threadStates[threadId * SPACING].priority = msg->getSchedulingPriority();
#endif
        AO_store_write(&threadStates[threadId * SPACING].msg, (size_t) msg);
    }

    //
    // If barrierMin==threadId, we reused the worker that holds the currently
    // active barrier. In that case we cannot tell what the min pointer should
    // be, so we invalidate it. This causes waitAtBarrier() to search for a new one.
    //
    if (barrierMin == threadId) return -1;

    //
    // If we have either no valid min pointer yet or the barrier just set is smaller:
    //
    if (barrierMin < 0 || threadStates[threadId * SPACING] <= threadStates[barrierMin * SPACING])
        //
        // Then update the min pointer to the item just set:
        //
        return threadId;
    else
        return barrierMin;
}

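//
// Two dispatch strategies: with workers only, we spin until some worker
// becomes idle; with the main thread participating, we scan the workers once
// and, if all are busy, execute the event directly in the calling thread.
//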
int cSpinningThreadPool::handleTaskWorkersOnly(cMessage* msg)
{
    //
    // cycle through threads and find an idling one
    //
    unsigned int i = 0;
    while (AO_load_read(&threadStates[i * SPACING].msg) != 0)
    {
        i++;
        i %= numThreads;
    }

    return i;
}

int cSpinningThreadPool::handleTaskWorkersAndMain(cMessage *msg)
{
    //
    // cycle through threads and find an idling one
    //
    for (unsigned int i = 0; i < numThreads; i++)
    {
        if (AO_load_read(&threadStates[i * SPACING].msg) == 0)
        {
            //
            // assign message to this worker
            //
            return i;
        }
    }

    //
    // all threads busy, do it yourself
    // NOTE: if we have to handle a highly complex event here, we might block
    //       while we cannot pipe new events to the workers
    //
    doOneEvent(msg);
    return -1;
}

bool cSpinningThreadPool::barrierEmpty()
{
    // TODO: replace with global counter? -> cache contention?
    for (unsigned int i = 0; i < numThreads; i++)
        if (AO_load_read(&threadStates[i * SPACING].msg) != 0)
            return false;
    return true;
}