Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 19 additions & 24 deletions src/databricks/sql/backend/kernel/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,20 +472,26 @@ def execute_command(
except Exception:
# Canceller is best-effort; never block execute on it.
pass
executed = stmt.execute()
# Execute succeeded: the kernel now owns the statement
# lifecycle. It auto-closes the server statement when the
# result stream is fully drained (``ExecutedStatement::
# next_batch`` end-of-stream), with the executed handle's
# ``Drop`` as the backstop for partial/abandoned reads.
# So we must NOT close ``stmt`` here: a premature
# ``CloseStatement`` at execute-return broke lazy
# CloudFetch chunk-link fetches (``get_result_chunks``
# against the live statement) for large paginated-link
# results. Closing here is left ONLY for the error path
# below, where no executed handle / result set was
# produced to reap it.
# ``execute()`` is the kernel's directResults call (Thrift /
# JDBC / use_sea model), returning a single
# ``ExecutedAsyncStatement`` — no arm to feature-detect. On the
# fast path the handle is seeded with the inline result, so
# ``await_result()`` returns it with zero extra round-trips; on
# a slow query it is a poll/cancel handle. Register it so
# ``cursor.cancel()`` (cancel_command) and ``close_command`` can
# reach it, then drive it to terminal via ``await_result`` —
# preserving ``execute()``'s blocking contract. The
# ``_sync_cancellers`` canceller registered above also covers a
# cancel issued during this drive.
async_exec = stmt.execute()
command_id = CommandId.from_sea_statement_id(async_exec.statement_id)
cursor.active_command_id = command_id
with self._async_handles_lock:
self._async_handles[command_id.guid] = async_exec
self._async_statements[command_id.guid] = stmt
close_stmt = False
stream = async_exec.await_result()
return self._make_result_set(stream, cursor, command_id)
except Exception as exc:
raise _wrap_kernel_exception("execute_command", exc) from exc
finally:
Expand All @@ -501,17 +507,6 @@ def execute_command(
except Exception:
pass

command_id = CommandId.from_sea_statement_id(executed.statement_id)
cursor.active_command_id = command_id
# ``KernelResultSet.__init__`` calls ``arrow_schema()`` which
# can itself raise ``KernelError`` (or, in principle, a PyO3
# native exception) — wrap the construction so callers see a
# mapped PEP 249 exception.
try:
return self._make_result_set(executed, cursor, command_id)
except Exception as exc:
raise _wrap_kernel_exception("execute_command", exc) from exc

def cancel_command(self, command_id: CommandId) -> None:
with self._async_handles_lock:
handle = self._async_handles.get(command_id.guid)
Expand Down
Loading