-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtest_udp_client.py
More file actions
157 lines (121 loc) · 4.31 KB
/
test_udp_client.py
File metadata and controls
157 lines (121 loc) · 4.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import errno
import logging
import socket
import time
from collections.abc import Generator
from typing import Any
from unittest import mock
import pytest
from statsd import StatsdClient
@pytest.fixture()
def receiver_socket() -> Generator[socket.socket, None, None]:
    """Yield a non-blocking UDP socket bound to an ephemeral loopback port.

    Tests obtain the address to send to from ``getsockname()`` on the
    yielded socket. The socket is always closed once the test finishes.

    Yields:
        A datagram socket bound to ``127.0.0.1`` on an OS-assigned port,
        with its timeout set to 0 (non-blocking).
    """
    # The context manager guarantees close() runs during teardown. Calling
    # shutdown() first is pointless for an unconnected datagram socket (it
    # fails with ENOTCONN on common platforms), and when it shared a ``try``
    # block with close() that failure caused close() to be skipped, leaking
    # the descriptor.
    with socket.socket(type=socket.SOCK_DGRAM) as sock:
        # Bind to loopback explicitly: binding to "" listens on every
        # interface and makes getsockname() report 0.0.0.0, which is not a
        # portable destination address for the client under test.
        sock.bind(("127.0.0.1", 0))
        sock.settimeout(0)
        yield sock
def _read_from_socket(sock: socket.socket, *, timeout: float = 0.05) -> str:
    """Return the next datagram waiting on ``sock``, decoded as ASCII.

    The test code races the sockets: a datagram handed to the kernel by the
    sender may not be readable yet when this helper is called. Instead of
    sleeping a fixed amount once and hoping, the socket is polled until a
    datagram shows up or ``timeout`` seconds have elapsed.

    Args:
        sock: A non-blocking datagram socket (e.g. after ``settimeout(0)``).
        timeout: Maximum number of seconds to wait for a datagram.

    Returns:
        The decoded payload, or ``""`` if nothing arrived within ``timeout``.

    Raises:
        OSError: For any socket error other than "no data available yet".
    """
    # EWOULDBLOCK is a distinct value from EAGAIN on some platforms (notably
    # Windows), so both must be treated as "nothing to read yet".
    would_block = {errno.EAGAIN, errno.EWOULDBLOCK}
    deadline = time.monotonic() + timeout
    while True:
        try:
            data = sock.recv(4096)
        except OSError as err:
            if err.errno not in would_block:
                raise
            if time.monotonic() >= deadline:
                return ""
            # Short pause between polls so the wait does not spin the CPU.
            time.sleep(0.001)
        else:
            return data.decode("ascii")
def test_no_buffering_sends_immediately(receiver_socket: socket.socket) -> None:
    """Check that with ``max_buffer_size=0`` every metric is sent at once."""
    address, udp_port = receiver_socket.getsockname()
    unbuffered = StatsdClient(host=address, port=udp_port, max_buffer_size=0)

    unbuffered.increment("foo", 1)
    first_packet = _read_from_socket(receiver_socket)
    assert first_packet == "foo:1|c"

    # Exactly one packet is expected per metric, so nothing should be left.
    leftover = _read_from_socket(receiver_socket)
    assert leftover == ""

    unbuffered.increment("foo", 2)
    second_packet = _read_from_socket(receiver_socket)
    assert second_packet == "foo:2|c"
def test_buffering(receiver_socket: socket.socket) -> None:
    """Check that metrics accumulate until the buffer limit forces a flush."""
    address, udp_port = receiver_socket.getsockname()
    buffered = StatsdClient(host=address, port=udp_port, max_buffer_size=36)

    # The first four metrics are expected to stay in the buffer, so nothing
    # should have reached the receiver yet.
    for amount in (1, 2, 3, 4):
        buffered.increment("foo", amount)
    assert _read_from_socket(receiver_socket) == ""

    # The fifth metric goes over the buffer size: the four buffered entries
    # should be flushed as a single packet and a new buffer started.
    buffered.increment("foo", 5)
    flushed = _read_from_socket(receiver_socket)
    assert flushed == "foo:1|c\nfoo:2|c\nfoo:3|c\nfoo:4|c"
    assert _read_from_socket(receiver_socket) == ""

    # Closing the client should flush the one entry still buffered.
    buffered._close()
    remainder = _read_from_socket(receiver_socket)
    assert remainder == "foo:5|c"
def test_broken_pipe(receiver_socket: socket.socket, caplog: Any) -> None:
    """Check that a short ``send()`` is logged as a broken pipe, not raised.

    ``socket.socket.send`` is patched to report 2 bytes sent; the client is
    expected to emit exactly one log record mentioning "Broken pipe".
    """
    address, udp_port = receiver_socket.getsockname()
    client = StatsdClient(host=address, port=udp_port, max_buffer_size=0)

    short_send = mock.patch("socket.socket.send", side_effect=[2])
    with short_send, caplog.at_level(logging.WARNING):
        # The call itself must complete without raising.
        client.increment("foo", 1)

    record_count = len(caplog.records)
    assert record_count == 1
    assert "Broken pipe" in caplog.text
def test_socket_errors_are_logged_not_raised(
    receiver_socket: socket.socket,
    caplog: Any,
) -> None:
    """Check that an ``OSError`` from ``send()`` is logged, not propagated.

    Exactly one log record containing "Error sending packet" is expected.
    """
    address, udp_port = receiver_socket.getsockname()
    client = StatsdClient(host=address, port=udp_port, max_buffer_size=0)

    send_failure = [OSError("Broken socket")]
    with mock.patch("socket.socket.send", side_effect=send_failure):
        with caplog.at_level(logging.WARNING):
            # The call itself must complete without raising.
            client.increment("foo", 1)

    record_count = len(caplog.records)
    assert record_count == 1
    assert "Error sending packet" in caplog.text
def test_unexpected_exceptions_are_logged_not_raised(
    receiver_socket: socket.socket,
    caplog: Any,
) -> None:
    """Check that a non-socket exception from ``send()`` is logged with a
    traceback instead of reaching the caller.
    """
    address, udp_port = receiver_socket.getsockname()
    client = StatsdClient(host=address, port=udp_port, max_buffer_size=0)

    exploding_send = mock.patch(
        "socket.socket.send",
        side_effect=[ValueError("Random error")],
    )
    with exploding_send, caplog.at_level(logging.ERROR):
        # The call itself must complete without raising.
        client.increment("foo", 1)

    record_count = len(caplog.records)
    assert record_count == 1

    logged_text = caplog.text
    assert "Traceback (most recent call last)" in logged_text
    assert "ValueError: Random error" in logged_text
def test_close_before_anything_happened(receiver_socket: socket.socket) -> None:
    """Smoke test: closing a client that never sent anything must not raise."""
    address, udp_port = receiver_socket.getsockname()
    idle_client = StatsdClient(
        host=address,
        port=udp_port,
        max_buffer_size=0,
    )
    idle_client._close()
def test_call_after_close_raises(receiver_socket: socket.socket) -> None:
    """Check that using a closed client raises ``RuntimeError``."""
    address, udp_port = receiver_socket.getsockname()
    closed_client = StatsdClient(
        host=address,
        port=udp_port,
        max_buffer_size=0,
    )
    closed_client._close()

    expect_runtime_error = pytest.raises(RuntimeError)
    with expect_runtime_error:
        closed_client.increment("foo", 1)