分布式系统中涉及到的缓存问题

本文的思维导图如下:

什么需要被缓存

为了系统性能的提升,我们一般都会将部分数据放入缓存中,加速访问。而 db 承担数据落盘工作。

哪些数据适合放入缓存?

  • 即时性、数据一致性要求不高的
  • 访问量大且更新频率不高的数据(读多,写少)

举例:电商类应用,商品分类,商品列表等适合缓存并加一个失效时间(根据数据更新频率 来定),后台如果发布一个商品,买家需要 5 分钟才能看到新的商品一般还是可以接受的。

data = cache.load(id); //从缓存加载数据
If(data == null){
    data = db.load(id); //从数据库加载数据
    cache.put(id,data); //保存到 cache 中
}
return data;

注意: 在开发中,凡是放入缓存中的数据我们都应该指定过期时间,使其可以在系统即使没有主动更新数据也能自动触发数据加载进缓存的流程。避免业务崩溃导致的数据永久不一致问题。

整合 redis 进行简单测试

使用 redis 作为缓存组件。

导入 redis 的依赖,因为 spring boot starter 默认配置的是使用 lettuce 作为操作 redis 的客户端, 它使用 netty 进行数据通信。默认的 lettuce 容易导致堆外内存溢出。这里将 lettuce 替换为 jedis。

<!--pom.xml-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>

配置 redis 的主机地址

# application.yml
spring:
  redis:
    host: #redis-host
    port: #port, 默认 6379

注入 StringRedisTemplate 对 redis 进行操作。

测试代码

@SpringBootTest
class DemoApplicationTests {

  @Autowired
  StringRedisTemplate stringRedisTemplate;

  @Test
  public void test(){
    ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();
    String productJson = ops.get("productJson");
    if(productJson==null){
      Product product = DB.read();
      String jsonString = JSON.toJSONString(product);
      ops.set("productJson",jsonString);
      // ops.set("productJson",productJson,10000); 根据业务需要有可能需要为缓存设置过期时间
      //  业务结束
      System.out.println(jsonString);
    }else{
      // 业务结束
      System.out.println(productJson);
    }
  }
}
@Data
class Product{
  private String name;
  public Product(String name) {
    this.name = name;
  }
}
class DB{
  public static Product read(){
    System.out.println("在数据库中查询");
    return new Product("MI");
  }
}

在使用 redis 做数据缓存时会大量使用到 JSON 序列化操作,建议学习。 推荐阅读:

fastjson学习笔记

StringRedisTemplate 的 常用 api 有:

//向redis里存入数据和设置缓存时间
stringRedisTemplate.opsForValue().set("test", "100",60*10,TimeUnit.SECONDS);

//val做-1操作
stringRedisTemplate.boundValueOps("test").increment(-1);

//根据key获取缓存中的val
stringRedisTemplate.opsForValue().get("test")

//val +1
stringRedisTemplate.boundValueOps("test").increment(1);

//根据key获取过期时间
stringRedisTemplate.getExpire("test")

//根据key获取过期时间并换算成指定单位
stringRedisTemplate.getExpire("test",TimeUnit.SECONDS)

//根据key删除缓存
stringRedisTemplate.delete("test");

//检查key是否存在,返回boolean值
stringRedisTemplate.hasKey("546545");

//向指定key中存放set集合
stringRedisTemplate.opsForSet().add("red_123", "1","2","3");

//设置过期时间
stringRedisTemplate.expire("red_123",1000 , TimeUnit.MILLISECONDS);

//根据key查看集合中是否存在指定数据
stringRedisTemplate.opsForSet().isMember("red_123", "1")

//根据key获取set集合
stringRedisTemplate.opsForSet().members("red_123");

高并发下的缓存失效问题

缓存穿透

也可以采用布隆过滤器的方式,布隆过滤器可以判定某个数据是否在 Redis 中。布隆过滤器判定数据不在 Redis 中,那么这个数据一定不在 Redis 中,可以减少到 Redis 中无效查询的次数。

缓存雪崩

另一种解释:由于缓存层承载着大量请求,有效地保护了存储层,但是如果缓存层由于某些原因不能提供服务,于是所有的请求都会达到存储层,存储层的调用量会暴增,造成存储层也会级联宕机的情况。

缓存击穿

加锁解决缓存击穿问题

@Test
  public void test(){
    ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();
    String productJson = ops.get("productJson");
    if(productJson==null){
      // 缓存不命中, 准备查询数据库
      String jsonString=null;
      synchronized (this){
        // 为什么需要在缓存中二次查询?
        // 之所以该线程抢到了锁是因为前一个线程释放了锁, 前一个线程对缓存进行了更新, 所以需要二次判断
        productJson = ops.get("productJson");
        if(productJson==null){
          // 从数据库中读取
          Product product = DB.read();
          jsonString = JSON.toJSONString(product);
          ops.set("productJson",jsonString);
          // ops.set("productJson",productJson,10000); 根据业务需要有可能需要为缓存设置过期时间
        }else{
          // 业务结束
          System.out.println(productJson);
        }
      }
    }else{
      // 缓存命中
      System.out.println(productJson);
    }
  }

分布式缓存的解决方法

分布式锁基础

本地缓存存在的问题

当有多个服务存在时,每个服务的缓存仅能够为本服务使用,这样每个服务都要查询一次数据库,并且当数据更新时只会更新单个服务的缓存数据,就会造成数据不一致的问题。

将所有模块的缓存都存放在 redis 中集中管理就可以解决这个问题。

分布式锁

本地锁只能锁住本地服务,这时候需要分布式锁。

在分布式系统中一般是通过 redis 来实现分布式锁。redis 实现分布式锁也是一个很值得讨论的问题,这里面也是大有学问。

阶段一

我们现在考虑如何使用 Redis单机 实现分布式锁。 最简单的方法是使用 Redis 提供的一条命令 SETNX。 该指令只在 key 不存在的情况下,将 key 的值设置为 value,若 key 已经存在,则 SETNX 命令不做任何动作。key 是锁的唯一标识,可以按照业务需要锁定的资源来命名。

比如在某商城的秒杀活动中对某一商品加锁,那么 key 可以设置为 lock ,value 可以设置为任意值,在资源使用完成后,使用 DEL 删除该 key 对锁进行释放,整个过程如下:

在 Java 中的代码实现如下:

public Map<String, List<Catalog2Vo>> getCatalogJsonDbWithRedisLock() {
        //阶段一
        Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "111");
        //获取到锁,执行业务
        if (lock) {
            Map<String, List<Catalog2Vo>> categoriesDb = getCategoryMap();
            //删除锁,如果在此之前报错或宕机会造成死锁
            stringRedisTemplate.delete("lock");
            return categoriesDb;
        }else {
            //没获取到锁,等待100ms重试
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return getCatalogJsonDbWithRedisLock();
        }
    }

public Map<String, List<Catalog2Vo>> getCategoryMap() {
        ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();
        String catalogJson = ops.get("catalogJson");
        if (StringUtils.isEmpty(catalogJson)) {
            System.out.println("缓存不命中,准备查询数据库。。。");
            Map<String, List<Catalog2Vo>> categoriesDb= getCategoriesDb();
            String toJSONString = JSON.toJSONString(categoriesDb);
            ops.set("catalogJson", toJSONString);
            return categoriesDb;
        }
        System.out.println("缓存命中。。。。");
        Map<String, List<Catalog2Vo>> listMap = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Catalog2Vo>>>() {});
        return listMap;
    }

很显然,这种获取锁的方式很简单,但也存在一个问题,就是我们上面提到的分布式锁三个核心要素之一的锁超时问题,即如果获得锁的进程在业务逻辑处理过程中出现了异常,比如说机房断电,可能会导致 DEL 指令一直无法执行,导致锁无法释放,该资源将会永远被锁住。

阶段二

所以,在使用 SETNX 拿到锁以后,必须给 key 设置一个过期时间,以保证即使没有被显式释放,在获取锁达到一定时间后也要自动释放,防止资源被长时间独占。由于 SETNX 不支持设置过期时间,所以需要额外的 EXPIRE 指令,整个过程如下:

Java 代码实现如下:

public Map<String, List<Catalog2Vo>> getCatalogJsonDbWithRedisLock() {
        //设置锁
        Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "111");
        if (lock) {
            //设置过期时间
            stringRedisTemplate.expire("lock", 30, TimeUnit.SECONDS);
            Map<String, List<Catalog2Vo>> categoriesDb = getCategoryMap();
            stringRedisTemplate.delete("lock");
            return categoriesDb;
        }else {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return getCatalogJsonDbWithRedisLock();
        }
    }

但是即使是这样还是存在一个问题,setnx设置好,正要去设置过期时间,宕机。又死锁了。就该使得设置过期时间和上锁为一个原子操作。

阶段三

如何解决SETNX 和 EXPIRE 两个操作非原子性的问题?Redis 提供了另外的命令来实现这两条命令的原子性操作。可以使用 Redis 的 SET 指令的扩展参数来实现原子性操作。

在这个 SET 指令中:

  • NX 表示只有当 lock_resource_id 对应的 key 值不存在的时候才能 SET 成功。保证了只有第一个请求的客户端才能获得锁,而其它客户端在锁被释放之前都无法获得锁。
  • EX 10 表示这个锁 10 秒钟后会自动过期,业务可以根据实际情况设置这个时间的大小。

Java 代码实现如下

public Map<String, List<Catalog2Vo>> getCatalogJsonDbWithRedisLock() {
    //加锁的同时设置过期时间,二者是原子性操作
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111",5, TimeUnit.SECONDS);
    if (lock) {
        Map<String, List<Catalog2Vo>> categoriesDb = getCategoryMap();
        //模拟超长的业务执行时间
        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        stringRedisTemplate.delete("lock");
        return categoriesDb;
    }else {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return getCatalogJsonDbWithRedisLock();
    }
}

但是这种方式仍然不能彻底解决分布式锁超时问题:

  • 锁被提前释放。假如线程 A 在加锁和释放锁之间的逻辑执行的时间过长(或者线程 A 执行过程中被堵塞),以至于超出了锁的过期时间后进行了释放,但线程 A 在临界区的逻辑还没有执行完,那么这时候线程 B 就可以提前重新获取这把锁,导致临界区代码不能严格的串行执行。
  • 锁被误删。假如以上情形中的线程 A 执行完后,它并不知道此时的锁持有者是线程 B,线程 A 会继续执行 DEL 指令来释放锁,如果线程 B 在临界区的逻辑还没有执行完,线程 A 实际上释放了线程 B 的锁。

阶段四

为了避免以上情况,建议不要在执行时间过长的场景中使用 Redis 分布式锁,同时一个比较安全的做法是在执行 DEL 释放锁之前对锁进行判断,验证当前锁的持有者是否是自己。

具体实现就是在加锁时将 value 设置为一个唯一的随机数(UUID),释放锁时先判断随机数是否一致,然后再执行释放操作,确保不会错误地释放其它线程持有的锁,除非是锁过期了被服务器自动释放,整个过程如下:

Java 代码实现如下:

public Map<String, List<Catalog2Vo>> getCatalogJsonDbWithRedisLock() {
        String uuid = UUID.randomUUID().toString();
        ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();
     	//为当前锁设置唯一的uuid,只有当uuid相同时才会进行删除锁的操作
        Boolean lock = ops.setIfAbsent("lock", uuid,5, TimeUnit.SECONDS);
        if (lock) {
            Map<String, List<Catalog2Vo>> categoriesDb = getCategoryMap();
            String lockValue = ops.get("lock");
            if (lockValue.equals(uuid)) {
                try {
                    Thread.sleep(6000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                stringRedisTemplate.delete("lock");
            }
            return categoriesDb;
        }else {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return getCatalogJsonDbWithRedisLock();
        }
    }

阶段五(最终版)

但判断 value 和删除 key 是两个独立的操作,并不是原子性的,所以这个地方需要使用 Lua 脚本进行处理,因为 Lua 脚本可以保证连续多个指令的原子性执行。

Java 代码实现如下

public Map<String, List<Catalog2Vo>> getCatalogJsonDbWithRedisLock() {
        String uuid = UUID.randomUUID().toString();
        ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();
        Boolean lock = ops.setIfAbsent("lock", uuid,5, TimeUnit.SECONDS);
        if (lock) {
            Map<String, List<Catalog2Vo>> categoriesDb = getCategoryMap();
            String lockValue = ops.get("lock");
            String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" +
                    "    return redis.call(\"del\",KEYS[1])\n" +
                    "else\n" +
                    "    return 0\n" +
                    "end";
            stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), lockValue);
            return categoriesDb;
        }else {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return getCatalogJsonDbWithRedisLock();
        }
    }

总结而言,Redis 单节点实现分布式锁最重要的问题是 实现加锁和删除锁的原子性

基于 Redis 单节点的分布式锁基本完成了,但是这并不是一个完美的方案,只是相对完全一点,因为它并没有完全解决当前线程执行超时锁被提前释放后,其它线程乘虚而入的问题。

Redisson

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
Github:redisson/redisson

在本节中主要关注 Redisson 在分布式锁上的应用。

导入依赖:

<!--pom.xml-->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.4</version>
</dependency>

在容器中注入 Redisson 客户端:

@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient(){
        Config config = new Config();
        //redis://127.0.0.1:7181
        //可以用"rediss://"来启用 SSL 连接
        config.useSingleServer().setAddress("redis://115.159.148.114:6380");
        RedissonClient redisson = Redisson.create(config);
        return redisson;
    }
}

可重入锁(Reentrant Lock)

什么是可重入锁?什么是不可重入锁?

假设存在两个方法,方法A,方法B,存在两把锁,锁1,锁2。

  • 可重入锁:A需要锁1,B方法需要锁2。这两个方法不会发生死锁。
  • 不可重入锁:A需要锁1,B也需要锁1。这两个方法可能发生死锁。
public Map<String, List<Catalog2Vo>> getCatalogJsonDbWithRedisson() {
        Map<String, List<Catalog2Vo>> categoryMap=null;
        // 只要名字相同就是同一把锁
        RLock lock = redissonClient.getLock("CatalogJson-Lock");
        // 上锁
        lock.lock();
        try {
            Thread.sleep(30000);
            categoryMap = getCategoryMap();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
            return categoryMap;
        }
    }

大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期,只要占锁成功就会启动一个定时任务,重新给锁设置过期时间。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。每隔10秒钟都会重新对锁进行续期。

另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。不会自动续期!

// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁,如果要手动解锁一定要确保业务执行时间小于锁的失效时间
lock.lock(10, TimeUnit.SECONDS);

// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       lock.unlock();
   }
}

在实际开发中一般是给锁限定一个过期时间,而不是采用默认的续期方式。

读写锁(ReadWriteLock)

经典进程同步问题 中的读写者模型类似。

  1. 可以多个读者同时读
  2. 但是不允许多个写者同时写, 只允许一个写者写
  3. 不允许在写的时候读, 也不能有新的写者来写
  4. 写者执行写之前, 不应该有读者或写者在操作

当然,Redisson 为我们封装了读写操作,我们不需要去关心线程的互斥问题。

@GetMapping("/read")
    @ResponseBody
    public String read() {
        RReadWriteLock lock = redissonClient.getReadWriteLock("ReadWrite-Lock");
        RLock rLock = lock.readLock();
        String s = "";
        try {
            rLock.lock();
            System.out.println("读锁加锁"+Thread.currentThread().getId());
            Thread.sleep(5000);
            s= redisTemplate.opsForValue().get("lock-value");
        }finally {
            rLock.unlock();
            return "读取完成:"+s;
        }
    }

    @GetMapping("/write")
    @ResponseBody
    public String write() {
        RReadWriteLock lock = redissonClient.getReadWriteLock("ReadWrite-Lock");
        RLock wLock = lock.writeLock();
        String s = UUID.randomUUID().toString();
        try {
            wLock.lock();
            System.out.println("写锁加锁"+Thread.currentThread().getId());
            Thread.sleep(10000);
            redisTemplate.opsForValue().set("lock-value",s);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            wLock.unlock();
            return "写入完成:"+s;
        }
    }

信号量(Semaphore)

信号量为存储在redis中的一个数字,当这个数字大于0时,即可以调用acquire()方法增加数量,也可以调用release()方法减少数量,但是当调用release()之后小于0的话方法就会阻塞,直到数字大于0。

常常用于限定流量的服务,当服务器无法承受高并发量的服务时可以限定流量。

@GetMapping("/park")
@ResponseBody
public String park() {
    RSemaphore park = redissonClient.getSemaphore("park");
    try {
        // 阻塞方法
        park.acquire(2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "停进2";
}

@GetMapping("/go")
@ResponseBody
public String go() {
    RSemaphore park = redissonClient.getSemaphore("park");
    park.release(2);
    return "开走2";
}

闭锁(CountDownLatch)

相当于教室门,只有当学生走完了,教室门才可以关闭。

以下代码只有offLatch()被调用5次后 setLatch()才能继续执行

@GetMapping("/setLatch")
    @ResponseBody
    public String setLatch() {
        RCountDownLatch latch = redissonClient.getCountDownLatch("CountDownLatch");
        try {
            latch.trySetCount(5);
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "门栓被放开";
    }

    @GetMapping("/offLatch")
    @ResponseBody
    public String offLatch() {
        RCountDownLatch latch = redissonClient.getCountDownLatch("CountDownLatch");
        latch.countDown();
        return "门栓被放开1";
    }

缓存数据的一致性

缓存的数据如何保证与数据库中的一致。

双写模式

当数据更新时,更新数据库时同时更新缓存

存在的问题: 由于卡顿等原因,导致写缓存2在最前,写缓存1在后面就出现了不一致

这是暂时性的脏数据问题,但是在数据稳定,缓存过期以后,又能得到最新的正确数据。

失效模式

数据库更新时将缓存删除

当两个请求同时修改数据库,一个请求已经更新成功并删除缓存时又有读数据的请求进来,这时候发现缓存中无数据就去数据库中查询并放入缓存,在放入缓存前第二个更新数据库的请求成功,这时候留在缓存中的数据依然是第一次数据更新的数据

解决方法:

1、缓存的所有数据都有过期时间,数据过期下一次查询触发主动更新 2、读写数据的时候(并且写的不频繁),加上分布式的读写锁。

解决方案

无论是双写模式还是失效模式,都会导致缓存的不一致问题。即多个实例同时更新会出事。怎么办?

  • 如果是用户纬度数据(订单数据、用户数据),这种并发几率非常小,不用考虑这个问题,缓存数据加上过期时间,每隔一段时间触发读的主动更新即可

  • 如果是菜单,商品介绍等基础数据,也可以去使用canal订阅binlog的方式。

  • 缓存数据+过期时间也足够解决大部分业务对于缓存的要求。

  • 通过加锁保证并发读写,写写的时候按顺序排好队。读读无所谓。所以适合使用读写锁。(业务不关心脏数据,允许临时脏数据可忽略);

总结:

  • 我们能放入缓存的数据本就不应该是实时性、一致性要求超高的。所以缓存数据的时候加上过期时间,保证每天拿到当前最新数据即可。
  • 我们不应该过度设计,增加系统的复杂性
  • 遇到实时性、一致性要求高的数据,就应该查数据库,即使慢点。

SpringCache

简介

  • Spring 从 3.1 开始定义了 org.springframework.cache.Cache 和 org.springframework.cache.CacheManager 接口来统一不同的缓存技术; 并支持使用 JCache(JSR-107)注解简化我们开发;

  • Cache 接口为缓存的组件规范定义,包含缓存的各种操作集合; Cache 接口下 Spring 提供了各种 xxxCache 的实现;如 RedisCache , EhCacheCache , ConcurrentMapCache 等;

  • 每次调用需要缓存功能的方法时,Spring 会检查检查指定参数的指定的目标方法是否已 经被调用过;如果有就直接从缓存中获取方法调用后的结果,如果没有就调用方法并缓 存结果后返回给用户。下次调用直接从缓存中获取。

  • 使用 Spring 缓存抽象时我们需要关注以下两点;

    1. 确定方法需要被缓存以及他们的缓存策略
    2. 从缓存中读取之前缓存存储的数据

一个 CacheManager 管理着多种缓存 Cache。CacheManager 定义了一些规则,造出的缓存组件,这些缓存组件才是真正给缓存 CRUD 数据的。 RedissonSpringCacheManager 实现了 CacheManager,可以对存放在 Redis 的缓存进行管理。

简单上手

  1. 导入依赖
<!--pom.xml-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
	<exclusions>
		<exclusion>
			<groupId>io.lettuce</groupId>
			<artifactId>lettuce-core</artifactId>
		</exclusion>
	</exclusions>
</dependency>
<dependency>
	<groupId>redis.clients</groupId>
	<artifactId>jedis</artifactId>
</dependency>
  1. 添加配置
spring:
  redis:
    host: 192.168.206.131
  cache:
    type:redis
  1. 主类添加 @EnableCaching 注解开启缓存

示例代码

@SpringBootTest
@EnableCaching
class DemoApplicationTests {

	@Autowired
	StringRedisTemplate stringRedisTemplate;
	@Autowired
	ProductService productService;

	@Test
	public void test(){
		Product product = productService.readFromDb();
		System.out.println(product);
	}
}
@Service
class ProductService{

	@Cacheable(cacheNames = "product")
	public Product readFromDb(){
		System.out.println("在数据库中查询");
		return new DB().read();
	}
}
@Data
class Product implements Serializable {
	private String name;
	public Product(String name) {
		this.name = name;
	}
}
class DB{
	public Product read(){
		return new Product("MI");
	}
}

第一次执行查询

在 Redis 中新增了一个名为 product 的键值对

第二次执行,可见并没有执行 readFromDb() 方法,是直接从 Redis 中读取的缓存。

更多

SpringCache 提供的注解:

每一个需要缓存的数据需要我们来指定要放到哪个名字的缓存,缓存分区按照业务类型来划分。

@Cacheable(cacheNames = “product”) 代表当前方法的结果需要缓存,如果缓存中有方法就不调用。如果缓存中没有会调用方法,最后将方法的结果放入缓存。

默认行为:

  1. 如果缓存中存在该方法的缓存,则方法不调用
  2. key 默认生成,缓存的名字::SimpleKey[] 自动生成的 key 值
  3. 缓存的 value 的值,默认使用 jdk 序列化机制,将序列化后的数据存放到 redis
  4. 默认 ttl 的时间为 -1

自定义:

  1. 指定生成的缓存使用的 key。key 属性指定,接收一个 SpEL:

  2. 指定缓存数据的 ttl: 配置文件中修改 ttl

  3. 将数据保存为 json 格式

前置原理:

CacheAutoConfiguration → RedisCacheConfiguration → 自动配置了 RedisCacheManager → 初始化所有的缓存 → 每个缓存决定使用什么配置 → 如果 redisCacheConfiguration 有就用已有的,没有就用默认配置单 →想要修改缓存的配置,只需要给容器中放一个 RedisCacheConfiguration 即可 →就会应用到当前 RedisCacheManager 管理的所有缓存分区中。

注入 RedisCacheConfiguration 的代码

@Configuration
@EnableCaching
@EnableConfigurationProperties(CacheProperties.class)
public class MyCacheConfig {
	@Bean
	public org.springframework.data.redis.cache.RedisCacheConfiguration redisCacheConfiguration(
			CacheProperties cacheProperties) {
		CacheProperties.Redis redisProperties = cacheProperties.getRedis();
		org.springframework.data.redis.cache.RedisCacheConfiguration config = org.springframework.data.redis.cache.RedisCacheConfiguration
				.defaultCacheConfig();
		//指定缓存序列化方式为json
		config = config.serializeValuesWith(
				RedisSerializationContext.SerializationPair.fromSerializer(new GenericFastJsonRedisSerializer()));
		//设置配置文件中的各项配置,如过期时间
		if (redisProperties.getTimeToLive() != null) {
			config = config.entryTtl(redisProperties.getTimeToLive());
		}
		if (redisProperties.getKeyPrefix() != null) {
			config = config.prefixKeysWith(redisProperties.getKeyPrefix());
		}
		if (!redisProperties.isCacheNullValues()) {
			config = config.disableCachingNullValues();
		}
		if (!redisProperties.isUseKeyPrefix()) {
			config = config.disableKeyPrefix();
		}
		return config;
	}
}

测试结果如下

项目中如何使用

//调用该方法时会将结果缓存,缓存名为category,key为方法名
	//表示该方法的缓存被读取时会加锁
	@Cacheable(value = {"category"},key = "#root.methodName",sync = true)
    public Map<String, List<Catalog2Vo>> getCatalogJsonDbWithSpringCache() {
        return getCategoriesDb();
    }

	//调用该方法会删除缓存category下的所有cache
    @Override
    @CacheEvict(value = {"category"},allEntries = true)
    public void updateCascade(CategoryEntity category) {
        this.updateById(category);
        if (!StringUtils.isEmpty(category.getName())) {
            categoryBrandRelationService.updateCategory(category);
        }
    }

如何级联更新所有关联的数据

@CacheEvict: 失效模式

  1. 同时进行多种缓存操作
  2. 指定删除分区下的所有数据 @CacheEvict( value = “catagory”, allEntries = true)
  3. 存储同一类型的数据都可以指定成同一个分区。分区名默认是缓存的前缀。

SpringCache 的不足之处

读模式:

  1. 缓存穿透:查询一个null数据。解决方案:缓存空数据,可通过spring.cache.redis.cache-null-values=true
  2. 缓存击穿:大量并发进来同时查询一个正好过期的数据。解决方案:加锁 ? 默认是无加锁的; 在 Cacheable 中添加 sync = true 来解决击穿问题。
  3. 缓存雪崩:大量的key同时过期。解决:加随机时间。加上过期时间。

写模式(缓存与数据库的一致性问题):

  1. 读写加锁
  2. 引入Canal,感知到MySQL的更新去更新Redis
  3. 读多写多,直接去数据库查询就行

总结

常规数据,读多写少,即时性,一致性要求不高的数据,完全可以使用Spring-Cache。

写模式,只要缓存的数据有过期时间就足够了。

参考资料