Statistics
| Branch: | Revision:

root / src / sim / cscheduler.cc @ c87b95b0

History | View | Annotate | Download (11.8 KB)

1
//=========================================================================
2
//  CSCHEDULER.CC - part of
3
//
4
//                  OMNeT++/OMNEST
5
//           Discrete System Simulation in C++
6
//
7
//  Author: Andras Varga, 2003
8
//
9
//=========================================================================
10

    
11
/*--------------------------------------------------------------*
12
  Copyright (C) 2003-2008 Andras Varga
13
  Copyright (C) 2006-2008 OpenSim Ltd.
14
  Monash University, Dept. of Electrical and Computer Systems Eng.
15
  Melbourne, Australia
16

17
  This file is distributed WITHOUT ANY WARRANTY. See the file
18
  `license' for details on this and other legal matters.
19
*--------------------------------------------------------------*/
20

    
21
#include "cscheduler.h"
22
#include "cmessage.h"
23
#include "csimulation.h"
24
#include "cmessageheap.h"
25
#include "globals.h"
26
#include "cenvir.h"
27
#include "cconfiguration.h"
28
#include "cconfigoption.h"
29
#include "platmisc.h" // usleep
30
#include "cbarriermessage.h"
31

    
32
USING_NAMESPACE
33

    
34
Register_GlobalConfigOption(CFGID_REALTIMESCHEDULER_SCALING, "realtimescheduler-scaling", CFG_DOUBLE, NULL, "When cRealTimeScheduler is selected as scheduler class: ratio of simulation time to real time. For example, scaling=2 will cause simulation time to progress twice as fast as runtime.");
35

    
36
cScheduler::cScheduler()
37
{
38
    sim = NULL;
39
}
40

    
41
cScheduler::~cScheduler()
42
{
43
}
44

    
45
void cScheduler::setSimulation(cSimulation *_sim)
46
{
47
    sim = _sim;
48
}
49
//-----
50

    
51
Register_Class(cSequentialScheduler);
52

    
53
cMessage *cSequentialScheduler::getNextEvent()
54
{
55
#ifdef NOBARRIER
56
    // Do we have to wait for a barrier?
57
    if (sim->threadPool) sim->threadPool->waitAtBarrier(&(sim->msgQueue));
58

    
59
    //
60
    // If we retrieve a valid msg from the queue, we return it:
61
    //
62
    cMessage *msg = sim->msgQueue.removeFirst();
63

    
64
    if (msg) return msg;
65

    
66

    
67
    //
68
    // if there is no event left and we don't use the threadpool, end the sim
69
    //
70
    if (!sim->threadPool)
71
        throw cTerminationException(eENDEDOK);
72

    
73
    //
74
    // If we did not get a valid msg from the queue, but there are still
75
    // barrier messages left, we wait:
76
    //
77
    while (sim->msgQueue.empty() && !sim->threadPool->barrierEmpty())
78
    {
79
        __asm__ ("pause");
80
    }
81
    msg = sim->msgQueue.removeFirst();
82

    
83
    //
84
    // If there is a msg now, we return it:
85
    //
86
    if (msg) return msg;
87

    
88
    //
89
    // If there is still no message in the queue, there are
90
    // also no barriers left (else we would have waited), and we quit:
91
    //
92

    
93
    else
94
    throw cTerminationException(eENDEDOK);
95

    
96
#else
97
    cMessage *msg = NULL;
98
    cBarrierMessage* barrier = NULL;
99

    
100
    while (!msg)
101
    {
102
        msg = sim->msgQueue.removeFirst();
103
        if (!msg)
104
            throw cTerminationException(eENDEDOK);
105
        /*
106
         * If we have a Barriermsg, we wait
107
         */
108
        barrier = dynamic_cast<cBarrierMessage*> (msg);
109
        if (barrier != NULL)
110
        {
111
            // wait for the task to complete
112
            barrier->wait();
113
            delete barrier;
114
            msg = NULL;
115
        }
116
    }
117

    
118
    cSimpleModule* mod = (cSimpleModule*) sim->getModule(msg->getArrivalModuleId());
119
    if (mod->isAsyncModule())
120
    {
121
        cAsyncModule* aMod = (cAsyncModule*) mod;
122
        simtime_t duration = msg->getEventDuration();
123
        if (aMod->mayParallelize(msg, duration) && sim->threadPool)
124
        {
125
            // create a new barrier and schedule it
126
            cBarrierMessage* barrier = new cBarrierMessage();
127
            barrier->setArrival(aMod, -1, msg->getArrivalTime() + duration);
128
            msg->setBarrier(barrier);
129
            sim->msgQueue.insert(barrier);
130
        }
131
    }
132
    return msg;
133
#endif
134
}
135

    
136
//-----
137
Register_Class(cEEFScheduler)
138
;
139

    
140
cEEFScheduler::cEEFScheduler()
141
{
142

    
143
}
144
cEEFScheduler::~cEEFScheduler()
145
{
146
}
147

    
148
cMessage *cEEFScheduler::getNextEvent()
149
{
150
    if(!(sim->threadPool)) {
151
        throw cRuntimeError("EEFScheduler is not supported in sequential mode. Activate Threadpool or use cSequentialScheduler.");
152
    }
153

    
154
    updateIES();
155
    return getFirstEvent();
156
}
157

    
158
void cEEFScheduler::updateIES() {
159
    cMessage *msg = NULL;
160
    cSimpleModule* mod = NULL;
161
    cAsyncModule* aMod = NULL;
162
#ifdef NOBARRIER
163

    
164
    while (!(sim->msgQueue.empty()))
165
       {
166
        msg = sim->msgQueue.removeFirst();
167

    
168
        mod = (cSimpleModule*) sim->getModule(msg->getArrivalModuleId());
169

    
170
        if (!(mod->isAsyncModule())) {
171
            break;
172
        }
173

    
174
        aMod = (cAsyncModule*) mod;
175
        simtime_t duration = msg->getEventDuration();
176

    
177
        if (!aMod->mayParallelize(msg, duration)) {
178
            break;
179
        }
180
        /*
181
         * If there is an active barrier in the threadpool before msg, break
182
         */
183
        if (sim->threadPool->isBeforeBarrier(msg)) {
184
            break;
185
        }
186
        /*
187
         * If an event in the IES would cause a barrier before msg, break
188
         */
189
        if (!independentEventsHeap.empty() && msg->getArrivalTime() >= independentEventsHeap.peekFirst()->getTend()) {
190
            break;
191
        }
192
/*        printf(
193
                "adding to IEH: %s, tstart=%f, tend= %f, now First in IEH: ",
194
                ((cSimpleModule*) sim->getModule(msg->getArrivalModuleId()))->getName(),SIMTIME_DBL(msg->getArrivalTime()),
195
                SIMTIME_DBL(msg->getTend()));*/
196

    
197
        independentEventsHeap.insert(msg);
198

    
199
/*        printf(
200
                "%s, length=%i\n",
201
                ((cSimpleModule*) sim->getModule(
202
                        independentEventsHeap.peekFirst()->getArrivalModuleId()))->getName(),
203
                independentEventsHeap.length());*/
204
        msg = NULL;
205
    }
206

    
207
    if (msg) sim->msgQueue.insert(msg);
208
#else
209
    cBarrierMessage* barrier = NULL;
210
    /*
211
     * Fill up independent event heap
212
     */
213
    while (!(sim->msgQueue.empty()))
214
    {
215
        msg = sim->msgQueue.removeFirst();
216
        barrier = dynamic_cast<cBarrierMessage*> (msg);
217
        if (barrier != NULL)
218
        {
219
            /*
220
             * If the barrier has already been signaled, we can just delete it,
221
             * as we would not have to wait here
222
            */
223
            if(!barrier->isValid()) {
224
                delete barrier;
225
                continue;
226
            }
227
            /*
228
             * if we hit a barrier, we are done and return the first independent msg
229
             * or we have wait at the barrier if no independent event exists
230
             */
231
            if (independentEventsHeap.empty())
232
            {
233
                barrier->wait();
234
                delete barrier;
235
                continue;
236
            }
237
            else
238
            {
239
                sim->msgQueue.insert(msg);
240
                return;
241
            }
242
        }
243

    
244
        mod = (cSimpleModule*) sim->getModule(msg->getArrivalModuleId());
245
        aMod = NULL;
246
        if (mod->isAsyncModule())
247
        {
248
            aMod = (cAsyncModule*) mod;
249
            simtime_t duration = msg->getEventDuration();
250

    
251
            if (!aMod->mayParallelize(msg, duration))
252
            {
253
                sim->msgQueue.insert(msg);
254
                return;
255
            }
256
            // create a new barrier and schedule it
257
            cBarrierMessage* barrier = new cBarrierMessage();
258
            barrier->setArrival(aMod, -1, msg->getArrivalTime() + duration);
259
            msg->setBarrier(barrier);
260
            sim->msgQueue.insert(barrier);
261

    
262
/*            printf(
263
                    "adding to IEH: %s, tstart=%f, tend= %f, now First in IEH: ",
264
                    ((cSimpleModule*) sim->getModule(msg->getArrivalModuleId()))->getName(),SIMTIME_DBL(msg->getArrivalTime()),
265
                    SIMTIME_DBL(msg->getTend()));*/
266

    
267
            independentEventsHeap.insert(msg);
268

    
269
/*            printf(
270
                    "%s, length=%i\n",
271
                    ((cSimpleModule*) sim->getModule(
272
                            independentEventsHeap.peekFirst()->getArrivalModuleId()))->getName(),
273
                    independentEventsHeap.length());*/
274

    
275
        }
276
        else //Not a AsyncModule
277
        {
278
            sim->msgQueue.insert(msg);
279
            return;
280
        }
281

    
282
    } // while (!(sim->msgQueue.empty()))
283
#endif
284
}
285

    
286
cMessage* cEEFScheduler::getFirstEvent() {
287
#ifdef NOBARRIER
288
    cMessage *msg = NULL;
289
    if(independentEventsHeap.empty()) {
290
            if (!sim->msgQueue.empty()) {
291
                // Do we have to wait for a barrier?
292
                if (sim->threadPool) sim->threadPool->waitAtBarrier(&(sim->msgQueue));
293
                return sim->msgQueue.removeFirst();
294
            }
295
        } else {
296
            return independentEventsHeap.getFirst();
297
        }
298
        // At this point, both IES and FES are empty
299
        //
300
        // if there is no event left and we don't use the threadpool, end the sim
301
        //
302
        if (!sim->threadPool)
303
        throw cTerminationException(eENDEDOK);
304

    
305
        //
306
        // If we did not get a valid msg from the queue, but there are still
307
        // barrier messages left, we wait:
308
        //
309
        while (sim->msgQueue.empty() && !sim->threadPool->barrierEmpty())
310
        {
311
            __asm__ ("pause");
312
        }
313
        msg = sim->msgQueue.removeFirst();
314

    
315
        if (msg) return msg;
316

    
317
        //
318
        // If there is still no message in the queue, there are
319
        // also no barriers left (else we would have waited), and we quit:
320
        //
321

    
322
        else
323
        throw cTerminationException(eENDEDOK);
324
#else
325
        if (independentEventsHeap.empty())
326
        {
327
            if (sim->msgQueue.empty()) {
328
                /*
329
                 * Both Queues are empty, we are done
330
                 */
331
                throw cTerminationException(eENDEDOK);
332
            } else {
333
                /*
334
                 * In this case, we have sequential execution
335
                 */
336
                return sim->msgQueue.removeFirst();
337
            }
338
        }
339
        return independentEventsHeap.getFirst();
340
#endif
341
}
342

    
343
//-----
344
Register_Class(cRealTimeScheduler);
345

    
346
void cRealTimeScheduler::startRun()
347
{
348
    factor = ev.getConfig()->getAsDouble(CFGID_REALTIMESCHEDULER_SCALING);
349
    if (factor!=0)
350
        factor = 1/factor;
351
    doScaling = (factor!=0);
352

    
353
    gettimeofday(&baseTime, NULL);
354
}
355

    
356
void cRealTimeScheduler::endRun()
357
{
358
}
359

    
360
void cRealTimeScheduler::executionResumed()
361
{
362
    gettimeofday(&baseTime, NULL);
363
    baseTime = timeval_substract(baseTime, SIMTIME_DBL(doScaling ? factor*sim->getSimTime() : sim->getSimTime()));
364
}
365

    
366
bool cRealTimeScheduler::waitUntil(const timeval& targetTime)
367
{
368
    // if there's more than 200ms to wait, wait in 100ms chunks
369
    // in order to keep UI responsiveness by invoking ev.idle()
370
    timeval curTime;
371
    gettimeofday(&curTime, NULL);
372
    while (targetTime.tv_sec-curTime.tv_sec >=2 ||
373
           timeval_diff_usec(targetTime, curTime) >= 200000)
374
    {
375
        usleep(100000); // 100ms
376
        if (ev.idle())
377
            return false;
378
        gettimeofday(&curTime, NULL);
379
    }
380

    
381
    // difference is now at most 100ms, do it at once
382
    long usec = timeval_diff_usec(targetTime, curTime);
383
    if (usec>0)
384
        usleep(usec);
385
    return true;
386
}
387

    
388
cMessage *cRealTimeScheduler::getNextEvent()
389
{
390
#ifdef NOBARRIER
391
    throw cRuntimeError("RealTimeScheduler is not supported with NOBARRIER.");
392
#endif
393
    if(sim->threadPool) {
394
        //TODO: Handle barriermsg
395
        throw cRuntimeError("RealTimeScheduler is not supported with Threadpool.");
396
    }
397
    cMessage *msg = sim->msgQueue.removeFirst();
398
    if (!msg)
399
        throw cTerminationException(eENDEDOK);
400

    
401
    // calculate target time
402
    simtime_t eventSimtime = msg->getArrivalTime();
403
    timeval targetTime = timeval_add(baseTime, SIMTIME_DBL(doScaling ? factor*eventSimtime : eventSimtime));
404

    
405
    // if needed, wait until that time arrives
406
    timeval curTime;
407
    gettimeofday(&curTime, NULL);
408
    if (timeval_greater(targetTime, curTime))
409
    {
410
        if (!waitUntil(targetTime))
411
            return NULL; // user break
412
    }
413
    else
414
    {
415
        // we're behind -- customized versions of this class may alert
416
        // if we're too much behind, or modify basetime to accept the skew
417
    }
418

    
419
    // ok, return the message
420
    return msg;
421
}
422

    
423

    
424