Project

General

Profile

Statistics
| Branch: | Revision:

root / src / sim / parsim / cnamedpipecomm-win.cc @ 81ad8b66

History | View | Annotate | Download (9.18 KB)

1
//=========================================================================
2
// CNAMEDPIPECOMM-WIN.CC - part of
3
//
4
//                  OMNeT++/OMNEST
5
//           Discrete System Simulation in C++
6
//
7
//  Author: Andras Varga, 2003
8
//          Dept. of Electrical and Computer Systems Engineering,
9
//          Monash University, Melbourne, Australia
10
//
11
//=========================================================================
12

    
13
/*--------------------------------------------------------------*
14
  Copyright (C) 2003-2008 Andras Varga
15
  Copyright (C) 2006-2008 OpenSim Ltd.
16

17
  This file is distributed WITHOUT ANY WARRANTY. See the file
18
  `license' for details on this and other legal matters.
19
*--------------------------------------------------------------*/
20

    
21

    
22
#include "cnamedpipecomm.h"
23

    
24
#ifdef USE_WINDOWS_PIPES
25

    
26
#include <stdio.h>
27
#include <string.h>
28
#include <stdlib.h>
29
#include <io.h>   // _sleep()
30
#include <string>
31
#include <iostream>
32

    
33
#include "cexception.h"
34
#include "cmemcommbuffer.h"
35
#include "globals.h"
36
#include "regmacros.h"
37
#include "cenvir.h"
38
#include "cconfiguration.h"
39
#include "cconfigoption.h"
40
#include "parsimutil.h"
41

    
42
USING_NAMESPACE
43

    
44

    
45
Register_Class(cNamedPipeCommunications);
46

    
47
Register_GlobalConfigOption(CFGID_PARSIM_NAMEDPIPECOMM_PREFIX, "parsim-namedpipecommunications-prefix", CFG_STRING, "omnetpp", "When cNamedPipeCommunications is selected as parsim communications class: selects the name prefix for Windows named pipes created.");
48

    
49

    
50
#define sleep(x)  _sleep((x)*1000)
51
#define usleep(x) _sleep((x)/1000)
52

    
53
#define PIPE_INBUFFERSIZE  (1024*1024) /*1MB*/
54

    
55

    
56
//FIXME resolve duplication -- we have this in Envir as well
57
static std::string getWindowsError()
58
{
59
    long errorcode = GetLastError();
60
    LPVOID lpMsgBuf;
61
    FormatMessage( FORMAT_MESSAGE_ALLOCATE_BUFFER |
62
                   FORMAT_MESSAGE_FROM_SYSTEM |
63
                   FORMAT_MESSAGE_IGNORE_INSERTS,
64
                   NULL,
65
                   errorcode,
66
                   MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
67
                   (LPTSTR) &lpMsgBuf,
68
                   0,
69
                   NULL );
70
    ((char *)lpMsgBuf)[strlen((char *)lpMsgBuf)-3] = '\0';  // chop ".\r\n"
71

    
72
    std::stringstream out;
73
    out << "error " << errorcode << ": " << (const char *)lpMsgBuf;
74
    LocalFree(lpMsgBuf);
75
    return out.str();
76
}
77

    
78
struct PipeHeader
79
{
80
    int tag;
81
    unsigned long contentLength;
82
};
83

    
84
cNamedPipeCommunications::cNamedPipeCommunications()
85
{
86
    prefix = ev.getConfig()->getAsString(CFGID_PARSIM_NAMEDPIPECOMM_PREFIX);
87
    rpipes = NULL;
88
    wpipes = NULL;
89
    rrBase = 0;
90
}
91

    
92
cNamedPipeCommunications::~cNamedPipeCommunications()
93
{
94
    delete [] rpipes;
95
    delete [] wpipes;
96
}
97

    
98
void cNamedPipeCommunications::init()
99
{
100
    // get numPartitions and myProcId from "-p" command-line option
101
    getProcIdFromCommandLineArgs(myProcId, numPartitions, "cNamedPipeCommunications");
102
    ev.printf("cNamedPipeCommunications: started as process %d out of %d.\n", myProcId, numPartitions);
103

    
104
    // create and open pipes for read
105
    int i;
106
    rpipes = new HANDLE[numPartitions];
107
    for (i=0; i<numPartitions; i++)
108
    {
109
        if (i==myProcId)
110
        {
111
            rpipes[i] = INVALID_HANDLE_VALUE;
112
            continue;
113
        }
114

    
115
        char fname[256];
116
        sprintf(fname,"\\\\.\\pipe\\%s-%d-%d", prefix.buffer(), myProcId, i);
117
        ev.printf("cNamedPipeCommunications: creating pipe '%s' for read...\n", fname);
118

    
119
        int openMode = PIPE_ACCESS_INBOUND;
120
        int pipeMode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
121
        rpipes[i] = CreateNamedPipe(fname, openMode, pipeMode, 1, 0, PIPE_INBUFFERSIZE, ~0UL, NULL);
122
        if (rpipes[i] == INVALID_HANDLE_VALUE)
123
            throw cRuntimeError("cNamedPipeCommunications: CreateNamedPipe operation failed: %s", getWindowsError().c_str());
124
    }
125

    
126
    // open pipes for write
127
    wpipes = new HANDLE[numPartitions];
128
    for (i=0; i<numPartitions; i++)
129
    {
130
        if (i==myProcId)
131
        {
132
            wpipes[i] = INVALID_HANDLE_VALUE;
133
            continue;
134
        }
135

    
136
        char fname[256];
137
        sprintf(fname,"\\\\.\\pipe\\%s-%d-%d", prefix.buffer(), i, myProcId);
138
        ev.printf("cNamedPipeCommunications: opening pipe '%s' for write...\n", fname);
139
        for (int k=0; k<60; k++)
140
        {
141
            if (k>0 && k%5==0) ev.printf("retry %d of 60...\n", k);
142
            wpipes[i] = CreateFile(fname, GENERIC_WRITE, 0, NULL, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL);
143
            if (wpipes[i]!=INVALID_HANDLE_VALUE)
144
                break;
145
            sleep(1);
146
        }
147
        if (wpipes[i] == INVALID_HANDLE_VALUE)
148
            throw cRuntimeError("cNamedPipeCommunications: CreateFile operation failed: %s", getWindowsError().c_str());
149
    }
150

    
151
    // now wait until everybody else also opens the pipes for write
152
    for (i=0; i<numPartitions; i++)
153
    {
154
        if (i==myProcId)
155
            continue;
156
        ev.printf("cNamedPipeCommunications: opening pipe from procId=%d for read...\n", i);
157
        if (!ConnectNamedPipe(rpipes[i], NULL) && GetLastError()!=ERROR_PIPE_CONNECTED)
158
            throw cRuntimeError("cNamedPipeCommunications: ConnectNamedPipe operation failed: %s", getWindowsError().c_str());
159
    }
160

    
161
}
162

    
163
void cNamedPipeCommunications::shutdown()
164
{
165
}
166

    
167
int cNamedPipeCommunications::getNumPartitions() const
168
{
169
    return numPartitions;
170
}
171

    
172
int cNamedPipeCommunications::getProcId() const
173
{
174
    return myProcId;
175
}
176

    
177
cCommBuffer *cNamedPipeCommunications::createCommBuffer()
178
{
179
    return new cMemCommBuffer();
180
}
181

    
182
void cNamedPipeCommunications::recycleCommBuffer(cCommBuffer *buffer)
183
{
184
    delete buffer;
185
}
186

    
187
void cNamedPipeCommunications::send(cCommBuffer *buffer, int tag, int destination)
188
{
189
    cMemCommBuffer *b = (cMemCommBuffer *)buffer;
190
    HANDLE h = wpipes[destination];
191

    
192
    struct PipeHeader ph;
193
    ph.tag = tag;
194
    ph.contentLength = b->getMessageSize();
195

    
196
    unsigned long bytesWritten;
197
    if (!WriteFile(h, &ph, sizeof(ph), &bytesWritten, 0))
198
        throw cRuntimeError("cNamedPipeCommunications: cannot write pipe to procId=%d: %s", destination, getWindowsError().c_str());
199
    if (!WriteFile(h, b->getBuffer(), ph.contentLength, &bytesWritten, 0))
200
        throw cRuntimeError("cNamedPipeCommunications: cannot write pipe to procId=%d: %s", destination, getWindowsError().c_str());
201
}
202

    
203
bool cNamedPipeCommunications::receive(int filtTag, cCommBuffer *buffer, int& receivedTag, int& sourceProcId, bool blocking)
204
{
205
    bool recv = doReceive(buffer, receivedTag, sourceProcId, blocking);
206
    // TBD implement tag filtering
207
    if (recv && filtTag!=PARSIM_ANY_TAG && filtTag!=receivedTag)
208
        throw cRuntimeError("cNamedPipeCommunications: tag filtering not implemented");
209
    return recv;
210
}
211

    
212
bool cNamedPipeCommunications::doReceive(cCommBuffer *buffer, int& receivedTag, int& sourceProcId, bool blocking)
213
{
214
    cMemCommBuffer *b = (cMemCommBuffer *)buffer;
215
    b->reset();
216

    
217
    // TBD may be refined to handle blocking=true (right now just returns immediately)
218

    
219
    // select pipe to read
220
    int i, k;
221
    for (k=0; k<numPartitions; k++)
222
    {
223
        i = (rrBase+k)%numPartitions; // shift by rrBase for Round-Robin query
224
        if (i==myProcId)
225
            continue;
226
        unsigned long bytesAvail, bytesLeft;
227
        if (!PeekNamedPipe(rpipes[i], NULL, 0, NULL, &bytesAvail, &bytesLeft))
228
            throw cRuntimeError("cNamedPipeCommunications: cannot peek pipe to procId=%d: %s",
229
                                    i, getWindowsError().c_str());
230
        if (bytesAvail>0)
231
            break;
232
    }
233
    if (k==numPartitions)
234
        return false;
235

    
236
    rrBase = (rrBase+1)%numPartitions;
237
    sourceProcId = i;
238
    HANDLE h = rpipes[i];
239

    
240
    // read message from selected pipe (handle h)
241
    unsigned long bytesRead;
242
    struct PipeHeader ph;
243
    if (!ReadFile(h, &ph, sizeof(ph), &bytesRead, NULL))
244
        throw cRuntimeError("cNamedPipeCommunications: cannot read from pipe to procId=%d: %s",
245
                                sourceProcId, getWindowsError().c_str());
246
    if (bytesRead<sizeof(ph))
247
        throw cRuntimeError("cNamedPipeCommunications: ReadFile returned less data than expected");
248

    
249
    receivedTag = ph.tag;
250
    b->allocateAtLeast(ph.contentLength);
251
    b->setMessageSize(ph.contentLength);
252

    
253
    if (!ReadFile(h, b->getBuffer(), ph.contentLength, &bytesRead, NULL))
254
        throw cRuntimeError("cNamedPipeCommunications: cannot read from pipe to procId=%d: %s",
255
                                sourceProcId, getWindowsError().c_str());
256
    if (bytesRead<ph.contentLength)
257
        throw cRuntimeError("cNamedPipeCommunications: ReadFile returned less data than expected");
258
    return true;
259

    
260
}
261

    
262
bool cNamedPipeCommunications::receiveBlocking(int filtTag, cCommBuffer *buffer, int& receivedTag, int& sourceProcId)
263
{
264
    // receive() currently doesn't handle blocking (PeekNamedPipe() returns
265
    // immediately if nothing has been received), so we need to sleep a little
266
    // between invocations, in order to save CPU cycles.
267
    while (!receive(filtTag, buffer, receivedTag, sourceProcId, true))
268
    {
269
        if (ev.idle())
270
            return false;
271
        usleep(10000); // be polite and wait 0.01s
272
    }
273
    return true;
274
}
275

    
276
bool cNamedPipeCommunications::receiveNonblocking(int filtTag, cCommBuffer *buffer, int& receivedTag, int& sourceProcId)
277
{
278
    return receive(filtTag, buffer, receivedTag, sourceProcId, false);
279
}
280

    
281
#endif  /* USE_WINDOWS_PIPES */
282

    
283

    
284

    
285