dogpile Core#

dogpile 提供了一个锁定接口,围绕一对“值创建”和“值检索”函数。

版本 0.6.0 中已更改: dogpile 包封装了以前由单独的 dogpile.core 包提供的功能。

主要接口是 Lock 对象,它提供仅由一个线程和/或进程调用创建函数的功能,将所有其他线程/进程推迟到“值检索”函数,直到单个创建线程完成。

我是否需要直接学习 dogpile Core API?#

预计大多数 dogpile 用户将通过 dogpile.cache 缓存前端间接使用它。如果您属于此类别,那么简短的答案是否定的。

直接使用此处描述的核心 dogpile API 意味着您正在构建自己的资源使用系统,该系统位于 dogpile.cache 提供的系统之外或作为其补充。

基本用法#

dogpile 提供的主要 API 是 Lock 对象。此对象允许提供互斥、值创建以及值检索的函数。

示例用法如下

from dogpile import Lock, NeedRegenerationException
import threading
import time

# store a reference to a "resource", some
# object that is expensive to create.
the_resource = [None]

def some_creation_function():
    # call a value creation function
    value = create_some_resource()

    # get creationtime using time.time()
    creationtime = time.time()

    # keep track of the value and creation time in the "cache"
    the_resource[0] = tup = (value, creationtime)

    # return the tuple of (value, creationtime)
    return tup

def retrieve_resource():
    # function that retrieves the resource and
    # creation time.

    # if no resource, then raise NeedRegenerationException
    if the_resource[0] is None:
        raise NeedRegenerationException()

    # else return the tuple of (value, creationtime)
    return the_resource[0]

# a mutex, which needs here to be shared across all invocations
# of this particular creation function
mutex = threading.Lock()

with Lock(mutex, some_creation_function, retrieve_resource, 3600) as value:
      # some function that uses
      # the resource.  Won't reach
      # here until some_creation_function()
      # has completed at least once.
      value.do_something()

上述 some_creation_function() 将在 Lock 首次作为上下文管理器调用时调用。然后将此函数返回的值传递到 with 块中,应用程序代码可以在其中使用此值。在此初始期间调用 Lock 的并发线程将被阻塞,直到 some_creation_function() 完成。

创建函数首次成功完成后,对 Lock 的新调用将调用 retrieve_resource() 以获取当前缓存值及其创建时间;如果创建时间早于当前时间减去 3600 的过期时间,则将再次调用 some_creation_function(),但仅由一个线程/进程调用,使用给定的互斥对象作为同步源。在此期间调用 Lock 的并发线程/进程将继续执行,而不会被阻塞;相反, retrieve_resource() 刚刚返回的“陈旧”值将继续返回,直到创建函数完成。

Lock API 旨在与 Memcached 等简单缓存后端配合使用。它解决了以下问题:

  • 值可能会在我们的过期时间达到之前随时从缓存中消失。 NeedRegenerationException 类用于向 Lock 对象发出警报,表明某个值需要在通常的过期时间之前重新生成。

  • 在类似 Memcached 的系统中,没有用于“检查”键而不实际检索键的函数。使用 retrieve_resource() 函数允许我们检查现有键,同时返回现有值(如果存在),而无需进行两次单独的往返。

  • Lock 使用的“创建”函数应将新创建的值存储在缓存中,并返回该值。这比使用两次单独的往返来分别存储和重新检索对象也更有效。

示例:直接使用 dogpile 进行缓存#

以下示例近似 Beaker 的“缓存装饰”功能,以装饰任何函数并将值存储在 Memcached 中。请注意,通常情况下,我们只使用 dogpile.cache,但是出于示例目的,我们将说明如何直接使用 Lock 对象。

我们创建一个名为 cached() 的 Python 装饰器函数,它将为单个函数的输出提供缓存。它给出了我们希望在 Memcached 中使用的“键”,并且在内部使用 Lock,以及基于线程的互斥锁(我们将在下一部分中看到分布式互斥锁)

import pylibmc
import threading
import time
from dogpile import Lock, NeedRegenerationException

mc_pool = pylibmc.ThreadMappedPool(pylibmc.Client("localhost"))

def cached(key, expiration_time):
    """A decorator that will cache the return value of a function
    in memcached given a key."""

    mutex = threading.Lock()

    def get_value():
         with mc_pool.reserve() as mc:
            value_plus_time = mc.get(key)
            if value_plus_time is None:
                raise NeedRegenerationException()
            # return a tuple (value, createdtime)
            return value_plus_time

    def decorate(fn):
        def gen_cached():
            value = fn()
            with mc_pool.reserve() as mc:
                # create a tuple (value, createdtime)
                value_plus_time = (value, time.time())
                mc.put(key, value_plus_time)
            return value_plus_time

        def invoke():
            with Lock(mutex, gen_cached, get_value, expiration_time) as value:
                return value
        return invoke

    return decorate

使用上述内容,我们可以将任何函数装饰为

@cached("some key", 3600)
def generate_my_expensive_value():
    return slow_database.lookup("stuff")

Lock 对象将确保一次只有一个线程执行 slow_database.lookup(),并且仅每 3600 秒执行一次,除非 Memcached 已删除该值,在这种情况下,它将根据需要再次调用。

特别是,dogpile.core 的系统允许我们每次访问最多调用一次 memcached get() 函数,而不是 Beaker 的系统,它调用它两次,并且在我们刚刚创建该值时不会让我们调用 get()。

对于互斥锁对象,我们保留一个 threading.Lock 对象,该对象是装饰函数的局部对象,而不是使用全局锁。这将进程内锁定本地化为该一个装饰函数的局部对象。在下一部分中,我们将看到跨进程锁的使用,它以不同的方式实现这一点。

使用文件或分布式锁与 Dogpile#

到目前为止的示例使用 threading.Lock() 对象进行同步。如果我们的应用程序使用多个进程,我们将希望协调创建操作,不仅在线程上,而且在其他进程可以访问的某个互斥锁上。

在此示例中,我们将使用 lockfile 包提供的基于文件锁,它使用 unix 符号链接概念来提供文件系统级锁(该锁也已变为线程安全)。另一种策略可能直接基于 Unix os.flock() 调用,或使用 NFS 安全文件锁(如 flufl.lock),而另一种方法是针对缓存服务器进行锁定,使用类似于 使用 Memcached 作为分布式锁定服务 中所述的配方。

所有这些锁定方案的共同点是,与 Python threading.Lock 对象不同,它们都需要访问一个实际密钥,该密钥充当所有进程将协调的符号。因此,在此处,我们还需要创建“互斥锁”,我们使用 key 参数将其传递给 Lock

import lockfile
import os
from hashlib import sha1

# ... other imports and setup from the previous example

def cached(key, expiration_time):
    """A decorator that will cache the return value of a function
    in memcached given a key."""

    lock_path = os.path.join("/tmp", "%s.lock" % sha1(key).hexdigest())

    # ... get_value() from the previous example goes here

    def decorate(fn):
        # ... gen_cached() from the previous example goes here

        def invoke():
            # create an ad-hoc FileLock
            mutex = lockfile.FileLock(lock_path)

            with Lock(mutex, gen_cached, get_value, expiration_time) as value:
                return value
        return invoke

    return decorate

对于给定的密钥“some_key”,我们生成密钥的十六进制摘要,然后使用 lockfile.FileLock() 来针对文件 /tmp/53def077a4264bd3183d4eb21b1f56f883e1b572.lock 创建锁。各种进程中的任意数量的 Lock 对象现在将相互协调,使用此通用文件名作为“接力棒”,以此为基础进行新值创建。

与我们使用 threading.Lock 时不同,文件锁最终锁定在文件上,因此 FileLock() 的多个实例都将在同一文件上进行协调 - 通常情况下,依赖于 flock() 的文件锁需要非线程化使用,因此在任何情况下,每个线程的唯一文件系统锁通常都是一个好主意。