Project

General

Profile

Statistics
| Branch: | Revision:

root / src / sim / parsim / cmpicomm.cc @ 81ad8b66

History | View | Annotate | Download (6.33 KB)

1 01873262 Georg Kunz
//=========================================================================
2
//  CMPICOMM.CC - part of
3
//
4
//                  OMNeT++/OMNEST
5
//           Discrete System Simulation in C++
6
//
7
//  Author: Andras Varga, 2003
8
//          Dept. of Electrical and Computer Systems Engineering,
9
//          Monash University, Melbourne, Australia
10
//
11
//=========================================================================
12
13
/*--------------------------------------------------------------*
14
  Copyright (C) 2003-2008 Andras Varga
15
  Copyright (C) 2006-2008 OpenSim Ltd.
16

17
  This file is distributed WITHOUT ANY WARRANTY. See the file
18
  `license' for details on this and other legal matters.
19
*--------------------------------------------------------------*/
20
21
#ifdef WITH_MPI
22
23
#include <mpi.h> // Intel MPI wants <mpi.h> to precede <stdio.h>
24
#include <stdio.h>
25
#include "cmpicomm.h"
26
#include "cmpicommbuffer.h"
27
#include "globals.h"
28
#include "regmacros.h"
29
#include "cenvir.h"
30
#include "cconfiguration.h"
31
#include "cconfigoption.h"
32
#include "platmisc.h"
33
34
USING_NAMESPACE
35
36
Register_Class(cMPICommunications);
37
38
Register_GlobalConfigOption(CFGID_PARSIM_MPICOMMUNICATIONS_MPIBUFFER, "parsim-mpicommunications-mpibuffer", CFG_INT, NULL, "When cMPICommunications is selected as parsim communications class: specifies the size of the MPI communications buffer. The default is to calculate a buffer size based on the number of partitions.");
39
40
// default is 256k. If too small, simulation can block in MPI send calls.
41
#define MPI_SEND_BUFFER_PER_PARTITION (256*1024)
42
43
44
cMPICommunications::cMPICommunications() : recycledBuffer(NULL)
{
    // nothing else to do here; MPI itself is brought up in init()
}
48
49
cMPICommunications::~cMPICommunications()
{
    // MPI itself was already torn down in shutdown() via MPI_Finalize();
    // only the pooled comm buffer remains to be released here
    delete recycledBuffer;
}
54
55
void cMPICommunications::init()
56
{
57
    // sanity check
58
    int argc = ev.getArgCount();
59
    char **argv = ev.getArgVector();
60
    for (int i=1; i<argc; i++)
61
        if (argv[i][0]=='-' && argv[i][1]=='p')
62
            ev.printf("WARNING: cMPICommunications doesn't need -p command-line option, ignored\n");
63
64
    // init MPI
65
    MPI_Init(&argc, &argv);
66
67
    // get numPartitions and myRank from MPI
68
    MPI_Comm_size(MPI_COMM_WORLD, &numPartitions);
69
    MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
70
71
    ev.printf("cMPICommunications: started as process %d out of %d.\n", myRank, numPartitions);
72
    if (numPartitions==1)
73
        ev.printf("WARNING: MPI thinks this process is the only one in the session "
74
                  "(did you use mpirun to start this program?)\n");
75
76
    // set up MPI send buffer (+16K prevents MPI_Buffer_attach() error if numPartitions==1)
77
    int defaultBufSize = MPI_SEND_BUFFER_PER_PARTITION * (numPartitions-1) + 16384;
78
    int bufSize = ev.getConfig()->getAsInt(CFGID_PARSIM_MPICOMMUNICATIONS_MPIBUFFER);
79
    if (bufSize<=0) bufSize = defaultBufSize;
80
    char *buf = new char[bufSize];
81
    MPI_Buffer_attach(buf, bufSize);
82
}
83
84
void cMPICommunications::shutdown()
85
{
86
    // wait a little before exiting MPI, to prevent peers getting killed by SIGPIPE
87
    // on a write before they would get a chance to process the broadcastException.
88
    usleep(1000000);  // 1s
89
    MPI_Finalize();
90
}
91
92
int cMPICommunications::getNumPartitions() const
93
{
94
    return numPartitions;
95
}
96
97
int cMPICommunications::getProcId() const
98
{
99
    return myRank;
100
}
101
102
cMPICommBuffer *cMPICommunications::doCreateCommBuffer()
{
    // factory for the MPI-specific communication buffer type
    return new cMPICommBuffer();
}
106
107
cCommBuffer *cMPICommunications::createCommBuffer()
{
    // Single-slot buffer pool: hand out the recycled buffer if one is
    // available, otherwise create a fresh buffer on demand.
    cMPICommBuffer *result = recycledBuffer;
    if (result)
    {
        recycledBuffer = NULL;
        result->reset();
    }
    else
    {
        result = doCreateCommBuffer();
    }
    return result;
}
123
124
void cMPICommunications::recycleCommBuffer(cCommBuffer *buffer)
{
    // Single-slot buffer pool: keep the buffer if the slot is free,
    // otherwise simply dispose of it.
    if (recycledBuffer)
        delete buffer;
    else
        recycledBuffer = (cMPICommBuffer *)buffer;
}
132
133
void cMPICommunications::send(cCommBuffer *buffer, int tag, int destination)
{
    cMPICommBuffer *mpiBuffer = (cMPICommBuffer *)buffer;
    // The *buffered* send is essential: a plain (possibly blocking)
    // MPI_Send could stall here and deadlock the simulation.
    int rc = MPI_Bsend(mpiBuffer->getBuffer(), mpiBuffer->getMessageSize(),
                       MPI_PACKED, destination, tag, MPI_COMM_WORLD);
    if (rc!=MPI_SUCCESS)
        throw cRuntimeError("cMPICommunications::send(): MPI error %d", rc);
}
142
143
void cMPICommunications::broadcast(cCommBuffer *buffer, int tag)
{
    // TBD make use of MPI call instead -- for now, delegate to the generic
    // base class implementation
    cParsimCommunications::broadcast(buffer, tag);
}
148
149
bool cMPICommunications::receiveBlocking(int filtTag, cCommBuffer *buffer, int& receivedTag, int& sourceProcId)
150
{
151
    // use MPI_Probe() to determine message size, then receive it
152
    cMPICommBuffer *b = (cMPICommBuffer *)buffer;
153
    MPI_Status status;
154
    int msgsize;
155
    if (filtTag==PARSIM_ANY_TAG) filtTag=MPI_ANY_TAG;
156
    MPI_Probe(MPI_ANY_SOURCE, filtTag, MPI_COMM_WORLD, &status);
157
    MPI_Get_count(&status, MPI_PACKED, &msgsize);
158
    b->allocateAtLeast(msgsize);
159
    int err = MPI_Recv(b->getBuffer(), b->getBufferLength(), MPI_PACKED, MPI_ANY_SOURCE, filtTag, MPI_COMM_WORLD, &status);
160
    if (err!=MPI_SUCCESS)
161
        throw cRuntimeError("cMPICommunications::receiveBlocking(): MPI error %d", err);
162
    b->setMessageSize(msgsize);
163
    receivedTag = status.MPI_TAG;
164
    sourceProcId = status.MPI_SOURCE;
165
    return true;
166
}
167
168
bool cMPICommunications::receiveNonblocking(int filtTag, cCommBuffer *buffer, int& receivedTag, int& sourceProcId)
169
{
170
    // probe if we have something to receive ...
171
    cMPICommBuffer *b = (cMPICommBuffer *)buffer;
172
    MPI_Status status;
173
    int flag;
174
    if (filtTag==PARSIM_ANY_TAG) filtTag=MPI_ANY_TAG;
175
    MPI_Iprobe(MPI_ANY_SOURCE, filtTag, MPI_COMM_WORLD, &flag, &status);
176
177
    // ... and receive it if we do
178
    if (flag)
179
    {
180
        int msgsize;
181
        MPI_Get_count(&status, MPI_PACKED, &msgsize);
182
        b->allocateAtLeast(msgsize);
183
        int err = MPI_Recv(b->getBuffer(), b->getBufferLength(), MPI_PACKED, MPI_ANY_SOURCE, filtTag, MPI_COMM_WORLD, &status);
184
        if (err!=MPI_SUCCESS)
185
            throw cRuntimeError("cMPICommunications::receiveNonBlocking(): MPI error %d", err);
186
        b->setMessageSize(msgsize);
187
        receivedTag = status.MPI_TAG;
188
        sourceProcId = status.MPI_SOURCE;
189
        return true;
190
    }
191
    return false;
192
}
193
194
#endif  // WITH_MPI