一、监听key过期时间
处理流程:当redis的一个key过期时,redis会生成一个事件,通知订阅了该事件的客户端(KeyExpirationEventMessageListener),然后在客户端的回调方法中处理逻辑。
1)新建SpringBoot项目,maven依赖及yml如下
maven依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
yml文件
server:
port: 8000
spring:
redis:
database: 0
host: xxxx
port: 6379
password: xxxxxx
lettuce:
pool:
#最大连接数
max-active: 8
#最大阻塞等待时间
max-wait: -1
#最大空闲
max-idle: 8
#最小空闲
min-idle: 0
#连接超时时间
timeout: 5000
2)修改redis.conf文件开启事件通知配置
默认的配置:notify-keyspace-events “”
修改为:notify-keyspace-events Ex,该配置表示监听key的过期事件
3)设置Redis监听配置,注入Bean RedisMessageListenerContaine
@Configuration
public class RedisTimeoutConfiguration {
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer() {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
return redisMessageListenerContainer;
}
@Bean
public KeyExpiredListener keyExpiredListener() {
return new KeyExpiredListener(this.redisMessageListenerContainer());
}
}
4)创建监听器类,重写key过期回调方法onMessage
@Slf4j
public class KeyExpiredListener extends KeyExpirationEventMessageListener {
@Autowired
public RedisTemplate<String, String> redisTemplate;
public KeyExpiredListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Override
public void onMessage(Message message, byte[] bytes) {
String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
//过期的key
String key = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("redis key 过期:bytes={},channel={},key={}", new String(bytes), channel, key);
}
}
5)编写测试接口:写入一个带过期时间的key
@RestController
@RequestMapping("/demo")
public class BasicController {
@Autowired
public RedisTemplate<String, String> redisTemplate;
@GetMapping(value = "/test")
public void redisTest() {
redisTemplate.opsForValue().set("test", "5s后过期", 5, TimeUnit.SECONDS);
}
}
执行后,onMessage
监听方法打印结果:
redis key 过期:bytes=__keyevent@*__:expired,channel=__keyevent@0__:expired,key=test
该方案缺点:可靠性问题,Redis 是一个内存数据库,尽管它提供了数据持久化选项(如 RDB 和 AOF),但在某些情况下(如意外崩溃或重启),可能会丢失一些未处理的过期事件。
二、zset + score
基本思路是将消息按需发送的时间作为分数存储在有序集合zset中,然后定期检查并处理到期的消息。代码例子如下:
1)创建 DelayedMessageService 类
@Slf4j
@Service
public class DelayedMessageService {
private static final String DELAYED_MESSAGES_ZSET = "delayed:messages";
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void addMessage(String message, long delayMillis) {
long score = System.currentTimeMillis() + delayMillis;
redisTemplate.opsForZSet().add(DELAYED_MESSAGES_ZSET, message, score);
}
@Scheduled(fixedRate = 1000)
public void processMessages() {
long now = System.currentTimeMillis();
Set<ZSetOperations.TypedTuple<String>> messages = redisTemplate.opsForZSet().rangeByScoreWithScores(DELAYED_MESSAGES_ZSET, 0, now);
if (messages != null && !messages.isEmpty()) {
for (ZSetOperations.TypedTuple<String> message : messages) {
String msg = message.getValue();
long score = message.getScore().longValue();
if (score <= now) {
// Process the message
System.out.println("Processing message: " + msg);
// Remove the message from the zset
redisTemplate.opsForZSet().remove(DELAYED_MESSAGES_ZSET, msg);
}
}
}else{
log.info("定时任务执行~");
}
}
}
2)编写Controller接口测试,初始化zset内容
@RestController
@RequestMapping("/demo")
public class BasicController {
@Autowired
private DelayedMessageService delayedMessageService;
@GetMapping(value = "/test2")
public void redisZsetTest() {
// Add some messages with delays
delayedMessageService.addMessage("Message 1", 5000); // 5 seconds delay
delayedMessageService.addMessage("Message 2", 10000); // 10 seconds delay
delayedMessageService.addMessage("Message 3", 15000); // 15 seconds delay
}
}
说明:
redisZsetTest接口通过调用DelayedMessageService的addMessage方法,将消息及其到期时间添加到 Redis 的 zset 中
开启一个定时任务,定期检查和处理到期的消息。使用 @Scheduled 注解定期执行,每秒检查一次,注意这里使用@Scheduled,不要忘了启动类上添加@EnableScheduling注解,否则定时任务不会生效。fixedRate 属性表示以固定的频率(毫秒为单位)执行方法。即方法执行完成后,会立即等待指定的毫秒数,然后再次执行。
通过 redisTemplate.opsForZSet().rangeByScoreWithScores 方法按时间范围获取到期的消息,消息处理完成后,从zset 中移除处理过的消息
三、Redisson框架
利用 Redisson 提供的数据结构RDelayedQueue和RBlockingDeque,可以自动处理过期的任务并将它们移动到阻塞队列中,这样我们就可以从阻塞队列中获取任务并进行消费处理。例子如下:
1)添加依赖
<!-- Redisson 依赖项 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>2.15.1</version>
</dependency>
2)创建DelayedMessageService
@Slf4j
@Service
public class DelayedMessageService {
@Autowired
private RedissonClient redissonClient;
private RBlockingDeque<String> blockingDeque;
private RDelayedQueue<String> delayedQueue;
@PostConstruct
public void init() {
this.blockingDeque = redissonClient.getBlockingDeque("delayedQueue");
this.delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
Executors.newSingleThreadExecutor().submit(this::processMessages);
}
public void addMessage(String message, long delayMillis) {
delayedQueue.offer(message, delayMillis, TimeUnit.MILLISECONDS);
}
public void processMessages() {
try {
while (true) {
String message = blockingDeque.take();
// Process the message
log.info("消息被处理: " + message);
// ..业务逻辑处理
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("中断异常",e);
}
}
}
3)测试接口
@GetMapping(value = "/test3")
public void redisQueueTest() {
// Add some messages with delays
delayedMessageService.addMessage("Message 1", 5000); // 5 seconds delay
delayedMessageService.addMessage("Message 2", 10000); // 10 seconds delay
delayedMessageService.addMessage("Message 3", 15000); // 15 seconds delay
}
原文链接:https://blog.csdn.net/zhangyifang_009/article/details/139337195