|
16 | 16 | import logging |
17 | 17 | import sys |
18 | 18 | import threading |
| 19 | +import time |
19 | 20 | import traceback |
20 | 21 | from unittest import mock |
21 | 22 |
|
22 | 23 | from absl.testing import absltest |
23 | 24 | from absl.testing import parameterized |
24 | 25 | from pathwaysutils.debug import watchdog |
25 | 26 |
|
| 27 | + |
26 | 28 | class WatchdogTest(parameterized.TestCase): |
| 29 | + |
| 30 | + @parameterized.parameters([ |
| 31 | + "test", |
| 32 | + "loop", |
| 33 | + "initialization", |
| 34 | + ]) |
| 35 | + def test_watchdog_name(self, watchdog_name): |
| 36 | + mock_thread_cls = self.enter_context( |
| 37 | + mock.patch.object(threading, "Thread", autospec=True) |
| 38 | + ) |
| 39 | + |
| 40 | + with watchdog.watchdog(name=watchdog_name, timeout=1): |
| 41 | + pass |
| 42 | + |
| 43 | + mock_thread_cls.assert_called_once_with(name=watchdog_name, target=mock.ANY) |
| 44 | + |
27 | 45 | def test_watchdog_start_join(self): |
28 | | - with ( |
29 | | - mock.patch.object( |
30 | | - threading.Thread, |
31 | | - "start", |
32 | | - autospec=True, |
33 | | - ) as mock_start, |
34 | | - mock.patch.object(threading.Thread, "join", autospec=True) as mock_join, |
35 | | - ): |
36 | | - with watchdog.watchdog(timeout=1): |
37 | | - mock_start.assert_called_once() |
38 | | - mock_join.assert_not_called() |
| 46 | + mock_start = self.enter_context( |
| 47 | + mock.patch.object(threading.Thread, "start", autospec=True) |
| 48 | + ) |
| 49 | + mock_join = self.enter_context( |
| 50 | + mock.patch.object(threading.Thread, "join", autospec=True) |
| 51 | + ) |
| 52 | + |
| 53 | + with watchdog.watchdog(name="test", timeout=1): |
| 54 | + mock_start.assert_called_once() |
| 55 | + mock_join.assert_not_called() |
39 | 56 |
|
40 | 57 | mock_start.assert_called_once() |
41 | 58 | mock_join.assert_called_once() |
42 | 59 |
|
43 | | - @parameterized.named_parameters([ |
44 | | - ( |
45 | | - "thread 1", |
46 | | - 1, |
47 | | - [ |
| 60 | + @parameterized.named_parameters( |
| 61 | + dict( |
| 62 | + testcase_name="thread_1", |
| 63 | + thread_ident=1, |
| 64 | + expected_log_output=[ |
48 | 65 | "DEBUG:pathwaysutils.debug.watchdog:Thread: 1", |
49 | 66 | "DEBUG:pathwaysutils.debug.watchdog:examplestack1", |
50 | 67 | ], |
51 | 68 | ), |
52 | | - ( |
53 | | - "thread 2", |
54 | | - 2, |
55 | | - [ |
| 69 | + dict( |
| 70 | + testcase_name="thread_2", |
| 71 | + thread_ident=2, |
| 72 | + expected_log_output=[ |
56 | 73 | "DEBUG:pathwaysutils.debug.watchdog:Thread: 2", |
57 | 74 | "DEBUG:pathwaysutils.debug.watchdog:examplestack2", |
58 | 75 | ], |
59 | 76 | ), |
60 | | - ( |
61 | | - "thread 3", |
62 | | - 3, |
63 | | - [ |
| 77 | + dict( |
| 78 | + testcase_name="thread_3", |
| 79 | + thread_ident=3, |
| 80 | + expected_log_output=[ |
64 | 81 | "DEBUG:pathwaysutils.debug.watchdog:Thread: 3", |
65 | 82 | "DEBUG:pathwaysutils.debug.watchdog:", |
66 | 83 | ], |
67 | 84 | ), |
68 | | - ]) |
| 85 | + ) |
69 | 86 | def test_log_thread_strack_succes(self, thread_ident, expected_log_output): |
70 | | - with ( |
| 87 | + self.enter_context( |
71 | 88 | mock.patch.object( |
72 | 89 | sys, |
73 | 90 | "_current_frames", |
74 | 91 | return_value={1: ["example", "stack1"], 2: ["example", "stack2"]}, |
75 | 92 | autospec=True, |
76 | | - ), |
| 93 | + ) |
| 94 | + ) |
| 95 | + self.enter_context( |
77 | 96 | mock.patch.object( |
78 | 97 | traceback, |
79 | 98 | "format_stack", |
80 | 99 | side_effect=lambda stack_str_list: stack_str_list, |
81 | 100 | autospec=True, |
82 | | - ), |
83 | | - ): |
84 | | - mock_thread = mock.create_autospec(threading.Thread, instance=True) |
85 | | - mock_thread.ident = thread_ident |
| 101 | + ) |
| 102 | + ) |
| 103 | + |
| 104 | + mock_thread = mock.create_autospec(threading.Thread, instance=True) |
| 105 | + mock_thread.ident = thread_ident |
86 | 106 |
|
87 | | - with self.assertLogs(watchdog._logger, logging.DEBUG) as log_output: |
88 | | - watchdog._log_thread_stack(mock_thread) |
| 107 | + with self.assertLogs(watchdog._logger, logging.DEBUG) as log_output: |
| 108 | + watchdog._log_thread_stack(mock_thread) |
89 | 109 |
|
90 | 110 | self.assertEqual(log_output.output, expected_log_output) |
91 | 111 |
|
| 112 | + @parameterized.named_parameters( |
| 113 | + dict( |
| 114 | + testcase_name="test_logs_1", |
| 115 | + name_arg="test_logs_1", |
| 116 | + timeout=1, |
| 117 | + repeat=False, |
| 118 | + expected_log_messages=[ |
| 119 | + ( |
| 120 | + "Registering 'test_logs_1' watchdog with timeout 1 seconds" |
| 121 | + " and repeat=False" |
| 122 | + ), |
| 123 | + "Deregistering 'test_logs_1' watchdog", |
| 124 | + ], |
| 125 | + ), |
| 126 | + dict( |
| 127 | + testcase_name="test_logs_2", |
| 128 | + name_arg="test_logs_2", |
| 129 | + timeout=2, |
| 130 | + repeat=True, |
| 131 | + expected_log_messages=[ |
| 132 | + ( |
| 133 | + "Registering 'test_logs_2' watchdog with timeout 2 seconds" |
| 134 | + " and repeat=True" |
| 135 | + ), |
| 136 | + "Deregistering 'test_logs_2' watchdog", |
| 137 | + ], |
| 138 | + ), |
| 139 | + ) |
| 140 | + def test_watchdog_logs( |
| 141 | + self, |
| 142 | + name_arg: str, |
| 143 | + timeout: float, |
| 144 | + repeat: bool, |
| 145 | + expected_log_messages: list[str], |
| 146 | + ): |
| 147 | + # Test registration and deregistration logs |
| 148 | + with self.assertLogs(watchdog._logger, logging.DEBUG) as log_output: |
| 149 | + with watchdog.watchdog(name=name_arg, timeout=timeout, repeat=repeat): |
| 150 | + pass |
| 151 | + |
| 152 | + output_messages = [record.getMessage() for record in log_output.records] |
| 153 | + self.assertEqual(output_messages, expected_log_messages) |
| 154 | + |
| 155 | + def test_watchdog_timeout_logs(self): |
| 156 | + self.enter_context( |
| 157 | + mock.patch.object( |
| 158 | + threading, |
| 159 | + "enumerate", |
| 160 | + return_value=[], |
| 161 | + autospec=True, |
| 162 | + ) |
| 163 | + ) |
| 164 | + |
| 165 | + # Test timeout log |
| 166 | + with self.assertLogs(watchdog._logger, logging.DEBUG) as log_output: |
| 167 | + with watchdog.watchdog(name="test_logs_timeout", timeout=0.01): |
| 168 | + time.sleep(0.02) |
| 169 | + |
| 170 | + output_messages = [record.getMessage() for record in log_output.records] |
| 171 | + self.assertIn( |
| 172 | + "Registering 'test_logs_timeout' watchdog with timeout 0.01 seconds and" |
| 173 | + " repeat=True", |
| 174 | + output_messages, |
| 175 | + ) |
| 176 | + |
| 177 | + self.assertIn( |
| 178 | + "'test_logs_timeout' watchdog thread stack dump every 0.01 seconds." |
| 179 | + " Count: 0", |
| 180 | + output_messages, |
| 181 | + ) |
| 182 | + |
92 | 183 |
|
93 | 184 | if __name__ == "__main__": |
94 | 185 | absltest.main() |
0 commit comments