Redis脚本开发:从Lua基础到企业级应用实践

Redis 脚本核心运行机制

Redis脚本基于Lua 5.1解释器实现,通过单线程模型保证原子性操作。每个脚本执行时会创建独立的Lua环境,这种沙箱机制隔离了系统级操作,确保脚本无法执行危险命令。当执行EVAL "return redis.call('GET', 'key')" 0时,Redis会先编译脚本为字节码再执行,编译后的脚本会缓存至内存。

脚本参数传递遵循特定规则:

  • KEYS数组存放Redis键名
  • ARGV数组存放其他参数
  • 必须显式声明KEYS数量(第二个参数)

错误处理机制包含三个层级:

  1. 语法错误:预处理阶段直接拒绝(如EVAL "retun 1" 0
  2. 运行时错误:抛出Lua异常(如操作不存在的数据类型)
  3. Redis调用错误:通过redis.error_reply函数处理
-- 错误处理示例
local value = redis.call('GET', KEYS[1])
if not value then
    return redis.error_reply("KEY_NOT_EXIST")
end
return tonumber(value) * tonumber(ARGV[1])

EVAL与EVALSHA深度解析

EVAL命令的完整语法:

EVAL script numkeys key [key ...] arg [arg ...]

执行过程包含四个阶段:

  1. 语法检查
  2. 编译脚本(生成SHA1摘要)
  3. 存入脚本缓存
  4. 执行字节码

EVALSHA通过预计算的SHA1摘要调用脚本,显著降低网络开销。建议生产环境配合SCRIPT LOAD使用:

# 预加载脚本
SHA=$(redis-cli SCRIPT LOAD "$(cat my_script.lua)")

# 调用脚本
redis-cli EVALSHA $SHA 2 key1 key2 arg1

脚本缓存管理策略:

  • 使用SCRIPT FLUSH清空缓存(谨慎操作)
  • SCRIPT EXISTS检测脚本是否存在
  • Redis默认使用LRU算法管理缓存

Lua与Redis数据类型转换

类型转换对照表:

Lua类型 Redis类型 转换规则
number integer 浮点数转为字符串
string bulk string 直接转换
table (array) multi-bulk reply 索引从1开始
table (key-value) nil 非数组table转换失败
boolean integer true=1, false=0/nil
nil nil 直接转换

特殊转换案例:

-- 返回多维数组
return {1, {2, 3}, {4, {5}}} 
-- 转换为:1) 1 2) 1) 2 2) 3 3) 1) 4 2) 1) 5

脚本调试技巧

使用redis-cli调试模式:

redis-cli --ldb --eval script.lua key1 key2 , arg1 arg2

调试命令清单:

  • step 单步执行
  • continue 继续运行
  • list 显示源码
  • print var 查看变量
  • break 设置断点

调试会话示例:

(lldb) list
1> local count = redis.call('GET', 'counter')
2> if not count then
3>   count = 0
4> end
5> redis.call('SET', 'counter', tonumber(count)+1)
(lldb) break 5
(lldb) print count
"0"

性能优化实践

  1. 局部变量优化
-- 错误写法
for i=1,100 do
    redis.call('SET', 'key'..i, i)
end

-- 正确写法
local cmd = redis.call
for i=1,100 do
    cmd('SET', 'key'..i, i)
end
  1. 管道优化技术
local results = {}
for i=1,10 do
    results[i] = redis.call('INCR', 'counter')
end
return results  -- 返回批量操作结果
  1. 内存复用技巧
local buffer = {}
for i=1,1000 do
    buffer[#buffer+1] = redis.call('HGET', 'hash', 'field'..i)
    if #buffer > 100 then
        -- 分批处理减少内存压力
        process(buffer)
        buffer = {}
    end
end

企业级应用场景

分布式锁加强版

local key = KEYS[1]
local client_id = ARGV[1]
local ttl = tonumber(ARGV[2])

local lock = redis.call('SET', key, client_id, 'NX', 'PX', ttl)
if lock then
    return 1
end

local current_id = redis.call('GET', key)
if current_id == client_id then
    redis.call('PEXPIRE', key, ttl)
    return 2
end

return 0

滑动时间窗限流

local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])

local clear_before = now - window
redis.call('ZREMRANGEBYSCORE', key, 0, clear_before)

local count = redis.call('ZCARD', key)
if count >= limit then
    return 0
end

redis.call('ZADD', key, now, now)
redis.call('EXPIRE', key, window)
return 1

实时数据聚合

local stats_key = KEYS[1]
local event_type = ARGV[1]
local timestamp = tonumber(ARGV[2])

-- 更新小时级统计
local hour_key = stats_key .. ":h:" .. math.floor(timestamp/3600)
redis.call('HINCRBY', hour_key, event_type, 1)
redis.call('EXPIRE', hour_key, 72*3600)  -- 保留3天

-- 更新分钟级统计(滑动窗口)
local minute_key = stats_key .. ":m"
redis.call('ZADD', minute_key, timestamp, event_type .. ":" .. timestamp)
redis.call('ZREMRANGEBYSCORE', minute_key, 0, timestamp-60)
return redis.call('ZCOUNT', minute_key, timestamp-60, timestamp)

高级特性应用

脚本级复制

使用redis.replicate_commands()开启精确复制模式:

redis.replicate_commands()
local counter = redis.call('INCR', 'counter')
if counter % 100 == 0 then
    redis.call('PUBLISH', 'alerts', 'Reached milestone: '..counter)
end
return counter

随机数生成

math.randomseed(tonumber(redis.call('TIME')[1]))
local random = math.random(100)
redis.call('SET', 'daily_seed', random)
return random

脚本自检机制

local function check_dependencies()
    if not redis.call('COMMAND', 'INFO', 'JSON.SET') then
        return redis.error_reply("Require RedisJSON module")
    end
    return true
end

if not check_dependencies() then
    return nil
end
-- 主业务逻辑

安全防护方案

  1. 执行时间监控
local start = redis.call('TIME')[1]
-- 业务逻辑
local used_time = redis.call('TIME')[1] - start
if used_time > 5 then  -- 超过5秒警告
    redis.log(redis.LOG_WARNING, "Long running script: "..used_time)
end
  1. 内存限制检查
local mem_usage = redis.call('INFO', 'memory')['used_memory']
if mem_usage > 100*1024*1024 then  -- 超过100MB
    return redis.error_reply("MEMORY_LIMIT_EXCEEDED")
end
  1. 调用白名单验证
local allowed_cmds = {['GET']=true, ['SET']=true}
local function check_cmd(cmd)
    if not allowed_cmds[cmd] then
        error("FORBIDDEN_COMMAND: "..cmd)
    end
end

check_cmd(redis.call('COMMAND', 'INFO', cmd))

故障排查指南

常见问题处理矩阵:

现象 可能原因 解决方案
NOSCRIPT错误 脚本未预加载 使用SCRIPT LOAD预先加载
BUSY错误 已有脚本执行超时 调整lua-time-limit配置
内存溢出 脚本产生大对象 使用分批次处理
性能低下 循环内调用redis命令 使用管道批量操作
副本数据不一致 未启用复制命令 添加redis.replicate_commands()

调试日志记录技巧:

redis.log(redis.LOG_DEBUG, "Processing user: "..user_id)
local debug_info = {
    keys = KEYS,
    args = ARGV,
    memory = redis.call('INFO', 'memory')
}
redis.call('PUBLISH', 'script_debug', cjson.encode(debug_info))
正文到此结束
评论插件初始化中...
Loading...