Skip to content

Commit 42107c5

Browse files
author
Patrick J. McNerthney
committed
Implement ConfigMap and Secret based python packages.
1 parent fb6f050 commit 42107c5

File tree

8 files changed

+267
-41
lines changed

8 files changed

+267
-41
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,4 @@ WORKDIR /
4040
USER nonroot:nonroot
4141
COPY --from=build --chown=nonroot:nonroot /venv/fn /venv/fn
4242
EXPOSE 9443
43-
ENTRYPOINT ["/venv/fn/bin/pythonic"]
43+
ENTRYPOINT ["/venv/fn/bin/python", "-m", "crossplane.pythonic.main"]

crossplane/pythonic/function.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,14 @@ def __init__(self, debug=False):
3232
self.logger = crossplane.function.logging.get_logger()
3333
self.clazzes = {}
3434

35+
def invalidate_module(self, module):
36+
if module in sys.modules:
37+
del sys.modules[module]
38+
for composite in [composite for composite in self.clazzes.keys()]:
39+
if '\n' not in composite:
40+
if composite.rsplit('.', 1)[0] == module:
41+
del self.clazzes[composite]
42+
3543
async def RunFunction(
3644
self, request: fnv1.RunFunctionRequest, _: grpc.aio.ServicerContext
3745
) -> fnv1.RunFunctionResponse:
@@ -70,52 +78,52 @@ async def RunFunction(
7078
try:
7179
exec(composite, module.__dict__)
7280
except Exception as e:
73-
crossplane.function.response.fatal(response, f"Exec exception: {e}")
7481
logger.exception('Exec exception')
82+
crossplane.function.response.fatal(response, f"Exec exception: {e}")
7583
return response
7684
composite = ['<script>', 'Composite']
7785
else:
7886
composite = composite.rsplit('.', 1)
7987
if len(composite) == 1:
80-
crossplane.function.response.fatal(response, f"Composite class name does not include module: {composite[0]}")
8188
logger.error(f"Composite class name does not include module: {composite[0]}")
89+
crossplane.function.response.fatal(response, f"Composite class name does not include module: {composite[0]}")
8290
return response
8391
try:
8492
module = importlib.import_module(composite[0])
8593
except Exception as e:
94+
logger.error(str(e))
8695
crossplane.function.response.fatal(response, f"Import module exception: {e}")
87-
logger.exception('Import module exception')
8896
return response
8997
clazz = getattr(module, composite[1], None)
9098
if not clazz:
91-
crossplane.function.response.fatal(response, f"{composite[0]} did not define: {composite[1]}")
9299
logger.error(f"{composite[0]} did not define: {composite[1]}")
100+
crossplane.function.response.fatal(response, f"{composite[0]} did not define: {composite[1]}")
93101
return response
94102
composite = '.'.join(composite)
95103
if not inspect.isclass(clazz):
96-
crossplane.function.response.fatal(response, f"{composite} is not a class")
97104
logger.error(f"{composite} is not a class")
105+
crossplane.function.response.fatal(response, f"{composite} is not a class")
98106
return response
99107
if not issubclass(clazz, BaseComposite):
100-
crossplane.function.response.fatal(response, f"{composite} is not a subclass of BaseComposite")
101108
logger.error(f"{composite} is not a subclass of BaseComposite")
109+
crossplane.function.response.fatal(response, f"{composite} is not a subclass of BaseComposite")
102110
return response
103111
self.clazzes[composite] = clazz
104112

105113
try:
106114
composite = clazz(request, response, logger)
107115
except Exception as e:
108-
crossplane.function.response.fatal(response, f"Instatiate exception: {e}")
109116
logger.exception('Instatiate exception')
117+
crossplane.function.response.fatal(response, f"Instatiate exception: {e}")
110118
return response
111119

112120
try:
113121
result = composite.compose()
114122
if asyncio.iscoroutine(result):
115123
await result
116124
except Exception as e:
117-
crossplane.function.response.fatal(response, f"Compose exception: {e}")
118125
logger.exception('Compose exception')
126+
crossplane.function.response.fatal(response, f"Compose exception: {e}")
119127
return response
120128

121129
unknownResources = []

crossplane/pythonic/main.py

Lines changed: 80 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,23 @@
11
"""The composition function's main CLI."""
22

3-
import warnings
4-
warnings.filterwarnings('ignore', module='^google[.]protobuf[.]runtime_version$', lineno=98)
5-
63
import argparse
4+
import asyncio
75
import os
6+
import pathlib
87
import shlex
8+
import signal
99
import sys
10+
import traceback
11+
12+
import crossplane.function.logging
13+
import crossplane.function.proto.v1.run_function_pb2_grpc as grpcv1
14+
import grpc
1015
import pip._internal.cli.main
11-
from crossplane.function import logging, runtime
1216

1317
from . import function
1418

1519

16-
def main():
20+
async def main():
1721
parser = argparse.ArgumentParser('Forta Crossplane Function')
1822
parser.add_argument(
1923
'--debug', '-d',
@@ -27,20 +31,33 @@ def main():
2731
)
2832
parser.add_argument(
2933
'--tls-certs-dir',
34+
default=os.getenv('TLS_SERVER_CERTS_DIR'),
3035
help='Serve using mTLS certificates.',
3136
)
3237
parser.add_argument(
3338
'--insecure',
3439
action='store_true',
3540
help='Run without mTLS credentials. If you supply this flag --tls-certs-dir will be ignored.',
3641
)
42+
parser.add_argument(
43+
'--packages',
44+
action='store_true',
45+
help='Discover python packages from function-pythonic ConfigMaps and Secrets.'
46+
)
47+
parser.add_argument(
48+
'--packages-namespace',
49+
action='append',
50+
default=[],
51+
help='Namespaces to discover function-pythonic ConfigMaps and Secrets in, default is cluster wide.',
52+
)
3753
parser.add_argument(
3854
'--pip-install',
3955
help='Pip install command to install additional Python packages.'
4056
)
4157
parser.add_argument(
4258
'--python-path',
4359
action='append',
60+
default=[],
4461
help='Filing system directories to add to the python path',
4562
)
4663
parser.add_argument(
@@ -49,33 +66,76 @@ def main():
4966
help='Allow oversized protobuf messages'
5067
)
5168
args = parser.parse_args()
52-
if not args.tls_certs_dir:
53-
args.tls_certs_dir = os.getenv('TLS_SERVER_CERTS_DIR')
69+
70+
if args.debug:
71+
crossplane.function.logging.configure(crossplane.function.logging.Level.DEBUG)
72+
else:
73+
crossplane.function.logging.configure(crossplane.function.logging.Level.INFO)
5474

5575
if args.pip_install:
5676
pip._internal.cli.main.main(['install', *shlex.split(args.pip_install)])
5777

58-
if args.python_path:
59-
for path in reversed(args.python_path):
60-
sys.path.insert(0, path)
78+
for path in reversed(args.python_path):
79+
sys.path.insert(0, path)
6180

6281
if args.allow_oversize_protos:
6382
from google.protobuf.internal import api_implementation
6483
if api_implementation._c_module:
6584
api_implementation._c_module.SetAllowOversizeProtos(True)
6685

67-
logging.configure(logging.Level.DEBUG if args.debug else logging.Level.INFO)
68-
runtime.serve(
69-
function.FunctionRunner(args.debug),
70-
args.address,
71-
creds=runtime.load_credentials(args.tls_certs_dir),
72-
insecure=args.insecure,
73-
)
86+
grpc.aio.init_grpc_aio()
87+
grpc_runner = function.FunctionRunner(args.debug)
88+
grpc_server = grpc.aio.server()
89+
grpcv1.add_FunctionRunnerServiceServicer_to_server(grpc_runner, grpc_server)
90+
if args.tls_certs_dir:
91+
certs = pathlib.Path(args.tls_certs_dir)
92+
grpc_server.add_secure_port(
93+
args.address,
94+
grpc.ssl_server_credentials(
95+
private_key_certificate_chain_pairs=[(
96+
(certs / 'tls.key').read_bytes(),
97+
(certs / 'tls.crt').read_bytes(),
98+
)],
99+
root_certificates=(certs / 'ca.crt').read_bytes(),
100+
require_client_auth=True,
101+
),
102+
)
103+
else:
104+
if not args.insecure:
105+
raise ValueError('Either --tls-certs-dir or --insecure must be specified')
106+
grpc_server.add_insecure_port(args.address)
107+
await grpc_server.start()
108+
109+
if args.packages:
110+
import kopf._core.actions.loggers
111+
import kopf._core.reactor.running
112+
from . import packages
113+
sys.path.insert(0, str(packages.PACKAGES_DIR))
114+
sys.dont_write_bytecode = True
115+
packages.register_grpc_runner(grpc_runner)
116+
kopf._core.actions.loggers.configure()
117+
@kopf.on.cleanup()
118+
async def stop(logger=None, **_):
119+
await grpc_server.stop(5)
120+
async with asyncio.TaskGroup() as tasks:
121+
tasks.create_task(grpc_server.wait_for_termination())
122+
tasks.create_task(kopf._core.reactor.running.operator(
123+
standalone=True,
124+
clusterwide=not args.packages_namespace,
125+
namespaces=args.packages_namespace,
126+
))
127+
else:
128+
def stop():
129+
asyncio.ensure_future(grpc_server.stop(5))
130+
loop = asyncio.get_event_loop()
131+
loop.add_signal_handler(signal.SIGINT, stop)
132+
loop.add_signal_handler(signal.SIGTERM, stop)
133+
await grpc_server.wait_for_termination()
74134

75135

76136
if __name__ == "__main__":
77137
try:
78-
main()
79-
except Exception as e:
80-
print(f"Exception running main: {e}", file=sys.stderr)
138+
asyncio.run(main())
139+
except:
140+
print(traceback.format_exc())
81141
sys.exit(1)

crossplane/pythonic/packages.py

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
2+
import base64
3+
import importlib
4+
import pathlib
5+
import sys
6+
7+
import kopf
8+
9+
10+
PACKAGES_DIR = pathlib.Path(sys.prefix) / 'lib' / 'function-pythonic'
11+
GRPC_RUNNER = None
12+
13+
14+
@kopf.on.create('', 'v1', 'configmaps', labels={'function-pythonic.package': kopf.PRESENT})
15+
@kopf.on.resume('', 'v1', 'configmaps', labels={'function-pythonic.package': kopf.PRESENT})
16+
@kopf.on.create('', 'v1', 'secrets', labels={'function-pythonic.package': kopf.PRESENT})
17+
@kopf.on.resume('', 'v1', 'secrets', labels={'function-pythonic.package': kopf.PRESENT})
18+
async def create(body, logger, **_):
19+
package_dir, package = get_package_dir(body)
20+
if package_dir:
21+
package_dir.mkdir(parents=True, exist_ok=True)
22+
secret = body['kind'] == 'Secret'
23+
invalidate = False
24+
for name, text in body.get('data', {}).items():
25+
package_file = package_dir / name
26+
if secret:
27+
package_file.write_bytes(base64.b64decode(text.encode('utf-8')))
28+
else:
29+
package_file.write_text(text)
30+
if package_file.suffixes == ['.py']:
31+
logger.info(f"Created module: {'.'.join(package + [package_file.stem])}")
32+
invalidate = True
33+
else:
34+
logger.info(f"Created file: {'/'.join(package + [name])}")
35+
if invalidate:
36+
importlib.invalidate_caches()
37+
38+
39+
@kopf.on.update('', 'v1', 'configmaps', labels={'function-pythonic.package': kopf.PRESENT})
40+
@kopf.on.update('', 'v1', 'secrets', labels={'function-pythonic.package': kopf.PRESENT})
41+
async def update(body, old, logger, **_):
42+
old_package_dir, old_package = get_package_dir(old)
43+
if old_package_dir:
44+
old_data = old.get('data', {})
45+
else:
46+
old_data = {}
47+
old_names = set(old_data.keys())
48+
package_dir, package = get_package_dir(body, logger)
49+
if package_dir:
50+
package_dir.mkdir(parents=True, exist_ok=True)
51+
secret = body['kind'] == 'Secret'
52+
invalidate = False
53+
for name, text in body.get('data', {}).items():
54+
if package_dir == old_package_dir and text == old_data.get(name, None):
55+
old_names.discard(name)
56+
else:
57+
package_file = package_dir / name
58+
if secret:
59+
package_file.write_bytes(base64.b64decode(text.encode('utf-8')))
60+
else:
61+
package_file.write_text(text)
62+
if package_file.suffixes == ['.py']:
63+
module = '.'.join(package + [package_file.stem])
64+
if name in old_names:
65+
GRPC_RUNNER.invalidate_module(module)
66+
old_names.discard(name)
67+
logger.info(f"Updated module: {module}")
68+
invalidate = True
69+
else:
70+
logger.info(f"Updated file: {'/'.join(old_package + [name])}")
71+
if invalidate:
72+
importlib.invalidate_caches()
73+
if old_package_dir:
74+
for name in old_names:
75+
package_file = old_package_dir / name
76+
package_file.unlink(missing_ok=True)
77+
if package_file.suffixes == ['.py']:
78+
module = '.'.join(old_package + [package_file.stem])
79+
GRPC_RUNNER.invalidate_module(module)
80+
logger.info(f"Removed module: {module}")
81+
else:
82+
logger.info(f"Removed file: {'/'.join(old_package + [name])}")
83+
logger.info('update deleted: %s', package_file)
84+
while old_package and old_package_dir.is_dir() and not list(old_package_dir.iterdir()):
85+
old_package_dir.rmdir()
86+
module = '.'.join(old_package)
87+
GRPC_RUNNER.invalidate_module(module)
88+
logger.info(f"Removed package: {module}")
89+
old_package_dir = old_package_dir.parent
90+
old_package.pop()
91+
92+
93+
@kopf.on.delete('', 'v1', 'configmaps', labels={'function-pythonic.package': kopf.PRESENT})
94+
@kopf.on.delete('', 'v1', 'secrets', labels={'function-pythonic.package': kopf.PRESENT})
95+
async def delete(old, logger, **_):
96+
package_dir, package = get_package_dir(old)
97+
if package_dir:
98+
for name in old.get('data', {}).keys():
99+
package_file = package_dir / name
100+
package_file.unlink(missing_ok=True)
101+
if package_file.suffixes == ['.py']:
102+
module = '.'.join(package + [package_file.stem])
103+
GRPC_RUNNER.invalidate_module(module)
104+
logger.info(f"Deleted module: {module}")
105+
else:
106+
logger.info(f"Deleted file: {'/'.join(package + [name])}")
107+
while package and package_dir.is_dir() and not list(package_dir.iterdir()):
108+
package_dir.rmdir()
109+
module = '.'.join(package)
110+
GRPC_RUNNER.invalidate_module(module)
111+
logger.info(f"Deleted package: {module}")
112+
package_dir = package_dir.parent
113+
package.pop()
114+
115+
116+
def register_grpc_runner(runner):
117+
global GRPC_RUNNER
118+
GRPC_RUNNER = runner
119+
120+
121+
def get_package_dir(body, logger=None):
122+
package = body.get('metadata', {}).get('labels', {}).get('function-pythonic.package', None)
123+
if package is None:
124+
if logger:
125+
logger.error('function-pythonic.package label is missing')
126+
return None, None
127+
package_dir = PACKAGES_DIR
128+
if package == '':
129+
package = []
130+
else:
131+
package = package.split('.')
132+
for segment in package:
133+
if not segment.isidentifier():
134+
if logger:
135+
logger.error('Package has invalid package name: %s', package)
136+
return None, None
137+
package_dir = package_dir / segment
138+
return package_dir, package

examples/eks-cluster/functions.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ apiVersion: pkg.crossplane.io/v1beta1
22
kind: Function
33
metadata:
44
name: function-pythonic
5-
#annotations:
6-
# render.crossplane.io/runtime: Development
5+
annotations:
6+
render.crossplane.io/runtime: Development
77
spec:
88
package: ghcr.io/fortra/function-pythonic:v0.0.5

0 commit comments

Comments
 (0)