|
5 | 5 | import builtins |
6 | 6 | import importlib |
7 | 7 | import inspect |
| 8 | +import sys |
8 | 9 |
|
9 | 10 | import grpc |
10 | 11 | import crossplane.function.logging |
@@ -32,9 +33,22 @@ def __init__(self, debug=False): |
32 | 33 | self.logger = crossplane.function.logging.get_logger() |
33 | 34 | self.clazzes = {} |
34 | 35 |
|
| 36 | + def invalidate_module(self, module): |
| 37 | + self.clazzes.clear() |
| 38 | + if module in sys.modules: |
| 39 | + del sys.modules[module] |
| 40 | + importlib.invalidate_caches() |
| 41 | + |
35 | 42 | async def RunFunction( |
36 | 43 | self, request: fnv1.RunFunctionRequest, _: grpc.aio.ServicerContext |
37 | 44 | ) -> fnv1.RunFunctionResponse: |
| 45 | + try: |
| 46 | + return await self.run_function(request) |
| 47 | + except: |
| 48 | + self.logger.exception('Exception thrown in run fuction') |
| 49 | + raise |
| 50 | + |
| 51 | + async def run_function(self, request): |
38 | 52 | composite = request.observed.composite.resource |
39 | 53 | logger = self.logger.bind( |
40 | 54 | apiVersion=composite['apiVersion'], |
@@ -70,54 +84,71 @@ async def RunFunction( |
70 | 84 | try: |
71 | 85 | exec(composite, module.__dict__) |
72 | 86 | except Exception as e: |
73 | | - crossplane.function.response.fatal(response, f"Exec exception: {e}") |
74 | 87 | logger.exception('Exec exception') |
| 88 | + crossplane.function.response.fatal(response, f"Exec exception: {e}") |
75 | 89 | return response |
76 | 90 | composite = ['<script>', 'Composite'] |
77 | 91 | else: |
78 | 92 | composite = composite.rsplit('.', 1) |
79 | 93 | if len(composite) == 1: |
80 | | - crossplane.function.response.fatal(response, f"Composite class name does not include module: {composite[0]}") |
81 | 94 | logger.error(f"Composite class name does not include module: {composite[0]}") |
| 95 | + crossplane.function.response.fatal(response, f"Composite class name does not include module: {composite[0]}") |
82 | 96 | return response |
83 | 97 | try: |
84 | 98 | module = importlib.import_module(composite[0]) |
85 | 99 | except Exception as e: |
| 100 | + logger.error(str(e)) |
86 | 101 | crossplane.function.response.fatal(response, f"Import module exception: {e}") |
87 | | - logger.exception('Import module exception') |
88 | 102 | return response |
89 | 103 | clazz = getattr(module, composite[1], None) |
90 | 104 | if not clazz: |
91 | | - crossplane.function.response.fatal(response, f"{composite[0]} did not define: {composite[1]}") |
92 | 105 | logger.error(f"{composite[0]} did not define: {composite[1]}") |
| 106 | + crossplane.function.response.fatal(response, f"{composite[0]} did not define: {composite[1]}") |
93 | 107 | return response |
94 | 108 | composite = '.'.join(composite) |
95 | 109 | if not inspect.isclass(clazz): |
96 | | - crossplane.function.response.fatal(response, f"{composite} is not a class") |
97 | 110 | logger.error(f"{composite} is not a class") |
| 111 | + crossplane.function.response.fatal(response, f"{composite} is not a class") |
98 | 112 | return response |
99 | 113 | if not issubclass(clazz, BaseComposite): |
100 | | - crossplane.function.response.fatal(response, f"{composite} is not a subclass of BaseComposite") |
101 | 114 | logger.error(f"{composite} is not a subclass of BaseComposite") |
| 115 | + crossplane.function.response.fatal(response, f"{composite} is not a subclass of BaseComposite") |
102 | 116 | return response |
103 | 117 | self.clazzes[composite] = clazz |
104 | 118 |
|
105 | 119 | try: |
106 | 120 | composite = clazz(request, response, logger) |
107 | 121 | except Exception as e: |
108 | | - crossplane.function.response.fatal(response, f"Instatiate exception: {e}") |
109 | 122 | logger.exception('Instatiate exception') |
| 123 | + crossplane.function.response.fatal(response, f"Instatiate exception: {e}") |
110 | 124 | return response |
111 | 125 |
|
112 | 126 | try: |
113 | 127 | result = composite.compose() |
114 | 128 | if asyncio.iscoroutine(result): |
115 | 129 | await result |
116 | 130 | except Exception as e: |
117 | | - crossplane.function.response.fatal(response, f"Compose exception: {e}") |
118 | 131 | logger.exception('Compose exception') |
| 132 | + crossplane.function.response.fatal(response, f"Compose exception: {e}") |
119 | 133 | return response |
120 | 134 |
|
| 135 | + if len(composite.response.requirements.extra_resources): |
| 136 | + requireds = Map() |
| 137 | + for name, selector in composite.response.requirements.extra_resources: |
| 138 | + requireds[name].apiVersion = selector.api_version |
| 139 | + requireds[name].kind = selector.kind |
| 140 | + if selector.namespace: |
| 141 | + requireds[name].namespace = selector.namespace |
| 142 | + if selector.match_name: |
| 143 | + requireds[name].name = selector.match_name |
| 144 | + if len(selector.match_labels.labels): |
| 145 | + for key, value in selector.match_labels.labels: |
| 146 | + requireds[name].labels[key] = value |
| 147 | + if requireds != composite.context._requireds: |
| 148 | + composite.context._requireds = requireds |
| 149 | + logger.debug('Requireds requsted') |
| 150 | + return response |
| 151 | + |
121 | 152 | unknownResources = [] |
122 | 153 | warningResources = [] |
123 | 154 | fatalResources = [] |
@@ -145,32 +176,41 @@ async def RunFunction( |
145 | 176 | elif warning: |
146 | 177 | logger.warning('Observed unknown', destination=destination, source=source) |
147 | 178 | else: |
148 | | - logger.debug('New unknown', destination=destination, source=source) |
| 179 | + logger.debug('Desired unknown', destination=destination, source=source) |
149 | 180 | if resource.observed: |
150 | 181 | resource.desired._patchUnknowns(resource.observed) |
151 | 182 | else: |
152 | 183 | del composite.resources[name] |
| 184 | + |
153 | 185 | if fatalResources: |
154 | | - if not self.debug: |
155 | | - logger.error('Observed resources with unknowns', resources=fatalResources) |
| 186 | + level = logger.error |
| 187 | + reason = 'FatalUnknowns' |
156 | 188 | message = f"Observed resources with unknowns: {','.join(fatalResources)}" |
157 | | - composite.conditions.NoUnknowns(False, 'FatalUnknowns', message) |
158 | | - composite.results.fatal(message, 'FatalUnknowns') |
159 | | - return response |
160 | | - if warningResources: |
161 | | - if not self.debug: |
162 | | - logger.warning('Observed resources with unknowns', resources=fatalResources) |
| 189 | + status = False |
| 190 | + event = composite.events.fatal |
| 191 | + elif warningResources: |
| 192 | + level = logger.warning |
| 193 | + reason = 'ObservedUnknowns' |
163 | 194 | message = f"Observed resources with unknowns: {','.join(warningResources)}" |
164 | | - composite.conditions.NoUnknowns(False, 'ObservedUnknowns', message) |
165 | | - composite.results.warning(message, 'ObservedUnknowns') |
| 195 | + status = False |
| 196 | + event = composite.events.warning |
166 | 197 | elif unknownResources: |
167 | | - if not self.debug: |
168 | | - logger.info('New resources with unknowns', resources=unknownResources) |
169 | | - message = f"New resources with unknowns: {','.join(unknownResources)}" |
170 | | - composite.conditions.NoUnknowns(False, 'NewUnknowns', message) |
171 | | - composite.results.info(message, 'NewUnknowns') |
| 198 | + level = logger.info |
| 199 | + reason = 'DesiredUnknowns' |
| 200 | + message = f"Desired resources with unknowns: {','.join(unknownResources)}" |
| 201 | + status = False |
| 202 | + event = composite.events.info |
172 | 203 | else: |
173 | | - composite.conditions.NoUnknowns(True, 'AllResolved', 'All resources are resolved') |
| 204 | + level = None |
| 205 | + reason = 'AllComposed' |
| 206 | + message = 'All resources are composed' |
| 207 | + status = True |
| 208 | + event = None |
| 209 | + if not self.debug and level: |
| 210 | + level(message) |
| 211 | + composite.conditions.ResourcesComposed(reason, message, status) |
| 212 | + if event: |
| 213 | + event(reason, message) |
174 | 214 |
|
175 | 215 | for name, resource in composite.resources: |
176 | 216 | if resource.autoReady or (resource.autoReady is None and composite.autoReady): |
|
0 commit comments