Skip to content

Commit 51d92d7

Browse files
implement client and (hopefully) fix ci
1 parent 205a960 commit 51d92d7

23 files changed

Lines changed: 3339 additions & 186 deletions

File tree

Dockerfile

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ COPY config config
3838
COPY apps/kafkaesque_core/mix.exs apps/kafkaesque_core/
3939
COPY apps/kafkaesque_server/mix.exs apps/kafkaesque_server/
4040
COPY apps/kafkaesque_dashboard/mix.exs apps/kafkaesque_dashboard/
41+
COPY apps/kafkaesque_proto/mix.exs apps/kafkaesque_proto/
42+
COPY apps/kafkaesque_client/mix.exs apps/kafkaesque_client/
43+
COPY apps/kafkaesque_test_support/mix.exs apps/kafkaesque_test_support/
4144

4245
# Get and compile dependencies
4346
ENV MIX_ENV=prod
@@ -48,6 +51,9 @@ RUN mix deps.compile
4851
COPY apps/kafkaesque_core apps/kafkaesque_core
4952
COPY apps/kafkaesque_server apps/kafkaesque_server
5053
COPY apps/kafkaesque_dashboard apps/kafkaesque_dashboard
54+
COPY apps/kafkaesque_proto apps/kafkaesque_proto
55+
COPY apps/kafkaesque_client apps/kafkaesque_client
56+
COPY apps/kafkaesque_test_support apps/kafkaesque_test_support
5157
COPY proto proto
5258

5359
# Copy built assets from assets stage
Lines changed: 201 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,222 @@
11
defmodule KafkaesqueClient do
  @moduledoc """
  Elixir client SDK for Kafkaesque - A Kafka-compatible distributed log service.

  This module provides factory methods for creating producers, consumers, and admin clients
  that mirror Apache Kafka's client API design.

  ## Quick Start

      # Create a producer
      {:ok, producer} = KafkaesqueClient.create_producer(
        bootstrap_servers: ["localhost:50051"],
        acks: :all
      )

      # Send a message
      record = KafkaesqueClient.producer_record("events", "key-1", "value-1")
      {:ok, metadata} = KafkaesqueClient.Producer.send_sync(producer, record)

      # Create a consumer
      {:ok, consumer} = KafkaesqueClient.create_consumer(
        bootstrap_servers: ["localhost:50051"],
        group_id: "my-group",
        auto_offset_reset: :earliest
      )

      # Subscribe and poll
      :ok = KafkaesqueClient.Consumer.subscribe(consumer, ["events"])
      records = KafkaesqueClient.Consumer.poll(consumer)

      # Create an admin client
      {:ok, admin} = KafkaesqueClient.create_admin(
        bootstrap_servers: ["localhost:50051"]
      )

      # Create a topic
      {:ok, topic} = KafkaesqueClient.Admin.create_topic(admin, "new-topic", partitions: 3)
  """

  alias KafkaesqueClient.{Admin, Consumer, Producer}
  alias KafkaesqueClient.Connection.Supervisor, as: ConnectionSupervisor
  alias KafkaesqueClient.Record.{OffsetAndMetadata, ProducerRecord, TopicPartition}

  @doc """
  Starts the client's supervision tree (the connection supervisor).

  This is typically called automatically when the client is added as a dependency,
  but can be called manually if needed.
  """
  # NOTE(review): this module does not `use Application` here, so "called
  # automatically" holds only if it is registered as the `:mod` callback in
  # the app's mix.exs — confirm against the project configuration.
  @spec start(term(), term()) :: Supervisor.on_start()
  def start(_type \\ :normal, _args \\ []) do
    children = [
      ConnectionSupervisor
    ]

    opts = [strategy: :one_for_one, name: KafkaesqueClient.Supervisor]
    Supervisor.start_link(children, opts)
  end

  @doc """
  Creates a new producer with the given configuration.

  ## Options

  - `:bootstrap_servers` - List of Kafkaesque servers (required)
  - `:acks` - `:none`, `:leader`, or `:all` (default: `:leader`)
  - `:batch_size` - Maximum batch size in bytes (default: 16384)
  - `:linger_ms` - Time to wait before sending incomplete batches (default: 100)
  - `:compression_type` - Compression type (default: `:none`, future: `:gzip`, `:snappy`)
  - `:max_retries` - Maximum number of retries (default: 3)
  - `:retry_backoff_ms` - Backoff between retries (default: 100)

  ## Examples

      {:ok, producer} = KafkaesqueClient.create_producer(
        bootstrap_servers: ["localhost:50051"],
        acks: :all,
        batch_size: 32768,
        linger_ms: 200
      )
  """
  @spec create_producer(keyword()) :: GenServer.on_start()
  def create_producer(opts) when is_list(opts) do
    # Producer.start_link expects a map, so the keyword options are converted.
    config = Map.new(opts)
    Producer.start_link(config)
  end

  @doc """
  Creates a new consumer with the given configuration.

  ## Options

  - `:bootstrap_servers` - List of Kafkaesque servers (required)
  - `:group_id` - Consumer group ID (required)
  - `:auto_offset_reset` - `:earliest` or `:latest` (default: `:latest`)
  - `:enable_auto_commit` - Enable auto-commit (default: `true`)
  - `:auto_commit_interval_ms` - Auto-commit interval (default: 5000)
  - `:session_timeout_ms` - Session timeout (default: 30000)
  - `:max_poll_records` - Maximum records per poll (default: 500)

  ## Examples

      {:ok, consumer} = KafkaesqueClient.create_consumer(
        bootstrap_servers: ["localhost:50051"],
        group_id: "my-service",
        auto_offset_reset: :earliest,
        enable_auto_commit: true
      )
  """
  @spec create_consumer(keyword()) :: GenServer.on_start()
  def create_consumer(opts) when is_list(opts) do
    config = Map.new(opts)
    Consumer.start_link(config)
  end

  @doc """
  Creates a new admin client with the given configuration.

  ## Options

  - `:bootstrap_servers` - List of Kafkaesque servers (required)
  - `:request_timeout_ms` - Request timeout (default: 30000)

  ## Examples

      {:ok, admin} = KafkaesqueClient.create_admin(
        bootstrap_servers: ["localhost:50051"]
      )
  """
  @spec create_admin(keyword()) :: GenServer.on_start()
  def create_admin(opts) when is_list(opts) do
    config = Map.new(opts)
    Admin.start_link(config)
  end

  @doc """
  Creates a ProducerRecord for sending to Kafkaesque.

  ## Examples

      # Simple record with topic and value
      record = KafkaesqueClient.producer_record("events", "my-value")

      # Record with key
      record = KafkaesqueClient.producer_record("events", "key-1", "value-1")

      # Record with headers
      record = KafkaesqueClient.producer_record("events", "key-1", "value-1",
        headers: %{"trace_id" => "abc-123"}
      )

      # Full record specification
      record = %ProducerRecord{
        topic: "events",
        partition: 0,
        key: "user-123",
        value: Jason.encode!(%{action: "login"}),
        headers: %{"trace_id" => "abc-123"},
        timestamp: System.system_time(:millisecond)
      }
  """
  @spec producer_record(String.t(), binary()) :: struct()
  def producer_record(topic, value) when is_binary(topic) and is_binary(value) do
    ProducerRecord.new(topic, value)
  end

  # Key and value are intentionally unguarded here; ProducerRecord.new is
  # expected to validate them (e.g. a nil key). TODO confirm.
  @spec producer_record(String.t(), term(), term()) :: struct()
  def producer_record(topic, key, value) when is_binary(topic) do
    ProducerRecord.new(topic, key, value)
  end

  @spec producer_record(String.t(), term(), term(), keyword()) :: struct()
  def producer_record(topic, key, value, opts) when is_binary(topic) and is_list(opts) do
    base_record = ProducerRecord.new(topic, key, value)
    # struct/2 silently ignores unknown option keys; use struct!/2 upstream
    # if strict validation is ever desired (behavior change).
    struct(base_record, opts)
  end

  @doc """
  Creates a TopicPartition reference.

  ## Examples

      tp = KafkaesqueClient.topic_partition("events", 0)
  """
  @spec topic_partition(String.t(), non_neg_integer()) :: struct()
  def topic_partition(topic, partition \\ 0) do
    TopicPartition.new(topic, partition)
  end

  @doc """
  Creates an OffsetAndMetadata for committing.

  ## Examples

      om = KafkaesqueClient.offset_and_metadata(100)
      om = KafkaesqueClient.offset_and_metadata(100, "checkpoint-1")
  """
  @spec offset_and_metadata(integer(), String.t() | nil) :: struct()
  def offset_and_metadata(offset, metadata \\ nil) do
    OffsetAndMetadata.new(offset, metadata)
  end

  @doc """
  Attaches default telemetry handlers for logging metrics.

  This sets up handlers that log all client telemetry events.
  Call this once at application startup if you want default logging.

  ## Examples

      KafkaesqueClient.attach_telemetry_handlers()
  """
  @spec attach_telemetry_handlers() :: term()
  def attach_telemetry_handlers do
    KafkaesqueClient.Telemetry.attach_default_handlers()
  end

  @doc """
  Returns telemetry event definitions for monitoring.

  Use this with Telemetry.Metrics for Prometheus/Grafana integration.

  ## Examples

      def metrics do
        KafkaesqueClient.telemetry_metrics()
      end
  """
  @spec telemetry_metrics() :: term()
  def telemetry_metrics do
    KafkaesqueClient.Telemetry.metrics_definitions()
  end
end

0 commit comments

Comments
 (0)