11// TODO: Look into how to properly resolve `clippy::result_large_err`.
22// This will need changes in our and upstream error types.
33#![ allow( clippy:: result_large_err) ]
4- use std:: { future :: Future , sync:: Arc } ;
4+ use std:: sync:: Arc ;
55
66use anyhow:: anyhow;
77use clap:: Parser ;
88use futures:: { FutureExt , StreamExt , TryFutureExt } ;
9- use product_config:: ProductConfigManager ;
109use stackable_operator:: {
1110 YamlSchema ,
12- cli:: { Command , OperatorEnvironmentOptions , RunArguments } ,
13- client:: { self , Client } ,
11+ cli:: { Command , RunArguments } ,
12+ client,
1413 eos:: EndOfSupportChecker ,
1514 k8s_openapi:: api:: {
1615 apps:: v1:: DaemonSet ,
1716 core:: v1:: { ConfigMap , Service } ,
1817 } ,
1918 kube:: {
20- Api , CustomResourceExt as _,
19+ CustomResourceExt as _,
2120 core:: DeserializeGuard ,
2221 runtime:: {
2322 Controller ,
@@ -26,13 +25,9 @@ use stackable_operator::{
2625 } ,
2726 } ,
2827 logging:: controller:: report_controller_reconciled,
29- namespace:: WatchNamespace ,
3028 shared:: yaml:: SerializeOptions ,
3129 telemetry:: Tracing ,
32- utils:: {
33- cluster_info:: KubernetesClusterInfo ,
34- signal:: { self , SignalWatcher } ,
35- } ,
30+ utils:: signal:: { self , SignalWatcher } ,
3631} ;
3732
3833use crate :: {
@@ -144,17 +139,63 @@ async fn main() -> anyhow::Result<()> {
144139 . run ( sigterm_watcher. handle ( ) )
145140 . map_err ( |err| anyhow ! ( err) . context ( "failed to run webhook server" ) ) ;
146141
147- let controller = create_controller (
148- client. clone ( ) ,
149- product_config,
150- watch_namespace,
151- operator_image. clone ( ) ,
152- operator_image,
153- kubernetes_cluster_info,
154- operator_environment,
155- sigterm_watcher. handle ( ) ,
156- )
157- . map ( anyhow:: Ok ) ;
142+ let event_recorder = Arc :: new ( Recorder :: new (
143+ client. as_kube_client ( ) ,
144+ Reporter {
145+ controller : OPA_FULL_CONTROLLER_NAME . to_string ( ) ,
146+ instance : None ,
147+ } ,
148+ ) ) ;
149+
150+ let controller = Controller :: new (
151+ watch_namespace. get_api :: < DeserializeGuard < v1alpha2:: OpaCluster > > ( & client) ,
152+ watcher:: Config :: default ( ) ,
153+ ) ;
154+
155+ let controller = controller
156+ . owns (
157+ watch_namespace. get_api :: < DeserializeGuard < DaemonSet > > ( & client) ,
158+ watcher:: Config :: default ( ) ,
159+ )
160+ . owns (
161+ watch_namespace. get_api :: < DeserializeGuard < ConfigMap > > ( & client) ,
162+ watcher:: Config :: default ( ) ,
163+ )
164+ . owns (
165+ watch_namespace. get_api :: < DeserializeGuard < Service > > ( & client) ,
166+ watcher:: Config :: default ( ) ,
167+ )
168+ . graceful_shutdown_on ( sigterm_watcher. handle ( ) )
169+ . run (
170+ controller:: reconcile_opa,
171+ controller:: error_policy,
172+ Arc :: new ( controller:: Ctx {
173+ client : client. clone ( ) ,
174+ product_config,
175+ opa_bundle_builder_image : operator_image. clone ( ) ,
176+ user_info_fetcher_image : operator_image,
177+ operator_environment,
178+ cluster_info : kubernetes_cluster_info,
179+ } ) ,
180+ )
181+ // We can let the reporting happen in the background
182+ . for_each_concurrent (
183+ 16 , // concurrency limit
184+ |result| {
185+ // The event_recorder needs to be shared across all invocations, so that
186+ // events are correctly aggregated
187+ let event_recorder = event_recorder. clone ( ) ;
188+ async move {
189+ report_controller_reconciled (
190+ & event_recorder,
191+ OPA_FULL_CONTROLLER_NAME ,
192+ & result,
193+ )
194+ . await ;
195+ }
196+ } ,
197+ )
198+ . map ( anyhow:: Ok ) ;
158199
159200 let delayed_controller = async {
160201 signal:: crd_established ( & client, v1alpha2:: OpaCluster :: crd_name ( ) , None ) . await ?;
@@ -167,69 +208,3 @@ async fn main() -> anyhow::Result<()> {
167208
168209 Ok ( ( ) )
169210}
170-
171- /// This creates an instance of a [`Controller`] which waits for incoming events and reconciles them.
172- ///
173- /// This is an async method and the returned future needs to be consumed to make progress.
174- async fn create_controller < F > (
175- client : Client ,
176- product_config : ProductConfigManager ,
177- watch_namespace : WatchNamespace ,
178- opa_bundle_builder_image : String ,
179- user_info_fetcher_image : String ,
180- cluster_info : KubernetesClusterInfo ,
181- operator_environment : OperatorEnvironmentOptions ,
182- shutdown_signal : F ,
183- ) where
184- F : Future < Output = ( ) > + Send + Sync + ' static ,
185- {
186- let opa_api: Api < DeserializeGuard < v1alpha2:: OpaCluster > > = watch_namespace. get_api ( & client) ;
187- let daemonsets_api: Api < DeserializeGuard < DaemonSet > > = watch_namespace. get_api ( & client) ;
188- let configmaps_api: Api < DeserializeGuard < ConfigMap > > = watch_namespace. get_api ( & client) ;
189- let services_api: Api < DeserializeGuard < Service > > = watch_namespace. get_api ( & client) ;
190-
191- let controller = Controller :: new ( opa_api, watcher:: Config :: default ( ) )
192- . owns ( daemonsets_api, watcher:: Config :: default ( ) )
193- . owns ( configmaps_api, watcher:: Config :: default ( ) )
194- . owns ( services_api, watcher:: Config :: default ( ) ) ;
195-
196- let event_recorder = Arc :: new ( Recorder :: new (
197- client. as_kube_client ( ) ,
198- Reporter {
199- controller : OPA_FULL_CONTROLLER_NAME . to_string ( ) ,
200- instance : None ,
201- } ,
202- ) ) ;
203- controller
204- . graceful_shutdown_on ( shutdown_signal)
205- . run (
206- controller:: reconcile_opa,
207- controller:: error_policy,
208- Arc :: new ( controller:: Ctx {
209- client : client. clone ( ) ,
210- product_config,
211- opa_bundle_builder_image,
212- user_info_fetcher_image,
213- operator_environment,
214- cluster_info,
215- } ) ,
216- )
217- // We can let the reporting happen in the background
218- . for_each_concurrent (
219- 16 , // concurrency limit
220- |result| {
221- // The event_recorder needs to be shared across all invocations, so that
222- // events are correctly aggregated
223- let event_recorder = event_recorder. clone ( ) ;
224- async move {
225- report_controller_reconciled (
226- & event_recorder,
227- OPA_FULL_CONTROLLER_NAME ,
228- & result,
229- )
230- . await ;
231- }
232- } ,
233- )
234- . await ;
235- }
0 commit comments