Skip to content

Commit 89b21ad

Browse files
misc improvements
1 parent f2fe628 commit 89b21ad

8 files changed

Lines changed: 266 additions & 147 deletions

File tree

apps/kafkaesque_client/lib/kafkaesque_client/admin.ex

Lines changed: 25 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -34,9 +34,25 @@ defmodule KafkaesqueClient.Admin do
3434
@doc """
3535
Creates a new topic with the specified configuration.
3636
37+
## Options
38+
39+
- `:partitions` - Number of partitions (default: 1)
40+
- `:batch_size` - Max records per batch (default: 500)
41+
- `:batch_timeout` - Batch timeout in milliseconds (default: 5000)
42+
- `:min_demand` - Min demand for GenStage (default: 5)
43+
- `:max_demand` - Max demand for GenStage (default: 500)
44+
3745
## Examples
3846
3947
Admin.create_topic(admin, "my-topic", partitions: 3)
48+
49+
Admin.create_topic(admin, "high-throughput",
50+
partitions: 5,
51+
batch_size: 1000,
52+
batch_timeout: 1000,
53+
min_demand: 10,
54+
max_demand: 1000
55+
)
4056
"""
4157
@spec create_topic(GenServer.server(), String.t(), keyword()) ::
4258
{:ok, map()} | {:error, term()}
@@ -140,10 +156,18 @@ defmodule KafkaesqueClient.Admin do
140156
@impl true
141157
def handle_call({:create_topic, topic_name, opts}, _from, state) do
142158
partitions = Keyword.get(opts, :partitions, 1)
159+
batch_size = Keyword.get(opts, :batch_size, 0)
160+
batch_timeout_ms = Keyword.get(opts, :batch_timeout, 0)
161+
min_demand = Keyword.get(opts, :min_demand, 0)
162+
max_demand = Keyword.get(opts, :max_demand, 0)
143163

144164
request = %Kafkaesque.CreateTopicRequest{
145165
name: topic_name,
146-
partitions: partitions
166+
partitions: partitions,
167+
batch_size: batch_size,
168+
batch_timeout_ms: batch_timeout_ms,
169+
min_demand: min_demand,
170+
max_demand: max_demand
147171
}
148172

149173
result =

apps/kafkaesque_dashboard/lib/kafkaesque_dashboard/application.ex

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -11,6 +11,7 @@ defmodule KafkaesqueDashboard.Application do
1111
children = [
1212
{NodeJS.Supervisor, [path: SSR.NodeJS.server_path(), pool_size: 4]},
1313
Telemetry,
14+
KafkaesqueDashboard.MetricsCache,
1415
Endpoint
1516
]
1617

apps/kafkaesque_dashboard/lib/kafkaesque_dashboard/live/dashboard_live.ex

Lines changed: 54 additions & 140 deletions
Original file line number | Diff line number | Diff line change
@@ -198,42 +198,34 @@ defmodule KafkaesqueDashboard.DashboardLive do
198198
end
199199

200200
defp get_disk_usage(path) do
201-
# Get actual disk usage for the data directory
202-
case System.cmd("df", ["-k", path]) do
203-
{output, 0} ->
204-
# Parse df output
205-
lines = String.split(output, "\n")
206-
207-
case Enum.at(lines, 1) do
208-
nil ->
209-
%{used_gb: 0.0, total_gb: 10.0}
210-
211-
line ->
212-
parts = String.split(line, ~r/\s+/)
213-
214-
case parts do
215-
[_filesystem, total_kb, used_kb | _] ->
216-
%{
217-
used_gb: String.to_integer(used_kb) / (1024 * 1024),
218-
total_gb: String.to_integer(total_kb) / (1024 * 1024)
219-
}
201+
used_bytes = calculate_directory_size(path)
202+
203+
# Try to get total disk space using Erlang's disk_info if available
204+
total_gb =
205+
case :disksup.get_disk_data() do
206+
[_|_] = disks ->
207+
# Find the disk containing our path
208+
disk = Enum.find(disks, fn {mount, _size, _used} ->
209+
String.starts_with?(path, to_string(mount))
210+
end)
220211

221-
_ ->
222-
%{used_gb: 0.0, total_gb: 10.0}
223-
end
224-
end
212+
case disk do
213+
{_mount, size_kb, _used} ->
214+
size_kb / (1024 * 1024)
215+
_ ->
216+
100.0 # Default 100GB if we can't determine
217+
end
225218

226-
_ ->
227-
# Fallback: calculate size of all files in data directory
228-
used_bytes = calculate_directory_size(path)
219+
_ ->
220+
100.0 # Default 100GB if disksup is not available
221+
end
229222

230-
%{
231-
used_gb: used_bytes / (1024 * 1024 * 1024),
232-
total_gb: 10.0
233-
}
234-
end
223+
%{
224+
used_gb: used_bytes / (1024 * 1024 * 1024),
225+
total_gb: total_gb
226+
}
235227
rescue
236-
_ -> %{used_gb: 0.0, total_gb: 10.0}
228+
_ -> %{used_gb: 0.0, total_gb: 100.0}
237229
end
238230

239231
defp calculate_directory_size(path) do
@@ -341,100 +333,12 @@ defmodule KafkaesqueDashboard.DashboardLive do
341333
end
342334

343335
defp get_consumer_groups do
344-
# Get all consumer groups by scanning offset tables
345-
offsets_dir = Application.get_env(:kafkaesque_core, :offsets_dir, "./offsets")
346-
347-
case File.ls(offsets_dir) do
348-
{:ok, files} ->
349-
# Parse DETS files to find unique consumer groups
350-
groups =
351-
files
352-
|> Enum.filter(&String.ends_with?(&1, "_offsets.dets"))
353-
|> Enum.flat_map(fn file ->
354-
extract_consumer_groups_from_file(Path.join(offsets_dir, file))
355-
end)
356-
|> Enum.group_by(& &1.name)
357-
|> Enum.map(fn {group_name, group_data} ->
358-
# Aggregate data for each group
359-
total_lag = Enum.reduce(group_data, 0, fn g, acc -> acc + g.lag end)
360-
361-
%{
362-
name: group_name,
363-
lag: total_lag,
364-
# Approximate by partition count
365-
members: length(group_data),
366-
# Default state, could be enhanced with real tracking
367-
state: "stable"
368-
}
369-
end)
370-
371-
if groups == [] do
372-
# Return empty list if no consumer groups
373-
[]
374-
else
375-
groups
376-
end
377-
378-
_ ->
379-
[]
380-
end
381-
end
382-
383-
defp extract_consumer_groups_from_file(file_path) do
384-
# Parse filename to get topic and partition
385-
basename = Path.basename(file_path, "_offsets.dets")
386-
387-
case String.split(basename, "_") do
388-
[topic | rest] when rest != [] ->
389-
partition_str = List.last(rest)
390-
partition = String.to_integer(partition_str || "0")
391-
topic_name = Enum.join([topic | Enum.drop(rest, -1)], "_")
392-
393-
# Open DETS file to read consumer groups
394-
table_name = String.to_atom("temp_#{:erlang.unique_integer([:positive])}")
395-
396-
case :dets.open_file(table_name, file: String.to_charlist(file_path), access: :read) do
397-
{:ok, _} ->
398-
groups = extract_groups_from_table(table_name, topic_name, partition)
399-
:dets.close(table_name)
400-
groups
401-
402-
_ ->
403-
[]
404-
end
405-
406-
_ ->
407-
[]
408-
end
409-
end
410-
411-
defp extract_groups_from_table(table_name, topic, partition) do
412-
# Read all entries from DETS table
413-
case :dets.match_object(table_name, :_) do
414-
entries when is_list(entries) ->
415-
entries
416-
|> Enum.map(fn
417-
{{^topic, ^partition, group}, offset} ->
418-
# Calculate lag by comparing with latest offset
419-
latest = get_latest_offset(topic, partition)
420-
lag = max(0, latest - offset)
421-
%{name: group, lag: lag, topic: topic, partition: partition}
422-
423-
_ ->
424-
nil
425-
end)
426-
|> Enum.reject(&is_nil/1)
427-
428-
_ ->
429-
[]
430-
end
431-
end
432-
433-
defp get_latest_offset(topic, partition) do
434-
case SingleFile.get_offsets(topic, partition) do
435-
{:ok, %{latest: latest}} -> latest
436-
_ -> 0
437-
end
336+
# Get cached consumer groups data from MetricsCache
337+
KafkaesqueDashboard.MetricsCache.get_consumer_groups()
338+
rescue
339+
_ ->
340+
# Fallback to empty list if cache is not available
341+
[]
438342
end
439343

440344
defp get_throughput_data do
@@ -669,6 +573,16 @@ defmodule KafkaesqueDashboard.DashboardLive do
669573
|> assign(:telemetry_storage_throughput, metrics[:storage_throughput] || 0)
670574
end
671575

576+
@impl true
577+
def terminate(_reason, _socket) do
578+
# Detach all telemetry handlers to prevent memory leak
579+
Enum.each(@telemetry_events, fn event ->
580+
handler_id = "dashboard-#{Enum.join(event, "-")}"
581+
:telemetry.detach(handler_id)
582+
end)
583+
:ok
584+
end
585+
672586
@impl true
673587
def render(assigns) do
674588
~H"""
@@ -679,7 +593,7 @@ defmodule KafkaesqueDashboard.DashboardLive do
679593
<h1 class="text-4xl font-light text-dark-text mb-2">Kafkaesque Dashboard</h1>
680594
<p class="text-dark-muted">Real-time monitoring for your distributed log system</p>
681595
</div>
682-
596+
683597
<!-- Key Metrics -->
684598
<div class="grid grid-cols-2 md:grid-cols-4 lg:grid-cols-8 gap-4 mb-8">
685599
<.metric_card
@@ -717,7 +631,7 @@ defmodule KafkaesqueDashboard.DashboardLive do
717631
value={"#{Float.round(@topic_stats.storage_size_gb, 1)} GB"}
718632
/>
719633
</div>
720-
634+
721635
<!-- Main Charts Row -->
722636
<div class="grid grid-cols-1 lg:grid-cols-2 gap-6 mb-8">
723637
<!-- Throughput Chart -->
@@ -733,7 +647,7 @@ defmodule KafkaesqueDashboard.DashboardLive do
733647
socket={@socket}
734648
/>
735649
</.panel>
736-
650+
737651
<!-- Consumer Lag Chart -->
738652
<.panel title="Consumer Group Lag">
739653
<.svelte
@@ -749,7 +663,7 @@ defmodule KafkaesqueDashboard.DashboardLive do
749663
/>
750664
</.panel>
751665
</div>
752-
666+
753667
<!-- System Resources Row -->
754668
<div class="grid grid-cols-1 lg:grid-cols-3 gap-6 mb-8">
755669
<!-- Resource Gauges -->
@@ -795,7 +709,7 @@ defmodule KafkaesqueDashboard.DashboardLive do
795709
/>
796710
</div>
797711
</.panel>
798-
712+
799713
<!-- Topic Distribution -->
800714
<.panel title="Topic Size Distribution">
801715
<.svelte
@@ -810,7 +724,7 @@ defmodule KafkaesqueDashboard.DashboardLive do
810724
socket={@socket}
811725
/>
812726
</.panel>
813-
727+
814728
<!-- Network Stats -->
815729
<.panel title="Network I/O">
816730
<div class="space-y-6">
@@ -840,7 +754,7 @@ defmodule KafkaesqueDashboard.DashboardLive do
840754
</div>
841755
</.panel>
842756
</div>
843-
757+
844758
<!-- Tables Row -->
845759
<div class="grid grid-cols-1 lg:grid-cols-2 gap-6 mb-8">
846760
<!-- Topics Table -->
@@ -852,7 +766,7 @@ defmodule KafkaesqueDashboard.DashboardLive do
852766
<:col label="Size" field={:size_mb} />
853767
</.data_table>
854768
</.panel>
855-
769+
856770
<!-- Consumer Groups Table -->
857771
<.panel title="Consumer Groups">
858772
<.data_table rows={@consumer_groups}>
@@ -863,7 +777,7 @@ defmodule KafkaesqueDashboard.DashboardLive do
863777
</.data_table>
864778
</.panel>
865779
</div>
866-
780+
867781
<!-- Partition Health -->
868782
<.panel title="Partition Health">
869783
<div class="grid grid-cols-2 md:grid-cols-4 lg:grid-cols-6 gap-4">
@@ -887,7 +801,7 @@ defmodule KafkaesqueDashboard.DashboardLive do
887801
<% end %>
888802
</div>
889803
</.panel>
890-
804+
891805
<!-- Storage & I/O Metrics Row -->
892806
<div class="grid grid-cols-1 lg:grid-cols-3 gap-6 mb-8 mt-8">
893807
<!-- Storage Metrics -->
@@ -936,7 +850,7 @@ defmodule KafkaesqueDashboard.DashboardLive do
936850
</div>
937851
</div>
938852
</.panel>
939-
853+
940854
<!-- Rebalance Events -->
941855
<.panel title="Recent Rebalances">
942856
<div class="space-y-2">
@@ -963,7 +877,7 @@ defmodule KafkaesqueDashboard.DashboardLive do
963877
<% end %>
964878
</div>
965879
</.panel>
966-
880+
967881
<!-- Consumer Activity -->
968882
<.panel title="Consumer Activity">
969883
<div class="space-y-2">
@@ -991,7 +905,7 @@ defmodule KafkaesqueDashboard.DashboardLive do
991905
</div>
992906
</.panel>
993907
</div>
994-
908+
995909
<!-- Event Stream & Latency Percentiles -->
996910
<div class="grid grid-cols-1 lg:grid-cols-2 gap-6 mb-8">
997911
<!-- Topic Events -->
@@ -1021,7 +935,7 @@ defmodule KafkaesqueDashboard.DashboardLive do
1021935
<% end %>
1022936
</div>
1023937
</.panel>
1024-
938+
1025939
<!-- Latency Percentiles -->
1026940
<.panel title="Consumer Latency Percentiles">
1027941
<div class="space-y-4">

0 commit comments

Comments (0)