Project

General

Profile

Statistics
| Branch: | Revision:

root / src / sim / parsim / cnullmessageprot.cc @ 81ad8b66

History | View | Annotate | Download (10.5 KB)

1
//=========================================================================
2
//  CNULLMESSAGEPROT.CC - part of
3
//
4
//                  OMNeT++/OMNEST
5
//           Discrete System Simulation in C++
6
//
7
//  Author: Andras Varga, 2003
8
//          Dept. of Electrical and Computer Systems Engineering,
9
//          Monash University, Melbourne, Australia
10
//
11
//=========================================================================
12

    
13
/*--------------------------------------------------------------*
14
  Copyright (C) 2003-2008 Andras Varga
15
  Copyright (C) 2006-2008 OpenSim Ltd.
16

17
  This file is distributed WITHOUT ANY WARRANTY. See the file
18
  `license' for details on this and other legal matters.
19
*--------------------------------------------------------------*/
20

    
21

    
22
#include "cmessage.h"
23
#include "cmodule.h"
24
#include "cgate.h"
25
#include "cenvir.h"
26
#include "cconfiguration.h"
27
#include "cnullmessageprot.h"
28
#include "clinkdelaylookahead.h"
29
#include "cparsimpartition.h"
30
#include "cparsimcomm.h"
31
#include "ccommbuffer.h"
32
#include "messagetags.h"
33
#include "globals.h"
34
#include "cconfigoption.h"
35
#include "regmacros.h"
36
#include "cplaceholdermod.h"
37
#include "cproxygate.h"
38
#include "cchannel.h"
39

    
40
USING_NAMESPACE
41

    
42
Register_Class(cNullMessageProtocol);
43

    
44
Register_GlobalConfigOption(CFGID_PARSIM_NULLMESSAGEPROTOCOL_LOOKAHEAD_CLASS, "parsim-nullmessageprotocol-lookahead-class", CFG_STRING, "cLinkDelayLookahead", "When cNullMessageProtocol is selected as parsim synchronization class: specifies the C++ class that calculates lookahead. The class should subclass from cNMPLookahead.");
45
Register_GlobalConfigOption(CFGID_PARSIM_NULLMESSAGEPROTOCOL_LAZINESS, "parsim-nullmessageprotocol-laziness", CFG_DOUBLE, "0.5", "When cNullMessageProtocol is selected as parsim synchronization class: specifies the laziness of sending null messages. Values in the range [0,1) are accepted. Laziness=0 causes null messages to be sent out immediately as a new EOT is learned, which may result in excessive null message traffic.");
46
extern cConfigOption *CFGID_PARSIM_DEBUG; // registered in cparsimpartition.cc
47

    
48

    
49
cNullMessageProtocol::cNullMessageProtocol() : cParsimProtocolBase()
50
{
51
    numSeg = 0;
52
    segInfo = NULL;
53

    
54
    debug = ev.getConfig()->getAsBool(CFGID_PARSIM_DEBUG);
55
    std::string lookhClass = ev.getConfig()->getAsString(CFGID_PARSIM_NULLMESSAGEPROTOCOL_LOOKAHEAD_CLASS);
56
    lookaheadcalc = dynamic_cast<cNMPLookahead *>(createOne(lookhClass.c_str()));
57
    if (!lookaheadcalc) \
58
         throw cRuntimeError("Class \"%s\" is not subclassed from cNMPLookahead", lookhClass.c_str());
59
    laziness = ev.getConfig()->getAsDouble(CFGID_PARSIM_NULLMESSAGEPROTOCOL_LAZINESS);
60
}
61

    
62
cNullMessageProtocol::~cNullMessageProtocol()
63
{
64
    delete lookaheadcalc;
65

    
66
    // segInfo[*].eitEvent/eotEvent messages already deleted with msgQueue.clear()
67
    delete [] segInfo;
68
}
69

    
70
void cNullMessageProtocol::setContext(cSimulation *sim, cParsimPartition *seg, cParsimCommunications *co)
71
{
72
    cParsimProtocolBase::setContext(sim, seg, co);
73
    lookaheadcalc->setContext(sim, seg, co);
74
}
75

    
76
void cNullMessageProtocol::startRun()
77
{
78
    ev << "starting Null Message Protocol...\n";
79

    
80
    delete [] segInfo;
81

    
82
    numSeg = comm->getNumPartitions();
83
    segInfo = new PartitionInfo[numSeg];
84
    int myProcId = comm->getProcId();
85

    
86
    char buf[30];
87
    int i;
88

    
89
    // temporarily initialize everything to zero.
90
    for (i=0; i<numSeg; i++)
91
    {
92
        segInfo[i].eotEvent = NULL;
93
        segInfo[i].eitEvent = NULL;
94
        segInfo[i].lastEotSent = 0.0;
95
    }
96

    
97
    // Note boot sequence: first we have to schedule all "resend-EOT" events,
98
    // so that the simulation will start by sending out null messages --
99
    // otherwise we'd end up sitting blocked on an EIT event forever!
100

    
101
    // create "resend-EOT" events and schedule them to zero (1st thing to do)
102
    ev << "  scheduling 'resend-EOT' events...\n";
103
    for (i=0; i<numSeg; i++)
104
    {
105
        if (i!=myProcId)
106
        {
107
            sprintf(buf,"resendEOT-%d", i);
108
            cMessage *eotMsg =  new cMessage(buf,MK_PARSIM_RESENDEOT);
109
            eotMsg->setContextPointer((void *)(long)i);  // khmm...
110
            segInfo[i].eotEvent = eotMsg;
111
            rescheduleEvent(eotMsg, 0.0);
112
        }
113
    }
114

    
115
    // create EIT events and schedule them to zero (null msgs will bump them)
116
    ev << "  scheduling 'EIT' events...\n";
117
    for (i=0; i<numSeg; i++)
118
    {
119
        if (i!=myProcId)
120
        {
121
            sprintf(buf,"EIT-%d", i);
122
            cMessage *eitMsg =  new cMessage(buf,MK_PARSIM_EIT);
123
            segInfo[i].eitEvent = eitMsg;
124
            rescheduleEvent(eitMsg, 0.0);
125
        }
126
    }
127

    
128
    // start lookahead calculator too
129
    lookaheadcalc->startRun();
130

    
131
    ev << "  setup done.\n";
132

    
133
}
134

    
135
void cNullMessageProtocol::endRun()
136
{
137
    lookaheadcalc->endRun();
138
}
139

    
140
void cNullMessageProtocol::processOutgoingMessage(cMessage *msg, int destProcId, int destModuleId, int destGateId, void *data)
141
{
142
    // calculate lookahead
143
    simtime_t lookahead = lookaheadcalc->getCurrentLookahead(msg, destProcId, data);
144
    simtime_t eot = sim->getSimTime() + lookahead;
145
    if (eot < segInfo[destProcId].lastEotSent)
146
        throw cRuntimeError("cNullMessageProtocol error: attempt to decrease EOT");
147

    
148
    // send a null message only if EOT is better than last time
149
    bool sendNull = (eot > segInfo[destProcId].lastEotSent);
150

    
151
    // send message
152
    cCommBuffer *buffer = comm->createCommBuffer();
153
    if (sendNull)
154
    {
155
        // update "resend-EOT" timer
156
        segInfo[destProcId].lastEotSent = eot;
157
        simtime_t eotResendTime = sim->getSimTime() + lookahead*laziness;
158
        rescheduleEvent(segInfo[destProcId].eotEvent, eotResendTime);
159

    
160
        {if (debug) ev.printf("piggybacking null msg on `%s' to %d, lookahead=%s, EOT=%s; next resend at %s\n",
161
                              msg->getName(),destProcId,SIMTIME_STR(lookahead),SIMTIME_STR(eot),SIMTIME_STR(eotResendTime));}
162

    
163
        // send cMessage with piggybacked null message
164
        buffer->pack(eot);
165
        buffer->pack(destModuleId);
166
        buffer->pack(destGateId);
167
        buffer->packObject(msg);
168
        comm->send(buffer, TAG_CMESSAGE_WITH_NULLMESSAGE, destProcId);
169
    }
170
    else
171
    {
172
        {if (debug) ev.printf("sending `%s' to %d\n",msg->getName(),destProcId);}
173

    
174
        // send cMessage
175
        buffer->pack(destModuleId);
176
        buffer->pack(destGateId);
177
        buffer->packObject(msg);
178
        comm->send(buffer, TAG_CMESSAGE, destProcId);
179
    }
180
    comm->recycleCommBuffer(buffer);
181
}
182

    
183
void cNullMessageProtocol::processReceivedBuffer(cCommBuffer *buffer, int tag, int sourceProcId)
184
{
185
    int destModuleId;
186
    int destGateId;
187
    cMessage *msg;
188
    simtime_t eit;
189

    
190
    switch (tag)
191
    {
192
        case TAG_CMESSAGE_WITH_NULLMESSAGE:
193
            buffer->unpack(eit);
194
            processReceivedEIT(sourceProcId, eit);
195
            buffer->unpack(destModuleId);
196
            buffer->unpack(destGateId);
197
            msg = (cMessage *)buffer->unpackObject();
198
            processReceivedMessage(msg, destModuleId, destGateId, sourceProcId);
199
            break;
200

    
201
        case TAG_CMESSAGE:
202
            buffer->unpack(destModuleId);
203
            buffer->unpack(destGateId);
204
            msg = (cMessage *)buffer->unpackObject();
205
            processReceivedMessage(msg, destModuleId, destGateId, sourceProcId);
206
            break;
207

    
208
        case TAG_NULLMESSAGE:
209
            buffer->unpack(eit);
210
            processReceivedEIT(sourceProcId, eit);
211
            break;
212

    
213
        default:
214
            partition->processReceivedBuffer(buffer, tag, sourceProcId);
215
            break;
216
    }
217
    buffer->assertBufferEmpty();
218
}
219

    
220
void cNullMessageProtocol::processReceivedEIT(int sourceProcId, simtime_t eit)
221
{
222
    cMessage *eitMsg = segInfo[sourceProcId].eitEvent;
223

    
224
    {if (debug) ev.printf("null msg received from %d, EIT=%s, rescheduling EIT event\n", sourceProcId, SIMTIME_STR(eit));}
225

    
226
    // sanity check
227
    ASSERT(eit > eitMsg->getArrivalTime());
228

    
229
    // reschedule it to the EIT just received
230
    rescheduleEvent(eitMsg, eit);
231
}
232

    
233
cMessage *cNullMessageProtocol::getNextEvent()
234
{
235
    // our EIT and resendEOT messages are always scheduled, so the FES can
236
    // only be empty if there are no other partitions at all -- "no events" then
237
    // means we're finished.
238
    if (sim->msgQueue.isEmpty())
239
        return NULL;
240

    
241
    // we could do a receiveNonblocking() call here to look at our mailbox,
242
    // but for performance reasons we don't -- it's enough to read it
243
    // (receiveBlocking()) when we're stuck on an EIT. Or should we do it
244
    // every 1000 events or so? If MPI receive buffer fills up it can cause
245
    // deadlock.
246
    //receiveNonblocking();
247

    
248
    cMessage *msg;
249
    while (true)
250
    {
251
        msg = sim->msgQueue.peekFirst();
252
        if (msg->getKind() == MK_PARSIM_RESENDEOT)
253
        {
254
            // send null messages if window closed for a partition
255
            int procId = (long) msg->getContextPointer();  // khmm...
256
            sendNullMessage(procId, msg->getArrivalTime());
257
        }
258
        else if (msg->getKind() == MK_PARSIM_EIT)
259
        {
260
            // wait until it gets out of the way (i.e. we get a higher EIT)
261
            {if (debug) ev.printf("blocking on EIT event `%s'\n", msg->getName());}
262
            if (!receiveBlocking())
263
                return NULL;
264
        }
265
        else
266
        {
267
            // just a normal event -- go ahead with it
268
            break;
269
        }
270
    }
271
    return msg;
272
}
273

    
274
void cNullMessageProtocol::sendNullMessage(int procId, simtime_t now)
275
{
276
    // calculate EOT and sending of next null message
277
    simtime_t lookahead = lookaheadcalc->getCurrentLookahead(procId);
278
    simtime_t eot = now + lookahead;
279

    
280
    // ensure that even with eager resend, we only send out EOTs that
281
    // differ from previous one!
282
    if (eot == segInfo[procId].lastEotSent)
283
        return;
284
    if (eot < segInfo[procId].lastEotSent)
285
        throw cRuntimeError("cNullMessageProtocol error: attempt to decrease EOT");
286
    segInfo[procId].lastEotSent = eot;
287

    
288
    // calculate time of next null message sending, and schedule "resend-EOT" event
289
    simtime_t eotResendTime = now + lookahead*laziness;
290
    rescheduleEvent(segInfo[procId].eotEvent, eotResendTime);
291

    
292
    {if (debug) ev.printf("sending null msg to %d, lookahead=%s, EOT=%s; next resend at %s\n",procId,SIMTIME_STR(lookahead),SIMTIME_STR(eot),SIMTIME_STR(eotResendTime));}
293

    
294
    // send out null message
295
    cCommBuffer *buffer = comm->createCommBuffer();
296
    buffer->pack(eot);
297
    comm->send(buffer, TAG_NULLMESSAGE, procId);
298
    comm->recycleCommBuffer(buffer);
299
}
300

    
301
void cNullMessageProtocol::rescheduleEvent(cMessage *msg, simtime_t t)
302
{
303
    sim->msgQueue.remove(msg);  // also works if the event is not currently scheduled
304
    msg->setArrivalTime(t);
305
    sim->msgQueue.insert(msg);
306
}
307

    
308