Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 12 additions & 38 deletions src/container/srv_target.c
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ cont_aggregate_runnable(struct ds_cont_child *cont, struct sched_request *req,
if (ds_pool_is_rebuilding(pool) && !vos_agg) {
D_DEBUG(DB_EPC, DF_CONT ": skip EC aggregation during rebuild %d, %d.\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid),
atomic_load(&pool->sp_rebuilding), pool->sp_rebuild_scan);
atomic_load(&pool->sp_rebuilding), atomic_load(&pool->sp_rebuild_scanning));
return false;
}

Expand Down Expand Up @@ -319,8 +319,7 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb,
daos_epoch_t epoch_max, epoch_min;
daos_epoch_range_t epoch_range;
struct sched_request *req = cont2req(cont, param->ap_vos_agg);
uint64_t hlc = d_hlc_get();
uint64_t change_hlc;
uint64_t hlc = d_hlc_get();
uint64_t interval;
uint64_t snapshots_local[MAX_SNAPSHOT_LOCAL] = { 0 };
uint64_t *snapshots = NULL;
Expand All @@ -329,16 +328,14 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb,
uint32_t flags = 0;
int i, rc = 0;

change_hlc = max(cont->sc_snapshot_delete_hlc,
cont->sc_pool->spc_rebuild_end_hlc);
if (param->ap_full_scan_hlc < change_hlc) {
/* Snapshot has been deleted or rebuild happens since the last
if (param->ap_full_scan_hlc < cont->sc_snapshot_delete_hlc) {
/* Snapshot has been deleted since the last
* aggregation, let's restart from 0.
*/
epoch_min = 0;
flags |= VOS_AGG_FL_FORCE_SCAN;
D_DEBUG(DB_EPC, "change hlc "DF_X64" > full "DF_X64"\n",
change_hlc, param->ap_full_scan_hlc);
D_DEBUG(DB_EPC, "snapshot del hlc " DF_X64 " > full " DF_X64 "\n",
cont->sc_snapshot_delete_hlc, param->ap_full_scan_hlc);
} else {
epoch_min = get_hae(cont, param->ap_vos_agg);
}
Expand Down Expand Up @@ -378,41 +375,18 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb,
D_DEBUG(DB_EPC, "hlc "DF_X64" epoch "DF_X64"/"DF_X64" agg max "DF_X64"\n",
hlc, epoch_max, epoch_min, cont->sc_aggregation_max);

if (cont->sc_snapshots_nr + 1 < MAX_SNAPSHOT_LOCAL) {
if (cont->sc_snapshots_nr < MAX_SNAPSHOT_LOCAL) {
snapshots = snapshots_local;
} else {
D_ALLOC(snapshots, (cont->sc_snapshots_nr + 1) *
sizeof(daos_epoch_t));
D_ALLOC(snapshots, cont->sc_snapshots_nr * sizeof(daos_epoch_t));
if (snapshots == NULL)
return -DER_NOMEM;
}

if (cont->sc_pool->spc_rebuild_fence != 0) {
uint64_t rebuild_fence = cont->sc_pool->spc_rebuild_fence;
int j;
int insert_idx;

/* insert rebuild_fetch into the snapshot list */
D_DEBUG(DB_EPC, "rebuild fence "DF_X64"\n", rebuild_fence);
for (j = 0, insert_idx = 0; j < cont->sc_snapshots_nr; j++) {
if (cont->sc_snapshots[j] < rebuild_fence) {
snapshots[j] = cont->sc_snapshots[j];
insert_idx++;
} else {
snapshots[j + 1] = cont->sc_snapshots[j];
}
}
snapshots[insert_idx] = rebuild_fence;
snapshots_nr = cont->sc_snapshots_nr + 1;
} else {
/* Since sc_snapshots might be freed by other ULT, let's
* always copy here.
*/
snapshots_nr = cont->sc_snapshots_nr;
if (snapshots_nr > 0)
memcpy(snapshots, cont->sc_snapshots,
snapshots_nr * sizeof(daos_epoch_t));
}
/* Since sc_snapshots might be freed by other ULT, let's always copy here. */
snapshots_nr = cont->sc_snapshots_nr;
if (snapshots_nr > 0)
memcpy(snapshots, cont->sc_snapshots, snapshots_nr * sizeof(daos_epoch_t));

/* Find highest snapshot less than last aggregated epoch. */
for (i = 0; i < snapshots_nr && snapshots[i] < epoch_min; ++i)
Expand Down
4 changes: 2 additions & 2 deletions src/include/daos_srv/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ struct ds_cont_child {
* VOS aggregation will use this boundary. We will optimize it later.
*/
uint64_t sc_ec_agg_eph_boundary;
/* The current EC aggregate epoch for this xstream */
/* The local EC aggregation epoch for this xstream */
uint64_t sc_ec_agg_eph;
/* Used by ds_cont_eph_report() to query the minimum ec_agg_eph and stable_eph
* from all local VOS.
Expand Down Expand Up @@ -160,7 +160,7 @@ struct ds_cont_child {
struct agg_param {
void *ap_data;
struct ds_cont_child *ap_cont;
daos_epoch_t ap_full_scan_hlc;
daos_epoch_t ap_full_scan_hlc;
bool ap_vos_agg;
};

Expand Down
21 changes: 5 additions & 16 deletions src/include/daos_srv/pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,8 @@ struct ds_pool {
uint32_t sp_rebuild_gen;
ATOMIC int sp_rebuilding;
ATOMIC int sp_discarding;
/**
* someone has already messaged this pool to for rebuild scan,
* NB: all xstreams can do lockless-write on it but it's OK
*/
int sp_rebuild_scan;
/* someone has already messaged this pool to for rebuild scan */
ATOMIC int sp_rebuild_scanning;

int sp_discard_status;
/** path to ephemeral metrics */
Expand Down Expand Up @@ -178,16 +175,7 @@ struct ds_pool_child {
d_list_t spc_cont_list;
d_list_t spc_srv_cont_hdl; /* Single server cont handle */

/* The current maxim rebuild epoch, (0 if there is no rebuild), so
* vos aggregation can not cross this epoch during rebuild to avoid
* interfering rebuild process.
*/
uint64_t spc_rebuild_fence;

/* The HLC when current rebuild ends, which will be used to compare
* with the aggregation full scan start HLC to know whether the
* aggregation needs to be restarted from 0. */
uint64_t spc_rebuild_end_hlc;
uint64_t spc_rebuild_start;
uint32_t spc_map_version;
int spc_ref;
ABT_eventual spc_ref_eventual;
Expand Down Expand Up @@ -219,7 +207,8 @@ struct ds_pool_svc_op_val {
static inline bool
ds_pool_is_rebuilding(struct ds_pool *pool)
{
return (atomic_load(&pool->sp_rebuilding) > 0 || pool->sp_rebuild_scan > 0);
return (atomic_load(&pool->sp_rebuilding) > 0 ||
atomic_load(&pool->sp_rebuild_scanning) > 0);
}

/* encode metadata RPC operation key: HLC time first, in network order, for keys sorted by time.
Expand Down
2 changes: 1 addition & 1 deletion src/object/srv_obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -3439,7 +3439,7 @@ obj_local_enum(struct obj_io_context *ioc, crt_rpc_t *rpc,
* by setting this flag.
* NB: it's a lockess write to shared data structure and it's harmless.
*/
ioc->ioc_coc->sc_pool->spc_pool->sp_rebuild_scan = 1;
atomic_fetch_add(&ioc->ioc_coc->sc_pool->spc_pool->sp_rebuild_scanning, 1);
flags = DTX_FOR_MIGRATION;
}

Expand Down
8 changes: 5 additions & 3 deletions src/rebuild/rebuild_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,12 @@ struct rebuild_tgt_pool_tracker {

/* Only used by reclaim job to discard those half-rebuild data */
uint64_t rt_reclaim_epoch;
/* local rebuild epoch mainly to constrain the VOS aggregation
* to make sure aggregation will not cross the epoch
/*
* XX: remove this.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean replace by other method?

* rebuild_fini_one() compare this value against rt_rebuild_start to
* decide whether this rebuild still owns this vos pool's rebuild.
*/
uint64_t rt_rebuild_fence;
uint64_t rt_rebuild_start;

uint32_t rt_leader_rank;

Expand Down
6 changes: 3 additions & 3 deletions src/rebuild/scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -1316,8 +1316,6 @@ rebuild_tgt_scan_handler(crt_rpc_t *rpc)
D_GOTO(out, rc);
}

atomic_fetch_add(&rpt->rt_pool->sp_rebuilding, 1); /* reset in rebuild_tgt_fini */

rpt_get(rpt);
/* step-3: start scan leader */
rc = dss_ult_create(rebuild_scan_leader, rpt, DSS_XS_SELF, 0, 0, NULL);
Expand All @@ -1331,8 +1329,10 @@ rebuild_tgt_scan_handler(crt_rpc_t *rpc)
tls->rebuild_pool_status = rc;

if (rpt) {
if (rc)
if (rc) {
atomic_fetch_sub(&rpt->rt_pool->sp_rebuilding, 1);
rpt_delete(rpt);
}
rpt_put(rpt);
}
rout = crt_reply_get(rpc);
Expand Down
39 changes: 18 additions & 21 deletions src/rebuild/srv.c
Original file line number Diff line number Diff line change
Expand Up @@ -2869,20 +2869,15 @@ rebuild_fini_one(void *arg)
if (dpc == NULL)
return 0;

/* Reset rebuild epoch, then reset the aggregation epoch, so
* it can aggregate the rebuild epoch.
*/
D_ASSERT(rpt->rt_rebuild_fence != 0);
if (rpt->rt_rebuild_fence == dpc->spc_rebuild_fence) {
dpc->spc_rebuild_fence = 0;
dpc->spc_rebuild_end_hlc = d_hlc_get();
D_DEBUG(DB_REBUILD, DF_RB ": Reset aggregation end hlc " DF_U64 "\n",
DP_RB_RPT(rpt), dpc->spc_rebuild_end_hlc);
D_ASSERT(rpt->rt_rebuild_start != 0);
if (rpt->rt_rebuild_start == dpc->spc_rebuild_start) {
dpc->spc_rebuild_start = 0;
D_DEBUG(DB_REBUILD, DF_RB ": Reset rebuild start epoch\n", DP_RB_RPT(rpt));
} else {
D_DEBUG(DB_REBUILD,
DF_RB ": pool is still being rebuilt rt_rebuild_fence " DF_U64
" spc_rebuild_fence " DF_U64 "\n",
DP_RB_RPT(rpt), rpt->rt_rebuild_fence, dpc->spc_rebuild_fence);
DF_RB ": pool is still being rebuilt rt_rebuild_start " DF_U64
" spc_rebuild_start " DF_U64 "\n",
DP_RB_RPT(rpt), rpt->rt_rebuild_start, dpc->spc_rebuild_start);
}

ds_pool_child_put(dpc);
Expand All @@ -2900,7 +2895,9 @@ rebuild_tgt_fini(struct rebuild_tgt_pool_tracker *rpt)

D_ASSERT(atomic_load(&rpt->rt_pool->sp_rebuilding) > 0);
atomic_fetch_sub(&rpt->rt_pool->sp_rebuilding, 1);
rpt->rt_pool->sp_rebuild_scan = 0;

D_ASSERT(atomic_load(&rpt->rt_pool->sp_rebuild_scanning) > 0);
atomic_store(&rpt->rt_pool->sp_rebuild_scanning, 0);

ABT_mutex_lock(rpt->rt_lock);
ABT_cond_signal(rpt->rt_global_dtx_wait_cond);
Expand Down Expand Up @@ -3127,13 +3124,10 @@ rebuild_prepare_one(void *data)

D_ASSERT(dss_get_module_info()->dmi_xs_id != 0);

/* Set the rebuild epoch per VOS container, so VOS aggregation will not
* cross the epoch to cause problem.
*/
D_ASSERT(rpt->rt_rebuild_fence != 0);
dpc->spc_rebuild_fence = rpt->rt_rebuild_fence;
D_ASSERT(rpt->rt_rebuild_start != 0);
dpc->spc_rebuild_start = rpt->rt_rebuild_start;
D_DEBUG(DB_REBUILD, DF_RB " open local container " DF_UUID " rebuild eph " DF_X64 "\n",
DP_RB_RPT(rpt), DP_UUID(rpt->rt_coh_uuid), rpt->rt_rebuild_fence);
DP_RB_RPT(rpt), DP_UUID(rpt->rt_coh_uuid), rpt->rt_rebuild_start);

put:
ds_pool_child_put(dpc);
Expand Down Expand Up @@ -3213,6 +3207,8 @@ rebuild_tgt_prepare(crt_rpc_t *rpc, struct rebuild_tgt_pool_tracker **p_rpt)
DL_ERROR(rc, DF_RB " cannot find pool", DP_RB_RSI(rsi));
return rc;
}
/* must set rebuild flag before yield, it's reset in rebuild_tgt_fini */
atomic_fetch_add(&pool->sp_rebuilding, 1);

if (ds_pool_get_version(pool) < rsi->rsi_rebuild_ver) {
D_INFO(DF_RB " map %u < rsi_rebuild_ver %u\n", DP_RB_RSI(rsi),
Expand Down Expand Up @@ -3275,12 +3271,12 @@ rebuild_tgt_prepare(crt_rpc_t *rpc, struct rebuild_tgt_pool_tracker **p_rpt)
if (pool_tls == NULL)
D_GOTO(out, rc = -DER_NOMEM);

rpt->rt_rebuild_fence = d_hlc_get();
rpt->rt_rebuild_start = d_hlc_get();
rc = ds_pool_task_collective(rpt->rt_pool_uuid,
PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT,
rebuild_prepare_one, rpt, 0);
if (rc) {
rpt->rt_rebuild_fence = 0;
rpt->rt_rebuild_start = 0;
rebuild_pool_tls_destroy(pool_tls);
D_GOTO(out, rc);
}
Expand All @@ -3299,6 +3295,7 @@ rebuild_tgt_prepare(crt_rpc_t *rpc, struct rebuild_tgt_pool_tracker **p_rpt)
}
rpt_put(rpt);
}
atomic_fetch_sub(&pool->sp_rebuilding, 1);
ds_pool_put(pool);
}
daos_prop_fini(&prop);
Expand Down
Loading