目录
redis_decorator
安装
pip install redis_decorator
查看源代码
from io import StringIO
import pandas as pd
import json
import functools
class Cache:
def __init__(self, redis_instance):
if type(redis_instance.echo("hello")) == str:
self.cache_container = redis_instance
else:
raise AttributeError(
"Redis instance's decode_responses must be set True. Use StrictRedis(..., decode_responses=True)")
def key_generator(self, func, *args, **kwargs):
return ":".join(
["redis_dec", str(":".join([func.__name__, *[str(i) for i in args], str(kwargs)]))])
def ttl(self, ttl=None, force_refresh=False):
def enable(func):
@functools.wraps(func)
def func_wrapper(*args, **kwargs):
target_key = self.key_generator(func, *args, **kwargs)
a = self.cache_container.get(target_key)
if a:
return a
else:
result = func(*args, **kwargs)
self.cache_container.set(target_key, result, ttl)
return result
return func_wrapper
return enable
def delete_cache(self, func=None, *args, **kwargs):
if func is None:
print("Delete all the redis_dec")
key = self.cache_container.scan(match="redis_dec:*")[1]
elif not args and not kwargs:
print("Remove every result related to this function")
key = self.cache_container.scan(match=":".join(["redis_dec", func.__name__, "*"]))[1]
else:
key = [self.key_generator(func, *args, **kwargs)]
if key:
self.cache_container.delete(*key)
return 0
def _ser_df(self, func):
@functools.wraps(func)
def func_wrapper(*args, **kwargs):
return func(*args, **kwargs).to_csv()
return func_wrapper
def _de_ser_df(self, func):
@functools.wraps(func)
def func_wrapper(*args, **kwargs):
tmp = pd.read_csv(StringIO(func(*args, **kwargs)))
return tmp.set_index(tmp.columns[0])
return func_wrapper
def df(self, ttl=None):
def deco(func):
for dec in [self._ser_df, self.ttl(ttl), self._de_ser_df]:
func = dec(func)
return func
return deco
def _ser_number(self, func):
@functools.wraps(func)
def func_wrapper(*args, **kwargs):
return str(func(*args, **kwargs))
return func_wrapper
def _de_ser_int(self, func):
@functools.wraps(func)
def func_wrapper(*args, **kwargs):
return int(func(*args, **kwargs))
return func_wrapper
def _de_ser_float(self, func):
@functools.wraps(func)
def func_wrapper(*args, **kwargs):
return float(func(*args, **kwargs))
return func_wrapper
def int(self, ttl=None):
def deco(func):
for dec in [self._ser_number, self.ttl(ttl), self._de_ser_int]:
func = dec(func)
return func
return deco
def float(self, ttl=None):
def deco(func):
for dec in [self._ser_number, self.ttl(ttl), self._de_ser_float]:
func = dec(func)
return func
return deco
def _ser_dict(self, func):
@functools.wraps(func)
def func_wrapper(*args, **kwargs):
return json.dumps(func(*args, **kwargs))
return func_wrapper
def _de_ser_dict(self, func):
@functools.wraps(func)
def func_wrapper(*args, **kwargs):
return json.loads(func(*args, **kwargs))
return func_wrapper
def dict(self, ttl=None):
def deco(func):
for dec in [self._ser_dict, self.ttl(ttl), self._de_ser_dict]:
func = dec(func)
return func
return deco
def list(self, ttl=None):
def deco(func):
for dec in [self._ser_dict, self.ttl(ttl), self._de_ser_dict]:
func = dec(func)
return func
return deco
def _de_ser_json(self, func):
@functools.wraps(func)
def func_wrapper(*args, **kwargs):
return json.loads(func(*args, **kwargs))
return func_wrapper
def _ser_json(self, func):
@functools.wraps(func)
def func_wrapper(*args, **kwargs):
return json.dumps(json.loads(func(*args, **kwargs)))
return func_wrapper
def json(self, ttl=None):
def deco(func):
for dec in [self._ser_json, self.ttl(ttl), self._de_ser_json]:
func = dec(func)
return func
return deco
if __name__ == '__main__':
pass
- Cache类:缓存
- __init__:初始化,必须使用StrictRedis、参数decode_responses=True
- ttl:设置过期时间,单位s
- key_generator:生成key,没填写时,生成redis_dec:方法名:参数1:参数2…形式的key
- delete_cache:删除缓存,删除全部缓存、指定函数缓存、指定函数及参数缓存
- df:缓存df类型数据
- int: 缓存int类型数据
- float: 缓存float类型数据
- dict:缓存dict类型数据
- list :缓存list类型数据
- json:缓存json类型数据,和dict一样
使用
测试缓存int值
python -m unittest test.Test.test_my_int_function
测试批量删除
python -m unittest test.Test.test_delete
测试key生成与删除
python -m unittest test.Test.test_gen_key_delete
redis_decorators
安装
这里先安装一个其他人写的,体验一下
pip install redis_decorators
Successfully installed async-timeout-4.0.3 importlib-metadata-6.7.0 redis-5.0.3 redis-decorators-1.0.1 typing-extensions-4.7.1 zipp-3.15.0
注意:使用时如果报错,需要先删除__init__.py的12行
查看源代码
__init__.py
from .cache_element import CacheDateTime, CacheElement, CacheElementSingleType
from .cacheable import (
Cacheable,
DictCacheable,
DictCacheType,
DictStringCacheable,
ListCacheable,
ListCacheType,
StringCacheable
)
from .caching import RedisCaching, build_redis_url
初始化,导入各种包
cacheable.py 各种可缓存的类型
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Generic, List, Optional, TypeVar
from redis import Redis
StoreType = TypeVar('StoreType')
ListCacheType = List[str]
DictCacheType = Dict[str, str]
class Cacheable(Generic[StoreType], ABC):
"""Performs caching store and fetch operations for a specific type. Subclass to define how to handle a specific type. """
@abstractmethod
def store(self, client: Redis, key: str, value: StoreType) -> None:
"""Store a value in cache. Args: client (Redis): Cache to store in. key (str): Name of cache value. value (StoreType): Value to store. Returns: None """
pass # pragma: nocover
@abstractmethod
def fetch(self, client: Redis, key: str) -> Optional[StoreType]:
"""Fetch a value from cache. Args: client (Redis): Cache to fetch from. key (str): Name of cache value. Returns: StoreType or None: Value fetched from cache or None if no value exists. """
pass # pragma: nocover
class StringCacheable(Cacheable[str]):
def store(self, client: Redis, key: str, value: str):
client.set(key, value)
def fetch(self, client: Redis, key: str) -> Optional[str]:
return client.get(key)
@dataclass
class DictStringCacheable(Cacheable[str]):
""" Attributes: dict_key (str): Name of hash value. """
dict_key: str
def store(self, client: Redis, key: str, value: str):
client.hset(key, self.dict_key, value)
def fetch(self, client: Redis, key: str) -> Optional[str]:
return client.hget(key, self.dict_key)
class DictCacheable(Cacheable[DictCacheType]):
def store(self, client: Redis, key: str, value: DictCacheType):
client.hset(key, mapping=value)
def fetch(self, client: Redis, key: str) -> Optional[DictCacheType]:
return client.hgetall(key) or None
class ListCacheable(Cacheable[ListCacheType]):
def store(self, client: Redis, key: str, value: ListCacheType):
client.delete(key)
client.rpush(key, *value)
def fetch(self, client: Redis, key: str) -> Optional[ListCacheType]:
return client.lrange(key, 0, -1) or None
- Cacheable类:抽象类,不可实例化。对特定类型执行缓存存储(store)和提取(fetch)操作。子类来定义如何处理特定类型。
- StringCacheable类:Cacheable子类,使用set、get处理字符串。
- DictStringCacheable类:Cacheable子类,使用hset、hget处理字典,有dict_key,有dataclass装饰,将会自动添加__init__等方法。
- DictCacheable类:Cacheable子类,使用hset、hgetall处理字典。
- ListCacheable类:Cacheable子类,使用delete、rpush、lrange处理列表。
cache_element.py 缓存的元素
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Generic, Optional, TypeVar
from redis import Redis
from .cacheable import Cacheable, StringCacheable
FetchType = TypeVar('FetchType')
StoreType = TypeVar('StoreType')
class CacheElement(Generic[FetchType, StoreType], ABC):
"""Get and set cache values.
Attributes:
cacheable (Cacheable): Instance used to store and fetch values.
"""
cacheable: Cacheable[StoreType]
def get_value(self, client: Redis, key: str) -> Optional[FetchType]:
"""Returns cached value or None if no value exists."""
value = self.cacheable.fetch(client, key)
if value is None:
return None
return self.load(value)
def set_value(self, client: Redis, key: str, value: FetchType) -> None:
"""Set value in cache.
Args:
client (Redis): Cache to fetch from.
key (str): Name of cache value.
Returns:
None
"""
self.cacheable.store(client, key, self.dump(value))
@abstractmethod
def load(self, value: StoreType) -> FetchType:
"""Load value from cache into expected Python type."""
pass # pragma: nocover
@abstractmethod
def dump(self, value: FetchType) -> StoreType:
"""Dump value from Python type into type expected by cache."""
pass # pragma: nocover
@dataclass
class CacheElementSingleType(CacheElement[FetchType, FetchType]):
"""A CacheElement that fetches the same type that it stores.
By default, values are passed to and from cache as-is, i.e. no serialization
or deserialization is performed.
For example, a string can be stored and fetched without modification whereas
a datetime would need to be serialized for storage and deserialized for
retrieval.
"""
cacheable: Cacheable[FetchType]
def load(self, value: FetchType) -> FetchType:
return value
def dump(self, value: FetchType) -> FetchType:
return value
@dataclass
class CacheDateTime(CacheElement[datetime, str]):
"""Store and fetch datetime values with string serialization."""
cacheable: Cacheable[str] = StringCacheable()
def dump(self, value: datetime) -> str:
return value.isoformat()
def load(self, value: str) -> datetime:
return datetime.fromisoformat(value)
- CacheElement类:抽象类。缓存元素,子类实现设置和获取缓存值。
- CacheElementSingleType类:CacheElement子类,有cacheable,有dataclass装饰,自动添加__init__等方法。
- CacheDateTime类:CacheElement子类,通过字符串序列化的方式缓存日期类型,有cacheable,有dataclass装饰,自动添加__init__等方法。
caching.py 缓存主要逻辑
from datetime import timedelta
from functools import partial, update_wrapper
from typing import Any, Callable, Optional, Union
from redis import Redis
from .cache_element import CacheDateTime, CacheElement, CacheElementSingleType
from .cacheable import (
DictCacheable,
DictCacheType,
DictStringCacheable,
ListCacheable,
ListCacheType,
StringCacheable
)
class RedisCaching:
"""Provides decorators for automatic caching."""
cache_cls = Redis
_cache_instances = {
}
def __init__(self, url=None, **kwargs):
self._default_cache_kwargs = {
'decode_responses': True, 'socket_timeout': 15}
self.init(url, **kwargs)
def init(self, url, **kwargs):
self._url = url
self._cache_kwargs = {
**self._default_cache_kwargs,
**kwargs,
}
def get_cache(self) -> Redis:
if self._url in self._cache_instances:
return self._cache_instances.get(self._url)
cache = self.cache_cls.from_url(self._url, **self._cache_kwargs)
self._cache_instances[self._url] = cache
return cache
def delete(self, cache_key: str):
self.get_cache().delete(cache_key)
def cache_value(
self,
cache_element: CacheElement,
get_cache_key: Callable[..., str] = None,
expire_in: Union[int, timedelta] = None,
):
"""Decorate a function to automatically cache its return value. Wrapper does two things: 1. If cached value does not exist, cache the return value of the function. 2. If cached value exists, return it instead of calling the function. Args: cache_element (CacheElement): Instance used to get and set cache value. get_cache_key (Callable): Function that returns name of cache value. Accepts the same arguments as the decorated function. expire_in (Union[int, timedelta]): Number of seconds until this key expires after being set. Can be a datetime.timedelta object. Examples: Decorate a function that returns a string: .. code-block:: python @cache.cache_string(get_cache_key=lambda arg: f'object:{arg}') def expensive_fetch_operation(arg) -> str: ... return computed_value Use `cache_key` of decorated function to set `get_cache_key`: .. code-block:: python @cache.cache_string() def expensive_fetch_operation(arg) -> str: ... return computed_value @expensive_fetch_operation.cache_key def expensive_fetch_operation_cache_key(arg) -> str: ... return computed_value """
def decorator(func):
return CacheValueWrapper(
self, func, cache_element, get_cache_key, expire_in
)
return decorator
def cache_string(self, get_cache_key: Callable[..., str] = None, **kwargs):
"""Decorate a function to store a string."""
return self.cache_value(
CacheElementSingleType[str](cacheable=StringCacheable()),
get_cache_key,
**kwargs,
)
def cache_dict(self, get_cache_key: Callable[..., str] = None, **kwargs):
"""Decorate a function to store a dictionary {str: str}."""
return self.cache_value(
CacheElementSingleType[DictCacheType](cacheable=DictCacheable()),
get_cache_key,
**kwargs,
)
def cache_dict_string(self, dict_key: str, get_cache_key=None, **kwargs):
"""Decorate a function to store a specific key inside a cached hash."""
return self.cache_value(
CacheElementSingleType[str](cacheable=DictStringCacheable(dict_key)),
get_cache_key,
**kwargs,
)
def cache_list(self, get_cache_key: Callable[..., str] = None, **kwargs):
"""Decorate a function to store a list of strings."""
return self.cache_value(
CacheElementSingleType[ListCacheType](cacheable=ListCacheable()),
get_cache_key,
**kwargs,
)
def cache_datetime(self, get_cache_key: Callable[..., str] = None, **kwargs):
"""Decorate a function to store a datetime."""
return self.cache_value(CacheDateTime(), get_cache_key, **kwargs)
class CacheValueWrapper:
def __init__(
self,
caching: RedisCaching,
func: Callable,
cache_element: CacheElement,
get_cache_key: Optional[Callable[..., str]] = None,
expire_in: Union[int, timedelta] = None,
):
self._caching = caching
self._func = func
self._cache_element = cache_element
self._get_cache_key = get_cache_key
self._expire_in = expire_in
update_wrapper(self, func)
def __call__(self, *args: Any, **kwargs: Any):
cache_key = self._calculate_cache_key(*args, **kwargs)
value = self._cache_element.get_value(self._caching.get_cache(), cache_key)
if value is None:
client = self._caching.get_cache()
value = self._func(*args, **kwargs)
self._cache_element.set_value(client, cache_key, value)
expire_in = self._calculate_expire_in(value, *args, **kwargs)
if expire_in:
client.expire(cache_key, expire_in)
return value
def __get__(self, instance, owner):
return partial(self, instance)
def cache_key(self, func):
self._get_cache_key = func
return func
def expire_in(self, func):
self._expire_in = func
return func
def _calculate_expire_in(self, value, *args, **kwargs):
if callable(self._expire_in):
kwargs['value'] = value
return self._expire_in(*args, **kwargs)
return self._expire_in
def _calculate_cache_key(self, *args: Any, **kwargs: Any):
if self._get_cache_key is None:
arg_str = ':'.join([self._func.__name__, *[str(arg) for arg in args], str(kwargs)])
return ':'.join(['redis_decorators', arg_str])
return self._get_cache_key(*args, **kwargs)
def build_redis_url(host, password, db, use_secure=True):
prefix = 'rediss' if use_secure else 'redis'
if password:
url = f'{
prefix}://:{
password}@{
host}'
else:
url = f'{
prefix}://{
host}'
if db:
url = f'{
url}/{
db}'
return url
-
RedisCaching类:提供自动化缓存装饰器
- get_cache方法:获取Redis实例
- delete方法:删除指定key
- cache_value:装饰器,自动缓存函数返回值
- cache_string:缓存字符串
- cache_dict:缓存字典
- cache_dict_string:
- cache_list:缓存列表
- cache_datetime:缓存日期时间
-
CacheValueWrapper类:
- cache_key:缓存key
- expire_in:过期时间
- _calculate_cache_key:如果cache_key为None时,计算key为redis_decorators:函数名:参数1:参数2…
- _calculate_expire_in:计算过期时间
-
build_redis_url:根据参数构建url
使用
缓存字符串
python -m unittest test.Test.test_my_string_function
python -m unittest test.Test.test_my_datetime_function
python -m unittest test.Test.test_my_list_function
python -m unittest test.Test.test_my_dict_function
测试删除
python -m unittest test.Test.test_delete_key
总结
redis缓存装饰器需要考虑以下几点:
- key生成:用户不指定key时需要自动生成,统一前缀方便后序更新、删除
- 指定key:用户可以指定key(redis_decorator无法指定key)
- 缓存和获取功能
- 常见类型的缓存支持
- 用户自定义类型可以继承抽象类,实现序列化和反序列化方法(redis_decorator没有)
- 指定key的更新(包没有)
- 指定或批量key的删除(redis_decorators只有删除指定key)
- 支持协程(包没有)
全部代码
main.py
from redis import StrictRedis
from redis_dec import Cache
from redis_decorators import RedisCaching
from datetime import datetime
from settings import REDIS_URL
rediss = StrictRedis.from_url(REDIS_URL,decode_responses=True)
redis_cache = Cache(rediss)
caching = RedisCaching(REDIS_URL)
cache = caching.get_cache() # Redis instance
@redis_cache.int()
def my_int_function(a: int, b: int):
print(f"my_int_function excuting with {
a} {
b}...")
return a+b
@caching.cache_string()
def my_string_function(arg1, arg2):
print(f"my_string_function excuting with {
arg1} {
arg2}...")
return str(arg1+arg2)
@caching.cache_datetime()
def my_datetime_function(date):
print(f"my_date_function excuting with {
date}")
return datetime.now()
@caching.cache_list()
def my_list_function(*arg):
print(f"my_list_function excuting with {
arg}")
return list(arg)
@caching.cache_dict()
def my_dict_function(data:dict):
print(f"my_dict_function excuting with {
data}")
return data
test.py
import unittest
class Test(unittest.TestCase):
def test_gen_key_delete(self):
from main import redis_cache
from main import my_int_function
key = redis_cache.key_generator(my_int_function,5,6)
redis_cache.cache_container.delete(key)
def test_my_int_function(self):
from main import my_int_function
res = my_int_function(5,6)
print(res)
def test_delete(self):
from main import redis_cache
from main import my_int_function
res = redis_cache.delete_cache(func=my_int_function)
print(res)
def test_my_string_function(self):
from main import my_string_function
res = my_string_function(arg1=1,arg2=2)
print(res)
def test_my_datetime_function(self):
from main import my_datetime_function
res = my_datetime_function("now")
print(res)
def test_my_list_function(self):
from main import my_list_function
res = my_list_function(4,5,6)
print(res)
def test_my_dict_function(self):
from main import my_dict_function
res = my_dict_function({
"1":1})
print(res)
def test_delete_key(self):
from main import caching
from typing import Any
func_name = "my_datetime_function"
def _calculate_cache_key(*args: Any, **kwargs: Any):
arg_str = ':'.join([func_name, *[str(arg) for arg in args], str(kwargs)])
return ':'.join(['redis_decorators', arg_str])
caching.delete(_calculate_cache_key("now"))
参考
python3-abc包
python3-dataclasses包
pypi-redis-decorators
github-python-redis-decorators
文章评论