Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1e0ccf7
ci: compile main sources in coverage_report job
sonus21 May 2, 2026
65dfd14
Merge branch 'master' of github.com:/sonus21/rqueue
sonus21 May 3, 2026
5186137
nats-web: implement pause / soft-delete admin ops and capability-awar…
sonus21 May 5, 2026
42eb61e
nats-web: capability-aware nav / charts and stream-based peek
sonus21 May 5, 2026
2c15390
nats-web: backend-aware data-type labels and Limits-aware queue size
sonus21 May 5, 2026
81b4470
nats-web: position-based pending estimate for Limits streams, render …
sonus21 May 5, 2026
0d040b1
nats-web: per-consumer pending breakdown for Limits-retention streams
sonus21 May 5, 2026
c2dbfc2
nats-web: consumer-level Subscribers + Terminal Storage redesign
sonus21 May 5, 2026
60f6f33
nats-web: queue-detail redesign — hero, chip strip, subscriber & term…
sonus21 May 5, 2026
a6a9e69
fix: reuse single consumer for workqueue streams
sonus21 May 5, 2026
a7a3054
Revert "fix: reuse single consumer for workqueue streams"
sonus21 May 5, 2026
4fdd11b
nats-web: tighten queue-detail layout, add play/pause action button
sonus21 May 5, 2026
21ecfb5
fix: use single consumer-name suffix in resolvedConsumerName
sonus21 May 5, 2026
7bcc30d
nats-web: consumer-aware peek for Limits-retention streams
sonus21 May 5, 2026
0a4eb5c
nats-web: peek from ackFloor, not delivered.streamSeq
sonus21 May 5, 2026
524fc5c
fix: ack/nack target wrong NATS Message under multi-consumer fan-out
sonus21 May 5, 2026
f83a821
nats: regression test for inFlight key-collision under fan-out
sonus21 May 5, 2026
ec99465
nats-web: align Pending column with explorer + add Workers column
sonus21 May 5, 2026
deb1040
nats: tests for consumer-aware peek + adapt QueueModeIT to new contract
sonus21 May 5, 2026
1fc9693
nats-web: keep Pending column as numPending (yet-to-deliver only)
sonus21 May 5, 2026
e523b6d
Merge branch 'master' of github.com:/sonus21/rqueue into nats-v2-web
sonus21 May 6, 2026
4c777f1
core: expose message-converter exception to middleware via Job
sonus21 May 7, 2026
4e20b75
web: fix tests for consumer-aware getExplorePageData + peek signatures
sonus21 May 7, 2026
de928e0
build: bump version to 4.0.0-RC6 and publish nats/redis/web modules
sonus21 May 7, 2026
dc99997
docs: changelog for 4.0.0 (date TBD)
sonus21 May 7, 2026
28194a9
docs: backfill RC3/RC4 changelog entries and trim 4.0.0 scope
sonus21 May 7, 2026
f6ee247
docs: split 4.0.0 entry into RC5 (web fixes + multi-consumer fix) and…
sonus21 May 7, 2026
11593ad
docs: correct RC3 changelog — Java 17 baseline (lowered from 21)
sonus21 May 7, 2026
ecd9070
Apply Palantir Java Format
github-actions[bot] May 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ ext {

subprojects {
group = "com.github.sonus21"
version = "4.0.0-RC4"
version = "4.0.0-RC6"

dependencies {
// https://mvnrepository.com/artifact/org.springframework/spring-messaging
Expand Down
100 changes: 100 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,106 @@ layout: default

All notable user-facing changes to this project are documented in this file.

## Release [4.0.0] TBD

{: .highlight}
First stable 4.0.0 release. Targets Spring Boot 4.x and Spring Framework 7.x on
Java 17 (lowered from the original Java 21 baseline in RC3). Promotes the RC6
line to GA — no functional changes versus RC6. See RC1 / RC2 below for the
foundational Spring Boot 4 and Jackson 3 migration notes; RC3 for the Java 17
baseline change; RC4–RC6 below for the NATS backend, broker SPI, dashboard
work, and middleware additions that build on top.

## Release [4.0.0.RC6] TBD

{: .highlight}
Release candidate.

### Features
* **Message-converter exception exposed to middleware** — `Job` now exposes
`getConversionException()` (and a `hasConversionException()` default) so
middleware can detect and react to inbound deserialization failures (route
to DLQ, alert, attempt a fallback decode) instead of being unable to
distinguish a converter error from a legitimately-String payload.

### Build
* `rqueue-nats`, `rqueue-redis`, and `rqueue-web` now publish to Maven Central
alongside `rqueue-core`, `rqueue-spring`, and `rqueue-spring-boot-starter`.

## Release [4.0.0.RC5] TBD

{: .highlight}
Release candidate. The two themes are a multi-consumer correctness fix on the
NATS backend and a NATS-aware dashboard built on a new pluggable broker SPI.

### Features
* **Pluggable broker SPI** — the queueing layer was separated from Redis behind
a `MessageBroker` SPI with a `Capabilities` model. The dashboard, explorer,
and admin paths adapt to backend capabilities (nav tabs, charts, data-type
labels, queue-size accounting) instead of assuming Redis primitives.
* **Consumer-aware peek** — added a consumer-aware `peek` overload on the
broker SPI. The dashboard explorer can browse a specific consumer's
outstanding messages on Limits-retention streams, skipping already-acked
ranges and reflecting per-consumer ack floors. Useful for fan-out topologies
where each durable has a different delivery position.
* **NATS-aware queue detail page** — redesigned queue detail with a hero panel,
chip strip, per-consumer Subscribers table (separate Pending and In-Flight
columns plus a Workers column), and a Terminal Storage card. Pending shows
yet-to-deliver count; In-Flight shows messages currently being processed.
Limits-retention queues render approximate sizes with a `~` prefix.
* **Pause / soft-delete admin ops for NATS queues** — operators can pause and
soft-delete NATS queues from the dashboard, with capability-gated controls
so unsupported actions do not appear on backends that cannot honour them.

### Fixes
* **NATS ack/nack under multi-consumer fan-out** — fixed an in-flight key
collision that could cause ack/nack to target the wrong NATS message when
multiple consumers were fanning out from the same stream.
* **Consumer-name resolution** — `resolvedConsumerName` now uses a single
consumer-name suffix, preventing duplicated suffixing under repeated lookups.
* **Peek base sequence** — NATS peek now bases on `ackFloor` rather than
`delivered.streamSeq`, so the explorer shows the correct un-acked tail
instead of skipping past acked-but-not-yet-deleted messages.

### Migration Notes
* Backends are now selected via the `MessageBroker` SPI. Existing Redis
applications continue to work without configuration changes — a Redis
broker is wired by default. Applications wanting NATS should add
`rqueue-nats` and configure a JetStream `MessageBroker` bean.
* The dashboard `/explore` API gained a `consumerName` query parameter
(nullable). Callers using the REST API directly should pass `null` to
preserve existing behaviour or a specific consumer name to scope the peek.

## Release [4.0.0.RC4] 14-Apr-2026

{: .highlight}
Release candidate. The headline change is the introduction of the NATS
JetStream backend.

### Features
* **NATS JetStream backend** — added a new `rqueue-nats` module that lets Rqueue
run on NATS JetStream as the message broker. Supports Limits-retention and
WorkQueue-retention streams, durable consumers, and ack/nack delivery
semantics. This is the initial drop; the broker SPI extraction and the
capability-aware dashboard land in RC5.

### CI / Build
* Coveralls integration fixed for GitHub Actions (token wiring, build-number
propagation, request-payload diagnostics).

## Release [4.0.0.RC3] 14-Apr-2026

{: .highlight}
Lowers the Java baseline from 21 back to 17.

### Changes
* **Java 17 baseline (was 21 in RC1/RC2)** — `languageVersion`,
`sourceCompatibility`, and `targetCompatibility` reverted from 21 to 17 so
the library can be consumed by applications still on Java 17. RC1's
"Java 21 baseline" note is superseded — 4.0.0 supports Java 17 and above.
* Documentation tweaks (header refresh, dependabot bump for `addressable`
in `/docs`).

## Release [4.0.0.RC2] 24-Mar-2026

{: .highlight}
Expand Down
17 changes: 10 additions & 7 deletions nats-task-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,21 @@ All v1 items are done and 360 unit tests pass. Branch `nats-backend` is ready to

## v2 pending items

### 1. Web dashboard — NATS gaps
### 1. Web dashboard — NATS gaps *(IN PROGRESS — pause/delete/explore landed)*

Controllers are no longer Redis-gated but several operations throw `BackendCapabilityException` (HTTP 501) on NATS. The front-end should hide unsupported panels proactively instead of relying on 501s.

- Expose `GET /rqueue/api/capabilities` returning the `Capabilities` record so the UI can conditionally hide panels.
- Extend `Capabilities` with dashboard-op flags: `supportsCharts`, `supportsMessageBrowse`, `supportsAdminMove`.
- Wire the flags into Pebble templates (scheduled panel, cron jobs panel, chart panel already have `hideScheduledPanel` / `hideCronJobs` hooks in `DataViewResponse`).
- ✅ `GET /rqueue/api/capabilities` already returns the `Capabilities` record so the UI can conditionally hide panels.
- ✅ `RqueueQDetailServiceImpl.getRunningTasks()` / `getScheduledTasks()` now return header-only tables on NATS instead of zero rows / 501s. Pending queue browsing routes through `MessageBroker.peek()`.
- ✅ `NatsRqueueUtilityService` implements `pauseUnpauseQueue` (persists flag + notifies local `RqueueMessageListenerContainer`), soft `deleteMessage` (KV metadata flag), `getDataType` (returns `"STREAM"`), `aggregateDataCounter`. 20 unit tests cover the path.
- ⏳ Pause-event multi-instance fan-out: `RqueueInternalPubSubChannel` is Redis-only. NATS bridge follow-up: subscribe to `rqueue.internal.<broker>` via `MessageBroker.subscribe/publish` and rebroadcast pause requests across worker JVMs.
- ⏳ Extend `Capabilities` with dashboard-op flags: `supportsCharts`, `supportsMessageBrowse`, `supportsAdminMove` (not yet — current flags suffice for the panels we hide today).
- ⏳ Pebble templates: `hideScheduledPanel` / `hideCronJobs` already wired into `DataViewResponse`. Front-end hides those panels; chart and message-browse hides still TBD.

Affected services that throw on NATS today:
- `RqueueDashboardChartServiceImpl` — time-series charts (no equivalent in JetStream)
- `RqueueUtilityServiceImpl` — move/enqueue admin ops
- `NatsMessageBrowsingRepository.viewData` — positional message browse
- `RqueueDashboardChartServiceImpl` — time-series charts (no equivalent in JetStream) — still pending
- `RqueueUtilityServiceImpl` — move/enqueue admin ops — `moveMessage`, `enqueueMessage`, `makeEmpty` deliberately remain `notSupported` (no JetStream primitive); `pauseUnpauseQueue` and `deleteMessage` now implemented
- `NatsMessageBrowsingRepository.viewData` — positional message browse (Redis-only by design)

### 2. Reactive listener container

Expand Down
11 changes: 9 additions & 2 deletions nats-task.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,16 @@ Then re-run:
./gradlew :rqueue-spring-boot-starter:test --tests "com.github.sonus21.rqueue.spring.boot.integration.NatsBackendEndToEndIT"
```

### Web-layer NATS dashboard gap (new follow-up)
### Web-layer NATS dashboard gap (new follow-up) *(PARTIAL — admin write ops landing)*

All 4 controllers and the 5 web service impls (`RqueueDashboardChartService*`, `RqueueQDetailService*`, `RqueueJobService*`, `RqueueSystemManagerService*`, `RqueueUtilityService*`) are still gated `@Conditional(RedisBackendCondition)`. On NATS the dashboard reports broker-derived sizes only; no charts, no message browse, no admin ops. Plan to fix:
All 4 controllers and the 5 web service impls (`RqueueDashboardChartService*`, `RqueueQDetailService*`, `RqueueJobService*`, `RqueueSystemManagerService*`, `RqueueUtilityService*`) are still gated `@Conditional(RedisBackendCondition)`. On NATS the dashboard reports broker-derived sizes only; no charts, no message browse, no admin ops.

Status:
- ✅ `NatsRqueueUtilityService` (rqueue-nats `@Conditional(NatsBackendCondition)`) replaces the all-stub impl: `pauseUnpauseQueue`, soft `deleteMessage`, `getDataType`, `aggregateDataCounter` work end-to-end. `moveMessage` / `enqueueMessage` / `makeEmpty` are deliberately `notSupported` (no JetStream equivalent).
- ✅ `RqueueQDetailServiceImpl` returns header-only tables for `getRunningTasks` / `getScheduledTasks` when the broker capabilities suppress those sections, instead of rendering 0-rows / 501s.
- ⏳ Charts (`RqueueDashboardChartService`), message browse, and `moveMessage` on NATS — still pending.

Plan to fix the rest:

1. Introduce repository interfaces in `rqueue-core/repository/` for the few storage primitives the web services share (queue browsing, time-series counters, atomic move). Web service impls move into core / `rqueue-web` and depend only on the repos.
2. Redis impls of the repos stay in `rqueue-redis`; NATS impls go in `rqueue-nats` and throw `BackendCapabilityException("nats", "operation", "reason")` for primitives JetStream can't model (positional message moves, time-bucket charts).
Expand Down
3 changes: 3 additions & 0 deletions publish.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,8 @@
#

./gradlew rqueue-core:publishToMavenCentral
./gradlew rqueue-nats:publishToMavenCentral
./gradlew rqueue-redis:publishToMavenCentral
./gradlew rqueue-web:publishToMavenCentral
./gradlew rqueue-spring:publishToMavenCentral
./gradlew rqueue-spring-boot-starter:publishToMavenCentral
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,27 @@ public static int getActiveQueueCount() {
public static int getRegisteredQueueCount() {
return registry.size();
}

/**
* Returns every {@link QueueDetail} registered under the given queue name, including all
* {@code @RqueueListener} methods that share the same backing storage. Used by the
* dashboard to render one subscriber row per handler. Returns an empty list when no
* detail is registered.
*/
public static List<QueueDetail> getAllForQueue(String queueName) {
if (queueName == null) {
return new ArrayList<>();
}
synchronized (lock) {
List<QueueDetail> matches = registry.values().stream()
.filter(qd -> queueName.equals(qd.getName()))
.sorted(Comparator.comparing(qd -> {
String cn = qd.getConsumerName();
return cn == null ? "" : cn;
}))
.collect(Collectors.toList());
lock.notifyAll();
return matches;
}
}
}
14 changes: 14 additions & 0 deletions rqueue-core/src/main/java/com/github/sonus21/rqueue/core/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,20 @@ public interface Job {
*/
void setMessage(Object message);

/**
* Exception thrown by the message converter while deserializing the raw payload, if any. When
* conversion fails the raw {@link String} payload is exposed via {@link #getMessage()} so that
* middleware can still run; this method gives middleware access to the underlying error so it can
* decide what to do (skip the handler, route to DLQ, alert, attempt a fallback decode, etc.).
*
* @return the conversion error, or {@code null} if conversion succeeded
*/
Throwable getConversionException();

default boolean hasConversionException() {
return getConversionException() != null;
}

/**
* MessageMetadata corresponding the enqueued message
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,18 @@ default List<RqueueMessage> pop(

List<RqueueMessage> peek(QueueDetail q, long offset, long count);

/**
* Consumer-aware peek overload. When {@code consumerName} is non-null and the backend has
* per-consumer offsets (e.g. NATS Limits-retention streams), the implementation starts
* pagination from that consumer's next undelivered sequence so the dashboard shows messages
* still pending for that specific subscriber instead of the entire retained window. The
* default delegates to {@link #peek(QueueDetail, long, long)} for backends with a single
* shared pool.
*/
default List<RqueueMessage> peek(QueueDetail q, String consumerName, long offset, long count) {
return peek(q, offset, count);
}

/**
* Remove {@code old} from the processing store and re-enqueue {@code updated} for retry.
* {@code delayMs <= 0} means immediate; {@code delayMs > 0} means schedule after that delay.
Expand Down Expand Up @@ -198,6 +210,85 @@ default String dlqStorageDisplayName(QueueDetail q) {
return null;
}

/**
* Indicates whether {@link #size(QueueDetail)} returns an exact count or an approximation
* for the given queue. Brokers that compute pending from per-consumer position math (e.g.
* NATS JetStream Limits-retention streams) return {@code true} so the dashboard renders
* the figure with a "~" prefix instead of presenting it as authoritative. Defaults to
* {@code false} (the historical Redis behavior — exact list / sorted-set lengths).
*/
default boolean isSizeApproximate(QueueDetail q) {
return false;
}

/**
* Per-consumer pending breakdown for queues whose backend has multiple independent
* subscribers — e.g. JetStream Limits-retention streams where each durable consumer
* progresses at its own pace. Returns an ordered map of {@code consumerName -> pending}
* so the dashboard can render one row per consumer instead of a single aggregate.
*
* <p>The default returns {@code null}, signalling that the queue has a single shared pool
* (Redis lists, NATS WorkQueue streams) and the caller should fall back to
* {@link #size(QueueDetail)}. Empty / null also means "no consumers attached".
*
* @deprecated superseded by {@link #subscribers(QueueDetail)} which returns a richer view
* (consumer + pending + in-flight + shared flag). Retained for one release so
* downstream callers keep compiling.
*/
@Deprecated
default java.util.Map<String, Long> consumerPendingSizes(QueueDetail q) {
return null;
}

/**
* Per-subscriber breakdown for the queue-detail dashboard. Each entry represents one
* logical handler attached to the queue:
*
* <ul>
* <li><b>Redis</b> — one entry per {@code @RqueueListener} method that registered for
* the queue. {@code pending} is the shared list size on every row
* ({@code pendingShared = true}); {@code inFlight} is the shared processing-ZSET
* size.
* <li><b>NATS JetStream</b> — one entry per durable consumer. For WorkQueue retention
* {@code pending} is the shared stream {@code msgCount} ({@code pendingShared = true});
* for Limits retention it is the exact per-consumer {@code numPending}
* ({@code pendingShared = false}). {@code inFlight} is the consumer's
* {@code numAckPending} in both cases.
* </ul>
*
* <p>The default returns a single anonymous row backed by {@link #size(QueueDetail)}, so
* brokers that don't track named subscribers still render a working table.
*/
default java.util.List<SubscriberView> subscribers(QueueDetail q) {
long pending;
try {
pending = size(q);
} catch (RuntimeException e) {
pending = 0L;
}
return java.util.Collections.singletonList(
new SubscriberView(q.resolvedConsumerName(), pending, 0L, true));
}

/**
* Backend-aware human-readable label for the given Redis-shaped {@code DataType} on the given
* dashboard tab. Surfaces in the queue-detail page's "Data Type" column so NATS deployments
* can show "Queue (Stream)" instead of "LIST".
*
* <p>The default returns {@code null}, which the dashboard interprets as "fall back to
* {@code DataType.name()}" (the historical Redis behavior).
*
* @param tab the dashboard nav tab the row corresponds to (PENDING, RUNNING, SCHEDULED, DEAD,
* COMPLETED, etc.). May be {@code null} when called in a context without a tab.
* @param type Redis-shaped data type used by the dashboard's table rendering.
* @return display label, or {@code null} to fall through to the default rendering.
*/
default String dataTypeLabel(
com.github.sonus21.rqueue.models.enums.NavTab tab,
com.github.sonus21.rqueue.models.enums.DataType type) {
return null;
}

AutoCloseable subscribe(String channel, Consumer<String> handler);

void publish(String channel, String payload);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2026 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
*/

package com.github.sonus21.rqueue.core.spi;

/**
* Per-subscriber row surfaced by {@link MessageBroker#subscribers} for the queue-detail
* dashboard. Each entry corresponds to one logical consumer attached to the queue:
*
* <ul>
* <li>On Redis, one entry per {@code @RqueueListener} method (handlers all share the
* same backing list, so {@code pending} is the same on every row and
* {@code pendingShared} is {@code true}).
* <li>On NATS JetStream, one entry per durable consumer. {@code pending} is the
* per-consumer {@code numPending} for Limits retention (exact, divergent across
* rows) and the shared {@code msgCount} for WorkQueue retention (same on every
* row, {@code pendingShared = true}).
* </ul>
*
* @param consumerName logical consumer / handler name (from {@code @RqueueListener.consumerName}
* when set, otherwise a backend-derived name like {@code rqueue-<queue>}).
* @param pending messages waiting to be processed by this subscriber.
* @param inFlight messages this subscriber has received but not yet acknowledged.
* @param pendingShared {@code true} when {@code pending} is a queue-wide aggregate rather
* than this subscriber's exclusive backlog. The dashboard renders these with a
* "(shared)" hint so it's clear the figure is not per-consumer.
*/
public record SubscriberView(
String consumerName, long pending, long inFlight, boolean pendingShared) {

public SubscriberView(String consumerName, long pending, long inFlight) {
this(consumerName, pending, inFlight, false);
}
}
Loading