Skip to content

Commit 8693a5a

Browse files
committed
Setup CI for adapter with service dependencies
Signed-off-by: Tim Paine <3105306+timkpaine@users.noreply.github.com>
1 parent d2380e2 commit 8693a5a

7 files changed

Lines changed: 126 additions & 104 deletions

File tree

.github/workflows/build.yml

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -545,7 +545,6 @@ jobs:
545545
env:
546546
CSP_TEST_SKIP_EXAMPLES: "1"
547547

548-
549548
#################################
550549
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
551550
#~~~~~~~~~|##########|~~~~~~~~~~#
@@ -611,6 +610,79 @@ jobs:
611610
run: make test TEST_ARGS="-k TestDBReader"
612611
if: ${{ contains( 'sqlalchemy', matrix.package )}}
613612

613+
###########################
614+
#~~~~~~~~~~~~~~~~~~~~~~~~~#
615+
#~~~~~~|#############|~~~~#
616+
#~~~~~~|#|~~~~~~~/##/~~~~~#
617+
#~~~~~~|#|~~~~~/##/~~~~~~~#
618+
#~~~~~~~~~~~~/##/~~~~~~~~~#
619+
#~~~~~~~~~~/##/~~~~~~~~~~~#
620+
#~~~~~~~~/##/~~~~~~~~~~~~~#
621+
#~~~~~~/##/~~~~~~~~~~~~~~~#
622+
#~~~~~~~~~~~~~~~~~~~~~~~~~#
623+
# Test Service Adapters #
624+
#~~~~~~~~~~~~~~~~~~~~~~~~~#
625+
test_adapters:
626+
needs:
627+
- initialize
628+
- build
629+
630+
strategy:
631+
matrix:
632+
os:
633+
- ubuntu-20.04
634+
python-version:
635+
- 3.9
636+
adapter:
637+
- kafka
638+
639+
runs-on: ${{ matrix.os }}
640+
641+
steps:
642+
- name: Checkout
643+
uses: actions/checkout@v4
644+
with:
645+
submodules: recursive
646+
647+
- name: Set up Python ${{ matrix.python-version }}
648+
uses: ./.github/actions/setup-python
649+
with:
650+
version: '${{ matrix.python-version }}'
651+
652+
- name: Install python dependencies
653+
run: make requirements
654+
655+
- name: Install test dependencies
656+
shell: bash
657+
run: sudo apt-get install graphviz
658+
659+
# Download artifact
660+
- name: Download wheel
661+
uses: actions/download-artifact@v4
662+
with:
663+
name: csp-dist-${{ runner.os }}-${{ runner.arch }}-${{ matrix.python-version }}
664+
665+
- name: Install wheel
666+
run: python -m pip install -U *manylinux2014*.whl --target .
667+
668+
- name: Spin up adapter service
669+
run: make dockerup ADAPTER=${{ matrix.adapter }} DOCKERARGS="--wait --wait-timeout 30"
670+
671+
- name: Wait a few seconds after images have been spun up
672+
run: sleep 30
673+
674+
# Run tests
675+
- name: Setup test flags
676+
shell: bash
677+
run: echo "CSP_TEST_$( echo ${{ matrix.adapter }} | awk '{print toupper($0)}' )=1" >> $GITHUB_ENV
678+
679+
- name: Python Test Steps
680+
run: make test TEST_ARGS="-k ${{ matrix.adapter }}"
681+
682+
- name: Spin down adapter service
683+
run: make dockerdown ADAPTER=${{ matrix.adapter }}
684+
if: ${{ always() }}
685+
614686
#############################
615687
#~~~~~~~~~~~~~~~~~~~~~~~~~~~#
616688
#~~~~~~|#############|~~~~~~#
@@ -664,3 +736,4 @@ jobs:
664736
uses: pypa/gh-action-pypi-publish@release/v1
665737
with:
666738
repository-url: https://test.pypi.org/legacy
739+

Makefile

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,15 +86,16 @@ tests: test
8686

8787
.PHONY: dockerup dockerps dockerdown initpodmanmac
8888
ADAPTER := kafka
89-
DOCKER := podman
89+
DOCKER := docker
90+
DOCKERARGS :=
9091

9192
initpodmanmac:
9293
podman machine stop
9394
podman machine set --cpus 4 --memory 8096
9495
podman machine start
9596

9697
dockerup: ## spin up docker compose services for adapter testing
97-
$(DOCKER) compose -f ci/$(ADAPTER)/docker-compose.yml up -d
98+
$(DOCKER) compose -f ci/$(ADAPTER)/docker-compose.yml up -d $(DOCKERARGS)
9899

99100
dockerps: ## spin up docker compose services for adapter testing
100101
$(DOCKER) compose -f ci/$(ADAPTER)/docker-compose.yml ps

ci/kafka/docker-compose.yml

Lines changed: 25 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -87,84 +87,6 @@ services:
8787
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
8888
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
8989

90-
control-center:
91-
image: confluentinc/cp-enterprise-control-center:7.5.3
92-
hostname: control-center
93-
container_name: control-center
94-
depends_on:
95-
- broker
96-
- schema-registry
97-
- connect
98-
- ksqldb-server
99-
ports:
100-
- "9021:9021"
101-
environment:
102-
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
103-
CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
104-
CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
105-
CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
106-
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
107-
CONTROL_CENTER_REPLICATION_FACTOR: 1
108-
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
109-
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
110-
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
111-
PORT: 9021
112-
113-
ksqldb-server:
114-
image: confluentinc/cp-ksqldb-server:7.5.3
115-
hostname: ksqldb-server
116-
container_name: ksqldb-server
117-
depends_on:
118-
- broker
119-
- connect
120-
ports:
121-
- "8088:8088"
122-
environment:
123-
KSQL_CONFIG_DIR: "/etc/ksql"
124-
KSQL_BOOTSTRAP_SERVERS: "broker:29092"
125-
KSQL_HOST_NAME: ksqldb-server
126-
KSQL_LISTENERS: "http://0.0.0.0:8088"
127-
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
128-
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
129-
KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
130-
KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
131-
KSQL_KSQL_CONNECT_URL: "http://connect:8083"
132-
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
133-
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
134-
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
135-
136-
# ksqldb-cli:
137-
# image: confluentinc/cp-ksqldb-cli:7.5.3
138-
# container_name: ksqldb-cli
139-
# depends_on:
140-
# - broker
141-
# - connect
142-
# - ksqldb-server
143-
# entrypoint: /bin/sh
144-
# tty: true
145-
146-
# ksql-datagen:
147-
# image: confluentinc/ksqldb-examples:7.5.3
148-
# hostname: ksql-datagen
149-
# container_name: ksql-datagen
150-
# depends_on:
151-
# - ksqldb-server
152-
# - broker
153-
# - schema-registry
154-
# - connect
155-
# command: "bash -c 'echo Waiting for Kafka to be ready... && \
156-
# cub kafka-ready -b broker:29092 1 40 && \
157-
# echo Waiting for Confluent Schema Registry to be ready... && \
158-
# cub sr-ready schema-registry 8081 40 && \
159-
# echo Waiting a few seconds for topic creation to finish... && \
160-
# sleep 11 && \
161-
# tail -f /dev/null'"
162-
# environment:
163-
# KSQL_CONFIG_DIR: "/etc/ksql"
164-
# STREAMS_BOOTSTRAP_SERVERS: broker:29092
165-
# STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
166-
# STREAMS_SCHEMA_REGISTRY_PORT: 8081
167-
16890
rest-proxy:
16991
image: confluentinc/cp-kafka-rest:7.5.3
17092
depends_on:
@@ -178,4 +100,28 @@ services:
178100
KAFKA_REST_HOST_NAME: rest-proxy
179101
KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
180102
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
181-
KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
103+
KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
104+
105+
# Uncomment for a helpful UI
106+
# control-center:
107+
# image: confluentinc/cp-enterprise-control-center:7.5.3
108+
# hostname: control-center
109+
# container_name: control-center
110+
# depends_on:
111+
# - broker
112+
# - schema-registry
113+
# - connect
114+
# ports:
115+
# - "9021:9021"
116+
# environment:
117+
# CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
118+
# CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
119+
# CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
120+
# CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
121+
# CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
122+
# CONTROL_CENTER_REPLICATION_FACTOR: 1
123+
# CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
124+
# CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
125+
# CONFLUENT_METRICS_TOPIC_REPLICATION: 1
126+
# PORT: 9021
127+

csp/adapters/kafka.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def __init__(
7373

7474
consumer_properties = {
7575
"group.id": group_id,
76-
# To get end of parition notification for live / not live flag
76+
# To get end of partition notification for live / not live flag
7777
"enable.partition.eof": "true",
7878
}
7979

csp/tests/adapters/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,6 @@ def kafkabroker():
1212
def kafkaadapter(kafkabroker):
1313
group_id = "group.id123"
1414
_kafkaadapter = KafkaAdapterManager(
15-
broker=kafkabroker, group_id=group_id, rd_kafka_conf_options={"allow.auto.create.topics": "true"}
15+
broker=kafkabroker, group_id=group_id
1616
)
1717
return _kafkaadapter

csp/tests/adapters/test_kafka.py

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,10 @@ def graph(count: int):
7979
}
8080

8181
topic = f"test.metadata.{os.getpid()}"
82-
_precreate_topic(topic)
8382
subKey = "foo"
8483
pubKey = ["mapped_a", "mapped_b", "mapped_c"]
8584

86-
c = csp.count(csp.timer(timedelta(seconds=0.1)))
85+
c = csp.count(csp.timer(timedelta(seconds=0.5)))
8786
t = csp.sample(c, csp.const("foo"))
8887

8988
pubStruct = MetaPubData.collectts(
@@ -104,22 +103,23 @@ def graph(count: int):
104103
)
105104

106105
csp.add_graph_output("sub_data", sub_data)
107-
# csp.print('sub', sub_data)
106+
csp.print('sub', sub_data)
108107
# Wait for at least count ticks and until we get a live tick
109-
done_flag = csp.count(sub_data) >= count
110-
done_flag = csp.and_(done_flag, sub_data.mapped_live is True)
108+
done_flag = csp.and_(csp.count(sub_data) >= count, sub_data.mapped_live == True) # noqa: E712
111109
stop = csp.filter(done_flag, done_flag)
112110
csp.stop_engine(stop)
113111

114-
count = 5
115-
results = csp.run(graph, count, starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True)
112+
results = csp.run(graph, 5, starttime=datetime.utcnow(), endtime=timedelta(seconds=20), realtime=True)
116113
assert len(results["sub_data"]) >= 5
117114
print(results)
118115
for result in results["sub_data"]:
119116
assert result[1].mapped_partition >= 0
120117
assert result[1].mapped_offset >= 0
121118
assert result[1].mapped_live is not None
122119
assert result[1].mapped_timestamp < datetime.utcnow()
120+
# first record should be non live
121+
assert results["sub_data"][0][1].mapped_live is False
122+
# last record should be live
123123
assert results["sub_data"][-1][1].mapped_live
124124

125125
@pytest.mark.skipif(not os.environ.get("CSP_TEST_KAFKA"), reason="Skipping kafka adapter tests")
@@ -145,8 +145,7 @@ def graph(symbols: list, count: int):
145145
struct_field_map = {"b": "b2", "i": "i2", "d": "d2", "s": "s2", "dt": "dt2"}
146146

147147
done_flags = []
148-
topic = f"mktdata.{os.getpid()}"
149-
_precreate_topic(topic)
148+
150149
for symbol in symbols:
151150
kafkaadapter.publish(msg_mapper, topic, symbol, b, field_map="b")
152151
kafkaadapter.publish(msg_mapper, topic, symbol, i, field_map="i")
@@ -183,10 +182,12 @@ def graph(symbols: list, count: int):
183182
stop = csp.filter(stop, stop)
184183
csp.stop_engine(stop)
185184

185+
topic = f"mktdata.{os.getpid()}"
186+
_precreate_topic(topic)
186187
symbols = ["AAPL", "MSFT"]
187188
count = 100
188189
results = csp.run(
189-
graph, symbols, count, starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True
190+
graph, symbols, count, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True
190191
)
191192
for symbol in symbols:
192193
pub = results[f"pall_{symbol}"]
@@ -212,7 +213,7 @@ def pub_graph():
212213
csp.stop_engine(stop)
213214
# csp.print('pub', struct)
214215

215-
csp.run(pub_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True)
216+
csp.run(pub_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True)
216217

217218
# grab start/end times
218219
def get_times_graph():
@@ -232,7 +233,7 @@ def get_times_graph():
232233
# csp.print('sub', data)
233234
# csp.print('status', kafkaadapter.status())
234235

235-
all_data = csp.run(get_times_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True)[
236+
all_data = csp.run(get_times_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True)[
236237
"data"
237238
]
238239
min_time = all_data[0][1].dt
@@ -258,7 +259,7 @@ def get_data(start_offset, expected_count):
258259
KafkaStartOffset.EARLIEST,
259260
10,
260261
starttime=datetime.utcnow(),
261-
endtime=timedelta(seconds=30),
262+
endtime=timedelta(seconds=10),
262263
realtime=True,
263264
)["data"]
264265
# print(res)
@@ -276,7 +277,7 @@ def get_data(start_offset, expected_count):
276277
assert len(res) == 0
277278

278279
res = csp.run(
279-
get_data, KafkaStartOffset.START_TIME, 10, starttime=min_time, endtime=timedelta(seconds=30), realtime=True
280+
get_data, KafkaStartOffset.START_TIME, 10, starttime=min_time, endtime=timedelta(seconds=10), realtime=True
280281
)["data"]
281282
assert len(res) == 10
282283

@@ -287,12 +288,12 @@ def get_data(start_offset, expected_count):
287288
stime = all_data[2][1].dt + timedelta(milliseconds=1)
288289
expected = [x for x in all_data if x[1].dt >= stime]
289290
res = csp.run(
290-
get_data, stime, len(expected), starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True
291+
get_data, stime, len(expected), starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True
291292
)["data"]
292293
assert len(res) == len(expected)
293294

294295
res = csp.run(
295-
get_data, timedelta(seconds=0), len(expected), starttime=stime, endtime=timedelta(seconds=30), realtime=True
296+
get_data, timedelta(seconds=0), len(expected), starttime=stime, endtime=timedelta(seconds=10), realtime=True
296297
)["data"]
297298
assert len(res) == len(expected)
298299

@@ -314,8 +315,6 @@ def graph(symbols: list, count: int):
314315
msg_mapper = RawBytesMessageMapper()
315316

316317
done_flags = []
317-
topic = f"test_str.{os.getpid()}"
318-
_precreate_topic(topic)
319318
for symbol in symbols:
320319
topic = f"test_str.{os.getpid()}"
321320
kafkaadapter.publish(msg_mapper, topic, symbol, d)
@@ -356,10 +355,13 @@ def graph(symbols: list, count: int):
356355
stop = csp.filter(stop, stop)
357356
csp.stop_engine(stop)
358357

358+
topic = f"test_str.{os.getpid()}"
359+
_precreate_topic(topic)
360+
359361
symbols = ["AAPL", "MSFT"]
360362
count = 10
361363
results = csp.run(
362-
graph, symbols, count, starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True
364+
graph, symbols, count, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True
363365
)
364366
# print(results)
365367
for symbol in symbols:

csp/tests/adapters/test_status.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class SubData(csp.Struct):
1414
a: bool
1515

1616

17-
class TestStatus:
17+
class TestStatusKafka:
1818
@pytest.mark.skipif(not os.environ.get("CSP_TEST_KAFKA"), reason="Skipping kafka adapter tests")
1919
def test_basic(self, kafkaadapter):
2020
topic = f"csp.unittest.{os.getpid()}"

0 commit comments

Comments
 (0)