我经常遇到需要缓存这个或那个的情况。通常,这些值会被缓存一段时间。您可能熟悉这种模式。您尝试从缓存中获取一个值,如果成功,则将其返回给调用者并结束。如果该值不存在,您将获取它(很可能从数据库中)或计算它并将其放入缓存中。在大多数情况下,这非常有效。但是,如果您用于缓存条目的密钥被频繁访问,并且计算数据的操作需要一段时间,您最终会遇到多个并行请求同时发生缓存未命中的情况。所有这些请求都将从源独立加载并将值存储在缓存中。这会导致资源浪费,甚至可能导致拒绝服务。
让我举个例子来说明一下。我将使用 Redis 进行缓存,并在顶部使用一个简单的 Go http 服务器。完整代码如下:
package main import ( "errors" "log" "net/http" "time" "github.com/redis/go-redis/v9" ) type handler struct { rdb *redis.Client cacheTTL time.Duration } func (ch *handler) simple(w http.ResponseWriter, r *http.Request) { cacheKey := "my_cache_key" // we'll use 200 to signify a cache hit & 201 to signify a miss responseCode := http.StatusOK cachedData, err := ch.rdb.Get(r.Context(), cacheKey).Result() if err != nil { if !errors.Is(err, redis.Nil) { log.Println("could not reach redis", err.Error()) http.Error(w, "could not reach redis", http.StatusInternalServerError) return } // cache miss - fetch & store res := longRunningOperation() responseCode = http.StatusCreated err = ch.rdb.Set(r.Context(), cacheKey, res, ch.cacheTTL).Err() if err != nil { log.Println("failed to set cache value", err.Error()) http.Error(w, "failed to set cache value", http.StatusInternalServerError) return } cachedData = res } w.WriteHeader(responseCode) _, _ = w.Write([]byte(cachedData)) } func longRunningOperation() string { time.Sleep(time.Millisecond * 500) return "hello" } func main() { ttl := time.Second * 3 rdb := redis.NewClient(&redis.Options{ Addr: "localhost:6379", }) handler := &handler{ rdb: rdb, cacheTTL: ttl, } http.HandleFunc("/simple", handler.simple) if err := http.ListenAndServe(":8080", nil); err != nil { log.Fatalf("Could not start server: %s\n", err.Error()) } }
让我们在 /simple 端点上施加一些负载,看看会发生什么。我将使用 vegeta 来实现此目的。
我运行 vegeta Attack -duration=30s -rate=500 -targets=./targets_simple.txt > res_simple.bin。 Vegeta 最终每秒发出 500 个请求,持续 30 秒。我将它们绘制为 HTTP 结果代码的直方图,其中每个桶的跨度为 100 毫秒。结果如下图。
当我们开始实验时,缓存是空的——我们没有存储任何值。当一堆请求到达我们的服务器时,我们得到了最初的踩踏。他们都检查缓存,没有发现任何内容,调用 longRunningOperation 并将其存储在缓存中。由于 longRunningOperation 大约需要 500 毫秒才能完成前 500 毫秒内发出的任何请求,最终都会调用 longRunningOperation。一旦其中一个请求设法将值存储在缓存中,所有后续请求都会从缓存中获取该值,我们开始看到状态代码为 200 的响应。然后,随着 Redis 上的过期机制启动,该模式每 3 秒重复一次。
在这个玩具示例中,这不会导致任何问题,但在生产环境中,这可能会导致系统上不必要的负载、用户体验下降,甚至自我诱导的拒绝服务。那么我们怎样才能防止这种情况发生呢?嗯,有几种方法。我们可以引入锁——任何缓存未命中都会导致代码尝试实现锁。分布式锁定并不是一件简单的事情,通常它们有微妙的边缘情况,需要微妙的处理。我们还可以使用后台作业定期重新计算该值,但这需要运行一个额外的进程,引入另一个需要在我们的代码中维护和监视的齿轮。如果您有动态缓存键,则此方法也可能不可行。还有另一种方法,称为概率提前过期,这是我想进一步探索的方法。
这种技术允许人们根据概率重新计算该值。从缓存中获取值时,您还可以根据概率计算是否需要重新生成缓存值。越接近现有价值到期,概率就越高。
我的具体实现基于 A. Vattani、F.Chierichetti 和 K. Lowenstein 在《最佳概率缓存踩踏预防》中的 XFetch。
我将在 HTTP 服务器上引入一个新端点,该端点也将执行昂贵的计算,但这次在缓存时使用 XFetch。为了使 XFetch 工作,我们需要存储昂贵的操作花费了多长时间(增量)以及缓存键何时过期。为了实现这一目标,我将引入一个结构体来保存这些值以及消息本身:
type probabilisticValue struct { Message string Expiry time.Time Delta time.Duration }
我添加一个函数来用这些属性包装原始消息并将其序列化以存储在redis中:
func wrapMessage(message string, delta, cacheTTL time.Duration) (string, error) { bts, err := json.Marshal(probabilisticValue{ Message: message, Delta: delta, Expiry: time.Now().Add(cacheTTL), }) if err != nil { return "", fmt.Errorf("could not marshal message: %w", err) } return string(bts), nil }
我们还编写一个方法来重新计算并将值存储到redis中:
func (ch *handler) recomputeValue(ctx context.Context, cacheKey string) (string, error) { start := time.Now() message := longRunningOperation() delta := time.Since(start) wrapped, err := wrapMessage(message, delta, ch.cacheTTL) if err != nil { return "", fmt.Errorf("could not wrap message: %w", err) } err = ch.rdb.Set(ctx, cacheKey, wrapped, ch.cacheTTL).Err() if err != nil { return "", fmt.Errorf("could not save value: %w", err) } return message, nil }
为了确定是否需要根据概率更新值,我们可以在 probabilisticValue 中添加一个方法:
func (pv probabilisticValue) shouldUpdate() bool { // suggested default param in XFetch implementation // if increased - results in earlier expirations beta := 1.0 now := time.Now() scaledGap := pv.Delta.Seconds() * beta * math.Log(rand.Float64()) return now.Sub(pv.Expiry).Seconds() >= scaledGap }
如果我们将其全部连接起来,我们最终会得到以下处理程序:
func (ch *handler) probabilistic(w http.ResponseWriter, r *http.Request) { cacheKey := "probabilistic_cache_key" // we'll use 200 to signify a cache hit & 201 to signify a miss responseCode := http.StatusOK cachedData, err := ch.rdb.Get(r.Context(), cacheKey).Result() if err != nil { if !errors.Is(err, redis.Nil) { log.Println("could not reach redis", err.Error()) http.Error(w, "could not reach redis", http.StatusInternalServerError) return } res, err := ch.recomputeValue(r.Context(), cacheKey) if err != nil { log.Println("could not recompute value", err.Error()) http.Error(w, "could not recompute value", http.StatusInternalServerError) return } responseCode = http.StatusCreated cachedData = res w.WriteHeader(responseCode) _, _ = w.Write([]byte(cachedData)) return } pv := probabilisticValue{} err = json.Unmarshal([]byte(cachedData), &pv) if err != nil { log.Println("could not unmarshal probabilistic value", err.Error()) http.Error(w, "could not unmarshal probabilistic value", http.StatusInternalServerError) return } if pv.shouldUpdate() { _, err := ch.recomputeValue(r.Context(), cacheKey) if err != nil { log.Println("could not recompute value", err.Error()) http.Error(w, "could not recompute value", http.StatusInternalServerError) return } responseCode = http.StatusAccepted } w.WriteHeader(responseCode) _, _ = w.Write([]byte(cachedData)) }
处理程序的工作方式与第一个处理程序非常相似,但是,在获得缓存命中后,我们就掷骰子。根据结果,我们要么只返回刚刚获取的值,要么提前更新该值。
我们将使用 HTTP 状态代码来确定 3 种情况:
这次我再次启动 vegeta 在新端点上运行,结果如下:
那里的微小蓝色斑点表明我们实际上何时提前结束了缓存值的更新。在初始预热期后,我们不再看到缓存未命中。为了避免初始峰值,如果这对您的用例很重要,您可以预先存储缓存值。
如果您想更积极地进行缓存并更频繁地刷新值,您可以使用 beta 参数。以下是将 beta 参数设置为 2 时相同实验的结果:
我们现在看到概率更新更加频繁。
总而言之,这是一个巧妙的小技术,可以帮助避免缓存踩踏。但请记住,这仅在您定期从缓存中获取相同密钥时才有效 - 否则您不会看到太多好处。
有另一种方法来处理缓存踩踏吗?注意到一个错误吗?请在下面的评论中告诉我!
免責聲明: 提供的所有資源部分來自互聯網,如果有侵犯您的版權或其他權益,請說明詳細緣由並提供版權或權益證明然後發到郵箱:[email protected] 我們會在第一時間內為您處理。
Copyright© 2022 湘ICP备2022001581号-3