diff --git a/bin/storm-autocreds-fetch b/bin/storm-autocreds-fetch new file mode 100755 index 00000000000..d585be72b69 --- /dev/null +++ b/bin/storm-autocreds-fetch @@ -0,0 +1,134 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Fetch the storm-autocreds plugin and its (Hadoop/HBase) runtime dependencies +# into the daemon classpath, so Nimbus/Supervisor can populate and renew HDFS +# and HBase delegation tokens on a secure (Kerberos) cluster. +# +# These jars are intentionally NOT bundled in the binary distribution to keep it +# small; only secure-Hadoop deployments need them. See +# external/storm-autocreds/README.md for details. + +set -euo pipefail + +usage() { + cat <<'EOF' +Usage: storm-autocreds-fetch [options] [-- ] + +Resolves org.apache.storm:storm-autocreds and its runtime dependencies from a +Maven repository (Maven Central by default) and copies them into the Storm +daemon classpath directory (extlib-daemon). + +Options: + --version Storm version to fetch (default: read from $STORM_HOME/RELEASE) + --dest Target directory (default: $STORM_HOME/extlib-daemon) + -h, --help Show this help + +Any arguments after "--" are passed through to Maven, e.g. to use an internal +mirror or an offline local repository: + storm-autocreds-fetch -- -s /path/settings.xml + storm-autocreds-fetch -- -Dmaven.repo.local=/path/to/offline-repo -o +EOF +} + +# Resolve symlinks so STORM_HOME is correct even when invoked via a link. +PRG="${0}" +while [ -h "${PRG}" ]; do + ls=$(ls -ld "${PRG}") + link=$(expr "${ls}" : '.*-> \(.*\)$') + if expr "${link}" : '/.*' > /dev/null; then + PRG="${link}" + else + PRG="$(dirname "${PRG}")/${link}" + fi +done +STORM_BIN_DIR=$(dirname "${PRG}") +STORM_HOME=$(cd "${STORM_BIN_DIR}/.." && pwd) + +VERSION="" +DEST="" +MVN_ARGS=() +while [ $# -gt 0 ]; do + case "${1}" in + --version) VERSION="${2}"; shift 2 ;; + --dest) DEST="${2}"; shift 2 ;; + -h|--help) usage; exit 0 ;; + --) shift; MVN_ARGS=("$@"); break ;; + *) echo "Unknown option: ${1}" >&2; usage; exit 1 ;; + esac +done + +if [ -z "${VERSION}" ]; then + if [ -f "${STORM_HOME}/RELEASE" ]; then + VERSION=$(tr -d '[:space:]' < "${STORM_HOME}/RELEASE") + fi +fi +if [ -z "${VERSION}" ]; then + echo "Error: could not determine Storm version. Pass --version ." >&2 + exit 1 +fi + +if [ -z "${DEST}" ]; then + DEST="${STORM_HOME}/extlib-daemon" +fi + +MVN="${MAVEN_HOME:+${MAVEN_HOME}/bin/}mvn" +if ! command -v "${MVN}" > /dev/null 2>&1; then + echo "Error: '${MVN}' not found on PATH. Install Apache Maven or set MAVEN_HOME." >&2 + exit 1 +fi + +mkdir -p "${DEST}" + +# Use a throwaway POM that depends on storm-autocreds; copy-dependencies then +# pulls the exact runtime closure (honoring the exclusions declared in the +# published storm-autocreds POM). storm-client is 'provided' there and is +# correctly skipped, since it already ships in lib/. +TMP_DIR=$(mktemp -d) +trap 'rm -rf "${TMP_DIR}"' EXIT +cat > "${TMP_DIR}/pom.xml" < + 4.0.0 + org.apache.storm.tools + storm-autocreds-fetch + ${VERSION} + pom + + + org.apache.storm + storm-autocreds + ${VERSION} + + + +EOF + +echo "Fetching org.apache.storm:storm-autocreds:${VERSION} (runtime closure) into:" +echo " ${DEST}" +"${MVN}" -q -f "${TMP_DIR}/pom.xml" \ + org.apache.maven.plugins:maven-dependency-plugin:copy-dependencies \ + -DincludeScope=runtime \ + -DoutputDirectory="${DEST}" \ + ${MVN_ARGS[@]+"${MVN_ARGS[@]}"} + +echo "Done. ${DEST} now contains:" +ls -1 "${DEST}" | sed 's/^/ /' +echo +echo "Restart the Storm daemons (Nimbus, Supervisor) to pick up the new classpath," +echo "then configure the autocreds plugins in storm.yaml. See" +echo "external/storm-autocreds/README.md for the required settings." diff --git a/bin/storm-kafka-monitor b/bin/storm-kafka-monitor index 9bd11054cb5..a586c1d8d58 100755 --- a/bin/storm-kafka-monitor +++ b/bin/storm-kafka-monitor @@ -49,4 +49,11 @@ if [ -z "$JAVA_HOME" ]; then else JAVA="$JAVA_HOME/bin/java" fi -exec $JAVA $STORM_JAAS_CONF_PARAM $STORM_JAR_JVM_OPTS -cp "$STORM_BASE_DIR/lib-tools/storm-kafka-monitor/*" org.apache.storm.kafka.monitor.KafkaOffsetLagUtil "$@" +# The storm-kafka-monitor jars are not bundled in the distribution; they are fetched on demand. +KAFKA_MONITOR_LIB="$STORM_BASE_DIR/lib-tools/storm-kafka-monitor" +if ! ls "$KAFKA_MONITOR_LIB"/*.jar >/dev/null 2>&1; then + echo "storm-kafka-monitor is not installed (no jars in $KAFKA_MONITOR_LIB)." >&2 + echo "Run '$STORM_BIN_DIR/storm-kafka-monitor-fetch' to download it, then retry." >&2 + exit 1 +fi +exec $JAVA $STORM_JAAS_CONF_PARAM $STORM_JAR_JVM_OPTS -cp "$KAFKA_MONITOR_LIB/*" org.apache.storm.kafka.monitor.KafkaOffsetLagUtil "$@" diff --git a/bin/storm-kafka-monitor-fetch b/bin/storm-kafka-monitor-fetch new file mode 100755 index 00000000000..573312d87ef --- /dev/null +++ b/bin/storm-kafka-monitor-fetch @@ -0,0 +1,133 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Fetch the storm-kafka-monitor tool and its (Kafka client) runtime dependencies +# into lib-tools/storm-kafka-monitor, enabling the "Kafka spout lag" display in +# the Storm UI and the bin/storm-kafka-monitor command. +# +# These jars are intentionally NOT bundled in the binary distribution to keep it +# small; they are only needed when running Kafka spouts and wanting lag info. The +# UI degrades gracefully when they are absent. See +# external/storm-kafka-monitor/README.md for details. + +set -euo pipefail + +usage() { + cat <<'EOF' +Usage: storm-kafka-monitor-fetch [options] [-- ] + +Resolves org.apache.storm:storm-kafka-monitor and its runtime dependencies from +a Maven repository (Maven Central by default) and copies them into +lib-tools/storm-kafka-monitor. + +Options: + --version Storm version to fetch (default: read from $STORM_HOME/RELEASE) + --dest Target directory (default: $STORM_HOME/lib-tools/storm-kafka-monitor) + -h, --help Show this help + +Any arguments after "--" are passed through to Maven, e.g. to use an internal +mirror or an offline local repository: + storm-kafka-monitor-fetch -- -s /path/settings.xml + storm-kafka-monitor-fetch -- -Dmaven.repo.local=/path/to/offline-repo -o +EOF +} + +# Resolve symlinks so STORM_HOME is correct even when invoked via a link. +PRG="${0}" +while [ -h "${PRG}" ]; do + ls=$(ls -ld "${PRG}") + link=$(expr "${ls}" : '.*-> \(.*\)$') + if expr "${link}" : '/.*' > /dev/null; then + PRG="${link}" + else + PRG="$(dirname "${PRG}")/${link}" + fi +done +STORM_BIN_DIR=$(dirname "${PRG}") +STORM_HOME=$(cd "${STORM_BIN_DIR}/.." && pwd) + +VERSION="" +DEST="" +MVN_ARGS=() +while [ $# -gt 0 ]; do + case "${1}" in + --version) VERSION="${2}"; shift 2 ;; + --dest) DEST="${2}"; shift 2 ;; + -h|--help) usage; exit 0 ;; + --) shift; MVN_ARGS=("$@"); break ;; + *) echo "Unknown option: ${1}" >&2; usage; exit 1 ;; + esac +done + +if [ -z "${VERSION}" ]; then + if [ -f "${STORM_HOME}/RELEASE" ]; then + VERSION=$(tr -d '[:space:]' < "${STORM_HOME}/RELEASE") + fi +fi +if [ -z "${VERSION}" ]; then + echo "Error: could not determine Storm version. Pass --version ." >&2 + exit 1 +fi + +if [ -z "${DEST}" ]; then + DEST="${STORM_HOME}/lib-tools/storm-kafka-monitor" +fi + +MVN="${MAVEN_HOME:+${MAVEN_HOME}/bin/}mvn" +if ! command -v "${MVN}" > /dev/null 2>&1; then + echo "Error: '${MVN}' not found on PATH. Install Apache Maven or set MAVEN_HOME." >&2 + exit 1 +fi + +mkdir -p "${DEST}" + +# Use a throwaway POM that depends on storm-kafka-monitor; copy-dependencies then +# pulls the exact runtime closure. The artifact itself is a direct dependency and +# is therefore copied too. +TMP_DIR=$(mktemp -d) +trap 'rm -rf "${TMP_DIR}"' EXIT +cat > "${TMP_DIR}/pom.xml" < + 4.0.0 + org.apache.storm.tools + storm-kafka-monitor-fetch + ${VERSION} + pom + + + org.apache.storm + storm-kafka-monitor + ${VERSION} + + + +EOF + +echo "Fetching org.apache.storm:storm-kafka-monitor:${VERSION} (runtime closure) into:" +echo " ${DEST}" +"${MVN}" -q -f "${TMP_DIR}/pom.xml" \ + org.apache.maven.plugins:maven-dependency-plugin:copy-dependencies \ + -DincludeScope=runtime \ + -DoutputDirectory="${DEST}" \ + ${MVN_ARGS[@]+"${MVN_ARGS[@]}"} + +echo "Done. ${DEST} now contains:" +ls -1 "${DEST}" | sed 's/^/ /' +echo +echo "Restart the Storm UI to enable Kafka spout lag display, or run" +echo "bin/storm-kafka-monitor directly. See external/storm-kafka-monitor/README.md." diff --git a/bin/storm.py b/bin/storm.py index 81d6e4e4d8e..eb1afa2f85f 100755 --- a/bin/storm.py +++ b/bin/storm.py @@ -107,6 +107,7 @@ def confvalue(name, storm_config_opts, extrapaths, overriding_conf_file=None, da def get_classpath(extrajars, daemon=True, client=False): ret = get_wildcard_dir(STORM_DIR) + ret.extend(get_wildcard_dir(STORM_COMMON_LIB_DIR)) if client: ret.extend(get_wildcard_dir(STORM_WORKER_LIB_DIR)) else: @@ -125,9 +126,9 @@ def get_classpath(extrajars, daemon=True, client=False): def init_storm_env(within_unittest=False): global NORMAL_CLASS_PATH, STORM_DIR, USER_CONF_DIR, STORM_CONF_DIR, STORM_WORKER_LIB_DIR, STORM_LIB_DIR,\ - STORM_TOOLS_LIB_DIR, STORM_WEBAPP_LIB_DIR, STORM_BIN_DIR, STORM_LOG4J2_CONF_DIR, STORM_SUPERVISOR_LOG_FILE,\ - CLUSTER_CONF_DIR, JAR_JVM_OPTS, JAVA_HOME, JAVA_CMD, CONF_FILE, STORM_EXT_CLASSPATH, \ - STORM_EXT_CLASSPATH_DAEMON, LOCAL_TTL_DEFAULT + STORM_COMMON_LIB_DIR, STORM_TOOLS_LIB_DIR, STORM_WEBAPP_LIB_DIR, STORM_BIN_DIR, STORM_LOG4J2_CONF_DIR,\ + STORM_SUPERVISOR_LOG_FILE, CLUSTER_CONF_DIR, JAR_JVM_OPTS, JAVA_HOME, JAVA_CMD, CONF_FILE, \ + STORM_EXT_CLASSPATH, STORM_EXT_CLASSPATH_DAEMON, LOCAL_TTL_DEFAULT NORMAL_CLASS_PATH = cygpath if sys.platform == 'cygwin' else identity STORM_DIR = os.sep.join(os.path.realpath( __file__ ).split(os.sep)[:-2]) @@ -141,6 +142,10 @@ def init_storm_env(within_unittest=False): STORM_WORKER_LIB_DIR = os.path.join(STORM_DIR, "lib-worker") STORM_LIB_DIR = os.path.join(STORM_DIR, "lib") + # Jars shared by the daemon (lib) and worker (lib-worker) classpaths are de-duplicated into + # lib-common to keep the distribution small. It is added to both classpaths; absent in older + # layouts, in which case it contributes nothing. + STORM_COMMON_LIB_DIR = os.path.join(STORM_DIR, "lib-common") STORM_TOOLS_LIB_DIR = os.path.join(STORM_DIR, "lib-tools") STORM_WEBAPP_LIB_DIR = os.path.join(STORM_DIR, "lib-webapp") diff --git a/bin/test_storm.py b/bin/test_storm.py index 11c8057b0a8..a4466991909 100644 --- a/bin/test_storm.py +++ b/bin/test_storm.py @@ -66,6 +66,19 @@ def test_get_classpath(self): expected = ":".join(extrajars) self.assertEqual(s[-len(expected):], expected) + def test_get_classpath_includes_lib_common(self): + extrajars = [] + # When lib-common exists, it is included on both the daemon and the client classpaths. + storm.STORM_COMMON_LIB_DIR = storm.STORM_BIN_DIR + expected = os.path.join(storm.STORM_BIN_DIR, "*") + for client in (True, False): + cp = storm.get_classpath(extrajars, daemon=True, client=client) + self.assertIn(expected, cp.split(os.pathsep)) + # When it does not exist, it contributes nothing (backward compatible with older layouts). + storm.STORM_COMMON_LIB_DIR = os.path.join(storm.STORM_DIR, "no-such-lib-common") + cp = storm.get_classpath(extrajars, daemon=True, client=False) + self.assertNotIn(os.path.join(storm.STORM_COMMON_LIB_DIR, "*"), cp.split(os.pathsep)) + def test_resolve_dependencies(self): artifacts = "org.apache.commons.commons-api" artifact_repositories = "maven-central" diff --git a/external/storm-autocreds/README.md b/external/storm-autocreds/README.md new file mode 100644 index 00000000000..02459352994 --- /dev/null +++ b/external/storm-autocreds/README.md @@ -0,0 +1,101 @@ +# Storm Auto Credentials (HDFS / HBase) + +`storm-autocreds` lets Storm automatically acquire, distribute and renew +**Hadoop delegation tokens** so that topologies can talk to a secure (Kerberos) +HDFS or HBase cluster without distributing keytabs to every worker host. + +* On topology submission, **Nimbus** obtains delegation tokens on behalf of the + submitting user and ships them with the topology. +* **Workers** unpack the tokens into their `Subject` / `UserGroupInformation`. +* **Nimbus** periodically renews the tokens for long-running topologies. + +See `docs/SECURITY.md` ("Automatic Credentials Push and Renewal") for the full +design. + +## Why the jars are not bundled + +Because these plugins run on the **daemon** classpath (Nimbus/Supervisor) and +pull in the full Hadoop and HBase client dependency trees, they are **not** +shipped inside the binary distribution — only secure-Hadoop deployments need +them, and bundling them would bloat the distribution for everyone. This is the +same convention used by the other `external/*` connectors. + +## Installing + +The plugins must be present on the **daemon** classpath, i.e. in +`$STORM_HOME/extlib-daemon` on Nimbus and the Supervisors. + +### Option 1 — use the helper script (recommended) + +The distribution ships a helper that resolves `storm-autocreds` and its runtime +dependencies from Maven Central and copies them into `extlib-daemon`: + +```bash +$STORM_HOME/bin/storm-autocreds-fetch +``` + +It detects the Storm version from `$STORM_HOME/RELEASE`. Useful options: + +```bash +# explicit version / target directory +bin/storm-autocreds-fetch --version 3.0.0 --dest /opt/storm/extlib-daemon + +# pass extra arguments through to Maven (internal mirror / offline repo) +bin/storm-autocreds-fetch -- -s /etc/maven/settings.xml +bin/storm-autocreds-fetch -- -Dmaven.repo.local=/srv/offline-repo -o +``` + +Maven must be available on the host running the script (it does not have to be +installed on the cluster nodes — you can run it once and copy the resulting jars +to every daemon host). + +### Option 2 — build from source + +```bash +mvn -pl external/storm-autocreds -am package +cp external/storm-autocreds/target/storm-autocreds-*.jar \ + $(find ~/.m2 -name 'hadoop-auth-*.jar' -o -name 'hbase-client-*.jar') \ + $STORM_HOME/extlib-daemon/ +``` + +(Prefer Option 1 — it resolves the complete, correct dependency closure for you.) + +Restart Nimbus and the Supervisors after adding the jars so the new classpath +takes effect. + +## Configuring + +Add the following to `storm.yaml`. The `*Nimbus` classes run on Nimbus (acquire +and renew tokens); the non-`Nimbus` classes run in the worker (unpack tokens). + +```yaml +# Worker side: unpack the tokens into the worker Subject. +topology.auto-credentials: + - org.apache.storm.hdfs.security.AutoHDFS + - org.apache.storm.hbase.security.AutoHBase + +# Nimbus side: obtain the tokens on behalf of the submitter. +nimbus.autocredential.plugins.classes: + - org.apache.storm.hdfs.security.AutoHDFSNimbus + - org.apache.storm.hbase.security.AutoHBaseNimbus + +# Nimbus side: renew the tokens for long-running topologies. +nimbus.credential.renewers.classes: + - org.apache.storm.hdfs.security.AutoHDFSNimbus + - org.apache.storm.hbase.security.AutoHBaseNimbus +``` + +Relevant credential settings: + +| Setting | Purpose | +|---|---| +| `hdfs.keytab.file` / `hdfs.kerberos.principal` | Nimbus principal used to fetch HDFS tokens | +| `hdfs.kerberos.principal` | HDFS service principal | +| `hbase.keytab.file` / `hbase.kerberos.principal` | Nimbus principal used to fetch HBase tokens | +| `topology.hdfs.uri` | NameNode URI (defaults to the cluster `fs.defaultFS`) | +| `hdfsCredentialsConfigKeys` / `hbaseCredentialsConfigKeys` | optional list of per-cluster config keys when talking to multiple clusters | + +Use only the HDFS or only the HBase entries if you need just one of them. + +For the full secure-cluster setup (Kerberos, impersonation, ACLs) see +`docs/SECURITY.md`. diff --git a/external/storm-kafka-monitor/README.md b/external/storm-kafka-monitor/README.md index a483f4bdef9..8e5bd37ef4e 100644 --- a/external/storm-kafka-monitor/README.md +++ b/external/storm-kafka-monitor/README.md @@ -2,6 +2,30 @@ Tool to query kafka spout lags and show in Storm UI +## Installation + +The storm-kafka-monitor jars (and their Kafka client dependencies) are **not** +bundled in the binary distribution to keep it small — they are only needed to +display Kafka spout lag in the UI or to run the `storm-kafka-monitor` command. +The Storm UI degrades gracefully when they are absent (no lag is shown and a +hint is logged once). + +To enable it, install the jars on the UI host with the helper script, then +restart the UI: + +```bash +$STORM_HOME/bin/storm-kafka-monitor-fetch +``` + +It resolves `org.apache.storm:storm-kafka-monitor` and its runtime dependencies +from Maven Central into `lib-tools/storm-kafka-monitor`. Useful options: + +```bash +bin/storm-kafka-monitor-fetch --version 3.0.0 +# pass extra arguments through to Maven (internal mirror / offline repo) +bin/storm-kafka-monitor-fetch -- -s /etc/maven/settings.xml +``` + ## Usage This tool provides a way to query kafka offsets that the spout has consumed successfully and the latest offsets in kafka. It provides an easy way to see how the topology is performing. It is a command line diff --git a/storm-client/test/py/test_storm_cli.py b/storm-client/test/py/test_storm_cli.py index 2815de42f1c..8322897b04e 100644 --- a/storm-client/test/py/test_storm_cli.py +++ b/storm-client/test/py/test_storm_cli.py @@ -62,7 +62,7 @@ def test_jar_command(self): self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=+topology.blobstore.map%3D%27%7B%22key1%22%3A%7B%22localname%22%3A%22blob_file%22%2C+%22uncompress%22%3Afalse%7D%2C%22key2%22%3A%7B%7D%7D%27', '-Dstorm.home=' + self.storm_dir, '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=', '-Dstorm.conf.file=', '-cp', - self.storm_dir + '/*:' + self.storm_dir + '/lib-worker:' + self.storm_dir + self.storm_dir + '/*:' + self.storm_dir + '/lib-common:' + self.storm_dir + '/lib-worker:' + self.storm_dir + '/extlib:example/storm-starter/storm-starter-topologies-*.jar:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin:./external/storm-redis/storm-redis-1.1.0.jar:./external/storm-kafka-client/storm-kafka-client-1.1.0.jar"', '-Dstorm.jar=example/storm-starter/storm-starter-topologies-*.jar', '-Dstorm.dependency.jars=./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka-client/storm-kafka-client-1.1.0.jar"', '-Dstorm.dependency.artifacts={}', 'org.apache.storm.starter.RollingTopWords', 'blobstore-remote2', 'remote' @@ -80,7 +80,7 @@ def test_jar_command(self): self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=', '-Dstorm.home=' + self.storm_dir, '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=', '-Dstorm.conf.file=', '-cp', - self.storm_dir + '/*:' + self.storm_dir + '/lib-worker:' + self.storm_dir + self.storm_dir + '/*:' + self.storm_dir + '/lib-common:' + self.storm_dir + '/lib-worker:' + self.storm_dir + '/extlib:/path/to/jar.jar:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin:', '-Dstorm.jar=/path/to/jar.jar', '-Dstorm.dependency.jars=', '-Dstorm.dependency.artifacts={}', 'some.Topology.Class', '-name', 'run-topology', 'randomArgument', '-randomFlag', 'randomFlagValue', @@ -93,7 +93,7 @@ def test_localconfvalue_command(self): self.base_test( ["storm", "localconfvalue", "conf_name"], self.mock_popen, mock.call([ self.java_cmd, '-client', '-Dstorm.options=', - '-Dstorm.conf.file=', '-cp', self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +'/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf', + '-Dstorm.conf.file=', '-cp', self.storm_dir + '/*:' + self.storm_dir + '/lib-common:' + self.storm_dir + '/lib:' + self.storm_dir +'/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf', 'org.apache.storm.command.ConfigValue', 'conf_name' ], stdout=-1 ) @@ -103,7 +103,7 @@ def test_remoteconfvalue_command(self): self.base_test( ["storm", "remoteconfvalue", "conf_name"], self.mock_popen, mock.call([ self.java_cmd, '-client', '-Dstorm.options=', - '-Dstorm.conf.file=', '-cp', self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf', + '-Dstorm.conf.file=', '-cp', self.storm_dir + '/*:' + self.storm_dir + '/lib-common:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf', 'org.apache.storm.command.ConfigValue', 'conf_name' ], stdout=-1 ) @@ -124,7 +124,7 @@ def test_local_command(self): self.java_cmd, '-client','-Ddaemon.name=', '-Dstorm.options=', '-Dstorm.home=' + self.storm_dir, '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=', '-Dstorm.conf.file=', '-cp', - self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + + self.storm_dir + '/*:' + self.storm_dir + '/lib-common:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:example/storm-starter/storm-starter-topologies-*.jar:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin:./external/storm-redis/storm-redis-1.1.0.jar:./external/storm-kafka-client/storm-kafka-client-1.1.0.jar"', @@ -144,7 +144,7 @@ def test_kill_command(self): self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=', '-Dstorm.home=' + self.storm_dir, '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=', '-Dstorm.conf.file=', '-cp', - self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + + self.storm_dir + '/*:' + self.storm_dir + '/lib-common:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin', 'org.apache.storm.command.KillTopology', 'doomed_topology' ]) ) @@ -157,7 +157,7 @@ def test_upload_credentials_command(self): self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=test%3Dtest', '-Dstorm.home=' + self.storm_dir, '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=', '-Dstorm.conf.file=/some/other/storm.yaml', - '-cp', self.storm_dir + '/*:' + self.storm_dir + '/lib:' + + '-cp', self.storm_dir + '/*:' + self.storm_dir + '/lib-common:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir + @@ -173,7 +173,7 @@ def test_blobstore_command(self): self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=', '-Dstorm.home=' + self.storm_dir, '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=', '-Dstorm.conf.file=', '-cp', - self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + + self.storm_dir + '/*:' + self.storm_dir + '/lib-common:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin', 'org.apache.storm.command.Blobstore', 'create', 'mytopo:data.tgz', '-f', 'data.tgz', '-a', 'u:alice:rwa,u:bob:rw,o::r']) @@ -188,7 +188,7 @@ def test_blobstore_command(self): '-Dstorm.home=' + self.storm_dir, '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=', '-Dstorm.conf.file=', '-cp', - self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + + self.storm_dir + '/*:' + self.storm_dir + '/lib-common:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin', 'org.apache.storm.command.Blobstore', 'list']) ) @@ -202,7 +202,7 @@ def test_blobstore_command(self): '-Dstorm.home=' + self.storm_dir, '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=', '-Dstorm.conf.file=', '-cp', - self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + + self.storm_dir + '/*:' + self.storm_dir + '/lib-common:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin', 'org.apache.storm.command.Blobstore', 'list', 'wordstotrack']) ) @@ -215,7 +215,7 @@ def test_blobstore_command(self): self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=', '-Dstorm.home=' + self.storm_dir, '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=', '-Dstorm.conf.file=', '-cp', - self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + + self.storm_dir + '/*:' + self.storm_dir + '/lib-common:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin', 'org.apache.storm.command.Blobstore', 'update', '-f', '/wordsToTrack.list', 'wordstotrack']) @@ -229,7 +229,7 @@ def test_blobstore_command(self): self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=', '-Dstorm.home=' + self.storm_dir, '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=', '-Dstorm.conf.file=', '-cp', - self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + + self.storm_dir + '/*:' + self.storm_dir + '/lib-common:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin', 'org.apache.storm.command.Blobstore', 'cat', 'wordstotrack']) ) @@ -242,7 +242,7 @@ def test_activate_command(self): self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=', '-Dstorm.home=' + self.storm_dir, '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=', '-Dstorm.conf.file=', '-cp', - self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + + self.storm_dir + '/*:' + self.storm_dir + '/lib-common:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin', 'org.apache.storm.command.Activate', 'doomed_topology' ]) @@ -256,7 +256,7 @@ def test_deactivate_command(self): self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=', '-Dstorm.home=' + self.storm_dir, '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=', '-Dstorm.conf.file=', '-cp', - self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + + self.storm_dir + '/*:' + self.storm_dir + '/lib-common:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin', 'org.apache.storm.command.Deactivate', 'doomed_topology' ]) @@ -270,7 +270,7 @@ def test_rebalance_command(self): self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=', '-Dstorm.home=' + self.storm_dir, '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=', '-Dstorm.conf.file=', '-cp', - self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + + self.storm_dir + '/*:' + self.storm_dir + '/lib-common:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin', 'org.apache.storm.command.Rebalance', 'doomed_topology' ]) @@ -284,7 +284,7 @@ def test_list_command(self): self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=', '-Dstorm.home=' + self.storm_dir, '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=', '-Dstorm.conf.file=', '-cp', - self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + + self.storm_dir + '/*:' + self.storm_dir + '/lib-common:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin', 'org.apache.storm.command.ListTopologies' ]) @@ -298,7 +298,7 @@ def test_nimbus_command(self): self.java_cmd, '-server', '-Ddaemon.name=nimbus', '-Dstorm.options=', '-Dstorm.home=' + self.storm_dir, '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=', '-Dstorm.conf.file=', '-cp', - self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + + self.storm_dir + '/*:' + self.storm_dir + '/lib-common:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf', '-Djava.deserialization.disabled=true', '-Dlogfile.name=nimbus.log', '-Dlog4j.configurationFile=' + self.storm_dir + '/log4j2/cluster.xml', @@ -314,7 +314,7 @@ def test_supervisor_command(self): self.java_cmd, '-server', '-Ddaemon.name=supervisor', '-Dstorm.options=', '-Dstorm.home=' + self.storm_dir, '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=', '-Dstorm.conf.file=', '-cp', - self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + + self.storm_dir + '/*:' + self.storm_dir + '/lib-common:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf', '-Djava.deserialization.disabled=true', '-Dlogfile.name=supervisor.log', '-Dlog4j.configurationFile=' + self.storm_dir + '/log4j2/cluster.xml', @@ -330,7 +330,7 @@ def test_pacemaker_command(self): self.java_cmd, '-server', '-Ddaemon.name=pacemaker', '-Dstorm.options=', '-Dstorm.home=' + self.storm_dir, '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=', '-Dstorm.conf.file=', '-cp', - self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + + self.storm_dir + '/*:' + self.storm_dir + '/lib-common:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf', '-Djava.deserialization.disabled=true', '-Dlogfile.name=pacemaker.log', '-Dlog4j.configurationFile=' + self.storm_dir + '/log4j2/cluster.xml', @@ -346,7 +346,7 @@ def test_ui_command(self): self.java_cmd, '-server', '-Ddaemon.name=ui', '-Dstorm.options=', '-Dstorm.home=' + self.storm_dir, '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=', '-Dstorm.conf.file=', '-cp', - self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + + self.storm_dir + '/*:' + self.storm_dir + '/lib-common:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/lib-webapp:' + self.storm_dir + '/conf', '-Djava.deserialization.disabled=true', '-Dlogfile.name=ui.log', @@ -363,7 +363,7 @@ def test_logviewer_command(self): self.java_cmd, '-server', '-Ddaemon.name=logviewer', '-Dstorm.options=', '-Dstorm.home=' + self.storm_dir, '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=', '-Dstorm.conf.file=', '-cp', - self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + + self.storm_dir + '/*:' + self.storm_dir + '/lib-common:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/lib-webapp:' + self.storm_dir + '/conf', '-Djava.deserialization.disabled=true', '-Dlogfile.name=logviewer.log', @@ -380,7 +380,7 @@ def test_drpc_command(self): self.java_cmd, '-server', '-Ddaemon.name=drpc', '-Dstorm.options=', '-Dstorm.home=' + self.storm_dir, '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=', '-Dstorm.conf.file=', '-cp', - self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + + self.storm_dir + '/*:' + self.storm_dir + '/lib-common:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/lib-webapp:' + self.storm_dir + '/conf', '-Djava.deserialization.disabled=true', '-Dlogfile.name=drpc.log', @@ -397,7 +397,7 @@ def test_drpc_client_command(self): self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=', '-Dstorm.home=' + self.storm_dir, '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=', '-Dstorm.conf.file=', '-cp', - self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + + self.storm_dir + '/*:' + self.storm_dir + '/lib-common:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin', 'org.apache.storm.command.BasicDrpcClient', 'exclaim', 'a', 'exclaim', 'b', 'test', 'bar' ]) @@ -409,7 +409,7 @@ def test_drpc_client_command(self): self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=', '-Dstorm.home=' + self.storm_dir, '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=', '-Dstorm.conf.file=', '-cp', - self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + + self.storm_dir + '/*:' + self.storm_dir + '/lib-common:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin', 'org.apache.storm.command.BasicDrpcClient', '-f', 'exclaim', 'a', 'b' ]) @@ -422,7 +422,7 @@ def test_healthcheck_command(self): self.java_cmd, [ self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=', '-Dstorm.home=' + self.storm_dir, '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=', - '-Dstorm.conf.file=', '-cp', self.storm_dir + '/*:' + self.storm_dir + '/lib:' + + '-Dstorm.conf.file=', '-cp', self.storm_dir + '/*:' + self.storm_dir + '/lib-common:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin', 'org.apache.storm.command.HealthCheck' ]) diff --git a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java index 7d8d7bbc8d0..a230e1e7a25 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java +++ b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java @@ -54,6 +54,11 @@ public class TopologySpoutLag { BOOTSTRAP_CONFIG, SECURITY_PROTOCOL_CONFIG)); private static final Logger LOGGER = LoggerFactory.getLogger(TopologySpoutLag.class); + // The storm-kafka-monitor jars are not bundled in the binary distribution; operators install them + // on demand (bin/storm-kafka-monitor-fetch). Log the "not installed" hint at most once to avoid + // spamming the UI logs, which poll the lag endpoint periodically. + private static volatile boolean warnedMonitorMissing = false; + public static Map> lag(StormTopology stormTopology, Map topologyConf) { Map> result = new HashMap<>(); Map spouts = stormTopology.get_spouts(); @@ -69,6 +74,24 @@ public static Map> lag(StormTopology stormTopology, return result; } + /** + * Checks whether the storm-kafka-monitor jars (invoked by bin/storm-kafka-monitor) are present. + * They are not bundled in the binary distribution and are fetched on demand, so the UI must + * degrade gracefully when they are absent rather than failing the lag shell-out. + * + * @return true if the monitor appears installed, or if STORM_BASE_DIR is unknown (in which case + * the legacy behavior of attempting the shell-out is preserved). + */ + private static boolean isKafkaMonitorInstalled() { + String stormHomeDir = System.getenv("STORM_BASE_DIR"); + if (stormHomeDir == null) { + return true; + } + File libDir = new File(new File(stormHomeDir, "lib-tools"), "storm-kafka-monitor"); + File[] jars = libDir.listFiles((dir, name) -> name.endsWith(".jar")); + return jars != null && jars.length > 0; + } + private static List getCommandLineOptionsForNewKafkaSpout(Map jsonConf) { LOGGER.debug("json configuration: {}", jsonConf); @@ -166,7 +189,18 @@ private static Map getLagResultForKafka(String spoutId, SpoutSpe LOGGER.debug("Command to run: {}", commands); // if commands contains one or more null value, spout is compiled with lower version of storm-kafka-client - if (!commands.contains(null)) { + if (!commands.contains(null) && !isKafkaMonitorInstalled()) { + errorMsg = "Kafka spout lag monitoring is unavailable because the storm-kafka-monitor " + + "jars are not installed. They are no longer bundled in the binary distribution; " + + "run 'bin/storm-kafka-monitor-fetch' on the UI host (and restart the UI) to enable it."; + if (!warnedMonitorMissing) { + warnedMonitorMissing = true; + LOGGER.info(errorMsg); + } + if (extraPropertiesFile != null) { + extraPropertiesFile.delete(); + } + } else if (!commands.contains(null)) { try { String resultFromMonitor = new ShellCommandRunnerImpl().execCommand(commands.toArray(new String[0])); diff --git a/storm-dist/binary/final-package/pom.xml b/storm-dist/binary/final-package/pom.xml index 8471b307c62..dce19eaf6ed 100644 --- a/storm-dist/binary/final-package/pom.xml +++ b/storm-dist/binary/final-package/pom.xml @@ -43,6 +43,73 @@ apache-storm-${project.version} + + + org.apache.maven.plugins + maven-dependency-plugin + + + stage-daemon-lib + prepare-package + + copy-dependencies + + + runtime + ${project.build.directory}/staging/lib + + + + + + org.apache.maven.plugins + maven-resources-plugin + + + stage-common-lib + prepare-package + + copy-resources + + + ${project.build.directory}/staging/lib-common + + + ${project.basedir}/../storm-client-bin/target/client/client/lib-worker + + *.jar + + + + + + + + + org.codehaus.mojo + exec-maven-plugin + + + dedup-libs + prepare-package + + exec + + + python3 + + ${project.basedir}/src/main/scripts/dedup-libs.py + ${project.build.directory}/staging + + + + + org.apache.maven.plugins maven-assembly-plugin diff --git a/storm-dist/binary/final-package/src/main/assembly/binary.xml b/storm-dist/binary/final-package/src/main/assembly/binary.xml index ee5cb60e3ed..5e62e2bc00d 100644 --- a/storm-dist/binary/final-package/src/main/assembly/binary.xml +++ b/storm-dist/binary/final-package/src/main/assembly/binary.xml @@ -24,21 +24,23 @@ zip - - - - false - lib - false - - - + - ${project.basedir}/../storm-client-bin/target/client/client/ - . + ${project.build.directory}/staging/lib + lib - */** + *.jar + + + + ${project.build.directory}/staging/lib-common + lib-common + + *.jar @@ -232,21 +234,27 @@ - + - ${project.basedir}/../storm-kafka-monitor-bin/target/kafka-monitor/kafka-monitor/lib-kafka-monitor - lib-tools/storm-kafka-monitor + ${project.basedir}/../../../external/storm-kafka-monitor + external/storm-kafka-monitor - *jar + README.* - + - ${project.basedir}/../storm-autocreds-bin/target/autocreds/autocreds/lib-autocreds + ${project.basedir}/../../../external/storm-autocreds external/storm-autocreds - *jar + README.* diff --git a/storm-dist/binary/final-package/src/main/scripts/dedup-libs.py b/storm-dist/binary/final-package/src/main/scripts/dedup-libs.py new file mode 100755 index 00000000000..d6af3ea107d --- /dev/null +++ b/storm-dist/binary/final-package/src/main/scripts/dedup-libs.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python3 +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# De-duplicates the jars shared by the daemon (lib) and worker (lib-worker) +# classpaths of an assembled Storm distribution into a single lib-common +# directory, to keep the distribution small. +# +# The worker classpath is a strict subset of the daemon classpath, so every +# lib-worker jar is moved into lib-common; any byte-identical copy in lib is +# then removed. bin/storm.py adds lib-common to BOTH classpaths, so: +# daemon classpath = lib-common + lib (unchanged jar set) +# worker classpath = lib-common (+ lib-worker, now empty) +# +# Only byte-identical jars (same name AND same sha-256) are de-duplicated, so a +# version mismatch is never silently merged. Tool classpaths (lib-tools/*, +# lib-webapp) are intentionally left untouched: their wrappers do not include +# lib-common, so their jars must stay in place. + +import hashlib +import os +import shutil +import sys + + +def sha256(path): + h = hashlib.sha256() + with open(path, "rb") as fh: + for chunk in iter(lambda: fh.read(1 << 20), b""): + h.update(chunk) + return h.hexdigest() + + +def jars(directory): + if not os.path.isdir(directory): + return {} + return {f: os.path.join(directory, f) for f in os.listdir(directory) if f.endswith(".jar")} + + +def dedup(dist_root): + lib = os.path.join(dist_root, "lib") + worker = os.path.join(dist_root, "lib-worker") + common = os.path.join(dist_root, "lib-common") + + # Absorb the worker jars into lib-common. This handles two layouts: + # - exploded distribution: lib-worker is present, lib-common is created here; + # - staged build: lib-common is already populated, lib-worker is absent. + worker_jars = jars(worker) + if worker_jars: + os.makedirs(common, exist_ok=True) + for name, src in sorted(worker_jars.items()): + dst = os.path.join(common, name) + if not os.path.exists(dst): + shutil.move(src, dst) + else: + os.remove(src) + # lib-worker is now empty; remove it so the layout is unambiguous. + if os.path.isdir(worker) and not os.listdir(worker): + os.rmdir(worker) + + common_jars = jars(common) + if not common_jars: + print(f"dedup-libs: no jars in {common} (and none in {worker}); nothing to do") + return 0 + + lib_jars = jars(lib) + reclaimed = 0 + removed = 0 + for name, common_copy in sorted(common_jars.items()): + # Drop the byte-identical copy from the daemon lib dir, if present. + lib_copy = lib_jars.get(name) + if lib_copy and os.path.exists(lib_copy) and sha256(lib_copy) == sha256(common_copy): + reclaimed += os.path.getsize(lib_copy) + os.remove(lib_copy) + removed += 1 + + print(f"dedup-libs: lib-common has {len(common_jars)} jar(s); " + f"removed {removed} duplicate(s) from lib, reclaimed {reclaimed / (1024 * 1024):.1f} MB") + return 0 + + +def main(argv): + if len(argv) != 2: + print("Usage: dedup-libs.py ", file=sys.stderr) + return 2 + return dedup(argv[1]) + + +if __name__ == "__main__": + sys.exit(main(sys.argv)) diff --git a/storm-dist/binary/pom.xml b/storm-dist/binary/pom.xml index 94209592e3c..3b58038d172 100644 --- a/storm-dist/binary/pom.xml +++ b/storm-dist/binary/pom.xml @@ -47,9 +47,7 @@ storm-client-bin storm-webapp-bin - storm-autocreds-bin storm-submit-tools-bin - storm-kafka-monitor-bin final-package diff --git a/storm-dist/binary/storm-autocreds-bin/pom.xml b/storm-dist/binary/storm-autocreds-bin/pom.xml deleted file mode 100644 index 8b42b902fff..00000000000 --- a/storm-dist/binary/storm-autocreds-bin/pom.xml +++ /dev/null @@ -1,63 +0,0 @@ - - - - 4.0.0 - - org.apache.storm - apache-storm-bin - 3.0.0-SNAPSHOT - - storm-autocreds-bin - pom - - - Storm Autocreds Binary - - - org.apache.storm - storm-autocreds - ${project.version} - - - - - autocreds - - - org.apache.maven.plugins - maven-assembly-plugin - - - prepare-package - - single - - - - - false - false - - ${project.basedir}/src/main/assembly/storm-autocreds.xml - - false - - - - - diff --git a/storm-dist/binary/storm-autocreds-bin/src/main/assembly/storm-autocreds.xml b/storm-dist/binary/storm-autocreds-bin/src/main/assembly/storm-autocreds.xml deleted file mode 100644 index 1cd914cb7b7..00000000000 --- a/storm-dist/binary/storm-autocreds-bin/src/main/assembly/storm-autocreds.xml +++ /dev/null @@ -1,33 +0,0 @@ - - - - storm-autocreds-bin - - dir - - - - false - lib-autocreds - false - - - diff --git a/storm-dist/binary/storm-kafka-monitor-bin/pom.xml b/storm-dist/binary/storm-kafka-monitor-bin/pom.xml deleted file mode 100644 index 4d9e569a95a..00000000000 --- a/storm-dist/binary/storm-kafka-monitor-bin/pom.xml +++ /dev/null @@ -1,69 +0,0 @@ - - - - 4.0.0 - - org.apache.storm - apache-storm-bin - 3.0.0-SNAPSHOT - - storm-kafka-monitor-bin - pom - - - Storm Kafka Monitor Binary - - - org.apache.storm - storm-kafka-monitor - ${project.version} - - - - org.apache.kafka - kafka-clients - compile - - - - - kafka-monitor - - - org.apache.maven.plugins - maven-assembly-plugin - - - prepare-package - - single - - - - - false - false - - ${project.basedir}/src/main/assembly/storm-kafka-monitor.xml - - false - - - - - diff --git a/storm-dist/binary/storm-kafka-monitor-bin/src/main/assembly/storm-kafka-monitor.xml b/storm-dist/binary/storm-kafka-monitor-bin/src/main/assembly/storm-kafka-monitor.xml deleted file mode 100644 index 021b472c87f..00000000000 --- a/storm-dist/binary/storm-kafka-monitor-bin/src/main/assembly/storm-kafka-monitor.xml +++ /dev/null @@ -1,33 +0,0 @@ - - - - storm-kafka-monitor-bin - - dir - - - - false - lib-kafka-monitor - false - - -