Skip to content

Commit 5695b7d

Browse files
committed
csp event loop, async bridges
Signed-off-by: Tim Paine <3105306+timkpaine@users.noreply.github.com>
1 parent 162f650 commit 5695b7d

34 files changed

Lines changed: 11106 additions & 38 deletions

cpp/csp/engine/RootEngine.cpp

Lines changed: 140 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ RootEngine::RootEngine( const Dictionary & settings ) : Engine( m_cycleStepTable
7575
m_cycleCount( 0 ),
7676
m_settings( settings ),
7777
m_inRealtime( false ),
78+
m_stepping( false ),
7879
m_initSignalCount( g_SIGNAL_COUNT ),
7980
m_pushEventQueue( m_settings.queueWaitTime > TimeDelta::ZERO() )
8081
{
@@ -222,7 +223,7 @@ void RootEngine::runRealtime( DateTime end )
222223

223224
for( auto * group : dirtyGroups )
224225
group -> state = PushGroup::NONE;
225-
226+
226227
dirtyGroups.clear();
227228
haveEvents = false;
228229
}
@@ -289,6 +290,144 @@ void RootEngine::shutdown( std::exception_ptr except_ptr )
289290
m_exception_ptr = except_ptr;
290291
}
291292

293+
void RootEngine::startStepping( DateTime start, DateTime end )
294+
{
295+
if( m_stepping )
296+
CSP_THROW( RuntimeException, "Engine is already in stepping mode" );
297+
298+
preRun( start, end );
299+
300+
m_exception_mutex.lock();
301+
if( m_state != State::SHUTDOWN )
302+
m_state = State::RUNNING;
303+
m_exception_mutex.unlock();
304+
305+
m_stepping = true;
306+
m_inRealtime = m_settings.realtime;
307+
m_stepDirtyGroups.clear();
308+
}
309+
310+
bool RootEngine::step( TimeDelta maxWait )
311+
{
312+
if( !m_stepping )
313+
CSP_THROW( RuntimeException, "Engine is not in stepping mode. Call startStepping() first." );
314+
315+
if( m_state != State::RUNNING || interrupted() )
316+
return false;
317+
318+
// Check if we've passed the end time
319+
if( m_now > m_endTime )
320+
return false;
321+
322+
bool hasWork = false;
323+
324+
if( m_inRealtime )
325+
{
326+
// Realtime mode: check for push events and timers
327+
TimeDelta waitTime = maxWait;
328+
if( waitTime == TimeDelta::NONE() )
329+
waitTime = TimeDelta::ZERO();
330+
331+
if( !m_pendingPushEvents.hasEvents() )
332+
{
333+
DateTime now = DateTime::now();
334+
if( m_scheduler.hasEvents() )
335+
waitTime = std::min( m_scheduler.nextTime() - now, waitTime );
336+
}
337+
338+
// Don't block in step mode - just check for events
339+
bool haveEvents = m_pushEventQueue.wait( waitTime );
340+
341+
m_now = DateTime::now();
342+
if( m_now > m_endTime )
343+
{
344+
m_now = m_endTime;
345+
return false;
346+
}
347+
348+
++m_cycleCount;
349+
350+
// Execute timers that are ready
351+
if( m_scheduler.hasEvents() && m_scheduler.nextTime() <= m_now )
352+
{
353+
DateTime timerTime = m_scheduler.nextTime();
354+
m_now = timerTime;
355+
m_scheduler.executeNextEvents( m_now );
356+
hasWork = true;
357+
}
358+
else if( haveEvents )
359+
{
360+
// Process push events
361+
PushEvent * events = m_pushEventQueue.popAll();
362+
processPendingPushEvents( m_stepDirtyGroups );
363+
processPushEventQueue( events, m_stepDirtyGroups );
364+
365+
for( auto * group : m_stepDirtyGroups )
366+
group -> state = PushGroup::NONE;
367+
m_stepDirtyGroups.clear();
368+
hasWork = true;
369+
}
370+
}
371+
else
372+
{
373+
// Sim mode: process next scheduled event
374+
if( m_scheduler.hasEvents() )
375+
{
376+
m_now = m_scheduler.nextTime();
377+
if( m_now <= m_endTime )
378+
{
379+
++m_cycleCount;
380+
m_scheduler.executeNextEvents( m_now );
381+
hasWork = true;
382+
}
383+
else
384+
{
385+
m_now = m_endTime;
386+
}
387+
}
388+
}
389+
390+
if( hasWork )
391+
{
392+
m_cycleStepTable.executeCycle( m_profiler.get() );
393+
processEndCycle();
394+
}
395+
396+
// Return true if there's more work to do
397+
return m_state == State::RUNNING && !interrupted() &&
398+
( m_scheduler.hasEvents() || m_pendingPushEvents.hasEvents() );
399+
}
400+
401+
void RootEngine::stopStepping()
402+
{
403+
if( !m_stepping )
404+
return;
405+
406+
try
407+
{
408+
postRun();
409+
}
410+
catch( ... )
411+
{
412+
if( !m_exception_ptr )
413+
m_exception_ptr = std::current_exception();
414+
}
415+
416+
m_state = State::DONE;
417+
m_stepping = false;
418+
m_stepDirtyGroups.clear();
419+
420+
if( m_exception_ptr )
421+
std::rethrow_exception( m_exception_ptr );
422+
}
423+
424+
DateTime RootEngine::nextScheduledTime()
425+
{
426+
if( m_scheduler.hasEvents() )
427+
return m_scheduler.nextTime();
428+
return DateTime::NONE();
429+
}
430+
292431
DictionaryPtr RootEngine::engine_stats() const
293432
{
294433
if( !m_profiler )

cpp/csp/engine/RootEngine.h

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class EndCycleListener
2525
public:
2626
virtual ~EndCycleListener() {};
2727
virtual void onEndCycle() = 0;
28-
28+
2929
bool isDirty() const { return m_dirty; }
3030
void setDirtyFlag() { m_dirty = true; }
3131
void clearDirtyFlag() { m_dirty = false; }
@@ -57,6 +57,14 @@ class RootEngine : public Engine
5757
void shutdown();
5858
void shutdown( std::exception_ptr except );
5959

60+
// Step-based execution API for asyncio integration
61+
// Allows external event loops to drive CSP execution one step at a time
62+
void startStepping( DateTime start, DateTime end );
63+
bool step( TimeDelta maxWait = TimeDelta::NONE() ); // Returns true if more work pending
64+
void stopStepping();
65+
bool isStepping() const { return m_stepping; }
66+
DateTime nextScheduledTime(); // Returns next scheduled event time, or NONE if none
67+
6068
Scheduler::Handle reserveSchedulerHandle();
6169
Scheduler::Handle scheduleCallback( TimeDelta delta, Scheduler::Callback cb );
6270
Scheduler::Handle scheduleCallback( DateTime time, Scheduler::Callback cb );
@@ -90,7 +98,7 @@ class RootEngine : public Engine
9098
bool interrupted() const;
9199

92100
PushPullEventQueue & pushPullEventQueue() { return m_pushPullEventQueue; }
93-
101+
94102
protected:
95103
enum State { NONE, STARTING, RUNNING, SHUTDOWN, DONE };
96104
using EndCycleListeners = std::vector<EndCycleListener*>;
@@ -131,8 +139,12 @@ class RootEngine : public Engine
131139
PendingPushEvents m_pendingPushEvents;
132140
Settings m_settings;
133141
bool m_inRealtime;
142+
bool m_stepping; // True when in step-based execution mode
134143
int m_initSignalCount;
135144

145+
// For step-based execution
146+
std::vector<PushGroup *> m_stepDirtyGroups;
147+
136148
PushEventQueue m_pushEventQueue;
137149
//This queue is managed entirely from the PushPullInputAdapter
138150
PushPullEventQueue m_pushPullEventQueue;
@@ -168,7 +180,7 @@ inline Scheduler::Handle RootEngine::scheduleCallback( Scheduler::Handle reserve
168180
if( unlikely( time < m_now ) )
169181
CSP_THROW( ValueError, "Cannot schedule event in the past. new time: " << time << " now: " << m_now );
170182

171-
return m_scheduler.scheduleCallback( reservedHandle, time, std::move( cb ) );
183+
return m_scheduler.scheduleCallback( reservedHandle, time, std::move( cb ) );
172184
}
173185

174186
inline Scheduler::Handle RootEngine::rescheduleCallback( Scheduler::Handle id, csp::DateTime time )

cpp/csp/python/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ set(CSPIMPL_PUBLIC_HEADERS
3232
NumpyConversions.h
3333
NumpyInputAdapter.h
3434
PyAdapterManagerWrapper.h
35+
PyEventLoop.h
3536
PyBasketInputProxy.h
3637
PyBasketOutputProxy.h
3738
PyCppNode.h
@@ -56,6 +57,7 @@ add_library(cspimpl SHARED
5657
NumpyConversions.cpp
5758
PyAdapterManager.cpp
5859
PyAdapterManagerWrapper.cpp
60+
PyEventLoop.cpp
5961
PyConstAdapter.cpp
6062
PyCppNode.cpp
6163
PyEngine.cpp

cpp/csp/python/PyEngine.cpp

Lines changed: 89 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ static void PyEngine_dealloc( PyEngine * self )
9595
{
9696
CSP_BEGIN_METHOD;
9797
self -> ~PyEngine();
98-
Py_TYPE( self ) -> tp_free( self );
98+
Py_TYPE( self ) -> tp_free( self );
9999
CSP_RETURN;
100100
}
101101

@@ -105,7 +105,7 @@ static PyObject * PyEngine_run( PyEngine * self, PyObject * args )
105105

106106
PyObject * pyStart;
107107
PyObject * pyEnd;
108-
if( !PyArg_ParseTuple( args, "OO", &pyStart, &pyEnd ) )
108+
if( !PyArg_ParseTuple( args, "OO", &pyStart, &pyEnd ) )
109109
return nullptr;
110110

111111
auto start = fromPython<DateTime>( pyStart );
@@ -118,8 +118,94 @@ static PyObject * PyEngine_run( PyEngine * self, PyObject * args )
118118
CSP_RETURN_NONE;
119119
}
120120

121+
static PyObject * PyEngine_startStepping( PyEngine * self, PyObject * args )
122+
{
123+
CSP_BEGIN_METHOD;
124+
125+
PyObject * pyStart;
126+
PyObject * pyEnd;
127+
if( !PyArg_ParseTuple( args, "OO", &pyStart, &pyEnd ) )
128+
return nullptr;
129+
130+
auto start = fromPython<DateTime>( pyStart );
131+
auto end = fromPython<DateTime>( pyEnd );
132+
133+
CSP_TRUE_OR_THROW_RUNTIME( self -> engine() -> isRootEngine(), "engine is not root engine" );
134+
self -> rootEngine() -> startStepping( start, end );
135+
136+
Py_RETURN_NONE;
137+
CSP_RETURN_NONE;
138+
}
139+
140+
static PyObject * PyEngine_step( PyEngine * self, PyObject * args )
141+
{
142+
CSP_BEGIN_METHOD;
143+
144+
double maxWaitSeconds = 0.0;
145+
if( !PyArg_ParseTuple( args, "|d", &maxWaitSeconds ) )
146+
return nullptr;
147+
148+
CSP_TRUE_OR_THROW_RUNTIME( self -> engine() -> isRootEngine(), "engine is not root engine" );
149+
150+
TimeDelta maxWait = maxWaitSeconds > 0 ? TimeDelta::fromSeconds( maxWaitSeconds ) : TimeDelta::ZERO();
151+
bool hasMore = self -> rootEngine() -> step( maxWait );
152+
153+
return PyBool_FromLong( hasMore );
154+
CSP_RETURN_NONE;
155+
}
156+
157+
static PyObject * PyEngine_stopStepping( PyEngine * self, PyObject * args )
158+
{
159+
CSP_BEGIN_METHOD;
160+
161+
CSP_TRUE_OR_THROW_RUNTIME( self -> engine() -> isRootEngine(), "engine is not root engine" );
162+
self -> rootEngine() -> stopStepping();
163+
164+
return self -> collectOutputs();
165+
CSP_RETURN_NONE;
166+
}
167+
168+
static PyObject * PyEngine_isStepping( PyEngine * self, PyObject * args )
169+
{
170+
CSP_BEGIN_METHOD;
171+
172+
CSP_TRUE_OR_THROW_RUNTIME( self -> engine() -> isRootEngine(), "engine is not root engine" );
173+
bool stepping = self -> rootEngine() -> isStepping();
174+
175+
return PyBool_FromLong( stepping );
176+
CSP_RETURN_NONE;
177+
}
178+
179+
static PyObject * PyEngine_now( PyEngine * self, PyObject * args )
180+
{
181+
CSP_BEGIN_METHOD;
182+
183+
CSP_TRUE_OR_THROW_RUNTIME( self -> engine() -> isRootEngine(), "engine is not root engine" );
184+
DateTime now = self -> rootEngine() -> now();
185+
186+
return toPython( now );
187+
CSP_RETURN_NONE;
188+
}
189+
190+
static PyObject * PyEngine_nextScheduledTime( PyEngine * self, PyObject * args )
191+
{
192+
CSP_BEGIN_METHOD;
193+
194+
CSP_TRUE_OR_THROW_RUNTIME( self -> engine() -> isRootEngine(), "engine is not root engine" );
195+
DateTime nextTime = self -> rootEngine() -> nextScheduledTime();
196+
197+
return toPython( nextTime );
198+
CSP_RETURN_NONE;
199+
}
200+
121201
static PyMethodDef PyEngine_methods[] = {
122-
{ "run", (PyCFunction) PyEngine_run, METH_VARARGS, "start and run engine" },
202+
{ "run", ( PyCFunction ) PyEngine_run, METH_VARARGS, "start and run engine" },
203+
{ "start_stepping", ( PyCFunction ) PyEngine_startStepping, METH_VARARGS, "start engine in step mode" },
204+
{ "step", ( PyCFunction ) PyEngine_step, METH_VARARGS, "execute one step, returns True if more work pending" },
205+
{ "stop_stepping", ( PyCFunction ) PyEngine_stopStepping, METH_NOARGS, "stop stepping and cleanup" },
206+
{ "is_stepping", ( PyCFunction ) PyEngine_isStepping, METH_NOARGS, "check if in stepping mode" },
207+
{ "now", ( PyCFunction ) PyEngine_now, METH_NOARGS, "get current engine time" },
208+
{ "next_scheduled_time", ( PyCFunction ) PyEngine_nextScheduledTime, METH_NOARGS, "get next scheduled event time" },
123209
{ NULL }
124210
};
125211

0 commit comments

Comments
 (0)