@@ -52,6 +52,7 @@ Stream::Stream()
5252 , _remote_consumed(0 )
5353 , _cur_buf_size(0 )
5454 , _local_consumed(0 )
55+ , _atomic_local_consumed(0 )
5556 , _parse_rpc_response(false )
5657 , _pending_buf(NULL )
5758 , _start_idle_timer_us(0 )
@@ -287,14 +288,21 @@ void Stream::SetConnected(const StreamSettings* remote_settings) {
287288 CHECK (_host_socket != NULL );
288289 RPC_VLOG << " stream=" << id () << " is connected to stream_id="
289290 << _remote_settings.stream_id () << " at host_socket=" << *_host_socket;
290- _connected = true ;
291+ _connected. store ( true , butil::memory_order_release) ;
291292 _connect_meta.ec = 0 ;
292293 TriggerOnConnectIfNeed ();
293294 if (remote_settings == NULL ) {
294295 // Start the timer at server-side
295296 // Client-side timer would triggered in Consume after received the first
296297 // message which is the very RPC response
297298 StartIdleTimer ();
299+ } else {
300+ // send first feedback for client-side stream if it already consumed data
301+ if (_remote_settings.need_feedback ()) {
302+ auto consumed_bytes = _atomic_local_consumed.load (butil::memory_order_acquire);
303+ if (consumed_bytes > 0 )
304+ SendFeedback (consumed_bytes);
305+ }
298306 }
299307}
300308
@@ -620,20 +628,34 @@ int Stream::Consume(void *meta, bthread::TaskIterator<butil::IOBuf*>& iter) {
620628 }
621629 mb.flush ();
622630
623- if (s->_remote_settings .need_feedback () && mb.total_length () > 0 ) {
624- s->_local_consumed += mb.total_length ();
625- s->SendFeedback ();
631+ auto total_length = mb.total_length ();
632+ if (total_length > 0 ) {
633+ // fast path for connected stream
634+ if (s->_connected .load (butil::memory_order_acquire)){
635+ if (s->_remote_settings .need_feedback ()) {
636+ s->_local_consumed += total_length;
637+ s->SendFeedback (s->_local_consumed );
638+ }
639+ } else {
640+ // Under the scenario of batch creation of Streams, there is concurrency between SetConnected and Consume for the same stream,
641+ // and it is necessary to ensure the memory order.
642+ s->_local_consumed = s->_atomic_local_consumed .fetch_add (total_length, butil::memory_order_release) + total_length;
643+ if (s->_connected .load (butil::memory_order_acquire) && s->_remote_settings .need_feedback ()) {
644+ s->SendFeedback (s->_local_consumed );
645+ }
646+ }
626647 }
648+
627649 s->StartIdleTimer ();
628650 return 0 ;
629651}
630652
631- void Stream::SendFeedback () {
653+ void Stream::SendFeedback (int64_t _consumed_bytes ) {
632654 StreamFrameMeta fm;
633655 fm.set_frame_type (FRAME_TYPE_FEEDBACK);
634656 fm.set_stream_id (_remote_settings.stream_id ());
635657 fm.set_source_stream_id (id ());
636- fm.mutable_feedback ()->set_consumed_size (_local_consumed );
658+ fm.mutable_feedback ()->set_consumed_size (_consumed_bytes );
637659 butil::IOBuf out;
638660 policy::PackStreamMessage (&out, fm, NULL );
639661 WriteToHostSocket (&out);
0 commit comments