Project

General

Profile

Statistics
| Branch: | Revision:

root / src / scave / indexedvectorfile.cc @ a3be1d55

History | View | Annotate | Download (16.8 KB)

1
//=========================================================================
2
//  INDEXEDVECTORFILE.CC - part of
3
//                  OMNeT++/OMNEST
4
//           Discrete System Simulation in C++
5
//
6
//  Author: Tamas Borbely
7
//
8
//=========================================================================
9

    
10
/*--------------------------------------------------------------*
11
  Copyright (C) 2006-2008 OpenSim Ltd.
12

13
  This file is distributed WITHOUT ANY WARRANTY. See the file
14
  `license' for details on this and other legal matters.
15
*--------------------------------------------------------------*/
16

    
17
#include <locale.h>
18
#include <stdlib.h>
19
#include "platmisc.h"
20
#include "exception.h"
21
#include "linetokenizer.h"
22
#include "channel.h"
23
#include "stringutil.h"
24
#include "indexedvectorfile.h"
25
#include "scaveutils.h"
26

    
27
USING_NAMESPACE
28

    
29
#define LL  INT64_PRINTF_FORMAT
30
#define VECTOR_FILE_VERSION 2
31

    
32
//=========================================================================
33

    
34
IndexedVectorFileReader::IndexedVectorFileReader(const char *filename, int vectorId)
35
    : fname(filename), index(NULL), vector(NULL), currentBlock(NULL)
36
{
37
    std::string ifname = IndexFile::getIndexFileName(filename);
38
    IndexFileReader indexReader(ifname.c_str());
39
    index = indexReader.readAll(); // XXX do not read whole index
40
    vector = index->getVectorById(vectorId);
41

    
42
    if (!vector)
43
        throw opp_runtime_error("Vector with vectorId %d not found in file '%s'",
44
                vectorId, filename);
45
}
46

    
47
IndexedVectorFileReader::~IndexedVectorFileReader()
48
{
49
    if (index != NULL)
50
        delete index;
51
}
52

    
53
// see filemgrs.h
54
#define MIN_BUFFER_SIZE 512
55

    
56
#ifdef CHECK
57
#undef CHECK
58
#endif
59
#define CHECK(cond, msg, block, line) \
60
            if (!(cond))\
61
            {\
62
                throw opp_runtime_error("Invalid vector file syntax: %s, file %s, block offset %"LL"d, line in block %d", \
63
                                        msg, fname.c_str(), (int64)block.startOffset, line);\
64
            }
65

    
66
void IndexedVectorFileReader::loadBlock(const Block &block)
67
{
68
    if (currentBlock == &block)
69
        return;
70

    
71
    if (currentBlock != NULL) {
72
        currentBlock = NULL;
73
        currentEntries.clear();
74
    }
75

    
76
    size_t bufferSize = vector->blockSize;
77
    if (bufferSize < MIN_BUFFER_SIZE)
78
        bufferSize = MIN_BUFFER_SIZE;
79
    FileReader reader(fname.c_str(), bufferSize);
80

    
81
    long count=block.getCount();
82
    reader.seekTo(block.startOffset);
83
    currentEntries.resize(count);
84

    
85
    char *line, **tokens;
86
    int numTokens;
87
    LineTokenizer tokenizer;
88
    int id;
89

    
90
    std::string columns = vector->columns;
91
    int columnsNo = columns.size();
92

    
93
    for (int i=0; i<count; ++i)
94
    {
95
        CHECK(line=reader.getNextLineBufferPointer(), "Unexpected end of file", block, i);
96
        int len = reader.getCurrentLineLength();
97

    
98
        tokenizer.tokenize(line, len);
99
        tokens=tokenizer.tokens();
100
        numTokens = tokenizer.numTokens();
101

    
102
        CHECK(numTokens >= (int)columns.size() + 1, "Line is too short", block, i);
103
        CHECK(parseInt(tokens[0],id) && id==vector->vectorId, "Missing or unexpected vector id", block, i);
104

    
105
        OutputVectorEntry &entry = currentEntries[i];
106
        entry.serial = block.startSerial+i;
107
        for (int j = 0; j < columnsNo; ++j)
108
        {
109
            switch (columns[j])
110
            {
111
            case 'E': CHECK(parseInt64(tokens[j+1], entry.eventNumber), "Malformed event number", block, i); break;
112
            case 'T': CHECK(parseSimtime(tokens[j+1], entry.simtime), "Malformed simulation time", block, i); break;
113
            case 'V': CHECK(parseDouble(tokens[j+1], entry.value), "Malformed vector value", block, i); break;
114
            default: CHECK(false, "Unknown column", block, i); break;
115
            }
116
        }
117
    }
118

    
119
    currentBlock = &block;
120
}
121

    
122
OutputVectorEntry *IndexedVectorFileReader::getEntryBySerial(long serial)
123
{
124
    if (serial<0 || serial>=vector->getCount())
125
        return NULL;
126

    
127
    if (currentBlock == NULL || !currentBlock->contains(serial))
128
    {
129
        loadBlock(*(vector->getBlockBySerial(serial)));
130
    }
131

    
132
    return &currentEntries.at(serial - currentBlock->startSerial);
133
}
134

    
135
OutputVectorEntry *IndexedVectorFileReader::getEntryBySimtime(simultime_t simtime, bool after)
136
{
137
    const Block *block = vector->getBlockBySimtime(simtime, after);
138
    if (block)
139
    {
140
        loadBlock(*block);
141
        if (after)
142
        {
143
            for (Entries::iterator it = currentEntries.begin(); it != currentEntries.end(); ++it) // FIXME: binary search
144
                if (it->simtime >= simtime)
145
                    return &(*it);
146
        }
147
        else
148
        {
149
            for (Entries::reverse_iterator it = currentEntries.rbegin(); it != currentEntries.rend(); ++it)  // FIXME: binary search
150
                if (it->simtime <= simtime)
151
                    return &(*it);
152
        }
153
    }
154
    return NULL;
155
}
156

    
157
OutputVectorEntry *IndexedVectorFileReader::getEntryByEventnum(eventnumber_t eventNum, bool after)
158
{
159
    const Block *block = vector->getBlockByEventnum(eventNum, after);
160
    if (block)
161
    {
162
        loadBlock(*block);
163
        if (after)
164
        {
165
            for (Entries::iterator it = currentEntries.begin(); it != currentEntries.end(); ++it) // FIXME: binary search
166
                if (it->eventNumber >= eventNum)
167
                    return &(*it);
168
        }
169
        else
170
        {
171
            for (Entries::reverse_iterator it = currentEntries.rbegin(); it != currentEntries.rend(); ++it) // FIXME: binary search
172
                if (it->eventNumber <= eventNum)
173
                    return &(*it);
174
        }
175
    }
176
    return NULL;
177
}
178

    
179
long IndexedVectorFileReader::collectEntriesInSimtimeInterval(simultime_t startTime, simultime_t endTime, Entries &out)
180
{
181
    Blocks::size_type startIndex;
182
    Blocks::size_type endIndex;
183
    vector->getBlocksInSimtimeInterval(startTime, endTime, /*out*/ startIndex, /*out*/ endIndex);
184

    
185
    Entries::size_type count = 0;
186
    for (Blocks::size_type i = startIndex; i < endIndex; i++)
187
    {
188
        const Block &block = vector->blocks[i];
189
        loadBlock(block);
190
        for (long j = 0; j < block.getCount(); ++j)
191
        {
192
            OutputVectorEntry &entry = currentEntries[j];
193
            if (startTime <= entry.simtime && entry.simtime <= endTime)
194
            {
195
                out.push_back(entry);
196
                count++;
197
            }
198
            else if (entry.simtime > endTime)
199
                break;
200
        }
201
    }
202
    return count;
203
}
204

    
205
long IndexedVectorFileReader::collectEntriesInEventnumInterval(eventnumber_t startEventNum, eventnumber_t endEventNum, Entries &out)
206
{
207
    Blocks::size_type startIndex;
208
    Blocks::size_type endIndex;
209
    vector->getBlocksInEventnumInterval(startEventNum, endEventNum, /*out*/ startIndex, /*out*/ endIndex);
210

    
211
    Entries::size_type count = 0;
212
    for (Blocks::size_type i = startIndex; i < endIndex; i++)
213
    {
214
        const Block &block = vector->blocks[i];
215
        loadBlock(block);
216

    
217
        for (long j = 0; j < block.getCount(); ++j)
218
        {
219
            OutputVectorEntry &entry = currentEntries[j];
220
            if (startEventNum <= entry.eventNumber && entry.eventNumber <= endEventNum)
221
            {
222
                out.push_back(entry);
223
                count++;
224
            }
225
            else if (entry.eventNumber > endEventNum)
226
                break;
227
        }
228
    }
229
    return count;
230
}
231

    
232
//=========================================================================
233

    
234
#ifdef CHECK
235
#undef CHECK
236
#endif
237
#define CHECK(printf) if (printf<0) throw opp_runtime_error("Cannot write vector file '%s'", fileName.c_str());
238

    
239

    
240
static FILE *openFile(const std::string fileName)
241
{
242
    FILE *f = fopen(fileName.c_str(),"w");
243
    if (f==NULL)
244
        throw opp_runtime_error("Cannot open vector file `%s'", fileName.c_str());
245
    setlocale(LC_NUMERIC, "C"); // write '.' as decimal marker
246
    return f;
247
}
248

    
249
IndexedVectorFileWriterNode::IndexedVectorFileWriterNode(const char *fileName, const char *indexFileName, int blockSize, const char *fileHeader)
250
{
251
    f = NULL;
252
    indexWriter = NULL;
253
    this->prec = DEFAULT_PRECISION;
254
    this->fileHeader = (fileHeader ? fileHeader : "");
255
    this->fileName = fileName;
256
    this->indexFileName = indexFileName;
257
    this->blockSize = blockSize;
258
}
259

    
260
IndexedVectorFileWriterNode::~IndexedVectorFileWriterNode()
261
{
262
    for (PortVector::iterator it=ports.begin(); it!=ports.end(); it++)
263
        delete *it;
264
}
265

    
266
Port *IndexedVectorFileWriterNode::addVector(int vectorId, const char *module, const char *name, const char *columns)
267
{
268
    VectorInputPort *inputport = new VectorInputPort(vectorId, module, name, columns, blockSize, this);
269
    ports.push_back(inputport);
270
    return inputport;
271
}
272

    
273
Port *IndexedVectorFileWriterNode::addVector(const VectorResult &vector)
274
{
275
    VectorInputPort *inputport = new VectorInputPort(vector.vectorId, vector.moduleNameRef->c_str(), vector.nameRef->c_str(),
276
                                            vector.columns.c_str(), blockSize, this);
277
    inputport->vector.attributes = vector.attributes;
278
    ports.push_back(inputport);
279
    return inputport;
280
}
281

    
282
bool IndexedVectorFileWriterNode::isReady() const
283
{
284
    for (PortVector::const_iterator it=ports.begin(); it!=ports.end(); it++)
285
    {
286
        VectorInputPort *port=*it;
287
        if (port->getChannel()->length()>0 || (port->getChannel()->isClosing() && port->hasBufferedData()))
288
            return true;
289
    }
290
    return false;
291
}
292

    
293
void IndexedVectorFileWriterNode::process()
294
{
295
    // open file if needed
296
    if (!f)
297
    {
298
        f = openFile(fileName);
299
        // print file header
300
        CHECK(fprintf(f,"%s\n", fileHeader.c_str()));
301
        CHECK(fprintf(f,"version %d\n", VECTOR_FILE_VERSION));
302

    
303
        // print run attributes
304
        run.writeToFile(f, fileName.c_str());
305

    
306
        // print vector declarations and attributes
307
        for (PortVector::iterator it=ports.begin(); it!=ports.end(); it++)
308
        {
309
            const VectorData &vector = (*it)->vector;
310
            CHECK(fprintf(f, "vector %d  %s  %s  %s\n",
311
                    vector.vectorId, QUOTE(vector.moduleName.c_str()), QUOTE(vector.name.c_str()), vector.columns.c_str()));
312
            for (StringMap::const_iterator attr=vector.attributes.begin(); attr!=vector.attributes.end(); attr++)
313
                CHECK(fprintf(f,"attr %s  %s\n", QUOTE(attr->first.c_str()), QUOTE(attr->second.c_str())));
314
        }
315
    }
316

    
317
    for (PortVector::iterator it=ports.begin(); it!=ports.end(); it++)
318
    {
319
        VectorInputPort *port=*it;
320
        if (!port->finished)
321
        {
322
            if (port->getChannel()->length()>0)
323
                writeRecordsToBuffer(port);
324
            if (port->getChannel()->eof()) {
325
                if (port->hasBufferedData())
326
                    writeBufferToFile(port);
327
                writeIndex(port);
328
                port->finished = true;
329
            }
330
        }
331
    }
332

    
333
}
334

    
335
bool IndexedVectorFileWriterNode::isFinished() const
336
{
337
    for (PortVector::const_iterator it=ports.begin(); it!=ports.end(); it++)
338
    {
339
        VectorInputPort *port=*it;
340
        if (!port->finished)
341
            return false;
342
    }
343

    
344
    // close output vector and index files
345
    if (f != NULL)
346
        fclose(f);
347
    if (indexWriter != NULL)
348
    {
349
        indexWriter->writeFingerprint(fileName);
350
        delete indexWriter;
351
    }
352

    
353
    return true;
354
}
355

    
356
void IndexedVectorFileWriterNode::bufferPrintf(VectorInputPort *port, const char *format...)
357
{
358
    va_list va;
359
    va_start(va, format);
360
    int count = vsprintf(port->bufferPtr, format, va);
361
    va_end(va);
362
    if (count < 0)
363
        throw opp_runtime_error("Cannot write data to output buffer");
364
    port->bufferPtr+=count;
365
}
366

    
367
void IndexedVectorFileWriterNode::writeRecordsToBuffer(VectorInputPort *port)
368
{
369
    assert(port->vector.blocks.size() > 0);
370

    
371
    int vectorId = port->vector.vectorId;
372
    Channel *chan = port->getChannel();
373
    int n = chan->length();
374
    std::string &columns = port->vector.columns;
375
    int colno = columns.size();
376
    Datum a;
377
    char buf[64];
378
    char *endp;
379

    
380
    if (colno == 2 && columns[0] == 'T' && columns[1] == 'V')
381
    {
382
        for (int i=0; i<n; i++)
383
        {
384
            chan->read(&a,1);
385
            if (port->bufferPtr - port->buffer >= port->bufferSize - 100)
386
                writeBufferToFile(port);
387
            if (a.xp.isNil())
388
                bufferPrintf(port, "%d\t%.*g\t%.*g\n", vectorId, prec, a.x, prec, a.y);
389
            else
390
                bufferPrintf(port, "%d\t%s\t%.*g\n", vectorId, BigDecimal::ttoa(buf, a.xp, endp), prec, a.y);
391
            port->bufferNumOfRecords++;
392
            port->vector.blocks.back().collect(-1, a.x, a.y);
393
        }
394
    }
395
    else if (colno == 3 && columns[0] == 'E' && columns[1] == 'T' && columns[2] == 'V')
396
    {
397
        for (int i=0; i<n; i++)
398
        {
399
            chan->read(&a,1);
400
            if (port->bufferPtr - port->buffer >= port->bufferSize - 100)
401
                writeBufferToFile(port);
402
            if (a.xp.isNil())
403
                bufferPrintf(port, "%d\t%"LL"d\t%.*g\t%.*g\n", vectorId, a.eventNumber, prec, a.x, prec, a.y);
404
            else
405
                bufferPrintf(port, "%d\t%"LL"d\t%s\t%.*g\n", vectorId, a.eventNumber, BigDecimal::ttoa(buf, a.xp, endp), prec, a.y);
406
            port->bufferNumOfRecords++;
407
            port->vector.blocks.back().collect(a.eventNumber, a.x, a.y);
408
        }
409
    }
410
    else
411
    {
412
        for (int i=0; i<n; i++)
413
        {
414
            chan->read(&a,1);
415
            bufferPrintf(port,"%d", vectorId);
416
            for (int j=0; j<colno; ++j)
417
            {
418
                bufferPrintf(port, "\t");
419
                switch (columns[j])
420
                {
421
                case 'T':
422
                    if (a.xp.isNil())
423
                        bufferPrintf(port,"%.*g", prec, a.x);
424
                    else
425
                        bufferPrintf(port,"%s", BigDecimal::ttoa(buf, a.xp, endp)); break;
426
                case 'V': bufferPrintf(port,"%.*g", prec, a.y); break;
427
                case 'E': bufferPrintf(port,"%"LL"d", a.eventNumber); break;
428
                default: throw opp_runtime_error("unknown column type: '%c'", columns[j]);
429
                }
430
            }
431
            bufferPrintf(port, "\n");
432
            port->bufferNumOfRecords++;
433
            port->vector.blocks.back().collect(a.eventNumber, a.x, a.y);
434
        }
435
    }
436
}
437

    
438
void IndexedVectorFileWriterNode::writeBufferToFile(VectorInputPort *port)
439
{
440
    assert(f!=NULL);
441
    assert(port->vector.blocks.size() > 0);
442

    
443
    Block &currentBlock = port->vector.blocks.back();
444
    currentBlock.startOffset = opp_ftell(f);
445

    
446
    CHECK(fputs(port->buffer, f));
447
    currentBlock.size = (int64)(opp_ftell(f) - currentBlock.startOffset);
448
    port->vector.collect(currentBlock);
449
    port->clearBuffer();
450
    port->vector.blocks.push_back(Block());
451
}
452

    
453
void IndexedVectorFileWriterNode::writeIndex(VectorInputPort *port)
454
{
455
    if (indexWriter == NULL)
456
    {
457
        indexWriter = new IndexFileWriter(indexFileName.c_str(), prec);
458
        indexWriter->writeRun(run);
459
    }
460

    
461
    indexWriter->writeVector(port->vector);
462
}
463

    
464

    
465
//=========================================================================
466

    
467
const char *IndexedVectorFileWriterNodeType::getDescription() const
468
{
469
    return "Writes the output (several streams) into an indexed output vector file.";
470
}
471

    
472
void IndexedVectorFileWriterNodeType::getAttributes(StringMap& attrs) const
473
{
474
    attrs["filename"] = "name of the output vector file (.vec)";
475
    attrs["indexfilename"] = "name of the output index file (.vci)";
476
    attrs["blocksize"] = "size of the blocks of each vector";
477
    attrs["fileheader"] = "header written into the output vector file";
478
}
479

    
480
Node *IndexedVectorFileWriterNodeType::create(DataflowManager *mgr, StringMap& attrs) const
481
{
482
    checkAttrNames(attrs);
483

    
484
    const char *fileName = attrs["filename"].c_str();
485
    const char *indexFileName = attrs["indexfilename"].c_str();
486
    int blockSize = atoi(attrs["blocksize"].c_str());
487
    std::string header = attrs["fileheader"];
488

    
489
    IndexedVectorFileWriterNode *node = new IndexedVectorFileWriterNode(fileName, indexFileName, blockSize);
490
    node->setHeader(header);
491
    node->setNodeType(this);
492
    mgr->addNode(node);
493
    return node;
494
}
495

    
496
Port *IndexedVectorFileWriterNodeType::getPort(Node *node, const char *portname) const
497
{
498
    // vector id is used as port name
499
    IndexedVectorFileWriterNode *node1 = dynamic_cast<IndexedVectorFileWriterNode *>(node);
500

    
501
    LineTokenizer tokenizer;
502
    int vectorId;
503
    int numTokens = tokenizer.tokenize(portname, strlen(portname));
504
    char **tokens = tokenizer.tokens();
505
    if (numTokens < 3 || numTokens > 4)
506
    {
507
        throw opp_runtime_error(
508
                "IndexedVectorFileWriterNodeType::getPort(): "
509
                "expected '<vectorId> <module> <name> [<columns>]', received '%s' ",
510
                portname);
511
    }
512
    if (!parseInt(tokens[0], vectorId))
513
        throw opp_runtime_error(
514
                "IndexedVectorFileWriterNodeType::getPort(): "
515
                "expected an integer as vectorId, received '%s'",
516
                tokens[0]);
517

    
518
    const char* moduleName = tokens[1];
519
    const char* name = tokens[2];
520
    const char* columns = (numTokens < 4 ? "TV" : tokens[3]);
521
    return node1->addVector(vectorId, moduleName, name, columns);
522
}
523

    
524