Revision 6b81f4fa

View differences:

configure.user
19 19
#    no  -> use barrier messages
20 20
#
21 21
NOBARRIER=yes
22

  
23 22
#
24 23
# In combination with NOBARRIER: Define the cache line size used for aligning
25 24
# synchronization data structures to cache line borders.
include/clockedthreadpool.h
113 113
     * Inserts a new task in the task queue. Internal use only.
114 114
     */
115 115
    virtual void insertTask(cMessage* msg, simtime_t duration);
116

  
117
    /**
118
     * Used By cSpinningThreadpool
119
     * Returns true if msg is scheduled before the minimal Barrier (barrierMin).
120
     * Returns false otherwise.
121
     */
122
    virtual bool isBeforeBarrier(cMessage* msg);
116 123
    //}@
117 124
};
118 125

  
include/cthreadpool.h
143 143
     * Inserts a new task in the task queue. Internal use only.
144 144
     */
145 145
    virtual void insertTask(cMessage* msg, simtime_t duration) = 0;
146

  
147
    /**
148
     * Used by cSpinningThreadpool
149
     * Returns true if msg is scheduled before the minimal Barrier (barrierMin).
150
     * Returns false otherwise.
151
     */
152
    virtual bool isBeforeBarrier(cMessage* msg) = 0;
146 153
    //}@
147 154
};
148 155

  
src/sim/clockedthreadpool.cc
188 188
{
189 189
    throw cRuntimeError("Invalid parameter combination: LockedThreadpool cannot be combined with NOBARRIER!");
190 190
}
191

  
192
bool cLockedThreadPool::isBeforeBarrier(cMessage* msg)
193
{
194
    throw cRuntimeError("Invalid parameter combination: LockedThreadpool cannot be combined with NOBARRIER!");
195
}
196

  
src/sim/cscheduler.cc
53 53
cMessage *cSequentialScheduler::getNextEvent()
54 54
{
55 55
#ifdef NOBARRIER
56
    // Do we have to wait for a barrier?
57
    if (sim->threadPool) sim->threadPool->waitAtBarrier(&(sim->msgQueue));
58

  
56 59
    //
57 60
    // If we retrieve a valid msg from the queue, we return it:
58 61
    //
59 62
    cMessage *msg = sim->msgQueue.removeFirst();
60
    if (msg)
63

  
64
    if (msg) {
65
        /*
66
         * Set Duration
67
         */
68
        cSimpleModule* mod = (cSimpleModule*) sim->getModule(msg->getArrivalModuleId());
69
        if (mod->isAsyncModule()) {
70
            msg->setEventDuration(((cAsyncModule*) mod)->getProcessingDelay(msg));
71
        }
72
        /*
73
         * return message
74
         */
61 75
        return msg;
76
    }
77

  
62 78

  
63 79
    //
64 80
    // if there is no event left and we don't use the threadpool, end the sim
......
79 95
    //
80 96
    // If there is a msg now, we return it:
81 97
    //
82
    if (msg)
98
    if (msg) {
99
        /*
100
         * Set Duration
101
         */
102
        cSimpleModule* mod = (cSimpleModule*) sim->getModule(msg->getArrivalModuleId());
103
        if (mod->isAsyncModule()) {
104
            msg->setEventDuration(((cAsyncModule*) mod)->getProcessingDelay(msg));
105
        }
106
        /*
107
         * return message
108
         */
83 109
        return msg;
110
    }
84 111

  
85 112
    //
86 113
    // If there is still no message in the queue, there are
......
94 121
    cMessage *msg = NULL;
95 122
    cBarrierMessage* barrier = NULL;
96 123

  
97
    while (msg == NULL)
124
    while (!msg)
98 125
    {
99 126
        msg = sim->msgQueue.removeFirst();
100 127
        if (!msg)
......
147 174

  
148 175
cMessage *cEEFScheduler::getNextEvent()
149 176
{
177

  
178
    //TODO: check if barriermessage is still vaild before waiting (may speed up things, as IES may not run empty)
179

  
180
    cMessage *msg = NULL;
181
    cSimpleModule* mod = NULL;
182
    cAsyncModule* aMod = NULL;
183

  
150 184
#ifdef NOBARRIER
151
    assert(false); //Not yet implemented
152
    //
153
    // If we retrieve a valid msg from the queue, we return it:
154
    //
155
    cMessage *msg = sim->msgQueue.peekFirst();
156
    if (msg)
157
    return msg;
158 185

  
186
    while (!(sim->msgQueue.empty()))
187
       {
188
        msg = sim->msgQueue.removeFirst();
189

  
190
        mod = (cSimpleModule*) sim->getModule(msg->getArrivalModuleId());
191

  
192
        if (!(mod->isAsyncModule())) {
193
            break;
194
        }
195
        aMod = (cAsyncModule*) mod;
196
        simtime_t duration = aMod->getProcessingDelay(msg);
197
        msg->setEventDuration(duration);
198
        bool mayPar = aMod->mayParallelize(msg, duration);
199

  
200
        if (!mayPar) {
201
            break;
202
        }
203
        /*
204
         * If there is an active barrier in the threadpool before msg, break
205
         */
206
        if (sim->threadPool->isBeforeBarrier(msg)) {
207
            break;
208
        }
209
        /*
210
         * If an event in the IES would cause a barrier before msg, break
211
         */
212
        if (!independentEventsHeap.empty() && msg->getArrivalTime() >= independentEventsHeap.peekFirst()->getTend()) {
213
            break;
214
        }
215
        printf(
216
                "adding to IEH: %s, tstart=%f, tend= %f, now First in IEH: ",
217
                ((cSimpleModule*) sim->getModule(msg->getArrivalModuleId()))->getName(),SIMTIME_DBL(msg->getArrivalTime()),
218
                SIMTIME_DBL(msg->getTend()));
219

  
220
        independentEventsHeap.insert(msg);
221

  
222
        printf(
223
                "%s, length=%i\n",
224
                ((cSimpleModule*) sim->getModule(
225
                        independentEventsHeap.peekFirst()->getArrivalModuleId()))->getName(),
226
                independentEventsHeap.length());
227
        msg = NULL;
228
    }
229

  
230
    if (msg) sim->msgQueue.insert(msg);
231

  
232
    if(independentEventsHeap.empty()) {
233
        if (!sim->msgQueue.empty()) {
234
            // Do we have to wait for a barrier?
235
            if (sim->threadPool) sim->threadPool->waitAtBarrier(&(sim->msgQueue));
236
            return sim->msgQueue.removeFirst();
237
        }
238
    } else {
239
        return independentEventsHeap.getFirst();
240
    }
241
    // At this point, both IES and FES are empty
159 242
    //
160 243
    // if there is no event left and we don't use the threadpool, end the sim
161 244
    //
......
170 253
    {
171 254
        __asm__ ("pause");
172 255
    }
173
    msg = sim->msgQueue.peekFirst();
256
    msg = sim->msgQueue.removeFirst();
174 257

  
175 258
    //
176 259
    // If there is a msg now, we return it:
......
187 270
    throw cTerminationException(eENDEDOK);
188 271

  
189 272
#else
190

  
191
    //TODO: Optimize to save state when the IES has to be emptied
192

  
193
    cMessage *msg;
194
    cBarrierMessage* barrier;
195
    cSimpleModule* mod;
196
    cAsyncModule* aMod;
273
    cBarrierMessage* barrier = NULL;
197 274
    /*
198 275
     * Fill up independent event heap
199 276
     */
......
245 322
            cBarrierMessage* barrier = new cBarrierMessage();
246 323
            barrier->setArrival(aMod, -1, now + duration);
247 324
            msg->setBarrier(barrier);
248
            // insert user supplied message in task queue.
249 325
            sim->msgQueue.insert(barrier);
250 326

  
251 327
            printf(
252
                    "adding to IEH: %s, tend= %f, now First in IEH: ",
253
                    ((cSimpleModule*) sim->getModule(msg->getArrivalModuleId()))->getName(),
254
                    SIMTIME_DBL(msg->getTend()));
328
                     "adding to IEH: %s, tstart=%f, tend= %f, now First in IEH: ", SIMTIME_DBL(msg->getArrivalTime()),
329
                     ((cSimpleModule*) sim->getModule(msg->getArrivalModuleId()))->getName(),
330
                     SIMTIME_DBL(msg->getTend()));
255 331

  
256 332
            independentEventsHeap.insert(msg);
257 333

  
src/sim/csimulation.cc
641 641

  
642 642
    try
643 643
    {
644
#ifdef NOBARRIER
645
        //TODO move to scheduler?
646
        // Do we have to wait for a barrier?
647
        if(threaded)
648
        {
649
            threadPool->waitAtBarrier(&msgQueue);
650
        }
651
#endif
652

  
653 644
        msg = schedulerp->getNextEvent();
654 645

  
655 646
        //Advance Simulation Time
......
670 661
            // in parallel
671 662
            cAsyncModule* aMod = (cAsyncModule*) mod;
672 663

  
673
            simtime_t duration = aMod->getProcessingDelay(msg);
664
            simtime_t duration = msg->getEventDuration();
674 665
            if (duration < SimTime::simTimeZero)
675 666
            {
676 667
                throw cRuntimeError("negative event duration not allowed.");
677 668
            }
678
            msg->setEventDuration(duration);
679 669

  
680 670
            bool mayPar = aMod->mayParallelize(msg, duration);
681 671
            // execute this event in parallel
src/sim/cspinningthreadpool.cc
44 44
}
45 45

  
46 46
cSpinningThreadPool::cSpinningThreadPool() :
47
    useMainThread(false), barrierMin(0)
47
    useMainThread(false), barrierMin(-1)
48 48
{
49 49
    AO_store(&cancel, 0);
50 50
}
......
207 207
    } while (true);
208 208
}
209 209
#undef BARRIERVALID
210

  
210
/*
211
 * TODO: also update barrierMin if not vaild anymore
212
 */
211 213
bool cSpinningThreadPool::isBeforeBarrier(cMessage* msg) {
212
    return (barrierMin >= 0 && threadStates[barrierMin*SPACING] < *msg);
214
    return (barrierMin >=0 && threadStates[barrierMin*SPACING] < *msg);
215
    /*
216
    if (barrierMin >=0) {
217
        printf("barriertime=%f ",SIMTIME_DBL(threadStates[barrierMin*SPACING].barrierTime));
218
        return (threadStates[barrierMin*SPACING] < *msg);
219
    }
220
    return false; //is this correct?*/
213 221
}
214 222

  
215 223
Register_PerRunConfigOption(CFGID_SPINNING_THREADPOOL_THREAD_POOL_SIZE, "thread-pool-size", CFG_INT, "5", "Number of worker threads");

Also available in: Unified diff