Skip to content

Commit 66e66bf

Browse files
Allow keyword configuration for producers
1 parent 07a7591 commit 66e66bf

6 files changed

Lines changed: 60 additions & 39 deletions

File tree

README.md

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -305,8 +305,8 @@ Multiple Producers configuration:
305305
306306
```elixir
307307
config :kaffe,
308-
producers: %{
309-
"producer_1" => [
308+
producers: [
309+
producer_1: [
310310
endpoints: [kafka1: 9092], # [hostname: port]
311311
topics: ["kafka-topic"],
312312
@@ -319,11 +319,11 @@ config :kaffe,
319319
password: System.get_env("KAFFE_PRODUCER_PASSWORD")
320320
}
321321
],
322-
"producer_2" => [
322+
producer_2: [
323323
endpoints: [kafka2: 9092], # [hostname: port]
324324
topics: ["another-kafka-topic"]
325325
]
326-
}
326+
]
327327
```
328328
329329
The `partition_strategy` setting can be one of:
@@ -375,7 +375,7 @@ There are several ways to produce:
375375
```
376376
377377
```elixir
378-
Kaffe.Producer.produce_sync_with_client("producer_1", "topic", [{"key1", "value1"}, {"key2", "value2"}])
378+
Kaffe.Producer.produce_sync_with_client(:producer_1, "topic", [{"key1", "value1"}, {"key2", "value2"}])
379379
```
380380
381381
- `topic`/`partition`/`message_list` - Produce each message in the list to the given `topic`/`partition`.
@@ -387,7 +387,7 @@ There are several ways to produce:
387387
```
388388
389389
```elixir
390-
Kaffe.Producer.produce_sync_with_client("producer_1", "topic", 2, [{"key1", "value1"}, {"key2", "value2"}])
390+
Kaffe.Producer.produce_sync_with_client(:producer_1, "topic", 2, [{"key1", "value1"}, {"key2", "value2"}])
391391
```
392392
393393
- `key`/`value` - The key/value will be produced to the first topic given to the producer when it was started. The partition will be selected with the chosen strategy or given function.
@@ -397,7 +397,7 @@ There are several ways to produce:
397397
```
398398
399399
```elixir
400-
Kaffe.Producer.produce_sync_with_client("producer_1", "key", "value")
400+
Kaffe.Producer.produce_sync_with_client(:producer_1, "key", "value")
401401
```
402402
403403
- `topic`/`key`/`value` - The key/value will be produced to the given topic.
@@ -407,7 +407,7 @@ There are several ways to produce:
407407
```
408408
409409
```elixir
410-
Kaffe.Producer.produce_sync_with_client("producer_1", "whitelist", "key", "value")
410+
Kaffe.Producer.produce_sync_with_client(:producer_1, "whitelist", "key", "value")
411411
```
412412
413413
- `topic`/`partition`/`key`/`value` - The key/value will be produced to the given topic/partition.
@@ -417,7 +417,7 @@ There are several ways to produce:
417417
```
418418
419419
```elixir
420-
Kaffe.Producer.produce_sync_with_client("producer_1", "whitelist", 2, "key", "value")
420+
Kaffe.Producer.produce_sync_with_client(:producer_1, "whitelist", 2, "key", "value")
421421
```
422422
423423
**NOTE**: With this approach Kaffe will not calculate the next partition since it assumes you're taking over that job by giving it a specific partition.

config/test.exs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ config :kaffe,
2525
}
2626
]
2727
},
28-
producers: %{
29-
"producer_name" => [
28+
producers: [
29+
producer_name: [
3030
endpoints: [kafka: 9092],
3131
topics: ["kaffe-test"],
3232
sasl: %{
@@ -35,4 +35,4 @@ config :kaffe,
3535
password: System.get_env("KAFFE_PRODUCER_PASSWORD")
3636
}
3737
]
38-
}
38+
]

lib/kaffe.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ defmodule Kaffe do
1818

1919
if producers = Application.get_env(:kaffe, :producers) do
2020
producers
21-
|> Map.keys()
21+
|> Enum.map(&elem(&1, 0))
2222
|> Enum.each(fn config_key ->
2323
Logger.debug("event#start_producer_client=#{__MODULE__}_#{config_key}")
2424
config = Kaffe.Config.Producer.configuration(config_key)

lib/kaffe/config/producer.ex

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ defmodule Kaffe.Config.Producer do
6060

6161
require Logger
6262

63-
@default_producer_config_key "producer"
63+
@default_producer_config_key :producer
6464

6565
def configuration(config_key) do
6666
%{
@@ -141,33 +141,33 @@ defmodule Kaffe.Config.Producer do
141141
def config_get!(config_key, key) do
142142
:kaffe
143143
|> Application.get_env(:producers)
144-
|> Map.fetch!(config_key)
144+
|> Access.fetch!(config_key)
145145
|> Keyword.fetch!(key)
146146
end
147147

148148
def config_get(config_key, key, default) do
149149
:kaffe
150150
|> Application.get_env(:producers)
151-
|> Map.fetch!(config_key)
151+
|> Access.fetch!(config_key)
152152
|> Keyword.get(key, default)
153153
end
154154

155155
def list_config_keys do
156156
:kaffe
157157
|> Application.get_env(:producers)
158-
|> Map.keys()
158+
|> Enum.map(&elem(&1, 0))
159159
end
160160

161161
@doc """
162162
Sets :kaffe, :producers application env if :kaffe, :producer is present.
163163
164164
Provides backward compatibility between single producer and multiple producers.
165-
`#{@default_producer_config_key}` config key is used for multiple producers config.
165+
`:#{@default_producer_config_key}` config key is used for multiple producers config.
166166
"""
167167
@spec maybe_set_producers_env!() :: :ok
168168
def maybe_set_producers_env! do
169169
single_config = Application.get_env(:kaffe, :producer) || []
170-
multiple_config = Application.get_env(:kaffe, :producers) || %{}
170+
multiple_config = Application.get_env(:kaffe, :producers) || []
171171

172172
if !Enum.empty?(single_config) and !Enum.empty?(multiple_config) do
173173
raise("""
@@ -178,7 +178,7 @@ defmodule Kaffe.Config.Producer do
178178
end
179179

180180
if !Enum.empty?(single_config) and Enum.empty?(multiple_config) do
181-
multiple_config = %{@default_producer_config_key => single_config}
181+
multiple_config = [{@default_producer_config_key, single_config}]
182182

183183
Logger.info("""
184184
Configuration for single producer is specified in :kaffe, :producer.

test/kaffe/config/producer_test.exs

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
defmodule Kaffe.Config.ProducerTest do
22
use ExUnit.Case, async: true
33

4-
@default_producer "producer_name"
4+
@default_producer :producer_name
55

66
describe "configuration/0" do
77
test "correct settings are extracted" do
@@ -75,19 +75,19 @@ defmodule Kaffe.Config.ProducerTest do
7575
test "correct settings are extracted for different producer clients" do
7676
config = Application.get_env(:kaffe, :producers)
7777

78-
multiple_producers_config = %{
79-
"producer_1" => [
78+
multiple_producers_config = [
79+
producer_1: [
8080
endpoints: [kafka1: 9092],
8181
topics: ["kaffe-test-1"],
8282
sasl: %{mechanism: :plain, login: "Alice", password: "ecilA"},
8383
ssl: true
8484
],
85-
"producer_2" => [
85+
producer_2: [
8686
endpoints: [kafka2: 9092],
8787
topics: ["kaffe-test-2"],
8888
compression: :zstd
8989
]
90-
}
90+
]
9191

9292
Application.put_env(:kaffe, :producers, multiple_producers_config)
9393

@@ -141,8 +141,28 @@ defmodule Kaffe.Config.ProducerTest do
141141
Application.put_env(:kaffe, :producers, config)
142142
end)
143143

144-
assert Kaffe.Config.Producer.configuration("producer_1") == expected_producer_config_1
145-
assert Kaffe.Config.Producer.configuration("producer_2") == expected_producer_config_2
144+
assert Kaffe.Config.Producer.configuration(:producer_1) == expected_producer_config_1
145+
assert Kaffe.Config.Producer.configuration(:producer_2) == expected_producer_config_2
146+
end
147+
148+
test "the same settings are extracted from map and keyword configuration" do
149+
keyword_config = Application.get_env(:kaffe, :producers)
150+
assert is_list(keyword_config)
151+
assert keyword_config != []
152+
153+
producer_config_from_keyword = Kaffe.Config.Producer.configuration(@default_producer)
154+
155+
map_config = Map.new(keyword_config)
156+
157+
Application.put_env(:kaffe, :producers, map_config)
158+
159+
on_exit(fn ->
160+
Application.put_env(:kaffe, :producers, keyword_config)
161+
end)
162+
163+
producer_config_from_map = Kaffe.Config.Producer.configuration(@default_producer)
164+
165+
assert producer_config_from_keyword == producer_config_from_map
146166
end
147167
end
148168

@@ -163,7 +183,8 @@ defmodule Kaffe.Config.ProducerTest do
163183

164184
test "returns :ok if producers are configured with the :producers key" do
165185
producers_config = Application.get_env(:kaffe, :producers)
166-
assert is_map(producers_config)
186+
assert is_list(producers_config)
187+
assert producers_config != []
167188

168189
Application.delete_env(:kaffe, :producer)
169190

@@ -175,7 +196,7 @@ defmodule Kaffe.Config.ProducerTest do
175196

176197
test "returns :ok and sets :kaffe, :producers if producer is configured with the :producer key" do
177198
producers_config = Application.get_env(:kaffe, :producers)
178-
producer_config = producers_config |> Map.values() |> List.first()
199+
producer_config = producers_config |> Keyword.values() |> List.first()
179200

180201
Application.delete_env(:kaffe, :producers)
181202
Application.put_env(:kaffe, :producer, producer_config)
@@ -188,12 +209,12 @@ defmodule Kaffe.Config.ProducerTest do
188209
assert :ok == Kaffe.Config.Producer.maybe_set_producers_env!()
189210

190211
assert Application.get_env(:kaffe, :producer) == producer_config
191-
assert Application.get_env(:kaffe, :producers) == %{"producer" => producer_config}
212+
assert Application.get_env(:kaffe, :producers) == [producer: producer_config]
192213
end
193214

194215
test "logs message if :kaffe, :producers was set with configuration from :kaffe, :producer" do
195216
producers_config = Application.get_env(:kaffe, :producers)
196-
producer_config = producers_config |> Map.values() |> List.first()
217+
producer_config = producers_config |> Keyword.values() |> List.first()
197218

198219
Application.delete_env(:kaffe, :producers)
199220
Application.put_env(:kaffe, :producer, producer_config)
@@ -214,13 +235,13 @@ defmodule Kaffe.Config.ProducerTest do
214235
To ensure backward compatibility :kaffe, :producers was set to a map \
215236
with default producer name as the key and single producer config as the value:
216237
217-
config :kaffe, producers: %{\"producer\" => #{inspect(producer_config)}}
238+
config :kaffe, producers: [producer: #{inspect(producer_config)}]
218239
"""
219240
end
220241

221242
test "raises error is both :producer and :producers keys are present" do
222243
producers_config = Application.get_env(:kaffe, :producers)
223-
producer_config = producers_config |> Map.values() |> List.first()
244+
producer_config = producers_config |> Keyword.values() |> List.first()
224245

225246
Application.put_env(:kaffe, :producer, producer_config)
226247

test/kaffe/producer_test.exs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ defmodule Kaffe.ProducerTest do
33

44
alias Kaffe.Producer
55

6-
@default_client_config_key "producer_name"
6+
@default_client_config_key :producer_name
77
@default_client_name :kaffe_producer_client_producer_name
88

99
setup do
@@ -97,25 +97,25 @@ defmodule Kaffe.ProducerTest do
9797
setup do
9898
current_producers_config = Application.get_env(:kaffe, :producers)
9999

100-
producers_config = %{
101-
"producer_1" => [
100+
producers_config = [
101+
producer_1: [
102102
endpoints: [kafka1: 9092],
103103
topics: ["topic", "topic2"],
104104
partition_strategy: :md5
105105
],
106-
"producer_2" => [
106+
producer_2: [
107107
endpoints: [kafka2: 9092],
108108
topics: ["topic"],
109109
partition_strategy: :md5
110110
]
111-
}
111+
]
112112

113113
Application.put_env(:kaffe, :producers, producers_config)
114114
on_exit(fn -> Application.put_env(:kaffe, :producers, current_producers_config) end)
115115

116116
producers =
117117
producers_config
118-
|> Map.keys()
118+
|> Keyword.keys()
119119
|> Enum.map(&{&1, Kaffe.Config.Producer.configuration(&1)})
120120

121121
%{producers: producers}

0 commit comments

Comments
 (0)