Statistics
| Branch: | Revision:

root / src / sim / cspinningthreadpool.cc @ fbe00e73

History | View | Annotate | Download (11.9 KB)

1
//=========================================================================
2
//  CSPINNINGTHREADPOOL.CC - part of
3
//
4
//                  OMNeT++/OMNEST
5
//           Discrete System Simulation in C++
6
//
7
//
8
//   Member functions of
9
//    cSpinningThreadPool : implements a pool of worker threads
10
//
11
//  Author: Georg Kunz
12
//
13
//=========================================================================
14

    
15
/*--------------------------------------------------------------*
16
 Copyright (C) 2010 Georg Kunz
17

18
 This file is distributed WITHOUT ANY WARRANTY. See the file
19
 `license' for details on this and other legal matters.
20
 *--------------------------------------------------------------*/
21
#include <string.h>
22

    
23
#include "cspinningthreadpool.h"
24
#include "cmessage.h"
25
#include "cconfiguration.h"
26
#include "cconfigoption.h"
27
#ifndef NOBARRIER
28
#        include "cbarriermessage.h"
29
#endif
30

    
31
inline bool operator <=(cSpinningThreadPool::threadState_t& a, cSpinningThreadPool::threadState_t& b)
32
{
33
    return  a.data.barrierTime < b.data.barrierTime ? true :
34
            a.data.barrierTime > b.data.barrierTime ? false :
35
            a.data.priority < b.data.priority ? true :
36
            a.data.priority > b.data.priority ? false :
37
            a.data.parentStartTime < b.data.parentStartTime ? true :
38
            a.data.parentStartTime > b.data.parentStartTime ? false :
39
            a.data.parentExecutionOrderId < b.data.parentExecutionOrderId ? true :
40
            a.data.parentExecutionOrderId > b.data.parentExecutionOrderId ? false :
41
            a.data.schedulingOrderId < b.data.schedulingOrderId ? true :
42
            a.data.schedulingOrderId > b.data.schedulingOrderId ? false :
43
            a.data.insertOrder <= b.data.insertOrder;
44
}
45

    
46
/**
 * Compares a worker thread's barrier state against a queued message:
 * returns true if the worker's in-flight event orders strictly before
 * the message under the same lexicographic key as operator<= above
 * (barrier time, priority, parent start time, parent execution order id,
 * scheduling order id, insertion order).
 *
 * Used by waitAtBarrier()/isBeforeBarrier() to decide whether the main
 * thread must wait for the worker before executing msg.
 */
inline bool operator <(cSpinningThreadPool::threadState_t& a, cMessage& b)
{
    // Sanity check: no two events may ever compare as fully equal under
    // this ordering — insertOrder is expected to break every tie.
    assert(!(a.data.barrierTime == b.getArrivalTime() &&
             a.data.priority == b.getSchedulingPriority() &&
             a.data.parentStartTime == b.getParentStartTime() &&
             a.data.parentExecutionOrderId == b.getParentExecutionOrderId() &&
             a.data.schedulingOrderId == b.getSchedulingOrderId() &&
             a.data.insertOrder == b.getInsertOrder() )
             );

    return  a.data.barrierTime < b.getArrivalTime() ? true :
            a.data.barrierTime > b.getArrivalTime() ? false :
            a.data.priority < b.getSchedulingPriority() ? true :
            a.data.priority > b.getSchedulingPriority() ? false :
            a.data.parentStartTime < b.getParentStartTime() ? true :
            a.data.parentStartTime > b.getParentStartTime() ? false :
            a.data.parentExecutionOrderId < b.getParentExecutionOrderId() ? true :
            a.data.parentExecutionOrderId > b.getParentExecutionOrderId() ? false :
            a.data.schedulingOrderId < b.getSchedulingOrderId() ? true :
            a.data.schedulingOrderId > b.getSchedulingOrderId() ? false :
            a.data.insertOrder < b.getInsertOrder();
}
69

    
70
/**
 * Constructor. By default the main thread does not execute events itself
 * (useMainThread == false) and no barrier is active (barrierMin == -1).
 * The cancel flag starts cleared so workers spin until shutdown() sets it.
 */
cSpinningThreadPool::cSpinningThreadPool()
    : useMainThread(false),
      barrierMin(-1)
{
    // atomically clear the shutdown-request flag
    AO_store(&cancel, 0);
}
75

    
76
/**
 * Destructor. Intentionally empty: the worker threads and the per-thread
 * state/id arrays are torn down in shutdown(), not here.
 */
cSpinningThreadPool::~cSpinningThreadPool()
{
}
79

    
80
/**
 * Cleanup handler for thread cancelation. Originally intended to unlock a
 * mutex locked before a pthread_cond_wait; in this spinning implementation
 * no condition variable is used, so the handler is a no-op placeholder.
 */
void threadCleanup(void* arg)
{
}
87

    
88
/**
 * Executes one event in the calling thread: resolves the destination
 * module, invokes its async message handler, marks the module idle again
 * and finally lifts the barrier attached to the message (unless compiled
 * with NOBARRIER). Runs on worker threads and — when all workers are
 * busy and useMainThread is set — on the main thread itself.
 */
void cSpinningThreadPool::doOneEvent(cMessage* msg)
{
    //
    // NOTE: possible race condition with de-register module which sets vect[...]
    //       to NULL. However, shouldn't be a problem since vect is ever increasing
    //
    cSimpleModule* mod = (cSimpleModule*) simulation.vect[msg->getArrivalModuleId()];
    cThreadPool::setDefaultOwner(mod);

#ifndef NOBARRIER
    // fetch the barrier before dispatching — presumably the handler may
    // consume/dispose msg, making it unsafe to touch afterwards (TODO confirm)
    cBarrierMessage* barrier = msg->getBarrier();
#endif

    //
    // call the module...
    //
    mod->callHandleAsyncMessage(msg);

    // indicate that the thread has finished
        //
        mod->unsetBusy();

    //
    // ... and lift the barrier
    // NOTE: this is actually a bit too early since the main thread might now
    //       continue, but still finds the flag of this thread set to busy
    //
#ifndef NOBARRIER
    barrier->signal();
#endif
}
120

    
121
void cSpinningThreadPool::worker()
122
{
123
        //Initialize Simtime. Not possible in constructor because Omnet needs to set the scaling exponent first.
124
        cThreadPool::getLocalData()->setSimTime(0.0);
125

    
126
    int threadId = 0;
127
    AO_t event;
128

    
129
    //
130
    // identify own ID
131
    //
132
    pthread_t id = pthread_self();
133
    for (unsigned int i = 0; i < numThreads; i++)
134
    {
135
        if (pthread_equal(id, workerIDs[i]))
136
        {
137
            threadId = i;
138
            break;
139
        }
140
    }
141

    
142
    //
143
    // start working
144
    //
145
    while (true)
146
    {
147
        //
148
        // wait for task
149
        //
150
        while ((event = AO_load_read(&threadStates[threadId].data.msg)) == 0)
151
        {
152
            __asm__ __volatile__ ("pause");
153
        }
154

    
155
        if (AO_load_read(&cancel) == 1)
156
            break;
157

    
158
        //
159
        // collect required data
160
        //
161
        cMessage* msg = (cMessage*) event;
162

    
163
        doOneEvent(msg);
164

    
165
        //
166
        // signal completion
167
        //
168
        AO_store_write(&threadStates[threadId].data.msg, 0);
169
    }
170
}
171

    
172
#define BARRIERVALID(i) (AO_load_read(&threadStates[i].data.msg) !=0)
173

    
174
// Waits till there is no pseudo barrier message at the beginning of the message queue.
// Blocks (spinning) while the earliest active barrier — the in-flight event of
// thread barrierMin — orders before the head of msgQueue, re-scanning all
// thread states each time a barrier is passed. Maintains barrierMin as the
// index of the thread holding the smallest valid barrier, or -1 if none.
void cSpinningThreadPool::waitAtBarrier(cMessageHeap* msgQueue)
{
    //
    // no need to block if there is no event left
    //
    cMessage* msg = msgQueue->peekFirst();
    if (!msg)
        return;

    //
    // block at the current (valid) barrier if needed
    //
    if (barrierMin >= 0)
    {
        if (threadStates[barrierMin] < *msg)
            while (BARRIERVALID(barrierMin)) {__asm__ __volatile__ ("pause");}
        else
            return;
    }

    do
    {
        //
        // find next barrier
        //
        barrierMin = -1;
        for (unsigned int i = 0; i < numThreads; i++)
        {
            //
            // If the barrier we found is in use and we have either no valid min
            // pointer before or the one found now is smaller:
            //
            if (BARRIERVALID(i) && (barrierMin < 0 || threadStates[i] <= threadStates[barrierMin]))
            {
                // Then update the min pointer to the actually found item:
                barrierMin = i;
            }
        }

        //
        // Apparently, we have passed a barrier. If we had waited at this barrier for a longer time,
        // a new message might have arrived in between. For this reason we take a new message.
        // Note: We only take a new message if a barrier has been outdated, i.e. an event handler has finished.
        // (this inner msg deliberately shadows the outer one — the head of the
        // queue may have changed while we were spinning)
        //
        cMessage* msg = msgQueue->peekFirst();
        if (!msg)
            return;

        //
        // wait at the next valid barrier if needed, or return with the new
        // barrier pointer left in barrierMin
        //
        if (barrierMin >= 0 && threadStates[barrierMin] < *msg)
            while ((BARRIERVALID(barrierMin))) {__asm__ __volatile__ ("pause");}
        else
            return;

    } while (true);
}
233
#undef BARRIERVALID
234
/*
235
 * TODO: also update barrierMin if not vaild anymore
236
 */
237
bool cSpinningThreadPool::isBeforeBarrier(cMessage* msg) {
238
    return (barrierMin >=0 && threadStates[barrierMin] < *msg);
239
    /*
240
    if (barrierMin >=0) {
241
        printf("barriertime=%f ",SIMTIME_DBL(threadStates[barrierMin].barrierTime));
242
        return (threadStates[barrierMin] < *msg);
243
    }
244
    return false; //is this correct?*/
245
}
246

    
247
// Configuration options of the spinning thread pool: the number of worker
// threads to spawn, and whether the main (scheduler) thread also executes
// events itself when all workers are busy. Read in activate().
Register_PerRunConfigOption(CFGID_SPINNING_THREADPOOL_THREAD_POOL_SIZE, "thread-pool-size", CFG_INT, "5", "Number of worker threads");
Register_PerRunConfigOption(CFGID_SPINNING_THREADPOOL_USE_MAIN_THREAD, "use-main-thread", CFG_BOOL, "false", "Use the Main Thread as a workerthread also");
249

    
250
/**
 * Reads the pool configuration, allocates and zeroes the per-thread state
 * array, then delegates worker thread creation to cThreadPool::activate().
 * Throws cRuntimeError if no thread (worker or main) would do any work.
 */
void cSpinningThreadPool::activate()
{
    //
    // get configuration
    //
    useMainThread = ev.getConfig()->getAsBool(CFGID_SPINNING_THREADPOOL_USE_MAIN_THREAD);
    // NOTE(review): this local shadows the numThreads member that worker(),
    // shutdown() etc. iterate over — presumably cThreadPool::activate()
    // sets the member from the same option; verify against the base class.
    int numThreads = ev.getConfig()->getAsInt(CFGID_SPINNING_THREADPOOL_THREAD_POOL_SIZE);

    if (!useMainThread && numThreads == 0)
    {
        throw cRuntimeError("Thread pool size of 0 workers requested while "
                "use-main-thread==false. Noone would do any work. Exiting");
    }

    threadStates = new threadState_t[numThreads];

    // zero every slot: a zero msg word marks the slot as idle (see worker())
    for (int i = 0; i < numThreads; i++)
          memset(&threadStates[i], 0, sizeof(threadState_t));

    cThreadPool::activate();
}
271

    
272
/**
 * Stops all worker threads and releases the pool's arrays. Idempotent via
 * the shutdownDone flag. The handshake order is critical: raise cancel
 * first, then for each worker wait until its current task finishes, then
 * publish a dummy task (1) to release it from the spin loop, then join.
 */
void cSpinningThreadPool::shutdown()
{
    if (shutdownDone)
        return;

    //
    // we use "dummy" events to shutdown the threads
    //  i) wait for the thread to finish its current task (if any)
    // ii) set dummy value (1) to task to release thread again
    // The threads check the cancel flag after they have been released
    //
    AO_store_write(&cancel, 1);
    for (unsigned int i = 0; i < numThreads; i++)
    {
        // NOTE(review): leftover progress print on stdout — presumably
        // debug output; confirm whether it should go through ev/logging
        std::cout << "waiting for thread " << i << std::endl;
        while (AO_load_read(&threadStates[i].data.msg) != 0)
        {
            __asm__  __volatile__ ("pause");
        }
        AO_store_write(&threadStates[i].data.msg, 1);
        pthread_join(workerIDs[i], NULL);
    }

    //
    // yes, we delete workerIDs here although it was alloced in the super
    // class. Maybe dirty, but simpler.
    //
    delete[] workerIDs;
    delete[] threadStates;
    shutdownDone = true;
}
303

    
304
/**
 * Dispatches msg to an idle worker thread — or, when useMainThread is set
 * and all workers are busy, executes it directly on the main thread — and
 * maintains barrierMin, the index of the thread holding the earliest
 * active barrier (-1 if unknown/none).
 *
 * @param msg      the event to execute
 * @param duration execution duration of the event; with NOBARRIER the
 *                 worker's barrier time is arrival time + duration
 *
 * Fix: when handleTaskWorkersAndMain() executed the event itself it
 * returns -1; the old code then evaluated threadStates[threadId] (i.e.
 * threadStates[-1], out-of-bounds) whenever barrierMin >= 0. A negative
 * id claims no worker slot and creates no barrier, so barrierMin must
 * simply remain untouched — return early.
 */
void cSpinningThreadPool::insertTask(cMessage* msg, simtime_t duration)
{
    int threadId;
    if (useMainThread)
        threadId = handleTaskWorkersAndMain(msg);
    else
        threadId = handleTaskWorkersOnly(msg);

    //
    // Negative id: the main thread already handled the event itself.
    // No slot was claimed, no new barrier exists — leave barrierMin as is.
    //
    if (threadId < 0)
        return;

#ifdef NOBARRIER
    // record the ordering key of the event this worker now processes;
    // waitAtBarrier()/isBeforeBarrier() compare against these fields
    threadStates[threadId].data.barrierTime = msg->getArrivalTime() + duration;
    threadStates[threadId].data.priority = msg->getSchedulingPriority();
    threadStates[threadId].data.parentStartTime = msg->getArrivalTime();
    threadStates[threadId].data.parentExecutionOrderId = msg->getExecutionOrderId();
    threadStates[threadId].data.schedulingOrderId = 0;
    threadStates[threadId].data.insertOrder = msg->getInsertOrder();
#endif
    // publishing the message pointer releases the spinning worker
    AO_store_write(&threadStates[threadId].data.msg, (size_t) msg);

    // If barrierMin==threadId, we utilized the CPU with the currently active barrier again
    // Thus, we are not sure what to set the minPointer to, and we invalidate it.
    // This causes waitAtBarrier() to search for a new one.
    if (barrierMin == threadId)
    {
        barrierMin = -1;
        return;
    }

    //
    // If we have either no valid min pointer yet, or the barrier just set
    // is smaller, update the min pointer to the newly dispatched thread.
    //
    if (barrierMin < 0 || threadStates[threadId] <= threadStates[barrierMin])
        barrierMin = threadId;
}
350

    
351
/**
 * Busy-waits until some worker slot becomes idle (msg word == 0) and
 * returns its index; the caller then publishes msg into that slot.
 * Round-robin scan over all numThreads slots.
 *
 * Only called when useMainThread is false; activate() guarantees
 * numThreads > 0 in that configuration, so the modulo is well-defined.
 *
 * Fix: added the "pause" spin-loop hint, consistent with every other spin
 * loop in this file (worker(), waitAtBarrier(), shutdown()) — reduces
 * pipeline flushes and is friendlier to the hyper-threading sibling.
 */
int cSpinningThreadPool::handleTaskWorkersOnly(cMessage* msg)
{
    //
    // cycle through threads and find an idling one
    //
    unsigned int i = 0;
    while (AO_load_read(&threadStates[i].data.msg) != 0)
    {
        __asm__ __volatile__ ("pause");
        i++;
        i %= numThreads;
    }

    return i;
}
365

    
366
/**
 * Tries to hand msg to an idle worker thread; if every slot is occupied,
 * the calling (main) thread executes the event itself via doOneEvent().
 * Returns the index of the chosen worker, or -1 if handled locally.
 */
int cSpinningThreadPool::handleTaskWorkersAndMain(cMessage *msg)
{
    //
    // single pass over all slots, looking for an idle worker
    //
    for (unsigned int slot = 0; slot < numThreads; slot++)
    {
        if (AO_load_read(&threadStates[slot].data.msg) == 0)
            return slot;   // assign the message to this worker
    }

    //
    // all threads busy, do it yourself
    // NOTE: if we have to handle a highly complex event here, we might block
    //       while we cannot pipe new events to the workers
    //
    doOneEvent(msg);
    return -1;
}
390

    
391
bool cSpinningThreadPool::barrierEmpty()
392
{
393
    // TODO: replace with global counter? -> cache contention?
394
    for (unsigned int i = 0; i < numThreads; i++)
395
        if (AO_load_read(&threadStates[i].data.msg) != 0)
396
            return false;
397
    return true;
398
}