Statistics
| Branch: | Revision:

root / src / sim / cscheduler.cc @ 94cf1056

History | View | Annotate | Download (13.2 KB)

1
//=========================================================================
2
//  CSCHEDULER.CC - part of
3
//
4
//                  OMNeT++/OMNEST
5
//           Discrete System Simulation in C++
6
//
7
//  Author: Andras Varga, 2003
8
//
9
//=========================================================================
10

    
11
/*--------------------------------------------------------------*
12
  Copyright (C) 2003-2008 Andras Varga
13
  Copyright (C) 2006-2008 OpenSim Ltd.
14
  Monash University, Dept. of Electrical and Computer Systems Eng.
15
  Melbourne, Australia
16

17
  This file is distributed WITHOUT ANY WARRANTY. See the file
18
  `license' for details on this and other legal matters.
19
*--------------------------------------------------------------*/
20

    
21
#include "cscheduler.h"
22
#include "cmessage.h"
23
#include "csimulation.h"
24
#include "cmessageheap.h"
25
#include "globals.h"
26
#include "cenvir.h"
27
#include "cconfiguration.h"
28
#include "cconfigoption.h"
29
#include "platmisc.h" // usleep
30
#include "cbarriermessage.h"
31

    
32
USING_NAMESPACE
33

    
34
Register_GlobalConfigOption(CFGID_REALTIMESCHEDULER_SCALING, "realtimescheduler-scaling", CFG_DOUBLE, NULL, "When cRealTimeScheduler is selected as scheduler class: ratio of simulation time to real time. For example, scaling=2 will cause simulation time to progress twice as fast as runtime.");
35

    
36
cScheduler::cScheduler()
37
{
38
    sim = NULL;
39
}
40

    
41
cScheduler::~cScheduler()
42
{
43
}
44

    
45
void cScheduler::setSimulation(cSimulation *_sim)
46
{
47
    sim = _sim;
48
}
49
//-----
50

    
51
Register_Class(cSequentialScheduler);
52

    
53
cMessage *cSequentialScheduler::getNextEvent()
54
{
55
#ifdef NOBARRIER
56
    // Do we have to wait for a barrier?
57
    if (sim->threadPool) sim->threadPool->waitAtBarrier(&(sim->msgQueue));
58

    
59
    //
60
    // If we retrieve a valid msg from the queue, we return it:
61
    //
62
    cMessage *msg = sim->msgQueue.removeFirst();
63

    
64
    if (msg) {
65
        /*
66
         * Set Duration
67
         */
68
        cSimpleModule* mod = (cSimpleModule*) sim->getModule(msg->getArrivalModuleId());
69
        if (mod->isAsyncModule()) {
70
            msg->setEventDuration(((cAsyncModule*) mod)->getProcessingDelay(msg));
71
        }
72
        /*
73
         * return message
74
         */
75
        return msg;
76
    }
77

    
78

    
79
    //
80
    // if there is no event left and we don't use the threadpool, end the sim
81
    //
82
    if (!sim->threadPool)
83
        throw cTerminationException(eENDEDOK);
84

    
85
    //
86
    // If we did not get a valid msg from the queue, but there are still
87
    // barrier messages left, we wait:
88
    //
89
    while (sim->msgQueue.empty() && !sim->threadPool->barrierEmpty())
90
    {
91
        __asm__ ("pause");
92
    }
93
    msg = sim->msgQueue.removeFirst();
94

    
95
    //
96
    // If there is a msg now, we return it:
97
    //
98
    if (msg) {
99
        /*
100
         * Set Duration
101
         */
102
        cSimpleModule* mod = (cSimpleModule*) sim->getModule(msg->getArrivalModuleId());
103
        if (mod->isAsyncModule()) {
104
            msg->setEventDuration(((cAsyncModule*) mod)->getProcessingDelay(msg));
105
        }
106
        /*
107
         * return message
108
         */
109
        return msg;
110
    }
111

    
112
    //
113
    // If there is still no message in the queue, there are
114
    // also no barriers left (else we would have waited), and we quit:
115
    //
116

    
117
    else
118
    throw cTerminationException(eENDEDOK);
119

    
120
#else
121
    cMessage *msg = NULL;
122
    cBarrierMessage* barrier = NULL;
123

    
124
    while (!msg)
125
    {
126
        msg = sim->msgQueue.removeFirst();
127
        if (!msg)
128
            throw cTerminationException(eENDEDOK);
129
        /*
130
         * If we have a Barriermsg, we wait
131
         */
132
        barrier = dynamic_cast<cBarrierMessage*> (msg);
133
        if (barrier != NULL)
134
        {
135
            // wait for the task to complete
136
            barrier->wait();
137
            delete barrier;
138
            msg = NULL;
139
        }
140
    }
141

    
142
    cSimpleModule* mod = (cSimpleModule*) sim->getModule(msg->getArrivalModuleId());
143
    if (mod->isAsyncModule())
144
    {
145
        cAsyncModule* aMod = (cAsyncModule*) mod;
146
        simtime_t now = msg->getArrivalTime();
147
        simtime_t duration = aMod->getProcessingDelay(msg);
148
        msg->setEventDuration(duration);
149

    
150
        if (aMod->mayParallelize(msg, duration))
151
        {
152
            // create a new barrier and schedule it
153
            cBarrierMessage* barrier = new cBarrierMessage();
154
            barrier->setArrival(aMod, -1, now + duration);
155
            msg->setBarrier(barrier);
156
            sim->msgQueue.insert(barrier);
157
        }
158
    }
159
    return msg;
160
#endif
161
}
162

    
163
//-----
164
Register_Class(cEEFScheduler)
165
;
166

    
167
cEEFScheduler::cEEFScheduler()
168
{
169
}
170
cEEFScheduler::~cEEFScheduler()
171
{
172
}
173

    
174
cMessage *cEEFScheduler::getNextEvent()
175
{
176

    
177
    //TODO: check if barriermessage is still vaild before waiting (may speed up things, as IES may not run empty)
178

    
179
    cMessage *msg = NULL;
180
    cSimpleModule* mod = NULL;
181
    cAsyncModule* aMod = NULL;
182

    
183
#ifdef NOBARRIER
184

    
185
    while (!(sim->msgQueue.empty()))
186
       {
187
        msg = sim->msgQueue.removeFirst();
188

    
189
        mod = (cSimpleModule*) sim->getModule(msg->getArrivalModuleId());
190

    
191
        if (!(mod->isAsyncModule())) {
192
            break;
193
        }
194
        aMod = (cAsyncModule*) mod;
195
        simtime_t duration = aMod->getProcessingDelay(msg);
196
        msg->setEventDuration(duration);
197
        bool mayPar = aMod->mayParallelize(msg, duration);
198

    
199
        if (!mayPar) {
200
            break;
201
        }
202
        /*
203
         * If there is an active barrier in the threadpool before msg, break
204
         */
205
        if (sim->threadPool->isBeforeBarrier(msg)) {
206
            break;
207
        }
208
        /*
209
         * If an event in the IES would cause a barrier before msg, break
210
         */
211
        if (!independentEventsHeap.empty() && msg->getArrivalTime() >= independentEventsHeap.peekFirst()->getTend()) {
212
            break;
213
        }
214
        printf(
215
                "adding to IEH: %s, tstart=%f, tend= %f, now First in IEH: ",
216
                ((cSimpleModule*) sim->getModule(msg->getArrivalModuleId()))->getName(),SIMTIME_DBL(msg->getArrivalTime()),
217
                SIMTIME_DBL(msg->getTend()));
218

    
219
        independentEventsHeap.insert(msg);
220

    
221
        printf(
222
                "%s, length=%i\n",
223
                ((cSimpleModule*) sim->getModule(
224
                        independentEventsHeap.peekFirst()->getArrivalModuleId()))->getName(),
225
                independentEventsHeap.length());
226
        msg = NULL;
227
    }
228

    
229
    if (msg) sim->msgQueue.insert(msg);
230

    
231
    if(independentEventsHeap.empty()) {
232
        if (!sim->msgQueue.empty()) {
233
            // Do we have to wait for a barrier?
234
            if (sim->threadPool) sim->threadPool->waitAtBarrier(&(sim->msgQueue));
235
            return sim->msgQueue.removeFirst();
236
        }
237
    } else {
238
        return independentEventsHeap.getFirst();
239
    }
240
    // At this point, both IES and FES are empty
241
    //
242
    // if there is no event left and we don't use the threadpool, end the sim
243
    //
244
    if (!sim->threadPool)
245
    throw cTerminationException(eENDEDOK);
246

    
247
    //
248
    // If we did not get a valid msg from the queue, but there are still
249
    // barrier messages left, we wait:
250
    //
251
    while (sim->msgQueue.empty() && !sim->threadPool->barrierEmpty())
252
    {
253
        __asm__ ("pause");
254
    }
255
    msg = sim->msgQueue.removeFirst();
256

    
257
    //
258
    // If there is a msg now, we return it:
259
    //
260
    if (msg)
261
    return msg;
262

    
263
    //
264
    // If there is still no message in the queue, there are
265
    // also no barriers left (else we would have waited), and we quit:
266
    //
267

    
268
    else
269
    throw cTerminationException(eENDEDOK);
270

    
271
#else
272
    cBarrierMessage* barrier = NULL;
273
    /*
274
     * Fill up independent event heap
275
     */
276
    while (!(sim->msgQueue.empty()))
277
    {
278
        msg = sim->msgQueue.removeFirst();
279
        barrier = dynamic_cast<cBarrierMessage*> (msg);
280
        if (barrier != NULL)
281
        {
282
            /*
283
             * If the barrier has already been signaled, we can just delete it, as we would not have to wait here
284
            */
285
            if(!barrier->isValid()) {
286
                delete barrier;
287
                continue;
288
            }
289
            /*
290
             * if we hit a barrier, we are done and return the first independent msg
291
             * or we have wait at the barrier if no independent event exists
292
             */
293
            if (independentEventsHeap.empty())
294
            {
295
                barrier->wait();
296
                delete barrier;
297
                continue;
298
            }
299
            else
300
            {
301
                sim->msgQueue.insert(msg);
302
                return independentEventsHeap.getFirst();
303
            }
304
        }
305

    
306
        mod = (cSimpleModule*) sim->getModule(msg->getArrivalModuleId());
307
        aMod = NULL;
308
        if (mod->isAsyncModule())
309
        {
310
            aMod = (cAsyncModule*) mod;
311
            simtime_t now = msg->getArrivalTime();
312
            simtime_t duration = aMod->getProcessingDelay(msg);
313
            msg->setEventDuration(duration);
314

    
315
            if (!aMod->mayParallelize(msg, duration))
316
            {
317
                if (independentEventsHeap.empty())
318
                {
319
                    return msg;
320
                }
321
                else
322
                {
323
                    sim->msgQueue.insert(msg);
324
                    return independentEventsHeap.getFirst();
325
                }
326
            }
327
            // create a new barrier and schedule it
328
            cBarrierMessage* barrier = new cBarrierMessage();
329
            barrier->setArrival(aMod, -1, now + duration);
330
            msg->setBarrier(barrier);
331
            sim->msgQueue.insert(barrier);
332

    
333
            printf(
334
                     "adding to IEH: %f, tstart=%s, tend= %f, now First in IEH: ", SIMTIME_DBL(msg->getArrivalTime()),
335
                     ((cSimpleModule*) sim->getModule(msg->getArrivalModuleId()))->getName(),
336
                     SIMTIME_DBL(msg->getTend()));
337

    
338
            independentEventsHeap.insert(msg);
339

    
340
            printf(
341
                    "%s, length=%i\n",
342
                    ((cSimpleModule*) sim->getModule(
343
                            independentEventsHeap.peekFirst()->getArrivalModuleId()))->getName(),
344
                    independentEventsHeap.length());
345

    
346
        }
347
        else //Not a AsyncModule
348
        {
349
            if (independentEventsHeap.empty())
350
            {
351
                return msg;
352
            }
353
            else
354
            {
355
                sim->msgQueue.insert(msg);
356
                return independentEventsHeap.getFirst();
357
            }
358
        }
359

    
360
    } // while (!(sim->msgQueue.empty()))
361
/*
362
 * the FES is empty
363
 * check if the independent event set is also empty
364
 */
365
if (independentEventsHeap.empty())
366
{
367
    throw cTerminationException(eENDEDOK);
368
}
369
return independentEventsHeap.getFirst();
370
#endif
371
}
372

    
373

    
374

    
375

    
376
//-----
377
Register_Class(cRealTimeScheduler);
378

    
379
void cRealTimeScheduler::startRun()
380
{
381
    factor = ev.getConfig()->getAsDouble(CFGID_REALTIMESCHEDULER_SCALING);
382
    if (factor!=0)
383
        factor = 1/factor;
384
    doScaling = (factor!=0);
385

    
386
    gettimeofday(&baseTime, NULL);
387
}
388

    
389
void cRealTimeScheduler::endRun()
390
{
391
}
392

    
393
void cRealTimeScheduler::executionResumed()
394
{
395
    gettimeofday(&baseTime, NULL);
396
    baseTime = timeval_substract(baseTime, SIMTIME_DBL(doScaling ? factor*sim->getSimTime() : sim->getSimTime()));
397
}
398

    
399
bool cRealTimeScheduler::waitUntil(const timeval& targetTime)
400
{
401
    // if there's more than 200ms to wait, wait in 100ms chunks
402
    // in order to keep UI responsiveness by invoking ev.idle()
403
    timeval curTime;
404
    gettimeofday(&curTime, NULL);
405
    while (targetTime.tv_sec-curTime.tv_sec >=2 ||
406
           timeval_diff_usec(targetTime, curTime) >= 200000)
407
    {
408
        usleep(100000); // 100ms
409
        if (ev.idle())
410
            return false;
411
        gettimeofday(&curTime, NULL);
412
    }
413

    
414
    // difference is now at most 100ms, do it at once
415
    long usec = timeval_diff_usec(targetTime, curTime);
416
    if (usec>0)
417
        usleep(usec);
418
    return true;
419
}
420

    
421
cMessage *cRealTimeScheduler::getNextEvent()
422
{
423
#ifdef NOBARRIER
424
    //
425
    // If we retrieve a valid msg from the queue, we return it:
426
    //
427
    cMessage *msg = sim->msgQueue.peekFirst();
428
    if (msg)
429
        return msg;
430

    
431
    //
432
    // if there is no event left and we don't use the threadpool, end the sim
433
    //
434
    if (!sim->threadPool)
435
        throw cTerminationException(eENDEDOK);
436

    
437
    //
438
    // If we did not get a valid msg from the queue, but there are still
439
    // barrier messages left, we wait:
440
    //
441
    while (sim->msgQueue.empty() && !sim->threadPool->barrierEmpty())
442
    {
443
        __asm__ ("pause");
444
    }
445
    msg = sim->msgQueue.peekFirst();
446

    
447
    //
448
    // If there is a msg now, we return it:
449
    //
450
    if (msg)
451
        return msg;
452

    
453
    //
454
    // If there is still no message in the queue, there are
455
    // also no barriers left (else we would have waited), and we quit:
456
    //
457
    else
458
        throw cTerminationException(eENDEDOK);
459

    
460
#else
461
    cMessage *msg = sim->msgQueue.peekFirst();
462
    if (!msg)
463
        throw cTerminationException(eENDEDOK);
464

    
465
    // calculate target time
466
    simtime_t eventSimtime = msg->getArrivalTime();
467
    timeval targetTime = timeval_add(baseTime, SIMTIME_DBL(doScaling ? factor*eventSimtime : eventSimtime));
468

    
469
    // if needed, wait until that time arrives
470
    timeval curTime;
471
    gettimeofday(&curTime, NULL);
472
    if (timeval_greater(targetTime, curTime))
473
    {
474
        if (!waitUntil(targetTime))
475
            return NULL; // user break
476
    }
477
    else
478
    {
479
        // we're behind -- customized versions of this class may alert
480
        // if we're too much behind, or modify basetime to accept the skew
481
    }
482

    
483
    // ok, return the message
484
    return msg;
485
#endif
486
}
487

    
488

    
489