diff --git a/test/integration/fixtures.py b/test/integration/fixtures.py index 21c9a0130..dceab32f2 100644 --- a/test/integration/fixtures.py +++ b/test/integration/fixtures.py @@ -34,21 +34,29 @@ def create_topics(broker, topic_names, num_partitions=None, replication_factor=N for topic_name in topic_names: # TODO: verify kafka-topics.sh support for early 0.8 brokers broker._create_topic_via_cli(topic_name, num_partitions, replication_factor) + _wait_for_topics(broker, topic_names) def _create_topics_via_admin(broker, topic_names, num_partitions, replication_factor): - from kafka.admin import NewTopic - params = broker._enrich_client_params({}, client_id='topic_creator') - admin = KafkaAdminClient(**params) - try: - topics = [NewTopic(name, num_partitions, replication_factor) for name in topic_names] - admin.create_topics(topics, wait_for_metadata=True) - except InvalidReplicationFactorError: - time.sleep(0.5) - topics = [NewTopic(name, num_partitions, replication_factor) for name in topic_names] - admin.create_topics(topics, wait_for_metadata=True) - finally: - admin.close() + params = broker._enrich_client_params({'api_version': env_kafka_version(), 'client_id': 'topic_creator'}) + with KafkaAdminClient(**params) as admin: + try: + topics = {name: {'num_partitions': num_partitions, 'replication_factor': replication_factor} + for name in topic_names} + admin.create_topics(topics, wait_for_metadata=True) + except InvalidReplicationFactorError: + # wait and try again + # in CI the brokers sometimes take a while to find themselves + time.sleep(0.5) + topics = {name: {'num_partitions': num_partitions, 'replication_factor': replication_factor} + for name in topic_names} + admin.create_topics(topics, wait_for_metadata=True) + + +def _wait_for_topics(broker, topics): + params = broker._enrich_client_params({'api_version': env_kafka_version(), 'client_id': 'topic_creator'}) + with KafkaAdminClient(**params) as admin: + admin.wait_for_topics(topics) def client_params(broker, client_id='client', **overrides):