From 4d03219d80f288a353b0fb8bf9fc554709c46122 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 24 Jun 2026 08:08:58 -0700 Subject: [PATCH 1/2] Also wait_for_topics on <0.10 test fixtures --- test/integration/fixtures.py | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/test/integration/fixtures.py b/test/integration/fixtures.py index 21c9a0130..4d6e4f80d 100644 --- a/test/integration/fixtures.py +++ b/test/integration/fixtures.py @@ -34,21 +34,29 @@ def create_topics(broker, topic_names, num_partitions=None, replication_factor=N for topic_name in topic_names: # TODO: verify kafka-topics.sh support for early 0.8 brokers broker._create_topic_via_cli(topic_name, num_partitions, replication_factor) + _wait_for_topics(broker, topic_names) def _create_topics_via_admin(broker, topic_names, num_partitions, replication_factor): - from kafka.admin import NewTopic params = broker._enrich_client_params({}, client_id='topic_creator') - admin = KafkaAdminClient(**params) - try: - topics = [NewTopic(name, num_partitions, replication_factor) for name in topic_names] - admin.create_topics(topics, wait_for_metadata=True) - except InvalidReplicationFactorError: - time.sleep(0.5) - topics = [NewTopic(name, num_partitions, replication_factor) for name in topic_names] - admin.create_topics(topics, wait_for_metadata=True) - finally: - admin.close() + with KafkaAdminClient(**params) as admin: + try: + topics = {name: {'num_partitions': num_partitions, 'replication_factor': replication_factor} + for name in topic_names} + admin.create_topics(topics, wait_for_metadata=True) + except InvalidReplicationFactorError: + # wait and try again + # in CI the brokers sometimes take a while to find themselves + time.sleep(0.5) + topics = {name: {'num_partitions': num_partitions, 'replication_factor': replication_factor} + for name in topic_names} + admin.create_topics(topics, wait_for_metadata=True) + + +def _wait_for_topics(broker, topics): + params = broker._enrich_client_params({}, client_id='topic_creator') + with KafkaAdminClient(**params) as admin: + admin.wait_for_topics(topics) def client_params(broker, client_id='client', **overrides): From ebe722a4ca2e133d60defece8fa330747697bab7 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 24 Jun 2026 08:22:02 -0700 Subject: [PATCH 2/2] pass api_version to create_topics admin client --- test/integration/fixtures.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/integration/fixtures.py b/test/integration/fixtures.py index 4d6e4f80d..dceab32f2 100644 --- a/test/integration/fixtures.py +++ b/test/integration/fixtures.py @@ -38,7 +38,7 @@ def create_topics(broker, topic_names, num_partitions=None, replication_factor=N def _create_topics_via_admin(broker, topic_names, num_partitions, replication_factor): - params = broker._enrich_client_params({}, client_id='topic_creator') + params = broker._enrich_client_params({'api_version': env_kafka_version(), 'client_id': 'topic_creator'}) with KafkaAdminClient(**params) as admin: try: topics = {name: {'num_partitions': num_partitions, 'replication_factor': replication_factor} @@ -54,7 +54,7 @@ def _create_topics_via_admin(broker, topic_names, num_partitions, replication_fa def _wait_for_topics(broker, topics): - params = broker._enrich_client_params({}, client_id='topic_creator') + params = broker._enrich_client_params({'api_version': env_kafka_version(), 'client_id': 'topic_creator'}) with KafkaAdminClient(**params) as admin: admin.wait_for_topics(topics)