package com.dong.mytest.demo.client;
import cn.hutool.extra.spring.SpringUtil;
import com.dong.mytest.demo.common.dto.DelayMessage;
import com.dong.mytest.demo.common.util.DateUtil;
import com.dong.mytest.demo.service.delayqueue.DelayQueueConsumer;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* @author dong
*/
@Slf4j
@Component
public class RedissonDelayQueueClient implements InitializingBean {
@Resource
private RedissonClient redissonClient;
private final Map<String, RDelayedQueue<DelayMessage>> delayQueueMap = new ConcurrentHashMap<>(16);
public void addDelayMessage(DelayMessage delayMessage) {
log.info("delayMessage={}", delayMessage);
if (delayQueueMap.get(delayMessage.getQueueName()) == null) {
log.warn("queueName={},该延迟队列不存在,请确认后再试...", delayMessage.getQueueName());
return;
}
delayMessage.setCreateTime(DateUtil.getNowFormatStr());
RDelayedQueue<DelayMessage> rDelayedQueue = delayQueueMap.get(delayMessage.getQueueName());
rDelayedQueue.offer(delayMessage, delayMessage.getDelayTime(), delayMessage.getTimeUnit() == null ? TimeUnit.SECONDS : delayMessage.getTimeUnit());
}
@Override
public void afterPropertiesSet() throws Exception {
// 有新的延迟队列在这里添加,队列消费类需要继承DelayQueueConsumer,并且service名称为 ${queueName}Consumer
List<String> queueNameList = Lists.newArrayList("orderAutoCancelDelayQueue");
// 加载延迟队列
for (String queueName : queueNameList) {
DelayQueueConsumer delayQueueConsumer = SpringUtil.getBean(queueName + "Consumer");
if (delayQueueConsumer == null) {
throw new RuntimeException("queueName=" + queueName + ",delayQueueConsumer=null,请检查配置...");
}
// Redisson的延时队列是对另一个队列的再包装,使用时要先将延时消息添加到延时队列中,当延时队列中的消息达到设定的延时时间后,
// 该延时消息才会进行进入到被包装队列中,因此,我们只需要对被包装队列进行监听即可。
RBlockingQueue<DelayMessage> rBlockingQueue = redissonClient.getBlockingDeque(queueName);
RDelayedQueue<DelayMessage> rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue);
delayQueueMap.put(queueName, rDelayedQueue);
// 订阅新元素的到来,调用的是takeAsync(),异步执行
rBlockingQueue.subscribeOnElements(delayQueueConsumer::execute);
}
}
}
package com.dong.mytest.demo.service.delayqueue;
import com.dong.mytest.demo.common.dto.DelayMessage;
/**
* @author dong
*/
public interface DelayQueueConsumer {
/**
* 执行延迟消息
*
* @param delayMessage delayMessage
*/
void execute(DelayMessage delayMessage);
}
package com.dong.mytest.demo.service.delayqueue.impl;
import com.dong.mytest.demo.common.dto.DelayMessage;
import com.dong.mytest.demo.service.delayqueue.DelayQueueConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* @author dong
*/
@Service("orderAutoCancelDelayQueueConsumer")
@Slf4j
public class OrderAutoCancelDelayQueueConsumer implements DelayQueueConsumer {
@Override
public void execute(DelayMessage delayMessage) {
log.info("====OrderAutoCancelConsumer=====delayMessage={}", delayMessage);
}
}
package com.dong.mytest.demo.common.dto;
import com.alibaba.fastjson.JSON;
import lombok.Data;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
/**
* @author dong
*/
@Data
public class DelayMessage implements Serializable {
private String queueName;
private Long delayTime;
private TimeUnit timeUnit;
private String msgBody;
private String createTime;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}