diff --git a/.server-changes/bulk-action-cursor-pagination.md b/.server-changes/bulk-action-cursor-pagination.md new file mode 100644 index 00000000000..5f506493d11 --- /dev/null +++ b/.server-changes/bulk-action-cursor-pagination.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Fix run pagination that could duplicate or skip runs: the query orders by `(created_at, run_id)` but the cursor cut on `run_id` alone, which diverges when run_id order doesn't match created_at order (e.g. bulk replay re-processing runs). Cursors now encode the composite key as an opaque token and cut on the matching tuple; legacy bare-run_id cursors stay supported for in-flight pagination. diff --git a/apps/webapp/app/presenters/v3/TestTaskPresenter.server.ts b/apps/webapp/app/presenters/v3/TestTaskPresenter.server.ts index d5360cd004a..a9381ab60d2 100644 --- a/apps/webapp/app/presenters/v3/TestTaskPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/TestTaskPresenter.server.ts @@ -203,7 +203,7 @@ export class TestTaskPresenter { prisma: this.replica as PrismaClient, }); - const runIds = await runsRepository.listRunIds({ + const { runIds } = await runsRepository.listRunIds({ organizationId: environment.organizationId, environmentId: environment.id, projectId: environment.projectId, diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.live.ts b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.live.ts index e9993b722c7..616aa728872 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.live.ts +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.live.ts @@ -58,7 +58,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) { return { count: 0, since: newRunsSince }; } - const newRunIds = await runsRepository.listRunIds({ + const { runIds: newRunIds } = await runsRepository.listRunIds({ organizationId: project.organizationId, projectId: project.id, environmentId: environment.id, diff --git a/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts b/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts index 7a59179c55e..473d3fc7685 100644 --- a/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts +++ b/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts @@ -4,12 +4,16 @@ import { type FilterRunsOptions, type IRunsRepository, type ListRunsOptions, + type RunIdsPage, type RunListInputOptions, type RunsRepositoryOptions, type TagListOptions, convertRunListInputOptionsToFilterRunsOptions, } from "./runsRepository.server"; import parseDuration from "parse-duration"; +import { decodeRunsCursor, encodeRunsCursor } from "./runsCursor.server"; + +type RunCursorRow = { runId: string; createdAt: number }; export class ClickHouseRunsRepository implements IRunsRepository { constructor(private readonly options: RunsRepositoryOptions) {} @@ -18,25 +22,52 @@ export class ClickHouseRunsRepository implements IRunsRepository { return "clickhouse"; } - async listRunIds(options: ListRunsOptions) { + /** + * Runs the keyset-paginated query and returns `{ runId, createdAt }` rows + * (one extra beyond `page.size` to signal "has more"). The ordering is always + * the composite `(created_at, run_id)`; the cursor predicate must match it. + * + * Composite cursors carry both components, so we cut on the + * `(created_at, run_id)` tuple — sound regardless of how run_id order relates + * to created_at order. Legacy bare-run_id cursors fall back to the old + * `run_id`-only predicate (knowingly unsound) for backwards compatibility + * with in-flight cursors. + */ + private async listRunRows(options: ListRunsOptions): Promise { const queryBuilder = this.options.clickhouse.taskRuns.queryBuilder(); applyRunFiltersToQueryBuilder( queryBuilder, await convertRunListInputOptionsToFilterRunsOptions(options, this.options.prisma) ); + const forward = options.page.direction === "forward" || !options.page.direction; + if (options.page.cursor) { - if (options.page.direction === "forward" || !options.page.direction) { - queryBuilder - .where("run_id < {runId: String}", { runId: options.page.cursor }) - .orderBy("created_at DESC, run_id DESC") - .limit(options.page.size + 1); + const decoded = decodeRunsCursor(options.page.cursor); + + if (forward) { + if (decoded.kind === "composite") { + queryBuilder.where( + "(created_at, run_id) < (fromUnixTimestamp64Milli({cursorCreatedAt: Int64}), {runId: String})", + { cursorCreatedAt: decoded.createdAt, runId: decoded.runId } + ); + } else { + queryBuilder.where("run_id < {runId: String}", { runId: decoded.runId }); + } + queryBuilder.orderBy("created_at DESC, run_id DESC"); } else { - queryBuilder - .where("run_id > {runId: String}", { runId: options.page.cursor }) - .orderBy("created_at ASC, run_id ASC") - .limit(options.page.size + 1); + if (decoded.kind === "composite") { + queryBuilder.where( + "(created_at, run_id) > (fromUnixTimestamp64Milli({cursorCreatedAt: Int64}), {runId: String})", + { cursorCreatedAt: decoded.createdAt, runId: decoded.runId } + ); + } else { + queryBuilder.where("run_id > {runId: String}", { runId: decoded.runId }); + } + queryBuilder.orderBy("created_at ASC, run_id ASC"); } + + queryBuilder.limit(options.page.size + 1); } else { // Initial page - no cursor provided queryBuilder.orderBy("created_at DESC, run_id DESC").limit(options.page.size + 1); @@ -48,75 +79,89 @@ export class ClickHouseRunsRepository implements IRunsRepository { throw queryError; } - const runIds = result.map((row) => row.run_id); - return runIds; + return result.map((row) => ({ runId: row.run_id, createdAt: row.created_at_ms })); } - async listFriendlyRunIds(options: ListRunsOptions) { - // First get internal IDs from ClickHouse - const internalIds = await this.listRunIds(options); + /** + * A keyset-paginated page of run ids ordered by `(created_at, run_id)`, plus + * the cursors to page forward/backward. Cursors are composite tokens that + * match the ordering, so pagination can't duplicate or skip runs even when + * run_id order diverges from created_at order. This is the single source of + * cursor construction — `listRuns` and bulk actions both build on it. + */ + async listRunIds(options: ListRunsOptions): Promise { + const rows = await this.listRunRows(options); - if (internalIds.length === 0) { - return []; - } - - // Then get friendly IDs from Prisma - const runs = await this.options.prisma.taskRun.findMany({ - where: { - id: { - in: internalIds, - }, - }, - select: { - friendlyId: true, - }, - }); - - return runs.map((run) => run.friendlyId); - } - - async listRuns(options: ListRunsOptions) { - const runIds = await this.listRunIds(options); + // listRunRows fetches one extra row beyond page.size to detect "has more". + const hasMore = rows.length > options.page.size; - // If there are more runs than the page size, we need to fetch the next page - const hasMore = runIds.length > options.page.size; + const cursorFor = (row: RunCursorRow | undefined): string | null => + row ? encodeRunsCursor(row.createdAt, row.runId) : null; let nextCursor: string | null = null; let previousCursor: string | null = null; - //get cursors for next and previous pages const direction = options.page.direction ?? "forward"; switch (direction) { case "forward": { - previousCursor = options.page.cursor ? runIds.at(0) ?? null : null; + previousCursor = options.page.cursor ? cursorFor(rows.at(0)) : null; if (hasMore) { - // The next cursor should be the last run ID from this page - nextCursor = runIds[options.page.size - 1]; + // The next cursor is the last run on this page. + nextCursor = cursorFor(rows[options.page.size - 1]); } break; } case "backward": { - const reversedRunIds = [...runIds].reverse(); + const reversedRows = [...rows].reverse(); if (hasMore) { - previousCursor = reversedRunIds.at(1) ?? null; - nextCursor = reversedRunIds.at(options.page.size) ?? null; + previousCursor = cursorFor(reversedRows.at(1)); + nextCursor = cursorFor(reversedRows.at(options.page.size)); } else { - nextCursor = reversedRunIds.at(options.page.size - 1) ?? null; + nextCursor = cursorFor(reversedRows.at(options.page.size - 1)); } - break; } } - const runIdsToReturn = - options.page.direction === "backward" && hasMore - ? runIds.slice(1, options.page.size + 1) - : runIds.slice(0, options.page.size); + const runIds = ( + direction === "backward" && hasMore + ? rows.slice(1, options.page.size + 1) + : rows.slice(0, options.page.size) + ).map((row) => row.runId); + + return { runIds, pagination: { nextCursor, previousCursor } }; + } + + async listFriendlyRunIds(options: ListRunsOptions) { + // First get internal IDs from ClickHouse + const { runIds } = await this.listRunIds(options); + + if (runIds.length === 0) { + return []; + } + + // Then get friendly IDs from Prisma + const runs = await this.options.prisma.taskRun.findMany({ + where: { + id: { + in: runIds, + }, + }, + select: { + friendlyId: true, + }, + }); + + return runs.map((run) => run.friendlyId); + } + + async listRuns(options: ListRunsOptions) { + const { runIds, pagination } = await this.listRunIds(options); let runs = await this.options.prisma.taskRun.findMany({ where: { id: { - in: runIdsToReturn, + in: runIds, }, }, orderBy: { @@ -163,10 +208,7 @@ export class ClickHouseRunsRepository implements IRunsRepository { return { runs, - pagination: { - nextCursor, - previousCursor, - }, + pagination, }; } diff --git a/apps/webapp/app/services/runsRepository/runsCursor.server.ts b/apps/webapp/app/services/runsRepository/runsCursor.server.ts new file mode 100644 index 00000000000..f16d30a0c05 --- /dev/null +++ b/apps/webapp/app/services/runsRepository/runsCursor.server.ts @@ -0,0 +1,47 @@ +/** + * Cursor encoding for keyset pagination over `(created_at, run_id)`. + * + * The list query orders by the composite key `(created_at, run_id)`, so a sound + * cursor must carry BOTH components — cutting on `run_id` alone re-includes and + * skips rows whenever `run_id` order diverges from `created_at` order. + * + * A cursor is an opaque URL-safe base64 token wrapping `{ c: createdAtMs, r: + * runId }`. Cursors are server-issued (the SDK just echoes + * `pagination.next`/`previous` back), so this format needs no client update. + * + * Legacy cursors were the bare internal run_id (a cuid). They are detected by + * decode failure: a cuid base64-decodes to non-JSON bytes, so it falls through + * to `{ kind: "legacy" }` and the old (knowingly unsound) `run_id`-only + * predicate. In-flight legacy cursors keep working and drain naturally. + */ + +import { z } from "zod"; + +export type DecodedRunsCursor = + | { kind: "composite"; createdAt: number; runId: string } + | { kind: "legacy"; runId: string }; + +// `c` = created_at (ms since epoch), `r` = run_id. Short keys keep the token small. +const CompositeCursor = z.object({ + c: z.number().int(), + r: z.string().min(1), +}); + +export function encodeRunsCursor(createdAtMs: number, runId: string): string { + return Buffer.from(JSON.stringify({ c: createdAtMs, r: runId })).toString("base64url"); +} + +export function decodeRunsCursor(cursor: string): DecodedRunsCursor { + try { + const parsed = CompositeCursor.safeParse( + JSON.parse(Buffer.from(cursor, "base64url").toString("utf8")) + ); + if (parsed.success) { + return { kind: "composite", createdAt: parsed.data.c, runId: parsed.data.r }; + } + } catch { + // JSON.parse threw — not a composite cursor. + } + + return { kind: "legacy", runId: cursor }; +} diff --git a/apps/webapp/app/services/runsRepository/runsRepository.server.ts b/apps/webapp/app/services/runsRepository/runsRepository.server.ts index 9a0a4a19746..f4eeb5466d0 100644 --- a/apps/webapp/app/services/runsRepository/runsRepository.server.ts +++ b/apps/webapp/app/services/runsRepository/runsRepository.server.ts @@ -127,9 +127,25 @@ export type TagList = { tags: string[]; }; +export type CursorPagination = { + nextCursor: string | null; + previousCursor: string | null; +}; + +export type RunIdsPage = { + runIds: string[]; + pagination: CursorPagination; +}; + export interface IRunsRepository { name: string; - listRunIds(options: ListRunsOptions): Promise; + /** + * A keyset-paginated page of run ids plus the cursors to navigate + * forward/backward. The cursors are opaque composite `(created_at, run_id)` + * tokens, so pagination can't duplicate or skip runs. This is the single + * cursor-aware list primitive — `listRuns` and bulk actions build on it. + */ + listRunIds(options: ListRunsOptions): Promise; /** Returns friendly IDs (e.g., run_xxx) instead of internal UUIDs. Used for ClickHouse task_events queries. */ listFriendlyRunIds(options: ListRunsOptions): Promise; listRuns(options: ListRunsOptions): Promise<{ @@ -154,7 +170,7 @@ export class RunsRepository implements IRunsRepository { return "runsRepository"; } - async listRunIds(options: ListRunsOptions): Promise { + async listRunIds(options: ListRunsOptions): Promise { return startActiveSpan( "runsRepository.listRunIds", async () => this.clickHouseRunsRepository.listRunIds(options), diff --git a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts index 21f5d39db91..babdb02ca6a 100644 --- a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts +++ b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts @@ -159,8 +159,13 @@ export class BulkActionService extends BaseService { throw new Error(`Bulk action group has invalid query name: ${group.queryName}`); } - // 2. Get the runs to process in this batch - const runIds = await runsRepository.listRunIds({ + // 2. Get the runs to process in this batch, plus the cursor for the next + // batch. The cursor is a composite (created_at, run_id) keyset cursor so the + // next batch can't re-include or skip runs. + const { + runIds: runIdsToProcess, + pagination: { nextCursor }, + } = await runsRepository.listRunIds({ ...filters, page: { size: env.BULK_ACTION_BATCH_SIZE, @@ -172,8 +177,6 @@ export class BulkActionService extends BaseService { // 3. Process the runs let successCount = 0; let failureCount = 0; - // Slice because we fetch an extra for the cursor - const runIdsToProcess = runIds.slice(0, env.BULK_ACTION_BATCH_SIZE); switch (group.type) { case BulkActionType.CANCEL: { @@ -274,7 +277,10 @@ export class BulkActionService extends BaseService { } } - const isFinished = runIdsToProcess.length === 0; + // A null nextCursor means there is no further page — this batch was the + // last (or there were no runs at all), so the action is complete. (An empty + // batch also yields a null cursor.) + const isFinished = nextCursor === null; logger.debug("Bulk action group processed batch", { bulkActionId, @@ -292,7 +298,8 @@ export class BulkActionService extends BaseService { const updatedGroup = await this._prisma.bulkActionGroup.update({ where: { id: bulkActionId }, data: { - cursor: runIdsToProcess.at(runIdsToProcess.length - 1), + // Json column: leave unchanged when there's no next cursor (finished). + cursor: nextCursor ?? undefined, successCount: { increment: successCount, }, diff --git a/apps/webapp/test/runsRepository.part4.test.ts b/apps/webapp/test/runsRepository.part4.test.ts index b79e41397ee..2a432acab55 100644 --- a/apps/webapp/test/runsRepository.part4.test.ts +++ b/apps/webapp/test/runsRepository.part4.test.ts @@ -424,14 +424,14 @@ describe("RunsRepository (part 4/4)", () => { from: createdAtMs - 1, page: { size: 100 }, }); - expect(newRunIdsBefore.length).toBeGreaterThanOrEqual(1); + expect(newRunIdsBefore.runIds.length).toBeGreaterThanOrEqual(1); const newRunIdsAfter = await runsRepository.listRunIds({ ...baseOptions, from: createdAtMs + 60_000, page: { size: 100 }, }); - expect(newRunIdsAfter).toHaveLength(0); + expect(newRunIdsAfter.runIds).toHaveLength(0); const fromBeforeRun = createdAtMs - 1; @@ -441,7 +441,7 @@ describe("RunsRepository (part 4/4)", () => { tasks: ["my-task"], page: { size: 100 }, }); - expect(matchingTaskIds.length).toBeGreaterThanOrEqual(1); + expect(matchingTaskIds.runIds.length).toBeGreaterThanOrEqual(1); const otherTaskIds = await runsRepository.listRunIds({ ...baseOptions, @@ -449,7 +449,7 @@ describe("RunsRepository (part 4/4)", () => { tasks: ["other-task"], page: { size: 100 }, }); - expect(otherTaskIds).toHaveLength(0); + expect(otherTaskIds.runIds).toHaveLength(0); } ); }); diff --git a/apps/webapp/test/runsRepositoryCursor.test.ts b/apps/webapp/test/runsRepositoryCursor.test.ts new file mode 100644 index 00000000000..854c59941bb --- /dev/null +++ b/apps/webapp/test/runsRepositoryCursor.test.ts @@ -0,0 +1,299 @@ +import { describe, expect, vi } from "vitest"; + +// Mock the db prisma client +vi.mock("~/db.server", () => ({ + prisma: {}, + $replica: {}, +})); + +import { replicationContainerTest } from "@internal/testcontainers"; +import { setTimeout } from "node:timers/promises"; +import { RunsRepository } from "~/services/runsRepository/runsRepository.server"; +import { setupClickhouseReplication } from "./utils/replicationUtils"; + +vi.setConfig({ testTimeout: 60_000 }); + +/** + * Regression tests for keyset pagination over `(created_at, run_id)`. + * + * `listRunIds`/`listRuns` order by the composite key `(created_at, run_id)` but + * the old cursor predicate cut on `run_id` alone. That is only sound when + * `run_id` lexicographic order matches `created_at` order. When a burst of runs + * is created such that the two orders diverge (here: deliberately reversed), + * keyset pagination both re-includes already-seen runs (duplicates) and drops + * runs it should have returned (skips). + * + * Each test inserts runs with explicit ids so that `run_id` ascending order is + * the exact reverse of `created_at` ascending order, then walks every page and + * asserts the union is exactly the inserted set with no duplicates. + */ +describe("RunsRepository cursor pagination", () => { + replicationContainerTest( + "forward pagination returns every run exactly once when run_id order is the reverse of created_at order", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + const { clickhouse } = await setupClickhouseReplication({ + prisma, + databaseUrl: postgresContainer.getConnectionUri(), + clickhouseUrl: clickhouseContainer.getConnectionUrl(), + redisOptions, + }); + + const organization = await prisma.organization.create({ + data: { title: "test", slug: "test" }, + }); + const project = await prisma.project.create({ + data: { + name: "test", + slug: "test", + organizationId: organization.id, + externalRef: "test", + }, + }); + const runtimeEnvironment = await prisma.runtimeEnvironment.create({ + data: { + slug: "test", + type: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + apiKey: "test", + pkApiKey: "test", + shortcode: "test", + }, + }); + + // run_id ascending: a < b < c < d < e + // created_at ascending: e < d < c < b < a (the exact reverse) + const ids = [ + "aaaaaaaaaaaaaaaaaaaaaaaa", + "bbbbbbbbbbbbbbbbbbbbbbbb", + "cccccccccccccccccccccccc", + "dddddddddddddddddddddddd", + "eeeeeeeeeeeeeeeeeeeeeeee", + ]; + const base = new Date("2026-06-04T16:55:07.000Z").getTime(); + for (let i = 0; i < ids.length; i++) { + await prisma.taskRun.create({ + data: { + id: ids[i], + // earliest-created run has the largest run_id (reverse correlation) + createdAt: new Date(base + (ids.length - 1 - i) * 1000), + friendlyId: `run_${ids[i]}`, + taskIdentifier: "my-task", + payload: JSON.stringify({ foo: "bar" }), + traceId: `trace_${i}`, + spanId: `span_${i}`, + queue: "test", + runtimeEnvironmentId: runtimeEnvironment.id, + projectId: project.id, + organizationId: organization.id, + environmentType: "DEVELOPMENT", + engine: "V2", + }, + }); + } + + await setTimeout(1000); + + const runsRepository = new RunsRepository({ prisma, clickhouse }); + + const baseOptions = { + projectId: project.id, + environmentId: runtimeEnvironment.id, + organizationId: organization.id, + }; + + // Walk every forward page, size 2, accumulating ids. + const seen: string[] = []; + let cursor: string | undefined = undefined; + for (let guard = 0; guard < 20; guard++) { + const page = await runsRepository.listRuns({ + ...baseOptions, + page: { size: 2, cursor, direction: cursor ? "forward" : undefined }, + }); + seen.push(...page.runs.map((r) => r.id)); + if (!page.pagination.nextCursor) break; + cursor = page.pagination.nextCursor; + } + + // No duplicates, no skips: every inserted run appears exactly once. + expect(seen.slice().sort()).toEqual(ids.slice().sort()); + expect(new Set(seen).size).toBe(ids.length); + } + ); + + replicationContainerTest( + "backward pagination round-trips to the previous page when run_id order is the reverse of created_at order", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + const { clickhouse } = await setupClickhouseReplication({ + prisma, + databaseUrl: postgresContainer.getConnectionUri(), + clickhouseUrl: clickhouseContainer.getConnectionUrl(), + redisOptions, + }); + + const organization = await prisma.organization.create({ + data: { title: "test", slug: "test" }, + }); + const project = await prisma.project.create({ + data: { + name: "test", + slug: "test", + organizationId: organization.id, + externalRef: "test", + }, + }); + const runtimeEnvironment = await prisma.runtimeEnvironment.create({ + data: { + slug: "test", + type: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + apiKey: "test", + pkApiKey: "test", + shortcode: "test", + }, + }); + + // run_id ascending: a < b < c ; created_at ascending: c < b < a (reversed). + const ids = [ + "aaaaaaaaaaaaaaaaaaaaaaaa", + "bbbbbbbbbbbbbbbbbbbbbbbb", + "cccccccccccccccccccccccc", + ]; + const base = new Date("2026-06-04T16:55:07.000Z").getTime(); + for (let i = 0; i < ids.length; i++) { + await prisma.taskRun.create({ + data: { + id: ids[i], + createdAt: new Date(base + (ids.length - 1 - i) * 1000), + friendlyId: `run_${ids[i]}`, + taskIdentifier: "my-task", + payload: JSON.stringify({ foo: "bar" }), + traceId: `trace_${i}`, + spanId: `span_${i}`, + queue: "test", + runtimeEnvironmentId: runtimeEnvironment.id, + projectId: project.id, + organizationId: organization.id, + environmentType: "DEVELOPMENT", + engine: "V2", + }, + }); + } + + await setTimeout(1000); + + const runsRepository = new RunsRepository({ prisma, clickhouse }); + const baseOptions = { + projectId: project.id, + environmentId: runtimeEnvironment.id, + organizationId: organization.id, + }; + + // Forward order (created_at DESC) is [a, b, c]. First page (size 2) = {a, b}. + const firstPage = await runsRepository.listRuns({ + ...baseOptions, + page: { size: 2 }, + }); + const firstIds = firstPage.runs.map((r) => r.id).sort(); + expect(firstIds).toEqual(["aaaaaaaaaaaaaaaaaaaaaaaa", "bbbbbbbbbbbbbbbbbbbbbbbb"]); + expect(firstPage.pagination.nextCursor).toBeTruthy(); + + // Forward to the second page = {c}; it exposes a previousCursor. + const secondPage = await runsRepository.listRuns({ + ...baseOptions, + page: { size: 2, cursor: firstPage.pagination.nextCursor!, direction: "forward" }, + }); + expect(secondPage.runs.map((r) => r.id)).toEqual(["cccccccccccccccccccccccc"]); + expect(secondPage.pagination.previousCursor).toBeTruthy(); + + // Stepping backward from the second page must land back on the first page + // exactly — no duplicated or skipped runs across the boundary. + const backPage = await runsRepository.listRuns({ + ...baseOptions, + page: { size: 2, cursor: secondPage.pagination.previousCursor!, direction: "backward" }, + }); + expect(backPage.runs.map((r) => r.id).sort()).toEqual(firstIds); + } + ); + + replicationContainerTest( + "legacy bare run_id cursor still uses the old (run_id-only) predicate for backwards compatibility", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + const { clickhouse } = await setupClickhouseReplication({ + prisma, + databaseUrl: postgresContainer.getConnectionUri(), + clickhouseUrl: clickhouseContainer.getConnectionUrl(), + redisOptions, + }); + + const organization = await prisma.organization.create({ + data: { title: "test", slug: "test" }, + }); + const project = await prisma.project.create({ + data: { + name: "test", + slug: "test", + organizationId: organization.id, + externalRef: "test", + }, + }); + const runtimeEnvironment = await prisma.runtimeEnvironment.create({ + data: { + slug: "test", + type: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + apiKey: "test", + pkApiKey: "test", + shortcode: "test", + }, + }); + + const ids = [ + "aaaaaaaaaaaaaaaaaaaaaaaa", + "bbbbbbbbbbbbbbbbbbbbbbbb", + "cccccccccccccccccccccccc", + ]; + const base = new Date("2026-06-04T16:55:07.000Z").getTime(); + for (let i = 0; i < ids.length; i++) { + await prisma.taskRun.create({ + data: { + id: ids[i], + createdAt: new Date(base + i * 1000), + friendlyId: `run_${ids[i]}`, + taskIdentifier: "my-task", + payload: JSON.stringify({ foo: "bar" }), + traceId: `trace_${i}`, + spanId: `span_${i}`, + queue: "test", + runtimeEnvironmentId: runtimeEnvironment.id, + projectId: project.id, + organizationId: organization.id, + environmentType: "DEVELOPMENT", + engine: "V2", + }, + }); + } + + await setTimeout(1000); + + const runsRepository = new RunsRepository({ prisma, clickhouse }); + + // A legacy cursor is a bare run_id. The old predicate is `run_id < cursor`, + // so passing the largest run_id must return the two smaller ones. + const page = await runsRepository.listRuns({ + projectId: project.id, + environmentId: runtimeEnvironment.id, + organizationId: organization.id, + page: { size: 10, cursor: "cccccccccccccccccccccccc", direction: "forward" }, + }); + + const returned = page.runs.map((r) => r.id).sort(); + expect(returned).toEqual([ + "aaaaaaaaaaaaaaaaaaaaaaaa", + "bbbbbbbbbbbbbbbbbbbbbbbb", + ]); + } + ); +}); diff --git a/internal-packages/clickhouse/src/__snapshots__/taskRuns.test.ts.snap b/internal-packages/clickhouse/src/__snapshots__/taskRuns.test.ts.snap index b9b921a20e1..4ba7f7e7b1b 100644 --- a/internal-packages/clickhouse/src/__snapshots__/taskRuns.test.ts.snap +++ b/internal-packages/clickhouse/src/__snapshots__/taskRuns.test.ts.snap @@ -5,7 +5,7 @@ exports[`Task Runs V2 > should be able to query task runs using the query builde "params": { "environmentId": "cm9kddfcs01zqdy88ld9mmrli", }, - "query": "SELECT run_id FROM trigger_dev.task_runs_v2 FINAL WHERE environment_id = {environmentId: String}", + "query": "SELECT run_id, toUnixTimestamp64Milli(created_at) AS created_at_ms FROM trigger_dev.task_runs_v2 FINAL WHERE environment_id = {environmentId: String}", } `; @@ -15,6 +15,6 @@ exports[`Task Runs V2 > should be able to query task runs using the query builde "environmentId": "cm9kddfcs01zqdy88ld9mmrli", "status": "COMPLETED_SUCCESSFULLY", }, - "query": "SELECT run_id FROM trigger_dev.task_runs_v2 FINAL WHERE environment_id = {environmentId: String} AND status = {status: String}", + "query": "SELECT run_id, toUnixTimestamp64Milli(created_at) AS created_at_ms FROM trigger_dev.task_runs_v2 FINAL WHERE environment_id = {environmentId: String} AND status = {status: String}", } `; diff --git a/internal-packages/clickhouse/src/taskRuns.ts b/internal-packages/clickhouse/src/taskRuns.ts index 77dca1f7726..2201f4baf6e 100644 --- a/internal-packages/clickhouse/src/taskRuns.ts +++ b/internal-packages/clickhouse/src/taskRuns.ts @@ -370,11 +370,25 @@ export const TaskRunV2QueryResult = z.object({ export type TaskRunV2QueryResult = z.infer; +// Adds the created_at timestamp (ms since epoch) needed to build composite +// keyset cursors over (created_at, run_id) — see runsRepository.server.ts. +// Returned as a JSON number because the client sets +// output_format_json_quote_64bit_integers: 0. Kept separate from +// TaskRunV2QueryResult so run_id-only consumers (e.g. the pending-version +// lookup) aren't forced to select a column they don't need. +export const TaskRunListQueryResult = z.object({ + run_id: z.string(), + created_at_ms: z.number().int(), +}); + +export type TaskRunListQueryResult = z.infer; + export function getTaskRunsQueryBuilder(ch: ClickhouseReader, settings?: ClickHouseSettings) { return ch.queryBuilder({ name: "getTaskRuns", - baseQuery: "SELECT run_id FROM trigger_dev.task_runs_v2 FINAL", - schema: TaskRunV2QueryResult, + baseQuery: + "SELECT run_id, toUnixTimestamp64Milli(created_at) AS created_at_ms FROM trigger_dev.task_runs_v2 FINAL", + schema: TaskRunListQueryResult, settings, }); }