Redis缓存问题
in Redis with 0 comment

Redis缓存问题

in Redis with 0 comment

为什么要理解Redis缓存问题

使用Redis可以提高查询效率,一定程度上可以减轻数据库服务器的压力,从而保护了数据库。

通常,应用Redis的场景有:

一旦使用Redis,就会导致Redis和数据库中都存在同样的数据,当数据发生变化时,可能出现不一致的问题!

所以,还有某些数据在特定的场景中不能使用Redis:

需要学会评估是否要求数据一定保持一致!

要使用Redis缓存数据,至少需要:

在使用Redis之前,还必须明确一些问题:

在高并发的业务场景下,数据库大多数情况都是用户并发访问最薄弱的环节。所以,就需要使用redis做一个缓冲操作,让请求先访问到redis,而不是直接访问Mysql等数据库。这样可以大大缓解数据库的压力。
当缓存库出现时,必须要考虑如下问题:

缓存穿透

问题来源

缓存穿透是指缓存和数据库中都没有的数据,而用户不断发起请求。由于缓存是不命中时被动写的,并且出于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。

在流量大时,可能DB就挂掉了,要是有人利用不存在的key频繁攻击我们的应用,这就是漏洞。

如发起为id为“-1”的数据或id为特别大不存在的数据。这时的用户很可能是攻击者,攻击会导致数据库压力过大。

解决方案

  1. 接口层增加校验,如用户鉴权校验,id做基础校验,id<=0的直接拦截;
  2. 从缓存取不到的数据,在数据库中也没有取到,这时也可以将key-value对写为key-null,缓存有效时间可以设置短点,如30秒(设置太长会导致正常情况也没法使用)。这样可以防止攻击用户反复用同一个id暴力攻击 .
  3. 布隆过滤器。bloomfilter就类似于一个hash set,用于快速判某个元素是否存在于集合中,其典型的应用场景就是快速判断一个key是否存在于某容器,不存在就直接返回。布隆过滤器的关键就在于hash算法和容器大小.

缓存击穿

问题来源

缓存击穿是指缓存中没有但数据库中有的数据(一般是缓存时间到期),这时由于并发用户特别多,同时读缓存没读到数据,又同时去数据库去取数据,引起数据库压力瞬间增大,造成过大压力。

解决方案

1、设置热点数据永远不过期。
2、接口限流与熔断,降级。重要的接口一定要做好限流策略,防止用户恶意刷接口,同时要降级准备,当接口中的某些 服务 不可用时候,进行熔断,失败快速返回机制。
3、加互斥锁

缓存雪崩

问题来源

缓存雪崩是指缓存中数据大批量到过期时间,而查询数据量巨大,引起数据库压力过大甚至down机。和缓存击穿不同的是,缓存击穿指并发查同一条数据,缓存雪崩是不同数据都过期了,很多数据都查不到从而查数据库。

解决方案

  1. 缓存数据的过期时间设置随机,防止同一时间大量数据过期现象发生。

  2. 如果缓存数据库是分布式部署,将热点数据均匀分布在不同的缓存数据库中。

  3. 设置热点数据永远不过期。

缓存污染(或满了)

问题来源

缓存污染问题说的是缓存中一些只会被访问一次或者几次的的数据,被访问完后,再也不会被访问到,但这部分数据依然留存在缓存中,消耗缓存空间。 缓存污染会随着数据的持续增加而逐渐显露,随着服务的不断运行,缓存中会存在大量的永远不会再次被访问的数据。缓存空间是有限的,如果缓存空间满了,再往缓存里写数据时就会有额外开销,影响Redis性能。这部分额外开销主要是指写的时候判断淘汰策略,根据淘汰策略去选择要淘汰的数据,然后进行删除操作。

解决方案

最大缓存设置多大

系统的设计选择是一个权衡的过程:大容量缓存是能带来性能加速的收益,但是成本也会更高,而小容量缓存不一定就起不到加速访问的效果。一般来说,我会建议把缓存容量设置为总数据量的 15% 到 30%,兼顾访问性能和内存空间开销。

对于 Redis 来说,一旦确定了缓存最大容量,比如 4GB,你就可以使用下面这个命令来设定缓存的大小了:

CONFIG SET maxmemory 4gb

不过,缓存被写满是不可避免的, 所以需要数据淘汰策略。

缓存淘汰策略

Redis共支持八种淘汰策略,分别是noeviction、volatile-random、volatile-ttl、volatile-lru、volatile-lfu、allkeys-lru、allkeys-random 和 allkeys-lfu 策略。

怎么理解呢?主要看分三类看:

  1. 随机:volatile-random
  2. ttl:volatile-ttl
  3. lru:volatile-lru
  4. lfu:volatile-lfu
  1. 随机:allkeys-random
  2. lru: allkeys-lru
  3. lfu:allkeys-lfu

具体对照如下:

  1. noeviction
    该策略是Redis的默认策略。在这种策略下,一旦缓存被写满了,再有写请求来时,Redis 不再提供服务,而是直接返回错误。这种策略不会淘汰数据,所以无法解决缓存污染问题。一般生产环境不建议使用。
  2. volatile-random
    这个算法比较简单,在设置了过期时间的键值对中,进行随机删除。因为是随机删除,无法把不再访问的数据筛选出来,所以可能依然会存在缓存污染现象,无法解决缓存污染问题。
  3. volatile-ttl
    这种算法判断淘汰数据时参考的指标比随机删除时多进行一步过期时间的排序。Redis在筛选需删除的数据时,越早过期的数据越优先被选择。
  4. volatile-lru
    LRU算法:LRU 算法的全称是 Least Recently Used,按照最近最少使用的原则来筛选数据。这种模式下会使用 LRU 算法筛选设置了过期时间的键值对。

Redis优化的LRU算法实现: Redis会记录每个数据的最近一次被访问的时间戳。在Redis在决定淘汰的数据时,第一次会随机选出 N 个数据,把它们作为一个候选集合。接下来,Redis 会比较这 N 个数据的 lru 字段,把 lru 字段值最小的数据从缓存中淘汰出去。通过随机读取待删除集合,可以让Redis不用维护一个巨大的链表,也不用操作链表,进而提升性能。
Redis 选出的数据个数 N,通过 配置参数 maxmemory-samples 进行配置。个数N越大,则候选集合越大,选择到的最久未被使用的就更准确,N越小,选择到最久未被使用的数据的概率也会随之减小。

  1. volatile-lfu
    会使用 LFU 算法选择设置了过期时间的键值对。

LFU 算法:LFU 缓存策略是在 LRU 策略基础上,为每个数据增加了一个计数器,来统计这个数据的访问次数。当使用 LFU 策略筛选淘汰数据时,首先会根据数据的访问次数进行筛选,把访问次数最低的数据淘汰出缓存。如果两个数据的访问次数相同,LFU 策略再比较这两个数据的访问时效性,把距离上一次访问时间更久的数据淘汰出缓存。
Redis的LFU算法实现:当 LFU 策略筛选数据时,Redis 会在候选集合中,根据数据 lru 字段的后 8bit 选择访问次数最少的数据进行淘汰。当访问次数相同时,再根据 lru 字段的前 16bit 值大小,选择访问时间最久远的数据进行淘汰。
Redis 只使用了 8bit 记录数据的访问次数,而 8bit 记录的最大值是 255,这样在访问快速的情况下,如果每次被访问就将访问次数加一,很快某条数据就达到最大值255,可能很多数据都是255,那么退化成LRU算法了。所以Redis为了解决这个问题,实现了一个更优的计数规则,并可以通过配置项,来控制计数器增加的速度。
参数:
lfu-log-factor ,用计数器当前的值乘以配置项 lfu_log_factor 再加 1,再取其倒数,得到一个 p 值;然后,把这个 p 值和一个取值范围在(0,1)间的随机数 r 值比大小,只有 p 值大于 r 值时,计数器才加 1。
lfu-decay-time, 控制访问次数衰减。LFU 策略会计算当前时间和数据最近一次访问时间的差值,并把这个差值换算成以分钟为单位。然后,LFU 策略再把这个差值除以 lfu_decay_time 值,所得的结果就是数据 counter 要衰减的值。
lfu-log-factor设置越大,递增概率越低,lfu-decay-time设置越大,衰减速度会越慢。
我们在应用 LFU 策略时,一般可以将 lfu_log_factor 取值为 10。
如果业务应用中有短时高频访问的数据的话,建议把 lfu_decay_time 值设置为 1。可以快速衰减访问次数。
volatile-lfu 策略是 Redis 4.0 后新增。

  1. allkeys-lru
    使用 LRU 算法在所有数据中进行筛选。具体LFU算法跟上述 volatile-lru 中介绍的一致,只是筛选的数据范围是全部缓存,这里就不在重复。
  2. allkeys-random
    从所有键值对中随机选择并删除数据。volatile-random 跟 allkeys-random算法一样,随机删除就无法解决缓存污染问题。
  3. allkeys-lfu
    使用 LFU 算法在所有数据中进行筛选。具体LFU算法跟上述 volatile-lfu 中介绍的一致,只是筛选的数据范围是全部缓存,这里就不在重复。allkeys-lfu 策略是 Redis 4.0 后新增。

数据库和缓存一致性

问题来源

使用redis做一个缓冲操作,让请求先访问到redis,而不是直接访问MySQL等数据库:

image-1655277453700

读取缓存步骤一般没有什么问题,但是一旦涉及到数据更新:数据库和缓存更新,就容易出现缓存(Redis)和数据库(MySQL)间的数据一致性问题。

不管是先写MySQL数据库,再删除Redis缓存;还是先删除缓存,再写库,都有可能出现数据不一致的情况。举一个例子:

1.如果删除了缓存Redis,还没有来得及写库MySQL,另一个线程就来读取,发现缓存为空,则去数据库中读取数据写入缓存,此时缓存中为脏数据。
2.如果先写了库,在删除缓存前,写库的线程宕机了,没有删除掉缓存,则也会出现数据不一致情况。

因为写和读是并发的,没法保证顺序,就会出现缓存和数据库的数据不一致的问题。

4种相关模式

更新缓存的的Design Pattern有四种:Cache aside, Read through, Write through, Write behind caching;

节选最最常用的Cache Aside Pattern, 总结来说就是

其具体逻辑如下:

image-1655277657059

image-1655277662372

注意,我们的更新是先更新数据库,成功后,让缓存失效。那么,这种方式是否可以没有文章前面提到过的那个问题呢?我们可以脑补一下。

一个是查询操作,一个是更新操作的并发,首先,没有了删除cache数据的操作了,而是先更新了数据库中的数据,此时,缓存依然有效,所以,并发的查询操作拿的是没有更新的数据,但是,更新操作马上让缓存的失效了,后续的查询操作再把数据从数据库中拉出来。而不会像文章开头的那个逻辑产生的问题,后续的查询操作一直都在取老的数据。

那么,是不是Cache Aside这个就不会有并发问题了?不是的,比如,一个是读操作,但是没有命中缓存,然后就到数据库中取数据,此时来了一个写操作,写完数据库后,让缓存失效,然后,之前的那个读操作再把老的数据放进去,所以,会造成脏数据。

但,这个case理论上会出现,不过,实际上出现的概率可能非常低,因为这个条件需要发生在读缓存时缓存失效,而且并发着有一个写操作。而实际上数据库的写操作会比读操作慢得多,而且还要锁表,而读操作必需在写操作前进入数据库操作,而又要晚于写操作更新缓存,所有的这些条件都具备的概率基本并不大。

所以,这也就是Quora上的那个答案里说的,要么通过2PC或是Paxos协议保证一致性,要么就是拼命的降低并发时脏数据的概率,而Facebook使用了这个降低概率的玩法,因为2PC太慢,而Paxos太复杂。当然,最好还是为缓存设置上过期时间。

方案一:队列+重试机制

image-1655277758053

流程如下所示

然而,该方案有一个缺点,对业务线代码造成大量的侵入。于是有了方案二,在方案二中,启动一个订阅程序去订阅数据库的binlog,获得需要操作的数据。在应用程序中,另起一段程序,获得这个订阅程序传来的信息,进行删除缓存操作。

方案二:异步更新缓存(基于订阅binlog的同步机制)

image-1655277840798

  1. 技术整体思路:

MySQL binlog增量订阅消费+消息队列+增量数据更新到redis
1)读Redis:热数据基本都在Redis
2)写MySQL: 增删改都是操作MySQL
3)更新Redis数据:MySQ的数据操作binlog,来更新到Redis

  1. Redis更新

1)数据操作主要分为两大块:

2)读取binlog后分析 ,利用消息队列,推送更新各台的redis缓存数据。

这样一旦MySQL中产生了新的写入、更新、删除等操作,就可以把binlog相关的消息推送至Redis,Redis再根据binlog中的记录,对Redis进行更新。

其实这种机制,很类似MySQL的主从备份机制,因为MySQL的主备也是通过binlog来实现的数据一致性。

这里可以结合使用canal(阿里的一款开源框架),通过该框架可以对MySQL的binlog进行订阅,而canal正是模仿了mysql的slave数据库的备份请求,使得Redis的数据更新达到了相同的效果。

当然,这里的消息推送工具你也可以采用别的第三方:kafka、rabbitMQ等来实现推送更新Redis。

实例

暂定目标:

cn.tedu.csmall.product.webapi.repository创建ICategoryRedisRepository接口,并在接口中添加抽象方法:

public interface ICategoryRedisRepository {

    String KEY_CATEGORY_ITEM_PREFIX = "categories:item:";
    
    // 将类别详情存入到Redis中
    void save(CategoryDetailsVO category);
    
    // 根据类别id获取类别详情
    CategoryDetailsVO getDetailsById(Long id);
    
}

然后在cn.tedu.csmall.product.webapi.repository.impl创建CategoryRedisRepositoryImpl(接口的实现类),实现以上接口:

@Repository
public class CategoryRedisRepositoryImpl implements ICategoryRedisRepository {
    
    @Autowired
    private RedisTemplate<String, Serilizalbe> redisTemplate;
    
    @Override
    public void save(CategoryDetailsVO category) {
        String key = KEY_CATEGORY_ITEM_PREFIX + category.getId();
        redisTemplate.opsForValue().set(key, category);
    }
    
    @Override
    public CategoryDetailsVO getDetailsById(Long id) {
        String key = KEY_CATEGORY_ITEM_PREFIX + id;
        Serializable result = redisTemplate.opsForValue().get(key);
        if (result == null) {
            return null;
        } else {
            CategoryDetailsVO category = (CategoryDetailsVO) result;
            return category;
        }
    }
}

完成后,测试:

package cn.tedu.csmall.product.webapi.repository;

import cn.tedu.csmall.pojo.vo.CategoryDetailsVO;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class CategoryRedisRepositoryTests {

    @Autowired
    ICategoryRedisRepository repository;

    @Test
    void testGetDetailsByIdSuccessfully() {
        testSave();

        Long id = 10L;
        CategoryDetailsVO category = repository.getDetailsById(id);
        Assertions.assertNotNull(category);
    }

    @Test
    void testGetDetailsByIdReturnNull() {
        Long id = -1L;
        CategoryDetailsVO category = repository.getDetailsById(id);
        Assertions.assertNull(category);
    }

    private void testSave() {
        CategoryDetailsVO category = new CategoryDetailsVO();
        category.setId(10L);
        category.setName("家用电器");

        Assertions.assertDoesNotThrow(() -> {
            repository.save(category);
        });
    }
}

然后,需要修改CategoryServiceImpl中的实现:

@Autowired
private ICategoryRedisRepository categoryRedisRepository;

@Override
public CategoryDetailsVO getDetailsById(Long id) {
    // ===== 以下是原有代码,只从数据库中获取数据 =====
    // CategoryDetailsVO category = categoryMapper.getDetailsById(id);
    // if (category == null) {
    //     throw new ServiceException(State.ERR_CATEGORY_NOT_FOUND,
    //             "获取类别详情失败,尝试访问的数据不存在!");
    // }
    // return category;
    
    // ===== 以下是新的业务,将从Redis中获取数据 =====
    // 从repsotiroy中调用方法,根据id获取缓存的数据
    // 判断缓存中是否存在与此id对应的key
    // 有:表示明确的存入过某数据,此数据可能是有效数据,也可能是null
    // -- 判断此key对应的数据是否为null
    // -- 是:表示明确的存入了null值,则此id对应的数据确实不存在,则抛出异常
    // -- 否:表示明确的存入了有效数据,则返回此数据即可
    
    // 无:表示从未向缓存中写入此id对应的数据,在数据库中,此id可能存在数据,也可能不存在
    // 从mapper中调用方法,根据id获取数据库的数据
    // 判断从数据库中获取的结果是否为null
    // 是:数据库也没有此数据,先向缓存中写入错误数据(null),再抛出异常
    
    // 将从数据库中查询到的结果存入到缓存中
    // 返回查询结果
}

为了避免缓存穿透,需要在ICategoryRedisRepository中添加2个抽象方法:

/**
 * 判断是否存在id对应的缓存数据
 *
 * @param id 类别id
 * @return 存在则返回true,否则返回false
 */
boolean exists(Long id);

/**
 * 向缓存中写入某id对应的空数据(null),此方法主要用于解决缓存穿透问题
 *
 * @param id 类别id
 */
void saveEmptyValue(Long id);

并在CategoryRedisRepositoryImpl中补充实现:

@Override
public boolean exists(Long id) {
    String key = KEY_CATEGORY_ITEM_PREFIX + id;
    return redisTemplate.hasKey(key);
}

@Override
public void saveEmptyValue(Long id) {
    String key = KEY_CATEGORY_ITEM_PREFIX + id;
    redisTemplate.opsForValue().set(key, null);
}

业务中的具体实现为:

@Override
public CategoryDetailsVO getDetailsById(Long id) {
    // ===== 以下是原有代码,只从数据库中获取数据 =====
    // CategoryDetailsVO category = categoryMapper.getDetailsById(id);
    // if (category == null) {
    //     throw new ServiceException(State.ERR_CATEGORY_NOT_FOUND,
    //             "获取类别详情失败,尝试访问的数据不存在!");
    // }
    // return category;

    // ===== 以下是新的业务,将从Redis中获取数据 =====
    log.debug("根据id({})获取类别详情……", id);
    // 从repository中调用方法,根据id获取缓存的数据
    // 判断缓存中是否存在与此id对应的key
    boolean exists = categoryRedisRepository.exists(id);
    if (exists) {
        // 有:表示明确的存入过某数据,此数据可能是有效数据,也可能是null
        // -- 判断此key对应的数据是否为null
        CategoryDetailsVO cacheResult = categoryRedisRepository.getDetailsById(id);
        if (cacheResult == null) {
            // -- 是:表示明确的存入了null值,则此id对应的数据确实不存在,则抛出异常
            log.warn("在缓存中存在此id()对应的Key,却是null值,则抛出异常", id);
            throw new ServiceException(State.ERR_CATEGORY_NOT_FOUND,
                    "获取类别详情失败,尝试访问的数据不存在!");
        } else {
            // -- 否:表示明确的存入了有效数据,则返回此数据即可
            return cacheResult;
        }
    }

    // 缓存中没有此id匹配的数据
    // 从mapper中调用方法,根据id获取数据库的数据
    log.debug("没有命中缓存,则从数据库查询数据……");
    CategoryDetailsVO dbResult = categoryMapper.getDetailsById(id);
    // 判断从数据库中获取的结果是否为null
    if (dbResult == null) {
        // 是:数据库也没有此数据,先向缓存中写入错误数据,再抛出异常
        log.warn("数据库中也无此数据(id={}),先向缓存中写入错误数据", id);
        categoryRedisRepository.saveEmptyValue(id);
        log.warn("抛出异常");
        throw new ServiceException(State.ERR_CATEGORY_NOT_FOUND,
                "获取类别详情失败,尝试访问的数据不存在!");
    }

    // 将从数据库中查询到的结果存入到缓存中
    log.debug("已经从数据库查询到匹配的数据,将数据存入缓存……");
    categoryRedisRepository.save(dbResult);
    // 返回查询结果
    log.debug("返回查询到数据:{}", dbResult);
    return dbResult;
}

许多缓存数据应该是服务器刚刚启动就直接写入到Redis中的,当后续客户端访问时,缓存中已经存在的数据可以直接响应,避免获取数据时缓存中还没有对应的数据,还需要从数据库中查询。

在服务器刚刚启动时就加载需要缓存的数据并写入到Redis中,这种做法称之为缓存预热。

需要解决的问题有:

在Spring Boot中,可以自定义某个组件类,实现ApplicationRunner即可,例如:

package cn.tedu.csmall.product.webapi.app;

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

@Component
public class CachePreLoad implements ApplicationRunner {

    @Override
    public void run(ApplicationArguments args) throws Exception {
        System.out.println("CachePreLoad.run()");
    }

}

为了将全部“类别”写入到缓存中,首先,需要能够从数据库中查询到全部数据,则需要:

然后,还需要实现将查询到的List<CategoryDetailsVO>写入到Redis中,则需要: