Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,22 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/) and this p

## [Unreleased]

TODO: add at least one Added, Changed, Deprecated, Removed, Fixed or Security section


## [0.8.0] 2025-09-10

### Added

- allow to cancel the workflow

### Fixed

- update cmempy user access token each time a workflow execution status is updated


## [0.7.0] 2025-09-04

### Added

- extendend documentation
Expand Down
89 changes: 89 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Essential Commands

This project uses **Task** (not Make) for build automation. All commands are defined in `Taskfile.yaml`:

- `task check` - Run all quality checks (lint, format, type-check, test)
- `task lint` - Run ruff linting
- `task format` - Auto-format code with ruff
- `task type-check` - Run mypy type checking
- `task test` - Run pytest unit tests
- `task test:integration` - Run integration tests (requires CMEM instance)
- `task build` - Build wheel package with poetry
- `task install` - Install package in development mode
- `task pre-commit` - Install pre-commit hooks

## Architecture Overview

### Core Components

**cmem_plugin_loopwf/task.py** - Main plugin implementation:
- `StartWorkflow` class extends `WorkflowPlugin` from cmem-plugin-base
- Takes input entities and starts sub-workflow for each entity
- `WorkflowExecution` dataclass manages individual workflow execution status
- `WorkflowExecutionList` handles queue-based execution with configurable parallelism
- Implements polling-based workflow status monitoring with async execution
- Supports both entity metadata and file content processing

**cmem_plugin_loopwf/workflow_type.py** - Workflow type definitions and utilities:
- Custom parameter types for workflow selection
- Dynamic value fetching and autocomplete integration

**cmem_plugin_loopwf/exceptions.py** - Custom exception classes:
- Plugin-specific error handling and exception types

### Key Patterns

- **Dataclass Configuration**: Uses `@dataclass` for parameter models with type hints
- **Decorator-Based Registration**: Plugins use `@Plugin` decorator from cmem-plugin-base
- **Autocomplete Integration**: Custom parameter types with dynamic value fetching
- **Async Workflow Execution**: Non-blocking workflow starts with status polling

### Project Structure

```
cmem_plugin_loopwf/
├── task.py # Main plugin logic
├── workflow_type.py # Workflow type definitions and utilities
├── exceptions.py # Custom exception classes
├── loopwf.svg # Plugin icon
└── __init__.py # Plugin registration
tests/ # Unit and integration tests
├── conftest.py # Test configuration
├── test_task.py # Task plugin tests
├── test_discovery.py # Plugin discovery tests
├── test_workflow_type.py # Workflow type tests
├── test_workflow_execution_list.py # Execution list tests
└── utils.py # Test utilities
```

## Development Workflow

### Dependencies
- **Poetry** for package management (not pip/requirements.txt)
- **Ruff** for linting and formatting (replaces black/isort/flake8)
- **MyPy** for type checking
- **Pre-commit** hooks for quality gates

### Testing
- Unit tests in `tests/` directory using pytest
- Integration tests require running CMEM instance
- Test configuration in `pyproject.toml` under `[tool.pytest.ini_options]`

### Plugin Development
- Extend `WorkflowPlugin` base class from cmem-plugin-base
- Use type hints extensively - this is a strongly typed codebase
- Follow the existing parameter model pattern with dataclasses
- All plugins must be registered in `__init__.py`

### CMEM Integration
- Plugin outputs must be compatible with CMEM's RDF/entity model
- Uses `cmempy` library for CMEM API interactions (config, get_json, execute_workflow_io, get_workflows_io)
- Authentication handled through user context propagation via `setup_cmempy_user_access`
- All operations are project-scoped within CMEM instance
- Supports async workflow execution via `/api/workflow/executeAsync` endpoint
- Entity conversion to JSON for workflow input via replaceable datasets
- File processing support with configurable MIME types for file-to-workflow scenarios
19 changes: 16 additions & 3 deletions cmem_plugin_loopwf/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ def wait_until_finished(self) -> None:

def update(self) -> None:
"""Update the execution status"""
if self.execution_context:
setup_cmempy_user_access(context=self.execution_context.user)
response = get_json(
f"{config.get_di_api_endpoint()}/workspace/activities/status",
params={
Expand Down Expand Up @@ -189,11 +191,13 @@ def __init__(self):

def execute(self, parallel_execution: int) -> None:
"""Execute all workflow executions"""
while self.queued > 0:
while self.running < parallel_execution and self.queued > 0:
while self.queued > 0 and not self.is_canceling():
while self.running < parallel_execution and self.queued > 0 and not self.is_canceling():
self.start_next()
self.report()
self.wait_until_finished()
if self.is_canceling():
self.logger.info("Execution canceled - stopping workflow processing")
self.report()

def start_next(self) -> bool:
Expand All @@ -206,9 +210,11 @@ def start_next(self) -> bool:

def wait_until_finished(self, polling_time: int = 1) -> None:
"""Wait until all running workflows are finished"""
while self.running > 0:
while self.running > 0 and not self.is_canceling():
sleep(polling_time)
self.update_running_status()
if self.is_canceling():
self.logger.info("Cancellation detected during polling - stopping workflow monitoring")

def update_running_status(self) -> None:
"""Update status of running workflows"""
Expand Down Expand Up @@ -247,6 +253,13 @@ def queued(self) -> int:
"""Returns the number of queued workflows"""
return len([_ for _ in self.statuses if _.is_queued])

def is_canceling(self) -> bool:
"""Check if the workflow execution context is in canceling state"""
if self.context and hasattr(self.context, "workflow") and self.context.workflow:
status = self.context.workflow.status()
return str(status) == "Canceling"
return False


@Plugin(
label="Start Workflow per Entity",
Expand Down
Loading