From afbbb261c52ef3a11dddda8de9f732d31b6cafc3 Mon Sep 17 00:00:00 2001 From: Socrates Date: Mon, 23 Mar 2026 10:40:16 +0800 Subject: [PATCH] [fix](outfile) handle delete_existing_files before parallel export (#61223) Issue Number: N/A Related PR: https://github.com/apache/doris/pull/38400 Problem Summary: When `select ... into outfile` uses `delete_existing_files=true`, parallel outfile writers can race on directory cleanup and delete files uploaded by other writers. This PR follows the same FE-side cleanup pattern used by export in #38400: remote outfile cleanup is executed once in FE before query execution, and the delete flag is cleared before sink options are sent to BE. This PR also aligns local outfile behavior with export: `file:///` does not support `delete_existing_files=true`, so FE rejects that combination during analysis instead of relying on BE-side cleanup. To reduce duplicated logic, the FE-side parent-directory cleanup used by export/outfile/TVF is refactored into shared `BrokerUtil` helpers. (cherry picked from commit 15766536326d9b47828a0894f6e2df8ca1dd612f) --- be/src/exec/operator/result_sink_operator.h | 4 + .../exec/sink/writer/vfile_result_writer.cpp | 4 +- .../apache/doris/analysis/OutFileClause.java | 10 +- .../apache/doris/common/util/BrokerUtil.java | 12 ++ .../java/org/apache/doris/load/ExportMgr.java | 5 +- .../commands/insert/InsertIntoTVFCommand.java | 11 +- .../apache/doris/planner/ResultFileSink.java | 5 + .../org/apache/doris/qe/StmtExecutor.java | 40 +++++- .../org/apache/doris/qe/StmtExecutorTest.java | 25 ++++ gensrc/thrift/DataSinks.thrift | 4 +- .../suites/export_p0/test_outfile.groovy | 14 ++ ...file_parallel_delete_existing_files.groovy | 131 ++++++++++++++++++ 12 files changed, 244 insertions(+), 21 deletions(-) create mode 100644 regression-test/suites/export_p0/test_outfile_parallel_delete_existing_files.groovy diff --git a/be/src/exec/operator/result_sink_operator.h b/be/src/exec/operator/result_sink_operator.h index 752e6a367c9678..7fdbca0b8dc549 100644 --- a/be/src/exec/operator/result_sink_operator.h +++ b/be/src/exec/operator/result_sink_operator.h @@ -57,6 +57,9 @@ struct ResultFileOptions { // TODO: we should merge parquet_commpression_type/orc_compression_type/compression_type TFileCompressType::type compression_type = TFileCompressType::PLAIN; + // Deprecated compatibility flag. New FE handles outfile delete_existing_files in FE + // and clears this field before sending the result sink to BE. Keep reading it here + // only for compatibility with older FE during rolling upgrade. bool delete_existing_files = false; std::string file_suffix; //Bring BOM when exporting to CSV format @@ -70,6 +73,7 @@ struct ResultFileOptions { line_delimiter = t_opt.__isset.line_delimiter ? t_opt.line_delimiter : "\n"; max_file_size_bytes = t_opt.__isset.max_file_size_bytes ? t_opt.max_file_size_bytes : max_file_size_bytes; + // Deprecated compatibility path. New FE should already have cleared this flag. delete_existing_files = t_opt.__isset.delete_existing_files ? t_opt.delete_existing_files : false; file_suffix = t_opt.file_suffix; diff --git a/be/src/exec/sink/writer/vfile_result_writer.cpp b/be/src/exec/sink/writer/vfile_result_writer.cpp index d7e4149c551d7b..c198eaa7e21f4e 100644 --- a/be/src/exec/sink/writer/vfile_result_writer.cpp +++ b/be/src/exec/sink/writer/vfile_result_writer.cpp @@ -96,7 +96,9 @@ Status VFileResultWriter::open(RuntimeState* state, RuntimeProfile* profile) { _file_opts->orc_writer_version < 1) { return Status::InternalError("orc writer version is less than 1."); } - // Delete existing files + // Deprecated compatibility path. New FE already deletes the target directory in FE + // and clears delete_existing_files before BE execution. Keep this branch only for + // requests from older FE versions during rolling upgrade. if (_file_opts->delete_existing_files) { RETURN_IF_ERROR(_delete_dir()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java index cc7ecf52c4e6a7..c0fc647fd43f18 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java @@ -187,6 +187,10 @@ public long getMaxFileSizeBytes() { return maxFileSizeBytes; } + public boolean shouldDeleteExistingFiles() { + return deleteExistingFiles; + } + public BrokerDesc getBrokerDesc() { return brokerDesc; } @@ -454,7 +458,6 @@ private void checkOrcType(String orcType, String expectType, boolean isEqual, St + ", should use " + expectType + ", but the definition type is " + orcType); } - private void analyzeForParquetFormat(List resultExprs, List colLabels) throws AnalysisException { if (this.parquetSchemas.isEmpty()) { genParquetColumnName(resultExprs, colLabels); @@ -540,6 +543,9 @@ private void analyzeProperties() throws UserException { + " To enable this feature, you need to add `enable_delete_existing_files=true`" + " in fe.conf"); } + if (deleteExistingFiles && isLocalOutput) { + throw new AnalysisException("Local file system does not support delete existing files"); + } copiedProps.remove(PROP_DELETE_EXISTING_FILES); } @@ -769,5 +775,3 @@ public TResultFileSinkOptions toSinkOptions() { return sinkOptions; } } - - diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java index e5b7c1a7f45376..e44d1d9ea4f859 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java @@ -119,6 +119,18 @@ public static void deleteDirectoryWithFileSystem(String path, BrokerDesc brokerD } } + public static void deleteParentDirectoryWithFileSystem(String path, BrokerDesc brokerDesc) throws UserException { + deleteDirectoryWithFileSystem(extractParentDirectory(path), brokerDesc); + } + + public static String extractParentDirectory(String path) { + int lastSlash = path.lastIndexOf('/'); + if (lastSlash >= 0) { + return path.substring(0, lastSlash + 1); + } + return path; + } + public static String printBroker(String brokerName, TNetworkAddress address) { return brokerName + "[" + address.toString() + "]"; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java index 8de5a1c95c6359..c2b1be2a230be0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java @@ -114,9 +114,7 @@ public void addExportJobAndRegisterTask(ExportJob job) throws Exception { try { // delete existing files if (Boolean.parseBoolean(job.getDeleteExistingFiles())) { - String fullPath = job.getExportPath(); - BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0, fullPath.lastIndexOf('/') + 1), - job.getBrokerDesc()); + BrokerUtil.deleteParentDirectoryWithFileSystem(job.getExportPath(), job.getBrokerDesc()); } // ATTN: Must add task after edit log, otherwise the job may finish before adding job. for (int i = 0; i < job.getCopiedTaskExecutors().size(); i++) { @@ -554,4 +552,3 @@ public long getJobNum(ExportJobState state) { return size; } } - diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTVFCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTVFCommand.java index 4847cd028fa3bb..fae7747b5d42f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTVFCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTVFCommand.java @@ -23,6 +23,7 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.Status; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.datasource.property.storage.StorageProperties; import org.apache.doris.fs.FileSystemFactory; import org.apache.doris.fs.remote.RemoteFileSystem; @@ -176,7 +177,7 @@ private void deleteExistingFilesInFE(String tvfName, Map props) throws Exception { String filePath = props.get("file_path"); // Extract parent directory from prefix path: s3://bucket/path/to/prefix_ -> s3://bucket/path/to/ - String parentDir = extractParentDirectory(filePath); + String parentDir = BrokerUtil.extractParentDirectory(filePath); LOG.info("TVF sink: deleting existing files in directory: {}", parentDir); // Copy props for building StorageProperties (exclude write-specific params) @@ -198,12 +199,4 @@ private void deleteExistingFilesInFE(String tvfName, Map props) + parentDir + ": " + deleteStatus.getErrMsg()); } } - - private static String extractParentDirectory(String prefixPath) { - int lastSlash = prefixPath.lastIndexOf('/'); - if (lastSlash >= 0) { - return prefixPath.substring(0, lastSlash + 1); - } - return prefixPath; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java index 0ba8311097a313..520e34e27c287b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java @@ -88,6 +88,11 @@ public void setBrokerAddr(String ip, int port) { fileSinkOptions.setBrokerAddresses(Lists.newArrayList(new TNetworkAddress(ip, port))); } + public void setDeleteExistingFiles(boolean deleteExistingFiles) { + Preconditions.checkNotNull(fileSinkOptions); + fileSinkOptions.setDeleteExistingFiles(deleteExistingFiles); + } + public void resetByDataStreamSink(DataStreamSink dataStreamSink) { exchNodeId = dataStreamSink.getExchNodeId(); outputPartition = dataStreamSink.getOutputPartition(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 1a7d4df0c49b0e..b39d5b0d62bbc7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -54,6 +54,7 @@ import org.apache.doris.common.profile.ProfileManager.ProfileType; import org.apache.doris.common.profile.SummaryProfile; import org.apache.doris.common.profile.SummaryProfile.SummaryBuilder; +import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.DebugPointUtil.DebugPoint; import org.apache.doris.common.util.DebugUtil; @@ -105,8 +106,10 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache; import org.apache.doris.planner.GroupCommitScanNode; import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanNode; import org.apache.doris.planner.Planner; +import org.apache.doris.planner.ResultFileSink; import org.apache.doris.planner.ScanNode; import org.apache.doris.proto.Data; import org.apache.doris.proto.InternalService; @@ -1298,8 +1301,16 @@ public void executeAndSendResult(boolean isOutfileQuery, boolean isSendFields, } coordBase.setIsProfileSafeStmt(this.isProfileSafeStmt()); + OutFileClause outFileClause = null; + if (isOutfileQuery) { + outFileClause = queryStmt.getOutFileClause(); + Preconditions.checkState(outFileClause != null, "OUTFILE query must have OutFileClause"); + } try { + if (outFileClause != null) { + deleteExistingOutfileFilesInFe(outFileClause); + } coordBase.exec(); profile.getSummaryProfile().setQueryScheduleFinishTime(TimeUtils.getStartTimeMs()); updateProfile(false); @@ -1339,8 +1350,8 @@ public void executeAndSendResult(boolean isOutfileQuery, boolean isSendFields, sendFields(queryStmt.getColLabels(), queryStmt.getFieldInfos(), getReturnTypes(queryStmt)); } else { - if (!Strings.isNullOrEmpty(queryStmt.getOutFileClause().getSuccessFileName())) { - outfileWriteSuccess(queryStmt.getOutFileClause()); + if (!Strings.isNullOrEmpty(outFileClause.getSuccessFileName())) { + outfileWriteSuccess(outFileClause); } sendFields(OutFileClause.RESULT_COL_NAMES, OutFileClause.RESULT_COL_TYPES); } @@ -1482,6 +1493,31 @@ private void outfileWriteSuccess(OutFileClause outFileClause) throws Exception { } } + private void deleteExistingOutfileFilesInFe(OutFileClause outFileClause) throws UserException { + // Handle directory cleanup once in FE so parallel outfile writers never race on deletion. + if (!outFileClause.shouldDeleteExistingFiles()) { + return; + } + Preconditions.checkState(outFileClause.getBrokerDesc() != null, + "delete_existing_files requires a remote outfile sink"); + Preconditions.checkState(outFileClause.getBrokerDesc().storageType() != StorageType.LOCAL, + "delete_existing_files is not supported for local outfile sinks"); + BrokerUtil.deleteParentDirectoryWithFileSystem(outFileClause.getFilePath(), outFileClause.getBrokerDesc()); + clearDeleteExistingFilesInPlan(); + } + + private void clearDeleteExistingFilesInPlan() { + ResultFileSink resultFileSink = null; + for (PlanFragment fragment : planner.getFragments()) { + if (fragment.getSink() instanceof ResultFileSink) { + Preconditions.checkState(resultFileSink == null, "OUTFILE query should have only one ResultFileSink"); + resultFileSink = (ResultFileSink) fragment.getSink(); + } + } + Preconditions.checkState(resultFileSink != null, "OUTFILE query must have ResultFileSink"); + resultFileSink.setDeleteExistingFiles(false); + } + public static void syncLoadForTablets(List> backendsList, List allTabletIds) { backendsList.forEach(backends -> backends.forEach(backend -> { if (backend.isAlive()) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java index 05af66313b1b6a..3e3f4ba9d49086 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java @@ -24,6 +24,9 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.mysql.MysqlChannel; import org.apache.doris.mysql.MysqlSerializer; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.Planner; +import org.apache.doris.planner.ResultFileSink; import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData; import org.apache.doris.qe.ConnectContext.ConnectType; import org.apache.doris.utframe.TestWithFeService; @@ -35,6 +38,8 @@ import org.mockito.Mockito; import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -276,4 +281,24 @@ public void testSendBinaryResultRow() throws IOException { StmtExecutor executor = new StmtExecutor(mockCtx, stmt, false); executor.sendBinaryResultRow(resultSet); } + + @Test + public void testClearDeleteExistingFilesInPlan() throws Exception { + Planner planner = Mockito.mock(Planner.class); + PlanFragment fragment = Mockito.mock(PlanFragment.class); + ResultFileSink resultFileSink = Mockito.mock(ResultFileSink.class); + Mockito.when(fragment.getSink()).thenReturn(resultFileSink); + Mockito.when(planner.getFragments()).thenReturn(Lists.newArrayList(fragment)); + + StmtExecutor executor = new StmtExecutor(connectContext, ""); + Field plannerField = StmtExecutor.class.getDeclaredField("planner"); + plannerField.setAccessible(true); + plannerField.set(executor, planner); + + Method clearMethod = StmtExecutor.class.getDeclaredMethod("clearDeleteExistingFilesInPlan"); + clearMethod.setAccessible(true); + clearMethod.invoke(executor); + + Mockito.verify(resultFileSink).setDeleteExistingFiles(false); + } } diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 8add59d47af770..842765273e057d 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -132,7 +132,7 @@ struct TResultFileSinkOptions { 14: optional TParquetVersion parquet_version 15: optional string orc_schema - 16: optional bool delete_existing_files; + 16: optional bool delete_existing_files; // deprecated: FE now handles outfile cleanup and clears this flag before BE execution; kept for compatibility with older FE 17: optional string file_suffix; 18: optional bool with_bom; @@ -480,7 +480,7 @@ struct TTVFTableSink { 7: optional string column_separator 8: optional string line_delimiter 9: optional i64 max_file_size_bytes - 10: optional bool delete_existing_files + 10: optional bool delete_existing_files // deprecated: FE handles TVF cleanup before execution and always sends false 11: optional map hadoop_config 12: optional PlanNodes.TFileCompressType compression_type 13: optional i64 backend_id // local TVF: specify BE diff --git a/regression-test/suites/export_p0/test_outfile.groovy b/regression-test/suites/export_p0/test_outfile.groovy index f33500f5883847..35c5e0b681b115 100644 --- a/regression-test/suites/export_p0/test_outfile.groovy +++ b/regression-test/suites/export_p0/test_outfile.groovy @@ -44,11 +44,15 @@ suite("test_outfile") { assertEquals(response.msg, "success") def configJson = response.data.rows boolean enableOutfileToLocal = false + boolean enableDeleteExistingFiles = false for (Object conf: configJson) { assert conf instanceof Map if (((Map) conf).get("Name").toLowerCase() == "enable_outfile_to_local") { enableOutfileToLocal = ((Map) conf).get("Value").toLowerCase() == "true" } + if (((Map) conf).get("Name").toLowerCase() == "enable_delete_existing_files") { + enableDeleteExistingFiles = ((Map) conf).get("Value").toLowerCase() == "true" + } } if (!enableOutfileToLocal) { logger.warn("Please set enable_outfile_to_local to true to run test_outfile") @@ -233,4 +237,14 @@ suite("test_outfile") { path.delete(); } } + + if (enableDeleteExistingFiles) { + test { + sql """ + SELECT 1 INTO OUTFILE "file://${outFile}/test_outfile_delete_existing_files_${uuid}/" + PROPERTIES("delete_existing_files" = "true"); + """ + exception "Local file system does not support delete existing files" + } + } } diff --git a/regression-test/suites/export_p0/test_outfile_parallel_delete_existing_files.groovy b/regression-test/suites/export_p0/test_outfile_parallel_delete_existing_files.groovy new file mode 100644 index 00000000000000..655146598a9002 --- /dev/null +++ b/regression-test/suites/export_p0/test_outfile_parallel_delete_existing_files.groovy @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_outfile_parallel_delete_existing_files", "p0") { + StringBuilder strBuilder = new StringBuilder() + strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword) + if ((context.config.otherConfigs.get("enableTLS")?.toString()?.equalsIgnoreCase("true")) ?: false) { + strBuilder.append(" https://" + context.config.feHttpAddress + "/rest/v1/config/fe") + strBuilder.append(" --cert " + context.config.otherConfigs.get("trustCert") + + " --cacert " + context.config.otherConfigs.get("trustCACert") + + " --key " + context.config.otherConfigs.get("trustCAKey")) + } else { + strBuilder.append(" http://" + context.config.feHttpAddress + "/rest/v1/config/fe") + } + + def process = strBuilder.toString().execute() + def code = process.waitFor() + def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream()))) + def out = process.getText() + logger.info("Request FE Config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def response = parseJson(out.trim()) + assertEquals(response.code, 0) + assertEquals(response.msg, "success") + boolean enableDeleteExistingFiles = false + for (Object conf : response.data.rows) { + assert conf instanceof Map + if (((Map) conf).get("Name").toLowerCase() == "enable_delete_existing_files") { + enableDeleteExistingFiles = ((Map) conf).get("Value").toLowerCase() == "true" + } + } + if (!enableDeleteExistingFiles) { + logger.warn("Please set enable_delete_existing_files to true to run test_outfile_parallel_delete_existing_files") + return + } + + String ak = getS3AK() + String sk = getS3SK() + String s3Endpoint = getS3Endpoint() + String region = getS3Region() + String bucket = context.config.otherConfigs.get("s3BucketName") + String provider = getS3Provider() + String tableName = "test_outfile_parallel_delete_existing_files" + String uuid = UUID.randomUUID().toString() + String outFilePath = "${bucket}/outfile/parallel_delete_existing_files/${uuid}/exp_" + + def exportToS3 = { String filterSql, boolean deleteExistingFiles -> + String deleteProperty = deleteExistingFiles ? "\"delete_existing_files\" = \"true\"," : "" + sql """ + SELECT * FROM ${tableName} ${filterSql} + INTO OUTFILE "s3://${outFilePath}" + FORMAT AS csv + PROPERTIES ( + ${deleteProperty} + "column_separator" = ",", + "s3.endpoint" = "${s3Endpoint}", + "s3.region" = "${region}", + "s3.secret_key" = "${sk}", + "s3.access_key" = "${ak}", + "provider" = "${provider}" + ); + """ + } + + try { + sql """ set enable_parallel_outfile = true """ + sql """ set parallel_pipeline_task_num = 8 """ + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + id INT NOT NULL, + name STRING NOT NULL, + score INT NOT NULL + ) + DUPLICATE KEY(id) + DISTRIBUTED BY HASH(id) BUCKETS 16 + PROPERTIES("replication_num" = "1"); + """ + + sql """ + INSERT INTO ${tableName} + SELECT + number AS id, + concat('name_', cast(number AS string)) AS name, + cast(number % 97 AS int) AS score + FROM numbers("number" = "20000"); + """ + + def expected = sql """ SELECT count(*), sum(id), sum(score) FROM ${tableName}; """ + + exportToS3("WHERE id < 5000", false) + exportToS3("", true) + + def actual = sql """ + SELECT count(*), sum(id), sum(score) FROM S3( + "uri" = "s3://${outFilePath}*", + "s3.endpoint" = "${s3Endpoint}", + "s3.region" = "${region}", + "s3.secret_key" = "${sk}", + "s3.access_key" = "${ak}", + "provider" = "${provider}", + "format" = "csv", + "column_separator" = ",", + "csv_schema" = "id:int;name:string;score:int" + ); + """ + + assertEquals(expected[0][0], actual[0][0]) + assertEquals(expected[0][1], actual[0][1]) + assertEquals(expected[0][2], actual[0][2]) + } finally { + try_sql(""" set enable_parallel_outfile = false """) + } +}