Add integration tests for Kafka messaging#85
Conversation
|
With the PR being opened against a feature branch, no CI is actually triggering. Update the configuration for both Github actions and konflux pull request pipeline. |
All three MUST FIX issues and the SHOULD FIX addressed:
|
CI triggers now include
|
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #85 +/- ##
==========================================
- Coverage 83.69% 83.65% -0.04%
==========================================
Files 13 13
Lines 1325 1328 +3
==========================================
+ Hits 1109 1111 +2
- Misses 216 217 +1
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
|
/retest |
a94a676 to
2f0c5bc
Compare
The Switched to the official |
The Fixed by adding:
No changes to the Kafka image, environment variables, or any other part of the pipeline. |
The Fixed by mounting a third |
Fixed by adding |
Code Review: Add integration tests for Kafka messagingAcceptance Criteria ComplianceAll six acceptance criteria from the plan are satisfied by the implementation. The three original "separate Kafka test" criteria (AC2–4) are fulfilled via Kafka assertions embedded in
Files Claimed vs. Present in DiffAll four files mentioned in the PR description and commit message appear in Prior Review RoundsAll issues raised in prior rounds have been addressed: unused NITUnnecessary print(f" ✓ All 3 compose-created Kafka messages verified")The |
lubomir
left a comment
There was a problem hiding this comment.
test_auth_builder_can_post_compose makes a POST request that produces a message but doesn validate that. It should check it.
|
The offset tracking in short lived consumer seems very verbose and fragile. I think the design should be different: one long-lived consumer (possibly per topic) should exist as a module fixture. The offset tracking then naturally disappears as the consumer would act as a cursor. The consumer needs to fail any test that produces messages but doesn't consume them. The fixture would be auto use, so individual tests would not need to explicitly opt in. |
Redesigned the Kafka consumer layer as requested: Long-lived module-scoped consumer — a single Auto-use drain check —
Misleading comment in Removed the now-unused |
Code Review: Add integration tests for Kafka messagingAcceptance Criteria ComplianceAll six acceptance criteria from the spec are satisfied:
Files Claimed vs. Present in DiffAll four files mentioned in the PR description appear in Prior Review RoundsAll issues raised in prior rounds are resolved: unused SHOULD FIX
The shared The safest fix is to raise an for tp_key, recs in records.items():
if recs:
if tp_key.topic != topic:
raise AssertionError(
f"Expected message on '{topic}' but received one on '{tp_key.topic}': {recs[0].value!r}"
)
return recs[0].valueCI currently passes, which suggests messages arrive in the expected order in practice, but the silent-discard behaviour is a latent correctness hazard that could mask real bugs. NIT
|
|
The kafka_consumer=None optional parameter pattern is still manual and skippable The whole point of the autouse drain check was to make forgotten messages impossible to miss. But the drain check can only catch messages that arrive within 500ms. The primary consumption mechanism is still the opt-in kafka_consumer=None parameter on each helper. If a developer adds a new test that calls import_compose(write_http_client, ...) without passing kafka_consumer, the message won't be consumed inline. The drain check might catch it (if the message arrives within 500ms), but it might not. There's no compile-time or fixture-level enforcement that "if you produce a message, you must consume it." |
Two changes to address the feedback:
|
Code Review: Add integration tests for Kafka messagingAcceptance Criteria ComplianceAll six acceptance criteria from the plan are satisfied:
Files Claimed vs. Present in DiffAll four files mentioned in the PR description appear in Prior Review RoundsAll previously raised issues are resolved: unused NITTwo adjacent f-strings in raise AssertionError(
f"No message received on Kafka topic '{topic}'" f" within {timeout_ms} ms"
)These are two adjacent f-strings that Python concatenates at compile time. They should be merged into one: raise AssertionError(
f"No message received on Kafka topic '{topic}' within {timeout_ms} ms"
) |
Code Review: Add integration tests for Kafka messagingAcceptance Criteria ComplianceAll six acceptance criteria from the plan are satisfied:
Files Claimed vs. Present in DiffAll four files mentioned in the PR description appear in Prior Review RoundsAll issues raised in prior rounds are resolved: unused AssessmentThe final design (long-lived module-scoped consumer, autouse drain check, required |
|
Create another helper to remove the assert duplication: |
Added |
Code Review: Add integration tests for Kafka messagingAcceptance Criteria ComplianceAll six acceptance criteria from the plan are satisfied:
Files Claimed vs. Present in DiffAll four files mentioned in the PR description appear in Prior Review RoundsAll previously raised issues are resolved: unused NITTwo adjacent f-strings in raise AssertionError(
f"Expected message on topic '{topic}' but received one on"
f" '{tp_key.topic}' (offset={rec.offset}, value={rec.value!r})"
)These two f-strings can be merged into one: raise AssertionError(
f"Expected message on topic '{topic}' but received one on"
f" '{tp_key.topic}' (offset={rec.offset}, value={rec.value!r})"
)Wait — they are already implicitly concatenated by Python and are effectively one string. The concern is purely stylistic: a single f-string would be cleaner. This was raised in a prior round but was not addressed in the last commit. NIT
consumer = KafkaConsumer(
...
consumer_timeout_ms=_KAFKA_CONSUMER_TIMEOUT_MS,
...
)
|
|
Fix the nits above. |
Deploy a single-node Apache Kafka 3.9.2 broker (KRaft mode) in the EaaS ephemeral namespace before CTS starts. Expose it as kafka:9092 (plain PLAINTEXT). Use emptyDir volumes and an initContainer to satisfy OpenShift's restricted-v2 SCC (arbitrary non-root UID). Add a long-lived module-scoped kafka_consumer fixture that subscribes to all three CTS topics (cts.compose-created/tagged/untagged) and seeks to the current end at startup so only messages from this test run are visible. The consumer acts as a cursor; successive _consume_one calls advance it, eliminating offset-snapshot bookkeeping. Add an autouse _kafka_drain_check fixture that polls for 500 ms after each test and fails immediately if any unconsumed message is found. Embed Kafka assertions in the existing workflow helpers (import_compose, tag_compose, untag_compose) via _assert_compose_message. kafka_consumer is a required keyword-only argument in all three helpers so it is structurally impossible to produce a Kafka message without consuming it. Also fix a genuine bug in cts/messaging.py: kafka-python requires Python None (not the string "none") for no compression; convert the string value before constructing KafkaProducer. Generated-By: OpenCode (google-vertex-anthropic/claude-sonnet-4-6@default)
Merged the two adjacent f-strings in |
Code Review: Add integration tests for Kafka messagingAcceptance Criteria ComplianceAll six acceptance criteria from the plan are satisfied:
Files Claimed vs. Present in DiffAll four files mentioned in the PR description appear in Prior Review RoundsAll previously raised issues are resolved except one (see NIT below): unused NIT
consumer = KafkaConsumer(
...
consumer_timeout_ms=_KAFKA_CONSUMER_TIMEOUT_MS,
...
)
|
Add integration tests for Kafka messaging
Adds end-to-end verification that CTS publishes Kafka messages when images are
created, tagged, and untagged. This is tracked by
release-engineering/cts#84.
What changed
.tekton/integration-test-eaas.yamlAdded a
deploy-kafkatask that provisions a single-node Apache Kafka 3.9.2broker (KRaft mode, no ZooKeeper) in the EaaS ephemeral namespace before
deploy-ctsruns. The broker is exposed askafka:9092(plain PLAINTEXT, noTLS or SASL). Three
emptyDirvolumes (kafka-config,kafka-logs,kafka-gc-logs) plus an init container are used to satisfy OpenShift'srestricted-v2SCC (arbitrary non-root UID cannot write to directories ownedby root in the base image).
tests/test_integration_api.pykafka_urlfixture — readsKAFKA_URLfrom the environment; the test isskipped when the variable is absent, so the three new tests are no-ops in
environments that don't have Kafka.
_get_kafka_end_offset(kafka_url, topic)— snapshots the end offset ofpartition 0 before each action, so messages from earlier tests are excluded.
Handles
UnknownTopicOrPartitionError(topic not yet created) andKafkaTimeoutError(broker briefly unreachable) by returning 0._consume_kafka_message(kafka_url, topic, start_offset, ...)— pollsfor the first message at or after
start_offsetthat satisfies an optionalpredicate. Uses manual partition assignment (consumer.assign+seek)with
group_id=Noneto avoid the group-coordinator protocol. Raises a clearAssertionErroron timeout or broker disconnect.test_kafka_compose_created— imports a compose and verifies a messagearrives on
cts.compose-createdreferencing that compose ID.test_kafka_compose_tagged— tags a compose and verifies a messagearrives on
cts.compose-tagged.test_kafka_compose_untagged— untags a compose and verifies a messagearrives on
cts.compose-untagged.cts/messaging.pyFixed a bug where
compression_type="none"(a string) was passed toKafkaProducer.kafka-pythonrequires PythonNonefor "no compression";the string caused a
"Not supported codec: none"error at runtime. The valueis now converted:
None if v == "none" else v.test-requirements.txtAdded
kafka-pythonso the consumer helpers are available in the testenvironment.
.github/workflows/gating.yaml/.tekton/cts-pull-request.yamlAdded
feature/integration-teststo the CI branch trigger lists so thepipeline runs on push and pull-request events targeting that branch.