@@ -136,7 +136,9 @@ void KafkaAdapterManager::setConfProperties( RdKafka::Conf * conf, const Diction
136136void KafkaAdapterManager::forceShutdown ( const std::string & err )
137137{
138138 m_unrecoverableError = true ; // So we can alert the producer to stop trying to flush
139- forceConsumerReplayComplete ();
139+ // Force all adapters replay complete so they dont stay blocked
140+ for ( auto &[_,topicData] : m_topics )
141+ topicData.markReplayComplete ();
140142 try
141143 {
142144 CSP_THROW ( RuntimeException, " Kafka fatal error. " + err );
@@ -147,12 +149,6 @@ void KafkaAdapterManager::forceShutdown( const std::string & err )
147149 }
148150}
149151
150- void KafkaAdapterManager::forceConsumerReplayComplete ()
151- {
152- for ( auto & consumer : m_consumerVector )
153- consumer -> forceReplayCompleted ();
154- }
155-
156152void KafkaAdapterManager::start ( DateTime starttime, DateTime endtime )
157153{
158154 std::string errstr;
@@ -166,6 +162,14 @@ void KafkaAdapterManager::start( DateTime starttime, DateTime endtime )
166162 }
167163 }
168164
165+ // wildcard subscription has no guarantee of being in order
166+ // we flag replay complete as soon as we identify it.
167+ for ( auto &[_,topicData] : m_topics )
168+ {
169+ if ( topicData.wildcardSubscriber )
170+ topicData.wildcardSubscriber -> flagReplayComplete ();
171+ }
172+
169173 // start all consumers
170174 for ( auto & it : m_consumerVector )
171175 it -> start ( starttime );
@@ -244,6 +248,95 @@ void KafkaAdapterManager::pollProducers()
244248 }
245249}
246250
251+ void KafkaAdapterManager::onMessage ( RdKafka::Message * msg ) const
252+ {
253+ auto topicIt = m_topics.find ( msg -> topic_name () );
254+ if ( topicIt == m_topics.end () )
255+ {
256+ std::string errmsg = " KafkaAdapterManager: Message received on unknown topic: " + msg -> topic_name () +
257+ " errcode: " + RdKafka::err2str ( msg -> err () ) + " error: " + msg -> errstr ();
258+ pushStatus ( StatusLevel::ERROR, KafkaStatusMessageType::MSG_RECV_ERROR, errmsg );
259+ return ;
260+ }
261+ auto & topicData = topicIt -> second;
262+
263+ if ( !msg -> key () )
264+ {
265+ std::string errmsg = " KafkaAdapterManager: Message received with null key on topic " + msg -> topic_name () + " ." ;
266+ pushStatus ( StatusLevel::ERROR, KafkaStatusMessageType::MSG_RECV_ERROR, errmsg );
267+ return ;
268+ }
269+
270+ auto subscribersIt = topicData.subscribers .find ( *msg -> key () );
271+ if ( subscribersIt != topicData.subscribers .end () )
272+ {
273+ bool live = topicData.flaggedReplayComplete ;
274+ for ( auto it : subscribersIt -> second )
275+ it -> onMessage ( msg, live );
276+ }
277+
278+ // Note we always have to tick wildcard as live because it can get messages from multiple
279+ // partitions, some which may have done replaying and some not ( not to mention that data can be out of order )
280+ if ( topicData.wildcardSubscriber )
281+ topicData.wildcardSubscriber -> onMessage ( msg, true );
282+ }
283+
284+ // Called from individual consumers once that partitions they are servicing for a given topic have all hit EOF
285+ void KafkaAdapterManager::markConsumerReplayDone ( KafkaConsumer * consumer, const std::string & topic )
286+ {
287+ auto topicIt = m_topics.find ( topic );
288+ assert ( topicIt != m_topics.end () );
289+ if ( topicIt == m_topics.end () )
290+ return ;
291+
292+ topicIt -> second.markConsumerReplayDone ( consumer );
293+ }
294+
295+ /* ** TopicData ***/
296+ void KafkaAdapterManager::TopicData::addSubscriber ( KafkaConsumer * consumer, const std::string & key, KafkaSubscriber * subscriber )
297+ {
298+ consumers.emplace ( consumer, false );
299+ if ( key.empty () )
300+ {
301+ assert ( wildcardSubscriber == nullptr );
302+ wildcardSubscriber = subscriber;
303+ }
304+ else
305+ subscribers[key].emplace_back ( subscriber );
306+ }
307+
308+ void KafkaAdapterManager::TopicData::markConsumerReplayDone ( KafkaConsumer * consumer )
309+ {
310+ auto it = consumers.find ( consumer );
311+ assert ( it != consumers.end () );
312+ it -> second = true ;
313+
314+ for ( auto [_,done] : consumers )
315+ {
316+ if ( !done )
317+ return ;
318+ }
319+
320+ // All consumer partitions for the given topic are done replaying
321+ markReplayComplete ();
322+ }
323+
324+ void KafkaAdapterManager::TopicData::markReplayComplete ()
325+ {
326+ // this can be called from multiple consumer threads
327+ bool prevVal = flaggedReplayComplete.exchange ( true );
328+ if ( !prevVal )
329+ {
330+ // Flag all regular subscribers
331+ for ( auto & subscriberEntry : subscribers )
332+ {
333+ for ( auto * subscriber : subscriberEntry.second )
334+ subscriber -> flagReplayComplete ();
335+ }
336+ }
337+ }
338+ /* ** end TopicData ***/
339+
247340PushInputAdapter * KafkaAdapterManager::getInputAdapter ( CspTypePtr & type, PushMode pushMode, const Dictionary & properties )
248341{
249342 std::string topic = properties.get <std::string>( " topic" );
@@ -274,25 +367,16 @@ OutputAdapter * KafkaAdapterManager::getOutputAdapter( CspTypePtr & type, const
274367 }
275368}
276369
277- KafkaConsumer * KafkaAdapterManager::getConsumer ( const std::string & topic, const Dictionary & properties )
370+ KafkaConsumer * KafkaAdapterManager::getConsumer ( const Dictionary & properties )
278371{
279- // If we have seen this topic before, look up the consumer for it in the map
280- // Otherwise, make a new consumer (and insert it into the map)
281- // If we have reached m_maxThreads, then round-robin the topic onto a consumer (and insert it into the map)
282- if ( m_consumerMap.find ( topic ) != m_consumerMap.end () )
283- {
284- return m_consumerMap[ topic ].get ();
285- }
286372 if ( m_consumerVector.size () < m_maxThreads )
287373 {
288374 auto consumer = std::make_shared<KafkaConsumer>( this , properties );
289375 m_consumerVector.emplace_back ( consumer );
290- m_consumerMap.emplace ( topic, consumer );
291- return m_consumerMap[ topic ].get ();
376+ return consumer.get ();
292377 }
293378
294379 auto consumer = m_consumerVector[ m_consumerIdx++ ];
295- m_consumerMap.emplace ( topic, consumer );
296380 if ( m_consumerIdx >= m_maxThreads )
297381 m_consumerIdx = 0 ;
298382 return consumer.get ();
@@ -308,7 +392,9 @@ KafkaSubscriber * KafkaAdapterManager::getSubscriber( const std::string & topic,
308392 std::unique_ptr<KafkaSubscriber> subscriber ( new KafkaSubscriber ( this , properties ) );
309393 rv.first -> second = std::move ( subscriber );
310394
311- this -> getConsumer ( topic, properties ) -> addSubscriber ( topic, key, rv.first -> second.get () );
395+ auto * consumer = this -> getConsumer ( properties );
396+ consumer -> addTopic ( topic );
397+ m_topics[ topic ].addSubscriber ( consumer, key, rv.first -> second.get () );
312398 }
313399
314400 return rv.first -> second.get ();
0 commit comments