dogpile Core#
dogpile
提供了一个锁定接口,围绕一对“值创建”和“值检索”函数。
版本 0.6.0 中已更改: dogpile
包封装了以前由单独的 dogpile.core
包提供的功能。
主要接口是 Lock
对象,它提供仅由一个线程和/或进程调用创建函数的功能,将所有其他线程/进程推迟到“值检索”函数,直到单个创建线程完成。
我是否需要直接学习 dogpile Core API?#
预计大多数 dogpile
用户将通过 dogpile.cache
缓存前端间接使用它。如果您属于此类别,那么简短的答案是否定的。
直接使用此处描述的核心 dogpile
API 意味着您正在构建自己的资源使用系统,该系统位于 dogpile.cache
提供的系统之外或作为其补充。
基本用法#
dogpile 提供的主要 API 是 Lock
对象。此对象允许提供互斥、值创建以及值检索的函数。
示例用法如下
from dogpile import Lock, NeedRegenerationException
import threading
import time
# store a reference to a "resource", some
# object that is expensive to create.
the_resource = [None]
def some_creation_function():
# call a value creation function
value = create_some_resource()
# get creationtime using time.time()
creationtime = time.time()
# keep track of the value and creation time in the "cache"
the_resource[0] = tup = (value, creationtime)
# return the tuple of (value, creationtime)
return tup
def retrieve_resource():
# function that retrieves the resource and
# creation time.
# if no resource, then raise NeedRegenerationException
if the_resource[0] is None:
raise NeedRegenerationException()
# else return the tuple of (value, creationtime)
return the_resource[0]
# a mutex, which needs here to be shared across all invocations
# of this particular creation function
mutex = threading.Lock()
with Lock(mutex, some_creation_function, retrieve_resource, 3600) as value:
# some function that uses
# the resource. Won't reach
# here until some_creation_function()
# has completed at least once.
value.do_something()
上述 some_creation_function()
将在 Lock
首次作为上下文管理器调用时调用。然后将此函数返回的值传递到 with
块中,应用程序代码可以在其中使用此值。在此初始期间调用 Lock
的并发线程将被阻塞,直到 some_creation_function()
完成。
创建函数首次成功完成后,对 Lock
的新调用将调用 retrieve_resource()
以获取当前缓存值及其创建时间;如果创建时间早于当前时间减去 3600 的过期时间,则将再次调用 some_creation_function()
,但仅由一个线程/进程调用,使用给定的互斥对象作为同步源。在此期间调用 Lock
的并发线程/进程将继续执行,而不会被阻塞;相反, retrieve_resource()
刚刚返回的“陈旧”值将继续返回,直到创建函数完成。
Lock
API 旨在与 Memcached 等简单缓存后端配合使用。它解决了以下问题:
值可能会在我们的过期时间达到之前随时从缓存中消失。
NeedRegenerationException
类用于向Lock
对象发出警报,表明某个值需要在通常的过期时间之前重新生成。在类似 Memcached 的系统中,没有用于“检查”键而不实际检索键的函数。使用
retrieve_resource()
函数允许我们检查现有键,同时返回现有值(如果存在),而无需进行两次单独的往返。Lock
使用的“创建”函数应将新创建的值存储在缓存中,并返回该值。这比使用两次单独的往返来分别存储和重新检索对象也更有效。
示例:直接使用 dogpile 进行缓存#
以下示例近似 Beaker 的“缓存装饰”功能,以装饰任何函数并将值存储在 Memcached 中。请注意,通常情况下,我们只使用 dogpile.cache,但是出于示例目的,我们将说明如何直接使用 Lock
对象。
我们创建一个名为 cached()
的 Python 装饰器函数,它将为单个函数的输出提供缓存。它给出了我们希望在 Memcached 中使用的“键”,并且在内部使用 Lock
,以及基于线程的互斥锁(我们将在下一部分中看到分布式互斥锁)
import pylibmc
import threading
import time
from dogpile import Lock, NeedRegenerationException
mc_pool = pylibmc.ThreadMappedPool(pylibmc.Client("localhost"))
def cached(key, expiration_time):
"""A decorator that will cache the return value of a function
in memcached given a key."""
mutex = threading.Lock()
def get_value():
with mc_pool.reserve() as mc:
value_plus_time = mc.get(key)
if value_plus_time is None:
raise NeedRegenerationException()
# return a tuple (value, createdtime)
return value_plus_time
def decorate(fn):
def gen_cached():
value = fn()
with mc_pool.reserve() as mc:
# create a tuple (value, createdtime)
value_plus_time = (value, time.time())
mc.put(key, value_plus_time)
return value_plus_time
def invoke():
with Lock(mutex, gen_cached, get_value, expiration_time) as value:
return value
return invoke
return decorate
使用上述内容,我们可以将任何函数装饰为
@cached("some key", 3600)
def generate_my_expensive_value():
return slow_database.lookup("stuff")
Lock
对象将确保一次只有一个线程执行 slow_database.lookup()
,并且仅每 3600 秒执行一次,除非 Memcached 已删除该值,在这种情况下,它将根据需要再次调用。
特别是,dogpile.core 的系统允许我们每次访问最多调用一次 memcached get() 函数,而不是 Beaker 的系统,它调用它两次,并且在我们刚刚创建该值时不会让我们调用 get()。
对于互斥锁对象,我们保留一个 threading.Lock
对象,该对象是装饰函数的局部对象,而不是使用全局锁。这将进程内锁定本地化为该一个装饰函数的局部对象。在下一部分中,我们将看到跨进程锁的使用,它以不同的方式实现这一点。
使用文件或分布式锁与 Dogpile#
到目前为止的示例使用 threading.Lock()
对象进行同步。如果我们的应用程序使用多个进程,我们将希望协调创建操作,不仅在线程上,而且在其他进程可以访问的某个互斥锁上。
在此示例中,我们将使用 lockfile 包提供的基于文件锁,它使用 unix 符号链接概念来提供文件系统级锁(该锁也已变为线程安全)。另一种策略可能直接基于 Unix os.flock()
调用,或使用 NFS 安全文件锁(如 flufl.lock),而另一种方法是针对缓存服务器进行锁定,使用类似于 使用 Memcached 作为分布式锁定服务 中所述的配方。
所有这些锁定方案的共同点是,与 Python threading.Lock
对象不同,它们都需要访问一个实际密钥,该密钥充当所有进程将协调的符号。因此,在此处,我们还需要创建“互斥锁”,我们使用 key
参数将其传递给 Lock
import lockfile
import os
from hashlib import sha1
# ... other imports and setup from the previous example
def cached(key, expiration_time):
"""A decorator that will cache the return value of a function
in memcached given a key."""
lock_path = os.path.join("/tmp", "%s.lock" % sha1(key).hexdigest())
# ... get_value() from the previous example goes here
def decorate(fn):
# ... gen_cached() from the previous example goes here
def invoke():
# create an ad-hoc FileLock
mutex = lockfile.FileLock(lock_path)
with Lock(mutex, gen_cached, get_value, expiration_time) as value:
return value
return invoke
return decorate
对于给定的密钥“some_key”,我们生成密钥的十六进制摘要,然后使用 lockfile.FileLock()
来针对文件 /tmp/53def077a4264bd3183d4eb21b1f56f883e1b572.lock
创建锁。各种进程中的任意数量的 Lock
对象现在将相互协调,使用此通用文件名作为“接力棒”,以此为基础进行新值创建。
与我们使用 threading.Lock
时不同,文件锁最终锁定在文件上,因此 FileLock()
的多个实例都将在同一文件上进行协调 - 通常情况下,依赖于 flock()
的文件锁需要非线程化使用,因此在任何情况下,每个线程的唯一文件系统锁通常都是一个好主意。