Statistics
| Branch: | Revision:

root / src / sim / cscheduler.cc @ 77cb9420

History | View | Annotate | Download (12.4 KB)

1
//=========================================================================
2
//  CSCHEDULER.CC - part of
3
//
4
//                  OMNeT++/OMNEST
5
//           Discrete System Simulation in C++
6
//
7
//  Author: Andras Varga, 2003
8
//
9
//=========================================================================
10

    
11
/*--------------------------------------------------------------*
12
  Copyright (C) 2003-2008 Andras Varga
13
  Copyright (C) 2006-2008 OpenSim Ltd.
14
  Monash University, Dept. of Electrical and Computer Systems Eng.
15
  Melbourne, Australia
16

17
  This file is distributed WITHOUT ANY WARRANTY. See the file
18
  `license' for details on this and other legal matters.
19
*--------------------------------------------------------------*/
20

    
21
#include "cscheduler.h"
22
#include "cmessage.h"
23
#include "csimulation.h"
24
#include "cmessageheap.h"
25
#include "globals.h"
26
#include "cenvir.h"
27
#include "cconfiguration.h"
28
#include "cconfigoption.h"
29
#include "platmisc.h" // usleep
30
#include "cbarriermessage.h"
31

    
32
USING_NAMESPACE
33

    
34
Register_GlobalConfigOption(CFGID_REALTIMESCHEDULER_SCALING, "realtimescheduler-scaling", CFG_DOUBLE, NULL, "When cRealTimeScheduler is selected as scheduler class: ratio of simulation time to real time. For example, scaling=2 will cause simulation time to progress twice as fast as runtime.");
35

    
36
cScheduler::cScheduler()
37
{
38
    sim = NULL;
39
}
40

    
41
cScheduler::~cScheduler()
42
{
43
}
44

    
45
void cScheduler::setSimulation(cSimulation *_sim)
46
{
47
    sim = _sim;
48
}
49
//-----
50

    
51
Register_Class(cSequentialScheduler);
52

    
53
cMessage *cSequentialScheduler::getNextEvent()
54
{
55
#ifdef NOBARRIER
56
    // Do we have to wait for a barrier?
57
    if (sim->threadPool)
58
        sim->threadPool->waitAtBarrier(&(sim->msgQueue));
59

    
60
    //
61
    // If we retrieve a valid msg from the queue, we return it:
62
    //
63
    cMessage *msg = sim->msgQueue.removeFirst();
64

    
65
    if (msg)
66
        return msg;
67

    
68

    
69
    //
70
    // if there is no event left and we don't use the threadpool, end the sim
71
    //
72
    if (!sim->threadPool)
73
        throw cTerminationException(eENDEDOK);
74

    
75
    //
76
    // If we did not get a valid msg from the queue, but there are still
77
    // barrier messages left, we wait:
78
    //
79
    while (sim->msgQueue.empty() && !sim->threadPool->barrierEmpty())
80
    {
81
        __asm__ ("pause");
82
    }
83
    msg = sim->msgQueue.removeFirst();
84

    
85
    //
86
    // If there is a msg now, we return it:
87
    //
88
    if (msg)
89
        return msg;
90

    
91
    //
92
    // If there is still no message in the queue, there are
93
    // also no barriers left (else we would have waited), and we quit:
94
    //
95
    else
96
        throw cTerminationException(eENDEDOK);
97

    
98
#else
99
    cMessage *msg = NULL;
100
    cBarrierMessage* barrier = NULL;
101

    
102
    while (!msg)
103
    {
104
        msg = sim->msgQueue.removeFirst();
105
        if (!msg)
106
            throw cTerminationException(eENDEDOK);
107

    
108
        //
109
        // If we have a Barriermsg, we wait
110
        //
111
        barrier = dynamic_cast<cBarrierMessage*> (msg);
112
        if (barrier != NULL)
113
        {
114
            //
115
            // wait for the task to complete
116
            //
117
            barrier->wait();
118
            delete barrier;
119
            msg = NULL;
120
        }
121
    }
122

    
123
    cSimpleModule* mod = (cSimpleModule*) sim->getModule(msg->getArrivalModuleId());
124
    if (mod->isAsyncModule())
125
    {
126
        cAsyncModule* aMod = (cAsyncModule*) mod;
127
        simtime_t duration = msg->getEventDuration();
128
        if (aMod->mayParallelize(msg, duration) && sim->threadPool)
129
        {
130
            //
131
            // create a new barrier and schedule it
132
            //
133
            cBarrierMessage* barrier = new cBarrierMessage();
134
            barrier->setArrival(aMod, -1, msg->getArrivalTime() + duration);
135
            msg->setBarrier(barrier);
136

    
137
            // If the event is a null duration event,
138
            // we have to set additional data in the barrier
139
            // to make sure the barrier gets placed correctly
140
            barrier->setSchedulingPriority(msg->getSchedulingPriority());
141
            barrier->setParentStartTime(msg->getArrivalTime());
142
            barrier->setParentExecutionOrderId(sim->getNextExecutionOrderId(msg));
143
            // Set scheduling order to 0 (in front of all children)
144
            barrier->setSchedulingOrderId(0);
145

    
146
            sim->msgQueue.insert(barrier);
147
        }
148
    }
149
    return msg;
150
#endif
151
}
152

    
153
//-----
154
Register_Class(cEEFScheduler)
155
;
156

    
157
cEEFScheduler::cEEFScheduler()
158
{
159
}
160

    
161
cEEFScheduler::~cEEFScheduler()
162
{
163
}
164

    
165
cMessage *cEEFScheduler::getNextEvent()
166
{
167
    if(!(sim->threadPool)) {
168
        throw cRuntimeError("EEFScheduler is not supported in sequential mode. Activate Threadpool or use cSequentialScheduler.");
169
    }
170

    
171
    updateIES();
172
    return getFirstEvent();
173
}
174

    
175
void cEEFScheduler::updateIES() {
176
    cMessage *msg = NULL;
177
    cSimpleModule* mod = NULL;
178
    cAsyncModule* aMod = NULL;
179
#ifdef NOBARRIER
180

    
181
    while (!(sim->msgQueue.empty()))
182
    {
183
        msg = sim->msgQueue.removeFirst();
184

    
185
        mod = (cSimpleModule*) sim->getModule(msg->getArrivalModuleId());
186

    
187
        if (!(mod->isAsyncModule())) {
188
            break;
189
        }
190

    
191
        aMod = (cAsyncModule*) mod;
192
        simtime_t duration = msg->getEventDuration();
193

    
194
        if (!aMod->mayParallelize(msg, duration)) {
195
            break;
196
        }
197

    
198
        //
199
        // If there is an active barrier in the threadpool before msg, break
200
        //
201
        if (sim->threadPool->isBeforeBarrier(msg)) {
202
            break;
203
        }
204

    
205
        //
206
        // If an event in the IES would cause a barrier before msg, break
207
        //
208
        if (!independentEventsHeap.empty() && msg->getArrivalTime() >= independentEventsHeap.peekFirst()->getTend()) {
209
            break;
210
        }
211
/*        printf(
212
                "adding to IEH: %s, tstart=%f, tend= %f, now First in IEH: ",
213
                ((cSimpleModule*) sim->getModule(msg->getArrivalModuleId()))->getName(),SIMTIME_DBL(msg->getArrivalTime()),
214
                SIMTIME_DBL(msg->getTend()));*/
215

    
216
        independentEventsHeap.insert(msg);
217

    
218
/*        printf(
219
                "%s, length=%i\n",
220
                ((cSimpleModule*) sim->getModule(
221
                        independentEventsHeap.peekFirst()->getArrivalModuleId()))->getName(),
222
                independentEventsHeap.length());*/
223
        msg = NULL;
224
    }
225

    
226
    if (msg)
227
        sim->msgQueue.insert(msg);
228
#else
229
    cBarrierMessage* barrier = NULL;
230
    /*
231
     * Fill up independent event heap
232
     */
233
    while (!(sim->msgQueue.empty()))
234
    {
235
        msg = sim->msgQueue.removeFirst();
236
        barrier = dynamic_cast<cBarrierMessage*> (msg);
237
        if (barrier != NULL)
238
        {
239
            /*
240
             * If the barrier has already been signaled, we can just delete it,
241
             * as we would not have to wait here
242
            */
243
            if(!barrier->isValid()) {
244
                delete barrier;
245
                continue;
246
            }
247
            /*
248
             * if we hit a barrier, we are done and return the first independent msg
249
             * or we have wait at the barrier if no independent event exists
250
             */
251
            if (independentEventsHeap.empty())
252
            {
253
                barrier->wait();
254
                delete barrier;
255
                continue;
256
            }
257
            else
258
            {
259
                sim->msgQueue.insert(msg);
260
                return;
261
            }
262
        }
263

    
264
        mod = (cSimpleModule*) sim->getModule(msg->getArrivalModuleId());
265
        aMod = NULL;
266
        if (mod->isAsyncModule())
267
        {
268
            aMod = (cAsyncModule*) mod;
269
            simtime_t duration = msg->getEventDuration();
270

    
271
            if (!aMod->mayParallelize(msg, duration))
272
            {
273
                sim->msgQueue.insert(msg);
274
                return;
275
            }
276
            // create a new barrier and schedule it
277
            cBarrierMessage* barrier = new cBarrierMessage();
278
            barrier->setArrival(aMod, -1, msg->getArrivalTime() + duration);
279
            msg->setBarrier(barrier);
280
            sim->msgQueue.insert(barrier);
281

    
282
/*            printf(
283
                    "adding to IEH: %s, tstart=%f, tend= %f, now First in IEH: ",
284
                    ((cSimpleModule*) sim->getModule(msg->getArrivalModuleId()))->getName(),SIMTIME_DBL(msg->getArrivalTime()),
285
                    SIMTIME_DBL(msg->getTend()));*/
286

    
287
            independentEventsHeap.insert(msg);
288

    
289
/*            printf(
290
                    "%s, length=%i\n",
291
                    ((cSimpleModule*) sim->getModule(
292
                            independentEventsHeap.peekFirst()->getArrivalModuleId()))->getName(),
293
                    independentEventsHeap.length());*/
294

    
295
        }
296
        else //Not a AsyncModule
297
        {
298
            sim->msgQueue.insert(msg);
299
            return;
300
        }
301

    
302
    } // while (!(sim->msgQueue.empty()))
303
#endif
304
}
305

    
306
cMessage* cEEFScheduler::getFirstEvent() {
307
#ifdef NOBARRIER
308
    cMessage *msg = NULL;
309
    if(independentEventsHeap.empty()) {
310
            if (!sim->msgQueue.empty()) {
311
                // Do we have to wait for a barrier?
312
                if (sim->threadPool) sim->threadPool->waitAtBarrier(&(sim->msgQueue));
313
                return sim->msgQueue.removeFirst();
314
            }
315
        } else {
316
            return independentEventsHeap.getFirst();
317
        }
318
        // At this point, both IES and FES are empty
319
        //
320
        // if there is no event left and we don't use the threadpool, end the sim
321
        //
322
        if (!sim->threadPool)
323
            throw cTerminationException(eENDEDOK);
324

    
325
        //
326
        // If we did not get a valid msg from the queue, but there are still
327
        // barrier messages left, we wait:
328
        //
329
        while (sim->msgQueue.empty() && !sim->threadPool->barrierEmpty())
330
        {
331
            __asm__ ("pause");
332
        }
333
        msg = sim->msgQueue.removeFirst();
334

    
335
        if (msg) return msg;
336

    
337
        //
338
        // If there is still no message in the queue, there are
339
        // also no barriers left (else we would have waited), and we quit:
340
        //
341

    
342
        else
343
            throw cTerminationException(eENDEDOK);
344
#else
345
        if (independentEventsHeap.empty())
346
        {
347
            if (sim->msgQueue.empty()) {
348
                /*
349
                 * Both Queues are empty, we are done
350
                 */
351
                throw cTerminationException(eENDEDOK);
352
            } else {
353
                /*
354
                 * In this case, we have sequential execution
355
                 */
356
                return sim->msgQueue.removeFirst();
357
            }
358
        }
359
        return independentEventsHeap.getFirst();
360
#endif
361
}
362

    
363
//-----
364
Register_Class(cRealTimeScheduler);
365

    
366
void cRealTimeScheduler::startRun()
367
{
368
    factor = ev.getConfig()->getAsDouble(CFGID_REALTIMESCHEDULER_SCALING);
369
    if (factor!=0)
370
        factor = 1/factor;
371
    doScaling = (factor!=0);
372

    
373
    gettimeofday(&baseTime, NULL);
374
}
375

    
376
void cRealTimeScheduler::endRun()
377
{
378
}
379

    
380
void cRealTimeScheduler::executionResumed()
381
{
382
    gettimeofday(&baseTime, NULL);
383
    baseTime = timeval_substract(baseTime, SIMTIME_DBL(doScaling ? factor*sim->getSimTime() : sim->getSimTime()));
384
}
385

    
386
bool cRealTimeScheduler::waitUntil(const timeval& targetTime)
387
{
388
    // if there's more than 200ms to wait, wait in 100ms chunks
389
    // in order to keep UI responsiveness by invoking ev.idle()
390
    timeval curTime;
391
    gettimeofday(&curTime, NULL);
392
    while (targetTime.tv_sec-curTime.tv_sec >=2 ||
393
           timeval_diff_usec(targetTime, curTime) >= 200000)
394
    {
395
        usleep(100000); // 100ms
396
        if (ev.idle())
397
            return false;
398
        gettimeofday(&curTime, NULL);
399
    }
400

    
401
    // difference is now at most 100ms, do it at once
402
    long usec = timeval_diff_usec(targetTime, curTime);
403
    if (usec>0)
404
        usleep(usec);
405
    return true;
406
}
407

    
408
cMessage *cRealTimeScheduler::getNextEvent()
409
{
410
#ifdef NOBARRIER
411
    throw cRuntimeError("RealTimeScheduler is not supported with NOBARRIER.");
412
#endif
413
    if(sim->threadPool) {
414
        //TODO: Handle barriermsg
415
        throw cRuntimeError("RealTimeScheduler is not supported with Threadpool.");
416
    }
417
    cMessage *msg = sim->msgQueue.removeFirst();
418
    if (!msg)
419
        throw cTerminationException(eENDEDOK);
420

    
421
    // calculate target time
422
    simtime_t eventSimtime = msg->getArrivalTime();
423
    timeval targetTime = timeval_add(baseTime, SIMTIME_DBL(doScaling ? factor*eventSimtime : eventSimtime));
424

    
425
    // if needed, wait until that time arrives
426
    timeval curTime;
427
    gettimeofday(&curTime, NULL);
428
    if (timeval_greater(targetTime, curTime))
429
    {
430
        if (!waitUntil(targetTime))
431
            return NULL; // user break
432
    }
433
    else
434
    {
435
        // we're behind -- customized versions of this class may alert
436
        // if we're too much behind, or modify basetime to accept the skew
437
    }
438

    
439
    // ok, return the message
440
    return msg;
441
}