Project

General

Profile

Statistics
| Branch: | Revision:

root / src / sim / casyncmodule.cc @ a3be1d55

History | View | Annotate | Download (14.6 KB)

1 01873262 Georg Kunz
//=========================================================================
2
//  CASYNCMODULE.CC - part of
3
//
4
//                  OMNeT++/OMNEST
5
//           Discrete System Simulation in C++
6
//
7
//
8
//   Member functions of
9
//    cAsyncModule : derived class of cSimpleModule for asynchronous event handling
10
//
11
//  Author: Georg Kunz
12
//
13
//=========================================================================
14
15
/*--------------------------------------------------------------*
16
  Copyright (C) 2009 Georg Kunz
17

18
  This file is distributed WITHOUT ANY WARRANTY. See the file
19
  `license' for details on this and other legal matters.
20
*--------------------------------------------------------------*/
21
22
#include <cstring>
23
#include <stdlib.h>
24
25
#include "casyncmodule.h"
26
#include "cthreadpool.h"
27
#include "cconfiguration.h"
28
#include "cconfigoption.h"
29
30
31
Register_PerRunConfigOption(CFGID_ASYNCMODULE_PARZERODUR, "parallelize-zero-duration-events", CFG_BOOL, "false", "Parallelize Events with zero duration");
32
33
34
cAsyncModule::cAsyncModule(const char *name, cModule *parent, unsigned stacksize) :
35
                t_end(0.0)
36
{
37
    AO_store(&execState, 0);
38
39
    //
40
        // how many local random number generators do we need? This should become a
41
        // special parameter of every asyncmodule. Not yet implemented.
42
        //
43
        unsigned nrNumGens = 1;
44
        /*
45
         cPar *p = (cPar*)paramv.get("number-local-rngs");
46
         if (p)
47
         nrNumGens = *p;
48

49
         std::cout << "number-local-rngs in " << fullName() << ": " << nrNumGens << std::endl;
50
         */
51
        numGen = new cNumberGenerator(nrNumGens);
52
53
        //
54
        // find out if events with 0 durations also be parallelized?
55
        //
56
        parZeroDur = ev.getConfig()->getAsBool(CFGID_ASYNCMODULE_PARZERODUR);
57
58
        //
59
        // init state flag to non-busy since no thread is active yet
60
        //
61
        AO_store(&busy, 0);
62
}
63
64
65
cAsyncModule::~cAsyncModule()
66
{
67
  delete numGen;
68
}
69
70
71
int cAsyncModule::scheduleAt(simtime_t t, cMessage *msg)
72
{
73
    //
74
    // check current state
75
    //
76
    AO_t currentState = AO_load_acquire(&execState);
77
    if (currentState == asyncExecution)
78
    {
79
        if (msg==NULL)
80
          throw cRuntimeError("scheduleAt(): message pointer is NULL");
81
        if (t < t_end)
82
                throw cRuntimeError("scheduleAt(): event cannot be scheduled at "
83
                                "%s because this precedes the end of the processing duration "
84
                                "at %s of the scheduling event.",
85
                                t.str().c_str(), t_end.str().c_str());
86
87
        // insert this message in the FES. When the scheduler arrives at this
88
        // message it will trigger the processing of the associated task
89
        msg->setSentFrom(this, -1, t_end);
90
        msg->setArrival(this, -1, t);
91
        simulation.msgQueue.insert(msg);
92
        return 0;
93
    }
94
    else // synch execution
95
    {
96
        return cSimpleModule::scheduleAt(t, msg);
97
    }
98
}
99
100
101
void cAsyncModule::callHandleAsyncMessage(cMessage* msg)
102
{
103
    //
104
    // check if an event is being processed currently
105
    //
106
    AO_t oldState, newState;
107
    AO_store_write(&newState, asyncExecution);
108
    AO_store_write(&oldState, idleExecution);
109
110
    int res = AO_compare_and_swap_full(&execState, oldState, newState);
111
    if (res == 0)
112
    {
113
        throw cRuntimeError(this, "Race condition detected: Cannot process "
114
                                      "asynchronous event - already processing "
115
                                      "event in module %s", this->getName());
116
    }
117
118
    //
119
    // set some internal state
120
    //
121
    t_end = msg->getArrivalTime() + msg->getEventDuration();
122
    // set the simtime for this event
123
    //
124
    cThreadPool::setSimTime(msg->getArrivalTime());
125
    // corresponds to setContextModule(mod);
126
    cThreadPool::setContext(this);
127
    cThreadPool::setDefaultOwner(this);
128
129
    // take ownership of the message only after the test above
130
    this->take(msg);
131
132
    //
133
    // call the actual method
134
    //
135
    handleMessage(msg);
136
137
    AO_store_release(&execState, idleExecution);
138
139
}
140
141
142
void cAsyncModule::callHandleMessage(cMessage* msg)
143
{
144
    //
145
    // check if an event is being processed currently
146
    //
147
    AO_t oldState, newState;
148
    AO_store_write(&newState, syncExecution);
149
    AO_store_write(&oldState, idleExecution);
150
151
    int res = AO_compare_and_swap_full(&execState, oldState, newState);
152
    if (res == 0)
153
    {
154
        throw cRuntimeError(this, "Race condition detected: Cannot process "
155
                                      "synchronous event - already processing "
156
                                      "event in module %s", this->getName());
157
    }
158
159
    t_end = msg->getArrivalTime() + msg->getEventDuration();
160
161
    //
162
    // take ownership of the message only after the test above
163
    //
164
    this->take(msg);
165
166
    //
167
    // call the actual method
168
    //
169
    handleMessage(msg);
170
171
    AO_store_release(&execState, idleExecution);
172
}
173
174
175
int cAsyncModule::send(cMessage *msg, int gateid)
176
{
177
    return sendDelayed(msg, 0.0, gateid);
178
}
179
180
181
int cAsyncModule::send(cMessage *msg, const char *gatename, int sn)
182
{
183
    return sendDelayed(msg, 0.0, gatename, sn);
184
}
185
186
187
int cAsyncModule::send(cMessage *msg, cGate *outputgate)
188
{
189
    return sendDelayed(msg, 0.0, outputgate);
190
}
191
192
193
int cAsyncModule::sendDelayed(cMessage *msg, simtime_t delay, int gateid)
194
{
195
    cGate *outgate = gate(gateid);
196
    if (outgate==NULL)
197
        throw cRuntimeError("send()/sendDelayed(): module has no gate #%d", gateid);
198
199
    return sendDelayed(msg, delay, outgate);
200
}
201
202
203
int cAsyncModule::sendDelayed(cMessage *msg, simtime_t delay, const char *gatename, int sn)
204
{
205
    cGate *outgate = gate(gatename, sn);
206
    if (outgate==NULL)
207
       throw cRuntimeError(sn<0 ? "send()/sendDelayed(): module has no gate `%s'":
208
                               "send()/sendDelayed(): module has no gate `%s[%d]'",gatename,sn);
209
210
    return sendDelayed(msg, delay, outgate);
211
}
212
213
214
int cAsyncModule::sendDelayed(cMessage *msg, simtime_t delay, cGate *outputgate)
215
{
216
    //
217
    // check current state
218
    //
219
    AO_t currentState = AO_load_acquire(&execState);
220
    if (currentState == asyncExecution)
221
    {
222
        //
223
        // error checking, omit ownership checks if it breaks
224
        //
225
        if (outputgate==NULL)
226
            throw cRuntimeError("send()/sendDelayed(): gate pointer is NULL");
227
        if (outputgate->getType()=='I')
228
            throw cRuntimeError("send()/sendDelayed(): cannot send via an input gate (`%s')",outputgate->getName());
229
        if (!outputgate->getNextGate())  // NOTE: without this error check, msg would become self-message
230
            throw cRuntimeError("send()/sendDelayed(): gate `%s' not connected",outputgate->getFullName());
231
        if (msg==NULL)
232
            throw cRuntimeError("send()/sendDelayed(): message pointer is NULL");
233
        if (msg->getOwner()!=this)
234
        {
235
            if (this!=simulation.getContext())
236
                throw cRuntimeError("send()/sendDelayed() of module (%s)%s called in the context of "
237
                                        "module (%s)%s: method called from the latter module "
238
                                        "lacks Enter_Method() or Enter_Method_Silent()? "
239
                                        "Also, if message to be sent is passed from that module, "
240
                                        "you'll need to call take(msg) after Enter_Method() as well",
241
                                        getClassName(), getFullPath().c_str(),
242
                                        simulation.getContext()->getClassName(),
243
                                        simulation.getContext()->getFullPath().c_str());
244
            else if (msg->getOwner()==&simulation.msgQueue && msg->isSelfMessage() && msg->getArrivalModuleId()==getId())
245
                throw cRuntimeError("send()/sendDelayed(): cannot send message (%s)%s, it is "
246
                                        "currently scheduled as a self-message for this module",
247
                                        msg->getClassName(), msg->getName());
248
            else if (msg->getOwner()==&simulation.msgQueue && msg->isSelfMessage())
249
                throw cRuntimeError("send()/sendDelayed(): cannot send message (%s)%s, it is "
250
                                        "currently scheduled as a self-message for ANOTHER module",
251
                                        msg->getClassName(), msg->getName());
252
            else if (msg->getOwner()==&simulation.msgQueue)
253
                throw cRuntimeError("send()/sendDelayed(): cannot send message (%s)%s, it is "
254
                                        "currently in scheduled-events, being underway between two modules",
255
                                        msg->getClassName(), msg->getName());
256
            else
257
                throw cRuntimeError("send()/sendDelayed(): cannot send message (%s)%s, "
258
                                        "it is currently contained/owned by (%s)%s",
259
                                        msg->getClassName(), msg->getName(), msg->getOwner()->getClassName(),
260
                                        msg->getOwner()->getFullPath().c_str());
261
        }
262
263
        simtime_t t = simTime();
264
        if (delay<0.0)
265
            throw cRuntimeError("sendDelayed(): negative delay %s",delay.str().c_str());
266
        if (t+delay<t_end) // TODO: Consider channel delay!
267
            throw cRuntimeError("sendDelayed(): send delay shorter than processing delay %s", delay.str().c_str());
268
269
        //
270
        // set message parameters and send it
271
        //
272
        msg->setSentFrom(this, outputgate->getId(), t+delay);
273
        bool keepit = outputgate->deliver(msg, t+delay);
274
        if (!keepit)
275
            delete msg;
276
277
        return 0;
278
    }
279
    else // syncExecution
280
    {
281
        return cSimpleModule::sendDelayed(msg, delay, outputgate);
282
    }
283
}
284
285
286
int cAsyncModule::sendDirect(cMessage *msg, simtime_t delay, simtime_t duration, cModule *mod, int inputgateid)
287
{
288
    cGate *togate = mod->gate(inputgateid);
289
    if (togate==NULL)
290
        throw cRuntimeError("sendDirect(): module `%s' has no gate #%d",
291
                                mod->getFullPath().c_str(), inputgateid);
292
293
    return sendDirect(msg, delay, duration, togate);
294
}
295
296
297
int cAsyncModule::sendDirect(cMessage *msg, simtime_t delay, simtime_t duration, cModule *mod, const char *inputgatename, int sn)
298
{
299
    if (!mod)
300
        throw cRuntimeError("sendDirect(): module ptr is NULL");
301
    cGate *togate = mod->gate(inputgatename,sn);
302
    if (togate==NULL)
303
        throw cRuntimeError(sn<0 ? "sendDirect(): module `%s' has no gate `%s'":
304
                                "sendDirect(): module `%s' has no gate `%s[%d]'",
305
                                mod->getFullPath().c_str(), inputgatename, sn);
306
    return sendDirect(msg, delay, duration, togate);
307
}
308
309
310
int cAsyncModule::sendDirect(cMessage *msg, simtime_t delay, simtime_t duration, cGate *togate)
311
{
312
    //
313
    // check current state
314
    //
315
    AO_t currentState = AO_load_acquire(&execState);
316
    if (currentState == asyncExecution)
317
    {
318
        //
319
        // error checking
320
        //
321
        // Note: it is permitted to send to an output gate. It is especially useful
322
        // with several submodules sending to a single output gate of their parent module.
323
        if (togate==NULL)
324
                throw cRuntimeError("sendDirect(): destination gate pointer is NULL");
325
        if (togate->getPreviousGate())
326
                throw cRuntimeError("sendDirect(): module must have dedicated gate(s) for receiving via sendDirect()"
327
                                                                " (\"from\" side of dest. gate `%s' should NOT be connected)",togate->getFullPath().c_str());
328
        if (msg==NULL)
329
                throw cRuntimeError("sendDirect(): message pointer is NULL");
330
        if (msg->getOwner()!=this)
331
        {
332
                if (this!=simulation.getContext())
333
                        throw cRuntimeError("sendDirect() of module (%s)%s called in the context of "
334
                                                                        "module (%s)%s: method called from the latter module "
335
                                                                        "lacks Enter_Method() or Enter_Method_Silent()? "
336
                                                                        "Also, if message to be sent is passed from that module, "
337
                                                                        "you'll need to call take(msg) after Enter_Method() as well",
338
                                                                        getClassName(), getFullPath().c_str(),
339
                                                                        simulation.getContext()->getClassName(),
340
                                                                        simulation.getContext()->getFullPath().c_str());
341
                else if (msg->getOwner()==&simulation.msgQueue && msg->isSelfMessage() && msg->getArrivalModuleId()==getId())
342
                        throw cRuntimeError("sendDirect(): cannot send message (%s)%s, it is "
343
                                                                        "currently scheduled as a self-message for this module",
344
                                                                        msg->getClassName(), msg->getName());
345
                else if (msg->getOwner()==&simulation.msgQueue && msg->isSelfMessage())
346
                        throw cRuntimeError("sendDirect(): cannot send message (%s)%s, it is "
347
                                                                        "currently scheduled as a self-message for ANOTHER module",
348
                                                                        msg->getClassName(), msg->getName());
349
                else if (msg->getOwner()==&simulation.msgQueue)
350
                        throw cRuntimeError("sendDirect(): cannot send message (%s)%s, it is "
351
                                                                        "currently in scheduled-events, being underway between two modules",
352
                                                                        msg->getClassName(), msg->getName());
353
                else
354
                        throw cRuntimeError("sendDirect(): cannot send message (%s)%s, "
355
                                                                        "it is currently contained/owned by (%s)%s",
356
                                                                        msg->getClassName(), msg->getName(), msg->getOwner()->getClassName(),
357
                                                                        msg->getOwner()->getFullPath().c_str());
358
        }
359
360
        // set message parameters and send it
361
        simtime_t time = simTime();
362
        if (time + delay < t_end) // TODO: Consider channel delay!
363
            throw cRuntimeError("sendDelayed(): send delay shorter than processing delay %s",delay.str().c_str());
364
365
        msg->setSentFrom(this, -1, time);
366
367
        bool keepit = togate->deliver(msg, time + delay);
368
        if (!keepit)
369
                delete msg;
370
        return 0;
371
    }
372
    else  // sync execution
373
    {
374
            return cSimpleModule::sendDirect(msg, delay, duration, togate);
375
    }
376
}