//=========================================================================
//  CSPINNINGTHREADPOOL.CC - part of
//
//                  OMNeT++/OMNEST
//           Discrete System Simulation in C++
//
//
//   Member functions of
//    cSpinningThreadPool : implements a pool of worker threads
//
//  Author: Georg Kunz
//
//=========================================================================

/*--------------------------------------------------------------*
 Copyright (C) 2010 Georg Kunz

 This file is distributed WITHOUT ANY WARRANTY. See the file
 `license' for details on this and other legal matters.
 *--------------------------------------------------------------*/
#include <string.h>

#include "cspinningthreadpool.h"
#include "cmessage.h"
#include "cconfiguration.h"
#include "casyncmodule.h"
#include "cconfigoption.h"
#ifndef NOBARRIER
#include "cbarriermessage.h"
#endif
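
//
// Ordering helpers: a threadState_t is "smaller" if its barrier lies earlier in
// simulated time; ties are broken by the scheduling priority. The second operator
// compares a thread state against a message's arrival time and scheduling priority.
//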
inline int operator <=(cSpinningThreadPool::threadState_t& a, cSpinningThreadPool::threadState_t& b)
{
    return (a.barrierTime < b.barrierTime) ? 1 :
           (a.barrierTime > b.barrierTime) ? 0 :
            a.priority <= b.priority;
}

inline int operator <(cSpinningThreadPool::threadState_t& a, cMessage& b)
{
    return (a.barrierTime < b.getArrivalTime()) ? 1 :
           (a.barrierTime > b.getArrivalTime()) ? 0 :
            a.priority < b.getSchedulingPriority();
}

cSpinningThreadPool::cSpinningThreadPool() :
    useMainThread(false), barrierMin(-1)
{
    AO_store(&cancel, 0);
}

cSpinningThreadPool::~cSpinningThreadPool()
{
}

/**
 * Cleanup handler for thread cancellation. Used to unlock the mutex which
 * was locked by the thread before the call to pthread_cond_wait. In the
 * spinning thread pool the workers busy-wait instead, so this handler
 * does nothing.
 */
void threadCleanup(void* arg)
{
}
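
//
// Executes a single event: looks up the destination module, delivers the message
// via callHandleAsyncMessage(), clears the module's busy flag and finally lifts
// the barrier attached to the message (unless compiled with NOBARRIER).
//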
void cSpinningThreadPool::doOneEvent(cMessage* msg)
{
    //
    // NOTE: possible race condition with module de-registration, which sets vect[...]
    //       to NULL. However, this shouldn't be a problem since vect only ever grows.
    //
    cAsyncModule* mod = (cAsyncModule*) simulation.vect[msg->getArrivalModuleId()];
    cThreadPool::setDefaultOwner(mod);

#ifndef NOBARRIER
    cBarrierMessage* barrier = msg->getBarrier();
#endif

    //
    // call the module...
    //
    mod->callHandleAsyncMessage(msg);

    //
    // ... indicate that the thread has finished ...
    //
    mod->unsetBusy();

    //
    // ... and lift the barrier
    // NOTE: this is actually a bit too early since the main thread might now
    //       continue but still find the flag of this thread set to busy
    //
#ifndef NOBARRIER
    barrier->signal();
#endif
}
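
//
// Worker main loop. Each worker spins on its own threadStates[threadId*SPACING].msg
// slot: 0 means idle, any other value is treated as a pointer to the cMessage to
// process (shutdown() stores the dummy value 1 together with the cancel flag).
//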
void cSpinningThreadPool::worker()
{
    //
    // Initialize the simulation time. This is not possible in the constructor
    // because OMNeT++ needs to set the scaling exponent first.
    //
    cThreadPool::getLocalData()->setSimTime(0.0);

    int threadId = 0;
    AO_t event;

    //
    // identify own ID
    //
    pthread_t id = pthread_self();
    for (unsigned int i = 0; i < numThreads; i++)
    {
        if (pthread_equal(id, workerIDs[i]))
        {
            threadId = i;
            break;
        }
    }

    //
    // start working
    //
    while (true)
    {
        //
        // wait for a task
        //
        while ((event = AO_load_read(&threadStates[threadId * SPACING].msg)) == 0)
        {
            __asm__ ("pause");
        }

        if (AO_load_read(&cancel) == 1)
            break;

        //
        // collect the required data
        //
        cMessage* msg = (cMessage*) event;

        doOneEvent(msg);

        //
        // signal completion
        //
        AO_store_write(&threadStates[threadId * SPACING].msg, 0);
    }
}
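
//
// A worker's msg slot doubles as its barrier: while BARRIERVALID(i) holds, worker i
// is still busy with an event. Thread states are laid out SPACING entries apart,
// presumably to keep them on separate cache lines and avoid false sharing.
//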
#define BARRIERVALID(i) (AO_load_read(&threadStates[i*SPACING].msg) != 0)

// Waits until there is no pseudo barrier message at the beginning of the message queue.
void cSpinningThreadPool::waitAtBarrier(cMessageHeap* msgQueue)
{
    //
    // no need to block if there is no event left
    //
    cMessage* msg = msgQueue->peekFirst();
    if (!msg)
        return;

    //
    // block at the current (valid) barrier if needed
    //
    if (barrierMin >= 0)
    {
        if (threadStates[barrierMin*SPACING] < *msg)
            while (BARRIERVALID(barrierMin)) {__asm__ ("pause");}
        else
            return;
    }

    do
    {
        //
        // find the next barrier
        //
        barrierMin = -1;
        for (unsigned int i = 0; i < numThreads; i++)
        {
            //
            // If the barrier we found is in use and we either had no valid min
            // pointer before or the one found now is smaller:
            //
            if (BARRIERVALID(i) && (barrierMin < 0 || threadStates[i*SPACING] <= threadStates[barrierMin*SPACING]))
            {
                // then update the min pointer to the item just found
                barrierMin = i;
            }
        }

        //
        // We have passed a barrier. If we waited at that barrier for a longer time,
        // a new message might have arrived in the meantime, so we peek at the queue again.
        // Note: we only re-peek if a barrier has become outdated, i.e. an event handler has finished.
        //
        cMessage* msg = msgQueue->peekFirst();
        if (!msg)
            return;

        //
        // wait at the next valid barrier if needed, or return with the new barrier pointer
        //
        if (barrierMin >= 0 && threadStates[barrierMin*SPACING] < *msg)
            while (BARRIERVALID(barrierMin)) {__asm__ ("pause");}
        else
            return;

    } while (true);
}
#undef BARRIERVALID
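
// Returns true if the currently tracked minimal barrier lies before msg, i.e. msg
// must still wait for that barrier.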
/*
 * TODO: also update barrierMin if it is not valid anymore
 */
bool cSpinningThreadPool::isBeforeBarrier(cMessage* msg) {
    return (barrierMin >= 0 && threadStates[barrierMin*SPACING] < *msg);
    /*
    if (barrierMin >= 0) {
        printf("barriertime=%f ", SIMTIME_DBL(threadStates[barrierMin*SPACING].barrierTime));
        return (threadStates[barrierMin*SPACING] < *msg);
    }
    return false; // is this correct?
    */
}

Register_PerRunConfigOption(CFGID_SPINNING_THREADPOOL_THREAD_POOL_SIZE, "thread-pool-size", CFG_INT, "5", "Number of worker threads");
Register_PerRunConfigOption(CFGID_SPINNING_THREADPOOL_USE_MAIN_THREAD, "use-main-thread", CFG_BOOL, "false", "Use the main thread as a worker thread as well");
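
//
// These options are read per run from the configuration; they would typically be
// set in omnetpp.ini, e.g. (values shown are only illustrative):
//
//   thread-pool-size = 4
//   use-main-thread = true
//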

void cSpinningThreadPool::activate()
{
    //
    // get the configuration
    //
    useMainThread = ev.getConfig()->getAsBool(CFGID_SPINNING_THREADPOOL_USE_MAIN_THREAD);
    int numThreads = ev.getConfig()->getAsInt(CFGID_SPINNING_THREADPOOL_THREAD_POOL_SIZE);

    if (!useMainThread && numThreads == 0)
    {
        throw cRuntimeError("Thread pool size of 0 workers requested while "
                "use-main-thread==false. No one would do any work. Exiting.");
    }

    threadStates = new threadState_t[numThreads * SPACING];

    for (int i = 0; i < numThreads; i++)
        memset(&threadStates[i*SPACING], 0, sizeof(threadState_t));

    cThreadPool::activate();
}
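
//
// Shuts down all workers: sets the cancel flag, then releases each worker with a
// dummy task value and joins it. Note that this spins until a worker has finished
// its current task before releasing it.
//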
void cSpinningThreadPool::shutdown()
{
    if (shutdownDone)
        return;

    //
    // we use "dummy" events to shut down the threads:
    //  i) wait for the thread to finish its current task (if any)
    // ii) set a dummy value (1) as the task to release the thread again
    // The threads check the cancel flag after they have been released.
    //
    AO_store_write(&cancel, 1);
    for (unsigned int i = 0; i < numThreads; i++)
    {
        std::cout << "waiting for thread " << i << std::endl;
        while (AO_load_read(&threadStates[i * SPACING].msg) != 0)
        {
            __asm__ ("pause");
        }
        AO_store_write(&threadStates[i * SPACING].msg, 1);
        pthread_join(workerIDs[i], NULL);
    }

    //
    // yes, we delete workerIDs here although it was allocated in the super
    // class. Maybe dirty, but simpler.
    //
    delete[] workerIDs;
    delete[] threadStates;
    shutdownDone = true;
}
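
//
// Hands msg to an idle worker thread; with use-main-thread enabled, the event is
// executed directly by the calling thread if all workers are busy. Afterwards the
// barrierMin pointer is updated so that it keeps tracking the smallest active barrier.
//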
void cSpinningThreadPool::insertTask(cMessage* msg, simtime_t duration)
{
    int threadId;
    if (useMainThread)
        threadId = handleTaskWorkersAndMain(msg);
    else
        threadId = handleTaskWorkersOnly(msg);

    //
    // Did we assign it to a worker thread (or did we handle it ourselves)?
    //
    if (threadId >= 0)
    {
#ifdef NOBARRIER
        threadStates[threadId * SPACING].barrierTime = msg->getArrivalTime() + duration;
        threadStates[threadId * SPACING].priority = msg->getSchedulingPriority();
#endif
        AO_store_write(&threadStates[threadId * SPACING].msg, (size_t) msg);
    }

    //
    // If barrierMin == threadId, we have just reused the worker holding the currently
    // active (minimal) barrier. We no longer know which barrier is the smallest, so we
    // invalidate the min pointer; this causes waitAtBarrier() to search for a new one.
    //
    if (barrierMin == threadId) {
        barrierMin = -1;
        return;
    }

    //
    // If we set a new barrier (threadId is -1 if the event was handled by the main
    // thread, in which case there is no new barrier to track) and either had no
    // valid min pointer before or the barrier just set is smaller:
    //
    if (threadId >= 0 && (barrierMin < 0 || threadStates[threadId * SPACING] <= threadStates[barrierMin * SPACING]))
    {
        //
        // then update the min pointer to the item just set
        //
        barrierMin = threadId;
    }
}
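
//
// Busy-waits, cycling over the workers' msg slots, until one of them becomes 0
// (idle) and returns the index of that worker.
//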
int cSpinningThreadPool::handleTaskWorkersOnly(cMessage* msg)
{
    //
    // cycle through threads and find an idling one
    //
    unsigned int i = 0;
    while (AO_load_read(&threadStates[i * SPACING].msg) != 0)
    {
        i++;
        i %= numThreads;
    }

    return i;
}
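
//
// Scans all workers once and returns the index of the first idle one; if all
// workers are busy, the event is executed directly in the calling thread and -1
// is returned.
//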
int cSpinningThreadPool::handleTaskWorkersAndMain(cMessage *msg)
{
    //
    // cycle through threads and find an idling one
    //
    for (unsigned int i = 0; i < numThreads; i++)
    {
        if (AO_load_read(&threadStates[i * SPACING].msg) == 0)
        {
            //
            // assign the message to this worker
            //
            return i;
        }
    }

    //
    // all threads are busy, do it yourself
    // NOTE: if we have to handle a highly complex event here, we might block
    //       for a while, during which we cannot pipe new events to the workers
    //
    doOneEvent(msg);
    return -1;
}
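
//
// Returns true if no worker currently holds a task, i.e. all barriers have been lifted.
//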
bool cSpinningThreadPool::barrierEmpty()
{
    // TODO: replace with global counter? -> cache contention?
    for (unsigned int i = 0; i < numThreads; i++)
        if (AO_load_read(&threadStates[i * SPACING].msg) != 0)
            return false;
    return true;
}