From aa80f75557cf7dd3e385ee96b635b844ba135f4a Mon Sep 17 00:00:00 2001 From: liyonghua0910 Date: Tue, 31 Mar 2026 08:49:52 +0000 Subject: [PATCH 1/4] [BugFix] reset exist tasks signal in clear_data --- fastdeploy/inter_communicator/engine_worker_queue.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fastdeploy/inter_communicator/engine_worker_queue.py b/fastdeploy/inter_communicator/engine_worker_queue.py index 7e3f88486c8..b64fcacda33 100644 --- a/fastdeploy/inter_communicator/engine_worker_queue.py +++ b/fastdeploy/inter_communicator/engine_worker_queue.py @@ -837,6 +837,10 @@ def clear_data(self): self.lock.acquire() self.tasks[:] = list() self.client_read_flag[:] = [1] * self.num_client + if self.is_single_node: + self.exist_tasks_intra_signal.value[0] = 0 + else: + self.exist_tasks_inter_signal.set(0) self.lock.release() llm_logger.info("clear data for engine worker queue") From 37a3ed902c601d9ccd86d576903faee24b1f536b Mon Sep 17 00:00:00 2001 From: liyonghua0910 Date: Tue, 31 Mar 2026 10:30:42 +0000 Subject: [PATCH 2/4] [Fix] fix stale exist tasks signal after weight update --- fastdeploy/worker/worker_process.py | 1 + 1 file changed, 1 insertion(+) diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index efe2fa6a344..60f61403983 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -549,6 +549,7 @@ def event_loop_normal(self) -> None: f"Rank: {self.local_rank} has updated parameters. {self.model_weights_status.value[0]}" ) self.model_weights_signal[0] = ModelWeightsStatus.NORMAL + continue elif self.model_weights_signal[0] == ModelWeightsStatus.CLEARING: logger.info( f"Rank: {self.local_rank} has cleared parameters. {self.model_weights_status.value[0]}" From 325ed2e618d644c05feda1b69d70de8bd6bcd345 Mon Sep 17 00:00:00 2001 From: liyonghua0910 Date: Tue, 31 Mar 2026 10:34:36 +0000 Subject: [PATCH 3/4] [Chore] downgrade detected new requests log to DEBUG level --- fastdeploy/worker/worker_process.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 60f61403983..13ab8c25d30 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -571,7 +571,7 @@ def event_loop_normal(self) -> None: continue if self.exist_task_signal.value[0] == ExistTaskStatus.EXIST or self.task_queue.read_finish_flag.get() == 1: - logger.info(f"Rank: {self.local_rank} Detected new requests.") + logger.debug(f"Rank: {self.local_rank} Detected new requests.") self.engine_forward_signal.value[0] = 1 tasks, read_finish = self.task_queue.get_tasks() # Only one of all tp_size client will get read_finish == True. From c2c57a42de534ed0ce3dcb13a9017cfc1947a61b Mon Sep 17 00:00:00 2001 From: liyonghua0910 Date: Tue, 31 Mar 2026 11:23:35 +0000 Subject: [PATCH 4/4] [fix] adjust continue place --- fastdeploy/worker/worker_process.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 13ab8c25d30..8182e06990b 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -549,7 +549,6 @@ def event_loop_normal(self) -> None: f"Rank: {self.local_rank} has updated parameters. {self.model_weights_status.value[0]}" ) self.model_weights_signal[0] = ModelWeightsStatus.NORMAL - continue elif self.model_weights_signal[0] == ModelWeightsStatus.CLEARING: logger.info( f"Rank: {self.local_rank} has cleared parameters. {self.model_weights_status.value[0]}" @@ -568,7 +567,7 @@ def event_loop_normal(self) -> None: self.model_weights_status.value[0] = ( ModelWeightsStatus.UPDATING ) # 所有 Rank 已同步唤醒,启动权重更新流程 - continue + continue if self.exist_task_signal.value[0] == ExistTaskStatus.EXIST or self.task_queue.read_finish_flag.get() == 1: logger.debug(f"Rank: {self.local_rank} Detected new requests.")