Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 114 additions & 22 deletions src/fabric/src/fabric_view.erl
Original file line number Diff line number Diff line change
Expand Up @@ -309,36 +309,41 @@ get_next_row(State) ->
Counters1 = fabric_dict:update_counter(Worker, -1, Counters0),
{Row, State#collector{rows = Rest, counters = Counters1}}.

reduce_row_dict_take(Key, Dict, <<"raw">>) ->
dict:take(Key, Dict);
reduce_row_dict_take(Key, Dict, _Collation) ->
IsEq = fun(K, _) -> couch_ejson_compare:less(K, Key) =:= 0 end,
KVs = dict:to_list(dict:filter(IsEq, Dict)),
case KVs of
reduce_row_dict_take(Key, RowMap, <<"raw">>) ->
maps:take(Key, RowMap);
reduce_row_dict_take(Key, RowMap, _Collation) ->
EqKeys = [K || K <- maps:keys(RowMap), couch_ejson_compare:less(K, Key) =:= 0],
case EqKeys of
[] ->
error;
[_ | _] ->
{Keys, Vals} = lists:unzip(KVs),
NewDict = lists:foldl(
fun(K, Acc) ->
dict:erase(K, Acc)
end,
Dict,
Keys
),
{lists:flatten(Vals), NewDict}
Vals = [map_get(K, RowMap) || K <- EqKeys],
{lists:flatten(Vals), maps:without(EqKeys, RowMap)}
end.

%% TODO: rectify nil <-> undefined discrepancies
find_next_key(nil, Dir, Collation, RowDict) ->
find_next_key(undefined, Dir, Collation, RowDict);
find_next_key(undefined, Dir, Collation, RowDict) ->
CmpFun = fun(A, B) -> compare(Dir, Collation, A, B) end,
case lists:sort(CmpFun, dict:fetch_keys(RowDict)) of
% Note: we need the smallest key only here. Before this used to effectively
% be hd(lists:sort()). With a potentially expensive collator comparison
% function we'd like to avoid resorting instead we just get the minimum.
case maps:keys(RowDict) of
[] ->
throw(complete);
[Key | _] ->
{Key, nil}
[First | Rest] ->
Less = fun(A, B) -> compare(Dir, Collation, A, B) end,
MinKey = lists:foldl(
fun(Key, Min) ->
case Less(Key, Min) of
true -> Key;
false -> Min
end
end,
First,
Rest
),
{MinKey, nil}
end;
find_next_key([], _, _, _) ->
throw(complete);
Expand Down Expand Up @@ -812,9 +817,9 @@ t_get_next_row_reduce(_) ->
{view_row, #{value => value3, worker => {worker2, W2From}}}
],
Values = [[value1], [value2], [value3]],
RowDict1 = dict:from_list([{key1, ViewRows1}, {key2, undefined}, {key3, undefined}]),
RowDict2 = dict:from_list([{key1, ViewRows2}, {key2, undefined}, {key3, undefined}]),
RowDict3 = dict:from_list([{key2, undefined}, {key3, undefined}]),
RowDict1 = maps:from_list([{key1, ViewRows1}, {key2, undefined}, {key3, undefined}]),
RowDict2 = maps:from_list([{key1, ViewRows2}, {key2, undefined}, {key3, undefined}]),
RowDict3 = maps:from_list([{key2, undefined}, {key3, undefined}]),
Language = <<"language">>,
Collation = <<"raw">>,
Counters1 = [{worker1, 3}, {worker2, 5}],
Expand Down Expand Up @@ -865,4 +870,91 @@ t_get_next_row_reduce(_) ->
?assertEqual({Row1, State3}, get_next_row(State1)),
?assertEqual({Row2, State4}, get_next_row(State2)).

find_next_key_empty_test() ->
?assertThrow(complete, find_next_key(undefined, fwd, <<"raw">>, #{})),
?assertThrow(complete, find_next_key(nil, fwd, <<"raw">>, #{})).

find_next_key_min_raw_fwd_test() ->
RowDict = maps:from_list([{3, a}, {1, b}, {2, c}]),
?assertEqual({1, nil}, find_next_key(undefined, fwd, <<"raw">>, RowDict)),
% a nil key list delegates to the undefined clause
?assertEqual({1, nil}, find_next_key(nil, fwd, <<"raw">>, RowDict)).

find_next_key_min_raw_rev_test() ->
RowDict = maps:from_list([{3, a}, {1, b}, {2, c}]),
?assertEqual({3, nil}, find_next_key(undefined, rev, <<"raw">>, RowDict)).

% Check ICU collator. Assert that sort and find_next_key does the same thing
%
find_next_key_min_collation_test() ->
% Not these are different under ICU vs raw Erlang
% M, a, z (Erlang)
% a, M, z (ICU)
Keys = [<<"z">>, <<"M">>, <<"a">>],
RowDict = maps:from_list([{K, v} || K <- Keys]),
lists:foreach(
fun(Dir) ->
Cmp = fun(A, B) -> compare(Dir, <<"icu">>, A, B) end,
[Expected | _] = lists:sort(Cmp, maps:keys(RowDict)),
?assertEqual({Expected, nil}, find_next_key(undefined, Dir, <<"icu">>, RowDict))
end,
[fwd, rev]
),
% Sanity check
IcuMin = element(1, find_next_key(undefined, fwd, <<"icu">>, RowDict)),
RawMin = element(1, find_next_key(undefined, fwd, <<"raw">>, RowDict)),
?assertEqual(<<"a">>, IcuMin),
?assertEqual(<<"M">>, RawMin),
% They do different things
?assertNotEqual(IcuMin, RawMin).

% Check min scan against sorting and taking the head
find_next_key_matches_sort_test() ->
Keys = [<<"k5">>, <<"k1">>, <<"k3">>, <<"k2">>, <<"k4">>],
RowDict = maps:from_list([{K, v} || K <- Keys]),
lists:foreach(
fun(Dir) ->
CmpFun = fun(A, B) -> compare(Dir, <<"raw">>, A, B) end,
[Expected | _] = lists:sort(CmpFun, maps:keys(RowDict)),
?assertEqual({Expected, nil}, find_next_key(undefined, Dir, <<"raw">>, RowDict))
end,
[fwd, rev]
).

% A key list is returned head-first and untouched.
find_next_key_explicit_keys_test() ->
?assertEqual({k1, [k2, k3]}, find_next_key([k1, k2, k3], fwd, <<"raw">>, #{})),
?assertThrow(complete, find_next_key([], fwd, <<"raw">>, #{})).

% Raw collation: keys match exactly
reduce_row_dict_take_raw_test() ->
RowMap = #{<<"a">> => [r1, r2], <<"b">> => [r3]},
?assertEqual({[r1, r2], #{<<"b">> => [r3]}}, reduce_row_dict_take(<<"a">>, RowMap, <<"raw">>)).

reduce_row_dict_take_raw_missing_test() ->
RowMap = #{<<"a">> => [r1]},
?assertEqual(error, reduce_row_dict_take(<<"x">>, RowMap, <<"raw">>)).

% With ICU use nfc and nfd forms of "é", they should match as equivalent but
% not equal in raw. We're making sure ICU is doing ICU things here.
reduce_row_dict_take_collation_test() ->
Nfc = <<195, 169>>,
Nfd = <<101, 204, 129>>,
% Sanity check
?assertEqual(0, couch_ejson_compare:less(Nfc, Nfd)),
RowMap = #{Nfc => [r1], Nfd => [r2], <<"z">> => [r3]},
{Vals, Rest} = reduce_row_dict_take(Nfc, RowMap, <<"icu">>),
?assertEqual([r1, r2], lists:sort(Vals)),
?assertEqual(#{<<"z">> => [r3]}, Rest).

% With ICU and key that only equals itself
reduce_row_dict_take_collation_single_test() ->
RowMap = #{<<"a">> => [r1], <<"b">> => [r2]},
?assertEqual({[r1], #{<<"b">> => [r2]}}, reduce_row_dict_take(<<"a">>, RowMap, <<"icu">>)).

% With ICU but key is missing altogether
reduce_row_dict_take_collation_missing_test() ->
RowMap = #{<<"a">> => [r1], <<"b">> => [r2]},
?assertEqual(error, reduce_row_dict_take(<<"zzz">>, RowMap, <<"icu">>)).

-endif.
12 changes: 6 additions & 6 deletions src/fabric/src/fabric_view_reduce.erl
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ go2(DbName, DDocId, VName, Workers, {red, {_, Lang, View}, _} = VInfo, Args, Cal
os_proc = OsProc,
reducer = RedSrc,
collation = couch_util:get_value(<<"collation">>, View#mrview.options),
rows = dict:new(),
rows = #{},
user_acc = Acc0,
update_seq =
case UpdateSeq of
Expand Down Expand Up @@ -125,7 +125,7 @@ handle_row(Row0, {Worker, _} = Source, State) ->
true = fabric_dict:is_key(Worker, Counters0),
Row = fabric_view_row:set_worker(Row0, Source),
Key = fabric_view_row:get_key(Row),
Rows = dict:append(Key, Row, Rows0),
Rows = maps:update_with(Key, fun(Rs) -> Rs ++ [Row] end, [Row], Rows0),
C1 = fabric_dict:update_counter(Worker, 1, Counters0),
State1 = State#collector{rows = Rows, counters = C1},
fabric_view:maybe_send_row(State1).
Expand Down Expand Up @@ -284,12 +284,12 @@ t_handle_message_row(_) ->
Counters2 = [{worker, 4}],
Row1 = #view_row{key = key1},
Row11 = #view_row{key = key1, worker = Worker},
Rows1 = dict:from_list([{key1, []}, {key2, []}, {key3, []}]),
Rows3 = dict:from_list([{key1, [Row11]}, {key2, []}, {key3, []}]),
Rows1 = maps:from_list([{key1, []}, {key2, []}, {key3, []}]),
Rows3 = maps:from_list([{key1, [Row11]}, {key2, []}, {key3, []}]),
Row2 = {view_row, #{key => key1}},
Row21 = {view_row, #{key => key1, worker => Worker}},
Rows2 = dict:from_list([{key1, []}, {key2, []}, {key3, []}]),
Rows4 = dict:from_list([{key1, [Row21]}, {key2, []}, {key3, []}]),
Rows2 = maps:from_list([{key1, []}, {key2, []}, {key3, []}]),
Rows4 = maps:from_list([{key1, [Row21]}, {key2, []}, {key3, []}]),
State1 = #collector{counters = Counters1, rows = Rows1},
State2 = #collector{counters = Counters1, rows = Rows2},
State3 = #collector{counters = Counters2, rows = Rows3},
Expand Down