本文共 16832 字,大约阅读时间需要 56 分钟。
为了系统性能的提升,我们一般都会将部分数据放入缓存中,加速访问。而DB承担数据罗盘工作。
那些数据适合放入缓存?本地缓存:
本地缓存可以在单体应用中使用,如果分布式的就会出现问题。 分布式缓存-本地模式在分布式下的问题: 本地缓存模式下的分布式有问题,会造成缓存不统一,所以,不应该再使用本地模式的缓存 分布式缓存: 分布式模式下,所有的微服务都共用同一个缓存中间件。1、引入redis
在gulimall-product/pom.xml中引入redisorg.springframework.boot spring-boot-starter-data-redis
在gulimall-product/src/main/resources/application.yml 文件中配置redis:
spring: datasource: username: root password: root url: jdbc:mysql://192.168.10.10:3306/gulimall_pms driver-class-name: com.mysql.cj.jdbc.Driver # 配置nacos注册中心 cloud: nacos: discovery: server-addr: 127.0.0.1:8848 jackson: date-format: yyyy-MM-dd HH:mm:ss thymeleaf: cache: false # 调试期间,关闭缓存 redis: host: 192.168.10.10 prot: 6379
单元测试:
@RunWith(SpringRunner.class)@SpringBootTestpublic class GulimallProductApplicationTests { @Autowired StringRedisTemplate stringRedisTemplate; @Test public void redisTest(){ ValueOperationsops = stringRedisTemplate.opsForValue(); ops.set("hello", "lily"); String key = "hello"; System.out.println(ops.get(key)); } }
风险
利用不存在的数据进行攻击,数据库瞬时压力增大,最终导致崩溃。解决
null结果缓存,并加入短暂的过期时间。解决
原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体的失效事件。解决
加锁来处理。 大量并发只让一个去查,其他人等待,查到以后释放锁,其他人获取到锁,先查缓存,就会有数据,不用去db。穿透:查询一个永不存在的数据
雪崩:缓存key同一时间大面积失效 击穿:某一个单热点被高频访问,在前一点突然失效。结果方案:
1、空结果缓存:解决缓存穿透 2、设置过期时间(加随机值),解决缓存雪崩 3、加锁:解决缓存击穿1、基本原理
我们可以同时去一个地方“占坑”,如果占到,就执行逻辑。否则就必须等待,直到释放锁。占坑可以去Redis,也可以去数据库,可以去任何大家都可以访问到的地方。等待可以自旋的方式。2、阶段一
NX – 只有键key不存在的时候才会设置key的值。解读:每个线程在访问时,先会去判断这个“坑位”有没有加锁,如果没有加锁,则自己占住加锁(然后获取数据,从缓存/数据库),别人来的时候看到加锁就会等待,当从坑位离开时,会删除锁,然后其他的才会继续站位。
进入redis:
> docker exec -it redis redis-cli# 可以多开几个窗口进行测试127.0.0.1:6379> set lock 1 NXOK
3、阶段二
4、阶段三 5、阶段四 6、最终形态 分布式锁代码 gulimall/product/service/impl/CategoryServiceImpl.java/** * 从数据库查询并封装数据::分布式锁 * @return */ public Map> getCatalogJsonFromDbWithRedisLock() { //1、占分布式锁。去redis占坑 设置过期时间必须和加锁是同步的,保证原子性(避免死锁) String uuid = UUID.randomUUID().toString(); Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid,300,TimeUnit.SECONDS); if (lock) { System.out.println("获取分布式锁成功..."); Map > dataFromDb = null; try { //加锁成功...执行业务 dataFromDb = getDataFromDb(); } finally { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; //删除锁 stringRedisTemplate.execute(new DefaultRedisScript (script, Long.class), Arrays.asList("lock"), uuid); } //先去redis查询下保证当前的锁是自己的 //获取值对比,对比成功删除=原子性 lua脚本解锁 // String lockValue = stringRedisTemplate.opsForValue().get("lock"); // if (uuid.equals(lockValue)) { // //删除我自己的锁 // stringRedisTemplate.delete("lock"); // } return dataFromDb; } else { System.out.println("获取分布式锁失败...等待重试..."); //加锁失败...重试机制 //休眠一百毫秒 try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } return getCatalogJsonFromDbWithRedisLock(); //自旋的方式 } } private Map > getDataFromDb() { //得到锁以后,我们应该再去缓存中确定一次,如果没有才需要继续查询 String catalogJson = stringRedisTemplate.opsForValue().get("catalogJson"); if (!StringUtils.isEmpty(catalogJson)) { //缓存不为空直接返回 Map > result = JSON.parseObject(catalogJson, new TypeReference
1、简介
Redisson 是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
2、整合
1、引入依赖 gulimall-product/pom.xmlorg.redisson redisson 3.12.0
文档:
为了学习Redisson 原理,我们先手动创建Redisson配置文件,以后项目中可以引用已经封装好的 spring-boot-redisson-starter 来快速安装。2、增加redisson配置文件
com/atguigu/gulimall/product/config/MyRedissonConfig.javapackage com.atguigu.gulimall.product.config;import org.redisson.Redisson;import org.redisson.api.RedissonClient;import org.redisson.config.Config;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.io.IOException;/** * @author: kaiyi * @create: 2020-09-01 14:10 */@Configurationpublic class MyRedissonConfig { /** * 所有对Redisson的使用都是通过RedissonClient * @return * @throws IOException */ @Bean(destroyMethod="shutdown") public RedissonClient redisson() throws IOException { //1、创建配置(单节点模式) Config config = new Config(); config.useSingleServer().setAddress("redis://192.168.10.10:6379"); //2、根据Config创建出RedissonClient实例 //Redis url should start with redis:// or rediss:// RedissonClient redissonClient = Redisson.create(config); return redissonClient; }}
@Autowired private RedissonClient redisson; @ResponseBody @GetMapping(value = "/hello") public String hello() { //1、获取一把锁,只要锁的名字一样,就是同一把锁 RLock myLock = redisson.getLock("my-lock"); //2、加锁 myLock.lock(); //阻塞式等待。默认加的锁都是30s //1)、锁的自动续期,如果业务超长,运行期间自动锁上新的30s。不用担心业务时间长,锁自动过期被删掉 //2)、加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认会在30s内自动过期,不会产生死锁问题 // myLock.lock(10,TimeUnit.SECONDS); //10秒钟自动解锁,自动解锁时间一定要大于业务执行时间 //问题:在锁时间到了以后,不会自动续期 //1、如果我们传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是 我们制定的时间 //2、如果我们指定锁的超时时间,就使用 lockWatchdogTimeout = 30 * 1000 【看门狗默认时间】 //只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】,每隔10秒都会自动的再次续期,续成30秒 // internalLockLeaseTime 【看门狗时间】 / 3, 10s try { System.out.println("加锁成功,执行业务..." + Thread.currentThread().getId()); try { TimeUnit.SECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } } catch (Exception ex) { ex.printStackTrace(); } finally { //3、解锁 假设解锁代码没有运行,Redisson会不会出现死锁 System.out.println("释放锁..." + Thread.currentThread().getId()); myLock.unlock(); } return "hello"; }
官方文档:
/** * @author: kaiyi * @create: 2020-08-29 13:30 */@Controllerpublic class IndexController { @Autowired private RedissonClient redisson; @Autowired private StringRedisTemplate stringRedisTemplate; /** * 保证一定能读到最新数据,修改期间,写锁是一个排它锁(互斥锁、独享锁),读锁是一个共享锁 * 写锁没释放读锁必须等待 * 读 + 读 :相当于无锁,并发读,只会在Redis中记录好,所有当前的读锁。他们都会同时加锁成功 * 写 + 读 :必须等待写锁释放 * 写 + 写 :阻塞方式 * 读 + 写 :有读锁。写也需要等待 * 只要有读或者写的存都必须等待 * @return */ @GetMapping(value = "/write") @ResponseBody public String writeValue() { String s = ""; RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock"); RLock rLock = readWriteLock.writeLock(); try { //1、改数据加写锁,读数据加读锁 rLock.lock(); s = UUID.randomUUID().toString(); ValueOperationsops = stringRedisTemplate.opsForValue(); ops.set("writeValue",s); TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } finally { rLock.unlock(); } return s; } @GetMapping(value = "/read") @ResponseBody public String readValue() { String s = ""; RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock"); //加读锁 RLock rLock = readWriteLock.readLock(); try { rLock.lock(); ValueOperations ops = stringRedisTemplate.opsForValue(); s = ops.get("writeValue"); try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } finally { rLock.unlock(); } return s; }}
信号量
/** * 车库停车 * 3车位 * 信号量也可以做分布式限流 */ @GetMapping(value = "/park") @ResponseBody public String park() throws InterruptedException { RSemaphore park = redisson.getSemaphore("park"); park.acquire(); //获取一个信号、获取一个值,占一个车位 boolean flag = park.tryAcquire(); if (flag) { //执行业务 } else { return "error"; } return "ok=>" + flag; } @GetMapping(value = "/go") @ResponseBody public String go() { RSemaphore park = redisson.getSemaphore("park"); park.release(); //释放一个车位 return "ok"; }
分布式闭锁
/** * 放假、锁门 * 1班没人了 * 5个班,全部走完,我们才可以锁大门 * 分布式闭锁 */ @GetMapping(value = "/lockDoor") @ResponseBody public String lockDoor() throws InterruptedException { RCountDownLatch door = redisson.getCountDownLatch("door"); door.trySetCount(5); door.await(); //等待闭锁完成 return "放假了..."; } @GetMapping(value = "/gogogo/{id}") @ResponseBody public String gogogo(@PathVariable("id") Long id) { RCountDownLatch door = redisson.getCountDownLatch("door"); door.countDown(); //计数-1 return id + "班的人都走了..."; }
项目文件com/atguigu/gulimall/product/service/impl/CategoryServiceImpl.java
@Autowired private RedissonClient redissonClient;/** * 缓存里的数据如何和数据库的数据保持一致?? * 缓存数据一致性 * 1)、双写模式 * 2)、失效模式 * @return */ public Map> getCatalogJsonFromDbWithRedissonLock() { //1、占分布式锁。去redis占坑 //(锁的粒度,越细越快:具体缓存的是某个数据,11号商品) product-11-lock //RLock catalogJsonLock = redissonClient.getLock("catalogJson-lock"); //创建读锁 RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("catalogJson-lock"); RLock rLock = readWriteLock.readLock(); Map > dataFromDb = null; try { rLock.lock(); //加锁成功...执行业务 dataFromDb = getDataFromDb(); } finally { rLock.unlock(); } //先去redis查询下保证当前的锁是自己的 //获取值对比,对比成功删除=原子性 lua脚本解锁 // String lockValue = stringRedisTemplate.opsForValue().get("lock"); // if (uuid.equals(lockValue)) { // //删除我自己的锁 // stringRedisTemplate.delete("lock"); // } return dataFromDb; }
整合步骤:
pom.xml
org.springframework.boot spring-boot-starter-cache org.springframework.boot spring-boot-starter-data-redis io.lettuce lettuce-core redis.clients jedis
代码整合
增加Spring cache 缓存 com/atguigu/gulimall/product/service/impl/CategoryServiceImpl.java@Service("categoryService")public class CategoryServiceImpl extends ServiceImplimplements CategoryService {// @Autowired// CategoryDao categoryDao; @Resource private CategoryBrandRelationService categoryBrandRelationService; @Resource private StringRedisTemplate stringRedisTemplate; @Autowired private RedissonClient redissonClient; /** * 级联更新所有关联的数据 * * @CacheEvict:失效模式 * @CachePut:双写模式,需要有返回值 * 1、同时进行多种缓存操作:@Caching * 2、指定删除某个分区下的所有数据 @CacheEvict(value = "category",allEntries = true) * 3、存储同一类型的数据,都可以指定为同一分区 * @param category */ // @Caching(evict = { // @CacheEvict(value = "category",key = "'getLevel1Categorys'"), // @CacheEvict(value = "category",key = "'getCatalogJson'") // }) @CacheEvict(value = "category",allEntries = true) //删除某个分区下的所有数据 @Transactional(rollbackFor = Exception.class) @Override public void updateCascade(CategoryEntity category) { RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("catalogJson-lock"); //创建写锁 RLock rLock = readWriteLock.writeLock(); try { rLock.lock(); this.baseMapper.updateById(category); categoryBrandRelationService.updateCategory(category.getCatId(), category.getName()); } catch (Exception e) { e.printStackTrace(); } finally { rLock.unlock(); } //同时修改缓存中的数据 //删除缓存,等待下一次主动查询进行更新 } /** * 每一个需要缓存的数据我们都来指定要放到那个名字的缓存。【缓存的分区(按照业务类型分)】 * 代表当前方法的结果需要缓存,如果缓存中有,方法都不用调用,如果缓存中没有,会调用方法。最后将方法的结果放入缓存 * 默认行为 * 如果缓存中有,方法不再调用 * key是默认生成的:缓存的名字::SimpleKey::[](自动生成key值) * 缓存的value值,默认使用jdk序列化机制,将序列化的数据存到redis中 * 默认时间是 -1: * * 自定义操作:key的生成 * 指定生成缓存的key:key属性指定,接收一个Spel * 指定缓存的数据的存活时间:配置文档中修改存活时间 * 将数据保存为json格式 * * * 4、Spring-Cache的不足之处: * 1)、读模式 * 缓存穿透:查询一个null数据。解决方案:缓存空数据 * 缓存击穿:大量并发进来同时查询一个正好过期的数据。解决方案:加锁 ? 默认是无加锁的;使用sync = true来解决击穿问题 * 缓存雪崩:大量的key同时过期。解决:加随机时间。加上过期时间 * 2)、写模式:(缓存与数据库一致) * 1)、读写加锁。 * 2)、引入Canal,感知到MySQL的更新去更新Redis * 3)、读多写多,直接去数据库查询就行 * * 总结: * 常规数据(读多写少,即时性,一致性要求不高的数据,完全可以使用Spring-Cache):写模式(只要缓存的数据有过期时间就足够了) * 特殊数据:特殊设计 * * 原理: * CacheManager(RedisCacheManager)->Cache(RedisCache)->Cache负责缓存的读写 * @return */ @Cacheable(value = {"category"},key = "#root.method.name",sync = true) @Override public List getLevel1Categorys() { System.out.println("getLevel1Categorys........"); long l = System.currentTimeMillis(); List categoryEntities = this.baseMapper.selectList( new QueryWrapper ().eq("parent_cid", 0)); System.out.println("消耗时间:"+ (System.currentTimeMillis() - l)); return categoryEntities; } @Cacheable(value = "category",key = "#root.methodName") @Override public Map > getCatalogJson() { System.out.println("查询了数据库"); //将数据库的多次查询变为一次 List selectList = this.baseMapper.selectList(null); //1、查出所有分类 //1、1)查出所有一级分类 List level1Categorys = getParent_cid(selectList, 0L); //封装数据 Map > parentCid = level1Categorys.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), v -> { //1、每一个的一级分类,查到这个一级分类的二级分类 List categoryEntities = getParent_cid(selectList, v.getCatId()); //2、封装上面的结果 List catelog2Vos = null; if (categoryEntities != null) { catelog2Vos = categoryEntities.stream().map(l2 -> { Catelog2Vo catelog2Vo = new Catelog2Vo(v.getCatId().toString(), null, l2.getCatId().toString(), l2.getName().toString()); //1、找当前二级分类的三级分类封装成vo List level3Catelog = getParent_cid(selectList, l2.getCatId()); if (level3Catelog != null) { List category3Vos = level3Catelog.stream().map(l3 -> { //2、封装成指定格式 Catelog2Vo.Category3Vo category3Vo = new Catelog2Vo.Category3Vo(l2.getCatId().toString(), l3.getCatId().toString(), l3.getName()); return category3Vo; }).collect(Collectors.toList()); catelog2Vo.setCatalog3List(category3Vos); } return catelog2Vo; }).collect(Collectors.toList()); } return catelog2Vos; })); return parentCid; }}
转载地址:http://sonti.baihongyu.com/