diff --git a/zeppelin-common/src/main/java/org/apache/zeppelin/common/ConfigTimeUtils.java b/zeppelin-common/src/main/java/org/apache/zeppelin/common/ConfigTimeUtils.java new file mode 100644 index 00000000000..382439269e7 --- /dev/null +++ b/zeppelin-common/src/main/java/org/apache/zeppelin/common/ConfigTimeUtils.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.common; + +import java.time.Duration; + +/** + * Utility for parsing time values from configuration strings. + * Consolidates the logic previously duplicated in + * {@code ZeppelinConfiguration.timeUnitToMill()} and + * {@code TimeoutLifecycleManager.parseTimeValue()}. + */ +public final class ConfigTimeUtils { + private ConfigTimeUtils() {} + + /** + * Parses a configuration time string to milliseconds. + * + *

Supported formats: + *

+ * + * @param value the time string from a configuration property + * @return the equivalent duration in milliseconds + * @throws IllegalArgumentException if {@code value} is null or empty + * @throws java.time.format.DateTimeParseException if the value is not a recognised format + */ + public static long parseTimeValueToMillis(String value) { + if (value == null || value.trim().isEmpty()) { + throw new IllegalArgumentException("Time value must not be null or empty"); + } + try { + return Long.parseLong(value); + } catch (NumberFormatException e) { + if (value.endsWith("ms")) { + return Long.parseLong(value.substring(0, value.length() - 2)); + } + return Duration.parse("PT" + value).toMillis(); + } + } +} diff --git a/zeppelin-common/src/main/java/org/apache/zeppelin/common/InterpreterConfigKeys.java b/zeppelin-common/src/main/java/org/apache/zeppelin/common/InterpreterConfigKeys.java new file mode 100644 index 00000000000..e3373c71a4f --- /dev/null +++ b/zeppelin-common/src/main/java/org/apache/zeppelin/common/InterpreterConfigKeys.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.common; + +/** + * Canonical string constants for interpreter-related configuration keys. + * Use these instead of plain string literals to catch typos at compile time. + */ +public final class InterpreterConfigKeys { + private InterpreterConfigKeys() {} + + public static final String INTERPRETER_CONNECTION_POOL_SIZE = + "zeppelin.interpreter.connection.poolsize"; + public static final String INTERPRETER_LIFECYCLE_MANAGER_CLASS = + "zeppelin.interpreter.lifecyclemanager.class"; + public static final String INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL = + "zeppelin.interpreter.lifecyclemanager.timeout.checkinterval"; + public static final String INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD = + "zeppelin.interpreter.lifecyclemanager.timeout.threshold"; + public static final String PROXY_URL = "zeppelin.proxy.url"; + public static final String PROXY_USER = "zeppelin.proxy.user"; + public static final String PROXY_PASSWORD = "zeppelin.proxy.password"; + public static final String INTERPRETER_DEP_MVNREPO = "zeppelin.interpreter.dep.mvnRepo"; + public static final String SCHEDULER_THREADPOOL_SIZE = "zeppelin.scheduler.threadpool.size"; +} diff --git a/zeppelin-common/src/test/java/org/apache/zeppelin/common/ConfigTimeUtilsTest.java b/zeppelin-common/src/test/java/org/apache/zeppelin/common/ConfigTimeUtilsTest.java new file mode 100644 index 00000000000..94563de9300 --- /dev/null +++ b/zeppelin-common/src/test/java/org/apache/zeppelin/common/ConfigTimeUtilsTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.common; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class ConfigTimeUtilsTest { + + @Test + void plainNumberReturnedAsMillis() { + assertEquals(60000L, ConfigTimeUtils.parseTimeValueToMillis("60000")); + } + + @Test + void msSuffixParsed() { + assertEquals(500L, ConfigTimeUtils.parseTimeValueToMillis("500ms")); + } + + @Test + void hourUnitParsed() { + assertEquals(3600000L, ConfigTimeUtils.parseTimeValueToMillis("1H")); + } + + @Test + void minuteUnitParsed() { + assertEquals(1800000L, ConfigTimeUtils.parseTimeValueToMillis("30M")); + } + + @Test + void secondUnitParsed() { + assertEquals(10000L, ConfigTimeUtils.parseTimeValueToMillis("10S")); + } + + @Test + void compoundDurationParsed() { + assertEquals(5400000L, ConfigTimeUtils.parseTimeValueToMillis("1H30M")); + } + + @Test + void defaultCheckIntervalCompatible() { + assertEquals(60000L, ConfigTimeUtils.parseTimeValueToMillis("60000")); + } + + @Test + void defaultThresholdCompatible() { + assertEquals(3600000L, ConfigTimeUtils.parseTimeValueToMillis("3600000")); + } + + @Test + void nullInputThrows() { + assertThrows(IllegalArgumentException.class, + () -> ConfigTimeUtils.parseTimeValueToMillis(null)); + } + + @Test + void emptyInputThrows() { + assertThrows(IllegalArgumentException.class, + () -> ConfigTimeUtils.parseTimeValueToMillis("")); + } + + @Test + void invalidFormatThrows() { + assertThrows(Exception.class, + () -> ConfigTimeUtils.parseTimeValueToMillis("abc")); + } +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java index 75c96580771..6d505df464e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java @@ -18,13 +18,14 @@ package org.apache.zeppelin.interpreter.lifecycle; import org.apache.thrift.TException; +import org.apache.zeppelin.common.ConfigTimeUtils; +import org.apache.zeppelin.common.InterpreterConfigKeys; import org.apache.zeppelin.interpreter.LifecycleManager; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer; import org.apache.zeppelin.scheduler.ExecutorFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.time.Duration; import java.util.Properties; import java.util.concurrent.ScheduledExecutorService; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -50,11 +51,11 @@ public class TimeoutLifecycleManager extends LifecycleManager { public TimeoutLifecycleManager(Properties properties, RemoteInterpreterServer remoteInterpreterServer) { super(properties, remoteInterpreterServer); - long checkInterval = parseTimeValue(properties.getProperty( - "zeppelin.interpreter.lifecyclemanager.timeout.checkinterval", - String.valueOf(DEFAULT_CHECK_INTERVAL))); - long timeoutThreshold = parseTimeValue(properties.getProperty( - "zeppelin.interpreter.lifecyclemanager.timeout.threshold", + long checkInterval = ConfigTimeUtils.parseTimeValueToMillis(properties.getProperty( + InterpreterConfigKeys.INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL, + String.valueOf(DEFAULT_CHECK_INTERVAL))); + long timeoutThreshold = ConfigTimeUtils.parseTimeValueToMillis(properties.getProperty( + InterpreterConfigKeys.INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD, String.valueOf(DEFAULT_TIMEOUT_THRESHOLD))); ScheduledExecutorService checkScheduler = ExecutorFactory.singleton() .createOrGetScheduled("TimeoutLifecycleManager", 1); @@ -74,17 +75,6 @@ public TimeoutLifecycleManager(Properties properties, timeoutThreshold); } - static long parseTimeValue(String value) { - try { - return Long.parseLong(value); - } catch (NumberFormatException e) { - if (value.endsWith("ms")) { - return Long.parseLong(value.substring(0, value.length() - 2)); - } - return Duration.parse("PT" + value).toMillis(); - } - } - @Override public void onInterpreterProcessStarted(String interpreterGroupId) { LOGGER.info("Interpreter process: {} is started", interpreterGroupId); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index e733fd57f8c..bf3baa2605c 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -35,6 +35,7 @@ import org.apache.zeppelin.helium.ApplicationLoader; import org.apache.zeppelin.helium.HeliumAppAngularObjectRegistry; import org.apache.zeppelin.helium.HeliumPackage; +import org.apache.zeppelin.common.InterpreterConfigKeys; import org.apache.zeppelin.interpreter.Constants; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -211,7 +212,7 @@ public void init(Map properties) throws InterpreterRPCException, if (!isTest) { int connectionPoolSize = Integer.parseInt( - zProperties.getProperty("zeppelin.interpreter.connection.poolsize", "100")); + zProperties.getProperty(InterpreterConfigKeys.INTERPRETER_CONNECTION_POOL_SIZE, "100")); LOGGER.info("Creating RemoteInterpreterEventClient with connection pool size: {}", connectionPoolSize); intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort, @@ -268,8 +269,8 @@ public boolean isRunning() { private LifecycleManager createLifecycleManager() throws Exception { String lifecycleManagerClass = zProperties.getProperty( - "zeppelin.interpreter.lifecyclemanager.class", - "org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager"); + InterpreterConfigKeys.INTERPRETER_LIFECYCLE_MANAGER_CLASS, + "org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager"); Class clazz = Class.forName(lifecycleManagerClass); LOGGER.info("Creating interpreter lifecycle manager: {}", lifecycleManagerClass); return (LifecycleManager) clazz.getConstructor(Properties.class, RemoteInterpreterServer.class) @@ -335,10 +336,10 @@ public void createInterpreter(String interpreterGroupId, String sessionId, Strin } depLoader = new DependencyResolver(localRepoPath, - zProperties.getProperty("zeppelin.proxy.url"), - zProperties.getProperty("zeppelin.proxy.user"), - zProperties.getProperty("zeppelin.proxy.password"), - zProperties.getProperty("zeppelin.interpreter.dep.mvnRepo")); + zProperties.getProperty(InterpreterConfigKeys.PROXY_URL), + zProperties.getProperty(InterpreterConfigKeys.PROXY_USER), + zProperties.getProperty(InterpreterConfigKeys.PROXY_PASSWORD), + zProperties.getProperty(InterpreterConfigKeys.INTERPRETER_DEP_MVNREPO)); appLoader = new ApplicationLoader(resourcePool, depLoader); resultCacheInSeconds = @@ -486,7 +487,7 @@ public void reconnect(String host, int port) throws InterpreterRPCException, TEx this.intpEventServerHost = host; this.intpEventServerPort = port; intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort, - Integer.parseInt(zProperties.getProperty("zeppelin.interpreter.connection.poolsize", "100"))); + Integer.parseInt(zProperties.getProperty(InterpreterConfigKeys.INTERPRETER_CONNECTION_POOL_SIZE, "100"))); intpEventClient.setIntpGroupId(interpreterGroupId); this.angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), intpEventClient); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java index a4178704b59..371d1cfce5c 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.scheduler; +import org.apache.zeppelin.common.InterpreterConfigKeys; import org.apache.zeppelin.util.ExecutorUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +60,7 @@ private static int getSchedulerPoolSize() { if (envValue != null) { return Integer.parseInt(envValue); } - String propValue = System.getProperty("zeppelin.scheduler.threadpool.size"); + String propValue = System.getProperty(InterpreterConfigKeys.SCHEDULER_THREADPOOL_SIZE); if (propValue != null) { return Integer.parseInt(propValue); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 3b9ebee0bad..4eb4074e2e7 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; -import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -48,6 +47,8 @@ import org.apache.commons.configuration2.io.FileLocationStrategy; import org.apache.commons.configuration2.tree.ImmutableNode; import org.apache.commons.lang3.StringUtils; +import org.apache.zeppelin.common.ConfigTimeUtils; +import org.apache.zeppelin.common.InterpreterConfigKeys; import org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager; import org.apache.zeppelin.util.Util; import org.slf4j.Logger; @@ -969,10 +970,9 @@ public enum ConfVars { ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"), ZEPPELIN_INTERPRETER_JUPYTER_KERNELS("zeppelin.interpreter.jupyter.kernels", "python:python,ir:r"), ZEPPELIN_INTERPRETER_LOCALREPO("zeppelin.interpreter.localRepo", "local-repo"), - ZEPPELIN_INTERPRETER_DEP_MVNREPO("zeppelin.interpreter.dep.mvnRepo", - "https://repo1.maven.org/maven2/"), + ZEPPELIN_INTERPRETER_DEP_MVNREPO(InterpreterConfigKeys.INTERPRETER_DEP_MVNREPO, "https://repo1.maven.org/maven2/"), ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 600000L), - ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE("zeppelin.interpreter.connection.poolsize", 100), + ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE(InterpreterConfigKeys.INTERPRETER_CONNECTION_POOL_SIZE, 100), ZEPPELIN_INTERPRETER_GROUP_DEFAULT("zeppelin.interpreter.group.default", "spark"), ZEPPELIN_INTERPRETER_OUTPUT_LIMIT("zeppelin.interpreter.output.limit", 1024 * 100), ZEPPELIN_INTERPRETER_INCLUDES("zeppelin.interpreter.include", ""), @@ -1073,17 +1073,17 @@ public enum ConfVars { ZEPPELIN_SERVER_RPC_PORTRANGE("zeppelin.server.rpc.portRange", ":"), ZEPPELIN_INTERPRETER_RPC_PORTRANGE("zeppelin.interpreter.rpc.portRange", ":"), - ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS("zeppelin.interpreter.lifecyclemanager.class", - NullLifecycleManager.class.getName()), + ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS( + InterpreterConfigKeys.INTERPRETER_LIFECYCLE_MANAGER_CLASS, NullLifecycleManager.class.getName()), ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL( - "zeppelin.interpreter.lifecyclemanager.timeout.checkinterval", 60000L), + InterpreterConfigKeys.INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL, 60000L), ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD( - "zeppelin.interpreter.lifecyclemanager.timeout.threshold", 3600000L), + InterpreterConfigKeys.INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD, 3600000L), ZEPPELIN_INTERPRETER_YARN_MONITOR_INTERVAL_SECS( "zeppelin.interpreter.yarn.monitor.interval_secs", 10), - ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE("zeppelin.scheduler.threadpool.size", 100), + ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE(InterpreterConfigKeys.SCHEDULER_THREADPOOL_SIZE, 100), ZEPPELIN_OWNER_ROLE("zeppelin.notebook.default.owner.username", ""), @@ -1120,9 +1120,9 @@ public enum ConfVars { ZEPPELIN_NOTEBOOK_CRON_ENABLE("zeppelin.notebook.cron.enable", false), ZEPPELIN_NOTEBOOK_CRON_FOLDERS("zeppelin.notebook.cron.folders", null), ZEPPELIN_NOTEBOOK_MARKDOWN_ESCAPE_HTML("zeppelin.notebook.markdown.escape.html", true), - ZEPPELIN_PROXY_URL("zeppelin.proxy.url", null), - ZEPPELIN_PROXY_USER("zeppelin.proxy.user", null), - ZEPPELIN_PROXY_PASSWORD("zeppelin.proxy.password", null), + ZEPPELIN_PROXY_URL(InterpreterConfigKeys.PROXY_URL, null), + ZEPPELIN_PROXY_USER(InterpreterConfigKeys.PROXY_USER, null), + ZEPPELIN_PROXY_PASSWORD(InterpreterConfigKeys.PROXY_PASSWORD, null), ZEPPELIN_SEARCH_ENABLE("zeppelin.search.enable", true), ZEPPELIN_SEARCH_INDEX_REBUILD("zeppelin.search.index.rebuild", false), ZEPPELIN_SEARCH_USE_DISK("zeppelin.search.use.disk", true), @@ -1221,13 +1221,13 @@ public boolean getBooleanValue() { } } + /** + * @deprecated Use {@link ConfigTimeUtils#parseTimeValueToMillis(String)} instead. + * Unlike this method, the replacement also accepts plain numeric strings (ms). + */ + @Deprecated public static long timeUnitToMill(String timeStrWithUnit) { - // If `timeStrWithUnit` doesn't include time unit, - // `Duration.parse` would fail to parse and throw Exception. - if (timeStrWithUnit.endsWith("ms")) { - return Long.parseLong(timeStrWithUnit.substring(0, timeStrWithUnit.length() - 2)); - } - return Duration.parse("PT" + timeStrWithUnit).toMillis(); + return ConfigTimeUtils.parseTimeValueToMillis(timeStrWithUnit); } }