Polyglot Queues, Simplified. Read and write the canonical BabelQueue message envelope from .NET — so your C#/.NET services exchange messages with Laravel, Symfony, Python, Go, Node and Java over one strict JSON format, on the broker you already run.
This is the framework-agnostic .NET core: the wire-envelope codec, contracts
and dead-letter helpers — zero dependencies (in-box System.Text.Json only).
The full standard is documented at babelqueue.com.
dotnet add package BabelQueue.CoreTargets .NET 8.
using BabelQueue;
// Produce — build the canonical envelope and publish the JSON to your broker.
var env = EnvelopeCodec.Make(
"urn:babel:orders:created",
new Dictionary<string, object?> { ["order_id"] = 1042L },
queue: "orders");
string body = EnvelopeCodec.Encode(env); // compact UTF-8 JSON
// await db.ListRightPushAsync("queues:orders", body);
// / channel.BasicPublish("", "orders", props, Encoding.UTF8.GetBytes(body));
// Consume — decode a message produced by ANY BabelQueue SDK.
var incoming = EnvelopeCodec.Decode(body);
if (EnvelopeCodec.Accepts(incoming))
{
switch (EnvelopeCodec.Urn(incoming))
{
case "urn:babel:orders:created":
Console.WriteLine($"{incoming.Data!["order_id"]} {incoming.TraceId}");
break;
}
}The envelope is identical to every other SDK's:
{
"job": "urn:babel:orders:created",
"trace_id": "…",
"data": { "order_id": 1042 },
"meta": { "id": "…", "queue": "orders", "lang": "dotnet", "schema_version": 1, "created_at": 1749132727000 },
"attempts": 0
}JSON numbers decode into
Dataaslong(integers) ordouble(decimals); objects asDictionary<string, object?>(insertion order preserved).EncodeusesUnsafeRelaxedJsonEscaping, so slashes and non-ASCII stay literal and the bytes match the PHP/Python/Node/Java cores.
public sealed class OrderCreated(long orderId) : IPolyglotMessage, IHasTraceId
{
public string GetBabelUrn() => "urn:babel:orders:created";
public IReadOnlyDictionary<string, object?> ToPayload() =>
new Dictionary<string, object?> { ["order_id"] = orderId };
public string? GetBabelTraceId() => null; // or an inbound trace to continue
}
var env = EnvelopeCodec.FromMessage(new OrderCreated(1042L), "orders");var dlq = DeadLetters.Annotate(env, "failed", "orders", attempts: 3, error: "boom");
// publish EnvelopeCodec.Encode(dlq) to the "orders.dlq" queueDeadLetters.Annotate returns a copy — the original envelope is preserved
unchanged inside the dead-lettered message, so any-language consumers can still
read it.
A deliberate replay off the DLQ (Redrive.RedriveAsync) re-runs the handler,
re-firing its external side-effects — a second charge, a duplicate email. With
Bypass, redrive stamps a bq-replay-bypass transport header on each replayed
message; a handler reads the delivered headers and skips the effects that already
ran, while the idempotent core still runs (ADR-0027).
// PRODUCER — redrive with bypass (the transport must be an IHeaderPublisher).
await Redrive.RedriveAsync(transport, "orders.dlq", new Redrive.Options(Bypass: true));
// CONSUMER — the adapter surfaces the delivered message's out-of-band headers.
await handler(env); // idempotent core — always runs
await Replay.BypassExternalEffectsAsync(headers, async () => // skipped when Replay.IsReplay(headers)
{
await SendConfirmationEmailAsync(env);
});The marker rides beside the frozen envelope on the out-of-band header carrier,
never inside it (schema_version stays 1, GR-1; trace_id preserved, GR-4) —
the same seam as the traceparent header. It takes effect only when the transport
implements Redrive.IHeaderPublisher; otherwise Bypass is a no-op (Bypassed: false) and the message is still redriven.
A plain producer does two things that must both happen or neither — commit the business row and publish the message — across two systems, so a crash between them loses or duplicates the message (the dual write). The outbox (ADR-0029) removes it: the message is stored into the same database, in the same transaction as the business data, then a separate relay publishes the durable rows.
using BabelQueue.Outbox;
// WRITE — the caller owns the transaction boundary (this is the whole point).
var outbox = new Outbox(store); // store : IOutboxStore (your DB, ADO.NET)
await using var tx = await db.BeginTransactionAsync(ct);
await db.InsertOrderAsync(order, tx, ct); // the business write
var env = EnvelopeCodec.Make("urn:order:placed", data, "orders");
await outbox.WriteAsync(env, ct); // same connection, same tx — encodes & saves
await tx.CommitAsync(ct); // both, or neither
// RELAY — drain pending rows and publish them, on a worker loop / scheduler.
var relay = new OutboxRelay(
(body, queue, c) => transport.PublishAsync(queue, body), // your publish seam (verbatim body)
store);
await relay.DrainAsync(cancellationToken: ct); // FlushAsync() does one batchOutbox.WriteAsync encodes via the frozen codec and delegates to
IOutboxStore.SaveAsync — it never begins or commits anything, so it composes
with your existing unit-of-work. The relay publishes the stored bytes verbatim
(never decodes/rebuilds the envelope), so schema_version stays 1 (GR-1/GR-5)
and trace_id is preserved end-to-end (GR-4). A publish that throws marks the row
failed and leaves it pending — one poison row never blocks the batch — with a
bounded, capped backoff; DrainAsync loops until no progress is made.
This is exactly-once handoff, not exactly-once delivery: after a crash the relay
re-publishes a row, so consumers must stay idempotent (Idempotency.Wrap is the
consumer-side mirror, ADR-0022). The core takes no DB dependency (GR-7) — bind
IOutboxStore to your own table over ADO.NET; the bundled InMemoryOutboxStore is
for tests / single-process demos and does not claim/lock rows (a production adapter's
job).
A schema registry can mark a data field as personal data with the
x-gdpr-sensitive keyword (ADR-0030). BabelQueue.Gdpr is the runtime half of
that: a producer encrypts exactly those marked leaves before publish, a consumer
decrypts them after decode — so PII never crosses the wire in clear, while the
envelope stays frozen (only the marked values inside data change, into
ciphertext strings; meta.schema_version stays 1, trace_id is untouched, and
data is still pure JSON, so any SDK can carry the message even without the key).
using BabelQueue.Gdpr;
using BabelQueue.Schema;
// The type and namespace are both `Gdpr`, so alias the type at the call site.
using GdprFields = BabelQueue.Gdpr.Gdpr;
// The cipher is YOURS — bind it to a KMS / Vault / HSM, or use the in-box reference.
ICipher cipher = new AesGcmCipher(my32ByteKey); // AES-256-GCM, your key, no key mgmt
// PRODUCER — validate cleartext, then encrypt the marked leaves in place.
var schema = provider.SchemaFor("urn:order:placed"); // the per-URN JSON Schema
SchemaValidation.Validate(provider, "urn:order:placed", data); // cleartext, before Protect
var env = EnvelopeCodec.Make("urn:order:placed", data, "orders");
GdprFields.Protect((IDictionary<string, object?>)env.Data!, schema, cipher);
var body = EnvelopeCodec.Encode(env); // ciphertext rides inside data
// CONSUMER — decode, decrypt the marked leaves in place, then handle (and validate cleartext).
var incoming = EnvelopeCodec.Decode(body);
GdprFields.Unprotect((IDictionary<string, object?>)incoming.Data!, schema, cipher);ICipher (string Encrypt(byte[]) / byte[] Decrypt(string)) is caller-provided,
so the core takes no crypto dependency (GR-7) — only your concrete backend does.
The bundled AesGcmCipher is a reference on the in-box
System.Security.Cryptography.AesGcm (random 12-byte nonce, 16-byte auth tag, base64);
a wrong key or tampered ciphertext fails authentication and throws.
Protect canonically JSON-encodes each marked value before encrypting, and Unprotect
restores it byte-for-byte (numbers as long/double, objects as Dictionary).
An absent marked field is skipped; a non-string leaf in Unprotect is left untouched
(so it is idempotent for already-cleartext non-string values); a value that the cipher
cannot open throws ProtectedFieldException, so the message takes the retry /
dead-letter path rather than being processed as unreadable PII. Validate cleartext —
before Protect, after Unprotect — because a schema that constrains a sensitive field
(minLength, enum, …) would reject the ciphertext string. The feature is strictly
opt-in: a producer/consumer that never calls these helpers behaves exactly as before.
It enforces the contract: the envelope shape, URN identity, trace propagation, schema-version gating and the dead-letter block. It is intentionally not a worker/runtime — broker wiring, acks and retry loops stay in your own code (or a future adapter), exactly as with the other SDK cores.
UnknownUrnStrategy (Fail, Delete, Release, DeadLetter) is provided for
adapters to act on.
BabelQueue.Tracing adds opt-in OpenTelemetry tracing built only on the in-box
System.Diagnostics.Activity — the primitive the OpenTelemetry .NET SDK consumes —
so the core still takes zero package dependencies. To export, wire a
TracerProvider that listens to the source Telemetry.ActivitySourceName
("BabelQueue", e.g. .AddSource("BabelQueue")); with no listener the helpers are
nearly free and emit nothing. The wire envelope is never touched.
Cross-hop propagation works at two layered levels:
trace_idcorrelation (v0.1):Telemetry.Wrap/Telemetry.PublishAsyncmap the envelope'strace_idto an OTel trace id, so every hop that shares atrace_idshares one trace — correlation and per-hop timing.- W3C
traceparentspan linkage (v0.2): the header-aware overloads carry the active span across a hop as a W3Ctraceparenton an out-of-band header carrier that rides beside the frozen envelope (never inside it), so the consumer span becomes a true child of the producer span. No header ⇒ it falls back to the v0.1trace_idparent — a strict, backward-compatible upgrade.
using BabelQueue.Tracing;
// PRODUCER — inject the active span's traceparent into a carrier the adapter
// carries on the transport's metadata channel, beside the envelope.
var headers = new Dictionary<string, string>();
string id = await Telemetry.PublishAsync(
"urn:babel:orders:created",
new Dictionary<string, object?> { ["order_id"] = 1042L },
headers, // <- the out-of-band carrier
env => myTransport.SendAsync(env, headers), // adapter carries `headers`
queue: "orders");
// CONSUMER — pass the delivered message's headers; the process span is started
// as a child of the producer span (or the trace_id parent when absent).
Handler traced = Telemetry.Wrap(async env => { /* handle */ }, deliveredHeaders);
await traced(incoming);The carrier is
IDictionary<string,string>to write (producer) /IReadOnlyDictionary<string,string>to read (consumer) — the .NET counterpart of the GoHeaderPublisher/ReceivedMessage.Headersand NodeHeaderCarrierseams.Traceparent.Inject/Traceparent.RemoteParentFromHeadersexpose the W3C inject/extract directly. Per-adapter transport wiring (the .NET transports live in the separateBabelQueue.Sqs/BabelQueue.Redis/BabelQueue.MassTransitpackages) is a documented follow-up; this core ships the mechanism.
This core passes the shared cross-SDK conformance suite (vendored under
tests/BabelQueue.Core.Tests/conformance/) — the same fixtures every BabelQueue
SDK must satisfy, so a .NET producer and, say, a Laravel consumer agree
byte-for-byte.
dotnet testMIT © Muhammet Şafak