Statistics
| Branch: | Revision:

root / src / sim / cthreadpool.cc @ e1750c09

History | View | Annotate | Download (4.8 KB)

1
//=========================================================================
2
//  CTHREADPOOL.CC - part of
3
//
4
//                  OMNeT++/OMNEST
5
//           Discrete System Simulation in C++
6
//
7
//
8
//   Member functions of
9
//    cThreadPool : A pool of worker threads. This class only implements
10
//                  creation and shutdown of the threads. The actual
11
//                  event processing is implemented in derived classes.
12
//
13
//  Author: Georg Kunz
14
//
15
//=========================================================================
16

    
17
/*--------------------------------------------------------------*
18
  Copyright (C) 2010 Georg Kunz
19

20
  This file is distributed WITHOUT ANY WARRANTY. See the file
21
  `license' for details on this and other legal matters.
22
 *--------------------------------------------------------------*/
23

    
24
#include <iostream>
25
#include <stdlib.h>
26
#include <cstring>
27
#include <cstdlib>
28

    
29
#include <unistd.h>
30
#include <sys/signal.h>
31

    
32
#include "cenvir.h"
33
#include "cthreadpool.h"
34
#include "cmessage.h"
35
#include "cconfiguration.h"
36
#include "cdefaultlist.h"
37
#include "cbarriermessage.h"
38
#include "sysdep.h"
39
#include "regmacros.h"
40
#include "cconfigoption.h"
41

    
42
//#define TIMING_DEBUG
43

    
44
#ifdef TIMING_DEBUG
45
#include <time.h>
46
#endif
47

    
48
USING_NAMESPACE
49

    
50
Register_GlobalConfigOption(CFG_IN_THREADPOOL_CPU_ID_OFFSET, "cpu-id-offset", CFG_INT, "0", "Offset for Thread Affinity. (When running multiple instances of Horizon at once)");
51
Register_PerRunConfigOption(CFGID_THREADPOOL_SIZE, "thread-pool-size", CFG_INT, "5", "Number of Threads in Threadpool");
52

    
53

    
54
static __thread cThreadLocalData* localData = NULL;
55

    
56

    
57
cThreadPool::cThreadPool() :
58
    shutdownDone(false)
59
{
60
}
61

    
62

    
63
cThreadPool::~cThreadPool()
64
{
65
}
66

    
67

    
68
static void* startWorker(void* arg)
69
{
70

    
71
        cThreadPool* pool = (cThreadPool*)arg;
72
        pool->worker();
73
        return NULL;
74
}
75

    
76

    
77
void cThreadPool::activate()
78
{
79
        //
80
        // Get configuration data.
81
        //
82

    
83
        numThreads = ev.getConfig()->getAsInt(CFGID_THREADPOOL_SIZE);
84
        unsigned cpuCount = (unsigned)sysconf(_SC_NPROCESSORS_ONLN);
85

    
86
        if (numThreads >= cpuCount) {
87
                std::cerr << "WARNING: The requested number of " << numThreads
88
                                << " worker "
89
                                << "threads plus the scheduler thread  exceeds the number of "
90
                                << cpuCount << " physical CPUs. Falling back to " << cpuCount
91
                                - 1 << " worker threads." << std::endl;
92
                numThreads = cpuCount - 1;
93
                sleep(1);
94

    
95
        }
96

    
97
        unsigned cpuIdOffset = ev.getConfig()->getAsInt(CFG_IN_THREADPOOL_CPU_ID_OFFSET);
98
        if (numThreads + cpuIdOffset >= cpuCount)
99
        {
100
                throw cRuntimeError("Mis-configured CPU-id offset: exceeding number of"
101
                                " physical CPUs");
102
        }
103

    
104
        //
105
        // create worker threads
106
        //
107
        std::cout << "starting " << numThreads << " threads" << std::endl;
108

    
109
        workerIDs = new pthread_t[numThreads];
110
        for (uint i = 0; i < numThreads; i++)
111
        {
112
                int err = pthread_create(&workerIDs[i], NULL, &startWorker, this);
113
                if (err == 0)
114
                {
115
                        std::cout << "created thread " << i << std::endl;
116
                        thread_set_affinity(workerIDs[i], i+1 + cpuIdOffset);
117
                }
118
                else
119
                {
120
                        throw cRuntimeError(this, "Unable to create worker thread: %s", strerror(err));
121
                }
122
                thread_set_affinity(pthread_self(), 0 + cpuIdOffset);
123
        }
124
}
125

    
126

    
127
void cThreadPool::shutdown()
128
{
129
        //
130
        // check if we already shut down the thread pool
131
        //
132
        if (shutdownDone)
133
                return;
134

    
135
        //
136
        // wait for threads to exit.
137
        //
138
        for (uint i = 0; i < numThreads; i++)
139
        {
140
                int err = pthread_cancel(workerIDs[i]);
141
                if (err != 0)
142
                        throw cRuntimeError(this, "Unable to cancel worker thread: %s", strerror(err));
143

    
144
                std::cout << "waiting for thread " << i << std::endl;
145

    
146
                err = pthread_join(workerIDs[i], NULL);
147
                if (err != 0)
148
                        throw cRuntimeError(this, "Unable to join worker thread: %s", strerror(err));
149
        }
150
        delete[] workerIDs;
151
        shutdownDone = true;
152
}
153

    
154

    
155
cThreadLocalData* cThreadPool::getLocalData()
156
{
157
        if (localData == NULL)
158
        {
159
                localData = new cThreadLocalData();
160
        }
161
        return localData;
162
}
163

    
164

    
165
void cThreadPool::setDefaultOwner(cDefaultList* list)
166
{
167
        cThreadLocalData* data = cThreadPool::getLocalData();
168
        data->setDefaultOwner(list);
169
}
170

    
171

    
172
cDefaultList* cThreadPool::getDefaultOwner()
173
{
174
        cThreadLocalData* data = cThreadPool::getLocalData();
175
        cDefaultList* list = data->getDefaultOwner();
176
        if (list == NULL)
177
        {
178
                data->setDefaultOwner(&defaultList);
179
                return &defaultList;
180
        }
181
        else
182
        {
183
                return list;
184
        }
185
}
186

    
187

    
188
void cThreadPool::setSimTime(simtime_t t)
189
{
190

    
191
        cThreadLocalData* data = cThreadPool::getLocalData();
192
        data->setSimTime(t);
193
}
194

    
195

    
196
simtime_t cThreadPool::getSimTime()
197
{
198
        return cThreadPool::getLocalData()->getSimTime();
199
}
200

    
201

    
202
cComponent* cThreadPool::getContext()
203
{
204
        return cThreadPool::getLocalData()->getContextModule();
205
}
206

    
207

    
208
void cThreadPool::setContext(cComponent* mod)
209
{
210
        cThreadLocalData* data = cThreadPool::getLocalData();
211
        data->setContextModule(mod);
212
}
213

    
214

    
215
// -------------------------------------
216

    
217

    
218
cThreadLocalData::cThreadLocalData() :
219
    defaultOwner(NULL)
220
{
221

    
222
}
223

    
224

    
225
cThreadLocalData::~cThreadLocalData()
226
{
227
        // nothing to do (yet)
228
}
229
NAMESPACE_END