Project

General

Profile

Statistics
| Branch: | Revision:

root / src / sim / parsim / cnamedpipecomm.cc @ 81ad8b66

History | View | Annotate | Download (7.76 KB)

1
//=========================================================================
2
//  CNAMEDPIPECOMM.CC - part of
3
//
4
//                  OMNeT++/OMNEST
5
//           Discrete System Simulation in C++
6
//
7
//  Author: Andras Varga, 2003
8
//          Dept. of Electrical and Computer Systems Engineering,
9
//          Monash University, Melbourne, Australia
10
//
11
//=========================================================================
12

    
13
/*--------------------------------------------------------------*
14
  Copyright (C) 2003-2008 Andras Varga
15
  Copyright (C) 2006-2008 OpenSim Ltd.
16

17
  This file is distributed WITHOUT ANY WARRANTY. See the file
18
  `license' for details on this and other legal matters.
19
*--------------------------------------------------------------*/
20

    
21

    
22
#include "cnamedpipecomm.h"
23

    
24
#ifndef USE_WINDOWS_PIPES
25

    
26
#include <stdio.h>
27
#include <string.h>
28
#include <stdlib.h>
29
#include <sys/types.h>
30
#include <sys/stat.h>
31
#include <errno.h>
32
#include <fcntl.h>
33
#include <unistd.h>
34
#include "cexception.h"
35
#include "cmemcommbuffer.h"
36
#include "globals.h"
37
#include "regmacros.h"
38
#include "cconfigoption.h"
39
#include "cenvir.h"
40
#include "cconfiguration.h"
41
#include "parsimutil.h"
42

    
43
USING_NAMESPACE
44

    
45

    
46
Register_Class(cNamedPipeCommunications);
47

    
48
Register_GlobalConfigOption(CFGID_PARSIM_NAMEDPIPECOMM_PREFIX, "parsim-namedpipecommunications-prefix", CFG_STRING, "comm/", "When cNamedPipeCommunications is selected as parsim communications class: selects the prefix (directory+potential filename prefix) where name pipes are created in the file system.");
49

    
50

    
51
static int readBytes(int fd, void *buf, int len)
52
{
53
    int tot = 0;
54
    while (tot<len)
55
    {
56
        int n = read(fd, (char *)buf+tot, len-tot);
57
        if (n==-1)
58
        {
59
            if (errno==EAGAIN) // this is not an error
60
                n = 0;
61
            else
62
                return -1;
63
        }
64
        tot += n;
65
    }
66
    return tot;
67
}
68

    
69
struct PipeHeader
70
{
71
    int tag;
72
    int contentLength;
73
};
74

    
75
cNamedPipeCommunications::cNamedPipeCommunications()
76
{
77
    prefix = ev.getConfig()->getAsString(CFGID_PARSIM_NAMEDPIPECOMM_PREFIX);
78
    rpipes = NULL;
79
    wpipes = NULL;
80
    rrBase = 0;
81
}
82

    
83
cNamedPipeCommunications::~cNamedPipeCommunications()
84
{
85
    delete [] rpipes;
86
    delete [] wpipes;
87
}
88

    
89
void cNamedPipeCommunications::init()
90
{
91
    // get numPartitions and myProcId from "-p" command-line option
92
    getProcIdFromCommandLineArgs(myProcId, numPartitions, "cNamedPipeCommunications");
93
    ev.printf("cNamedPipeCommunications: started as process %d out of %d.\n", myProcId, numPartitions);
94

    
95
    // create and open pipes for read
96
    int i;
97
    rpipes = new int[numPartitions];
98
    for (i=0; i<numPartitions; i++)
99
    {
100
        if (i==myProcId)
101
        {
102
            rpipes[i] = -1;
103
            continue;
104
        }
105

    
106
        char fname[256];
107
        sprintf(fname,"%spipe-%d-%d", prefix.buffer(), myProcId, i);
108
        ev.printf("cNamedPipeCommunications: creating and opening pipe '%s' for read...\n", fname);
109
        unlink(fname);
110
        if (mknod(fname, S_IFIFO|0600, 0)==-1)
111
            throw cRuntimeError("cNamedPipeCommunications: cannot create pipe '%s': %s", fname, strerror(errno));
112
        rpipes[i] = open(fname,O_RDONLY|O_NONBLOCK);
113
        if (rpipes[i]==-1)
114
            throw cRuntimeError("cNamedPipeCommunications: cannot open pipe '%s' for read: %s", fname, strerror(errno));
115

    
116
        if (rpipes[i] > maxFdPlus1)
117
            maxFdPlus1 = rpipes[i];
118
    }
119
    maxFdPlus1 += 1;
120

    
121
    // open pipes for write
122
    wpipes = new int[numPartitions];
123
    for (i=0; i<numPartitions; i++)
124
    {
125
        if (i==myProcId)
126
        {
127
            wpipes[i] = -1;
128
            continue;
129
        }
130

    
131
        char fname[256];
132
        sprintf(fname,"%spipe-%d-%d", prefix.buffer(), i, myProcId);
133
        ev.printf("cNamedPipeCommunications: opening pipe '%s' for write...\n", fname);
134
        wpipes[i] = open(fname,O_WRONLY);
135
        for (int k=0; k<30 && wpipes[i]==-1; k++)
136
        {
137
            sleep(1);
138
            wpipes[i] = open(fname,O_WRONLY);
139
        }
140
        if (wpipes[i]==-1)
141
            throw cRuntimeError("cNamedPipeCommunications: cannot open pipe '%s' for write: %s", fname, strerror(errno));
142
    }
143
}
144

    
145
void cNamedPipeCommunications::shutdown()
146
{
147
    for (int i=0; i<numPartitions; i++)
148
    {
149
        close(rpipes[i]);
150
        close(wpipes[i]);
151
    }
152
}
153

    
154
int cNamedPipeCommunications::getNumPartitions() const
155
{
156
    return numPartitions;
157
}
158

    
159
int cNamedPipeCommunications::getProcId() const
160
{
161
    return myProcId;
162
}
163

    
164
cCommBuffer *cNamedPipeCommunications::createCommBuffer()
165
{
166
    return new cMemCommBuffer();
167
}
168

    
169
void cNamedPipeCommunications::recycleCommBuffer(cCommBuffer *buffer)
170
{
171
    delete buffer;
172
}
173

    
174
void cNamedPipeCommunications::send(cCommBuffer *buffer, int tag, int destination)
175
{
176
    cMemCommBuffer *b = (cMemCommBuffer *)buffer;
177
    int fd = wpipes[destination];
178

    
179
    struct PipeHeader ph;
180
    ph.tag = tag;
181
    ph.contentLength = b->getMessageSize();
182
    if (write(fd, &ph, sizeof(ph))==-1)
183
        throw cRuntimeError("cNamedPipeCommunications: cannot write pipe to procId=%d: %s", destination, strerror(errno));
184
    if (write(fd, b->getBuffer(), ph.contentLength)==-1)
185
        throw cRuntimeError("cNamedPipeCommunications: cannot write pipe to procId=%d: %s", destination, strerror(errno));
186
}
187

    
188
bool cNamedPipeCommunications::receive(int filtTag, cCommBuffer *buffer, int& receivedTag, int& sourceProcId, bool blocking)
189
{
190
    bool recv = doReceive(buffer, receivedTag, sourceProcId, blocking);
191
    // TBD implement tag filtering
192
    if (recv && filtTag!=PARSIM_ANY_TAG && filtTag!=receivedTag)
193
        throw cRuntimeError("cNamedPipeCommunications: tag filtering not implemented");
194
    return recv;
195
}
196

    
197
bool cNamedPipeCommunications::doReceive(cCommBuffer *buffer, int& receivedTag, int& sourceProcId, bool blocking)
198
{
199
    cMemCommBuffer *b = (cMemCommBuffer *)buffer;
200
    b->reset();
201

    
202
    // create file descriptor set for select() call
203
    int i;
204
    fd_set fdset;
205
    FD_ZERO(&fdset);
206
    for (i=0; i<numPartitions; i++)
207
        if (rpipes[i]!=-1)
208
            FD_SET(rpipes[i], &fdset);
209

    
210
    struct timeval tv;
211
    tv.tv_sec = blocking ? 1 : 0; // if blocking, wait 1 sec
212
    tv.tv_usec = 0;
213

    
214
    int ret = select(maxFdPlus1, &fdset, NULL, NULL, &tv);
215
    if (ret>0)
216
    {
217
        rrBase = (rrBase+1)%numPartitions;
218
        for (int k=0; k<numPartitions; k++)
219
        {
220
            i = (rrBase+k)%numPartitions; // shift by rrBase for Round-Robin query
221
            if (rpipes[i]!=-1 && FD_ISSET(rpipes[i],&fdset))
222
            {
223
                struct PipeHeader ph;
224
                if (readBytes(rpipes[i], &ph, sizeof(ph))==-1)
225
                    throw cRuntimeError("cNamedPipeCommunications: cannot read from pipe "
226
                                        "to procId=%d: %s", sourceProcId, strerror(errno));
227

    
228
                sourceProcId = i;
229
                receivedTag = ph.tag;
230
                b->allocateAtLeast(ph.contentLength);
231
                b->setMessageSize(ph.contentLength);
232

    
233
                if (readBytes(rpipes[i], b->getBuffer(), ph.contentLength)==-1)
234
                    throw cRuntimeError("cNamedPipeCommunications: cannot read from pipe "
235
                                        "to procId=%d: %s", sourceProcId, strerror(errno));
236
                return true;
237
            }
238
        }
239
    }
240

    
241
    return false;
242
}
243

    
244
bool cNamedPipeCommunications::receiveBlocking(int filtTag, cCommBuffer *buffer, int& receivedTag, int& sourceProcId)
245
{
246
    // select() call inside receive() will block for max 1s, yielding CPU
247
    // to other processes in the meantime
248
    while (!receive(filtTag, buffer, receivedTag, sourceProcId, true))
249
    {
250
        if (ev.idle())
251
            return false;
252
    }
253
    return true;
254
}
255

    
256
bool cNamedPipeCommunications::receiveNonblocking(int filtTag, cCommBuffer *buffer, int& receivedTag, int& sourceProcId)
257
{
258
    return receive(filtTag, buffer, receivedTag, sourceProcId, false);
259
}
260

    
261
#endif /* (!USE_WINDOWS_PIPES) */
262

    
263