-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathkafka_replayer.py
More file actions
165 lines (148 loc) · 6.99 KB
/
kafka_replayer.py
File metadata and controls
165 lines (148 loc) · 6.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import kafka
import logging
import six
import time
class KafkaReplayer(object):
    """A utility that allows replaying Kafka records by time range.

    For each partition, a binary search over offsets locates the earliest
    record whose timestamp is at or after the requested start time; replay
    then consumes forward until every partition has produced a record past
    the end time or no more records are available.
    """

    def __init__(self, topic_name, partitions=None, **configs):
        """Create the replayer.

        Args:
            topic_name: The topic to replay
            partitions: Optionally specify the set of partitions (ints) to replay
            configs: The configuration kwargs to pass forward to kafka.KafkaConsumer

        Raises:
            ValueError: If topic_name is empty or None
        """
        if not topic_name:
            raise ValueError('topic_name is required')
        self._topic_name = topic_name
        self._partitions = partitions
        self._configs = self._configs_with_defaults(configs)
        # Progress is logged every _log_interval yielded records during replay().
        self._log_interval = 10000
        self._logger = self._create_logger()

    def _create_logger(self):
        # Library-standard setup: attach a NullHandler so nothing is emitted
        # unless the host application configures logging itself.
        logger = logging.getLogger(__name__)
        logger.addHandler(logging.NullHandler())
        return logger

    def _configs_with_defaults(self, configs):
        """Fill in consumer defaults without overriding caller-supplied values.

        group_id=None keeps the consumer out of any consumer group (no offset
        commits); consumer_timeout_ms bounds how long next(consumer) blocks
        before raising StopIteration.
        """
        configs.setdefault('group_id', None)
        configs.setdefault('consumer_timeout_ms', 10000)
        return configs

    def _get_time_millis(self):
        # Current wall-clock time in epoch milliseconds (the unit Kafka record
        # timestamps use).
        return int(round(time.time() * 1000))

    def _create_consumer(self):
        return kafka.KafkaConsumer(**self._configs)

    def _find_seek_points(self, start_time):
        """Return {TopicPartition: offset} of the first record at/after start_time."""
        seek_points = {}
        consumer = self._create_consumer()
        try:
            topic_partitions = self._topic_partitions_for_set(self._all_partitions_set(consumer))
            for topic_partition in topic_partitions:
                # We need to compute the offset independently for each partition
                # via binary search, so assign one partition at a time.
                consumer.assign([topic_partition])
                consumer.seek_to_end(topic_partition)
                end_offset = consumer.position(topic_partition)
                consumer.seek_to_beginning(topic_partition)
                start_offset = consumer.position(topic_partition)
                target_offset = self._binary_search(consumer, topic_partition, start_offset,
                                                    end_offset, start_time)
                seek_points[topic_partition] = target_offset
                self._logger.debug('Start offset for {0} is {1}'.format(
                    topic_partition, target_offset))
            self._logger.info('Start offsets: {0}'.format(seek_points))
            return seek_points
        finally:
            consumer.close()

    def _all_partitions_set(self, consumer):
        """Return the set of partition ids to replay (all, or the configured subset)."""
        all_partitions = consumer.partitions_for_topic(self._topic_name)
        if all_partitions is None:
            # partitions_for_topic returns None for an unknown topic; fail fast
            # here instead of surfacing a confusing TypeError downstream.
            raise ValueError('topic {0} has no partitions (does it exist?)'.format(
                self._topic_name))
        if self._partitions:
            return all_partitions.intersection(self._partitions)
        return all_partitions

    def _topic_partitions_for_set(self, partition_set):
        return [kafka.TopicPartition(self._topic_name, p) for p in partition_set]

    def _get_next_if_available(self, consumer):
        """Return the next record, or None once the consumer times out."""
        try:
            return next(consumer)
        except StopIteration:
            self._logger.debug('Got StopIteration, leaving the record as None')
            return None

    def _binary_search(self, consumer, tp, start, end, target_time):
        """Find the earliest offset in [start, end] whose record timestamp is
        no earlier than target_time (falls back to start/end at the boundaries).
        """
        # Overall goal: find the earliest offset that is no earlier than the target time
        if start == end:
            return start
        insertion_point = int(start + ((end - start) / 2))
        consumer.seek(tp, insertion_point)
        record = self._get_next_if_available(consumer)
        if record:
            ts = record.timestamp
            if insertion_point == start:
                # Two-element window: decide between the endpoints directly.
                return start if target_time <= ts else end
            elif ts < target_time:
                return self._binary_search(consumer, tp, insertion_point + 1, end, target_time)
            else:
                return self._binary_search(consumer, tp, start, insertion_point, target_time)
        # No record could be fetched at this offset; fall back to start.
        return start

    def replay(self, start_time, end_time):
        """Replay all specified partitions over the specified time range (inclusive).

        Args:
            start_time: The start timestamp in milliseconds
            end_time: The end timestamp in milliseconds

        Yields:
            The next ConsumerRecord found within the given time range

        Raises:
            ValueError: If the specified start or end time is invalid
        """
        if start_time < 0:
            raise ValueError('start_time must be non-negative')
        if end_time < 0:
            raise ValueError('end_time must be non-negative')
        if start_time > self._get_time_millis():
            raise ValueError('start_time must not be in the future')
        if start_time > end_time:
            raise ValueError('end_time must be at least start_time')
        count = 0
        last_timestamp = 0
        seek_points = self._find_seek_points(start_time)
        consumer = self._create_consumer()
        try:
            # Set up the consumer to fetch all desired partitions from their
            # seek points.
            partitions = self._all_partitions_set(consumer)
            partition_list = self._topic_partitions_for_set(partitions)
            consumer.assign(partition_list)
            for tp, offset in seek_points.items():
                consumer.seek(tp, offset)
            while len(partitions) > 0:
                record = self._get_next_if_available(consumer)
                if not record:
                    self._logger.info('No more records available. Terminating.')
                    partitions = set()
                else:
                    last_timestamp = record.timestamp
                    if last_timestamp > end_time:
                        # Since partitions are ordered, if we see a too-new
                        # timestamp, mark the partition complete and pause it so
                        # no further fetches are issued for it.
                        partitions.discard(record.partition)
                        tp = kafka.TopicPartition(topic=record.topic, partition=record.partition)
                        if tp not in consumer.paused():
                            consumer.pause(tp)
                        self._logger.debug('Completed partition {0}'.format(tp))
                    elif (record.partition in partitions and last_timestamp >= start_time
                            and last_timestamp <= end_time):
                        # Send the record to the client if it's within the
                        # specified time range.
                        yield record
                        count += 1
                        if count % self._log_interval == 0:
                            self._logger.debug('Processed {0} offsets, last timestamp: {1}'.format(
                                count, last_timestamp))
        except Exception:
            # Deliberate best-effort behavior: log the full traceback and end
            # the replay cleanly rather than propagating to the generator's
            # consumer. logger.exception preserves the stack trace that the
            # old str(e)-only message discarded.
            self._logger.exception('Unexpected exception during replay')
        finally:
            self._logger.info('Processed {0} offsets, last timestamp: {1}'.format(
                count, last_timestamp))
            consumer.close()