-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathdocker.py
More file actions
144 lines (124 loc) · 4.97 KB
/
docker.py
File metadata and controls
144 lines (124 loc) · 4.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import re
import socket
import logging
from functools import cache
from typing import Callable
from localstack import config
from localstack.utils.docker_utils import DOCKER_CLIENT
from localstack.extensions.api import Extension
from localstack.utils.container_utils.container_client import PortMappings
from localstack.utils.net import get_addressable_container_host
from localstack.utils.sync import retry
# Module-level logger used by the code in this file.
LOG = logging.getLogger(__name__)

# The "localstack_paradedb" logger is verbose only when LocalStack's DEBUG
# flag is set; otherwise it is limited to INFO and above.
if config.DEBUG:
    logging.getLogger("localstack_paradedb").setLevel(logging.DEBUG)
else:
    logging.getLogger("localstack_paradedb").setLevel(logging.INFO)

# Apply the logging module's default basic configuration.
logging.basicConfig()
class DatabaseDockerContainerExtension(Extension):
    """
    Utility class to create a LocalStack Extension which runs a Docker container
    for a database service that uses a native protocol (e.g., PostgreSQL).

    Unlike HTTP-based services, database connections are made directly to the
    exposed container port rather than through the LocalStack gateway.

    Subclasses must define ``name`` before ``__init__`` runs, because the
    container name is derived from it.
    """

    name: str
    """Name of this extension, which must be overridden in a subclass."""

    image_name: str
    """Docker image name"""

    container_ports: list[int]
    """List of network ports of the Docker container spun up by the extension"""

    command: list[str] | None
    """Optional command (and flags) to execute in the container."""

    env_vars: dict[str, str] | None
    """Optional environment variables to pass to the container."""

    health_check_port: int | None
    """Port to use for health check (defaults to first port in container_ports)."""

    health_check_fn: Callable[[], bool] | None
    """Optional custom health check function."""

    # Set to True once start_container() has completed successfully, so that
    # repeated calls are no-ops. Kept per instance (instead of functools.cache
    # on the method) so the instance is not pinned in a module-global cache.
    _container_started: bool = False

    def __init__(
        self,
        image_name: str,
        container_ports: list[int],
        command: list[str] | None = None,
        env_vars: dict[str, str] | None = None,
        health_check_port: int | None = None,
        health_check_fn: Callable[[], bool] | None = None,
    ):
        """
        Store the container configuration; no container is started here.

        :param image_name: Docker image to run.
        :param container_ports: ports to publish; must be non-empty.
        :param command: optional command (and flags) for the container.
        :param env_vars: optional environment variables for the container.
        :param health_check_port: port probed by the default TCP health check;
            falls back to the first entry of ``container_ports``.
        :param health_check_fn: optional callable that returns a truthy value
            once the service is ready; replaces the TCP check when given.
        :raises ValueError: if ``container_ports`` is empty.
        """
        self.image_name = image_name
        if not container_ports:
            raise ValueError("container_ports is required")
        self.container_ports = container_ports
        # Replace every non-word character so the name only contains
        # letters, digits, underscores and dashes.
        self.container_name = re.sub(r"\W", "-", f"ls-ext-{self.name}")
        self.command = command
        self.env_vars = env_vars
        self.health_check_port = health_check_port or container_ports[0]
        self.health_check_fn = health_check_fn
        self.container_host = get_addressable_container_host()

    def on_extension_load(self):
        """Log that the extension is being loaded."""
        LOG.info("Loading ParadeDB extension")

    def on_platform_start(self):
        """Launch the container when the platform starts."""
        LOG.info("Starting ParadeDB extension - launching container")
        self.start_container()

    def on_platform_shutdown(self):
        """Remove the container when the platform shuts down."""
        self._remove_container()

    def start_container(self) -> None:
        """
        Run the container and wait until its health check passes.

        Once a call has completed successfully, later calls on the same
        instance return immediately. A failed attempt is not remembered, so
        it can be retried.

        :raises Exception: whatever the Docker client raises if the container
            cannot be started, or the last health-check error if the service
            does not become ready; in the latter case the container is removed
            before the error is re-raised.
        """
        if self._container_started:
            return

        LOG.debug("Starting extension container %s", self.container_name)

        port_mapping = PortMappings()
        for port in self.container_ports:
            port_mapping.add(port)

        # Only forward the optional settings that were actually provided.
        kwargs = {}
        if self.command:
            kwargs["command"] = self.command
        if self.env_vars:
            kwargs["env_vars"] = self.env_vars

        try:
            DOCKER_CLIENT.run_container(
                self.image_name,
                detach=True,
                remove=True,
                name=self.container_name,
                ports=port_mapping,
                **kwargs,
            )
        except Exception as e:
            LOG.debug("Failed to start container %s: %s", self.container_name, e)
            raise

        def _check_health() -> None:
            if self.health_check_fn:
                # Raise explicitly rather than using ``assert``: assert
                # statements are stripped under ``python -O``, which would
                # skip the health check call entirely.
                if not self.health_check_fn():
                    raise AssertionError(
                        f"Health check for container {self.container_name} "
                        "did not pass"
                    )
            else:
                # Default: TCP socket check
                self._check_tcp_port(self.container_host, self.health_check_port)

        try:
            retry(_check_health, retries=60, sleep=1)
        except Exception as e:
            LOG.info("Failed to connect to container %s: %s", self.container_name, e)
            self._remove_container()
            raise

        LOG.info(
            "Successfully started extension container %s on %s:%s",
            self.container_name,
            self.container_host,
            self.health_check_port,
        )
        self._container_started = True

    def _check_tcp_port(self, host: str, port: int, timeout: float = 2.0) -> None:
        """
        Check if a TCP port is accepting connections.

        :param host: host name or IPv4 address to connect to.
        :param port: TCP port to probe.
        :param timeout: socket timeout in seconds.
        :raises AssertionError: if the connection attempt fails or times out.
        """
        # The context manager closes the socket on every path, including a
        # failed connect, so repeated probes do not leak file descriptors.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            try:
                sock.connect((host, port))
            except OSError as e:
                # socket.timeout and socket.error are both OSError (sub)classes.
                raise AssertionError(f"Port {port} not ready: {e}") from e

    def _remove_container(self):
        """Force-remove the extension's container via the Docker client."""
        LOG.debug("Stopping extension container %s", self.container_name)
        # NOTE(review): check_existence=False presumably makes removal of an
        # already-gone container a no-op - confirm against the client API.
        DOCKER_CLIENT.remove_container(
            self.container_name, force=True, check_existence=False
        )

    def get_connection_info(self) -> dict:
        """
        Return connection information for the database.

        :return: dict with ``"host"`` (the addressable container host) and
            ``"ports"`` (each container port mapped to itself).
        """
        return {
            "host": self.container_host,
            "ports": {port: port for port in self.container_ports},
        }