diff --git a/.changeset/chat-realtime-stream-host.md b/.changeset/chat-realtime-stream-host.md new file mode 100644 index 0000000000..746ef7e6dd --- /dev/null +++ b/.changeset/chat-realtime-stream-host.md @@ -0,0 +1,17 @@ +--- +"@trigger.dev/sdk": patch +--- + +Chat clients (`TriggerChatTransport` and `AgentChat`) now stream over Trigger.dev Cloud's dedicated realtime host (`realtime.trigger.dev`) by default. A chat session's long-lived SSE reads and input appends no longer run through the main Cloud API host, keeping chat streaming isolated from regular API traffic. + +This only changes the Cloud default. Custom and self-hosted base URLs are left untouched (they keep serving realtime on the same origin), and passing a `baseURL` resolver function opts out entirely: + +```ts +new TriggerChatTransport({ + task: "my-chat", + // realtime in/out endpoints stay on your own host + baseURL: ({ endpoint }) => "https://trigger.acme.internal", +}); +``` + +If you gate chat traffic behind a CSP or network allowlist, add `realtime.trigger.dev`. diff --git a/packages/trigger-sdk/src/v3/ai-shared.ts b/packages/trigger-sdk/src/v3/ai-shared.ts index a0ea3036cf..0d4b99b149 100644 --- a/packages/trigger-sdk/src/v3/ai-shared.ts +++ b/packages/trigger-sdk/src/v3/ai-shared.ts @@ -24,6 +24,28 @@ import type { InferUITools, ModelMessage, ToolSet, UIDataTypes, UIMessage } from */ export const PENDING_MESSAGE_INJECTED_TYPE = "data-pending-message-injected" as const; +/** Trigger.dev Cloud api host. */ +export const CLOUD_API_BASE_URL = "https://api.trigger.dev"; + +/** + * Trigger.dev Cloud host that serves realtime chat-session endpoints + * (`/realtime/v1/sessions/*`). These are long-lived SSE reads and frequent + * input appends, so Cloud serves them from a dedicated host rather than + * loading the api service. Mirrors the worker-side `STREAM_ORIGIN` config. + */ +export const CLOUD_STREAM_BASE_URL = "https://realtime.trigger.dev"; + +/** + * Map a chat client base URL to the host that should serve its realtime + * session endpoints. On Trigger.dev Cloud the realtime endpoints live on a + * dedicated host, so the Cloud api host is routed there. Any other base URL (a + * custom domain or a self-hosted instance) is returned unchanged, since those + * serve realtime on the same origin as the api. + */ +export function resolveChatStreamBaseURL(baseURL: string): string { + return baseURL.replace(/\/$/, "") === CLOUD_API_BASE_URL ? CLOUD_STREAM_BASE_URL : baseURL; +} + /** * The wire payload shape sent by `TriggerChatTransport`. * Uses `metadata` to match the AI SDK's `ChatRequestOptions` field name. diff --git a/packages/trigger-sdk/src/v3/chat-client.ts b/packages/trigger-sdk/src/v3/chat-client.ts index 0bbdddcfbe..c49a2f1c09 100644 --- a/packages/trigger-sdk/src/v3/chat-client.ts +++ b/packages/trigger-sdk/src/v3/chat-client.ts @@ -28,7 +28,11 @@ import { TRIGGER_CONTROL_SUBTYPE, } from "@trigger.dev/core/v3"; import type { ChatInputChunk, ChatTaskWirePayload } from "./ai-shared.js"; -import { slimSubmitMessageForWire } from "./ai-shared.js"; +import { + CLOUD_API_BASE_URL, + resolveChatStreamBaseURL, + slimSubmitMessageForWire, +} from "./ai-shared.js"; import { sessions } from "./sessions.js"; // ─── Type inference ──────────────────────────────────────────────── @@ -328,9 +332,18 @@ export class AgentChat { this.onTriggered = options.onTriggered; this.onTurnComplete = options.onTurnComplete; const baseURLOption = options.baseURL; - this.baseURLResolver = typeof baseURLOption === "function" - ? baseURLOption - : () => baseURLOption ?? apiClientManager.baseURL ?? "https://api.trigger.dev"; + // `AgentChat` only addresses realtime session endpoints (`in`/`out`). For a + // string base URL pointing at Trigger.dev Cloud, route those to the + // dedicated realtime host so the SSE reads and input appends don't load the + // api service. Custom/self-hosted base URLs pass through unchanged, and a + // resolver function is honored verbatim. + this.baseURLResolver = + typeof baseURLOption === "function" + ? baseURLOption + : () => + resolveChatStreamBaseURL( + baseURLOption ?? apiClientManager.baseURL ?? CLOUD_API_BASE_URL + ); this.fetchOverride = options.fetch; // Hydration: a non-empty `session` means the caller knows the diff --git a/packages/trigger-sdk/src/v3/chat.test.ts b/packages/trigger-sdk/src/v3/chat.test.ts index 6469f1ac86..5666f1e25d 100644 --- a/packages/trigger-sdk/src/v3/chat.test.ts +++ b/packages/trigger-sdk/src/v3/chat.test.ts @@ -654,6 +654,103 @@ describe("TriggerChatTransport", () => { expect(subscribe!.url.startsWith("https://stream.example.com/")).toBe(true); }); + it("defaults realtime endpoints to realtime.trigger.dev on Trigger.dev Cloud", async () => { + const requests: string[] = []; + global.fetch = vi.fn().mockImplementation(async (url: string | URL) => { + const urlStr = typeof url === "string" ? url : url.toString(); + requests.push(urlStr); + if (isSessionStreamAppendUrl(urlStr)) return defaultAppendResponse(); + if (isSessionOutSubscribeUrl(urlStr)) return defaultSseResponse(); + throw new Error(`Unexpected URL: ${urlStr}`); + }); + + // No baseURL -> defaults to https://api.trigger.dev (Cloud). The realtime + // session endpoints should be routed to the dedicated realtime host. + const transport = new TriggerChatTransport({ + task: "my-chat-task", + accessToken: () => "pat", + sessions: { "chat-cloud": { publicAccessToken: "p" } }, + }); + + const stream = await transport.sendMessages({ + trigger: "submit-message", + chatId: "chat-cloud", + messageId: undefined, + messages: [createUserMessage("Hi")], + abortSignal: undefined, + }); + await drainChunks(stream); + + const append = requests.find(isSessionStreamAppendUrl); + const subscribe = requests.find(isSessionOutSubscribeUrl); + expect(append!.startsWith("https://realtime.trigger.dev/")).toBe(true); + expect(subscribe!.startsWith("https://realtime.trigger.dev/")).toBe(true); + }); + + it("leaves a custom/self-hosted baseURL untouched for realtime endpoints", async () => { + const requests: string[] = []; + global.fetch = vi.fn().mockImplementation(async (url: string | URL) => { + const urlStr = typeof url === "string" ? url : url.toString(); + requests.push(urlStr); + if (isSessionStreamAppendUrl(urlStr)) return defaultAppendResponse(); + if (isSessionOutSubscribeUrl(urlStr)) return defaultSseResponse(); + throw new Error(`Unexpected URL: ${urlStr}`); + }); + + const transport = new TriggerChatTransport({ + task: "my-chat-task", + accessToken: () => "pat", + baseURL: "https://trigger.acme.internal", + sessions: { "chat-self": { publicAccessToken: "p" } }, + }); + + const stream = await transport.sendMessages({ + trigger: "submit-message", + chatId: "chat-self", + messageId: undefined, + messages: [createUserMessage("Hi")], + abortSignal: undefined, + }); + await drainChunks(stream); + + const append = requests.find(isSessionStreamAppendUrl); + const subscribe = requests.find(isSessionOutSubscribeUrl); + expect(append!.startsWith("https://trigger.acme.internal/")).toBe(true); + expect(subscribe!.startsWith("https://trigger.acme.internal/")).toBe(true); + }); + + it("does not remap non-prod cloud hosts (mapping is prod-only)", async () => { + const requests: string[] = []; + global.fetch = vi.fn().mockImplementation(async (url: string | URL) => { + const urlStr = typeof url === "string" ? url : url.toString(); + requests.push(urlStr); + if (isSessionStreamAppendUrl(urlStr)) return defaultAppendResponse(); + if (isSessionOutSubscribeUrl(urlStr)) return defaultSseResponse(); + throw new Error(`Unexpected URL: ${urlStr}`); + }); + + const transport = new TriggerChatTransport({ + task: "my-chat-task", + accessToken: () => "pat", + baseURL: "https://test-api.trigger.dev", + sessions: { "chat-test": { publicAccessToken: "p" } }, + }); + + const stream = await transport.sendMessages({ + trigger: "submit-message", + chatId: "chat-test", + messageId: undefined, + messages: [createUserMessage("Hi")], + abortSignal: undefined, + }); + await drainChunks(stream); + + const append = requests.find(isSessionStreamAppendUrl); + const subscribe = requests.find(isSessionOutSubscribeUrl); + expect(append!.startsWith("https://test-api.trigger.dev/")).toBe(true); + expect(subscribe!.startsWith("https://test-api.trigger.dev/")).toBe(true); + }); + it("fetch override is invoked for both .in/append and .out SSE with endpoint ctx", async () => { const fetchCalls: Array<{ url: string; endpoint: string; chatId: string }> = []; diff --git a/packages/trigger-sdk/src/v3/chat.ts b/packages/trigger-sdk/src/v3/chat.ts index 74746744c7..89a5cfb4e8 100644 --- a/packages/trigger-sdk/src/v3/chat.ts +++ b/packages/trigger-sdk/src/v3/chat.ts @@ -33,7 +33,7 @@ import { } from "@trigger.dev/core/v3"; import { ChatTabCoordinator } from "./chat-tab-coordinator.js"; import type { ChatInputChunk, ChatTaskWirePayload } from "./ai-shared.js"; -import { slimSubmitMessageForWire } from "./ai-shared.js"; +import { resolveChatStreamBaseURL, slimSubmitMessageForWire } from "./ai-shared.js"; const DEFAULT_BASE_URL = "https://api.trigger.dev"; const DEFAULT_STREAM_TIMEOUT_SECONDS = 120; @@ -284,6 +284,11 @@ export type TriggerChatTransportOptions = { * endpoint, or a function called per request that picks a base URL from the * endpoint discriminator and chat ID. @default "https://api.trigger.dev" * + * When a string base URL points at Trigger.dev Cloud (`https://api.trigger.dev`), + * the realtime session endpoints (`in`/`out`) are routed to the dedicated + * realtime host (`https://realtime.trigger.dev`) automatically. Pass a + * resolver function to take full control and opt out of this. + * * @example Route appends through a proxy, SSE direct: * ```ts * baseURL: ({ endpoint }) => @@ -462,9 +467,19 @@ export class TriggerChatTransport implements ChatTransport { | undefined; const baseURLOption = options.baseURL ?? DEFAULT_BASE_URL; const streamOverride = options.streamBaseURL; - this.resolveBaseURLFn = typeof baseURLOption === "function" - ? (ctx) => (ctx.endpoint === "out" && streamOverride ? streamOverride : baseURLOption(ctx)) - : (ctx) => (ctx.endpoint === "out" && streamOverride ? streamOverride : baseURLOption); + // The transport only ever talks to realtime session endpoints (`in`/`out`). + // For a string base URL pointing at Trigger.dev Cloud, route those to the + // dedicated realtime host (`resolveChatStreamBaseURL`) so the long-lived + // SSE reads and input appends don't load the api service. An explicit + // `streamBaseURL` (SSE only, deprecated) and a `baseURL` resolver function + // are honored verbatim — the customer owns routing in those cases. + this.resolveBaseURLFn = + typeof baseURLOption === "function" + ? (ctx) => (ctx.endpoint === "out" && streamOverride ? streamOverride : baseURLOption(ctx)) + : (ctx) => + ctx.endpoint === "out" && streamOverride + ? streamOverride + : resolveChatStreamBaseURL(baseURLOption); this.fetchOverride = options.fetch; this.extraHeaders = options.headers ?? {}; this.streamTimeoutSeconds = options.streamTimeoutSeconds ?? DEFAULT_STREAM_TIMEOUT_SECONDS;