Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/exec/operator/result_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ struct ResultFileOptions {
// TODO: we should merge parquet_commpression_type/orc_compression_type/compression_type
TFileCompressType::type compression_type = TFileCompressType::PLAIN;

// Deprecated compatibility flag. New FE handles outfile delete_existing_files in FE
// and clears this field before sending the result sink to BE. Keep reading it here
// only for compatibility with older FE during rolling upgrade.
bool delete_existing_files = false;
std::string file_suffix;
//Bring BOM when exporting to CSV format
Expand All @@ -70,6 +73,7 @@ struct ResultFileOptions {
line_delimiter = t_opt.__isset.line_delimiter ? t_opt.line_delimiter : "\n";
max_file_size_bytes =
t_opt.__isset.max_file_size_bytes ? t_opt.max_file_size_bytes : max_file_size_bytes;
// Deprecated compatibility path. New FE should already have cleared this flag.
delete_existing_files =
t_opt.__isset.delete_existing_files ? t_opt.delete_existing_files : false;
file_suffix = t_opt.file_suffix;
Expand Down
4 changes: 3 additions & 1 deletion be/src/exec/sink/writer/vfile_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ Status VFileResultWriter::open(RuntimeState* state, RuntimeProfile* profile) {
_file_opts->orc_writer_version < 1) {
return Status::InternalError("orc writer version is less than 1.");
}
// Delete existing files
// Deprecated compatibility path. New FE already deletes the target directory in FE
// and clears delete_existing_files before BE execution. Keep this branch only for
// requests from older FE versions during rolling upgrade.
if (_file_opts->delete_existing_files) {
RETURN_IF_ERROR(_delete_dir());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ public long getMaxFileSizeBytes() {
return maxFileSizeBytes;
}

/**
 * Whether the user supplied {@code delete_existing_files=true} in the OUTFILE
 * properties. When true, the FE deletes the target parent directory itself
 * before execution (see StmtExecutor.deleteExistingOutfileFilesInFe) and then
 * clears the flag in the plan, so BEs no longer perform the deletion.
 */
public boolean shouldDeleteExistingFiles() {
    return deleteExistingFiles;
}

public BrokerDesc getBrokerDesc() {
return brokerDesc;
}
Expand Down Expand Up @@ -454,7 +458,6 @@ private void checkOrcType(String orcType, String expectType, boolean isEqual, St
+ ", should use " + expectType + ", but the definition type is " + orcType);
}


private void analyzeForParquetFormat(List<Expr> resultExprs, List<String> colLabels) throws AnalysisException {
if (this.parquetSchemas.isEmpty()) {
genParquetColumnName(resultExprs, colLabels);
Expand Down Expand Up @@ -540,6 +543,9 @@ private void analyzeProperties() throws UserException {
+ " To enable this feature, you need to add `enable_delete_existing_files=true`"
+ " in fe.conf");
}
if (deleteExistingFiles && isLocalOutput) {
throw new AnalysisException("Local file system does not support delete existing files");
}
copiedProps.remove(PROP_DELETE_EXISTING_FILES);
}

Expand Down Expand Up @@ -769,5 +775,3 @@ public TResultFileSinkOptions toSinkOptions() {
return sinkOptions;
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,18 @@ public static void deleteDirectoryWithFileSystem(String path, BrokerDesc brokerD
}
}

/**
 * Deletes the parent directory of {@code path} — everything up to and
 * including the last '/' — using the file system resolved from
 * {@code brokerDesc}.
 *
 * @param path       a file path or file-name prefix, e.g. s3://bucket/dir/prefix_
 * @param brokerDesc descriptor used to resolve the backing file system
 * @throws UserException if the underlying directory delete fails
 */
public static void deleteParentDirectoryWithFileSystem(String path, BrokerDesc brokerDesc) throws UserException {
    deleteDirectoryWithFileSystem(extractParentDirectory(path), brokerDesc);
}

/**
 * Returns the directory portion of {@code path}: the substring up to and
 * including the final '/'. A path containing no '/' at all is returned
 * unchanged. Example: s3://bucket/path/to/prefix_ -> s3://bucket/path/to/
 */
public static String extractParentDirectory(String path) {
    final int lastSeparator = path.lastIndexOf('/');
    return lastSeparator < 0 ? path : path.substring(0, lastSeparator + 1);
}

/**
 * Renders a broker identity as {@code name[address]} for use in log and
 * error messages. The explicit {@code toString()} call is kept so a null
 * {@code address} fails exactly as before.
 */
public static String printBroker(String brokerName, TNetworkAddress address) {
    final String renderedAddress = address.toString();
    return brokerName + "[" + renderedAddress + "]";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,7 @@ public void addExportJobAndRegisterTask(ExportJob job) throws Exception {
try {
// delete existing files
if (Boolean.parseBoolean(job.getDeleteExistingFiles())) {
String fullPath = job.getExportPath();
BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0, fullPath.lastIndexOf('/') + 1),
job.getBrokerDesc());
BrokerUtil.deleteParentDirectoryWithFileSystem(job.getExportPath(), job.getBrokerDesc());
}
// ATTN: Must add task after edit log, otherwise the job may finish before adding job.
for (int i = 0; i < job.getCopiedTaskExecutors().size(); i++) {
Expand Down Expand Up @@ -554,4 +552,3 @@ public long getJobNum(ExportJobState state) {
return size;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.remote.RemoteFileSystem;
Expand Down Expand Up @@ -176,7 +177,7 @@ private void deleteExistingFilesInFE(String tvfName, Map<String, String> props)
throws Exception {
String filePath = props.get("file_path");
// Extract parent directory from prefix path: s3://bucket/path/to/prefix_ -> s3://bucket/path/to/
String parentDir = extractParentDirectory(filePath);
String parentDir = BrokerUtil.extractParentDirectory(filePath);
LOG.info("TVF sink: deleting existing files in directory: {}", parentDir);

// Copy props for building StorageProperties (exclude write-specific params)
Expand All @@ -198,12 +199,4 @@ private void deleteExistingFilesInFE(String tvfName, Map<String, String> props)
+ parentDir + ": " + deleteStatus.getErrMsg());
}
}

private static String extractParentDirectory(String prefixPath) {
int lastSlash = prefixPath.lastIndexOf('/');
if (lastSlash >= 0) {
return prefixPath.substring(0, lastSlash + 1);
}
return prefixPath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public void setBrokerAddr(String ip, int port) {
fileSinkOptions.setBrokerAddresses(Lists.newArrayList(new TNetworkAddress(ip, port)));
}

/**
 * Overrides the delete_existing_files flag carried to the BE inside the
 * result file sink options. The FE calls this with {@code false} after it has
 * deleted the target directory itself, so BEs do not repeat (or race on) the
 * deletion. Must only be called on a sink that has file sink options set.
 */
public void setDeleteExistingFiles(boolean deleteExistingFiles) {
    Preconditions.checkNotNull(fileSinkOptions);
    fileSinkOptions.setDeleteExistingFiles(deleteExistingFiles);
}

public void resetByDataStreamSink(DataStreamSink dataStreamSink) {
exchNodeId = dataStreamSink.getExchNodeId();
outputPartition = dataStreamSink.getOutputPartition();
Expand Down
40 changes: 38 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.doris.common.profile.ProfileManager.ProfileType;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.profile.SummaryProfile.SummaryBuilder;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
import org.apache.doris.common.util.DebugUtil;
Expand Down Expand Up @@ -105,8 +106,10 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache;
import org.apache.doris.planner.GroupCommitScanNode;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ResultFileSink;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.proto.Data;
import org.apache.doris.proto.InternalService;
Expand Down Expand Up @@ -1298,8 +1301,16 @@ public void executeAndSendResult(boolean isOutfileQuery, boolean isSendFields,
}

coordBase.setIsProfileSafeStmt(this.isProfileSafeStmt());
OutFileClause outFileClause = null;
if (isOutfileQuery) {
outFileClause = queryStmt.getOutFileClause();
Preconditions.checkState(outFileClause != null, "OUTFILE query must have OutFileClause");
}

try {
if (outFileClause != null) {
deleteExistingOutfileFilesInFe(outFileClause);
}
coordBase.exec();
profile.getSummaryProfile().setQueryScheduleFinishTime(TimeUtils.getStartTimeMs());
updateProfile(false);
Expand Down Expand Up @@ -1339,8 +1350,8 @@ public void executeAndSendResult(boolean isOutfileQuery, boolean isSendFields,
sendFields(queryStmt.getColLabels(), queryStmt.getFieldInfos(),
getReturnTypes(queryStmt));
} else {
if (!Strings.isNullOrEmpty(queryStmt.getOutFileClause().getSuccessFileName())) {
outfileWriteSuccess(queryStmt.getOutFileClause());
if (!Strings.isNullOrEmpty(outFileClause.getSuccessFileName())) {
outfileWriteSuccess(outFileClause);
}
sendFields(OutFileClause.RESULT_COL_NAMES, OutFileClause.RESULT_COL_TYPES);
}
Expand Down Expand Up @@ -1482,6 +1493,31 @@ private void outfileWriteSuccess(OutFileClause outFileClause) throws Exception {
}
}

/**
 * If the OUTFILE clause requested delete_existing_files, deletes the target
 * parent directory once here in the FE, then clears the flag on the plan's
 * ResultFileSink so parallel BE writers never race on the deletion.
 *
 * @throws UserException if the directory delete fails
 */
private void deleteExistingOutfileFilesInFe(OutFileClause outFileClause) throws UserException {
    // Handle directory cleanup once in FE so parallel outfile writers never race on deletion.
    if (!outFileClause.shouldDeleteExistingFiles()) {
        return;
    }
    // Local sinks with delete_existing_files are rejected during analysis
    // (OutFileClause.analyzeProperties), so these are internal invariants.
    Preconditions.checkState(outFileClause.getBrokerDesc() != null,
            "delete_existing_files requires a remote outfile sink");
    Preconditions.checkState(outFileClause.getBrokerDesc().storageType() != StorageType.LOCAL,
            "delete_existing_files is not supported for local outfile sinks");
    BrokerUtil.deleteParentDirectoryWithFileSystem(outFileClause.getFilePath(), outFileClause.getBrokerDesc());
    clearDeleteExistingFilesInPlan();
}

/**
 * Locates the single ResultFileSink among the planned fragments and turns off
 * its delete_existing_files flag, because the FE has already performed the
 * directory cleanup itself.
 */
private void clearDeleteExistingFilesInPlan() {
    ResultFileSink fileSink = null;
    for (PlanFragment fragment : planner.getFragments()) {
        if (!(fragment.getSink() instanceof ResultFileSink)) {
            continue;
        }
        Preconditions.checkState(fileSink == null, "OUTFILE query should have only one ResultFileSink");
        fileSink = (ResultFileSink) fragment.getSink();
    }
    Preconditions.checkState(fileSink != null, "OUTFILE query must have ResultFileSink");
    fileSink.setDeleteExistingFiles(false);
}

public static void syncLoadForTablets(List<List<Backend>> backendsList, List<Long> allTabletIds) {
backendsList.forEach(backends -> backends.forEach(backend -> {
if (backend.isAlive()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.apache.doris.common.FeConstants;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ResultFileSink;
import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData;
import org.apache.doris.qe.ConnectContext.ConnectType;
import org.apache.doris.utframe.TestWithFeService;
Expand All @@ -35,6 +38,8 @@
import org.mockito.Mockito;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -276,4 +281,24 @@ public void testSendBinaryResultRow() throws IOException {
StmtExecutor executor = new StmtExecutor(mockCtx, stmt, false);
executor.sendBinaryResultRow(resultSet);
}

/**
 * Verifies that clearDeleteExistingFilesInPlan() finds the ResultFileSink in
 * the planner's fragments and clears its delete_existing_files flag.
 * Reflection is required because both the {@code planner} field and the
 * method under test are private on StmtExecutor.
 */
@Test
public void testClearDeleteExistingFilesInPlan() throws Exception {
    // Plan with exactly one fragment whose sink is a ResultFileSink.
    Planner planner = Mockito.mock(Planner.class);
    PlanFragment fragment = Mockito.mock(PlanFragment.class);
    ResultFileSink resultFileSink = Mockito.mock(ResultFileSink.class);
    Mockito.when(fragment.getSink()).thenReturn(resultFileSink);
    Mockito.when(planner.getFragments()).thenReturn(Lists.newArrayList(fragment));

    // Inject the mocked planner into a real executor.
    StmtExecutor executor = new StmtExecutor(connectContext, "");
    Field plannerField = StmtExecutor.class.getDeclaredField("planner");
    plannerField.setAccessible(true);
    plannerField.set(executor, planner);

    // Invoke the private method under test.
    Method clearMethod = StmtExecutor.class.getDeclaredMethod("clearDeleteExistingFilesInPlan");
    clearMethod.setAccessible(true);
    clearMethod.invoke(executor);

    Mockito.verify(resultFileSink).setDeleteExistingFiles(false);
}
}
4 changes: 2 additions & 2 deletions gensrc/thrift/DataSinks.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ struct TResultFileSinkOptions {
14: optional TParquetVersion parquet_version
15: optional string orc_schema

16: optional bool delete_existing_files;
16: optional bool delete_existing_files; // deprecated: FE now handles outfile cleanup and clears this flag before BE execution; kept for compatibility with older FE
17: optional string file_suffix;
18: optional bool with_bom;

Expand Down Expand Up @@ -480,7 +480,7 @@ struct TTVFTableSink {
7: optional string column_separator
8: optional string line_delimiter
9: optional i64 max_file_size_bytes
10: optional bool delete_existing_files
10: optional bool delete_existing_files // deprecated: FE handles TVF cleanup before execution and always sends false
11: optional map<string, string> hadoop_config
12: optional PlanNodes.TFileCompressType compression_type
13: optional i64 backend_id // local TVF: specify BE
Expand Down
14 changes: 14 additions & 0 deletions regression-test/suites/export_p0/test_outfile.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,15 @@ suite("test_outfile") {
assertEquals(response.msg, "success")
def configJson = response.data.rows
boolean enableOutfileToLocal = false
boolean enableDeleteExistingFiles = false
for (Object conf: configJson) {
assert conf instanceof Map
if (((Map<String, String>) conf).get("Name").toLowerCase() == "enable_outfile_to_local") {
enableOutfileToLocal = ((Map<String, String>) conf).get("Value").toLowerCase() == "true"
}
if (((Map<String, String>) conf).get("Name").toLowerCase() == "enable_delete_existing_files") {
enableDeleteExistingFiles = ((Map<String, String>) conf).get("Value").toLowerCase() == "true"
}
}
if (!enableOutfileToLocal) {
logger.warn("Please set enable_outfile_to_local to true to run test_outfile")
Expand Down Expand Up @@ -233,4 +237,14 @@ suite("test_outfile") {
path.delete();
}
}

if (enableDeleteExistingFiles) {
test {
sql """
SELECT 1 INTO OUTFILE "file://${outFile}/test_outfile_delete_existing_files_${uuid}/"
PROPERTIES("delete_existing_files" = "true");
"""
exception "Local file system does not support delete existing files"
}
}
}
Loading
Loading