Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions src/dreyfus/src/dreyfus_index_updater.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ update(IndexPid, Index) ->
erlang:put(io_priority, {search, DbName, IndexName}),
{ok, Db} = couch_db:open_int(DbName, []),
try
IdxPurgeSeq = get_local_doc_purge_seq(Db, Index),
DbPurgeSeq = couch_db:get_purge_seq(Db),
TotalPurgeChanges = DbPurgeSeq - IdxPurgeSeq,
TotalUpdateChanges = couch_db:count_changes_since(Db, CurSeq),
TotalPurgeChanges = count_pending_purged_docs_since(Db, IndexPid),
TotalChanges = TotalUpdateChanges + TotalPurgeChanges,

couch_task_status:add_task([
Expand All @@ -49,7 +51,7 @@ update(IndexPid, Index) ->

%ExcludeIdRevs is [{Id1, Rev1}, {Id2, Rev2}, ...]
%The Rev is the final Rev, not purged Rev.
{ok, ExcludeIdRevs} = purge_index(Db, IndexPid, Index),
{ok, ExcludeIdRevs} = purge_index(Db, IndexPid, Index, IdxPurgeSeq),
%% compute on all docs modified since we last computed.

NewCurSeq = couch_db:get_update_seq(Db),
Expand Down Expand Up @@ -87,8 +89,12 @@ load_docs(FDI, {I, IndexPid, Db, Proc, Total, LastCommitTime, ExcludeIdRevs} = A
{ok, setelement(1, Acc, I + 1)}
end.

purge_index(Db, IndexPid, Index) ->
{ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexPid),
purge_index(Db, IndexPid, Index, IdxPurgeSeq) ->
% Note: we're not using IdxPurgeSeq = clouseau_rpc:get_purge_seq/1 as that
% might return the stale (committed) value and not the newly updated purge
% seq we just set in maybe_create_local_purge_doc/3 on new doc creation.
% Using an old value could result in the invalid_start_purge_seq exception
% being raised when folding over purged infos.
Proc = get_os_process(Index#index.def_lang),
try
true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def]),
Expand Down Expand Up @@ -120,11 +126,6 @@ purge_index(Db, IndexPid, Index) ->
ret_os_process(Proc)
end.

count_pending_purged_docs_since(Db, IndexPid) ->
DbPurgeSeq = couch_db:get_purge_seq(Db),
{ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexPid),
DbPurgeSeq - IdxPurgeSeq.

update_or_delete_index(IndexPid, Db, DI, Proc) ->
#doc_info{id = Id, revs = [#rev_info{deleted = Del} | _]} = DI,
case Del of
Expand Down Expand Up @@ -152,6 +153,13 @@ update_local_doc(Db, Index, PurgeSeq) ->
DocContent = dreyfus_util:get_local_purge_doc_body(Db, DocId, PurgeSeq, Index),
couch_db:update_doc(Db, DocContent, []).

get_local_doc_purge_seq(Db, Index) ->
DocId = dreyfus_util:get_local_purge_doc_id(Index#index.sig),
% We're implicitly asserting this purge checkpoint doc should exist. This is
% created either on open or during compaction in on_compact handler
{ok, #doc{body = {[_ | _] = Props}}} = couch_db:open_doc(Db, DocId),
couch_util:get_value(<<"purge_seq">>, Props).

update_task(NumChanges) ->
[Changes, Total] = couch_task_status:get([changes_done, total_changes]),
Changes2 = Changes + NumChanges,
Expand Down