diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java index 8bf794062ef45..6c38c62040684 100755 --- a/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java +++ b/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java @@ -2153,4 +2153,18 @@ public static Collection emptyIfNull(@Nullable Collection col) { public static Map emptyIfNull(@Nullable Map map) { return map == null ? Collections.emptyMap() : map; } + + /** + * @param arr Array. + * @param el Element. + * @return Element index or {@code -1} if not found. + */ + public static int indexOf(T[] arr, T el) { + for (int i = 0; i < arr.length; i++) { + if (Objects.equals(arr[i], el)) + return i; + } + + return -1; + } } diff --git a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java index d7ed2b5124613..d60fe72b6d98d 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java @@ -22,6 +22,7 @@ import org.apache.ignite.util.CacheMetricsCommandTest; import org.apache.ignite.util.CdcCommandTest; import org.apache.ignite.util.CdcResendCommandTest; +import org.apache.ignite.util.GridCommandHandlerClassPathTest; import org.apache.ignite.util.GridCommandHandlerConsistencyBinaryTest; import org.apache.ignite.util.GridCommandHandlerConsistencyCountersTest; import org.apache.ignite.util.GridCommandHandlerConsistencyRepairCorrectnessAtomicTest; @@ -77,7 +78,8 @@ SecurityCommandHandlerPermissionsTest.class, - IdleVerifyDumpTest.class + IdleVerifyDumpTest.class, + GridCommandHandlerClassPathTest.class }) public class IgniteControlUtilityTestSuite2 { } diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClassPathTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClassPathTest.java new file mode 100644 index 0000000000000..6b3cfad39d3ce --- /dev/null +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClassPathTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.util; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.stream.Collectors; +import org.junit.Test; + +import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK; + +/** + * Test for --classpath command. + */ +public class GridCommandHandlerClassPathTest extends GridCommandHandlerAbstractTest { + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + cleanPersistenceDir(); + + startGrids(2); + + super.beforeTestsStarted(); + } + + /** Tests --create command. */ + @Test + public void testCreate() throws Exception { + String jars = Files.list(Path.of(getClass().getClassLoader().getResource(".").getPath() + "../")) + .map(Path::toAbsolutePath) + .map(Path::toString) + .filter(f -> f.endsWith("jar")) + .collect(Collectors.joining(",")); + + Path dir = Path.of(getClass().getClassLoader().getResource(".").getPath() + "../../../core/target"); + + System.out.println("dir = " + dir); + + String coreJars = Files.list(dir) + .map(Path::toAbsolutePath) + .map(Path::toString) + .filter(f -> f.endsWith("jar")) + .collect(Collectors.joining(",")); + + jars += "," + coreJars; + + injectTestSystemOut(); + + final TestCommandHandler hnd = newCommandHandler(createTestLogger()); + + assertEquals(EXIT_CODE_OK, execute(hnd, "--class-path", "create", "--name", "mysuperapp", "--files", jars)); + + String outStr = testOut.toString(); + + stopAllGrids(); + + System.out.println(outStr); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 742105bf4da59..e9a0a1a14afd3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.cache.query.index.IndexProcessor; import org.apache.ignite.internal.cache.transform.CacheObjectTransformerProcessor; +import org.apache.ignite.internal.classpath.ClassPathProcessor; import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager; import org.apache.ignite.internal.managers.collision.GridCollisionManager; import org.apache.ignite.internal.managers.communication.GridIoManager; @@ -648,6 +649,11 @@ public interface GridKernalContext extends Iterable { */ public RollingUpgradeProcessor rollingUpgrade(); + /** + * @return Class path processor. + */ + public ClassPathProcessor classPath(); + /** * Executor that is in charge of processing user async continuations. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 2a7c7f66afa67..831d2a71b6f74 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.binary.GridBinaryMarshaller; import org.apache.ignite.internal.cache.query.index.IndexProcessor; import org.apache.ignite.internal.cache.transform.CacheObjectTransformerProcessor; +import org.apache.ignite.internal.classpath.ClassPathProcessor; import org.apache.ignite.internal.maintenance.MaintenanceProcessor; import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager; import org.apache.ignite.internal.managers.collision.GridCollisionManager; @@ -367,6 +368,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable @GridToStringExclude private RollingUpgradeProcessor rollUpProc; + /** Classpath processor. */ + @GridToStringExclude + private ClassPathProcessor classPathProc; + /** */ private Thread.UncaughtExceptionHandler hnd; @@ -603,6 +608,8 @@ else if (comp instanceof PerformanceStatisticsProcessor) perfStatProc = (PerformanceStatisticsProcessor)comp; else if (comp instanceof RollingUpgradeProcessor) rollUpProc = (RollingUpgradeProcessor)comp; + else if (comp instanceof ClassPathProcessor) + classPathProc = (ClassPathProcessor)comp; else if (comp instanceof IndexProcessor) indexProc = (IndexProcessor)comp; else if (!(comp instanceof DiscoveryNodeValidationProcessor @@ -1117,6 +1124,11 @@ public void recoveryMode(boolean recoveryMode) { return rollUpProc; } + /** {@inheritDoc} */ + @Override public ClassPathProcessor classPath() { + return classPathProc; + } + /** {@inheritDoc} */ @Override public Executor getAsyncContinuationExecutor() { return config().getAsyncContinuationExecutor() == null diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index ebaa9d0bbabcd..dd5176f9cb8e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -90,6 +90,7 @@ import org.apache.ignite.internal.binary.BinaryUtils; import org.apache.ignite.internal.cache.query.index.IndexProcessor; import org.apache.ignite.internal.cache.transform.CacheObjectTransformerProcessor; +import org.apache.ignite.internal.classpath.ClassPathProcessor; import org.apache.ignite.internal.cluster.ClusterGroupAdapter; import org.apache.ignite.internal.cluster.IgniteClusterEx; import org.apache.ignite.internal.maintenance.MaintenanceProcessor; @@ -1018,6 +1019,7 @@ public void start( // Start the encryption manager after assigning the discovery manager to context, so it will be // able to register custom event listener. startManager(new GridEncryptionManager(ctx)); + startProcessor(new ClassPathProcessor(ctx)); startProcessor(new PdsConsistentIdProcessor(ctx)); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/classpath/ClassPathDeployToAllRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/classpath/ClassPathDeployToAllRequest.java new file mode 100644 index 0000000000000..dcf75b91eb177 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/classpath/ClassPathDeployToAllRequest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.classpath; + +import java.io.Serializable; +import java.util.UUID; +import org.apache.ignite.internal.util.distributed.DistributedProcess; + +/** + * Class path deploy to all request for {@link DistributedProcess} initiate message. + */ +public class ClassPathDeployToAllRequest implements Serializable { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Ignite class path id. */ + final UUID icpId; + + /** */ + public ClassPathDeployToAllRequest(UUID icpId) { + this.icpId = icpId; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/classpath/ClassPathDeployToAllResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/classpath/ClassPathDeployToAllResponse.java new file mode 100644 index 0000000000000..0bab548ff6945 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/classpath/ClassPathDeployToAllResponse.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.classpath; + +import java.io.Serializable; +import org.apache.ignite.internal.util.distributed.DistributedProcess; + +/** + * Class path deploy to all response for {@link DistributedProcess} initiate message. + */ +public class ClassPathDeployToAllResponse implements Serializable { +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/classpath/ClassPathProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/classpath/ClassPathProcessor.java new file mode 100644 index 0000000000000..8b515fc8a5137 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/classpath/ClassPathProcessor.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.classpath; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree; +import org.apache.ignite.internal.util.distributed.DistributedProcess; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.classpath.IgniteClassPathState.CREATING; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.CLASSPATH_DEPLOY_TO_ALL; + +/** + * TODO: + * 1. How to check data integrity on start? + * Do we want to do this for txt file or for jar only? + * 2. Check and remove obsolete icp from dist on start. + * Do we want to have some flag to skip remove in this case? (if we preparing for ICP registration). + * 3. Should we include CP into snapshots and dumps? + */ +public class ClassPathProcessor extends GridProcessorAdapter { + /** Prefix for metastorage keys. */ + public static final String METASTORE_PREFIX = "classpath."; + + /** Distribute process that distributes new Ignite class path across all server nodes. */ + private final DistributedProcess deployToAllProc; + + /** */ + private final Map> deployToAllFuts = new ConcurrentHashMap<>(); + + /** + * @param ctx Kernal context. + */ + public ClassPathProcessor(GridKernalContext ctx) { + super(ctx); + + deployToAllProc = new DistributedProcess<>( + ctx, + CLASSPATH_DEPLOY_TO_ALL, + this::startDeployToAllProcess, + this::processDeployToAllResult + ); + } + + /** + * Register new classpath in metastorage it same name not exists. + * Fails if exists. + * + * @param name Class path name. + * @param files Files included. + * @param lengths Files lengths. + * @return Class path id. + */ + public UUID startCreation(String name, String[] files, long[] lengths) { + assert files.length == lengths.length : "wrong arrays lengths"; + + A.ensure(U.alphanumericUnderscore(name), "Classpath name must satisfy the following name pattern: a-zA-Z0-9_"); + + IgniteClassPath icp = new IgniteClassPath(UUID.randomUUID(), name, files, lengths); + + toMetastorage(icp, null); + + try { + NodeFileTree ft = ctx.pdsFolderResolver().fileTree(); + + File root = ft.classPath(name); + + NodeFileTree.mkdir(root, "Ignite Class Path root: " + name); + + log.info("New classpath registered [root = " + root + ", icp=" + icp + ']'); + + return icp.id(); + } + catch (Exception e) { + try { + ctx.distributedMetastorage().remove(metastorageKey(icp)); + } + catch (IgniteCheckedException ex) { + log.error("Can't remove metastorage key for IgniteClassPath: " + metastorageKey(icp), e); + } + + throw e; + } + } + + /** + * @param icpID ClassPath ID. + * @param name File name. + * @param offset Offset to write data to. + * @param bytesCnt Bytes count in batch to write. + * @param batch Batch. + */ + public void uploadFilePart( + UUID icpID, + String name, + long offset, + int bytesCnt, + byte[] batch + ) throws IOException { + try { + IgniteClassPath icp = fromMetastorage(icpID); + + if (F.indexOf(icp.files(), name) == -1) + throw new IllegalArgumentException("Unknown lib [icp=" + icp.name() + ", unknown_lib=" + name + ']'); + + File lib = new File(ctx.pdsFolderResolver().fileTree().classPath(icp.name()), name); + + if (offset == 0) { + log.info("Creating new classpath file: " + lib); + + if (!lib.createNewFile()) + throw new IgniteException("File exists: " + lib); + } + + try (RandomAccessFile raf = new RandomAccessFile(lib, "rw")) { + if (raf.length() < offset) { + throw new IgniteException("Wrong offset [icp=" + icp.name() + ", lib=" + name + ", " + + "fileLength=" + raf.length() + ", offset=" + offset + ']'); + } + + raf.seek(offset); + raf.write(batch, 0, bytesCnt); + } + } + catch (Exception e) { + log.error("UploadFilePart:", e); + + throw e; + } + } + + /** + * @param icpId ClassPath ID. + * @return + */ + public IgniteInternalFuture distributeToAllNodes(UUID icpId) { + GridFutureAdapter fut = new GridFutureAdapter<>(); + + synchronized (this) { + IgniteClassPath icp = fromMetastorage(icpId); + + ClassPathDeployToAllRequest req = new ClassPathDeployToAllRequest(icpId); + + if (deployToAllFuts.put(icpId, fut) != null) + return new GridFinishedFuture<>(new IllegalStateException("Distribute to all process started, already: " + icp)); + + deployToAllProc.start(icpId, req); + } + + return fut; + } + + /** + * @param req Request on snapshot creation. + * @return Future which will be completed when a snapshot has been started. + */ + private IgniteInternalFuture startDeployToAllProcess(ClassPathDeployToAllRequest req) { + IgniteClassPath icp = fromMetastorage(req.icpId); + + if (deployToAllFuts.containsKey(req.icpId)) { + log.info("Upload node skip download [icp=" + icp + ']'); + + return new GridFinishedFuture<>(new ClassPathDeployToAllResponse()); + } + + log.info("Starting download new classpath [icp=" + icp + ']'); + + return new GridFinishedFuture<>(new ClassPathDeployToAllResponse()); + } + + /** + * @param id Request id. + * @param res Results. + * @param err Errors. + */ + private void processDeployToAllResult(UUID id, Map res, Map err) { + GridFutureAdapter fut = deployToAllFuts.remove(id); + + // Only upload node manage the process. + if (fut == null) { + if (log.isDebugEnabled()) + log.debug("Unknown distribute process [id=" + id + ']'); + + return; + } + + IgniteClassPath icp = fromMetastorage(id); + + // TODO: check this exception not failed all node. + if (!fut.onDone("OK")) { + throw new IllegalStateException("Distribute process in wrong state " + + "[canceled=" + fut.isCancelled() + ", failed=" + fut.isFailed() + ", done=" + fut.isDone() + ']'); + } + + icp.state(IgniteClassPathState.READY); + + log.info("Deploy to all DONE!"); + } + + /** + * @param icpID ClassPath ID. + * @return Class path. + */ + private IgniteClassPath fromMetastorage(UUID icpID) { + try { + IgniteClassPath[] icp = new IgniteClassPath[1]; + + ctx.distributedMetastorage().iterate(METASTORE_PREFIX, (key, icp0) -> { + if (icpID.equals(((IgniteClassPath)icp0).id())) + icp[0] = (IgniteClassPath)icp0; + }); + + if (icp[0] == null) + throw new IgniteException("ClassPath not found: " + icpID); + + if (icp[0].state() != CREATING) + throw new IgniteException("ClassPath in wrong state [expected=" + CREATING + ", status=" + icp[0].state() + ']'); + + return icp[0]; + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** */ + private void toMetastorage(IgniteClassPath icp, @Nullable IgniteClassPath prev) { + try { + if (!ctx.distributedMetastorage().compareAndSet(metastorageKey(icp), prev, icp)) + throw new IgniteException("Classpath alreay exists: " + icp.name()); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** */ + private static String metastorageKey(IgniteClassPath icp) { + return METASTORE_PREFIX + icp.name(); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/classpath/IgniteClassPath.java b/modules/core/src/main/java/org/apache/ignite/internal/classpath/IgniteClassPath.java new file mode 100644 index 0000000000000..78c2127723976 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/classpath/IgniteClassPath.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.classpath; + +import java.io.Serializable; +import java.util.UUID; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Class path POJO. + */ +public class IgniteClassPath implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private UUID id; + + /** */ + private String name; + + /** */ + private String[] files; + + /** */ + private long[] lengths; + + /** */ + private IgniteClassPathState state; + + /** + * @param id Unique id of classpath. + * @param name User provided name. + * @param files Files to include to classpath. + */ + public IgniteClassPath(UUID id, String name, String[] files, long[] lengths) { + this.id = id; + this.name = name; + this.files = files; + this.lengths = lengths; + this.state = IgniteClassPathState.CREATING; + } + + /** */ + public IgniteClassPathState state() { + return state; + } + + /** */ + public void state(IgniteClassPathState state) { + this.state = state; + } + + /** */ + public UUID id() { + return id; + } + + /** */ + public String name() { + return name; + } + + /** */ + public String[] files() { + return files; + } + + /** */ + public long[] lengths() { + return lengths; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteClassPath.class, this); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/classpath/IgniteClassPathState.java b/modules/core/src/main/java/org/apache/ignite/internal/classpath/IgniteClassPathState.java new file mode 100644 index 0000000000000..595bec09fc129 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/classpath/IgniteClassPathState.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.classpath; + +/** */ +public enum IgniteClassPathState { + /** Creationg process in progress. */ + CREATING, + + /** Ready for usage. */ + READY, + + /** Marked for removal. Newly started code can't use corresponding classpath. */ + REMOVING +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java index 112dc47c7125a..747dfd5867dff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java @@ -294,6 +294,9 @@ public enum ClientOperation { /** IgniteSet.iterator page. */ OP_SET_ITERATOR_GET_PAGE(9023), + /** File upload. */ + FILE_UPLOAD(9030), + /** Stop warmup. */ OP_STOP_WARMUP(10000); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java index b21eca8578160..21e8b45149260 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java @@ -18,12 +18,17 @@ package org.apache.ignite.internal.client.thin; import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.EventListener; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiFunction; @@ -55,6 +60,7 @@ import org.apache.ignite.client.events.ClientLifecycleEventListener; import org.apache.ignite.client.events.ClientStartEvent; import org.apache.ignite.client.events.ClientStopEvent; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.BinaryConfiguration; import org.apache.ignite.configuration.ClientConfiguration; import org.apache.ignite.configuration.ClientTransactionConfiguration; @@ -625,6 +631,52 @@ private void retrieveBinaryConfiguration(ClientConfiguration cfg) { marsh.setBinaryConfiguration(resCfg); } + /** + * @param node Node to upload file to. + * @param icpID Classpath ID. + * @param file File to upload. + */ + public void uploadClasspathFile(ClusterNode node, UUID icpID, Path file) throws IOException { + List uploadNode = Collections.singletonList(node.id()); + + String name = file.getFileName().toString(); + byte[] batch = new byte[(int)(U.MB)]; + + try (InputStream fis = Files.newInputStream(file)) { + long[] offset = new long[] {0}; + int[] bytesCnt = new int[1]; + + // We want to create empty file on the server side. + bytesCnt[0] = fis.read(batch); + + do { + ch.service( + ClientOperation.FILE_UPLOAD, + ch -> { + try (BinaryWriterEx w = BinaryUtils.writer(marsh.context(), ch.out(), null)) { + w.writeUuid(node.id()); + w.writeUuid(icpID); + w.writeString(name); + w.writeLong(offset[0]); + w.writeInt(bytesCnt[0]); + w.writeByteArray(batch); + } + }, + in -> null, + uploadNode // Request can still be sent to a random (default) node. + ); + + offset[0] += bytesCnt[0]; + bytesCnt[0] = fis.read(batch); + } + while (bytesCnt[0] > 0); + + // Check all data read and sent. + if (offset[0] != Files.size(file)) + throw new IOException("Can't read all data from file"); + } + } + /** * Thin client implementation of {@link BinaryMetadataHandler}. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/IgniteCommandRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/management/IgniteCommandRegistry.java index 1f28f1cfbd24c..ec61e61f34667 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/management/IgniteCommandRegistry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/IgniteCommandRegistry.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.management.cache.CacheCommand; import org.apache.ignite.internal.management.cdc.CdcCommand; import org.apache.ignite.internal.management.checkpoint.CheckpointCommand; +import org.apache.ignite.internal.management.classpath.ClassPathCommand; import org.apache.ignite.internal.management.consistency.ConsistencyCommand; import org.apache.ignite.internal.management.defragmentation.DefragmentationCommand; import org.apache.ignite.internal.management.diagnostic.DiagnosticCommand; @@ -77,7 +78,8 @@ public IgniteCommandRegistry() { new DefragmentationCommand(), new PerformanceStatisticsCommand(), new CdcCommand(), - new ConsistencyCommand() + new ConsistencyCommand(), + new ClassPathCommand() ); U.loadService(CommandsProvider.class).forEach(p -> p.commands().forEach(this::register)); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathCommand.java new file mode 100644 index 0000000000000..f297278661da7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathCommand.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.management.classpath; + +import org.apache.ignite.internal.management.api.CommandRegistryImpl; + +/** Command to manage IgniteClassPath. */ +public class ClassPathCommand extends CommandRegistryImpl { + /** */ + public ClassPathCommand() { + super( + new ClassPathCreateCommand() + ); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathCreateCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathCreateCommand.java new file mode 100644 index 0000000000000..e6d6458e7af78 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathCreateCommand.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.management.classpath; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.function.Consumer; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteException; +import org.apache.ignite.client.IgniteClient; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.client.thin.TcpIgniteClient; +import org.apache.ignite.internal.management.api.CommandUtils; +import org.apache.ignite.internal.management.api.NativeCommand; +import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class ClassPathCreateCommand implements NativeCommand { + /** {@inheritDoc} */ + @Override public String description() { + return "Create ClassPath instance from set of local file"; + } + + /** {@inheritDoc} */ + @Override public Class argClass() { + return ClassPathCreateCommandArg.class; + } + + /** {@inheritDoc} */ + @Override public Void execute( + @Nullable IgniteClient client, + @Nullable Ignite ignite, + ClassPathCreateCommandArg arg, + Consumer printer + ) throws Exception { + if (client != null) { + create(client, arg, printer); + + return null; + } + + throw new UnsupportedOperationException("Creating with the Ignite instance not supported at a time."); + } + + /** */ + private void create(@Nullable IgniteClient client, ClassPathCreateCommandArg arg, Consumer printer) throws Exception { + TcpIgniteClient cli = (TcpIgniteClient)client; + + List files = new ArrayList<>(arg.files.length); + long[] lengths = new long[arg.files.length]; + + for (int i = 0; i < arg.files.length; i++) { + Path f = Path.of(arg.files[i]); + + if (!Files.exists(f)) + throw new IllegalArgumentException("File not exists: " + f); + + files.add(f); + + lengths[i] = Files.size(f); + } + + arg.lengths = lengths; + // We don't want to send full path to server nodes. + // Server nodes require files names, only. + arg.files = fileNames(files); + + ClusterNode uploadNode = uploadNode(client); + + printer.accept("Upload node: " + uploadNode.id()); + + UUID icpID = CommandUtils.execute(client, null, ClassPathStartCreationTask.class, arg, Collections.singletonList(uploadNode)); + + printer.accept("New classpath registered [uploadNode=" + uploadNode.id() + ", name=" + arg.name + ", id=" + icpID.toString() + ']'); + printer.accept("Starting to upload files:"); + + // TODO: add pretty print here. + for (Path file : files) { + printer.accept(String.valueOf(file.toAbsolutePath())); + cli.uploadClasspathFile(uploadNode, icpID, file); + printer.accept("DONE"); + } + + CommandUtils.execute(client, null, ClassPathDistributeTask.class, icpID, Collections.singletonList(uploadNode)); + } + + /** */ + private static ClusterNode uploadNode(IgniteClient client) { + Collection nodes = CommandUtils.nodes(client, null); + + if (F.isEmpty(nodes)) + throw new IgniteException("Cluster empty"); + + Collection servers = CommandUtils.servers(nodes); + + if (F.isEmpty(servers)) + throw new IgniteException("No server nodes"); + + return F.first(servers); + } + + /** */ + private static String[] fileNames(List files) { + String[] fileNames = new String[files.size()]; + + for (int i = 0; i < fileNames.length; i++) + fileNames[i] = files.get(i).getFileName().toString(); + + return fileNames; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathCreateCommandArg.java b/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathCreateCommandArg.java new file mode 100644 index 0000000000000..586654424a80d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathCreateCommandArg.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.management.classpath; + +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.dto.IgniteDataTransferObject; +import org.apache.ignite.internal.management.api.Argument; + +/** */ +public class ClassPathCreateCommandArg extends IgniteDataTransferObject { + /** */ + private static final long serialVersionUID = 0; + + /** */ + @Order(0) + @Argument(description = "Name of the classpath") + String name; + + /** */ + @Order(1) + @Argument(description = "Files to add to classpath") + String[] files; + + /** */ + @Order(2) + long[] lengths; + + /** */ + public String name() { + return name; + } + + /** */ + public void name(String name) { + this.name = name; + } + + /** */ + public String[] files() { + return files; + } + + /** */ + public void files(String[] files) { + this.files = files; + } + + /** */ + public long[] lengths() { + return lengths; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathDistributeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathDistributeTask.java new file mode 100644 index 0000000000000..06b117e480d70 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathDistributeTask.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.management.classpath; + +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.visor.VisorJob; +import org.apache.ignite.internal.visor.VisorOneNodeTask; +import org.jetbrains.annotations.Nullable; + +/** */ +public class ClassPathDistributeTask extends VisorOneNodeTask { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override protected VisorJob job(UUID arg) { + return new ClassPathDistributeJob(arg, debug); + } + + /** */ + private static class ClassPathDistributeJob extends VisorJob { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + protected ClassPathDistributeJob(@Nullable UUID arg, boolean debug) { + super(arg, debug); + } + + /** {@inheritDoc} */ + @Override protected Void run(@Nullable UUID arg) throws IgniteException { + IgniteInternalFuture fut = ignite.context().classPath().distributeToAllNodes(arg); + + try { + fut.get(); + + return null; + } + catch (IgniteCheckedException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathStartCreationTask.java b/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathStartCreationTask.java new file mode 100644 index 0000000000000..c7c11a1c7fce8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathStartCreationTask.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.management.classpath; + +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.visor.VisorJob; +import org.apache.ignite.internal.visor.VisorOneNodeTask; +import org.jetbrains.annotations.Nullable; + +/** */ +public class ClassPathStartCreationTask extends VisorOneNodeTask { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override protected VisorJob job(ClassPathCreateCommandArg arg) { + return new ClassPathStartCreationJob(arg, debug); + } + + /** */ + private static class ClassPathStartCreationJob extends VisorJob { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + protected ClassPathStartCreationJob(@Nullable ClassPathCreateCommandArg arg, boolean debug) { + super(arg, debug); + } + + /** {@inheritDoc} */ + @Override protected UUID run(@Nullable ClassPathCreateCommandArg arg) throws IgniteException { + return ignite.context().classPath().startCreation(arg.name, arg.files, arg.lengths); + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/NodeFileTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/NodeFileTree.java index 159eeb77db7c9..2100a9fe0d0f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/NodeFileTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/NodeFileTree.java @@ -295,6 +295,9 @@ public class NodeFileTree extends SharedFileTree { /** Path to the storage directory. */ private final File nodeStorage; + /** Path to the root classpath directory. */ + private final File icp; + /** * Key is the path from {@link DataStorageConfiguration#getExtraStoragePaths()}, may be relative. Value is storage. * @see DataStorageConfiguration#getExtraStoragePaths() @@ -334,6 +337,7 @@ public NodeFileTree(File root, String folderName) { this.folderName = folderName; binaryMeta = new File(binaryMetaRoot, folderName); + icp = new File(icpRoot, folderName); nodeStorage = rootRelative(DB_DIR); checkpoint = new File(nodeStorage, CHECKPOINT_DIR); wal = rootRelative(DFLT_WAL_PATH); @@ -382,6 +386,7 @@ protected NodeFileTree(IgniteConfiguration cfg, File root, String folderName, bo this.folderName = folderName; binaryMeta = new File(binaryMetaRoot, folderName); + icp = new File(icpRoot, folderName); DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration(); @@ -431,6 +436,11 @@ public File binaryMeta() { return binaryMeta; } + /** @return Root directory for Ignite class path files. */ + public File classPathRoot() { + return icp; + } + /** @return Path to the directory containing active WAL segments. */ public @Nullable File wal() { return wal; @@ -1082,6 +1092,14 @@ public File maintenanceFile() { return new File(nodeStorage, MAINTENANCE_FILE_NAME); } + /** + * @param name IgniteClassPath name. + * @return IgniteClassPath directory. + */ + public File classPath(String name) { + return new File(icp, name); + } + /** * @param includeMeta If {@code true} then include metadata directory into results. * @param filter Cache group names to filter. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/SharedFileTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/SharedFileTree.java index 33df1ed636040..199b9a00e38d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/SharedFileTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/SharedFileTree.java @@ -36,6 +36,7 @@ * ├── db ← db (shared between all local nodes). * │ ├── binary_meta ← binaryMetaRoot (shared between all local nodes). * │ ├── marshaller ← marshaller (shared between all local nodes). + * │ ├── classpath ← classpath (shared between all local nodes). * └── snapshots ← snpsRoot (shared between all local nodes). * * @@ -48,6 +49,9 @@ public class SharedFileTree { /** Name of marshaller mappings folder. */ public static final String MARSHALLER_DIR = "marshaller"; + /** Name of classpath folder. */ + public static final String CLASSPATH_DIR = "classpath"; + /** Database default folder. */ protected static final String DB_DIR = "db"; @@ -60,6 +64,9 @@ public class SharedFileTree { /** Path to the directory containing marshaller files. */ private final File marshaller; + /** Path to the directory containing classpath files. */ + protected final File icpRoot; + /** Path to the snapshot root directory. */ private final File snpsRoot; @@ -77,6 +84,7 @@ protected SharedFileTree(File root, String snpsRoot) { marshaller = Paths.get(rootStr, DB_DIR, MARSHALLER_DIR).toFile(); binaryMetaRoot = Paths.get(rootStr, DB_DIR, BINARY_METADATA_DIR).toFile(); + icpRoot = Paths.get(rootStr, DB_DIR, CLASSPATH_DIR).toFile(); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java index 84af46b6e31d9..a14437e56ee9f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java @@ -44,6 +44,7 @@ import org.apache.ignite.internal.binary.GridBinaryMarshaller; import org.apache.ignite.internal.cache.query.index.IndexProcessor; import org.apache.ignite.internal.cache.transform.CacheObjectTransformerProcessor; +import org.apache.ignite.internal.classpath.ClassPathProcessor; import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager; import org.apache.ignite.internal.managers.collision.GridCollisionManager; import org.apache.ignite.internal.managers.communication.GridIoManager; @@ -747,6 +748,11 @@ private void setField(IgniteEx kernal, String name, Object val) throws NoSuchFie return null; } + /** {@inheritDoc} */ + @Override public ClassPathProcessor classPath() { + return null; + } + /** {@inheritDoc} */ @Override public Executor getAsyncContinuationExecutor() { return null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java index d0f54ef2136ba..4d344fc3f1216 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java @@ -76,6 +76,7 @@ import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheScanQueryRequest; import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheSqlFieldsQueryRequest; import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheSqlQueryRequest; +import org.apache.ignite.internal.processors.platform.client.classpath.ClientFileUploadRequest; import org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterChangeStateRequest; import org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterGetDataCenterNodesRequest; import org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterGetStateRequest; @@ -410,6 +411,9 @@ public class ClientMessageParser implements ClientListenerMessageParser { /** Get service topology. */ private static final short OP_SERVICE_GET_TOPOLOGY = 7003; + /** File upload. */ + public static final short FILE_UPLOAD = 9030; + /** Operations that are performed before a node is joined to the topology. */ /** Stop warmup. */ private static final short OP_STOP_WARMUP = 10000; @@ -735,6 +739,9 @@ public ClientListenerRequest decode(BinaryReaderEx reader) { case OP_SERVICE_GET_TOPOLOGY: return new ClientServiceTopologyRequest(reader); + case FILE_UPLOAD: + return new ClientFileUploadRequest(reader); + case OP_STOP_WARMUP: return new ClientCacheStopWarmupRequest(reader); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/classpath/ClientFileUploadRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/classpath/ClientFileUploadRequest.java new file mode 100644 index 0000000000000..2546be36b18c8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/classpath/ClientFileUploadRequest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.client.classpath; + +import java.io.IOException; +import java.util.Collections; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.binary.BinaryRawReader; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.GridClosureCallMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext; +import org.apache.ignite.internal.processors.platform.client.ClientRequest; +import org.apache.ignite.internal.processors.platform.client.ClientResponse; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.resources.IgniteInstanceResource; + +import static org.apache.ignite.internal.processors.task.TaskExecutionOptions.options; + +/** */ +public class ClientFileUploadRequest extends ClientRequest { + /** */ + private final FileUploadPartCallable data; + + /** + * Creates the file upload request. + * + * @param reader Reader. + */ + public ClientFileUploadRequest(BinaryRawReader reader) { + super(reader); + + data = new FileUploadPartCallable( + reader.readUuid(), + reader.readUuid(), + reader.readString(), + reader.readLong(), + reader.readInt(), + reader.readByteArray() + ); + } + + /** {@inheritDoc} */ + @Override public ClientResponse process(ClientConnectionContext ctx) { + // TODO: add backward compatibility support. + + try { + // Forward request to upload node. + if (!ctx.kernalContext().localNodeId().equals(data.uploadNodeId)) { + ClusterNode uploadNode = ctx.kernalContext().cluster().get().node(data.uploadNodeId); + + if (uploadNode == null) { + throw new IgniteException("Upload node not found [localNode=" + ctx.kernalContext().localNodeId() + + ", uploadNode=" + data.uploadNodeId + ", icp=" + data.name + ']'); + } + + ctx.kernalContext().closure().callAsync( + GridClosureCallMode.BALANCE, + data, + options(Collections.singletonList(uploadNode)).withFailoverDisabled() + ).get(); + + return new ClientResponse(requestId()); + } + + ctx.kernalContext().classPath().uploadFilePart(data.icpId, data.name, data.offset, data.bytesCnt, data.batch); + + return new ClientResponse(requestId()); + } + catch (IgniteCheckedException | IOException e) { + return new ClientResponse(requestId(), e.getMessage()); + } + } + + /** */ + private static class FileUploadPartCallable implements IgniteCallable { + /** */ + private static final long serialVersionUID = 0L; + + /** Upload node ID. */ + private final UUID uploadNodeId; + + /** ClassPath ID. */ + private final UUID icpId; + + /** File name. */ + private final String name; + + /** Offset to write data to. */ + private final long offset; + + /** Bytes count in batch to write. */ + private final int bytesCnt; + + /** Batch. */ + private final byte[] batch; + + /** */ + public FileUploadPartCallable(UUID uploadNodeId, UUID icpId, String name, long offset, int bytesCnt, byte[] batch) { + this.uploadNodeId = uploadNodeId; + this.icpId = icpId; + this.name = name; + this.offset = offset; + this.bytesCnt = bytesCnt; + this.batch = batch; + } + + /** Auto-injected grid instance. */ + @IgniteInstanceResource + private transient IgniteEx ignite; + + /** {@inheritDoc} */ + @Override public Void call() throws Exception { + UUID locNodeId = ignite.localNode().id(); + + assert uploadNodeId.equals(locNodeId) + : "Forwarded to wrong node [uploadNode=" + uploadNodeId + ", localNode=" + locNodeId + ']'; + + ignite.context().classPath().uploadFilePart(icpId, name, offset, bytesCnt, batch); + + return null; + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java index 8b1ed6955d256..573c27889f0a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java @@ -508,6 +508,11 @@ public enum DistributedProcessType { /** * Snapshot partitions validation. */ - CHECK_SNAPSHOT_PARTS + CHECK_SNAPSHOT_PARTS, + + /** + * Deploy new classpath to all nodes. + */ + CLASSPATH_DEPLOY_TO_ALL; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorOneNodeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorOneNodeTask.java index 505e615654d43..beb51c6a5d56e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorOneNodeTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorOneNodeTask.java @@ -31,6 +31,8 @@ public abstract class VisorOneNodeTask extends VisorMultiNodeTask @Override protected Collection jobNodes(VisorTaskArgument arg) { Collection nodes = super.jobNodes(arg); + System.out.println("nodes = " + nodes); + assert nodes.size() == 1 : nodes; return nodes;