Statistics
| Branch: | Revision:

root / src / sim / cscheduler.cc @ e1750c09

History | View | Annotate | Download (11.2 KB)

1
//=========================================================================
2
//  CSCHEDULER.CC - part of
3
//
4
//                  OMNeT++/OMNEST
5
//           Discrete System Simulation in C++
6
//
7
//  Author: Andras Varga, 2003
8
//
9
//=========================================================================
10

    
11
/*--------------------------------------------------------------*
12
  Copyright (C) 2003-2008 Andras Varga
13
  Copyright (C) 2006-2008 OpenSim Ltd.
14
  Monash University, Dept. of Electrical and Computer Systems Eng.
15
  Melbourne, Australia
16

17
  This file is distributed WITHOUT ANY WARRANTY. See the file
18
  `license' for details on this and other legal matters.
19
*--------------------------------------------------------------*/
20

    
21
#include "cscheduler.h"
22
#include "cmessage.h"
23
#include "csimulation.h"
24
#include "cmessageheap.h"
25
#include "globals.h"
26
#include "cenvir.h"
27
#include "cconfiguration.h"
28
#include "cconfigoption.h"
29
#include "platmisc.h" // usleep
30
#include "cbarriermessage.h"
31
#include "csimplemodule.h"
32

    
33
USING_NAMESPACE
34

    
35
Register_GlobalConfigOption(CFGID_REALTIMESCHEDULER_SCALING, "realtimescheduler-scaling", CFG_DOUBLE, NULL, "When cRealTimeScheduler is selected as scheduler class: ratio of simulation time to real time. For example, scaling=2 will cause simulation time to progress twice as fast as runtime.");
36

    
37
cScheduler::cScheduler()
38
{
39
    sim = NULL;
40
}
41

    
42
cScheduler::~cScheduler()
43
{
44
}
45

    
46
void cScheduler::setSimulation(cSimulation *_sim)
47
{
48
    sim = _sim;
49
}
50
//-----
51

    
52
Register_Class(cSequentialScheduler);
53

    
54
cMessage *cSequentialScheduler::getNextEvent()
55
{
56
#ifdef NOBARRIER
57
    // Do we have to wait for a barrier?
58
    if (sim->threadPool)
59
        sim->threadPool->waitAtBarrier(&(sim->msgQueue));
60

    
61
    //
62
    // If we retrieve a valid msg from the queue, we return it:
63
    //
64
    cMessage *msg = sim->msgQueue.peekFirst();
65

    
66
    if (msg)
67
        return msg;
68

    
69

    
70
    //
71
    // if there is no event left and we don't use the threadpool, end the sim
72
    //
73
    if (!sim->threadPool)
74
        throw cTerminationException(eENDEDOK);
75

    
76
    //
77
    // If we did not get a valid msg from the queue, but there are still
78
    // barrier messages left, we wait:
79
    //
80
    while (sim->msgQueue.empty() && !sim->threadPool->barrierEmpty())
81
    {
82
        __asm__ ("pause");
83
    }
84
    msg = sim->msgQueue.peekFirst();
85

    
86
    //
87
    // If there is a msg now, we return it:
88
    //
89
    if (msg)
90
        return msg;
91

    
92
    //
93
    // If there is still no message in the queue, there are
94
    // also no barriers left (else we would have waited), and we quit:
95
    //
96
    else
97
        throw cTerminationException(eENDEDOK);
98

    
99
#else
100
    cMessage *msg = NULL;
101
    cBarrierMessage* barrier = NULL;
102

    
103
    while (!msg)
104
    {
105
        msg = sim->msgQueue.peekFirst();
106
        if (!msg)
107
            throw cTerminationException(eENDEDOK);
108

    
109
        //
110
        // If we have a Barriermsg, we wait
111
        //
112
        barrier = dynamic_cast<cBarrierMessage*> (msg);
113
        if (barrier != NULL)
114
        {
115
            //
116
            // wait for the task to complete
117
            //
118
            barrier->wait();
119
            delete barrier;
120
            msg = NULL;
121
        }
122
    }
123

    
124
    cSimpleModule* mod = (cSimpleModule*) sim->getModule(msg->getArrivalModuleId());
125
        simtime_t duration = msg->getEventDuration();
126
        if (duration != SimTime::simTimeSequentialExecution && mod->mayParallelize(msg, duration) && sim->threadPool)
127
        {
128
                //
129
                // create a new barrier and schedule it
130
                //
131
                cBarrierMessage* barrier = new cBarrierMessage();
132
                barrier->setArrival(mod, -1, msg->getArrivalTime() + duration);
133
                msg->setBarrier(barrier);
134

    
135
                // If the event is a null duration event,
136
                // we have to set additional data in the barrier
137
                // to make sure the barrier gets placed correctly
138
                barrier->setSchedulingPriority(msg->getSchedulingPriority());
139
                barrier->setParentStartTime(msg->getArrivalTime());
140
                barrier->setParentExecutionOrderId(sim->getNextExecutionOrderId(msg));
141
                // Set scheduling order to 0 (in front of all children)
142
                barrier->setSchedulingOrderId(0);
143

    
144
                sim->msgQueue.insert(barrier);
145
        }
146
    return msg;
147
#endif
148
}
149

    
150
//-----
151
Register_Class(cEEFScheduler)
152
;
153

    
154
cEEFScheduler::cEEFScheduler()
155
{
156
}
157

    
158
cEEFScheduler::~cEEFScheduler()
159
{
160
}
161

    
162
cMessage *cEEFScheduler::getNextEvent()
163
{
164
    if(!(sim->threadPool)) {
165
        throw cRuntimeError("EEFScheduler is not supported in sequential mode. Activate Threadpool or use cSequentialScheduler.");
166
    }
167

    
168
    updateIES();
169
    cMessage* msg=getFirstEvent();
170
        sim->msgQueue.insert(msg);
171
        return msg;
172
}
173

    
174
void cEEFScheduler::updateIES() {
175
    cMessage *msg = NULL;
176
    cSimpleModule* mod = NULL;
177
#ifdef NOBARRIER
178

    
179
    while (!(sim->msgQueue.empty()))
180
    {
181
        msg = sim->msgQueue.removeFirst();
182

    
183
        mod = (cSimpleModule*) sim->getModule(msg->getArrivalModuleId());
184

    
185
        simtime_t duration = msg->getEventDuration();
186

    
187
        if (!mod->mayParallelize(msg, duration)) {
188
            break;
189
        }
190

    
191
        //
192
        // If there is an active barrier in the threadpool before msg, break
193
        //
194
        if (sim->threadPool->isBeforeBarrier(msg)) {
195
            break;
196
        }
197

    
198
        //
199
        // If an event in the IES would cause a barrier before msg, break
200
        //
201
        if (!independentEventsHeap.empty() && msg->getArrivalTime() >= independentEventsHeap.peekFirst()->getTend()) {
202
            break;
203
        }
204
/*        printf(
205
                "adding to IEH: %s, tstart=%f, tend= %f, now First in IEH: ",
206
                ((cSimpleModule*) sim->getModule(msg->getArrivalModuleId()))->getName(),SIMTIME_DBL(msg->getArrivalTime()),
207
                SIMTIME_DBL(msg->getTend()));*/
208

    
209
        independentEventsHeap.insert(msg);
210

    
211
/*        printf(
212
                "%s, length=%i\n",
213
                ((cSimpleModule*) sim->getModule(
214
                        independentEventsHeap.peekFirst()->getArrivalModuleId()))->getName(),
215
                independentEventsHeap.length());*/
216
        msg = NULL;
217
    }
218

    
219
    if (msg)
220
        sim->msgQueue.insert(msg);
221
#else
222
    cBarrierMessage* barrier = NULL;
223
    /*
224
     * Fill up independent event heap
225
     */
226
    while (!(sim->msgQueue.empty()))
227
    {
228
        msg = sim->msgQueue.removeFirst();
229
        barrier = dynamic_cast<cBarrierMessage*> (msg);
230
        if (barrier != NULL)
231
        {
232
            /*
233
             * If the barrier has already been signaled, we can just delete it,
234
             * as we would not have to wait here
235
            */
236
            if(!barrier->isValid()) {
237
                delete barrier;
238
                continue;
239
            }
240
            /*
241
             * if we hit a barrier, we are done and return the first independent msg
242
             * or we have wait at the barrier if no independent event exists
243
             */
244
            if (independentEventsHeap.empty())
245
            {
246
                barrier->wait();
247
                delete barrier;
248
                continue;
249
            }
250
            else
251
            {
252
                sim->msgQueue.insert(msg);
253
                return;
254
            }
255
        }
256

    
257
        mod = (cSimpleModule*) sim->getModule(msg->getArrivalModuleId());
258
                simtime_t duration = msg->getEventDuration();
259

    
260
                if (!mod->mayParallelize(msg, duration))
261
                {
262
                        sim->msgQueue.insert(msg);
263
                        return;
264
                }
265
                // create a new barrier and schedule it
266
                cBarrierMessage* barrier = new cBarrierMessage();
267
                barrier->setArrival(mod, -1, msg->getArrivalTime() + duration);
268
                msg->setBarrier(barrier);
269
                sim->msgQueue.insert(barrier);
270

    
271
                independentEventsHeap.insert(msg);
272

    
273
    } // while (!(sim->msgQueue.empty()))
274
#endif
275
}
276

    
277
cMessage* cEEFScheduler::getFirstEvent() {
278
#ifdef NOBARRIER
279
    cMessage *msg = NULL;
280
    if(independentEventsHeap.empty()) {
281
            if (!sim->msgQueue.empty()) {
282
                // Do we have to wait for a barrier?
283
                if (sim->threadPool) sim->threadPool->waitAtBarrier(&(sim->msgQueue));
284
                return sim->msgQueue.removeFirst();
285
            }
286
        } else {
287
            return independentEventsHeap.getFirst();
288
        }
289
        // At this point, both IES and FES are empty
290
        //
291
        // if there is no event left and we don't use the threadpool, end the sim
292
        //
293
        if (!sim->threadPool)
294
            throw cTerminationException(eENDEDOK);
295

    
296
        //
297
        // If we did not get a valid msg from the queue, but there are still
298
        // barrier messages left, we wait:
299
        //
300
        while (sim->msgQueue.empty() && !sim->threadPool->barrierEmpty())
301
        {
302
            __asm__ ("pause");
303
        }
304
        msg = sim->msgQueue.removeFirst();
305

    
306
        if (msg) return msg;
307

    
308
        //
309
        // If there is still no message in the queue, there are
310
        // also no barriers left (else we would have waited), and we quit:
311
        //
312

    
313
        else
314
            throw cTerminationException(eENDEDOK);
315
#else
316
        if (independentEventsHeap.empty())
317
        {
318
            if (sim->msgQueue.empty()) {
319
                /*
320
                 * Both Queues are empty, we are done
321
                 */
322
                throw cTerminationException(eENDEDOK);
323
            } else {
324
                /*
325
                 * In this case, we have sequential execution
326
                 */
327
                return sim->msgQueue.removeFirst();
328
            }
329
        }
330
        return independentEventsHeap.getFirst();
331
#endif
332
}
333

    
334
//-----
335
Register_Class(cRealTimeScheduler);
336

    
337
void cRealTimeScheduler::startRun()
338
{
339
    factor = ev.getConfig()->getAsDouble(CFGID_REALTIMESCHEDULER_SCALING);
340
    if (factor!=0)
341
        factor = 1/factor;
342
    doScaling = (factor!=0);
343

    
344
    gettimeofday(&baseTime, NULL);
345
}
346

    
347
void cRealTimeScheduler::endRun()
348
{
349
}
350

    
351
void cRealTimeScheduler::executionResumed()
352
{
353
    gettimeofday(&baseTime, NULL);
354
    baseTime = timeval_substract(baseTime, SIMTIME_DBL(doScaling ? factor*sim->getSimTime() : sim->getSimTime()));
355
}
356

    
357
bool cRealTimeScheduler::waitUntil(const timeval& targetTime)
358
{
359
    // if there's more than 200ms to wait, wait in 100ms chunks
360
    // in order to keep UI responsiveness by invoking ev.idle()
361
    timeval curTime;
362
    gettimeofday(&curTime, NULL);
363
    while (targetTime.tv_sec-curTime.tv_sec >=2 ||
364
           timeval_diff_usec(targetTime, curTime) >= 200000)
365
    {
366
        usleep(100000); // 100ms
367
        if (ev.idle())
368
            return false;
369
        gettimeofday(&curTime, NULL);
370
    }
371

    
372
    // difference is now at most 100ms, do it at once
373
    long usec = timeval_diff_usec(targetTime, curTime);
374
    if (usec>0)
375
        usleep(usec);
376
    return true;
377
}
378

    
379
cMessage *cRealTimeScheduler::getNextEvent()
380
{
381
#ifdef NOBARRIER
382
    throw cRuntimeError("RealTimeScheduler is not supported with NOBARRIER.");
383
#endif
384
    if(sim->threadPool) {
385
        //TODO: Handle barriermsg
386
        throw cRuntimeError("RealTimeScheduler is not supported with Threadpool.");
387
    }
388
    cMessage *msg = sim->msgQueue.peekFirst();
389
    if (!msg)
390
        throw cTerminationException(eENDEDOK);
391

    
392
    // calculate target time
393
    simtime_t eventSimtime = msg->getArrivalTime();
394
    timeval targetTime = timeval_add(baseTime, SIMTIME_DBL(doScaling ? factor*eventSimtime : eventSimtime));
395

    
396
    // if needed, wait until that time arrives
397
    timeval curTime;
398
    gettimeofday(&curTime, NULL);
399
    if (timeval_greater(targetTime, curTime))
400
    {
401
        if (!waitUntil(targetTime))
402
            return NULL; // user break
403
    }
404
    else
405
    {
406
        // we're behind -- customized versions of this class may alert
407
        // if we're too much behind, or modify basetime to accept the skew
408
    }
409

    
410
    // ok, return the message
411
    return msg;
412
}