Statistics
| Branch: | Revision:

root / src / sim / cspinningthreadpool.cc @ 77cb9420

History | View | Annotate | Download (11.9 KB)

1 01873262 Georg Kunz
//=========================================================================
2
//  CSPINNINGTHREADPOOL.CC - part of
3
//
4
//                  OMNeT++/OMNEST
5
//           Discrete System Simulation in C++
6
//
7
//
8
//   Member functions of
9
//    cSpinningThreadPool : implements a pool of worker threads
10
//
11
//  Author: Georg Kunz
12
//
13
//=========================================================================
14
15
/*--------------------------------------------------------------*
16
 Copyright (C) 2010 Georg Kunz
17

18
 This file is distributed WITHOUT ANY WARRANTY. See the file
19
 `license' for details on this and other legal matters.
20
 *--------------------------------------------------------------*/
21
#include <string.h>
22
23
#include "cspinningthreadpool.h"
24
#include "cmessage.h"
25
#include "cconfiguration.h"
26
#include "casyncmodule.h"
27
#include "cconfigoption.h"
28
#ifndef NOBARRIER
29
#        include "cbarriermessage.h"
30
#endif
31
32 b9e9c37a Simon Tenbusch
/**
 * Lexicographic "less or equal" between two worker barrier states, tier by
 * tier: barrierTime, priority, parentStartTime, parentExecutionOrderId,
 * schedulingOrderId, and finally insertOrder as the <= tie-breaker.
 */
inline bool operator <=(cSpinningThreadPool::threadState_t& a, cSpinningThreadPool::threadState_t& b)
{
    if (a.data.barrierTime < b.data.barrierTime)
        return true;
    if (a.data.barrierTime > b.data.barrierTime)
        return false;

    if (a.data.priority < b.data.priority)
        return true;
    if (a.data.priority > b.data.priority)
        return false;

    if (a.data.parentStartTime < b.data.parentStartTime)
        return true;
    if (a.data.parentStartTime > b.data.parentStartTime)
        return false;

    if (a.data.parentExecutionOrderId < b.data.parentExecutionOrderId)
        return true;
    if (a.data.parentExecutionOrderId > b.data.parentExecutionOrderId)
        return false;

    if (a.data.schedulingOrderId < b.data.schedulingOrderId)
        return true;
    if (a.data.schedulingOrderId > b.data.schedulingOrderId)
        return false;

    // all higher-priority keys equal: decide on insertion order
    return a.data.insertOrder <= b.data.insertOrder;
}
46
47 b9e9c37a Simon Tenbusch
/**
 * Strict ordering between a worker's barrier state and a scheduled message,
 * using the same key sequence as operator <= above: barrierTime/arrivalTime,
 * priority, parentStartTime, parentExecutionOrderId, schedulingOrderId,
 * insertOrder.
 */
inline bool operator <(cSpinningThreadPool::threadState_t& a, cMessage& b)
{
    // Assertion that we never have two "equal" events according to ordering
    assert(!(a.data.barrierTime == b.getArrivalTime() &&
             a.data.priority == b.getSchedulingPriority() &&
             a.data.parentStartTime == b.getParentStartTime() &&
             a.data.parentExecutionOrderId == b.getParentExecutionOrderId() &&
             a.data.schedulingOrderId == b.getSchedulingOrderId() &&
             a.data.insertOrder == b.getInsertOrder() )
             );

    return  a.data.barrierTime < b.getArrivalTime() ? true :
            a.data.barrierTime > b.getArrivalTime() ? false :
            a.data.priority < b.getSchedulingPriority() ? true :
            a.data.priority > b.getSchedulingPriority() ? false :
            a.data.parentStartTime < b.getParentStartTime() ? true :
            a.data.parentStartTime > b.getParentStartTime() ? false :
            a.data.parentExecutionOrderId < b.getParentExecutionOrderId() ? true :
            a.data.parentExecutionOrderId > b.getParentExecutionOrderId() ? false :
            a.data.schedulingOrderId < b.getSchedulingOrderId() ? true :
            // BUGFIX: this tier previously repeated the '<' comparison, so a
            // state with a LARGER schedulingOrderId could never return false
            // here and instead fell through to the insertOrder tie-breaker.
            a.data.schedulingOrderId > b.getSchedulingOrderId() ? false :
            a.data.insertOrder < b.getInsertOrder();
}
70
71
/**
 * Constructor. barrierMin == -1 encodes "no valid minimum barrier"; the
 * worker threads themselves are created later, in activate().
 */
cSpinningThreadPool::cSpinningThreadPool() :
    useMainThread(false), barrierMin(-1)
{
    // clear the cancellation flag polled by the workers' spin loops
    AO_store(&cancel, 0);
}
76
77
/**
 * Destructor. Intentionally empty: workers and per-worker state are torn
 * down in shutdown(), which deletes workerIDs and threadStates.
 */
cSpinningThreadPool::~cSpinningThreadPool()
{
}
80
81
/**
 * Cleanup handler for thread cancelation. Used to unlock the mutex which
 * was locked by the thread before the call to pthread_cond_wait.
 *
 * NOTE(review): empty in this pool — the spinning workers busy-wait and
 * hold no mutex while waiting, so there is nothing to release here.
 */
void threadCleanup(void* arg)
{
}
88
89
/**
 * Executes a single event: resolves the destination module, invokes its
 * async handler, clears the module's busy flag, and lifts the barrier
 * associated with the message (unless compiled with NOBARRIER).
 *
 * Called both from worker threads and — when all workers are busy and
 * use-main-thread is enabled — from the main thread directly.
 */
void cSpinningThreadPool::doOneEvent(cMessage* msg)
{
    //
    // NOTE: possible race condition with de-register module which sets vect[...]
    //       to NULL. However, shouldn't be a problem since vect is ever increasing
    //
    cAsyncModule* mod = (cAsyncModule*) simulation.vect[msg->getArrivalModuleId()];
    cThreadPool::setDefaultOwner(mod);

#ifndef NOBARRIER
    // grab the barrier before the handler runs; msg may be consumed by it
    cBarrierMessage* barrier = msg->getBarrier();
#endif

    //
    // call the module...
    //
    mod->callHandleAsyncMessage(msg);

    //
    // indicate that the thread has finished
    //
    mod->unsetBusy();

    //
    // ... and lift the barrier
    // NOTE: this is actually a bit too early since the main thread might now
    //       continue, but still finds the flag of this thread set to busy
    //
#ifndef NOBARRIER
    barrier->signal();
#endif
}
121
122
/**
 * Entry point of every worker thread.
 *
 * The worker determines its own slot index, then loops forever: it spins on
 * its slot's msg word until the scheduler publishes a non-zero value, which
 * is either a cMessage pointer to execute or the dummy wake-up value (1)
 * written by shutdown() after the cancel flag is set.
 */
void cSpinningThreadPool::worker()
{
    // Initialize Simtime. Not possible in constructor because Omnet needs to set the scaling exponent first.
    cThreadPool::getLocalData()->setSimTime(0.0);

    int threadId = 0;
    AO_t event;

    //
    // identify own ID (find our index in workerIDs)
    //
    pthread_t id = pthread_self();
    for (unsigned int i = 0; i < numThreads; i++)
    {
        if (pthread_equal(id, workerIDs[i]))
        {
            threadId = i;
            break;
        }
    }

    //
    // start working
    //
    while (true)
    {
        //
        // wait for task: spin until our slot's msg word becomes non-zero
        //
        while ((event = AO_load_read(&threadStates[threadId].data.msg)) == 0)
        {
            __asm__ __volatile__ ("pause");   // busy-wait CPU hint
        }

        // shutdown() sets cancel *before* publishing the dummy value
        if (AO_load_read(&cancel) == 1)
            break;

        //
        // collect required data
        //
        cMessage* msg = (cMessage*) event;

        doOneEvent(msg);

        //
        // signal completion: a zero slot marks this worker idle again
        //
        AO_store_write(&threadStates[threadId].data.msg, 0);
    }
}
172
173 b9e9c37a Simon Tenbusch
// A worker slot holds a "valid" barrier while its msg word is non-zero,
// i.e. while that worker is still executing its event.
#define BARRIERVALID(i) (AO_load_read(&threadStates[i].data.msg) !=0)

// Waits till there is no pseudo barrier message at the beginning of the message queue.
// Uses barrierMin as a cached index of the worker holding the smallest active
// barrier; -1 means the cache is invalid and must be recomputed.
void cSpinningThreadPool::waitAtBarrier(cMessageHeap* msgQueue)
{
    //
    // no need to block if there is no event left
    //
    cMessage* msg = msgQueue->peekFirst();
    if (!msg)
        return;

    //
    // block at the current (valid) barrier if needed
    //
    if (barrierMin >= 0)
    {
        // only spin if the cached minimum barrier orders before the head event
        if (threadStates[barrierMin] < *msg)
            while (BARRIERVALID(barrierMin)) {__asm__ __volatile__ ("pause");}
        else
            return;
    }

    do
    {
        //
        // find next barrier: scan all slots for the smallest still-active one
        //
        barrierMin = -1;
        for (unsigned int i = 0; i < numThreads; i++)
        {
            //
            // If the barrier we found is in use and we have either no valid min
            // pointer before or the one found now is smaller:
            //
            if (BARRIERVALID(i) && (barrierMin < 0 || threadStates[i] <= threadStates[barrierMin]))
            {
                // Then update the min pointer to the actually found item:
                barrierMin = i;
            }
        }

        //
        // Apparently, we have passed a barrier. If we had waited at this barrier for a longer time,
        // a new message might have arrived in between. For this reason we take a new message.
        // Note: We only take a new message if a barrier has been outdated, i.e. an event handler has finished.
        //
        cMessage* msg = msgQueue->peekFirst();
        if (!msg)
            return;

        //
        // wait at the next valid barrier if needed or return with the new barrier pointer
        //
        if (barrierMin >= 0 && threadStates[barrierMin] < *msg)
            while ((BARRIERVALID(barrierMin))) {__asm__ __volatile__ ("pause");}
        else
            return;

    } while (true);
}
#undef BARRIERVALID
235 6b81f4fa Simon Tenbusch
/*
236
 * TODO: also update barrierMin if not vaild anymore
237
 */
238 2dd4eb12 Simon Tenbusch
bool cSpinningThreadPool::isBeforeBarrier(cMessage* msg) {
239 6aeda35d Georg Kunz
    return (barrierMin >=0 && threadStates[barrierMin] < *msg);
240 6b81f4fa Simon Tenbusch
    /*
241
    if (barrierMin >=0) {
242 6aeda35d Georg Kunz
        printf("barriertime=%f ",SIMTIME_DBL(threadStates[barrierMin].barrierTime));
243
        return (threadStates[barrierMin] < *msg);
244 6b81f4fa Simon Tenbusch
    }
245
    return false; //is this correct?*/
246 a3d116e3 Simon Tenbusch
}
247 01873262 Georg Kunz
248
// Per-run configuration options read in activate().
Register_PerRunConfigOption(CFGID_SPINNING_THREADPOOL_THREAD_POOL_SIZE, "thread-pool-size", CFG_INT, "5", "Number of worker threads");
Register_PerRunConfigOption(CFGID_SPINNING_THREADPOOL_USE_MAIN_THREAD, "use-main-thread", CFG_BOOL, "false", "Use the Main Thread as a workerthread also");
250
251
/**
 * Reads the pool configuration, allocates and zeroes one state slot per
 * worker (data.msg == 0 marks a slot idle), then delegates thread creation
 * to cThreadPool::activate().
 *
 * @throws cRuntimeError if no worker threads are configured and the main
 *         thread is not allowed to execute events either.
 */
void cSpinningThreadPool::activate()
{
    //
    // get configuration
    //
    useMainThread = ev.getConfig()->getAsBool(CFGID_SPINNING_THREADPOOL_USE_MAIN_THREAD);
    // NOTE(review): this local shadows the member 'numThreads' used by the
    // worker/scan loops — presumably the member is set from the same config
    // value inside cThreadPool::activate(); confirm they cannot diverge.
    int numThreads = ev.getConfig()->getAsInt(CFGID_SPINNING_THREADPOOL_THREAD_POOL_SIZE);

    if (!useMainThread && numThreads == 0)
    {
        throw cRuntimeError("Thread pool size of 0 workers requested while "
                "use-main-thread==false. Noone would do any work. Exiting");
    }

    threadStates = new threadState_t[numThreads];

    // zero every slot so each worker starts out idle with no barrier set
    for (int i = 0; i < numThreads; i++)
          memset(&threadStates[i], 0, sizeof(threadState_t));

    cThreadPool::activate();
}
272
273
/**
 * Stops all workers and releases the pool's resources. Idempotent via the
 * shutdownDone flag.
 *
 * Protocol: set the cancel flag first, then for each worker wait until its
 * current task (if any) finished (slot reads 0), publish a dummy non-zero
 * value to release it from its spin loop, and join it.
 */
void cSpinningThreadPool::shutdown()
{
    if (shutdownDone)
        return;

    //
    // we use "dummy" events to shutdown the threads
    //  i) wait for the thread to finish its current task (if any)
    // ii) set dummy value (1) to task to release thread again
    // The threads check the cancel flag after they have been released
    //
    AO_store_write(&cancel, 1);
    for (unsigned int i = 0; i < numThreads; i++)
    {
        std::cout << "waiting for thread " << i << std::endl;
        while (AO_load_read(&threadStates[i].data.msg) != 0)
        {
            __asm__  __volatile__ ("pause");   // busy-wait CPU hint
        }
        // wake the worker; it sees cancel==1 and exits its loop
        AO_store_write(&threadStates[i].data.msg, 1);
        pthread_join(workerIDs[i], NULL);
    }

    //
    // yes, we delete workerIDs here although it was alloced in the super
    // class. Maybe dirty, but simpler.
    //
    delete[] workerIDs;
    delete[] threadStates;
    shutdownDone = true;
}
304
305 2dd4eb12 Simon Tenbusch
/**
 * Dispatches msg to an idle worker (or, with use-main-thread, possibly
 * executes it inline) and maintains barrierMin — the index of the worker
 * slot holding the smallest active barrier (-1 when unknown/invalid).
 *
 * @param msg      event to execute asynchronously
 * @param duration model-time span the handler occupies; together with the
 *                 arrival time it defines the barrier time of the slot
 */
void cSpinningThreadPool::insertTask(cMessage* msg, simtime_t duration)
{
    int threadId;
    if (useMainThread)
        threadId = handleTaskWorkersAndMain(msg);
    else
        threadId = handleTaskWorkersOnly(msg);

    //
    // Did we assign it to a worker thread (or did we handle it ourselves)?
    //
    if (threadId < 0)
    {
        //
        // BUGFIX: the event was executed inline by the main thread, so no new
        // barrier was installed and barrierMin is still valid. The previous
        // code only returned early when barrierMin == threadId (== -1) and
        // otherwise fell through to index threadStates[-1] below.
        //
        return;
    }

#ifdef NOBARRIER
    // barrier-free mode: record the event's ordering key in the worker slot;
    // the comparison operators at the top of this file consume these fields
    threadStates[threadId].data.barrierTime = msg->getArrivalTime() + duration;
    threadStates[threadId].data.priority = msg->getSchedulingPriority();
    threadStates[threadId].data.parentStartTime = msg->getArrivalTime();
    threadStates[threadId].data.parentExecutionOrderId = msg->getExecutionOrderId();
    threadStates[threadId].data.schedulingOrderId = 0;
    threadStates[threadId].data.insertOrder = msg->getInsertOrder();
#endif
    // publishing the message pointer releases the spinning worker
    AO_store_write(&threadStates[threadId].data.msg, (size_t) msg);

    // If barrierMin==threadId, we utilized the CPU with the currently active barrier again
    // Thus, we are not sure what to set the minPointer to, and we invalidate it.
    // This causes waitAtBarrier() to search for a new one.
    if (barrierMin == threadId) {
        barrierMin = -1;
        return;
    }

    //
    // If we have either no valid min pointer before or the barrier just set is smaller:
    //
    if (barrierMin < 0 || threadStates[threadId] <= threadStates[barrierMin])
    {
        //
        // Then update the min pointer to the actually set item:
        //
        barrierMin = threadId;
    }
}
351
352
/**
 * Busy-waits until some worker slot becomes idle (its msg word reads 0) and
 * returns that slot's index; never executes the event inline.
 *
 * @param msg the event to be dispatched (unused here; selection only)
 * @return index of an idle worker thread
 */
int cSpinningThreadPool::handleTaskWorkersOnly(cMessage* msg)
{
    //
    // cycle through threads and find an idling one
    //
    unsigned int i = 0;
    while (AO_load_read(&threadStates[i].data.msg) != 0)
    {
        i++;
        i %= numThreads;
        // CONSISTENCY FIX: same busy-wait CPU hint as every other spin loop
        // in this file; the bare loop previously hammered the cache line
        __asm__ __volatile__ ("pause");
    }

    return i;
}
366
367
/**
 * Tries to hand the message to an idle worker; when every worker is busy the
 * calling (main) thread executes the event itself.
 *
 * @param msg the event to dispatch or execute
 * @return index of the worker the event was assigned to, or -1 if it was
 *         executed inline by the caller
 */
int cSpinningThreadPool::handleTaskWorkersAndMain(cMessage *msg)
{
    // single pass over the worker slots looking for an idle one (msg word 0)
    for (unsigned int slot = 0; slot < numThreads; ++slot)
        if (AO_load_read(&threadStates[slot].data.msg) == 0)
            return slot;   // assign message to this worker

    // all threads busy, do it yourself
    // NOTE: if we have to handle a highly complex event here, we might block
    //       while we cannot pipe new events to the workers
    doOneEvent(msg);
    return -1;
}
391
392
bool cSpinningThreadPool::barrierEmpty()
393
{
394
    // TODO: replace with global counter? -> cache contention?
395
    for (unsigned int i = 0; i < numThreads; i++)
396 b9e9c37a Simon Tenbusch
        if (AO_load_read(&threadStates[i].data.msg) != 0)
397 01873262 Georg Kunz
            return false;
398
    return true;
399
}