From 8974e7bb744b49d4c3c040598c64bb2a2da06f2e Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Sat, 6 Jun 2026 12:18:58 +0000 Subject: [PATCH] feat(kernel): consume the folded directResults execute() The kernel's `Statement.execute()` is now the directResults call (databricks/databricks-sql-kernel#136): it returns a SINGLE `ExecutedAsyncStatement` -- seeded with the inline result when the query finished within the server inline wait (fast path, zero extra round-trips), or a poll/cancel handle when still running -- instead of always blocking to terminal. `KernelDatabricksClient.execute_command` now drives ONE uniform path (no `hasattr(await_result)` arm to feature-detect): register the handle (so `cursor.cancel()` / `close()` reach it), then `await_result()` to drive it to a ready result set -- preserving execute()'s blocking contract. `cursor.execute()` behaviour is unchanged for callers (still blocks to a ready result set); this just adapts to the kernel's new single-handle return. Mid-run cancel still works via the `_sync_cancellers` canceller registered before execute. Tested e2e against live warehouses (auto-closing and non-auto-closing): SELECT 1, range(N), and CREATE+count all succeed on both. The kernel maps an auto-closed `CLOSED` statement to `Succeeded`, so the connector sees a uniform success regardless of warehouse auto-close behaviour. Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore --- src/databricks/sql/backend/kernel/client.py | 43 +++++++++------------ 1 file changed, 19 insertions(+), 24 deletions(-) diff --git a/src/databricks/sql/backend/kernel/client.py b/src/databricks/sql/backend/kernel/client.py index cb3b0b7ba..2b6b4265d 100644 --- a/src/databricks/sql/backend/kernel/client.py +++ b/src/databricks/sql/backend/kernel/client.py @@ -472,20 +472,26 @@ def execute_command( except Exception: # Canceller is best-effort; never block execute on it. pass - executed = stmt.execute() - # Execute succeeded: the kernel now owns the statement - # lifecycle. It auto-closes the server statement when the - # result stream is fully drained (``ExecutedStatement:: - # next_batch`` end-of-stream), with the executed handle's - # ``Drop`` as the backstop for partial/abandoned reads. - # So we must NOT close ``stmt`` here: a premature - # ``CloseStatement`` at execute-return broke lazy - # CloudFetch chunk-link fetches (``get_result_chunks`` - # against the live statement) for large paginated-link - # results. Closing here is left ONLY for the error path - # below, where no executed handle / result set was - # produced to reap it. + # ``execute()`` is the kernel's directResults call (Thrift / + # JDBC / use_sea model), returning a single + # ``ExecutedAsyncStatement`` — no arm to feature-detect. On the + # fast path the handle is seeded with the inline result, so + # ``await_result()`` returns it with zero extra round-trips; on + # a slow query it is a poll/cancel handle. Register it so + # ``cursor.cancel()`` (cancel_command) and ``close_command`` can + # reach it, then drive it to terminal via ``await_result`` — + # preserving ``execute()``'s blocking contract. The + # ``_sync_cancellers`` canceller registered above also covers a + # cancel issued during this drive. + async_exec = stmt.execute() + command_id = CommandId.from_sea_statement_id(async_exec.statement_id) + cursor.active_command_id = command_id + with self._async_handles_lock: + self._async_handles[command_id.guid] = async_exec + self._async_statements[command_id.guid] = stmt close_stmt = False + stream = async_exec.await_result() + return self._make_result_set(stream, cursor, command_id) except Exception as exc: raise _wrap_kernel_exception("execute_command", exc) from exc finally: @@ -501,17 +507,6 @@ def execute_command( except Exception: pass - command_id = CommandId.from_sea_statement_id(executed.statement_id) - cursor.active_command_id = command_id - # ``KernelResultSet.__init__`` calls ``arrow_schema()`` which - # can itself raise ``KernelError`` (or, in principle, a PyO3 - # native exception) — wrap the construction so callers see a - # mapped PEP 249 exception. - try: - return self._make_result_set(executed, cursor, command_id) - except Exception as exc: - raise _wrap_kernel_exception("execute_command", exc) from exc - def cancel_command(self, command_id: CommandId) -> None: with self._async_handles_lock: handle = self._async_handles.get(command_id.guid)