-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Expand file tree
/
Copy path__init__.py
More file actions
84 lines (74 loc) · 2.62 KB
/
__init__.py
File metadata and controls
84 lines (74 loc) · 2.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import argparse
import logging
from kafka import KafkaConsumer
from kafka.errors import KafkaError
def main_parser():
    """Create the argument parser for the Kafka console consumer.

    Returns:
        argparse.ArgumentParser: parser exposing the bootstrap servers,
        topics, consumer group, extra consumer configuration, log level,
        output format and output encoding options.
    """
    cli = argparse.ArgumentParser(
        prog='python -m kafka.consumer',
        description='Kafka console consumer',
    )
    # (flags, add_argument keyword options), registered in declaration order
    # so the generated --help output keeps the same ordering.
    option_table = (
        (('-b', '--bootstrap-servers'),
         dict(type=str, action='append', required=True,
              help='host:port for cluster bootstrap servers')),
        (('-t', '--topic'),
         dict(type=str, action='append', dest='topics', required=True,
              help='subscribe to topic')),
        (('-g', '--group'),
         dict(type=str, required=True,
              help='consumer group')),
        (('-c', '--extra-config'),
         dict(type=str, action='append',
              help='additional configuration properties for kafka consumer')),
        (('-l', '--log-level'),
         dict(type=str, default='CRITICAL',
              help='logging level, passed to logging.basicConfig')),
        (('-f', '--format'),
         dict(type=str, default='str',
              help='output format: str|raw|full')),
        (('--encoding',),
         dict(type=str, default='utf-8',
              help='encoding to use for str output decode()')),
    )
    for flags, options in option_table:
        cli.add_argument(*flags, **options)
    return cli
_LOGGING_LEVELS = {'NOTSET': 0, 'DEBUG': 10, 'INFO': 20, 'WARNING': 30, 'ERROR': 40, 'CRITICAL': 50}
def build_kwargs(props):
    """Convert ``key=value`` strings into a dict of keyword arguments.

    Each property is split on its FIRST ``=`` only, so values that
    themselves contain ``=`` (e.g. base64 secrets) are preserved intact.
    Values that parse as integers become ``int``; the exact strings
    ``'None'``, ``'True'`` and ``'False'`` become the corresponding Python
    objects; every other value is kept as a string.

    Args:
        props: Iterable of ``key=value`` strings, or None / empty for no
            properties.

    Returns:
        dict: Mapping of property name to converted value. When a key is
        repeated, the last occurrence wins.

    Raises:
        ValueError: If a property does not contain ``=``.
    """
    literals = {'None': None, 'False': False, 'True': True}
    kwargs = {}
    for prop in props or []:
        k, sep, v = prop.partition('=')
        if not sep:
            raise ValueError(
                'Invalid config property (expected key=value): %s' % prop)
        try:
            v = int(v)
        except ValueError:
            # Not an integer: map the literal names, otherwise keep the text.
            v = literals.get(v, v)
        kwargs[k] = v
    return kwargs
def run_cli(args=None):
    """Run the Kafka console consumer, printing each received message.

    Parses the command line, configures logging, creates a KafkaConsumer
    subscribed to the requested topics and prints every message in the
    selected format until iteration stops or the user interrupts it.

    Args:
        args: Optional list of command-line argument strings. When None,
            argparse reads the process arguments.

    Returns:
        int: 0 when consumption ends normally (the consumer iterator is
        exhausted or the user presses Ctrl-C), 1 when an unexpected
        exception is raised while consuming.

    Raises:
        ValueError: If ``--format`` is not one of ``str``, ``raw``, ``full``.
        KeyError: If ``--log-level`` is not a known level name.
    """
    parser = main_parser()
    config = parser.parse_args(args)
    if config.log_level:
        logging.basicConfig(level=_LOGGING_LEVELS[config.log_level.upper()])
    if config.format not in ('str', 'raw', 'full'):
        raise ValueError('Unrecognized format: %s' % config.format)
    logger = logging.getLogger(__name__)

    kwargs = build_kwargs(config.extra_config)
    consumer = KafkaConsumer(bootstrap_servers=config.bootstrap_servers, group_id=config.group, **kwargs)
    consumer.subscribe(config.topics)
    try:
        for m in consumer:
            if config.format == 'full':
                print(m)
            elif config.format == 'str' and m.value is not None:
                print(m.value.decode(config.encoding))
            else:
                # 'raw' output, or a record without a value (e.g. a
                # tombstone): there is nothing to decode, so print the
                # value as-is instead of failing on None.decode().
                print(m.value)
        # The iterator can finish on its own (e.g. consumer_timeout_ms was
        # passed via --extra-config); report that as a normal exit.
        return 0
    except KeyboardInterrupt:
        logger.info('Bye!')
        return 0
    except Exception:
        logger.critical('Error!', exc_info=True)
        return 1
    finally:
        # Always release the consumer, whichever way the loop ended.
        consumer.close()