Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 24 additions & 49 deletions src/container/srv_target.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/**
* (C) Copyright 2016-2024 Intel Corporation.
* (C) Copyright 2025 Google LLC
* (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
* Copyright 2016-2024 Intel Corporation.
* Copyright 2025 Google LLC
* Copyright 2025-2026 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -63,10 +63,11 @@ agg_rate_ctl(void *arg)
return -1;

/*
* XXX temporary workaround: EC aggregation needs to be paused during rebuilding
* to avoid the race between EC rebuild and EC aggregation.
**/
if (ds_pool_is_rebuilding(pool) && cont->sc_ec_agg_active && !param->ap_vos_agg)
* Abort EC aggregation if the pool is rebuilding or if a rebuild
* PREPARE phase has gated new EC agg rounds on this target.
*/
if (!param->ap_vos_agg && cont->sc_ec_agg_active &&
(ds_pool_is_rebuilding(pool) || ds_pool_child_ec_agg_paused(cont->sc_pool)))
return -1;

/* When system is idle or under space pressure, let aggregation run in tight mode */
Expand Down Expand Up @@ -207,10 +208,12 @@ cont_aggregate_runnable(struct ds_cont_child *cont, struct sched_request *req,
return false;
}

if (ds_pool_is_rebuilding(pool) && !vos_agg) {
D_DEBUG(DB_EPC, DF_CONT ": skip EC aggregation during rebuild %d, %d.\n",
if (cont->sc_pool->spc_ec_agg_pause_gate != 0 && !vos_agg) {
D_DEBUG(DB_EPC,
DF_CONT ": skip EC aggregation during rebuild %d, %d gate " DF_U64 "\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid),
atomic_load(&pool->sp_rebuilding), pool->sp_rebuild_scan);
atomic_load(&pool->sp_rebuilding), pool->sp_rebuild_scan,
cont->sc_pool->spc_ec_agg_pause_gate);
return false;
}

Expand Down Expand Up @@ -329,8 +332,7 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb,
uint32_t flags = 0;
int i, rc = 0;

change_hlc = max(cont->sc_snapshot_delete_hlc,
cont->sc_pool->spc_rebuild_end_hlc);
change_hlc = cont->sc_snapshot_delete_hlc;
if (param->ap_full_scan_hlc < change_hlc) {
/* A snapshot has been deleted since the last aggregation,
* let's restart from 0.
Expand Down Expand Up @@ -378,41 +380,18 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb,
D_DEBUG(DB_EPC, "hlc "DF_X64" epoch "DF_X64"/"DF_X64" agg max "DF_X64"\n",
hlc, epoch_max, epoch_min, cont->sc_aggregation_max);

if (cont->sc_snapshots_nr + 1 < MAX_SNAPSHOT_LOCAL) {
if (cont->sc_snapshots_nr < MAX_SNAPSHOT_LOCAL) {
snapshots = snapshots_local;
} else {
D_ALLOC(snapshots, (cont->sc_snapshots_nr + 1) *
sizeof(daos_epoch_t));
D_ALLOC(snapshots, cont->sc_snapshots_nr * sizeof(daos_epoch_t));
if (snapshots == NULL)
return -DER_NOMEM;
}

if (cont->sc_pool->spc_rebuild_fence != 0) {
uint64_t rebuild_fence = cont->sc_pool->spc_rebuild_fence;
int j;
int insert_idx;

/* insert rebuild_fetch into the snapshot list */
D_DEBUG(DB_EPC, "rebuild fence "DF_X64"\n", rebuild_fence);
for (j = 0, insert_idx = 0; j < cont->sc_snapshots_nr; j++) {
if (cont->sc_snapshots[j] < rebuild_fence) {
snapshots[j] = cont->sc_snapshots[j];
insert_idx++;
} else {
snapshots[j + 1] = cont->sc_snapshots[j];
}
}
snapshots[insert_idx] = rebuild_fence;
snapshots_nr = cont->sc_snapshots_nr + 1;
} else {
/* Since sc_snapshots might be freed by other ULT, let's
* always copy here.
*/
snapshots_nr = cont->sc_snapshots_nr;
if (snapshots_nr > 0)
memcpy(snapshots, cont->sc_snapshots,
snapshots_nr * sizeof(daos_epoch_t));
}
/* Since sc_snapshots might be freed by other ULT, let's always copy here. */
snapshots_nr = cont->sc_snapshots_nr;
if (snapshots_nr > 0)
memcpy(snapshots, cont->sc_snapshots, snapshots_nr * sizeof(daos_epoch_t));

/* Find highest snapshot less than last aggregated epoch. */
for (i = 0; i < snapshots_nr && snapshots[i] < epoch_min; ++i)
Expand Down Expand Up @@ -992,11 +971,11 @@ void
ds_cont_child_wait_ec_agg_pause(struct ds_pool_child *pool_child, int wait_timeout)
{
uint64_t start_time = daos_wallclock_secs();
int wait_intv = 10;
int wait_intv = 1;
int waited = 0;

D_DEBUG(DB_MD, DF_UUID "[%d]: wait for pausing EC aggregation\n",
DP_UUID(pool_child->spc_uuid), dss_get_module_info()->dmi_tgt_id);
D_INFO(DF_UUID "[%d]: wait for local EC aggregation to pause\n",
DP_UUID(pool_child->spc_uuid), dss_get_module_info()->dmi_tgt_id);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks fine to use INFO; then L976 can be removed.


if (wait_timeout == 0 || wait_timeout > WAIT_EC_PAUSE_MAX)
wait_timeout = WAIT_EC_PAUSE_MAX; /* 10 minutes by default */
Expand All @@ -1005,10 +984,6 @@ ds_cont_child_wait_ec_agg_pause(struct ds_pool_child *pool_child, int wait_timeo
struct ds_cont_child *coc;
bool paused = true;

/* Wait for pausing aggregation
* XXX: There is no global barrier so we always wait for at least 10 seconds to
* lower the chance that remote targets are still running EC aggregation.
*/
if (wait_intv > wait_timeout - waited)
wait_intv = wait_timeout - waited;

Expand All @@ -1025,7 +1000,7 @@ ds_cont_child_wait_ec_agg_pause(struct ds_pool_child *pool_child, int wait_timeo

waited = daos_wallclock_secs() - start_time;
if (waited >= wait_timeout) {
D_WARN("can't pause EC aggregation after %d seconds\n", waited);
D_WARN("can't pause local EC aggregation after %d seconds\n", waited);
return; /* XXX what can I do? */
}

Expand Down
4 changes: 2 additions & 2 deletions src/include/daos/rpc.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* (C) Copyright 2016-2024 Intel Corporation.
* (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
* Copyright 2016-2024 Intel Corporation.
* Copyright 2025-2026 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down
26 changes: 15 additions & 11 deletions src/include/daos_srv/pool.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* (C) Copyright 2016-2024 Intel Corporation.
* (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
* Copyright 2016-2024 Intel Corporation.
* Copyright 2025-2026 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -175,19 +175,16 @@ struct ds_pool_child {
struct sched_request *spc_flush_req; /* Dedicated VEA flush ULT */
struct sched_request *spc_scrubbing_req; /* Track scrubbing ULT*/
struct sched_request *spc_chkpt_req; /* Track checkpointing ULT*/
d_list_t spc_cont_list;
d_list_t spc_cont_list;
d_list_t spc_srv_cont_hdl; /* Single server cont handle */

/* The current maxim rebuild epoch, (0 if there is no rebuild), so
* vos aggregation can not cross this epoch during rebuild to avoid
* interfering rebuild process.
/* Stores the local HLC token set by the PREPARE phase. Non-zero
* blocks new EC aggregation rounds during rebuild. rebuild_fini_one()
* compares this value against rt_ec_pause_token to decide whether this
* rebuild still owns the gate.
*/
uint64_t spc_rebuild_fence;
uint64_t spc_ec_agg_pause_gate;

/* The HLC when current rebuild ends, which will be used to compare
* with the aggregation full scan start HLC to know whether the
* aggregation needs to be restarted from 0. */
uint64_t spc_rebuild_end_hlc;
uint32_t spc_map_version;
int spc_ref;
ABT_eventual spc_ref_eventual;
Expand Down Expand Up @@ -222,6 +219,13 @@ ds_pool_is_rebuilding(struct ds_pool *pool)
return (atomic_load(&pool->sp_rebuilding) > 0 || pool->sp_rebuild_scan > 0);
}

/* Tell whether EC aggregation is currently gated on this pool child by a
 * rebuild PREPARE phase, i.e. whether spc_ec_agg_pause_gate is non-zero.
 */
static inline bool
ds_pool_child_ec_agg_paused(struct ds_pool_child *dpc)
{
	uint64_t gate = dpc->spc_ec_agg_pause_gate;

	return gate != 0;
}

/* encode metadata RPC operation key: HLC time first, in network order, for keys sorted by time.
* allocates the byte-stream, caller must free with D_FREE().
*/
Expand Down
51 changes: 25 additions & 26 deletions src/object/srv_ec_aggregate.c
Original file line number Diff line number Diff line change
Expand Up @@ -1329,15 +1329,13 @@ agg_peer_check_avail(struct ec_agg_param *agg_param, struct ec_agg_entry *entry)
int i;
int rc;

if (ds_pool_is_rebuilding(agg_param->ap_cont->sc_pool->spc_pool)) {
/* We currently pause EC aggregation for rebuild, so just cancel the
* aggregation for the current stripe. It means the following peer status
* check may not be checked at all, but let's keep the code because it could
* be useful in the future.
*/
D_ERROR(DF_UOID " pauses EC aggregation for rebuild\n", DP_UOID(entry->ae_oid));
/* Abort early if rebuild PREPARE has gated EC agg on this target. */
if (ds_pool_child_ec_agg_paused(agg_param->ap_cont->sc_pool)) {
D_DEBUG(DB_EPC, DF_UOID ": abort peer check, EC agg gated for rebuild\n",
DP_UOID(entry->ae_oid));
return -DER_OP_CANCELED;
}
D_ASSERT(!ds_pool_is_rebuilding(agg_param->ap_cont->sc_pool->spc_pool));

rc = pool_map_find_failed_tgts(agg_param->ap_pool_info.api_pool->sp_map, &targets,
&failed_tgts_cnt);
Expand Down Expand Up @@ -1918,12 +1916,15 @@ agg_process_stripe(struct ec_agg_param *agg_param, struct ec_agg_entry *entry)

/* avoid race between EC aggregation and rebuild scanner */
agg_param->ap_cont->sc_ec_agg_updates++;
if (ds_pool_is_rebuilding(agg_param->ap_cont->sc_pool->spc_pool)) {
D_DEBUG(DB_EPC, DF_UOID " abort as rebuild started\n", DP_UOID(entry->ae_oid));
/* Abort early if rebuild PREPARE has gated EC agg on this target. */
if (ds_pool_child_ec_agg_paused(agg_param->ap_cont->sc_pool)) {
D_DEBUG(DB_EPC, DF_UOID ": abort stripe update, EC agg gated for rebuild\n",
DP_UOID(entry->ae_oid));
update_vos = false;
rc = -1;
goto out;
}
D_ASSERT(!ds_pool_is_rebuilding(agg_param->ap_cont->sc_pool->spc_pool));

if (DAOS_FAIL_CHECK(DAOS_FORCE_EC_AGG_FAIL))
D_GOTO(out, rc = -DER_DATA_LOSS);
Expand Down Expand Up @@ -2345,12 +2346,16 @@ ec_aggregate_yield(struct ec_agg_param *agg_param)
{
int rc;

if (ds_pool_is_rebuilding(agg_param->ap_pool_info.api_pool)) {
D_INFO(DF_UUID ": abort ec aggregation, sp_rebuilding %d\n",
/* Gate is set during rebuild PREPARE phase to drain in-flight EC agg rounds.
* By the time rebuild starts (sp_rebuilding > 0), all EC agg must be drained.
*/
if (ds_pool_child_ec_agg_paused(agg_param->ap_cont->sc_pool)) {
D_INFO(DF_UUID ": abort ec aggregation, ec_agg_pause_gate " DF_U64 "\n",
DP_UUID(agg_param->ap_pool_info.api_pool->sp_uuid),
atomic_load(&agg_param->ap_pool_info.api_pool->sp_rebuilding));
agg_param->ap_cont->sc_pool->spc_ec_agg_pause_gate);
return true;
}
D_ASSERT(!ds_pool_is_rebuilding(agg_param->ap_pool_info.api_pool));

D_ASSERT(agg_param->ap_yield_func != NULL);
rc = agg_param->ap_yield_func(agg_param->ap_yield_arg);
Expand Down Expand Up @@ -2557,16 +2562,12 @@ agg_iterate_pre_cb(daos_handle_t ih, vos_iter_entry_t *entry,

D_ASSERT(agg_param->ap_initialized);

/* If rebuild started, abort it to save conflict window with rebuild
* (see obj_inflight_io_check()).
*/
if (ds_pool_is_rebuilding(agg_param->ap_pool_info.api_pool)) {
D_INFO(DF_CONT " abort as rebuild started, sp_rebuilding %d\n",
DP_CONT(agg_param->ap_pool_info.api_pool_uuid,
agg_param->ap_pool_info.api_cont_uuid),
atomic_load(&agg_param->ap_pool_info.api_pool->sp_rebuilding));
/* Abort early per VOS iteration entry if rebuild PREPARE has gated EC agg. */
if (ds_pool_child_ec_agg_paused(agg_param->ap_cont->sc_pool)) {
D_DEBUG(DB_EPC, ": abort iteration, EC agg gated for rebuild\n");
return -1;
}
D_ASSERT(!ds_pool_is_rebuilding(agg_param->ap_pool_info.api_pool));

switch (type) {
case VOS_ITER_OBJ:
Expand Down Expand Up @@ -2846,10 +2847,11 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
agg_clear_extents(&ec_agg_param->ap_agg_entry);
agg_reset_entry(&ec_agg_param->ap_agg_entry, NULL, NULL);

if (rc == -DER_BUSY && !ds_pool_is_rebuilding(cont->sc_pool->spc_pool)) {
if (rc == -DER_BUSY) {
/** Hit an object conflict with VOS aggregation or discard. Rather than exiting, let's
* yield and try again.
*/
D_ASSERT(!ds_pool_is_rebuilding(cont->sc_pool->spc_pool));
opm = cont->sc_pool->spc_metrics[DAOS_OBJ_MODULE];
d_tm_inc_counter(opm->opm_ec_agg_blocked, 1);
blocks++;
Expand All @@ -2862,11 +2864,8 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
}

update_hae:
/* clear the flag before next turn's cont_aggregate_runnable(), to save conflict
* window with rebuild (see obj_inflight_io_check()).
*/
if (ds_pool_is_rebuilding(cont->sc_pool->spc_pool))
cont->sc_ec_agg_active = 0;
/* EC agg is globally drained before rebuild starts; they must not run concurrently. */
D_ASSERT(!ds_pool_is_rebuilding(cont->sc_pool->spc_pool));

if (rc == 0) {
/* If pool map updated during this round of aggregation, the sc_ec_agg_eph
Expand Down
23 changes: 18 additions & 5 deletions src/rebuild/rebuild_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,14 @@ struct rebuild_tgt_pool_tracker {
uint64_t rt_stable_epoch;

/* Only used by reclaim job to discard those half-rebuild data */
uint64_t rt_reclaim_epoch;
/* local rebuild epoch mainly to constrain the VOS aggregation
* to make sure aggregation will not cross the epoch
uint64_t rt_reclaim_epoch;
/* HLC token read from spc_ec_agg_pause_gate during START phase.
* Used in rebuild_fini_one() to check whether this rebuild still
* owns the EC agg pause gate.
*/
uint64_t rt_rebuild_fence;
uint64_t rt_ec_pause_token;

uint32_t rt_leader_rank;
uint32_t rt_leader_rank;

/* Global dtx resync version */
uint32_t rt_global_dtx_resync_version;
Expand Down Expand Up @@ -233,6 +234,12 @@ struct rebuild_global {
extern struct rebuild_global rebuild_gst;
extern unsigned int rebuild_wait_ec_pause;

/* Phase selector of a rebuild scan request; rebuild_tgt_scan_handler() reads it
 * from rsi_phase and dispatches on it.
 */
enum rebuild_scan_phase {
/* Handled by calling rebuild_tgt_stop_agg() and replying with a stable epoch. */
RB_SCAN_PREPARE = 0,
/* NOTE(review): presumably the regular scan request that follows PREPARE — confirm. */
RB_SCAN_START = 1,
/* Handled by calling rebuild_tgt_resume_agg(); the reply carries stable epoch 0. */
RB_SCAN_CANCEL = 2,
};

struct rebuild_task {
d_list_t dst_list;
uuid_t dst_pool_uuid;
Expand Down Expand Up @@ -374,6 +381,12 @@ rebuild_tgt_status_check_ult(void *arg);
int
rebuild_tgt_prepare(crt_rpc_t *rpc, struct rebuild_tgt_pool_tracker **p_rpt);

/* Called by the scan handler for RB_SCAN_PREPARE requests. The return value is
 * sent back as rso_status and *stable_epoch as rso_stable_epoch.
 * NOTE(review): presumably pauses EC aggregation on this target ahead of the
 * rebuild — confirm against the definition.
 */
int
rebuild_tgt_stop_agg(crt_rpc_t *rpc, uint64_t *stable_epoch);

/* Called by the scan handler for RB_SCAN_CANCEL requests. The return value is
 * sent back as rso_status.
 * NOTE(review): presumably releases the EC aggregation pause set by a prior
 * PREPARE — confirm against the definition.
 */
int
rebuild_tgt_resume_agg(crt_rpc_t *rpc);

bool
rebuild_status_match(struct rebuild_tgt_pool_tracker *rpt,
enum pool_comp_state states);
Expand Down
27 changes: 25 additions & 2 deletions src/rebuild/scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -1052,8 +1052,6 @@ rebuild_scanner(void *data)
if (child == NULL)
D_GOTO(out, rc = -DER_NONEXIST);

ds_cont_child_wait_ec_agg_pause(child, rebuild_wait_ec_pause);

/* There maybe orphan DTX entries after DTX resync, let's cleanup before rebuild scan. */
rc = dtx_cleanup_orphan(rpt->rt_pool_uuid, rpt->rt_pool->sp_dtx_resync_version);
if (rc != 0)
Expand Down Expand Up @@ -1208,6 +1206,31 @@ rebuild_tgt_scan_handler(crt_rpc_t *rpc)

D_INFO(DF_RB "\n", DP_RB_RSI(rsi));

if (rsi->rsi_phase == RB_SCAN_PREPARE) {
uint64_t stable_epoch = 0;

D_INFO(DF_RB ": handle global EC agg pause prepare\n", DP_RB_RSI(rsi));
rc = rebuild_tgt_stop_agg(rpc, &stable_epoch);
rout = crt_reply_get(rpc);
rout->rso_status = rc;
rout->rso_stable_epoch = stable_epoch;
D_INFO(DF_RB ": global EC agg pause prepare done, stable epoch " DF_U64
", rc=" DF_RC "\n",
DP_RB_RSI(rsi), stable_epoch, DP_RC(rc));
dss_rpc_reply(rpc, DAOS_REBUILD_DROP_SCAN);
return;
}

if (rsi->rsi_phase == RB_SCAN_CANCEL) {
D_INFO(DF_RB ": handle global EC agg pause cancel\n", DP_RB_RSI(rsi));
rc = rebuild_tgt_resume_agg(rpc);
rout = crt_reply_get(rpc);
rout->rso_status = rc;
rout->rso_stable_epoch = 0;
dss_rpc_reply(rpc, DAOS_REBUILD_DROP_SCAN);
return;
}

/* If PS leader has been changed, and rebuild version is also increased
* due to adding new failure targets for rebuild, let's abort previous
* rebuild.
Expand Down
Loading
Loading