Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions guides/ai-transport/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Ably API key (required for all guides)
ABLY_API_KEY=

# OpenAI API key (required for openai-* guides)
OPENAI_API_KEY=

# Anthropic API key (required for anthropic-* and lang-graph-* guides)
ANTHROPIC_API_KEY=

# Vercel AI SDK uses the underlying provider's key (e.g. OPENAI_API_KEY)
2 changes: 2 additions & 0 deletions guides/ai-transport/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
node_modules/
.env
73 changes: 73 additions & 0 deletions guides/ai-transport/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# AI Transport Guides - E2E Code

Full, runnable implementations for each AI Transport guide. Each guide directory contains language-specific subdirectories with publisher (agent) and subscriber (client) code.

## Structure

```
<guide-name>/
javascript/ # JavaScript/TypeScript implementation
src/publisher.ts
src/subscriber.ts
test/e2e.test.ts
python/ # (future)
java/ # (future)
```

## Guides

| Guide | Provider | Pattern | Languages |
|-------|----------|---------|-----------|
| `openai-message-per-token` | OpenAI | Message per token | JavaScript |
| `openai-message-per-response` | OpenAI | Message per response | JavaScript |
| `anthropic-message-per-token` | Anthropic | Message per token | JavaScript |
| `anthropic-message-per-response` | Anthropic | Message per response | JavaScript |
| `vercel-message-per-token` | Vercel AI SDK | Message per token | JavaScript |
| `vercel-message-per-response` | Vercel AI SDK | Message per response | JavaScript |
| `lang-graph-message-per-token` | LangGraph | Message per token | JavaScript |
| `lang-graph-message-per-response` | LangGraph | Message per response | JavaScript |

## Streaming patterns

- **Message per token**: Publisher sends discrete `start`, `token`, and `stop` events. Subscriber reconstructs the full response by appending tokens correlated by `responseId`.
- **Message per response**: Publisher creates a single message and appends each token to it. Subscriber handles `message.create` and `message.append` actions, with the full response available in message history.

## Prerequisites

- Node.js 20+
- API keys for the relevant providers (see `.env.example`)

## Setup

```bash
cd guides/ai-transport
cp .env.example .env
# Fill in your API keys in .env

yarn install
```

## Running a guide

Each guide has a publisher (streams from LLM to Ably) and a subscriber (reads from Ably):

```bash
# Terminal 1 - start the subscriber
npx tsx <guide-name>/javascript/src/subscriber.ts

# Terminal 2 - start the publisher
npx tsx <guide-name>/javascript/src/publisher.ts
```

## Running tests

```bash
# All guides
yarn test

# Watch mode
yarn test:watch

# Single guide
npx vitest run <guide-name>
```
14 changes: 14 additions & 0 deletions guides/ai-transport/anthropic-citations/javascript/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"name": "guide-anthropic-citations",
"version": "0.1.0",
"private": true,
"type": "module",
"scripts": {
"start:agent": "tsx src/agent.ts",
"start:client": "tsx src/client.ts",
"test": "vitest run"
},
"dependencies": {
"@anthropic-ai/sdk": "^0.71"
}
}
140 changes: 140 additions & 0 deletions guides/ai-transport/anthropic-citations/javascript/src/agent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import Anthropic from '@anthropic-ai/sdk';
import Ably from 'ably';

interface Citation {
type: string;
cited_text: string;
document_index: number;
document_title: string;
start_char_index?: number;
end_char_index?: number;
start_page_number?: number;
end_page_number?: number;
source?: string;
responseStartOffset: number;
responseEndOffset: number;
}

export async function publish(
channel: Ably.RealtimeChannel,
question: string,
documentContent: string,
): Promise<{ msgSerial: string; citations: Citation[] }> {
const anthropic = new Anthropic();

const response = await anthropic.messages.create({
model: 'claude-sonnet-4-5',
max_tokens: 1024,
messages: [
{
role: 'user',
content: [
{
type: 'document',
source: {
type: 'text',
media_type: 'text/plain',
data: documentContent,
},
title: 'Source Document',
citations: { enabled: true },
},
{
type: 'text',
text: question,
},
],
},
],
});

let fullText = '';
const citations: Citation[] = [];
let currentOffset = 0;

// Extract text and citations from response
for (const block of response.content) {
if (block.type === 'text') {
const text = block.text;

if ('citations' in block && Array.isArray(block.citations)) {
for (const citation of block.citations as unknown as Record<string, unknown>[]) {
citations.push({
type: citation.type as string,
cited_text: citation.cited_text as string,
document_index: citation.document_index as number,
document_title: citation.document_title as string,
start_char_index: citation.start_char_index as number | undefined,
end_char_index: citation.end_char_index as number | undefined,
start_page_number: citation.start_page_number as number | undefined,
end_page_number: citation.end_page_number as number | undefined,
source: citation.source as string | undefined,
// Track position in the full response text
responseStartOffset: currentOffset,
responseEndOffset: currentOffset + text.length,
});
}
}

fullText += text;
currentOffset += text.length;
}
}

const result = await channel.publish('response', fullText);
const msgSerial = result.serials[0]!;

// Publish each citation as an annotation
for (const citation of citations) {
let sourceDomain;
try {
sourceDomain = citation.source ? new URL(citation.source).hostname : citation.document_title;
} catch {
sourceDomain = citation.document_title || 'document';
}

await channel.annotations.publish(msgSerial, {
type: 'citations:multiple.v1',
name: sourceDomain,
data: {
title: citation.document_title,
citedText: citation.cited_text,
citationType: citation.type,
startOffset: citation.responseStartOffset,
endOffset: citation.responseEndOffset,
documentIndex: citation.document_index,
...(citation.start_char_index !== undefined && {
startCharIndex: citation.start_char_index,
endCharIndex: citation.end_char_index,
}),
...(citation.start_page_number !== undefined && {
startPageNumber: citation.start_page_number,
endPageNumber: citation.end_page_number,
}),
},
});
}

return { msgSerial, citations };
}

async function main() {
const realtime = new Ably.Realtime({ key: process.env.ABLY_API_KEY, echoMessages: false });
const channel = realtime.channels.get('ai:anthropic-citations-guide');

const document = `The James Webb Space Telescope (JWST) is a space telescope designed to conduct infrared astronomy. Its high-resolution and high-sensitivity instruments allow it to view objects too old and distant for the Hubble Space Telescope. This enables investigations across many fields of astronomy and cosmology, such as observation of the first stars and the formation of the first galaxies.`;

const { msgSerial, citations } = await publish(
channel,
'What is the JWST designed for? Cite your sources.',
document,
);
console.log('Message serial:', msgSerial);
console.log('Citations:', citations.length);
realtime.close();
}

import { fileURLToPath } from 'url';
if (process.argv[1] === fileURLToPath(import.meta.url)) {
main().catch(console.error);
}
64 changes: 64 additions & 0 deletions guides/ai-transport/anthropic-citations/javascript/src/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import Ably from 'ably';

interface CitationSummaryResult {
content: string;
citationSummary: Record<string, { total: number }>;
}

export function subscribe(channel: Ably.RealtimeChannel): Promise<CitationSummaryResult> {
return new Promise((resolve) => {
let content = '';
let citationSummary: Record<string, { total: number }> = {};
let doneTimer: ReturnType<typeof setTimeout> | null = null;

const resetTimer = () => {
if (doneTimer) {
clearTimeout(doneTimer);
}
doneTimer = setTimeout(() => {
resolve({ content, citationSummary });
}, 3000);
};

channel.subscribe((message: Ably.Message) => {
const serial = message.serial;
if (!serial) {
return;
}

switch (message.action) {
case 'message.create':
console.log('\n[Response received]', serial);
content = message.data as string;
resetTimer();
break;

case 'message.summary':
if (message.annotations?.summary?.['citations:multiple.v1']) {
citationSummary = message.annotations.summary['citations:multiple.v1'] as Record<
string,
{ total: number }
>;
console.log('\n[Citation summary received]', citationSummary);
}
resetTimer();
break;
}
});
});
}

async function main() {
const realtime = new Ably.Realtime({ key: process.env.ABLY_API_KEY });
const channel = realtime.channels.get('ai:anthropic-citations-guide');
console.log('Client ready, waiting for response and citations...');
const { content, citationSummary } = await subscribe(channel);
console.log('\nFull response:', content);
console.log('Citation summary:', citationSummary);
realtime.close();
}

import { fileURLToPath } from 'url';
if (process.argv[1] === fileURLToPath(import.meta.url)) {
main().catch(console.error);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import Ably from 'ably';
import { describe, it, expect, beforeAll, afterAll } from 'vitest';
import { publish } from '../src/agent.js';
import { subscribe } from '../src/client.js';
import { waitForMessage } from '../../../test-helpers.js';

const document =
'Ably provides realtime messaging infrastructure. It supports pub/sub, presence, and history features.';

describe('anthropic-citations', () => {
let publisherClient: Ably.Realtime;
let subscriberClient: Ably.Realtime;
let channelName: string;

beforeAll(() => {
channelName = `ai:test-anthropic-citations-${Date.now()}`;
publisherClient = new Ably.Realtime({ key: process.env.ABLY_API_KEY, echoMessages: false });
subscriberClient = new Ably.Realtime({ key: process.env.ABLY_API_KEY });
});

afterAll(async () => {
await new Promise((resolve) => setTimeout(resolve, 500));
publisherClient?.close();
subscriberClient?.close();
});

it('publishes response and citation annotations', async () => {
const subChannel = subscriberClient.channels.get(channelName + '-annotations');
const pubChannel = publisherClient.channels.get(channelName + '-annotations');

const responsePromise = subscribe(subChannel);
await subChannel.attach();

const summaryReceived = waitForMessage(subChannel, (m) => m.action === 'message.summary');

await publish(pubChannel, 'What features does Ably provide? Cite your sources.', document);
await summaryReceived;

const { content, citationSummary } = await responsePromise;

expect(content.length).toBeGreaterThan(0);
expect(Object.keys(citationSummary).length).toBeGreaterThan(0);
});

it('citations reference the source document', async () => {
const pubChannel = publisherClient.channels.get(channelName + '-doc-ref');

const { msgSerial, citations } = await publish(
pubChannel,
'What features does Ably provide? Cite your sources.',
document,
);

expect(msgSerial).toBeDefined();
expect(citations.length).toBeGreaterThan(0);
expect(citations[0].document_title).toBe('Source Document');
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"extends": "../../tsconfig.base.json",
"include": ["src/**/*.ts", "test/**/*.ts"]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"name": "guide-anthropic-human-in-the-loop",
"version": "0.1.0",
"private": true,
"type": "module",
"scripts": {
"start:agent": "tsx src/agent.ts",
"start:approver": "tsx src/approver.ts",
"test": "vitest run"
},
"dependencies": {
"@anthropic-ai/sdk": "^0.71"
}
}
Loading