Redis与MySQL双写一致性:6大方案与实战代码

先更新数据库还是先删缓存?这是面试官最爱挖的坑。某大厂监控数据显示,每10次缓存更新操作就有3.7次出现数据不一致问题。双写一致性难题本质上是并发环境下数据操作顺序与网络延迟共同作用的结果。

经典缓存更新策略对比

// 典型错误写法示例
public void updateProduct(Product product) {
    // 先更新数据库
    productDao.update(product);
    // 立即删除缓存
    redis.del("product_" + product.getId());
}

这种写法存在严重隐患:当数据库主从同步延迟时,其他线程可能读取到旧数据重新写入缓存。某电商平台曾因此导致促销价格显示错误,造成百万级损失。

延时双删策略实现

// 引入延迟队列的改进方案
public void updateProductWithDelay(Product product) {
    // 第一次删除缓存
    redis.del("product_" + product.getId());
    // 更新数据库
    productDao.update(product);
    // 提交延迟任务
    delayQueue.add(() -> {
        // 二次删除缓存
        redis.del("product_" + product.getId());
    }, 1000); // 延迟1秒执行
}

某社交平台采用该方案后,数据不一致率从12%降至0.3%。关键参数设置建议:

  • 延迟时间 = 主从同步时间 + 200ms缓冲
  • 重试机制需配合消息队列实现

基于Binlog的最终一致性方案

# Canal服务配置示例
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.filter.regex=.*\\..*

Java客户端消费示例:

public class CanalClient {
    public static void main(String[] args) {
        CanalConnector connector = CanalConnectors.newClusterConnector(
            "127.0.0.1:2181", "example", "", "");
        
        while (true) {
            Message message = connector.getWithoutAck(100);
            for (CanalEntry.Entry entry : message.getEntries()) {
                if (entry.getEntryType() == EntryType.ROWDATA) {
                    // 解析数据变更
                    RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                    processRowChange(rowChange);
                }
            }
            connector.ack(message.getId());
        }
    }

    private static void processRowChange(RowChange rowChange) {
        if (rowChange.getEventType() == EventType.UPDATE) {
            for (RowData rowData : rowChange.getRowDatasList()) {
                // 构造缓存key
                String cacheKey = "product_" + rowData.getAfterColumns(0).getValue();
                // 删除或更新缓存
                redis.del(cacheKey);
            }
        }
    }
}

某金融系统使用该方案实现准实时(200ms内)数据同步,TPS达到12000+。关键优化点包括:

  1. 批量消息处理
  2. 缓存键智能构造
  3. 失败重试策略

分布式锁的强一致性方案

Redisson实现示例:

public void updateWithLock(Product product) {
    RLock lock = redisson.getLock("product_lock_" + product.getId());
    try {
        lock.lock();
        // 更新数据库
        productDao.update(product);
        // 删除缓存
        redis.del("product_" + product.getId());
    } finally {
        lock.unlock();
    }
}

某交易系统实测数据显示,该方案将数据一致性提升到99.999%,但带来约15%的性能损耗。推荐在资金交易等强一致性场景使用。

多级缓存架构设计

graph TD
    A[客户端] --> B[本地缓存]
    B --> C{命中?}
    C -->|是| D[返回数据]
    C -->|否| E[Redis集群]
    E --> F{存在?}
    F -->|是| G[返回并回填本地缓存]
    F -->|否| H[数据库查询]
    H --> I[写入Redis]
    I --> J[回填本地缓存]

某视频平台采用该架构后,数据库QPS下降83%,需要注意:

  • 本地缓存过期时间应短于Redis(建议1:3比例)
  • 使用Pub/Sub机制实现缓存失效广播

热点Key处理方案

// 使用Guava RateLimiter限流
public Product getProduct(String id) {
    String cacheKey = "product_" + id;
    Product product = redis.get(cacheKey);
    if (product == null) {
        RateLimiter limiter = RateLimiter.create(100); // 每秒100个请求
        if (limiter.tryAcquire()) {
            product = productDao.getById(id);
            redis.setex(cacheKey, 300, product);
        } else {
            // 降级策略
            return getDegradedProduct();
        }
    }
    return product;
}

某秒杀系统通过该方案将数据库压力从5000QPS降至200QPS,关键技术点:

  1. 动态限流阈值调整
  2. 本地缓存兜底
  3. 异步预热机制

数据版本控制方案

// 带版本号的缓存结构
public class CacheWrapper {
    private long version;
    private Object data;
    // getters/setters
}

// 更新时校验版本
public boolean updateProductWithVersion(Product product, long version) {
    String cacheKey = "product_" + product.getId();
    CacheWrapper wrapper = redis.get(cacheKey);
    if (wrapper.getVersion() != version) {
        return false;
    }
    // 执行数据库更新
    boolean success = productDao.update(product);
    if (success) {
        redis.del(cacheKey);
    }
    return success;
}

某协同编辑系统使用版本控制方案后,数据冲突率降低92%。关键实现细节:

  • 版本号使用乐观锁机制
  • 采用CAS(Compare and Swap)更新
  • 版本号生成使用混合时钟(物理时钟+逻辑时钟)

混合方案的选择策略

根据CAP理论,不同业务场景的解决方案选择:

场景 推荐方案 一致性级别 性能影响
用户画像 延时双删+本地缓存 最终一致
库存管理 分布式锁+版本控制 强一致
商品详情 Binlog监听+多级缓存 最终一致
订单支付 事务消息+同步双写 强一致

某大型电商平台的实际数据表明,混合方案可降低45%的运维成本,但需要建立完善的监控体系:

  1. 缓存命中率监控
  2. 数据同步延迟告警
  3. 版本冲突统计
  4. 锁竞争分析

异步更新队列设计

// 基于Disruptor的高性能队列
public class CacheUpdateEvent {
    private String key;
    private OperationType type;
}

public class CacheUpdateHandler implements EventHandler<CacheUpdateEvent> {
    public void onEvent(CacheUpdateEvent event, long sequence, boolean endOfBatch) {
        switch (event.getType()) {
            case DELETE:
                redis.del(event.getKey());
                break;
            case UPDATE:
                // 从数据库加载最新数据
                Object data = loadFromDB(event.getKey());
                redis.set(event.getKey(), data);
                break;
        }
    }
}

某物流平台使用该方案实现日均百亿级缓存操作,关键优化点:

  • 批量事件合并处理
  • 优先级队列分级
  • 失败重试策略(指数退避算法)

容灾与降级方案

// 熔断器实现示例
public class CircuitBreaker {
    private static final int FAILURE_THRESHOLD = 5;
    private static final long RETRY_TIMEOUT = 30000;
    
    private int failures = 0;
    private long lastFailureTime = 0;
    
    public boolean allowRequest() {
        if (failures >= FAILURE_THRESHOLD) {
            return System.currentTimeMillis() - lastFailureTime > RETRY_TIMEOUT;
        }
        return true;
    }
    
    public void recordFailure() {
        failures++;
        lastFailureTime = System.currentTimeMillis();
    }
    
    public void reset() {
        failures = 0;
    }
}

某金融系统通过熔断机制将故障恢复时间从30分钟缩短到2分钟,关键指标设置建议:

  • 失败阈值:根据业务TPS动态调整
  • 超时时间:逐步递增策略
  • 健康检查间隔:故障恢复后的观察期设置

测试策略设计

JUnit测试示例:

@Test
public void testCacheConsistency() {
    // 初始状态
    productDao.create(testProduct);
    redis.set("product_1", testProduct);
    
    // 并发更新
    ExecutorService executor = Executors.newFixedThreadPool(10);
    for (int i = 0; i < 100; i++) {
        executor.submit(() -> {
            productService.updateProduct(testProduct);
        });
    }
    
    // 验证最终一致性
    await().atMost(10, TimeUnit.SECONDS).until(() -> {
        Product dbProduct = productDao.getById(1);
        Product cacheProduct = redis.get("product_1");
        return dbProduct.equals(cacheProduct);
    });
}

某云计算平台测试覆盖率提升实践:

  1. 边界条件测试:网络超时、节点宕机
  2. 混沌工程:模拟网络分区、高延迟
  3. 压力测试:10万QPS下的一致性验证
  4. 自动化巡检:定时验证核心业务数据一致性
正文到此结束
评论插件初始化中...
Loading...