diff --git a/org.knime.python3.scripting.nodes/src/main/java/org/knime/python3/scripting/nodes2/PythonScriptNodeModel.java b/org.knime.python3.scripting.nodes/src/main/java/org/knime/python3/scripting/nodes2/PythonScriptNodeModel.java index df41f6ed9..22c597c39 100644 --- a/org.knime.python3.scripting.nodes/src/main/java/org/knime/python3/scripting/nodes2/PythonScriptNodeModel.java +++ b/org.knime.python3.scripting.nodes/src/main/java/org/knime/python3/scripting/nodes2/PythonScriptNodeModel.java @@ -232,7 +232,7 @@ protected PortObject[] execute(final PortObject[] inObjects, final ExecutionCont /** * Throw a nicer error message if the exception we are seeing is an "Error while obtaining a new communication - * channel" + * channel" or "Cannot obtain a new communication channel" (CallbackClient already shut down) * * @param ex The exception * @throws KNIMEException A more human-readable exception @@ -245,6 +245,17 @@ private void handleNewCommunicationChannelError(final Py4JException ex) throws K "The Python process we prepared in the background got killed. Try again to start a new one."); throw KNIMEException.of(messageBuilder.build().orElseThrow(), ex); } + // Handle the case where the CallbackClient was already shut down (no cause) + if (ex.getCause() == null && ex.getMessage() != null + && ex.getMessage().startsWith("Cannot obtain a new communication channel")) { + var messageBuilder = createMessageBuilder(); + messageBuilder.withSummary("The Python communication channel was unexpectedly shut down before use. " + + "Details: " + ex.getMessage()); + messageBuilder.addResolutions("This is a known intermittent issue. Please try executing again.", + "Check the KNIME log for entries around this error for more information about " + + "what shut down the communication channel."); + throw KNIMEException.of(messageBuilder.build().orElseThrow(), ex); + } } private void runUserScript(final PythonScriptingSession session) diff --git a/org.knime.python3.scripting.nodes/src/main/java/org/knime/python3/scripting/nodes2/PythonScriptingSession.java b/org.knime.python3.scripting.nodes/src/main/java/org/knime/python3/scripting/nodes2/PythonScriptingSession.java index 0929ca279..b333c9a55 100644 --- a/org.knime.python3.scripting.nodes/src/main/java/org/knime/python3/scripting/nodes2/PythonScriptingSession.java +++ b/org.knime.python3.scripting.nodes/src/main/java/org/knime/python3/scripting/nodes2/PythonScriptingSession.java @@ -161,6 +161,8 @@ final class PythonScriptingSession implements AsynchronousCloseable m_fileStoreHandlerSupplier = fileStoreHandlerSupplier; m_gateway = createGateway(pythonCommand); m_entryPoint = m_gateway.getEntryPoint(); + LOGGER.infoWithFormat("PythonScriptingSession created gateway (hash=%s) on thread '%s'", + Integer.toHexString(System.identityHashCode(m_gateway)), Thread.currentThread().getName()); m_tableConverter = new PythonArrowTableConverter(EXECUTOR_SERVICE, ARROW_STORE_FACTORY, fileStoreHandlerSupplier.getWriteFileStoreHandler()); m_outputRedirector = PythonGatewayUtils.redirectGatewayOutput(m_gateway, LOGGER::info, LOGGER::info); @@ -495,6 +497,13 @@ private void setCurrentWorkingDirectory() { Optional.ofNullable(workflowDirRef).map(r -> r.getFile().toString()) .ifPresent(m_entryPoint::setCurrentWorkingDirectory); } catch (Py4JException ex) { + // Log detailed diagnostics if the CallbackClient was already shut down + if (ex.getCause() == null && ex.getMessage() != null + && ex.getMessage().startsWith("Cannot obtain a new communication channel")) { + LOGGER.error("CallbackClient was shut down before first use. " + "Gateway hash=" + + Integer.toHexString(System.identityHashCode(m_gateway)) + ", thread='" + + Thread.currentThread().getName() + "'", ex); + } PythonProcessTerminatedException.throwIfTerminated(m_gateway, ex); throw ex; } catch (Exception ex) { // NOSONAR: We want to catch any exception here diff --git a/org.knime.python3/src/main/java/org/knime/python3/DefaultPythonGateway.java b/org.knime.python3/src/main/java/org/knime/python3/DefaultPythonGateway.java index c4fd6d206..eabc0eebe 100644 --- a/org.knime.python3/src/main/java/org/knime/python3/DefaultPythonGateway.java +++ b/org.knime.python3/src/main/java/org/knime/python3/DefaultPythonGateway.java @@ -327,6 +327,8 @@ public T getEntryPoint() { @Override public void close() throws IOException { if (m_clientServer != null) { + LOGGER.infoWithFormat("Closing PythonGateway (PID=%s) from thread '%s'", m_pid, + Thread.currentThread().getName()); m_entryPoint = null; m_clientServer.shutdown(); m_clientServer = null; diff --git a/org.knime.python3/src/main/java/org/knime/python3/PythonGatewayCreationGate.java b/org.knime.python3/src/main/java/org/knime/python3/PythonGatewayCreationGate.java index 62e507790..7b4a3541c 100644 --- a/org.knime.python3/src/main/java/org/knime/python3/PythonGatewayCreationGate.java +++ b/org.knime.python3/src/main/java/org/knime/python3/PythonGatewayCreationGate.java @@ -182,18 +182,21 @@ public void notify(final EventObject o) { if (o instanceof PhaseEvent && ((PhaseEvent)o).getPhaseId().equals(PhaseSetFactory.PHASE_INSTALL) && ((PhaseEvent)o).getType() == PhaseEvent.TYPE_START) { // lock if we enter the "install" phase - LOGGER.info("Blocking Python process startup during installation"); + LOGGER.info("Blocking Python process startup during installation (thread='" + + Thread.currentThread().getName() + "')"); INSTANCE.blockPythonCreation(); } else if (o instanceof PhaseEvent && ((PhaseEvent)o).getPhaseId().equals(PhaseSetFactory.PHASE_CONFIGURE) && ((PhaseEvent)o).getType() == PhaseEvent.TYPE_START) { // "configure" is the normal phase after install, so we can unlock Python processes again - LOGGER.info("Allowing Python process startup again after installation"); + LOGGER.info("Allowing Python process startup again after installation (thread='" + + Thread.currentThread().getName() + "')"); INSTANCE.allowPythonCreation(); } else if (o instanceof RollbackOperationEvent && !INSTANCE.isPythonGatewayCreationAllowed()) { // According to org.eclipse.equinox.internal.p2.engine.Engine.perform() -> L92, // a RollbackOperationEvent will be fired if an operation failed, and this event is only fired in that case, // so we unlock if we are currently locked. - LOGGER.info("Allowing Python process startup again after installation failed"); + LOGGER.info("Allowing Python process startup again after installation failed (thread='" + + Thread.currentThread().getName() + "')"); INSTANCE.allowPythonCreation(); } } diff --git a/org.knime.python3/src/main/java/org/knime/python3/PythonGatewayTracker.java b/org.knime.python3/src/main/java/org/knime/python3/PythonGatewayTracker.java index c9d18923d..63d5782ae 100644 --- a/org.knime.python3/src/main/java/org/knime/python3/PythonGatewayTracker.java +++ b/org.knime.python3/src/main/java/org/knime/python3/PythonGatewayTracker.java @@ -111,9 +111,11 @@ void clear() throws IOException { return; } - LOGGER.error("Found running Python processes. Aborting them to allow installation process. " - + "If this leads to failures in node execution, " - + "please restart those nodes once the installation has finished"); + LOGGER.errorWithFormat( + "Found running Python processes (%d). Aborting them to allow installation process. " + + "If this leads to failures in node execution, " + + "please restart those nodes once the installation has finished. Triggered from thread '%s'.", + m_openGateways.size(), Thread.currentThread().getName()); var exceptions = new ArrayList(); for (var gateway : m_openGateways) { diff --git a/org.knime.python3/src/main/java/org/knime/python3/QueuedPythonGatewayFactory.java b/org.knime.python3/src/main/java/org/knime/python3/QueuedPythonGatewayFactory.java index e0b2965c8..402383dd2 100644 --- a/org.knime.python3/src/main/java/org/knime/python3/QueuedPythonGatewayFactory.java +++ b/org.knime.python3/src/main/java/org/knime/python3/QueuedPythonGatewayFactory.java @@ -217,11 +217,13 @@ public void onPythonGatewayCreationGateOpen() { @Override public void onPythonGatewayCreationGateClose() { - evictGateways(// - m_gateways.values().stream()// - .flatMap(Collection::stream)// - .collect(Collectors.toList())// - ); + var gatewaysToEvict = m_gateways.values().stream()// + .flatMap(Collection::stream)// + .collect(Collectors.toList()); + LOGGER.warnWithFormat( + "PythonGatewayCreationGate closed: evicting %d queued gateways from thread '%s'", + gatewaysToEvict.size(), Thread.currentThread().getName()); + evictGateways(gatewaysToEvict); } }); }