Skip to content

Commit 2759537

Browse files
committed
fix(pegboard): isolate runner config dc lookup failures
1 parent 0392214 commit 2759537

1 file changed

Lines changed: 37 additions & 23 deletions

File tree

engine/packages/pegboard/src/ops/runner/list_runner_config_enabled_dcs.rs

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use anyhow::Result;
22
use epoxy_protocol::generated::v2::CachingBehavior;
3-
use futures_util::{StreamExt, TryStreamExt};
3+
use futures_util::StreamExt;
44
use gas::prelude::*;
55

66
use crate::keys;
@@ -59,27 +59,41 @@ async fn list_runner_config_enabled_dcs_inner(
5959
ctx: &OperationCtx,
6060
input: &Input,
6161
) -> Result<Vec<u16>> {
62-
futures_util::stream::iter(ctx.config().topology().datacenters.clone())
63-
.map(|dc| async move {
64-
let runner_config_key = keys::runner_config::GlobalDataKey::new(
65-
dc.datacenter_label,
66-
input.namespace_id,
67-
input.runner_name.clone(),
68-
);
69-
let res = ctx
70-
.op(epoxy::ops::kv::get_optimistic::Input {
71-
replica_id: ctx.config().epoxy_replica_id(),
72-
key: namespace::keys::subspace().pack(&runner_config_key),
73-
caching_behavior: CachingBehavior::Optimistic,
74-
target_replicas: None,
75-
save_empty: true,
76-
})
77-
.await?;
62+
Ok(
63+
futures_util::stream::iter(ctx.config().topology().datacenters.clone())
64+
.map(|dc| async move {
65+
let runner_config_key = keys::runner_config::GlobalDataKey::new(
66+
dc.datacenter_label,
67+
input.namespace_id,
68+
input.runner_name.clone(),
69+
);
70+
let res = ctx
71+
.op(epoxy::ops::kv::get_optimistic::Input {
72+
replica_id: ctx.config().epoxy_replica_id(),
73+
key: namespace::keys::subspace().pack(&runner_config_key),
74+
caching_behavior: CachingBehavior::Optimistic,
75+
target_replicas: None,
76+
save_empty: true,
77+
})
78+
.await;
7879

79-
Ok(res.value.map(|_| dc.datacenter_label))
80-
})
81-
.buffer_unordered(512)
82-
.try_filter_map(|x| std::future::ready(Ok(x)))
83-
.try_collect::<Vec<_>>()
84-
.await
80+
match res {
81+
Ok(res) => res.value.map(|_| dc.datacenter_label),
82+
Err(err) => {
83+
tracing::warn!(
84+
?err,
85+
namespace_id=?input.namespace_id,
86+
runner_name=%input.runner_name,
87+
dc_label=dc.datacenter_label,
88+
"failed to read runner config from dc"
89+
);
90+
None
91+
}
92+
}
93+
})
94+
.buffer_unordered(512)
95+
.filter_map(std::future::ready)
96+
.collect::<Vec<_>>()
97+
.await,
98+
)
8599
}

0 commit comments

Comments
 (0)