Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 32 additions & 32 deletions contrib/tiflash-columnar-hub/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 40 additions & 21 deletions contrib/tiflash-columnar-hub/hub-runtime/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1188,6 +1188,7 @@ pub unsafe fn run_proxy(argc: c_int, argv: *const *const c_char, helper_ptr: *co
})
.unwrap_or_default();
let status_config_json = build_status_config_json(&config);
let mut store_registration: Option<(metapb::Store, u32)> = None;
let mut heartbeat_context: Option<(u64, u32)> = None;
let heartbeat_shutdown = Arc::new(AtomicBool::new(false));
let mut heartbeat_handle: Option<thread::JoinHandle<()>> = None;
Expand All @@ -1196,13 +1197,7 @@ pub unsafe fn run_proxy(argc: c_int, argv: *const *const c_char, helper_ptr: *co
let store_id = ensure_store_id(pd_client.as_ref(), &data_dir);
let store = build_store(&config, store_id);
let start_time = store.get_start_timestamp() as u32;
pd_client.put_store(store).unwrap_or_else(|err| {
panic!(
"failed to register TiFlash Columnar Hub store {} to PD: {}",
store_id, err
)
});
heartbeat_context = Some((store_id, start_time));
store_registration = Some((store, start_time));
}

let cloud_helper = CloudHelper::new(
Expand Down Expand Up @@ -1246,26 +1241,42 @@ pub unsafe fn run_proxy(argc: c_int, argv: *const *const c_char, helper_ptr: *co
Some(server)
};

while matches!(
helper.handle_get_engine_store_server_status(),
EngineStoreServerStatus::Idle
) {
info!("wait for engine-store server to start");
let mut engine_store_status = helper.handle_get_engine_store_server_status();
while matches!(engine_store_status, EngineStoreServerStatus::Idle) {
thread::sleep(Duration::from_millis(200));
engine_store_status = helper.handle_get_engine_store_server_status();
}

hub.set_status(RaftProxyStatus::Running);
if let Some((store_id, start_time)) = heartbeat_context {
heartbeat_handle = Some(spawn_store_heartbeat_loop(
pd_client.clone(),
store_id,
start_time,
data_dir.clone(),
heartbeat_shutdown.clone(),
));
if matches!(engine_store_status, EngineStoreServerStatus::Running) {
info!("engine-store server is running");

// Only register the store and start the heartbeat loop after the engine-store server is running. This ensures that TiFlash does not register itself before the PD successfully bootstrapped the cluster with init TiKV servers.
if let Some((store, start_time)) = store_registration.take() {
let store_id = store.get_id();
pd_client.put_store(store).unwrap_or_else(|err| {
panic!(
"failed to register TiFlash Columnar Hub store {} to PD: {}",
store_id, err
)
});
heartbeat_context = Some((store_id, start_time));
}

hub.set_status(RaftProxyStatus::Running);
if let Some((store_id, start_time)) = heartbeat_context {
heartbeat_handle = Some(spawn_store_heartbeat_loop(
pd_client.clone(),
store_id,
start_time,
data_dir.clone(),
heartbeat_shutdown.clone(),
));
}
}

loop {
match helper.handle_get_engine_store_server_status() {
match engine_store_status {
EngineStoreServerStatus::Running => thread::sleep(Duration::from_millis(200)),
EngineStoreServerStatus::Stopping => {
hub.set_status(RaftProxyStatus::Stopped);
Expand All @@ -1284,14 +1295,22 @@ pub unsafe fn run_proxy(argc: c_int, argv: *const *const c_char, helper_ptr: *co
}
EngineStoreServerStatus::Idle => thread::sleep(Duration::from_millis(200)),
}
engine_store_status = helper.handle_get_engine_store_server_status();
}

info!(
"found engine-store server status is {:?}, start to stop all services in columnar hub",
engine_store_status
);

if let Some(handle) = heartbeat_handle.take() {
let _ = handle.join();
}
if let Some(server) = status_server.take() {
server.stop();
}

info!("TiFlash Columnar Hub has been stopped");
}

#[cfg(test)]
Expand Down