Skip to content

Instantly share code, notes, and snippets.

@HacKanCuBa
Last active March 17, 2024 07:04
Show Gist options
  • Select an option

  • Save HacKanCuBa/9fceabaeb6417ed6280c2e7a48981420 to your computer and use it in GitHub Desktop.

Select an option

Save HacKanCuBa/9fceabaeb6417ed6280c2e7a48981420 to your computer and use it in GitHub Desktop.
Cache handy helpers
import time
from abc import ABC, abstractmethod
from contextlib import asynccontextmanager
from typing import Any
from typing import AsyncGenerator

import redis.asyncio as redis
class AsyncCacheBackend(ABC):
@abstractmethod
async def get(self, key: str | bytes) -> bytes:
...
@abstractmethod
async def set(self, key: str | bytes, value: bytes, ttl: int | None = None) -> None:
...
@abstractmethod
async def delete(self, key: str | bytes) -> None:
...
class RedisAsyncCache(AsyncCacheBackend):
    """Cache backend that delegates every operation to an async Redis client."""

    def __init__(self, conn: redis.Redis):
        # The client is owned by the caller; this class never closes it.
        self._redis = conn

    async def get(self, key: str | bytes) -> bytes:
        """Return the value stored under *key*.

        Raises:
            KeyError: If the Redis client returns ``None`` for *key*.
        """
        payload = await self._redis.get(key)
        if payload is not None:
            return payload
        raise KeyError(key)

    async def set(
        self,
        key: str | bytes,
        value: bytes,
        ttl: int | None = None,
    ) -> None:
        """Store *value* under *key*, forwarding *ttl* as the client's ``ex`` argument."""
        await self._redis.set(key, value, ex=ttl)

    async def delete(self, key: str | bytes) -> None:
        """Ask the Redis client to delete *key*."""
        await self._redis.delete(key)
class InMemoryAsyncCache(AsyncCacheBackend):
    """Process-local cache backend backed by plain dictionaries.

    Keys are normalised to ``bytes`` so that ``"k"`` and ``b"k"`` address the
    same entry. Expiry is enforced lazily: an entry whose TTL has elapsed is
    evicted the next time it is read.
    """

    def __init__(self) -> None:
        self._cache: dict[bytes, bytes] = {}
        # Monotonic-clock deadlines; only keys stored with a TTL appear here.
        self._expires_at: dict[bytes, float] = {}

    @staticmethod
    def _build_key(key: str | bytes) -> bytes:
        """Return *key* as ``bytes``, encoding ``str`` keys with ``str.encode()``."""
        return key.encode() if isinstance(key, str) else key

    async def get(self, key: str | bytes) -> bytes:
        """Return the value stored under *key*.

        Raises:
            KeyError: If *key* is absent or its TTL has elapsed.
        """
        cache_key = self._build_key(key)
        deadline = self._expires_at.get(cache_key)
        if deadline is not None and time.monotonic() >= deadline:
            # Drop the stale entry so the lookup below raises KeyError.
            self._cache.pop(cache_key, None)
            del self._expires_at[cache_key]
        return self._cache[cache_key]

    async def set(
        self,
        key: str | bytes,
        value: bytes,
        ttl: int | None = None,
    ) -> None:
        """Store *value* under *key*.

        Args:
            key: Cache key; ``str`` keys are encoded to ``bytes``.
            value: Payload to store.
            ttl: Lifetime in seconds. ``None`` stores the entry without
                expiry; a value of zero or less makes it expire immediately.
        """
        cache_key = self._build_key(key)
        self._cache[cache_key] = value
        if ttl is None:
            # Overwriting without a TTL must clear any earlier deadline.
            self._expires_at.pop(cache_key, None)
        else:
            self._expires_at[cache_key] = time.monotonic() + ttl

    async def delete(self, key: str | bytes) -> None:
        """Remove *key* if present; a missing key is not an error."""
        cache_key = self._build_key(key)
        self._cache.pop(cache_key, None)
        self._expires_at.pop(cache_key, None)
class AsyncCache:
    """Facade that forwards cache operations to the backend it was given."""

    def __init__(self, backend: AsyncCacheBackend, /) -> None:
        self._cache = backend

    async def get(self, key: str | bytes) -> bytes:
        """Return whatever the backend's ``get`` yields for *key*."""
        stored = await self._cache.get(key)
        return stored

    async def set(
        self,
        key: str | bytes,
        value: bytes,
        ttl: int | None = None,
    ) -> None:
        """Forward *key*, *value* and *ttl* positionally to the backend's ``set``."""
        await self._cache.set(key, value, ttl)

    async def delete(self, key: str | bytes) -> None:
        """Forward *key* to the backend's ``delete``."""
        await self._cache.delete(key)
@asynccontextmanager
async def async_redis_connection(
    host: str,
    *,
    port: int = 6379,
    db: int = 0,
    **kwargs: Any,
) -> AsyncGenerator[redis.Redis, None]:
    """Yield an async Redis client and close it when the context exits.

    Args:
        host: Redis server host.
        port: Redis server port.
        db: Redis database index.
        **kwargs: Extra keyword arguments passed to ``redis.Redis``; they
            override the default set below.

    Yields:
        The ``redis.Redis`` client built from the arguments.
    """
    # Caller-supplied kwargs take precedence over the default.
    # NOTE(review): newer redis-py releases reportedly deprecate
    # ``auto_close_connection_pool`` - confirm against the pinned version.
    params: dict[str, Any] = {"auto_close_connection_pool": True, **kwargs}
    conn = redis.Redis(host=host, port=port, db=db, **params)
    try:
        yield conn
    finally:
        # Prefer ``aclose()`` where the client provides it; fall back to
        # ``close()`` so older redis-py releases keep working.
        close = getattr(conn, "aclose", None) or conn.close
        await close()
@asynccontextmanager
async def async_cache(
    host: str | None = None,
    **kwargs: Any,
) -> AsyncGenerator[AsyncCache, None]:
    """Yield an ``AsyncCache`` backed by Redis or by process memory.

    With a truthy *host*, a Redis connection is opened through
    ``async_redis_connection(host, **kwargs)`` and held for the lifetime of
    the context. Otherwise an in-memory backend is used and *kwargs* are
    ignored.
    """
    if host:
        async with async_redis_connection(host, **kwargs) as connection:
            yield AsyncCache(RedisAsyncCache(connection))
    else:
        yield AsyncCache(InMemoryAsyncCache())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment