Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions kafka/admin/acl_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class ACLResourcePatternType(IntEnum):
PREFIXED = 4


class ACLFilter(object):
class ACLFilter:
"""Represents a filter to use with describing and deleting ACLs

The difference between this class and the ACL class is mainly that
Expand Down Expand Up @@ -172,7 +172,7 @@ def validate(self):
raise IllegalArgumentError("resource_pattern must be a ResourcePattern object")


class ResourcePatternFilter(object):
class ResourcePatternFilter:
def __init__(
self,
resource_type,
Expand Down
2 changes: 1 addition & 1 deletion kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
log = logging.getLogger(__name__)


class KafkaAdminClient(object):
class KafkaAdminClient:
"""A class for administering the Kafka cluster.

Warning:
Expand Down
2 changes: 1 addition & 1 deletion kafka/admin/config_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class ConfigResourceType(IntEnum):
TOPIC = 2


class ConfigResource(object):
class ConfigResource:
"""A class for specifying config resources.
Arguments:
resource_type (ConfigResourceType): the type of kafka resource
Expand Down
2 changes: 1 addition & 1 deletion kafka/admin/new_partitions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class NewPartitions(object):
class NewPartitions:
"""A class for new partition creation on existing topics. Note that the length of new_assignments, if specified,
must be the difference between the new total number of partitions and the existing number of partitions.
Arguments:
Expand Down
2 changes: 1 addition & 1 deletion kafka/admin/new_topic.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class NewTopic(object):
class NewTopic:
""" A class for new topic creation
Arguments:
name (string): name of the topic
Expand Down
2 changes: 1 addition & 1 deletion kafka/benchmarks/consumer_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from kafka import KafkaConsumer


class ConsumerPerformance(object):
class ConsumerPerformance:
@staticmethod
def run(args):
try:
Expand Down
2 changes: 1 addition & 1 deletion kafka/benchmarks/producer_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from kafka import KafkaProducer


class ProducerPerformance(object):
class ProducerPerformance:
@staticmethod
def run(args):
try:
Expand Down
6 changes: 3 additions & 3 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
log = logging.getLogger('kafka.client')


class KafkaClient(object):
class KafkaClient:
"""
A network client for asynchronous request/response network I/O.

Expand Down Expand Up @@ -1178,7 +1178,7 @@ def send_and_receive(self, node_id, request):
return future.value


class IdleConnectionManager(object):
class IdleConnectionManager:
def __init__(self, connections_max_idle_ms):
if connections_max_idle_ms > 0:
self.connections_max_idle = connections_max_idle_ms / 1000
Expand Down Expand Up @@ -1234,7 +1234,7 @@ def poll_expired_connection(self):
return None


class KafkaClientMetrics(object):
class KafkaClientMetrics:
def __init__(self, metrics, metric_group_prefix, conns):
self.metrics = metrics
self.metric_group_name = metric_group_prefix + '-metrics'
Expand Down
2 changes: 1 addition & 1 deletion kafka/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
log = logging.getLogger(__name__)


class ClusterMetadata(object):
class ClusterMetadata:
"""
A class to manage kafka cluster metadata.

Expand Down
6 changes: 3 additions & 3 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class SSLWantWriteError(Exception):
}


class ConnectionStates(object):
class ConnectionStates:
DISCONNECTED = '<disconnected>'
CONNECTING = '<connecting>'
HANDSHAKE = '<handshake>'
Expand All @@ -71,7 +71,7 @@ class ConnectionStates(object):
API_VERSIONS_RECV = '<checking_api_versions_recv>'


class BrokerConnection(object):
class BrokerConnection:
"""Initialize a Kafka broker connection

Keyword Arguments:
Expand Down Expand Up @@ -1229,7 +1229,7 @@ def __str__(self):
AFI_NAMES[self._sock_afi], self._sock_addr)


class BrokerConnectionMetrics(object):
class BrokerConnectionMetrics:
def __init__(self, metrics, metric_group_prefix, node_id):
self.metrics = metrics

Expand Down
16 changes: 8 additions & 8 deletions kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class RecordTooLargeError(Errors.KafkaError):
pass


class Fetcher(object):
class Fetcher:
DEFAULT_CONFIG = {
'key_deserializer': None,
'value_deserializer': None,
Expand Down Expand Up @@ -906,7 +906,7 @@ def close(self):
self._next_partition_records.drain()
self._next_in_line_exception_metadata = None

class PartitionRecords(object):
class PartitionRecords:
def __init__(self, fetch_offset, tp, records,
key_deserializer=None, value_deserializer=None,
check_crcs=True, isolation_level=READ_UNCOMMITTED,
Expand Down Expand Up @@ -1082,7 +1082,7 @@ def _contains_abort_marker(self, batch):
return record.abort


class FetchSessionHandler(object):
class FetchSessionHandler:
"""
FetchSessionHandler maintains the fetch session state for connecting to a broker.

Expand Down Expand Up @@ -1209,7 +1209,7 @@ def _response_partitions(self, response):
for partition_data in partitions}


class FetchMetadata(object):
class FetchMetadata:
__slots__ = ('session_id', 'epoch')

MAX_EPOCH = 2147483647
Expand Down Expand Up @@ -1249,7 +1249,7 @@ def next_incremental(self):
FetchMetadata.LEGACY = FetchMetadata(FetchMetadata.INVALID_SESSION_ID, FetchMetadata.FINAL_EPOCH)


class FetchRequestData(object):
class FetchRequestData:
__slots__ = ('_to_send', '_to_forget', '_metadata')

def __init__(self, to_send, to_forget, metadata):
Expand Down Expand Up @@ -1288,15 +1288,15 @@ def to_forget(self):
return list(partition_data.items())


class FetchMetrics(object):
class FetchMetrics:
__slots__ = ('total_bytes', 'total_records')

def __init__(self):
self.total_bytes = 0
self.total_records = 0


class FetchResponseMetricAggregator(object):
class FetchResponseMetricAggregator:
"""
Since we parse the message data for each partition from each fetch
response lazily, fetch-level metrics need to be aggregated as the messages
Expand Down Expand Up @@ -1329,7 +1329,7 @@ def record(self, partition, num_bytes, num_records):
self.sensors.record_topic_fetch_metrics(topic, metrics.total_bytes, metrics.total_records)


class FetchManagerMetrics(object):
class FetchManagerMetrics:
def __init__(self, metrics, prefix):
self.metrics = metrics
self.group_name = '%s-fetch-manager-metrics' % (prefix,)
Expand Down
2 changes: 1 addition & 1 deletion kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
log = logging.getLogger(__name__)


class KafkaConsumer(object):
class KafkaConsumer:
"""Consume records from a Kafka cluster.

The consumer will transparently handle the failure of servers in the Kafka
Expand Down
4 changes: 2 additions & 2 deletions kafka/consumer/subscription_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class SubscriptionType(IntEnum):
USER_ASSIGNED = 3


class SubscriptionState(object):
class SubscriptionState:
"""
A class for tracking the topics, partitions, and offsets for the consumer.
A partition is "assigned" either directly with assign_from_user() (manual
Expand Down Expand Up @@ -436,7 +436,7 @@ def position(self, partition):
return self.assignment[partition].position


class TopicPartitionState(object):
class TopicPartitionState:
def __init__(self):
self.paused = False # whether this partition has been paused by the user
self.reset_strategy = None # the reset strategy if awaiting_reset is set
Expand Down
2 changes: 1 addition & 1 deletion kafka/coordinator/assignors/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
)


class AbstractPartitionAssignor(object):
class AbstractPartitionAssignor:
"""
Abstract assignor implementation which does some common grunt work (in particular collecting
partition counts which are always needed in assignors).
Expand Down
8 changes: 4 additions & 4 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
heartbeat_log = logging.getLogger('kafka.coordinator.heartbeat')


class MemberState(object):
class MemberState:
UNJOINED = '<unjoined>' # the client is not part of a group
REBALANCING = '<rebalancing>' # the client has begun rebalancing
STABLE = '<stable>' # the client has joined and is sending heartbeats


class Generation(object):
class Generation:
def __init__(self, generation_id, member_id, protocol):
self.generation_id = generation_id
self.member_id = member_id
Expand Down Expand Up @@ -57,7 +57,7 @@ class UnjoinedGroupException(Errors.KafkaError):
retriable = True


class BaseCoordinator(object):
class BaseCoordinator:
"""
BaseCoordinator implements group management for a single group member
by interacting with a designated Kafka broker (the coordinator). Group
Expand Down Expand Up @@ -1036,7 +1036,7 @@ def _handle_heartbeat_response(self, future, send_time, response):
future.failure(error)


class GroupCoordinatorMetrics(object):
class GroupCoordinatorMetrics:
def __init__(self, heartbeat, metrics, prefix, tags=None):
self.heartbeat = heartbeat
self.metrics = metrics
Expand Down
2 changes: 1 addition & 1 deletion kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,7 @@ def _do_auto_commit_offsets_async(self):
self._commit_offsets_async_on_complete)


class ConsumerCoordinatorMetrics(object):
class ConsumerCoordinatorMetrics:
def __init__(self, metrics, metric_group_prefix, subscription):
self.metrics = metrics
self.metric_group_name = '%s-coordinator-metrics' % (metric_group_prefix,)
Expand Down
2 changes: 1 addition & 1 deletion kafka/coordinator/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
log = logging.getLogger(__name__)


class Heartbeat(object):
class Heartbeat:
DEFAULT_CONFIG = {
'group_id': None,
'heartbeat_interval_ms': 3000,
Expand Down
2 changes: 1 addition & 1 deletion kafka/coordinator/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def partitions(self):
for partition in partitions]


class ConsumerProtocol_v0(object):
class ConsumerProtocol_v0:
PROTOCOL_TYPE = 'consumer'
METADATA = ConsumerProtocolMemberMetadata_v0
ASSIGNMENT = ConsumerProtocolMemberAssignment_v0
Expand Down
2 changes: 1 addition & 1 deletion kafka/coordinator/subscription.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class Subscription(object):
class Subscription:
__slots__ = ('_metadata', '_group_instance_id')
def __init__(self, metadata, group_instance_id):
self._metadata = metadata
Expand Down
2 changes: 1 addition & 1 deletion kafka/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
log = logging.getLogger(__name__)


class Future(object):
class Future:
error_on_callbacks = False # and errbacks

def __init__(self):
Expand Down
2 changes: 1 addition & 1 deletion kafka/metrics/compound_stat.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def stats(self):
raise NotImplementedError


class NamedMeasurable(object):
class NamedMeasurable:
__slots__ = ('_name', '_stat')

def __init__(self, metric_name, measurable_stat):
Expand Down
2 changes: 1 addition & 1 deletion kafka/metrics/kafka_metric.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import time


class KafkaMetric(object):
class KafkaMetric:
__slots__ = ('_metric_name', '_measurable', '_config')

# NOTE java constructor takes a lock instance
Expand Down
2 changes: 1 addition & 1 deletion kafka/metrics/measurable.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import abc


class AbstractMeasurable(object):
class AbstractMeasurable:
"""A measurable quantity that can be registered as a metric"""
@abc.abstractmethod
def measure(self, config, now):
Expand Down
2 changes: 1 addition & 1 deletion kafka/metrics/metric_config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import sys


class MetricConfig(object):
class MetricConfig:
"""Configuration values for metrics"""
__slots__ = ('quota', '_samples', 'event_window', 'time_window_ms', 'tags')

Expand Down
2 changes: 1 addition & 1 deletion kafka/metrics/metric_name.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import copy


class MetricName(object):
class MetricName:
"""
This class encapsulates a metric's name, logical group and its
related attributes (tags).
Expand Down
4 changes: 2 additions & 2 deletions kafka/metrics/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
logger = logging.getLogger(__name__)


class Metrics(object):
class Metrics:
"""
A registry of sensors and metrics.

Expand Down Expand Up @@ -229,7 +229,7 @@ def register_metric(self, metric):
for reporter in self._reporters:
reporter.metric_change(metric)

class ExpireSensorTask(object):
class ExpireSensorTask:
"""
This iterates over every Sensor and triggers a remove_sensor
if it has expired. Package private for testing
Expand Down
2 changes: 1 addition & 1 deletion kafka/metrics/quota.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class Quota(object):
class Quota:
"""An upper or lower bound for metrics"""
__slots__ = ('_bound', '_upper')

Expand Down
Loading
Loading