Project

General

Profile

Statistics
| Branch: | Revision:

root / src / sim / casyncmodule.cc @ 96e929a8

History | View | Annotate | Download (14.7 KB)

1
//=========================================================================
2
//  CASYNCMODULE.CC - part of
3
//
4
//                  OMNeT++/OMNEST
5
//           Discrete System Simulation in C++
6
//
7
//
8
//   Member functions of
9
//    cAsyncModule : derived class of cSimpleModule for asynchronous event handling
10
//
11
//  Author: Georg Kunz
12
//
13
//=========================================================================
14

    
15
/*--------------------------------------------------------------*
16
  Copyright (C) 2009 Georg Kunz
17

18
  This file is distributed WITHOUT ANY WARRANTY. See the file
19
  `license' for details on this and other legal matters.
20
*--------------------------------------------------------------*/
21

    
22
#include <cstring>
23
#include <stdlib.h>
24

    
25
#include "casyncmodule.h"
26
#include "cthreadpool.h"
27
#include "cconfiguration.h"
28
#include "cconfigoption.h"
29

    
30

    
31
Register_PerRunConfigOption(CFGID_ASYNCMODULE_PARZERODUR, "parallelize-zero-duration-events", CFG_BOOL, "false", "Parallelize Events with zero duration");
32

    
33

    
34
cAsyncModule::cAsyncModule(const char *name, cModule *parent, unsigned stacksize) :
35
                t_end(0.0)
36
{
37
    AO_store(&execState, 0);
38

    
39
    //
40
        // how many local random number generators do we need? This should become a
41
        // special parameter of every asyncmodule. Not yet implemented.
42
        //
43
        unsigned nrNumGens = 1;
44
        /*
45
         cPar *p = (cPar*)paramv.get("number-local-rngs");
46
         if (p)
47
         nrNumGens = *p;
48

49
         std::cout << "number-local-rngs in " << fullName() << ": " << nrNumGens << std::endl;
50
         */
51
        numGen = new cNumberGenerator(nrNumGens);
52

    
53
        //
54
        // find out if events with 0 durations also be parallelized?
55
        //
56
        parZeroDur = ev.getConfig()->getAsBool(CFGID_ASYNCMODULE_PARZERODUR);
57

    
58
        //
59
        // init state flag to non-busy since no thread is active yet
60
        //
61
        AO_store(&busy, 0);
62
}
63

    
64

    
65
cAsyncModule::~cAsyncModule()
66
{
67
  delete numGen;
68
}
69

    
70

    
71
int cAsyncModule::scheduleAt(simtime_t t, cMessage *msg)
72
{
73
    msg->setEventDuration(-1);
74

    
75
    //
76
    // check current state
77
    //
78
    AO_t currentState = AO_load_acquire(&execState);
79
    if (currentState == asyncExecution)
80
    {
81
        if (msg==NULL)
82
          throw cRuntimeError("scheduleAt(): message pointer is NULL");
83
        if (t < t_end)
84
                throw cRuntimeError("scheduleAt(): event cannot be scheduled at "
85
                                "%s because this precedes the end of the processing duration "
86
                                "at %s of the scheduling event.",
87
                                t.str().c_str(), t_end.str().c_str());
88

    
89
        // insert this message in the FES. When the scheduler arrives at this
90
        // message it will trigger the processing of the associated task
91
        msg->setSentFrom(this, -1, t_end);
92
        msg->setArrival(this, -1, t);
93

    
94

    
95
        simulation.msgQueue.insert(msg);
96
        return 0;
97
    }
98
    else // synch execution
99
    {
100
        return cSimpleModule::scheduleAt(t, msg);
101
    }
102
}
103

    
104

    
105
void cAsyncModule::callHandleAsyncMessage(cMessage* msg)
106
{
107
    //
108
    // check if an event is being processed currently
109
    //
110
    AO_t oldState, newState;
111
    AO_store_write(&newState, asyncExecution);
112
    AO_store_write(&oldState, idleExecution);
113

    
114
    int res = AO_compare_and_swap_full(&execState, oldState, newState);
115
    if (res == 0)
116
    {
117
        throw cRuntimeError(this, "Race condition detected: Cannot process "
118
                                      "asynchronous event - already processing "
119
                                      "event in module %s", this->getName());
120
    }
121

    
122
    //
123
    // set some internal state
124
    //
125
    t_end = msg->getArrivalTime() + msg->getEventDuration();
126
    // set the simtime for this event
127
    //
128
    cThreadPool::setSimTime(msg->getArrivalTime());
129
    // corresponds to setContextModule(mod);
130
    cThreadPool::setContext(this);
131
    cThreadPool::setDefaultOwner(this);
132

    
133
    // take ownership of the message only after the test above
134
    this->take(msg);
135

    
136
    //
137
    // call the actual method
138
    //
139
    handleMessage(msg);
140

    
141
    AO_store_release(&execState, idleExecution);
142

    
143
}
144

    
145

    
146
void cAsyncModule::callHandleMessage(cMessage* msg)
147
{
148
    //
149
    // check if an event is being processed currently
150
    //
151
    AO_t oldState, newState;
152
    AO_store_write(&newState, syncExecution);
153
    AO_store_write(&oldState, idleExecution);
154

    
155
    int res = AO_compare_and_swap_full(&execState, oldState, newState);
156
    if (res == 0)
157
    {
158
        throw cRuntimeError(this, "Race condition detected: Cannot process "
159
                                      "synchronous event - already processing "
160
                                      "event in module %s", this->getName());
161
    }
162

    
163
    t_end = msg->getArrivalTime() + msg->getEventDuration();
164

    
165
    //
166
    // take ownership of the message only after the test above
167
    //
168
    this->take(msg);
169

    
170
    //
171
    // call the actual method
172
    //
173
    handleMessage(msg);
174

    
175
    AO_store_release(&execState, idleExecution);
176
}
177

    
178

    
179
int cAsyncModule::send(cMessage *msg, int gateid)
180
{
181
    return sendDelayed(msg, 0.0, gateid);
182
}
183

    
184

    
185
int cAsyncModule::send(cMessage *msg, const char *gatename, int sn)
186
{
187
    return sendDelayed(msg, 0.0, gatename, sn);
188
}
189

    
190

    
191
int cAsyncModule::send(cMessage *msg, cGate *outputgate)
192
{
193
    return sendDelayed(msg, 0.0, outputgate);
194
}
195

    
196

    
197
int cAsyncModule::sendDelayed(cMessage *msg, simtime_t delay, int gateid)
198
{
199
    cGate *outgate = gate(gateid);
200
    if (outgate==NULL)
201
        throw cRuntimeError("send()/sendDelayed(): module has no gate #%d", gateid);
202

    
203
    return sendDelayed(msg, delay, outgate);
204
}
205

    
206

    
207
int cAsyncModule::sendDelayed(cMessage *msg, simtime_t delay, const char *gatename, int sn)
208
{
209
    cGate *outgate = gate(gatename, sn);
210
    if (outgate==NULL)
211
       throw cRuntimeError(sn<0 ? "send()/sendDelayed(): module has no gate `%s'":
212
                               "send()/sendDelayed(): module has no gate `%s[%d]'",gatename,sn);
213

    
214
    return sendDelayed(msg, delay, outgate);
215
}
216

    
217

    
218
int cAsyncModule::sendDelayed(cMessage *msg, simtime_t delay, cGate *outputgate)
219
{
220
    msg->setEventDuration(-1);
221

    
222
    //
223
    // check current state
224
    //
225
    AO_t currentState = AO_load_acquire(&execState);
226
    if (currentState == asyncExecution)
227
    {
228
        //
229
        // error checking, omit ownership checks if it breaks
230
        //
231
        if (outputgate==NULL)
232
            throw cRuntimeError("send()/sendDelayed(): gate pointer is NULL");
233
        if (outputgate->getType()=='I')
234
            throw cRuntimeError("send()/sendDelayed(): cannot send via an input gate (`%s')",outputgate->getName());
235
        if (!outputgate->getNextGate())  // NOTE: without this error check, msg would become self-message
236
            throw cRuntimeError("send()/sendDelayed(): gate `%s' not connected",outputgate->getFullName());
237
        if (msg==NULL)
238
            throw cRuntimeError("send()/sendDelayed(): message pointer is NULL");
239
        if (msg->getOwner()!=this)
240
        {
241
            if (this!=simulation.getContext())
242
                throw cRuntimeError("send()/sendDelayed() of module (%s)%s called in the context of "
243
                                        "module (%s)%s: method called from the latter module "
244
                                        "lacks Enter_Method() or Enter_Method_Silent()? "
245
                                        "Also, if message to be sent is passed from that module, "
246
                                        "you'll need to call take(msg) after Enter_Method() as well",
247
                                        getClassName(), getFullPath().c_str(),
248
                                        simulation.getContext()->getClassName(),
249
                                        simulation.getContext()->getFullPath().c_str());
250
            else if (msg->getOwner()==&simulation.msgQueue && msg->isSelfMessage() && msg->getArrivalModuleId()==getId())
251
                throw cRuntimeError("send()/sendDelayed(): cannot send message (%s)%s, it is "
252
                                        "currently scheduled as a self-message for this module",
253
                                        msg->getClassName(), msg->getName());
254
            else if (msg->getOwner()==&simulation.msgQueue && msg->isSelfMessage())
255
                throw cRuntimeError("send()/sendDelayed(): cannot send message (%s)%s, it is "
256
                                        "currently scheduled as a self-message for ANOTHER module",
257
                                        msg->getClassName(), msg->getName());
258
            else if (msg->getOwner()==&simulation.msgQueue)
259
                throw cRuntimeError("send()/sendDelayed(): cannot send message (%s)%s, it is "
260
                                        "currently in scheduled-events, being underway between two modules",
261
                                        msg->getClassName(), msg->getName());
262
            else
263
                throw cRuntimeError("send()/sendDelayed(): cannot send message (%s)%s, "
264
                                        "it is currently contained/owned by (%s)%s",
265
                                        msg->getClassName(), msg->getName(), msg->getOwner()->getClassName(),
266
                                        msg->getOwner()->getFullPath().c_str());
267
        }
268

    
269
        simtime_t t = simTime();
270
        if (delay<0.0)
271
            throw cRuntimeError("sendDelayed(): negative delay %s",delay.str().c_str());
272
        if (t+delay<t_end) // TODO: Consider channel delay!
273
            throw cRuntimeError("sendDelayed(): send delay shorter than processing delay %s", delay.str().c_str());
274

    
275
        //
276
        // set message parameters and send it
277
        //
278
        msg->setSentFrom(this, outputgate->getId(), t+delay);
279
        bool keepit = outputgate->deliver(msg, t+delay);
280
        if (!keepit)
281
            delete msg;
282

    
283
        return 0;
284
    }
285
    else // syncExecution
286
    {
287
        return cSimpleModule::sendDelayed(msg, delay, outputgate);
288
    }
289
}
290

    
291

    
292
int cAsyncModule::sendDirect(cMessage *msg, simtime_t delay, simtime_t duration, cModule *mod, int inputgateid)
293
{
294
    cGate *togate = mod->gate(inputgateid);
295
    if (togate==NULL)
296
        throw cRuntimeError("sendDirect(): module `%s' has no gate #%d",
297
                                mod->getFullPath().c_str(), inputgateid);
298

    
299
    return sendDirect(msg, delay, duration, togate);
300
}
301

    
302

    
303
int cAsyncModule::sendDirect(cMessage *msg, simtime_t delay, simtime_t duration, cModule *mod, const char *inputgatename, int sn)
304
{
305
    if (!mod)
306
        throw cRuntimeError("sendDirect(): module ptr is NULL");
307
    cGate *togate = mod->gate(inputgatename,sn);
308
    if (togate==NULL)
309
        throw cRuntimeError(sn<0 ? "sendDirect(): module `%s' has no gate `%s'":
310
                                "sendDirect(): module `%s' has no gate `%s[%d]'",
311
                                mod->getFullPath().c_str(), inputgatename, sn);
312
    return sendDirect(msg, delay, duration, togate);
313
}
314

    
315

    
316
int cAsyncModule::sendDirect(cMessage *msg, simtime_t delay, simtime_t duration, cGate *togate)
317
{
318

    
319
    msg->setEventDuration(-1);
320

    
321
    //
322
    // check current state
323
    //
324
    AO_t currentState = AO_load_acquire(&execState);
325
    if (currentState == asyncExecution)
326
    {
327
        //
328
        // error checking
329
        //
330
        // Note: it is permitted to send to an output gate. It is especially useful
331
        // with several submodules sending to a single output gate of their parent module.
332
        if (togate==NULL)
333
                throw cRuntimeError("sendDirect(): destination gate pointer is NULL");
334
        if (togate->getPreviousGate())
335
                throw cRuntimeError("sendDirect(): module must have dedicated gate(s) for receiving via sendDirect()"
336
                                                                " (\"from\" side of dest. gate `%s' should NOT be connected)",togate->getFullPath().c_str());
337
        if (msg==NULL)
338
                throw cRuntimeError("sendDirect(): message pointer is NULL");
339
        if (msg->getOwner()!=this)
340
        {
341
                if (this!=simulation.getContext())
342
                        throw cRuntimeError("sendDirect() of module (%s)%s called in the context of "
343
                                                                        "module (%s)%s: method called from the latter module "
344
                                                                        "lacks Enter_Method() or Enter_Method_Silent()? "
345
                                                                        "Also, if message to be sent is passed from that module, "
346
                                                                        "you'll need to call take(msg) after Enter_Method() as well",
347
                                                                        getClassName(), getFullPath().c_str(),
348
                                                                        simulation.getContext()->getClassName(),
349
                                                                        simulation.getContext()->getFullPath().c_str());
350
                else if (msg->getOwner()==&simulation.msgQueue && msg->isSelfMessage() && msg->getArrivalModuleId()==getId())
351
                        throw cRuntimeError("sendDirect(): cannot send message (%s)%s, it is "
352
                                                                        "currently scheduled as a self-message for this module",
353
                                                                        msg->getClassName(), msg->getName());
354
                else if (msg->getOwner()==&simulation.msgQueue && msg->isSelfMessage())
355
                        throw cRuntimeError("sendDirect(): cannot send message (%s)%s, it is "
356
                                                                        "currently scheduled as a self-message for ANOTHER module",
357
                                                                        msg->getClassName(), msg->getName());
358
                else if (msg->getOwner()==&simulation.msgQueue)
359
                        throw cRuntimeError("sendDirect(): cannot send message (%s)%s, it is "
360
                                                                        "currently in scheduled-events, being underway between two modules",
361
                                                                        msg->getClassName(), msg->getName());
362
                else
363
                        throw cRuntimeError("sendDirect(): cannot send message (%s)%s, "
364
                                                                        "it is currently contained/owned by (%s)%s",
365
                                                                        msg->getClassName(), msg->getName(), msg->getOwner()->getClassName(),
366
                                                                        msg->getOwner()->getFullPath().c_str());
367
        }
368

    
369
        // set message parameters and send it
370
        simtime_t time = simTime();
371
        if (time + delay < t_end) // TODO: Consider channel delay!
372
            throw cRuntimeError("sendDelayed(): send delay shorter than processing delay %s",delay.str().c_str());
373

    
374
        msg->setSentFrom(this, -1, time);
375

    
376

    
377

    
378
        bool keepit = togate->deliver(msg, time + delay);
379
        if (!keepit)
380
                delete msg;
381
        return 0;
382
    }
383
    else  // sync execution
384
    {
385
            return cSimpleModule::sendDirect(msg, delay, duration, togate);
386
    }
387
}