支持阿里云RocketMQ商业版

(cherry picked from commit 4424aafed63e13be53934872c8d27e853bbc2be2)
This commit is contained in:
pimh 2021-12-14 22:11:12 +08:00
parent 21814d0365
commit 91d06832e9
15 changed files with 747 additions and 4 deletions

View File

@ -76,6 +76,14 @@ spring:
# producer:
# group: JEEPAY-GROUP
## 阿里云rocketmq配置 ( 注意aliyun-rocketmq配置项请放置到根目录 不是spring的二级配置 )
#aliyun-rocketmq:
# namesrvAddr: 127.0.0.1:9876
# accessKey: accessKey
# secretKey: secretKey
# consumerId: JEEPAY-GROUP
# producerId: JEEPAY-GROUP
#日志配置参数。
# 当存在logback-spring.xml文件时 该配置将引进到logback配置 springboot配置不生效。
# 不存在logback-spring.xml 文件时, 使用springboot的配置 同样可用。
@ -111,5 +119,5 @@ isys:
access-key-secret: SECRET_SECRET_SECRET #AccessKeySecret
mq:
vender: activeMQ # 切换MQ厂商 支持:【 activeMQ rabbitMQ rocketMQ 】, 需正确配置 【对应的yml参数】 和 【jeepay-components-mq项目下pom.xml中的依赖包】。
vender: activeMQ # 切换MQ厂商 支持:【 activeMQ rabbitMQ rocketMQ aliYunRocketMQ 】, 需正确配置 【对应的yml参数】 和 【jeepay-components-mq项目下pom.xml中的依赖包】。

View File

@ -86,6 +86,14 @@ spring:
# producer:
# group: JEEPAY-GROUP
## 阿里云rocketmq配置 ( 注意aliyun-rocketmq配置项请放置到根目录 不是spring的二级配置 )
#aliyun-rocketmq:
# namesrvAddr: 127.0.0.1:9876
# accessKey: accessKey
# secretKey: secretKey
# consumerId: JEEPAY-GROUP
# producerId: JEEPAY-GROUP
#日志配置参数。
# 当存在logback-spring.xml文件时 该配置将引进到logback配置 springboot配置不生效。
# 不存在logback-spring.xml 文件时, 使用springboot的配置 同样可用。
@ -124,5 +132,5 @@ isys:
access-key-secret: SECRET_SECRET_SECRET #AccessKeySecret
mq:
vender: activeMQ # 切换MQ厂商 支持:【 activeMQ rabbitMQ rocketMQ 】, 需正确配置 【对应的yml参数】 和 【jeepay-components-mq项目下pom.xml中的依赖包】。
vender: activeMQ # 切换MQ厂商 支持:【 activeMQ rabbitMQ rocketMQ aliYunRocketMQ 】, 需正确配置 【对应的yml参数】 和 【jeepay-components-mq项目下pom.xml中的依赖包】。

View File

@ -86,6 +86,14 @@ spring:
# producer:
# group: JEEPAY-GROUP
## 阿里云rocketmq配置 ( 注意aliyun-rocketmq配置项请放置到根目录 不是spring的二级配置 )
#aliyun-rocketmq:
# namesrvAddr: 127.0.0.1:9876
# accessKey: accessKey
# secretKey: secretKey
# consumerId: JEEPAY-GROUP
# producerId: JEEPAY-GROUP
#日志配置参数。
# 当存在logback-spring.xml文件时 该配置将引进到logback配置 springboot配置不生效。
# 不存在logback-spring.xml 文件时, 使用springboot的配置 同样可用。
@ -124,5 +132,5 @@ isys:
access-key-secret: SECRET_SECRET_SECRET #AccessKeySecret
mq:
vender: activeMQ # 切换MQ厂商 支持:【 activeMQ rabbitMQ rocketMQ 】, 需正确配置 【对应的yml参数】 和 【jeepay-components-mq项目下pom.xml中的依赖包】。
vender: activeMQ # 切换MQ厂商 支持:【 activeMQ rabbitMQ rocketMQ aliYunRocketMQ 】, 需正确配置 【对应的yml参数】 和 【jeepay-components-mq项目下pom.xml中的依赖包】。

View File

@ -86,6 +86,14 @@ spring:
# producer:
# group: JEEPAY-GROUP
## 阿里云rocketmq配置 ( 注意aliyun-rocketmq配置项请放置到根目录 不是spring的二级配置 )
#aliyun-rocketmq:
# namesrvAddr: 127.0.0.1:9876
# accessKey: accessKey
# secretKey: secretKey
# consumerId: JEEPAY-GROUP
# producerId: JEEPAY-GROUP
#日志配置参数。
# 当存在logback-spring.xml文件时 该配置将引进到logback配置 springboot配置不生效。
# 不存在logback-spring.xml 文件时, 使用springboot的配置 同样可用。
@ -122,5 +130,5 @@ isys:
access-key-secret: SECRET_SECRET_SECRET #AccessKeySecret
mq:
vender: activeMQ # 切换MQ厂商 支持:【 activeMQ rabbitMQ rocketMQ 】, 需正确配置 【对应的yml参数】 和 【jeepay-components-mq项目下pom.xml中的依赖包】。
vender: activeMQ # 切换MQ厂商 支持:【 activeMQ rabbitMQ rocketMQ aliYunRocketMQ 】, 需正确配置 【对应的yml参数】 和 【jeepay-components-mq项目下pom.xml中的依赖包】。

View File

@ -16,6 +16,10 @@
<version>Final</version>
</parent>
<properties>
<aliyun-openservices-ons-client.version>1.8.8.1.Final</aliyun-openservices-ons-client.version>
</properties>
<!-- 项目依赖声明 -->
<dependencies>
@ -66,6 +70,14 @@
<scope>provided</scope>
</dependency>
<!-- AliyunRocketMQ -->
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>${aliyun-openservices-ons-client.version}</version>
<scope>provided</scope>
</dependency>
<!-- ↑↑↑↑↑↑ MQ依赖包 ↑↑↑↑↑↑ -->
</dependencies>

View File

@ -29,5 +29,6 @@ public class MQVenderCS {
public static final String ACTIVE_MQ = "activeMQ";
public static final String RABBIT_MQ = "rabbitMQ";
public static final String ROCKET_MQ = "rocketMQ";
public static final String ALIYUN_ROCKET_MQ = "aliYunRocketMQ";
}

View File

@ -0,0 +1,63 @@
package com.jeequan.jeepay.components.mq.vender.aliyunrocketmq;
import com.aliyun.openservices.ons.api.*;
import com.jeequan.jeepay.components.mq.constant.MQSendTypeEnum;
import com.jeequan.jeepay.components.mq.vender.IMQMsgReceiver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
@Slf4j
public abstract class AbstractAliYunRocketMQReceiver implements IMQMsgReceiver, InitializingBean {
@Autowired
private AliYunRocketMQFactory aliYunRocketMQFactory;
/**
* 获取topic名称
*
* @return
*/
public abstract String getMQName();
/**
* 获取业务名称
*
* @return
*/
public abstract String getCusumerName();
/**
* 发送类型
*
* @return
*/
public MQSendTypeEnum getMQType() {
// QUEUE - 点对点 只有1个消费者可消费 ActiveMQ的queue模式
return MQSendTypeEnum.QUEUE;
}
@Override
public void afterPropertiesSet() throws Exception {
Consumer consumerClient = MQSendTypeEnum.BROADCAST.equals(getMQType()) ?
// 广播订阅模式
aliYunRocketMQFactory.broadcastConsumerClient() :
aliYunRocketMQFactory.consumerClient();
consumerClient.subscribe(this.getMQName(), AliYunRocketMQFactory.defaultTag, new MessageListener() {
@Override
public Action consume(Message message, ConsumeContext context) {
try {
receiveMsg(new String(message.getBody()));
log.info("【{}】MQ消息消费成功topic:{}, messageId:{}", getCusumerName(), message.getTopic(), message.getMsgID());
return Action.CommitMessage;
} catch (Exception e) {
log.error("【{}】MQ消息消费失败topic:{}, messageId:{}", getCusumerName(), message.getTopic(), message.getMsgID(), e);
}
return Action.ReconsumeLater;
}
});
consumerClient.start();
log.info("初始化[{}]消费者topic: {},tag: {}成功", getCusumerName(), this.getMQName(), AliYunRocketMQFactory.defaultTag);
}
}

View File

@ -0,0 +1,68 @@
package com.jeequan.jeepay.components.mq.vender.aliyunrocketmq;
import com.aliyun.openservices.ons.api.*;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import java.util.Properties;
@Service
public class AliYunRocketMQFactory {
public static final String defaultTag = "Default";
@Value("${aliyun-rocketmq.namesrvAddr:}")
public String namesrvAddr;
@Value("${aliyun-rocketmq.accessKey}")
private String accessKey;
@Value("${aliyun-rocketmq.secretKey}")
private String secretKey;
@Value("${aliyun-rocketmq.consumerId}")
private String consumerId;
@Value("${aliyun-rocketmq.producerId}")
private String producerId;
@Bean(name = "producerClient")
public Producer producerClient() {
Properties properties = new Properties();
properties.put(PropertyKeyConst.ProducerId, producerId);
properties.put(PropertyKeyConst.AccessKey, accessKey);
properties.put(PropertyKeyConst.SecretKey, secretKey);
// 判断是否为空生产环境走k8s集群公共配置不获取本地配置文件的值
if (StringUtils.isNotEmpty(namesrvAddr)) {
properties.put(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
}
return ONSFactory.createProducer(properties);
}
@Bean(name = "consumerClient")
public Consumer consumerClient() {
Properties properties = new Properties();
properties.put(PropertyKeyConst.ConsumerId, consumerId);
properties.put(PropertyKeyConst.AccessKey, accessKey);
properties.put(PropertyKeyConst.SecretKey, secretKey);
// 判断是否为空生产环境走k8s集群公共配置不获取本地配置文件的值
if (StringUtils.isNotEmpty(namesrvAddr)) {
properties.put(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
}
return ONSFactory.createConsumer(properties);
}
@Bean(name = "broadcastConsumerClient")
public Consumer broadcastConsumerClient() {
Properties properties = new Properties();
properties.put(PropertyKeyConst.ConsumerId, consumerId);
properties.put(PropertyKeyConst.AccessKey, accessKey);
properties.put(PropertyKeyConst.SecretKey, secretKey);
// 广播订阅方式设置
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
// 判断是否为空生产环境走k8s集群环境变量自动注入不获取本地配置文件的值
if (StringUtils.isNotEmpty(namesrvAddr)) {
properties.put(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
}
return ONSFactory.createConsumer(properties);
}
}

View File

@ -0,0 +1,108 @@
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender.aliyunrocketmq;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.jeequan.jeepay.components.mq.model.AbstractMQ;
import com.jeequan.jeepay.components.mq.vender.IMQSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
/**
* 阿里云rocketMQ 消息发送器的实现
*/
@Slf4j
@Component
public class AliYunRocketMQSender implements IMQSender {
private static final List<Integer> DELAY_TIME_LEVEL = new ArrayList<>();
static {
// 预设值的延迟时间间隔为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
DELAY_TIME_LEVEL.add(1);
DELAY_TIME_LEVEL.add(5);
DELAY_TIME_LEVEL.add(10);
DELAY_TIME_LEVEL.add(30);
DELAY_TIME_LEVEL.add(60 * 1);
DELAY_TIME_LEVEL.add(60 * 2);
DELAY_TIME_LEVEL.add(60 * 3);
DELAY_TIME_LEVEL.add(60 * 4);
DELAY_TIME_LEVEL.add(60 * 5);
DELAY_TIME_LEVEL.add(60 * 6);
DELAY_TIME_LEVEL.add(60 * 7);
DELAY_TIME_LEVEL.add(60 * 8);
DELAY_TIME_LEVEL.add(60 * 9);
DELAY_TIME_LEVEL.add(60 * 10);
DELAY_TIME_LEVEL.add(60 * 20);
DELAY_TIME_LEVEL.add(60 * 30);
DELAY_TIME_LEVEL.add(60 * 60 * 1);
DELAY_TIME_LEVEL.add(60 * 60 * 2);
}
@Autowired
private AliYunRocketMQFactory aliYunRocketMQFactory;
private Producer producerClient;
@Override
public void send(AbstractMQ mqModel) {
Message message = new Message(mqModel.getMQName(), AliYunRocketMQFactory.defaultTag, mqModel.toMessage().getBytes());
sendMessage(message);
}
@Override
public void send(AbstractMQ mqModel, int delaySeconds) {
Message message = new Message(mqModel.getMQName(), AliYunRocketMQFactory.defaultTag, mqModel.toMessage().getBytes());
if (delaySeconds > 0) {
long delayTime = System.currentTimeMillis() + getNearDelayLevel(delaySeconds) * 1000;
// 设置消息需要被投递的时间
message.setStartDeliverTime(delayTime);
}
sendMessage(message);
}
private void sendMessage(Message message) {
if (producerClient == null) {
producerClient = aliYunRocketMQFactory.producerClient();
}
producerClient.start();
SendResult sendResult = producerClient.send(message);
log.info("消息队列推送返回结果:{}", JSONObject.toJSONString(sendResult));
}
/**
* 获取最接近的节点值
**/
private int getNearDelayLevel(int delay) {
// 如果包含则直接返回
if (DELAY_TIME_LEVEL.contains(delay)) {
return DELAY_TIME_LEVEL.indexOf(delay) + 1;
}
//两个时间的绝对值 - 位置
TreeMap<Integer, Integer> resultMap = new TreeMap<>();
DELAY_TIME_LEVEL.stream().forEach(time -> resultMap.put(Math.abs(delay - time), DELAY_TIME_LEVEL.indexOf(time) + 1));
return resultMap.firstEntry().getValue();
}
}

View File

@ -0,0 +1,72 @@
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender.aliyunrocketmq.receive;
import com.jeequan.jeepay.components.mq.constant.MQVenderCS;
import com.jeequan.jeepay.components.mq.executor.MqThreadExecutor;
import com.jeequan.jeepay.components.mq.model.CleanMchLoginAuthCacheMQ;
import com.jeequan.jeepay.components.mq.vender.aliyunrocketmq.AbstractAliYunRocketMQReceiver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* AliYunRocketMQ消息接收器仅在vender=AliYunRocketMQ时 && 项目实现IMQReceiver接口时 进行实例化
* 业务 清除商户登录信息
*/
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.ALIYUN_ROCKET_MQ)
@ConditionalOnBean(CleanMchLoginAuthCacheMQ.IMQReceiver.class)
@Slf4j
public class CleanMchLoginAuthCacheAliYunRocketMQReceiver extends AbstractAliYunRocketMQReceiver {
private static final String cusumerName = "清除商户登录消息";
@Autowired
private CleanMchLoginAuthCacheMQ.IMQReceiver mqReceiver;
/**
* 接收 queue 类型的消息
**/
@Override
@Async(MqThreadExecutor.EXECUTOR_PAYORDER_MCH_NOTIFY)
public void receiveMsg(String msg) {
mqReceiver.receive(CleanMchLoginAuthCacheMQ.parse(msg));
}
/**
* 获取topic名称
*
* @return
*/
@Override
public String getMQName() {
return CleanMchLoginAuthCacheMQ.MQ_NAME;
}
/**
* 获取业务名称
*
* @return
*/
@Override
public String getCusumerName() {
return cusumerName;
}
}

View File

@ -0,0 +1,69 @@
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender.aliyunrocketmq.receive;
import com.jeequan.jeepay.components.mq.constant.MQVenderCS;
import com.jeequan.jeepay.components.mq.model.PayOrderDivisionMQ;
import com.jeequan.jeepay.components.mq.vender.aliyunrocketmq.AbstractAliYunRocketMQReceiver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* AliYunRocketMQ消息接收器仅在vender=AliYunRocketMQ时 && 项目实现IMQReceiver接口时 进行实例化
* 业务 支付订单分账通知
*/
@Slf4j
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.ALIYUN_ROCKET_MQ)
@ConditionalOnBean(PayOrderDivisionMQ.IMQReceiver.class)
public class PayOrderDivisionAliYunRocketMQReceiver extends AbstractAliYunRocketMQReceiver {
private static final String cusumerName = "支付订单分账消息";
@Autowired
private PayOrderDivisionMQ.IMQReceiver mqReceiver;
/**
* 接收 queue 类型的消息
**/
@Override
public void receiveMsg(String msg) {
mqReceiver.receive(PayOrderDivisionMQ.parse(msg));
}
/**
* 获取topic名称
*
* @return
*/
@Override
public String getMQName() {
return PayOrderDivisionMQ.MQ_NAME;
}
/**
* 获取业务名称
*
* @return
*/
@Override
public String getCusumerName() {
return cusumerName;
}
}

View File

@ -0,0 +1,72 @@
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender.aliyunrocketmq.receive;
import com.jeequan.jeepay.components.mq.constant.MQVenderCS;
import com.jeequan.jeepay.components.mq.executor.MqThreadExecutor;
import com.jeequan.jeepay.components.mq.model.PayOrderMchNotifyMQ;
import com.jeequan.jeepay.components.mq.vender.aliyunrocketmq.AbstractAliYunRocketMQReceiver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* AliYunRocketMQ消息接收器仅在vender=AliYunRocketMQ时 && 项目实现IMQReceiver接口时 进行实例化
* 业务 支付订单商户通知
*/
@Slf4j
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.ALIYUN_ROCKET_MQ)
@ConditionalOnBean(PayOrderMchNotifyMQ.IMQReceiver.class)
public class PayOrderMchNotifyAliYunRocketMQReceiver extends AbstractAliYunRocketMQReceiver {
private static final String cusumerName = "支付订单商户消息";
@Autowired
private PayOrderMchNotifyMQ.IMQReceiver mqReceiver;
/**
* 接收 queue 类型的消息
**/
@Override
@Async(MqThreadExecutor.EXECUTOR_PAYORDER_MCH_NOTIFY)
public void receiveMsg(String msg) {
mqReceiver.receive(PayOrderMchNotifyMQ.parse(msg));
}
/**
* 获取topic名称
*
* @return
*/
@Override
public String getMQName() {
return PayOrderMchNotifyMQ.MQ_NAME;
}
/**
* 获取业务名称
*
* @return
*/
@Override
public String getCusumerName() {
return cusumerName;
}
}

View File

@ -0,0 +1,71 @@
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender.aliyunrocketmq.receive;
import com.jeequan.jeepay.components.mq.constant.MQVenderCS;
import com.jeequan.jeepay.components.mq.executor.MqThreadExecutor;
import com.jeequan.jeepay.components.mq.model.PayOrderReissueMQ;
import com.jeequan.jeepay.components.mq.vender.aliyunrocketmq.AbstractAliYunRocketMQReceiver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* AliYunRocketMQ消息接收器仅在vender=AliYunRocketMQ时 && 项目实现IMQReceiver接口时 进行实例化
* 业务 支付订单补单一般用于没有回调的接口比如微信的条码支付
*/
@Slf4j
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.ALIYUN_ROCKET_MQ)
@ConditionalOnBean(PayOrderReissueMQ.IMQReceiver.class)
public class PayOrderReissueAliYunRocketMQReceiver extends AbstractAliYunRocketMQReceiver {
private static final String cusumerName = "支付订单补单消息";
@Autowired
private PayOrderReissueMQ.IMQReceiver mqReceiver;
/**
* 接收 queue 类型的消息
**/
@Override
@Async(MqThreadExecutor.EXECUTOR_PAYORDER_MCH_NOTIFY)
public void receiveMsg(String msg) {
mqReceiver.receive(PayOrderReissueMQ.parse(msg));
}
/**
* 获取topic名称
*
* @return
*/
@Override
public String getMQName() {
return PayOrderReissueMQ.MQ_NAME;
}
/**
* 获取业务名称
*
* @return
*/
@Override
public String getCusumerName() {
return cusumerName;
}
}

View File

@ -0,0 +1,87 @@
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender.aliyunrocketmq.receive;
import com.jeequan.jeepay.components.mq.constant.MQSendTypeEnum;
import com.jeequan.jeepay.components.mq.constant.MQVenderCS;
import com.jeequan.jeepay.components.mq.model.ResetAppConfigMQ;
import com.jeequan.jeepay.components.mq.vender.aliyunrocketmq.AbstractAliYunRocketMQReceiver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* AliYunRocketMQ消息接收器仅在vender=AliYunRocketMQ时 && 项目实现IMQReceiver接口时 进行实例化
* 业务 更新系统配置参数
*/
@Slf4j
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.ALIYUN_ROCKET_MQ)
@ConditionalOnBean(ResetAppConfigMQ.IMQReceiver.class)
public class ResetAppConfigAliYunRocketMQReceiver extends AbstractAliYunRocketMQReceiver {
private static final String cusumerName = "更新系统配置参数消息";
@Autowired
private ResetAppConfigMQ.IMQReceiver mqReceiver;
/**
* 接收 MQSendTypeEnum.BROADCAST 广播类型的消息
* <p>
* 注意
* AliYunRocketMQ的广播模式fanout交换机 --全部的Queue
* 如果queue包含多个消费者 例如manager和payment的监听器是名称相同的queue下的消费者Consumers 两个消费者是工作模式且存在竞争关系 导致只能一个来消费
* 解决
* 每个topic的QUEUE都声明一个FANOUT交换机 消费者声明一个系统产生的随机队列绑定到这个交换机上然后往交换机发消息只要绑定到这个交换机上都能收到消息
* 参考 https://bbs.csdn.net/topics/392509262?list=70088931
**/
@Override
public void receiveMsg(String msg) {
mqReceiver.receive(ResetAppConfigMQ.parse(msg));
}
/**
* 获取topic名称
*
* @return
*/
@Override
public String getMQName() {
return ResetAppConfigMQ.MQ_NAME;
}
/**
* 获取业务名称
*
* @return
*/
@Override
public String getCusumerName() {
return cusumerName;
}
/**
* 发送类型
*
* @return
*/
@Override
public MQSendTypeEnum getMQType() {
// RocketMQ的广播模式
return MQSendTypeEnum.BROADCAST;
}
}

View File

@ -0,0 +1,88 @@
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender.aliyunrocketmq.receive;
import com.jeequan.jeepay.components.mq.constant.MQSendTypeEnum;
import com.jeequan.jeepay.components.mq.constant.MQVenderCS;
import com.jeequan.jeepay.components.mq.model.ResetIsvMchAppInfoConfigMQ;
import com.jeequan.jeepay.components.mq.vender.aliyunrocketmq.AbstractAliYunRocketMQReceiver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* AliYunRocketMQ消息接收器仅在vender=AliYunRocketMQ时 && 项目实现IMQReceiver接口时 进行实例化
* 业务 更新服务商/商户/商户应用配置信息
*/
@Slf4j
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.ALIYUN_ROCKET_MQ)
@ConditionalOnBean(ResetIsvMchAppInfoConfigMQ.IMQReceiver.class)
public class ResetIsvMchAppInfoAliYunRocketMQReceiver extends AbstractAliYunRocketMQReceiver {
private static final String cusumerName = "更新服务商/商户/商户应用配置信息消息";
@Autowired
private ResetIsvMchAppInfoConfigMQ.IMQReceiver mqReceiver;
/**
* 接收 MQSendTypeEnum.BROADCAST 广播类型的消息
* <p>
* 注意
* AliYunRocketMQ的广播模式fanout交换机 --全部的Queue
* 如果queue包含多个消费者 例如manager和payment的监听器是名称相同的queue下的消费者Consumers 两个消费者是工作模式且存在竞争关系 导致只能一个来消费
* 解决
* 每个topic的QUEUE都声明一个FANOUT交换机 消费者声明一个系统产生的随机队列绑定到这个交换机上然后往交换机发消息只要绑定到这个交换机上都能收到消息
* 参考 https://bbs.csdn.net/topics/392509262?list=70088931
**/
@Override
public void receiveMsg(String msg) {
mqReceiver.receive(ResetIsvMchAppInfoConfigMQ.parse(msg));
}
/**
* 获取topic名称
*
* @return
*/
@Override
public String getMQName() {
return ResetIsvMchAppInfoConfigMQ.MQ_NAME;
}
/**
* 获取业务名称
*
* @return
*/
@Override
public String getCusumerName() {
return cusumerName;
}
/**
* 发送类型
*
* @return
*/
@Override
public MQSendTypeEnum getMQType() {
// RocketMQ的广播模式
return MQSendTypeEnum.BROADCAST;
}
}