From 5cdca97f28d7b8ae16b5ce136584dfb077af3d51 Mon Sep 17 00:00:00 2001 From: Suryakant Agarwal Date: Wed, 13 Aug 2025 12:30:37 +0530 Subject: [PATCH 01/11] Fix Bug For Executions Failing With Only One Schedule --- .../scheduler/GobblinServiceJobScheduler.java | 9 ++- .../GobblinServiceJobSchedulerTest.java | 66 +++++++++++++++++-- 2 files changed, 69 insertions(+), 6 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index 85f4daf78f8..eeefabad5ba 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -698,8 +698,13 @@ public void executeImpl(JobExecutionContext context) throws JobExecutionExceptio } else { jobProps.setProperty(ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY, String.valueOf(triggerTimeMillis)); - _log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime: {} nextTriggerTime: {} - Job triggered by " - + "scheduler", triggerTimeMillis, utcDateAsUTCEpochMillis(trigger.getNextFireTime())); + if(trigger.getNextFireTime() == null){ + _log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime: {} nextTriggerTime: NA - Job triggered by " + + "scheduler", triggerTimeMillis); + } else { + _log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime: {} nextTriggerTime: {} - Job triggered by " + + "scheduler", triggerTimeMillis, utcDateAsUTCEpochMillis(trigger.getNextFireTime())); + } } jobScheduler.runJob(jobProps, jobListener); } catch (Throwable t) { diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java index 0801a18b230..967c3c17541 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java @@ -73,10 +73,7 @@ import static org.apache.gobblin.runtime.spec_catalog.FlowCatalog.FLOWSPEC_STORE_DIR_KEY; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; public class GobblinServiceJobSchedulerTest { @@ -120,6 +117,67 @@ public void testIsNextRunWithinRangeToSchedule() { } + /** + * Test that demonstrates the NullPointerException issue in GobblinServiceJob.executeImpl + * when getNextFireTime() returns null for a one-time scheduled flow. + */ + @Test + public void testGobblinServiceJobExecuteImplWithNullNextFireTime() throws Exception { + // Create a mock JobExecutionContext + org.quartz.JobExecutionContext mockContext = mock(org.quartz.JobExecutionContext.class); + org.quartz.JobDetail mockJobDetail = mock(org.quartz.JobDetail.class); + org.quartz.JobDataMap mockJobDataMap = mock(org.quartz.JobDataMap.class); + org.quartz.Trigger mockTrigger = mock(org.quartz.Trigger.class); + + // Mock the job detail and data map + when(mockContext.getJobDetail()).thenReturn(mockJobDetail); + when(mockContext.getTrigger()).thenReturn(mockTrigger); + when(mockJobDetail.getJobDataMap()).thenReturn(mockJobDataMap); + when(mockJobDetail.getKey()).thenReturn(new org.quartz.JobKey("testFlow", "testGroup")); + + // Mock the job data map contents + Properties jobProps = new Properties(); + jobProps.setProperty(ConfigurationKeys.FLOW_NAME_KEY, "testFlow"); + jobProps.setProperty(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup"); + jobProps.setProperty(ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY, "0"); + + org.apache.gobblin.scheduler.JobScheduler mockJobScheduler = mock(org.apache.gobblin.scheduler.JobScheduler.class); + org.apache.gobblin.runtime.listeners.JobListener mockJobListener = mock(org.apache.gobblin.runtime.listeners.JobListener.class); + + when(mockJobDataMap.get(GobblinServiceJobScheduler.JOB_SCHEDULER_KEY)).thenReturn(mockJobScheduler); + when(mockJobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY)).thenReturn(jobProps); + when(mockJobDataMap.get(GobblinServiceJobScheduler.JOB_LISTENER_KEY)).thenReturn(mockJobListener); + + // Mock the trigger to return null for getNextFireTime() (simulating a one-time scheduled job) + when(mockTrigger.getPreviousFireTime()).thenReturn(new java.util.Date()); + when(mockTrigger.getNextFireTime()).thenReturn(null); // This is the key - null for one-time jobs + + // Create an instance of GobblinServiceJob + GobblinServiceJobScheduler.GobblinServiceJob job = new GobblinServiceJobScheduler.GobblinServiceJob(); + + // Mock the runJob method to avoid actual execution + doNothing().when(mockJobScheduler).runJob(any(Properties.class), any(org.apache.gobblin.runtime.listeners.JobListener.class)); + + try { + // This should not throw a NullPointerException if the code is properly handling null values + job.executeImpl(mockContext); + + // If we reach here, the method handled null getNextFireTime() gracefully + // Verify that runJob was called + verify(mockJobScheduler, times(1)).runJob(any(Properties.class), any(org.apache.gobblin.runtime.listeners.JobListener.class)); + + } catch (org.quartz.JobExecutionException e) { + // Check if the cause is a NullPointerException + if (e.getCause() instanceof NullPointerException) { + Assert.fail("GobblinServiceJob.executeImpl threw NullPointerException when getNextFireTime() is null. " + + "This indicates the scheduler cannot handle one-time scheduled flows properly."); + } else { + // Re-throw if it's a different type of exception + throw e; + } + } + } + /** * Test whenever JobScheduler is calling setActive, the FlowSpec is loading into scheduledFlowSpecs (eventually) */ From 436e0819fb1d1e53ebd65672a48f62b77989f583 Mon Sep 17 00:00:00 2001 From: Suryakant Agarwal Date: Wed, 13 Aug 2025 16:31:46 +0530 Subject: [PATCH 02/11] Updating Gobblin Checkstyle --- .../scheduler/GobblinServiceJobScheduler.java | 12 +++++++----- .../GobblinServiceJobSchedulerTest.java | 17 ++++++++++------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index eeefabad5ba..fe15cb1122d 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -698,12 +698,14 @@ public void executeImpl(JobExecutionContext context) throws JobExecutionExceptio } else { jobProps.setProperty(ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY, String.valueOf(triggerTimeMillis)); - if(trigger.getNextFireTime() == null){ - _log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime: {} nextTriggerTime: NA - Job triggered by " - + "scheduler", triggerTimeMillis); + if (trigger.getNextFireTime() == null) { + _log.info( + jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime: {} nextTriggerTime: NA - Job triggered by " + + "scheduler", triggerTimeMillis); } else { - _log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime: {} nextTriggerTime: {} - Job triggered by " - + "scheduler", triggerTimeMillis, utcDateAsUTCEpochMillis(trigger.getNextFireTime())); + _log.info( + jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime: {} nextTriggerTime: {} - Job triggered by " + + "scheduler", triggerTimeMillis, utcDateAsUTCEpochMillis(trigger.getNextFireTime())); } } jobScheduler.runJob(jobProps, jobListener); diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java index 967c3c17541..9ec2b57502e 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java @@ -122,7 +122,8 @@ public void testIsNextRunWithinRangeToSchedule() { * when getNextFireTime() returns null for a one-time scheduled flow. */ @Test - public void testGobblinServiceJobExecuteImplWithNullNextFireTime() throws Exception { + public void testGobblinServiceJobExecuteImplWithNullNextFireTime() + throws Exception { // Create a mock JobExecutionContext org.quartz.JobExecutionContext mockContext = mock(org.quartz.JobExecutionContext.class); org.quartz.JobDetail mockJobDetail = mock(org.quartz.JobDetail.class); @@ -142,7 +143,8 @@ public void testGobblinServiceJobExecuteImplWithNullNextFireTime() throws Except jobProps.setProperty(ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY, "0"); org.apache.gobblin.scheduler.JobScheduler mockJobScheduler = mock(org.apache.gobblin.scheduler.JobScheduler.class); - org.apache.gobblin.runtime.listeners.JobListener mockJobListener = mock(org.apache.gobblin.runtime.listeners.JobListener.class); + org.apache.gobblin.runtime.listeners.JobListener mockJobListener = + mock(org.apache.gobblin.runtime.listeners.JobListener.class); when(mockJobDataMap.get(GobblinServiceJobScheduler.JOB_SCHEDULER_KEY)).thenReturn(mockJobScheduler); when(mockJobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY)).thenReturn(jobProps); @@ -156,7 +158,8 @@ public void testGobblinServiceJobExecuteImplWithNullNextFireTime() throws Except GobblinServiceJobScheduler.GobblinServiceJob job = new GobblinServiceJobScheduler.GobblinServiceJob(); // Mock the runJob method to avoid actual execution - doNothing().when(mockJobScheduler).runJob(any(Properties.class), any(org.apache.gobblin.runtime.listeners.JobListener.class)); + doNothing().when(mockJobScheduler) + .runJob(any(Properties.class), any(org.apache.gobblin.runtime.listeners.JobListener.class)); try { // This should not throw a NullPointerException if the code is properly handling null values @@ -164,13 +167,13 @@ public void testGobblinServiceJobExecuteImplWithNullNextFireTime() throws Except // If we reach here, the method handled null getNextFireTime() gracefully // Verify that runJob was called - verify(mockJobScheduler, times(1)).runJob(any(Properties.class), any(org.apache.gobblin.runtime.listeners.JobListener.class)); - + verify(mockJobScheduler, times(1)).runJob(any(Properties.class), + any(org.apache.gobblin.runtime.listeners.JobListener.class)); } catch (org.quartz.JobExecutionException e) { // Check if the cause is a NullPointerException if (e.getCause() instanceof NullPointerException) { - Assert.fail("GobblinServiceJob.executeImpl threw NullPointerException when getNextFireTime() is null. " + - "This indicates the scheduler cannot handle one-time scheduled flows properly."); + Assert.fail("GobblinServiceJob.executeImpl threw NullPointerException when getNextFireTime() is null. " + + "This indicates the scheduler cannot handle one-time scheduled flows properly."); } else { // Re-throw if it's a different type of exception throw e; From 9940fcd511c9c9c70e4312a8c906a416daa4920d Mon Sep 17 00:00:00 2001 From: Suryakant Agarwal Date: Wed, 13 Aug 2025 18:53:42 +0530 Subject: [PATCH 03/11] Update Review Comments --- .../modules/scheduler/GobblinServiceJobSchedulerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java index 9ec2b57502e..607c0de9be7 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java @@ -140,7 +140,6 @@ public void testGobblinServiceJobExecuteImplWithNullNextFireTime() Properties jobProps = new Properties(); jobProps.setProperty(ConfigurationKeys.FLOW_NAME_KEY, "testFlow"); jobProps.setProperty(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup"); - jobProps.setProperty(ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY, "0"); org.apache.gobblin.scheduler.JobScheduler mockJobScheduler = mock(org.apache.gobblin.scheduler.JobScheduler.class); org.apache.gobblin.runtime.listeners.JobListener mockJobListener = From 506a2e05b837ebf9e24f5b2ff13c7221a956aec7 Mon Sep 17 00:00:00 2001 From: Suryakant Agarwal Date: Wed, 13 Aug 2025 20:08:40 +0530 Subject: [PATCH 04/11] Removing Repetative ClassPaths From Test --- .../GobblinServiceJobSchedulerTest.java | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java index 607c0de9be7..f4271975b5d 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java @@ -31,6 +31,11 @@ import org.mockito.MockitoAnnotations; import org.mockito.invocation.Invocation; import org.mockito.stubbing.Answer; +import org.quartz.JobDataMap; +import org.quartz.JobDetail; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.quartz.Trigger; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -52,10 +57,12 @@ import org.apache.gobblin.runtime.api.SpecCatalogListener; import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher; +import org.apache.gobblin.runtime.listeners.JobListener; import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; import org.apache.gobblin.runtime.spec_catalog.FlowCatalogTest; import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; +import org.apache.gobblin.scheduler.JobScheduler; import org.apache.gobblin.scheduler.SchedulerService; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.flow.MockedSpecCompiler; @@ -125,10 +132,10 @@ public void testIsNextRunWithinRangeToSchedule() { public void testGobblinServiceJobExecuteImplWithNullNextFireTime() throws Exception { // Create a mock JobExecutionContext - org.quartz.JobExecutionContext mockContext = mock(org.quartz.JobExecutionContext.class); - org.quartz.JobDetail mockJobDetail = mock(org.quartz.JobDetail.class); - org.quartz.JobDataMap mockJobDataMap = mock(org.quartz.JobDataMap.class); - org.quartz.Trigger mockTrigger = mock(org.quartz.Trigger.class); + JobExecutionContext mockContext = mock(JobExecutionContext.class); + JobDetail mockJobDetail = mock(JobDetail.class); + JobDataMap mockJobDataMap = mock(JobDataMap.class); + Trigger mockTrigger = mock(Trigger.class); // Mock the job detail and data map when(mockContext.getJobDetail()).thenReturn(mockJobDetail); @@ -141,9 +148,9 @@ public void testGobblinServiceJobExecuteImplWithNullNextFireTime() jobProps.setProperty(ConfigurationKeys.FLOW_NAME_KEY, "testFlow"); jobProps.setProperty(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup"); - org.apache.gobblin.scheduler.JobScheduler mockJobScheduler = mock(org.apache.gobblin.scheduler.JobScheduler.class); - org.apache.gobblin.runtime.listeners.JobListener mockJobListener = - mock(org.apache.gobblin.runtime.listeners.JobListener.class); + JobScheduler mockJobScheduler = mock(JobScheduler.class); + JobListener mockJobListener = + mock(JobListener.class); when(mockJobDataMap.get(GobblinServiceJobScheduler.JOB_SCHEDULER_KEY)).thenReturn(mockJobScheduler); when(mockJobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY)).thenReturn(jobProps); @@ -158,7 +165,7 @@ public void testGobblinServiceJobExecuteImplWithNullNextFireTime() // Mock the runJob method to avoid actual execution doNothing().when(mockJobScheduler) - .runJob(any(Properties.class), any(org.apache.gobblin.runtime.listeners.JobListener.class)); + .runJob(any(Properties.class), any(JobListener.class)); try { // This should not throw a NullPointerException if the code is properly handling null values @@ -167,8 +174,8 @@ public void testGobblinServiceJobExecuteImplWithNullNextFireTime() // If we reach here, the method handled null getNextFireTime() gracefully // Verify that runJob was called verify(mockJobScheduler, times(1)).runJob(any(Properties.class), - any(org.apache.gobblin.runtime.listeners.JobListener.class)); - } catch (org.quartz.JobExecutionException e) { + any(JobListener.class)); + } catch (JobExecutionException e) { // Check if the cause is a NullPointerException if (e.getCause() instanceof NullPointerException) { Assert.fail("GobblinServiceJob.executeImpl threw NullPointerException when getNextFireTime() is null. " From b5a946d47fc47e7d043714ca20d31ad2033d1e64 Mon Sep 17 00:00:00 2001 From: Suryakant Agarwal Date: Tue, 26 Aug 2025 10:59:10 +0530 Subject: [PATCH 05/11] Add Metric When A FLow Spec Already Exists For A New Flow --- .../gobblin/metrics/ServiceMetricNames.java | 1 + .../restli/FlowConfigsV2ResourceHandler.java | 21 +++++++++++++++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java index 7809e785f57..347633e1d2a 100644 --- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java +++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java @@ -58,6 +58,7 @@ public class ServiceMetricNames { public static final String CREATE_FLOW_METER = "CreateFlow"; public static final String DELETE_FLOW_METER = "DeleteFlow"; + public static final String FLOW_SPEC_EXISTS_FOR_ADHOC_FLOW = "flowSpecExistsForAdhocFlow"; public static final String RUN_IMMEDIATELY_FLOW_METER = "RunImmediatelyFlow"; public static final String SUCCESSFUL_FLOW_METER = "SuccessfulFlows"; public static final String START_SLA_EXCEEDED_FLOWS_METER = "StartSLAExceededFlows"; diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java index 055724d83d6..2a24336727b 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java @@ -86,6 +86,7 @@ public class FlowConfigsV2ResourceHandler implements FlowConfigsResourceHandlerI protected FlowCatalog flowCatalog; protected final ContextAwareMeter createFlow; protected final ContextAwareMeter deleteFlow; + protected final ContextAwareMeter flowSpecExistsForAdhocFlow; protected final ContextAwareMeter runImmediatelyFlow; @Inject @@ -100,6 +101,8 @@ public FlowConfigsV2ResourceHandler(@Named(InjectionNames.SERVICE_NAME) String s MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.DELETE_FLOW_METER)); this.runImmediatelyFlow = metricContext.contextAwareMeter( MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.RUN_IMMEDIATELY_FLOW_METER)); + this.flowSpecExistsForAdhocFlow = metricContext.contextAwareMeter( + MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames. RUN_IMMEDIATELY_FLOW_METER)); } public FlowConfig getFlowConfig(FlowId flowId) @@ -248,8 +251,22 @@ public CreateKVResponse, FlowConfig> cr // Return conflict and take no action if flowSpec has already been created if (this.flowCatalog.exists(flowSpec.getUri())) { log.warn("FlowSpec with URI {} already exists, no action will be taken", flowSpec.getUri()); - return new CreateKVResponse<>(new RestLiServiceException(HttpStatus.S_409_CONFLICT, - "FlowSpec with URI " + flowSpec.getUri() + " already exists, no action will be taken")); + try { + FlowSpec storedFlowSpec = this.flowCatalog.getSpecs(flowSpec.getUri()); + if(!storedFlowSpec.isScheduled()){ + log.error("FlowSpec Already Exists As Adhoc Flow with URI: " + flowSpec.getUri() +); + if(!flowSpec.isScheduled()){ + flowSpecExistsForAdhocFlow.mark(); + } + }else{ + log.error("FlowSpec Already Exists As Scheduled Flow with URI: " + flowSpec.getUri() + ""); + } + } catch (SpecNotFoundException e) { + log.error("Error Retrieving FLow For Existing Flow With URI: " + flowSpec.getUri()); + } finally { + return new CreateKVResponse<>(new RestLiServiceException(HttpStatus.S_409_CONFLICT, + "FlowSpec with URI " + flowSpec.getUri() + " already exists, no action will be taken")); + } } Map responseMap; From 3d7c21ddc41e78e0d8f36d436d96be2c92c06352 Mon Sep 17 00:00:00 2001 From: Suryakant Agarwal Date: Tue, 26 Aug 2025 13:04:33 +0530 Subject: [PATCH 06/11] Adding Metrics For Ahhoc Flow Scep Failure and 409 sent to Users --- .../modules/restli/FlowConfigsV2ResourceHandler.java | 4 ++-- .../task/DagProcessingEngineMetrics.java | 12 ++++++++++++ .../service/modules/orchestration/task/DagTask.java | 2 +- .../modules/orchestration/task/LaunchDagTask.java | 9 ++++++++- 4 files changed, 23 insertions(+), 4 deletions(-) diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java index 2a24336727b..312b5d3ba27 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java @@ -254,12 +254,12 @@ public CreateKVResponse, FlowConfig> cr try { FlowSpec storedFlowSpec = this.flowCatalog.getSpecs(flowSpec.getUri()); if(!storedFlowSpec.isScheduled()){ - log.error("FlowSpec Already Exists As Adhoc Flow with URI: " + flowSpec.getUri() +); + log.error("FlowSpec Already Exists As Adhoc Flow with URI: " + flowSpec.getUri()); if(!flowSpec.isScheduled()){ flowSpecExistsForAdhocFlow.mark(); } }else{ - log.error("FlowSpec Already Exists As Scheduled Flow with URI: " + flowSpec.getUri() + ""); + log.error("FlowSpec Already Exists As Scheduled Flow with URI: " + flowSpec.getUri()); } } catch (SpecNotFoundException e) { log.error("Error Retrieving FLow For Existing Flow With URI: " + flowSpec.getUri()); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java index 25412b1aeb6..572045d2514 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java @@ -59,6 +59,8 @@ public class DagProcessingEngineMetrics { private final HashMap dagActionsActSucceededMeterByDagActionType = new HashMap<>(); private final HashMap dagActionsConcludeFailedMeterByDagActionType = new HashMap<>(); private final HashMap dagActionsConcludeSucceededMeterByDagActionType = new HashMap<>(); + private final HashMap dagActionsConcludeFlowSpecRemovalSucceededMetreByDagActionType = new HashMap<>(); + private final HashMap dagActionsConcludeFlowSpecRemovalFailedMetreByDagActionType = new HashMap<>(); private final HashMap dagActionsDeleteFailedMeterByDagActionType = new HashMap<>(); private final HashMap dagActionsDeleteSucceededMeterByDagActionType = new HashMap<>(); private final HashMap dagActionsAverageProcessingDelayMillisMeterByDagActionType = new HashMap<>(); @@ -89,6 +91,8 @@ public void registerAllMetrics() { registerMetricForEachDagActionType(this.dagActionsActSucceededMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_ACT_SUCCEEDED); registerMetricForEachDagActionType(this.dagActionsConcludeFailedMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FAILED); registerMetricForEachDagActionType(this.dagActionsConcludeSucceededMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_SUCCEEDED); + registerMetricForEachDagActionType(this.dagActionsConcludeFlowSpecRemovalSucceededMetreByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_SUCCEEDED); + registerMetricForEachDagActionType(this.dagActionsConcludeFlowSpecRemovalFailedMetreByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_FAILED); registerMetricForEachDagActionType(this.dagActionsDeleteFailedMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_DELETE_FAILED); registerMetricForEachDagActionType(this.dagActionsDeleteSucceededMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_DELETE_SUCCEEDED); registerMetricForEachDagActionType(this.dagActionsAverageProcessingDelayMillisMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_AVERAGE_PROCESSING_DELAY_MILLIS); @@ -163,6 +167,14 @@ public void markDagActionsConclude(DagActionStore.DagActionType dagActionType, b } } + public void markDagActionsConflowFlowSpecRemoval(DagActionStore.DagActionType dagActionType, boolean succeeded){ + if(succeeded){ + updateMetricForDagActionType(this.dagActionsConcludeFlowSpecRemovalSucceededMetreByDagActionType, dagActionType); + } else { + updateMetricForDagActionType(this.dagActionsConcludeFlowSpecRemovalFailedMetreByDagActionType, dagActionType); + } + } + public void markDagActionsDeleted(DagActionStore.DagActionType dagActionType, boolean succeeded) { if (succeeded) { updateMetricForDagActionType(this.dagActionsDeleteSucceededMeterByDagActionType, dagActionType); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java index e00a59df45a..54bb6ff3108 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java @@ -40,7 +40,7 @@ public abstract class DagTask { @Getter public final DagActionStore.DagAction dagAction; protected final DagManagementStateStore dagManagementStateStore; private final LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus; - private final DagProcessingEngineMetrics dagProcEngineMetrics; + protected final DagProcessingEngineMetrics dagProcEngineMetrics; public DagTask(DagActionStore.DagAction dagAction, LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus, DagManagementStateStore dagManagementStateStore, DagProcessingEngineMetrics dagProcEngineMetrics) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java index fa3f291d92c..a31c9391d44 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java @@ -54,7 +54,14 @@ public final boolean conclude() { FlowSpec flowSpec = this.dagManagementStateStore.getFlowSpec(FlowSpec.Utils.createFlowSpecUri(dagId.getFlowId())); if (!flowSpec.isScheduled()) { - dagManagementStateStore.removeFlowSpec(flowSpec.getUri(), new Properties(), false); + try { + //This can throw Runtime, IllegalState and IO Exceptions which are not caught here. + dagManagementStateStore.removeFlowSpec(flowSpec.getUri(), new Properties(), false); + } catch (Exception e) { + super.dagProcEngineMetrics.markDagActionsConflowFlowSpecRemoval(this.dagAction.getDagActionType(), false); + log.error("Failed to Remove The FlowSpec For Adhoc Flow with URI: " + flowSpec.getUri()); + return false; + } } return true; } From 1e8d44452a5e94a24efca401a8f414118c5262d5 Mon Sep 17 00:00:00 2001 From: Suryakant Agarwal Date: Tue, 26 Aug 2025 13:20:21 +0530 Subject: [PATCH 07/11] Fixing Code Style --- .../java/org/apache/gobblin/metrics/ServiceMetricNames.java | 2 ++ .../modules/restli/FlowConfigsV2ResourceHandler.java | 6 +++--- .../orchestration/task/DagProcessingEngineMetrics.java | 4 ++-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java index 347633e1d2a..606b613f0ec 100644 --- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java +++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java @@ -97,6 +97,8 @@ public class ServiceMetricNames { public static final String DAG_ACTIONS_ACT_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActFailed."; public static final String DAG_ACTIONS_ACT_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActSucceeded."; public static final String DAG_ACTIONS_CONCLUDE_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFailed."; + public static final String DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFlowSpecRemovalSucceeded"; + public static final String DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFlowSpecRemovalFailed"; public static final String DAG_ACTIONS_CONCLUDE_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeSucceeded."; public static final String DAG_ACTIONS_DELETE_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsDeleteSucceeded."; public static final String DAG_ACTIONS_DELETE_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsDeleteFailed."; diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java index 312b5d3ba27..f81898767b6 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java @@ -253,12 +253,12 @@ public CreateKVResponse, FlowConfig> cr log.warn("FlowSpec with URI {} already exists, no action will be taken", flowSpec.getUri()); try { FlowSpec storedFlowSpec = this.flowCatalog.getSpecs(flowSpec.getUri()); - if(!storedFlowSpec.isScheduled()){ + if (!storedFlowSpec.isScheduled()) { log.error("FlowSpec Already Exists As Adhoc Flow with URI: " + flowSpec.getUri()); - if(!flowSpec.isScheduled()){ + if (!flowSpec.isScheduled()) { flowSpecExistsForAdhocFlow.mark(); } - }else{ + } else { log.error("FlowSpec Already Exists As Scheduled Flow with URI: " + flowSpec.getUri()); } } catch (SpecNotFoundException e) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java index 572045d2514..e60a8a0a57f 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java @@ -167,8 +167,8 @@ public void markDagActionsConclude(DagActionStore.DagActionType dagActionType, b } } - public void markDagActionsConflowFlowSpecRemoval(DagActionStore.DagActionType dagActionType, boolean succeeded){ - if(succeeded){ + public void markDagActionsConflowFlowSpecRemoval(DagActionStore.DagActionType dagActionType, boolean succeeded) { + if (succeeded) { updateMetricForDagActionType(this.dagActionsConcludeFlowSpecRemovalSucceededMetreByDagActionType, dagActionType); } else { updateMetricForDagActionType(this.dagActionsConcludeFlowSpecRemovalFailedMetreByDagActionType, dagActionType); From 9d1c02e5ad44cdb85d6b7b7530c64d6f108fa9b6 Mon Sep 17 00:00:00 2001 From: Suryakant Agarwal Date: Tue, 26 Aug 2025 13:34:07 +0530 Subject: [PATCH 08/11] Fixing Code Style For FlowConfigsV2ResourceHandler --- .../service/modules/restli/FlowConfigsV2ResourceHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java index f81898767b6..f01a3a4b1e9 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java @@ -252,7 +252,7 @@ public CreateKVResponse, FlowConfig> cr if (this.flowCatalog.exists(flowSpec.getUri())) { log.warn("FlowSpec with URI {} already exists, no action will be taken", flowSpec.getUri()); try { - FlowSpec storedFlowSpec = this.flowCatalog.getSpecs(flowSpec.getUri()); + FlowSpec storedFlowSpec = this.flowCatalog.getSpecs(flowSpec.getUri()); if (!storedFlowSpec.isScheduled()) { log.error("FlowSpec Already Exists As Adhoc Flow with URI: " + flowSpec.getUri()); if (!flowSpec.isScheduled()) { From f5e8dc3ce98673823d0d60f0fd26ae789c543ee3 Mon Sep 17 00:00:00 2001 From: Suryakant Agarwal Date: Tue, 26 Aug 2025 13:37:02 +0530 Subject: [PATCH 09/11] Changing Method Name and Code Style --- .../service/modules/restli/FlowConfigsV2ResourceHandler.java | 2 +- .../modules/orchestration/task/DagProcessingEngineMetrics.java | 2 +- .../service/modules/orchestration/task/LaunchDagTask.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java index f01a3a4b1e9..a716ff3191e 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java @@ -86,7 +86,7 @@ public class FlowConfigsV2ResourceHandler implements FlowConfigsResourceHandlerI protected FlowCatalog flowCatalog; protected final ContextAwareMeter createFlow; protected final ContextAwareMeter deleteFlow; - protected final ContextAwareMeter flowSpecExistsForAdhocFlow; + protected final ContextAwareMeter flowSpecExistsForAdhocFlow; protected final ContextAwareMeter runImmediatelyFlow; @Inject diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java index e60a8a0a57f..d78a362039a 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java @@ -167,7 +167,7 @@ public void markDagActionsConclude(DagActionStore.DagActionType dagActionType, b } } - public void markDagActionsConflowFlowSpecRemoval(DagActionStore.DagActionType dagActionType, boolean succeeded) { + public void markDagActionsConcludeFlowSpecRemoval(DagActionStore.DagActionType dagActionType, boolean succeeded) { if (succeeded) { updateMetricForDagActionType(this.dagActionsConcludeFlowSpecRemovalSucceededMetreByDagActionType, dagActionType); } else { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java index a31c9391d44..b5fbb960a55 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java @@ -58,7 +58,7 @@ public final boolean conclude() { //This can throw Runtime, IllegalState and IO Exceptions which are not caught here. dagManagementStateStore.removeFlowSpec(flowSpec.getUri(), new Properties(), false); } catch (Exception e) { - super.dagProcEngineMetrics.markDagActionsConflowFlowSpecRemoval(this.dagAction.getDagActionType(), false); + super.dagProcEngineMetrics.markDagActionsConcludeFlowSpecRemoval(this.dagAction.getDagActionType(), false); log.error("Failed to Remove The FlowSpec For Adhoc Flow with URI: " + flowSpec.getUri()); return false; } From bb0ba838af6fa2d01ab6326a38381c9d074efe1f Mon Sep 17 00:00:00 2001 From: Suryakant Agarwal Date: Tue, 26 Aug 2025 13:58:47 +0530 Subject: [PATCH 10/11] Updating Review Comments --- .../org/apache/gobblin/metrics/ServiceMetricNames.java | 4 ++-- .../modules/restli/FlowConfigsV2ResourceHandler.java | 6 +++--- .../orchestration/task/DagProcessingEngineMetrics.java | 7 ++++--- .../service/modules/orchestration/task/LaunchDagTask.java | 1 + 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java index 606b613f0ec..9edc5e3525e 100644 --- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java +++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java @@ -97,8 +97,8 @@ public class ServiceMetricNames { public static final String DAG_ACTIONS_ACT_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActFailed."; public static final String DAG_ACTIONS_ACT_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActSucceeded."; public static final String DAG_ACTIONS_CONCLUDE_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFailed."; - public static final String DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFlowSpecRemovalSucceeded"; - public static final String DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFlowSpecRemovalFailed"; + public static final String DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFlowSpecRemovalSucceeded."; + public static final String DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFlowSpecRemovalFailed."; public static final String DAG_ACTIONS_CONCLUDE_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeSucceeded."; public static final String DAG_ACTIONS_DELETE_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsDeleteSucceeded."; public static final String DAG_ACTIONS_DELETE_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsDeleteFailed."; diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java index a716ff3191e..1e85169de41 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java @@ -102,7 +102,7 @@ public FlowConfigsV2ResourceHandler(@Named(InjectionNames.SERVICE_NAME) String s this.runImmediatelyFlow = metricContext.contextAwareMeter( MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.RUN_IMMEDIATELY_FLOW_METER)); this.flowSpecExistsForAdhocFlow = metricContext.contextAwareMeter( - MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames. RUN_IMMEDIATELY_FLOW_METER)); + MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.FLOW_SPEC_EXISTS_FOR_ADHOC_FLOW)); } public FlowConfig getFlowConfig(FlowId flowId) @@ -254,12 +254,12 @@ public CreateKVResponse, FlowConfig> cr try { FlowSpec storedFlowSpec = this.flowCatalog.getSpecs(flowSpec.getUri()); if (!storedFlowSpec.isScheduled()) { - log.error("FlowSpec Already Exists As Adhoc Flow with URI: " + flowSpec.getUri()); + log.warn("FlowSpec Already Exists As Adhoc Flow with URI: " + flowSpec.getUri()); if (!flowSpec.isScheduled()) { flowSpecExistsForAdhocFlow.mark(); } } else { - log.error("FlowSpec Already Exists As Scheduled Flow with URI: " + flowSpec.getUri()); + log.warn("FlowSpec Already Exists As Scheduled Flow with URI: " + flowSpec.getUri()); } } catch (SpecNotFoundException e) { log.error("Error Retrieving FLow For Existing Flow With URI: " + flowSpec.getUri()); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java index d78a362039a..3cd7b412289 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java @@ -59,7 +59,8 @@ public class DagProcessingEngineMetrics { private final HashMap dagActionsActSucceededMeterByDagActionType = new HashMap<>(); private final HashMap dagActionsConcludeFailedMeterByDagActionType = new HashMap<>(); private final HashMap dagActionsConcludeSucceededMeterByDagActionType = new HashMap<>(); - private final HashMap dagActionsConcludeFlowSpecRemovalSucceededMetreByDagActionType = new HashMap<>(); + private final HashMap + dagActionsConcludeFlowSpecRemovalSucceededMeterByDagActionType = new HashMap<>(); private final HashMap dagActionsConcludeFlowSpecRemovalFailedMetreByDagActionType = new HashMap<>(); private final HashMap dagActionsDeleteFailedMeterByDagActionType = new HashMap<>(); private final HashMap dagActionsDeleteSucceededMeterByDagActionType = new HashMap<>(); @@ -91,7 +92,7 @@ public void registerAllMetrics() { registerMetricForEachDagActionType(this.dagActionsActSucceededMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_ACT_SUCCEEDED); registerMetricForEachDagActionType(this.dagActionsConcludeFailedMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FAILED); registerMetricForEachDagActionType(this.dagActionsConcludeSucceededMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_SUCCEEDED); - registerMetricForEachDagActionType(this.dagActionsConcludeFlowSpecRemovalSucceededMetreByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_SUCCEEDED); + registerMetricForEachDagActionType(this.dagActionsConcludeFlowSpecRemovalSucceededMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_SUCCEEDED); registerMetricForEachDagActionType(this.dagActionsConcludeFlowSpecRemovalFailedMetreByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_FAILED); registerMetricForEachDagActionType(this.dagActionsDeleteFailedMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_DELETE_FAILED); registerMetricForEachDagActionType(this.dagActionsDeleteSucceededMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_DELETE_SUCCEEDED); @@ -169,7 +170,7 @@ public void markDagActionsConclude(DagActionStore.DagActionType dagActionType, b public void markDagActionsConcludeFlowSpecRemoval(DagActionStore.DagActionType dagActionType, boolean succeeded) { if (succeeded) { - updateMetricForDagActionType(this.dagActionsConcludeFlowSpecRemovalSucceededMetreByDagActionType, dagActionType); + updateMetricForDagActionType(this.dagActionsConcludeFlowSpecRemovalSucceededMeterByDagActionType, dagActionType); } else { updateMetricForDagActionType(this.dagActionsConcludeFlowSpecRemovalFailedMetreByDagActionType, dagActionType); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java index b5fbb960a55..183f394f69b 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java @@ -62,6 +62,7 @@ public final boolean conclude() { log.error("Failed to Remove The FlowSpec For Adhoc Flow with URI: " + flowSpec.getUri()); return false; } + super.dagProcEngineMetrics.markDagActionsConcludeFlowSpecRemoval(this.dagAction.getDagActionType(), true); } return true; } From 05af7928ba23c6f29e70ce6c484df17b8920caea Mon Sep 17 00:00:00 2001 From: Suryakant Agarwal Date: Tue, 26 Aug 2025 14:00:28 +0530 Subject: [PATCH 11/11] Updating Review Comments --- .../orchestration/task/DagProcessingEngineMetrics.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java index 3cd7b412289..be39a7d27ee 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java @@ -61,7 +61,8 @@ public class DagProcessingEngineMetrics { private final HashMap dagActionsConcludeSucceededMeterByDagActionType = new HashMap<>(); private final HashMap dagActionsConcludeFlowSpecRemovalSucceededMeterByDagActionType = new HashMap<>(); - private final HashMap dagActionsConcludeFlowSpecRemovalFailedMetreByDagActionType = new HashMap<>(); + private final HashMap + dagActionsConcludeFlowSpecRemovalFailedMeterByDagActionType = new HashMap<>(); private final HashMap dagActionsDeleteFailedMeterByDagActionType = new HashMap<>(); private final HashMap dagActionsDeleteSucceededMeterByDagActionType = new HashMap<>(); private final HashMap dagActionsAverageProcessingDelayMillisMeterByDagActionType = new HashMap<>(); @@ -93,7 +94,7 @@ public void registerAllMetrics() { registerMetricForEachDagActionType(this.dagActionsConcludeFailedMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FAILED); registerMetricForEachDagActionType(this.dagActionsConcludeSucceededMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_SUCCEEDED); registerMetricForEachDagActionType(this.dagActionsConcludeFlowSpecRemovalSucceededMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_SUCCEEDED); - registerMetricForEachDagActionType(this.dagActionsConcludeFlowSpecRemovalFailedMetreByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_FAILED); + registerMetricForEachDagActionType(this.dagActionsConcludeFlowSpecRemovalFailedMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_FAILED); registerMetricForEachDagActionType(this.dagActionsDeleteFailedMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_DELETE_FAILED); registerMetricForEachDagActionType(this.dagActionsDeleteSucceededMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_DELETE_SUCCEEDED); registerMetricForEachDagActionType(this.dagActionsAverageProcessingDelayMillisMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_AVERAGE_PROCESSING_DELAY_MILLIS); @@ -172,7 +173,7 @@ public void markDagActionsConcludeFlowSpecRemoval(DagActionStore.DagActionType d if (succeeded) { updateMetricForDagActionType(this.dagActionsConcludeFlowSpecRemovalSucceededMeterByDagActionType, dagActionType); } else { - updateMetricForDagActionType(this.dagActionsConcludeFlowSpecRemovalFailedMetreByDagActionType, dagActionType); + updateMetricForDagActionType(this.dagActionsConcludeFlowSpecRemovalFailedMeterByDagActionType, dagActionType); } }