From 4161f0f53e86f60555c1eadb4f0006de559c5bfb Mon Sep 17 00:00:00 2001 From: pimh Date: Thu, 16 Dec 2021 20:41:28 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=98=BF=E9=87=8C=E4=BA=91Ro?= =?UTF-8?q?cketMQ=E9=85=8D=E7=BD=AE=E5=B1=9E=E6=80=A7=EF=BC=8C=E4=BF=AE?= =?UTF-8?q?=E5=A4=8DMessage=E6=97=A0=E6=B3=95=E6=B6=88=E8=B4=B9=E5=88=B0?= =?UTF-8?q?=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conf/devCommons/config/application.yml | 1 - conf/manager/application.yml | 3 +- conf/merchant/application.yml | 3 +- conf/payment/application.yml | 4 +- .../aliyunrocketmq/AliYunRocketMQFactory.java | 8 ++-- .../aliyunrocketmq/AliYunRocketMQSender.java | 46 ++++++------------- 6 files changed, 25 insertions(+), 40 deletions(-) diff --git a/conf/devCommons/config/application.yml b/conf/devCommons/config/application.yml index 3e1c119..c68b63c 100644 --- a/conf/devCommons/config/application.yml +++ b/conf/devCommons/config/application.yml @@ -81,7 +81,6 @@ spring: # namesrvAddr: 127.0.0.1:9876 # accessKey: accessKey # secretKey: secretKey -# consumerId: JEEPAY-GROUP # producerId: JEEPAY-GROUP #日志配置参数。 diff --git a/conf/manager/application.yml b/conf/manager/application.yml index 5072c24..bd39459 100644 --- a/conf/manager/application.yml +++ b/conf/manager/application.yml @@ -91,7 +91,8 @@ spring: # namesrvAddr: 127.0.0.1:9876 # accessKey: accessKey # secretKey: secretKey -# consumerId: JEEPAY-GROUP +# consumerId: JEEPAY-GROUP-MGR +# broadcastConsumerId: JEEPAY-GROUP-MGR-BROADCAST # 广播模式消费者ID # producerId: JEEPAY-GROUP #日志配置参数。 diff --git a/conf/merchant/application.yml b/conf/merchant/application.yml index 260e0ad..fb773b3 100644 --- a/conf/merchant/application.yml +++ b/conf/merchant/application.yml @@ -91,7 +91,8 @@ spring: # namesrvAddr: 127.0.0.1:9876 # accessKey: accessKey # secretKey: secretKey -# consumerId: JEEPAY-GROUP +# consumerId: JEEPAY-GROUP-MCH +# broadcastConsumerId: JEEPAY-GROUP-MCH-BROADCAST # 广播模式消费者ID # producerId: JEEPAY-GROUP #日志配置参数。 diff --git a/conf/payment/application.yml b/conf/payment/application.yml index ac56507..4f2bc8e 100644 --- a/conf/payment/application.yml +++ b/conf/payment/application.yml @@ -91,9 +91,11 @@ spring: # namesrvAddr: 127.0.0.1:9876 # accessKey: accessKey # secretKey: secretKey -# consumerId: JEEPAY-GROUP +# consumerId: JEEPAY-GROUP-PAY +# broadcastConsumerId: JEEPAY-GROUP-PAY-BROADCAST # 广播模式消费者ID # producerId: JEEPAY-GROUP + #日志配置参数。 # 当存在logback-spring.xml文件时: 该配置将引进到logback配置, springboot配置不生效。 # 不存在logback-spring.xml 文件时, 使用springboot的配置, 同样可用。 diff --git a/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/aliyunrocketmq/AliYunRocketMQFactory.java b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/aliyunrocketmq/AliYunRocketMQFactory.java index ed5aae0..0c15db5 100644 --- a/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/aliyunrocketmq/AliYunRocketMQFactory.java +++ b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/aliyunrocketmq/AliYunRocketMQFactory.java @@ -24,6 +24,8 @@ public class AliYunRocketMQFactory { private String secretKey; @Value("${aliyun-rocketmq.consumerId}") private String consumerId; + @Value("${aliyun-rocketmq.broadcastConsumerId}") + private String broadcastConsumerId; @Value("${aliyun-rocketmq.producerId}") private String producerId; @@ -33,7 +35,7 @@ public class AliYunRocketMQFactory { properties.put(PropertyKeyConst.ProducerId, producerId); properties.put(PropertyKeyConst.AccessKey, accessKey); properties.put(PropertyKeyConst.SecretKey, secretKey); - // 判断是否为空(生产环境走k8s集群公共配置,不获取本地配置文件的值) + // 判断是否为空(生产环境走k8s集群环境变量自动注入,不获取本地配置文件的值) if (StringUtils.isNotEmpty(namesrvAddr)) { properties.put(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr); } @@ -46,7 +48,7 @@ public class AliYunRocketMQFactory { properties.put(PropertyKeyConst.ConsumerId, consumerId); properties.put(PropertyKeyConst.AccessKey, accessKey); properties.put(PropertyKeyConst.SecretKey, secretKey); - // 判断是否为空(生产环境走k8s集群公共配置,不获取本地配置文件的值) + // 判断是否为空(生产环境走k8s集群环境变量自动注入,不获取本地配置文件的值) if (StringUtils.isNotEmpty(namesrvAddr)) { properties.put(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr); } @@ -56,7 +58,7 @@ public class AliYunRocketMQFactory { @Bean(name = "broadcastConsumerClient") public Consumer broadcastConsumerClient() { Properties properties = new Properties(); - properties.put(PropertyKeyConst.ConsumerId, consumerId); + properties.put(PropertyKeyConst.ConsumerId, broadcastConsumerId); properties.put(PropertyKeyConst.AccessKey, accessKey); properties.put(PropertyKeyConst.SecretKey, secretKey); // 广播订阅方式设置 diff --git a/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/aliyunrocketmq/AliYunRocketMQSender.java b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/aliyunrocketmq/AliYunRocketMQSender.java index d861372..e02759b 100644 --- a/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/aliyunrocketmq/AliYunRocketMQSender.java +++ b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/aliyunrocketmq/AliYunRocketMQSender.java @@ -39,29 +39,10 @@ import java.util.TreeMap; @ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.ALIYUN_ROCKET_MQ) public class AliYunRocketMQSender implements IMQSender { - private static final List DELAY_TIME_LEVEL = new ArrayList<>(); - - static { - // 预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h - DELAY_TIME_LEVEL.add(1); - DELAY_TIME_LEVEL.add(5); - DELAY_TIME_LEVEL.add(10); - DELAY_TIME_LEVEL.add(30); - DELAY_TIME_LEVEL.add(60 * 1); - DELAY_TIME_LEVEL.add(60 * 2); - DELAY_TIME_LEVEL.add(60 * 3); - DELAY_TIME_LEVEL.add(60 * 4); - DELAY_TIME_LEVEL.add(60 * 5); - DELAY_TIME_LEVEL.add(60 * 6); - DELAY_TIME_LEVEL.add(60 * 7); - DELAY_TIME_LEVEL.add(60 * 8); - DELAY_TIME_LEVEL.add(60 * 9); - DELAY_TIME_LEVEL.add(60 * 10); - DELAY_TIME_LEVEL.add(60 * 20); - DELAY_TIME_LEVEL.add(60 * 30); - DELAY_TIME_LEVEL.add(60 * 60 * 1); - DELAY_TIME_LEVEL.add(60 * 60 * 2); - } + /** 最大延迟24小时 */ + private static final int MAX_DELAY_TIME = 60 * 60 * 24; + /** 最小延迟1秒 */ + private static final int MIN_DELAY_TIME = 1; @Autowired private AliYunRocketMQFactory aliYunRocketMQFactory; @@ -77,7 +58,7 @@ public class AliYunRocketMQSender implements IMQSender { public void send(AbstractMQ mqModel, int delaySeconds) { Message message = new Message(mqModel.getMQName(), AliYunRocketMQFactory.defaultTag, mqModel.toMessage().getBytes()); if (delaySeconds > 0) { - long delayTime = System.currentTimeMillis() + getNearDelayLevel(delaySeconds) * 1000; + long delayTime = System.currentTimeMillis() + delayTimeCorrector(delaySeconds) * 1000; // 设置消息需要被投递的时间。 message.setStartDeliverTime(delayTime); } @@ -94,17 +75,16 @@ public class AliYunRocketMQSender implements IMQSender { } /** - * 获取最接近的节点值 + * 检查延迟时间的有效性并返回校正后的延迟时间 **/ - private int getNearDelayLevel(int delay) { - // 如果包含则直接返回 - if (DELAY_TIME_LEVEL.contains(delay)) { - return DELAY_TIME_LEVEL.indexOf(delay) + 1; + private int delayTimeCorrector(int delay) { + if (delay < MIN_DELAY_TIME) { + return MIN_DELAY_TIME; } - //两个时间的绝对值 - 位置 - TreeMap resultMap = new TreeMap<>(); - DELAY_TIME_LEVEL.stream().forEach(time -> resultMap.put(Math.abs(delay - time), DELAY_TIME_LEVEL.indexOf(time) + 1)); - return resultMap.firstEntry().getValue(); + if (delay > MAX_DELAY_TIME) { + return MAX_DELAY_TIME; + } + return delay; }