Project

General

Profile

Statistics
| Branch: | Revision:

root / src / scave / mergernodes.cc @ cbd2c699

History | View | Annotate | Download (7.16 KB)

1
//=========================================================================
2
//  MERGERNODES.CC - part of
3
//                  OMNeT++/OMNEST
4
//           Discrete System Simulation in C++
5
//
6
//  Author: Andras Varga
7
//
8
//=========================================================================
9

    
10
/*--------------------------------------------------------------*
11
  Copyright (C) 1992-2008 Andras Varga
12
  Copyright (C) 2006-2008 OpenSim Ltd.
13

14
  This file is distributed WITHOUT ANY WARRANTY. See the file
15
  `license' for details on this and other legal matters.
16
*--------------------------------------------------------------*/
17

    
18
#include <string.h>
19
#include <algorithm>
20
#include "channel.h"
21
#include "scaveutils.h"
22
#include "mergernodes.h"
23

    
24
USING_NAMESPACE
25

    
26

    
27
Port *MergerNode::addPort()
28
{
29
    ports.push_back(Port(this));
30
    return &(ports.back());
31
}
32

    
33
bool MergerNode::isReady() const
34
{
35
    // every input port must have data available (or already have reached EOF)
36
    for (PortVector::const_iterator it=ports.begin(); it!=ports.end(); it++)
37
        if ((*it)()->length()==0 && !(*it)()->isClosing())
38
            return false;
39
    return true;
40
}
41

    
42
void MergerNode::process()
43
{
44
    // must maintain increasing time order in output, so:
45
    // always read from the port with smallest time value coming --
46
    // if port has reached EOF, skip it
47
    Port *minPort = NULL;
48
    const Datum *minDatum;
49
    for (PortVector::iterator it=ports.begin(); it!=ports.end(); it++)
50
    {
51
        Channel *chan = (*it)();
52
        const Datum *dp = chan->peek();
53
        if (dp && (!minPort || dp->x < minDatum->x))
54
        {
55
            minPort = &(*it);
56
            minDatum = dp;
57
        }
58
    }
59

    
60
    // if we couldn't get any data, all input ports must be at EOF (see isReady())
61
    if (minPort)
62
    {
63
        Datum d;
64
        (*minPort)()->read(&d,1);
65
        out()->write(&d,1);
66
    }
67
}
68

    
69
bool MergerNode::isFinished() const
70
{
71
    // only finished if all ports are at EOF
72
    for (PortVector::const_iterator it=ports.begin(); it!=ports.end(); it++)
73
        if (!(*it)()->eof())
74
            return false;
75
    return true;
76
}
77

    
78
//-------
79

    
80
const char *MergerNodeType::getDescription() const
81
{
82
    return "Merges several series into a single one, maintaining increasing\n"
83
           "time order in the output.";
84
}
85

    
86
void MergerNodeType::getAttributes(StringMap& attrs) const
87
{
88
}
89

    
90
Node *MergerNodeType::create(DataflowManager *mgr, StringMap& attrs) const
91
{
92
    checkAttrNames(attrs);
93

    
94
    Node *node = new MergerNode();
95
    node->setNodeType(this);
96
    mgr->addNode(node);
97
    return node;
98
}
99

    
100
Port *MergerNodeType::getPort(Node *node, const char *portname) const
101
{
102
    MergerNode *node1 = dynamic_cast<MergerNode *>(node);
103
    if (!strcmp(portname,"next-in"))
104
        return node1->addPort();
105
    else if (!strcmp(portname,"out"))
106
        return &(node1->out);
107
    throw opp_runtime_error("no such port `%s'", portname);
108
}
109

    
110
// ---- Aggregator ------
111

    
112
AggregatorNode::AggregatorNode(const std::string &function)
113
    : out(this)
114
{
115
    if (function == "sum") fn = Sum;
116
    else if (function == "average") fn = Average;
117
    else if (function == "count") fn = Count;
118
    else if (function == "minimum") fn = Minimum;
119
    else if (function == "maximum") fn = Maximum;
120
    else fn = Average; //TODO why not error?
121
}
122

    
123
void AggregatorNode::init()
124
{
125
    switch (fn)
126
    {
127
        case Sum: acc = 0.0; break;
128
        case Average: acc = 0.0; count = 0; break;
129
        case Count:   count = 0; break;
130
        case Minimum: acc = POSITIVE_INFINITY; break;
131
        case Maximum: acc = NEGATIVE_INFINITY; break;
132
        default: Assert(false);
133
    }
134
}
135

    
136
void AggregatorNode::collect(double value)
137
{
138
    switch (fn)
139
    {
140
        case Sum: acc += value; break;
141
        case Average: acc += value; count++; break;
142
        case Count:   count++; break;
143
        case Minimum: acc = std::min(value, acc); break;
144
        case Maximum: acc = std::max(value, acc); break;
145
        default: Assert(false);
146
    }
147
}
148

    
149
double AggregatorNode::result()
150
{
151
    switch (fn)
152
    {
153
        case Sum: return acc;
154
        case Average: return acc/count;
155
        case Count:   return count;
156
        case Minimum:
157
        case Maximum: return acc;
158
        default: Assert(false); return NaN;
159
    }
160
}
161

    
162
Port *AggregatorNode::addPort()
163
{
164
    ports.push_back(Port(this));
165
    return &(ports.back());
166
}
167

    
168
bool AggregatorNode::isReady() const
169
{
170
    // every input port must have data available (or already have reached EOF)
171
    for (PortVector::const_iterator it=ports.begin(); it!=ports.end(); it++)
172
        if ((*it)()->length()==0 && !(*it)()->isClosing())
173
            return false;
174
    return true;
175
}
176

    
177
void AggregatorNode::process()
178
{
179
    const Datum *minDatum = NULL;
180
    for (PortVector::iterator it=ports.begin(); it!=ports.end(); it++)
181
    {
182
        Channel *chan = (*it)();
183
        const Datum *dp = chan->peek();
184
        if (dp && (!minDatum || dp->x < minDatum->x))
185
            minDatum = dp;
186
    }
187

    
188
    // if we couldn't get any data, all input ports must be at EOF (see isReady())
189
    if (minDatum)
190
    {
191
        Datum d;
192
        d.x = minDatum->x;
193

    
194
        init();
195
        for (PortVector::iterator it=ports.begin(); it!=ports.end(); it++)
196
        {
197
            Channel *chan = (*it)();
198
            const Datum *dp = chan->peek();
199
            if (dp && dp->x == d.x)
200
            {
201
                Datum d2;
202
                chan->read(&d2,1);
203
                collect(d2.y);
204
            }
205
        }
206

    
207
        d.y = result();
208
        out()->write(&d,1);
209
    }
210
}
211

    
212
bool AggregatorNode::isFinished() const
213
{
214
    // only finished if all ports are at EOF
215
    for (PortVector::const_iterator it=ports.begin(); it!=ports.end(); it++)
216
        if (!(*it)()->eof())
217
            return false;
218
    return true;
219
}
220

    
221
//-------
222

    
223
const char *AggregatorNodeType::getDescription() const
224
{
225
    return "Aggregates several vectors into a single one, aggregating the\n"
226
            "y values at the same time coordinate with the specified function.";
227
}
228

    
229
void AggregatorNodeType::getAttributes(StringMap& attrs) const
230
{
231
    attrs["function"] = "the aggregator function; one of sum,average,count,maximum,minimum";
232
}
233

    
234
void AggregatorNodeType::getAttrDefaults(StringMap& attrs) const
235
{
236
    attrs["function"] = "average";
237
}
238

    
239
void AggregatorNodeType::validateAttrValues(const StringMap& attrs) const
240
{
241
    StringMap::const_iterator it = attrs.find("function");
242
    if (it != attrs.end())
243
    {
244
        const std::string& fn = it->second;
245
        if (fn != "average" && fn != "count" && fn != "minimum" && fn != "maximum")
246
            throw opp_runtime_error("Unknown aggregator function: %s.", fn.c_str());
247
    }
248
}
249

    
250
Node *AggregatorNodeType::create(DataflowManager *mgr, StringMap& attrs) const
251
{
252
    checkAttrNames(attrs);
253

    
254
    StringMap::iterator it = attrs.find("function");
255
    std::string functionName = it == attrs.end() ? "average" : it->second;
256

    
257
    Node *node = new AggregatorNode(functionName);
258
    node->setNodeType(this);
259
    mgr->addNode(node);
260
    return node;
261
}
262

    
263
Port *AggregatorNodeType::getPort(Node *node, const char *portname) const
264
{
265
    AggregatorNode *node1 = dynamic_cast<AggregatorNode*>(node);
266
    if (!strcmp(portname,"next-in"))
267
        return node1->addPort();
268
    else if (!strcmp(portname,"out"))
269
        return &(node1->out);
270
    throw opp_runtime_error("no such port `%s'", portname);
271
}
272

    
273