优化阿里云RocketMQ配置属性,修复Message无法消费到的问题

This commit is contained in:
pimh 2021-12-16 20:41:28 +08:00
parent 29416263a7
commit 4161f0f53e
6 changed files with 25 additions and 40 deletions

View File

@ -81,7 +81,6 @@ spring:
# namesrvAddr: 127.0.0.1:9876
# accessKey: accessKey
# secretKey: secretKey
# consumerId: JEEPAY-GROUP
# producerId: JEEPAY-GROUP
#日志配置参数。

View File

@ -91,7 +91,8 @@ spring:
# namesrvAddr: 127.0.0.1:9876
# accessKey: accessKey
# secretKey: secretKey
# consumerId: JEEPAY-GROUP
# consumerId: JEEPAY-GROUP-MGR
# broadcastConsumerId: JEEPAY-GROUP-MGR-BROADCAST # 广播模式消费者ID
# producerId: JEEPAY-GROUP
#日志配置参数。

View File

@ -91,7 +91,8 @@ spring:
# namesrvAddr: 127.0.0.1:9876
# accessKey: accessKey
# secretKey: secretKey
# consumerId: JEEPAY-GROUP
# consumerId: JEEPAY-GROUP-MCH
# broadcastConsumerId: JEEPAY-GROUP-MCH-BROADCAST # 广播模式消费者ID
# producerId: JEEPAY-GROUP
#日志配置参数。

View File

@ -91,9 +91,11 @@ spring:
# namesrvAddr: 127.0.0.1:9876
# accessKey: accessKey
# secretKey: secretKey
# consumerId: JEEPAY-GROUP
# consumerId: JEEPAY-GROUP-PAY
# broadcastConsumerId: JEEPAY-GROUP-PAY-BROADCAST # 广播模式消费者ID
# producerId: JEEPAY-GROUP
#日志配置参数。
# 当存在logback-spring.xml文件时 该配置将引进到logback配置 springboot配置不生效。
# 不存在logback-spring.xml 文件时, 使用springboot的配置 同样可用。

View File

@ -24,6 +24,8 @@ public class AliYunRocketMQFactory {
private String secretKey;
@Value("${aliyun-rocketmq.consumerId}")
private String consumerId;
@Value("${aliyun-rocketmq.broadcastConsumerId}")
private String broadcastConsumerId;
@Value("${aliyun-rocketmq.producerId}")
private String producerId;
@ -33,7 +35,7 @@ public class AliYunRocketMQFactory {
properties.put(PropertyKeyConst.ProducerId, producerId);
properties.put(PropertyKeyConst.AccessKey, accessKey);
properties.put(PropertyKeyConst.SecretKey, secretKey);
// 判断是否为空生产环境走k8s集群公共配置不获取本地配置文件的值
// 判断是否为空生产环境走k8s集群环境变量自动注入不获取本地配置文件的值
if (StringUtils.isNotEmpty(namesrvAddr)) {
properties.put(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
}
@ -46,7 +48,7 @@ public class AliYunRocketMQFactory {
properties.put(PropertyKeyConst.ConsumerId, consumerId);
properties.put(PropertyKeyConst.AccessKey, accessKey);
properties.put(PropertyKeyConst.SecretKey, secretKey);
// 判断是否为空生产环境走k8s集群公共配置不获取本地配置文件的值
// 判断是否为空生产环境走k8s集群环境变量自动注入不获取本地配置文件的值
if (StringUtils.isNotEmpty(namesrvAddr)) {
properties.put(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
}
@ -56,7 +58,7 @@ public class AliYunRocketMQFactory {
@Bean(name = "broadcastConsumerClient")
public Consumer broadcastConsumerClient() {
Properties properties = new Properties();
properties.put(PropertyKeyConst.ConsumerId, consumerId);
properties.put(PropertyKeyConst.ConsumerId, broadcastConsumerId);
properties.put(PropertyKeyConst.AccessKey, accessKey);
properties.put(PropertyKeyConst.SecretKey, secretKey);
// 广播订阅方式设置

View File

@ -39,29 +39,10 @@ import java.util.TreeMap;
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.ALIYUN_ROCKET_MQ)
public class AliYunRocketMQSender implements IMQSender {
private static final List<Integer> DELAY_TIME_LEVEL = new ArrayList<>();
static {
// 预设值的延迟时间间隔为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
DELAY_TIME_LEVEL.add(1);
DELAY_TIME_LEVEL.add(5);
DELAY_TIME_LEVEL.add(10);
DELAY_TIME_LEVEL.add(30);
DELAY_TIME_LEVEL.add(60 * 1);
DELAY_TIME_LEVEL.add(60 * 2);
DELAY_TIME_LEVEL.add(60 * 3);
DELAY_TIME_LEVEL.add(60 * 4);
DELAY_TIME_LEVEL.add(60 * 5);
DELAY_TIME_LEVEL.add(60 * 6);
DELAY_TIME_LEVEL.add(60 * 7);
DELAY_TIME_LEVEL.add(60 * 8);
DELAY_TIME_LEVEL.add(60 * 9);
DELAY_TIME_LEVEL.add(60 * 10);
DELAY_TIME_LEVEL.add(60 * 20);
DELAY_TIME_LEVEL.add(60 * 30);
DELAY_TIME_LEVEL.add(60 * 60 * 1);
DELAY_TIME_LEVEL.add(60 * 60 * 2);
}
/** 最大延迟24小时 */
private static final int MAX_DELAY_TIME = 60 * 60 * 24;
/** 最小延迟1秒 */
private static final int MIN_DELAY_TIME = 1;
@Autowired
private AliYunRocketMQFactory aliYunRocketMQFactory;
@ -77,7 +58,7 @@ public class AliYunRocketMQSender implements IMQSender {
public void send(AbstractMQ mqModel, int delaySeconds) {
Message message = new Message(mqModel.getMQName(), AliYunRocketMQFactory.defaultTag, mqModel.toMessage().getBytes());
if (delaySeconds > 0) {
long delayTime = System.currentTimeMillis() + getNearDelayLevel(delaySeconds) * 1000;
long delayTime = System.currentTimeMillis() + delayTimeCorrector(delaySeconds) * 1000;
// 设置消息需要被投递的时间
message.setStartDeliverTime(delayTime);
}
@ -94,17 +75,16 @@ public class AliYunRocketMQSender implements IMQSender {
}
/**
* 获取最接近的节点值
* 检查延迟时间的有效性并返回校正后的延迟时间
**/
private int getNearDelayLevel(int delay) {
// 如果包含则直接返回
if (DELAY_TIME_LEVEL.contains(delay)) {
return DELAY_TIME_LEVEL.indexOf(delay) + 1;
private int delayTimeCorrector(int delay) {
if (delay < MIN_DELAY_TIME) {
return MIN_DELAY_TIME;
}
//两个时间的绝对值 - 位置
TreeMap<Integer, Integer> resultMap = new TreeMap<>();
DELAY_TIME_LEVEL.stream().forEach(time -> resultMap.put(Math.abs(delay - time), DELAY_TIME_LEVEL.indexOf(time) + 1));
return resultMap.firstEntry().getValue();
if (delay > MAX_DELAY_TIME) {
return MAX_DELAY_TIME;
}
return delay;
}