Statistics
| Branch: | Revision:

root / src / sim / cthreadpool.cc @ fbe00e73

History | View | Annotate | Download (5.14 KB)

1
//=========================================================================
2
//  CTHREADPOOL.CC - part of
3
//
4
//                  OMNeT++/OMNEST
5
//           Discrete System Simulation in C++
6
//
7
//
8
//   Member functions of
9
//    cThreadPool : A pool of worker threads. This class only implements
10
//                  creation and shutdown of the threads. The actual
11
//                  event processing is implemented in derived classes.
12
//
13
//  Author: Georg Kunz
14
//
15
//=========================================================================
16

    
17
/*--------------------------------------------------------------*
18
  Copyright (C) 2010 Georg Kunz
19

20
  This file is distributed WITHOUT ANY WARRANTY. See the file
21
  `license' for details on this and other legal matters.
22
 *--------------------------------------------------------------*/
23

    
24
#include <iostream>
25
#include <stdlib.h>
26
#include <cstring>
27
#include <cstdlib>
28

    
29
#include <unistd.h>
30
#include <sys/signal.h>
31

    
32
#include "cenvir.h"
33
#include "cthreadpool.h"
34
#include "cmessage.h"
35
#include "cconfiguration.h"
36
#include "cdefaultlist.h"
37
#include "cbarriermessage.h"
38
#include "sysdep.h"
39
#include "regmacros.h"
40
#include "cconfigoption.h"
41

    
42
//#define TIMING_DEBUG
43

    
44
#ifdef TIMING_DEBUG
45
#include <time.h>
46
#endif
47

    
48
USING_NAMESPACE
49

    
50
Register_GlobalConfigOption(CFG_IN_THREADPOOL_CPU_ID_OFFSET, "cpu-id-offset", CFG_INT, "0", "Offset for Thread Affinity. (When running multiple instances of Horizon at once)");
51
Register_PerRunConfigOption(CFGID_THREADPOOL_SIZE, "thread-pool-size", CFG_INT, "5", "Number of Threads in Threadpool");
52
Register_GlobalConfigOption(CFGID_SIMTIME_SCALE, "simtime-scale", CFG_INT, "-12", "Sets the scale exponent, and thus the resolution of time for the 64-bit fixed-point simulation time representation. Accepted values are -18..0; for example, -6 selects microsecond resolution. -12 means picosecond resolution, with a maximum simtime of ~110 days.");
53

    
54

    
55
static __thread cThreadLocalData* localData = NULL;
56

    
57

    
58
cThreadPool::cThreadPool() :
59
    shutdownDone(false)
60
{
61
}
62

    
63

    
64
cThreadPool::~cThreadPool()
65
{
66
}
67

    
68

    
69
static void* startWorker(void* arg)
70
{
71

    
72
        cThreadPool* pool = (cThreadPool*)arg;
73
        pool->worker();
74
        return NULL;
75
}
76

    
77

    
78
void cThreadPool::activate()
79
{
80
        //
81
        // Get configuration data.
82
        //
83

    
84
        numThreads = ev.getConfig()->getAsInt(CFGID_THREADPOOL_SIZE);
85
        unsigned cpuCount = (unsigned)sysconf(_SC_NPROCESSORS_ONLN);
86

    
87
        if (numThreads >= cpuCount) {
88
                std::cerr << "WARNING: The requested number of " << numThreads
89
                                << " worker "
90
                                << "threads plus the scheduler thread  exceeds the number of "
91
                                << cpuCount << " physical CPUs. Falling back to " << cpuCount
92
                                - 1 << " worker threads." << std::endl;
93
                numThreads = cpuCount - 1;
94
                sleep(1);
95

    
96
        }
97

    
98
        unsigned cpuIdOffset = ev.getConfig()->getAsInt(CFG_IN_THREADPOOL_CPU_ID_OFFSET);
99
        if (numThreads + cpuIdOffset >= cpuCount)
100
        {
101
                throw cRuntimeError("Mis-configured CPU-id offset: exceeding number of"
102
                                " physical CPUs");
103
        }
104

    
105
        //
106
        // create worker threads
107
        //
108
        std::cout << "starting " << numThreads << " threads" << std::endl;
109

    
110
        workerIDs = new pthread_t[numThreads];
111
        for (uint i = 0; i < numThreads; i++)
112
        {
113
                int err = pthread_create(&workerIDs[i], NULL, &startWorker, this);
114
                if (err == 0)
115
                {
116
                        std::cout << "created thread " << i << std::endl;
117
                        thread_set_affinity(workerIDs[i], i+1 + cpuIdOffset);
118
                }
119
                else
120
                {
121
                        throw cRuntimeError(this, "Unable to create worker thread: %s", strerror(err));
122
                }
123
                thread_set_affinity(pthread_self(), 0 + cpuIdOffset);
124
        }
125
}
126

    
127

    
128
void cThreadPool::shutdown()
129
{
130
        //
131
        // check if we already shut down the thread pool
132
        //
133
        if (shutdownDone)
134
                return;
135

    
136
        //
137
        // wait for threads to exit.
138
        //
139
        for (uint i = 0; i < numThreads; i++)
140
        {
141
                int err = pthread_cancel(workerIDs[i]);
142
                if (err != 0)
143
                        throw cRuntimeError(this, "Unable to cancel worker thread: %s", strerror(err));
144

    
145
                std::cout << "waiting for thread " << i << std::endl;
146

    
147
                err = pthread_join(workerIDs[i], NULL);
148
                if (err != 0)
149
                        throw cRuntimeError(this, "Unable to join worker thread: %s", strerror(err));
150
        }
151
        delete[] workerIDs;
152
        shutdownDone = true;
153
}
154

    
155

    
156
cThreadLocalData* cThreadPool::getLocalData()
157
{
158
        if (localData == NULL)
159
        {
160
                localData = new cThreadLocalData();
161
        }
162
        return localData;
163
}
164

    
165

    
166
void cThreadPool::setDefaultOwner(cDefaultList* list)
167
{
168
        cThreadLocalData* data = cThreadPool::getLocalData();
169
        data->setDefaultOwner(list);
170
}
171

    
172

    
173
cDefaultList* cThreadPool::getDefaultOwner()
174
{
175
        cThreadLocalData* data = cThreadPool::getLocalData();
176
        cDefaultList* list = data->getDefaultOwner();
177
        if (list == NULL)
178
        {
179
                data->setDefaultOwner(&defaultList);
180
                return &defaultList;
181
        }
182
        else
183
        {
184
                return list;
185
        }
186
}
187

    
188

    
189
void cThreadPool::setSimTime(simtime_t t)
190
{
191

    
192
        cThreadLocalData* data = cThreadPool::getLocalData();
193
        data->setSimTime(t);
194
}
195

    
196

    
197
simtime_t cThreadPool::getSimTime()
198
{
199
        return cThreadPool::getLocalData()->getSimTime();
200
}
201

    
202

    
203
cComponent* cThreadPool::getContext()
204
{
205
        return cThreadPool::getLocalData()->getContextModule();
206
}
207

    
208

    
209
void cThreadPool::setContext(cComponent* mod)
210
{
211
        cThreadLocalData* data = cThreadPool::getLocalData();
212
        data->setContextModule(mod);
213
}
214

    
215

    
216
// -------------------------------------
217

    
218

    
219
cThreadLocalData::cThreadLocalData() :
220
    defaultOwner(NULL)
221
{
222

    
223
}
224

    
225

    
226
cThreadLocalData::~cThreadLocalData()
227
{
228
        // nothing to do (yet)
229
}
230
NAMESPACE_END