Merged
Changes from all commits
Commits
34 commits
9804de7
stub for leveled_bookie:status/1
hmmr Oct 9, 2025
779bd05
bookie_status: ledger_cache_size
hmmr Oct 11, 2025
ddb2587
bookie_status: redoing using leveled_monitor WIP
hmmr Oct 20, 2025
93a8546
bookie_status: redoing using leveled_monitor WIP
hmmr Oct 23, 2025
fc9d1fb
bookie_status: redoing using leveled_monitor WIP
hmmr Oct 23, 2025
b7fd887
fixups
hmmr Oct 27, 2025
da36264
bookie_status: redoing using leveled_monitor WIP penciller_last_merge…
hmmr Oct 28, 2025
b93116e
bookie_status: redoing using leveled_monitor WIP journal_last_compact…
hmmr Oct 28, 2025
4600df2
bookie_status: redoing using leveled_monitor mostly done
hmmr Oct 28, 2025
d576670
oblige erlfmt
hmmr Oct 29, 2025
12dab0b
bookie_status: fetch_count_by_level
hmmr Oct 29, 2025
447a066
Merge branch 'openriak-3.4' into tiot/openriak-3.4/leveled_bookie_status
hmmr Jan 15, 2026
acd0b57
ct test for bookie status report wip
hmmr Jan 19, 2026
dffe29a
avoid funs taking #state{}; assemble enriched_bookie_statue in situ
hmmr Jan 22, 2026
4314334
spec and typo fixes
hmmr Jan 22, 2026
11e30b9
include all keys, with values 'undefined' initially, in bookie status
hmmr Jan 22, 2026
56d848e
extend book status report ct case
hmmr Jan 28, 2026
d9a2a07
move journal_last_compaction_time_update to where compaction results …
hmmr Feb 1, 2026
4f1fabb
extend bookie status report test (delete+compact again)
hmmr Feb 1, 2026
644b2bf
fixup for d9a2a078a114
hmmr Feb 2, 2026
83032d2
typo
hmmr Feb 2, 2026
bd54c7e
more typos
hmmr Feb 2, 2026
526a362
extend, tweak book status report, now with journal and penciller merges
hmmr Feb 2, 2026
e1faffc
move n_active_journal_files_update increment to cdb_roll handler
hmmr Feb 3, 2026
04d4bd7
better counting and checking of active journal files in bookie_status ct
hmmr Feb 3, 2026
27749c8
fix unused vars after removing code that used them
hmmr Feb 3, 2026
ddf15c4
also incr n_active_journal_files on open_reader
hmmr Feb 4, 2026
0bf6905
have all items in bookie status map at outset
hmmr Feb 5, 2026
df7954d
get right the counting of active journal files
hmmr Feb 5, 2026
74a7b0c
add missing penciller_work_backlog_status_update, provide initial value
Feb 10, 2026
3c795a1
recent_putgethead_counts exists as 3 individual stats items, rm it
Feb 11, 2026
b2915f5
drop compaction score sample, report {min,max}_compaction_score instead
Feb 11, 2026
2ca7735
better still report avg_compaction_score instead of min_
Feb 11, 2026
dba1ab3
Journal last compaction stats
martinsumner Feb 11, 2026
48 changes: 42 additions & 6 deletions src/leveled_bookie.erl
@@ -75,7 +75,8 @@
book_loglevel/2,
book_addlogs/2,
book_removelogs/2,
book_headstatus/1
book_headstatus/1,
book_status/1
]).

%% folding API
@@ -1316,6 +1317,24 @@ book_removelogs(Pid, ForcedLogs) ->
book_headstatus(Pid) ->
gen_server:call(Pid, head_status, infinity).

-spec book_status(pid()) -> map().
%% @doc
%% Return a map containing the following items:
%% * current size of the ledger cache;
%% * number of active journal files;
%% * average compaction score for the journal;
%% * current distribution of files across the ledger (e.g. count of files by level);
%% * current size of the penciller in-memory cache;
%% * penciller work backlog status;
%% * last merge time (penciller);
%% * last compaction time (journal);
%% * last compaction result (journal) e.g. files compacted and compaction score;
%% * ratio of metadata to object size (recent PUTs);
%% * PUT/GET/HEAD recent time/count metrics;
%% * mean level for recent fetches.
book_status(Pid) ->
gen_server:call(Pid, status, infinity).
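A minimal usage sketch for the new call; the key names read from the map below are assumptions inferred from the stat updates elsewhere in this change, not a confirmed set:

```erlang
%% Illustrative only: start a bookie, request the status report and read a
%% couple of items. The map keys are assumed, not a confirmed API.
RootPath = "/tmp/leveled_status_demo",
{ok, Bookie} = leveled_bookie:book_start(RootPath, 2000, 500000000, none),
Status = leveled_bookie:book_status(Bookie),
LedgerCacheSize = maps:get(ledger_cache_size, Status, undefined),
ActiveJournalFiles = maps:get(n_active_journal_files, Status, undefined),
io:format(
    "ledger cache size ~p, active journal files ~p~n",
    [LedgerCacheSize, ActiveJournalFiles]
),
ok = leveled_bookie:book_close(Bookie).
```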

%%%============================================================================
%%% gen_server callbacks
%%%============================================================================
@@ -1475,7 +1494,8 @@ handle_call(
State#state.cache_size,
State#state.cache_multiple,
Cache0,
State#state.penciller
State#state.penciller,
State#state.monitor
)
of
{ok, Cache} ->
@@ -1509,7 +1529,8 @@ handle_call({mput, ObjectSpecs, TTL}, From, State) when
State#state.cache_size,
State#state.cache_multiple,
Cache0,
State#state.penciller
State#state.penciller,
State#state.monitor
)
of
{ok, Cache} ->
@@ -1686,7 +1707,8 @@ handle_call({compact_journal, Timeout}, From, State) when
State#state.cache_size,
State#state.cache_multiple,
State#state.ledger_cache,
State#state.penciller
State#state.penciller,
State#state.monitor
)
of
{_, NewCache} ->
@@ -1740,6 +1762,8 @@ handle_call(return_actors, _From, State) ->
{reply, {ok, State#state.inker, State#state.penciller}, State};
handle_call(head_status, _From, State) ->
{reply, {State#state.head_only, State#state.head_lookup}, State};
handle_call(status, _From, State) ->
{reply, status(State), State};
handle_call(Msg, _From, State) ->
{reply, {unsupported_message, element(1, Msg)}, State}.

@@ -2877,7 +2901,11 @@ check_in_ledgercache(PK, Hash, Cache, loader) ->
end.

-spec maybepush_ledgercache(
pos_integer(), pos_integer(), ledger_cache(), pid()
pos_integer(),
pos_integer(),
ledger_cache(),
pid(),
leveled_monitor:monitor()
) ->
{ok | returned, ledger_cache()}.
%% @doc
@@ -2890,9 +2918,12 @@ check_in_ledgercache(PK, Hash, Cache, loader) ->
%% in the reply. Try again later when it isn't busy (and also potentially
%% implement a slow_offer state to slow down the pace at which PUTs are being
%% received)
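%% (Illustration, not part of this change: a caller such as the PUT handler
%% treats the two results as
%%   {ok, Cache}       -> cache was pushed (or is not yet big enough);
%%   {returned, Cache} -> the penciller was busy, so keep the cache and
%%                        consider setting slow_offer to pace further PUTs;
%% i.e. 'returned' is back-pressure, not an error.)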
maybepush_ledgercache(MaxCacheSize, MaxCacheMult, Cache, Penciller) ->
maybepush_ledgercache(
MaxCacheSize, MaxCacheMult, Cache, Penciller, {Monitor, _}
) ->
Tab = Cache#ledger_cache.mem,
CacheSize = ets:info(Tab, size),
leveled_monitor:add_stat(Monitor, {ledger_cache_size_update, CacheSize}),
TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize, MaxCacheMult),
if
TimeToPush ->
@@ -3048,6 +3079,11 @@ maybelog_snap_timing({Pid, _StatsFreq}, BookieTime, PCLTime) when
maybelog_snap_timing(_Monitor, _, _) ->
ok.

status(#state{monitor = {no_monitor, 0}}) ->
#{};
status(#state{monitor = {Monitor, _}}) ->
leveled_monitor:get_bookie_status(Monitor).
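With no monitor configured the report is an empty map rather than an error, so callers can branch on that. A minimal sketch, assuming only the functions shown in this diff:

```erlang
%% Illustrative only: a bookie running with {no_monitor, 0} reports #{}.
report(Bookie) ->
    case leveled_bookie:book_status(Bookie) of
        Status when map_size(Status) == 0 ->
            {error, no_monitor_configured};
        Status ->
            {ok, maps:get(n_active_journal_files, Status, undefined)}
    end.
```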

%%%============================================================================
%%% Test
%%%============================================================================
12 changes: 12 additions & 0 deletions src/leveled_cdb.erl
@@ -494,6 +494,8 @@ starting({call, From}, {open_writer, Filename}, State) ->
starting({call, From}, {open_reader, Filename}, State) ->
leveled_log:save(State#state.log_options),
?STD_LOG(cdb02, [Filename]),
{Monitor, _} = State#state.monitor,
leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, +1}),
{Handle, Index, LastKey} = open_for_readonly(Filename, false),
State0 = State#state{
handle = Handle,
@@ -505,6 +507,8 @@ starting({call, From}, {open_reader, Filename, LastKey}, State) ->
starting({call, From}, {open_reader, Filename, LastKey}, State) ->
leveled_log:save(State#state.log_options),
?STD_LOG(cdb02, [Filename]),
{Monitor, _} = State#state.monitor,
leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, +1}),
{Handle, Index, LastKey} = open_for_readonly(Filename, LastKey),
State0 = State#state{
handle = Handle,
@@ -650,6 +654,8 @@ writer(
) when
?IS_DEF(LP)
->
{Monitor, _} = State#state.monitor,
leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, +1}),
ok =
leveled_iclerk:clerk_hashtablecalc(
State#state.hashtree, LP, self()
@@ -880,6 +886,8 @@ delete_pending(
) when
?IS_DEF(FN), ?IS_DEF(IO)
->
{Monitor, _} = State#state.monitor,
leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, -1}),
?STD_LOG(cdb04, [FN, State#state.delete_point]),
close_pendingdelete(IO, FN, State#state.waste_path),
{stop, normal};
@@ -906,6 +914,10 @@ delete_pending(
),
{keep_state_and_data, [?DELETE_TIMEOUT]};
false ->
{Monitor, _} = State#state.monitor,
leveled_monitor:add_stat(
Monitor, {n_active_journal_files_update, -1}
),
?STD_LOG(cdb04, [FN, ManSQN]),
close_pendingdelete(IO, FN, State#state.waste_path),
{stop, normal}
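The cdb changes above only emit {n_active_journal_files_update, +1 | -1} deltas; the accumulation itself lives in leveled_monitor, which is outside this diff. A rough sketch of how such a delta could be folded into the status map (the helper name and treating 'undefined' as zero are assumptions):

```erlang
%% Hypothetical helper: fold one delta stat into an accumulated status map.
%% Keys start as 'undefined', so the first update treats that as zero.
apply_stat({n_active_journal_files_update, Delta}, StatusMap) ->
    Current =
        case maps:get(n_active_journal_files, StatusMap, undefined) of
            undefined -> 0;
            N when is_integer(N) -> N
        end,
    maps:put(n_active_journal_files, Current + Delta, StatusMap).
```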
125 changes: 82 additions & 43 deletions src/leveled_iclerk.erl
@@ -421,46 +421,69 @@ handle_cast(
CloseFun = ScoringState#scoring_state.close_fun,
SW = ScoringState#scoring_state.start_time,
ScoreParams =
{MaxRunLength, State#state.maxrunlength_compactionperc,
State#state.singlefile_compactionperc},
{
MaxRunLength,
State#state.maxrunlength_compactionperc,
State#state.singlefile_compactionperc
},
{BestRun0, Score} = assess_candidates(Candidates, ScoreParams),
?TMR_LOG(ic003, [Score, length(BestRun0)], SW),
case Score > 0.0 of
true ->
BestRun1 = sort_run(BestRun0),
print_compaction_run(BestRun1, ScoreParams),
ManifestSlice =
compact_files(
BestRun1,
CDBopts,
FilterFun,
FilterServer,
MaxSQN,
State#state.reload_strategy,
State#state.compression_method
),
FilesToDelete =
lists:map(
fun(C) ->
{
C#candidate.low_sqn,
C#candidate.filename,
C#candidate.journal,
undefined
}
end,
BestRun1
),
?STD_LOG(ic002, [length(FilesToDelete)]),
ok = CloseFun(FilterServer),
ok =
leveled_inker:ink_clerkcomplete(
State#state.inker, ManifestSlice, FilesToDelete
);
false ->
ok = CloseFun(FilterServer),
ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], [])
end,
LRL =
case Score > 0.0 of
true ->
BestRun1 = sort_run(BestRun0),
print_compaction_run(BestRun1, ScoreParams),
ManifestSlice =
compact_files(
BestRun1,
CDBopts,
FilterFun,
FilterServer,
MaxSQN,
State#state.reload_strategy,
State#state.compression_method
),
FilesToDelete =
lists:map(
fun(C) ->
{
C#candidate.low_sqn,
C#candidate.filename,
C#candidate.journal,
undefined
}
end,
BestRun1
),
?STD_LOG(ic002, [length(FilesToDelete)]),
ok = CloseFun(FilterServer),
ok =
leveled_inker:ink_clerkcomplete(
State#state.inker, ManifestSlice, FilesToDelete
),
length(BestRun0);
false ->
ok = CloseFun(FilterServer),
ok =
leveled_inker:ink_clerkcomplete(State#state.inker, [], []),
0
end,
{Monitor, _} = CDBopts#cdb_options.monitor,
{MaxScore, MeanScore} = calc_run_stats(Candidates),
{MegaST, SecST, MicroST} = ScoringState#scoring_state.start_time,
StartTimeMilli = (MegaST * 1000000 + SecST) * 1000 + (MicroST div 1000),
leveled_monitor:add_stat(
Monitor,
{
journal_compaction,
MaxScore,
MeanScore,
Score,
LRL,
os:system_time(millisecond) - StartTimeMilli,
StartTimeMilli
}
),
{noreply, State#state{scoring_state = undefined}, hibernate};
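The scoring start time is an os:timestamp()-style {MegaSecs, Secs, MicroSecs} triple, so it is flattened to epoch milliseconds before being compared with os:system_time(millisecond). A worked example of that conversion, with invented figures:

```erlang
%% Invented figures: {1770, 123, 456789} is 1770 * 1000000 + 123 seconds
%% and 456789 microseconds since the epoch.
{MegaST, SecST, MicroST} = {1770, 123, 456789},
StartTimeMilli = (MegaST * 1000000 + SecST) * 1000 + (MicroST div 1000),
%% StartTimeMilli = 1770000123 * 1000 + 456 = 1770000123456
ElapsedMs = os:system_time(millisecond) - StartTimeMilli.
```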
handle_cast(
{trim, PersistedSQN, ManifestAsList}, State = #state{inker = Ink}
@@ -584,6 +607,18 @@ schedule_compaction(CompactionHours, RunsPerDay, CurrentTS) ->
%%% Internal functions
%%%============================================================================

-spec calc_run_stats(list(candidate())) -> {float(), float()}.
calc_run_stats(Candidates) ->
case lists:map(fun(C) -> C#candidate.compaction_perc end, Candidates) of
L when length(L) > 0 ->
{
lists:max(L),
lists:sum(L) / length(L)
};
_ ->
{0.0, 0.0}
end.
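A small worked example of what calc_run_stats/1 yields, written eunit-style for illustration (not a test added by this change):

```erlang
calc_run_stats_example_test() ->
    %% Invented figures: the maximum of the compaction percentages and
    %% their mean; an empty candidate list yields {0.0, 0.0}.
    Cs = [#candidate{compaction_perc = P} || P <- [40.0, 60.0, 80.0]],
    ?assertMatch({80.0, 60.0}, calc_run_stats(Cs)),
    ?assertMatch({0.0, 0.0}, calc_run_stats([])).
```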

-spec check_single_file(
pid(),
leveled_inker:filterfun(),
@@ -714,7 +749,7 @@ fetch_inbatches(PositionList, BatchSize, CDB, CheckedList) ->
%%
%% Although this requires many loops over the list of the candidate, as the
%% file scores have already been calculated the cost per loop should not be
%% a high burden. Reducing the maximum run length, will reduce the cost of
%% a high burden. Reducing the maximum run length, will reduce the cost if
%% this exercise should be a problem.
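%% (Illustration: the maximum run length is configurable at bookie startup -
%% e.g. something like {max_run_length, 4} in the open options - should the
%% cost of this assessment ever need to be bounded.)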
%%
%% The score parameters are used to produce the score of the compaction run,
@@ -756,7 +791,7 @@ assess_candidates(AllCandidates, Params) ->
{list(candidate()), float()}.
%% @doc
%% For a given run length, calculate the scores for all consecutive runs of
%% files, comparing the score with the best run which has beens een so far.
%% files, comparing the score with the best run which has been seen so far.
%% The best is a tuple of the actual run of candidates, along with the score
%% achieved for that run
assess_for_runlength(RunLength, AllCandidates, Params, Best) ->
@@ -775,7 +810,7 @@ assess_for_runlength(RunLength, AllCandidates, Params, Best) ->
-spec score_run(list(candidate()), score_parameters()) -> float().
%% @doc
%% Score a run. Caluclate the avergae score across all the files in the run,
%% and deduct that from a target score. Good candidate runs for comapction
%% and deduct that from a target score. Good candidate runs for compaction
%% have larger (positive) scores. Bad candidate runs for compaction have
%% negative scores.
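%% (Illustration with invented figures: if the target for a run of this
%% length were 70.0 and the files in the run scored 55.0, 60.0 and 65.0,
%% the average is 60.0 and the run scores 70.0 - 60.0 = 10.0; an average
%% above the target gives a negative score, i.e. a poor candidate run.)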
score_run([], _Params) ->
@@ -1265,7 +1300,10 @@ check_single_file_test() ->
Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 4, RS),
?assertMatch(37.5, Score1),
LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> current end,
Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 9, 8, 4, RS),
Score2 =
check_single_file(
CDB, LedgerFun2, LedgerSrv1, 9, 8, 4, RS
),
?assertMatch(100.0, Score2),
Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 3, RS),
?assertMatch(37.5, Score3),
@@ -1414,7 +1452,8 @@ compact_empty_file_test() ->
{3, {o, "Bucket", "Key3", null}}
],
LedgerFun1 = fun(_Srv, _Key, _ObjSQN) -> replaced end,
Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4, RS),
Score1 =
check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4, RS),
?assert((+0.0 =:= Score1) orelse (-0.0 =:= Score1)),
ok = leveled_cdb:cdb_deletepending(CDB2),
ok = leveled_cdb:cdb_destroy(CDB2).