-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathMessage.py
More file actions
111 lines (80 loc) · 3.16 KB
/
Message.py
File metadata and controls
111 lines (80 loc) · 3.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
"""
Copyright (c) 2015-2016 Cisco Systems, Inc. and others. All rights reserved.
This program and the accompanying materials are made available under the
terms of the Eclipse Public License v1.0 which accompanies this distribution,
and is available at http://www.eclipse.org/legal/epl-v10.html
"""
class Message(object):
    """
    Kafka Message class. This class processes header of raw Kafka messages.

    A raw message is expected to hold "NAME: value" header lines, followed by an empty line,
    followed by the message content.
    """

    TYPE_PEER = "PEER"
    TYPE_ROUTER = "ROUTER"

    # Empty line that separates the header section from the content.
    _HEADER_END = "\n\n"

    # Header name -> (attribute to set, converter applied to the header value).
    # Attribute names are from http://openbmp.org/#!docs/MESSAGE_BUS_API.md headers
    _HEADER_FIELDS = {
        "V": ("version", float),
        "C_HASH_ID": ("collector_hash_id", str),
        "T": ("type", str),
        "L": ("length", int),
        "R": ("records", int),
        "R_HASH_ID": ("router_hash_id", str),
        "R_IP": ("router_ip", str),
    }

    def __init__(self, data=None, parse_headers=True):
        """
        Handle the message by parsing header of it.

        :param data: Raw Kafka message as string. When empty or None, all fields keep their defaults.
        :param parse_headers: If headers parsing is required. May be disabled to speed up.
        :raises ValueError: If "data" is non-empty but contains only whitespace.
        """
        if data and not data.strip():  # If "data" is not string, throws error.
            raise ValueError("Invalid data!", data)

        # "int" is used instead of the Python 2 only "long"; on Python 2 it is promoted automatically.
        self.version = float()
        self.type = str()
        self.collector_hash_id = str()
        self.length = int()
        self.records = int()
        self.router_hash_id = str()
        self.content = str()
        self.content_pos = int()
        self.router_ip = str()

        # Parse exactly once, and only when there is something to parse.
        if data:
            self.__parse(data, parse_headers)

    def __parse(self, data, parse_headers=True):
        """
        Parses header of raw Kafka messages and set the version, length, number of records and router hash id.

        If the message has no empty line after the headers, the whole message is treated as headers
        and the content is left empty.

        :param data: Raw Kafka message as string.
        :param parse_headers: If headers parsing is required. May be disabled to speed up.
        :raises ValueError: If a numeric header (V, L, R) holds a non-numeric value.
        """
        if not parse_headers:
            self.content = data
            return

        header_data, separator, content = data.partition(self._HEADER_END)
        self.content = content
        # Without a separator there is no content, so the position is the end of the data.
        self.content_pos = len(header_data) + len(separator)

        for header in header_data.split("\n"):
            # Split on the first colon only, so values that contain colons (e.g. IPv6) stay intact.
            attr, colon, value = header.partition(":")
            if not colon:
                continue  # Not a "NAME: value" line (e.g. blank line); nothing to read.

            field = self._HEADER_FIELDS.get(attr.strip())
            if field is not None:
                name, convert = field
                setattr(self, name, convert(value.strip()))

    def get_version(self):
        """Return the message bus schema version (header "V") as float."""
        return self.version

    def get_type(self):
        """Return the value of the "T" header."""
        return self.type

    def get_collector_hash_id(self):
        """Return the value of the "C_HASH_ID" header."""
        return self.collector_hash_id

    def get_length(self):
        """Return the value of the "L" header as integer."""
        return self.length

    def get_records(self):
        """Return the value of the "R" header as integer."""
        return self.records

    def get_router_hash_id(self):
        """Return the value of the "R_HASH_ID" header."""
        return self.router_hash_id

    def get_router_ip(self):
        """Return the value of the "R_IP" header."""
        return self.router_ip

    def get_content_pos(self):
        """Return the offset in the raw message at which the content starts."""
        return self.content_pos

    def get_content(self):
        """Return the content; this is the whole raw message when header parsing was disabled."""
        return self.content