Project

General

Profile

Statistics
| Branch: | Revision:

root / src / sim / parsim / cfilecomm.cc @ 81ad8b66

History | View | Annotate | Download (7.65 KB)

1
//=========================================================================
2
//  CFILECOMM.CC - part of
3
//
4
//                  OMNeT++/OMNEST
5
//           Discrete System Simulation in C++
6
//
7
//  Author: Andras Varga, 2003
8
//          Dept. of Electrical and Computer Systems Engineering,
9
//          Monash University, Melbourne, Australia
10
//
11
//=========================================================================
12

    
13
/*--------------------------------------------------------------*
14
  Copyright (C) 2003-2008 Andras Varga
15
  Copyright (C) 2006-2008 OpenSim Ltd.
16

17
  This file is distributed WITHOUT ANY WARRANTY. See the file
18
  `license' for details on this and other legal matters.
19
*--------------------------------------------------------------*/
20

    
21
#include <stdio.h>
22
#include <string.h>
23
#include <stdlib.h>
24
#include <sys/types.h>
25
#include <sys/stat.h>
26
#include <errno.h>
27
#include "cexception.h"
28
#include "cfilecomm.h"
29
#include "cfilecommbuffer.h"
30
#include "globals.h"
31
#include "cconfigoption.h"
32
#include "regmacros.h"
33
#include "cenvir.h"
34
#include "cconfiguration.h"
35
#include "parsimutil.h"
36
#include "fileglobber.h"
37
#include "platmisc.h"
38

    
39
USING_NAMESPACE
40

    
41

    
42
Register_Class(cFileCommunications);
43

    
44
Register_GlobalConfigOption(CFGID_FILECOMM_PREFIX, "parsim-filecommunications-prefix", CFG_STRING, "comm/", "When cFileCommunications is selected as parsim communications class: specifies the prefix (directory+potential filename prefix) for creating the files for cross-partition messages.");
45
Register_GlobalConfigOption(CFGID_FILECOMM_READ_PREFIX, "parsim-filecommunications-read-prefix", CFG_STRING, "comm/read/", "When cFileCommunications is selected as parsim communications class: specifies the prefix (directory) where files will be moved after having been consumed.");
46
Register_GlobalConfigOption(CFGID_FILECOMM_PRESERVE_READ, "parsim-filecommunications-preserve-read", CFG_BOOL, "false", "When cFileCommunications is selected as parsim communications class: specifies that consumed files should be moved into another directory instead of being deleted.");
47

    
48
cFileCommunications::cFileCommunications()
49
{
50
    commDirPrefix = ev.getConfig()->getAsString(CFGID_FILECOMM_PREFIX);
51
    readDirPrefix = ev.getConfig()->getAsString(CFGID_FILECOMM_READ_PREFIX);
52
    preserveReadFiles = ev.getConfig()->getAsBool(CFGID_FILECOMM_PRESERVE_READ);
53

    
54
    seqNum = 0;
55
}
56

    
57
cFileCommunications::~cFileCommunications()
58
{
59
}
60

    
61
void cFileCommunications::init()
62
{
63
    // get numPartitions and myProcId from "-p" command-line option
64
    getProcIdFromCommandLineArgs(myProcId, numPartitions, "cFileCommunications");
65
    ev.printf("cFileCommunications: started as process %d out of %d.\n", myProcId, numPartitions);
66

    
67
    // We cannot check here that the communications directory is empty, because
68
    // other partitions may have already sent messages to us...
69
}
70

    
71
void cFileCommunications::shutdown()
72
{
73
}
74

    
75
int cFileCommunications::getNumPartitions() const
76
{
77
    return numPartitions;
78
}
79

    
80
int cFileCommunications::getProcId() const
81
{
82
    return myProcId;
83
}
84

    
85
cCommBuffer *cFileCommunications::createCommBuffer()
86
{
87
    return new cFileCommBuffer();
88
}
89

    
90
void cFileCommunications::recycleCommBuffer(cCommBuffer *buffer)
91
{
92
    delete buffer;
93
}
94

    
95
void cFileCommunications::send(cCommBuffer *buffer, int tag, int destination)
96
{
97
    cFileCommBuffer *b = (cFileCommBuffer *)buffer;
98

    
99
    // to prevent concurrency problems, first create the file as .tmp,
100
    // then rename it to .msg
101
    char fname[100], fname2[100];
102
    sprintf(fname,"%s#%.6d-s%d-d%d-t%d.tmp", commDirPrefix.buffer(), seqNum++, myProcId, destination, tag);
103

    
104
    // create
105
    FILE *f = fopen(fname,"wb");
106
    if (!f)
107
        throw cRuntimeError("cFileCommunications: cannot open %s for write: %s", fname, strerror(errno));
108
    if (fwrite(b->getBuffer(), b->getMessageSize(), 1, f)<1)
109
        throw cRuntimeError("cFileCommunications: cannot write %s: %s", fname, strerror(errno));
110
    if (fclose(f)!=0)
111
        throw cRuntimeError("cFileCommunications: cannot close %s after writing: %s", fname, strerror(errno));
112

    
113
    // rename
114
    strcpy(fname2,fname);
115
    strcpy(fname2+strlen(fname2)-4, ".msg");
116
    if (rename(fname, fname2)!=0)
117
        throw cRuntimeError("cFileCommunications: cannot rename %s to %s: %s", fname, fname2, strerror(errno));
118
}
119

    
120
bool cFileCommunications::receiveBlocking(int filtTag, cCommBuffer *buffer, int& receivedTag, int& sourceProcId)
121
{
122
    while (!receiveNonblocking(filtTag, buffer, receivedTag, sourceProcId))
123
    {
124
        if (ev.idle())
125
            return false;
126
        usleep(100000); // be nice and polite: wait 0.1s
127
    }
128
    return true;
129
}
130

    
131
bool cFileCommunications::receiveNonblocking(int filtTag, cCommBuffer *buffer, int& receivedTag, int& sourceProcId)
132
{
133
    cFileCommBuffer *b = (cFileCommBuffer *)buffer;
134
    b->reset();
135

    
136
    char fmask[100];
137
    char fname2[100];
138
    if (filtTag==PARSIM_ANY_TAG)
139
        sprintf(fmask,"%s#*-s*-d%d-t*.msg", commDirPrefix.buffer(), myProcId);
140
    else
141
        sprintf(fmask,"%s#*-s*-d%d-t%d.msg", commDirPrefix.buffer(), myProcId, filtTag);
142

    
143
    bool ret = false;
144
    const char *fname = FileGlobber(fmask).getNext();
145
    if (fname)
146
    {
147
        ret = true;
148

    
149
        // parse fname
150
        const char *s = strstr(fname, "-s");
151
        sourceProcId = atol(s+2);
152
        const char *t = strstr(fname, "-t");
153
        receivedTag = atol(t+2);
154
        //const char *n = strstr(fname, "#");
155
        //int seqNum = atol(n+2);
156

    
157
        //DBG: printf("%d: filecomm: found %s -- src=%d, tag=%d\n",getProcId(),fname,sourceProcId,receivedTag);
158

    
159
        // read data
160
        struct stat statbuf;
161
        if (stat(fname, &statbuf)!=0)
162
            throw cRuntimeError("cFileCommunications: cannot stat() file %s: ", fname, strerror(errno));
163
        int len = statbuf.st_size;
164
        b->allocateAtLeast(len);
165
        FILE *f = fopen(fname,"rb");
166
        if (!f)
167
        {
168
            // try a bit harder. On Windows, first fopen() sometimes fails with
169
            // "permission denied".
170
            for (int i=0; i<20; i++)
171
            {
172
                usleep(500000); // wait 0.5s
173
                ev.printf("cFileCommunications: retrying to open file %s (previous attempt failed)\n", fname);
174
                f = fopen(fname,"rb");
175
                if (f) break;
176
            }
177
        }
178
        if (!f)
179
            throw cRuntimeError("cFileCommunications: cannot open existing file %s for read: %s", fname, strerror(errno));
180
        if (fread(b->getBuffer(), len, 1, f)==0)
181
            // FIXME condition always fires. why?
182
            //throw cRuntimeError("cFileCommunications: cannot read existing file %s: %s", fname, strerror(errno));
183
            ;
184
        fclose(f);
185
        b->setMessageSize(len);
186

    
187
        if (preserveReadFiles)
188
        {
189
            // move file to 'read' directory
190
            //
191
            // BEWARE: for mysterious reasons, it appears that there cannot be more
192
            // than about 19800 files in a directory. When that point is reached,
193
            // an exception is thrown somewhere inside the standard C library, which
194
            // materializes itself in OMNeT++ as an "Error: (null)" message...
195
            // Strangely, this can be reproduced in both Linux and Windows.
196
            //
197
            strcpy(fname2, readDirPrefix.buffer());
198
            strcat(fname2, fname + strlen(commDirPrefix.buffer()));
199
            if (rename(fname, fname2)!=0)
200
                throw cRuntimeError("cFileCommunications: cannot rename %s to %s: %s", fname, fname2, strerror(errno));
201
        }
202
        else
203
        {
204
            // delete file
205
            if (unlink(fname)!=0)
206
                throw cRuntimeError("cFileCommunications: cannot delete file %s: %s", fname, strerror(errno));
207
        }
208
    }
209
    //DBG: printf("%d: filecomm: nothing found matching %s\n",getProcId(),fmask);
210
    return ret;
211
}
212

    
213

    
214