Administrator
Published on 2025-05-13 / 3 Visits
0

在 OpenResty 里实现并发请求合并回源

#AI

在 OpenResty 里实现并发请求合并回源,可借助 ngx.shared.DICT 共享内存和 lua-resty-lock 互斥锁来达成。下面为你详细介绍实现步骤:

实现思路

  1. 请求去重:运用请求的 URL 或者关键参数作为 key,在共享内存中记录请求状态。
  2. 请求合并:针对相同 key 的并发请求,仅让首个请求回源,其他请求则等待结果。
  3. 结果缓存:把回源的结果缓存起来,供后续请求使用,以此减少回源次数。

代码实现

下面是一个完整的 OpenResty 配置示例:

http {
    # 创建共享内存区域,用于存储请求状态和结果
    lua_shared_dict request_cache 10m;
    lua_shared_dict locks 1m;

    # 加载必要的 Lua 模块
    init_by_lua_block {
        -- 加载锁模块
        local resty_lock = require "resty.lock"
    }

    server {
        listen 80;
        server_name example.com;

        location /merge {
            # 设置请求超时时间
            content_by_lua_block {
                local key = ngx.var.arg_key or ngx.var.uri  -- 使用请求参数或 URI 作为键
                local cache = ngx.shared.request_cache
                local lock = require("resty.lock"):new("locks")

                -- 尝试从缓存获取结果
                local result, flags = cache:get(key)
                if result then
                    ngx.say("Cache hit: ", result)
                    return
                end

                -- 获取锁,确保只有一个请求去回源
                local elapsed, err = lock:lock(key)
                if not elapsed then
                    ngx.log(ngx.ERR, "failed to acquire lock: ", err)
                    ngx.exit(500)
                end

                -- 再次检查缓存,避免在等待锁的过程中已有其他请求更新了缓存
                result, flags = cache:get(key)
                if result then
                    -- 释放锁
                    local ok, err = lock:unlock()
                    if not ok then
                        ngx.log(ngx.ERR, "failed to unlock: ", err)
                    end
                    ngx.say("Cache hit after lock: ", result)
                    return
                end

                -- 回源请求(这里使用 ngx.location.capture 模拟)
                local res = ngx.location.capture("/backend", {
                    args = ngx.req.get_uri_args()
                })

                if not res then
                    ngx.log(ngx.ERR, "failed to request backend")
                    -- 释放锁
                    local ok, err = lock:unlock()
                    if not ok then
                        ngx.log(ngx.ERR, "failed to unlock: ", err)
                    end
                    ngx.exit(500)
                end

                -- 缓存结果(设置 10 秒过期时间)
                local ok, err = cache:set(key, res.body, 10)
                if not ok then
                    ngx.log(ngx.ERR, "failed to set cache: ", err)
                end

                -- 释放锁
                local ok, err = lock:unlock()
                if not ok then
                    ngx.log(ngx.ERR, "failed to unlock: ", err)
                end

                -- 返回结果
                ngx.print(res.body)
            }
        }

        location /backend {
            # 这里配置真实的后端服务
            proxy_pass http://backend_server;
        }
    }
}

配置说明

  1. 共享内存

    • request_cache:用来存储请求的结果。
    • locks:用于实现请求的互斥锁。
  2. 请求处理流程

    • 收到请求后,先检查缓存中是否有对应结果。
    • 若缓存中没有结果,则获取锁,保证只有当前请求去回源。
    • 回源获取结果后,将结果存入缓存,随后释放锁。
    • 释放锁之后,其他等待的请求会被唤醒,此时它们会直接从缓存读取结果。
  3. 关键代码解析

    • lock:lock(key):获取互斥锁,防止多个请求同时回源。
    • cache:get(key):从共享内存读取缓存结果。
    • cache:set(key, res.body, 10):把回源结果存入缓存,并设置 10 秒的过期时间。
    • ngx.location.capture:用于发起内部子请求,访问后端服务。

借助这种方式,你就能有效合并对相同资源的并发请求,减少后端的负载。