Skip to content

Commit d735fad

Browse files
committed
WIP DEBUG
1 parent 13b2491 commit d735fad

4 files changed

Lines changed: 97 additions & 25 deletions

File tree

Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ bip21 = { version = "0.5", features = ["std"], default-features = false }
9191
base64 = { version = "0.22.1", default-features = false, features = ["std"] }
9292
rand = "0.8.5"
9393
chrono = { version = "0.4", default-features = false, features = ["clock"] }
94-
tokio = { version = "1.37", default-features = false, features = [ "rt-multi-thread", "time", "sync", "macros" ] }
94+
tokio = { version = "1.37", default-features = false, features = [ "rt-multi-thread", "time", "sync", "macros", "tracing"] }
95+
console-subscriber = "0.4.1"
9596
esplora-client = { version = "0.12", default-features = false, features = ["tokio", "async-https-rustls"] }
9697
electrum-client = { version = "0.24.0", default-features = false, features = ["proxy", "use-rustls-ring"] }
9798
libc = "0.2"
@@ -100,7 +101,8 @@ serde = { version = "1.0.210", default-features = false, features = ["std", "der
100101
serde_json = { version = "1.0.128", default-features = false, features = ["std"] }
101102
log = { version = "0.4.22", default-features = false, features = ["std"]}
102103

103-
vss-client = "0.3"
104+
#vss-client = "0.3"
105+
vss-client = { path = "../vss-rust-client" }
104106
prost = { version = "0.11.6", default-features = false}
105107

106108
[target.'cfg(windows)'.dependencies]

src/io/vss_store.rs

Lines changed: 52 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -103,16 +103,31 @@ impl KVStoreSync for VssStore {
103103
) -> io::Result<()> {
104104
let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
105105
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
106-
let fut = self.inner.write_internal(
107-
inner_lock_ref,
108-
locking_key,
109-
version,
110-
primary_namespace,
111-
secondary_namespace,
112-
key,
113-
buf,
114-
);
115-
self.runtime.block_on(fut)
106+
let write_id: u64 = rand::random();
107+
println!("WRITE {} IS SYNC", write_id);
108+
let primary_namespace = primary_namespace.to_string();
109+
let secondary_namespace = secondary_namespace.to_string();
110+
let key = key.to_string();
111+
let inner = Arc::clone(&self.inner);
112+
let fut = async move {
113+
inner
114+
.write_internal(
115+
inner_lock_ref,
116+
locking_key,
117+
version,
118+
&primary_namespace,
119+
&secondary_namespace,
120+
&key,
121+
buf,
122+
write_id,
123+
)
124+
.await
125+
};
126+
// let spawned_fut = self.inner_runtime.spawn(fut);
127+
// self
128+
// .runtime
129+
// .block_on( async { spawned_fut.await.unwrap() })
130+
self.runtime.spawn_block_on(async { fut.await }, write_id)
116131
}
117132

118133
fn remove(
@@ -158,6 +173,8 @@ impl KVStore for VssStore {
158173
let secondary_namespace = secondary_namespace.to_string();
159174
let key = key.to_string();
160175
let inner = Arc::clone(&self.inner);
176+
let write_id: u64 = rand::random();
177+
println!("WRITE {} IS ASYNC", write_id);
161178
Box::pin(async move {
162179
inner
163180
.write_internal(
@@ -168,6 +185,7 @@ impl KVStore for VssStore {
168185
&secondary_namespace,
169186
&key,
170187
buf,
188+
write_id,
171189
)
172190
.await
173191
})
@@ -332,7 +350,7 @@ impl VssStoreInner {
332350

333351
async fn write_internal(
334352
&self, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>, locking_key: String, version: u64,
335-
primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
353+
primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>, write_id: u64,
336354
) -> io::Result<()> {
337355
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
338356

@@ -345,21 +363,39 @@ impl VssStoreInner {
345363
store_id: self.store_id.clone(),
346364
global_version: None,
347365
transaction_items: vec![KeyValue {
348-
key: obfuscated_key,
366+
key: obfuscated_key.clone(),
349367
version: vss_version,
350368
value: storable.encode_to_vec(),
351369
}],
352370
delete_items: vec![],
353371
};
354372

355-
self.client.put_object(&request).await.map_err(|e| {
373+
println!(
374+
"WRITE {}: {}/{}/{} ({})",
375+
write_id, primary_namespace, secondary_namespace, key, obfuscated_key
376+
);
377+
let fut = self.client.put_object(&request);
378+
// let res =
379+
// tokio::time::timeout(Duration::from_secs(5), fut).await.unwrap().map_err(|e| {
380+
// let msg = format!(
381+
// "Failed to write to key {}/{}/{}: {}",
382+
// primary_namespace, secondary_namespace, key, e
383+
// );
384+
// Error::new(ErrorKind::Other, msg)
385+
// });
386+
let res = fut.await.map_err(|e| {
356387
let msg = format!(
357388
"Failed to write to key {}/{}/{}: {}",
358389
primary_namespace, secondary_namespace, key, e
359390
);
360391
Error::new(ErrorKind::Other, msg)
361-
})?;
392+
});
393+
println!(
394+
"WRITE DONE {}: {}/{}/{} ({})",
395+
write_id, primary_namespace, secondary_namespace, key, obfuscated_key
396+
);
362397

398+
res?;
363399
Ok(())
364400
})
365401
.await
@@ -417,7 +453,9 @@ impl VssStoreInner {
417453
callback: FN,
418454
) -> Result<(), lightning::io::Error> {
419455
let res = {
456+
println!("BEFORE TAKING THE LOCK");
420457
let mut last_written_version = inner_lock_ref.lock().await;
458+
println!("AFTER TAKING THE LOCK");
421459

422460
// Check if we already have a newer version written/removed. This is used in async contexts to realize eventual
423461
// consistency.

src/runtime.rs

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,16 @@ impl Runtime {
2929
let mode = match tokio::runtime::Handle::try_current() {
3030
Ok(handle) => RuntimeMode::Handle(handle),
3131
Err(_) => {
32-
let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build()?;
32+
let rt = tokio::runtime::Builder::new_multi_thread()
33+
.thread_name_fn(|| {
34+
"MY-CUSTOM".to_owned()
35+
})
36+
.worker_threads(5)
37+
.max_blocking_threads(20)
38+
.on_thread_start(|| {
39+
println!("THREAD started");
40+
})
41+
.enable_all().build()?;
3342
RuntimeMode::Owned(rt)
3443
},
3544
};
@@ -67,7 +76,7 @@ impl Runtime {
6776
{
6877
let mut background_tasks = self.background_tasks.lock().unwrap();
6978
let runtime_handle = self.handle();
70-
background_tasks.spawn_on(future, runtime_handle);
79+
background_tasks.spawn_on(async { future.await }, runtime_handle);
7180
}
7281

7382
pub fn spawn_cancellable_background_task<F>(&self, future: F)
@@ -76,7 +85,7 @@ impl Runtime {
7685
{
7786
let mut cancellable_background_tasks = self.cancellable_background_tasks.lock().unwrap();
7887
let runtime_handle = self.handle();
79-
cancellable_background_tasks.spawn_on(future, runtime_handle);
88+
cancellable_background_tasks.spawn_on(async { future.await }, runtime_handle);
8089
}
8190

8291
pub fn spawn_background_processor_task<F>(&self, future: F)
@@ -87,7 +96,7 @@ impl Runtime {
8796
debug_assert!(background_processor_task.is_none(), "Expected no background processor_task");
8897

8998
let runtime_handle = self.handle();
90-
let handle = runtime_handle.spawn(future);
99+
let handle = runtime_handle.spawn(async { future.await });
91100
*background_processor_task = Some(handle);
92101
}
93102

@@ -106,21 +115,39 @@ impl Runtime {
106115
// during `block_on`, as this is the context `block_in_place` would operate on. So we try
107116
// to detect the outer context here, and otherwise use whatever was set during
108117
// initialization.
109-
let handle = tokio::runtime::Handle::try_current().unwrap_or(self.handle().clone());
110-
tokio::task::block_in_place(move || handle.block_on(future))
118+
let runtime_handle = tokio::runtime::Handle::try_current().unwrap_or(self.handle().clone());
119+
tokio::task::block_in_place(move || runtime_handle.block_on(async { future.await }))
120+
}
121+
122+
pub fn spawn_block_on<F: Future + Send + 'static>(&self, future: F, write_id: u64) -> F::Output
123+
where
124+
<F as std::future::Future>::Output: Send + std::fmt::Debug,
125+
{
126+
// While we generally decided not to overthink via which call graph users would enter our
127+
// runtime context, we'd still try to reuse whatever current context would be present
128+
// during `block_on`, as this is the context `block_in_place` would operate on. So we try
129+
// to detect the outer context here, and otherwise use whatever was set during
130+
// initialization.
131+
let runtime_handle = tokio::runtime::Handle::try_current().unwrap_or(self.handle().clone());
132+
println!("RUNTIME STATS BEFORE: {} workers, {} blocking queue depth", runtime_handle.metrics().num_workers(), runtime_handle.metrics().blocking_queue_depth());
133+
let res = tokio::task::block_in_place(move || runtime_handle.block_on(async { future.await }));
134+
let runtime_handle_2 = tokio::runtime::Handle::try_current().unwrap_or(self.handle().clone());
135+
println!("WRITE {} AFTER block_in_place", write_id);
136+
println!("RUNTIME STATS AFTER: {} workers, {} blocking queue depth", runtime_handle_2.metrics().num_workers(), runtime_handle_2.metrics().blocking_queue_depth());
137+
res
111138
}
112139

113140
pub fn abort_cancellable_background_tasks(&self) {
114141
let mut tasks = core::mem::take(&mut *self.cancellable_background_tasks.lock().unwrap());
115142
debug_assert!(tasks.len() > 0, "Expected some cancellable background_tasks");
116143
tasks.abort_all();
117-
self.block_on(async { while let Some(_) = tasks.join_next().await {} })
144+
self.block_on(async move { while let Some(_) = tasks.join_next().await {} })
118145
}
119146

120147
pub fn wait_on_background_tasks(&self) {
121148
let mut tasks = core::mem::take(&mut *self.background_tasks.lock().unwrap());
122149
debug_assert!(tasks.len() > 0, "Expected some background_tasks");
123-
self.block_on(async {
150+
self.block_on(async move {
124151
loop {
125152
let timeout_fut = tokio::time::timeout(
126153
Duration::from_secs(BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS),
@@ -154,7 +181,7 @@ impl Runtime {
154181
self.background_processor_task.lock().unwrap().take()
155182
{
156183
let abort_handle = background_processor_task.abort_handle();
157-
let timeout_res = self.block_on(async {
184+
let timeout_res = self.block_on(async move {
158185
tokio::time::timeout(
159186
Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS),
160187
background_processor_task,

tests/integration_tests_vss.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,20 @@ mod common;
1111

1212
use std::collections::HashMap;
1313

14+
use ldk_node::logger::LogLevel;
1415
use ldk_node::Builder;
1516

1617
#[test]
1718
fn channel_full_cycle_with_vss_store() {
19+
console_subscriber::init();
1820
let (bitcoind, electrsd) = common::setup_bitcoind_and_electrsd();
1921
println!("== Node A ==");
2022
let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap());
2123
let config_a = common::random_config(true);
2224
let mut builder_a = Builder::from_config(config_a.node_config);
2325
builder_a.set_chain_source_esplora(esplora_url.clone(), None);
26+
builder_a.set_filesystem_logger(None, Some(LogLevel::Trace));
27+
builder_a.set_entropy_seed_bytes([42u8; 64]);
2428
let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap();
2529
let node_a = builder_a
2630
.build_with_vss_store_and_fixed_headers(
@@ -35,6 +39,7 @@ fn channel_full_cycle_with_vss_store() {
3539
let config_b = common::random_config(true);
3640
let mut builder_b = Builder::from_config(config_b.node_config);
3741
builder_b.set_chain_source_esplora(esplora_url.clone(), None);
42+
builder_b.set_entropy_seed_bytes([43u8; 64]);
3843
let node_b = builder_b
3944
.build_with_vss_store_and_fixed_headers(
4045
vss_base_url,

0 commit comments

Comments
 (0)