Project

General

Profile

Statistics
| Branch: | Revision:

root / src / sim / casyncmodule.cc @ eefbed01

History | View | Annotate | Download (14 KB)

1
//=========================================================================
2
//  CASYNCMODULE.CC - part of
3
//
4
//                  OMNeT++/OMNEST
5
//           Discrete System Simulation in C++
6
//
7
//
8
//   Member functions of
9
//    cAsyncModule : derived class of cSimpleModule for asynchronous event handling
10
//
11
//  Author: Georg Kunz
12
//
13
//=========================================================================
14

    
15
/*--------------------------------------------------------------*
16
  Copyright (C) 2009 Georg Kunz
17

18
  This file is distributed WITHOUT ANY WARRANTY. See the file
19
  `license' for details on this and other legal matters.
20
*--------------------------------------------------------------*/
21

    
22
#include <cstring>
23
#include <stdlib.h>
24

    
25
#include "casyncmodule.h"
26
#include "cthreadpool.h"
27
#include "cconfiguration.h"
28
#include "cconfigoption.h"
29

    
30

    
31
Register_PerRunConfigOption(CFGID_ASYNCMODULE_PARZERODUR, "parallelize-zero-duration-events", CFG_BOOL, "false", "Parallelize Events with zero duration");
32

    
33

    
34
cAsyncModule::cAsyncModule(const char *name, cModule *parent, unsigned stacksize) :
35
                t_end(0.0)
36
{
37
    //
38
        // how many local random number generators do we need? This should become a
39
        // special parameter of every asyncmodule. Not yet implemented.
40
        //
41
        unsigned nrNumGens = 1;
42
        /*
43
         cPar *p = (cPar*)paramv.get("number-local-rngs");
44
         if (p)
45
         nrNumGens = *p;
46

47
         std::cout << "number-local-rngs in " << fullName() << ": " << nrNumGens << std::endl;
48
         */
49
        numGen = new cNumberGenerator(nrNumGens);
50

    
51
        //
52
        // find out if events with 0 durations also be parallelized?
53
        //
54
        parZeroDur = ev.getConfig()->getAsBool(CFGID_ASYNCMODULE_PARZERODUR);
55

    
56
        //
57
        // init state flag to non-busy since no thread is active yet
58
        //
59
        AO_store(&busy, 0);
60
}
61

    
62

    
63
cAsyncModule::~cAsyncModule()
64
{
65
  delete numGen;
66
}
67

    
68

    
69
void cAsyncModule::setMessageMetaData(cMessage* msg)
70
{
71
    msg->setEventDuration(-1);
72
    msg->setSchedulingOrderId(scheduledMessageCount++);
73
    msg->setParentExecutionOrderId(executionOrderId);
74
    msg->setParentStartTime(now);
75
}
76

    
77
int cAsyncModule::scheduleAt(simtime_t t, cMessage *msg)
78
{
79
    setMessageMetaData(msg);
80

    
81
    //
82
    // check current state
83
    //
84
    if (executionState == asynchronous)
85
    {
86
        if (msg==NULL)
87
          throw cRuntimeError("scheduleAt(): message pointer is NULL");
88
        if (t < t_end)
89
                throw cRuntimeError("scheduleAt(): event cannot be scheduled at "
90
                                "%s because this precedes the end of the processing duration "
91
                                "at %s of the scheduling event.",
92
                                t.str().c_str(), t_end.str().c_str());
93

    
94
        // insert this message in the FES. When the scheduler arrives at this
95
        // message it will trigger the processing of the associated task
96
        msg->setSentFrom(this, -1, t_end);
97
        msg->setArrival(this, -1, t);
98

    
99

    
100
        simulation.msgQueue.insert(msg);
101
        return 0;
102
    }
103
    else // synch execution
104
    {
105
        return cSimpleModule::scheduleAt(t, msg);
106
    }
107
}
108

    
109

    
110
void cAsyncModule::callHandleAsyncMessage(cMessage* msg)
111
{
112
    //
113
    // set execution state
114
    //
115
    executionState = asynchronous;
116

    
117
    //
118
    // set the simtime for this event
119
    //
120
    cThreadPool::setSimTime(msg->getArrivalTime());
121
    // corresponds to setContextModule(mod);
122
    cThreadPool::setContext(this);
123
    cThreadPool::setDefaultOwner(this);
124

    
125
    //
126
    // update meta data
127
    //
128
    prepareHandleMessage(msg);
129

    
130
    //
131
    // call the actual method
132
    //
133
    handleMessage(msg);
134
}
135

    
136

    
137
void cAsyncModule::callHandleMessage(cMessage* msg)
138
{
139
    //
140
    // set execution state
141
    //
142
    executionState = synchronous;
143

    
144
    //
145
    // update meta data
146
    //
147
    prepareHandleMessage(msg);
148

    
149
    //
150
    // call the actual method
151
    //
152
    handleMessage(msg);
153
}
154

    
155

    
156
void cAsyncModule::prepareHandleMessage(cMessage* msg)
157
{
158
    //
159
    // set some internal state
160
    //
161
    t_end = msg->getArrivalTime() + msg->getEventDuration();
162
    now = msg->getArrivalTime();
163

    
164
    //
165
    // take ownership of the message only after the test above
166
    //
167
    this->take(msg);
168

    
169
    //
170
    // reset message counter
171
    //
172
    scheduledMessageCount = 0;
173

    
174
    executionOrderId = msg->getExecutionOrderId();
175
}
176

    
177
int cAsyncModule::send(cMessage *msg, int gateid)
178
{
179
    return sendDelayed(msg, 0.0, gateid);
180
}
181

    
182

    
183
int cAsyncModule::send(cMessage *msg, const char *gatename, int sn)
184
{
185
    return sendDelayed(msg, 0.0, gatename, sn);
186
}
187

    
188

    
189
int cAsyncModule::send(cMessage *msg, cGate *outputgate)
190
{
191
    return sendDelayed(msg, 0.0, outputgate);
192
}
193

    
194

    
195
int cAsyncModule::sendDelayed(cMessage *msg, simtime_t delay, int gateid)
196
{
197
    cGate *outgate = gate(gateid);
198
    if (outgate==NULL)
199
        throw cRuntimeError("send()/sendDelayed(): module has no gate #%d", gateid);
200

    
201
    return sendDelayed(msg, delay, outgate);
202
}
203

    
204

    
205
int cAsyncModule::sendDelayed(cMessage *msg, simtime_t delay, const char *gatename, int sn)
206
{
207
    cGate *outgate = gate(gatename, sn);
208
    if (outgate==NULL)
209
       throw cRuntimeError(sn<0 ? "send()/sendDelayed(): module has no gate `%s'":
210
                               "send()/sendDelayed(): module has no gate `%s[%d]'",gatename,sn);
211

    
212
    return sendDelayed(msg, delay, outgate);
213
}
214

    
215

    
216
int cAsyncModule::sendDelayed(cMessage *msg, simtime_t delay, cGate *outputgate)
217
{
218
    setMessageMetaData(msg);
219

    
220
    //
221
    // check current state
222
    //
223
    if (executionState == asynchronous)
224
    {
225
        //
226
        // error checking, omit ownership checks if it breaks
227
        //
228
        if (outputgate==NULL)
229
            throw cRuntimeError("send()/sendDelayed(): gate pointer is NULL");
230
        if (outputgate->getType()=='I')
231
            throw cRuntimeError("send()/sendDelayed(): cannot send via an input gate (`%s')",outputgate->getName());
232
        if (!outputgate->getNextGate())  // NOTE: without this error check, msg would become self-message
233
            throw cRuntimeError("send()/sendDelayed(): gate `%s' not connected",outputgate->getFullName());
234
        if (msg==NULL)
235
            throw cRuntimeError("send()/sendDelayed(): message pointer is NULL");
236
        if (msg->getOwner()!=this)
237
        {
238
            if (this!=simulation.getContext())
239
                throw cRuntimeError("send()/sendDelayed() of module (%s)%s called in the context of "
240
                                        "module (%s)%s: method called from the latter module "
241
                                        "lacks Enter_Method() or Enter_Method_Silent()? "
242
                                        "Also, if message to be sent is passed from that module, "
243
                                        "you'll need to call take(msg) after Enter_Method() as well",
244
                                        getClassName(), getFullPath().c_str(),
245
                                        simulation.getContext()->getClassName(),
246
                                        simulation.getContext()->getFullPath().c_str());
247
            else if (msg->getOwner()==&simulation.msgQueue && msg->isSelfMessage() && msg->getArrivalModuleId()==getId())
248
                throw cRuntimeError("send()/sendDelayed(): cannot send message (%s)%s, it is "
249
                                        "currently scheduled as a self-message for this module",
250
                                        msg->getClassName(), msg->getName());
251
            else if (msg->getOwner()==&simulation.msgQueue && msg->isSelfMessage())
252
                throw cRuntimeError("send()/sendDelayed(): cannot send message (%s)%s, it is "
253
                                        "currently scheduled as a self-message for ANOTHER module",
254
                                        msg->getClassName(), msg->getName());
255
            else if (msg->getOwner()==&simulation.msgQueue)
256
                throw cRuntimeError("send()/sendDelayed(): cannot send message (%s)%s, it is "
257
                                        "currently in scheduled-events, being underway between two modules",
258
                                        msg->getClassName(), msg->getName());
259
            else
260
                throw cRuntimeError("send()/sendDelayed(): cannot send message (%s)%s, "
261
                                        "it is currently contained/owned by (%s)%s",
262
                                        msg->getClassName(), msg->getName(), msg->getOwner()->getClassName(),
263
                                        msg->getOwner()->getFullPath().c_str());
264
        }
265

    
266
        simtime_t t = simTime();
267
        if (delay<0.0)
268
            throw cRuntimeError("sendDelayed(): negative delay %s",delay.str().c_str());
269
        if (t+delay<t_end) // TODO: Consider channel delay!
270
            throw cRuntimeError("sendDelayed(): send delay shorter than processing delay %s", delay.str().c_str());
271

    
272
        //
273
        // set message parameters and send it
274
        //
275
        msg->setSentFrom(this, outputgate->getId(), t+delay);
276
        bool keepit = outputgate->deliver(msg, t+delay);
277
        if (!keepit)
278
            delete msg;
279

    
280
        return 0;
281
    }
282
    else // syncExecution
283
    {
284
        return cSimpleModule::sendDelayed(msg, delay, outputgate);
285
    }
286
}
287

    
288

    
289
int cAsyncModule::sendDirect(cMessage *msg, simtime_t delay, simtime_t duration, cModule *mod, int inputgateid)
290
{
291
    cGate *togate = mod->gate(inputgateid);
292
    if (togate==NULL)
293
        throw cRuntimeError("sendDirect(): module `%s' has no gate #%d",
294
                                mod->getFullPath().c_str(), inputgateid);
295

    
296
    return sendDirect(msg, delay, duration, togate);
297
}
298

    
299

    
300
int cAsyncModule::sendDirect(cMessage *msg, simtime_t delay, simtime_t duration, cModule *mod, const char *inputgatename, int sn)
301
{
302
    if (!mod)
303
        throw cRuntimeError("sendDirect(): module ptr is NULL");
304
    cGate *togate = mod->gate(inputgatename,sn);
305
    if (togate==NULL)
306
        throw cRuntimeError(sn<0 ? "sendDirect(): module `%s' has no gate `%s'":
307
                                "sendDirect(): module `%s' has no gate `%s[%d]'",
308
                                mod->getFullPath().c_str(), inputgatename, sn);
309
    return sendDirect(msg, delay, duration, togate);
310
}
311

    
312

    
313
int cAsyncModule::sendDirect(cMessage *msg, simtime_t delay, simtime_t duration, cGate *togate)
314
{
315
    setMessageMetaData(msg);
316

    
317
    //
318
    // check current state
319
    //
320
    if (executionState == asynchronous)
321
    {
322
        //
323
        // error checking
324
        //
325
        // Note: it is permitted to send to an output gate. It is especially useful
326
        // with several submodules sending to a single output gate of their parent module.
327
        if (togate==NULL)
328
                throw cRuntimeError("sendDirect(): destination gate pointer is NULL");
329
        if (togate->getPreviousGate())
330
                throw cRuntimeError("sendDirect(): module must have dedicated gate(s) for receiving via sendDirect()"
331
                                                                " (\"from\" side of dest. gate `%s' should NOT be connected)",togate->getFullPath().c_str());
332
        if (msg==NULL)
333
                throw cRuntimeError("sendDirect(): message pointer is NULL");
334
        if (msg->getOwner()!=this)
335
        {
336
                if (this!=simulation.getContext())
337
                        throw cRuntimeError("sendDirect() of module (%s)%s called in the context of "
338
                                                                        "module (%s)%s: method called from the latter module "
339
                                                                        "lacks Enter_Method() or Enter_Method_Silent()? "
340
                                                                        "Also, if message to be sent is passed from that module, "
341
                                                                        "you'll need to call take(msg) after Enter_Method() as well",
342
                                                                        getClassName(), getFullPath().c_str(),
343
                                                                        simulation.getContext()->getClassName(),
344
                                                                        simulation.getContext()->getFullPath().c_str());
345
                else if (msg->getOwner()==&simulation.msgQueue && msg->isSelfMessage() && msg->getArrivalModuleId()==getId())
346
                        throw cRuntimeError("sendDirect(): cannot send message (%s)%s, it is "
347
                                                                        "currently scheduled as a self-message for this module",
348
                                                                        msg->getClassName(), msg->getName());
349
                else if (msg->getOwner()==&simulation.msgQueue && msg->isSelfMessage())
350
                        throw cRuntimeError("sendDirect(): cannot send message (%s)%s, it is "
351
                                                                        "currently scheduled as a self-message for ANOTHER module",
352
                                                                        msg->getClassName(), msg->getName());
353
                else if (msg->getOwner()==&simulation.msgQueue)
354
                        throw cRuntimeError("sendDirect(): cannot send message (%s)%s, it is "
355
                                                                        "currently in scheduled-events, being underway between two modules",
356
                                                                        msg->getClassName(), msg->getName());
357
                else
358
                        throw cRuntimeError("sendDirect(): cannot send message (%s)%s, "
359
                                                                        "it is currently contained/owned by (%s)%s",
360
                                                                        msg->getClassName(), msg->getName(), msg->getOwner()->getClassName(),
361
                                                                        msg->getOwner()->getFullPath().c_str());
362
        }
363

    
364
        // set message parameters and send it
365
        simtime_t time = simTime();
366
        if (time + delay < t_end) // TODO: Consider channel delay!
367
            throw cRuntimeError("sendDelayed(): send delay shorter than processing delay %s",delay.str().c_str());
368

    
369
        msg->setSentFrom(this, -1, time);
370

    
371
        bool keepit = togate->deliver(msg, time + delay);
372
        if (!keepit)
373
                delete msg;
374
        return 0;
375
    }
376
    else  // sync execution
377
    {
378
            return cSimpleModule::sendDirect(msg, delay, duration, togate);
379
    }
380
}