diff --git a/.github/workflows/release-debian.yml b/.github/workflows/disabled/release-debian.yml similarity index 100% rename from .github/workflows/release-debian.yml rename to .github/workflows/disabled/release-debian.yml diff --git a/.github/workflows/tests-codecheck.yml b/.github/workflows/tests-codecheck.yml index 9ce80f9..deb5477 100644 --- a/.github/workflows/tests-codecheck.yml +++ b/.github/workflows/tests-codecheck.yml @@ -28,7 +28,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [3.8, 3.9, '3.10'] + python-version: ['3.10', 3.11, 3.12, 3.13, 3.14] steps: - name: Checkout repo uses: actions/checkout@v5 diff --git a/.github/workflows/tests-unit.yml b/.github/workflows/tests-unit.yml index a101f68..df3f667 100644 --- a/.github/workflows/tests-unit.yml +++ b/.github/workflows/tests-unit.yml @@ -11,7 +11,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [3.8, 3.9, '3.10', 3.11] + python-version: ['3.10', 3.11, 3.12, 3.13, 3.14] steps: - name: Checkout repo uses: actions/checkout@v5 diff --git a/beka/peering.py b/beka/peering.py index 9170d0d..f00ced9 100644 --- a/beka/peering.py +++ b/beka/peering.py @@ -1,14 +1,15 @@ +import threading import time -from eventlet import sleep, GreenPool -from eventlet.queue import Queue -import eventlet.greenthread as greenthread - from .chopper import Chopper from .event import EventTimerExpired, EventMessageReceived from .bgp_message import BgpMessageParser, BgpMessagePacker from .error import SocketClosedError, IdleError +# Sentinel placed on output queues during shutdown to wake any thread +# blocked in ``Queue.get()`` so it can observe ``_stop_event`` and exit. +_QUEUE_POISON = object() + class Peering(object): def __init__( @@ -16,8 +17,7 @@ def __init__( ): self.input_stream = None self.chopper = None - self.pool = None - self.eventlets = None + self.threads = None self.parser = None self.packer = None self.state_machine = state_machine @@ -27,6 +27,7 @@ def __init__( self.route_handler = route_handler self.error_handler = error_handler self.start_time = int(time.time()) + self._stop_event = threading.Event() def uptime(self): return int(time.time()) - self.start_time @@ -34,30 +35,34 @@ def uptime(self): def run(self): self.input_stream = self.socket.makefile(mode="rb") self.chopper = Chopper(self.input_stream) - self.pool = GreenPool() self.parser = BgpMessageParser() self.packer = BgpMessagePacker() self.state_machine.open_handler = self.open_handler - self.eventlets = [] - - self.eventlets.append(self.pool.spawn(self.send_messages)) - self.eventlets.append(self.pool.spawn(self.print_route_updates)) - self.eventlets.append(self.pool.spawn(self.kick_timers)) - self.eventlets.append(self.pool.spawn(self.receive_messages)) - self.pool.waitall() + targets = ( + self.send_messages, + self.print_route_updates, + self.kick_timers, + self.receive_messages, + ) + self.threads = [ + threading.Thread(target=t, name=t.__name__, daemon=True) for t in targets + ] + for thread in self.threads: + thread.start() + for thread in self.threads: + thread.join() def open_handler(self, capabilities): self.parser.capabilities = capabilities self.packer.capabilities = capabilities def receive_messages(self): - while True: - sleep(0) + while not self._stop_event.is_set(): try: message_type, serialised_message = self.chopper.next() except SocketClosedError as e: - if self.error_handler: + if self.error_handler and not self._stop_event.is_set(): self.error_handler("Peering %s: %s" % (self.peer_address, e)) self.shutdown() break @@ -73,25 +78,30 @@ def receive_messages(self): break def send_messages(self): - while True: - sleep(0) + while not self._stop_event.is_set(): message = self.state_machine.output_messages.get() + if message is _QUEUE_POISON: + break self.socket.send(self.packer.pack(message)) def empty_message_queue(self): while self.state_machine.output_messages.qsize(): message = self.state_machine.output_messages.get() + if message is _QUEUE_POISON: + continue self.socket.send(self.packer.pack(message)) def print_route_updates(self): - while True: - sleep(0) + while not self._stop_event.is_set(): route_update = self.state_machine.route_updates.get() + if route_update is _QUEUE_POISON: + break self.route_handler(route_update) def kick_timers(self): - while True: - sleep(1) + # ``Event.wait(timeout)`` returns True as soon as the event is set, + # giving prompt shutdown without burning CPU between ticks. + while not self._stop_event.wait(timeout=1): tick = int(time.time()) try: self.state_machine.event(EventTimerExpired(), tick) @@ -102,6 +112,16 @@ def kick_timers(self): break def shutdown(self): + if self._stop_event.is_set(): + return + self._stop_event.set() self.empty_message_queue() - for eventlet in self.eventlets: - eventlet.kill() + # Unblock any thread parked in ``Queue.get()``. Each consumer + # checks ``_stop_event`` after waking and exits. + self.state_machine.output_messages.put(_QUEUE_POISON) + self.state_machine.route_updates.put(_QUEUE_POISON) + # Force ``chopper.next()``'s underlying recv to return. + try: + self.socket.shutdown(2) # socket.SHUT_RDWR + except OSError: + pass diff --git a/beka/state_machine.py b/beka/state_machine.py index d1f7da4..b7c860d 100644 --- a/beka/state_machine.py +++ b/beka/state_machine.py @@ -1,4 +1,4 @@ -from eventlet.queue import Queue +from queue import Queue from collections import OrderedDict from .event import Event diff --git a/beka/stream_server.py b/beka/stream_server.py index 56a1d04..60fd907 100644 --- a/beka/stream_server.py +++ b/beka/stream_server.py @@ -1,16 +1,20 @@ import socket - -from eventlet import GreenPool, listen -import eventlet.greenthread as greenthread +import threading +from concurrent.futures import ThreadPoolExecutor class StreamServer: + """Listen on ``address`` and dispatch each accepted socket to ``handler`` in its own thread.""" + + DEFAULT_MAX_HANDLERS = 64 + def __init__(self, address, handler): self.address = address self.handler = handler - self.greenlets = set() self.running = False self.server = None + self._executor = None + self._accept_thread = None def _family(self): if ":" in self.address[0]: @@ -19,24 +23,35 @@ def _family(self): def serve_forever(self): self.running = True - self.server = listen(self.address, self._family()) - pool = GreenPool() - + self.server = socket.socket(self._family(), socket.SOCK_STREAM) + self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.server.bind(self.address) + self.server.listen() + self._executor = ThreadPoolExecutor( + max_workers=self.DEFAULT_MAX_HANDLERS, + thread_name_prefix="beka-handler", + ) + self._accept_thread = threading.current_thread() try: while self.running: - sock, address = self.server.accept() - pool.spawn(self.call_handler, sock, address) - self.greenlets - except OSError: - pass - - def call_handler(self, sock, address): - self.greenlets.add(greenthread.getcurrent()) - self.handler(sock, address) - self.greenlets.remove(greenthread.getcurrent()) + try: + sock, address = self.server.accept() + except OSError: + # accept() raises after stop() shuts the listening socket + break + self._executor.submit(self.handler, sock, address) + finally: + self._executor.shutdown(wait=False) + self._executor = None def stop(self): self.running = False - for greenlet in self.greenlets: - greenlet.kill() - self.server.shutdown(socket.SHUT_RDWR) + if self.server is not None: + try: + self.server.shutdown(socket.SHUT_RDWR) + except OSError: + pass + try: + self.server.close() + except OSError: + pass diff --git a/requirements.txt b/requirements.txt index 6dad793..5581c21 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1 @@ -eventlet>=0.33.3 pbr>=1.9 diff --git a/run.py b/run.py index 5025d68..18f3a4f 100644 --- a/run.py +++ b/run.py @@ -1,9 +1,8 @@ import signal import sys +import threading import yaml -from eventlet import GreenPool - from beka.beka import Beka @@ -20,7 +19,7 @@ def __init__(self): def run(self): signal.signal(signal.SIGINT, self.signal_handler) - pool = GreenPool() + threads = [] with open("beka.yaml", encoding="utf-8") as file: config = yaml.safe_load(file.read()) @@ -46,9 +45,14 @@ def run(self): for route in router["routes"]: beka.add_route(route["prefix"], route["next_hop"]) self.bekas.append(beka) - pool.spawn_n(beka.run) - pool.waitall() - printmsg("All greenlets gone, exiting") + thread = threading.Thread( + target=beka.run, name="beka-%s" % router["local_address"] + ) + thread.start() + threads.append(thread) + for thread in threads: + thread.join() + printmsg("All threads gone, exiting") def signal_handler(self, _signal, _frame): printmsg("[SIGINT] Shutting down") diff --git a/setup.cfg b/setup.cfg index f4b2556..3d130b5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,5 @@ [metadata] -name = beka +name = c65beka summary = A bare-bones BGP speaker long_description = Beka is a fairly basic BGP speaker. It can send @@ -7,14 +7,11 @@ long_description = but not too much else. It is designed to be simple to use and to extend, without too much overhead. - It uses eventlet for concurrency, but is easy enough to port to - gevent if that takes your fancy. - - More information at https://github.com/faucetsdn/beka + More information at https://github.com/c65sdn/beka license = Apache-2 author = Sam Russell author-email = sam.h.russell@gmail.com -home-page = https://github.com/faucetsdn/beka +home-page = https://github.com/c65sdn/beka classifiers = Development Status :: 2 - Pre-Alpha Intended Audience :: System Administrators diff --git a/setup.py b/setup.py index 3d901d3..95762bb 100755 --- a/setup.py +++ b/setup.py @@ -6,11 +6,11 @@ from setuptools import setup -if sys.version_info < (3,): +if sys.version_info < (3, 10): print( """You are trying to install beka on python {py} -beka is not compatible with python 2, please upgrade to python 3.8 or newer.""".format( +beka is not compatible with python earlier than 3.10, please upgrade.""".format( py=".".join([str(v) for v in sys.version_info[:3]]) ), file=sys.stderr, @@ -20,6 +20,6 @@ setup( name="beka", setup_requires=["pbr>=1.9", "setuptools>=17.1"], - python_requires=">=3.8", + python_requires=">=3.10", pbr=True, ) diff --git a/test/unit/test_peering.py b/test/unit/test_peering.py index 09d6208..0a4d0dd 100644 --- a/test/unit/test_peering.py +++ b/test/unit/test_peering.py @@ -1,9 +1,9 @@ +import threading +import time import unittest +from queue import Queue from unittest.mock import patch, call -from eventlet import GreenPool, sleep -from eventlet.queue import Queue - from beka.peering import Peering @@ -26,19 +26,16 @@ def __init__(self): class FakeSocket: # pylint: disable=too-few-public-methods """Mocked Socket""" - def __init__(self): - pass - def makefile(self, *args, **kwargs): # pylint: disable=unused-argument return None + def shutdown(self, _how): # pragma: no cover - exercised via Peering.shutdown + pass + class FakeChopper: # pylint: disable=too-few-public-methods """Mocked Chopper""" - def __init__(self): - pass - class PeeringTestCase(unittest.TestCase): def setUp(self): @@ -55,25 +52,40 @@ def setUp(self): def test_print_route_updates(self): fake_route_update = "FAKE ROUTE UPDATE" self.state_machine.route_updates.put(fake_route_update) - pool = GreenPool() - eventlet = pool.spawn(self.peering.print_route_updates) - for _ in range(10): - sleep(0) - if self.route_catcher.route_updates: - break + thread = threading.Thread(target=self.peering.print_route_updates, daemon=True) + thread.start() + deadline = time.monotonic() + 1.0 + while not self.route_catcher.route_updates and time.monotonic() < deadline: + time.sleep(0.01) self.assertEqual(len(self.route_catcher.route_updates), 1) self.assertEqual(self.route_catcher.route_updates[0], fake_route_update) - eventlet.kill() + # Cooperative shutdown unblocks the consumer's Queue.get(). + self.peering.shutdown() + thread.join(timeout=1) + self.assertFalse(thread.is_alive()) def test_run_starts_threads(self): - with patch("beka.peering.GreenPool") as GreenPool: + with patch("beka.peering.threading.Thread") as Thread: self.peering.run() - GreenPool().spawn.assert_has_calls( + Thread.assert_has_calls( [ - call(self.peering.send_messages), - call(self.peering.print_route_updates), - call(self.peering.kick_timers), - call(self.peering.receive_messages), - ] + call( + target=self.peering.send_messages, name="send_messages", daemon=True + ), + call( + target=self.peering.print_route_updates, + name="print_route_updates", + daemon=True, + ), + call(target=self.peering.kick_timers, name="kick_timers", daemon=True), + call( + target=self.peering.receive_messages, + name="receive_messages", + daemon=True, + ), + ], + any_order=False, ) - assert GreenPool().waitall.call_count == 1 + # Each thread should be started and then joined. + self.assertEqual(Thread.return_value.start.call_count, 4) + self.assertEqual(Thread.return_value.join.call_count, 4)