食谱#
使用 ORM 事件进行异步数据更新#
此食谱提供了一种技术,可在将更新发送到数据库时以乐观方式将新数据推送到缓存中。
使用 SQLAlchemy 进行数据库查询,假设一个简单的缓存装饰函数返回数据库查询的结果
@region.cache_on_arguments()
def get_some_data(argument):
# query database to get data
data = Session().query(DBClass).filter(DBClass.argument == argument).all()
return data
我们希望在数据更改时重新查询此特定函数。我们可以在数据更改时调用 get_some_data.invalidate(argument, hard=False)
,但这样做只会使旧值失效;新值不会在下次调用之前生成,并且还意味着至少一个客户端在生成新值时必须阻塞。我们还可以调用 get_some_data.refresh(argument)
,这将在那一刻执行数据刷新,但随后写入者会因重新查询而延迟。
第三种变体是将此查询的刷新工作卸载到后台线程或进程中。可以使用诸如 CacheRegion.async_creation_runner
的系统来实现此目的。但是,对于较小的用例,一种权宜之计是将缓存刷新操作链接到 ORM 会话的提交,如下所示
from sqlalchemy import event
from sqlalchemy.orm import Session
def cache_refresh(session, refresher, *args, **kwargs):
"""
Refresh the functions cache data in a new thread. Starts refreshing only
after the session was committed so all database data is available.
"""
assert isinstance(session, Session), \
"Need a session, not a sessionmaker or scoped_session"
@event.listens_for(session, "after_commit")
def do_refresh(session):
t = Thread(target=refresher, args=args, kwargs=kwargs)
t.daemon = True
t.start()
在数据持久化序列中,可以给定特定的 SQLAlchemy Session
和一个可调用的工作来调用 cache_refresh
def add_new_data(session, argument):
# add some data
session.add(something_new(argument))
# add a hook to refresh after the Session is committed.
cache_refresh(session, get_some_data.refresh, argument)
请注意,刷新数据的事件与用于持久化的 Session
相关联;但是,实际的刷新操作是使用不同的 Session
调用的,通常是通过线程局部注册表或直接实例化,该 Session
本地于刷新操作。
在 Redis 中为所有键添加前缀#
如果你使用 redis 实例作为后端,其中包含 dogpile.cache 设置的键之外的其他键,最好为所有 dogpile.cache 键添加唯一前缀,以避免与你自己的代码设置的键发生潜在冲突。这可以通过使用键混淆器函数轻松完成
from dogpile.cache import make_region
region = make_region(
key_mangler=lambda key: "myapp:dogpile:" + key
)
将数据编码/解码成另一种格式#
由于 dogpile 正在管理缓存的数据,因此你可能担心有效负载的大小。帮助最小化有效负载的一种可能方法是使用 ProxyBackend 即时重新编码数据,或在数据进入或离开持久性存储时对其进行转换。
在下面的示例中,我们定义了 2 个类来实现 msgpack 编码。Msgpack (http://msgpack.org/) 是一种序列化格式,非常适合类似 json 的数据,并且可以将嵌套字典序列化为比 Python 自己的 pickle 小得多的有效负载。 _EncodedProxy
是我们构建数据编码器的基类,并且继承自 dogpile 自己的 ProxyBackend。你只需使用一个类。此类将 4 个主要的 key/value 函数传递到可配置的解码器和编码器中。 MsgpackProxy
类简单地继承自 _EncodedProxy
并实现了必要的 value_decode
和 value_encode
函数。
编码 ProxyBackend 示例
from dogpile.cache.proxy import ProxyBackend
import msgpack
class _EncodedProxy(ProxyBackend):
"""base class for building value-mangling proxies"""
def value_decode(self, value):
raise NotImplementedError("override me")
def value_encode(self, value):
raise NotImplementedError("override me")
def set(self, k, v):
v = self.value_encode(v)
self.proxied.set(k, v)
def get(self, key):
v = self.proxied.get(key)
return self.value_decode(v)
def set_multi(self, mapping):
"""encode to a new dict to preserve unencoded values in-place when
called by `get_or_create_multi`
"""
mapping_set = {}
for (k, v) in mapping.iteritems():
mapping_set[k] = self.value_encode(v)
return self.proxied.set_multi(mapping_set)
def get_multi(self, keys):
results = self.proxied.get_multi(keys)
translated = []
for record in results:
try:
translated.append(self.value_decode(record))
except Exception as e:
raise
return translated
class MsgpackProxy(_EncodedProxy):
"""custom decode/encode for value mangling"""
def value_decode(self, v):
if not v or v is NO_VALUE:
return NO_VALUE
# you probably want to specify a custom decoder via `object_hook`
v = msgpack.unpackb(payload, encoding="utf-8")
return CachedValue(*v)
def value_encode(self, v):
# you probably want to specify a custom encoder via `default`
v = msgpack.packb(payload, use_bin_type=True)
return v
# extend our region configuration from above with a 'wrap'
region = make_region().configure(
'dogpile.cache.pylibmc',
expiration_time = 3600,
arguments = {
'url': ["127.0.0.1"],
},
wrap = [MsgpackProxy, ]
)