From 8f348cad17375d76a7c85f1381318664fb66aaa7 Mon Sep 17 00:00:00 2001 From: tarunm Date: Fri, 19 Jun 2026 11:04:22 +0000 Subject: [PATCH 1/3] fix(minion): fail single-segment task when segment upload fails BaseSingleSegmentConversionExecutor caught upload exceptions, logged and metered them, then returned the conversion result normally -- so the minion task reported SUCCESS even though the converted segment was never uploaded. Helix marked the task COMPLETED and never retried, silently leaving the segment un-refreshed/un-purged/un-compacted. The swallow was an accidental control-flow change introduced in #10978 (observability for upload/download failures): the download branch metered, logged, and rethrew, but the upload branch omitted the rethrow. Rethrow the upload exception, mirroring the download path, so the task fails and is retried. Move the tarred-file cleanup into a finally block so it still runs on the failure path, and remove the now-dead uploadSuccessful flag. Affects all single-segment conversion tasks (RealtimeToOffline, Purge, RefreshSegment, UpsertCompaction, etc.). On upload failure these now report task FAILURE (and retry) instead of SUCCESS; operators alerting on task state will see failures that were previously hidden. --- .../BaseSingleSegmentConversionExecutor.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java index d3a0494dce7f..241069bcbc8f 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java @@ -195,7 +195,6 @@ public SegmentConversionResult executeTask(PinotTaskConfig pinotTaskConfig) BatchConfigProperties.SegmentPushType pushType = getSegmentPushType(configs); _eventObserver.notifyProgress(_pinotTaskConfig, "Uploading segment: " + segmentName + " (push mode: " + pushType + ")"); - boolean uploadSuccessful = true; try { switch (pushType) { case TAR: @@ -210,19 +209,17 @@ public SegmentConversionResult executeTask(PinotTaskConfig pinotTaskConfig) throw new UnsupportedOperationException("Unrecognized push mode: " + pushType); } } catch (Exception e) { - uploadSuccessful = false; _minionMetrics.addMeteredTableValue(tableNameWithType, MinionMeter.SEGMENT_UPLOAD_FAIL_COUNT, 1L); LOGGER.error("Segment upload failed for segment {}, table {}", segmentName, tableNameWithType, e); _eventObserver.notifyTaskError(_pinotTaskConfig, e); - } - if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) { - LOGGER.warn("Failed to delete tarred converted segment: {}", convertedTarredSegmentFile.getAbsolutePath()); - } - - if (uploadSuccessful) { - LOGGER.info("Done executing {} on table: {}, segment: {}", taskType, tableNameWithType, segmentName); + throw e; + } finally { + if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) { + LOGGER.warn("Failed to delete tarred converted segment: {}", convertedTarredSegmentFile.getAbsolutePath()); + } } + LOGGER.info("Done executing {} on table: {}, segment: {}", taskType, tableNameWithType, segmentName); return segmentConversionResult; } finally { FileUtils.deleteQuietly(tempDataDir); From 2f3526f316654d93fba8c00c1d0762e95181486b Mon Sep 17 00:00:00 2001 From: tarunm Date: Fri, 19 Jun 2026 11:42:57 +0000 Subject: [PATCH 2/3] test(minion): add regression test for single-segment upload-failure handling Adds BaseSingleSegmentConversionExecutorTest covering executeTask's upload- failure path: a failed segment upload must propagate (task fails and retries) instead of being silently reported as success. The test static-mocks SegmentConversionUtils.uploadSegment and uses a test-only executor that stubs the download/CRC/convert/ZK-modifier hooks so executeTask reaches the upload step without a server, controller, or deep store. Includes a success-path control test. Verified to fail on the pre-fix executor and pass on the fixed one. --- ...seSingleSegmentConversionExecutorTest.java | 192 ++++++++++++++++++ 1 file changed, 192 insertions(+) create mode 100644 pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutorTest.java diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutorTest.java new file mode 100644 index 000000000000..f2110fdd2813 --- /dev/null +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutorTest.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.minion.tasks; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier; +import org.apache.pinot.common.metrics.MinionMetrics; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.minion.MinionContext; +import org.apache.pinot.minion.event.MinionEventObservers; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.spi.config.instance.InstanceType; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +/** + * Tests the {@link BaseSingleSegmentConversionExecutor#executeTask} upload-failure handling: a segment-upload failure + * must propagate so the task is marked failed (and retried) rather than being silently reported as successful. + */ +public class BaseSingleSegmentConversionExecutorTest { + private static final File TEMP_DIR = + new File(FileUtils.getTempDirectory(), "BaseSingleSegmentConversionExecutorTest"); + private static final File SEGMENT_DIR = new File(TEMP_DIR, "segment"); + private static final File DATA_DIR = new File(TEMP_DIR, "minionData"); + + private static final int NUM_ROWS = 5; + private static final String RAW_TABLE_NAME = "testTable"; + private static final String TABLE_NAME_WITH_TYPE = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME); + private static final String SEGMENT_NAME = "testSegment"; + private static final String TASK_TYPE = "TestSingleSegmentConversionTask"; + private static final String TASK_ID = "Task_" + TASK_TYPE + "_0"; + private static final long SEGMENT_CRC = 100L; + private static final String D1 = "d1"; + + private File _segmentIndexDir; + + @BeforeClass + public void setUp() + throws Exception { + FileUtils.deleteDirectory(TEMP_DIR); + MinionMetrics.register(Mockito.mock(MinionMetrics.class)); + + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(D1, FieldSpec.DataType.INT).build(); + List rows = new ArrayList<>(NUM_ROWS); + for (int i = 0; i < NUM_ROWS; i++) { + GenericRow row = new GenericRow(); + row.putValue(D1, i); + rows.add(row); + } + + SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema); + config.setInstanceType(InstanceType.MINION); + config.setOutDir(SEGMENT_DIR.getPath()); + config.setSegmentName(SEGMENT_NAME); + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + driver.init(config, new GenericRowRecordReader(rows)); + driver.build(); + _segmentIndexDir = new File(SEGMENT_DIR, SEGMENT_NAME); + + Assert.assertTrue(DATA_DIR.mkdirs()); + MinionContext.getInstance().setDataDir(DATA_DIR); + // executeTask resolves the event observer from the registry by task id; register one so it is non-null. + MinionEventObservers.getInstance().addMinionEventObserver(TASK_ID, MinionTaskTestUtils.getMinionProgressObserver()); + } + + @Test + public void testExecuteTaskRethrowsWhenUploadFails() + throws Exception { + try (MockedStatic mocked = Mockito.mockStatic(SegmentConversionUtils.class)) { + mocked.when(() -> SegmentConversionUtils.uploadSegment(Mockito.any(), Mockito.any(), Mockito.any(), + Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.any(File.class))) + .thenThrow(new RuntimeException("simulated upload failure")); + + TestSingleSegmentConversionExecutor executor = new TestSingleSegmentConversionExecutor(); + try { + executor.executeTask(createTaskConfig()); + Assert.fail("executeTask must rethrow when segment upload fails, not report success"); + } catch (RuntimeException e) { + Assert.assertEquals(e.getMessage(), "simulated upload failure"); + } + } + } + + @Test + public void testExecuteTaskSucceedsWhenUploadSucceeds() + throws Exception { + try (MockedStatic mocked = Mockito.mockStatic(SegmentConversionUtils.class)) { + // uploadSegment is a no-op by default for the mocked static, simulating a successful upload. + TestSingleSegmentConversionExecutor executor = new TestSingleSegmentConversionExecutor(); + SegmentConversionResult result = executor.executeTask(createTaskConfig()); + Assert.assertEquals(result.getSegmentName(), SEGMENT_NAME); + Assert.assertEquals(result.getTableNameWithType(), TABLE_NAME_WITH_TYPE); + mocked.verify(() -> SegmentConversionUtils.uploadSegment(Mockito.any(), Mockito.any(), Mockito.any(), + Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.any(File.class))); + } + } + + private PinotTaskConfig createTaskConfig() { + Map configs = new HashMap<>(); + configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME_WITH_TYPE); + configs.put(MinionConstants.SEGMENT_NAME_KEY, SEGMENT_NAME); + configs.put(MinionConstants.DOWNLOAD_URL_KEY, "http://unused/download"); + configs.put(MinionConstants.UPLOAD_URL_KEY, "http://unused/upload"); + configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, Long.toString(SEGMENT_CRC)); + configs.put("TASK_ID", TASK_ID); + return new PinotTaskConfig(TASK_TYPE, configs); + } + + @AfterClass + public void tearDown() + throws Exception { + // Restore the process-global state mutated in setUp so it does not leak into other test classes. + MinionEventObservers.getInstance().removeMinionEventObserver(TASK_ID); + MinionContext.getInstance().setDataDir(null); + FileUtils.deleteDirectory(TEMP_DIR); + } + + /** + * Minimal concrete executor that stubs out the infrastructure-dependent hooks (download, CRC check, conversion, ZK + * metadata modifier) so {@code executeTask} runs to the upload step without a server, controller, or deep store. + */ + private class TestSingleSegmentConversionExecutor extends BaseSingleSegmentConversionExecutor { + @Override + protected File downloadSegmentToLocalAndUntar(String tableNameWithType, String segmentName, String deepstoreURL, + String taskType, File tempDataDir, String suffix) + throws Exception { + File indexDir = new File(tempDataDir, "inputSegment"); + FileUtils.copyDirectory(_segmentIndexDir, indexDir); + return indexDir; + } + + @Override + protected long getSegmentCrc(String tableNameWithType, String segmentName) { + return SEGMENT_CRC; + } + + @Override + protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File indexDir, File workingDir) + throws Exception { + File convertedDir = new File(workingDir, SEGMENT_NAME); + FileUtils.copyDirectory(indexDir, convertedDir); + return new SegmentConversionResult.Builder().setFile(convertedDir) + .setTableNameWithType(pinotTaskConfig.getConfigs().get(MinionConstants.TABLE_NAME_KEY)) + .setSegmentName(SEGMENT_NAME).build(); + } + + @Override + protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig, + SegmentConversionResult segmentConversionResult) { + return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, + Collections.emptyMap()); + } + } +} From ba5a53b4be16aaabceff8d8a9e9a5fd5fa29b3fc Mon Sep 17 00:00:00 2001 From: tarunm Date: Mon, 22 Jun 2026 05:01:49 +0000 Subject: [PATCH 3/3] fix(minion): clean up staged tar when metadata push fails In METADATA push mode, uploadSegmentWithMetadata stages the converted tar to the output PinotFS before sending metadata to the controller. With the upload-failure rethrow, a failed metadata send now triggers a task retry whose moveSegmentToOutputPinotFS would fail with "Output file already exists" (overwriteOutput defaults to false), making transient push failures permanently stuck. Delete the staged tar on metadata-push failure before rethrowing so the retry can re-stage it and self-heal. Cleanup-on-failure is used instead of forcing overwriteOutput=true so the existing collision guard stays intact. Add a regression test verifying the staged tar is deleted on metadata-push failure (fails without the cleanup). --- .../BaseSingleSegmentConversionExecutor.java | 18 ++++++- ...seSingleSegmentConversionExecutorTest.java | 53 +++++++++++++++++++ 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java index 241069bcbc8f..8a2e9442fc15 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java @@ -253,8 +253,22 @@ private void uploadSegmentWithMetadata(Map configs, PinotTaskCon try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(configs, outputSegmentDirURI)) { Map segmentUriToTarPathMap = SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec, new String[]{outputSegmentTarURI.toString()}); - SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap, metadataHeaders, - parameters); + try { + SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap, metadataHeaders, + parameters); + } catch (Exception e) { + // The tar was already staged to the output PinotFS before this failure. If the task is retried, the next + // moveSegmentToOutputPinotFS() would fail with "Output file already exists" (overwriteOutput defaults to + // false), making transient metadata-push failures permanently stuck. Delete the staged tar so the retry can + // re-stage it and self-heal. + try { + outputFileFS.delete(outputSegmentTarURI, true); + } catch (Exception deleteException) { + LOGGER.warn("Failed to delete staged segment tar: {} after metadata push failure, the next retry may fail " + + "with 'Output file already exists'", outputSegmentTarURI, deleteException); + } + throw e; + } } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutorTest.java index f2110fdd2813..186bbed65aa6 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutorTest.java @@ -19,6 +19,7 @@ package org.apache.pinot.plugin.minion.tasks; import java.io.File; +import java.net.URI; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -33,6 +34,7 @@ import org.apache.pinot.minion.event.MinionEventObservers; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader; +import org.apache.pinot.segment.local.utils.SegmentPushUtils; import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; import org.apache.pinot.spi.config.instance.InstanceType; import org.apache.pinot.spi.config.table.TableConfig; @@ -40,6 +42,8 @@ import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.mockito.MockedStatic; @@ -144,6 +148,55 @@ private PinotTaskConfig createTaskConfig() { return new PinotTaskConfig(TASK_TYPE, configs); } + /** + * Verifies that when a METADATA-mode push fails after the converted tar was already staged to the output PinotFS, + * the staged tar is deleted before the exception propagates. Without this cleanup the rethrow would make the retry + * fail in moveSegmentToOutputPinotFS with "Output file already exists" (overwriteOutput defaults to false), so + * transient push failures would never self-heal. + */ + @Test + public void testExecuteTaskCleansUpStagedTarWhenMetadataPushFails() + throws Exception { + File outputDir = new File(TEMP_DIR, "output"); + FileUtils.forceMkdir(outputDir); + PinotFS mockOutputFS = Mockito.mock(PinotFS.class); + Mockito.when(mockOutputFS.exists(Mockito.any())).thenReturn(false); + + try (MockedStatic minionTaskUtils = + Mockito.mockStatic(MinionTaskUtils.class, Mockito.CALLS_REAL_METHODS); + MockedStatic segmentPushUtils = + Mockito.mockStatic(SegmentPushUtils.class, Mockito.CALLS_REAL_METHODS)) { + minionTaskUtils.when(() -> MinionTaskUtils.getOutputPinotFS(Mockito.any(), Mockito.any())) + .thenReturn(mockOutputFS); + segmentPushUtils.when(() -> SegmentPushUtils.sendSegmentUriAndMetadata(Mockito.any(), Mockito.any(), + Mockito.any(), Mockito.anyList(), Mockito.anyList())) + .thenThrow(new RuntimeException("simulated metadata push failure")); + + TestSingleSegmentConversionExecutor executor = new TestSingleSegmentConversionExecutor(); + try { + executor.executeTask(createMetadataPushTaskConfig(outputDir)); + Assert.fail("executeTask must rethrow when metadata push fails"); + } catch (RuntimeException e) { + Assert.assertEquals(e.getMessage(), "simulated metadata push failure"); + } + // The staged tar must be deleted so a retry can re-stage it and self-heal. + Mockito.verify(mockOutputFS).delete(Mockito.any(URI.class), Mockito.eq(true)); + } + } + + private PinotTaskConfig createMetadataPushTaskConfig(File outputDir) { + Map configs = new HashMap<>(); + configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME_WITH_TYPE); + configs.put(MinionConstants.SEGMENT_NAME_KEY, SEGMENT_NAME); + configs.put(MinionConstants.DOWNLOAD_URL_KEY, "http://unused/download"); + configs.put(MinionConstants.UPLOAD_URL_KEY, "http://unused/upload"); + configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, Long.toString(SEGMENT_CRC)); + configs.put("TASK_ID", TASK_ID); + configs.put(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.METADATA.name()); + configs.put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI, outputDir.toURI().toString()); + return new PinotTaskConfig(TASK_TYPE, configs); + } + @AfterClass public void tearDown() throws Exception {