Polyglot Queues, Simplified. Read and write the canonical BabelQueue message envelope from Java — so your Java/Spring services exchange messages with Laravel, Symfony, Python, Go and Node over one strict JSON format, on the broker you already run.
This is the framework-agnostic Java core: the wire-envelope codec, contracts and dead-letter helpers — zero dependencies (pure JDK, including its own minimal JSON codec, so no Jackson/Gson is forced on you). The full standard is documented at babelqueue.com.
Maven:
<dependency>
<groupId>com.babelqueue</groupId>
<artifactId>babelqueue-core</artifactId>
<version>1.0.0</version>
</dependency>Gradle:
implementation("com.babelqueue:babelqueue-core:1.0.0")Requires Java 17+.
import com.babelqueue.*;
import java.util.Map;
// Produce — build the canonical envelope and publish the JSON to your broker.
Envelope env = EnvelopeCodec.make(
"urn:babel:orders:created",
Map.of("order_id", 1042L),
"orders",
null);
String body = EnvelopeCodec.encode(env); // compact UTF-8 JSON
// jedis.rpush("queues:orders", body);
// / channel.basicPublish("", "orders", props, body.getBytes(StandardCharsets.UTF_8));
// Consume — decode a message produced by ANY BabelQueue SDK.
Envelope in = EnvelopeCodec.decode(body);
if (EnvelopeCodec.accepts(in)) {
switch (EnvelopeCodec.urn(in)) {
case "urn:babel:orders:created" ->
System.out.println(in.data().get("order_id") + " " + in.traceId());
default -> { /* unknown URN */ }
}
}The envelope is identical to every other SDK's:
{
"job": "urn:babel:orders:created",
"trace_id": "…",
"data": { "order_id": 1042 },
"meta": { "id": "…", "queue": "orders", "lang": "java", "schema_version": 1, "created_at": 1749132727000 },
"attempts": 0
}JSON numbers decode into
dataasLong(integers) orDouble(decimals); objects asLinkedHashMap(insertion order preserved).encodeleaves slashes and non-ASCII unescaped, so the bytes match the PHP/Python/Node cores.
record OrderCreated(long orderId) implements PolyglotMessage, HasTraceId {
public String getBabelUrn() { return "urn:babel:orders:created"; }
public Map<String, Object> toPayload() { return Map.of("order_id", orderId); }
public String getBabelTraceId() { return null; } // or an inbound trace to continue
}
Envelope env = EnvelopeCodec.fromMessage(new OrderCreated(1042L), "orders");Envelope dlq = DeadLetters.annotate(env, "failed", "orders", 3, "boom", "java.lang.RuntimeException");
// publish EnvelopeCodec.encode(dlq) to the "orders.dlq" queueDeadLetters.annotate returns a copy — the original envelope is preserved
unchanged inside the dead-lettered message, so any-language consumers can still
read it.
The com.babelqueue.outbox helper (ADR-0029) removes the producer dual write —
"commit the business row" and "publish to the broker" are two systems that can
disagree on a crash. Instead the message is written into your own database, inside
your own transaction, so it commits or rolls back atomically with the business data;
a separate relay publishes the durable rows afterwards.
import com.babelqueue.outbox.*;
// Bind the OutboxStore to YOUR database (a JDBC adapter writes the row on `connection`).
// The core ships only an in-memory reference — it pulls in no DB driver (GR-7).
OutboxStore store = /* your JDBC-backed store */;
Outbox outbox = new Outbox(store);
// 1) Write side — the CALLER owns the transaction boundary (this is the whole point):
connection.setAutoCommit(false);
try {
insertOrder(connection, order); // the business write
Envelope env = EnvelopeCodec.make("urn:babel:orders:created", data, "orders", null);
outbox.write(env); // same connection, same tx
connection.commit(); // both, or neither
} catch (Exception e) {
connection.rollback();
throw e;
}
// 2) Relay side — run on a short interval AFTER the tx commits. Publish the stored bytes
// verbatim through your broker; mark published only after the transport accepts them.
OutboxTransport transport = (queue, body) -> jedis.rpush("queues:" + queue, new String(body, UTF_8));
OutboxRelay relay = new OutboxRelay(transport, store);
OutboxRelayResult result = relay.drain(0); // loop until the outbox is emptyOutbox.write stores the EnvelopeCodec-encoded bytes verbatim (GR-1, the envelope
never changes) and the relay publishes those exact bytes — so trace_id is preserved
end-to-end (GR-4) and the body is byte-identical before store and after relay (GR-5). A
publish that throws is recorded (markFailed) with a bounded backoff and left pending for
the next pass; one poison row never blocks the batch. This is exactly-once handoff into
the broker, then at-least-once on the wire as always — consumers still dedupe on meta.id
(the consumer-side com.babelqueue.idempotency helper is the mirror). Relay concurrency
(SELECT … FOR UPDATE SKIP LOCKED) is the adapter's job; the in-memory reference store does
not implement it.
The com.babelqueue.gdpr helper (ADR-0030) is the runtime, SDK-enforcement half of
the registry's x-gdpr-sensitive declaration: a producer encrypts each marked data
field before publish, a consumer decrypts it after decode. The registry only declares
which fields are personal data; this enforces it on the wire. It is standalone and
opt-in — call it, or don't.
import com.babelqueue.gdpr.*;
import com.babelqueue.schema.*;
// The Cipher is YOURS — a seam onto KMS/Vault/HSM/tokenisation. The core ships a JDK-only
// reference (AES-256-GCM, random 12-byte IV prepended, Base64) so it pulls no crypto dep (GR-7).
Cipher cipher = new AesGcmCipher(key); // key is 16/24/32 bytes; the caller owns it
Map<String, Object> schema = provider.schemaFor(env.job()); // the same per-URN schema you validate against
if (schema != null) {
// Producer — validate CLEARTEXT first, then encrypt the marked leaves in place:
SchemaValidation.validate(provider, env.job(), env.data());
Gdpr.protect(env.data(), schema, cipher);
}
String body = EnvelopeCodec.encode(env); // ciphertext rides inside data
// Consumer — decrypt the marked leaves in place AFTER decode, BEFORE the handler reads data:
Envelope in = EnvelopeCodec.decode(body);
Map<String, Object> inSchema = provider.schemaFor(in.job());
if (inSchema != null) {
Gdpr.unprotect(in.data(), inSchema, cipher); // wrong key → DecryptException (retry/DLQ)
}The wire envelope stays frozen (GR-1): only the value of a sensitive field changes
— it becomes a ciphertext string, so data is still pure JSON (GR-3) and any SDK can
carry the envelope even without the key. meta.schema_version stays 1 and trace_id is
untouched (GR-4). Each leaf is canonically JSON-encoded before encryption and decoded back
after, so protect → unprotect restores the value byte-for-byte (a number comes back a
number, an object an object). The sensitive paths come from the schema's x-gdpr-sensitive
marks (SensitivePaths.of(schema) — nested objects, array items field[], and the root),
which are validation-neutral so annotating a schema is never a breaking change. Validate
cleartext before protect / after unprotect: a schema that constrains a sensitive
field (minLength, enum, …) would reject the ciphertext string otherwise.
It enforces the contract: the envelope shape, URN identity, trace propagation, schema-version gating and the dead-letter block. It is intentionally not a worker/runtime — broker wiring, acks and retry loops stay in your own code (or a future Spring adapter), exactly as with the other SDK cores.
UnknownUrnStrategy (FAIL, DELETE, RELEASE, DEAD_LETTER) is provided for
adapters to act on.
This core passes the shared cross-SDK conformance suite (vendored under
src/test/resources/conformance/) — the same
fixtures every BabelQueue SDK must satisfy, so a Java producer and, say, a Laravel
consumer agree byte-for-byte.
mvn testMIT © Muhammet Şafak