Skip to content

Instantly share code, notes, and snippets.

@HacKanCuBa
Last active March 17, 2024 07:04
Show Gist options
  • Select an option

  • Save HacKanCuBa/9fceabaeb6417ed6280c2e7a48981420 to your computer and use it in GitHub Desktop.

Select an option

Save HacKanCuBa/9fceabaeb6417ed6280c2e7a48981420 to your computer and use it in GitHub Desktop.
Cache handy helpers
import time
from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from typing import Any

import redis.asyncio as redis
class AsyncCacheBackend(ABC):
@abstractmethod
async def get(self, key: str | bytes) -> bytes:
...
@abstractmethod
async def set(self, key: str | bytes, value: bytes, ttl: int | None = None) -> None:
...
@abstractmethod
async def delete(self, key: str | bytes) -> None:
...
class RedisAsyncCache(AsyncCacheBackend):
    """Cache backend that forwards every operation to an async Redis client."""

    def __init__(self, conn: redis.Redis):
        # The client is supplied by the caller; this class never closes it.
        self._redis = conn

    async def get(self, key: str | bytes) -> bytes:
        """Return the value the client holds for *key*.

        Raises:
            KeyError: when the client answers ``None`` for *key*.
        """
        if (payload := await self._redis.get(key)) is not None:
            return payload
        raise KeyError(key)

    async def set(self, key: str | bytes, value: bytes, ttl: int | None = None) -> None:
        """Write *value* under *key*, handing *ttl* to the client as ``ex``."""
        client = self._redis
        await client.set(key, value, ex=ttl)

    async def delete(self, key: str | bytes) -> None:
        """Ask the client to delete *key*."""
        client = self._redis
        await client.delete(key)
class InMemoryAsyncCache(AsyncCacheBackend):
    """Process-local cache backend backed by plain dictionaries.

    Keys are normalized to ``bytes`` so that ``"a"`` and ``b"a"`` address the
    same entry. Entries stored with a ``ttl`` expire after that many seconds;
    expired entries are evicted lazily, the next time they are read.
    """

    def __init__(self) -> None:
        self._cache: dict[bytes, bytes] = {}
        # Monotonic-clock deadline for each key that was stored with a ttl.
        self._expires_at: dict[bytes, float] = {}

    @staticmethod
    def _build_key(key: str | bytes) -> bytes:
        """Return *key* as bytes, encoding ``str`` keys with the default codec."""
        if isinstance(key, str):
            return key.encode()
        return key

    def _drop(self, built_key: bytes) -> None:
        """Forget *built_key* and its deadline; a no-op when it is absent."""
        self._cache.pop(built_key, None)
        self._expires_at.pop(built_key, None)

    async def get(self, key: str | bytes) -> bytes:
        """Return the value stored under *key*.

        Raises:
            KeyError: when the key is absent or its ttl has elapsed. The
                exception carries the normalized ``bytes`` key.
        """
        built_key = self._build_key(key)
        deadline = self._expires_at.get(built_key)
        if deadline is not None and time.monotonic() >= deadline:
            self._drop(built_key)
        return self._cache[built_key]

    async def set(self, key: str | bytes, value: bytes, ttl: int | None = None) -> None:
        """Store *value* under *key*.

        Args:
            key: Cache key, as ``str`` or ``bytes``.
            value: Payload to store.
            ttl: Lifetime in seconds, or ``None`` to keep the entry until it
                is deleted. A non-positive ttl yields an entry that is
                already expired.
        """
        built_key = self._build_key(key)
        self._cache[built_key] = value
        if ttl is None:
            # Overwriting without a ttl must clear any earlier deadline.
            self._expires_at.pop(built_key, None)
        else:
            # Monotonic time is immune to wall-clock adjustments.
            self._expires_at[built_key] = time.monotonic() + ttl

    async def delete(self, key: str | bytes) -> None:
        """Remove *key* if present; deleting a missing key is not an error."""
        self._drop(self._build_key(key))
class AsyncCache:
    """Facade that forwards cache operations to the backend it was given."""

    def __init__(self, backend: AsyncCacheBackend, /) -> None:
        # The backend is positional-only; every method below delegates to it.
        self._cache = backend

    async def get(self, key: str | bytes) -> bytes:
        """Return whatever the backend returns for *key*."""
        value = await self._cache.get(key)
        return value

    async def set(self, key: str | bytes, value: bytes, ttl: int | None = None) -> None:
        """Forward *key*, *value* and *ttl* (positionally) to the backend."""
        backend = self._cache
        await backend.set(key, value, ttl)

    async def delete(self, key: str | bytes) -> None:
        """Forward the deletion of *key* to the backend."""
        backend = self._cache
        await backend.delete(key)
@asynccontextmanager
async def async_redis_connection(
    host: str,
    *,
    port: int = 6379,
    db: int = 0,
    **kwargs: Any,
) -> AsyncGenerator[redis.Redis, None]:
    """Yield an async Redis client and close it when the context exits.

    Args:
        host: Redis server host name.
        port: Redis server port.
        db: Redis database index.
        **kwargs: Extra keyword arguments passed to ``redis.Redis``; they
            override the defaults set here.

    Yields:
        The ``redis.Redis`` client built from the arguments above.
    """
    # Caller-supplied options win over the default because they come last.
    params: dict[str, Any] = {"auto_close_connection_pool": True, **kwargs}
    conn = redis.Redis(host=host, port=port, db=db, **params)
    try:
        yield conn
    finally:
        # Newer redis-py releases deprecate close() on asyncio clients in
        # favour of aclose(); fall back to close() where aclose() is missing.
        closer = getattr(conn, "aclose", None) or conn.close
        await closer()
@asynccontextmanager
async def async_cache(host: str | None = None, **kwargs: Any) -> AsyncGenerator[AsyncCache, None]:
    """Yield an ``AsyncCache`` whose backend depends on *host*.

    With a truthy *host*, a Redis connection is opened through
    ``async_redis_connection(host, **kwargs)`` and wrapped in a
    ``RedisAsyncCache``; the connection lives for the duration of the
    context. Otherwise an ``InMemoryAsyncCache`` is used and *kwargs* are
    ignored.
    """
    if host:
        async with async_redis_connection(host, **kwargs) as conn:
            yield AsyncCache(RedisAsyncCache(conn))
    else:
        yield AsyncCache(InMemoryAsyncCache())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment