diff --git a/fastdeploy/inter_communicator/engine_worker_queue.py b/fastdeploy/inter_communicator/engine_worker_queue.py index 7e3f88486c8..b64fcacda33 100644 --- a/fastdeploy/inter_communicator/engine_worker_queue.py +++ b/fastdeploy/inter_communicator/engine_worker_queue.py @@ -837,6 +837,10 @@ def clear_data(self): self.lock.acquire() self.tasks[:] = list() self.client_read_flag[:] = [1] * self.num_client + if self.is_single_node: + self.exist_tasks_intra_signal.value[0] = 0 + else: + self.exist_tasks_inter_signal.set(0) self.lock.release() llm_logger.info("clear data for engine worker queue") diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index efe2fa6a344..8182e06990b 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -567,10 +567,10 @@ def event_loop_normal(self) -> None: self.model_weights_status.value[0] = ( ModelWeightsStatus.UPDATING ) # 所有 Rank 已同步唤醒,启动权重更新流程 - continue + continue if self.exist_task_signal.value[0] == ExistTaskStatus.EXIST or self.task_queue.read_finish_flag.get() == 1: - logger.info(f"Rank: {self.local_rank} Detected new requests.") + logger.debug(f"Rank: {self.local_rank} Detected new requests.") self.engine_forward_signal.value[0] = 1 tasks, read_finish = self.task_queue.get_tasks() # Only one of all tp_size client will get read_finish == True.