Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 14 additions & 43 deletions src/container/srv_target.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/**
* (C) Copyright 2016-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
* (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -184,7 +184,7 @@ cont_aggregate_runnable(struct ds_cont_child *cont, struct sched_request *req,
if (ds_pool_is_rebuilding(pool) && !vos_agg) {
D_DEBUG(DB_EPC, DF_CONT ": skip EC aggregation during rebuild %d, %d.\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid),
atomic_load(&pool->sp_rebuilding), pool->sp_rebuild_scan);
atomic_load(&pool->sp_rebuilding), atomic_load(&pool->sp_rebuild_enum));
return false;
}

Expand Down Expand Up @@ -293,8 +293,7 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb,
daos_epoch_t epoch_max, epoch_min;
daos_epoch_range_t epoch_range;
struct sched_request *req = cont2req(cont, param->ap_vos_agg);
uint64_t hlc = d_hlc_get();
uint64_t change_hlc;
uint64_t hlc = d_hlc_get();
uint64_t interval;
uint64_t snapshots_local[MAX_SNAPSHOT_LOCAL] = { 0 };
uint64_t *snapshots = NULL;
Expand All @@ -303,16 +302,14 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb,
uint32_t flags = 0;
int i, rc = 0;

change_hlc = max(cont->sc_snapshot_delete_hlc,
cont->sc_pool->spc_rebuild_end_hlc);
if (param->ap_full_scan_hlc < change_hlc) {
/* Snapshot has been deleted or rebuild happens since the last
if (param->ap_full_scan_hlc < cont->sc_snapshot_delete_hlc) {
/* Snapshot has been deleted since the last
* aggregation, let's restart from 0.
*/
epoch_min = 0;
flags |= VOS_AGG_FL_FORCE_SCAN;
D_DEBUG(DB_EPC, "change hlc "DF_X64" > full "DF_X64"\n",
change_hlc, param->ap_full_scan_hlc);
D_DEBUG(DB_EPC, "snapshot del hlc " DF_X64 " > full " DF_X64 "\n",
cont->sc_snapshot_delete_hlc, param->ap_full_scan_hlc);
} else {
epoch_min = get_hae(cont, param->ap_vos_agg);
}
Expand Down Expand Up @@ -352,50 +349,24 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb,
D_DEBUG(DB_EPC, "hlc "DF_X64" epoch "DF_X64"/"DF_X64" agg max "DF_X64"\n",
hlc, epoch_max, epoch_min, cont->sc_aggregation_max);

if (cont->sc_snapshots_nr + 1 < MAX_SNAPSHOT_LOCAL) {
snapshots_nr = cont->sc_snapshots_nr;
if (snapshots_nr < MAX_SNAPSHOT_LOCAL) {
snapshots = snapshots_local;
} else {
D_ALLOC(snapshots, (cont->sc_snapshots_nr + 1) *
sizeof(daos_epoch_t));
D_ALLOC(snapshots, snapshots_nr * sizeof(daos_epoch_t));
if (snapshots == NULL)
return -DER_NOMEM;
}

if (cont->sc_pool->spc_rebuild_fence != 0) {
uint64_t rebuild_fence = cont->sc_pool->spc_rebuild_fence;
int j;
int insert_idx;

/* insert rebuild_fence into the snapshot list */
D_DEBUG(DB_EPC, "rebuild fence "DF_X64"\n", rebuild_fence);
for (j = 0, insert_idx = 0; j < cont->sc_snapshots_nr; j++) {
if (cont->sc_snapshots[j] < rebuild_fence) {
snapshots[j] = cont->sc_snapshots[j];
insert_idx++;
} else {
snapshots[j + 1] = cont->sc_snapshots[j];
}
}
snapshots[insert_idx] = rebuild_fence;
snapshots_nr = cont->sc_snapshots_nr + 1;
} else {
/* Since sc_snapshots might be freed by other ULT, let's
* always copy here.
*/
snapshots_nr = cont->sc_snapshots_nr;
if (snapshots_nr > 0)
memcpy(snapshots, cont->sc_snapshots,
snapshots_nr * sizeof(daos_epoch_t));
}
/* Since sc_snapshots might be freed by other ULT, let's always copy here. */
if (snapshots_nr > 0)
memcpy(snapshots, cont->sc_snapshots, snapshots_nr * sizeof(daos_epoch_t));

/* Find highest snapshot less than last aggregated epoch. */
for (i = 0; i < snapshots_nr && snapshots[i] < epoch_min; ++i)
;

if (i == 0)
epoch_range.epr_lo = 0;
else
epoch_range.epr_lo = snapshots[i - 1] + 1;
epoch_range.epr_lo = epoch_min != 0 ? epoch_min + 1 : 0;

if (epoch_range.epr_lo >= epoch_max)
D_GOTO(free, rc = 0);
Expand Down
6 changes: 3 additions & 3 deletions src/include/daos_srv/container.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* (C) Copyright 2015-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
* (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -112,7 +112,7 @@ struct ds_cont_child {
* VOS aggregation will use this boundary. We will optimize it later.
*/
uint64_t sc_ec_agg_eph_boundary;
/* The current EC aggregate epoch for this xstream */
/* The local EC aggregation epoch for this xstream */
uint64_t sc_ec_agg_eph;
/* Used by cont_ec_eph_query_ult to query the minimum EC agg epoch from all
* local VOS.
Expand Down Expand Up @@ -142,7 +142,7 @@ struct ds_cont_child {
/* Parameters carried through one container aggregation pass. */
struct agg_param {
/* Opaque payload pointer; its meaning is not visible from this view */
void *ap_data;
/* Container child this parameter set refers to (presumably the one being aggregated; verify) */
struct ds_cont_child *ap_cont;
/* HLC compared against the container's sc_snapshot_delete_hlc in cont_child_aggregate();
 * when it is older, aggregation restarts from epoch 0 with a forced scan.
 */
daos_epoch_t ap_full_scan_hlc;
daos_epoch_t ap_full_scan_hlc;
/* Passed to cont2req() and get_hae(); presumably selects VOS aggregation over
 * EC aggregation -- TODO confirm against the callers.
 */
bool ap_vos_agg;
};

Expand Down
21 changes: 4 additions & 17 deletions src/include/daos_srv/pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,10 @@ struct ds_pool {
* rebuild job.
*/
uint32_t sp_rebuild_gen;
ATOMIC int sp_rebuilding;
ATOMIC int sp_discarding;
/**
* someone has already messaged this pool for rebuild scan,
* NB: all xstreams can do lockless-write on it but it's OK
*/
int sp_rebuild_scan;
ATOMIC int sp_rebuilding;
/* someone has already messaged this pool for rebuild object/key enumeration */
ATOMIC int sp_rebuild_enum;

int sp_discard_status;
/** path to ephemeral metrics */
Expand Down Expand Up @@ -174,16 +171,6 @@ struct ds_pool_child {
struct sched_request *spc_chkpt_req; /* Track checkpointing ULT*/
d_list_t spc_cont_list;

/* The current maximum rebuild epoch (0 if there is no rebuild), so
* vos aggregation can not cross this epoch during rebuild to avoid
* interfering rebuild process.
*/
uint64_t spc_rebuild_fence;

/* The HLC when current rebuild ends, which will be used to compare
* with the aggregation full scan start HLC to know whether the
* aggregation needs to be restarted from 0. */
uint64_t spc_rebuild_end_hlc;
uint32_t spc_map_version;
int spc_ref;
ABT_eventual spc_ref_eventual;
Expand Down Expand Up @@ -215,7 +202,7 @@ struct ds_pool_svc_op_val {
/**
 * Tell whether the pool is currently considered to be under rebuild.
 *
 * \param[in] pool	pool whose rebuild state is examined
 *
 * \return		true when the sp_rebuilding counter is positive or the
 *			rebuild scan/enumeration flag of the pool is set,
 *			false otherwise.
 *
 * NOTE(review): the two return statements below look like the pre-change and
 * post-change lines of a diff shown together; only one is expected to exist
 * in the real file -- confirm against the repository.
 */
static inline bool
ds_pool_is_rebuilding(struct ds_pool *pool)
{
return (atomic_load(&pool->sp_rebuilding) > 0 || pool->sp_rebuild_scan > 0);
return (atomic_load(&pool->sp_rebuilding) > 0 || atomic_load(&pool->sp_rebuild_enum) > 0);
}

/* encode metadata RPC operation key: HLC time first, in network order, for keys sorted by time.
Expand Down
17 changes: 1 addition & 16 deletions src/object/srv_obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -2486,20 +2486,6 @@ obj_inflight_io_check(struct ds_cont_child *child, uint32_t opc,
D_ERROR("reintegrating " DF_UUID " retry.\n", DP_UUID(pool->sp_uuid));
return -DER_UPDATE_AGAIN;
}

/* All I/O during rebuilding, needs to wait for the rebuild fence to
* be generated (see rebuild_prepare_one()), which will create a boundary
* for rebuild, so the data after boundary(epoch) should not be rebuilt,
* which otherwise might be written duplicately, which might cause
* the failure in VOS.
*/
if ((flags & ORF_REBUILDING_IO) &&
(!child->sc_pool->spc_pool->sp_disable_rebuild &&
child->sc_pool->spc_rebuild_fence == 0)) {
D_ERROR("rebuilding "DF_UUID" retry.\n", DP_UUID(child->sc_pool->spc_uuid));
return -DER_UPDATE_AGAIN;
}

return 0;
}

Expand Down Expand Up @@ -3447,9 +3433,8 @@ obj_local_enum(struct obj_io_context *ioc, crt_rpc_t *rpc,
if (oei->oei_flags & ORF_FOR_MIGRATION) {
/* just in case ds_pool::sp_rebuilding is not set, pause my local EC aggregation
* by setting this flag.
* NB: it's a lockless write to a shared data structure and it's harmless.
*/
ioc->ioc_coc->sc_pool->spc_pool->sp_rebuild_scan = 1;
atomic_store(&ioc->ioc_coc->sc_pool->spc_pool->sp_rebuild_enum, 1);
flags = DTX_FOR_MIGRATION;
}

Expand Down
12 changes: 4 additions & 8 deletions src/rebuild/rebuild_internal.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/**
* (C) Copyright 2017-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
* (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -77,12 +77,7 @@ struct rebuild_tgt_pool_tracker {
uint64_t rt_stable_epoch;

/* Only used by reclaim job to discard those half-rebuild data */
uint64_t rt_reclaim_epoch;
/* local rebuild epoch mainly to constrain the VOS aggregation
* to make sure aggregation will not cross the epoch
*/
uint64_t rt_rebuild_fence;

uint64_t rt_reclaim_epoch;
uint32_t rt_leader_rank;

/* Global dtx resync version */
Expand Down Expand Up @@ -339,7 +334,8 @@ void
rebuild_tgt_status_check_ult(void *arg);

int
rebuild_tgt_prepare(crt_rpc_t *rpc, struct rebuild_tgt_pool_tracker **p_rpt);
rebuild_tgt_prepare(struct ds_pool *pool, struct rebuild_scan_in *rsi,
struct rebuild_tgt_pool_tracker **p_rpt);

bool
rebuild_status_match(struct rebuild_tgt_pool_tracker *rpt,
Expand Down
29 changes: 20 additions & 9 deletions src/rebuild/scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -1204,6 +1204,7 @@ rebuild_tgt_scan_handler(crt_rpc_t *rpc)
struct rebuild_scan_out *rso;
struct rebuild_pool_tls *tls = NULL;
struct rebuild_tgt_pool_tracker *rpt = NULL;
struct ds_pool *pool = NULL;
int rc;

rsi = crt_req_get(rpc);
Expand All @@ -1214,6 +1215,13 @@ rebuild_tgt_scan_handler(crt_rpc_t *rpc)
rsi->rsi_rebuild_ver, rsi->rsi_rebuild_gen, rsi->rsi_master_rank,
rsi->rsi_leader_term, RB_OP_STR(rsi->rsi_rebuild_op));

rc = ds_pool_lookup(rsi->rsi_pool_uuid, &pool);
if (rc) {
D_ERROR("Can not find pool " DF_UUID ": %d\n", DP_UUID(rsi->rsi_pool_uuid), rc);
D_GOTO(out, rc);
}
atomic_fetch_add(&pool->sp_rebuilding, 1);

/* If PS leader has been changed, and rebuild version is also increased
* due to adding new failure targets for rebuild, let's abort previous
* rebuild.
Expand Down Expand Up @@ -1321,7 +1329,7 @@ rebuild_tgt_scan_handler(crt_rpc_t *rpc)
if (daos_fail_check(DAOS_REBUILD_TGT_START_FAIL))
D_GOTO(out, rc = -DER_INVAL);

rc = rebuild_tgt_prepare(rpc, &rpt);
rc = rebuild_tgt_prepare(pool, rsi, &rpt);
if (rc)
D_GOTO(out, rc);

Expand All @@ -1333,8 +1341,6 @@ rebuild_tgt_scan_handler(crt_rpc_t *rpc)
D_GOTO(out, rc);
}

atomic_fetch_add(&rpt->rt_pool->sp_rebuilding, 1); /* reset in rebuild_tgt_fini */

rpt_get(rpt);
/* step-3: start scan leader */
rc = dss_ult_create(rebuild_scan_leader, rpt, DSS_XS_SELF, 0, 0, NULL);
Expand All @@ -1344,14 +1350,19 @@ rebuild_tgt_scan_handler(crt_rpc_t *rpc)
}

out:
if (tls && tls->rebuild_pool_status == 0 && rc != 0)
tls->rebuild_pool_status = rc;

if (rpt) {
if (rc)
if (rc != 0) {
if (tls && tls->rebuild_pool_status == 0)
tls->rebuild_pool_status = rc;
if (rpt)
rpt_delete(rpt);
rpt_put(rpt);
else if (pool) /* otherwise rpt_put() will decrease this for me */
atomic_fetch_sub(&pool->sp_rebuilding, 1);
}
if (pool)
ds_pool_put(pool);
if (rpt)
rpt_put(rpt);

rso = crt_reply_get(rpc);
rso->rso_status = rc;
rso->rso_stable_epoch = d_hlc_get();
Expand Down
Loading
Loading