Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions .github/workflows/lint-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ jobs:
cache-dependency-glob: "uv.lock"

- name: Install project
run: uv sync --group test
run: uv sync --group test --extra redis

- name: Run unit tests
run: COVERAGE_FILE=.coverage.py${{ matrix.python_version }}.unit uv run coverage run -m pytest ./tests/unit/
Expand Down Expand Up @@ -133,17 +133,18 @@ jobs:
cache-dependency-glob: "uv.lock"

- name: Install project
run: uv sync --group test
run: uv sync --group test --extra redis

- name: Run backing services
run: |
docker compose up postgres rabbitmq -d
docker compose up -d
sleep 10 # Wait for services to start

- name: Run integration tests
run: COVERAGE_FILE=.coverage.py${{ matrix.python_version }}.integration uv run coverage run -m pytest ./tests/integration/ -m "not tuner"
env:
RABBITMQ_URL: amqp://user:password@localhost:5672/
REDIS_URL: redis://default:password@localhost:6379/
RAY_ENABLE_UV_RUN_RUNTIME_ENV: 0
PLUGBOARD_IO_READ_TIMEOUT: 5.0

Expand Down Expand Up @@ -177,12 +178,13 @@ jobs:
cache-dependency-glob: "uv.lock"

- name: Install project
run: uv sync --group test
run: uv sync --group test --extra redis

- name: Run tuner tests
run: COVERAGE_FILE=.coverage.py${{ matrix.python_version }}.integration.tuner uv run coverage run -m pytest ./tests/integration/ -m "tuner"
env:
RABBITMQ_URL: amqp://user:password@localhost:5672/
REDIS_URL: redis://default:password@localhost:6379/
RAY_ENABLE_UV_RUN_RUNTIME_ENV: 0
PLUGBOARD_IO_READ_TIMEOUT: 5.0

Expand Down
18 changes: 10 additions & 8 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: plugboard

services:
rabbitmq:
image: rabbitmq:4.0-rc-management-alpine
image: rabbitmq:4.2-management-alpine
container_name: rabbitmq
ports:
- 5672:5672
Expand All @@ -17,7 +17,7 @@ services:
restart: always

postgres:
image: postgres:18-alpine
image: postgres:18.2-alpine
container_name: postgres
environment:
- POSTGRES_USER=plugboard
Expand All @@ -31,14 +31,16 @@ services:
- main
restart: always

valkey:
image: valkey/valkey:8.0-alpine
container_name: valkey
command: valkey-server --dir /var/lib/valkey --bind 0.0.0.0 -::1 --protected-mode no
redis:
image: redis:8.6-alpine
container_name: redis
environment:
- REDIS_PASSWORD=password
command: redis-server --bind 0.0.0.0 --requirepass password
ports:
- 6379:6379
volumes:
- valkey-data:/var/lib/valkey
- redis-data:/data
networks:
- main
restart: always
Expand All @@ -50,4 +52,4 @@ networks:
volumes:
rabbitmq-data:
postgres-data:
valkey-data:
redis-data:
2 changes: 2 additions & 0 deletions docs/api/connector/connector.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@
- RabbitMQChannel
- RayConnector
- RayChannel
- RedisConnector
- RedisChannel
- ZMQConnector
- ZMQChannel
1 change: 1 addition & 0 deletions docs/usage/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Plugboard can make use of a message broker for data exchange between components
| Option Name | Description | Default Value |
|----------------------------|----------------------------------------------------------|---------------|
| `RABBITMQ_URL` | URL for RabbitMQ AMQP message broker (must include credentials if required) | |
| `REDIS_URL` | URL for Redis message broker (must include credentials if required) | |

## Job ID

Expand Down
3 changes: 3 additions & 0 deletions plugboard/connector/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from plugboard.connector.connector_builder import ConnectorBuilder
from plugboard.connector.rabbitmq_channel import RabbitMQChannel, RabbitMQConnector
from plugboard.connector.ray_channel import RayChannel, RayConnector
from plugboard.connector.redis_channel import RedisChannel, RedisConnector
from plugboard.connector.serde_channel import SerdeChannel
from plugboard.connector.zmq_channel import ZMQChannel, ZMQConnector

Expand All @@ -20,6 +21,8 @@
"RabbitMQConnector",
"RayChannel",
"RayConnector",
"RedisChannel",
"RedisConnector",
"SerdeChannel",
"ZMQChannel",
"ZMQConnector",
Expand Down
12 changes: 10 additions & 2 deletions plugboard/connector/rabbitmq_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,13 @@ def __setstate__(self, state: dict) -> None:

@inject
async def connect_send(
self, rabbitmq_conn: AbstractRobustConnection = Provide[DI.rabbitmq_conn]
self, rabbitmq_conn: AbstractRobustConnection | None = Provide[DI.rabbitmq_conn]
) -> RabbitMQChannel:
"""Returns a `RabbitMQ` channel for sending messages."""
if rabbitmq_conn is None:
raise RuntimeError(
"RabbitMQ connection not available. Ensure RabbitMQ URL is configured."
)
async with self._send_channel_lock:
if self._send_channel is not None:
return self._send_channel
Expand All @@ -150,9 +154,13 @@ async def connect_send(

@inject
async def connect_recv(
self, rabbitmq_conn: AbstractRobustConnection = Provide[DI.rabbitmq_conn]
self, rabbitmq_conn: AbstractRobustConnection | None = Provide[DI.rabbitmq_conn]
) -> RabbitMQChannel:
"""Returns a `RabbitMQ` channel for receiving messages."""
if rabbitmq_conn is None:
raise RuntimeError(
"RabbitMQ connection not available. Ensure RabbitMQ URL is configured."
)
cm = self._recv_channel_lock if self.spec.mode != ConnectorMode.PUBSUB else nullcontext()
async with cm:
if self._recv_channel is not None:
Expand Down
189 changes: 189 additions & 0 deletions plugboard/connector/redis_channel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
"""Provides RedisChannel and RedisConnector."""

from __future__ import annotations

import asyncio
import typing as _t

from plugboard_schemas.connector import ConnectorMode
from that_depends import Provide, inject

from plugboard.connector.connector import Connector
from plugboard.connector.serde_channel import SerdeChannel
from plugboard.exceptions import ChannelClosedError
from plugboard.utils import DI, depends_on_optional


try:
from redis.asyncio import Redis
from redis.asyncio.client import PubSub
except ImportError:
pass


class RedisChannel(SerdeChannel):
"""`RedisChannel` for sending and receiving messages via Redis."""

@depends_on_optional("redis")
def __init__(
self,
*args: _t.Any,
send_fn: _t.Optional[_t.Callable[[bytes], _t.Awaitable[None]]] = None,
recv_fn: _t.Optional[_t.Callable[[], _t.Awaitable[bytes]]] = None,
pubsub: _t.Optional[PubSub] = None,
**kwargs: _t.Any,
) -> None:
"""Instantiates a `RedisChannel`.

Uses Redis to provide communication between components on different processes.
Requires a Redis server to be running with the URL set in the `REDIS_URL`
environment variable.

Args:
send_fn: Optional; A callable for sending messages to the Redis channel.
recv_fn: Optional; A callable for receiving messages from the Redis channel.
pubsub: Optional; The Redis `PubSub` instance, used in pub-sub mode.
"""
super().__init__(*args, **kwargs)
self._send_fn = send_fn
self._recv_fn = recv_fn
self._pubsub = pubsub

# Set initial state based on intended usage
self._is_send_closed = send_fn is None
self._is_recv_closed = recv_fn is None

async def send(self, msg: bytes) -> None:
"""Send a message to the Redis channel."""
if self._is_send_closed or self._send_fn is None:
raise ChannelClosedError("Channel is closed for sending")
await self._send_fn(msg)

async def recv(self) -> bytes:
"""Receive a message from the Redis channel."""
if self._is_recv_closed or self._recv_fn is None:
raise ChannelClosedError("Channel is closed for receiving")
return await self._recv_fn()

async def close(self) -> None:
"""Closes the `RedisChannel`."""
# If we are a sender, send the close message (via super().close())
if not self._is_send_closed:
await super().close()
self._is_send_closed = True

if self._pubsub is not None:
await self._pubsub.unsubscribe()
await self._pubsub.close()
self._pubsub = None

self._is_recv_closed = True


class RedisConnector(Connector):
"""`RedisConnector` connects components via Redis."""

@depends_on_optional("redis")
def __init__(self, *args: _t.Any, **kwargs: _t.Any) -> None:
"""Instantiates a `RedisConnector`.

Uses Redis to connect components via either pipeline (list-based) or pub-sub
(channel-based) mode. Requires a Redis server to be running with the URL set
in the `REDIS_URL` environment variable.
"""
super().__init__(*args, **kwargs)
self._topic: str = (
str(self.spec.source) if self.spec.mode == ConnectorMode.PUBSUB else self.spec.id
)
self._send_channel: _t.Optional[RedisChannel] = None
self._send_channel_lock = asyncio.Lock()
self._recv_channel: _t.Optional[RedisChannel] = None
self._recv_channel_lock = asyncio.Lock()

def __getstate__(self) -> dict:
state = self.__dict__.copy()
for attr in ("_send_channel", "_recv_channel", "_send_channel_lock", "_recv_channel_lock"):
if attr in state:
del state[attr]
return state

def __setstate__(self, state: dict) -> None:
self.__dict__.update(state)
self._send_channel = None
self._send_channel_lock = asyncio.Lock()
self._recv_channel = None
self._recv_channel_lock = asyncio.Lock()

@inject
async def _get_key(self, job_id: str = Provide[DI.job_id]) -> str:
return f"{job_id}.{self._topic}"

@inject
async def connect_send(
self, redis_client: Redis | None = Provide[DI.redis_client]
) -> RedisChannel:
"""Returns a `RedisChannel` for sending messages."""
if redis_client is None:
raise RuntimeError("Redis client not available. Ensure Redis URL is configured.")
async with self._send_channel_lock:
if self._send_channel is not None:
return self._send_channel

key = await self._get_key()
send_fn = self._build_send_fn(redis_client, key)
self._send_channel = RedisChannel(send_fn=send_fn)
return self._send_channel

def _build_send_fn(
self, redis_client: Redis, key: str
) -> _t.Callable[[bytes], _t.Awaitable[None]]:
if self.spec.mode == ConnectorMode.PIPELINE:

async def send_fn(msg: bytes) -> None:
await redis_client.lpush(key, msg) # type: ignore[misc]
else:

async def send_fn(msg: bytes) -> None:
await redis_client.publish(key, msg)

return send_fn

@inject
async def connect_recv(
self, redis_client: Redis | None = Provide[DI.redis_client]
) -> RedisChannel:
"""Returns a `RedisChannel` for receiving messages."""
if redis_client is None:
raise RuntimeError("Redis client not available. Ensure Redis URL is configured.")
key = await self._get_key()
if self.spec.mode == ConnectorMode.PIPELINE:
async with self._recv_channel_lock:
if self._recv_channel is not None:
return self._recv_channel
recv_fn = self._build_recv_fn(redis_client, key)
channel = RedisChannel(recv_fn=recv_fn)
self._recv_channel = channel
else: # ConnectorMode.PUBSUB
pubsub = redis_client.pubsub()
await pubsub.subscribe(key)
recv_fn = self._build_recv_fn(redis_client, key, pubsub=pubsub)
channel = RedisChannel(recv_fn=recv_fn, pubsub=pubsub)
return channel

def _build_recv_fn(
self, redis_client: Redis, key: str, pubsub: _t.Optional[PubSub] = None
) -> _t.Callable[[], _t.Awaitable[bytes]]:
if self.spec.mode == ConnectorMode.PIPELINE:

async def recv_fn() -> bytes:
result = await redis_client.brpop([key], timeout=None) # type: ignore[misc]
return result[1]
else:
if pubsub is None:
raise ValueError("PubSub instance required for PUBSUB mode")

async def recv_fn() -> bytes:
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=None)
return message["data"]

return recv_fn
Loading
Loading