diff --git a/audit-server/audit-dispatcher/dispatcher-app/pom.xml b/audit-server/audit-dispatcher/dispatcher-app/pom.xml
index d18d487e38..161256d346 100644
--- a/audit-server/audit-dispatcher/dispatcher-app/pom.xml
+++ b/audit-server/audit-dispatcher/dispatcher-app/pom.xml
@@ -85,6 +85,12 @@
${project.version}
provided
+
+ org.apache.ranger
+ audit-dispatcher-opensearch
+ ${project.version}
+ provided
+
org.apache.ranger
audit-dispatcher-solr
diff --git a/audit-server/audit-dispatcher/dispatcher-common/pom.xml b/audit-server/audit-dispatcher/dispatcher-common/pom.xml
index 3b717af732..2bba520102 100644
--- a/audit-server/audit-dispatcher/dispatcher-common/pom.xml
+++ b/audit-server/audit-dispatcher/dispatcher-common/pom.xml
@@ -147,6 +147,20 @@
spring-context
${springframework.version}
+
+
+
+ org.junit.jupiter
+ junit-jupiter
+ ${junit.jupiter.version}
+ test
+
+
+ org.mockito
+ mockito-core
+ ${mockito.version}
+ test
+
diff --git a/audit-server/audit-dispatcher/dispatcher-opensearch/pom.xml b/audit-server/audit-dispatcher/dispatcher-opensearch/pom.xml
new file mode 100644
index 0000000000..715c4dafb1
--- /dev/null
+++ b/audit-server/audit-dispatcher/dispatcher-opensearch/pom.xml
@@ -0,0 +1,210 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.ranger
+ ranger
+ 3.0.0-SNAPSHOT
+ ../../..
+
+
+ audit-dispatcher-opensearch
+ jar
+ Ranger Audit Dispatcher OpenSearch
+ Kafka dispatcher service for indexing audits into OpenSearch/ElasticSearch
+
+
+ UTF-8
+
+
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ ${fasterxml.jackson.version}
+
+
+
+ org.apache.commons
+ commons-lang3
+ ${commons.lang3.version}
+
+
+
+
+ org.apache.httpcomponents
+ httpasyncclient
+ ${httpcomponents.httpasyncclient.version}
+
+
+ commons-logging
+ *
+
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ ${httpcomponents.httpclient.version}
+
+
+ commons-logging
+ *
+
+
+
+
+ org.apache.httpcomponents
+ httpcore
+ ${httpcomponents.httpcore.version}
+
+
+ org.apache.httpcomponents
+ httpcore-nio
+ ${httpcomponents.httpcore.version}
+
+
+
+
+ org.apache.ranger
+ ranger-audit-dest-es
+ ${project.version}
+
+
+ org.apache.hadoop
+ hadoop-client-api
+
+
+
+
+
+
+ org.apache.kafka
+ kafka-clients
+ ${kafka.version}
+ provided
+
+
+ log4j
+ *
+
+
+ org.slf4j
+ *
+
+
+
+
+ org.apache.ranger
+ ranger-audit-core
+ ${project.version}
+ provided
+
+
+ org.apache.hadoop
+ hadoop-client-api
+
+
+
+
+ org.apache.ranger
+ ranger-audit-dispatcher-common
+ ${project.version}
+ provided
+
+
+ org.apache.hadoop
+ hadoop-client-api
+
+
+
+
+ org.apache.ranger
+ ranger-audit-server-common
+ ${project.version}
+ provided
+
+
+ org.slf4j
+ slf4j-api
+ ${slf4j.version}
+ provided
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter
+ ${junit.jupiter.version}
+ test
+
+
+ org.mockito
+ mockito-core
+ ${mockito.version}
+ test
+
+
+ org.mockito
+ mockito-junit-jupiter
+ ${mockito.version}
+ test
+
+
+
+
+ audit-dispatcher-opensearch-${project.version}
+
+
+ true
+ src/main/resources
+
+
+
+
+ org.apache.maven.plugins
+ maven-pmd-plugin
+
+
+ ${project.parent.basedir}/dev-support/ranger-pmd-ruleset.xml
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+
+
+ copy-dependencies
+
+ copy-dependencies
+
+ package
+
+ ${project.build.directory}/lib
+ runtime
+
+
+
+
+
+
+
diff --git a/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/AuditEventOpenSearchDocMapper.java b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/AuditEventOpenSearchDocMapper.java
new file mode 100644
index 0000000000..1a3cc76e7c
--- /dev/null
+++ b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/AuditEventOpenSearchDocMapper.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.dispatcher;
+
+import org.apache.ranger.audit.model.AuthzAuditEvent;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+
+public final class AuditEventOpenSearchDocMapper {
+ private static final ThreadLocal DATE_FORMAT = ThreadLocal.withInitial(() -> {
+ SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+ format.setTimeZone(TimeZone.getTimeZone("UTC"));
+ return format;
+ });
+
+ private AuditEventOpenSearchDocMapper() {
+ }
+
+ public static Map toDoc(final AuthzAuditEvent auditEvent) {
+ Map doc = new HashMap<>();
+
+ doc.put("id", auditEvent.getEventId());
+ doc.put("access", auditEvent.getAccessType());
+ doc.put("enforcer", auditEvent.getAclEnforcer());
+ doc.put("agent", auditEvent.getAgentId());
+ doc.put("repo", auditEvent.getRepositoryName());
+ doc.put("sess", auditEvent.getSessionId());
+ doc.put("reqUser", auditEvent.getUser());
+ doc.put("reqData", auditEvent.getRequestData());
+ doc.put("resource", auditEvent.getResourcePath());
+ doc.put("cliIP", auditEvent.getClientIP());
+ doc.put("cliType", auditEvent.getClientType());
+ doc.put("logType", auditEvent.getLogType());
+ doc.put("result", auditEvent.getAccessResult());
+ doc.put("policy", auditEvent.getPolicyId());
+ doc.put("repoType", auditEvent.getRepositoryType());
+ doc.put("resType", auditEvent.getResourceType());
+ doc.put("reason", auditEvent.getResultReason());
+ doc.put("action", auditEvent.getAction());
+
+ Date eventTime = auditEvent.getEventTime();
+ doc.put("evtTime", eventTime != null ? DATE_FORMAT.get().format(eventTime) : null);
+
+ doc.put("seq_num", auditEvent.getSeqNum());
+ doc.put("event_count", auditEvent.getEventCount());
+ doc.put("event_dur_ms", auditEvent.getEventDurationMS());
+ doc.put("tags", auditEvent.getTags());
+ doc.put("datasets", auditEvent.getDatasets());
+ doc.put("projects", auditEvent.getProjects());
+ doc.put("datasetIds", auditEvent.getDatasetIds());
+ doc.put("cluster", auditEvent.getClusterName());
+ doc.put("zoneName", auditEvent.getZoneName());
+ doc.put("agentHost", auditEvent.getAgentHostname());
+ doc.put("policyVersion", auditEvent.getPolicyVersion());
+ doc.put("additionalInfo", auditEvent.getAdditionalInfo());
+
+ return doc;
+ }
+}
diff --git a/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/OpenSearchDispatcherManager.java b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/OpenSearchDispatcherManager.java
new file mode 100644
index 0000000000..b078b34b8f
--- /dev/null
+++ b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/OpenSearchDispatcherManager.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.dispatcher;
+
+import org.apache.ranger.audit.dispatcher.kafka.AuditDispatcher;
+import org.apache.ranger.audit.dispatcher.kafka.AuditDispatcherTracker;
+import org.apache.ranger.audit.dispatcher.kafka.AuditOpenSearchDispatcher;
+import org.apache.ranger.audit.provider.MiscUtil;
+import org.apache.ranger.audit.server.AuditServerConstants;
+import org.apache.ranger.audit.utils.AuditServerLogFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+public final class OpenSearchDispatcherManager {
+ private static final Logger LOG = LoggerFactory.getLogger(OpenSearchDispatcherManager.class);
+ private static final String CONFIG_DISPATCHER_TYPE = AuditServerConstants.PROP_DISPATCHER_TYPE;
+ private static final String TYPE_OPENSEARCH = "opensearch";
+ private static final String ES_DEST_PROP = "xasecure.audit.destination.elasticsearch";
+ private static final int MAX_INIT_ATTEMPTS = 5;
+ private static final long INIT_RETRY_MS = 5000L;
+ private static final long SHUTDOWN_WAIT_MS = 10000L;
+
+ private final AuditDispatcherTracker tracker = AuditDispatcherTracker.getInstance();
+ private AuditDispatcher dispatcher;
+ private Thread dispatcherThread;
+
+ public void init(final Properties props) {
+ LOG.info("==> OpenSearchDispatcherManager.init()");
+
+ String dispatcherType = System.getProperty(CONFIG_DISPATCHER_TYPE);
+ if (dispatcherType != null && !dispatcherType.equalsIgnoreCase(TYPE_OPENSEARCH)) {
+ LOG.info("Skipping OpenSearchDispatcherManager initialization since dispatcher type is {}", dispatcherType);
+ return;
+ }
+
+ try {
+ if (props == null) {
+ LOG.error("Configuration properties are null");
+ throw new RuntimeException("Failed to load configuration");
+ }
+
+ boolean isEnabled = MiscUtil.getBooleanProperty(props, ES_DEST_PROP, false);
+ if (!isEnabled) {
+ String clsName = MiscUtil.getStringProperty(props, AuditServerConstants.PROP_DISPATCHER_CLASS);
+ if (clsName != null && clsName.contains("AuditOpenSearchDispatcher")) {
+ isEnabled = true;
+ }
+ }
+
+ if (!isEnabled) {
+ LOG.warn("OpenSearch destination is disabled ({}=false). No dispatchers will be created.", ES_DEST_PROP);
+ return;
+ }
+
+ initializeDispatcher(props, AuditServerConstants.PROP_DISPATCHER_PREFIX);
+
+ if (dispatcher == null) {
+ throw new RuntimeException("No OpenSearch dispatcher was created. Verify that " + ES_DEST_PROP + "=true and classes are configured correctly.");
+ } else {
+ LOG.info("Created OpenSearch dispatcher");
+
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ LOG.info("JVM shutdown detected, stopping OpenSearchDispatcherManager");
+ shutdown();
+ }, "OpenSearchDispatcher-ShutdownHook"));
+
+ startDispatcher();
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to initialize OpenSearchDispatcherManager", e);
+ throw new RuntimeException("Failed to initialize OpenSearchDispatcherManager", e);
+ }
+
+ LOG.info("<== OpenSearchDispatcherManager.init()");
+ }
+
+ private void initializeDispatcher(final Properties props, final String propPrefix) {
+ LOG.info("==> OpenSearchDispatcherManager.initializeDispatcher()");
+
+ String clsStr = MiscUtil.getStringProperty(props, AuditServerConstants.PROP_DISPATCHER_CLASS, AuditOpenSearchDispatcher.class.getName());
+ String className = clsStr.split(",")[0].trim();
+
+ if (className.isEmpty()) {
+ LOG.error("Dispatcher class name is empty");
+ return;
+ }
+
+ long retryDelay = INIT_RETRY_MS;
+
+ for (int attempt = 1; attempt <= MAX_INIT_ATTEMPTS; attempt++) {
+ try {
+ Class> cls = Class.forName(className);
+ dispatcher = (AuditDispatcher) cls.getConstructor(Properties.class, String.class).newInstance(props, propPrefix);
+ tracker.addActiveDispatcher(TYPE_OPENSEARCH, dispatcher);
+ LOG.info("Successfully initialized dispatcher class: {}", cls.getName());
+ break;
+ } catch (ClassNotFoundException e) {
+ LOG.error("Dispatcher class not found: {}. Ensure the class is on the classpath.", className, e);
+ break;
+ } catch (Exception e) {
+ if (attempt < MAX_INIT_ATTEMPTS) {
+ LOG.warn("Dispatcher init attempt {}/{} failed, retrying in {}ms...", attempt, MAX_INIT_ATTEMPTS, retryDelay, e);
+ try {
+ Thread.sleep(retryDelay);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ retryDelay *= 2;
+ } else {
+ LOG.error("Error initializing dispatcher class after {} attempts: {}", MAX_INIT_ATTEMPTS, className, e);
+ }
+ }
+ }
+
+ LOG.info("<== OpenSearchDispatcherManager.initializeDispatcher()");
+ }
+
+ private void startDispatcher() {
+ LOG.info("==> OpenSearchDispatcherManager.startDispatcher()");
+
+ logStartupBanner();
+
+ if (dispatcher != null) {
+ try {
+ String name = dispatcher.getClass().getSimpleName();
+ dispatcherThread = new Thread(dispatcher, name);
+ dispatcherThread.setDaemon(true);
+ dispatcherThread.start();
+ LOG.info("Started {} thread [Thread-ID: {}, Thread-Name: '{}']", name, dispatcherThread.getId(), dispatcherThread.getName());
+ } catch (Exception e) {
+ LOG.error("Error starting dispatcher: {}", dispatcher.getClass().getSimpleName(), e);
+ }
+ }
+
+ LOG.info("<== OpenSearchDispatcherManager.startDispatcher()");
+ }
+
+ private void logStartupBanner() {
+ LOG.info("########## OPENSEARCH DISPATCHER SERVICE STARTUP ##########");
+
+ if (dispatcher == null) {
+ LOG.warn("WARNING: No OpenSearch dispatchers are enabled!");
+ LOG.warn("Verify: {}=true in configuration", ES_DEST_PROP);
+ } else {
+ AuditServerLogFormatter.LogBuilder builder = AuditServerLogFormatter.builder("OpenSearch Dispatcher Status");
+ String type = dispatcher.getClass().getSimpleName();
+ builder.add(type, "ENABLED");
+ builder.add("Topic", dispatcher.getTopicName());
+ builder.logInfo(LOG);
+ LOG.info("Starting OpenSearch dispatcher thread...");
+ }
+
+ LOG.info("##########################################################");
+ }
+
+ public void shutdown() {
+ LOG.info("==> OpenSearchDispatcherManager.shutdown()");
+
+ if (dispatcher != null) {
+ try {
+ LOG.info("Shutting down dispatcher: {}", dispatcher.getClass().getSimpleName());
+ dispatcher.shutdown();
+ LOG.info("Dispatcher shutdown completed: {}", dispatcher.getClass().getSimpleName());
+ } catch (Exception e) {
+ LOG.error("Error shutting down dispatcher: {}", dispatcher.getClass().getSimpleName(), e);
+ }
+ }
+
+ if (dispatcherThread != null && dispatcherThread.isAlive()) {
+ try {
+ LOG.info("Waiting for thread to terminate: {}", dispatcherThread.getName());
+ dispatcherThread.join(SHUTDOWN_WAIT_MS);
+ if (dispatcherThread.isAlive()) {
+ LOG.warn("Thread did not terminate within {}ms: {}", SHUTDOWN_WAIT_MS, dispatcherThread.getName());
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting for thread to terminate: {}", dispatcherThread.getName(), e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ dispatcher = null;
+ dispatcherThread = null;
+ tracker.clearActiveDispatcher(TYPE_OPENSEARCH);
+
+ LOG.info("<== OpenSearchDispatcherManager.shutdown() - OpenSearch dispatcher stopped");
+ }
+}
diff --git a/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/kafka/AuditOpenSearchDispatcher.java b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/kafka/AuditOpenSearchDispatcher.java
new file mode 100644
index 0000000000..c073ca11fc
--- /dev/null
+++ b/audit-server/audit-dispatcher/dispatcher-opensearch/src/main/java/org/apache/ranger/audit/dispatcher/kafka/AuditOpenSearchDispatcher.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.dispatcher.kafka;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpStatus;
+import org.apache.http.auth.AuthSchemeProvider;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.config.AuthSchemes;
+import org.apache.http.config.Lookup;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.impl.auth.SPNegoSchemeFactory;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.http.util.EntityUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.ranger.audit.dispatcher.AuditEventOpenSearchDocMapper;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
+import org.apache.ranger.audit.provider.MiscUtil;
+import org.apache.ranger.audit.server.AuditServerConstants;
+import org.apache.ranger.audit.utils.AuditServerLogFormatter;
+import org.apache.ranger.authorization.credutils.CredentialsProviderUtil;
+import org.apache.ranger.authorization.credutils.kerberos.KerberosCredentialsProvider;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+public class AuditOpenSearchDispatcher extends AuditDispatcherBase {
+ private static final Logger LOG = LoggerFactory.getLogger(AuditOpenSearchDispatcher.class);
+ private static final String DEFAULT_GROUP = "ranger_audit_opensearch_dispatcher_group";
+ private static final String DEFAULT_INDEX = "ranger_audits";
+ private static final long RETRY_SLEEP_MS = 5000L;
+ private static final int DEFAULT_PORT = 9200;
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final TypeReference