Skip to content

Commit f44784c

Browse files
mjcCopilot
andcommitted
Refactor: extract magic numbers, add stacktraces, deduplicate code, add tests
- Extract size thresholds in rules.ex into @size_60_gib/@size_40_gib/@size_25_gib module attributes - Add __STACKTRACE__ to 3 bare rescue blocks in crf_search.ex for production debugging - Log dashboard query timeouts in media.ex (previously swallowed silently) - Consolidate duplicate Repo.insert/stale-error handlers in video_upsert.ex into shared do_insert/2 - Extract safe_to_existing_atom/1 helper from duplicate filter patterns in media.ex - Add crash_reason to Logger metadata config - Add property-based tests for Rules (vmaf_target monotonicity, build_args deduplication) - Expand encoder Broadway producer tests (recovery threshold, dispatch_available) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent bd83b50 commit f44784c

7 files changed

Lines changed: 282 additions & 71 deletions

File tree

config/config.exs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ config :logger, :console,
117117
:state,
118118
:exit_code,
119119
:result,
120-
:video_info
120+
:video_info,
121+
:crash_reason
121122
]
122123

123124
# Use Jason for JSON parsing in Phoenix

lib/reencodarr/ab_av1/crf_search.ex

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,10 @@ defmodule Reencodarr.AbAv1.CrfSearch do
309309
{:noreply, %{state | partial_line_buffer: "", output_buffer: new_output_buffer}}
310310
rescue
311311
e ->
312-
Logger.error("CrfSearch: Error processing line '#{full_line}': #{Exception.message(e)}")
312+
Logger.error("CrfSearch: Error processing line '#{full_line}': #{Exception.message(e)}",
313+
crash_reason: {e, __STACKTRACE__}
314+
)
315+
313316
{:noreply, %{state | partial_line_buffer: "", output_buffer: [full_line | output_buffer]}}
314317
end
315318
end
@@ -352,7 +355,9 @@ defmodule Reencodarr.AbAv1.CrfSearch do
352355
end
353356
rescue
354357
e ->
355-
Logger.error("CrfSearch: Error in exit_status=0 handler: #{Exception.message(e)}")
358+
Logger.error("CrfSearch: Error in exit_status=0 handler: #{Exception.message(e)}",
359+
crash_reason: {e, __STACKTRACE__}
360+
)
356361
end
357362

358363
perform_crf_search_cleanup(state)
@@ -376,7 +381,10 @@ defmodule Reencodarr.AbAv1.CrfSearch do
376381
handle_crf_search_failure(video, target_vmaf, exit_code, command_line, full_output, state)
377382
rescue
378383
e ->
379-
Logger.error("CrfSearch: Error in exit_status handler: #{Exception.message(e)}")
384+
Logger.error("CrfSearch: Error in exit_status handler: #{Exception.message(e)}",
385+
crash_reason: {e, __STACKTRACE__}
386+
)
387+
380388
perform_crf_search_cleanup(state)
381389
end
382390
end

lib/reencodarr/media.ex

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1130,18 +1130,22 @@ defmodule Reencodarr.Media do
11301130

11311131
defp apply_sort(query, field, dir), do: from(v in query, order_by: [{^dir, field(v, ^field)}])
11321132

1133-
defp maybe_filter_state(query, nil), do: query
1134-
defp maybe_filter_state(query, ""), do: query
1133+
defp safe_to_existing_atom(value) when is_atom(value), do: {:ok, value}
11351134

1136-
defp maybe_filter_state(query, state) when is_binary(state) do
1137-
state_atom = String.to_existing_atom(state)
1138-
from(v in query, where: v.state == ^state_atom)
1135+
defp safe_to_existing_atom(value) when is_binary(value) do
1136+
{:ok, String.to_existing_atom(value)}
11391137
rescue
1140-
ArgumentError -> query
1138+
ArgumentError -> :error
11411139
end
11421140

1143-
defp maybe_filter_state(query, state) when is_atom(state) do
1144-
from(v in query, where: v.state == ^state)
1141+
defp maybe_filter_state(query, nil), do: query
1142+
defp maybe_filter_state(query, ""), do: query
1143+
1144+
defp maybe_filter_state(query, state) do
1145+
case safe_to_existing_atom(state) do
1146+
{:ok, atom} -> from(v in query, where: v.state == ^atom)
1147+
:error -> query
1148+
end
11451149
end
11461150

11471151
defp maybe_filter_search(query, nil), do: query
@@ -1154,16 +1158,13 @@ defmodule Reencodarr.Media do
11541158

11551159
defp maybe_filter_service_type(query, nil), do: query
11561160

1157-
defp maybe_filter_service_type(query, service) when is_binary(service) do
1158-
atom = String.to_existing_atom(service)
1159-
from(v in query, where: v.service_type == ^atom)
1160-
rescue
1161-
ArgumentError -> query
1161+
defp maybe_filter_service_type(query, service) do
1162+
case safe_to_existing_atom(service) do
1163+
{:ok, atom} -> from(v in query, where: v.service_type == ^atom)
1164+
:error -> query
1165+
end
11621166
end
11631167

1164-
defp maybe_filter_service_type(query, service) when is_atom(service),
1165-
do: from(v in query, where: v.service_type == ^service)
1166-
11671168
defp maybe_filter_hdr(query, nil), do: query
11681169
defp maybe_filter_hdr(query, true), do: from(v in query, where: not is_nil(v.hdr))
11691170
defp maybe_filter_hdr(query, false), do: from(v in query, where: is_nil(v.hdr))
@@ -1313,10 +1314,23 @@ defmodule Reencodarr.Media do
13131314

13141315
Map.merge(video_stats || get_default_video_stats(), vmaf_stats || get_default_vmaf_stats())
13151316
rescue
1316-
DBConnection.ConnectionError -> get_default_stats()
1317+
e in DBConnection.ConnectionError ->
1318+
Logger.warning(
1319+
"Dashboard stats query failed with connection error: #{Exception.message(e)}"
1320+
)
1321+
1322+
get_default_stats()
13171323
catch
1318-
:exit, {:timeout, _} -> get_default_stats()
1319-
:exit, {%DBConnection.ConnectionError{}, _} -> get_default_stats()
1324+
:exit, {:timeout, _} ->
1325+
Logger.warning("Dashboard stats query timed out after #{timeout}ms")
1326+
get_default_stats()
1327+
1328+
:exit, {%DBConnection.ConnectionError{} = e, _} ->
1329+
Logger.warning(
1330+
"Dashboard stats query failed with connection error: #{Exception.message(e)}"
1331+
)
1332+
1333+
get_default_stats()
13201334
end
13211335

13221336
defp get_default_video_stats do

lib/reencodarr/media/video_upsert.ex

Lines changed: 15 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -242,18 +242,7 @@ defmodule Reencodarr.Media.VideoUpsert do
242242
{:replace_all_except, [atom()]} | Ecto.Query.t()
243243
) :: {:ok, Video.t()} | {:error, Ecto.Changeset.t()}
244244
defp perform_video_upsert(attrs, on_conflict_query) do
245-
result =
246-
%Video{}
247-
|> Video.changeset(attrs)
248-
|> Repo.insert(
249-
on_conflict: on_conflict_query,
250-
conflict_target: :path,
251-
stale_error_field: :updated_at,
252-
returning: true
253-
)
254-
255-
# Return the result directly, don't wrap in transaction
256-
result
245+
do_insert(attrs, on_conflict_query)
257246
end
258247

259248
@spec perform_single_upsert_in_batch(%{String.t() => any()}) ::
@@ -262,22 +251,12 @@ defmodule Reencodarr.Media.VideoUpsert do
262251
conflict_except = determine_conflict_except_fields(attrs)
263252
on_conflict_query = build_on_conflict_query(attrs, conflict_except)
264253

265-
result =
266-
%Video{}
267-
|> Video.changeset(attrs)
268-
|> Repo.insert(
269-
on_conflict: on_conflict_query,
270-
conflict_target: :path,
271-
stale_error_field: :updated_at,
272-
returning: true
273-
)
274-
275-
case result do
254+
case do_insert(attrs, on_conflict_query) do
276255
{:ok, video} ->
277256
handle_successful_upsert(video)
278257

279258
{:error, %Ecto.Changeset{errors: [updated_at: {"is stale", _}]} = changeset} ->
280-
handle_stale_update_error_in_batch(changeset, attrs)
259+
handle_stale_update_error(changeset, attrs)
281260

282261
{:error, changeset} ->
283262
path = Map.get(attrs, "path", "unknown")
@@ -286,23 +265,6 @@ defmodule Reencodarr.Media.VideoUpsert do
286265
end
287266
end
288267

289-
@spec handle_stale_update_error_in_batch(
290-
Ecto.Changeset.t(),
291-
%{String.t() => any()}
292-
) :: {:ok, Video.t()} | {:error, Ecto.Changeset.t()}
293-
defp handle_stale_update_error_in_batch(changeset, attrs) do
294-
# This is expected when dateAdded is not newer than updated_at - treat as success (skip)
295-
path = Map.get(attrs, "path")
296-
Logger.debug("Skipping update for #{path} in batch - dateAdded not newer than updated_at")
297-
298-
# Return the existing video instead of an error
299-
case Repo.get_by(Video, path: path) do
300-
# Shouldn't happen, but handle gracefully
301-
nil -> {:error, changeset}
302-
existing_video -> {:ok, existing_video}
303-
end
304-
end
305-
306268
@spec handle_upsert_result(
307269
{:ok, Video.t()} | {:error, Ecto.Changeset.t()} | {:error, any()},
308270
%{String.t() => any()}
@@ -334,14 +296,22 @@ defmodule Reencodarr.Media.VideoUpsert do
334296
{:ok, video}
335297
end
336298

299+
defp do_insert(attrs, on_conflict_query) do
300+
%Video{}
301+
|> Video.changeset(attrs)
302+
|> Repo.insert(
303+
on_conflict: on_conflict_query,
304+
conflict_target: :path,
305+
stale_error_field: :updated_at,
306+
returning: true
307+
)
308+
end
309+
337310
defp handle_stale_update_error(changeset, attrs) do
338-
# This is expected when dateAdded is not newer than updated_at - treat as success (skip)
339311
path = Map.get(attrs, "path")
340-
Logger.debug("Skipping update for #{path} - dateAdded not newer than updated_at")
312+
Logger.debug("Skipping update for #{path} dateAdded not newer than updated_at")
341313

342-
# Return the existing video instead of an error
343314
case Repo.get_by(Video, path: path) do
344-
# Shouldn't happen, but handle gracefully
345315
nil -> {:error, changeset}
346316
existing_video -> {:ok, existing_video}
347317
end

lib/reencodarr/rules.ex

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@ defmodule Reencodarr.Rules do
99
alias Reencodarr.Core.Parsers
1010
alias Reencodarr.Media
1111

12+
# Size thresholds for VMAF target selection (in bytes)
13+
@size_60_gib 60 * 1024 * 1024 * 1024
14+
@size_40_gib 40 * 1024 * 1024 * 1024
15+
@size_25_gib 25 * 1024 * 1024 * 1024
16+
1217
@doc """
1318
Build arguments for ab-av1 commands.
1419
@@ -404,9 +409,9 @@ defmodule Reencodarr.Rules do
404409
95
405410
"""
406411
@spec vmaf_target(map()) :: integer()
407-
def vmaf_target(%{size: size}) when is_integer(size) and size > 60 * 1024 * 1024 * 1024, do: 91
408-
def vmaf_target(%{size: size}) when is_integer(size) and size > 40 * 1024 * 1024 * 1024, do: 92
409-
def vmaf_target(%{size: size}) when is_integer(size) and size > 25 * 1024 * 1024 * 1024, do: 94
412+
def vmaf_target(%{size: size}) when is_integer(size) and size > @size_60_gib, do: 91
413+
def vmaf_target(%{size: size}) when is_integer(size) and size > @size_40_gib, do: 92
414+
def vmaf_target(%{size: size}) when is_integer(size) and size > @size_25_gib, do: 94
410415
def vmaf_target(_video), do: 95
411416

412417
@doc """

test/reencodarr/encoder/broadway/producer_test.exs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,4 +186,73 @@ defmodule Reencodarr.Encoder.Broadway.ProducerTest do
186186
refute Producer.should_attempt_recovery?(1801)
187187
end
188188
end
189+
190+
describe "recovery threshold boundary" do
191+
alias Reencodarr.Encoder.Broadway.Producer
192+
193+
test "exactly at threshold triggers recovery" do
194+
assert Producer.should_attempt_recovery?(900)
195+
end
196+
197+
test "one below threshold does not trigger" do
198+
refute Producer.should_attempt_recovery?(899)
199+
end
200+
201+
test "one above threshold does not trigger" do
202+
refute Producer.should_attempt_recovery?(901)
203+
end
204+
205+
test "zero never triggers" do
206+
refute Producer.should_attempt_recovery?(0)
207+
end
208+
209+
test "negative values never trigger" do
210+
refute Producer.should_attempt_recovery?(-1)
211+
refute Producer.should_attempt_recovery?(-900)
212+
end
213+
end
214+
215+
describe "update_consecutive_count/2 exhaustive" do
216+
alias Reencodarr.Encoder.Broadway.Producer
217+
218+
test "all non-timeout statuses reset counter" do
219+
for status <- [:available, :busy] do
220+
assert Producer.update_consecutive_count(999, status) == 0,
221+
"Expected #{status} to reset counter"
222+
end
223+
end
224+
225+
test "timeout from zero increments to one" do
226+
assert Producer.update_consecutive_count(0, :timeout) == 1
227+
end
228+
229+
test "timeout accumulates linearly" do
230+
result =
231+
Enum.reduce(1..10, 0, fn _i, acc ->
232+
Producer.update_consecutive_count(acc, :timeout)
233+
end)
234+
235+
assert result == 10
236+
end
237+
238+
test "reset after accumulation returns to zero" do
239+
count =
240+
0
241+
|> Producer.update_consecutive_count(:timeout)
242+
|> Producer.update_consecutive_count(:timeout)
243+
|> Producer.update_consecutive_count(:timeout)
244+
|> Producer.update_consecutive_count(:available)
245+
246+
assert count == 0
247+
end
248+
end
249+
250+
describe "dispatch_available/0" do
251+
alias Reencodarr.Encoder.Broadway.Producer
252+
253+
test "returns error when producer is not running" do
254+
# Producer is not started in test environment
255+
assert {:error, :producer_not_found} = Producer.dispatch_available()
256+
end
257+
end
189258
end

0 commit comments

Comments
 (0)