diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 4368099725c..a64bd5b8940 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -54,7 +54,10 @@ storm.nimbus.zookeeper.acls.fixup: true
storm.auth.simple-white-list.users: []
storm.cluster.state.store: "org.apache.storm.cluster.ZKStateStorageFactory"
-storm.meta.serialization.delegate: "org.apache.storm.serialization.GzipThriftSerializationDelegate"
+storm.meta.serialization.delegate: "org.apache.storm.serialization.ZstdBridgeThriftSerializationDelegate"
+storm.compression.zstd.level: 3
+storm.compression.zstd.max.decompressed.bytes: 104857600
+storm.compression.gzip.max.decompressed.bytes: 104857600
storm.codedistributor.class: "org.apache.storm.codedistributor.LocalFileSystemCodeDistributor"
storm.workers.artifacts.dir: "workers-artifacts"
storm.health.check.dir: "healthchecks"
diff --git a/docs/Cluster-State-Serialization.md b/docs/Cluster-State-Serialization.md
new file mode 100644
index 00000000000..8581448f409
--- /dev/null
+++ b/docs/Cluster-State-Serialization.md
@@ -0,0 +1,124 @@
+---
+title: Cluster State Serialization
+layout: documentation
+documentation: true
+---
+
+This page describes how Storm serializes the *meta* state it persists in
+ZooKeeper (and other configured state stores) such as topology assignments, Nimbus
+summaries, `StormBase` records, log configs, credentials, worker heartbeats,
+profile requests, errors, etc.
+
+It is distinct from
+[tuple serialization](Serialization.html), which covers payloads exchanged
+between spouts and bolts at runtime via Kryo.
+
+## Background
+
+All cluster state writes go through `Utils.serialize(...)` /
+`Utils.deserialize(...)`, which in turn delegate to a pluggable
+`SerializationDelegate` selected by the
+`storm.meta.serialization.delegate` config.
+
+## Configuration
+
+| Key | Default | Range | Description |
+|---|---|---|---|
+| `storm.meta.serialization.delegate` | `org.apache.storm.serialization.ZstdBridgeThriftSerializationDelegate` | any `SerializationDelegate` impl | Class used to (de)serialize cluster state. |
+| `storm.compression.zstd.level` | `3` | `1`–`19` | Zstandard compression level. Higher = smaller + slower. Levels 20–22 are rejected by the validator. |
+| `storm.compression.zstd.max.decompressed.bytes` | `104857600` (100 MiB) | `> 0` | Hard cap on the size of any zstd-decompressed payload. |
+| `storm.compression.gzip.max.decompressed.bytes` | `104857600` (100 MiB) | `> 0` | Hard cap on the size of any gzip-decompressed payload. Also enforced by `GzipSerializationDelegate`. |
+
+## Choosing a delegate
+
+* **`ZstdBridgeThriftSerializationDelegate`** *(default)* — recommended.
+ Writes zstd, reads anything previously written. Use this unless you
+ have a specific reason not to.
+* **`ZstdThriftSerializationDelegate`** — pure zstd, refuses non-zstd
+ input. Only safe to deploy after every znode in your state store has
+ been rewritten by a bridge delegate (e.g. by submitting / killing each
+ topology, or by force-rewriting Nimbus state). Use only when you want
+ to *enforce* the new format.
+* **`GzipBridgeThriftSerializationDelegate`** — legacy default; still
+ available for clusters that want to roll forward without touching the
+ codec.
+* **`ThriftSerializationDelegate`** — raw Thrift.
+
+## Migration to Zstandard compression
+
+Starting with Apache Storm 3.X, Zstandard is supported as the default
+compression codec for cluster state, replacing gzip for better
+performance — faster compression and decompression at comparable or
+better ratios. Earlier versions used `GzipThriftSerializationDelegate`,
+wrapped by `GzipBridgeThriftSerializationDelegate` to allow rolling
+upgrades from clusters that had previously stored raw Thrift bytes; the
+new `ZstdBridgeThriftSerializationDelegate` plays the equivalent bridge
+role for the gzip to zstd transition.
+
+| Area | Gzip | Zstandard |
+|---|---------------------------------------------------------|-----------------------------------------------------|
+| Default delegate | `GzipThriftSerializationDelegate` (via `GzipBridge...`) | `ZstdBridgeThriftSerializationDelegate` |
+| Compression codec | gzip (`java.util.zip`) | Zstandard (via `commons-compress` + `zstd-jni`) |
+| Decompression bound | none | bounded (`BoundedInputStream`), default 100 MiB |
+| Format detection | gzip magic only | gzip magic *and* zstd magic |
+| Config validation | none for compression | `ZstdLevelValidator` (1–19), positive bounds checks |
+
+### Zstandard `SerializationDelegate` implementations
+
+* `ZstdThriftSerializationDelegate`: pure zstd Thrift codec. Serializes
+ any `TBase` with zstd at the configured level; deserialization
+ requires the input to begin with the zstd magic number
+ (`0xFD2FB528`).
+* `ZstdBridgeThriftSerializationDelegate`: the new default, implemented to
+ allow rolling upgrades from clusters that had previously stored payloads
+ as gzip-compressed. Always *writes* zstd. On read, dispatches based on a
+ magic-byte sniff:
+
+```
+ZstdBridgeThriftSerializationDelegate.deserialize(bytes)
+ ├── bytes starts with zstd magic (0xFD2FB528) delegates to ZstdThriftSerializationDelegate
+ └── otherwise, delegates to GzipBridgeThriftSerializationDelegate.deserialize(bytes)
+ ├── bytes starts with gzip magic (0x1F8B) delegates to GzipThriftSerializationDelegate
+ └── otherwise delegates to ThriftSerializationDelegate (raw Thrift)
+```
+
+This delegation chain is the key property that makes the new default
+rolling-upgrade safe: nodes running the new code can still read every
+older payload that may already exist in ZooKeeper, while new writes use
+zstd.
+
+### Zip-bomb protection
+
+`GzipUtils.decompress` and `ZstdUtils.decompress` (both in
+`org.apache.storm.utils.Utils`) wrap the decompressor stream in an Apache
+Commons `BoundedInputStream` with `maxCount` set to the configured cap.
+After draining the bounded stream, the underlying decompressor is probed
+with one extra `read()`; if any byte remains, the call fails with:
+
+```
+Decompression threshold exceeded! Possible security risk or invalid data size.
+```
+
+The same guard is applied to the legacy `GzipSerializationDelegate` (the
+non-Thrift Java-serialization variant).
+
+### Upgrading an existing cluster
+
+1. **Roll Nimbus and Supervisors onto the new build.** The bridge
+ delegate is the default, so no config change is required for a safe
+ upgrade.
+2. **(Optional) Tune `storm.compression.zstd.level`** if you want a
+ tighter compression / latency trade-off. Most state writes are
+ infrequent; level 3 is a good default.
+3. **(Optional) Tune `storm.compression.zstd.max.decompressed.bytes`** if
+ you legitimately persist payloads larger than 100 MiB. The cap
+ guards against malformed or hostile data, raise it deliberately.
+4. **(Optional) Switch to the strict `ZstdThriftSerializationDelegate`**
+ *only* after every legacy payload has been rewritten. The bridge
+ delegate is sufficient for the vast majority of deployments.
+
+### Dependencies
+
+The zstd codec is provided by Apache Commons Compress
+(`org.apache.commons:commons-compress`) backed by the `com.github.luben:zstd-jni`
+native binding.
diff --git a/docs/Serialization.md b/docs/Serialization.md
index 4d2749105a6..0e7cc2e87a3 100644
--- a/docs/Serialization.md
+++ b/docs/Serialization.md
@@ -5,6 +5,8 @@ documentation: true
---
This page is about how the serialization system in Storm works for versions 0.6.0 and onwards. Storm used a different serialization system prior to 0.6.0 which is documented on [Serialization (prior to 0.6.0)](Serialization-\(prior-to-0.6.0\).html).
+> This page covers **tuple** serialization (data flowing between spouts and bolts). For how Storm serializes the meta state it persists in ZooKeeper and related configuration, see [Cluster State Serialization](Cluster-State-Serialization.html).
+
Tuples can be comprised of objects of any types. Since Storm is a distributed system, it needs to know how to serialize and deserialize objects when they're passed between tasks.
Storm uses [Kryo](https://github.com/EsotericSoftware/kryo) for serialization. Kryo is a flexible and fast serialization library that produces small serializations.
diff --git a/pom.xml b/pom.xml
index 4d3863693ac..c1ba6755507 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,6 +82,7 @@
1.28.0
+ 1.5.7-82.22.03.20.01.6.0
@@ -514,6 +515,11 @@
commons-compress${commons-compress.version}
+
+ com.github.luben
+ zstd-jni
+ ${zstd-jni.version}
+ org.apache.commonscommons-exec
diff --git a/storm-client/pom.xml b/storm-client/pom.xml
index 62d6c20d9d3..3a7ee315d1a 100644
--- a/storm-client/pom.xml
+++ b/storm-client/pom.xml
@@ -135,6 +135,14 @@
curator-testtest
+
+ org.apache.commons
+ commons-compress
+
+
+ com.github.luben
+ zstd-jni
+
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index e332b726b28..f91d30ca3c0 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -1458,7 +1458,24 @@ public class Config extends HashMap {
*/
@IsString
public static final String STORM_META_SERIALIZATION_DELEGATE = "storm.meta.serialization.delegate";
-
+ /**
+ * GZIP max decompression bytes. Defaults to 104857600 (100MB).
+ */
+ @IsPositiveNumber(includeZero = false)
+ public static final String STORM_COMPRESSION_GZIP_MAX_DECOMPRESSED_BYTES = "storm.compression.gzip.max.decompressed.bytes";
+ /**
+ * Zstandard compression level.
+ * Supported range: 1 to 19. Default: 3.
+ * Prohibited: Levels 20-22 (Ultra mode) are not allowed as they
+ * require dramatically more working memory per call.
+ */
+ @CustomValidator(validatorClass = ConfigValidation.ZstdLevelValidator.class)
+ public static final String STORM_COMPRESSION_ZSTD_LEVEL = "storm.compression.zstd.level";
+ /**
+ * Zstandard max decompression bytes. Defaults to 104857600 (100MB).
+ */
+ @IsPositiveNumber(includeZero = false)
+ public static final String STORM_COMPRESSION_ZSTD_MAX_DECOMPRESSED_BYTES = "storm.compression.zstd.max.decompressed.bytes";
/**
* Configure the topology metrics reporters to be used on workers.
*/
diff --git a/storm-client/src/jvm/org/apache/storm/serialization/GzipBridgeThriftSerializationDelegate.java b/storm-client/src/jvm/org/apache/storm/serialization/GzipBridgeThriftSerializationDelegate.java
index bc966112426..67ae15ada00 100644
--- a/storm-client/src/jvm/org/apache/storm/serialization/GzipBridgeThriftSerializationDelegate.java
+++ b/storm-client/src/jvm/org/apache/storm/serialization/GzipBridgeThriftSerializationDelegate.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
diff --git a/storm-client/src/jvm/org/apache/storm/serialization/GzipSerializationDelegate.java b/storm-client/src/jvm/org/apache/storm/serialization/GzipSerializationDelegate.java
index 9c4045854d5..d12c2b01879 100644
--- a/storm-client/src/jvm/org/apache/storm/serialization/GzipSerializationDelegate.java
+++ b/storm-client/src/jvm/org/apache/storm/serialization/GzipSerializationDelegate.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
@@ -20,15 +20,22 @@
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
+import org.apache.storm.Config;
+import org.apache.storm.shade.org.apache.commons.io.input.BoundedInputStream;
+import org.apache.storm.utils.ObjectReader;
/**
* Note, this assumes it's deserializing a gzip byte stream, and will err if it encounters any other serialization.
*/
public class GzipSerializationDelegate implements SerializationDelegate {
+ private static final int DEFAULT_MAX_DECOMPRESSED_BYTES = 100 * 1024 * 1024;
+ private int maxDecompressedBytes;
+
@Override
public void prepare(Map topoConf) {
- // No-op
+ this.maxDecompressedBytes = ObjectReader.getInt(topoConf.getOrDefault(Config.STORM_COMPRESSION_GZIP_MAX_DECOMPRESSED_BYTES,
+ DEFAULT_MAX_DECOMPRESSED_BYTES));
}
@Override
@@ -47,17 +54,21 @@ public byte[] serialize(Object object) {
@Override
public T deserialize(byte[] bytes, Class clazz) {
- try {
- ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
- GZIPInputStream gis = new GZIPInputStream(bis);
- ObjectInputStream ois = new ObjectInputStream(gis);
+ try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ GZIPInputStream gis = new GZIPInputStream(bis);
+ BoundedInputStream lis = BoundedInputStream.builder()
+ .setMaxCount(this.maxDecompressedBytes)
+ .setInputStream(gis)
+ .setPropagateClose(true)
+ .get();
+ ObjectInputStream ois = new ObjectInputStream(lis)) {
Object ret = ois.readObject();
- ois.close();
- return (T) ret;
- } catch (IOException ioe) {
- throw new RuntimeException(ioe);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e);
+ if (gis.read() != -1) {
+ throw new IOException("Decompression threshold exceeded! Possible security risk or invalid data size.");
+ }
+ return clazz.cast(ret);
+ } catch (IOException | ClassNotFoundException e) {
+ throw new RuntimeException("Deserialization failed: " + e.getMessage(), e);
}
}
}
diff --git a/storm-client/src/jvm/org/apache/storm/serialization/GzipThriftSerializationDelegate.java b/storm-client/src/jvm/org/apache/storm/serialization/GzipThriftSerializationDelegate.java
index f518628a82d..ae3bc7ac1cf 100644
--- a/storm-client/src/jvm/org/apache/storm/serialization/GzipThriftSerializationDelegate.java
+++ b/storm-client/src/jvm/org/apache/storm/serialization/GzipThriftSerializationDelegate.java
@@ -19,10 +19,12 @@
package org.apache.storm.serialization;
import java.util.Map;
+import org.apache.storm.Config;
import org.apache.storm.thrift.TBase;
import org.apache.storm.thrift.TDeserializer;
import org.apache.storm.thrift.TException;
import org.apache.storm.thrift.TSerializer;
+import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Utils;
/**
@@ -30,15 +32,19 @@
*/
public class GzipThriftSerializationDelegate implements SerializationDelegate {
+ private static final int DEFAULT_MAX_DECOMPRESSED_BYTES = 100 * 1024 * 1024;
+ private int maxDecompressedBytes;
+
@Override
public void prepare(Map topoConf) {
- // No-op
+ this.maxDecompressedBytes = ObjectReader.getInt(topoConf.getOrDefault(Config.STORM_COMPRESSION_GZIP_MAX_DECOMPRESSED_BYTES,
+ DEFAULT_MAX_DECOMPRESSED_BYTES));
}
@Override
public byte[] serialize(Object object) {
try {
- return Utils.gzip(new TSerializer().serialize((TBase) object));
+ return Utils.GzipUtils.compress(new TSerializer().serialize((TBase) object));
} catch (TException e) {
throw new RuntimeException(e);
}
@@ -48,7 +54,7 @@ public byte[] serialize(Object object) {
public T deserialize(byte[] bytes, Class clazz) {
try {
TBase instance = (TBase) clazz.newInstance();
- new TDeserializer().deserialize(instance, Utils.gunzip(bytes));
+ new TDeserializer().deserialize(instance, Utils.GzipUtils.decompress(bytes, this.maxDecompressedBytes));
return (T) instance;
} catch (Exception e) {
throw new RuntimeException(e);
diff --git a/storm-client/src/jvm/org/apache/storm/serialization/ZstdBridgeThriftSerializationDelegate.java b/storm-client/src/jvm/org/apache/storm/serialization/ZstdBridgeThriftSerializationDelegate.java
new file mode 100644
index 00000000000..a3484b7fd17
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/serialization/ZstdBridgeThriftSerializationDelegate.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.serialization;
+
+import java.util.Map;
+import org.apache.storm.utils.Utils;
+
+/**
+ * Always writes Zstd out, but tests incoming bytes to determine the format.
+ * If Zstd magic is found, it uses {@link ZstdThriftSerializationDelegate}.
+ * If not, it falls back to {@link ThriftSerializationDelegate} for raw Thrift.
+ */
+public class ZstdBridgeThriftSerializationDelegate implements SerializationDelegate {
+
+ private final GzipBridgeThriftSerializationDelegate defaultDelegate = new GzipBridgeThriftSerializationDelegate();
+ private final ZstdThriftSerializationDelegate zstdDelegate = new ZstdThriftSerializationDelegate();
+
+ @Override
+ public void prepare(Map topoConf) {
+ defaultDelegate.prepare(topoConf);
+ zstdDelegate.prepare(topoConf);
+ }
+
+ @Override
+ public byte[] serialize(Object object) {
+ // Always compress new data with Zstd
+ return zstdDelegate.serialize(object);
+ }
+
+ @Override
+ public T deserialize(byte[] bytes, Class clazz) {
+ if (Utils.ZstdUtils.isZstd(bytes)) {
+ return zstdDelegate.deserialize(bytes, clazz);
+ } else {
+ // Fallback to ZstdBridgeThriftSerializationDelegate
+ // it delegates to the proper SerializationDelegate (GzipThriftSerializationDelegate or ThriftSerializationDelegate)
+ return defaultDelegate.deserialize(bytes, clazz);
+ }
+ }
+}
diff --git a/storm-client/src/jvm/org/apache/storm/serialization/ZstdThriftSerializationDelegate.java b/storm-client/src/jvm/org/apache/storm/serialization/ZstdThriftSerializationDelegate.java
new file mode 100644
index 00000000000..d3b990a7b3e
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/serialization/ZstdThriftSerializationDelegate.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.serialization;
+
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.thrift.TBase;
+import org.apache.storm.thrift.TDeserializer;
+import org.apache.storm.thrift.TException;
+import org.apache.storm.thrift.TSerializer;
+import org.apache.storm.thrift.transport.TTransportException;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Utils;
+
+/**
+ * Note, this assumes it's deserializing a zstd byte stream, and will err if it encounters any other serialization.
+ */
+public class ZstdThriftSerializationDelegate implements SerializationDelegate {
+
+ private static final int DEFAULT_MAX_DECOMPRESSED_BYTES = 100 * 1024 * 1024;
+ private static final int DEFAULT_ZSTD_COMPRESSION_LEVEL = 3;
+
+ private int zstdCompressionLevel;
+ private int maxDecompressedBytes;
+
+ @Override
+ public void prepare(Map topoConf) {
+ this.zstdCompressionLevel = ObjectReader.getInt(topoConf.getOrDefault(Config.STORM_COMPRESSION_ZSTD_LEVEL,
+ DEFAULT_ZSTD_COMPRESSION_LEVEL));
+ this.maxDecompressedBytes = ObjectReader.getInt(topoConf.getOrDefault(Config.STORM_COMPRESSION_ZSTD_MAX_DECOMPRESSED_BYTES,
+ DEFAULT_MAX_DECOMPRESSED_BYTES));
+ }
+
+ @Override
+ public byte[] serialize(Object object) {
+ if (!(object instanceof TBase)) {
+ throw new IllegalArgumentException("Object must be an instance of TBase");
+ }
+ try {
+ TSerializer serializer = new TSerializer();
+ byte[] thriftData = serializer.serialize((TBase, ?>) object);
+ return Utils.ZstdUtils.compress(thriftData, this.zstdCompressionLevel);
+ } catch (TTransportException e) {
+ throw new RuntimeException("Failed to initialize Thrift Serializer", e);
+ } catch (TException e) {
+ throw new RuntimeException("Failed to serialize Thrift object", e);
+ }
+ }
+
+ @Override
+ public T deserialize(byte[] bytes, Class clazz) {
+ if (!Utils.ZstdUtils.isZstd(bytes)) {
+ throw new RuntimeException(
+ String.format("Cannot deserialize [%s]. Expected zstd compressed bytes, but received unknown format.",
+ clazz.getSimpleName())
+ );
+ }
+ try {
+ TDeserializer deserializer = new TDeserializer();
+ byte[] decompressed = Utils.ZstdUtils.decompress(bytes, this.maxDecompressedBytes);
+ TBase, ?> instance = clazz.asSubclass(TBase.class).getDeclaredConstructor().newInstance();
+ deserializer.deserialize(instance, decompressed);
+ return (T) instance;
+ } catch (TTransportException e) {
+ throw new RuntimeException("Failed to initialize Thrift Deserializer", e);
+ } catch (ReflectiveOperationException | TException e) {
+ throw new RuntimeException("Failed to deserialize bytes to " + clazz.getName(), e);
+ }
+ }
+
+}
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index b639863f836..7ba3298fee2 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -34,6 +34,8 @@
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.lang.Thread.UncaughtExceptionHandler;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.net.InetAddress;
@@ -43,6 +45,7 @@
import java.net.URLEncoder;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
@@ -72,6 +75,9 @@
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import javax.security.auth.Subject;
+import org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream;
+import org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream;
+import org.apache.commons.io.IOUtils;
import org.apache.storm.Config;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.ClientBlobStore;
@@ -96,6 +102,7 @@
import org.apache.storm.shade.net.minidev.json.JSONValue;
import org.apache.storm.shade.net.minidev.json.parser.ParseException;
import org.apache.storm.shade.org.apache.commons.io.FileUtils;
+import org.apache.storm.shade.org.apache.commons.io.input.BoundedInputStream;
import org.apache.storm.shade.org.apache.commons.io.input.ClassLoaderObjectInputStream;
import org.apache.storm.shade.org.apache.commons.lang3.StringUtils;
import org.apache.storm.shade.org.apache.zookeeper.ZooDefs;
@@ -930,33 +937,181 @@ public static List