Skip to content

Commit 07576a6

Browse files
committed
Merge branch 'main' into rba/kafka_optim
2 parents 95dbfc8 + 2958468 commit 07576a6

15 files changed

Lines changed: 206 additions & 92 deletions

.github/workflows/build.yml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ jobs:
7272
FULL_RUN: ${{ steps.setuppush.outputs.FULL_RUN || steps.setuppr.outputs.FULL_RUN || steps.setupmanual.outputs.FULL_RUN || steps.setupschedule.outputs.FULL_RUN }}
7373
steps:
7474
- name: Checkout
75-
uses: actions/checkout@v5
75+
uses: actions/checkout@v6
7676
with:
7777
# for pull_request so we can do HEAD^2
7878
fetch-depth: 2
@@ -159,7 +159,7 @@ jobs:
159159

160160
steps:
161161
- name: Checkout
162-
uses: actions/checkout@v5
162+
uses: actions/checkout@v6
163163
with:
164164
submodules: recursive
165165

@@ -239,7 +239,7 @@ jobs:
239239

240240
steps:
241241
- name: Checkout
242-
uses: actions/checkout@v5
242+
uses: actions/checkout@v6
243243
with:
244244
submodules: recursive
245245
fetch-depth: 0
@@ -358,7 +358,7 @@ jobs:
358358

359359
steps:
360360
- name: Checkout
361-
uses: actions/checkout@v5
361+
uses: actions/checkout@v6
362362
with:
363363
submodules: recursive
364364

@@ -448,7 +448,7 @@ jobs:
448448

449449
steps:
450450
- name: Checkout
451-
uses: actions/checkout@v5
451+
uses: actions/checkout@v6
452452
with:
453453
submodules: recursive
454454

@@ -554,7 +554,7 @@ jobs:
554554

555555
steps:
556556
- name: Checkout
557-
uses: actions/checkout@v5
557+
uses: actions/checkout@v6
558558
with:
559559
submodules: recursive
560560

@@ -635,7 +635,7 @@ jobs:
635635

636636
steps:
637637
- name: Checkout
638-
uses: actions/checkout@v5
638+
uses: actions/checkout@v6
639639
with:
640640
submodules: recursive
641641
fetch-depth: 0

.github/workflows/conda.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ jobs:
7676
runs-on: ${{ matrix.os }}
7777
steps:
7878
- name: Checkout
79-
uses: actions/checkout@v5
79+
uses: actions/checkout@v6
8080

8181
- uses: mamba-org/setup-micromamba@v2
8282
with:

.github/workflows/wiki-lint.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ jobs:
3030

3131
steps:
3232
- name: Checkout
33-
uses: actions/checkout@v5
33+
uses: actions/checkout@v6
3434
with:
3535
submodules: recursive
3636

.github/workflows/wiki-publish.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ jobs:
2121
runs-on: ubuntu-latest
2222
steps:
2323
- name: Checkout
24-
uses: actions/checkout@v5
24+
uses: actions/checkout@v6
2525

2626
- name: Upload Documentation to Wiki
2727
uses: Andrew-Chen-Wang/github-wiki-action@v5

cpp/csp/adapters/kafka/KafkaInputAdapter.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ void KafkaInputAdapter::processMessage( RdKafka::Message* message, bool live, cs
9191
if( ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE )
9292
msgTime = DateTime::fromMilliseconds( ts.timestamp );
9393

94-
if( type() -> type() == CspType::Type::STRUCT )
94+
if( dataType() -> type() == CspType::Type::STRUCT )
9595
{
9696
auto tick = m_converter -> asStruct( message -> payload(), message -> len() );
9797

@@ -117,7 +117,7 @@ void KafkaInputAdapter::processMessage( RdKafka::Message* message, bool live, cs
117117
if( shouldProcessMessage( pushLive, msgTime ) )
118118
pushTick(pushLive, msgTime, std::move(tick), batch);
119119
}
120-
else if( type() -> type() == CspType::Type::STRING )
120+
else if( dataType() -> type() == CspType::Type::STRING )
121121
{
122122
bool pushLive = shouldPushLive(live, msgTime);
123123
if( shouldProcessMessage( pushLive, msgTime ) )

cpp/csp/engine/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ set(ENGINE_PUBLIC_HEADERS
4141
OutputAdapter.h
4242
PendingPushEvents.h
4343
Profiler.h
44+
PushPullEvent.h
4445
PushEvent.h
4546
PullInputAdapter.h
4647
PushInputAdapter.h

cpp/csp/engine/PushPullEvent.h

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
#ifndef _IN_CSP_ENGINE_PUSHPULLEVENT_H
2+
#define _IN_CSP_ENGINE_PUSHPULLEVENT_H
3+
4+
namespace csp
5+
{
6+
7+
class PushPullInputAdapter;
8+
9+
struct PushPullEvent
10+
{
11+
PushPullEvent( PushPullInputAdapter *adapter_, DateTime time_ ) : time( time_ ),
12+
adapter( adapter_ ),
13+
next( nullptr )
14+
{}
15+
16+
DateTime time;
17+
PushPullInputAdapter * adapter;
18+
PushPullEvent * next;
19+
};
20+
21+
template<typename T>
22+
struct TypedPushPullEvent : public PushPullEvent
23+
{
24+
TypedPushPullEvent( PushPullInputAdapter *adapter, DateTime time,
25+
T &&d ) : PushPullEvent( adapter, time ),
26+
data( std::forward<T>( d ) )
27+
{}
28+
29+
typename std::remove_reference<T>::type data;
30+
};
31+
32+
}
33+
34+
#endif

cpp/csp/engine/PushPullInputAdapter.cpp

Lines changed: 57 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -6,51 +6,66 @@ PushPullInputAdapter::PushPullInputAdapter( Engine *engine, CspTypePtr &type, Pu
66
PushGroup *group, bool adjustOutOfOrderTime )
77
: PushInputAdapter(engine, type, pushMode, group),
88
m_nextPullEvent(nullptr),
9+
m_tailEvent(nullptr),
910
m_notifiedEndOfPull(false),
1011
m_adjustOutOfOrderTime(adjustOutOfOrderTime)
12+
{
13+
}
14+
15+
PushPullInputAdapter::~PushPullInputAdapter()
1116
{
1217
//free up any unused events
13-
while( m_nextPullEvent )
18+
PushPullEvent * event = nextPullEvent();
19+
while( event )
1420
{
15-
delete m_nextPullEvent;
16-
m_nextPullEvent = nextPullEvent();
21+
delete event;
22+
event = nextPullEvent();
1723
}
1824
}
1925

2026
void PushPullInputAdapter::start( DateTime start, DateTime end )
2127
{
22-
m_nextPullEvent = nextPullEvent();
23-
if( m_nextPullEvent )
24-
{
25-
m_timerHandle = rootEngine() -> scheduleCallback( m_nextPullEvent -> time,
26-
[this]() { return processNextPullEvent() ? nullptr : this; } );
27-
}
28+
auto * nextEvent = nextPullEvent();
29+
if( nextEvent )
30+
scheduleNextPullEvent( nextEvent );
2831
}
2932

3033
void PushPullInputAdapter::stop()
3134
{
3235
rootEngine() -> cancelCallback( m_timerHandle );
3336
//shouldnt need to lock at this point
34-
m_threadQueue.emplace( nullptr );
37+
auto * replayCompleteEvent = new PushPullEvent( this, DateTime::NONE() );
38+
rootEngine() -> pushPullEventQueue().push( replayCompleteEvent );
39+
}
40+
41+
void PushPullInputAdapter::scheduleNextPullEvent( PushPullEvent * nextEvent )
42+
{
43+
//Note that we make nextEvent mutable in the lambda since we need to be able to update it in processNextPullEvent
44+
//which can return false to force a rescheduled re-attempt with a new event pointer
45+
m_timerHandle = rootEngine() -> scheduleCallback( nextEvent -> time,
46+
[this, nextEvent]() mutable
47+
{
48+
return processNextPullEvent( nextEvent ) ? nullptr : this;
49+
} );
3550
}
3651

37-
bool PushPullInputAdapter::processNextPullEvent()
52+
bool PushPullInputAdapter::processNextPullEvent( PushPullEvent *& nextEvent )
3853
{
3954
bool consumed = switchCspType( dataType(),
40-
[ this ]( auto tag )
55+
[ this, &nextEvent ]( auto tag )
4156
{
4257
using T = typename decltype(tag)::type;
43-
TypedPullDataEvent<T> *tevent = static_cast<TypedPullDataEvent<T> *>( m_nextPullEvent );
58+
TypedPushPullEvent<T> *tevent = static_cast<TypedPushPullEvent<T> *>( nextEvent );
4459

4560
bool consumed = consumeTick( tevent -> data );
4661
assert( consumed );
4762

4863
delete tevent;
4964

50-
while( ( m_nextPullEvent = nextPullEvent() ) &&
51-
m_nextPullEvent -> time == rootEngine() -> now() )
65+
while( ( nextEvent = nextPullEvent() ) &&
66+
nextEvent -> time == rootEngine() -> now() )
5267
{
53-
tevent = static_cast<TypedPullDataEvent<T> *>( m_nextPullEvent );
68+
tevent = static_cast<TypedPushPullEvent<T> *>( nextEvent );
5469
consumed = consumeTick( tevent -> data );
5570
if( !consumed )
5671
return false;
@@ -60,38 +75,41 @@ bool PushPullInputAdapter::processNextPullEvent()
6075
return true;
6176
} );
6277

63-
if( consumed && m_nextPullEvent )
64-
{
65-
m_timerHandle = rootEngine() -> scheduleCallback( m_nextPullEvent->time,
66-
[this]() { return processNextPullEvent() ? nullptr : this; } );
67-
}
78+
if( consumed && nextEvent )
79+
scheduleNextPullEvent( nextEvent );
6880

6981
return consumed;
7082
}
7183

72-
PushPullInputAdapter::PullDataEvent * PushPullInputAdapter::nextPullEvent()
84+
PushPullEvent * PushPullInputAdapter::nextPullEvent()
7385
{
74-
//spin while we wait for data
75-
while( m_poppedPullEvents.empty() )
86+
while( m_nextPullEvent == nullptr )
7687
{
77-
std::lock_guard<std::mutex> g( m_queueMutex );
78-
m_threadQueue.swap( m_poppedPullEvents );
88+
//Any PushPullInputAdapter instance can update events on any other adapter
89+
PushPullEvent * event = rootEngine() -> pushPullEventQueue().popAll();
90+
while( event )
91+
{
92+
PushPullEvent * next = event -> next;
93+
event -> adapter -> setNextPushPullEvent( event );
94+
event = next;
95+
}
7996
}
8097

81-
auto * event = m_poppedPullEvents.front();
82-
m_poppedPullEvents.pop();
98+
//DateTime of None is the sentinel value for replay complete
99+
if( m_nextPullEvent -> time.isNone() )
100+
return nullptr;
101+
102+
auto * event = m_nextPullEvent;
103+
m_nextPullEvent = m_nextPullEvent -> next;
83104

84-
if( event )
85-
{
86-
//Always force time before start to start. There are two possibilities:
87-
//- User asked to replay from EARLIEST, so they should get all ticks replayed and we cant replay before starttime
88-
//- User asked to replay from STARTTIME in which case, if the adapter is written correctly, we shouldnt get ticks before starttime
89-
if( unlikely( event -> time < rootEngine() -> startTime() ) )
90-
event -> time = rootEngine() -> startTime();
91-
92-
if( m_adjustOutOfOrderTime )
93-
event -> time = std::max( event -> time, rootEngine() -> now() );
94-
}
105+
//Always force time before start to start. There are two possibilities:
106+
//- User asked to replay from EARLIEST, so they should get all ticks replayed and we cant replay before starttime
107+
//- User asked to replay from STARTTIME in which case, if the adapter is written correctly, we shouldnt get ticks before starttime
108+
if( unlikely( event -> time < rootEngine() -> startTime() ) )
109+
event -> time = rootEngine() -> startTime();
110+
111+
if( m_adjustOutOfOrderTime )
112+
event -> time = std::max( event -> time, rootEngine() -> now() );
95113

96114
return event;
97115
}

0 commit comments

Comments
 (0)