博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
缓存及分布式锁
阅读量:4147 次
发布时间:2019-05-25

本文共 16832 字,大约阅读时间需要 56 分钟。

文章目录

一、缓存

1、缓存使用

为了系统性能的提升,我们一般都会将部分数据放入缓存中,加速访问。而DB承担数据罗盘工作。

那些数据适合放入缓存?

  • 即时性、数据一致性要求不高的
  • 访问量大而且更新频率不高的数据(读多,写少)

2、本地缓存与分布式缓存

本地缓存:

在这里插入图片描述
本地缓存可以在单体应用中使用,如果分布式的就会出现问题。
分布式缓存-本地模式在分布式下的问题:
在这里插入图片描述
本地缓存模式下的分布式有问题,会造成缓存不统一,所以,不应该再使用本地模式的缓存
分布式缓存:
在这里插入图片描述
分布式模式下,所有的微服务都共用同一个缓存中间件。

3、整合redis

1、引入redis

在gulimall-product/pom.xml中引入redis

org.springframework.boot
spring-boot-starter-data-redis

在gulimall-product/src/main/resources/application.yml 文件中配置redis:

spring:  datasource:    username: root    password: root    url: jdbc:mysql://192.168.10.10:3306/gulimall_pms    driver-class-name: com.mysql.cj.jdbc.Driver  #  配置nacos注册中心  cloud:    nacos:      discovery:        server-addr: 127.0.0.1:8848  jackson:    date-format: yyyy-MM-dd HH:mm:ss  thymeleaf:    cache: false  # 调试期间,关闭缓存  redis:    host: 192.168.10.10    prot: 6379

单元测试:

@RunWith(SpringRunner.class)@SpringBootTestpublic class GulimallProductApplicationTests {  @Autowired  StringRedisTemplate stringRedisTemplate;    @Test  public void redisTest(){    ValueOperations
ops = stringRedisTemplate.opsForValue(); ops.set("hello", "lily"); String key = "hello"; System.out.println(ops.get(key)); } }

二、高并发下缓存失效的三个问题-穿透、雪崩、击穿

1、缓存穿透

在这里插入图片描述

缓存穿透
指查询一个一定不存在的数据,由于缓存是不命中,将去查数据库,但是数据库也无此记录,我们没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。

风险

利用不存在的数据进行攻击,数据库瞬时压力增大,最终导致崩溃。

解决

null结果缓存,并加入短暂的过期时间。

2、缓存雪崩

在这里插入图片描述

缓存雪崩
缓存雪崩是指在我们设置缓存时key采用了相同的过期时间,导致缓存在某一个时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。

解决

原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体的失效事件。

3、缓存击穿

在这里插入图片描述

缓存击穿

  • 对于一些设置了过期时间的key,如果这些key可能会在某一时间点被高并发的访问,是一种非常“热点“的数据。
  • 如果这个key在大量请求同时进来前刚好失效,那么所有对这个key的查询都落在db,我们成为缓存击穿。

解决

加锁来处理。
大量并发只让一个去查,其他人等待,查到以后释放锁,其他人获取到锁,先查缓存,就会有数据,不用去db。

4、小结

穿透:查询一个永不存在的数据

雪崩:缓存key同一时间大面积失效
击穿:某一个单热点被高频访问,在前一点突然失效。

结果方案:

1、空结果缓存:解决缓存穿透
2、设置过期时间(加随机值),解决缓存雪崩
3、加锁:解决缓存击穿

三、分布式锁原理及使用

1、本地锁

在这里插入图片描述

锁-时序问题:
在这里插入图片描述

2、分布式锁演进

1、基本原理

在这里插入图片描述
我们可以同时去一个地方“占坑”,如果占到,就执行逻辑。否则就必须等待,直到释放锁。占坑可以去Redis,也可以去数据库,可以去任何大家都可以访问到的地方。等待可以自旋的方式。

2、阶段一

在这里插入图片描述
NX – 只有键key不存在的时候才会设置key的值。

解读:每个线程在访问时,先会去判断这个“坑位”有没有加锁,如果没有加锁,则自己占住加锁(然后获取数据,从缓存/数据库),别人来的时候看到加锁就会等待,当从坑位离开时,会删除锁,然后其他的才会继续站位。

进入redis:

> docker exec -it redis redis-cli# 可以多开几个窗口进行测试127.0.0.1:6379> set lock 1 NXOK

3、阶段二

在这里插入图片描述
4、阶段三
在这里插入图片描述
5、阶段四
在这里插入图片描述
6、最终形态
在这里插入图片描述
分布式锁代码
gulimall/product/service/impl/CategoryServiceImpl.java

/**     * 从数据库查询并封装数据::分布式锁     * @return     */    public Map
> getCatalogJsonFromDbWithRedisLock() { //1、占分布式锁。去redis占坑 设置过期时间必须和加锁是同步的,保证原子性(避免死锁) String uuid = UUID.randomUUID().toString(); Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid,300,TimeUnit.SECONDS); if (lock) { System.out.println("获取分布式锁成功..."); Map
> dataFromDb = null; try { //加锁成功...执行业务 dataFromDb = getDataFromDb(); } finally { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; //删除锁 stringRedisTemplate.execute(new DefaultRedisScript
(script, Long.class), Arrays.asList("lock"), uuid); } //先去redis查询下保证当前的锁是自己的 //获取值对比,对比成功删除=原子性 lua脚本解锁 // String lockValue = stringRedisTemplate.opsForValue().get("lock"); // if (uuid.equals(lockValue)) { // //删除我自己的锁 // stringRedisTemplate.delete("lock"); // } return dataFromDb; } else { System.out.println("获取分布式锁失败...等待重试..."); //加锁失败...重试机制 //休眠一百毫秒 try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } return getCatalogJsonFromDbWithRedisLock(); //自旋的方式 } } private Map
> getDataFromDb() { //得到锁以后,我们应该再去缓存中确定一次,如果没有才需要继续查询 String catalogJson = stringRedisTemplate.opsForValue().get("catalogJson"); if (!StringUtils.isEmpty(catalogJson)) { //缓存不为空直接返回 Map
> result = JSON.parseObject(catalogJson, new TypeReference
>>() { }); return result; } System.out.println("查询了数据库"); /** * 将数据库的多次查询变为一次 */ List
selectList = this.baseMapper.selectList(null); //1、查出所有分类 //1、1)查出所有一级分类 List
level1Categorys = getParent_cid(selectList, 0L); //封装数据 Map
> parentCid = level1Categorys.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), v -> { //1、每一个的一级分类,查到这个一级分类的二级分类 List
categoryEntities = getParent_cid(selectList, v.getCatId()); //2、封装上面的结果 List
catelog2Vos = null; if (categoryEntities != null) { catelog2Vos = categoryEntities.stream().map(l2 -> { Catelog2Vo catelog2Vo = new Catelog2Vo(v.getCatId().toString(), null, l2.getCatId().toString(), l2.getName().toString()); //1、找当前二级分类的三级分类封装成vo List
level3Catelog = getParent_cid(selectList, l2.getCatId()); if (level3Catelog != null) { List
category3Vos = level3Catelog.stream().map(l3 -> { //2、封装成指定格式 Catelog2Vo.Category3Vo category3Vo = new Catelog2Vo.Category3Vo(l2.getCatId().toString(), l3.getCatId().toString(), l3.getName()); return category3Vo; }).collect(Collectors.toList()); catelog2Vo.setCatalog3List(category3Vos); } return catelog2Vo; }).collect(Collectors.toList()); } return catelog2Vos; })); //3、将查到的数据放入缓存,将对象转为json String valueJson = JSON.toJSONString(parentCid); stringRedisTemplate.opsForValue().set("catalogJson", valueJson, 1, TimeUnit.DAYS); return parentCid; }

四、分布式锁-Redisson

1、Redisson简介和整合

1、简介

Redisson 是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

2、整合

1、引入依赖
gulimall-product/pom.xml

org.redisson
redisson
3.12.0

文档:

为了学习Redisson 原理,我们先手动创建Redisson配置文件,以后项目中可以引用已经封装好的 spring-boot-redisson-starter 来快速安装。

2、增加redisson配置文件

com/atguigu/gulimall/product/config/MyRedissonConfig.java

package com.atguigu.gulimall.product.config;import org.redisson.Redisson;import org.redisson.api.RedissonClient;import org.redisson.config.Config;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.io.IOException;/** * @author: kaiyi * @create: 2020-09-01 14:10 */@Configurationpublic class MyRedissonConfig {  /**   * 所有对Redisson的使用都是通过RedissonClient   * @return   * @throws IOException   */  @Bean(destroyMethod="shutdown")  public RedissonClient redisson() throws IOException {    //1、创建配置(单节点模式)    Config config = new Config();    config.useSingleServer().setAddress("redis://192.168.10.10:6379");    //2、根据Config创建出RedissonClient实例    //Redis url should start with redis:// or rediss://    RedissonClient redissonClient = Redisson.create(config);    return redissonClient;  }}

2、Redisson-lock锁测试

@Autowired  private RedissonClient redisson; @ResponseBody  @GetMapping(value = "/hello")  public String hello() {    //1、获取一把锁,只要锁的名字一样,就是同一把锁    RLock myLock = redisson.getLock("my-lock");    //2、加锁    myLock.lock();      //阻塞式等待。默认加的锁都是30s    //1)、锁的自动续期,如果业务超长,运行期间自动锁上新的30s。不用担心业务时间长,锁自动过期被删掉    //2)、加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认会在30s内自动过期,不会产生死锁问题    // myLock.lock(10,TimeUnit.SECONDS);   //10秒钟自动解锁,自动解锁时间一定要大于业务执行时间    //问题:在锁时间到了以后,不会自动续期    //1、如果我们传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是 我们制定的时间    //2、如果我们指定锁的超时时间,就使用 lockWatchdogTimeout = 30 * 1000 【看门狗默认时间】    //只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】,每隔10秒都会自动的再次续期,续成30秒    // internalLockLeaseTime 【看门狗时间】 / 3, 10s    try {      System.out.println("加锁成功,执行业务..." + Thread.currentThread().getId());      try { TimeUnit.SECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); }    } catch (Exception ex) {      ex.printStackTrace();    } finally {      //3、解锁  假设解锁代码没有运行,Redisson会不会出现死锁      System.out.println("释放锁..." + Thread.currentThread().getId());      myLock.unlock();    }    return "hello";  }

官方文档:

3、Redisson读写锁

/** * @author: kaiyi * @create: 2020-08-29 13:30 */@Controllerpublic class IndexController {  @Autowired  private RedissonClient redisson;  @Autowired  private StringRedisTemplate stringRedisTemplate;    /**   * 保证一定能读到最新数据,修改期间,写锁是一个排它锁(互斥锁、独享锁),读锁是一个共享锁   * 写锁没释放读锁必须等待   * 读 + 读 :相当于无锁,并发读,只会在Redis中记录好,所有当前的读锁。他们都会同时加锁成功   * 写 + 读 :必须等待写锁释放   * 写 + 写 :阻塞方式   * 读 + 写 :有读锁。写也需要等待   * 只要有读或者写的存都必须等待   * @return   */  @GetMapping(value = "/write")  @ResponseBody  public String writeValue() {    String s = "";    RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");    RLock rLock = readWriteLock.writeLock();    try {      //1、改数据加写锁,读数据加读锁      rLock.lock();      s = UUID.randomUUID().toString();      ValueOperations
ops = stringRedisTemplate.opsForValue(); ops.set("writeValue",s); TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } finally { rLock.unlock(); } return s; } @GetMapping(value = "/read") @ResponseBody public String readValue() { String s = ""; RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock"); //加读锁 RLock rLock = readWriteLock.readLock(); try { rLock.lock(); ValueOperations
ops = stringRedisTemplate.opsForValue(); s = ops.get("writeValue"); try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } finally { rLock.unlock(); } return s; }}

信号量

/**   * 车库停车   * 3车位   * 信号量也可以做分布式限流   */  @GetMapping(value = "/park")  @ResponseBody  public String park() throws InterruptedException {    RSemaphore park = redisson.getSemaphore("park");    park.acquire();     //获取一个信号、获取一个值,占一个车位    boolean flag = park.tryAcquire();    if (flag) {      //执行业务    } else {      return "error";    }    return "ok=>" + flag;  }  @GetMapping(value = "/go")  @ResponseBody  public String go() {    RSemaphore park = redisson.getSemaphore("park");    park.release();     //释放一个车位    return "ok";  }

分布式闭锁

/**   * 放假、锁门   * 1班没人了   * 5个班,全部走完,我们才可以锁大门   * 分布式闭锁   */  @GetMapping(value = "/lockDoor")  @ResponseBody  public String lockDoor() throws InterruptedException {    RCountDownLatch door = redisson.getCountDownLatch("door");    door.trySetCount(5);    door.await();       //等待闭锁完成    return "放假了...";  }  @GetMapping(value = "/gogogo/{id}")  @ResponseBody  public String gogogo(@PathVariable("id") Long id) {    RCountDownLatch door = redisson.getCountDownLatch("door");    door.countDown();       //计数-1    return id + "班的人都走了...";  }

4、项目实战

项目文件com/atguigu/gulimall/product/service/impl/CategoryServiceImpl.java

@Autowired  private RedissonClient redissonClient;/**   * 缓存里的数据如何和数据库的数据保持一致??   * 缓存数据一致性   * 1)、双写模式   * 2)、失效模式   * @return   */  public Map
> getCatalogJsonFromDbWithRedissonLock() { //1、占分布式锁。去redis占坑 //(锁的粒度,越细越快:具体缓存的是某个数据,11号商品) product-11-lock //RLock catalogJsonLock = redissonClient.getLock("catalogJson-lock"); //创建读锁 RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("catalogJson-lock"); RLock rLock = readWriteLock.readLock(); Map
> dataFromDb = null; try { rLock.lock(); //加锁成功...执行业务 dataFromDb = getDataFromDb(); } finally { rLock.unlock(); } //先去redis查询下保证当前的锁是自己的 //获取值对比,对比成功删除=原子性 lua脚本解锁 // String lockValue = stringRedisTemplate.opsForValue().get("lock"); // if (uuid.equals(lockValue)) { // //删除我自己的锁 // stringRedisTemplate.delete("lock"); // } return dataFromDb; }

五、SpringCache

1、项目整合SpringCache

整合步骤:

  • 1、引入依赖
    spring-boot-starter-cache,spring-boot-starter-data-redis
  • 2、写配置
    2.1 自动配置了那些 (CacheAutoConfiguration 会导入 RedisCacheConfiguration,自动配好了缓存管理器)
    2.2 配置使用Redis作为缓存 spring.cache.type=redis
  • 3、使用缓存

pom.xml

org.springframework.boot
spring-boot-starter-cache
org.springframework.boot
spring-boot-starter-data-redis
io.lettuce
lettuce-core
redis.clients
jedis

代码整合

增加Spring cache 缓存
com/atguigu/gulimall/product/service/impl/CategoryServiceImpl.java

@Service("categoryService")public class CategoryServiceImpl extends ServiceImpl
implements CategoryService {// @Autowired// CategoryDao categoryDao; @Resource private CategoryBrandRelationService categoryBrandRelationService; @Resource private StringRedisTemplate stringRedisTemplate; @Autowired private RedissonClient redissonClient; /** * 级联更新所有关联的数据 * * @CacheEvict:失效模式 * @CachePut:双写模式,需要有返回值 * 1、同时进行多种缓存操作:@Caching * 2、指定删除某个分区下的所有数据 @CacheEvict(value = "category",allEntries = true) * 3、存储同一类型的数据,都可以指定为同一分区 * @param category */ // @Caching(evict = { // @CacheEvict(value = "category",key = "'getLevel1Categorys'"), // @CacheEvict(value = "category",key = "'getCatalogJson'") // }) @CacheEvict(value = "category",allEntries = true) //删除某个分区下的所有数据 @Transactional(rollbackFor = Exception.class) @Override public void updateCascade(CategoryEntity category) { RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("catalogJson-lock"); //创建写锁 RLock rLock = readWriteLock.writeLock(); try { rLock.lock(); this.baseMapper.updateById(category); categoryBrandRelationService.updateCategory(category.getCatId(), category.getName()); } catch (Exception e) { e.printStackTrace(); } finally { rLock.unlock(); } //同时修改缓存中的数据 //删除缓存,等待下一次主动查询进行更新 } /** * 每一个需要缓存的数据我们都来指定要放到那个名字的缓存。【缓存的分区(按照业务类型分)】 * 代表当前方法的结果需要缓存,如果缓存中有,方法都不用调用,如果缓存中没有,会调用方法。最后将方法的结果放入缓存 * 默认行为 * 如果缓存中有,方法不再调用 * key是默认生成的:缓存的名字::SimpleKey::[](自动生成key值) * 缓存的value值,默认使用jdk序列化机制,将序列化的数据存到redis中 * 默认时间是 -1: * * 自定义操作:key的生成 * 指定生成缓存的key:key属性指定,接收一个Spel * 指定缓存的数据的存活时间:配置文档中修改存活时间 * 将数据保存为json格式 * * * 4、Spring-Cache的不足之处: * 1)、读模式 * 缓存穿透:查询一个null数据。解决方案:缓存空数据 * 缓存击穿:大量并发进来同时查询一个正好过期的数据。解决方案:加锁 ? 默认是无加锁的;使用sync = true来解决击穿问题 * 缓存雪崩:大量的key同时过期。解决:加随机时间。加上过期时间 * 2)、写模式:(缓存与数据库一致) * 1)、读写加锁。 * 2)、引入Canal,感知到MySQL的更新去更新Redis * 3)、读多写多,直接去数据库查询就行 * * 总结: * 常规数据(读多写少,即时性,一致性要求不高的数据,完全可以使用Spring-Cache):写模式(只要缓存的数据有过期时间就足够了) * 特殊数据:特殊设计 * * 原理: * CacheManager(RedisCacheManager)->Cache(RedisCache)->Cache负责缓存的读写 * @return */ @Cacheable(value = {"category"},key = "#root.method.name",sync = true) @Override public List
getLevel1Categorys() { System.out.println("getLevel1Categorys........"); long l = System.currentTimeMillis(); List
categoryEntities = this.baseMapper.selectList( new QueryWrapper
().eq("parent_cid", 0)); System.out.println("消耗时间:"+ (System.currentTimeMillis() - l)); return categoryEntities; } @Cacheable(value = "category",key = "#root.methodName") @Override public Map
> getCatalogJson() { System.out.println("查询了数据库"); //将数据库的多次查询变为一次 List
selectList = this.baseMapper.selectList(null); //1、查出所有分类 //1、1)查出所有一级分类 List
level1Categorys = getParent_cid(selectList, 0L); //封装数据 Map
> parentCid = level1Categorys.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), v -> { //1、每一个的一级分类,查到这个一级分类的二级分类 List
categoryEntities = getParent_cid(selectList, v.getCatId()); //2、封装上面的结果 List
catelog2Vos = null; if (categoryEntities != null) { catelog2Vos = categoryEntities.stream().map(l2 -> { Catelog2Vo catelog2Vo = new Catelog2Vo(v.getCatId().toString(), null, l2.getCatId().toString(), l2.getName().toString()); //1、找当前二级分类的三级分类封装成vo List
level3Catelog = getParent_cid(selectList, l2.getCatId()); if (level3Catelog != null) { List
category3Vos = level3Catelog.stream().map(l3 -> { //2、封装成指定格式 Catelog2Vo.Category3Vo category3Vo = new Catelog2Vo.Category3Vo(l2.getCatId().toString(), l3.getCatId().toString(), l3.getName()); return category3Vo; }).collect(Collectors.toList()); catelog2Vo.setCatalog3List(category3Vos); } return catelog2Vo; }).collect(Collectors.toList()); } return catelog2Vos; })); return parentCid; }}

转载地址:http://sonti.baihongyu.com/

你可能感兴趣的文章
Ubuntu Navicat for MySQL安装以及破解方案
查看>>
HTTPS那些事 用java实现HTTPS工作原理
查看>>
oracle函数trunc的使用
查看>>
MySQL 存储过程或者函数中传参数实现where id in(1,2,3,...)IN条件拼接
查看>>
java反编译
查看>>
Class.forName( )你搞懂了吗?——转
查看>>
jarFile
查看>>
EJB3.0定时发送jms(发布/定阅)方式
查看>>
EJB与JAVA BEAN_J2EE的异步消息机制
查看>>
数学等于号是=那三个横杠是什么符
查看>>
HTTP协议详解
查看>>
java多线程中的join方法详解
查看>>
ECLIPSE远程调试出现如下问题 ECLIPSE中调试代码提示找不到源
查看>>
java abstract修饰符
查看>>
数组分为两部分,使得其和相差最小
查看>>
java抽象类和接口
查看>>
有趣的排序——百度2017春招
查看>>
二叉树的最近公共祖先LCA
查看>>
数组中累加和为定值K的最长子数组长度
查看>>
素数对--腾讯2017校招编程
查看>>