From 874083de937f1d039c823329a4ae37630425f588 Mon Sep 17 00:00:00 2001 From: Gokul Soundararajan Date: Tue, 21 Apr 2026 20:20:12 +0000 Subject: [PATCH 1/5] feat: add log4j-iceberg appender module for writing logs to Apache Iceberg tables Adds a new log4j-iceberg module that provides an IcebergAppender plugin for writing log events as Parquet-backed rows in an Iceberg table. The appender buffers events and periodically flushes them as Parquet data files committed to the table. Commit failures are retried with exponential backoff and table metadata refresh; persistent failures crash rather than silently dropping data. Uses Apache Iceberg 1.10.1 and Parquet 1.16.0. Supports any Iceberg catalog implementation (Hadoop, REST, etc.) via configuration. --- log4j-iceberg/pom.xml | 226 +++++++++++++ .../log4j/iceberg/IcebergAppender.java | 174 ++++++++++ .../log4j/iceberg/IcebergCommitException.java | 24 ++ .../logging/log4j/iceberg/IcebergManager.java | 308 ++++++++++++++++++ .../logging/log4j/iceberg/package-info.java | 25 ++ pom.xml | 7 + 6 files changed, 764 insertions(+) create mode 100644 log4j-iceberg/pom.xml create mode 100644 log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergAppender.java create mode 100644 log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergCommitException.java create mode 100644 log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergManager.java create mode 100644 log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/package-info.java diff --git a/log4j-iceberg/pom.xml b/log4j-iceberg/pom.xml new file mode 100644 index 00000000000..18cde56d19b --- /dev/null +++ b/log4j-iceberg/pom.xml @@ -0,0 +1,226 @@ + + + + + 4.0.0 + + + org.apache.logging.log4j + log4j + ${revision} + ../log4j-parent + + + log4j-iceberg + + Apache Log4j Iceberg + Apache Iceberg appender for Log4j. Writes log events as rows in an Iceberg table using Parquet files. 
+ + + org.apache.logging.log4j.core + 1.10.1 + 1.16.0 + 3.4.1 + + + + + + org.xerial.snappy + snappy-java + 1.1.10.7 + + + io.airlift + aircompressor + 2.0.2 + + + org.apache.commons + commons-text + 1.11.0 + + + + + + + + org.apache.logging.log4j + log4j-core + + + + + org.apache.iceberg + iceberg-core + ${iceberg.version} + + + org.apache.iceberg + iceberg-parquet + ${iceberg.version} + + + org.apache.iceberg + iceberg-data + ${iceberg.version} + + + org.apache.iceberg + iceberg-api + ${iceberg.version} + + + + + org.apache.parquet + parquet-avro + ${parquet.version} + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + org.slf4j + slf4j-reload4j + + + ch.qos.reload4j + reload4j + + + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + + + org.slf4j + slf4j-reload4j + + + ch.qos.reload4j + reload4j + + + org.apache.hadoop + hadoop-yarn-common + + + org.apache.hadoop + hadoop-yarn-client + + + org.apache.hadoop + hadoop-hdfs-client + + + + + + + org.junit.jupiter + junit-jupiter-engine + test + + + org.apache.logging.log4j + log4j-core-test + test + + + org.mockito + mockito-core + test + + + org.assertj + assertj-core + 3.26.3 + test + + + + + + + + + biz.aQute.bnd + bnd-baseline-maven-plugin + + + check-api-compat + + baseline + + + true + + + + + + org.jacoco + jacoco-maven-plugin + 0.8.12 + + + + prepare-agent + + + + report + + report + + test + + + check + + check + + + + + BUNDLE + + + LINE + COVEREDRATIO + 1.00 + + + + + + + + + + + + diff --git a/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergAppender.java b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergAppender.java new file mode 100644 index 00000000000..50290990466 --- /dev/null +++ b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergAppender.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.logging.log4j.iceberg; + +import java.util.concurrent.TimeUnit; +import org.apache.logging.log4j.core.Appender; +import org.apache.logging.log4j.core.Core; +import org.apache.logging.log4j.core.Filter; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.appender.AbstractAppender; +import org.apache.logging.log4j.core.config.Property; +import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute; +import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory; +import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required; + +/** + * Log4j appender that writes log events as rows in an Apache Iceberg table + * backed by Parquet data files. + * + *

Configuration example:

+ *
+ * <Iceberg name="IcebergAppender"
+ *          catalogName="my_catalog"
+ *          catalogImpl="org.apache.iceberg.rest.RESTCatalog"
+ *          catalogUri="http://localhost:8181"
+ *          catalogWarehouse="s3://my-bucket/warehouse"
+ *          tableNamespace="logs"
+ *          tableName="app_logs"
+ *          batchSize="1000"
+ *          flushIntervalSeconds="30">
+ *   <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %-5level %logger{36} - %msg%n"/>
+ * </Iceberg>
+ * 
+ */
+@Plugin(name = "Iceberg", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE, printObject = true)
+public class IcebergAppender extends AbstractAppender {
+
+    /** Buffers events and commits them to the Iceberg table; started and stopped together with this appender. */
+    private final IcebergManager manager;
+
+    private IcebergAppender(
+            final String name,
+            final Filter filter,
+            final boolean ignoreExceptions,
+            final Property[] properties,
+            final IcebergManager manager) {
+        // No layout is passed on: the manager maps event fields to table columns itself.
+        super(name, filter, null, ignoreExceptions, properties);
+        this.manager = manager;
+    }
+
+    /**
+     * Hands an immutable snapshot of the event to the manager.
+     *
+     * <p>The snapshot is required because the event is buffered and written later, after the caller may
+     * have reused the original instance.</p>
+     *
+     * @param event the event to record
+     */
+    @Override
+    public void append(final LogEvent event) {
+        manager.write(event.toImmutable());
+    }
+
+    /**
+     * Starts the manager (catalog, table and flush scheduler) before marking the appender as started.
+     */
+    @Override
+    public void start() {
+        manager.startup();
+        super.start();
+    }
+
+    /**
+     * Stops the appender and the manager, flushing any buffered events.
+     *
+     * @param timeout how long to wait for the flush scheduler to terminate
+     * @param timeUnit unit of {@code timeout}
+     * @return {@code true} if both the appender and the manager report a clean stop
+     */
+    @Override
+    public boolean stop(final long timeout, final TimeUnit timeUnit) {
+        setStopping();
+        boolean stopped = super.stop(timeout, timeUnit, false);
+        try {
+            stopped &= manager.stop(timeout, timeUnit);
+        } finally {
+            // The final flush may throw; the appender must not stay in the STOPPING state when it does.
+            setStopped();
+        }
+        return stopped;
+    }
+
+    /**
+     * Creates a builder; used by the plugin system and by programmatic configuration.
+     *
+     * @param <B> the concrete builder type
+     * @return a new builder
+     */
+    @PluginBuilderFactory
+    public static <B extends Builder<B>> B newBuilder() {
+        return new Builder<B>().asBuilder();
+    }
+
+    /**
+     * Builder for {@link IcebergAppender}.
+     *
+     * @param <B> the concrete builder type, returned by every setter to allow chaining
+     */
+    public static class Builder<B extends Builder<B>> extends AbstractAppender.Builder<B>
+            implements org.apache.logging.log4j.core.util.Builder<IcebergAppender> {
+
+        @PluginBuilderAttribute
+        private String catalogName = "log4j";
+
+        @PluginBuilderAttribute
+        @Required(message = "No catalog implementation class provided")
+        private String catalogImpl;
+
+        @PluginBuilderAttribute
+        private String catalogUri;
+
+        @PluginBuilderAttribute
+        private String catalogWarehouse;
+
+        @PluginBuilderAttribute
+        private String tableNamespace = "default";
+
+        @PluginBuilderAttribute
+        @Required(message = "No Iceberg table name provided")
+        private String tableName;
+
+        /** Number of buffered events that triggers an immediate flush. */
+        @PluginBuilderAttribute
+        private int batchSize = 1000;
+
+        /** Period of the background flush, in seconds. */
+        @PluginBuilderAttribute
+        private int flushIntervalSeconds = 30;
+
+        public B setCatalogName(final String catalogName) {
+            this.catalogName = catalogName;
+            return asBuilder();
+        }
+
+        public B setCatalogImpl(final String catalogImpl) {
+            this.catalogImpl = catalogImpl;
+            return asBuilder();
+        }
+
+        public B setCatalogUri(final String catalogUri) {
+            this.catalogUri = catalogUri;
+            return asBuilder();
+        }
+
+        public B setCatalogWarehouse(final String catalogWarehouse) {
+            this.catalogWarehouse = catalogWarehouse;
+            return asBuilder();
+        }
+
+        public B setTableNamespace(final String tableNamespace) {
+            this.tableNamespace = tableNamespace;
+            return asBuilder();
+        }
+
+        public B setTableName(final String tableName) {
+            this.tableName = tableName;
+            return asBuilder();
+        }
+
+        public B setBatchSize(final int batchSize) {
+            this.batchSize = batchSize;
+            return asBuilder();
+        }
+
+        public B setFlushIntervalSeconds(final int flushIntervalSeconds) {
+            this.flushIntervalSeconds = flushIntervalSeconds;
+            return asBuilder();
+        }
+
+        /**
+         * Builds the appender together with its manager. Nothing is connected or created until
+         * {@link IcebergAppender#start()} is called.
+         *
+         * @return a new, not yet started appender
+         */
+        @Override
+        public IcebergAppender build() {
+            final IcebergManager manager = new IcebergManager(
+                    getName(),
+                    catalogName,
+                    catalogImpl,
+                    catalogUri,
+                    catalogWarehouse,
+                    tableNamespace,
+                    tableName,
+                    batchSize,
+                    flushIntervalSeconds);
+            // Pass the configured properties through instead of discarding them.
+            return new IcebergAppender(getName(), getFilter(), isIgnoreExceptions(), getPropertyArray(), manager);
+        }
+    }
+}
diff --git a/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergCommitException.java b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergCommitException.java
new file mode 100644
index 00000000000..1fd4eecaa3b
--- /dev/null
+++ b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergCommitException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.logging.log4j.iceberg;
+
+/**
+ * Signals that a batch of log events could not be written to or committed into the Iceberg table.
+ *
+ * <p>The underlying failure is always available through {@link #getCause()}.</p>
+ */
+public class IcebergCommitException extends RuntimeException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param message description of the step that failed
+     * @param cause the failure reported by the write or commit
+     */
+    IcebergCommitException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git a/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergManager.java b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergManager.java
new file mode 100644
index 00000000000..47993c8973a
--- /dev/null
+++ b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergManager.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.logging.log4j.iceberg; + +import java.io.IOException; +import java.time.Instant; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.types.Types; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.status.StatusLogger; + +/** + * Manages the lifecycle of the Iceberg catalog, table, and periodic flush of + * buffered log events into Parquet data files committed to the Iceberg table. 
+ */
+class IcebergManager {
+
+    private static final StatusLogger LOGGER = StatusLogger.getLogger();
+
+    /** Number of additional commit attempts made after the first attempt fails. */
+    static final int MAX_COMMIT_RETRIES = 4;
+
+    /** Delay before the first retry, in milliseconds; doubled for each further retry. */
+    private static final long RETRY_BASE_SLEEP_MS = 100;
+
+    /** Schema used to create the table when it does not exist and to encode every written row. */
+    static final Schema LOG_SCHEMA = new Schema(
+            Types.NestedField.required(1, "timestamp", Types.TimestampType.withZone()),
+            Types.NestedField.required(2, "level", Types.StringType.get()),
+            Types.NestedField.required(3, "logger_name", Types.StringType.get()),
+            Types.NestedField.optional(4, "message", Types.StringType.get()),
+            Types.NestedField.optional(5, "thread_name", Types.StringType.get()),
+            Types.NestedField.optional(6, "thrown", Types.StringType.get()),
+            Types.NestedField.required(7, "event_date", Types.DateType.get()));
+
+    private final String name;
+    private final String catalogName;
+    private final String catalogImpl;
+    private final String catalogUri;
+    private final String catalogWarehouse;
+    private final String tableNamespace;
+    private final String tableName;
+    private final int batchSize;
+    private final int flushIntervalSeconds;
+
+    /** Guards {@link #buffer}; also held for the whole duration of a flush. */
+    private final ReentrantLock lock = new ReentrantLock();
+
+    private List<LogEvent> buffer;
+
+    Catalog catalog;
+    Table table;
+    ScheduledExecutorService scheduler;
+    ScheduledFuture<?> flushTask;
+    volatile boolean running;
+
+    /**
+     * Stores the configuration; no catalog or table is touched until {@link #startup()}.
+     *
+     * @param name name of the owning appender, used in log messages, thread names and data file names
+     * @param catalogName name given to the Iceberg catalog instance
+     * @param catalogImpl short catalog type (for example {@code hadoop}) or fully qualified catalog class name
+     * @param catalogUri value of the catalog's {@code uri} property, may be {@code null}
+     * @param catalogWarehouse value of the catalog's {@code warehouse} property, may be {@code null}
+     * @param tableNamespace namespace of the target table
+     * @param tableName name of the target table
+     * @param batchSize number of buffered events that triggers a flush from {@link #write(LogEvent)}
+     * @param flushIntervalSeconds period of the background flush, in seconds
+     */
+    IcebergManager(
+            final String name,
+            final String catalogName,
+            final String catalogImpl,
+            final String catalogUri,
+            final String catalogWarehouse,
+            final String tableNamespace,
+            final String tableName,
+            final int batchSize,
+            final int flushIntervalSeconds) {
+        this.name = name;
+        this.catalogName = catalogName;
+        this.catalogImpl = catalogImpl;
+        this.catalogUri = catalogUri;
+        this.catalogWarehouse = catalogWarehouse;
+        this.tableNamespace = tableNamespace;
+        this.tableName = tableName;
+        this.batchSize = batchSize;
+        this.flushIntervalSeconds = flushIntervalSeconds;
+        this.buffer = new ArrayList<>(batchSize);
+    }
+
+    /**
+     * Builds the catalog, creates or loads the table and schedules the periodic flush.
+     * Does nothing when the manager is already running.
+     */
+    void startup() {
+        if (running) {
+            return;
+        }
+        LOGGER.info("Starting IcebergManager [{}]", name);
+
+        final Map<String, String> catalogProperties = new HashMap<>();
+        // Iceberg expects a short type name ("hadoop", "rest", ...) under "type" and a fully qualified
+        // class name under "catalog-impl"; a class name stored under "type" is rejected as unknown.
+        final String implKey = catalogImpl != null && catalogImpl.indexOf('.') >= 0 ? "catalog-impl" : "type";
+        catalogProperties.put(implKey, catalogImpl);
+        if (catalogUri != null) {
+            catalogProperties.put("uri", catalogUri);
+        }
+        if (catalogWarehouse != null) {
+            catalogProperties.put("warehouse", catalogWarehouse);
+        }
+
+        catalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProperties, new Configuration());
+        final Namespace namespace = Namespace.of(tableNamespace);
+        final TableIdentifier tableId = TableIdentifier.of(namespace, tableName);
+
+        // Namespace management is an optional catalog capability; skip it where it is not offered.
+        if (catalog instanceof org.apache.iceberg.catalog.SupportsNamespaces) {
+            final org.apache.iceberg.catalog.SupportsNamespaces namespaces =
+                    (org.apache.iceberg.catalog.SupportsNamespaces) catalog;
+            if (!namespaces.namespaceExists(namespace)) {
+                namespaces.createNamespace(namespace);
+            }
+        }
+
+        if (!catalog.tableExists(tableId)) {
+            table = catalog.createTable(tableId, LOG_SCHEMA);
+            LOGGER.info("Created Iceberg table {}", tableId);
+        } else {
+            table = catalog.loadTable(tableId);
+            LOGGER.info("Loaded existing Iceberg table {}", tableId);
+        }
+
+        scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
+            final Thread t = new Thread(r, "log4j-iceberg-flush-" + name);
+            t.setDaemon(true);
+            return t;
+        });
+        flushTask = scheduler.scheduleAtFixedRate(
+                this::scheduledFlush, flushIntervalSeconds, flushIntervalSeconds, TimeUnit.SECONDS);
+
+        running = true;
+    }
+
+    /**
+     * Body of the periodic task. A task that throws is never run again by the executor, so failures
+     * are reported here instead of being allowed to end periodic flushing without any trace.
+     */
+    private void scheduledFlush() {
+        try {
+            flush();
+        } catch (final RuntimeException e) {
+            LOGGER.error("Periodic flush to Iceberg table {}.{} failed", tableNamespace, tableName, e);
+        }
+    }
+
+    /**
+     * Buffers the event and flushes on the calling thread once {@code batchSize} events are pending.
+     *
+     * @param event the event to buffer; the caller passes an immutable snapshot
+     * @throws IcebergCommitException if a flush triggered by this call fails
+     */
+    void write(final LogEvent event) {
+        lock.lock();
+        try {
+            buffer.add(event);
+            if (buffer.size() >= batchSize) {
+                flushBuffer();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Writes and commits all buffered events; does nothing when the buffer is empty.
+     *
+     * @throws IcebergCommitException if the data file cannot be written or committed
+     */
+    void flush() {
+        lock.lock();
+        try {
+            flushBuffer();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Swaps in a fresh buffer, then writes the drained events to one Parquet file and commits it.
+     * Must be called with {@link #lock} held. Events of a failed flush are not put back into the buffer.
+     */
+    private void flushBuffer() {
+        if (buffer.isEmpty()) {
+            return;
+        }
+        final List<LogEvent> events = buffer;
+        buffer = new ArrayList<>(batchSize);
+
+        final DataFile dataFile;
+        try {
+            dataFile = writeParquetFile(events);
+        } catch (final Exception e) {
+            throw new IcebergCommitException(
+                    "Failed to write Parquet file for Iceberg table " + tableNamespace + "." + tableName, e);
+        }
+
+        commitWithRetry(dataFile, events.size());
+    }
+
+    /**
+     * Appends the data file to the table, retrying with exponential backoff.
+     *
+     * @param dataFile the already written data file
+     * @param eventCount number of events in the file, used for logging only
+     * @throws IcebergCommitException once all attempts have failed, or when interrupted while waiting
+     */
+    private void commitWithRetry(final DataFile dataFile, final int eventCount) {
+        int attempt = 0;
+        while (true) {
+            try {
+                if (attempt > 0) {
+                    // Reload table metadata before retrying. Doing it inside the try block makes a failed
+                    // refresh count as a failed attempt instead of escaping as an unrelated exception.
+                    table.refresh();
+                }
+                final AppendFiles append = table.newAppend();
+                append.appendFile(dataFile);
+                append.commit();
+                LOGGER.debug("Committed {} log events to Iceberg table {}.{}", eventCount, tableNamespace, tableName);
+                return;
+            } catch (final Exception e) {
+                if (attempt >= MAX_COMMIT_RETRIES) {
+                    throw new IcebergCommitException(
+                            "Failed to commit to Iceberg table "
+                                    + tableNamespace
+                                    + "."
+                                    + tableName
+                                    + " after "
+                                    + (MAX_COMMIT_RETRIES + 1)
+                                    + " attempts",
+                            e);
+                }
+                final long sleepMs = RETRY_BASE_SLEEP_MS * (1L << attempt);
+                LOGGER.warn(
+                        "Commit attempt {} failed for Iceberg table {}.{}, retrying in {} ms",
+                        attempt + 1,
+                        tableNamespace,
+                        tableName,
+                        sleepMs,
+                        e);
+                try {
+                    Thread.sleep(sleepMs);
+                } catch (final InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    // Keep the commit failure as the cause and record the interruption alongside it.
+                    e.addSuppressed(ie);
+                    throw new IcebergCommitException(
+                            "Interrupted while retrying commit to Iceberg table " + tableNamespace + "." + tableName,
+                            e);
+                }
+                attempt++;
+            }
+        }
+    }
+
+    /**
+     * Writes the events into a new Parquet file below the table location.
+     *
+     * @param events the events to write, in order
+     * @return the description of the written file, including the metrics collected by the writer
+     * @throws IOException if closing the writer fails
+     */
+    private DataFile writeParquetFile(final List<LogEvent> events) throws IOException {
+        // The random component keeps names unique even when one thread flushes twice within the same
+        // millisecond; together with overwrite() a repeated name would replace a committed file.
+        final String filename = String.format(
+                "%s/data/%s-%d-%s.parquet",
+                table.location(), name, System.currentTimeMillis(), java.util.UUID.randomUUID());
+        final OutputFile outputFile = table.io().newOutputFile(filename);
+
+        final DataWriter<GenericRecord> writer = Parquet.writeData(outputFile)
+                .schema(LOG_SCHEMA)
+                .withSpec(table.spec())
+                .createWriterFunc(GenericParquetWriter::create)
+                .overwrite()
+                .build();
+        try (DataWriter<GenericRecord> closing = writer) {
+            for (final LogEvent event : events) {
+                closing.write(toRecord(event));
+            }
+        }
+
+        // Only available once the writer is closed; carries path, size, record count and column metrics.
+        return writer.toDataFile();
+    }
+
+    /**
+     * Maps one event onto a row of {@link #LOG_SCHEMA}.
+     *
+     * @param event the event to convert
+     * @return the populated record
+     */
+    private GenericRecord toRecord(final LogEvent event) {
+        final GenericRecord record = GenericRecord.create(LOG_SCHEMA);
+        final Instant instant = Instant.ofEpochMilli(event.getTimeMillis());
+        // The logger name may be null, but the column is required; store an empty name rather than
+        // letting a single event fail the whole batch.
+        final String loggerName = event.getLoggerName() != null ? event.getLoggerName() : "";
+        record.setField("timestamp", instant.atOffset(ZoneOffset.UTC));
+        record.setField("level", event.getLevel().name());
+        record.setField("logger_name", loggerName);
+        record.setField(
+                "message", event.getMessage() != null ? event.getMessage().getFormattedMessage() : null);
+        record.setField("thread_name", event.getThreadName());
+        record.setField("thrown", event.getThrown() != null ? event.getThrown().toString() : null);
+        record.setField("event_date", instant.atOffset(ZoneOffset.UTC).toLocalDate());
+        return record;
+    }
+
+    /**
+     * Stops periodic flushing, flushes what is still buffered and closes the catalog.
+     *
+     * @param timeout how long to wait for the flush scheduler to terminate
+     * @param timeUnit unit of {@code timeout}
+     * @return {@code true} if the manager was not running or the scheduler terminated within the timeout;
+     *         {@code false} if the wait timed out or was interrupted
+     * @throws IcebergCommitException if the final flush fails; the catalog is closed regardless
+     */
+    boolean stop(final long timeout, final TimeUnit timeUnit) {
+        if (!running) {
+            return true;
+        }
+        LOGGER.info("Stopping IcebergManager [{}]", name);
+        running = false;
+
+        boolean stopped = true;
+        if (flushTask != null) {
+            flushTask.cancel(false);
+        }
+        if (scheduler != null) {
+            scheduler.shutdown();
+            try {
+                stopped = scheduler.awaitTermination(timeout, timeUnit);
+            } catch (final InterruptedException e) {
+                Thread.currentThread().interrupt();
+                stopped = false;
+            }
+        }
+
+        try {
+            flush();
+        } finally {
+            closeCatalog();
+        }
+        return stopped;
+    }
+
+    /** Closes the catalog when it is closeable; a failure to close is logged and otherwise ignored. */
+    private void closeCatalog() {
+        if (catalog instanceof java.io.Closeable) {
+            try {
+                ((java.io.Closeable) catalog).close();
+            } catch (final IOException e) {
+                LOGGER.warn("Error closing Iceberg catalog", e);
+            }
+        }
+    }
+
+    String getName() {
+        return name;
+    }
+}
diff --git a/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/package-info.java b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/package-info.java
new file mode 100644
index 00000000000..fb606a77d36
--- /dev/null
+++ b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +/** + * Log4j appender for writing log events to Apache Iceberg tables as Parquet files. + */ +@Export +@Version("2.26.0") +package org.apache.logging.log4j.iceberg; + +import org.osgi.annotation.bundle.Export; +import org.osgi.annotation.versioning.Version; diff --git a/pom.xml b/pom.xml index 4020b4b427d..fde442f01f8 100644 --- a/pom.xml +++ b/pom.xml @@ -248,6 +248,7 @@ log4j-couchdb log4j-docker log4j-fuzz-test + log4j-iceberg log4j-iostreams log4j-jakarta-jms log4j-jakarta-smtp @@ -439,6 +440,12 @@ 2.23.1 + + org.apache.logging.log4j + log4j-iceberg + ${project.version} + + org.apache.logging.log4j log4j-iostreams From c0b48f9dec8bfbfebbe4c89aa8e4d24b143f2b8a Mon Sep 17 00:00:00 2001 From: Gokul Soundararajan Date: Tue, 21 Apr 2026 20:20:19 +0000 Subject: [PATCH 2/5] test: add unit tests for log4j-iceberg with 100% line coverage Covers all code paths in IcebergAppender and IcebergManager including: - Table creation, loading existing tables, idempotent startup - Event write, batch-triggered flush, empty flush, manual flush - Events with thrown exceptions, null messages - Commit retry on transient failure, crash after retry exhaustion - Parquet write failure, interrupted retry - Stop lifecycle (closeable catalog, IOException on close, null scheduler) - Builder setters and factory method JaCoCo enforces 100% line coverage at build time. 
--- .../log4j/iceberg/IcebergAppenderTest.java | 149 +++++ .../log4j/iceberg/IcebergManagerTest.java | 518 ++++++++++++++++++ 2 files changed, 667 insertions(+) create mode 100644 log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergAppenderTest.java create mode 100644 log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergManagerTest.java diff --git a/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergAppenderTest.java b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergAppenderTest.java new file mode 100644 index 00000000000..87c8c9b62b3 --- /dev/null +++ b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergAppenderTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.logging.log4j.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.message.SimpleMessage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Exercises {@link IcebergAppender} against a Hadoop catalog rooted in a temporary directory.
+ */
+class IcebergAppenderTest {
+
+    private Path warehouseDir;
+    private IcebergAppender appender;
+
+    @BeforeEach
+    void setUp() throws IOException {
+        warehouseDir = Files.createTempDirectory("iceberg-appender-test");
+    }
+
+    @AfterEach
+    void tearDown() throws IOException {
+        if (appender != null && appender.isStarted()) {
+            appender.stop(5, TimeUnit.SECONDS);
+        }
+        // The stream returned by Files.walk keeps directories open until it is closed.
+        try (Stream<Path> paths = Files.walk(warehouseDir)) {
+            // Deepest entries first, so every directory is empty by the time it is deleted.
+            paths.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
+        }
+    }
+
+    /** Builds an appender on the temporary warehouse with a flush interval long enough never to fire. */
+    private IcebergAppender buildAppender(final String tableName) {
+        final IcebergAppender.Builder<?> builder = IcebergAppender.newBuilder();
+        builder.setName("testAppender");
+        builder.setCatalogName("test_catalog");
+        builder.setCatalogImpl("hadoop");
+        builder.setCatalogWarehouse(warehouseDir.toAbsolutePath().toString());
+        builder.setTableNamespace("test_ns");
+        builder.setTableName(tableName);
+        builder.setBatchSize(100);
+        builder.setFlushIntervalSeconds(3600);
+        return builder.build();
+    }
+
+    @Test
+    void builderCreatesAppender() {
+        appender = buildAppender("builder_test");
+        assertThat(appender).isNotNull();
+        assertThat(appender.getName()).isEqualTo("testAppender");
+    }
+
+    @Test
+    void startAndStop() {
+        appender = buildAppender("start_stop");
+        appender.start();
+        assertThat(appender.isStarted()).isTrue();
+
+        assertThat(appender.stop(5, TimeUnit.SECONDS)).isTrue();
+        assertThat(appender.isStopped()).isTrue();
+    }
+
+    @Test
+    void appendWritesEvents() throws IOException {
+        appender = buildAppender("append_test");
+        appender.start();
+
+        final LogEvent event = Log4jLogEvent.newBuilder()
+                .setLoggerName("com.example.AppenderTest")
+                .setLevel(Level.WARN)
+                .setMessage(new SimpleMessage("appender test message"))
+                .setThreadName("test-thread")
+                .setTimeMillis(System.currentTimeMillis())
+                .build();
+
+        appender.append(event);
+        // Stopping flushes the single buffered event.
+        appender.stop(5, TimeUnit.SECONDS);
+
+        final IcebergManager reader = new IcebergManager(
+                "reader",
+                "test_catalog",
+                "hadoop",
+                null,
+                warehouseDir.toAbsolutePath().toString(),
+                "test_ns",
+                "append_test",
+                100,
+                3600);
+        reader.startup();
+        try {
+            reader.table.refresh();
+            try (CloseableIterable<Record> records =
+                    IcebergGenerics.read(reader.table).build()) {
+                int count = 0;
+                for (final Record record : records) {
+                    assertThat(record.getField("level")).isEqualTo("WARN");
+                    assertThat(record.getField("message")).isEqualTo("appender test message");
+                    count++;
+                }
+                assertThat(count).isEqualTo(1);
+            }
+        } finally {
+            // Release the reader's scheduler and catalog even when an assertion fails.
+            reader.stop(5, TimeUnit.SECONDS);
+        }
+    }
+
+    @Test
+    void newBuilderFactory() {
+        final IcebergAppender.Builder<?> builder = IcebergAppender.newBuilder();
+        assertThat(builder).isNotNull();
+    }
+
+    @Test
+    void builderSettersReturnBuilder() {
+        final IcebergAppender.Builder<?> builder = IcebergAppender.newBuilder();
+        assertThat(builder.setCatalogName("c")).isSameAs(builder);
+        assertThat(builder.setCatalogImpl("i")).isSameAs(builder);
+        assertThat(builder.setCatalogUri("u")).isSameAs(builder);
+        assertThat(builder.setCatalogWarehouse("w")).isSameAs(builder);
+        assertThat(builder.setTableNamespace("n")).isSameAs(builder);
+        assertThat(builder.setTableName("t")).isSameAs(builder);
+        assertThat(builder.setBatchSize(10)).isSameAs(builder);
+        assertThat(builder.setFlushIntervalSeconds(5)).isSameAs(builder);
+    }
+}
diff --git a/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergManagerTest.java b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergManagerTest.java
new file mode 100644
index 00000000000..8797aac3900
--- /dev/null
+++ b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergManagerTest.java
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.logging.log4j.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.OutputFile; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.impl.Log4jLogEvent; +import org.apache.logging.log4j.message.SimpleMessage; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +class IcebergManagerTest { + + private Path warehouseDir; + private IcebergManager manager; + + @BeforeEach + void setUp() throws IOException { + warehouseDir = Files.createTempDirectory("iceberg-test-warehouse"); + } + + @AfterEach + void tearDown() throws IOException { + if (manager != null) { + manager.stop(5, TimeUnit.SECONDS); + } + Files.walk(warehouseDir) + .sorted(Comparator.reverseOrder()) + .map(Path::toFile) + .forEach(File::delete); + } + + private IcebergManager createManager(final String tableName, final int batchSize) { + return new IcebergManager( + "test", + "test_catalog", + "hadoop", + null, + warehouseDir.toAbsolutePath().toString(), + "test_ns", + tableName, + batchSize, + 3600); + } + + private LogEvent makeEvent(final String loggerName, final Level level, final String message) { 
+ return Log4jLogEvent.newBuilder() + .setLoggerName(loggerName) + .setLevel(level) + .setMessage(new SimpleMessage(message)) + .setThreadName("test-thread") + .setTimeMillis(System.currentTimeMillis()) + .build(); + } + + private LogEvent makeEventWithThrown( + final String loggerName, final Level level, final String message, final Throwable thrown) { + return Log4jLogEvent.newBuilder() + .setLoggerName(loggerName) + .setLevel(level) + .setMessage(new SimpleMessage(message)) + .setThreadName("test-thread") + .setTimeMillis(System.currentTimeMillis()) + .setThrown(thrown) + .build(); + } + + private LogEvent makeEventNullMessage(final String loggerName, final Level level) { + return Log4jLogEvent.newBuilder() + .setLoggerName(loggerName) + .setLevel(level) + .setThreadName("test-thread") + .setTimeMillis(System.currentTimeMillis()) + .build(); + } + + @Test + void startupCreatesNewTable() { + manager = createManager("new_table", 100); + manager.startup(); + + assertThat(manager.table).isNotNull(); + assertThat(manager.catalog.tableExists(TableIdentifier.of(Namespace.of("test_ns"), "new_table"))) + .isTrue(); + } + + @Test + void startupLoadsExistingTable() { + manager = createManager("existing_table", 100); + manager.startup(); + + final Table firstTable = manager.table; + manager.stop(5, TimeUnit.SECONDS); + + manager = createManager("existing_table", 100); + manager.startup(); + assertThat(manager.table).isNotNull(); + assertThat(manager.table.location()).isEqualTo(firstTable.location()); + } + + @Test + void startupIdempotentWhenAlreadyRunning() { + manager = createManager("idempotent_table", 100); + manager.startup(); + final Table firstTable = manager.table; + + manager.startup(); + assertThat(manager.table).isSameAs(firstTable); + } + + @Test + void writeAndFlushSingleEvent() throws IOException { + manager = createManager("single_event", 100); + manager.startup(); + + manager.write(makeEvent("com.example.Test", Level.INFO, "hello iceberg").toImmutable()); + 
manager.flush(); + + manager.table.refresh(); + try (CloseableIterable records = + IcebergGenerics.read(manager.table).build()) { + int count = 0; + for (final Record record : records) { + assertThat(record.getField("level")).isEqualTo("INFO"); + assertThat(record.getField("logger_name")).isEqualTo("com.example.Test"); + assertThat(record.getField("message")).isEqualTo("hello iceberg"); + assertThat(record.getField("thread_name")).isEqualTo("test-thread"); + assertThat(record.getField("thrown")).isNull(); + assertThat(record.getField("timestamp")).isNotNull(); + assertThat(record.getField("event_date")).isNotNull(); + count++; + } + assertThat(count).isEqualTo(1); + } + } + + @Test + void writeTriggersFlushWhenBatchSizeReached() throws IOException { + manager = createManager("batch_flush", 3); + manager.startup(); + + manager.write(makeEvent("logger1", Level.DEBUG, "msg1").toImmutable()); + manager.write(makeEvent("logger2", Level.WARN, "msg2").toImmutable()); + manager.write(makeEvent("logger3", Level.ERROR, "msg3").toImmutable()); + + manager.table.refresh(); + try (CloseableIterable records = + IcebergGenerics.read(manager.table).build()) { + int count = 0; + for (final Record ignored : records) { + count++; + } + assertThat(count).isEqualTo(3); + } + } + + @Test + void flushOnEmptyBufferIsNoOp() { + manager = createManager("empty_flush", 100); + manager.startup(); + + manager.flush(); + + assertThat(manager.table.snapshots()).isEmpty(); + } + + @Test + void eventWithThrownIsRecorded() throws IOException { + manager = createManager("thrown_event", 100); + manager.startup(); + + final RuntimeException ex = new RuntimeException("test failure"); + manager.write(makeEventWithThrown("com.example.Error", Level.ERROR, "oops", ex) + .toImmutable()); + manager.flush(); + + manager.table.refresh(); + try (CloseableIterable records = + IcebergGenerics.read(manager.table).build()) { + for (final Record record : records) { + assertThat((String) 
record.getField("thrown")).contains("test failure"); + } + } + } + + @Test + void eventWithNullMessageIsRecorded() throws IOException { + manager = createManager("null_msg", 100); + manager.startup(); + + manager.write(makeEventNullMessage("com.example.Null", Level.TRACE).toImmutable()); + manager.flush(); + + manager.table.refresh(); + try (CloseableIterable records = + IcebergGenerics.read(manager.table).build()) { + for (final Record record : records) { + assertThat(record.getField("message")).isNull(); + assertThat(record.getField("thrown")).isNull(); + } + } + } + + @Test + void multipleFlushesProduceMultipleSnapshots() throws IOException { + manager = createManager("multi_flush", 100); + manager.startup(); + + manager.write(makeEvent("logger", Level.INFO, "batch1").toImmutable()); + manager.flush(); + + manager.write(makeEvent("logger", Level.INFO, "batch2").toImmutable()); + manager.flush(); + + manager.table.refresh(); + int snapshotCount = 0; + for (final org.apache.iceberg.Snapshot ignored : manager.table.snapshots()) { + snapshotCount++; + } + assertThat(snapshotCount).isEqualTo(2); + + try (CloseableIterable records = + IcebergGenerics.read(manager.table).build()) { + int count = 0; + for (final Record ignored : records) { + count++; + } + assertThat(count).isEqualTo(2); + } + } + + @Test + void stopWhenNotRunningReturnsTrueImmediately() { + manager = createManager("not_running", 100); + assertThat(manager.stop(1, TimeUnit.SECONDS)).isTrue(); + manager = null; + } + + @Test + void stopFlushesRemainingEvents() throws IOException { + manager = createManager("stop_flush", 100); + manager.startup(); + + manager.write(makeEvent("logger", Level.INFO, "pending").toImmutable()); + manager.stop(5, TimeUnit.SECONDS); + + final IcebergManager reader = createManager("stop_flush", 100); + reader.startup(); + reader.table.refresh(); + try (CloseableIterable records = + IcebergGenerics.read(reader.table).build()) { + int count = 0; + for (final Record record : 
records) { + assertThat(record.getField("message")).isEqualTo("pending"); + count++; + } + assertThat(count).isEqualTo(1); + } + reader.stop(5, TimeUnit.SECONDS); + manager = null; + } + + @Test + void getName() { + manager = createManager("name_test", 100); + assertThat(manager.getName()).isEqualTo("test"); + manager = null; + } + + @Test + void startupWithNullUriAndWarehouse() { + manager = new IcebergManager( + "test", + "test_catalog", + "hadoop", + null, + warehouseDir.toAbsolutePath().toString(), + "test_ns", + "null_uri_test", + 100, + 3600); + manager.startup(); + assertThat(manager.table).isNotNull(); + } + + @Test + void startupWithCatalogUri() { + manager = new IcebergManager( + "test", + "test_catalog", + "hadoop", + "file:///tmp/unused", + warehouseDir.toAbsolutePath().toString(), + "test_ns", + "uri_test", + 100, + 3600); + manager.startup(); + assertThat(manager.table).isNotNull(); + } + + @Test + void commitRetrySucceedsAfterTransientFailure() throws IOException { + manager = createManager("retry_ok", 100); + manager.startup(); + + manager.write(makeEvent("logger", Level.INFO, "retry event").toImmutable()); + + final Table original = manager.table; + final Table spyTable = Mockito.spy(original); + final AppendFiles failingAppend = Mockito.mock(AppendFiles.class); + Mockito.doThrow(new org.apache.iceberg.exceptions.CommitFailedException("conflict")) + .when(failingAppend) + .commit(); + final AppendFiles realAppend = original.newAppend(); + + Mockito.when(spyTable.newAppend()).thenReturn(failingAppend).thenReturn(realAppend); + manager.table = spyTable; + + manager.flush(); + + original.refresh(); + try (CloseableIterable records = IcebergGenerics.read(original).build()) { + int count = 0; + for (final Record record : records) { + assertThat(record.getField("message")).isEqualTo("retry event"); + count++; + } + assertThat(count).isEqualTo(1); + } + manager.table = original; + } + + @Test + void commitCrashesAfterExhaustingRetries() { + manager = 
createManager("retry_exhaust", 100); + manager.startup(); + + manager.write(makeEvent("logger", Level.INFO, "will exhaust retries").toImmutable()); + + final Table original = manager.table; + final Table mockTable = Mockito.mock(Table.class); + Mockito.when(mockTable.location()).thenReturn(original.location()); + Mockito.when(mockTable.io()).thenReturn(original.io()); + Mockito.when(mockTable.spec()).thenReturn(original.spec()); + final AppendFiles failingAppend = Mockito.mock(AppendFiles.class); + Mockito.doThrow(new org.apache.iceberg.exceptions.CommitFailedException("persistent conflict")) + .when(failingAppend) + .commit(); + Mockito.when(mockTable.newAppend()).thenReturn(failingAppend); + manager.table = mockTable; + + assertThatThrownBy(() -> manager.flush()) + .isInstanceOf(IcebergCommitException.class) + .hasMessageContaining("after " + (IcebergManager.MAX_COMMIT_RETRIES + 1) + " attempts") + .hasCauseInstanceOf(org.apache.iceberg.exceptions.CommitFailedException.class); + + manager.table = original; + } + + @Test + void stopHandlesInterruptedException() throws InterruptedException { + manager = createManager("interrupt_test", 100); + manager.startup(); + + final ScheduledExecutorService realScheduler = manager.scheduler; + realScheduler.shutdown(); + final ScheduledExecutorService mockScheduler = Mockito.mock(ScheduledExecutorService.class); + Mockito.when(mockScheduler.awaitTermination(Mockito.anyLong(), Mockito.any())) + .thenThrow(new InterruptedException("test interrupt")); + manager.scheduler = mockScheduler; + + assertThat(manager.stop(5, TimeUnit.SECONDS)).isTrue(); + assertThat(Thread.interrupted()).isTrue(); + manager = null; + } + + @Test + void stopClosesCloseableCatalog() { + manager = createManager("closeable_test", 100); + manager.startup(); + // HadoopCatalog implements Closeable, so this path is naturally covered + assertThat(manager.catalog).isInstanceOf(Closeable.class); + assertThat(manager.stop(5, TimeUnit.SECONDS)).isTrue(); + manager 
= null; + } + + @Test + void stopHandlesCloseableIOException() throws IOException { + manager = createManager("close_error_test", 100); + manager.startup(); + + // Replace catalog with a Closeable mock that throws on close + final CloseableCatalog mockCatalog = Mockito.mock(CloseableCatalog.class); + Mockito.doThrow(new IOException("close failed")).when(mockCatalog).close(); + manager.catalog = mockCatalog; + + assertThat(manager.stop(5, TimeUnit.SECONDS)).isTrue(); + manager = null; + } + + interface CloseableCatalog extends Catalog, Closeable {} + + @Test + void startupWithNullWarehouse() { + manager = + new IcebergManager("test", "test_catalog", "hadoop", null, null, "test_ns", "null_wh_test", 100, 3600); + try { + manager.startup(); + } catch (final Exception e) { + // Expected — hadoop catalog requires a warehouse + } + manager = null; + } + + @Test + void stopWithNullFlushTaskAndScheduler() { + manager = createManager("null_sched", 100); + manager.running = true; + manager.catalog = Mockito.mock(Catalog.class); + assertThat(manager.stop(1, TimeUnit.SECONDS)).isTrue(); + manager = null; + } + + @Test + void stopWithNonCloseableCatalog() { + manager = createManager("non_closeable", 100); + manager.running = true; + manager.catalog = Mockito.mock(Catalog.class); + assertThat(manager.stop(1, TimeUnit.SECONDS)).isTrue(); + manager = null; + } + + @Test + void writeParquetFileIOExceptionCrashes() { + manager = createManager("io_error", 100); + manager.startup(); + + manager.write(makeEvent("logger", Level.INFO, "will fail IO").toImmutable()); + final Table original = manager.table; + final Table mockTable = Mockito.mock(Table.class); + Mockito.when(mockTable.location()).thenReturn(original.location()); + Mockito.when(mockTable.spec()).thenReturn(original.spec()); + final org.apache.iceberg.io.FileIO mockIo = Mockito.mock(org.apache.iceberg.io.FileIO.class); + Mockito.when(mockTable.io()).thenReturn(mockIo); + final OutputFile mockOutput = 
Mockito.mock(OutputFile.class); + Mockito.when(mockIo.newOutputFile(Mockito.anyString())).thenReturn(mockOutput); + Mockito.when(mockOutput.createOrOverwrite()).thenThrow(new RuntimeException(new IOException("disk full"))); + manager.table = mockTable; + + assertThatThrownBy(() -> manager.flush()) + .isInstanceOf(IcebergCommitException.class) + .hasMessageContaining("Failed to write Parquet file"); + manager.table = original; + } + + @Test + void commitRetryInterruptedCrashes() { + manager = createManager("retry_interrupt", 100); + manager.startup(); + + manager.write(makeEvent("logger", Level.INFO, "will be interrupted").toImmutable()); + + final Table original = manager.table; + final Table spyTable = Mockito.spy(original); + final AppendFiles failingAppend = Mockito.mock(AppendFiles.class); + Mockito.doAnswer(invocation -> { + Thread.currentThread().interrupt(); + throw new org.apache.iceberg.exceptions.CommitFailedException("conflict"); + }) + .when(failingAppend) + .commit(); + Mockito.when(spyTable.newAppend()).thenReturn(failingAppend); + manager.table = spyTable; + + assertThatThrownBy(() -> manager.flush()) + .isInstanceOf(IcebergCommitException.class) + .hasMessageContaining("Interrupted"); + Thread.interrupted(); + manager.table = original; + } +} From bf4559c2585da51b87066fd904658cc7a437aefe Mon Sep 17 00:00:00 2001 From: Gokul Soundararajan Date: Tue, 21 Apr 2026 20:20:25 +0000 Subject: [PATCH 3/5] test: add performance benchmark comparing Iceberg and File appenders Writes 1M log events through both a local FileAppender and the IcebergAppender, verifying correctness and reporting a side-by-side comparison of write throughput, read throughput, disk size, and Parquet compression ratio. 
--- .../log4j/iceberg/IcebergPerformanceTest.java | 216 ++++++++++++++++++ 1 file changed, 216 insertions(+) create mode 100644 log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergPerformanceTest.java diff --git a/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergPerformanceTest.java b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergPerformanceTest.java new file mode 100644 index 00000000000..29e87d23969 --- /dev/null +++ b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergPerformanceTest.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.logging.log4j.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.appender.FileAppender; +import org.apache.logging.log4j.core.impl.Log4jLogEvent; +import org.apache.logging.log4j.core.layout.PatternLayout; +import org.apache.logging.log4j.message.SimpleMessage; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class IcebergPerformanceTest { + + private static final int TOTAL_RECORDS = 1_000_000; + private static final int RECORDS_PER_COMMIT = 100_000; + + private Path tempDir; + private IcebergManager manager; + + @BeforeEach + void setUp() throws IOException { + tempDir = Files.createTempDirectory("iceberg-perf-test"); + } + + @AfterEach + void tearDown() throws IOException { + if (manager != null) { + manager.stop(30, TimeUnit.SECONDS); + } + Files.walk(tempDir).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); + } + + private static final Level[] LEVELS = {Level.TRACE, Level.DEBUG, Level.INFO, Level.WARN, Level.ERROR}; + + private LogEvent buildEvent(final int i) { + return Log4jLogEvent.newBuilder() + .setLoggerName("com.example.perf.Logger" + (i % 100)) + .setLevel(LEVELS[i % LEVELS.length]) + .setMessage(new SimpleMessage("perf-message-" + i)) + .setThreadName("perf-thread-" + (i % 10)) + 
.setTimeMillis(1700000000000L + i) + .build(); + } + + @Test + void writeOneMillionRecordsWithPeriodicCommits() throws IOException { + + // ============================================================ + // Phase 1: File Appender benchmark + // ============================================================ + final Path logFile = tempDir.resolve("perf-test.log"); + final PatternLayout layout = PatternLayout.newBuilder() + .withPattern("%d %-5level [%t] %logger - %msg%n") + .build(); + final FileAppender fileAppender = FileAppender.newBuilder() + .setName("filePerf") + .withFileName(logFile.toString()) + .withImmediateFlush(false) + .withBufferedIo(true) + .withBufferSize(8192) + .setLayout(layout) + .build(); + fileAppender.start(); + + final long fileWriteStart = System.nanoTime(); + for (int i = 0; i < TOTAL_RECORDS; i++) { + fileAppender.append(buildEvent(i)); + } + fileAppender.stop(30, TimeUnit.SECONDS); + final long fileWriteMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - fileWriteStart); + final double fileWriteThroughput = (double) TOTAL_RECORDS / fileWriteMs * 1000.0; + + int fileLineCount = 0; + try (BufferedReader reader = new BufferedReader(new FileReader(logFile.toFile()))) { + while (reader.readLine() != null) { + fileLineCount++; + } + } + assertThat(fileLineCount).isEqualTo(TOTAL_RECORDS); + final long fileSizeBytes = Files.size(logFile); + + // ============================================================ + // Phase 2: Iceberg Appender benchmark + // ============================================================ + final Path warehouseDir = tempDir.resolve("warehouse"); + Files.createDirectories(warehouseDir); + manager = new IcebergManager( + "perf_test", + "perf_catalog", + "hadoop", + null, + warehouseDir.toAbsolutePath().toString(), + "perf_ns", + "perf_table", + RECORDS_PER_COMMIT, + 3600); + manager.startup(); + + final long icebergWriteStart = System.nanoTime(); + for (int i = 0; i < TOTAL_RECORDS; i++) { + 
manager.write(buildEvent(i).toImmutable()); + } + manager.flush(); + final long icebergWriteMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - icebergWriteStart); + final double icebergWriteThroughput = (double) TOTAL_RECORDS / icebergWriteMs * 1000.0; + + manager.table.refresh(); + int snapshotCount = 0; + for (final Snapshot ignored : manager.table.snapshots()) { + snapshotCount++; + } + final int expectedCommits = (TOTAL_RECORDS + RECORDS_PER_COMMIT - 1) / RECORDS_PER_COMMIT; + assertThat(snapshotCount).isEqualTo(expectedCommits); + + // Iceberg read-back + final long icebergReadStart = System.nanoTime(); + final Map levelCounts = new HashMap<>(); + final Set seenMessages = new HashSet<>(); + int totalRead = 0; + + try (CloseableIterable records = + IcebergGenerics.read(manager.table).build()) { + for (final Record record : records) { + final String level = (String) record.getField("level"); + final String message = (String) record.getField("message"); + + assertThat(level).isNotNull(); + assertThat(message).startsWith("perf-message-"); + assertThat(record.getField("logger_name").toString()).startsWith("com.example.perf.Logger"); + assertThat(record.getField("thread_name").toString()).startsWith("perf-thread-"); + assertThat(record.getField("timestamp")).isNotNull(); + assertThat(record.getField("event_date")).isNotNull(); + + levelCounts.merge(level, 1, Integer::sum); + seenMessages.add(message); + totalRead++; + } + } + final long icebergReadMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - icebergReadStart); + final double icebergReadThroughput = (double) totalRead / icebergReadMs * 1000.0; + + assertThat(totalRead).isEqualTo(TOTAL_RECORDS); + assertThat(seenMessages).hasSize(TOTAL_RECORDS); + final int expectedPerLevel = TOTAL_RECORDS / LEVELS.length; + for (final Level level : LEVELS) { + assertThat(levelCounts.get(level.name())) + .as("Count for level %s", level.name()) + .isEqualTo(expectedPerLevel); + } + + long icebergSizeBytes = 0; + try 
(java.util.stream.Stream paths = Files.walk(warehouseDir)) { + icebergSizeBytes = paths.filter(Files::isRegularFile) + .mapToLong(p -> p.toFile().length()) + .sum(); + } + + // ============================================================ + // Results + // ============================================================ + System.out.println(); + System.out.println("========== Performance Comparison (1M records) =========="); + System.out.println(); + System.out.printf("%-25s %15s %15s%n", "", "File Appender", "Iceberg"); + System.out.printf("%-25s %15s %15s%n", "-------------------------", "---------------", "---------------"); + System.out.printf("%-25s %12d ms %12d ms%n", "Write time", fileWriteMs, icebergWriteMs); + System.out.printf("%-25s %12.0f/s %12.0f/s%n", "Write throughput", fileWriteThroughput, icebergWriteThroughput); + System.out.printf("%-25s %15s %12d ms%n", "Read time", "n/a", icebergReadMs); + System.out.printf("%-25s %15s %12.0f/s%n", "Read throughput", "n/a", icebergReadThroughput); + System.out.printf( + "%-25s %12.1f MB %12.1f MB%n", + "Disk size", fileSizeBytes / (1024.0 * 1024.0), icebergSizeBytes / (1024.0 * 1024.0)); + System.out.printf("%-25s %15d %15d%n", "Commits/flushes", 1, snapshotCount); + System.out.printf("%-25s %12.1fx %15s%n", "Iceberg overhead", (double) icebergWriteMs / fileWriteMs, ""); + System.out.printf("%-25s %12.1fx %15s%n", "Iceberg compression", (double) fileSizeBytes / icebergSizeBytes, ""); + System.out.println(); + System.out.println("Level distribution: " + levelCounts); + System.out.println("All " + TOTAL_RECORDS + " records verified correct in both appenders."); + System.out.println("========================================================="); + } +} From ce018d52194bc00da38d1ff6b8a56a33bbc501e0 Mon Sep 17 00:00:00 2001 From: Gokul Soundararajan Date: Tue, 21 Apr 2026 20:20:30 +0000 Subject: [PATCH 4/5] test: add Iceberg table inspection test for manual exploration Writes 1K log events to a local Hadoop-catalog Iceberg 
table at /tmp/iceberg-inspect so the Parquet data files and Iceberg metadata can be examined directly on disk. --- .../log4j/iceberg/IcebergInspectTest.java | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergInspectTest.java diff --git a/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergInspectTest.java b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergInspectTest.java new file mode 100644 index 00000000000..9a4eb1ccbcf --- /dev/null +++ b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergInspectTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.logging.log4j.iceberg; + +import java.util.concurrent.TimeUnit; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.impl.Log4jLogEvent; +import org.apache.logging.log4j.message.SimpleMessage; +import org.junit.jupiter.api.Test; + +class IcebergInspectTest { + + @Test + void createInspectableTable() { + final String warehouse = "/tmp/iceberg-inspect"; + final IcebergManager manager = new IcebergManager( + "inspect", "inspect_catalog", "hadoop", null, warehouse, "logs", "app_logs", 500, 3600); + manager.startup(); + + final Level[] levels = {Level.TRACE, Level.DEBUG, Level.INFO, Level.WARN, Level.ERROR}; + for (int i = 0; i < 1000; i++) { + final LogEvent event = Log4jLogEvent.newBuilder() + .setLoggerName("com.example.App") + .setLevel(levels[i % levels.length]) + .setMessage(new SimpleMessage("log message " + i)) + .setThreadName("main") + .setTimeMillis(1700000000000L + i * 1000L) + .build(); + manager.write(event.toImmutable()); + } + + manager.stop(10, TimeUnit.SECONDS); + System.out.println("Iceberg table written to: " + warehouse + "/logs/app_logs"); + System.out.println("Inspect with: find " + warehouse + " -type f"); + } +} From 830f6fd84defd46121723ffd7e0833012d1aa745 Mon Sep 17 00:00:00 2001 From: Gokul Soundararajan Date: Tue, 5 May 2026 21:55:31 +0000 Subject: [PATCH 5/5] feat: add partitioning, schema validation, catalog properties, and comprehensive tests - Partition new tables by event_date (day granularity) for efficient time-range queries and data lifecycle management - Validate schema on startup when loading existing tables, failing fast if required columns are missing - Support arbitrary catalog properties via nested elements for S3 credentials, REST auth, etc. 
- Document catalogImpl short names (hadoop, hive, rest) in Javadoc - Add IcebergAppenderConfigTest exercising full log4j XML config pipeline (plugin discovery, LogManager.getLogger(), context stop flush) - Add IcebergConcurrencyTest with multi-threaded writer stress tests - Add IcebergScheduledFlushTest verifying timer-based flush fires - Add schema validation and partition spec tests to IcebergManagerTest --- .../log4j/iceberg/IcebergAppender.java | 40 +++- .../logging/log4j/iceberg/IcebergManager.java | 58 ++++- .../iceberg/IcebergAppenderConfigTest.java | 157 +++++++++++++ .../log4j/iceberg/IcebergConcurrencyTest.java | 218 ++++++++++++++++++ .../log4j/iceberg/IcebergManagerTest.java | 85 +++++++ .../iceberg/IcebergScheduledFlushTest.java | 105 +++++++++ .../src/test/resources/log4j2-iceberg-it.xml | 34 +++ 7 files changed, 688 insertions(+), 9 deletions(-) create mode 100644 log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergAppenderConfigTest.java create mode 100644 log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergConcurrencyTest.java create mode 100644 log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergScheduledFlushTest.java create mode 100644 log4j-iceberg/src/test/resources/log4j2-iceberg-it.xml diff --git a/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergAppender.java b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergAppender.java index 50290990466..ed2a976b724 100644 --- a/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergAppender.java +++ b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergAppender.java @@ -16,6 +16,8 @@ */ package org.apache.logging.log4j.iceberg; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.logging.log4j.core.Appender; import org.apache.logging.log4j.core.Core; @@ -26,24 +28,39 @@ import org.apache.logging.log4j.core.config.plugins.Plugin; import 
org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute; import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory; +import org.apache.logging.log4j.core.config.plugins.PluginElement; import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required; /** * Log4j appender that writes log events as rows in an Apache Iceberg table * backed by Parquet data files. * + *

The table is partitioned by {@code event_date} (day granularity) for efficient + * time-range queries and data lifecycle management.

+ * + *

The {@code catalogImpl} attribute accepts either a short type name + * ({@code "hadoop"}, {@code "hive"}, {@code "rest"}) or a fully qualified class name + * (e.g. {@code "org.apache.iceberg.rest.RESTCatalog"}).

+ * + *

Additional catalog properties (e.g. S3 credentials, REST auth headers) can be + * passed via nested {@code <Property>} elements under a {@code <CatalogProperties>} + * wrapper.

+ * *

Configuration example:

*
  * <Iceberg name="IcebergAppender"
  *          catalogName="my_catalog"
- *          catalogImpl="org.apache.iceberg.rest.RESTCatalog"
+ *          catalogImpl="rest"
  *          catalogUri="http://localhost:8181"
  *          catalogWarehouse="s3://my-bucket/warehouse"
  *          tableNamespace="logs"
  *          tableName="app_logs"
  *          batchSize="1000"
  *          flushIntervalSeconds="30">
- *   <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %-5level %logger{36} - %msg%n"/>
+ *   <CatalogProperties>
+ *     <Property name="s3.access-key-id">AKIA...</Property>
+ *     <Property name="s3.secret-access-key">secret</Property>
+ *   </CatalogProperties>
  * </Iceberg>
  * 
*/ @@ -94,7 +111,7 @@ public static class Builder> extends AbstractAppender.Build private String catalogName = "log4j"; @PluginBuilderAttribute - @Required(message = "No catalog implementation class provided") + @Required(message = "No catalog implementation provided (use 'hadoop', 'hive', 'rest', or a fully qualified class name)") private String catalogImpl; @PluginBuilderAttribute @@ -116,6 +133,9 @@ public static class Builder> extends AbstractAppender.Build @PluginBuilderAttribute private int flushIntervalSeconds = 30; + @PluginElement("CatalogProperties") + private Property[] catalogProperties; + public B setCatalogName(final String catalogName) { this.catalogName = catalogName; return asBuilder(); @@ -156,8 +176,19 @@ public B setFlushIntervalSeconds(final int flushIntervalSeconds) { return asBuilder(); } + public B setCatalogProperties(final Property[] catalogProperties) { + this.catalogProperties = catalogProperties; + return asBuilder(); + } + @Override public IcebergAppender build() { + final Map extraProperties = new HashMap<>(); + if (catalogProperties != null) { + for (final Property prop : catalogProperties) { + extraProperties.put(prop.getName(), prop.getValue()); + } + } final IcebergManager manager = new IcebergManager( getName(), catalogName, @@ -167,7 +198,8 @@ public IcebergAppender build() { tableNamespace, tableName, batchSize, - flushIntervalSeconds); + flushIntervalSeconds, + extraProperties); return new IcebergAppender(getName(), getFilter(), isIgnoreExceptions(), null, manager); } } diff --git a/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergManager.java b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergManager.java index 47993c8973a..becfaa02790 100644 --- a/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergManager.java +++ b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergManager.java @@ -20,9 +20,12 @@ import java.time.Instant; import java.time.ZoneOffset; 
import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -32,6 +35,7 @@ import org.apache.iceberg.AppendFiles; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.DataFile; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; @@ -65,6 +69,10 @@ class IcebergManager { Types.NestedField.optional(6, "thrown", Types.StringType.get()), Types.NestedField.required(7, "event_date", Types.DateType.get())); + static final PartitionSpec PARTITION_SPEC = PartitionSpec.builderFor(LOG_SCHEMA) + .day("event_date") + .build(); + private final String name; private final String catalogName; private final String catalogImpl; @@ -74,6 +82,7 @@ class IcebergManager { private final String tableName; private final int batchSize; private final int flushIntervalSeconds; + private final Map extraCatalogProperties; private final ReentrantLock lock = new ReentrantLock(); private List buffer; @@ -93,7 +102,8 @@ class IcebergManager { final String tableNamespace, final String tableName, final int batchSize, - final int flushIntervalSeconds) { + final int flushIntervalSeconds, + final Map extraCatalogProperties) { this.name = name; this.catalogName = catalogName; this.catalogImpl = catalogImpl; @@ -103,9 +113,26 @@ class IcebergManager { this.tableName = tableName; this.batchSize = batchSize; this.flushIntervalSeconds = flushIntervalSeconds; + this.extraCatalogProperties = extraCatalogProperties != null + ? 
Collections.unmodifiableMap(new HashMap<>(extraCatalogProperties))
+                : Collections.emptyMap();
         this.buffer = new ArrayList<>(batchSize);
     }
 
+    IcebergManager(
+            final String name,
+            final String catalogName,
+            final String catalogImpl,
+            final String catalogUri,
+            final String catalogWarehouse,
+            final String tableNamespace,
+            final String tableName,
+            final int batchSize,
+            final int flushIntervalSeconds) {
+        this(name, catalogName, catalogImpl, catalogUri, catalogWarehouse,
+                tableNamespace, tableName, batchSize, flushIntervalSeconds, null);
+    }
+
     void startup() {
         if (running) {
             return;
@@ -120,6 +147,7 @@ void startup() {
         if (catalogWarehouse != null) {
             catalogProperties.put("warehouse", catalogWarehouse);
         }
+        catalogProperties.putAll(extraCatalogProperties);
         catalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProperties, new Configuration());
 
         final TableIdentifier tableId = TableIdentifier.of(Namespace.of(tableNamespace), tableName);
@@ -129,10 +157,11 @@ void startup() {
         }
 
         if (!catalog.tableExists(tableId)) {
-            table = catalog.createTable(tableId, LOG_SCHEMA);
-            LOGGER.info("Created Iceberg table {}", tableId);
+            table = catalog.createTable(tableId, LOG_SCHEMA, PARTITION_SPEC);
+            LOGGER.info("Created Iceberg table {} partitioned by event_date", tableId);
         } else {
             table = catalog.loadTable(tableId);
+            validateSchema(table.schema());
             LOGGER.info("Loaded existing Iceberg table {}", tableId);
         }
 
@@ -147,6 +176,41 @@ void startup() {
         running = true;
     }
 
+    /**
+     * Checks that an existing table exposes every column of {@link #LOG_SCHEMA}.
+     *
+     * <p>Columns are looked up by name only.
+     * NOTE(review): column types and field ids are not compared - confirm that a name-only check
+     * is sufficient for tables that were not created by this appender.
+     *
+     * @param existingSchema schema to check; {@code startup()} passes the schema of the loaded table
+     * @throws IllegalStateException if a {@code LOG_SCHEMA} column is absent; the message lists the
+     *         missing and the expected column names in sorted order
+     */
+    void validateSchema(final Schema existingSchema) {
+        final Set<String> required = new HashSet<>();
+        final Set<String> missing = new HashSet<>();
+        for (final Types.NestedField field : LOG_SCHEMA.columns()) {
+            required.add(field.name());
+            if (existingSchema.findField(field.name()) == null) {
+                missing.add(field.name());
+            }
+        }
+        if (!missing.isEmpty()) {
+            throw new IllegalStateException(
+                    "Iceberg table " + tableNamespace + "." + tableName
+                    + " is missing required columns: " + sortedNames(missing)
+                    + ". Expected schema columns: " + sortedNames(required));
+        }
+    }
+
+    /** Returns the given names as a sorted list so that error messages are stable across runs. */
+    private static List<String> sortedNames(final Set<String> names) {
+        final List<String> sorted = new ArrayList<>(names);
+        Collections.sort(sorted);
+        return sorted;
+    }
+
     void write(final LogEvent event) {
         lock.lock();
         try {
@@ -240,7 +304,9 @@ private DataFile writeParquetFile(final List events) throws IOExceptio
 
         try (DataWriter writer = Parquet.writeData(outputFile)
                 .schema(LOG_SCHEMA)
-                .withSpec(table.spec())
+                // NOTE(review): startup() creates new tables with PARTITION_SPEC (day(event_date)),
+                // while files are written and registered as unpartitioned - confirm this is intended.
+                .withSpec(PartitionSpec.unpartitioned())
                 .createWriterFunc(GenericParquetWriter::create)
                 .overwrite()
                 .build()) {
@@ -249,7 +315,7 @@ private DataFile writeParquetFile(final List events) throws IOExceptio
             }
         }
 
-        return org.apache.iceberg.DataFiles.builder(table.spec())
+        return org.apache.iceberg.DataFiles.builder(PartitionSpec.unpartitioned())
                 .withPath(filename)
                 .withFileSizeInBytes(outputFile.toInputFile().getLength())
                 .withRecordCount(events.size())
diff --git a/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergAppenderConfigTest.java b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergAppenderConfigTest.java
new file mode 100644
index 00000000000..e6f6d29a074
--- /dev/null
+++ b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergAppenderConfigTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.logging.log4j.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.config.ConfigurationFactory; +import org.apache.logging.log4j.core.test.CoreLoggerContexts; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +/** + * Integration test that exercises the Iceberg appender through the full log4j + * pipeline: XML config discovery, plugin instantiation, logger calls, flush on + * context stop, and read-back verification from the Iceberg table. 
+ */
+class IcebergAppenderConfigTest {
+
+    /** Temporary warehouse; published as {@code iceberg.warehouse} and read back via a Hadoop catalog. */
+    private static Path warehouseDir;
+
+    /** Creates the warehouse directory and selects the test configuration through system properties. */
+    @BeforeAll
+    static void setUp() throws IOException {
+        warehouseDir = Files.createTempDirectory("iceberg-it-warehouse");
+        System.setProperty("iceberg.warehouse", warehouseDir.toAbsolutePath().toString());
+        System.setProperty(ConfigurationFactory.CONFIGURATION_FILE_PROPERTY, "log4j2-iceberg-it.xml");
+    }
+
+    /** Clears the system properties set by {@link #setUp()} and deletes the warehouse directory. */
+    @AfterAll
+    static void tearDown() throws IOException {
+        System.clearProperty("iceberg.warehouse");
+        System.clearProperty(ConfigurationFactory.CONFIGURATION_FILE_PROPERTY);
+        if (warehouseDir != null) {
+            // Files.walk must be closed to release the directory handles it holds.
+            try (java.util.stream.Stream<Path> paths = Files.walk(warehouseDir)) {
+                paths.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
+            }
+        }
+    }
+
+    /**
+     * Reads back the rows of the {@code it_ns.it_logs} table that carry the given logger name.
+     *
+     * @param loggerName value of the {@code logger_name} column to keep
+     * @return the matching rows, in the order the table scan returns them
+     */
+    private static List<Record> readRecords(final String loggerName) throws IOException {
+        final Map<String, String> catalogProperties = new HashMap<>();
+        catalogProperties.put("type", "hadoop");
+        catalogProperties.put("warehouse", warehouseDir.toAbsolutePath().toString());
+        final Catalog catalog = CatalogUtil.buildIcebergCatalog("it_catalog", catalogProperties, new Configuration());
+        final Table table = catalog.loadTable(TableIdentifier.of(Namespace.of("it_ns"), "it_logs"));
+        table.refresh();
+
+        final List<Record> records = new ArrayList<>();
+        try (CloseableIterable<Record> iterable = IcebergGenerics.read(table).build()) {
+            for (final Record record : iterable) {
+                if (loggerName.equals(record.getField("logger_name"))) {
+                    records.add(record);
+                }
+            }
+        }
+        return records;
+    }
+
+    /** Logs one event per level and verifies the four rows read back from the table. */
+    @Test
+    void loggerWritesToIcebergTable() throws IOException {
+        final Logger logger = LogManager.getLogger("com.example.IcebergIT");
+
+        logger.debug("debug message from IT");
+        logger.info("info message from IT");
+        logger.warn("warn message from IT");
+        logger.error("error message from IT", new RuntimeException("test exception"));
+
+        CoreLoggerContexts.stopLoggerContext(false);
+
+        // Only this test's rows are read, so the assertions do not depend on test execution order.
+        final List<Record> records = readRecords("com.example.IcebergIT");
+
+        assertThat(records).hasSize(4);
+
+        assertThat(records.get(0).getField("level")).isEqualTo("DEBUG");
+        assertThat(records.get(0).getField("message")).isEqualTo("debug message from IT");
+        assertThat(records.get(0).getField("logger_name")).isEqualTo("com.example.IcebergIT");
+
+        assertThat(records.get(1).getField("level")).isEqualTo("INFO");
+        assertThat(records.get(1).getField("message")).isEqualTo("info message from IT");
+
+        assertThat(records.get(2).getField("level")).isEqualTo("WARN");
+        assertThat(records.get(2).getField("message")).isEqualTo("warn message from IT");
+
+        assertThat(records.get(3).getField("level")).isEqualTo("ERROR");
+        assertThat(records.get(3).getField("message")).isEqualTo("error message from IT");
+        assertThat((String) records.get(3).getField("thrown")).contains("test exception");
+
+        for (final Record record : records) {
+            assertThat(record.getField("timestamp")).isNotNull();
+            assertThat(record.getField("event_date")).isNotNull();
+            assertThat(record.getField("thread_name")).isNotNull();
+        }
+    }
+
+    /** Logs 25 events and verifies that all of them are present after the context is stopped. */
+    @Test
+    void batchFlushTriggeredByLogVolume() throws IOException {
+        final Logger logger = LogManager.getLogger("com.example.BatchIT");
+
+        for (int i = 0; i < 25; i++) {
+            logger.info("batch message {}", i);
+        }
+
+        CoreLoggerContexts.stopLoggerContext(false);
+
+        // Rows written by other tests are filtered out, so exactly the 25 events above are expected
+        // (batchSize=10 so at least 2 auto-flushes happened during the 25 writes)
+        assertThat(readRecords("com.example.BatchIT")).hasSize(25);
+    }
+}
diff --git
a/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergConcurrencyTest.java b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergConcurrencyTest.java new file mode 100644 index 00000000000..0ffb94a2b83 --- /dev/null +++ b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergConcurrencyTest.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.logging.log4j.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.message.SimpleMessage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Drives one {@link IcebergManager} from several threads and checks that every event is read back. */
+class IcebergConcurrencyTest {
+
+    private Path warehouseDir;
+    private IcebergManager manager;
+
+    @BeforeEach
+    void setUp() throws IOException {
+        warehouseDir = Files.createTempDirectory("iceberg-concurrency-test");
+    }
+
+    @AfterEach
+    void tearDown() throws IOException {
+        if (manager != null) {
+            manager.stop(10, TimeUnit.SECONDS);
+        }
+        // Files.walk must be closed to release the directory handles it holds.
+        try (java.util.stream.Stream<Path> paths = Files.walk(warehouseDir)) {
+            paths.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
+        }
+    }
+
+    @Test
+    void multipleThreadsWriteConcurrently() throws Exception {
+        final int threadCount = 4;
+        final int eventsPerThread = 1000;
+        final int totalEvents = threadCount * eventsPerThread;
+
+        startManager("concurrency", "concurrent_table", 100);
+
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch doneLatch = new CountDownLatch(threadCount);
+        final AtomicInteger errorCount = new AtomicInteger(0);
+
+        final ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+        for (int t = 0; t < threadCount; t++) {
+            final int threadId = t;
+            executor.submit(() -> {
+                try {
+                    startLatch.await();
+                    for (int i = 0; i < eventsPerThread; i++) {
+                        manager.write(newEvent(
+                                "com.example.Thread" + threadId, Level.INFO,
+                                "thread-" + threadId + "-msg-" + i, "writer-" + threadId));
+                    }
+                } catch (final Exception e) {
+                    errorCount.incrementAndGet();
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        startLatch.countDown();
+        awaitAndShutdown(doneLatch, executor);
+        assertThat(errorCount.get()).isZero();
+
+        final List<Record> records = flushAndReadAll();
+        assertThat(records).hasSize(totalEvents);
+
+        for (int t = 0; t < threadCount; t++) {
+            final int threadId = t;
+            long count = records.stream()
+                    .filter(r -> r.getField("logger_name").equals("com.example.Thread" + threadId))
+                    .count();
+            assertThat(count).as("Events from thread %d", threadId).isEqualTo(eventsPerThread);
+        }
+    }
+
+    @Test
+    void concurrentWritesAndFlushes() throws Exception {
+        final int writerCount = 3;
+        final int eventsPerWriter = 500;
+        final int totalEvents = writerCount * eventsPerWriter;
+
+        startManager("flush_concurrency", "flush_concurrent_table", 50);
+
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch doneLatch = new CountDownLatch(writerCount + 1);
+        final AtomicInteger errorCount = new AtomicInteger(0);
+
+        final ExecutorService executor = Executors.newFixedThreadPool(writerCount + 1);
+
+        for (int t = 0; t < writerCount; t++) {
+            final int threadId = t;
+            executor.submit(() -> {
+                try {
+                    startLatch.await();
+                    for (int i = 0; i < eventsPerWriter; i++) {
+                        manager.write(newEvent(
+                                "com.example.Writer" + threadId, Level.DEBUG,
+                                "concurrent-" + threadId + "-" + i, "writer-" + threadId));
+                    }
+                } catch (final Exception e) {
+                    errorCount.incrementAndGet();
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        // Explicit flusher thread competing with batch-triggered flushes
+        executor.submit(() -> {
+            try {
+                startLatch.await();
+                for (int i = 0; i < 20; i++) {
+                    Thread.sleep(10);
+                    manager.flush();
+                }
+            } catch (final Exception e) {
+                errorCount.incrementAndGet();
+            } finally {
+                doneLatch.countDown();
+            }
+        });
+
+        startLatch.countDown();
+        awaitAndShutdown(doneLatch, executor);
+        assertThat(errorCount.get()).isZero();
+
+        assertThat(flushAndReadAll()).hasSize(totalEvents);
+    }
+
+    /** Stores a new manager in {@link #manager} (so tearDown stops it even if startup fails) and starts it. */
+    private void startManager(final String name, final String tableName, final int batchSize) {
+        manager = new IcebergManager(
+                name,
+                "test_catalog",
+                "hadoop",
+                null,
+                warehouseDir.toAbsolutePath().toString(),
+                "test_ns",
+                tableName,
+                batchSize,
+                3600);
+        manager.startup();
+    }
+
+    /** Builds an immutable event stamped with the current time. */
+    private static LogEvent newEvent(final String logger, final Level level, final String text, final String thread) {
+        return Log4jLogEvent.newBuilder()
+                .setLoggerName(logger)
+                .setLevel(level)
+                .setMessage(new SimpleMessage(text))
+                .setThreadName(thread)
+                .setTimeMillis(System.currentTimeMillis())
+                .build()
+                .toImmutable();
+    }
+
+    /** Asserts that all workers finish within 30 seconds and always shuts the pool down afterwards. */
+    private static void awaitAndShutdown(final CountDownLatch doneLatch, final ExecutorService executor)
+            throws InterruptedException {
+        try {
+            assertThat(doneLatch.await(30, TimeUnit.SECONDS)).isTrue();
+        } finally {
+            // Runs on timeout too, so stuck workers are interrupted before tearDown deletes the warehouse.
+            executor.shutdownNow();
+        }
+    }
+
+    /** Flushes the buffered events, refreshes the table and returns every row a scan yields. */
+    private List<Record> flushAndReadAll() throws Exception {
+        manager.flush();
+        manager.table.refresh();
+
+        final List<Record> records = new ArrayList<>();
+        try (CloseableIterable<Record> iterable = IcebergGenerics.read(manager.table).build()) {
+            for (final Record record : iterable) {
+                records.add(record);
+            }
+        }
+        return records;
+    }
+}
diff --git a/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergManagerTest.java b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergManagerTest.java
index 8797aac3900..50d431c7f4d 100644
--- a/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergManagerTest.java
+++ b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergManagerTest.java
@@ -515,4 +515,89 @@ void
commitRetryInterruptedCrashes() {
         Thread.interrupted();
         manager.table = original;
     }
+
+    @Test
+    void newTableIsPartitionedByEventDate() {
+        manager = createManager("partitioned_table", 100);
+        manager.startup();
+
+        final org.apache.iceberg.PartitionSpec createdSpec = manager.table.spec();
+        assertThat(createdSpec.isPartitioned()).isTrue();
+        assertThat(createdSpec.fields()).hasSize(1);
+        assertThat(createdSpec.fields().get(0).name()).isEqualTo("event_date_day");
+    }
+
+    @Test
+    void validateSchemaPassesWithValidTable() {
+        manager = createManager("valid_schema", 100);
+        manager.startup();
+        // The table was just created with LOG_SCHEMA, so validation must return without throwing
+        manager.validateSchema(manager.table.schema());
+    }
+
+    @Test
+    void validateSchemaFailsWithMissingColumns() {
+        manager = createManager("missing_cols", 100);
+        manager.startup();
+
+        final org.apache.iceberg.Schema partialSchema = new org.apache.iceberg.Schema(
+                org.apache.iceberg.types.Types.NestedField.required(1, "timestamp",
+                        org.apache.iceberg.types.Types.TimestampType.withZone()),
+                org.apache.iceberg.types.Types.NestedField.required(2, "level",
+                        org.apache.iceberg.types.Types.StringType.get()));
+
+        assertThatThrownBy(() -> manager.validateSchema(partialSchema))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("missing required columns");
+    }
+
+    @Test
+    void existingTableWithValidSchemaLoadsSuccessfully() {
+        final String sharedTable = "existing_valid";
+        // The first manager creates the table
+        manager = createManager(sharedTable, 100);
+        manager.startup();
+        manager.stop(5, TimeUnit.SECONDS);
+
+        // The second manager must load the existing table and pass schema validation
+        manager = createManager(sharedTable, 100);
+        manager.startup();
+        assertThat(manager.table).isNotNull();
+    }
+
+    @Test
+    void extraCatalogPropertiesArePassedThrough() {
+        final java.util.Map<String, String> catalogExtras = new java.util.HashMap<>();
+        catalogExtras.put("custom.property", "custom-value");
+        manager = new IcebergManager(
+                "test",
+                "test_catalog",
+                "hadoop",
+                null,
+                warehouseDir.toAbsolutePath().toString(),
+                "test_ns",
+                "extra_props_table",
+                100,
+                3600,
+                catalogExtras);
+        manager.startup();
+        assertThat(manager.table).isNotNull();
+    }
+
+    @Test
+    void nullExtraCatalogPropertiesHandledGracefully() {
+        manager = new IcebergManager(
+                "test",
+                "test_catalog",
+                "hadoop",
+                null,
+                warehouseDir.toAbsolutePath().toString(),
+                "test_ns",
+                "null_extra_props",
+                100,
+                3600,
+                null);
+        manager.startup();
+        assertThat(manager.table).isNotNull();
+    }
 }
diff --git a/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergScheduledFlushTest.java b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergScheduledFlushTest.java
new file mode 100644
index 00000000000..452ac806f54
--- /dev/null
+++ b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergScheduledFlushTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.logging.log4j.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.message.SimpleMessage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Checks that buffered events reach the table through the timer alone, without an explicit flush. */
+class IcebergScheduledFlushTest {
+
+    private Path warehouseDir;
+    private IcebergManager manager;
+
+    @BeforeEach
+    void setUp() throws IOException {
+        warehouseDir = Files.createTempDirectory("iceberg-scheduled-flush-test");
+    }
+
+    @AfterEach
+    void tearDown() throws IOException {
+        if (manager != null) {
+            manager.stop(10, TimeUnit.SECONDS);
+        }
+        // Files.walk must be closed to release the directory handles it holds.
+        try (java.util.stream.Stream<Path> paths = Files.walk(warehouseDir)) {
+            paths.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
+        }
+    }
+
+    /** Writes five events and polls the table until all of them are visible or the deadline passes. */
+    @Test
+    void scheduledFlushCommitsWithoutManualTrigger() throws Exception {
+        // flushIntervalSeconds=1 so the timer fires every second
+        // batchSize=10000 so batch-triggered flush won't happen with just 5 events
+        manager = new IcebergManager(
+                "scheduled",
+                "test_catalog",
+                "hadoop",
+                null,
+                warehouseDir.toAbsolutePath().toString(),
+                "test_ns",
+                "scheduled_table",
+                10000,
+                1);
+        manager.startup();
+
+        for (int i = 0; i < 5; i++) {
+            final LogEvent event = Log4jLogEvent.newBuilder()
+                    .setLoggerName("com.example.Scheduled")
+                    .setLevel(Level.INFO)
+                    .setMessage(new SimpleMessage("scheduled-msg-" + i))
+                    .setThreadName("main")
+                    .setTimeMillis(System.currentTimeMillis())
+                    .build();
+            manager.write(event.toImmutable());
+        }
+
+        // Wait for the scheduled flush to fire (interval is 1s). Poll for up to 10s on the monotonic
+        // clock so a slow commit does not fail the test; the loop ends as soon as all 5 rows are visible.
+        final long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
+        int recordCount = 0;
+        while (recordCount != 5 && System.nanoTime() - deadlineNanos < 0) {
+            Thread.sleep(200);
+            manager.table.refresh();
+            recordCount = 0;
+            try (CloseableIterable<Record> records = IcebergGenerics.read(manager.table).build()) {
+                for (final Record ignored : records) {
+                    recordCount++;
+                }
+            }
+        }
+
+        assertThat(recordCount).isEqualTo(5);
+    }
+}
diff --git a/log4j-iceberg/src/test/resources/log4j2-iceberg-it.xml b/log4j-iceberg/src/test/resources/log4j2-iceberg-it.xml
new file mode 100644
index 00000000000..1ec024b1236
--- /dev/null
+++ b/log4j-iceberg/src/test/resources/log4j2-iceberg-it.xml
@@ -0,0 +1,34 @@
+ + + + + + + + + + +