Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.util.Map.*;
import java.util.zip.*;

import org.basex.io.out.*;
import org.basex.query.*;
import org.basex.query.expr.*;
import org.basex.query.iter.*;
Expand All @@ -29,9 +28,12 @@
public class ArchiveCreate extends ArchiveFn {
@Override
public Item item(final QueryContext qc, final InputInfo ii) throws QueryException {
final ArrayOutput ao = new ArrayOutput();
create(ao, qc);
return B64.get(ao.finish());
try(SpillOutput so = new SpillOutput(qc)) {
create(so, qc);
return so.result(ARCHIVE_ERROR_X);
} catch(final IOException ex) {
throw ARCHIVE_ERROR_X.get(info, ex);
}
}

/**
Expand All @@ -40,7 +42,7 @@ public Item item(final QueryContext qc, final InputInfo ii) throws QueryExceptio
* @param qc query context
* @throws QueryException query exception
*/
public final void create(final OutputStream os, final QueryContext qc) throws QueryException {
public void create(final OutputStream os, final QueryContext qc) throws QueryException {
final Map<String, Entry<Item, Item>> files = toFiles(arg(0), arg(1), qc);
final CreateOptions options = toOptions(arg(2), new CreateOptions(), qc);
create(files, options, os, qc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import java.util.AbstractMap.*;

import org.basex.io.*;
import org.basex.io.out.*;
import org.basex.query.*;
import org.basex.query.value.*;
import org.basex.query.value.item.*;
Expand All @@ -24,6 +23,16 @@
public final class ArchiveCreateFrom extends ArchiveCreate {
@Override
public B64 item(final QueryContext qc, final InputInfo ii) throws QueryException {
try(SpillOutput so = new SpillOutput(qc)) {
create(so, qc);
return so.result(ARCHIVE_ERROR_X);
} catch(final IOException ex) {
throw ARCHIVE_ERROR_X.get(info, ex);
}
}

@Override
public void create(final OutputStream os, final QueryContext qc) throws QueryException {
final IOFile root = new IOFile(toPath(arg(0), qc));
final CreateFromOptions options = toOptions(arg(1), new CreateFromOptions(), qc);
Value entries = arg(2).value(qc);
Expand All @@ -47,20 +56,18 @@ public B64 item(final QueryContext qc, final InputInfo ii) throws QueryException
final int level = level(options);
final String format = options.get(CreateOptions.FORMAT).toLowerCase(Locale.ENGLISH);
final String dir = rootDir && root.parent() != null ? root.name() + '/' : "";
final ArrayOutput ao = new ArrayOutput();
try(ArchiveOut out = ArchiveOut.get(format, info, ao)) {
try(ArchiveOut out = ArchiveOut.get(format, info, os)) {
out.level(level);
try {
for(final Item item : entries) {
final IOFile file = new IOFile(root, toString(item, qc));
if(!file.exists()) throw FILE_NOT_FOUND_X.get(info, file);
if(file.isDir()) throw FILE_IS_DIR_X.get(info, file);
add(new SimpleEntry<>(item, B64.get(file.read())), out, level, dir, qc);
add(new SimpleEntry<>(item, new B64Lazy(file, FILE_NOT_FOUND_X)), out, level, dir, qc);
}
} catch(final IOException ex) {
throw ARCHIVE_ERROR_X.get(info, ex);
}
}
return B64.get(ao.finish());
}
}
126 changes: 126 additions & 0 deletions basex-core/src/main/java/org/basex/query/func/archive/SpillOutput.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package org.basex.query.func.archive;

import java.io.*;
import java.util.*;

import org.basex.io.*;
import org.basex.query.QueryContext;
import org.basex.query.QueryError;
import org.basex.query.value.item.*;
import org.basex.util.*;

/**
* Spill output stream.
*
* This class provides an output stream that buffers data in memory, then spills transparently to a
* temporary file if the data exceeds the threshold. The threshold uses the same formula as
* {@code Add#cache}: half of {@code (maxMemory - freeMemory)}, capped at the maximum array size.
* The result can be retrieved as a binary item via the {@link #result} method, which returns a lazy
* reference to the temporary file if data was spilled, or an in-memory binary item otherwise. The
* temporary file is registered with the query context's resources for automatic deletion when the
* query finishes.
*
* This class is used by {@link ArchiveCreateFrom} and {@link ArchiveCreate}. When creating small
* archives, data is kept in memory so that unnecessary disk I/O and temp file management are
* avoided. When creating large archives, the threshold-based spill mechanism avoids excessive
* memory usage and potential errors due to out-of-memory conditions or exceeding the maximum
* array size.
*
* @author BaseX Team, BSD License
*/
final class SpillOutput extends OutputStream {
  /** In-memory buffer (set to null once the data has been spilled). */
  private byte[] buffer = new byte[Array.INITIAL_CAPACITY];
  /** Number of bytes written to the in-memory buffer. */
  private int bufSize;
  /** Disk output stream (null before spilling and after closing). */
  private FileOutputStream disk;
  /** Temporary file (null before spilling; kept after closing). */
  private IOFile tmpFile;
  /** Threshold in bytes before spilling to disk. */
  private final long threshold;
  /** Query context for registering the temporary file on spill. */
  private final QueryContext qc;

  /**
   * Constructor. Computes the spill threshold from current heap availability.
   * @param qc query context
   */
  SpillOutput(final QueryContext qc) {
    this.qc = qc;
    final Runtime rt = Runtime.getRuntime();
    threshold = Math.min((rt.maxMemory() - rt.freeMemory()) / 2, Array.MAX_SIZE);
  }

  /**
   * Constructor with an explicit spill threshold (used for testing).
   * The threshold is capped at the maximum array size, as the in-memory buffer cannot
   * grow beyond it and the buffer size is tracked as an {@code int}.
   * @param qc query context
   * @param threshold spill threshold in bytes
   */
  SpillOutput(final QueryContext qc, final long threshold) {
    this.qc = qc;
    this.threshold = Math.min(threshold, Array.MAX_SIZE);
  }

  @Override
  public void write(final int b) throws IOException {
    prepare(1);
    if(disk != null) {
      disk.write(b);
    } else {
      if(bufSize == buffer.length) buffer = Arrays.copyOf(buffer, Array.newCapacity(bufSize));
      buffer[bufSize++] = (byte) b;
    }
  }

  @Override
  public void write(final byte[] b, final int off, final int len) throws IOException {
    prepare(len);
    if(disk != null) {
      disk.write(b, off, len);
    } else {
      // cannot overflow: prepare() ensures that bufSize + len does not exceed the threshold
      final int newSize = bufSize + len;
      if(newSize > buffer.length)
        buffer = Arrays.copyOf(buffer, Math.max(Array.newCapacity(buffer.length), newSize));
      System.arraycopy(b, off, buffer, bufSize, len);
      bufSize = newSize;
    }
  }

  /**
   * Returns the result as a binary item: a lazy reference to the temporary file
   * if data was spilled, or an in-memory binary item otherwise. The method may be
   * called before or after {@link #close}.
   * @param error error to raise if the temporary file cannot be read
   * @return binary item
   */
  B64 result(final QueryError error) {
    if(tmpFile != null) return new B64Lazy(tmpFile, error);
    return B64.get(bufSize == 0 ? Token.EMPTY : bufSize == buffer.length ? buffer :
      Arrays.copyOf(buffer, bufSize));
  }

  /**
   * Closes the disk output stream if one was opened. {@code tmpFile} is intentionally
   * not nulled here because {@link #result} may be called after {@code close} and still
   * needs it to determine whether data was spilled. Calling this method more than once
   * has no further effect.
   */
  @Override
  public void close() throws IOException {
    if(disk != null) {
      disk.close();
      disk = null;
    }
  }

  /**
   * Prepares the stream for writing the specified number of bytes: spills the buffer to
   * disk if the threshold would be exceeded, and rejects writes to a spilled stream that
   * has already been closed. Without this check, such a write would either dereference the
   * released buffer or spill again and replace the temporary file holding the data.
   * @param len number of bytes about to be written
   * @throws IOException if the stream was spilled and closed, or if spilling fails
   */
  private void prepare(final int len) throws IOException {
    if(disk != null) return;
    if(tmpFile != null) throw new IOException("Stream closed");
    if((long) bufSize + len > threshold) spill();
  }

  /**
   * Spills the in-memory buffer to a temporary file, registers it for deletion
   * when the query context closes, and switches subsequent writes to disk.
   * @throws IOException I/O exception
   */
  private void spill() throws IOException {
    final IOFile file = new IOFile(File.createTempFile(Prop.NAME + '-', IO.TMPSUFFIX));
    // register first, so the file will be deleted even if one of the next steps fails
    qc.resources.index(TempFiles.class).add(file);
    disk = file.outputStream();
    // publish the file only after the stream exists: result() must never refer to a file
    // that could not be opened for writing
    tmpFile = file;
    disk.write(buffer, 0, bufSize);
    buffer = null;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.basex.query.func.archive;

import java.util.*;

import org.basex.io.*;
import org.basex.query.*;

/**
* Temporary files created during query evaluation.
*
* Temporary files that are created during query evaluation and registered in this class will be
* deleted when the query context is closed, ensuring that no temporary files are left behind after
* the query execution.
*
* @author BaseX Team, BSD License
*/
public final class TempFiles implements QueryResource {
  /** Registered temporary files, in the order in which they were added. */
  private final List<IOFile> files = new ArrayList<>();

  /**
   * Registers a temporary file for deletion when this resource is closed.
   * @param file temporary file
   */
  synchronized void add(final IOFile file) {
    files.add(file);
  }

  /**
   * Deletes all registered files and empties the list.
   */
  @Override
  public synchronized void close() {
    final int count = files.size();
    for(int f = 0; f < count; f++) {
      files.get(f).delete();
    }
    files.clear();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ final void write(final boolean append, final QueryContext qc) throws QueryExcept
} else {
// write full file
try(BufferOutput out = BufferOutput.get(new FileOutputStream(path.toFile(), append))) {
if(arg(1).getClass() == ArchiveCreate.class) {
// optimization: stream archive to disk (no support for ArchiveCreateFrom)
((ArchiveCreate) arg(1)).create(out, qc);
if(arg(1) instanceof final ArchiveCreate ac) {
// optimization: stream archive to disk
ac.create(out, qc);
} else {
IO.write(toBin(arg(1), qc).input(info), out);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@

import static org.basex.query.QueryError.*;
import static org.basex.query.func.Function.*;
import static org.junit.jupiter.api.Assertions.*;

import java.io.*;

import org.basex.*;
import org.basex.io.*;
import org.basex.util.*;
import org.junit.jupiter.api.*;

Expand Down Expand Up @@ -95,6 +99,22 @@ public final class ArchiveModuleTest extends SandboxTest {
query("parse-xml(" + _ARCHIVE_EXTRACT_TEXT.args(func.args(DIR, " {}"),
"input.xml") + ") instance of document-node()", true);

// standalone use: verify no temp files remain after query context closes
final File tmpDir = new File(Prop.TEMPDIR);
final int tmpsBefore = tmpDir.listFiles(
f -> f.getName().startsWith(Prop.NAME + '-') && f.getName().endsWith(IO.TMPSUFFIX)).length;
countEntries(func.args(DIR), 5);
assertEquals(tmpsBefore, tmpDir.listFiles(
f -> f.getName().startsWith(Prop.NAME + '-') && f.getName().endsWith(IO.TMPSUFFIX)).length);

// write directly to file (streaming path, no temp file and no in-memory accumulation)
final String tmp = Prop.TEMPDIR + NAME + "createFrom";
query(_FILE_WRITE_BINARY.args(tmp, func.args(DIR)));
countEntries(tmp, 5);
query("parse-xml(" + _ARCHIVE_EXTRACT_TEXT.args(tmp, "input.xml") +
") instance of document-node()", true);
query(_FILE_DELETE.args(tmp));

// errors
error(func.args("UNUNUNKNOWN"), FILE_NO_DIR_X);
error(func.args(DIR, " {}", "UNUNUNKNOWN"), FILE_NOT_FOUND_X);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package org.basex.query.func.archive;

import static org.junit.jupiter.api.Assertions.*;

import java.io.*;

import org.basex.*;
import org.basex.io.*;
import org.basex.query.*;
import org.basex.query.value.item.*;
import org.basex.util.*;
import org.junit.jupiter.api.*;

/**
* Tests for {@link SpillOutput}.
*
* @author BaseX Team, BSD License
*/
public final class SpillOutputTest extends SandboxTest {
  /**
   * Writes fewer bytes than the threshold and checks that the result is an in-memory
   * binary item with the written content.
   * @throws IOException I/O exception
   * @throws QueryException query exception
   */
  @Test public void inMemoryPath() throws IOException, QueryException {
    final byte[] input = { 1, 2, 3 };
    try(QueryContext qc = new QueryContext(context)) {
      try(SpillOutput output = new SpillOutput(qc, 1024)) {
        output.write(input);
        final B64 item = output.result(QueryError.ARCHIVE_ERROR_X);
        final boolean lazy = item instanceof B64Lazy;
        assertFalse(lazy, "expected in-memory item");
        assertArrayEquals(input, item.binary(null));
      }
    }
  }

  /**
   * Writes more bytes than the threshold and checks that the result is a lazy reference
   * with the written content.
   * @throws IOException I/O exception
   * @throws QueryException query exception
   */
  @Test public void spillPath() throws IOException, QueryException {
    final byte[] input = { 10, 20, 30, 40, 50 };
    try(QueryContext qc = new QueryContext(context)) {
      try(SpillOutput output = new SpillOutput(qc, 3)) {
        output.write(input);
        final B64 item = output.result(QueryError.ARCHIVE_ERROR_X);
        final boolean lazy = item instanceof B64Lazy;
        assertTrue(lazy, "expected lazy (spilled) item");
        assertArrayEquals(input, item.binary(null));
      }
    }
  }

  /**
   * Checks that a spilled temporary file exists while the query context is open and is
   * gone once the query context has been closed.
   * @throws IOException I/O exception
   */
  @Test public void tempFileDeletedOnQueryClose() throws IOException {
    final File dir = new File(Prop.TEMPDIR);
    final int initial = countTempFiles(dir);

    try(QueryContext qc = new QueryContext(context)) {
      try(SpillOutput output = new SpillOutput(qc, 0)) {
        final byte[] input = { 1, 2, 3 };
        output.write(input);
        output.result(QueryError.ARCHIVE_ERROR_X);
      }
      final int open = countTempFiles(dir);
      assertEquals(initial + 1, open, "temp file should exist while qc is open");
    }
    final int closed = countTempFiles(dir);
    assertEquals(initial, closed, "temp file should be deleted after qc closes");
  }

  /**
   * Checks that a second call of {@code close} does not raise an exception.
   * @throws IOException I/O exception
   */
  @Test public void closeIsIdempotent() throws IOException {
    try(QueryContext qc = new QueryContext(context)) {
      final SpillOutput output = new SpillOutput(qc, 0);
      output.write(new byte[] { 1 });
      output.close();
      assertDoesNotThrow(output::close);
    }
  }

  /**
   * Counts the files in a directory whose names carry the BaseX temporary file prefix
   * and suffix.
   * @param dir directory to check
   * @return number of matching files ({@code 0} if the directory cannot be listed)
   */
  private static int countTempFiles(final File dir) {
    final String prefix = Prop.NAME + '-';
    final File[] matches = dir.listFiles((parent, name) ->
      name.startsWith(prefix) && name.endsWith(IO.TMPSUFFIX));
    if(matches == null) return 0;
    return matches.length;
  }
}
Loading
Loading