Skip to content

Commit 8bf9070

Browse files
hmmr, Andriy Zavada, and martinsumner
committed
leveled_bookie:status/1 (#17)
Return a map of status information about the bookie (when the bookie is not a snapshot). The content of the status may change in future releases. --------- Co-authored-by: Andriy Zavada <andriy.zavada@tiot.jp> Co-authored-by: Martin Sumner <martin.sumner@adaptip.co.uk>
1 parent cd60e82 commit 8bf9070

8 files changed

Lines changed: 593 additions & 64 deletions

File tree

src/leveled_bookie.erl

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@
7575
book_loglevel/2,
7676
book_addlogs/2,
7777
book_removelogs/2,
78-
book_headstatus/1
78+
book_headstatus/1,
79+
book_status/1
7980
]).
8081

8182
%% folding API
@@ -1316,6 +1317,24 @@ book_removelogs(Pid, ForcedLogs) ->
13161317
book_headstatus(Pid) ->
13171318
gen_server:call(Pid, head_status, infinity).
13181319

1320+
-spec book_status(pid()) -> map().
1321+
%% @doc
1322+
%% Return a map containing the following items:
1323+
%% * current size of the ledger cache;
1324+
%% * number of active journal files;
1325+
%% * average compaction score for the journal;
1326+
%% * current distribution of files across the ledger (e.g. count of files by level);
1327+
%% * current size of the penciller in-memory cache;
1328+
%% * penciller work backlog status;
1329+
%% * last merge time (penciller);
1330+
%% * last compaction time (journal);
1331+
%% * last compaction result (journal) e.g. files compacted and compaction score;
1332+
%% * ratio of metadata to object size (recent PUTs);
1333+
%% * PUT/GET/HEAD recent time/count metrics;
1334+
%% * mean level for recent fetches.
1335+
book_status(Pid) ->
1336+
gen_server:call(Pid, status, infinity).
1337+
13191338
%%%============================================================================
13201339
%%% gen_server callbacks
13211340
%%%============================================================================
@@ -1475,7 +1494,8 @@ handle_call(
14751494
State#state.cache_size,
14761495
State#state.cache_multiple,
14771496
Cache0,
1478-
State#state.penciller
1497+
State#state.penciller,
1498+
State#state.monitor
14791499
)
14801500
of
14811501
{ok, Cache} ->
@@ -1509,7 +1529,8 @@ handle_call({mput, ObjectSpecs, TTL}, From, State) when
15091529
State#state.cache_size,
15101530
State#state.cache_multiple,
15111531
Cache0,
1512-
State#state.penciller
1532+
State#state.penciller,
1533+
State#state.monitor
15131534
)
15141535
of
15151536
{ok, Cache} ->
@@ -1686,7 +1707,8 @@ handle_call({compact_journal, Timeout}, From, State) when
16861707
State#state.cache_size,
16871708
State#state.cache_multiple,
16881709
State#state.ledger_cache,
1689-
State#state.penciller
1710+
State#state.penciller,
1711+
State#state.monitor
16901712
)
16911713
of
16921714
{_, NewCache} ->
@@ -1740,6 +1762,8 @@ handle_call(return_actors, _From, State) ->
17401762
{reply, {ok, State#state.inker, State#state.penciller}, State};
17411763
handle_call(head_status, _From, State) ->
17421764
{reply, {State#state.head_only, State#state.head_lookup}, State};
1765+
handle_call(status, _From, State) ->
1766+
{reply, status(State), State};
17431767
handle_call(Msg, _From, State) ->
17441768
{reply, {unsupported_message, element(1, Msg)}, State}.
17451769

@@ -2877,7 +2901,11 @@ check_in_ledgercache(PK, Hash, Cache, loader) ->
28772901
end.
28782902

28792903
-spec maybepush_ledgercache(
2880-
pos_integer(), pos_integer(), ledger_cache(), pid()
2904+
pos_integer(),
2905+
pos_integer(),
2906+
ledger_cache(),
2907+
pid(),
2908+
leveled_monitor:monitor()
28812909
) ->
28822910
{ok | returned, ledger_cache()}.
28832911
%% @doc
@@ -2890,9 +2918,12 @@ check_in_ledgercache(PK, Hash, Cache, loader) ->
28902918
%% in the reply. Try again later when it isn't busy (and also potentially
28912919
%% implement a slow_offer state to slow down the pace at which PUTs are being
28922920
%% received)
2893-
maybepush_ledgercache(MaxCacheSize, MaxCacheMult, Cache, Penciller) ->
2921+
maybepush_ledgercache(
2922+
MaxCacheSize, MaxCacheMult, Cache, Penciller, {Monitor, _}
2923+
) ->
28942924
Tab = Cache#ledger_cache.mem,
28952925
CacheSize = ets:info(Tab, size),
2926+
leveled_monitor:add_stat(Monitor, {ledger_cache_size_update, CacheSize}),
28962927
TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize, MaxCacheMult),
28972928
if
28982929
TimeToPush ->
@@ -3048,6 +3079,11 @@ maybelog_snap_timing({Pid, _StatsFreq}, BookieTime, PCLTime) when
30483079
maybelog_snap_timing(_Monitor, _, _) ->
30493080
ok.
30503081

3082+
status(#state{monitor = {no_monitor, 0}}) ->
3083+
#{};
3084+
status(#state{monitor = {Monitor, _}}) ->
3085+
leveled_monitor:get_bookie_status(Monitor).
3086+
30513087
%%%============================================================================
30523088
%%% Test
30533089
%%%============================================================================

src/leveled_cdb.erl

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,8 @@ starting({call, From}, {open_writer, Filename}, State) ->
494494
starting({call, From}, {open_reader, Filename}, State) ->
495495
leveled_log:save(State#state.log_options),
496496
?STD_LOG(cdb02, [Filename]),
497+
{Monitor, _} = State#state.monitor,
498+
leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, +1}),
497499
{Handle, Index, LastKey} = open_for_readonly(Filename, false),
498500
State0 = State#state{
499501
handle = Handle,
@@ -505,6 +507,8 @@ starting({call, From}, {open_reader, Filename}, State) ->
505507
starting({call, From}, {open_reader, Filename, LastKey}, State) ->
506508
leveled_log:save(State#state.log_options),
507509
?STD_LOG(cdb02, [Filename]),
510+
{Monitor, _} = State#state.monitor,
511+
leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, +1}),
508512
{Handle, Index, LastKey} = open_for_readonly(Filename, LastKey),
509513
State0 = State#state{
510514
handle = Handle,
@@ -650,6 +654,8 @@ writer(
650654
) when
651655
?IS_DEF(LP)
652656
->
657+
{Monitor, _} = State#state.monitor,
658+
leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, +1}),
653659
ok =
654660
leveled_iclerk:clerk_hashtablecalc(
655661
State#state.hashtree, LP, self()
@@ -880,6 +886,8 @@ delete_pending(
880886
) when
881887
?IS_DEF(FN), ?IS_DEF(IO)
882888
->
889+
{Monitor, _} = State#state.monitor,
890+
leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, -1}),
883891
?STD_LOG(cdb04, [FN, State#state.delete_point]),
884892
close_pendingdelete(IO, FN, State#state.waste_path),
885893
{stop, normal};
@@ -906,6 +914,10 @@ delete_pending(
906914
),
907915
{keep_state_and_data, [?DELETE_TIMEOUT]};
908916
false ->
917+
{Monitor, _} = State#state.monitor,
918+
leveled_monitor:add_stat(
919+
Monitor, {n_active_journal_files_update, -1}
920+
),
909921
?STD_LOG(cdb04, [FN, ManSQN]),
910922
close_pendingdelete(IO, FN, State#state.waste_path),
911923
{stop, normal}

src/leveled_iclerk.erl

Lines changed: 82 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -421,46 +421,69 @@ handle_cast(
421421
CloseFun = ScoringState#scoring_state.close_fun,
422422
SW = ScoringState#scoring_state.start_time,
423423
ScoreParams =
424-
{MaxRunLength, State#state.maxrunlength_compactionperc,
425-
State#state.singlefile_compactionperc},
424+
{
425+
MaxRunLength,
426+
State#state.maxrunlength_compactionperc,
427+
State#state.singlefile_compactionperc
428+
},
426429
{BestRun0, Score} = assess_candidates(Candidates, ScoreParams),
427430
?TMR_LOG(ic003, [Score, length(BestRun0)], SW),
428-
case Score > 0.0 of
429-
true ->
430-
BestRun1 = sort_run(BestRun0),
431-
print_compaction_run(BestRun1, ScoreParams),
432-
ManifestSlice =
433-
compact_files(
434-
BestRun1,
435-
CDBopts,
436-
FilterFun,
437-
FilterServer,
438-
MaxSQN,
439-
State#state.reload_strategy,
440-
State#state.compression_method
441-
),
442-
FilesToDelete =
443-
lists:map(
444-
fun(C) ->
445-
{
446-
C#candidate.low_sqn,
447-
C#candidate.filename,
448-
C#candidate.journal,
449-
undefined
450-
}
451-
end,
452-
BestRun1
453-
),
454-
?STD_LOG(ic002, [length(FilesToDelete)]),
455-
ok = CloseFun(FilterServer),
456-
ok =
457-
leveled_inker:ink_clerkcomplete(
458-
State#state.inker, ManifestSlice, FilesToDelete
459-
);
460-
false ->
461-
ok = CloseFun(FilterServer),
462-
ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], [])
463-
end,
431+
LRL =
432+
case Score > 0.0 of
433+
true ->
434+
BestRun1 = sort_run(BestRun0),
435+
print_compaction_run(BestRun1, ScoreParams),
436+
ManifestSlice =
437+
compact_files(
438+
BestRun1,
439+
CDBopts,
440+
FilterFun,
441+
FilterServer,
442+
MaxSQN,
443+
State#state.reload_strategy,
444+
State#state.compression_method
445+
),
446+
FilesToDelete =
447+
lists:map(
448+
fun(C) ->
449+
{
450+
C#candidate.low_sqn,
451+
C#candidate.filename,
452+
C#candidate.journal,
453+
undefined
454+
}
455+
end,
456+
BestRun1
457+
),
458+
?STD_LOG(ic002, [length(FilesToDelete)]),
459+
ok = CloseFun(FilterServer),
460+
ok =
461+
leveled_inker:ink_clerkcomplete(
462+
State#state.inker, ManifestSlice, FilesToDelete
463+
),
464+
length(BestRun0);
465+
false ->
466+
ok = CloseFun(FilterServer),
467+
ok =
468+
leveled_inker:ink_clerkcomplete(State#state.inker, [], []),
469+
0
470+
end,
471+
{Monitor, _} = CDBopts#cdb_options.monitor,
472+
{MaxScore, MeanScore} = calc_run_stats(Candidates),
473+
{MegaST, SecST, MicroST} = ScoringState#scoring_state.start_time,
474+
StartTimeMilli = (MegaST * 1000000 + SecST) * 1000 + (MicroST div 1000),
475+
leveled_monitor:add_stat(
476+
Monitor,
477+
{
478+
journal_compaction,
479+
MaxScore,
480+
MeanScore,
481+
Score,
482+
LRL,
483+
os:system_time(millisecond) - StartTimeMilli,
484+
StartTimeMilli
485+
}
486+
),
464487
{noreply, State#state{scoring_state = undefined}, hibernate};
465488
handle_cast(
466489
{trim, PersistedSQN, ManifestAsList}, State = #state{inker = Ink}
@@ -584,6 +607,18 @@ schedule_compaction(CompactionHours, RunsPerDay, CurrentTS) ->
584607
%%% Internal functions
585608
%%%============================================================================
586609

610+
-spec calc_run_stats(list(candidate())) -> {float(), float()}.
611+
calc_run_stats(Candidates) ->
612+
case lists:map(fun(C) -> C#candidate.compaction_perc end, Candidates) of
613+
L when length(L) > 0 ->
614+
{
615+
lists:max(L),
616+
lists:sum(L) / length(L)
617+
};
618+
_ ->
619+
{0.0, 0.0}
620+
end.
621+
587622
-spec check_single_file(
588623
pid(),
589624
leveled_inker:filterfun(),
@@ -714,7 +749,7 @@ fetch_inbatches(PositionList, BatchSize, CDB, CheckedList) ->
714749
%%
715750
%% Although this requires many loops over the list of the candidate, as the
716751
%% file scores have already been calculated the cost per loop should not be
717-
%% a high burden. Reducing the maximum run length, will reduce the cost of
752+
%% a high burden. Reducing the maximum run length will reduce the cost if
718753
%% this exercise should be a problem.
719754
%%
720755
%% The score parameters are used to produce the score of the compaction run,
@@ -756,7 +791,7 @@ assess_candidates(AllCandidates, Params) ->
756791
{list(candidate()), float()}.
757792
%% @doc
758793
%% For a given run length, calculate the scores for all consecutive runs of
759-
%% files, comparing the score with the best run which has beens een so far.
794+
%% files, comparing the score with the best run which has been seen so far.
760795
%% The best is a tuple of the actual run of candidates, along with the score
761796
%% achieved for that run
762797
assess_for_runlength(RunLength, AllCandidates, Params, Best) ->
@@ -775,7 +810,7 @@ assess_for_runlength(RunLength, AllCandidates, Params, Best) ->
775810
-spec score_run(list(candidate()), score_parameters()) -> float().
776811
%% @doc
777812
%% Score a run. Calculate the average score across all the files in the run,
778-
%% and deduct that from a target score. Good candidate runs for comapction
813+
%% and deduct that from a target score. Good candidate runs for compaction
779814
%% have larger (positive) scores. Bad candidate runs for compaction have
780815
%% negative scores.
781816
score_run([], _Params) ->
@@ -1265,7 +1300,10 @@ check_single_file_test() ->
12651300
Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 4, RS),
12661301
?assertMatch(37.5, Score1),
12671302
LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> current end,
1268-
Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 9, 8, 4, RS),
1303+
Score2 =
1304+
check_single_file(
1305+
CDB, LedgerFun2, LedgerSrv1, 9, 8, 4, RS
1306+
),
12691307
?assertMatch(100.0, Score2),
12701308
Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 3, RS),
12711309
?assertMatch(37.5, Score3),
@@ -1414,7 +1452,8 @@ compact_empty_file_test() ->
14141452
{3, {o, "Bucket", "Key3", null}}
14151453
],
14161454
LedgerFun1 = fun(_Srv, _Key, _ObjSQN) -> replaced end,
1417-
Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4, RS),
1455+
Score1 =
1456+
check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4, RS),
14181457
?assert((+0.0 =:= Score1) orelse (-0.0 =:= Score1)),
14191458
ok = leveled_cdb:cdb_deletepending(CDB2),
14201459
ok = leveled_cdb:cdb_destroy(CDB2).

0 commit comments

Comments
 (0)