Made an application for supporting sustainable local businesses in San Pancho.
Never really got completed, but it has some useful Svelte components for maps that we can reuse.
http://greenspots.dctrl.space
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1099 lines
36 KiB
1099 lines
36 KiB
import codecs |
|
import io |
|
import mimetypes |
|
import os |
|
import pkgutil |
|
import re |
|
import sys |
|
import typing as t |
|
import unicodedata |
|
import warnings |
|
from datetime import datetime |
|
from html.entities import name2codepoint |
|
from time import time |
|
from zlib import adler32 |
|
|
|
from ._internal import _DictAccessorProperty |
|
from ._internal import _missing |
|
from ._internal import _parse_signature |
|
from ._internal import _TAccessorValue |
|
from .datastructures import Headers |
|
from .exceptions import NotFound |
|
from .exceptions import RequestedRangeNotSatisfiable |
|
from .security import safe_join |
|
from .urls import url_quote |
|
from .wsgi import wrap_file |
|
|
|
if t.TYPE_CHECKING: |
|
from _typeshed.wsgi import WSGIEnvironment |
|
from .wrappers.request import Request |
|
from .wrappers.response import Response |
|
|
|
_T = t.TypeVar("_T") |
|
|
|
_entity_re = re.compile(r"&([^;]+);") |
|
_filename_ascii_strip_re = re.compile(r"[^A-Za-z0-9_.-]") |
|
_windows_device_files = ( |
|
"CON", |
|
"AUX", |
|
"COM1", |
|
"COM2", |
|
"COM3", |
|
"COM4", |
|
"LPT1", |
|
"LPT2", |
|
"LPT3", |
|
"PRN", |
|
"NUL", |
|
) |
|
|
|
|
|
class cached_property(property, t.Generic[_T]): |
|
"""A :func:`property` that is only evaluated once. Subsequent access |
|
returns the cached value. Setting the property sets the cached |
|
value. Deleting the property clears the cached value, accessing it |
|
again will evaluate it again. |
|
|
|
.. code-block:: python |
|
|
|
class Example: |
|
@cached_property |
|
def value(self): |
|
# calculate something important here |
|
return 42 |
|
|
|
e = Example() |
|
e.value # evaluates |
|
e.value # uses cache |
|
e.value = 16 # sets cache |
|
del e.value # clears cache |
|
|
|
The class must have a ``__dict__`` for this to work. |
|
|
|
.. versionchanged:: 2.0 |
|
``del obj.name`` clears the cached value. |
|
""" |
|
|
|
def __init__( |
|
self, |
|
fget: t.Callable[[t.Any], _T], |
|
name: t.Optional[str] = None, |
|
doc: t.Optional[str] = None, |
|
) -> None: |
|
super().__init__(fget, doc=doc) |
|
self.__name__ = name or fget.__name__ |
|
self.__module__ = fget.__module__ |
|
|
|
def __set__(self, obj: object, value: _T) -> None: |
|
obj.__dict__[self.__name__] = value |
|
|
|
def __get__(self, obj: object, type: type = None) -> _T: # type: ignore |
|
if obj is None: |
|
return self # type: ignore |
|
|
|
value: _T = obj.__dict__.get(self.__name__, _missing) |
|
|
|
if value is _missing: |
|
value = self.fget(obj) # type: ignore |
|
obj.__dict__[self.__name__] = value |
|
|
|
return value |
|
|
|
def __delete__(self, obj: object) -> None: |
|
del obj.__dict__[self.__name__] |
|
|
|
|
|
def invalidate_cached_property(obj: object, name: str) -> None: |
|
"""Invalidates the cache for a :class:`cached_property`: |
|
|
|
>>> class Test(object): |
|
... @cached_property |
|
... def magic_number(self): |
|
... print("recalculating...") |
|
... return 42 |
|
... |
|
>>> var = Test() |
|
>>> var.magic_number |
|
recalculating... |
|
42 |
|
>>> var.magic_number |
|
42 |
|
>>> invalidate_cached_property(var, "magic_number") |
|
>>> var.magic_number |
|
recalculating... |
|
42 |
|
|
|
You must pass the name of the cached property as the second argument. |
|
|
|
.. deprecated:: 2.0 |
|
Will be removed in Werkzeug 2.1. Use ``del obj.name`` instead. |
|
""" |
|
warnings.warn( |
|
"'invalidate_cached_property' is deprecated and will be removed" |
|
" in Werkzeug 2.1. Use 'del obj.name' instead.", |
|
DeprecationWarning, |
|
stacklevel=2, |
|
) |
|
delattr(obj, name) |
|
|
|
|
|
class environ_property(_DictAccessorProperty[_TAccessorValue]): |
|
"""Maps request attributes to environment variables. This works not only |
|
for the Werkzeug request object, but also any other class with an |
|
environ attribute: |
|
|
|
>>> class Test(object): |
|
... environ = {'key': 'value'} |
|
... test = environ_property('key') |
|
>>> var = Test() |
|
>>> var.test |
|
'value' |
|
|
|
If you pass it a second value it's used as default if the key does not |
|
exist, the third one can be a converter that takes a value and converts |
|
it. If it raises :exc:`ValueError` or :exc:`TypeError` the default value |
|
is used. If no default value is provided `None` is used. |
|
|
|
Per default the property is read only. You have to explicitly enable it |
|
by passing ``read_only=False`` to the constructor. |
|
""" |
|
|
|
read_only = True |
|
|
|
def lookup(self, obj: "Request") -> "WSGIEnvironment": |
|
return obj.environ |
|
|
|
|
|
class header_property(_DictAccessorProperty[_TAccessorValue]): |
|
"""Like `environ_property` but for headers.""" |
|
|
|
def lookup(self, obj: t.Union["Request", "Response"]) -> Headers: |
|
return obj.headers |
|
|
|
|
|
class HTMLBuilder: |
|
"""Helper object for HTML generation. |
|
|
|
Per default there are two instances of that class. The `html` one, and |
|
the `xhtml` one for those two dialects. The class uses keyword parameters |
|
and positional parameters to generate small snippets of HTML. |
|
|
|
Keyword parameters are converted to XML/SGML attributes, positional |
|
arguments are used as children. Because Python accepts positional |
|
arguments before keyword arguments it's a good idea to use a list with the |
|
star-syntax for some children: |
|
|
|
>>> html.p(class_='foo', *[html.a('foo', href='foo.html'), ' ', |
|
... html.a('bar', href='bar.html')]) |
|
'<p class="foo"><a href="foo.html">foo</a> <a href="bar.html">bar</a></p>' |
|
|
|
This class works around some browser limitations and can not be used for |
|
arbitrary SGML/XML generation. For that purpose lxml and similar |
|
libraries exist. |
|
|
|
Calling the builder escapes the string passed: |
|
|
|
>>> html.p(html("<foo>")) |
|
'<p><foo></p>' |
|
|
|
.. deprecated:: 2.0 |
|
Will be removed in Werkzeug 2.1. |
|
""" |
|
|
|
_entity_re = re.compile(r"&([^;]+);") |
|
_entities = name2codepoint.copy() |
|
_entities["apos"] = 39 |
|
_empty_elements = { |
|
"area", |
|
"base", |
|
"basefont", |
|
"br", |
|
"col", |
|
"command", |
|
"embed", |
|
"frame", |
|
"hr", |
|
"img", |
|
"input", |
|
"keygen", |
|
"isindex", |
|
"link", |
|
"meta", |
|
"param", |
|
"source", |
|
"wbr", |
|
} |
|
_boolean_attributes = { |
|
"selected", |
|
"checked", |
|
"compact", |
|
"declare", |
|
"defer", |
|
"disabled", |
|
"ismap", |
|
"multiple", |
|
"nohref", |
|
"noresize", |
|
"noshade", |
|
"nowrap", |
|
} |
|
_plaintext_elements = {"textarea"} |
|
_c_like_cdata = {"script", "style"} |
|
|
|
def __init__(self, dialect): # type: ignore |
|
self._dialect = dialect |
|
|
|
def __call__(self, s): # type: ignore |
|
import html |
|
|
|
warnings.warn( |
|
"'utils.HTMLBuilder' is deprecated and will be removed in Werkzeug 2.1.", |
|
DeprecationWarning, |
|
stacklevel=2, |
|
) |
|
return html.escape(s) |
|
|
|
def __getattr__(self, tag): # type: ignore |
|
import html |
|
|
|
warnings.warn( |
|
"'utils.HTMLBuilder' is deprecated and will be removed in Werkzeug 2.1.", |
|
DeprecationWarning, |
|
stacklevel=2, |
|
) |
|
if tag[:2] == "__": |
|
raise AttributeError(tag) |
|
|
|
def proxy(*children, **arguments): # type: ignore |
|
buffer = f"<{tag}" |
|
for key, value in arguments.items(): |
|
if value is None: |
|
continue |
|
if key[-1] == "_": |
|
key = key[:-1] |
|
if key in self._boolean_attributes: |
|
if not value: |
|
continue |
|
if self._dialect == "xhtml": |
|
value = f'="{key}"' |
|
else: |
|
value = "" |
|
else: |
|
value = f'="{html.escape(value)}"' |
|
buffer += f" {key}{value}" |
|
if not children and tag in self._empty_elements: |
|
if self._dialect == "xhtml": |
|
buffer += " />" |
|
else: |
|
buffer += ">" |
|
return buffer |
|
buffer += ">" |
|
|
|
children_as_string = "".join([str(x) for x in children if x is not None]) |
|
|
|
if children_as_string: |
|
if tag in self._plaintext_elements: |
|
children_as_string = html.escape(children_as_string) |
|
elif tag in self._c_like_cdata and self._dialect == "xhtml": |
|
children_as_string = f"/*<![CDATA[*/{children_as_string}/*]]>*/" |
|
buffer += children_as_string + f"</{tag}>" |
|
return buffer |
|
|
|
return proxy |
|
|
|
def __repr__(self) -> str: |
|
return f"<{type(self).__name__} for {self._dialect!r}>" |
|
|
|
|
|
html = HTMLBuilder("html") |
|
xhtml = HTMLBuilder("xhtml") |
|
|
|
# https://cgit.freedesktop.org/xdg/shared-mime-info/tree/freedesktop.org.xml.in |
|
# https://www.iana.org/assignments/media-types/media-types.xhtml |
|
# Types listed in the XDG mime info that have a charset in the IANA registration. |
|
_charset_mimetypes = { |
|
"application/ecmascript", |
|
"application/javascript", |
|
"application/sql", |
|
"application/xml", |
|
"application/xml-dtd", |
|
"application/xml-external-parsed-entity", |
|
} |
|
|
|
|
|
def get_content_type(mimetype: str, charset: str) -> str: |
|
"""Returns the full content type string with charset for a mimetype. |
|
|
|
If the mimetype represents text, the charset parameter will be |
|
appended, otherwise the mimetype is returned unchanged. |
|
|
|
:param mimetype: The mimetype to be used as content type. |
|
:param charset: The charset to be appended for text mimetypes. |
|
:return: The content type. |
|
|
|
.. versionchanged:: 0.15 |
|
Any type that ends with ``+xml`` gets a charset, not just those |
|
that start with ``application/``. Known text types such as |
|
``application/javascript`` are also given charsets. |
|
""" |
|
if ( |
|
mimetype.startswith("text/") |
|
or mimetype in _charset_mimetypes |
|
or mimetype.endswith("+xml") |
|
): |
|
mimetype += f"; charset={charset}" |
|
|
|
return mimetype |
|
|
|
|
|
def detect_utf_encoding(data: bytes) -> str: |
|
"""Detect which UTF encoding was used to encode the given bytes. |
|
|
|
The latest JSON standard (:rfc:`8259`) suggests that only UTF-8 is |
|
accepted. Older documents allowed 8, 16, or 32. 16 and 32 can be big |
|
or little endian. Some editors or libraries may prepend a BOM. |
|
|
|
:internal: |
|
|
|
:param data: Bytes in unknown UTF encoding. |
|
:return: UTF encoding name |
|
|
|
.. deprecated:: 2.0 |
|
Will be removed in Werkzeug 2.1. This is built in to |
|
:func:`json.loads`. |
|
|
|
.. versionadded:: 0.15 |
|
""" |
|
warnings.warn( |
|
"'detect_utf_encoding' is deprecated and will be removed in" |
|
" Werkzeug 2.1. This is built in to 'json.loads'.", |
|
DeprecationWarning, |
|
stacklevel=2, |
|
) |
|
head = data[:4] |
|
|
|
if head[:3] == codecs.BOM_UTF8: |
|
return "utf-8-sig" |
|
|
|
if b"\x00" not in head: |
|
return "utf-8" |
|
|
|
if head in (codecs.BOM_UTF32_BE, codecs.BOM_UTF32_LE): |
|
return "utf-32" |
|
|
|
if head[:2] in (codecs.BOM_UTF16_BE, codecs.BOM_UTF16_LE): |
|
return "utf-16" |
|
|
|
if len(head) == 4: |
|
if head[:3] == b"\x00\x00\x00": |
|
return "utf-32-be" |
|
|
|
if head[::2] == b"\x00\x00": |
|
return "utf-16-be" |
|
|
|
if head[1:] == b"\x00\x00\x00": |
|
return "utf-32-le" |
|
|
|
if head[1::2] == b"\x00\x00": |
|
return "utf-16-le" |
|
|
|
if len(head) == 2: |
|
return "utf-16-be" if head.startswith(b"\x00") else "utf-16-le" |
|
|
|
return "utf-8" |
|
|
|
|
|
def format_string(string: str, context: t.Mapping[str, t.Any]) -> str: |
|
"""String-template format a string: |
|
|
|
>>> format_string('$foo and ${foo}s', dict(foo=42)) |
|
'42 and 42s' |
|
|
|
This does not do any attribute lookup. |
|
|
|
:param string: the format string. |
|
:param context: a dict with the variables to insert. |
|
|
|
.. deprecated:: 2.0 |
|
Will be removed in Werkzeug 2.1. Use :class:`string.Template` |
|
instead. |
|
""" |
|
from string import Template |
|
|
|
warnings.warn( |
|
"'utils.format_string' is deprecated and will be removed in" |
|
" Werkzeug 2.1. Use 'string.Template' instead.", |
|
DeprecationWarning, |
|
stacklevel=2, |
|
) |
|
return Template(string).substitute(context) |
|
|
|
|
|
def secure_filename(filename: str) -> str: |
|
r"""Pass it a filename and it will return a secure version of it. This |
|
filename can then safely be stored on a regular file system and passed |
|
to :func:`os.path.join`. The filename returned is an ASCII only string |
|
for maximum portability. |
|
|
|
On windows systems the function also makes sure that the file is not |
|
named after one of the special device files. |
|
|
|
>>> secure_filename("My cool movie.mov") |
|
'My_cool_movie.mov' |
|
>>> secure_filename("../../../etc/passwd") |
|
'etc_passwd' |
|
>>> secure_filename('i contain cool \xfcml\xe4uts.txt') |
|
'i_contain_cool_umlauts.txt' |
|
|
|
The function might return an empty filename. It's your responsibility |
|
to ensure that the filename is unique and that you abort or |
|
generate a random filename if the function returned an empty one. |
|
|
|
.. versionadded:: 0.5 |
|
|
|
:param filename: the filename to secure |
|
""" |
|
filename = unicodedata.normalize("NFKD", filename) |
|
filename = filename.encode("ascii", "ignore").decode("ascii") |
|
|
|
for sep in os.path.sep, os.path.altsep: |
|
if sep: |
|
filename = filename.replace(sep, " ") |
|
filename = str(_filename_ascii_strip_re.sub("", "_".join(filename.split()))).strip( |
|
"._" |
|
) |
|
|
|
# on nt a couple of special files are present in each folder. We |
|
# have to ensure that the target file is not such a filename. In |
|
# this case we prepend an underline |
|
if ( |
|
os.name == "nt" |
|
and filename |
|
and filename.split(".")[0].upper() in _windows_device_files |
|
): |
|
filename = f"_{filename}" |
|
|
|
return filename |
|
|
|
|
|
def escape(s: t.Any) -> str: |
|
"""Replace ``&``, ``<``, ``>``, ``"``, and ``'`` with HTML-safe |
|
sequences. |
|
|
|
``None`` is escaped to an empty string. |
|
|
|
.. deprecated:: 2.0 |
|
Will be removed in Werkzeug 2.1. Use MarkupSafe instead. |
|
""" |
|
import html |
|
|
|
warnings.warn( |
|
"'utils.escape' is deprecated and will be removed in Werkzeug" |
|
" 2.1. Use MarkupSafe instead.", |
|
DeprecationWarning, |
|
stacklevel=2, |
|
) |
|
|
|
if s is None: |
|
return "" |
|
|
|
if hasattr(s, "__html__"): |
|
return s.__html__() # type: ignore |
|
|
|
if not isinstance(s, str): |
|
s = str(s) |
|
|
|
return html.escape(s, quote=True) # type: ignore |
|
|
|
|
|
def unescape(s: str) -> str: |
|
"""The reverse of :func:`escape`. This unescapes all the HTML |
|
entities, not only those inserted by ``escape``. |
|
|
|
.. deprecated:: 2.0 |
|
Will be removed in Werkzeug 2.1. Use MarkupSafe instead. |
|
""" |
|
import html |
|
|
|
warnings.warn( |
|
"'utils.unescape' is deprecated and will be removed in Werkzueg" |
|
" 2.1. Use MarkupSafe instead.", |
|
DeprecationWarning, |
|
stacklevel=2, |
|
) |
|
return html.unescape(s) |
|
|
|
|
|
def redirect( |
|
location: str, code: int = 302, Response: t.Optional[t.Type["Response"]] = None |
|
) -> "Response": |
|
"""Returns a response object (a WSGI application) that, if called, |
|
redirects the client to the target location. Supported codes are |
|
301, 302, 303, 305, 307, and 308. 300 is not supported because |
|
it's not a real redirect and 304 because it's the answer for a |
|
request with a request with defined If-Modified-Since headers. |
|
|
|
.. versionadded:: 0.6 |
|
The location can now be a unicode string that is encoded using |
|
the :func:`iri_to_uri` function. |
|
|
|
.. versionadded:: 0.10 |
|
The class used for the Response object can now be passed in. |
|
|
|
:param location: the location the response should redirect to. |
|
:param code: the redirect status code. defaults to 302. |
|
:param class Response: a Response class to use when instantiating a |
|
response. The default is :class:`werkzeug.wrappers.Response` if |
|
unspecified. |
|
""" |
|
import html |
|
|
|
if Response is None: |
|
from .wrappers import Response # type: ignore |
|
|
|
display_location = html.escape(location) |
|
if isinstance(location, str): |
|
# Safe conversion is necessary here as we might redirect |
|
# to a broken URI scheme (for instance itms-services). |
|
from .urls import iri_to_uri |
|
|
|
location = iri_to_uri(location, safe_conversion=True) |
|
response = Response( # type: ignore |
|
'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">\n' |
|
"<title>Redirecting...</title>\n" |
|
"<h1>Redirecting...</h1>\n" |
|
"<p>You should be redirected automatically to target URL: " |
|
f'<a href="{html.escape(location)}">{display_location}</a>. If' |
|
" not click the link.", |
|
code, |
|
mimetype="text/html", |
|
) |
|
response.headers["Location"] = location |
|
return response |
|
|
|
|
|
def append_slash_redirect(environ: "WSGIEnvironment", code: int = 301) -> "Response": |
|
"""Redirects to the same URL but with a slash appended. The behavior |
|
of this function is undefined if the path ends with a slash already. |
|
|
|
:param environ: the WSGI environment for the request that triggers |
|
the redirect. |
|
:param code: the status code for the redirect. |
|
""" |
|
new_path = environ["PATH_INFO"].strip("/") + "/" |
|
query_string = environ.get("QUERY_STRING") |
|
if query_string: |
|
new_path += f"?{query_string}" |
|
return redirect(new_path, code) |
|
|
|
|
|
def send_file( |
|
path_or_file: t.Union[os.PathLike, str, t.IO[bytes]], |
|
environ: "WSGIEnvironment", |
|
mimetype: t.Optional[str] = None, |
|
as_attachment: bool = False, |
|
download_name: t.Optional[str] = None, |
|
conditional: bool = True, |
|
etag: t.Union[bool, str] = True, |
|
last_modified: t.Optional[t.Union[datetime, int, float]] = None, |
|
max_age: t.Optional[ |
|
t.Union[int, t.Callable[[t.Optional[str]], t.Optional[int]]] |
|
] = None, |
|
use_x_sendfile: bool = False, |
|
response_class: t.Optional[t.Type["Response"]] = None, |
|
_root_path: t.Optional[t.Union[os.PathLike, str]] = None, |
|
) -> "Response": |
|
"""Send the contents of a file to the client. |
|
|
|
The first argument can be a file path or a file-like object. Paths |
|
are preferred in most cases because Werkzeug can manage the file and |
|
get extra information from the path. Passing a file-like object |
|
requires that the file is opened in binary mode, and is mostly |
|
useful when building a file in memory with :class:`io.BytesIO`. |
|
|
|
Never pass file paths provided by a user. The path is assumed to be |
|
trusted, so a user could craft a path to access a file you didn't |
|
intend. |
|
|
|
If the WSGI server sets a ``file_wrapper`` in ``environ``, it is |
|
used, otherwise Werkzeug's built-in wrapper is used. Alternatively, |
|
if the HTTP server supports ``X-Sendfile``, ``use_x_sendfile=True`` |
|
will tell the server to send the given path, which is much more |
|
efficient than reading it in Python. |
|
|
|
:param path_or_file: The path to the file to send, relative to the |
|
current working directory if a relative path is given. |
|
Alternatively, a file-like object opened in binary mode. Make |
|
sure the file pointer is seeked to the start of the data. |
|
:param environ: The WSGI environ for the current request. |
|
:param mimetype: The MIME type to send for the file. If not |
|
provided, it will try to detect it from the file name. |
|
:param as_attachment: Indicate to a browser that it should offer to |
|
save the file instead of displaying it. |
|
:param download_name: The default name browsers will use when saving |
|
the file. Defaults to the passed file name. |
|
:param conditional: Enable conditional and range responses based on |
|
request headers. Requires passing a file path and ``environ``. |
|
:param etag: Calculate an ETag for the file, which requires passing |
|
a file path. Can also be a string to use instead. |
|
:param last_modified: The last modified time to send for the file, |
|
in seconds. If not provided, it will try to detect it from the |
|
file path. |
|
:param max_age: How long the client should cache the file, in |
|
seconds. If set, ``Cache-Control`` will be ``public``, otherwise |
|
it will be ``no-cache`` to prefer conditional caching. |
|
:param use_x_sendfile: Set the ``X-Sendfile`` header to let the |
|
server to efficiently send the file. Requires support from the |
|
HTTP server. Requires passing a file path. |
|
:param response_class: Build the response using this class. Defaults |
|
to :class:`~werkzeug.wrappers.Response`. |
|
:param _root_path: Do not use. For internal use only. Use |
|
:func:`send_from_directory` to safely send files under a path. |
|
|
|
.. versionchanged:: 2.0.2 |
|
``send_file`` only sets a detected ``Content-Encoding`` if |
|
``as_attachment`` is disabled. |
|
|
|
.. versionadded:: 2.0 |
|
Adapted from Flask's implementation. |
|
|
|
.. versionchanged:: 2.0 |
|
``download_name`` replaces Flask's ``attachment_filename`` |
|
parameter. If ``as_attachment=False``, it is passed with |
|
``Content-Disposition: inline`` instead. |
|
|
|
.. versionchanged:: 2.0 |
|
``max_age`` replaces Flask's ``cache_timeout`` parameter. |
|
``conditional`` is enabled and ``max_age`` is not set by |
|
default. |
|
|
|
.. versionchanged:: 2.0 |
|
``etag`` replaces Flask's ``add_etags`` parameter. It can be a |
|
string to use instead of generating one. |
|
|
|
.. versionchanged:: 2.0 |
|
If an encoding is returned when guessing ``mimetype`` from |
|
``download_name``, set the ``Content-Encoding`` header. |
|
""" |
|
if response_class is None: |
|
from .wrappers import Response |
|
|
|
response_class = Response |
|
|
|
path: t.Optional[str] = None |
|
file: t.Optional[t.IO[bytes]] = None |
|
size: t.Optional[int] = None |
|
mtime: t.Optional[float] = None |
|
headers = Headers() |
|
|
|
if isinstance(path_or_file, (os.PathLike, str)) or hasattr( |
|
path_or_file, "__fspath__" |
|
): |
|
path_or_file = t.cast(t.Union[os.PathLike, str], path_or_file) |
|
|
|
# Flask will pass app.root_path, allowing its send_file wrapper |
|
# to not have to deal with paths. |
|
if _root_path is not None: |
|
path = os.path.join(_root_path, path_or_file) |
|
else: |
|
path = os.path.abspath(path_or_file) |
|
|
|
stat = os.stat(path) |
|
size = stat.st_size |
|
mtime = stat.st_mtime |
|
else: |
|
file = path_or_file |
|
|
|
if download_name is None and path is not None: |
|
download_name = os.path.basename(path) |
|
|
|
if mimetype is None: |
|
if download_name is None: |
|
raise TypeError( |
|
"Unable to detect the MIME type because a file name is" |
|
" not available. Either set 'download_name', pass a" |
|
" path instead of a file, or set 'mimetype'." |
|
) |
|
|
|
mimetype, encoding = mimetypes.guess_type(download_name) |
|
|
|
if mimetype is None: |
|
mimetype = "application/octet-stream" |
|
|
|
# Don't send encoding for attachments, it causes browsers to |
|
# save decompress tar.gz files. |
|
if encoding is not None and not as_attachment: |
|
headers.set("Content-Encoding", encoding) |
|
|
|
if download_name is not None: |
|
try: |
|
download_name.encode("ascii") |
|
except UnicodeEncodeError: |
|
simple = unicodedata.normalize("NFKD", download_name) |
|
simple = simple.encode("ascii", "ignore").decode("ascii") |
|
quoted = url_quote(download_name, safe="") |
|
names = {"filename": simple, "filename*": f"UTF-8''{quoted}"} |
|
else: |
|
names = {"filename": download_name} |
|
|
|
value = "attachment" if as_attachment else "inline" |
|
headers.set("Content-Disposition", value, **names) |
|
elif as_attachment: |
|
raise TypeError( |
|
"No name provided for attachment. Either set" |
|
" 'download_name' or pass a path instead of a file." |
|
) |
|
|
|
if use_x_sendfile and path is not None: |
|
headers["X-Sendfile"] = path |
|
data = None |
|
else: |
|
if file is None: |
|
file = open(path, "rb") # type: ignore |
|
elif isinstance(file, io.BytesIO): |
|
size = file.getbuffer().nbytes |
|
elif isinstance(file, io.TextIOBase): |
|
raise ValueError("Files must be opened in binary mode or use BytesIO.") |
|
|
|
data = wrap_file(environ, file) |
|
|
|
rv = response_class( |
|
data, mimetype=mimetype, headers=headers, direct_passthrough=True |
|
) |
|
|
|
if size is not None: |
|
rv.content_length = size |
|
|
|
if last_modified is not None: |
|
rv.last_modified = last_modified # type: ignore |
|
elif mtime is not None: |
|
rv.last_modified = mtime # type: ignore |
|
|
|
rv.cache_control.no_cache = True |
|
|
|
# Flask will pass app.get_send_file_max_age, allowing its send_file |
|
# wrapper to not have to deal with paths. |
|
if callable(max_age): |
|
max_age = max_age(path) |
|
|
|
if max_age is not None: |
|
if max_age > 0: |
|
rv.cache_control.no_cache = None |
|
rv.cache_control.public = True |
|
|
|
rv.cache_control.max_age = max_age |
|
rv.expires = int(time() + max_age) # type: ignore |
|
|
|
if isinstance(etag, str): |
|
rv.set_etag(etag) |
|
elif etag and path is not None: |
|
check = adler32(path.encode("utf-8")) & 0xFFFFFFFF |
|
rv.set_etag(f"{mtime}-{size}-{check}") |
|
|
|
if conditional: |
|
try: |
|
rv = rv.make_conditional(environ, accept_ranges=True, complete_length=size) |
|
except RequestedRangeNotSatisfiable: |
|
if file is not None: |
|
file.close() |
|
|
|
raise |
|
|
|
# Some x-sendfile implementations incorrectly ignore the 304 |
|
# status code and send the file anyway. |
|
if rv.status_code == 304: |
|
rv.headers.pop("x-sendfile", None) |
|
|
|
return rv |
|
|
|
|
|
def send_from_directory( |
|
directory: t.Union[os.PathLike, str], |
|
path: t.Union[os.PathLike, str], |
|
environ: "WSGIEnvironment", |
|
**kwargs: t.Any, |
|
) -> "Response": |
|
"""Send a file from within a directory using :func:`send_file`. |
|
|
|
This is a secure way to serve files from a folder, such as static |
|
files or uploads. Uses :func:`~werkzeug.security.safe_join` to |
|
ensure the path coming from the client is not maliciously crafted to |
|
point outside the specified directory. |
|
|
|
If the final path does not point to an existing regular file, |
|
returns a 404 :exc:`~werkzeug.exceptions.NotFound` error. |
|
|
|
:param directory: The directory that ``path`` must be located under. |
|
:param path: The path to the file to send, relative to |
|
``directory``. |
|
:param environ: The WSGI environ for the current request. |
|
:param kwargs: Arguments to pass to :func:`send_file`. |
|
|
|
.. versionadded:: 2.0 |
|
Adapted from Flask's implementation. |
|
""" |
|
path = safe_join(os.fspath(directory), os.fspath(path)) |
|
|
|
if path is None: |
|
raise NotFound() |
|
|
|
# Flask will pass app.root_path, allowing its send_from_directory |
|
# wrapper to not have to deal with paths. |
|
if "_root_path" in kwargs: |
|
path = os.path.join(kwargs["_root_path"], path) |
|
|
|
try: |
|
if not os.path.isfile(path): |
|
raise NotFound() |
|
except ValueError: |
|
# path contains null byte on Python < 3.8 |
|
raise NotFound() from None |
|
|
|
return send_file(path, environ, **kwargs) |
|
|
|
|
|
def import_string(import_name: str, silent: bool = False) -> t.Any: |
|
"""Imports an object based on a string. This is useful if you want to |
|
use import paths as endpoints or something similar. An import path can |
|
be specified either in dotted notation (``xml.sax.saxutils.escape``) |
|
or with a colon as object delimiter (``xml.sax.saxutils:escape``). |
|
|
|
If `silent` is True the return value will be `None` if the import fails. |
|
|
|
:param import_name: the dotted name for the object to import. |
|
:param silent: if set to `True` import errors are ignored and |
|
`None` is returned instead. |
|
:return: imported object |
|
""" |
|
import_name = import_name.replace(":", ".") |
|
try: |
|
try: |
|
__import__(import_name) |
|
except ImportError: |
|
if "." not in import_name: |
|
raise |
|
else: |
|
return sys.modules[import_name] |
|
|
|
module_name, obj_name = import_name.rsplit(".", 1) |
|
module = __import__(module_name, globals(), locals(), [obj_name]) |
|
try: |
|
return getattr(module, obj_name) |
|
except AttributeError as e: |
|
raise ImportError(e) from None |
|
|
|
except ImportError as e: |
|
if not silent: |
|
raise ImportStringError(import_name, e).with_traceback( |
|
sys.exc_info()[2] |
|
) from None |
|
|
|
return None |
|
|
|
|
|
def find_modules( |
|
import_path: str, include_packages: bool = False, recursive: bool = False |
|
) -> t.Iterator[str]: |
|
"""Finds all the modules below a package. This can be useful to |
|
automatically import all views / controllers so that their metaclasses / |
|
function decorators have a chance to register themselves on the |
|
application. |
|
|
|
Packages are not returned unless `include_packages` is `True`. This can |
|
also recursively list modules but in that case it will import all the |
|
packages to get the correct load path of that module. |
|
|
|
:param import_path: the dotted name for the package to find child modules. |
|
:param include_packages: set to `True` if packages should be returned, too. |
|
:param recursive: set to `True` if recursion should happen. |
|
:return: generator |
|
""" |
|
module = import_string(import_path) |
|
path = getattr(module, "__path__", None) |
|
if path is None: |
|
raise ValueError(f"{import_path!r} is not a package") |
|
basename = f"{module.__name__}." |
|
for _importer, modname, ispkg in pkgutil.iter_modules(path): |
|
modname = basename + modname |
|
if ispkg: |
|
if include_packages: |
|
yield modname |
|
if recursive: |
|
yield from find_modules(modname, include_packages, True) |
|
else: |
|
yield modname |
|
|
|
|
|
def validate_arguments(func, args, kwargs, drop_extra=True): # type: ignore |
|
"""Checks if the function accepts the arguments and keyword arguments. |
|
Returns a new ``(args, kwargs)`` tuple that can safely be passed to |
|
the function without causing a `TypeError` because the function signature |
|
is incompatible. If `drop_extra` is set to `True` (which is the default) |
|
any extra positional or keyword arguments are dropped automatically. |
|
|
|
The exception raised provides three attributes: |
|
|
|
`missing` |
|
A set of argument names that the function expected but where |
|
missing. |
|
|
|
`extra` |
|
A dict of keyword arguments that the function can not handle but |
|
where provided. |
|
|
|
`extra_positional` |
|
A list of values that where given by positional argument but the |
|
function cannot accept. |
|
|
|
This can be useful for decorators that forward user submitted data to |
|
a view function:: |
|
|
|
from werkzeug.utils import ArgumentValidationError, validate_arguments |
|
|
|
def sanitize(f): |
|
def proxy(request): |
|
data = request.values.to_dict() |
|
try: |
|
args, kwargs = validate_arguments(f, (request,), data) |
|
except ArgumentValidationError: |
|
raise BadRequest('The browser failed to transmit all ' |
|
'the data expected.') |
|
return f(*args, **kwargs) |
|
return proxy |
|
|
|
:param func: the function the validation is performed against. |
|
:param args: a tuple of positional arguments. |
|
:param kwargs: a dict of keyword arguments. |
|
:param drop_extra: set to `False` if you don't want extra arguments |
|
to be silently dropped. |
|
:return: tuple in the form ``(args, kwargs)``. |
|
|
|
.. deprecated:: 2.0 |
|
Will be removed in Werkzeug 2.1. Use :func:`inspect.signature` |
|
instead. |
|
""" |
|
warnings.warn( |
|
"'utils.validate_arguments' is deprecated and will be removed" |
|
" in Werkzeug 2.1. Use 'inspect.signature' instead.", |
|
DeprecationWarning, |
|
stacklevel=2, |
|
) |
|
parser = _parse_signature(func) |
|
args, kwargs, missing, extra, extra_positional = parser(args, kwargs)[:5] |
|
if missing: |
|
raise ArgumentValidationError(tuple(missing)) |
|
elif (extra or extra_positional) and not drop_extra: |
|
raise ArgumentValidationError(None, extra, extra_positional) |
|
return tuple(args), kwargs |
|
|
|
|
|
def bind_arguments(func, args, kwargs): # type: ignore |
|
"""Bind the arguments provided into a dict. When passed a function, |
|
a tuple of arguments and a dict of keyword arguments `bind_arguments` |
|
returns a dict of names as the function would see it. This can be useful |
|
to implement a cache decorator that uses the function arguments to build |
|
the cache key based on the values of the arguments. |
|
|
|
:param func: the function the arguments should be bound for. |
|
:param args: tuple of positional arguments. |
|
:param kwargs: a dict of keyword arguments. |
|
:return: a :class:`dict` of bound keyword arguments. |
|
|
|
.. deprecated:: 2.0 |
|
Will be removed in Werkzeug 2.1. Use :meth:`Signature.bind` |
|
instead. |
|
""" |
|
warnings.warn( |
|
"'utils.bind_arguments' is deprecated and will be removed in" |
|
" Werkzeug 2.1. Use 'Signature.bind' instead.", |
|
DeprecationWarning, |
|
stacklevel=2, |
|
) |
|
( |
|
args, |
|
kwargs, |
|
missing, |
|
extra, |
|
extra_positional, |
|
arg_spec, |
|
vararg_var, |
|
kwarg_var, |
|
) = _parse_signature(func)(args, kwargs) |
|
values = {} |
|
for (name, _has_default, _default), value in zip(arg_spec, args): |
|
values[name] = value |
|
if vararg_var is not None: |
|
values[vararg_var] = tuple(extra_positional) |
|
elif extra_positional: |
|
raise TypeError("too many positional arguments") |
|
if kwarg_var is not None: |
|
multikw = set(extra) & {x[0] for x in arg_spec} |
|
if multikw: |
|
raise TypeError( |
|
f"got multiple values for keyword argument {next(iter(multikw))!r}" |
|
) |
|
values[kwarg_var] = extra |
|
elif extra: |
|
raise TypeError(f"got unexpected keyword argument {next(iter(extra))!r}") |
|
return values |
|
|
|
|
|
class ArgumentValidationError(ValueError): |
|
"""Raised if :func:`validate_arguments` fails to validate |
|
|
|
.. deprecated:: 2.0 |
|
Will be removed in Werkzeug 2.1 along with ``utils.bind`` and |
|
``validate_arguments``. |
|
""" |
|
|
|
def __init__(self, missing=None, extra=None, extra_positional=None): # type: ignore |
|
self.missing = set(missing or ()) |
|
self.extra = extra or {} |
|
self.extra_positional = extra_positional or [] |
|
super().__init__( |
|
"function arguments invalid." |
|
f" ({len(self.missing)} missing," |
|
f" {len(self.extra) + len(self.extra_positional)} additional)" |
|
) |
|
|
|
|
|
class ImportStringError(ImportError): |
|
"""Provides information about a failed :func:`import_string` attempt.""" |
|
|
|
#: String in dotted notation that failed to be imported. |
|
import_name: str |
|
#: Wrapped exception. |
|
exception: BaseException |
|
|
|
def __init__(self, import_name: str, exception: BaseException) -> None: |
|
self.import_name = import_name |
|
self.exception = exception |
|
msg = import_name |
|
name = "" |
|
tracked = [] |
|
for part in import_name.replace(":", ".").split("."): |
|
name = f"{name}.{part}" if name else part |
|
imported = import_string(name, silent=True) |
|
if imported: |
|
tracked.append((name, getattr(imported, "__file__", None))) |
|
else: |
|
track = [f"- {n!r} found in {i!r}." for n, i in tracked] |
|
track.append(f"- {name!r} not found.") |
|
track_str = "\n".join(track) |
|
msg = ( |
|
f"import_string() failed for {import_name!r}. Possible reasons" |
|
f" are:\n\n" |
|
"- missing __init__.py in a package;\n" |
|
"- package or module path not included in sys.path;\n" |
|
"- duplicated package or module name taking precedence in" |
|
" sys.path;\n" |
|
"- missing module, class, function or variable;\n\n" |
|
f"Debugged import:\n\n{track_str}\n\n" |
|
f"Original exception:\n\n{type(exception).__name__}: {exception}" |
|
) |
|
break |
|
|
|
super().__init__(msg) |
|
|
|
def __repr__(self) -> str: |
|
return f"<{type(self).__name__}({self.import_name!r}, {self.exception!r})>"
|
|
|