
Commit c1377e8

add worker pool and health check
1 parent 89b21ad commit c1377e8

7 files changed

Lines changed: 655 additions & 1 deletion

apps/kafkaesque_core/lib/kafkaesque/core/application.ex

Lines changed: 3 additions & 1 deletion
@@ -10,7 +10,9 @@ defmodule Kafkaesque.Core.Application do
       Kafkaesque.Topic.Supervisor,
       {Phoenix.PubSub, name: Kafkaesque.PubSub, adapter: Phoenix.PubSub.PG2},
       Kafkaesque.Telemetry.Supervisor,
-      Kafkaesque.Telemetry
+      Kafkaesque.Telemetry,
+      Kafkaesque.Health.Monitor,
+      Kafkaesque.Workers.PoolSupervisor
     ]

     opts = [strategy: :one_for_one, name: Kafkaesque.Core.Supervisor]
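
The commit title also mentions a worker pool, but the Kafkaesque.Workers.PoolSupervisor module itself is not part of this excerpt. A minimal sketch of what it could look like, assuming a plain DynamicSupervisor; the :one_for_one strategy, the max_children cap, and the start_worker/2 helper are illustrative assumptions, not the commit's actual code:

defmodule Kafkaesque.Workers.PoolSupervisor do
  # Hypothetical sketch; the commit's real implementation is not shown in this excerpt.
  use DynamicSupervisor

  def start_link(opts \\ []) do
    DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    # Restart workers independently; cap concurrent children as a safety valve.
    DynamicSupervisor.init(strategy: :one_for_one, max_children: 100)
  end

  # Start a worker under the pool; `worker_module` and `args` are assumed here.
  def start_worker(worker_module, args) do
    DynamicSupervisor.start_child(__MODULE__, {worker_module, args})
  end
end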
Lines changed: 336 additions & 0 deletions

@@ -0,0 +1,336 @@
defmodule Kafkaesque.Health.Monitor do
  @moduledoc """
  Continuous health monitoring for Kafkaesque with automatic status updates.
  Performs periodic health checks and maintains system health state.

  ## Features
  - Automatic health checks every 5 seconds
  - Failure thresholds before marking unhealthy
  - Custom health check registration
  - Integration with monitoring systems
  """

  use GenServer
  require Logger

  @check_interval 5_000
  @unhealthy_threshold 3

  defstruct [
    :checks,
    :status,
    :failures,
    :last_check_time,
    :check_results
  ]

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Get current health status.
  """
  def status do
    GenServer.call(__MODULE__, :status)
  catch
    :exit, {:noproc, _} -> {:error, :monitor_not_running}
  end

  @doc """
  Register a custom health check function.
  """
  def register_check(name, fun) when is_atom(name) and is_function(fun, 0) do
    GenServer.call(__MODULE__, {:register, name, fun})
  end

  @doc """
  Unregister a health check.
  """
  def unregister_check(name) when is_atom(name) do
    GenServer.call(__MODULE__, {:unregister, name})
  end

  @doc """
  Get detailed health information.
  """
  def get_health_info do
    GenServer.call(__MODULE__, :get_health_info)
  catch
    :exit, {:noproc, _} -> {:error, :monitor_not_running}
  end

  @doc """
  Force an immediate health check.
  """
  def check_now do
    GenServer.cast(__MODULE__, :check_now)
  end

  @impl true
  def init(_opts) do
    # Schedule first check
    schedule_check()

    state = %__MODULE__{
      checks: %{},
      status: :healthy,
      failures: %{},
      last_check_time: nil,
      check_results: %{}
    }

    # Register default checks
    default_checks = %{
      storage: &check_storage/0,
      memory: &check_memory/0,
      process_count: &check_process_count/0,
      disk_space: &check_disk_space/0,
      telemetry: &check_telemetry/0
    }

    Logger.info("Health monitor started with #{map_size(default_checks)} default checks")
    {:ok, %{state | checks: default_checks}}
  end

  @impl true
  def handle_call(:status, _from, state) do
    {:reply, state.status, state}
  end

  def handle_call(:get_health_info, _from, state) do
    info = %{
      status: state.status,
      last_check: state.last_check_time,
      checks: state.check_results,
      failures: state.failures
    }

    {:reply, info, state}
  end

  def handle_call({:register, name, fun}, _from, state) do
    Logger.info("Registered health check: #{name}")
    new_checks = Map.put(state.checks, name, fun)
    {:reply, :ok, %{state | checks: new_checks}}
  end

  def handle_call({:unregister, name}, _from, state) do
    Logger.info("Unregistered health check: #{name}")
    new_checks = Map.delete(state.checks, name)
    new_failures = Map.delete(state.failures, name)
    new_results = Map.delete(state.check_results, name)

    {:reply, :ok,
     %{state |
       checks: new_checks,
       failures: new_failures,
       check_results: new_results}}
  end

  @impl true
  def handle_cast(:check_now, state) do
    {:noreply, perform_health_checks(state)}
  end

  @impl true
  def handle_info(:check, state) do
    new_state = perform_health_checks(state)
    schedule_check()
    {:noreply, new_state}
  end

  defp perform_health_checks(state) do
    results =
      Enum.map(state.checks, fn {name, check_fun} ->
        {name, safe_check(check_fun)}
      end)

    # Update failure counts
    failures =
      Enum.reduce(results, state.failures, fn {name, result}, acc ->
        case result do
          {:ok, _} -> Map.delete(acc, name)
          {:warning, _} -> Map.put(acc, name, Map.get(acc, name, 0)) # Don't increment for warnings
          {:error, _} -> Map.update(acc, name, 1, &(&1 + 1))
        end
      end)

    # Determine overall status
    status =
      cond do
        # Any check failing with the threshold exceeded
        Enum.any?(failures, fn {_, count} -> count >= @unhealthy_threshold end) ->
          :unhealthy

        # Any warnings
        Enum.any?(results, fn {_, r} -> match?({:warning, _}, r) end) ->
          :degraded

        # All checks passed
        true ->
          :healthy
      end

    # Log status changes
    if status != state.status do
      case status do
        :unhealthy ->
          Logger.error("System health changed to UNHEALTHY")

        :degraded ->
          Logger.warning("System health changed to DEGRADED")

        :healthy ->
          Logger.info("System health changed to HEALTHY")
      end

      # Broadcast health status change
      Phoenix.PubSub.broadcast(
        Kafkaesque.PubSub,
        "health:status",
        {:health_status_changed, status}
      )
    end

    # Convert results to map for easier access
    check_results = Enum.into(results, %{})

    %{state |
      failures: failures,
      status: status,
      last_check_time: System.system_time(:millisecond),
      check_results: check_results}
  end

  defp safe_check(fun) do
    fun.()
  rescue
    error -> {:error, Exception.message(error)}
  catch
    :exit, reason -> {:error, {:exit, reason}}
  end

  defp check_storage do
    # Check if storage processes are running
    case Registry.select(Kafkaesque.TopicRegistry, [
           {{{:storage, :_, :_}, :"$1", :_}, [], [:"$1"]}
         ]) do
      [] ->
        {:warning, "No storage processes running"}

      pids ->
        alive_count = Enum.count(pids, &Process.alive?/1)
        total_count = length(pids)

        if alive_count == total_count do
          {:ok, %{storage_processes: alive_count}}
        else
          {:error, "#{total_count - alive_count} storage processes down"}
        end
    end
  end

  defp check_memory do
    memory = :erlang.memory()
    total = memory[:total]
    processes = memory[:processes]
    ets = memory[:ets]

    total_mb = div(total, 1024 * 1024)

    cond do
      total > 2_000_000_000 -> # 2 GB
        {:error, %{total_mb: total_mb, message: "Memory usage critical"}}

      total > 1_000_000_000 -> # 1 GB
        {:warning, %{total_mb: total_mb, message: "Memory usage high"}}

      true ->
        {:ok, %{
          total_mb: total_mb,
          processes_mb: div(processes, 1024 * 1024),
          ets_mb: div(ets, 1024 * 1024)
        }}
    end
  end

  defp check_process_count do
    count = :erlang.system_info(:process_count)
    limit = :erlang.system_info(:process_limit)
    usage_percent = round(count * 100 / limit)

    cond do
      count > 100_000 ->
        {:error, %{count: count, limit: limit, usage_percent: usage_percent}}

      usage_percent > 80 ->
        {:warning, %{count: count, limit: limit, usage_percent: usage_percent}}

      true ->
        {:ok, %{count: count, limit: limit, usage_percent: usage_percent}}
    end
  end

  defp check_disk_space do
    data_dir = Application.get_env(:kafkaesque_core, :data_dir, "./data")

    case File.stat(data_dir) do
      {:ok, _} ->
        case System.cmd("df", ["-k", data_dir], stderr_to_stdout: true) do
          {output, 0} ->
            lines = String.split(output, "\n")

            if length(lines) >= 2 do
              parts = lines |> Enum.at(1) |> String.split()

              # Need at least 5 columns: filesystem, blocks, used, available, use%
              if length(parts) >= 5 do
                available = (parts |> Enum.at(3) |> String.to_integer()) * 1024
                used_percent = parts |> Enum.at(4) |> String.trim_trailing("%") |> String.to_integer()

                cond do
                  used_percent > 95 ->
                    {:error, %{used_percent: used_percent, available_mb: div(available, 1024 * 1024)}}

                  used_percent > 90 ->
                    {:warning, %{used_percent: used_percent, available_mb: div(available, 1024 * 1024)}}

                  true ->
                    {:ok, %{used_percent: used_percent, available_mb: div(available, 1024 * 1024)}}
                end
              else
                {:ok, %{data_dir: data_dir}}
              end
            else
              {:ok, %{data_dir: data_dir}}
            end

          _ ->
            {:ok, %{data_dir: data_dir, note: "Unable to get disk stats"}}
        end

      _ ->
        {:error, "Data directory does not exist: #{data_dir}"}
    end
  end

  defp check_telemetry do
    case Process.whereis(Kafkaesque.Telemetry) do
      nil ->
        {:error, "Telemetry process not running"}

      pid when is_pid(pid) ->
        if Process.alive?(pid) do
          try do
            metrics = Kafkaesque.Telemetry.get_metrics()

            {:ok, %{
              messages_per_sec: metrics[:messages_per_sec] || 0,
              bytes_per_sec: metrics[:bytes_per_sec] || 0
            }}
          rescue
            _ -> {:error, "Failed to get telemetry metrics"}
          end
        else
          {:error, "Telemetry process not alive"}
        end
    end
  end

  defp schedule_check do
    Process.send_after(self(), :check, @check_interval)
  end
end
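
For reference, a short usage sketch of the public API above. The :queue_depth check and its threshold are hypothetical; status/0, get_health_info/0, check_now/0, register_check/2, and the "health:status" PubSub topic come straight from the module:

# Register a custom check: return {:ok, info}, {:warning, info}, or {:error, info}.
Kafkaesque.Health.Monitor.register_check(:queue_depth, fn ->
  depth = :rand.uniform(100) # stand-in for a real queue-depth probe
  if depth < 80, do: {:ok, %{depth: depth}}, else: {:warning, %{depth: depth}}
end)

# Trigger a check immediately, then query status and details.
Kafkaesque.Health.Monitor.check_now()
Kafkaesque.Health.Monitor.status()          # :healthy | :degraded | :unhealthy
Kafkaesque.Health.Monitor.get_health_info()

# React to status transitions broadcast by the monitor; the subscribing
# process receives {:health_status_changed, status} messages.
Phoenix.PubSub.subscribe(Kafkaesque.PubSub, "health:status")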
