From 4e5ae69ba20bcca960900d353e876535d6f6d7d4 Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Thu, 4 Jun 2026 20:48:50 -0700 Subject: [PATCH 1/3] refactor(request-log): gateway is sole writer of request log MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The request log had two persistence paths: the gateway wrote some entries directly, while the orchestrator ran the `log`-topic consumer that wrote all downstream entries to storage. Having the orchestrator persist the request log blurs ownership — the orchestrator is a pipeline that should only emit events, not own the request-log table. Concentrating all writes in the gateway gives a single owner for the request log and keeps the orchestrator free of request-log storage writes. The request-log persistence consumer moves from the orchestrator to the gateway: - Move `submitqueue/orchestrator/controller/log/` → `submitqueue/gateway/controller/log/` (importpath, doc comment, and default consumer group `orchestrator-log` → `gateway-log`). Logic is unchanged. - Orchestrator: `TopicKeyLog` becomes publish-only (subscription dropped), the log controller registration and import are removed, controller count 11 → 10. It still publishes via `submitqueue/core/request.PublishLog`. - Gateway: builds a consumer (generic + mysql classifiers), registers the moved log controller on `TopicKeyLog` with a subscription (group `gateway-log`), starts it, and drains it with `Stop(30000)` on shutdown — preserving the 128+SIGTERM graceful-exit contract. - Add `HOSTNAME=gateway-dev` to both gateway compose files for a stable subscriber name; update the workflow RFC and gateway README. - Tests: add a gateway integration test that publishes to the log topic (as the orchestrator does) and asserts the gateway consumer persists it, and an e2e test that lands a request and asserts Status advances to `started` — exercising the publish→consume→persist path across both services. 
The gateway keeps its two synchronous direct writes (`accepted` on Land, `cancelling` on Cancel) for read-your-write visibility at RPC return. Both are gateway writes, so the invariant holds: only the gateway persists the request log; the orchestrator only publishes. This works because gateway and orchestrator already share the same queue and app databases. - ✅ `bazel build` of both servers + the moved package - ✅ `make test` — unit tests pass (incl. the moved log_test) - ✅ `make check-gazelle`, `make check-tidy`, `make lint` (fmt + license) - ✅ `make integration-test-submitqueue-gateway` — new `TestRequestLogConsumer` verifies the gateway consumer persists a log entry published to the log topic - ✅ `make e2e-test` — new `TestLandRequest_PersistsStartedLogViaGatewayConsumer` verifies an orchestrator-published `started` log is persisted by the gateway and readable via Status; both services still exit 128+SIGTERM on shutdown --- doc/rfc/submitqueue/workflow.md | 6 +- example/submitqueue/docker-compose.yml | 2 + .../submitqueue/gateway/server/BUILD.bazel | 4 ++ .../gateway/server/docker-compose.yml | 2 + example/submitqueue/gateway/server/main.go | 72 +++++++++++++++++-- .../orchestrator/server/BUILD.bazel | 1 - .../submitqueue/orchestrator/server/main.go | 28 +++----- submitqueue/gateway/README.md | 21 +++++- .../controller/log/BUILD.bazel | 2 +- .../controller/log/log.go | 6 +- .../controller/log/log_test.go | 2 +- test/e2e/submitqueue/BUILD.bazel | 1 + test/e2e/submitqueue/suite_test.go | 40 +++++++++++ .../submitqueue/gateway/BUILD.bazel | 6 ++ .../submitqueue/gateway/suite_test.go | 51 +++++++++++++ 15 files changed, 214 insertions(+), 30 deletions(-) rename submitqueue/{orchestrator => gateway}/controller/log/BUILD.bazel (90%) rename submitqueue/{orchestrator => gateway}/controller/log/log.go (92%) rename submitqueue/{orchestrator => gateway}/controller/log/log_test.go (99%) diff --git a/doc/rfc/submitqueue/workflow.md b/doc/rfc/submitqueue/workflow.md index 
2b7b33ee..4e255312 100644 --- a/doc/rfc/submitqueue/workflow.md +++ b/doc/rfc/submitqueue/workflow.md @@ -79,13 +79,13 @@ The pipeline has two cycles: `speculate → build → buildsignal → speculate` | **buildsignal** | Build | speculate | Feed CI result back into speculation | | **merge** | BatchID | conclude, speculate | Merge the batch and advance the queue | | **conclude** | BatchID | — | Map terminal batch state to request state | -| **log** | RequestLog | — | Append-only sink for request log events | +| **log** | RequestLog | — | Gateway-owned sink: persists request log events to storage | ## DLQ reconciliation -Every primary pipeline topic above is paired with a `{topic}_dlq` subscription consumed by a dedicated DLQ controller. The consumer framework moves a message to its DLQ once the primary controller returns a non-retryable error or exhausts retries on a retryable one; without the DLQ side the affected request would stay in a non-terminal state forever and the gateway would still report it as "in progress". +Every *consumed* primary pipeline topic above is paired with a `{topic}_dlq` subscription consumed by a dedicated DLQ controller. The `log` topic is the exception: the orchestrator only publishes to it (the gateway is the sole consumer that persists the request log), so it has no orchestrator-side subscription and therefore no DLQ. The consumer framework moves a message to its DLQ once the primary controller returns a non-retryable error or exhausts retries on a retryable one; without the DLQ side the affected request would stay in a non-terminal state forever and the gateway would still report it as "in progress". -The DLQ controllers do not re-attempt the failed work. 
They decode the payload to recover the affected `RequestID` (start, validate, batch, cancel, log) or `BatchID` (score, speculate, build, buildsignal, merge, conclude) and drive the entity to a terminal failed state — `RequestStateError` for requests, `BatchStateFailed` for batches with fan-out to the member requests. State writes use the same optimistic-locking CAS as the primary pipeline, so a late primary-pipeline update wins cleanly and a version mismatch is asked back for redelivery. +The DLQ controllers do not re-attempt the failed work. They decode the payload to recover the affected `RequestID` (start, validate, batch, cancel) or `BatchID` (score, speculate, build, buildsignal, merge, conclude) and drive the entity to a terminal failed state — `RequestStateError` for requests, `BatchStateFailed` for batches with fan-out to the member requests. State writes use the same optimistic-locking CAS as the primary pipeline, so a late primary-pipeline update wins cleanly and a version mismatch is asked back for redelivery. DLQ consumers are wired with `errs.AlwaysRetryableProcessor` and a very high `Retry.MaxAttempts`, with their own DLQ disabled. That combination makes reconciliation effectively non-droppable: any failure is forced retryable rather than escalating to a second-level dead-letter that nobody consumes. The trade-off is that a genuinely unprocessable DLQ message — typically a malformed payload — must be removed by an operator. 
diff --git a/example/submitqueue/docker-compose.yml b/example/submitqueue/docker-compose.yml index dc49e86f..1b3ed04b 100644 --- a/example/submitqueue/docker-compose.yml +++ b/example/submitqueue/docker-compose.yml @@ -55,6 +55,8 @@ services: - QUEUE_MYSQL_DSN=root:root@tcp(mysql-queue:3306)/submitqueue?parseTime=true # Path to YAML queue configuration baked into the image - QUEUE_CONFIG_PATH=/root/queues.yaml + # Stable subscriber name for the request-log consumer + - HOSTNAME=gateway-dev depends_on: mysql-app: condition: service_healthy diff --git a/example/submitqueue/gateway/server/BUILD.bazel b/example/submitqueue/gateway/server/BUILD.bazel index 86baf961..5a34d0e6 100644 --- a/example/submitqueue/gateway/server/BUILD.bazel +++ b/example/submitqueue/gateway/server/BUILD.bazel @@ -11,12 +11,16 @@ go_library( importpath = "github.com/uber/submitqueue/example/submitqueue/gateway/server", visibility = ["//visibility:private"], deps = [ + "//core/errs/generic", + "//core/errs/mysql", "//extension/counter/mysql", + "//extension/messagequeue", "//extension/messagequeue/mysql", "//submitqueue/core/consumer", "//submitqueue/extension/queueconfig/yaml", "//submitqueue/extension/storage/mysql", "//submitqueue/gateway/controller", + "//submitqueue/gateway/controller/log", "//submitqueue/gateway/protopb", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_uber_go_tally_v4//:tally", diff --git a/example/submitqueue/gateway/server/docker-compose.yml b/example/submitqueue/gateway/server/docker-compose.yml index 2b018f16..08a2f24d 100644 --- a/example/submitqueue/gateway/server/docker-compose.yml +++ b/example/submitqueue/gateway/server/docker-compose.yml @@ -55,6 +55,8 @@ services: - QUEUE_MYSQL_DSN=root:root@tcp(mysql-queue:3306)/submitqueue?parseTime=true # Path to YAML queue configuration baked into the image - QUEUE_CONFIG_PATH=/root/queues.yaml + # Stable subscriber name for the request-log consumer + - HOSTNAME=gateway-dev depends_on: mysql-app: condition: 
service_healthy diff --git a/example/submitqueue/gateway/server/main.go b/example/submitqueue/gateway/server/main.go index b2c814d8..d9ce8f48 100644 --- a/example/submitqueue/gateway/server/main.go +++ b/example/submitqueue/gateway/server/main.go @@ -28,12 +28,16 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/uber-go/tally/v4" + genericerrs "github.com/uber/submitqueue/core/errs/generic" + mysqlerrs "github.com/uber/submitqueue/core/errs/mysql" mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql" + extqueue "github.com/uber/submitqueue/extension/messagequeue" queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql" "github.com/uber/submitqueue/submitqueue/core/consumer" yamlqueueconfig "github.com/uber/submitqueue/submitqueue/extension/queueconfig/yaml" mysqlstorage "github.com/uber/submitqueue/submitqueue/extension/storage/mysql" "github.com/uber/submitqueue/submitqueue/gateway/controller" + logctrl "github.com/uber/submitqueue/submitqueue/gateway/controller/log" pb "github.com/uber/submitqueue/submitqueue/gateway/protopb" "go.uber.org/zap" "google.golang.org/grpc" @@ -174,12 +178,29 @@ func run() error { zap.String("queue_dsn", queueDSN), ) - // Build a publish-only topic registry: gateway only feeds the start of the - // orchestrator pipeline (TopicKeyStart) and the cancel topic (TopicKeyCancel). - // No subscription is configured because the gateway never consumes from the queue. + // Stable subscriber name for the log-topic consumer. Falls back to a + // time-seeded name when HOSTNAME is unset (e.g. local runs). + subscriberName := os.Getenv("HOSTNAME") + if subscriberName == "" { + subscriberName = fmt.Sprintf("gateway-%d", time.Now().Unix()) + } + + // Build the topic registry. The gateway publishes to the start of the + // orchestrator pipeline (TopicKeyStart) and the cancel topic (TopicKeyCancel) — + // both publish-only. 
It additionally consumes the log topic (TopicKeyLog): + // the gateway is the sole writer of the request log, persisting entries that + // the orchestrator publishes there. registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ {Key: consumer.TopicKeyStart, Name: "start", Queue: mysqlQueue}, {Key: consumer.TopicKeyCancel, Name: "cancel", Queue: mysqlQueue}, + { + Key: consumer.TopicKeyLog, + Name: "log", + Queue: mysqlQueue, + Subscription: extqueue.DefaultSubscriptionConfig( + subscriberName, "gateway-log", + ), + }, }) if err != nil { return fmt.Errorf("failed to create topic registry: %w", err) @@ -201,7 +222,8 @@ func run() error { // Initialize storage from the shared app database connection. The land // controller writes to this store directly; cancel/status use the request - // log store directly. + // log store directly. The log consumer (registered below) is the sole + // persister of request log entries published by the orchestrator. store, err := mysqlstorage.NewStorage(appDB, scope.SubScope("storage")) if err != nil { return fmt.Errorf("failed to create storage: %w", err) @@ -236,6 +258,29 @@ func run() error { // Register reflection service for debugging with grpcurl reflection.Register(grpcServer) + // Create the queue consumer and register the log controller. The gateway is + // the sole persister of the request log: the orchestrator publishes entries + // to the log topic and this consumer writes them to storage. + logConsumer := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry, + // Storage (extension/storage/mysql) and queue (extension/messagequeue/mysql) + // both run on the same MySQL driver, so a single classifier covers + // errors surfaced from either backend. 
+ genericerrs.Classifier, + mysqlerrs.Classifier, + ) + + logController := logctrl.NewController(logger.Sugar(), scope, store, consumer.TopicKeyLog, "gateway-log") + if err := logConsumer.Register(logController); err != nil { + return fmt.Errorf("failed to register log controller: %w", err) + } + + if err := logConsumer.Start(ctx); err != nil { + // The error can also be a result of a context cancellation due to SIGINT or SIGTERM. + // This is expected, just propagate it. + return fmt.Errorf("failed to start log consumer: %w", err) + } + logger.Info("log consumer started") + // Listen on configurable port port := os.Getenv("PORT") if port == "" { @@ -257,6 +302,8 @@ func run() error { // Wait for interrupt signal or server critical error // If interruption is signaled, gracefully stop the server + // If the server exits with an error, cancel the context to signal the consumer + // After this, stop the consumer // If an error happens during shutdown, return the actual error, not the context cancellation error var serverErr error select { @@ -273,10 +320,25 @@ func run() error { serverErr = <-serverErrCh case serverErr = <-serverErrCh: fmt.Println("Shutting down gateway server due to critical GRPC server error...") + + // Cancel the context to signal cancellation to the queue consumer + cancel() } if serverErr != nil { - err = fmt.Errorf("GRPC server exited with error: %w", serverErr) + serverErr = fmt.Errorf("GRPC server exited with error: %w", serverErr) + } + + // Stop the consumer with a 30s timeout; by this time the context should be + // cancelled and the processing threads may already be exiting; recollect them. 
+ errStop := logConsumer.Stop(30000) + if errStop != nil { + errStop = fmt.Errorf("failed to stop consumer: %w", errStop) + } + + if errStop != nil || serverErr != nil { + // Override context cancellation error with the shutdown error + err = errors.Join(errStop, serverErr) } return err diff --git a/example/submitqueue/orchestrator/server/BUILD.bazel b/example/submitqueue/orchestrator/server/BUILD.bazel index 4ae928dd..d01cdb12 100644 --- a/example/submitqueue/orchestrator/server/BUILD.bazel +++ b/example/submitqueue/orchestrator/server/BUILD.bazel @@ -49,7 +49,6 @@ go_library( "//submitqueue/orchestrator/controller/cancel", "//submitqueue/orchestrator/controller/conclude", "//submitqueue/orchestrator/controller/dlq", - "//submitqueue/orchestrator/controller/log", "//submitqueue/orchestrator/controller/merge", "//submitqueue/orchestrator/controller/score", "//submitqueue/orchestrator/controller/speculate", diff --git a/example/submitqueue/orchestrator/server/main.go b/example/submitqueue/orchestrator/server/main.go index 7412843d..fc5eb294 100644 --- a/example/submitqueue/orchestrator/server/main.go +++ b/example/submitqueue/orchestrator/server/main.go @@ -68,7 +68,6 @@ import ( "github.com/uber/submitqueue/submitqueue/orchestrator/controller/cancel" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/conclude" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/dlq" - logctrl "github.com/uber/submitqueue/submitqueue/orchestrator/controller/log" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/merge" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/score" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/speculate" @@ -382,7 +381,6 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe {consumer.TopicKeyBuildSignal, "buildsignal", "orchestrator-buildsignal"}, {consumer.TopicKeyMerge, "merge", "orchestrator-merge"}, {consumer.TopicKeyConclude, "conclude", 
"orchestrator-conclude"}, - {consumer.TopicKeyLog, "log", "orchestrator-log"}, } configs := make([]consumer.TopicConfig, 0, 2*len(primaryTopics)) @@ -419,6 +417,16 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe }) } + // Publish-only: the orchestrator emits request log entries to the log + // topic but never persists them. The gateway is the sole consumer that + // writes the request log to storage, so the orchestrator registers no + // consuming subscription (and therefore no log DLQ) for this topic. + configs = append(configs, consumer.TopicConfig{ + Key: consumer.TopicKeyLog, + Name: "log", + Queue: q, + }) + return consumer.NewTopicRegistry(configs) } @@ -651,26 +659,13 @@ func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, } count++ - logController := logctrl.NewController( - logger, - scope, - store, - consumer.TopicKeyLog, - "orchestrator-log", - ) - if err := c.Register(logController); err != nil { - return count, fmt.Errorf("failed to register log controller: %w", err) - } - count++ - return count, nil } // registerDLQControllers creates one DLQ reconciler per primary stage and // registers them with the DLQ consumer. Each reconciler drives the affected // request or batch into a terminal Error/Failed state so the gateway stops -// reporting it as stuck-in-progress. The log DLQ is a metric-only no-op (log -// entries are observability, not pipeline state). +// reporting it as stuck-in-progress. 
func registerDLQControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, store storage.Storage) (int, error) { dlqScope := scope.SubScope("dlq") dlqRegs := []struct { @@ -687,7 +682,6 @@ func registerDLQControllers(c consumer.Consumer, logger *zap.SugaredLogger, scop {"buildsignal_dlq", dlq.NewDLQBuildSignalController(logger, dlqScope, store, dlq.TopicKey(consumer.TopicKeyBuildSignal), "orchestrator-buildsignal-dlq")}, {"merge_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(consumer.TopicKeyMerge), "orchestrator-merge-dlq")}, {"conclude_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(consumer.TopicKeyConclude), "orchestrator-conclude-dlq")}, - {"log_dlq", dlq.NewDLQLogController(logger, dlqScope, dlq.TopicKey(consumer.TopicKeyLog), "orchestrator-log-dlq")}, } var count int for _, reg := range dlqRegs { diff --git a/submitqueue/gateway/README.md b/submitqueue/gateway/README.md index d739f424..2858fe77 100644 --- a/submitqueue/gateway/README.md +++ b/submitqueue/gateway/README.md @@ -1 +1,20 @@ -SubmitQueue Gateway +# SubmitQueue Gateway + +The gateway is the RPC entry point to SubmitQueue. It accepts `Land`, `Cancel`, +`Status`, and `Ping` calls, validates them at the edge, and hands work off to the +orchestrator pipeline asynchronously via the message queue. + +## Request log ownership + +The gateway is the **sole writer of the request log**. No other service persists +request log entries: + +- For statuses it produces synchronously (`accepted` on `Land`, `cancelling` on + `Cancel`), the gateway writes directly to storage so the entry is visible the + moment the RPC returns. +- For statuses produced downstream, the orchestrator only *publishes* entries to + the `log` topic via `submitqueue/core/request.PublishLog`. The gateway runs a + consumer that drains the `log` topic and persists each entry to storage. 
+ +This keeps a single service responsible for the request log while letting the +orchestrator remain free of storage writes for it. diff --git a/submitqueue/orchestrator/controller/log/BUILD.bazel b/submitqueue/gateway/controller/log/BUILD.bazel similarity index 90% rename from submitqueue/orchestrator/controller/log/BUILD.bazel rename to submitqueue/gateway/controller/log/BUILD.bazel index f700931c..60647c61 100644 --- a/submitqueue/orchestrator/controller/log/BUILD.bazel +++ b/submitqueue/gateway/controller/log/BUILD.bazel @@ -3,7 +3,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "log", srcs = ["log.go"], - importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/log", + importpath = "github.com/uber/submitqueue/submitqueue/gateway/controller/log", visibility = ["//visibility:public"], deps = [ "//core/metrics", diff --git a/submitqueue/orchestrator/controller/log/log.go b/submitqueue/gateway/controller/log/log.go similarity index 92% rename from submitqueue/orchestrator/controller/log/log.go rename to submitqueue/gateway/controller/log/log.go index 3c6212dd..311681a0 100644 --- a/submitqueue/orchestrator/controller/log/log.go +++ b/submitqueue/gateway/controller/log/log.go @@ -29,6 +29,10 @@ import ( // Controller handles log queue messages. // It consumes request log entries and persists them to storage. // Implements consumer.Controller interface for integration with the consumer. +// +// The request log is written exclusively by the gateway: other services +// (e.g. the orchestrator) only publish log entries to the log topic, and this +// controller is the single consumer that persists them to storage. type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope @@ -40,7 +44,7 @@ type Controller struct { // Verify Controller implements consumer.Controller interface at compile time. 
var _ consumer.Controller = (*Controller)(nil) -// NewController creates a new log controller for the orchestrator. +// NewController creates a new log controller for the gateway. func NewController( logger *zap.SugaredLogger, scope tally.Scope, diff --git a/submitqueue/orchestrator/controller/log/log_test.go b/submitqueue/gateway/controller/log/log_test.go similarity index 99% rename from submitqueue/orchestrator/controller/log/log_test.go rename to submitqueue/gateway/controller/log/log_test.go index f8a1319d..c88fad1a 100644 --- a/submitqueue/orchestrator/controller/log/log_test.go +++ b/submitqueue/gateway/controller/log/log_test.go @@ -35,7 +35,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope - return NewController(logger, scope, store, consumer.TopicKeyLog, "orchestrator-log") + return NewController(logger, scope, store, consumer.TopicKeyLog, "gateway-log") } func TestController_Process(t *testing.T) { diff --git a/test/e2e/submitqueue/BUILD.bazel b/test/e2e/submitqueue/BUILD.bazel index b0db796e..01bb5273 100644 --- a/test/e2e/submitqueue/BUILD.bazel +++ b/test/e2e/submitqueue/BUILD.bazel @@ -17,6 +17,7 @@ go_test( "integration", ], deps = [ + "//submitqueue/entity", "//submitqueue/gateway/protopb", "//submitqueue/orchestrator/protopb", "//test/testutil", diff --git a/test/e2e/submitqueue/suite_test.go b/test/e2e/submitqueue/suite_test.go index 19a82787..0bf53067 100644 --- a/test/e2e/submitqueue/suite_test.go +++ b/test/e2e/submitqueue/suite_test.go @@ -27,10 +27,12 @@ import ( "database/sql" "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/uber/submitqueue/submitqueue/entity" gatewaypb "github.com/uber/submitqueue/submitqueue/gateway/protopb" orchestratorpb "github.com/uber/submitqueue/submitqueue/orchestrator/protopb" 
"github.com/uber/submitqueue/test/testutil" @@ -168,6 +170,44 @@ func (s *E2EIntegrationSuite) TestLandRequest_SinglePR() { s.log.Logf("Land request (single PR) succeeded: sqid=%s", resp.Sqid) } +// TestLandRequest_PersistsStartedLogViaGatewayConsumer verifies the request-log +// ownership invariant end-to-end: the orchestrator only *publishes* request log +// entries to the log topic (it never writes the request log itself), and the +// gateway's log consumer drains that topic and persists them to storage. +// +// We observe this through the gateway Status RPC: immediately after Land the +// status is "accepted" (the gateway's synchronous direct write), and once the +// orchestrator's start controller publishes "started" to the log topic, the +// gateway consumer persists it and Status advances to "started". Seeing +// "started" therefore proves the publish→consume→persist path works across both +// services. +func (s *E2EIntegrationSuite) TestLandRequest_PersistsStartedLogViaGatewayConsumer() { + t := s.T() + + landResp, err := s.gatewayClient.Land(s.ctx, &gatewaypb.LandRequest{ + Queue: "e2e-test-queue", + Change: &gatewaypb.Change{Uris: []string{"github://uber/e2e-startlog/pull/4242/abcdef0123456789abcdef0123456789abcdef01"}}, + Strategy: gatewaypb.Strategy_REBASE, + }) + require.NoError(t, err, "Land request failed") + require.NotEmpty(t, landResp.Sqid, "SQID should not be empty") + sqid := landResp.Sqid + s.log.Logf("Land succeeded: sqid=%s; waiting for gateway consumer to persist 'started'", sqid) + + require.Eventually(t, func() bool { + resp, statusErr := s.gatewayClient.Status(s.ctx, &gatewaypb.StatusRequest{Sqid: sqid}) + if statusErr != nil { + s.log.Logf("Status(%s) not ready yet: %v", sqid, statusErr) + return false + } + s.log.Logf("Status(%s) = %q", sqid, resp.Status) + return resp.Status == string(entity.RequestStatusStarted) + }, 30*time.Second, 500*time.Millisecond, + "request %s should reach status %q via the gateway log consumer", sqid, 
entity.RequestStatusStarted) + + s.log.Logf("Gateway consumer persisted orchestrator-published 'started' log for sqid=%s", sqid) +} + // TestCancelRequest_InvalidSqid verifies the gateway rejects an empty sqid // synchronously before publishing anything to the cancel queue. func (s *E2EIntegrationSuite) TestCancelRequest_InvalidSqid() { diff --git a/test/integration/submitqueue/gateway/BUILD.bazel b/test/integration/submitqueue/gateway/BUILD.bazel index 6fe77784..da3d0d41 100644 --- a/test/integration/submitqueue/gateway/BUILD.bazel +++ b/test/integration/submitqueue/gateway/BUILD.bazel @@ -16,11 +16,17 @@ go_test( "integration", ], deps = [ + "//extension/messagequeue/mysql", + "//submitqueue/core/consumer", + "//submitqueue/core/request", + "//submitqueue/entity", "//submitqueue/gateway/protopb", "//test/testutil", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_stretchr_testify//suite", + "@com_github_uber_go_tally_v4//:tally", "@org_golang_google_grpc//:grpc", + "@org_uber_go_zap//:zap", ], ) diff --git a/test/integration/submitqueue/gateway/suite_test.go b/test/integration/submitqueue/gateway/suite_test.go index b8406262..afa2764d 100644 --- a/test/integration/submitqueue/gateway/suite_test.go +++ b/test/integration/submitqueue/gateway/suite_test.go @@ -30,12 +30,19 @@ import ( "database/sql" "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/uber-go/tally/v4" + queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql" + "github.com/uber/submitqueue/submitqueue/core/consumer" + corerequest "github.com/uber/submitqueue/submitqueue/core/request" + "github.com/uber/submitqueue/submitqueue/entity" pb "github.com/uber/submitqueue/submitqueue/gateway/protopb" "github.com/uber/submitqueue/test/testutil" + "go.uber.org/zap" "google.golang.org/grpc" ) @@ -144,3 +151,47 @@ func (s *GatewayIntegrationSuite) 
TestLandAPI() { s.log.Logf("Land API test passed: request stored and message published") } + +// TestRequestLogConsumer verifies the gateway's log-topic consumer in isolation: +// no orchestrator runs in this stack, so the test itself publishes a request log +// entry to the log topic exactly as the orchestrator does in production (via +// submitqueue/core/request.PublishLog). The gateway is the sole writer of the +// request log; this asserts its consumer drains the log topic and persists the +// entry to storage, observable through the Status RPC. +func (s *GatewayIntegrationSuite) TestRequestLogConsumer() { + t := s.T() + + // Build a publisher against the shared queue database. NewQueue only wires up + // stores; nothing consumes until a subscriber is started, so this publish-only + // use does not interfere with the gateway container's consumer. + queue, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.queueDB, + Logger: zap.NewNop(), + MetricsScope: tally.NoopScope, + }) + require.NoError(t, err, "failed to create queue publisher") + defer queue.Close() + + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: consumer.TopicKeyLog, Name: "log", Queue: queue}, + }) + require.NoError(t, err, "failed to create topic registry") + + const sqid = "log-consumer-test/1" + logEntry := entity.NewRequestLog(sqid, entity.RequestStatusStarted, 1, "", nil) + require.NoError(t, corerequest.PublishLog(s.ctx, registry, logEntry, sqid), + "failed to publish request log to log topic") + + s.log.Logf("Published 'started' log for sqid=%s; waiting for gateway consumer to persist it", sqid) + + require.Eventually(t, func() bool { + resp, statusErr := s.client.Status(s.ctx, &pb.StatusRequest{Sqid: sqid}) + if statusErr != nil { + return false + } + return resp.Status == string(entity.RequestStatusStarted) + }, 30*time.Second, 500*time.Millisecond, + "gateway log consumer should persist the published request log for sqid=%s", sqid) + + s.log.Logf("Request 
log consumer test passed: entry persisted and readable via Status") +} From 39e8fa08047674c6e41b15529ef1d3116b220cfe Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Fri, 5 Jun 2026 15:07:48 -0700 Subject: [PATCH 2/3] refactor(request-log): address review on gateway sole-owner PR - Make the log-consumer subscriber name unique per instance (hostname+PID) so co-located gateway processes don't contend for the same partition lease. - Report the gRPC server error first in the shutdown errors.Join (it is the primary failure; consumer-stop is secondary cleanup). - Clarify in the README that the gateway is the sole owner (writer and reader) of the request log; Status/Cancel read directly, orchestrator only publishes. - Extract named poll constants (persistTimeout/persistPollInterval) in the gateway integration and e2e suites with a comment explaining that the in-container consumer is observed black-box via Status, so a bounded poll is used in lieu of an in-process channel/HookSignal wait. Follow-ups split out: design doc (#211) and DLQ PublishLog() (#212). 
Co-Authored-By: Oz --- .../submitqueue/gateway/server/BUILD.bazel | 1 + example/submitqueue/gateway/server/main.go | 35 ++++++++++++------- submitqueue/gateway/README.md | 9 +++-- test/e2e/submitqueue/suite_test.go | 13 ++++++- .../submitqueue/gateway/suite_test.go | 13 ++++++- 5 files changed, 55 insertions(+), 16 deletions(-) diff --git a/example/submitqueue/gateway/server/BUILD.bazel b/example/submitqueue/gateway/server/BUILD.bazel index 5a34d0e6..05c868bc 100644 --- a/example/submitqueue/gateway/server/BUILD.bazel +++ b/example/submitqueue/gateway/server/BUILD.bazel @@ -11,6 +11,7 @@ go_library( importpath = "github.com/uber/submitqueue/example/submitqueue/gateway/server", visibility = ["//visibility:private"], deps = [ + "//core/errs", "//core/errs/generic", "//core/errs/mysql", "//extension/counter/mysql", diff --git a/example/submitqueue/gateway/server/main.go b/example/submitqueue/gateway/server/main.go index d9ce8f48..c23a47c5 100644 --- a/example/submitqueue/gateway/server/main.go +++ b/example/submitqueue/gateway/server/main.go @@ -28,6 +28,7 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/errs" genericerrs "github.com/uber/submitqueue/core/errs/generic" mysqlerrs "github.com/uber/submitqueue/core/errs/mysql" mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql" @@ -178,12 +179,18 @@ func run() error { zap.String("queue_dsn", queueDSN), ) - // Stable subscriber name for the log-topic consumer. Falls back to a - // time-seeded name when HOSTNAME is unset (e.g. local runs). - subscriberName := os.Getenv("HOSTNAME") - if subscriberName == "" { - subscriberName = fmt.Sprintf("gateway-%d", time.Now().Unix()) + // Subscriber name for the log-topic consumer. It must be unique per running + // instance: SubscriberName identifies a subscriber for partition leases, so + // two gateway processes on the same host (sharing HOSTNAME) would otherwise + // contend for the same lease. 
Append the PID to keep co-located instances + // distinct; the PID is stable for the life of the process. Offset tracking + // stays keyed on the shared ConsumerGroup ("gateway-log"), not this name. + // Falls back to a time-seeded name when HOSTNAME is unset (e.g. local runs). + hostname := os.Getenv("HOSTNAME") + if hostname == "" { + hostname = fmt.Sprintf("gateway-%d", time.Now().Unix()) } + subscriberName := fmt.Sprintf("%s-%d", hostname, os.Getpid()) // Build the topic registry. The gateway publishes to the start of the // orchestrator pipeline (TopicKeyStart) and the cancel topic (TopicKeyCancel) — @@ -262,11 +269,13 @@ func run() error { // the sole persister of the request log: the orchestrator publishes entries // to the log topic and this consumer writes them to storage. logConsumer := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry, - // Storage (extension/storage/mysql) and queue (extension/messagequeue/mysql) - // both run on the same MySQL driver, so a single classifier covers - // errors surfaced from either backend. - genericerrs.Classifier, - mysqlerrs.Classifier, + errs.NewClassifierProcessor( + // Storage (extension/storage/mysql) and queue (extension/messagequeue/mysql) + // both run on the same MySQL driver, so a single classifier covers + // errors surfaced from either backend. + genericerrs.Classifier, + mysqlerrs.Classifier, + ), ) logController := logctrl.NewController(logger.Sugar(), scope, store, consumer.TopicKeyLog, "gateway-log") @@ -337,8 +346,10 @@ func run() error { } if errStop != nil || serverErr != nil { - // Override context cancellation error with the shutdown error - err = errors.Join(errStop, serverErr) + // Override context cancellation error with the shutdown error. The server + // error is the primary/root failure, so it leads; the consumer-stop error + // is secondary cleanup. 
+ err = errors.Join(serverErr, errStop) } return err diff --git a/submitqueue/gateway/README.md b/submitqueue/gateway/README.md index 2858fe77..5903f009 100644 --- a/submitqueue/gateway/README.md +++ b/submitqueue/gateway/README.md @@ -6,8 +6,9 @@ orchestrator pipeline asynchronously via the message queue. ## Request log ownership -The gateway is the **sole writer of the request log**. No other service persists -request log entries: +The gateway is the **sole owner of the request log** — the only service that +both writes and reads it. No other service persists or reads request log +entries: - For statuses it produces synchronously (`accepted` on `Land`, `cancelling` on `Cancel`), the gateway writes directly to storage so the entry is visible the @@ -16,5 +17,9 @@ request log entries: the `log` topic via `submitqueue/core/request.PublishLog`. The gateway runs a consumer that drains the `log` topic and persists each entry to storage. +Reads are likewise gateway-only: the `Status` and `Cancel` RPCs read the request +log directly from storage. The orchestrator only *publishes* log entries and +never touches the request log store. + This keeps a single service responsible for the request log while letting the orchestrator remain free of storage writes for it. diff --git a/test/e2e/submitqueue/suite_test.go b/test/e2e/submitqueue/suite_test.go index 0bf53067..91fb0fb7 100644 --- a/test/e2e/submitqueue/suite_test.go +++ b/test/e2e/submitqueue/suite_test.go @@ -56,6 +56,17 @@ func TestE2EIntegration(t *testing.T) { suite.Run(t, new(E2EIntegrationSuite)) } +// The gateway log consumer runs inside the gateway-service container, so this +// suite can only observe persistence black-box through the Status RPC — there is +// no in-process channel/HookSignal to wait on across the container boundary. 
A +// bounded poll is therefore the deterministic-enough analog: persistTimeout is a +// safety net (a failure here means something is genuinely stuck, not a timing +// race), and persistPollInterval bounds how often we re-query. +const ( + persistTimeout = 30 * time.Second + persistPollInterval = 500 * time.Millisecond +) + func (s *E2EIntegrationSuite) SetupSuite() { t := s.T() s.ctx = context.Background() @@ -202,7 +213,7 @@ func (s *E2EIntegrationSuite) TestLandRequest_PersistsStartedLogViaGatewayConsum } s.log.Logf("Status(%s) = %q", sqid, resp.Status) return resp.Status == string(entity.RequestStatusStarted) - }, 30*time.Second, 500*time.Millisecond, + }, persistTimeout, persistPollInterval, "request %s should reach status %q via the gateway log consumer", sqid, entity.RequestStatusStarted) s.log.Logf("Gateway consumer persisted orchestrator-published 'started' log for sqid=%s", sqid) diff --git a/test/integration/submitqueue/gateway/suite_test.go b/test/integration/submitqueue/gateway/suite_test.go index afa2764d..1db104c8 100644 --- a/test/integration/submitqueue/gateway/suite_test.go +++ b/test/integration/submitqueue/gateway/suite_test.go @@ -60,6 +60,17 @@ func TestGatewayIntegration(t *testing.T) { suite.Run(t, new(GatewayIntegrationSuite)) } +// The log consumer runs inside the gateway-service container, so this suite can +// only observe persistence black-box through the Status RPC — there is no +// in-process channel/HookSignal to wait on across the container boundary. A +// bounded poll is therefore the deterministic-enough analog: persistTimeout is a +// safety net (a failure here means something is genuinely stuck, not a timing +// race), and persistPollInterval bounds how often we re-query. 
+const ( + persistTimeout = 30 * time.Second + persistPollInterval = 500 * time.Millisecond +) + func (s *GatewayIntegrationSuite) SetupSuite() { t := s.T() s.ctx = context.Background() @@ -190,7 +201,7 @@ func (s *GatewayIntegrationSuite) TestRequestLogConsumer() { return false } return resp.Status == string(entity.RequestStatusStarted) - }, 30*time.Second, 500*time.Millisecond, + }, persistTimeout, persistPollInterval, "gateway log consumer should persist the published request log for sqid=%s", sqid) s.log.Logf("Request log consumer test passed: entry persisted and readable via Status") From 1a51248c1ead7f35325addaa2b146fcf0a2ee61d Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Fri, 5 Jun 2026 15:53:22 -0700 Subject: [PATCH 3/3] docs(workflow): document queue and database ownership by service MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Issue #211 (follow-up from PR #205) asks for a single place that records the submitqueue topology at a high level: which service owns its data and how the two services communicate. The workflow RFC already covers the cross-queue flow, so ownership belongs alongside it. Append an "Ownership by service" section to doc/rfc/submitqueue/workflow.md, described at a conceptual level rather than enumerating individual tables and topics: - Gateway — RPC entry point and owner of the request log; the only service that reads or writes that record. - Orchestrator — runs the pipeline and owns its working state (requests, batches, builds); the only service that writes it. - Messaging queue — the shared, pluggable infrastructure the two services communicate through, kept in its own database separate from application data. A closing "Request-log ownership invariant" section captures the rule: the orchestrator only emits log events, the gateway is the sole consumer and the only writer of the request log. Documentation only; no code, schema, or proto changes. 
- ✅ `make lint` (clean tree) Closes #211 --- doc/rfc/submitqueue/workflow.md | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/doc/rfc/submitqueue/workflow.md b/doc/rfc/submitqueue/workflow.md index 4e255312..52e1aff5 100644 --- a/doc/rfc/submitqueue/workflow.md +++ b/doc/rfc/submitqueue/workflow.md @@ -14,8 +14,8 @@ The pipeline has two cycles: `speculate → build → buildsignal → speculate` │ LandRequest ▼ ┌──────────────────────┐ ┌──────────────────────────────────┐ - │ log (terminal sink) │◄───│ start │ - │ Append RequestLog │ │ Persist Request, emit Started │ + │ gateway: log │◄───│ start │ + │ Persist request log │ │ Persist Request, emit Started │ └──────────────────────┘ └────────────────┬─────────────────┘ ▲ │ RequestID │ ▼ @@ -90,3 +90,25 @@ The DLQ controllers do not re-attempt the failed work. They decode the payload t DLQ consumers are wired with `errs.AlwaysRetryableProcessor` and a very high `Retry.MaxAttempts`, with their own DLQ disabled. That combination makes reconciliation effectively non-droppable: any failure is forced retryable rather than escalating to a second-level dead-letter that nobody consumes. The trade-off is that a genuinely unprocessable DLQ message — typically a malformed payload — must be removed by an operator. See `submitqueue/orchestrator/controller/dlq/README.md` for the design constraints (simplest possible implementation, reconcile-only, no recovery) and the per-topic controller mapping. + +## Ownership by service + +Each service owns its own data; the gateway and orchestrator never touch each other's, and the only thing they share is the messaging queue. + +### Gateway + +The gateway is the RPC entry point and the owner of the request log. It accepts requests, hands them to the orchestrator over the queue, and owns the record of what happened to each request — the only service that reads or writes the request log. 
It writes that record both directly, as requests arrive, and by consuming the log events the orchestrator emits.
+
+### Orchestrator
+
+The orchestrator runs the pipeline that advances a request from acceptance to a terminal state. It owns the working state of that pipeline — requests, batches, builds, and their bookkeeping — and is the only service that writes it. It drives a request through a series of internal stages, re-entering speculation as CI results arrive and as batches advance.
+
+### Shared: the messaging queue
+
+The two services communicate only through the messaging queue. It is pluggable infrastructure kept in its own database, separate from either service's application data: the gateway publishes incoming requests for the orchestrator to consume, and the orchestrator publishes log events for the gateway to consume.
+
+## Request-log ownership invariant
+
+The request log has exactly one owner: the **gateway**. The orchestrator only emits log events onto the queue; it never persists them. The gateway is the sole consumer of those events and the only writer of the request log.
+
+This keeps all request-log writes in one service: the orchestrator stays a pure pipeline that emits events, and the gateway owns the request log end to end.