Last active
March 17, 2024 07:04
-
-
Save HacKanCuBa/9fceabaeb6417ed6280c2e7a48981420 to your computer and use it in GitHub Desktop.
Cache handy helpers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from abc import ABC, abstractmethod
from collections import defaultdict
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, Sequence

import redis.asyncio as redis
| class AsyncCacheBackend(ABC): | |
| @abstractmethod | |
| async def get(self, key: str | bytes) -> bytes: | |
| ... | |
| @abstractmethod | |
| async def set(self, key: str | bytes, value: bytes, ttl: int | None = None) -> None: | |
| ... | |
| @abstractmethod | |
| async def delete(self, key: str | bytes) -> None: | |
| ... | |
class RedisAsyncCache(AsyncCacheBackend):
    """Cache backend that forwards every operation to an async Redis client."""

    def __init__(self, conn: redis.Redis):
        """Wrap *conn*; this class only uses the client and never closes it."""
        self._redis = conn

    async def get(self, key: str | bytes) -> bytes | None:
        """Return the value stored under *key*, or ``None`` if Redis has no such key.

        NOTE(review): the ``bytes`` return type assumes the client was not
        created with ``decode_responses=True`` -- confirm against callers.
        """
        return await self._redis.get(key)

    async def set(self, key: str | bytes, value: bytes, ttl: int | None = None) -> None:
        """Store *value* under *key*; *ttl* is forwarded to Redis as ``ex``."""
        await self._redis.set(key, value, ex=ttl)

    async def delete(self, key: str | bytes) -> None:
        """Delete *key* from Redis."""
        await self._redis.delete(key)

    async def keys(self) -> tuple[bytes, ...]:
        """Return all keys reported by the client's ``keys()`` call as a tuple.

        NOTE(review): ``keys()`` is called without a pattern, so it lists the
        whole selected database; this can be slow on large keyspaces.
        """
        found: list[bytes] = await self._redis.keys()
        return tuple(found)
class InMemoryAsyncCache(AsyncCacheBackend):
    """Cache backend that keeps everything in a plain dict held by the instance.

    ``str`` keys are encoded to ``bytes`` before use, so ``"a"`` and ``b"a"``
    address the same entry.
    """

    def __init__(self):
        self._cache: dict[bytes, bytes] = {}

    @staticmethod
    def _build_key(key: str | bytes) -> bytes:
        """Normalize *key* to ``bytes`` (``str`` is encoded with the default codec)."""
        if isinstance(key, str):
            return key.encode()
        return key

    async def get(self, key: str | bytes) -> bytes | None:
        """Return the value stored under *key*, or ``None`` when it is absent."""
        return self._cache.get(self._build_key(key))

    async def set(self, key: str | bytes, value: bytes, ttl: int | None = None) -> None:
        """Store *value* under *key*.

        *ttl* is accepted so the signature matches ``AsyncCacheBackend.set``
        (including keyword use, ``ttl=...``), but it is ignored: entries in
        this backend never expire.
        """
        self._cache[self._build_key(key)] = value

    async def delete(self, key: str | bytes) -> None:
        """Remove *key*; deleting a missing key is a no-op."""
        self._cache.pop(self._build_key(key), None)

    async def keys(self) -> tuple[bytes, ...]:
        """Return a snapshot of all stored keys."""
        return tuple(self._cache)
class AsyncCache:
    """Thin facade over one backend that reports a miss as ``KeyError``."""

    def __init__(self, backend: AsyncCacheBackend, /) -> None:
        self._cache = backend

    async def get(self, key: str | bytes) -> bytes:
        """Return the cached value for *key*.

        Raises:
            KeyError: if the backend returns ``None`` for *key*.
        """
        found = await self._cache.get(key)
        if found is not None:
            return found
        raise KeyError(key)

    async def set(self, key: str | bytes, value: bytes, ttl: int | None = None) -> None:
        """Forward the write, including *ttl*, to the backend."""
        backend = self._cache
        await backend.set(key, value, ttl)

    async def delete(self, key: str | bytes) -> None:
        """Forward the deletion to the backend."""
        backend = self._cache
        await backend.delete(key)
class AsyncCachePassthrough:
    """Layered cache that reads through an ordered sequence of backends.

    Backends are consulted in the order given. Writes and deletes are applied
    to every backend.
    """

    def __init__(self, backends: Sequence[AsyncCacheBackend], /) -> None:
        self._caches = tuple(backends)

    async def get(self, key: str | bytes) -> bytes:
        """Return the first value found for *key*, back-filling earlier misses.

        Every backend that was asked before the hit and returned ``None`` is
        populated with the value, so the next lookup is served earlier in the
        chain. The back-fill is written without a TTL because the original
        expiry is not known here.

        Raises:
            KeyError: if no backend holds *key*.
        """
        missed: list[AsyncCacheBackend] = []
        value = None
        for cache in self._caches:
            value = await cache.get(key)
            if value is not None:
                break
            missed.append(cache)
        if value is None:
            raise KeyError(key)
        for cache in missed:
            await cache.set(key, value)
        return value

    async def set(self, key: str | bytes, value: bytes, ttl: int | None = None) -> None:
        """Store *value* under *key* in every backend, forwarding *ttl*."""
        for cache in self._caches:
            # Positional on purpose: backends may name the third parameter differently.
            await cache.set(key, value, ttl)

    async def delete(self, key: str | bytes) -> None:
        """Delete *key* from every backend."""
        for cache in self._caches:
            await cache.delete(key)

    async def warmup(self) -> None:
        """Copy each key into every backend that does not have it yet.

        Requires every backend to provide ``keys()``. Assumes that when two
        backends hold the same key they also hold the same value, so the value
        is read from the first backend (in chain order) that lists the key.
        """
        holders: defaultdict[bytes, list[AsyncCacheBackend]] = defaultdict(list)
        for cache in self._caches:
            for key in await cache.keys():
                holders[key].append(cache)
        for key, having in holders.items():
            lacking = [cache for cache in self._caches if cache not in having]
            if not lacking:
                continue
            # Fetch once per key instead of once per (backend, key) pair.
            value = await having[0].get(key)
            if value is None:
                # The key disappeared between keys() and get(); nothing to copy.
                continue
            for cache in lacking:
                await cache.set(key, value)
@asynccontextmanager
async def async_redis_connection(
    host: str,
    *,
    port: int = 6379,
    db: int = 0,
    **kwargs: Any,
) -> AsyncGenerator[redis.Redis, None]:
    """Yield an async Redis client and close it when the context exits.

    Args:
        host: Redis server host.
        port: Redis server port.
        db: Database index to select.
        **kwargs: Extra keyword arguments passed to ``redis.Redis``; they
            override the ``auto_close_connection_pool=True`` default set here.
    """
    params: dict[str, Any] = {"auto_close_connection_pool": True, **kwargs}
    conn = redis.Redis(host=host, port=port, db=db, **params)
    try:
        yield conn
    finally:
        # Newer redis-py releases deprecate close() on the asyncio client in
        # favor of aclose(); fall back to close() where aclose() does not exist.
        closer = getattr(conn, "aclose", None) or conn.close
        await closer()
@asynccontextmanager
async def async_cache(host: str | None = None, **kwargs: Any) -> AsyncGenerator[AsyncCache, None]:
    """Yield an ``AsyncCache`` backed by Redis when *host* is given, else by memory.

    With a falsy *host* the cache wraps a fresh ``InMemoryAsyncCache`` and
    *kwargs* are not used. Otherwise *host* and *kwargs* are passed to
    ``async_redis_connection`` and the connection lives for the duration of
    the context.
    """
    if host:
        async with async_redis_connection(host, **kwargs) as conn:
            yield AsyncCache(RedisAsyncCache(conn))
    else:
        yield AsyncCache(InMemoryAsyncCache())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment