Made an application for supporting sustainable local businesses in San Pancho.
Never really got completed, but it has some useful Svelte components for maps that we can reuse.
http://greenspots.dctrl.space
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
420 lines
14 KiB
420 lines
14 KiB
""" |
|
WSGI Protocol Linter |
|
==================== |
|
|
|
This module provides a middleware that performs sanity checks on the |
|
behavior of the WSGI server and application. It checks that the |
|
:pep:`3333` WSGI spec is properly implemented. It also warns on some |
|
common HTTP errors such as non-empty responses for 304 status codes. |
|
|
|
.. autoclass:: LintMiddleware |
|
|
|
:copyright: 2007 Pallets |
|
:license: BSD-3-Clause |
|
""" |
|
import typing as t |
|
from types import TracebackType |
|
from urllib.parse import urlparse |
|
from warnings import warn |
|
|
|
from ..datastructures import Headers |
|
from ..http import is_entity_header |
|
from ..wsgi import FileWrapper |
|
|
|
if t.TYPE_CHECKING: |
|
from _typeshed.wsgi import StartResponse |
|
from _typeshed.wsgi import WSGIApplication |
|
from _typeshed.wsgi import WSGIEnvironment |
|
|
|
|
|
class WSGIWarning(Warning): |
|
"""Warning class for WSGI warnings.""" |
|
|
|
|
|
class HTTPWarning(Warning): |
|
"""Warning class for HTTP warnings.""" |
|
|
|
|
|
def check_type(context: str, obj: object, need: t.Type = str) -> None: |
|
if type(obj) is not need: |
|
warn( |
|
f"{context!r} requires {need.__name__!r}, got {type(obj).__name__!r}.", |
|
WSGIWarning, |
|
stacklevel=3, |
|
) |
|
|
|
|
|
class InputStream: |
|
def __init__(self, stream: t.IO[bytes]) -> None: |
|
self._stream = stream |
|
|
|
def read(self, *args: t.Any) -> bytes: |
|
if len(args) == 0: |
|
warn( |
|
"WSGI does not guarantee an EOF marker on the input stream, thus making" |
|
" calls to 'wsgi.input.read()' unsafe. Conforming servers may never" |
|
" return from this call.", |
|
WSGIWarning, |
|
stacklevel=2, |
|
) |
|
elif len(args) != 1: |
|
warn( |
|
"Too many parameters passed to 'wsgi.input.read()'.", |
|
WSGIWarning, |
|
stacklevel=2, |
|
) |
|
return self._stream.read(*args) |
|
|
|
def readline(self, *args: t.Any) -> bytes: |
|
if len(args) == 0: |
|
warn( |
|
"Calls to 'wsgi.input.readline()' without arguments are unsafe. Use" |
|
" 'wsgi.input.read()' instead.", |
|
WSGIWarning, |
|
stacklevel=2, |
|
) |
|
elif len(args) == 1: |
|
warn( |
|
"'wsgi.input.readline()' was called with a size hint. WSGI does not" |
|
" support this, although it's available on all major servers.", |
|
WSGIWarning, |
|
stacklevel=2, |
|
) |
|
else: |
|
raise TypeError("Too many arguments passed to 'wsgi.input.readline()'.") |
|
return self._stream.readline(*args) |
|
|
|
def __iter__(self) -> t.Iterator[bytes]: |
|
try: |
|
return iter(self._stream) |
|
except TypeError: |
|
warn("'wsgi.input' is not iterable.", WSGIWarning, stacklevel=2) |
|
return iter(()) |
|
|
|
def close(self) -> None: |
|
warn("The application closed the input stream!", WSGIWarning, stacklevel=2) |
|
self._stream.close() |
|
|
|
|
|
class ErrorStream: |
|
def __init__(self, stream: t.IO[str]) -> None: |
|
self._stream = stream |
|
|
|
def write(self, s: str) -> None: |
|
check_type("wsgi.error.write()", s, str) |
|
self._stream.write(s) |
|
|
|
def flush(self) -> None: |
|
self._stream.flush() |
|
|
|
def writelines(self, seq: t.Iterable[str]) -> None: |
|
for line in seq: |
|
self.write(line) |
|
|
|
def close(self) -> None: |
|
warn("The application closed the error stream!", WSGIWarning, stacklevel=2) |
|
self._stream.close() |
|
|
|
|
|
class GuardedWrite: |
|
def __init__(self, write: t.Callable[[bytes], None], chunks: t.List[int]) -> None: |
|
self._write = write |
|
self._chunks = chunks |
|
|
|
def __call__(self, s: bytes) -> None: |
|
check_type("write()", s, bytes) |
|
self._write(s) |
|
self._chunks.append(len(s)) |
|
|
|
|
|
class GuardedIterator: |
|
def __init__( |
|
self, |
|
iterator: t.Iterable[bytes], |
|
headers_set: t.Tuple[int, Headers], |
|
chunks: t.List[int], |
|
) -> None: |
|
self._iterator = iterator |
|
self._next = iter(iterator).__next__ |
|
self.closed = False |
|
self.headers_set = headers_set |
|
self.chunks = chunks |
|
|
|
def __iter__(self) -> "GuardedIterator": |
|
return self |
|
|
|
def __next__(self) -> bytes: |
|
if self.closed: |
|
warn("Iterated over closed 'app_iter'.", WSGIWarning, stacklevel=2) |
|
|
|
rv = self._next() |
|
|
|
if not self.headers_set: |
|
warn( |
|
"The application returned before it started the response.", |
|
WSGIWarning, |
|
stacklevel=2, |
|
) |
|
|
|
check_type("application iterator items", rv, bytes) |
|
self.chunks.append(len(rv)) |
|
return rv |
|
|
|
def close(self) -> None: |
|
self.closed = True |
|
|
|
if hasattr(self._iterator, "close"): |
|
self._iterator.close() # type: ignore |
|
|
|
if self.headers_set: |
|
status_code, headers = self.headers_set |
|
bytes_sent = sum(self.chunks) |
|
content_length = headers.get("content-length", type=int) |
|
|
|
if status_code == 304: |
|
for key, _value in headers: |
|
key = key.lower() |
|
if key not in ("expires", "content-location") and is_entity_header( |
|
key |
|
): |
|
warn( |
|
f"Entity header {key!r} found in 304 response.", HTTPWarning |
|
) |
|
if bytes_sent: |
|
warn("304 responses must not have a body.", HTTPWarning) |
|
elif 100 <= status_code < 200 or status_code == 204: |
|
if content_length != 0: |
|
warn( |
|
f"{status_code} responses must have an empty content length.", |
|
HTTPWarning, |
|
) |
|
if bytes_sent: |
|
warn(f"{status_code} responses must not have a body.", HTTPWarning) |
|
elif content_length is not None and content_length != bytes_sent: |
|
warn( |
|
"Content-Length and the number of bytes sent to the" |
|
" client do not match.", |
|
WSGIWarning, |
|
) |
|
|
|
def __del__(self) -> None: |
|
if not self.closed: |
|
try: |
|
warn( |
|
"Iterator was garbage collected before it was closed.", WSGIWarning |
|
) |
|
except Exception: |
|
pass |
|
|
|
|
|
class LintMiddleware: |
|
"""Warns about common errors in the WSGI and HTTP behavior of the |
|
server and wrapped application. Some of the issues it checks are: |
|
|
|
- invalid status codes |
|
- non-bytes sent to the WSGI server |
|
- strings returned from the WSGI application |
|
- non-empty conditional responses |
|
- unquoted etags |
|
- relative URLs in the Location header |
|
- unsafe calls to wsgi.input |
|
- unclosed iterators |
|
|
|
Error information is emitted using the :mod:`warnings` module. |
|
|
|
:param app: The WSGI application to wrap. |
|
|
|
.. code-block:: python |
|
|
|
from werkzeug.middleware.lint import LintMiddleware |
|
app = LintMiddleware(app) |
|
""" |
|
|
|
def __init__(self, app: "WSGIApplication") -> None: |
|
self.app = app |
|
|
|
def check_environ(self, environ: "WSGIEnvironment") -> None: |
|
if type(environ) is not dict: |
|
warn( |
|
"WSGI environment is not a standard Python dict.", |
|
WSGIWarning, |
|
stacklevel=4, |
|
) |
|
for key in ( |
|
"REQUEST_METHOD", |
|
"SERVER_NAME", |
|
"SERVER_PORT", |
|
"wsgi.version", |
|
"wsgi.input", |
|
"wsgi.errors", |
|
"wsgi.multithread", |
|
"wsgi.multiprocess", |
|
"wsgi.run_once", |
|
): |
|
if key not in environ: |
|
warn( |
|
f"Required environment key {key!r} not found", |
|
WSGIWarning, |
|
stacklevel=3, |
|
) |
|
if environ["wsgi.version"] != (1, 0): |
|
warn("Environ is not a WSGI 1.0 environ.", WSGIWarning, stacklevel=3) |
|
|
|
script_name = environ.get("SCRIPT_NAME", "") |
|
path_info = environ.get("PATH_INFO", "") |
|
|
|
if script_name and script_name[0] != "/": |
|
warn( |
|
f"'SCRIPT_NAME' does not start with a slash: {script_name!r}", |
|
WSGIWarning, |
|
stacklevel=3, |
|
) |
|
|
|
if path_info and path_info[0] != "/": |
|
warn( |
|
f"'PATH_INFO' does not start with a slash: {path_info!r}", |
|
WSGIWarning, |
|
stacklevel=3, |
|
) |
|
|
|
def check_start_response( |
|
self, |
|
status: str, |
|
headers: t.List[t.Tuple[str, str]], |
|
exc_info: t.Optional[ |
|
t.Tuple[t.Type[BaseException], BaseException, TracebackType] |
|
], |
|
) -> t.Tuple[int, Headers]: |
|
check_type("status", status, str) |
|
status_code_str = status.split(None, 1)[0] |
|
|
|
if len(status_code_str) != 3 or not status_code_str.isdigit(): |
|
warn("Status code must be three digits.", WSGIWarning, stacklevel=3) |
|
|
|
if len(status) < 4 or status[3] != " ": |
|
warn( |
|
f"Invalid value for status {status!r}. Valid status strings are three" |
|
" digits, a space and a status explanation.", |
|
WSGIWarning, |
|
stacklevel=3, |
|
) |
|
|
|
status_code = int(status_code_str) |
|
|
|
if status_code < 100: |
|
warn("Status code < 100 detected.", WSGIWarning, stacklevel=3) |
|
|
|
if type(headers) is not list: |
|
warn("Header list is not a list.", WSGIWarning, stacklevel=3) |
|
|
|
for item in headers: |
|
if type(item) is not tuple or len(item) != 2: |
|
warn("Header items must be 2-item tuples.", WSGIWarning, stacklevel=3) |
|
name, value = item |
|
if type(name) is not str or type(value) is not str: |
|
warn( |
|
"Header keys and values must be strings.", WSGIWarning, stacklevel=3 |
|
) |
|
if name.lower() == "status": |
|
warn( |
|
"The status header is not supported due to" |
|
" conflicts with the CGI spec.", |
|
WSGIWarning, |
|
stacklevel=3, |
|
) |
|
|
|
if exc_info is not None and not isinstance(exc_info, tuple): |
|
warn("Invalid value for exc_info.", WSGIWarning, stacklevel=3) |
|
|
|
headers = Headers(headers) |
|
self.check_headers(headers) |
|
|
|
return status_code, headers |
|
|
|
def check_headers(self, headers: Headers) -> None: |
|
etag = headers.get("etag") |
|
|
|
if etag is not None: |
|
if etag.startswith(("W/", "w/")): |
|
if etag.startswith("w/"): |
|
warn( |
|
"Weak etag indicator should be upper case.", |
|
HTTPWarning, |
|
stacklevel=4, |
|
) |
|
|
|
etag = etag[2:] |
|
|
|
if not (etag[:1] == etag[-1:] == '"'): |
|
warn("Unquoted etag emitted.", HTTPWarning, stacklevel=4) |
|
|
|
location = headers.get("location") |
|
|
|
if location is not None: |
|
if not urlparse(location).netloc: |
|
warn( |
|
"Absolute URLs required for location header.", |
|
HTTPWarning, |
|
stacklevel=4, |
|
) |
|
|
|
def check_iterator(self, app_iter: t.Iterable[bytes]) -> None: |
|
if isinstance(app_iter, bytes): |
|
warn( |
|
"The application returned a bytestring. The response will send one" |
|
" character at a time to the client, which will kill performance." |
|
" Return a list or iterable instead.", |
|
WSGIWarning, |
|
stacklevel=3, |
|
) |
|
|
|
def __call__(self, *args: t.Any, **kwargs: t.Any) -> t.Iterable[bytes]: |
|
if len(args) != 2: |
|
warn("A WSGI app takes two arguments.", WSGIWarning, stacklevel=2) |
|
|
|
if kwargs: |
|
warn( |
|
"A WSGI app does not take keyword arguments.", WSGIWarning, stacklevel=2 |
|
) |
|
|
|
environ: "WSGIEnvironment" = args[0] |
|
start_response: "StartResponse" = args[1] |
|
|
|
self.check_environ(environ) |
|
environ["wsgi.input"] = InputStream(environ["wsgi.input"]) |
|
environ["wsgi.errors"] = ErrorStream(environ["wsgi.errors"]) |
|
|
|
# Hook our own file wrapper in so that applications will always |
|
# iterate to the end and we can check the content length. |
|
environ["wsgi.file_wrapper"] = FileWrapper |
|
|
|
headers_set: t.List[t.Any] = [] |
|
chunks: t.List[int] = [] |
|
|
|
def checking_start_response( |
|
*args: t.Any, **kwargs: t.Any |
|
) -> t.Callable[[bytes], None]: |
|
if len(args) not in {2, 3}: |
|
warn( |
|
f"Invalid number of arguments: {len(args)}, expected 2 or 3.", |
|
WSGIWarning, |
|
stacklevel=2, |
|
) |
|
|
|
if kwargs: |
|
warn("'start_response' does not take keyword arguments.", WSGIWarning) |
|
|
|
status: str = args[0] |
|
headers: t.List[t.Tuple[str, str]] = args[1] |
|
exc_info: t.Optional[ |
|
t.Tuple[t.Type[BaseException], BaseException, TracebackType] |
|
] = (args[2] if len(args) == 3 else None) |
|
|
|
headers_set[:] = self.check_start_response(status, headers, exc_info) |
|
return GuardedWrite(start_response(status, headers, exc_info), chunks) |
|
|
|
app_iter = self.app(environ, t.cast("StartResponse", checking_start_response)) |
|
self.check_iterator(app_iter) |
|
return GuardedIterator( |
|
app_iter, t.cast(t.Tuple[int, Headers], headers_set), chunks |
|
)
|
|
|