Made an application for supporting sustainable local businesses in San Pancho.
Never really got completed, but it has some useful Svelte components for maps that we can reuse.
http://greenspots.dctrl.space
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
364 lines
12 KiB
364 lines
12 KiB
"""The optional bytecode cache system. This is useful if you have very |
|
complex template situations and the compilation of all those templates |
|
slows down your application too much. |
|
|
|
Situations where this is useful are often forking web applications that |
|
are initialized on the first request. |
|
""" |
|
import errno |
|
import fnmatch |
|
import marshal |
|
import os |
|
import pickle |
|
import stat |
|
import sys |
|
import tempfile |
|
import typing as t |
|
from hashlib import sha1 |
|
from io import BytesIO |
|
from types import CodeType |
|
|
|
if t.TYPE_CHECKING: |
|
import typing_extensions as te |
|
from .environment import Environment |
|
|
|
class _MemcachedClient(te.Protocol): |
|
def get(self, key: str) -> bytes: |
|
... |
|
|
|
def set(self, key: str, value: bytes, timeout: t.Optional[int] = None) -> None: |
|
... |
|
|
|
|
|
bc_version = 5 |
|
# Magic bytes to identify Jinja bytecode cache files. Contains the |
|
# Python major and minor version to avoid loading incompatible bytecode |
|
# if a project upgrades its Python version. |
|
bc_magic = ( |
|
b"j2" |
|
+ pickle.dumps(bc_version, 2) |
|
+ pickle.dumps((sys.version_info[0] << 24) | sys.version_info[1], 2) |
|
) |
|
|
|
|
|
class Bucket: |
|
"""Buckets are used to store the bytecode for one template. It's created |
|
and initialized by the bytecode cache and passed to the loading functions. |
|
|
|
The buckets get an internal checksum from the cache assigned and use this |
|
to automatically reject outdated cache material. Individual bytecode |
|
cache subclasses don't have to care about cache invalidation. |
|
""" |
|
|
|
def __init__(self, environment: "Environment", key: str, checksum: str) -> None: |
|
self.environment = environment |
|
self.key = key |
|
self.checksum = checksum |
|
self.reset() |
|
|
|
def reset(self) -> None: |
|
"""Resets the bucket (unloads the bytecode).""" |
|
self.code: t.Optional[CodeType] = None |
|
|
|
def load_bytecode(self, f: t.BinaryIO) -> None: |
|
"""Loads bytecode from a file or file like object.""" |
|
# make sure the magic header is correct |
|
magic = f.read(len(bc_magic)) |
|
if magic != bc_magic: |
|
self.reset() |
|
return |
|
# the source code of the file changed, we need to reload |
|
checksum = pickle.load(f) |
|
if self.checksum != checksum: |
|
self.reset() |
|
return |
|
# if marshal_load fails then we need to reload |
|
try: |
|
self.code = marshal.load(f) |
|
except (EOFError, ValueError, TypeError): |
|
self.reset() |
|
return |
|
|
|
def write_bytecode(self, f: t.BinaryIO) -> None: |
|
"""Dump the bytecode into the file or file like object passed.""" |
|
if self.code is None: |
|
raise TypeError("can't write empty bucket") |
|
f.write(bc_magic) |
|
pickle.dump(self.checksum, f, 2) |
|
marshal.dump(self.code, f) |
|
|
|
def bytecode_from_string(self, string: bytes) -> None: |
|
"""Load bytecode from bytes.""" |
|
self.load_bytecode(BytesIO(string)) |
|
|
|
def bytecode_to_string(self) -> bytes: |
|
"""Return the bytecode as bytes.""" |
|
out = BytesIO() |
|
self.write_bytecode(out) |
|
return out.getvalue() |
|
|
|
|
|
class BytecodeCache: |
|
"""To implement your own bytecode cache you have to subclass this class |
|
and override :meth:`load_bytecode` and :meth:`dump_bytecode`. Both of |
|
these methods are passed a :class:`~jinja2.bccache.Bucket`. |
|
|
|
A very basic bytecode cache that saves the bytecode on the file system:: |
|
|
|
from os import path |
|
|
|
class MyCache(BytecodeCache): |
|
|
|
def __init__(self, directory): |
|
self.directory = directory |
|
|
|
def load_bytecode(self, bucket): |
|
filename = path.join(self.directory, bucket.key) |
|
if path.exists(filename): |
|
with open(filename, 'rb') as f: |
|
bucket.load_bytecode(f) |
|
|
|
def dump_bytecode(self, bucket): |
|
filename = path.join(self.directory, bucket.key) |
|
with open(filename, 'wb') as f: |
|
bucket.write_bytecode(f) |
|
|
|
A more advanced version of a filesystem based bytecode cache is part of |
|
Jinja. |
|
""" |
|
|
|
def load_bytecode(self, bucket: Bucket) -> None: |
|
"""Subclasses have to override this method to load bytecode into a |
|
bucket. If they are not able to find code in the cache for the |
|
bucket, it must not do anything. |
|
""" |
|
raise NotImplementedError() |
|
|
|
def dump_bytecode(self, bucket: Bucket) -> None: |
|
"""Subclasses have to override this method to write the bytecode |
|
from a bucket back to the cache. If it unable to do so it must not |
|
fail silently but raise an exception. |
|
""" |
|
raise NotImplementedError() |
|
|
|
def clear(self) -> None: |
|
"""Clears the cache. This method is not used by Jinja but should be |
|
implemented to allow applications to clear the bytecode cache used |
|
by a particular environment. |
|
""" |
|
|
|
def get_cache_key( |
|
self, name: str, filename: t.Optional[t.Union[str]] = None |
|
) -> str: |
|
"""Returns the unique hash key for this template name.""" |
|
hash = sha1(name.encode("utf-8")) |
|
|
|
if filename is not None: |
|
hash.update(f"|{filename}".encode()) |
|
|
|
return hash.hexdigest() |
|
|
|
def get_source_checksum(self, source: str) -> str: |
|
"""Returns a checksum for the source.""" |
|
return sha1(source.encode("utf-8")).hexdigest() |
|
|
|
def get_bucket( |
|
self, |
|
environment: "Environment", |
|
name: str, |
|
filename: t.Optional[str], |
|
source: str, |
|
) -> Bucket: |
|
"""Return a cache bucket for the given template. All arguments are |
|
mandatory but filename may be `None`. |
|
""" |
|
key = self.get_cache_key(name, filename) |
|
checksum = self.get_source_checksum(source) |
|
bucket = Bucket(environment, key, checksum) |
|
self.load_bytecode(bucket) |
|
return bucket |
|
|
|
def set_bucket(self, bucket: Bucket) -> None: |
|
"""Put the bucket into the cache.""" |
|
self.dump_bytecode(bucket) |
|
|
|
|
|
class FileSystemBytecodeCache(BytecodeCache): |
|
"""A bytecode cache that stores bytecode on the filesystem. It accepts |
|
two arguments: The directory where the cache items are stored and a |
|
pattern string that is used to build the filename. |
|
|
|
If no directory is specified a default cache directory is selected. On |
|
Windows the user's temp directory is used, on UNIX systems a directory |
|
is created for the user in the system temp directory. |
|
|
|
The pattern can be used to have multiple separate caches operate on the |
|
same directory. The default pattern is ``'__jinja2_%s.cache'``. ``%s`` |
|
is replaced with the cache key. |
|
|
|
>>> bcc = FileSystemBytecodeCache('/tmp/jinja_cache', '%s.cache') |
|
|
|
This bytecode cache supports clearing of the cache using the clear method. |
|
""" |
|
|
|
def __init__( |
|
self, directory: t.Optional[str] = None, pattern: str = "__jinja2_%s.cache" |
|
) -> None: |
|
if directory is None: |
|
directory = self._get_default_cache_dir() |
|
self.directory = directory |
|
self.pattern = pattern |
|
|
|
def _get_default_cache_dir(self) -> str: |
|
def _unsafe_dir() -> "te.NoReturn": |
|
raise RuntimeError( |
|
"Cannot determine safe temp directory. You " |
|
"need to explicitly provide one." |
|
) |
|
|
|
tmpdir = tempfile.gettempdir() |
|
|
|
# On windows the temporary directory is used specific unless |
|
# explicitly forced otherwise. We can just use that. |
|
if os.name == "nt": |
|
return tmpdir |
|
if not hasattr(os, "getuid"): |
|
_unsafe_dir() |
|
|
|
dirname = f"_jinja2-cache-{os.getuid()}" |
|
actual_dir = os.path.join(tmpdir, dirname) |
|
|
|
try: |
|
os.mkdir(actual_dir, stat.S_IRWXU) |
|
except OSError as e: |
|
if e.errno != errno.EEXIST: |
|
raise |
|
try: |
|
os.chmod(actual_dir, stat.S_IRWXU) |
|
actual_dir_stat = os.lstat(actual_dir) |
|
if ( |
|
actual_dir_stat.st_uid != os.getuid() |
|
or not stat.S_ISDIR(actual_dir_stat.st_mode) |
|
or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU |
|
): |
|
_unsafe_dir() |
|
except OSError as e: |
|
if e.errno != errno.EEXIST: |
|
raise |
|
|
|
actual_dir_stat = os.lstat(actual_dir) |
|
if ( |
|
actual_dir_stat.st_uid != os.getuid() |
|
or not stat.S_ISDIR(actual_dir_stat.st_mode) |
|
or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU |
|
): |
|
_unsafe_dir() |
|
|
|
return actual_dir |
|
|
|
def _get_cache_filename(self, bucket: Bucket) -> str: |
|
return os.path.join(self.directory, self.pattern % (bucket.key,)) |
|
|
|
def load_bytecode(self, bucket: Bucket) -> None: |
|
filename = self._get_cache_filename(bucket) |
|
|
|
if os.path.exists(filename): |
|
with open(filename, "rb") as f: |
|
bucket.load_bytecode(f) |
|
|
|
def dump_bytecode(self, bucket: Bucket) -> None: |
|
with open(self._get_cache_filename(bucket), "wb") as f: |
|
bucket.write_bytecode(f) |
|
|
|
def clear(self) -> None: |
|
# imported lazily here because google app-engine doesn't support |
|
# write access on the file system and the function does not exist |
|
# normally. |
|
from os import remove |
|
|
|
files = fnmatch.filter(os.listdir(self.directory), self.pattern % ("*",)) |
|
for filename in files: |
|
try: |
|
remove(os.path.join(self.directory, filename)) |
|
except OSError: |
|
pass |
|
|
|
|
|
class MemcachedBytecodeCache(BytecodeCache): |
|
"""This class implements a bytecode cache that uses a memcache cache for |
|
storing the information. It does not enforce a specific memcache library |
|
(tummy's memcache or cmemcache) but will accept any class that provides |
|
the minimal interface required. |
|
|
|
Libraries compatible with this class: |
|
|
|
- `cachelib <https://github.com/pallets/cachelib>`_ |
|
- `python-memcached <https://pypi.org/project/python-memcached/>`_ |
|
|
|
(Unfortunately the django cache interface is not compatible because it |
|
does not support storing binary data, only text. You can however pass |
|
the underlying cache client to the bytecode cache which is available |
|
as `django.core.cache.cache._client`.) |
|
|
|
The minimal interface for the client passed to the constructor is this: |
|
|
|
.. class:: MinimalClientInterface |
|
|
|
.. method:: set(key, value[, timeout]) |
|
|
|
Stores the bytecode in the cache. `value` is a string and |
|
`timeout` the timeout of the key. If timeout is not provided |
|
a default timeout or no timeout should be assumed, if it's |
|
provided it's an integer with the number of seconds the cache |
|
item should exist. |
|
|
|
.. method:: get(key) |
|
|
|
Returns the value for the cache key. If the item does not |
|
exist in the cache the return value must be `None`. |
|
|
|
The other arguments to the constructor are the prefix for all keys that |
|
is added before the actual cache key and the timeout for the bytecode in |
|
the cache system. We recommend a high (or no) timeout. |
|
|
|
This bytecode cache does not support clearing of used items in the cache. |
|
The clear method is a no-operation function. |
|
|
|
.. versionadded:: 2.7 |
|
Added support for ignoring memcache errors through the |
|
`ignore_memcache_errors` parameter. |
|
""" |
|
|
|
def __init__( |
|
self, |
|
client: "_MemcachedClient", |
|
prefix: str = "jinja2/bytecode/", |
|
timeout: t.Optional[int] = None, |
|
ignore_memcache_errors: bool = True, |
|
): |
|
self.client = client |
|
self.prefix = prefix |
|
self.timeout = timeout |
|
self.ignore_memcache_errors = ignore_memcache_errors |
|
|
|
def load_bytecode(self, bucket: Bucket) -> None: |
|
try: |
|
code = self.client.get(self.prefix + bucket.key) |
|
except Exception: |
|
if not self.ignore_memcache_errors: |
|
raise |
|
else: |
|
bucket.bytecode_from_string(code) |
|
|
|
def dump_bytecode(self, bucket: Bucket) -> None: |
|
key = self.prefix + bucket.key |
|
value = bucket.bytecode_to_string() |
|
|
|
try: |
|
if self.timeout is not None: |
|
self.client.set(key, value, self.timeout) |
|
else: |
|
self.client.set(key, value) |
|
except Exception: |
|
if not self.ignore_memcache_errors: |
|
raise
|
|
|