Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/12830.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Bounded the number of parsed-but-unhandled pipelined HTTP/1 requests buffered per connection on the server; once the queue reaches an internal limit the parser stops emitting and the transport is paused, resuming as the request handler drains the queue, so a client keeping one handler busy can no longer accumulate an unbounded backlog of pipelined requests -- by :user:`bdraco`.
20 changes: 18 additions & 2 deletions aiohttp/_http_parser.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ cdef class HttpParser:
list _messages
bint _more_data_available
bint _paused
Py_ssize_t _msg_in_flight
Py_ssize_t _max_msg_queue_size
bint _eof_pending
object _payload
unsigned long long _content_length_expected
Expand Down Expand Up @@ -361,6 +363,7 @@ cdef class HttpParser:
size_t max_field_size=8190, payload_exception=None,
bint response_with_body=True, bint read_until_eof=False,
bint auto_decompress=True,
Py_ssize_t max_msg_queue_size=0,
):
cparser.llhttp_settings_init(self._csettings)
cparser.llhttp_init(self._cparser, mode, self._csettings)
Expand All @@ -375,6 +378,8 @@ cdef class HttpParser:
self._buf = bytearray()
self._more_data_available = False
self._paused = False
self._msg_in_flight = 0
self._max_msg_queue_size = max_msg_queue_size
self._eof_pending = False
self._payload = None
self._payload_error = 0
Expand Down Expand Up @@ -558,6 +563,11 @@ cdef class HttpParser:
assert self._payload is not None
self._paused = True

def message_consumed(self):
# Protocol drained a queued message; free a slot for parsing.
if self._msg_in_flight > 0:
self._msg_in_flight -= 1

def feed_eof(self):
cdef bytes desc

Expand Down Expand Up @@ -680,12 +690,12 @@ cdef class HttpRequestParser(HttpParser):
size_t max_line_size=8190, size_t max_headers=128,
size_t max_field_size=8190, payload_exception=None,
bint response_with_body=True, bint read_until_eof=False,
bint auto_decompress=True,
bint auto_decompress=True, Py_ssize_t max_msg_queue_size=0,
):
self._init(cparser.HTTP_REQUEST, protocol, loop, limit, timer,
max_line_size, max_headers, max_field_size,
payload_exception, response_with_body, read_until_eof,
auto_decompress)
auto_decompress, max_msg_queue_size)

cdef object _on_status_complete(self):
cdef int idx1, idx2
Expand Down Expand Up @@ -894,6 +904,12 @@ cdef int cb_on_message_complete(cparser.llhttp_t* parser) except -1:
pyparser._last_error = exc
return -1
else:
if pyparser._max_msg_queue_size:
pyparser._msg_in_flight += 1
if pyparser._msg_in_flight >= pyparser._max_msg_queue_size:
# Queue full: pause llhttp between messages. feed_data() buffers
# the remainder as tail; resumes once the queue drains.
return cparser.HPE_PAUSED
return 0


Expand Down
25 changes: 22 additions & 3 deletions aiohttp/base_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@
if TYPE_CHECKING:
from .http_parser import HttpParser

# Raised by transport.pause_reading()/resume_reading() when the transport
# does not support flow control; safe to ignore.
# NOTE: Catch these with a plain try/except/pass, never contextlib.suppress():
# pause/resume run on the hot read path and suppress() is ~6x slower than
# try/except here (it builds a context manager and unpacks this tuple per call).
PAUSE_RESUME_READING_ERRORS = (AttributeError, NotImplementedError, RuntimeError)


class BaseProtocol(asyncio.Protocol):
__slots__ = (
Expand Down Expand Up @@ -65,9 +72,15 @@ def pause_reading(self) -> None:
if self.transport is not None:
try:
self.transport.pause_reading()
except (AttributeError, NotImplementedError, RuntimeError):
except PAUSE_RESUME_READING_ERRORS:
# Transport lacks flow control; nothing to pause. Intentionally
# ignored (see PAUSE_RESUME_READING_ERRORS; do not use suppress).
pass

def _reading_paused_for_msg_queue(self) -> bool:
"""Keep the transport paused for protocol-specific reasons (overridden)."""
return False

def resume_reading(self, resume_parser: bool = True) -> None:
self._reading_paused = False

Expand All @@ -77,10 +90,16 @@ def resume_reading(self, resume_parser: bool = True) -> None:

# Reading may have been paused again in the above call if there was a lot of
# compressed data still pending.
if not self._reading_paused and self.transport is not None:
if (
not self._reading_paused
and not self._reading_paused_for_msg_queue()
and self.transport is not None
):
try:
self.transport.resume_reading()
except (AttributeError, NotImplementedError, RuntimeError):
except PAUSE_RESUME_READING_ERRORS:
# Transport lacks flow control; nothing to resume. Intentionally
# ignored (see PAUSE_RESUME_READING_ERRORS; do not use suppress).
pass
self._reading_paused = False

Expand Down
20 changes: 20 additions & 0 deletions aiohttp/http_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ def __init__(
response_with_body: bool = True,
read_until_eof: bool = False,
auto_decompress: bool = True,
max_msg_queue_size: int = 0,
) -> None:
self.protocol = protocol
self.loop = loop
Expand All @@ -288,6 +289,9 @@ def __init__(
self._auto_decompress = auto_decompress
self._limit = limit
self._headers_parser = HeadersParser(max_field_size, self.lax)
# Stop emitting messages once this many are queued unconsumed (0 = off).
self._max_msg_queue_size = max_msg_queue_size
self._msg_in_flight = 0

@abc.abstractmethod
def parse_message(self, lines: list[bytes]) -> _MsgT: ...
Expand All @@ -299,6 +303,11 @@ def pause_reading(self) -> None:
assert self._payload_parser is not None
self._payload_parser.pause_reading()

def message_consumed(self) -> None:
"""Protocol drained a queued message; free a slot for parsing."""
if self._msg_in_flight > 0:
self._msg_in_flight -= 1

def feed_eof(self) -> _MsgT | None:
if self._payload_parser is not None:
self._payload_parser.feed_eof()
Expand Down Expand Up @@ -340,6 +349,15 @@ def feed_data(
# read HTTP message (request/response line + headers), \r\n\r\n
# and split by lines
if self._payload_parser is None and not self._upgraded:
if (
self._max_msg_queue_size
and self._msg_in_flight >= self._max_msg_queue_size
):
# Queue full: buffer the rest and stop. Safe pause point;
# any preceding body is consumed before the next request
# line. Resumes via feed_data(b"") when the queue drains.
self._tail = data[start_pos:]
break
pos = data.find(SEP, start_pos)
# consume \r\n
if pos == start_pos and not self._lines:
Expand Down Expand Up @@ -484,6 +502,8 @@ def get_content_length() -> int | None:
payload = EMPTY_PAYLOAD

messages.append((msg, payload))
if self._max_msg_queue_size:
self._msg_in_flight += 1
should_close = msg.should_close
else:
self._tail = data[start_pos:]
Expand Down
68 changes: 67 additions & 1 deletion aiohttp/web_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from propcache import under_cached_property

from .abc import AbstractAccessLogger, AbstractAsyncAccessLogger, AbstractStreamWriter
from .base_protocol import BaseProtocol
from .base_protocol import PAUSE_RESUME_READING_ERRORS, BaseProtocol
from .helpers import DEFAULT_CHUNK_SIZE, ceil_timeout, frozen_dataclass_decorator
from .http import (
HttpProcessingError,
Expand All @@ -35,6 +35,11 @@

__all__ = ("RequestHandler", "RequestPayloadError", "PayloadAccessError")

# Max parsed-but-unhandled pipelined requests buffered per connection before
# reading is paused. Bounds memory a client can pin by keeping one handler busy
# and pipelining behind it; reading resumes as the queue drains.
MAX_MSG_QUEUE_SIZE = 32

if TYPE_CHECKING:
import ssl

Expand Down Expand Up @@ -168,6 +173,9 @@ class RequestHandler(BaseProtocol, Generic[_Request]):
"_keepalive_timeout",
"_lingering_time",
"_messages",
"_max_msg_queue_size",
"_msg_queue_resume_size",
"_msg_queue_paused",
"_message_tail",
"_handler_waiter",
"_waiter",
Expand Down Expand Up @@ -206,6 +214,13 @@ def __init__(
auto_decompress: bool = True,
timeout_ceil_threshold: float = 5,
):
self._max_msg_queue_size = MAX_MSG_QUEUE_SIZE
# Low-water mark: resume reading once the queue drains to half the limit
# so we refill in batches instead of churning pause/resume per request.
self._msg_queue_resume_size = MAX_MSG_QUEUE_SIZE // 2
# Set before super().__init__ so _reading_paused_for_msg_queue() is safe
# if BaseProtocol ever triggers a resume during init.
self._msg_queue_paused = False
parser = HttpRequestParser(
self,
loop,
Expand All @@ -215,6 +230,7 @@ def __init__(
max_headers=max_headers,
payload_exception=RequestPayloadError,
auto_decompress=auto_decompress,
max_msg_queue_size=MAX_MSG_QUEUE_SIZE,
)
super().__init__(loop, parser)

Expand Down Expand Up @@ -461,6 +477,14 @@ def data_received(self, data: bytes) -> None:
# don't set result twice
waiter.set_result(None)

# Queue full: pause the transport (the parser already stopped
# emitting). start() resumes as it drains the queue.
if (
not self._msg_queue_paused
and len(self._messages) >= self._max_msg_queue_size
):
self._pause_msg_queue_reading()

self._upgraded = upgraded
if upgraded and tail:
self._message_tail = tail
Expand All @@ -477,6 +501,36 @@ def data_received(self, data: bytes) -> None:
if eof:
self.close()

def _reading_paused_for_msg_queue(self) -> bool:
return self._msg_queue_paused

def _pause_msg_queue_reading(self) -> None:
self._msg_queue_paused = True
if self.transport is not None:
try:
self.transport.pause_reading()
except PAUSE_RESUME_READING_ERRORS:
# Transport lacks flow control; nothing to pause. Intentionally
# ignored (see PAUSE_RESUME_READING_ERRORS; do not use suppress).
pass

def _resume_msg_queue_reading(self) -> None:
if not self._upgraded:
# Reparse buffered pipelined requests while still marked paused so
# a refill past the limit does not re-pause an already-paused
# transport; only resume below once it stayed under the limit.
self.data_received(b"")
if len(self._messages) >= self._max_msg_queue_size:
return
self._msg_queue_paused = False
if not self._reading_paused and self.transport is not None:
try:
self.transport.resume_reading()
except PAUSE_RESUME_READING_ERRORS:
# Transport lacks flow control; nothing to resume. Intentionally
# ignored (see PAUSE_RESUME_READING_ERRORS; do not use suppress).
pass

def keep_alive(self, val: bool) -> None:
"""Set keep-alive connection mode.

Expand Down Expand Up @@ -606,6 +660,18 @@ async def start(self) -> None:

message, payload = self._messages.popleft()

# Free a parser slot; resume reading once drained to low water so
# pipelining keeps flowing while this request is handled.
# no branch: _parser is only None after connection_lost, whose path
# exits this loop, so the None case is not reachably exercisable.
if self._parser is not None: # pragma: no branch
self._parser.message_consumed()
if (
self._msg_queue_paused
and len(self._messages) <= self._msg_queue_resume_size
):
self._resume_msg_queue_reading()

# time is only fetched if logging is enabled as otherwise
# its thrown away and never used.
start = loop.time() if self._logging_enabled else None
Expand Down
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ peername
performant
pickleable
ping
pipelined
pipelining
pluggable
plugin
Expand Down
72 changes: 72 additions & 0 deletions tests/test_http_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,78 @@ def test_c_parser_loaded() -> None:
assert "RawResponseMessageC" in dir(aiohttp.http_parser)


_PIPELINED_GET = b"GET / HTTP/1.1\r\nHost: a\r\n\r\n"


def _build_request_parser(
request_cls: type[HttpRequestParser],
protocol: BaseProtocol,
loop: asyncio.AbstractEventLoop,
max_msg_queue_size: int,
) -> HttpRequestParser:
return request_cls(
protocol,
loop,
DEFAULT_CHUNK_SIZE,
max_line_size=8190,
max_headers=128,
max_field_size=8190,
max_msg_queue_size=max_msg_queue_size,
)


def test_max_msg_queue_size_caps_emitted_messages(
request_cls: type[HttpRequestParser],
protocol: BaseProtocol,
event_loop: asyncio.AbstractEventLoop,
) -> None:
parser = _build_request_parser(request_cls, protocol, event_loop, 4)
messages, upgraded, _tail = parser.feed_data(_PIPELINED_GET * 10)
assert len(messages) == 4
assert not upgraded


def test_max_msg_queue_size_resumes_after_consume(
request_cls: type[HttpRequestParser],
protocol: BaseProtocol,
event_loop: asyncio.AbstractEventLoop,
) -> None:
limit = 4
total = 10
parser = _build_request_parser(request_cls, protocol, event_loop, limit)
messages, _upgraded, _tail = parser.feed_data(_PIPELINED_GET * total)
seen = 0
while messages:
assert len(messages) <= limit
seen += len(messages)
for _msg, _payload in messages:
parser.message_consumed()
messages, _upgraded, _tail = parser.feed_data(b"")
assert seen == total


def test_max_msg_queue_size_zero_is_unbounded(
request_cls: type[HttpRequestParser],
protocol: BaseProtocol,
event_loop: asyncio.AbstractEventLoop,
) -> None:
parser = _build_request_parser(request_cls, protocol, event_loop, 0)
messages, _upgraded, _tail = parser.feed_data(_PIPELINED_GET * 50)
assert len(messages) == 50


def test_message_consumed_underflow_is_ignored(
request_cls: type[HttpRequestParser],
protocol: BaseProtocol,
event_loop: asyncio.AbstractEventLoop,
) -> None:
parser = _build_request_parser(request_cls, protocol, event_loop, 4)
# No message is in flight; consuming must not underflow the counter.
parser.message_consumed()
messages, _upgraded, _tail = parser.feed_data(_PIPELINED_GET * 4)
assert len(messages) == 4


def test_parse_headers(parser: HttpRequestParser) -> None:
text = b"""GET /test HTTP/1.1\r
Host: a\r
Expand Down
Loading
Loading