From d9a93b49b4f59ea43d5bf831d536af08350e2d35 Mon Sep 17 00:00:00 2001 From: Arnav Balyan Date: Sat, 6 Jun 2026 16:37:42 +0530 Subject: [PATCH] update --- .../hive/mapred/PaimonOutputCommitter.java | 34 ++++-- .../mapred/PaimonOutputCommitterTest.java | 105 ++++++++++++++++++ 2 files changed, 131 insertions(+), 8 deletions(-) create mode 100644 paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonOutputCommitterTest.java diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java index 94bc4a675ae8..0fd37beb93fe 100644 --- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java +++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java @@ -144,7 +144,7 @@ public void commitJob(JobContext originalContext) throws IOException { if (table != null) { BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder(); List commitMessagesList = - getAllPreCommitMessage(table.location(), jobContext, table.fileIO()); + getAllPreCommitMessage(table.location(), jobContext, table.fileIO(), false); try (BatchTableCommit batchTableCommit = batchWriteBuilder.newCommit()) { batchTableCommit.commit(commitMessagesList); } catch (Exception e) { @@ -172,7 +172,7 @@ public void abortJob(JobContext originalContext, int status) throws IOException LOG.info("AbortJob {} has started", jobContext.getJobID()); List commitMessagesList = - getAllPreCommitMessage(table.location(), jobContext, table.fileIO()); + getAllPreCommitMessage(table.location(), jobContext, table.fileIO(), true); BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder(); try (BatchTableCommit batchTableCommit = batchWriteBuilder.newCommit()) { batchTableCommit.abort(commitMessagesList); @@ -214,7 +214,7 @@ private void deleteTemporaryFile(JobContext jobContext, Path location, FileIO fi * @return The list of the committed data files */ private static List getAllPreCommitMessage( - Path location, JobContext jobContext, FileIO io) { + Path location, JobContext jobContext, FileIO io, boolean ignoreMissing) { JobConf conf = jobContext.getJobConf(); int totalCommitMessagesSize = @@ -225,7 +225,7 @@ private static List getAllPreCommitMessage( for (int i = 0; i < totalCommitMessagesSize; i++) { Path commitFileLocation = generatePreCommitFileLocation(location, jobContext.getJobID(), i); - commitMessagesList.addAll(readPreCommitFile(commitFileLocation, io)); + commitMessagesList.addAll(readPreCommitFile(commitFileLocation, io, ignoreMissing)); } return commitMessagesList; @@ -270,10 +270,28 @@ private static void createPreCommitFile( } } - private static List readPreCommitFile(Path location, FileIO io) { - try (ObjectInputStream objectInputStream = - new ObjectInputStream(io.newInputStream(location))) { - return (List) objectInputStream.readObject(); + private static List readPreCommitFile( + Path location, FileIO io, boolean ignoreMissing) { + try { + if (!io.exists(location)) { + if (ignoreMissing) { + LOG.warn( + "preCommit file {} was not generated. The task did not " + + "reach prepareCommit, so there are no commit " + + "messages to abort. Skipping commit cleanup " + + "for this task slot.", + location); + return Collections.emptyList(); + } + throw new RuntimeException( + String.format( + "preCommit file %s is missing during commit. Refusing to commit a partial result.", + location)); + } + try (ObjectInputStream objectInputStream = + new ObjectInputStream(io.newInputStream(location))) { + return (List) objectInputStream.readObject(); + } } catch (ClassNotFoundException | IOException e) { throw new RuntimeException( String.format("Can not read or parse CommitMessage file: %s", location)); diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonOutputCommitterTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonOutputCommitterTest.java new file mode 100644 index 000000000000..491a422af6f0 --- /dev/null +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonOutputCommitterTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.hive.mapred; + +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.table.sink.CommitMessage; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.ObjectOutputStream; +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link PaimonOutputCommitter}. */ +public class PaimonOutputCommitterTest { + + @Rule public TemporaryFolder folder = new TemporaryFolder(); + + @Test + public void readPreCommitFileReturnsEmptyWhenFileMissingAndIgnoreMissing() throws Exception { + Path missing = new Path(folder.newFolder().getAbsolutePath(), "task_0.preCommit"); + List result = invokeReadPreCommitFile(missing, true); + assertThat(result).isEmpty(); + } + + @Test + public void readPreCommitFileThrowsWhenFileMissingAndStrict() throws Exception { + Path missing = new Path(folder.newFolder().getAbsolutePath(), "task_0.preCommit"); + org.assertj.core.api.Assertions.assertThatThrownBy( + () -> invokeReadPreCommitFile(missing, false)) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("missing during commit"); + } + + @Test + public void readPreCommitFileThrowsOnCorruptFile() throws Exception { + java.io.File f = folder.newFile("task_0.preCommit"); + try (java.io.FileOutputStream fos = new java.io.FileOutputStream(f)) { + fos.write(new byte[] {0x00, 0x01, 0x02, 0x03}); + } + Path corrupt = new Path(f.getAbsolutePath()); + org.assertj.core.api.Assertions.assertThatThrownBy( + () -> invokeReadPreCommitFile(corrupt, true)) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Can not read or parse CommitMessage file"); + } + + @Test + public void readPreCommitFileReturnsListForValidFile() throws Exception { + java.io.File f = folder.newFile("task_0.preCommit"); + try (ObjectOutputStream oos = new ObjectOutputStream(new java.io.FileOutputStream(f))) { + oos.writeObject(Collections.emptyList()); + } + Path written = new Path(f.getAbsolutePath()); + List result = invokeReadPreCommitFile(written, false); + assertThat(result).isNotNull().isEmpty(); + } + + @SuppressWarnings("unchecked") + private static List invokeReadPreCommitFile(Path location, boolean ignoreMissing) + throws Exception { + Method m = + PaimonOutputCommitter.class.getDeclaredMethod( + "readPreCommitFile", + Path.class, + org.apache.paimon.fs.FileIO.class, + boolean.class); + m.setAccessible(true); + try { + return (List) + m.invoke(null, location, LocalFileIO.create(), ignoreMissing); + } catch (java.lang.reflect.InvocationTargetException e) { + Throwable cause = e.getCause(); + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } + if (cause instanceof Error) { + throw (Error) cause; + } + throw new RuntimeException(cause); + } + } +}