食谱#

使用 ORM 事件进行异步数据更新#

此食谱提供了一种技术,可在将更新发送到数据库时以乐观方式将新数据推送到缓存中。

使用 SQLAlchemy 进行数据库查询,假设一个简单的缓存装饰函数返回数据库查询的结果

@region.cache_on_arguments()
def get_some_data(argument):
    # query database to get data
    data = Session().query(DBClass).filter(DBClass.argument == argument).all()
    return data

我们希望在数据更改时重新查询此特定函数。我们可以在数据更改时调用 get_some_data.invalidate(argument, hard=False),但这样做只会使旧值失效;新值不会在下次调用之前生成,并且还意味着至少一个客户端在生成新值时必须阻塞。我们还可以调用 get_some_data.refresh(argument),这将在那一刻执行数据刷新,但随后写入者会因重新查询而延迟。

第三种变体是将此查询的刷新工作卸载到后台线程或进程中。可以使用诸如 CacheRegion.async_creation_runner 的系统来实现此目的。但是,对于较小的用例,一种权宜之计是将缓存刷新操作链接到 ORM 会话的提交,如下所示

from sqlalchemy import event
from sqlalchemy.orm import Session

def cache_refresh(session, refresher, *args, **kwargs):
    """
    Refresh the functions cache data in a new thread. Starts refreshing only
    after the session was committed so all database data is available.
    """
    assert isinstance(session, Session), \
        "Need a session, not a sessionmaker or scoped_session"

    @event.listens_for(session, "after_commit")
    def do_refresh(session):
        t = Thread(target=refresher, args=args, kwargs=kwargs)
        t.daemon = True
        t.start()

在数据持久化序列中,可以给定特定的 SQLAlchemy Session 和一个可调用的工作来调用 cache_refresh

def add_new_data(session, argument):
    # add some data
    session.add(something_new(argument))

    # add a hook to refresh after the Session is committed.
    cache_refresh(session, get_some_data.refresh, argument)

请注意,刷新数据的事件与用于持久化的 Session 相关联;但是,实际的刷新操作是使用不同的 Session 调用的,通常是通过线程局部注册表或直接实例化,该 Session 本地于刷新操作。

在 Redis 中为所有键添加前缀#

如果你使用 redis 实例作为后端,其中包含 dogpile.cache 设置的键之外的其他键,最好为所有 dogpile.cache 键添加唯一前缀,以避免与你自己的代码设置的键发生潜在冲突。这可以通过使用键混淆器函数轻松完成

from dogpile.cache import make_region

region = make_region(
  key_mangler=lambda key: "myapp:dogpile:" + key
)

将数据编码/解码成另一种格式#

由于 dogpile 正在管理缓存的数据,因此你可能担心有效负载的大小。帮助最小化有效负载的一种可能方法是使用 ProxyBackend 即时重新编码数据,或在数据进入或离开持久性存储时对其进行转换。

在下面的示例中,我们定义了 2 个类来实现 msgpack 编码。Msgpack (http://msgpack.org/) 是一种序列化格式,非常适合类似 json 的数据,并且可以将嵌套字典序列化为比 Python 自己的 pickle 小得多的有效负载。 _EncodedProxy 是我们构建数据编码器的基类,并且继承自 dogpile 自己的 ProxyBackend。你只需使用一个类。此类将 4 个主要的 key/value 函数传递到可配置的解码器和编码器中。 MsgpackProxy 类简单地继承自 _EncodedProxy 并实现了必要的 value_decodevalue_encode 函数。

编码 ProxyBackend 示例

from dogpile.cache.proxy import ProxyBackend
import msgpack

class _EncodedProxy(ProxyBackend):
    """base class for building value-mangling proxies"""

    def value_decode(self, value):
        raise NotImplementedError("override me")

    def value_encode(self, value):
        raise NotImplementedError("override me")

    def set(self, k, v):
        v = self.value_encode(v)
        self.proxied.set(k, v)

    def get(self, key):
        v = self.proxied.get(key)
        return self.value_decode(v)

    def set_multi(self, mapping):
        """encode to a new dict to preserve unencoded values in-place when
           called by `get_or_create_multi`
           """
        mapping_set = {}
        for (k, v) in mapping.iteritems():
            mapping_set[k] = self.value_encode(v)
        return self.proxied.set_multi(mapping_set)

    def get_multi(self, keys):
        results = self.proxied.get_multi(keys)
        translated = []
        for record in results:
            try:
                translated.append(self.value_decode(record))
            except Exception as e:
                raise
        return translated


class MsgpackProxy(_EncodedProxy):
    """custom decode/encode for value mangling"""

    def value_decode(self, v):
        if not v or v is NO_VALUE:
            return NO_VALUE
        # you probably want to specify a custom decoder via `object_hook`
        v = msgpack.unpackb(payload, encoding="utf-8")
        return CachedValue(*v)

    def value_encode(self, v):
        # you probably want to specify a custom encoder via `default`
        v = msgpack.packb(payload, use_bin_type=True)
        return v

# extend our region configuration from above with a 'wrap'
region = make_region().configure(
    'dogpile.cache.pylibmc',
    expiration_time = 3600,
    arguments = {
        'url': ["127.0.0.1"],
    },
    wrap = [MsgpackProxy, ]
)