Statistics
| Branch: | Revision:

root / src / sim / clockedthreadpool.cc @ e1750c09

History | View | Annotate | Download (4.87 KB)

1 01873262 Georg Kunz
//=========================================================================
2
//  CLOCKEDTHREADPOOL.CC - part of
3
//
4
//                  Horizon/OMNeT++/OMNEST
5
//           Discrete System Simulation in C++
6
//
7
//
8
//   Member functions of
9
//    cLockedThreadPool : implements a pool of worker threads over a work queue.
10
//                        Synchronization is achieved using libpthread.
11
//
12
//  Author: Georg Kunz
13
//
14
//=========================================================================
15
16
/*--------------------------------------------------------------*
17
  Copyright (C) 2010 Georg Kunz
18

19
  This file is distributed WITHOUT ANY WARRANTY. See the file
20
  `license' for details on this and other legal matters.
21
*--------------------------------------------------------------*/
22
23
#include "clockedthreadpool.h"
24
#include "cmessage.h"
25
#include "cconfiguration.h"
26
#include "cdefaultlist.h"
27
#include "cbarriermessage.h"
28 fbe00e73 Mirko Stoffers
#include "csimplemodule.h"
29 01873262 Georg Kunz
30
//#define TIMING_DEBUG
31
32
#ifdef TIMING_DEBUG
33
#include <time.h>
34
#include <iostream>
35
#endif
36
37
38
cLockedThreadPool::cLockedThreadPool() :
39
    tasks(),
40
    shutdownDone(false)
41
{
42
    pthread_mutex_init(&mutex, NULL);
43
    pthread_cond_init(&nonEmpty, NULL);
44
45
    pthread_mutex_init(&condMutex, NULL);
46
    pthread_cond_init(&threadsDone, NULL);
47
}
48
49
50
cLockedThreadPool::~cLockedThreadPool()
51
{
52
    pthread_mutex_destroy(&mutex);
53
    pthread_cond_destroy(&nonEmpty);
54
55
    pthread_mutex_destroy(&condMutex);
56
    pthread_cond_destroy(&threadsDone);
57
}
58
59
60
#ifdef TIMING_DEBUG
61
static int delta_t(struct timespec *interval, struct timespec *begin, struct timespec *now)
62
{
63
    interval->tv_nsec = now->tv_nsec - begin->tv_nsec; /* Subtract 'decimal fraction' first */
64
    if(interval->tv_nsec < 0 ){
65
        interval->tv_nsec += 1000000000; /* Borrow 1sec from 'tv_sec' if subtraction -ve */
66
        interval->tv_sec = now->tv_sec - begin->tv_sec - 1; /* Subtract whole number of seconds and return 1 */
67
        return (1);
68
    }
69
    else{
70
        interval->tv_sec = now->tv_sec - begin->tv_sec; /* Subtract whole number of seconds and return 0 */
71
        return (0);
72
    }
73
}
74
#endif
75
76
/**
77
 * Cleanup handler for thread cancellation. Used to unlock the mutex which
78
 * was locked by the thread before the call to pthread_cond_wait.
79
 */
80
static void threadCleanup(void* arg)
81
{
82
    pthread_mutex_unlock((pthread_mutex_t*)arg);
83
}
84
85
86
void cLockedThreadPool::worker()
87
{
88
    //
89
        // Initialize Simtime. Not possible in constructor because OMNeT needs to set
90
    // the scaling exponent first.
91
    //
92
        cThreadPool::getLocalData()->setSimTime(0.0);
93
94
    cMessage* msg;
95
    cBarrierMessage* barrier;
96
97
    //
98
    // set cancellation handler. Needed for clean exit without deadlocks.
99
    //
100
    pthread_cleanup_push(threadCleanup, &mutex);
101
    while (true)
102
    {
103
        //
104
        // get next event from the task queue.
105
        //
106
        pthread_mutex_lock(&mutex);
107
        while (tasks.empty())
108
        {
109
            pthread_cond_wait(&nonEmpty, &mutex);
110
        }
111
112
        msg = tasks.front();
113
        tasks.pop_front();
114
115
        pthread_mutex_unlock(&mutex);
116
117
        //
118
        // collect required data
119
        // NOTE: possible race condition with deregister module which sets vect[...]
120
        //       to NULL. However, shouldn't be a problem since vect is ever increasing
121
        //
122 fbe00e73 Mirko Stoffers
        cSimpleModule* mod = (cSimpleModule*)simulation.vect[msg->getArrivalModuleId()];
123 01873262 Georg Kunz
        cThreadPool::setDefaultOwner(mod);
124
125
#ifdef TIMING_DEBUG
126
        // for profiling
127
        struct timespec then, now, diff;
128
        clock_gettime(CLOCK_REALTIME, &then);
129
#endif
130
131
        barrier = msg->getBarrier();
132
133
        //
134
        // call the module
135
        //
136
        mod->callHandleAsyncMessage(msg);
137
138
#ifdef TIMING_DEBUG
139
        clock_gettime(CLOCK_REALTIME, &now);
140
        delta_t(&diff, &then, &now);
141
        std::cout << diff.tv_sec + diff.tv_nsec / 1000000000.0 << std::endl;
142
#endif
143
        //
144
        // indicate that the thread has finished
145
                //
146
                mod->unsetBusy();
147
148
        //
149
        // signal completion
150
        //
151
        barrier->signal();
152
    }
153
154
    //
155
    // unset cancellation handler
156
    //
157
    pthread_cleanup_pop(1);
158
}
159
160
161 2dd4eb12 Simon Tenbusch
void cLockedThreadPool::insertTask(cMessage* msg, simtime_t)
162 01873262 Georg Kunz
{
163
    pthread_mutex_lock(&mutex);
164
    tasks.push_back(msg);
165
    pthread_cond_signal(&nonEmpty);
166
    pthread_mutex_unlock(&mutex);
167 2dd4eb12 Simon Tenbusch
        return;
168 01873262 Georg Kunz
}
169
170
171
void cLockedThreadPool::shutdown()
172
{
173
        pthread_mutex_lock(&mutex);
174
        tasks.clear();
175
        pthread_mutex_unlock(&mutex);
176
        cThreadPool::shutdown();
177
}
178
179
180 2dd4eb12 Simon Tenbusch
void cLockedThreadPool::waitAtBarrier(cMessageHeap* msgQueue)
181 01873262 Georg Kunz
{
182
        throw cRuntimeError("Invalid parameter combination: LockedThreadpool cannot be combined with NOBARRIER!");
183
}
184
185
186
bool cLockedThreadPool::barrierEmpty()
187
{
188
    throw cRuntimeError("Invalid parameter combination: LockedThreadpool cannot be combined with NOBARRIER!");
189
}
190 6b81f4fa Simon Tenbusch
191
bool cLockedThreadPool::isBeforeBarrier(cMessage* msg)
192
{
193
    throw cRuntimeError("Invalid parameter combination: LockedThreadpool cannot be combined with NOBARRIER!");
194
}