Statistics
| Branch: | Revision:

root / src / sim / parsim / cparsimpartition.cc @ e1750c09

History | View | Annotate | Download (8.58 KB)

1 01873262 Georg Kunz
//=========================================================================
2
//  CPARSIMSEGMENT.CC - part of
3
//
4
//                  OMNeT++/OMNEST
5
//           Discrete System Simulation in C++
6
//
7
//  Author: Andras Varga, 2003
8
//          Dept. of Electrical and Computer Systems Engineering,
9
//          Monash University, Melbourne, Australia
10
//
11
//=========================================================================
12
13
/*--------------------------------------------------------------*
14
  Copyright (C) 2003-2008 Andras Varga
15
  Copyright (C) 2006-2008 OpenSim Ltd.
16

17
  This file is distributed WITHOUT ANY WARRANTY. See the file
18
  `license' for details on this and other legal matters.
19
*--------------------------------------------------------------*/
20
21
#include <stdlib.h>
22
#include <stdio.h>
23
#include "cmessage.h"
24
#include "errmsg.h"
25
#include "cplaceholdermod.h"
26
#include "cproxygate.h"
27
#include "cparsimpartition.h"
28
#include "ccommbuffer.h"
29
#include "cparsimcomm.h"
30
#include "cparsimsynchr.h"
31
#include "creceivedexception.h"
32
#include "messagetags.h"
33
#include "cenvir.h"
34
#include "cconfiguration.h"
35
#include "globals.h"
36
#include "cconfigoption.h"
37
#include "regmacros.h"
38
39
USING_NAMESPACE
40
41
Register_Class(cParsimPartition);
42
43
Register_GlobalConfigOption(CFGID_PARSIM_DEBUG, "parsim-debug", CFG_BOOL, "true", "With parallel-simulation=true: turns on printing of log messages from the parallel simulation code.");
44
45
cParsimPartition::cParsimPartition()
46
{
47
    sim = NULL;
48
    comm = NULL;
49
    synch = NULL;
50
    debug = ev.getConfig()->getAsBool(CFGID_PARSIM_DEBUG);
51
}
52
53
cParsimPartition::~cParsimPartition()
54
{
55
}
56
57
void cParsimPartition::setContext(cSimulation *simul, cParsimCommunications *commlayer, cParsimSynchronizer *sync)
58
{
59
    sim = simul;
60
    comm = commlayer;
61
    synch = sync;
62
}
63
64
void cParsimPartition::startRun()
65
{
66
    connectRemoteGates();
67
}
68
69
void cParsimPartition::endRun()
70
{
71
}
72
73
void cParsimPartition::shutdown()
74
{
75
    if (!comm) return; // if initialization failed
76
77
    cException e("Process has shut down");
78
    broadcastException(e);
79
80
    comm->shutdown();
81
}
82
83
84
void cParsimPartition::connectRemoteGates()
85
{
86
    cCommBuffer *buffer = comm->createCommBuffer();
87
88
    //
89
    // Step 1: broadcast what input gates we have that have to be connected
90
    //
91
    ev << "connecting remote gates: step 1 - broadcasting input gates...\n";
92
    for (int modId=0; modId<=sim->getLastModuleId(); modId++)
93
    {
94
        cPlaceholderModule *mod = dynamic_cast<cPlaceholderModule *>(sim->getModule(modId));
95
        if (mod)
96
        {
97
            for (cModule::GateIterator i(mod); !i.end(); i++)
98
            {
99
                cGate *g = i();
100
                // if this is a normal output gate which leads to a simple module,
101
                // send the input gate where it is connected.
102
                if (g->getType()==cGate::OUTPUT && g->getNextGate() &&
103
                    g->getPathEndGate()->getOwnerModule()->isSimple())
104
                {
105
                    cGate *ing = g->getNextGate();
106
                    // pack gate "address" here
107
                    buffer->pack(ing->getOwnerModule()->getId());
108
                    buffer->pack(ing->getId());
109
                    // pack gate name
110
                    buffer->pack(ing->getOwnerModule()->getFullPath().c_str());
111
                    buffer->pack(ing->getName());
112
                    buffer->pack(ing->getIndex());
113
                }
114
            }
115
        }
116
    }
117
    buffer->pack(-1); // "the end"
118
    comm->broadcast(buffer, TAG_SETUP_LINKS);
119
120
    //
121
    // Step 2: collect info broadcast by others, and use it to fill in output cProxyGates
122
    //
123
    ev << "connecting remote gates: step 2 - collecting broadcasts sent by others...\n";
124
    for (int i=0; i<comm->getNumPartitions()-1; i++)
125
    {
126
        // receive:
127
        int tag, remoteProcId;
128
        // note: *must* filter for TAG_SETUP_LINKS here, to prevent race conditions
129
        if (!comm->receiveBlocking(TAG_SETUP_LINKS, buffer, tag, remoteProcId))
130
            throw cRuntimeError("connectRemoteGates() interrupted by user");
131
        ASSERT(tag==TAG_SETUP_LINKS);
132
133
        ev << "  processing msg from procId=" << remoteProcId << "...\n";
134
135
        // process what we got:
136
        while(true)
137
        {
138
            int remoteModId;
139
            int remoteGateId;
140
            char *moduleFullPath;
141
            char *gateName;
142
            int gateIndex;
143
144
            // unpack a gate -- modId==-1 indicates end of packet
145
            buffer->unpack(remoteModId);
146
            if (remoteModId==-1)
147
                break;
148
            buffer->unpack(remoteGateId);
149
            buffer->unpack(moduleFullPath);
150
            buffer->unpack(gateName);
151
            buffer->unpack(gateIndex);
152
153
            // find corresponding output gate (if we have it) and set remote
154
            // gate address to the received one
155
            cModule *m = sim->getModuleByPath(moduleFullPath);
156
            cGate *g = m ? m->gate(gateName,gateIndex) : NULL;
157
            cProxyGate *pg = dynamic_cast<cProxyGate *>(g);
158
159
            ev << "    gate: " << moduleFullPath << "." << gateName << "["  << gateIndex << "] - ";
160
            if (!pg)
161
                ev << "not here\n";
162
            else
163
                ev << "points to (procId=" << remoteProcId << " moduleId=" << remoteModId << " gateId=" << remoteGateId << ")\n";
164
165
            if (pg)
166
            {
167
                pg->setPartition(this);
168
                pg->setRemoteGate(remoteProcId,remoteModId,remoteGateId);
169
            }
170
171
            delete [] moduleFullPath;
172
            delete [] gateName;
173
        }
174
        buffer->assertBufferEmpty();
175
    }
176
    ev << "  done.\n";
177
    comm->recycleCommBuffer(buffer);
178
}
179
180
void cParsimPartition::processOutgoingMessage(cMessage *msg, int procId, int moduleId, int gateId, void *data)
181
{
182
    if (debug) ev << "sending message '" << msg->getFullName() << "' (for T=" <<
183
                 msg->getArrivalTime() << " to procId=" << procId << "\n";
184
185
    synch->processOutgoingMessage(msg, procId, moduleId, gateId, data);
186
}
187
188
void cParsimPartition::processReceivedBuffer(cCommBuffer *buffer, int tag, int sourceProcId)
189
{
190
    opp_string errmsg;
191
    switch (tag)
192
    {
193
        case TAG_TERMINATIONEXCEPTION:
194
            buffer->unpack(errmsg);
195
            throw cReceivedTerminationException(sourceProcId, errmsg.c_str());
196
        case TAG_EXCEPTION:
197
            buffer->unpack(errmsg);
198
            throw cReceivedException(sourceProcId, errmsg.c_str());
199
        default:
200
            throw cRuntimeError("cParsimPartition::processReceivedBuffer(): unexpected tag %d "
201
                                 "from procId %d", tag, sourceProcId);
202
    }
203
    buffer->assertBufferEmpty();
204
}
205
206
void cParsimPartition::processReceivedMessage(cMessage *msg, int destModuleId, int destGateId, int sourceProcId)
207
{
208
    msg->setSrcProcId(sourceProcId);
209
    cModule *mod = sim->getModule(destModuleId);
210
    if (!mod)
211
        throw cRuntimeError("parallel simulation error: destination module id=%d for message \"%s\""
212
                             "from partition %d does not exist (any longer)",
213
                             destModuleId, msg->getName(), sourceProcId);
214
    cGate *g = mod->gate(destGateId);
215
    if (!g)
216
        throw cRuntimeError("parallel simulation error: destination gate %d of module id=%d "
217
                             "for message \"%s\" from partition %d does not exist",
218
                             destGateId, destModuleId, msg->getName(), sourceProcId);
219
220
    // do our best to set the source gate (the gate of a cPlaceholderModule)
221
    cGate *srcg = g->getPathStartGate();
222
    msg->setSentFrom(srcg->getOwnerModule(), srcg->getId(), msg->getSendingTime());
223
224
    // deliver it to the "destination" gate of the connection -- the channel
225
    // has already been simulated in the originating partition.
226
    g->deliver(msg, msg->getArrivalTime());
227
    ev.messageSent_OBSOLETE(msg); //FIXME change to the newer callback methods! messageSendHop() etc
228
}
229
230
void cParsimPartition::broadcastTerminationException(cTerminationException& e)
231
{
232
    // send TAG_TERMINATIONEXCEPTION to all partitions
233
    cCommBuffer *buffer = comm->createCommBuffer();
234
    buffer->pack(e.what());
235
    try
236
    {
237
        comm->broadcast(buffer, TAG_TERMINATIONEXCEPTION);
238
    }
239
    catch (std::exception&)
240
    {
241
        // swallow exceptions -- here we're not interested in them
242
    }
243
    comm->recycleCommBuffer(buffer);
244
}
245
246
void cParsimPartition::broadcastException(std::exception& e)
247
{
248
    // send TAG_EXCEPTION to all partitions
249
    cCommBuffer *buffer = comm->createCommBuffer();
250
    buffer->pack(e.what());
251
    try
252
    {
253
        comm->broadcast(buffer, TAG_EXCEPTION);
254
    }
255
    catch (std::exception&)
256
    {
257
        // swallow any exceptions -- here we're not interested in them
258
    }
259
    comm->recycleCommBuffer(buffer);
260
}
261