diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractWorkflowExecutor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractWorkflowExecutor.java index 12099ffa25..f032c1a05c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractWorkflowExecutor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractWorkflowExecutor.java @@ -174,10 +174,10 @@ protected void submit( DependentResourceNode dependentResourceNode, NodeExecutor nodeExecutor, String operation) { + logger() + .debug("Submitting to {}: {} primaryID: {}", operation, dependentResourceNode, primaryID); final Future future = executorService.submit(nodeExecutor); markAsExecuting(dependentResourceNode, future); - logger() - .debug("Submitted to {}: {} primaryID: {}", operation, dependentResourceNode, primaryID); } protected void registerOrDeregisterEventSourceBasedOnActivation( diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 38c93d03ae..016e9b6568 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -183,8 +183,13 @@ public void handleRecentResourceCreate(ResourceID resourceID, R resource) { @Override public Optional get(ResourceID resourceID) { - var res = cache.get(resourceID); + // The order of these two lookups matters. If we queried the informer cache first, + // a race condition could occur: we might not find the resource there yet, then + // process an informer event that evicts the temporary resource cache entry. At that + // point the resource would already be present in the informer cache, but we would + // have missed it in both caches during this call. Optional resource = temporaryResourceCache.getResourceFromCache(resourceID); + var res = cache.get(resourceID); if (comparableResourceVersions && resource.isPresent() && res.filter( diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateCustomResource.java new file mode 100644 index 0000000000..0d25bbfdd4 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateCustomResource.java @@ -0,0 +1,28 @@ +/* + * Copyright Java Operator SDK Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.javaoperatorsdk.operator.baseapi.cachingfilteringupdate; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("cfu") +public class CachingFilteringUpdateCustomResource + extends CustomResource implements Namespaced {} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateIT.java new file mode 100644 index 0000000000..c62c8ca186 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateIT.java @@ -0,0 +1,82 @@ +/* + * Copyright Java Operator SDK Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.javaoperatorsdk.operator.baseapi.cachingfilteringupdate; + +import java.time.Duration; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +class CachingFilteringUpdateIT { + + public static final int RESOURCE_NUMBER = 250; + CachingFilteringUpdateReconciler reconciler = new CachingFilteringUpdateReconciler(); + + @RegisterExtension + LocallyRunOperatorExtension operator = + LocallyRunOperatorExtension.builder().withReconciler(reconciler).build(); + + @Test + void testResourceAccessAfterUpdate() { + for (int i = 0; i < RESOURCE_NUMBER; i++) { + operator.create(createCustomResource(i)); + } + await() + .pollDelay(Duration.ofSeconds(5)) + .atMost(Duration.ofMinutes(1)) + .until( + () -> { + if (reconciler.isIssueFound()) { + // Stop waiting as soon as an issue is detected. + return true; + } + // Use a single representative resource to detect that updates have completed. + var res = + operator.get( + CachingFilteringUpdateCustomResource.class, + "resource" + (RESOURCE_NUMBER - 1)); + return res != null + && res.getStatus() != null + && Boolean.TRUE.equals(res.getStatus().getUpdated()); + }); + + if (operator.getReconcilerOfType(CachingFilteringUpdateReconciler.class).isIssueFound()) { + throw new IllegalStateException("Error already found."); + } + + for (int i = 0; i < RESOURCE_NUMBER; i++) { + var res = operator.get(CachingFilteringUpdateCustomResource.class, "resource" + i); + assertThat(res.getStatus()).isNotNull(); + assertThat(res.getStatus().getUpdated()).isTrue(); + } + } + + public CachingFilteringUpdateCustomResource createCustomResource(int i) { + CachingFilteringUpdateCustomResource resource = new CachingFilteringUpdateCustomResource(); + resource.setMetadata( + new ObjectMetaBuilder() + .withName("resource" + i) + .withNamespace(operator.getNamespace()) + .build()); + return resource; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateReconciler.java new file mode 100644 index 0000000000..1bd60eb2c9 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateReconciler.java @@ -0,0 +1,95 @@ +/* + * Copyright Java Operator SDK Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.javaoperatorsdk.operator.baseapi.cachingfilteringupdate; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ConfigMapBuilder; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; + +@ControllerConfiguration +public class CachingFilteringUpdateReconciler + implements Reconciler { + + private final AtomicBoolean issueFound = new AtomicBoolean(false); + + @Override + public UpdateControl reconcile( + CachingFilteringUpdateCustomResource resource, + Context context) { + + context.resourceOperations().serverSideApply(prepareCM(resource)); + var cachedCM = context.getSecondaryResource(ConfigMap.class); + if (cachedCM.isEmpty()) { + issueFound.set(true); + throw new IllegalStateException("Error for resource: " + ResourceID.fromResource(resource)); + } + + ensureStatusExists(resource); + resource.getStatus().setUpdated(true); + return UpdateControl.patchStatus(resource); + } + + private static ConfigMap prepareCM(CachingFilteringUpdateCustomResource p) { + var cm = + new ConfigMapBuilder() + .withMetadata( + new ObjectMetaBuilder() + .withName(p.getMetadata().getName()) + .withNamespace(p.getMetadata().getNamespace()) + .build()) + .withData(Map.of("name", p.getMetadata().getName())) + .build(); + cm.addOwnerReference(p); + return cm; + } + + @Override + public List> prepareEventSources( + EventSourceContext context) { + InformerEventSource cmES = + new InformerEventSource<>( + InformerEventSourceConfiguration.from( + ConfigMap.class, CachingFilteringUpdateCustomResource.class) + .build(), + context); + return List.of(cmES); + } + + private void ensureStatusExists(CachingFilteringUpdateCustomResource resource) { + CachingFilteringUpdateStatus status = resource.getStatus(); + if (status == null) { + status = new CachingFilteringUpdateStatus(); + resource.setStatus(status); + } + } + + public boolean isIssueFound() { + return issueFound.get(); + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateStatus.java new file mode 100644 index 0000000000..80b6c4ba54 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateStatus.java @@ -0,0 +1,29 @@ +/* + * Copyright Java Operator SDK Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.javaoperatorsdk.operator.baseapi.cachingfilteringupdate; + +public class CachingFilteringUpdateStatus { + + private Boolean updated; + + public Boolean getUpdated() { + return updated; + } + + public void setUpdated(Boolean updated) { + this.updated = updated; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/createupdateeventfilter/ComparableResourceVersionsDisabledIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/createupdateeventfilter/PreviousAnnotationDisabledIT.java similarity index 100% rename from operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/createupdateeventfilter/ComparableResourceVersionsDisabledIT.java rename to operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/createupdateeventfilter/PreviousAnnotationDisabledIT.java diff --git a/sample-operators/mysql-schema/src/test/java/io/javaoperatorsdk/operator/sample/MySQLSchemaOperatorE2E.java b/sample-operators/mysql-schema/src/test/java/io/javaoperatorsdk/operator/sample/MySQLSchemaOperatorE2E.java index 1fe4364250..129b502ed8 100644 --- a/sample-operators/mysql-schema/src/test/java/io/javaoperatorsdk/operator/sample/MySQLSchemaOperatorE2E.java +++ b/sample-operators/mysql-schema/src/test/java/io/javaoperatorsdk/operator/sample/MySQLSchemaOperatorE2E.java @@ -127,7 +127,7 @@ void test() { .delete(); await() - .atMost(2, MINUTES) + .atMost(4, MINUTES) .ignoreExceptions() .untilAsserted( () -> {