From e4fd87e5e9e1f04291b5d739fcc6c957e5b99aca Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 6 Jun 2026 21:52:27 +0100 Subject: [PATCH] feat(sdk): route chat session streaming to the realtime host by default Chat clients now send realtime session reads and input appends to the dedicated realtime host on Trigger.dev Cloud instead of the API service, isolating long-lived chat streaming from regular API traffic. Custom and self-hosted base URLs are unchanged; the change takes effect as clients upgrade. --- .changeset/chat-realtime-stream-host.md | 17 ++++ packages/trigger-sdk/src/v3/ai-shared.ts | 22 +++++ packages/trigger-sdk/src/v3/chat-client.ts | 21 ++++- packages/trigger-sdk/src/v3/chat.test.ts | 97 ++++++++++++++++++++++ packages/trigger-sdk/src/v3/chat.ts | 23 ++++- 5 files changed, 172 insertions(+), 8 deletions(-) create mode 100644 .changeset/chat-realtime-stream-host.md diff --git a/.changeset/chat-realtime-stream-host.md b/.changeset/chat-realtime-stream-host.md new file mode 100644 index 00000000000..746ef7e6dda --- /dev/null +++ b/.changeset/chat-realtime-stream-host.md @@ -0,0 +1,17 @@ +--- +"@trigger.dev/sdk": patch +--- + +Chat clients (`TriggerChatTransport` and `AgentChat`) now stream over Trigger.dev Cloud's dedicated realtime host (`realtime.trigger.dev`) by default. A chat session's long-lived SSE reads and input appends no longer run through the main Cloud API host, keeping chat streaming isolated from regular API traffic. + +This only changes the Cloud default. Custom and self-hosted base URLs are left untouched (they keep serving realtime on the same origin), and passing a `baseURL` resolver function opts out entirely: + +```ts +new TriggerChatTransport({ + task: "my-chat", + // realtime in/out endpoints stay on your own host + baseURL: ({ endpoint }) => "https://trigger.acme.internal", +}); +``` + +If you gate chat traffic behind a CSP or network allowlist, add `realtime.trigger.dev`. diff --git a/packages/trigger-sdk/src/v3/ai-shared.ts b/packages/trigger-sdk/src/v3/ai-shared.ts index a0ea3036cff..0d4b99b1494 100644 --- a/packages/trigger-sdk/src/v3/ai-shared.ts +++ b/packages/trigger-sdk/src/v3/ai-shared.ts @@ -24,6 +24,28 @@ import type { InferUITools, ModelMessage, ToolSet, UIDataTypes, UIMessage } from */ export const PENDING_MESSAGE_INJECTED_TYPE = "data-pending-message-injected" as const; +/** Trigger.dev Cloud api host. */ +export const CLOUD_API_BASE_URL = "https://api.trigger.dev"; + +/** + * Trigger.dev Cloud host that serves realtime chat-session endpoints + * (`/realtime/v1/sessions/*`). These are long-lived SSE reads and frequent + * input appends, so Cloud serves them from a dedicated host rather than + * loading the api service. Mirrors the worker-side `STREAM_ORIGIN` config. + */ +export const CLOUD_STREAM_BASE_URL = "https://realtime.trigger.dev"; + +/** + * Map a chat client base URL to the host that should serve its realtime + * session endpoints. On Trigger.dev Cloud the realtime endpoints live on a + * dedicated host, so the Cloud api host is routed there. Any other base URL (a + * custom domain or a self-hosted instance) is returned unchanged, since those + * serve realtime on the same origin as the api. + */ +export function resolveChatStreamBaseURL(baseURL: string): string { + return baseURL.replace(/\/$/, "") === CLOUD_API_BASE_URL ? CLOUD_STREAM_BASE_URL : baseURL; +} + /** * The wire payload shape sent by `TriggerChatTransport`. * Uses `metadata` to match the AI SDK's `ChatRequestOptions` field name. diff --git a/packages/trigger-sdk/src/v3/chat-client.ts b/packages/trigger-sdk/src/v3/chat-client.ts index 0bbdddcfbea..c49a2f1c09c 100644 --- a/packages/trigger-sdk/src/v3/chat-client.ts +++ b/packages/trigger-sdk/src/v3/chat-client.ts @@ -28,7 +28,11 @@ import { TRIGGER_CONTROL_SUBTYPE, } from "@trigger.dev/core/v3"; import type { ChatInputChunk, ChatTaskWirePayload } from "./ai-shared.js"; -import { slimSubmitMessageForWire } from "./ai-shared.js"; +import { + CLOUD_API_BASE_URL, + resolveChatStreamBaseURL, + slimSubmitMessageForWire, +} from "./ai-shared.js"; import { sessions } from "./sessions.js"; // ─── Type inference ──────────────────────────────────────────────── @@ -328,9 +332,18 @@ export class AgentChat { this.onTriggered = options.onTriggered; this.onTurnComplete = options.onTurnComplete; const baseURLOption = options.baseURL; - this.baseURLResolver = typeof baseURLOption === "function" - ? baseURLOption - : () => baseURLOption ?? apiClientManager.baseURL ?? "https://api.trigger.dev"; + // `AgentChat` only addresses realtime session endpoints (`in`/`out`). For a + // string base URL pointing at Trigger.dev Cloud, route those to the + // dedicated realtime host so the SSE reads and input appends don't load the + // api service. Custom/self-hosted base URLs pass through unchanged, and a + // resolver function is honored verbatim. + this.baseURLResolver = + typeof baseURLOption === "function" + ? baseURLOption + : () => + resolveChatStreamBaseURL( + baseURLOption ?? apiClientManager.baseURL ?? CLOUD_API_BASE_URL + ); this.fetchOverride = options.fetch; // Hydration: a non-empty `session` means the caller knows the diff --git a/packages/trigger-sdk/src/v3/chat.test.ts b/packages/trigger-sdk/src/v3/chat.test.ts index 6469f1ac86c..5666f1e25dc 100644 --- a/packages/trigger-sdk/src/v3/chat.test.ts +++ b/packages/trigger-sdk/src/v3/chat.test.ts @@ -654,6 +654,103 @@ describe("TriggerChatTransport", () => { expect(subscribe!.url.startsWith("https://stream.example.com/")).toBe(true); }); + it("defaults realtime endpoints to realtime.trigger.dev on Trigger.dev Cloud", async () => { + const requests: string[] = []; + global.fetch = vi.fn().mockImplementation(async (url: string | URL) => { + const urlStr = typeof url === "string" ? url : url.toString(); + requests.push(urlStr); + if (isSessionStreamAppendUrl(urlStr)) return defaultAppendResponse(); + if (isSessionOutSubscribeUrl(urlStr)) return defaultSseResponse(); + throw new Error(`Unexpected URL: ${urlStr}`); + }); + + // No baseURL -> defaults to https://api.trigger.dev (Cloud). The realtime + // session endpoints should be routed to the dedicated realtime host. + const transport = new TriggerChatTransport({ + task: "my-chat-task", + accessToken: () => "pat", + sessions: { "chat-cloud": { publicAccessToken: "p" } }, + }); + + const stream = await transport.sendMessages({ + trigger: "submit-message", + chatId: "chat-cloud", + messageId: undefined, + messages: [createUserMessage("Hi")], + abortSignal: undefined, + }); + await drainChunks(stream); + + const append = requests.find(isSessionStreamAppendUrl); + const subscribe = requests.find(isSessionOutSubscribeUrl); + expect(append!.startsWith("https://realtime.trigger.dev/")).toBe(true); + expect(subscribe!.startsWith("https://realtime.trigger.dev/")).toBe(true); + }); + + it("leaves a custom/self-hosted baseURL untouched for realtime endpoints", async () => { + const requests: string[] = []; + global.fetch = vi.fn().mockImplementation(async (url: string | URL) => { + const urlStr = typeof url === "string" ? url : url.toString(); + requests.push(urlStr); + if (isSessionStreamAppendUrl(urlStr)) return defaultAppendResponse(); + if (isSessionOutSubscribeUrl(urlStr)) return defaultSseResponse(); + throw new Error(`Unexpected URL: ${urlStr}`); + }); + + const transport = new TriggerChatTransport({ + task: "my-chat-task", + accessToken: () => "pat", + baseURL: "https://trigger.acme.internal", + sessions: { "chat-self": { publicAccessToken: "p" } }, + }); + + const stream = await transport.sendMessages({ + trigger: "submit-message", + chatId: "chat-self", + messageId: undefined, + messages: [createUserMessage("Hi")], + abortSignal: undefined, + }); + await drainChunks(stream); + + const append = requests.find(isSessionStreamAppendUrl); + const subscribe = requests.find(isSessionOutSubscribeUrl); + expect(append!.startsWith("https://trigger.acme.internal/")).toBe(true); + expect(subscribe!.startsWith("https://trigger.acme.internal/")).toBe(true); + }); + + it("does not remap non-prod cloud hosts (mapping is prod-only)", async () => { + const requests: string[] = []; + global.fetch = vi.fn().mockImplementation(async (url: string | URL) => { + const urlStr = typeof url === "string" ? url : url.toString(); + requests.push(urlStr); + if (isSessionStreamAppendUrl(urlStr)) return defaultAppendResponse(); + if (isSessionOutSubscribeUrl(urlStr)) return defaultSseResponse(); + throw new Error(`Unexpected URL: ${urlStr}`); + }); + + const transport = new TriggerChatTransport({ + task: "my-chat-task", + accessToken: () => "pat", + baseURL: "https://test-api.trigger.dev", + sessions: { "chat-test": { publicAccessToken: "p" } }, + }); + + const stream = await transport.sendMessages({ + trigger: "submit-message", + chatId: "chat-test", + messageId: undefined, + messages: [createUserMessage("Hi")], + abortSignal: undefined, + }); + await drainChunks(stream); + + const append = requests.find(isSessionStreamAppendUrl); + const subscribe = requests.find(isSessionOutSubscribeUrl); + expect(append!.startsWith("https://test-api.trigger.dev/")).toBe(true); + expect(subscribe!.startsWith("https://test-api.trigger.dev/")).toBe(true); + }); + it("fetch override is invoked for both .in/append and .out SSE with endpoint ctx", async () => { const fetchCalls: Array<{ url: string; endpoint: string; chatId: string }> = []; diff --git a/packages/trigger-sdk/src/v3/chat.ts b/packages/trigger-sdk/src/v3/chat.ts index 74746744c7e..89a5cfb4e82 100644 --- a/packages/trigger-sdk/src/v3/chat.ts +++ b/packages/trigger-sdk/src/v3/chat.ts @@ -33,7 +33,7 @@ import { } from "@trigger.dev/core/v3"; import { ChatTabCoordinator } from "./chat-tab-coordinator.js"; import type { ChatInputChunk, ChatTaskWirePayload } from "./ai-shared.js"; -import { slimSubmitMessageForWire } from "./ai-shared.js"; +import { resolveChatStreamBaseURL, slimSubmitMessageForWire } from "./ai-shared.js"; const DEFAULT_BASE_URL = "https://api.trigger.dev"; const DEFAULT_STREAM_TIMEOUT_SECONDS = 120; @@ -284,6 +284,11 @@ export type TriggerChatTransportOptions = { * endpoint, or a function called per request that picks a base URL from the * endpoint discriminator and chat ID. @default "https://api.trigger.dev" * + * When a string base URL points at Trigger.dev Cloud (`https://api.trigger.dev`), + * the realtime session endpoints (`in`/`out`) are routed to the dedicated + * realtime host (`https://realtime.trigger.dev`) automatically. Pass a + * resolver function to take full control and opt out of this. + * * @example Route appends through a proxy, SSE direct: * ```ts * baseURL: ({ endpoint }) => @@ -462,9 +467,19 @@ export class TriggerChatTransport implements ChatTransport { | undefined; const baseURLOption = options.baseURL ?? DEFAULT_BASE_URL; const streamOverride = options.streamBaseURL; - this.resolveBaseURLFn = typeof baseURLOption === "function" - ? (ctx) => (ctx.endpoint === "out" && streamOverride ? streamOverride : baseURLOption(ctx)) - : (ctx) => (ctx.endpoint === "out" && streamOverride ? streamOverride : baseURLOption); + // The transport only ever talks to realtime session endpoints (`in`/`out`). + // For a string base URL pointing at Trigger.dev Cloud, route those to the + // dedicated realtime host (`resolveChatStreamBaseURL`) so the long-lived + // SSE reads and input appends don't load the api service. An explicit + // `streamBaseURL` (SSE only, deprecated) and a `baseURL` resolver function + // are honored verbatim — the customer owns routing in those cases. + this.resolveBaseURLFn = + typeof baseURLOption === "function" + ? (ctx) => (ctx.endpoint === "out" && streamOverride ? streamOverride : baseURLOption(ctx)) + : (ctx) => + ctx.endpoint === "out" && streamOverride + ? streamOverride + : resolveChatStreamBaseURL(baseURLOption); this.fetchOverride = options.fetch; this.extraHeaders = options.headers ?? {}; this.streamTimeoutSeconds = options.streamTimeoutSeconds ?? DEFAULT_STREAM_TIMEOUT_SECONDS;