From 09aa9e45e1756c475d028adfbff81bfe37b21d72 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 23 Mar 2026 08:55:02 -0700 Subject: [PATCH] Drop py2 explicit object inheritance --- kafka/admin/acl_resource.py | 4 ++-- kafka/admin/client.py | 2 +- kafka/admin/config_resource.py | 2 +- kafka/admin/new_partitions.py | 2 +- kafka/admin/new_topic.py | 2 +- kafka/benchmarks/consumer_performance.py | 2 +- kafka/benchmarks/producer_performance.py | 2 +- kafka/client_async.py | 6 +++--- kafka/cluster.py | 2 +- kafka/conn.py | 6 +++--- kafka/consumer/fetcher.py | 16 ++++++++-------- kafka/consumer/group.py | 2 +- kafka/consumer/subscription_state.py | 4 ++-- kafka/coordinator/assignors/abstract.py | 2 +- kafka/coordinator/base.py | 8 ++++---- kafka/coordinator/consumer.py | 2 +- kafka/coordinator/heartbeat.py | 2 +- kafka/coordinator/protocol.py | 2 +- kafka/coordinator/subscription.py | 2 +- kafka/future.py | 2 +- kafka/metrics/compound_stat.py | 2 +- kafka/metrics/kafka_metric.py | 2 +- kafka/metrics/measurable.py | 2 +- kafka/metrics/metric_config.py | 2 +- kafka/metrics/metric_name.py | 2 +- kafka/metrics/metrics.py | 4 ++-- kafka/metrics/quota.py | 2 +- kafka/metrics/stats/histogram.py | 6 +++--- kafka/metrics/stats/percentile.py | 2 +- kafka/metrics/stats/percentiles.py | 2 +- kafka/metrics/stats/rate.py | 2 +- kafka/metrics/stats/sampled_stat.py | 2 +- kafka/metrics/stats/sensor.py | 2 +- kafka/partitioner/default.py | 2 +- kafka/producer/kafka.py | 4 ++-- kafka/producer/producer_batch.py | 2 +- kafka/producer/record_accumulator.py | 6 +++--- kafka/producer/sender.py | 2 +- kafka/producer/transaction_manager.py | 6 +++--- kafka/protocol/list_offsets.py | 2 +- kafka/protocol/parser.py | 4 ++-- kafka/record/default_records.py | 4 ++-- kafka/record/legacy_records.py | 4 ++-- kafka/record/memory_records.py | 2 +- kafka/serializer/abstract.py | 4 ++-- kafka/util.py | 2 +- test/integration/fixtures.py | 2 +- test/service.py | 2 +- test/test_metrics.py | 2 +- 
test/testutil.py | 2 +- 50 files changed, 78 insertions(+), 78 deletions(-) diff --git a/kafka/admin/acl_resource.py b/kafka/admin/acl_resource.py index a323f69d9..9d104b6a9 100644 --- a/kafka/admin/acl_resource.py +++ b/kafka/admin/acl_resource.py @@ -68,7 +68,7 @@ class ACLResourcePatternType(IntEnum): PREFIXED = 4 -class ACLFilter(object): +class ACLFilter: """Represents a filter to use with describing and deleting ACLs The difference between this class and the ACL class is mainly that @@ -172,7 +172,7 @@ def validate(self): raise IllegalArgumentError("resource_pattern must be a ResourcePattern object") -class ResourcePatternFilter(object): +class ResourcePatternFilter: def __init__( self, resource_type, diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 3b16f0d3f..965680ae9 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -32,7 +32,7 @@ log = logging.getLogger(__name__) -class KafkaAdminClient(object): +class KafkaAdminClient: """A class for administering the Kafka cluster. Warning: diff --git a/kafka/admin/config_resource.py b/kafka/admin/config_resource.py index 347ff915f..eb61dcd50 100644 --- a/kafka/admin/config_resource.py +++ b/kafka/admin/config_resource.py @@ -8,7 +8,7 @@ class ConfigResourceType(IntEnum): TOPIC = 2 -class ConfigResource(object): +class ConfigResource: """A class for specifying config resources. Arguments: resource_type (ConfigResourceType): the type of kafka resource diff --git a/kafka/admin/new_partitions.py b/kafka/admin/new_partitions.py index bc3b8172e..613fb861e 100644 --- a/kafka/admin/new_partitions.py +++ b/kafka/admin/new_partitions.py @@ -1,4 +1,4 @@ -class NewPartitions(object): +class NewPartitions: """A class for new partition creation on existing topics. Note that the length of new_assignments, if specified, must be the difference between the new total number of partitions and the existing number of partitions. 
Arguments: diff --git a/kafka/admin/new_topic.py b/kafka/admin/new_topic.py index 931e73448..2c53d87bb 100644 --- a/kafka/admin/new_topic.py +++ b/kafka/admin/new_topic.py @@ -1,4 +1,4 @@ -class NewTopic(object): +class NewTopic: """ A class for new topic creation Arguments: name (string): name of the topic diff --git a/kafka/benchmarks/consumer_performance.py b/kafka/benchmarks/consumer_performance.py index f1532f346..e1e6e66ca 100644 --- a/kafka/benchmarks/consumer_performance.py +++ b/kafka/benchmarks/consumer_performance.py @@ -11,7 +11,7 @@ from kafka import KafkaConsumer -class ConsumerPerformance(object): +class ConsumerPerformance: @staticmethod def run(args): try: diff --git a/kafka/benchmarks/producer_performance.py b/kafka/benchmarks/producer_performance.py index 3ab81ffea..bc361b2da 100644 --- a/kafka/benchmarks/producer_performance.py +++ b/kafka/benchmarks/producer_performance.py @@ -11,7 +11,7 @@ from kafka import KafkaProducer -class ProducerPerformance(object): +class ProducerPerformance: @staticmethod def run(args): try: diff --git a/kafka/client_async.py b/kafka/client_async.py index 8553f2d8d..ed51114f3 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -24,7 +24,7 @@ log = logging.getLogger('kafka.client') -class KafkaClient(object): +class KafkaClient: """ A network client for asynchronous request/response network I/O. 
@@ -1178,7 +1178,7 @@ def send_and_receive(self, node_id, request): return future.value -class IdleConnectionManager(object): +class IdleConnectionManager: def __init__(self, connections_max_idle_ms): if connections_max_idle_ms > 0: self.connections_max_idle = connections_max_idle_ms / 1000 @@ -1234,7 +1234,7 @@ def poll_expired_connection(self): return None -class KafkaClientMetrics(object): +class KafkaClientMetrics: def __init__(self, metrics, metric_group_prefix, conns): self.metrics = metrics self.metric_group_name = metric_group_prefix + '-metrics' diff --git a/kafka/cluster.py b/kafka/cluster.py index 0f37d3936..53915e78f 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -14,7 +14,7 @@ log = logging.getLogger(__name__) -class ClusterMetadata(object): +class ClusterMetadata: """ A class to manage kafka cluster metadata. diff --git a/kafka/conn.py b/kafka/conn.py index 6939488cd..14fd22b42 100755 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -61,7 +61,7 @@ class SSLWantWriteError(Exception): } -class ConnectionStates(object): +class ConnectionStates: DISCONNECTED = '' CONNECTING = '' HANDSHAKE = '' @@ -71,7 +71,7 @@ class ConnectionStates(object): API_VERSIONS_RECV = '' -class BrokerConnection(object): +class BrokerConnection: """Initialize a Kafka broker connection Keyword Arguments: @@ -1229,7 +1229,7 @@ def __str__(self): AFI_NAMES[self._sock_afi], self._sock_addr) -class BrokerConnectionMetrics(object): +class BrokerConnectionMetrics: def __init__(self, metrics, metric_group_prefix, node_id): self.metrics = metrics diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 3afebb5e1..3ac23dc4e 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -51,7 +51,7 @@ class RecordTooLargeError(Errors.KafkaError): pass -class Fetcher(object): +class Fetcher: DEFAULT_CONFIG = { 'key_deserializer': None, 'value_deserializer': None, @@ -906,7 +906,7 @@ def close(self): self._next_partition_records.drain() 
self._next_in_line_exception_metadata = None - class PartitionRecords(object): + class PartitionRecords: def __init__(self, fetch_offset, tp, records, key_deserializer=None, value_deserializer=None, check_crcs=True, isolation_level=READ_UNCOMMITTED, @@ -1082,7 +1082,7 @@ def _contains_abort_marker(self, batch): return record.abort -class FetchSessionHandler(object): +class FetchSessionHandler: """ FetchSessionHandler maintains the fetch session state for connecting to a broker. @@ -1209,7 +1209,7 @@ def _response_partitions(self, response): for partition_data in partitions} -class FetchMetadata(object): +class FetchMetadata: __slots__ = ('session_id', 'epoch') MAX_EPOCH = 2147483647 @@ -1249,7 +1249,7 @@ def next_incremental(self): FetchMetadata.LEGACY = FetchMetadata(FetchMetadata.INVALID_SESSION_ID, FetchMetadata.FINAL_EPOCH) -class FetchRequestData(object): +class FetchRequestData: __slots__ = ('_to_send', '_to_forget', '_metadata') def __init__(self, to_send, to_forget, metadata): @@ -1288,7 +1288,7 @@ def to_forget(self): return list(partition_data.items()) -class FetchMetrics(object): +class FetchMetrics: __slots__ = ('total_bytes', 'total_records') def __init__(self): @@ -1296,7 +1296,7 @@ def __init__(self): self.total_records = 0 -class FetchResponseMetricAggregator(object): +class FetchResponseMetricAggregator: """ Since we parse the message data for each partition from each fetch response lazily, fetch-level metrics need to be aggregated as the messages @@ -1329,7 +1329,7 @@ def record(self, partition, num_bytes, num_records): self.sensors.record_topic_fetch_metrics(topic, metrics.total_bytes, metrics.total_records) -class FetchManagerMetrics(object): +class FetchManagerMetrics: def __init__(self, metrics, prefix): self.metrics = metrics self.group_name = '%s-fetch-manager-metrics' % (prefix,) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index a04b3e997..8ab3f66e8 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ 
-21,7 +21,7 @@ log = logging.getLogger(__name__) -class KafkaConsumer(object): +class KafkaConsumer: """Consume records from a Kafka cluster. The consumer will transparently handle the failure of servers in the Kafka diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index f71a06ce2..2540303f2 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -23,7 +23,7 @@ class SubscriptionType(IntEnum): USER_ASSIGNED = 3 -class SubscriptionState(object): +class SubscriptionState: """ A class for tracking the topics, partitions, and offsets for the consumer. A partition is "assigned" either directly with assign_from_user() (manual @@ -436,7 +436,7 @@ def position(self, partition): return self.assignment[partition].position -class TopicPartitionState(object): +class TopicPartitionState: def __init__(self): self.paused = False # whether this partition has been paused by the user self.reset_strategy = None # the reset strategy if awaiting_reset is set diff --git a/kafka/coordinator/assignors/abstract.py b/kafka/coordinator/assignors/abstract.py index a14f11ee4..aead28b67 100644 --- a/kafka/coordinator/assignors/abstract.py +++ b/kafka/coordinator/assignors/abstract.py @@ -5,7 +5,7 @@ ) -class AbstractPartitionAssignor(object): +class AbstractPartitionAssignor: """ Abstract assignor implementation which does some common grunt work (in particular collecting partition counts which are always needed in assignors). 
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index f87e31618..f4cb7dc56 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -22,13 +22,13 @@ heartbeat_log = logging.getLogger('kafka.coordinator.heartbeat') -class MemberState(object): +class MemberState: UNJOINED = '' # the client is not part of a group REBALANCING = '' # the client has begun rebalancing STABLE = '' # the client has joined and is sending heartbeats -class Generation(object): +class Generation: def __init__(self, generation_id, member_id, protocol): self.generation_id = generation_id self.member_id = member_id @@ -57,7 +57,7 @@ class UnjoinedGroupException(Errors.KafkaError): retriable = True -class BaseCoordinator(object): +class BaseCoordinator: """ BaseCoordinator implements group management for a single group member by interacting with a designated Kafka broker (the coordinator). Group @@ -1036,7 +1036,7 @@ def _handle_heartbeat_response(self, future, send_time, response): future.failure(error) -class GroupCoordinatorMetrics(object): +class GroupCoordinatorMetrics: def __init__(self, heartbeat, metrics, prefix, tags=None): self.heartbeat = heartbeat self.metrics = metrics diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index be3975d24..157600ce2 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -981,7 +981,7 @@ def _do_auto_commit_offsets_async(self): self._commit_offsets_async_on_complete) -class ConsumerCoordinatorMetrics(object): +class ConsumerCoordinatorMetrics: def __init__(self, metrics, metric_group_prefix, subscription): self.metrics = metrics self.metric_group_name = '%s-coordinator-metrics' % (metric_group_prefix,) diff --git a/kafka/coordinator/heartbeat.py b/kafka/coordinator/heartbeat.py index 4bf04d4ee..6141f010a 100644 --- a/kafka/coordinator/heartbeat.py +++ b/kafka/coordinator/heartbeat.py @@ -7,7 +7,7 @@ log = logging.getLogger(__name__) -class Heartbeat(object): +class 
Heartbeat: DEFAULT_CONFIG = { 'group_id': None, 'heartbeat_interval_ms': 3000, diff --git a/kafka/coordinator/protocol.py b/kafka/coordinator/protocol.py index 378bc978c..639b7d59c 100644 --- a/kafka/coordinator/protocol.py +++ b/kafka/coordinator/protocol.py @@ -27,7 +27,7 @@ def partitions(self): for partition in partitions] -class ConsumerProtocol_v0(object): +class ConsumerProtocol_v0: PROTOCOL_TYPE = 'consumer' METADATA = ConsumerProtocolMemberMetadata_v0 ASSIGNMENT = ConsumerProtocolMemberAssignment_v0 diff --git a/kafka/coordinator/subscription.py b/kafka/coordinator/subscription.py index b5c47994d..5b5732567 100644 --- a/kafka/coordinator/subscription.py +++ b/kafka/coordinator/subscription.py @@ -1,4 +1,4 @@ -class Subscription(object): +class Subscription: __slots__ = ('_metadata', '_group_instance_id') def __init__(self, metadata, group_instance_id): self._metadata = metadata diff --git a/kafka/future.py b/kafka/future.py index 5d53c2192..119c652e2 100644 --- a/kafka/future.py +++ b/kafka/future.py @@ -5,7 +5,7 @@ log = logging.getLogger(__name__) -class Future(object): +class Future: error_on_callbacks = False # and errbacks def __init__(self): diff --git a/kafka/metrics/compound_stat.py b/kafka/metrics/compound_stat.py index 656e1122c..31a02181a 100644 --- a/kafka/metrics/compound_stat.py +++ b/kafka/metrics/compound_stat.py @@ -16,7 +16,7 @@ def stats(self): raise NotImplementedError -class NamedMeasurable(object): +class NamedMeasurable: __slots__ = ('_name', '_stat') def __init__(self, metric_name, measurable_stat): diff --git a/kafka/metrics/kafka_metric.py b/kafka/metrics/kafka_metric.py index aaef5d39b..e86b5a149 100644 --- a/kafka/metrics/kafka_metric.py +++ b/kafka/metrics/kafka_metric.py @@ -1,7 +1,7 @@ import time -class KafkaMetric(object): +class KafkaMetric: __slots__ = ('_metric_name', '_measurable', '_config') # NOTE java constructor takes a lock instance diff --git a/kafka/metrics/measurable.py b/kafka/metrics/measurable.py index 
ef096f31d..fd5be1205 100644 --- a/kafka/metrics/measurable.py +++ b/kafka/metrics/measurable.py @@ -1,7 +1,7 @@ import abc -class AbstractMeasurable(object): +class AbstractMeasurable: """A measurable quantity that can be registered as a metric""" @abc.abstractmethod def measure(self, config, now): diff --git a/kafka/metrics/metric_config.py b/kafka/metrics/metric_config.py index 008dfa6d8..b3e46cf35 100644 --- a/kafka/metrics/metric_config.py +++ b/kafka/metrics/metric_config.py @@ -1,7 +1,7 @@ import sys -class MetricConfig(object): +class MetricConfig: """Configuration values for metrics""" __slots__ = ('quota', '_samples', 'event_window', 'time_window_ms', 'tags') diff --git a/kafka/metrics/metric_name.py b/kafka/metrics/metric_name.py index db351829c..53e59ddb1 100644 --- a/kafka/metrics/metric_name.py +++ b/kafka/metrics/metric_name.py @@ -1,7 +1,7 @@ import copy -class MetricName(object): +class MetricName: """ This class encapsulates a metric's name, logical group and its related attributes (tags). diff --git a/kafka/metrics/metrics.py b/kafka/metrics/metrics.py index f00833837..0b7ea919b 100644 --- a/kafka/metrics/metrics.py +++ b/kafka/metrics/metrics.py @@ -9,7 +9,7 @@ logger = logging.getLogger(__name__) -class Metrics(object): +class Metrics: """ A registry of sensors and metrics. @@ -229,7 +229,7 @@ def register_metric(self, metric): for reporter in self._reporters: reporter.metric_change(metric) - class ExpireSensorTask(object): + class ExpireSensorTask: """ This iterates over every Sensor and triggers a remove_sensor if it has expired. 
Package private for testing diff --git a/kafka/metrics/quota.py b/kafka/metrics/quota.py index 2cf6e9089..205933ef4 100644 --- a/kafka/metrics/quota.py +++ b/kafka/metrics/quota.py @@ -1,4 +1,4 @@ -class Quota(object): +class Quota: """An upper or lower bound for metrics""" __slots__ = ('_bound', '_upper') diff --git a/kafka/metrics/stats/histogram.py b/kafka/metrics/stats/histogram.py index 019373184..ba5789345 100644 --- a/kafka/metrics/stats/histogram.py +++ b/kafka/metrics/stats/histogram.py @@ -1,7 +1,7 @@ import math -class Histogram(object): +class Histogram: __slots__ = ('_hist', '_count', '_bin_scheme') def __init__(self, bin_scheme): @@ -39,7 +39,7 @@ def __str__(self): values.append('%s:%s' % (float('inf'), self._hist[-1])) return '{%s}' % ','.join(values) - class ConstantBinScheme(object): + class ConstantBinScheme: __slots__ = ('_min', '_max', '_bins', '_bucket_width') def __init__(self, bins, min_val, max_val): @@ -70,7 +70,7 @@ def to_bin(self, x): else: return int(((x - self._min) / self._bucket_width) + 1) - class LinearBinScheme(object): + class LinearBinScheme: __slots__ = ('_bins', '_max', '_scale') def __init__(self, num_bins, max_val): diff --git a/kafka/metrics/stats/percentile.py b/kafka/metrics/stats/percentile.py index 17cbb1fc1..d02ee84de 100644 --- a/kafka/metrics/stats/percentile.py +++ b/kafka/metrics/stats/percentile.py @@ -1,4 +1,4 @@ -class Percentile(object): +class Percentile: __slots__ = ('_metric_name', '_percentile') def __init__(self, metric_name, percentile): diff --git a/kafka/metrics/stats/percentiles.py b/kafka/metrics/stats/percentiles.py index 574557a59..a35b4ab75 100644 --- a/kafka/metrics/stats/percentiles.py +++ b/kafka/metrics/stats/percentiles.py @@ -4,7 +4,7 @@ from kafka.metrics.stats.sampled_stat import AbstractSampledStat -class BucketSizing(object): +class BucketSizing: CONSTANT = 0 LINEAR = 1 diff --git a/kafka/metrics/stats/rate.py b/kafka/metrics/stats/rate.py index 6005bdb47..85550c904 100644 --- 
a/kafka/metrics/stats/rate.py +++ b/kafka/metrics/stats/rate.py @@ -2,7 +2,7 @@ from kafka.metrics.stats.sampled_stat import AbstractSampledStat -class TimeUnit(object): +class TimeUnit: _names = { 'nanosecond': 0, 'microsecond': 1, diff --git a/kafka/metrics/stats/sampled_stat.py b/kafka/metrics/stats/sampled_stat.py index b3bbfb046..fdda6c79c 100644 --- a/kafka/metrics/stats/sampled_stat.py +++ b/kafka/metrics/stats/sampled_stat.py @@ -81,7 +81,7 @@ def _advance(self, config, time_ms): sample.reset(time_ms) return sample - class Sample(object): + class Sample: def __init__(self, initial_value, now): self.initial_value = initial_value diff --git a/kafka/metrics/stats/sensor.py b/kafka/metrics/stats/sensor.py index 014ef5ae8..db1593c77 100644 --- a/kafka/metrics/stats/sensor.py +++ b/kafka/metrics/stats/sensor.py @@ -5,7 +5,7 @@ from kafka.metrics import KafkaMetric -class Sensor(object): +class Sensor: """ A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on diff --git a/kafka/partitioner/default.py b/kafka/partitioner/default.py index 8004020c0..a33b850cc 100644 --- a/kafka/partitioner/default.py +++ b/kafka/partitioner/default.py @@ -1,7 +1,7 @@ import random -class DefaultPartitioner(object): +class DefaultPartitioner: """Default partitioner. Hashes key to partition using murmur2 hashing (from java client) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index a937138a6..3a1ec216a 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -26,7 +26,7 @@ PRODUCER_CLIENT_ID_SEQUENCE = AtomicInteger() -class KafkaProducer(object): +class KafkaProducer: """A Kafka client that publishes records to the Kafka cluster. 
The producer is thread safe and sharing a single producer instance across @@ -592,7 +592,7 @@ def close(self, timeout=None, null_logger=False): """ if null_logger: # Disable logger during destruction to avoid touching dangling references - class NullLogger(object): + class NullLogger: def __getattr__(self, name): return lambda *args: None diff --git a/kafka/producer/producer_batch.py b/kafka/producer/producer_batch.py index 85c427aa9..1f5edb80d 100644 --- a/kafka/producer/producer_batch.py +++ b/kafka/producer/producer_batch.py @@ -15,7 +15,7 @@ class FinalState(IntEnum): SUCCEEDED = 2 -class ProducerBatch(object): +class ProducerBatch: def __init__(self, tp, records, now=None): now = time.monotonic() if now is None else now self.max_record_size = 0 diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index ccff983b3..15117dd98 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -13,7 +13,7 @@ log = logging.getLogger(__name__) -class AtomicInteger(object): +class AtomicInteger: def __init__(self, val=0): self._lock = threading.Lock() self._val = val @@ -32,7 +32,7 @@ def get(self): return self._val -class RecordAccumulator(object): +class RecordAccumulator: """ This class maintains a dequeue per TopicPartition that accumulates messages into MessageSets to be sent to the server. 
@@ -497,7 +497,7 @@ def close(self): self._closed = True -class IncompleteProducerBatches(object): +class IncompleteProducerBatches: """A threadsafe helper class to hold ProducerBatches that haven't been ack'd yet""" def __init__(self): diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 4f6909d94..6a065be66 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -632,7 +632,7 @@ def __str__(self): return "" % (self.config['client_id'], self.config['transactional_id']) -class SenderMetrics(object): +class SenderMetrics: def __init__(self, metrics, client, metadata): self.metrics = metrics diff --git a/kafka/producer/transaction_manager.py b/kafka/producer/transaction_manager.py index dd20cd9b4..2dc4be518 100644 --- a/kafka/producer/transaction_manager.py +++ b/kafka/producer/transaction_manager.py @@ -22,7 +22,7 @@ NO_SEQUENCE = -1 -class ProducerIdAndEpoch(object): +class ProducerIdAndEpoch: __slots__ = ('producer_id', 'epoch') def __init__(self, producer_id, epoch): @@ -87,7 +87,7 @@ class Priority(IntEnum): END_TXN = 3 -class TransactionManager(object): +class TransactionManager: """ A class which maintains state for transactions. Also keeps the state necessary to ensure idempotent production. 
""" @@ -491,7 +491,7 @@ def _add_partitions_to_transaction_handler(self): return AddPartitionsToTxnHandler(self, self._pending_partitions_in_transaction) -class TransactionalRequestResult(object): +class TransactionalRequestResult: def __init__(self): self._latch = threading.Event() self._error = None diff --git a/kafka/protocol/list_offsets.py b/kafka/protocol/list_offsets.py index 5afed6af2..d3f7012fd 100644 --- a/kafka/protocol/list_offsets.py +++ b/kafka/protocol/list_offsets.py @@ -4,7 +4,7 @@ UNKNOWN_OFFSET = -1 -class OffsetResetStrategy(object): +class OffsetResetStrategy: LATEST = -1 EARLIEST = -2 NONE = 0 diff --git a/kafka/protocol/parser.py b/kafka/protocol/parser.py index 586b51fa1..274673cf0 100644 --- a/kafka/protocol/parser.py +++ b/kafka/protocol/parser.py @@ -10,7 +10,7 @@ log = logging.getLogger(__name__) -class KafkaProtocol(object): +class KafkaProtocol: """Manage the kafka network protocol Use an instance of KafkaProtocol to manage bytes send/recv'd @@ -46,7 +46,7 @@ def send_request(self, request, correlation_id=None): """Encode and queue a kafka api request for sending. Arguments: - request (object): An un-encoded kafka request. + request : An un-encoded kafka request. correlation_id (int, optional): Optionally specify an ID to correlate requests with responses. If not provided, an ID will be generated automatically. 
diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py index 35fcba9cb..496cd8bfe 100644 --- a/kafka/record/default_records.py +++ b/kafka/record/default_records.py @@ -68,7 +68,7 @@ import kafka.codec as codecs -class DefaultRecordBase(object): +class DefaultRecordBase: __slots__ = () @@ -744,7 +744,7 @@ def __str__(self): self._num_records)) -class DefaultRecordMetadata(object): +class DefaultRecordMetadata: __slots__ = ("_size", "_timestamp", "_offset") diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py index 4c9bf03dd..b6e141e3b 100644 --- a/kafka/record/legacy_records.py +++ b/kafka/record/legacy_records.py @@ -55,7 +55,7 @@ from kafka.errors import CorruptRecordError, UnsupportedCodecError -class LegacyRecordBase(object): +class LegacyRecordBase: __slots__ = () @@ -549,7 +549,7 @@ def estimate_size_in_bytes(cls, magic, compression_type, key, value): return cls.LOG_OVERHEAD + cls.record_size(magic, key, value) -class LegacyRecordMetadata(object): +class LegacyRecordMetadata: __slots__ = ("_crc", "_size", "_timestamp", "_offset") diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py index d203dd8c7..3ef2c3bfc 100644 --- a/kafka/record/memory_records.py +++ b/kafka/record/memory_records.py @@ -119,7 +119,7 @@ def __next__(self): next = __next__ -class MemoryRecordsBuilder(object): +class MemoryRecordsBuilder: __slots__ = ("_builder", "_batch_size", "_buffer", "_next_offset", "_closed", "_magic", "_bytes_written", "_producer_id", "_producer_epoch") diff --git a/kafka/serializer/abstract.py b/kafka/serializer/abstract.py index b656b5cef..529662b07 100644 --- a/kafka/serializer/abstract.py +++ b/kafka/serializer/abstract.py @@ -1,7 +1,7 @@ import abc -class Serializer(object): +class Serializer: __meta__ = abc.ABCMeta def __init__(self, **config): @@ -15,7 +15,7 @@ def close(self): pass -class Deserializer(object): +class Deserializer: __meta__ = abc.ABCMeta def __init__(self, **config): diff 
--git a/kafka/util.py b/kafka/util.py index 1e21730c9..a09d02d2d 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -69,7 +69,7 @@ def ensure_valid_topic_name(topic): raise ValueError('Topic name "{0}" is illegal, it contains a character other than ASCII alphanumerics, ".", "_" and "-"'.format(topic)) -class WeakMethod(object): +class WeakMethod: """ Callable that weakly references a method and the object it is bound to. It is based on https://stackoverflow.com/a/24287465. diff --git a/test/integration/fixtures.py b/test/integration/fixtures.py index 6a7422fdb..bd856f57f 100644 --- a/test/integration/fixtures.py +++ b/test/integration/fixtures.py @@ -59,7 +59,7 @@ def gen_ssl_resources(directory): """.format(directory)) -class Fixture(object): +class Fixture: kafka_version = os.environ.get('KAFKA_VERSION', '0.11.0.2') scala_version = os.environ.get("SCALA_VERSION", '2.8.0') project_root = os.environ.get('PROJECT_ROOT', diff --git a/test/service.py b/test/service.py index 408351544..2113de209 100644 --- a/test/service.py +++ b/test/service.py @@ -15,7 +15,7 @@ log = logging.getLogger(__name__) -class ExternalService(object): +class ExternalService: def __init__(self, host, port): log.info("Using already running service at %s:%d", host, port) self.host = host diff --git a/test/test_metrics.py b/test/test_metrics.py index be909d308..04aac3956 100644 --- a/test/test_metrics.py +++ b/test/test_metrics.py @@ -464,7 +464,7 @@ def measure(self, config, now): return self._value -class TimeKeeper(object): +class TimeKeeper: """ A clock that you can manually advance by calling sleep """ diff --git a/test/testutil.py b/test/testutil.py index 48459c2c8..06e907c22 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -50,7 +50,7 @@ def maybe_skip_unsupported_compression(compression_type): pytest.skip("Compression libraries not installed for %s" % (compression_type,)) -class Timer(object): +class Timer: def __enter__(self): self.start = time.monotonic() return self