diff --git a/be/src/exec/operator/result_sink_operator.h b/be/src/exec/operator/result_sink_operator.h index 752e6a367c9678..7fdbca0b8dc549 100644 --- a/be/src/exec/operator/result_sink_operator.h +++ b/be/src/exec/operator/result_sink_operator.h @@ -57,6 +57,9 @@ struct ResultFileOptions { // TODO: we should merge parquet_commpression_type/orc_compression_type/compression_type TFileCompressType::type compression_type = TFileCompressType::PLAIN; + // Deprecated compatibility flag. New FE handles outfile delete_existing_files in FE + // and clears this field before sending the result sink to BE. Keep reading it here + // only for compatibility with older FE during rolling upgrade. bool delete_existing_files = false; std::string file_suffix; //Bring BOM when exporting to CSV format @@ -70,6 +73,7 @@ struct ResultFileOptions { line_delimiter = t_opt.__isset.line_delimiter ? t_opt.line_delimiter : "\n"; max_file_size_bytes = t_opt.__isset.max_file_size_bytes ? t_opt.max_file_size_bytes : max_file_size_bytes; + // Deprecated compatibility path. New FE should already have cleared this flag. delete_existing_files = t_opt.__isset.delete_existing_files ? t_opt.delete_existing_files : false; file_suffix = t_opt.file_suffix; diff --git a/be/src/exec/sink/writer/vfile_result_writer.cpp b/be/src/exec/sink/writer/vfile_result_writer.cpp index d7e4149c551d7b..c198eaa7e21f4e 100644 --- a/be/src/exec/sink/writer/vfile_result_writer.cpp +++ b/be/src/exec/sink/writer/vfile_result_writer.cpp @@ -96,7 +96,9 @@ Status VFileResultWriter::open(RuntimeState* state, RuntimeProfile* profile) { _file_opts->orc_writer_version < 1) { return Status::InternalError("orc writer version is less than 1."); } - // Delete existing files + // Deprecated compatibility path. New FE already deletes the target directory in FE + // and clears delete_existing_files before BE execution. Keep this branch only for + // requests from older FE versions during rolling upgrade. if (_file_opts->delete_existing_files) { RETURN_IF_ERROR(_delete_dir()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java index cc7ecf52c4e6a7..c0fc647fd43f18 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java @@ -187,6 +187,10 @@ public long getMaxFileSizeBytes() { return maxFileSizeBytes; } + public boolean shouldDeleteExistingFiles() { + return deleteExistingFiles; + } + public BrokerDesc getBrokerDesc() { return brokerDesc; } @@ -454,7 +458,6 @@ private void checkOrcType(String orcType, String expectType, boolean isEqual, St + ", should use " + expectType + ", but the definition type is " + orcType); } - private void analyzeForParquetFormat(List resultExprs, List colLabels) throws AnalysisException { if (this.parquetSchemas.isEmpty()) { genParquetColumnName(resultExprs, colLabels); @@ -540,6 +543,9 @@ private void analyzeProperties() throws UserException { + " To enable this feature, you need to add `enable_delete_existing_files=true`" + " in fe.conf"); } + if (deleteExistingFiles && isLocalOutput) { + throw new AnalysisException("Local file system does not support delete existing files"); + } copiedProps.remove(PROP_DELETE_EXISTING_FILES); } @@ -769,5 +775,3 @@ public TResultFileSinkOptions toSinkOptions() { return sinkOptions; } } - - diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java index e5b7c1a7f45376..e44d1d9ea4f859 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java @@ -119,6 +119,18 @@ public static void deleteDirectoryWithFileSystem(String path, BrokerDesc brokerD } } + public static void deleteParentDirectoryWithFileSystem(String path, BrokerDesc brokerDesc) throws UserException { + deleteDirectoryWithFileSystem(extractParentDirectory(path), brokerDesc); + } + + public static String extractParentDirectory(String path) { + int lastSlash = path.lastIndexOf('/'); + if (lastSlash >= 0) { + return path.substring(0, lastSlash + 1); + } + return path; + } + public static String printBroker(String brokerName, TNetworkAddress address) { return brokerName + "[" + address.toString() + "]"; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java index 8de5a1c95c6359..c2b1be2a230be0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java @@ -114,9 +114,7 @@ public void addExportJobAndRegisterTask(ExportJob job) throws Exception { try { // delete existing files if (Boolean.parseBoolean(job.getDeleteExistingFiles())) { - String fullPath = job.getExportPath(); - BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0, fullPath.lastIndexOf('/') + 1), - job.getBrokerDesc()); + BrokerUtil.deleteParentDirectoryWithFileSystem(job.getExportPath(), job.getBrokerDesc()); } // ATTN: Must add task after edit log, otherwise the job may finish before adding job. for (int i = 0; i < job.getCopiedTaskExecutors().size(); i++) { @@ -554,4 +552,3 @@ public long getJobNum(ExportJobState state) { return size; } } - diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTVFCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTVFCommand.java index 4847cd028fa3bb..fae7747b5d42f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTVFCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTVFCommand.java @@ -23,6 +23,7 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.Status; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.datasource.property.storage.StorageProperties; import org.apache.doris.fs.FileSystemFactory; import org.apache.doris.fs.remote.RemoteFileSystem; @@ -176,7 +177,7 @@ private void deleteExistingFilesInFE(String tvfName, Map props) throws Exception { String filePath = props.get("file_path"); // Extract parent directory from prefix path: s3://bucket/path/to/prefix_ -> s3://bucket/path/to/ - String parentDir = extractParentDirectory(filePath); + String parentDir = BrokerUtil.extractParentDirectory(filePath); LOG.info("TVF sink: deleting existing files in directory: {}", parentDir); // Copy props for building StorageProperties (exclude write-specific params) @@ -198,12 +199,4 @@ private void deleteExistingFilesInFE(String tvfName, Map props) + parentDir + ": " + deleteStatus.getErrMsg()); } } - - private static String extractParentDirectory(String prefixPath) { - int lastSlash = prefixPath.lastIndexOf('/'); - if (lastSlash >= 0) { - return prefixPath.substring(0, lastSlash + 1); - } - return prefixPath; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java index 0ba8311097a313..520e34e27c287b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java @@ -88,6 +88,11 @@ public void setBrokerAddr(String ip, int port) { fileSinkOptions.setBrokerAddresses(Lists.newArrayList(new TNetworkAddress(ip, port))); } + public void setDeleteExistingFiles(boolean deleteExistingFiles) { + Preconditions.checkNotNull(fileSinkOptions); + fileSinkOptions.setDeleteExistingFiles(deleteExistingFiles); + } + public void resetByDataStreamSink(DataStreamSink dataStreamSink) { exchNodeId = dataStreamSink.getExchNodeId(); outputPartition = dataStreamSink.getOutputPartition(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 1a7d4df0c49b0e..b39d5b0d62bbc7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -54,6 +54,7 @@ import org.apache.doris.common.profile.ProfileManager.ProfileType; import org.apache.doris.common.profile.SummaryProfile; import org.apache.doris.common.profile.SummaryProfile.SummaryBuilder; +import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.DebugPointUtil.DebugPoint; import org.apache.doris.common.util.DebugUtil; @@ -105,8 +106,10 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache; import org.apache.doris.planner.GroupCommitScanNode; import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanNode; import org.apache.doris.planner.Planner; +import org.apache.doris.planner.ResultFileSink; import org.apache.doris.planner.ScanNode; import org.apache.doris.proto.Data; import org.apache.doris.proto.InternalService; @@ -1298,8 +1301,16 @@ public void executeAndSendResult(boolean isOutfileQuery, boolean isSendFields, } coordBase.setIsProfileSafeStmt(this.isProfileSafeStmt()); + OutFileClause outFileClause = null; + if (isOutfileQuery) { + outFileClause = queryStmt.getOutFileClause(); + Preconditions.checkState(outFileClause != null, "OUTFILE query must have OutFileClause"); + } try { + if (outFileClause != null) { + deleteExistingOutfileFilesInFe(outFileClause); + } coordBase.exec(); profile.getSummaryProfile().setQueryScheduleFinishTime(TimeUtils.getStartTimeMs()); updateProfile(false); @@ -1339,8 +1350,8 @@ public void executeAndSendResult(boolean isOutfileQuery, boolean isSendFields, sendFields(queryStmt.getColLabels(), queryStmt.getFieldInfos(), getReturnTypes(queryStmt)); } else { - if (!Strings.isNullOrEmpty(queryStmt.getOutFileClause().getSuccessFileName())) { - outfileWriteSuccess(queryStmt.getOutFileClause()); + if (!Strings.isNullOrEmpty(outFileClause.getSuccessFileName())) { + outfileWriteSuccess(outFileClause); } sendFields(OutFileClause.RESULT_COL_NAMES, OutFileClause.RESULT_COL_TYPES); } @@ -1482,6 +1493,31 @@ private void outfileWriteSuccess(OutFileClause outFileClause) throws Exception { } } + private void deleteExistingOutfileFilesInFe(OutFileClause outFileClause) throws UserException { + // Handle directory cleanup once in FE so parallel outfile writers never race on deletion. + if (!outFileClause.shouldDeleteExistingFiles()) { + return; + } + Preconditions.checkState(outFileClause.getBrokerDesc() != null, + "delete_existing_files requires a remote outfile sink"); + Preconditions.checkState(outFileClause.getBrokerDesc().storageType() != StorageType.LOCAL, + "delete_existing_files is not supported for local outfile sinks"); + BrokerUtil.deleteParentDirectoryWithFileSystem(outFileClause.getFilePath(), outFileClause.getBrokerDesc()); + clearDeleteExistingFilesInPlan(); + } + + private void clearDeleteExistingFilesInPlan() { + ResultFileSink resultFileSink = null; + for (PlanFragment fragment : planner.getFragments()) { + if (fragment.getSink() instanceof ResultFileSink) { + Preconditions.checkState(resultFileSink == null, "OUTFILE query should have only one ResultFileSink"); + resultFileSink = (ResultFileSink) fragment.getSink(); + } + } + Preconditions.checkState(resultFileSink != null, "OUTFILE query must have ResultFileSink"); + resultFileSink.setDeleteExistingFiles(false); + } + public static void syncLoadForTablets(List> backendsList, List allTabletIds) { backendsList.forEach(backends -> backends.forEach(backend -> { if (backend.isAlive()) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java index 05af66313b1b6a..3e3f4ba9d49086 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java @@ -24,6 +24,9 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.mysql.MysqlChannel; import org.apache.doris.mysql.MysqlSerializer; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.Planner; +import org.apache.doris.planner.ResultFileSink; import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData; import org.apache.doris.qe.ConnectContext.ConnectType; import org.apache.doris.utframe.TestWithFeService; @@ -35,6 +38,8 @@ import org.mockito.Mockito; import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -276,4 +281,24 @@ public void testSendBinaryResultRow() throws IOException { StmtExecutor executor = new StmtExecutor(mockCtx, stmt, false); executor.sendBinaryResultRow(resultSet); } + + @Test + public void testClearDeleteExistingFilesInPlan() throws Exception { + Planner planner = Mockito.mock(Planner.class); + PlanFragment fragment = Mockito.mock(PlanFragment.class); + ResultFileSink resultFileSink = Mockito.mock(ResultFileSink.class); + Mockito.when(fragment.getSink()).thenReturn(resultFileSink); + Mockito.when(planner.getFragments()).thenReturn(Lists.newArrayList(fragment)); + + StmtExecutor executor = new StmtExecutor(connectContext, ""); + Field plannerField = StmtExecutor.class.getDeclaredField("planner"); + plannerField.setAccessible(true); + plannerField.set(executor, planner); + + Method clearMethod = StmtExecutor.class.getDeclaredMethod("clearDeleteExistingFilesInPlan"); + clearMethod.setAccessible(true); + clearMethod.invoke(executor); + + Mockito.verify(resultFileSink).setDeleteExistingFiles(false); + } } diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 8add59d47af770..842765273e057d 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -132,7 +132,7 @@ struct TResultFileSinkOptions { 14: optional TParquetVersion parquet_version 15: optional string orc_schema - 16: optional bool delete_existing_files; + 16: optional bool delete_existing_files; // deprecated: FE now handles outfile cleanup and clears this flag before BE execution; kept for compatibility with older FE 17: optional string file_suffix; 18: optional bool with_bom; @@ -480,7 +480,7 @@ struct TTVFTableSink { 7: optional string column_separator 8: optional string line_delimiter 9: optional i64 max_file_size_bytes - 10: optional bool delete_existing_files + 10: optional bool delete_existing_files // deprecated: FE handles TVF cleanup before execution and always sends false 11: optional map hadoop_config 12: optional PlanNodes.TFileCompressType compression_type 13: optional i64 backend_id // local TVF: specify BE diff --git a/regression-test/suites/export_p0/test_outfile.groovy b/regression-test/suites/export_p0/test_outfile.groovy index f33500f5883847..35c5e0b681b115 100644 --- a/regression-test/suites/export_p0/test_outfile.groovy +++ b/regression-test/suites/export_p0/test_outfile.groovy @@ -44,11 +44,15 @@ suite("test_outfile") { assertEquals(response.msg, "success") def configJson = response.data.rows boolean enableOutfileToLocal = false + boolean enableDeleteExistingFiles = false for (Object conf: configJson) { assert conf instanceof Map if (((Map) conf).get("Name").toLowerCase() == "enable_outfile_to_local") { enableOutfileToLocal = ((Map) conf).get("Value").toLowerCase() == "true" } + if (((Map) conf).get("Name").toLowerCase() == "enable_delete_existing_files") { + enableDeleteExistingFiles = ((Map) conf).get("Value").toLowerCase() == "true" + } } if (!enableOutfileToLocal) { logger.warn("Please set enable_outfile_to_local to true to run test_outfile") @@ -233,4 +237,14 @@ suite("test_outfile") { path.delete(); } } + + if (enableDeleteExistingFiles) { + test { + sql """ + SELECT 1 INTO OUTFILE "file://${outFile}/test_outfile_delete_existing_files_${uuid}/" + PROPERTIES("delete_existing_files" = "true"); + """ + exception "Local file system does not support delete existing files" + } + } } diff --git a/regression-test/suites/export_p0/test_outfile_parallel_delete_existing_files.groovy b/regression-test/suites/export_p0/test_outfile_parallel_delete_existing_files.groovy new file mode 100644 index 00000000000000..655146598a9002 --- /dev/null +++ b/regression-test/suites/export_p0/test_outfile_parallel_delete_existing_files.groovy @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_outfile_parallel_delete_existing_files", "p0") { + StringBuilder strBuilder = new StringBuilder() + strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword) + if ((context.config.otherConfigs.get("enableTLS")?.toString()?.equalsIgnoreCase("true")) ?: false) { + strBuilder.append(" https://" + context.config.feHttpAddress + "/rest/v1/config/fe") + strBuilder.append(" --cert " + context.config.otherConfigs.get("trustCert") + + " --cacert " + context.config.otherConfigs.get("trustCACert") + + " --key " + context.config.otherConfigs.get("trustCAKey")) + } else { + strBuilder.append(" http://" + context.config.feHttpAddress + "/rest/v1/config/fe") + } + + def process = strBuilder.toString().execute() + def code = process.waitFor() + def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream()))) + def out = process.getText() + logger.info("Request FE Config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def response = parseJson(out.trim()) + assertEquals(response.code, 0) + assertEquals(response.msg, "success") + boolean enableDeleteExistingFiles = false + for (Object conf : response.data.rows) { + assert conf instanceof Map + if (((Map) conf).get("Name").toLowerCase() == "enable_delete_existing_files") { + enableDeleteExistingFiles = ((Map) conf).get("Value").toLowerCase() == "true" + } + } + if (!enableDeleteExistingFiles) { + logger.warn("Please set enable_delete_existing_files to true to run test_outfile_parallel_delete_existing_files") + return + } + + String ak = getS3AK() + String sk = getS3SK() + String s3Endpoint = getS3Endpoint() + String region = getS3Region() + String bucket = context.config.otherConfigs.get("s3BucketName") + String provider = getS3Provider() + String tableName = "test_outfile_parallel_delete_existing_files" + String uuid = UUID.randomUUID().toString() + String outFilePath = "${bucket}/outfile/parallel_delete_existing_files/${uuid}/exp_" + + def exportToS3 = { String filterSql, boolean deleteExistingFiles -> + String deleteProperty = deleteExistingFiles ? "\"delete_existing_files\" = \"true\"," : "" + sql """ + SELECT * FROM ${tableName} ${filterSql} + INTO OUTFILE "s3://${outFilePath}" + FORMAT AS csv + PROPERTIES ( + ${deleteProperty} + "column_separator" = ",", + "s3.endpoint" = "${s3Endpoint}", + "s3.region" = "${region}", + "s3.secret_key" = "${sk}", + "s3.access_key" = "${ak}", + "provider" = "${provider}" + ); + """ + } + + try { + sql """ set enable_parallel_outfile = true """ + sql """ set parallel_pipeline_task_num = 8 """ + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + id INT NOT NULL, + name STRING NOT NULL, + score INT NOT NULL + ) + DUPLICATE KEY(id) + DISTRIBUTED BY HASH(id) BUCKETS 16 + PROPERTIES("replication_num" = "1"); + """ + + sql """ + INSERT INTO ${tableName} + SELECT + number AS id, + concat('name_', cast(number AS string)) AS name, + cast(number % 97 AS int) AS score + FROM numbers("number" = "20000"); + """ + + def expected = sql """ SELECT count(*), sum(id), sum(score) FROM ${tableName}; """ + + exportToS3("WHERE id < 5000", false) + exportToS3("", true) + + def actual = sql """ + SELECT count(*), sum(id), sum(score) FROM S3( + "uri" = "s3://${outFilePath}*", + "s3.endpoint" = "${s3Endpoint}", + "s3.region" = "${region}", + "s3.secret_key" = "${sk}", + "s3.access_key" = "${ak}", + "provider" = "${provider}", + "format" = "csv", + "column_separator" = ",", + "csv_schema" = "id:int;name:string;score:int" + ); + """ + + assertEquals(expected[0][0], actual[0][0]) + assertEquals(expected[0][1], actual[0][1]) + assertEquals(expected[0][2], actual[0][2]) + } finally { + try_sql(""" set enable_parallel_outfile = false """) + } +}