Skip to content

BabelQueue/babelqueue-node

BabelQueue for Node.js

CI npm node License: MIT

Polyglot Queues, Simplified. Read and write the canonical BabelQueue message envelope from Node.js — so your Node services exchange messages with Laravel, Symfony, Python, Go and .NET over one strict JSON format, on the broker you already run.

This is the framework-agnostic Node/TypeScript core: the wire-envelope codec, contracts and dead-letter helpers — zero runtime dependencies, shipped as a dual ESM + CommonJS package with bundled types. The full standard is documented at babelqueue.com.

Installation

npm install @babelqueue/core

Requires Node >=18.

Usage

import { EnvelopeCodec } from "@babelqueue/core";

// Produce — build the canonical envelope and publish the JSON to your broker.
const env = EnvelopeCodec.make(
  "urn:babel:orders:created",
  { order_id: 1042 },
  { queue: "orders" },
);
const body = EnvelopeCodec.encode(env); // compact UTF-8 JSON string
// await redis.rpush("queues:orders", body);
//   /  channel.sendToQueue("orders", Buffer.from(body));

// Consume — decode a message produced by ANY BabelQueue SDK.
const incoming = EnvelopeCodec.decode(body);
if (EnvelopeCodec.accepts(incoming)) {
  // `incoming` is now narrowed to a fully-typed Envelope
  switch (EnvelopeCodec.urn(incoming)) {
    case "urn:babel:orders:created":
      console.log(incoming.data.order_id, incoming.trace_id);
      break;
  }
}

CommonJS works too:

const { EnvelopeCodec } = require("@babelqueue/core");

The envelope is identical to every other SDK's:

{
  "job": "urn:babel:orders:created",
  "trace_id": "",
  "data": { "order_id": 1042 },
  "meta": { "id": "", "queue": "orders", "lang": "node", "schema_version": 1, "created_at": 1749132727000 },
  "attempts": 0
}

Typed messages (optional)

import { EnvelopeCodec, type PolyglotMessage } from "@babelqueue/core";

class OrderCreated implements PolyglotMessage {
  constructor(private readonly orderId: number) {}
  getBabelUrn() {
    return "urn:babel:orders:created";
  }
  toPayload() {
    return { order_id: this.orderId };
  }
}

const env = EnvelopeCodec.fromMessage(new OrderCreated(1042), "orders");

Continue an existing trace by adding getBabelTraceId(): string | null (see HasTraceId), or pass { traceId } to EnvelopeCodec.make.

Dead-letter

import { annotate, EnvelopeCodec } from "@babelqueue/core";

const dlq = annotate(env, "failed", "orders", { attempts: 3, error: "boom" });
// publish EnvelopeCodec.encode(dlq) to the "orders.dlq" queue

annotate returns a copy — the original envelope is preserved unchanged inside the dead-lettered message, so any-language consumers can still read it.

Replay-bypass (optional)

A deliberate replay off the DLQ (redrive) re-runs the handler, re-firing its external side-effects — a second charge, a duplicate email. With bypass, redrive stamps a bq-replay-bypass transport header on each replayed message; a handler reads the delivered headers and skips the effects that already ran, while the idempotent core still runs (ADR-0027).

import { redrive, isReplay, bypassExternalEffects } from "@babelqueue/core";

// PRODUCER: redrive with bypass (the IO must carry headers — publishWithHeaders).
await redrive(io, "orders.dlq", { bypass: true });

// CONSUMER: the adapter surfaces the delivered message's out-of-band headers.
async function onMessage(env, headers) {
  saveOrder(env);                                    // idempotent core — always runs
  await bypassExternalEffects(headers, async () => { // skipped when isReplay(headers)
    await sendConfirmationEmail(env);
  });
}

The marker rides beside the frozen envelope, never inside it (schema_version stays 1, GR-1) — the same out-of-band HeaderCarrier seam as the traceparent header. It takes effect only when the RedriveIO implements publishWithHeaders; otherwise bypass is a no-op (bypassed: false) and the message is still redriven.

Transactional outbox (optional)

A plain producer does two things that must both happen or neither — commit the business row and publish the message — across two systems that can disagree on a crash (the dual write). The outbox removes it: outbox.write(env) persists the encoded envelope into the same DB transaction as your business write, and a separate OutboxRelay publishes the durable rows afterwards (ADR-0029).

import { Outbox, OutboxRelay, InMemoryOutboxStore, EnvelopeCodec } from "@babelqueue/core";

// 1) WRITE — the caller owns the transaction boundary (this is the whole point).
const outbox = new Outbox(store); // your OutboxStore, bound to your DB
await db.transaction(async (tx) => {
  await tx.insertOrder(order);                          // the business write
  const env = EnvelopeCodec.make("urn:babel:orders:created", { order_id }, { queue: "orders" });
  await outbox.write(env);                              // same connection, same tx
});                                                     // both commit, or neither

// 2) RELAY — drain the durable rows onto the broker (a worker loop / cron).
const relay = new OutboxRelay(transport, store);       // your OutboxTransport
await relay.drain();                                   // publishes verbatim, marks published

The store and the transport are interfaces you bind to your own DB and broker — the core ships no DB driver (GR-7) and only an InMemoryOutboxStore reference for tests/demos. The relay publishes the stored bytes verbatim — it never decodes, rebuilds or re-encodes the envelope — so trace_id is preserved end-to-end (GR-4) and the body is byte-identical before store and after relay (GR-1/GR-5). It is at-least-once handoff: a crash between publish and mark-published re-publishes the row, so consumers must stay idempotent (the Wrap helper is the consumer-side mirror).

Implement OutboxStore over your DB (save, fetchUnpublished oldest-first — your adapter SHOULD claim/lock rows so two relays don't double-publish — markPublished, markFailed) and OutboxTransport (publish(body, queue)) over your broker.

GDPR field encryption (optional)

A schema can mark a data field x-gdpr-sensitive (ADR-0030). gdpr.protect encrypts each marked leaf in place before publish, and gdpr.unprotect restores it after decode — so PII rides the wire as ciphertext while the envelope stays frozen.

import { EnvelopeCodec, gdpr, schema } from "@babelqueue/core";

// Your key, your cipher — bind a KMS/Vault/HSM here, or use the bundled AES-256-GCM one.
const cipher = new gdpr.AesGcmCipher(myKey); // 16/24/32-byte key

// PRODUCER — validate CLEARTEXT, then encrypt the marked leaves.
const env = EnvelopeCodec.make("urn:babel:people:created", person, { queue: "people" });
const s = provider.schemaFor(env.job);
if (s) {
  await schema.validate(provider, env.job, env.data); // cleartext, before protect
  gdpr.protect(env.data, s, cipher);                   // email/full_name/... → ciphertext
}
const body = EnvelopeCodec.encode(env);                // ciphertext rides inside data

// CONSUMER — decrypt the marked leaves, then validate CLEARTEXT.
const incoming = EnvelopeCodec.decode(body);
const cs = provider.schemaFor(incoming.job!);
if (cs) {
  gdpr.unprotect(incoming.data as Record<string, unknown>, cs, cipher);
  await schema.validate(provider, incoming.job!, incoming.data as Record<string, unknown>);
}

protect canonically JSON-encodes each marked value then replaces it with the cipher's ciphertext string, so the round-trip is byte-for-byte exact (numbers come back as numbers, objects as objects). It walks nested objects (profile.full_name) and array items (addresses[].line); an absent marked field is skipped; a non-sensitive field is never touched. Validate cleartext — before protect, after unprotect — because a schema constraining a sensitive field would reject the ciphertext string.

The Cipher is an interface you bind (encrypt(bytes) → string / decrypt(string) → bytes, both synchronous) so the core pulls no crypto dependency (GR-7). The bundled AesGcmCipher is built only on node:crypto (AES-256-GCM, random IV prepended, base64; GCM auth-tag verified so a wrong key or tampered ciphertext throws DecryptError). The envelope stays frozen (GR-1): only data values change, a ciphertext is a JSON string so data stays pure JSON (GR-3), meta.schema_version stays 1 and trace_id is preserved (GR-4) — an SDK without the key still carries the envelope, it just can't read the protected fields. Entirely opt-in.

What this core is (and isn't)

It enforces the contract: the envelope shape, URN identity, trace propagation, schema-version gating and the dead-letter block. It is intentionally not a worker/runtime — broker wiring, acks and retry loops stay in your own code (or a future thin adapter), exactly as with the other SDK cores.

UnknownUrnStrategy (FAIL, DELETE, RELEASE, DEAD_LETTER) is provided for adapters to act on.

Conformance

This core passes the shared cross-SDK conformance suite (vendored under test/conformance/) — the same fixtures every BabelQueue SDK must satisfy, so a Node producer and, say, a Laravel consumer agree byte-for-byte.

npm test

License

MIT © Muhammet Şafak

About

Read & write the canonical BabelQueue envelope from Node.js — exchange messages with Laravel, Symfony, Go, Python & .NET over one strict JSON format. Zero-dependency core.

Topics

Resources

License

Code of conduct

Contributing

Security policy

Stars

Watchers

Forks

Sponsor this project

 

Packages

 
 
 

Contributors