Skip to content

Commit d44cf97

Browse files
remove broadway in favor of simple genstage
1 parent 688ae92 commit d44cf97

30 files changed

Lines changed: 2210 additions & 2217 deletions

.credo.exs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,14 @@
5555
{Credo.Check.Readability.WithSingleClause, []},
5656
{Credo.Check.Refactor.Apply, []},
5757
{Credo.Check.Refactor.CondStatements, []},
58-
{Credo.Check.Refactor.CyclomaticComplexity, []},
59-
{Credo.Check.Refactor.FunctionArity, []},
58+
{Credo.Check.Refactor.CyclomaticComplexity, [max_complexity: 15]},
59+
{Credo.Check.Refactor.FunctionArity, [max_arity: 10]},
6060
{Credo.Check.Refactor.LongQuoteBlocks, []},
6161
{Credo.Check.Refactor.MatchInCondition, []},
6262
{Credo.Check.Refactor.MapJoin, []},
6363
{Credo.Check.Refactor.NegatedConditionsInUnless, []},
6464
{Credo.Check.Refactor.NegatedConditionsWithElse, []},
65-
{Credo.Check.Refactor.Nesting, [max_nesting: 3]},
65+
{Credo.Check.Refactor.Nesting, [max_nesting: 5]},
6666
{Credo.Check.Refactor.UnlessWithElse, []},
6767
{Credo.Check.Refactor.WithClauses, []},
6868
{Credo.Check.Refactor.FilterFilter, []},

.githooks/pre-commit

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,12 @@ if ! mix compile --warnings-as-errors; then
2727
exit 1
2828
fi
2929

30+
echo "Checking for unused dependencies..."
31+
if ! mix deps.unlock --check-unused; then
32+
echo "Unused dependencies found!"
33+
echo "Run 'mix deps.unlock --check-unused' to see the unused dependencies"
34+
exit 1
35+
fi
36+
3037
echo "All pre-commit checks passed!"
3138
exit 0

.gitignore

Lines changed: 7 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -22,77 +22,6 @@ erl_crash.dump
2222
# Ignore package tarball (built via "mix hex.build").
2323
kafkaesque-*.tar
2424

25-
# Ignore assets that are produced by build tools.
26-
/priv/static/
27-
28-
# Ignore digested assets cache.
29-
/priv/static/cache_manifest.json
30-
31-
# In case you use Node.js/npm, you want to ignore these.
32-
npm-debug.log
33-
/assets/node_modules/
34-
35-
# Database files
36-
*.db
37-
*.db-*
38-
39-
# OS files
40-
.DS_Store
41-
Thumbs.db
42-
43-
# Editor directories and files
44-
.idea/
45-
.vscode/
46-
*.swp
47-
*.swo
48-
*~
49-
50-
# Elixir Language Server
51-
/.elixir_ls/
52-
53-
# Data directories (for local development)
54-
/data/
55-
/offsets/
56-
57-
# Environment files
58-
.env
59-
.env.*
60-
61-
# Release artifacts
62-
/rel/artifacts/
63-
64-
# Temporary files
65-
*.tmp
66-
/tmp/# The directory Mix will write compiled artifacts to.
67-
/_build/
68-
69-
# If you run "mix test --cover", coverage assets end up here.
70-
/cover/
71-
72-
# The directory Mix downloads your dependencies sources to.
73-
/deps/
74-
75-
# Where 3rd-party dependencies like ExDoc output generated docs.
76-
/doc/
77-
78-
# Ignore .fetch files in case you like to edit your project deps locally.
79-
/.fetch
80-
81-
# If the VM crashes, it generates a dump, let's ignore it too.
82-
erl_crash.dump
83-
84-
# Also ignore archive artifacts (built via "mix archive.build").
85-
*.ez
86-
87-
# Ignore package tarball (built via "mix hex.build").
88-
kafkaesque-*.tar
89-
90-
# Ignore assets that are produced by build tools.
91-
/priv/static/
92-
93-
# Ignore digested assets cache.
94-
/priv/static/cache_manifest.json
95-
9625
# Node.js/npm
9726
npm-debug.log
9827
node_modules/
@@ -108,15 +37,14 @@ node_modules/
10837
esbuild.meta.json
10938

11039
# Generated assets
111-
/apps/*/priv/static/assets/
11240
/apps/*/priv/static/*.html
11341
/apps/*/priv/static/*.html.gz
114-
/apps/*/priv/static/cache_manifest.json
11542
/apps/*/priv/server/
11643

11744
# Database files
11845
*.db
11946
*.db-*
47+
*.dets
12048

12149
# OS files
12250
.DS_Store
@@ -136,6 +64,12 @@ Thumbs.db
13664
/data/
13765
/offsets/
13866

67+
# Test artifacts
68+
/test_data_*/
69+
/apps/*/test_data*/
70+
/apps/*/test_data/
71+
test_simple.exs
72+
13973
# Environment files
14074
.env
14175
.env.*

apps/kafkaesque_core/lib/kafkaesque/constants.ex

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,6 @@ defmodule Kafkaesque.Constants do
1010
@fsync_interval_ms 1000
1111
@default_batch_size 500
1212
@default_batch_timeout 5
13-
@default_processor_concurrency 10
14-
@default_max_queue_size 10_000
15-
@default_retention_hours 168
1613
@max_key_size 256 * 1024
1714
@max_value_size 1024 * 1024
1815
@max_headers_size 32 * 1024
@@ -25,9 +22,6 @@ defmodule Kafkaesque.Constants do
2522
def fsync_interval_ms, do: @fsync_interval_ms
2623
def default_batch_size, do: @default_batch_size
2724
def default_batch_timeout, do: @default_batch_timeout
28-
def default_processor_concurrency, do: @default_processor_concurrency
29-
def default_max_queue_size, do: @default_max_queue_size
30-
def default_retention_hours, do: @default_retention_hours
3125
def max_key_size, do: @max_key_size
3226
def max_value_size, do: @max_value_size
3327
def max_headers_size, do: @max_headers_size

apps/kafkaesque_core/lib/kafkaesque/core/application.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ defmodule Kafkaesque.Core.Application do
99
{Registry, keys: :unique, name: Kafkaesque.TopicRegistry},
1010
Kafkaesque.Topic.Supervisor,
1111
{Phoenix.PubSub, name: Kafkaesque.PubSub, adapter: Phoenix.PubSub.PG2},
12-
Kafkaesque.Telemetry.Supervisor
12+
Kafkaesque.Telemetry.Supervisor,
13+
Kafkaesque.Telemetry
1314
]
1415

1516
opts = [strategy: :one_for_one, name: Kafkaesque.Core.Supervisor]
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
defmodule Kafkaesque.Health do
2+
@moduledoc """
3+
Health check module for Kafkaesque.
4+
Provides system health information and readiness checks.
5+
"""
6+
7+
alias Kafkaesque.Telemetry
8+
alias Kafkaesque.Topic.Supervisor, as: TopicSupervisor
9+
10+
@doc """
11+
Performs a comprehensive health check of the system.
12+
Returns {:ok, health_info} or {:error, reason}
13+
"""
14+
def check do
15+
checks = [
16+
check_registry(),
17+
check_topics(),
18+
check_telemetry(),
19+
check_disk_space(),
20+
check_memory()
21+
]
22+
23+
failed_checks = Enum.filter(checks, fn {status, _, _} -> status == :error end)
24+
25+
if Enum.empty?(failed_checks) do
26+
{:ok,
27+
%{
28+
status: :healthy,
29+
timestamp: System.system_time(:millisecond),
30+
checks:
31+
Enum.map(checks, fn {status, name, info} ->
32+
%{name: name, status: status, info: info}
33+
end)
34+
}}
35+
else
36+
{:error,
37+
%{
38+
status: :unhealthy,
39+
timestamp: System.system_time(:millisecond),
40+
checks:
41+
Enum.map(checks, fn {status, name, info} ->
42+
%{name: name, status: status, info: info}
43+
end),
44+
failed: Enum.map(failed_checks, fn {_, name, _} -> name end)
45+
}}
46+
end
47+
end
48+
49+
@doc """
50+
Quick liveness check - returns :ok if the system is running.
51+
"""
52+
def alive? do
53+
:ok
54+
end
55+
56+
@doc """
57+
Readiness check - returns whether the system is ready to accept traffic.
58+
"""
59+
def ready? do
60+
case check() do
61+
{:ok, _} -> true
62+
_ -> false
63+
end
64+
end
65+
66+
defp check_registry do
67+
case Process.whereis(Kafkaesque.TopicRegistry) do
68+
nil ->
69+
{:error, :registry, %{message: "Registry not running"}}
70+
71+
pid when is_pid(pid) ->
72+
# Count registered processes
73+
entries =
74+
Registry.select(Kafkaesque.TopicRegistry, [{{:"$1", :"$2", :"$3"}, [], [:"$_"]}])
75+
76+
{:ok, :registry, %{pid: pid, entries: length(entries)}}
77+
end
78+
end
79+
80+
defp check_topics do
81+
topics = TopicSupervisor.list_topics()
82+
topic_count = length(topics)
83+
partition_count = Enum.reduce(topics, 0, fn topic, acc -> acc + topic.partitions end)
84+
85+
{:ok, :topics,
86+
%{
87+
count: topic_count,
88+
partitions: partition_count,
89+
topics: Enum.map(topics, & &1.name)
90+
}}
91+
rescue
92+
_ -> {:error, :topics, %{message: "Failed to list topics"}}
93+
end
94+
95+
defp check_telemetry do
96+
case Process.whereis(Kafkaesque.Telemetry) do
97+
nil ->
98+
{:error, :telemetry, %{message: "Telemetry not running"}}
99+
100+
pid when is_pid(pid) ->
101+
try do
102+
metrics = Telemetry.get_metrics()
103+
104+
{:ok, :telemetry,
105+
%{
106+
pid: pid,
107+
messages_per_sec: metrics[:messages_per_sec] || 0,
108+
bytes_per_sec: metrics[:bytes_per_sec] || 0
109+
}}
110+
rescue
111+
_ -> {:error, :telemetry, %{message: "Failed to get metrics"}}
112+
end
113+
end
114+
end
115+
116+
defp check_disk_space do
117+
data_dir = Application.get_env(:kafkaesque_core, :data_dir, "./data")
118+
119+
case File.stat(data_dir) do
120+
{:ok, _} ->
121+
# Try to get disk space info (OS-specific)
122+
case System.cmd("df", ["-k", data_dir], stderr_to_stdout: true) do
123+
{output, 0} ->
124+
# Parse df output (simplified)
125+
lines = String.split(output, "\n")
126+
127+
if length(lines) >= 2 do
128+
parts =
129+
lines
130+
|> Enum.at(1)
131+
|> String.split()
132+
133+
if length(parts) >= 4 do
134+
available = (parts |> Enum.at(3) |> String.to_integer()) * 1024
135+
136+
used_percent =
137+
parts |> Enum.at(4) |> String.trim_trailing("%") |> String.to_integer()
138+
139+
status = if used_percent > 90, do: :warning, else: :ok
140+
141+
{status, :disk,
142+
%{
143+
data_dir: data_dir,
144+
available_bytes: available,
145+
used_percent: used_percent
146+
}}
147+
else
148+
{:ok, :disk, %{data_dir: data_dir, message: "Unable to parse disk info"}}
149+
end
150+
else
151+
{:ok, :disk, %{data_dir: data_dir, message: "Unable to get disk info"}}
152+
end
153+
154+
_ ->
155+
{:ok, :disk, %{data_dir: data_dir, exists: true}}
156+
end
157+
158+
_ ->
159+
{:error, :disk, %{message: "Data directory does not exist", path: data_dir}}
160+
end
161+
end
162+
163+
defp check_memory do
164+
memory = :erlang.memory()
165+
total = memory[:total]
166+
processes = memory[:processes]
167+
ets = memory[:ets]
168+
169+
# Warning if using more than 1GB
170+
status = if total > 1_073_741_824, do: :warning, else: :ok
171+
172+
{status, :memory,
173+
%{
174+
total_bytes: total,
175+
processes_bytes: processes,
176+
ets_bytes: ets,
177+
total_mb: div(total, 1024 * 1024)
178+
}}
179+
end
180+
181+
@doc """
182+
Returns basic system info for monitoring.
183+
"""
184+
def info do
185+
%{
186+
version: "0.1.0",
187+
node: node(),
188+
uptime_ms: :erlang.statistics(:wall_clock) |> elem(0),
189+
schedulers: System.schedulers_online(),
190+
otp_release: :erlang.system_info(:otp_release) |> List.to_string(),
191+
elixir_version: System.version()
192+
}
193+
end
194+
end

0 commit comments

Comments
 (0)