Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .changeset/chat-realtime-stream-host.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
---
"@trigger.dev/sdk": patch
---

Chat clients (`TriggerChatTransport` and `AgentChat`) now stream over Trigger.dev Cloud's dedicated realtime host (`realtime.trigger.dev`) by default. A chat session's long-lived SSE reads and input appends no longer run through the main Cloud API host, keeping chat streaming isolated from regular API traffic.

This only changes the Cloud default. Custom and self-hosted base URLs are left untouched (they keep serving realtime on the same origin), and passing a `baseURL` resolver function opts out entirely:

```ts
new TriggerChatTransport({
task: "my-chat",
// realtime in/out endpoints stay on your own host
baseURL: ({ endpoint }) => "https://trigger.acme.internal",
});
```

If you gate chat traffic behind a CSP or network allowlist, add `realtime.trigger.dev`.
22 changes: 22 additions & 0 deletions packages/trigger-sdk/src/v3/ai-shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,28 @@ import type { InferUITools, ModelMessage, ToolSet, UIDataTypes, UIMessage } from
*/
export const PENDING_MESSAGE_INJECTED_TYPE = "data-pending-message-injected" as const;

/** Trigger.dev Cloud api host. */
export const CLOUD_API_BASE_URL = "https://api.trigger.dev";

/**
* Trigger.dev Cloud host that serves realtime chat-session endpoints
* (`/realtime/v1/sessions/*`). These are long-lived SSE reads and frequent
* input appends, so Cloud serves them from a dedicated host rather than
* loading the api service. Mirrors the worker-side `STREAM_ORIGIN` config.
*/
export const CLOUD_STREAM_BASE_URL = "https://realtime.trigger.dev";

/**
* Map a chat client base URL to the host that should serve its realtime
* session endpoints. On Trigger.dev Cloud the realtime endpoints live on a
* dedicated host, so the Cloud api host is routed there. Any other base URL (a
* custom domain or a self-hosted instance) is returned unchanged, since those
* serve realtime on the same origin as the api.
*/
export function resolveChatStreamBaseURL(baseURL: string): string {
return baseURL.replace(/\/$/, "") === CLOUD_API_BASE_URL ? CLOUD_STREAM_BASE_URL : baseURL;
}

/**
* The wire payload shape sent by `TriggerChatTransport`.
* Uses `metadata` to match the AI SDK's `ChatRequestOptions` field name.
Expand Down
21 changes: 17 additions & 4 deletions packages/trigger-sdk/src/v3/chat-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ import {
TRIGGER_CONTROL_SUBTYPE,
} from "@trigger.dev/core/v3";
import type { ChatInputChunk, ChatTaskWirePayload } from "./ai-shared.js";
import { slimSubmitMessageForWire } from "./ai-shared.js";
import {
CLOUD_API_BASE_URL,
resolveChatStreamBaseURL,
slimSubmitMessageForWire,
} from "./ai-shared.js";
import { sessions } from "./sessions.js";

// ─── Type inference ────────────────────────────────────────────────
Expand Down Expand Up @@ -328,9 +332,18 @@ export class AgentChat<TAgent = unknown> {
this.onTriggered = options.onTriggered;
this.onTurnComplete = options.onTurnComplete;
const baseURLOption = options.baseURL;
this.baseURLResolver = typeof baseURLOption === "function"
? baseURLOption
: () => baseURLOption ?? apiClientManager.baseURL ?? "https://api.trigger.dev";
// `AgentChat` only addresses realtime session endpoints (`in`/`out`). For a
// string base URL pointing at Trigger.dev Cloud, route those to the
// dedicated realtime host so the SSE reads and input appends don't load the
// api service. Custom/self-hosted base URLs pass through unchanged, and a
// resolver function is honored verbatim.
this.baseURLResolver =
typeof baseURLOption === "function"
? baseURLOption
: () =>
resolveChatStreamBaseURL(
baseURLOption ?? apiClientManager.baseURL ?? CLOUD_API_BASE_URL
);
this.fetchOverride = options.fetch;

// Hydration: a non-empty `session` means the caller knows the
Expand Down
97 changes: 97 additions & 0 deletions packages/trigger-sdk/src/v3/chat.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,103 @@ describe("TriggerChatTransport", () => {
expect(subscribe!.url.startsWith("https://stream.example.com/")).toBe(true);
});

it("defaults realtime endpoints to realtime.trigger.dev on Trigger.dev Cloud", async () => {
const requests: string[] = [];
global.fetch = vi.fn().mockImplementation(async (url: string | URL) => {
const urlStr = typeof url === "string" ? url : url.toString();
requests.push(urlStr);
if (isSessionStreamAppendUrl(urlStr)) return defaultAppendResponse();
if (isSessionOutSubscribeUrl(urlStr)) return defaultSseResponse();
throw new Error(`Unexpected URL: ${urlStr}`);
});

// No baseURL -> defaults to https://api.trigger.dev (Cloud). The realtime
// session endpoints should be routed to the dedicated realtime host.
const transport = new TriggerChatTransport({
task: "my-chat-task",
accessToken: () => "pat",
sessions: { "chat-cloud": { publicAccessToken: "p" } },
});

const stream = await transport.sendMessages({
trigger: "submit-message",
chatId: "chat-cloud",
messageId: undefined,
messages: [createUserMessage("Hi")],
abortSignal: undefined,
});
await drainChunks(stream);

const append = requests.find(isSessionStreamAppendUrl);
const subscribe = requests.find(isSessionOutSubscribeUrl);
expect(append!.startsWith("https://realtime.trigger.dev/")).toBe(true);
expect(subscribe!.startsWith("https://realtime.trigger.dev/")).toBe(true);
});

it("leaves a custom/self-hosted baseURL untouched for realtime endpoints", async () => {
const requests: string[] = [];
global.fetch = vi.fn().mockImplementation(async (url: string | URL) => {
const urlStr = typeof url === "string" ? url : url.toString();
requests.push(urlStr);
if (isSessionStreamAppendUrl(urlStr)) return defaultAppendResponse();
if (isSessionOutSubscribeUrl(urlStr)) return defaultSseResponse();
throw new Error(`Unexpected URL: ${urlStr}`);
});

const transport = new TriggerChatTransport({
task: "my-chat-task",
accessToken: () => "pat",
baseURL: "https://trigger.acme.internal",
sessions: { "chat-self": { publicAccessToken: "p" } },
});

const stream = await transport.sendMessages({
trigger: "submit-message",
chatId: "chat-self",
messageId: undefined,
messages: [createUserMessage("Hi")],
abortSignal: undefined,
});
await drainChunks(stream);

const append = requests.find(isSessionStreamAppendUrl);
const subscribe = requests.find(isSessionOutSubscribeUrl);
expect(append!.startsWith("https://trigger.acme.internal/")).toBe(true);
expect(subscribe!.startsWith("https://trigger.acme.internal/")).toBe(true);
});

it("does not remap non-prod cloud hosts (mapping is prod-only)", async () => {
const requests: string[] = [];
global.fetch = vi.fn().mockImplementation(async (url: string | URL) => {
const urlStr = typeof url === "string" ? url : url.toString();
requests.push(urlStr);
if (isSessionStreamAppendUrl(urlStr)) return defaultAppendResponse();
if (isSessionOutSubscribeUrl(urlStr)) return defaultSseResponse();
throw new Error(`Unexpected URL: ${urlStr}`);
});

const transport = new TriggerChatTransport({
task: "my-chat-task",
accessToken: () => "pat",
baseURL: "https://test-api.trigger.dev",
sessions: { "chat-test": { publicAccessToken: "p" } },
});

const stream = await transport.sendMessages({
trigger: "submit-message",
chatId: "chat-test",
messageId: undefined,
messages: [createUserMessage("Hi")],
abortSignal: undefined,
});
await drainChunks(stream);

const append = requests.find(isSessionStreamAppendUrl);
const subscribe = requests.find(isSessionOutSubscribeUrl);
expect(append!.startsWith("https://test-api.trigger.dev/")).toBe(true);
expect(subscribe!.startsWith("https://test-api.trigger.dev/")).toBe(true);
});

it("fetch override is invoked for both .in/append and .out SSE with endpoint ctx", async () => {
const fetchCalls: Array<{ url: string; endpoint: string; chatId: string }> = [];

Expand Down
23 changes: 19 additions & 4 deletions packages/trigger-sdk/src/v3/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import {
} from "@trigger.dev/core/v3";
import { ChatTabCoordinator } from "./chat-tab-coordinator.js";
import type { ChatInputChunk, ChatTaskWirePayload } from "./ai-shared.js";
import { slimSubmitMessageForWire } from "./ai-shared.js";
import { resolveChatStreamBaseURL, slimSubmitMessageForWire } from "./ai-shared.js";

const DEFAULT_BASE_URL = "https://api.trigger.dev";
const DEFAULT_STREAM_TIMEOUT_SECONDS = 120;
Expand Down Expand Up @@ -284,6 +284,11 @@ export type TriggerChatTransportOptions<TClientData = unknown> = {
* endpoint, or a function called per request that picks a base URL from the
* endpoint discriminator and chat ID. @default "https://api.trigger.dev"
*
* When a string base URL points at Trigger.dev Cloud (`https://api.trigger.dev`),
* the realtime session endpoints (`in`/`out`) are routed to the dedicated
* realtime host (`https://realtime.trigger.dev`) automatically. Pass a
* resolver function to take full control and opt out of this.
*
* @example Route appends through a proxy, SSE direct:
* ```ts
* baseURL: ({ endpoint }) =>
Expand Down Expand Up @@ -462,9 +467,19 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
| undefined;
const baseURLOption = options.baseURL ?? DEFAULT_BASE_URL;
const streamOverride = options.streamBaseURL;
this.resolveBaseURLFn = typeof baseURLOption === "function"
? (ctx) => (ctx.endpoint === "out" && streamOverride ? streamOverride : baseURLOption(ctx))
: (ctx) => (ctx.endpoint === "out" && streamOverride ? streamOverride : baseURLOption);
// The transport only ever talks to realtime session endpoints (`in`/`out`).
// For a string base URL pointing at Trigger.dev Cloud, route those to the
// dedicated realtime host (`resolveChatStreamBaseURL`) so the long-lived
// SSE reads and input appends don't load the api service. An explicit
// `streamBaseURL` (SSE only, deprecated) and a `baseURL` resolver function
// are honored verbatim — the customer owns routing in those cases.
this.resolveBaseURLFn =
typeof baseURLOption === "function"
? (ctx) => (ctx.endpoint === "out" && streamOverride ? streamOverride : baseURLOption(ctx))
: (ctx) =>
ctx.endpoint === "out" && streamOverride
? streamOverride
: resolveChatStreamBaseURL(baseURLOption);
this.fetchOverride = options.fetch;
this.extraHeaders = options.headers ?? {};
this.streamTimeoutSeconds = options.streamTimeoutSeconds ?? DEFAULT_STREAM_TIMEOUT_SECONDS;
Expand Down
Loading