Statistics
| Branch: | Revision:

root / src / sim / cscheduler.cc @ 6b81f4fa

History | View | Annotate | Download (13 KB)

1
//=========================================================================
2
//  CSCHEDULER.CC - part of
3
//
4
//                  OMNeT++/OMNEST
5
//           Discrete System Simulation in C++
6
//
7
//  Author: Andras Varga, 2003
8
//
9
//=========================================================================
10

    
11
/*--------------------------------------------------------------*
12
  Copyright (C) 2003-2008 Andras Varga
13
  Copyright (C) 2006-2008 OpenSim Ltd.
14
  Monash University, Dept. of Electrical and Computer Systems Eng.
15
  Melbourne, Australia
16

17
  This file is distributed WITHOUT ANY WARRANTY. See the file
18
  `license' for details on this and other legal matters.
19
*--------------------------------------------------------------*/
20

    
21
#include "cscheduler.h"
22
#include "cmessage.h"
23
#include "csimulation.h"
24
#include "cmessageheap.h"
25
#include "globals.h"
26
#include "cenvir.h"
27
#include "cconfiguration.h"
28
#include "cconfigoption.h"
29
#include "platmisc.h" // usleep
30
#include "cbarriermessage.h"
31

    
32
USING_NAMESPACE
33

    
34
Register_GlobalConfigOption(CFGID_REALTIMESCHEDULER_SCALING, "realtimescheduler-scaling", CFG_DOUBLE, NULL, "When cRealTimeScheduler is selected as scheduler class: ratio of simulation time to real time. For example, scaling=2 will cause simulation time to progress twice as fast as runtime.");
35

    
36
cScheduler::cScheduler()
37
{
38
    sim = NULL;
39
}
40

    
41
cScheduler::~cScheduler()
42
{
43
}
44

    
45
void cScheduler::setSimulation(cSimulation *_sim)
46
{
47
    sim = _sim;
48
}
49
//-----
50

    
51
Register_Class(cSequentialScheduler);
52

    
53
cMessage *cSequentialScheduler::getNextEvent()
54
{
55
#ifdef NOBARRIER
56
    // Do we have to wait for a barrier?
57
    if (sim->threadPool) sim->threadPool->waitAtBarrier(&(sim->msgQueue));
58

    
59
    //
60
    // If we retrieve a valid msg from the queue, we return it:
61
    //
62
    cMessage *msg = sim->msgQueue.removeFirst();
63

    
64
    if (msg) {
65
        /*
66
         * Set Duration
67
         */
68
        cSimpleModule* mod = (cSimpleModule*) sim->getModule(msg->getArrivalModuleId());
69
        if (mod->isAsyncModule()) {
70
            msg->setEventDuration(((cAsyncModule*) mod)->getProcessingDelay(msg));
71
        }
72
        /*
73
         * return message
74
         */
75
        return msg;
76
    }
77

    
78

    
79
    //
80
    // if there is no event left and we don't use the threadpool, end the sim
81
    //
82
    if (!sim->threadPool)
83
        throw cTerminationException(eENDEDOK);
84

    
85
    //
86
    // If we did not get a valid msg from the queue, but there are still
87
    // barrier messages left, we wait:
88
    //
89
    while (sim->msgQueue.empty() && !sim->threadPool->barrierEmpty())
90
    {
91
        __asm__ ("pause");
92
    }
93
    msg = sim->msgQueue.removeFirst();
94

    
95
    //
96
    // If there is a msg now, we return it:
97
    //
98
    if (msg) {
99
        /*
100
         * Set Duration
101
         */
102
        cSimpleModule* mod = (cSimpleModule*) sim->getModule(msg->getArrivalModuleId());
103
        if (mod->isAsyncModule()) {
104
            msg->setEventDuration(((cAsyncModule*) mod)->getProcessingDelay(msg));
105
        }
106
        /*
107
         * return message
108
         */
109
        return msg;
110
    }
111

    
112
    //
113
    // If there is still no message in the queue, there are
114
    // also no barriers left (else we would have waited), and we quit:
115
    //
116

    
117
    else
118
    throw cTerminationException(eENDEDOK);
119

    
120
#else
121
    cMessage *msg = NULL;
122
    cBarrierMessage* barrier = NULL;
123

    
124
    while (!msg)
125
    {
126
        msg = sim->msgQueue.removeFirst();
127
        if (!msg)
128
            throw cTerminationException(eENDEDOK);
129
        /*
130
         * If we have a Barriermsg, we wait
131
         */
132
        barrier = dynamic_cast<cBarrierMessage*> (msg);
133
        if (barrier != NULL)
134
        {
135
            // wait for the task to complete
136
            barrier->wait();
137
            delete barrier;
138
            msg = NULL;
139
        }
140
    }
141

    
142
    cSimpleModule* mod = (cSimpleModule*) sim->getModule(msg->getArrivalModuleId());
143
    if (mod->isAsyncModule())
144
    {
145
        cAsyncModule* aMod = (cAsyncModule*) mod;
146
        simtime_t now = msg->getArrivalTime();
147
        simtime_t duration = aMod->getProcessingDelay(msg);
148

    
149
        if (aMod->mayParallelize(msg, duration))
150
        {
151
            msg->setEventDuration(duration);
152
            // create a new barrier and schedule it
153
            cBarrierMessage* barrier = new cBarrierMessage();
154
            barrier->setArrival(aMod, -1, now + duration);
155
            msg->setBarrier(barrier);
156
            // insert user supplied message in task queue.
157
            sim->msgQueue.insert(barrier);
158
        }
159
    }
160
    return msg;
161
#endif
162
}
163

    
164
//-----
165
Register_Class(cEEFScheduler)
166
;
167

    
168
cEEFScheduler::cEEFScheduler()
169
{
170
}
171
cEEFScheduler::~cEEFScheduler()
172
{
173
}
174

    
175
cMessage *cEEFScheduler::getNextEvent()
176
{
177

    
178
    //TODO: check if barriermessage is still vaild before waiting (may speed up things, as IES may not run empty)
179

    
180
    cMessage *msg = NULL;
181
    cSimpleModule* mod = NULL;
182
    cAsyncModule* aMod = NULL;
183

    
184
#ifdef NOBARRIER
185

    
186
    while (!(sim->msgQueue.empty()))
187
       {
188
        msg = sim->msgQueue.removeFirst();
189

    
190
        mod = (cSimpleModule*) sim->getModule(msg->getArrivalModuleId());
191

    
192
        if (!(mod->isAsyncModule())) {
193
            break;
194
        }
195
        aMod = (cAsyncModule*) mod;
196
        simtime_t duration = aMod->getProcessingDelay(msg);
197
        msg->setEventDuration(duration);
198
        bool mayPar = aMod->mayParallelize(msg, duration);
199

    
200
        if (!mayPar) {
201
            break;
202
        }
203
        /*
204
         * If there is an active barrier in the threadpool before msg, break
205
         */
206
        if (sim->threadPool->isBeforeBarrier(msg)) {
207
            break;
208
        }
209
        /*
210
         * If an event in the IES would cause a barrier before msg, break
211
         */
212
        if (!independentEventsHeap.empty() && msg->getArrivalTime() >= independentEventsHeap.peekFirst()->getTend()) {
213
            break;
214
        }
215
        printf(
216
                "adding to IEH: %s, tstart=%f, tend= %f, now First in IEH: ",
217
                ((cSimpleModule*) sim->getModule(msg->getArrivalModuleId()))->getName(),SIMTIME_DBL(msg->getArrivalTime()),
218
                SIMTIME_DBL(msg->getTend()));
219

    
220
        independentEventsHeap.insert(msg);
221

    
222
        printf(
223
                "%s, length=%i\n",
224
                ((cSimpleModule*) sim->getModule(
225
                        independentEventsHeap.peekFirst()->getArrivalModuleId()))->getName(),
226
                independentEventsHeap.length());
227
        msg = NULL;
228
    }
229

    
230
    if (msg) sim->msgQueue.insert(msg);
231

    
232
    if(independentEventsHeap.empty()) {
233
        if (!sim->msgQueue.empty()) {
234
            // Do we have to wait for a barrier?
235
            if (sim->threadPool) sim->threadPool->waitAtBarrier(&(sim->msgQueue));
236
            return sim->msgQueue.removeFirst();
237
        }
238
    } else {
239
        return independentEventsHeap.getFirst();
240
    }
241
    // At this point, both IES and FES are empty
242
    //
243
    // if there is no event left and we don't use the threadpool, end the sim
244
    //
245
    if (!sim->threadPool)
246
    throw cTerminationException(eENDEDOK);
247

    
248
    //
249
    // If we did not get a valid msg from the queue, but there are still
250
    // barrier messages left, we wait:
251
    //
252
    while (sim->msgQueue.empty() && !sim->threadPool->barrierEmpty())
253
    {
254
        __asm__ ("pause");
255
    }
256
    msg = sim->msgQueue.removeFirst();
257

    
258
    //
259
    // If there is a msg now, we return it:
260
    //
261
    if (msg)
262
    return msg;
263

    
264
    //
265
    // If there is still no message in the queue, there are
266
    // also no barriers left (else we would have waited), and we quit:
267
    //
268

    
269
    else
270
    throw cTerminationException(eENDEDOK);
271

    
272
#else
273
    cBarrierMessage* barrier = NULL;
274
    /*
275
     * Fill up independent event heap
276
     */
277
    while (!(sim->msgQueue.empty()))
278
    {
279
        msg = sim->msgQueue.removeFirst();
280
        barrier = dynamic_cast<cBarrierMessage*> (msg);
281
        if (barrier != NULL)
282
        {
283
            /*
284
             * if we hit a barrier, we are done and return the first independent msg
285
             * or we have wait at the barrier if no independent event exists
286
             */
287
            if (independentEventsHeap.empty())
288
            {
289
                barrier->wait();
290
                delete barrier;
291
                continue;
292
            }
293
            else
294
            {
295
                sim->msgQueue.insert(msg);
296
                return independentEventsHeap.getFirst();
297
            }
298
        }
299

    
300
        mod = (cSimpleModule*) sim->getModule(msg->getArrivalModuleId());
301
        aMod = NULL;
302
        if (mod->isAsyncModule())
303
        {
304
            aMod = (cAsyncModule*) mod;
305
            simtime_t now = msg->getArrivalTime();
306
            simtime_t duration = aMod->getProcessingDelay(msg);
307
            msg->setEventDuration(duration);
308

    
309
            if (!aMod->mayParallelize(msg, duration))
310
            {
311
                if (independentEventsHeap.empty())
312
                {
313
                    return msg;
314
                }
315
                else
316
                {
317
                    sim->msgQueue.insert(msg);
318
                    return independentEventsHeap.getFirst();
319
                }
320
            }
321
            // create a new barrier and schedule it
322
            cBarrierMessage* barrier = new cBarrierMessage();
323
            barrier->setArrival(aMod, -1, now + duration);
324
            msg->setBarrier(barrier);
325
            sim->msgQueue.insert(barrier);
326

    
327
            printf(
328
                     "adding to IEH: %s, tstart=%f, tend= %f, now First in IEH: ", SIMTIME_DBL(msg->getArrivalTime()),
329
                     ((cSimpleModule*) sim->getModule(msg->getArrivalModuleId()))->getName(),
330
                     SIMTIME_DBL(msg->getTend()));
331

    
332
            independentEventsHeap.insert(msg);
333

    
334
            printf(
335
                    "%s, length=%i\n",
336
                    ((cSimpleModule*) sim->getModule(
337
                            independentEventsHeap.peekFirst()->getArrivalModuleId()))->getName(),
338
                    independentEventsHeap.length());
339

    
340
        }
341
        else //Not a AsyncModule
342
        {
343
            if (independentEventsHeap.empty())
344
            {
345
                return msg;
346
            }
347
            else
348
            {
349
                sim->msgQueue.insert(msg);
350
                return independentEventsHeap.getFirst();
351
            }
352
        }
353

    
354
    } // while (!(sim->msgQueue.empty()))
355
/*
356
 * the FES is empty
357
 * check if the independent event set is also empty
358
 */
359
if (independentEventsHeap.empty())
360
{
361
    throw cTerminationException(eENDEDOK);
362
}
363
return independentEventsHeap.getFirst();
364
#endif
365
}
366

    
367

    
368

    
369

    
370
//-----
371
Register_Class(cRealTimeScheduler);
372

    
373
void cRealTimeScheduler::startRun()
374
{
375
    factor = ev.getConfig()->getAsDouble(CFGID_REALTIMESCHEDULER_SCALING);
376
    if (factor!=0)
377
        factor = 1/factor;
378
    doScaling = (factor!=0);
379

    
380
    gettimeofday(&baseTime, NULL);
381
}
382

    
383
void cRealTimeScheduler::endRun()
384
{
385
}
386

    
387
void cRealTimeScheduler::executionResumed()
388
{
389
    gettimeofday(&baseTime, NULL);
390
    baseTime = timeval_substract(baseTime, SIMTIME_DBL(doScaling ? factor*sim->getSimTime() : sim->getSimTime()));
391
}
392

    
393
bool cRealTimeScheduler::waitUntil(const timeval& targetTime)
394
{
395
    // if there's more than 200ms to wait, wait in 100ms chunks
396
    // in order to keep UI responsiveness by invoking ev.idle()
397
    timeval curTime;
398
    gettimeofday(&curTime, NULL);
399
    while (targetTime.tv_sec-curTime.tv_sec >=2 ||
400
           timeval_diff_usec(targetTime, curTime) >= 200000)
401
    {
402
        usleep(100000); // 100ms
403
        if (ev.idle())
404
            return false;
405
        gettimeofday(&curTime, NULL);
406
    }
407

    
408
    // difference is now at most 100ms, do it at once
409
    long usec = timeval_diff_usec(targetTime, curTime);
410
    if (usec>0)
411
        usleep(usec);
412
    return true;
413
}
414

    
415
cMessage *cRealTimeScheduler::getNextEvent()
416
{
417
#ifdef NOBARRIER
418
    //
419
    // If we retrieve a valid msg from the queue, we return it:
420
    //
421
    cMessage *msg = sim->msgQueue.peekFirst();
422
    if (msg)
423
        return msg;
424

    
425
    //
426
    // if there is no event left and we don't use the threadpool, end the sim
427
    //
428
    if (!sim->threadPool)
429
        throw cTerminationException(eENDEDOK);
430

    
431
    //
432
    // If we did not get a valid msg from the queue, but there are still
433
    // barrier messages left, we wait:
434
    //
435
    while (sim->msgQueue.empty() && !sim->threadPool->barrierEmpty())
436
    {
437
        __asm__ ("pause");
438
    }
439
    msg = sim->msgQueue.peekFirst();
440

    
441
    //
442
    // If there is a msg now, we return it:
443
    //
444
    if (msg)
445
        return msg;
446

    
447
    //
448
    // If there is still no message in the queue, there are
449
    // also no barriers left (else we would have waited), and we quit:
450
    //
451
    else
452
        throw cTerminationException(eENDEDOK);
453

    
454
#else
455
    cMessage *msg = sim->msgQueue.peekFirst();
456
    if (!msg)
457
        throw cTerminationException(eENDEDOK);
458

    
459
    // calculate target time
460
    simtime_t eventSimtime = msg->getArrivalTime();
461
    timeval targetTime = timeval_add(baseTime, SIMTIME_DBL(doScaling ? factor*eventSimtime : eventSimtime));
462

    
463
    // if needed, wait until that time arrives
464
    timeval curTime;
465
    gettimeofday(&curTime, NULL);
466
    if (timeval_greater(targetTime, curTime))
467
    {
468
        if (!waitUntil(targetTime))
469
            return NULL; // user break
470
    }
471
    else
472
    {
473
        // we're behind -- customized versions of this class may alert
474
        // if we're too much behind, or modify basetime to accept the skew
475
    }
476

    
477
    // ok, return the message
478
    return msg;
479
#endif
480
}
481

    
482

    
483