Project

General

Profile

Statistics
| Branch: | Revision:

root / src / scave / dataflowmanager.cc @ a3be1d55

History | View | Annotate | Download (8.1 KB)

1
//=========================================================================
2
//  DATAFLOWMANAGER.CC - part of
3
//                  OMNeT++/OMNEST
4
//           Discrete System Simulation in C++
5
//
6
//  Author: Andras Varga
7
//
8
//=========================================================================
9

    
10
/*--------------------------------------------------------------*
11
  Copyright (C) 1992-2008 Andras Varga
12
  Copyright (C) 2006-2008 OpenSim Ltd.
13

14
  This file is distributed WITHOUT ANY WARRANTY. See the file
15
  `license' for details on this and other legal matters.
16
*--------------------------------------------------------------*/
17

    
18
#include <stdio.h>
19
#include <string.h>
20
#include "channel.h"
21
#include "nodetype.h"
22
#include "commonnodes.h"
23
#include "dataflowmanager.h"
24

    
25
USING_NAMESPACE
26

    
27

    
28
DataflowManager::DataflowManager()
29
{
30
    lastnode = 0;
31
    threshold = 1000;
32
}
33

    
34
DataflowManager::~DataflowManager()
35
{
36
    unsigned int i;
37
    for (i=0; i<nodes.size(); i++)
38
        delete nodes[i];
39
    for (i=0; i<channels.size(); i++)
40
        delete channels[i];
41
}
42

    
43
void DataflowManager::addNode(Node *node)
44
{
45
    nodes.push_back(node);
46
    node->setDataflowManager(this);
47
}
48

    
49
void DataflowManager::addChannel(Channel *channel)
50
{
51
    channels.push_back(channel);
52
}
53

    
54
void DataflowManager::connect(Port *src, Port *dest)
55
{
56
    if (src->getChannel())
57
        throw opp_runtime_error("connect: source port already connected");
58
    if (dest->getChannel())
59
        throw opp_runtime_error("connect: destination port already connected");
60
    if (!dest->getNode())
61
        throw opp_runtime_error("connect: port's owner node not filled in");
62

    
63
    Channel *ch = new Channel();
64
    addChannel(ch);
65

    
66
    src->setChannel(ch);
67
    dest->setChannel(ch);
68
    ch->setProducerNode(src->getNode());
69
    ch->setConsumerNode(dest->getNode());
70
}
71

    
72
// FIXME: validate node attributes
73

    
74
void DataflowManager::execute(IProgressMonitor *monitor)
75
{
76
    if (nodes.size()==0)
77
        return;
78

    
79
    //
80
    // repeat until all nodes have finished:
81
    //   select a node which is:
82
    //     - ready and not finished
83
    //     - and its input channel has buffered a lot
84
    //     - or it's a source node and its output channel is (nearly) empty
85
    //   then call process() on it
86
    // deadlock is when a node has not finished yet but none of the others are ready;
87
    // deadlock should not (i.e. will not) happen with proper scheduling
88
    //
89
    int64 onePercentFileSize = 0;
90
    int64 bytesRead = 0;
91
    int readPercentage = 0;
92
    if (monitor)
93
    {
94
        onePercentFileSize = getTotalBytesToBeRead() / 100;
95
        monitor->beginTask("Executing dataflow network", 100);
96
    }
97

    
98
    while (true)
99
    {
100
        ReaderNode *readerNode = NULL;
101
        int64 readBefore = 0;
102
        Node *node = selectNode();
103
        if (!node)
104
            break;
105

    
106
        if (monitor)
107
        {
108
            if (monitor->isCanceled())
109
            {
110
                monitor->done();
111
                return;
112
            }
113
            if (isReaderNode(node))
114
            {
115
                readerNode = dynamic_cast<ReaderNode *>(node);
116
                readBefore = readerNode->getNumReadBytes();
117
            }
118
        }
119

    
120
        DBG(("execute: invoking %s\n", node->getNodeType()->getName()));
121
        node->process();
122

    
123
        if (monitor)
124
        {
125
            if (onePercentFileSize > 0 && readerNode)
126
            {
127
                bytesRead += (readerNode->getNumReadBytes() - readBefore);
128
                int currentPercentage = bytesRead / onePercentFileSize;
129
                if (currentPercentage > readPercentage)
130
                {
131
                    monitor->worked(currentPercentage - readPercentage);
132
                    readPercentage = currentPercentage;
133
                }
134
            }
135
        }
136
    }
137

    
138
    if (monitor)
139
        monitor->done();
140

    
141
    DBG(("execute: processing finished\n"));
142

    
143
    // propagate finished state to all nodes (transitive closure)
144
    unsigned int i=0;
145
    while (i<nodes.size())
146
    {
147
        if (nodes[i]->getAlreadyFinished())
148
            i++;
149
        else if (!updateNodeFinished(nodes[i]))
150
            i++;  // if not finished, skip it
151
        else
152
            i=0;  // if one node finished, start over (to let it cascade)
153
    }
154

    
155
    // check all nodes have finished now
156
    for (i=0; i<nodes.size(); i++)
157
        if (!nodes[i]->getAlreadyFinished())
158
            throw opp_runtime_error("execute: deadlock: no ready nodes but node %s not finished",
159
                                nodes[i]->getNodeType()->getName());
160

    
161
    // check all channel buffers are empty
162
    for (i=0; i<channels.size(); i++)
163
        if (!channels[i]->eof())
164
            throw opp_runtime_error("execute: all nodes finished but channel %d not at eof", i);
165
}
166

    
167
bool DataflowManager::updateNodeFinished(Node *node)
168
{
169
    //
170
    // if node says it's isFinished():
171
    // - call consumerClose() on its input channels (they'll ignore futher writes then);
172
    // - call close() on its output channels
173
    // - set getAlreadyFinished() state flag in node, so that we won't keep asking it
174
    //
175
    if (!node->isFinished())
176
        return false;
177

    
178
    DBG(("DBG: %s finished\n", node->getNodeType()->getName()));
179
    node->setAlreadyFinished();
180
    int nc = channels.size();
181
    for (int i=0; i!=nc; i++)
182
    {
183
        Channel *ch = channels[i];
184
        if (ch->getConsumerNode()==node)
185
        {
186
            DBG(("DBG:   one input closed\n"));
187
            ch->consumerClose();
188
        }
189
        if (ch->getProducerNode()==node)
190
        {
191
            DBG(("DBG:   one output closed\n"));
192
            ch->close();
193
        }
194
    }
195
    return true;
196
}
197

    
198
Node *DataflowManager::selectNode()
199
{
200
    // if a channel has buffered too much, try to schedule its consumer node
201
    int nc = channels.size();
202
    for (int j=0; j!=nc; j++)
203
    {
204
        Channel *ch = channels[j];
205
        if (ch->length()>threshold && ch->getConsumerNode()->isReady())
206
        {
207
            Node *node = ch->getConsumerNode();
208
            Assert(!node->getAlreadyFinished());
209
            if (!updateNodeFinished(node))
210
                return node;
211
        }
212
    }
213

    
214
    // round robin scheduling
215
    int n = nodes.size();
216
    int i = lastnode;
217
    assert(n!=0);
218
    do
219
    {
220
        CONTINUE:
221
            i = (i+1)%n;
222
            Node *node = nodes[i];
223
            if (!node->getAlreadyFinished())
224
            {
225
                if (updateNodeFinished(node))
226
                {
227
                    // When a node finished, some node might get ready that was
228
                    // not ready before (e.g. has some buffered data), so start over the loop.
229
                    i = lastnode;
230
                    goto CONTINUE;
231
                }
232
                else if (node->isReady())
233
                {
234
                    if (i==lastnode)
235
                        DBG(("DBG: %s invoked again -- perhaps its process() doesn't do as much at once as it could?\n", node->getNodeType()->getName()));
236
                    lastnode = i;
237
                    return node;
238
                }
239
            }
240
    }
241
    while (i!=lastnode);
242

    
243
    return NULL;
244
}
245

    
246
bool DataflowManager::isReaderNode(Node *node)
247
{
248
    return strcmp(node->getNodeType()->getCategory(), "reader-node") == 0;
249
}
250

    
251
int64 DataflowManager::getTotalBytesToBeRead()
252
{
253
    int64 totalFileSize = 0;
254
    for (int i = 0; i < (int)nodes.size(); ++i)
255
    {
256
        if (isReaderNode(nodes[i]))
257
        {
258
            ReaderNode *readerNode = dynamic_cast<ReaderNode *>(nodes[i]);
259
            totalFileSize += readerNode->getFileSize();
260
        }
261
    }
262
    return totalFileSize;
263
}
264

    
265
void DataflowManager::dump()
266
{
267
    printf("DATAFLOW NETWORK:\n");
268
    int n = nodes.size();
269
    printf("Nodes (%d):\n", n);
270
    for (int i=0; i<n; i++)
271
    {
272
        Node *node = nodes[i];
273
        NodeType *nodeType = node->getNodeType();
274
        printf(" node[%d]: %p %s\n", i, node, nodeType->getName());
275
    }
276

    
277
    int nc = channels.size();
278
    printf("Channels (%d):\n", nc);
279
    for (int j=0; j<nc; j++)
280
    {
281
        Channel *ch = channels[j];
282
        Node *prodNode = ch->getProducerNode();
283
        Node *consNode = ch->getConsumerNode();
284
        printf(" channel[%d]: node %p %s --> node %p %s\n", j,
285
             prodNode, prodNode->getNodeType()->getName(),
286
             consNode, consNode->getNodeType()->getName());
287
    }
288
    fflush(stdout);
289
}
290