From c201d5f06cd233035b30a968560000b6932db3c6 Mon Sep 17 00:00:00 2001 From: terrfly Date: Tue, 27 Jul 2021 10:09:19 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=97=A7=E7=89=88MQ=E5=8F=91?= =?UTF-8?q?=E9=80=81=E5=92=8C=E7=9B=91=E5=90=AC=E6=96=B9=E5=BC=8F=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../jeequan/jeepay/mgr/mq/ActiveMqSend.java | 171 ---------------- .../jeequan/jeepay/mgr/mq/RabbitMqSend.java | 106 ---------- .../jeequan/jeepay/mgr/mq/RocketMqSend.java | 116 ----------- .../jeepay/mgr/mq/config/JMSConfig.java | 46 ----- .../jeepay/mgr/mq/config/RabbitMqConfig.java | 114 ----------- .../jeequan/jeepay/mch/mq/ActiveMqSend.java | 70 ------- .../jeequan/jeepay/mch/mq/RabbitMqSend.java | 56 ------ .../jeequan/jeepay/mch/mq/RocketMqSend.java | 58 ------ .../mch/mq/activemq/queue/MqQueueReceive.java | 55 ----- .../mch/mq/activemq/topic/MqTopicReceive.java | 52 ----- .../jeepay/mch/mq/config/JMSConfig.java | 44 ---- .../jeepay/mch/mq/config/RabbitMqConfig.java | 59 ------ .../mch/mq/rabbitmq/RabbitMqReceive.java | 57 ------ .../mch/mq/receive/MqReceiveCommon.java | 70 ------- .../mch/mq/rocketmq/RocketMqReceive.java | 65 ------ .../jeepay/pay/mq/ActiveMqMessage.java | 166 ---------------- .../jeepay/pay/mq/RabbitMqMessage.java | 113 ----------- .../jeepay/pay/mq/RocketMqMessage.java | 145 -------------- .../pay/mq/activemq/topic/MqTopicReceive.java | 73 ------- .../jeepay/pay/mq/config/JMSConfig.java | 46 ----- .../pay/mq/config/MqThreadExecutor.java | 62 ------ .../jeepay/pay/mq/config/RabbitMqConfig.java | 69 ------- .../mq/rabbitmq/RabbitMqDirectReceive.java | 77 ------- .../pay/mq/receive/MqReceiveCommon.java | 188 ------------------ .../pay/mq/rocketmq/RocketMqReceive.java | 89 --------- 25 files changed, 2167 deletions(-) delete mode 100644 jeepay-manager/src/main/java/com/jeequan/jeepay/mgr/mq/ActiveMqSend.java delete mode 100644 jeepay-manager/src/main/java/com/jeequan/jeepay/mgr/mq/RabbitMqSend.java delete mode 100644 jeepay-manager/src/main/java/com/jeequan/jeepay/mgr/mq/RocketMqSend.java delete mode 100644 jeepay-manager/src/main/java/com/jeequan/jeepay/mgr/mq/config/JMSConfig.java delete mode 100644 jeepay-manager/src/main/java/com/jeequan/jeepay/mgr/mq/config/RabbitMqConfig.java delete mode 100644 jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/ActiveMqSend.java delete mode 100644 jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/RabbitMqSend.java delete mode 100644 jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/RocketMqSend.java delete mode 100644 jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/activemq/queue/MqQueueReceive.java delete mode 100644 jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/activemq/topic/MqTopicReceive.java delete mode 100644 jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/config/JMSConfig.java delete mode 100644 jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/config/RabbitMqConfig.java delete mode 100644 jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/rabbitmq/RabbitMqReceive.java delete mode 100644 jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/receive/MqReceiveCommon.java delete mode 100644 jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/rocketmq/RocketMqReceive.java delete mode 100644 jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/ActiveMqMessage.java delete mode 100644 jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/RabbitMqMessage.java delete mode 100644 jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/RocketMqMessage.java delete mode 100644 jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/activemq/topic/MqTopicReceive.java delete mode 100644 jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/config/JMSConfig.java delete mode 100644 jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/config/MqThreadExecutor.java delete mode 100644 jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/config/RabbitMqConfig.java delete mode 100644 jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/rabbitmq/RabbitMqDirectReceive.java delete mode 100644 jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/receive/MqReceiveCommon.java delete mode 100644 jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/rocketmq/RocketMqReceive.java diff --git a/jeepay-manager/src/main/java/com/jeequan/jeepay/mgr/mq/ActiveMqSend.java b/jeepay-manager/src/main/java/com/jeequan/jeepay/mgr/mq/ActiveMqSend.java deleted file mode 100644 index 1fb7ebd..0000000 --- a/jeepay-manager/src/main/java/com/jeequan/jeepay/mgr/mq/ActiveMqSend.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). - *

- * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.gnu.org/licenses/lgpl.html - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jeequan.jeepay.mgr.mq; - -import com.jeequan.jeepay.core.constants.CS; -import com.jeequan.jeepay.core.mq.MqCommonService; -import com.jeequan.jeepay.service.impl.SysConfigService; -import lombok.extern.slf4j.Slf4j; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Lazy; -import org.springframework.context.annotation.Profile; -import org.springframework.jms.annotation.JmsListener; -import org.springframework.jms.core.JmsTemplate; -import org.springframework.stereotype.Component; - -import javax.jms.Queue; - -/** - * - * @author pangxiaoyu - * @site https://www.jeepay.vip - * @date 2021-06-07 07:15 - */ -@Slf4j -@Component -@Profile(CS.MQTYPE.ACTIVE_MQ) -public class ActiveMqSend extends MqCommonService { - - @Autowired private JmsTemplate jmsTemplate; - - @Autowired private SysConfigService sysConfigService; - - - @Bean("activeMqSendModifyMchUserRemove") - public Queue mqQueue4ModifyMchUserRemove(){ - return new ActiveMQQueue(CS.MQ.QUEUE_MODIFY_MCH_USER_REMOVE); - } - - @Lazy - @Autowired - @Qualifier("activeMqSendModifyMchUserRemove") - private Queue mqQueue4ModifyMchUserRemove; - - @Bean("activeMqSendPayOrderMchNotify") - public Queue mqQueue4PayOrderMchNotify(){ - return new ActiveMQQueue(CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY); - } - - @Lazy - @Autowired - @Qualifier("activeMqSendPayOrderMchNotify") - private Queue mqQueue4PayOrderMchNotify; - - @Bean("activeMqSendModifyIsvInfo") - public ActiveMQTopic mqTopic4ModifyIsvInfo(){ - return new ActiveMQTopic(CS.MQ.TOPIC_MODIFY_ISV_INFO); - } - - @Lazy - @Autowired - @Qualifier("activeMqSendModifyIsvInfo") - private ActiveMQTopic mqTopic4ModifyIsvInfo; - - @Bean("activeMqSendModifyMchApp") - public ActiveMQTopic mqTopic4ModifyMchApp(){ - return new ActiveMQTopic(CS.MQ.TOPIC_MODIFY_MCH_APP); - } - - @Lazy - @Autowired - @Qualifier("activeMqSendModifyMchApp") - private ActiveMQTopic mqTopic4ModifyMchApp; - - - @Bean("activeMqSendModifyMchInfo") - public ActiveMQTopic mqTopic4ModifyMchInfo(){ - return new ActiveMQTopic(CS.MQ.TOPIC_MODIFY_MCH_INFO); - } - - @Lazy - @Autowired - @Qualifier("activeMqSendModifyMchInfo") - private ActiveMQTopic mqTopic4ModifyMchInfo; - - @Bean("activeMqSendModifySysConfig") - public ActiveMQTopic mqTopic4ModifySysConfig(){ - return new ActiveMQTopic(CS.MQ.TOPIC_MODIFY_SYS_CONFIG); - } - - @Lazy - @Autowired - @Qualifier("activeMqSendModifySysConfig") - private ActiveMQTopic mqTopic4ModifySysConfig; - - @Override - public void send(String msg, String sendType) { - if (sendType.equals(CS.MQ.MQ_TYPE_PAY_ORDER_MCH_NOTIFY)) { // 商户订单回调 - queuePayOrderMchNotify(msg); - }else if (sendType.equals(CS.MQ.MQ_TYPE_MODIFY_ISV_INFO)) { // 服务商信息修改 - topicModifyIsvInfo(msg); - }else if (sendType.equals(CS.MQ.MQ_TYPE_MODIFY_MCH_APP)) { // 商户应用修改 - topicModifyMchApp(msg); - }else if (sendType.equals(CS.MQ.MQ_TYPE_MODIFY_MCH_INFO)) { // 商户信息修改 - topicModifyMchInfo(msg); - }else if (sendType.equals(CS.MQ.MQ_TYPE_MODIFY_SYS_CONFIG)) { // 系统配置修改 - topicModifySysConfig(msg); - }else if (sendType.equals(CS.MQ.MQ_TYPE_MCH_LOGIN_USER_REMOVE)) { // 商户登录用户清除信息 - queueMchLoginUserRemove(msg); - } - } - - @Override - public void send(String msg, long delay, String sendType) { - - } - - /** 发送商户订单回调消息 **/ - public void queuePayOrderMchNotify(String msg) { - this.jmsTemplate.convertAndSend(mqQueue4PayOrderMchNotify, msg); - } - - /** 发送服务商信息修改消息 **/ - public void topicModifyIsvInfo(String msg) { - this.jmsTemplate.convertAndSend(mqTopic4ModifyIsvInfo, msg); - } - - /** 发送商户应用修改消息 **/ - public void topicModifyMchApp(String msg) { - this.jmsTemplate.convertAndSend(mqTopic4ModifyMchApp, msg); - } - - /** 发送商户信息修改消息 **/ - public void topicModifyMchInfo(String msg) { - this.jmsTemplate.convertAndSend(mqTopic4ModifyMchInfo, msg); - } - - /** 发送系统配置修改消息 **/ - public void topicModifySysConfig(String msg) { - this.jmsTemplate.convertAndSend(mqTopic4ModifySysConfig, msg); - } - - /** 发送商户登录用户清除信息消息 **/ - public void queueMchLoginUserRemove(String msg) { this.jmsTemplate.convertAndSend(mqQueue4ModifyMchUserRemove, msg); } - - /** 接收 更新系统配置项的消息 **/ - @JmsListener(destination = CS.MQ.TOPIC_MODIFY_SYS_CONFIG, containerFactory = "jmsListenerContainer") - public void receive(String msg) { - - log.info("成功接收更新系统配置的订阅通知, msg={}", msg); - sysConfigService.initDBConfig(msg); - log.info("系统配置静态属性已重置"); - } - -} diff --git a/jeepay-manager/src/main/java/com/jeequan/jeepay/mgr/mq/RabbitMqSend.java b/jeepay-manager/src/main/java/com/jeequan/jeepay/mgr/mq/RabbitMqSend.java deleted file mode 100644 index 0a080f0..0000000 --- a/jeepay-manager/src/main/java/com/jeequan/jeepay/mgr/mq/RabbitMqSend.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). - *

- * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.gnu.org/licenses/lgpl.html - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jeequan.jeepay.mgr.mq; - -import com.jeequan.jeepay.core.constants.CS; -import com.jeequan.jeepay.core.mq.MqCommonService; -import com.jeequan.jeepay.service.impl.SysConfigService; -import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.core.AmqpTemplate; -import org.springframework.amqp.rabbit.annotation.Exchange; -import org.springframework.amqp.rabbit.annotation.Queue; -import org.springframework.amqp.rabbit.annotation.QueueBinding; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Profile; -import org.springframework.stereotype.Component; - -/** - * - * @author pangxiaoyu - * @site https://www.jeepay.vip - * @date 2021-06-07 07:15 - */ -@Slf4j -@Component -@Profile(CS.MQTYPE.RABBIT_MQ) -public class RabbitMqSend extends MqCommonService { - - @Autowired private AmqpTemplate rabbitTemplate; - - @Autowired private SysConfigService sysConfigService; - - @Override - public void send(String msg, String sendType) { - if (sendType.equals(CS.MQ.MQ_TYPE_PAY_ORDER_MCH_NOTIFY)) { // 商户订单回调 - payOrderMchNotify(msg); - }else if (sendType.equals(CS.MQ.MQ_TYPE_MODIFY_ISV_INFO)) { // 服务商信息修改 - directModifyIsvInfo(msg); - }else if (sendType.equals(CS.MQ.MQ_TYPE_MODIFY_MCH_APP)) { // 商户应用修改 - directModifyMchApp(msg); - }else if (sendType.equals(CS.MQ.MQ_TYPE_MODIFY_MCH_INFO)) { // 商户信息修改 - directModifyMchInfo(msg); - }else if (sendType.equals(CS.MQ.MQ_TYPE_MODIFY_SYS_CONFIG)) { // 系统配置修改 - fanoutModifySysConfig(msg); - }else if (sendType.equals(CS.MQ.MQ_TYPE_MCH_LOGIN_USER_REMOVE)) { // 商户登录用户清除信息 - directMchLoginUserRemove(msg); - } - } - - @Override - public void send(String msg, long delay, String sendType) { - - } - - /** 发送商户订单回调消息 **/ - public void payOrderMchNotify(String msg) { - rabbitTemplate.convertAndSend(CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY, msg); - } - - /** 发送服务商信息修改消息 **/ - public void directModifyIsvInfo(String msg) { - rabbitTemplate.convertAndSend(CS.DIRECT_EXCHANGE, CS.MQ.TOPIC_MODIFY_ISV_INFO, msg); - } - - /** 发送商户应用修改消息 **/ - public void directModifyMchApp(String msg) { - rabbitTemplate.convertAndSend(CS.DIRECT_EXCHANGE, CS.MQ.TOPIC_MODIFY_MCH_APP, msg); - } - - /** 发送商户信息修改消息 **/ - public void directModifyMchInfo(String msg) { - rabbitTemplate.convertAndSend(CS.DIRECT_EXCHANGE, CS.MQ.TOPIC_MODIFY_MCH_INFO, msg); - } - - /** 发送系统配置修改消息 **/ - public void fanoutModifySysConfig(String msg) { - this.rabbitTemplate.convertAndSend(CS.FANOUT_EXCHANGE_SYS_CONFIG, CS.MQ.FANOUT_MODIFY_SYS_CONFIG, msg); - } - - /** 发送商户登录用户清除信息消息 **/ - public void directMchLoginUserRemove(String msg) { - this.rabbitTemplate.convertAndSend(CS.DIRECT_EXCHANGE, CS.MQ.QUEUE_MODIFY_MCH_USER_REMOVE, msg); - } - - /** 接收 更新系统配置项的消息 **/ - @RabbitListener(bindings = {@QueueBinding(value = @Queue(),exchange = @Exchange(name = CS.FANOUT_EXCHANGE_SYS_CONFIG,type = "fanout"))}) - public void receive(String msg) { - log.info("成功接收更新系统配置的订阅通知, msg={}", msg); - sysConfigService.initDBConfig(msg); - log.info("系统配置静态属性已重置"); - } - -} diff --git a/jeepay-manager/src/main/java/com/jeequan/jeepay/mgr/mq/RocketMqSend.java b/jeepay-manager/src/main/java/com/jeequan/jeepay/mgr/mq/RocketMqSend.java deleted file mode 100644 index f67ea40..0000000 --- a/jeepay-manager/src/main/java/com/jeequan/jeepay/mgr/mq/RocketMqSend.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). - *

- * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.gnu.org/licenses/lgpl.html - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jeequan.jeepay.mgr.mq; - -import com.jeequan.jeepay.core.constants.CS; -import com.jeequan.jeepay.core.mq.MqCommonService; -import com.jeequan.jeepay.service.impl.SysConfigService; -import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.spring.annotation.MessageModel; -import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; -import org.apache.rocketmq.spring.core.RocketMQListener; -import org.apache.rocketmq.spring.core.RocketMQTemplate; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Profile; -import org.springframework.stereotype.Component; -import org.springframework.stereotype.Service; - -/** - * - * @author pangxiaoyu - * @site https://www.jeepay.vip - * @date 2021-06-07 07:15 - */ -@Slf4j -@Component -@Profile(CS.MQTYPE.ROCKET_MQ) -public class RocketMqSend extends MqCommonService { - - @Autowired private RocketMQTemplate rocketMQTemplate; - - @Autowired private SysConfigService sysConfigService; - - @Override - public void send(String msg, String sendType) { - if (sendType.equals(CS.MQ.MQ_TYPE_PAY_ORDER_MCH_NOTIFY)) { // 商户订单回调 - payOrderMchNotify(msg); - }else if (sendType.equals(CS.MQ.MQ_TYPE_MODIFY_ISV_INFO)) { // 服务商信息修改 - modifyIsvInfo(msg); - }else if (sendType.equals(CS.MQ.MQ_TYPE_MODIFY_MCH_APP)) { // 商户应用修改 - modifyMchApp(msg); - }else if (sendType.equals(CS.MQ.MQ_TYPE_MODIFY_MCH_INFO)) { // 商户信息修改 - modifyMchInfo(msg); - }else if (sendType.equals(CS.MQ.MQ_TYPE_MODIFY_SYS_CONFIG)) { // 系统配置修改 - modifySysConfig(msg); - }else if (sendType.equals(CS.MQ.MQ_TYPE_MCH_LOGIN_USER_REMOVE)) { // 商户登录用户清除信息 - mchLoginUserRemove(msg); - } - } - - @Override - public void send(String msg, long delay, String sendType) { - - } - - /** 发送商户订单回调消息 **/ - public void payOrderMchNotify(String msg) { - sendMsg(msg, CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY); - } - - /** 发送服务商信息修改消息 **/ - public void modifyIsvInfo(String msg) { - sendMsg(msg, CS.MQ.TOPIC_MODIFY_ISV_INFO); - } - - /** 发送商户应用修改消息 **/ - public void modifyMchApp(String msg) { - sendMsg(msg, CS.MQ.TOPIC_MODIFY_MCH_APP); - } - - /** 发送商户信息修改消息 **/ - public void modifyMchInfo(String msg) { - sendMsg(msg, CS.MQ.TOPIC_MODIFY_MCH_INFO); - } - - /** 发送系统配置修改消息 **/ - public void modifySysConfig(String msg) { - sendMsg(msg, CS.MQ.TOPIC_MODIFY_SYS_CONFIG); - } - - /** 发送商户登录用户清除信息消息 **/ - public void mchLoginUserRemove(String msg) { - sendMsg(msg, CS.MQ.QUEUE_MODIFY_MCH_USER_REMOVE); - } - - public void sendMsg(String msg, String group) { - // 这里的分组和消息名称未做区分 - rocketMQTemplate.getProducer().setProducerGroup(group); - this.rocketMQTemplate.convertAndSend(group, msg); - } - - /** 接收 更新系统配置项的消息 **/ - @Service - @RocketMQMessageListener(topic = CS.MQ.TOPIC_MODIFY_SYS_CONFIG, consumerGroup = CS.MQ.TOPIC_MODIFY_SYS_CONFIG, messageModel = MessageModel.BROADCASTING) - class RocketMqReceive implements RocketMQListener { - @Override - public void onMessage(String msg) { - log.info("成功接收更新系统配置的订阅通知, msg={}", msg); - sysConfigService.initDBConfig(msg); - log.info("系统配置静态属性已重置"); - } - } - -} diff --git a/jeepay-manager/src/main/java/com/jeequan/jeepay/mgr/mq/config/JMSConfig.java b/jeepay-manager/src/main/java/com/jeequan/jeepay/mgr/mq/config/JMSConfig.java deleted file mode 100644 index e32ccaa..0000000 --- a/jeepay-manager/src/main/java/com/jeequan/jeepay/mgr/mq/config/JMSConfig.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). - *

- * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.gnu.org/licenses/lgpl.html - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jeequan.jeepay.mgr.mq.config; - -import com.jeequan.jeepay.core.constants.CS; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Profile; -import org.springframework.jms.config.DefaultJmsListenerContainerFactory; -import org.springframework.jms.config.JmsListenerContainerFactory; -import org.springframework.stereotype.Component; - -import javax.jms.ConnectionFactory; - -/* -* JMS消息配置项 -* -* @author terrfly -* @site https://www.jeepay.vip -* @date 2021/6/8 17:10 -*/ -@Component -@Profile(CS.MQTYPE.ACTIVE_MQ) -public class JMSConfig { - - /** 新增jmsListenerContainer, 用于接收topic类型的消息 **/ - @Bean - public JmsListenerContainerFactory jmsListenerContainer(ConnectionFactory factory){ - DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); - bean.setPubSubDomain(true); - bean.setConnectionFactory(factory); - return bean; - } -} diff --git a/jeepay-manager/src/main/java/com/jeequan/jeepay/mgr/mq/config/RabbitMqConfig.java b/jeepay-manager/src/main/java/com/jeequan/jeepay/mgr/mq/config/RabbitMqConfig.java deleted file mode 100644 index bff611b..0000000 --- a/jeepay-manager/src/main/java/com/jeequan/jeepay/mgr/mq/config/RabbitMqConfig.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). - *

- * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.gnu.org/licenses/lgpl.html - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jeequan.jeepay.mgr.mq.config; - -import com.jeequan.jeepay.core.constants.CS; -import org.springframework.amqp.core.*; -import org.springframework.amqp.rabbit.annotation.EnableRabbit; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Profile; - -/** - * RabbitMq - * 队列交换机注册 - * @author xiaoyu - * @site https://www.jeepay.vip - * @date 2021/6/25 17:10 - */ -@Configuration -@EnableRabbit -@Profile(CS.MQTYPE.RABBIT_MQ) -public class RabbitMqConfig { - - @Bean("modifyIsvInfo") - public Queue modifyIsvInfo() { return new Queue(CS.MQ.TOPIC_MODIFY_ISV_INFO,true); } - - @Bean("modifyMchApp") - public Queue modifyMchApp() { - return new Queue(CS.MQ.TOPIC_MODIFY_MCH_APP,true); - } - - @Bean("modifyMchInfo") - public Queue modifyMchInfo() { - return new Queue(CS.MQ.TOPIC_MODIFY_MCH_INFO,true); - } - - @Bean("modifySysConfig") - public Queue modifySysConfig() { - return new Queue(CS.MQ.FANOUT_MODIFY_SYS_CONFIG,true); - } - - @Bean("payOrderMchNotify") - public Queue payOrderMchNotify() { - return new Queue(CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY,true); - } - - @Bean("mchUserRemove") - public Queue mchUserRemove() { - return new Queue(CS.MQ.QUEUE_MODIFY_MCH_USER_REMOVE,true); - } - - //创建 fanout 交换机 - @Bean("fanoutExchange") - FanoutExchange fanoutExchange() { - return new FanoutExchange(CS.FANOUT_EXCHANGE_SYS_CONFIG,true,false); - } - - //创建 direct 交换机 - @Bean("directExchange") - DirectExchange directExchange() { - return new DirectExchange(CS.DIRECT_EXCHANGE,true,false); - } - - //绑定 将队列和交换机绑定, 并设置用于匹配键:TOPIC_MODIFY_ISV_INFO - @Bean - Binding bindingIsvInfo(@Qualifier("modifyIsvInfo") Queue modifyIsvInfo, @Qualifier("directExchange") DirectExchange directExchange) { - return BindingBuilder.bind(modifyIsvInfo).to(directExchange).with(CS.MQ.TOPIC_MODIFY_ISV_INFO); - } - - //绑定 将队列和交换机绑定, 并设置用于匹配键:TOPIC_MODIFY_MCH_APP - @Bean - Binding bindingMchApp(@Qualifier("modifyMchApp") Queue modifyMchApp, @Qualifier("directExchange") DirectExchange directExchange) { - return BindingBuilder.bind(modifyMchApp).to(directExchange).with(CS.MQ.TOPIC_MODIFY_MCH_APP); - } - - //绑定 将队列和交换机绑定, 并设置用于匹配键:TOPIC_MODIFY_MCH_INFO - @Bean - Binding bindingMchInfo(@Qualifier("modifyMchInfo") Queue modifyMchInfo, @Qualifier("directExchange") DirectExchange directExchange) { - return BindingBuilder.bind(modifyMchInfo).to(directExchange).with(CS.MQ.TOPIC_MODIFY_MCH_INFO); - } - - //绑定 将队列和交换机绑定 - @Bean - Binding bindingSysConfig(Queue modifySysConfig, FanoutExchange fanoutExchange) { - return BindingBuilder.bind(modifySysConfig).to(fanoutExchange); - } - - //绑定 将队列和交换机绑定, 并设置用于匹配键:QUEUE_PAYORDER_MCH_NOTIFY - @Bean - Binding bindingPayOrderMchNotify(@Qualifier("payOrderMchNotify") Queue payOrderMchNotify, @Qualifier("directExchange") DirectExchange directExchange) { - return BindingBuilder.bind(payOrderMchNotify).to(directExchange).with(CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY); - } - - //绑定 将队列和交换机绑定, 并设置用于匹配键:QUEUE_MODIFY_MCH_USER_REMOVE - @Bean - Binding bindingMchUserRemove(@Qualifier("mchUserRemove") Queue mchUserRemove, @Qualifier("directExchange") DirectExchange directExchange) { - return BindingBuilder.bind(mchUserRemove).to(directExchange).with(CS.MQ.QUEUE_MODIFY_MCH_USER_REMOVE); - } - -} diff --git a/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/ActiveMqSend.java b/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/ActiveMqSend.java deleted file mode 100644 index 0926e9e..0000000 --- a/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/ActiveMqSend.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). - *

- * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.gnu.org/licenses/lgpl.html - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jeequan.jeepay.mch.mq; - -import com.jeequan.jeepay.core.constants.CS; -import com.jeequan.jeepay.core.mq.MqCommonService; -import lombok.extern.slf4j.Slf4j; -import org.apache.activemq.command.ActiveMQTopic; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Lazy; -import org.springframework.context.annotation.Profile; -import org.springframework.jms.core.JmsTemplate; -import org.springframework.stereotype.Component; - -/* -* @author terrfly -* @site https://www.jeepay.vip -* @date 2021/6/8 17:10 -*/ -@Slf4j -@Component -@Profile(CS.MQTYPE.ACTIVE_MQ) -public class ActiveMqSend extends MqCommonService { - - @Autowired private JmsTemplate jmsTemplate; - - @Bean("activeMqSendModifyMchApp") - public ActiveMQTopic mqTopic4ModifyMchApp(){ - return new ActiveMQTopic(CS.MQ.TOPIC_MODIFY_MCH_APP); - } - - @Lazy - @Autowired - @Qualifier("activeMqSendModifyMchApp") - private ActiveMQTopic mqTopic4ModifyMchApp; - - /** 推送消息到各个节点 **/ - @Override - public void send(String msg, String sendType) { - if (sendType.equals(CS.MQ.MQ_TYPE_MODIFY_MCH_APP)) { // 商户应用修改 - topicModifyMchApp(msg); - } - } - - @Override - public void send(String msg, long delay, String sendType) { - - } - - /** 发送商户应用修改信息 **/ - public void topicModifyMchApp(String msg) { - this.jmsTemplate.convertAndSend(mqTopic4ModifyMchApp, msg); - } - -} diff --git a/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/RabbitMqSend.java b/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/RabbitMqSend.java deleted file mode 100644 index cf7c7e8..0000000 --- a/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/RabbitMqSend.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). - *

- * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.gnu.org/licenses/lgpl.html - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jeequan.jeepay.mch.mq; - -import com.jeequan.jeepay.core.constants.CS; -import com.jeequan.jeepay.core.mq.MqCommonService; -import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Profile; -import org.springframework.stereotype.Component; - -/** - * RabbitMq - * @author xiaoyu - * @site https://www.jeepay.vip - * @date 2021/6/25 17:10 - */ -@Slf4j -@Component -@Profile(CS.MQTYPE.RABBIT_MQ) -public class RabbitMqSend extends MqCommonService { - - @Autowired private RabbitTemplate rabbitTemplate; - - /** 推送消息 **/ - @Override - public void send(String msg, String sendType) { - if (sendType.equals(CS.MQ.MQ_TYPE_MODIFY_MCH_APP)) { // 商户应用修改 - directModifyMchApp(msg); - } - } - - @Override - public void send(String msg, long delay, String sendType) { - - } - - /** 发送商户应用修改信息 **/ - public void directModifyMchApp(String msg) { - rabbitTemplate.convertAndSend(CS.DIRECT_EXCHANGE, CS.MQ.TOPIC_MODIFY_MCH_APP, msg); - } -} diff --git a/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/RocketMqSend.java b/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/RocketMqSend.java deleted file mode 100644 index f80395f..0000000 --- a/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/RocketMqSend.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). - *

- * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.gnu.org/licenses/lgpl.html - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jeequan.jeepay.mch.mq; - -import com.jeequan.jeepay.core.constants.CS; -import com.jeequan.jeepay.core.mq.MqCommonService; -import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.spring.core.RocketMQTemplate; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Profile; -import org.springframework.stereotype.Component; - -/** - * RocketMQ - * @author xiaoyu - * @site https://www.jeepay.vip - * @date 2021/6/25 17:10 - */ -@Slf4j -@Component -@Profile(CS.MQTYPE.ROCKET_MQ) -public class RocketMqSend extends MqCommonService { - - @Autowired private RocketMQTemplate rocketMQTemplate; - - /** 推送消息到各个节点 **/ - @Override - public void send(String msg, String sendType) { - if (sendType.equals(CS.MQ.MQ_TYPE_MODIFY_MCH_APP)) { // 商户应用修改 - topicModifyMchApp(msg); - } - } - - @Override - public void send(String msg, long delay, String sendType) { - - } - - /** 发送商户应用修改信息 **/ - public void topicModifyMchApp(String msg) { - rocketMQTemplate.getProducer().setProducerGroup(CS.MQ.TOPIC_MODIFY_MCH_APP); - rocketMQTemplate.convertAndSend(CS.MQ.TOPIC_MODIFY_MCH_APP, msg); - } - -} diff --git a/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/activemq/queue/MqQueueReceive.java b/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/activemq/queue/MqQueueReceive.java deleted file mode 100644 index 4302fc6..0000000 --- a/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/activemq/queue/MqQueueReceive.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). - *

- * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.gnu.org/licenses/lgpl.html - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jeequan.jeepay.mch.mq.activemq.queue; - -import com.jeequan.jeepay.core.constants.CS; -import com.jeequan.jeepay.mch.mq.receive.MqReceiveCommon; -import lombok.extern.slf4j.Slf4j; -import org.apache.activemq.command.ActiveMQQueue; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Profile; -import org.springframework.jms.annotation.JmsListener; -import org.springframework.stereotype.Component; - -/** - * 商户用户登录信息清除 - * - * @author pangxiaoyu - * @site https://www.jeepay.vip - * @date 2021-04-27 15:50 - */ -@Slf4j -@Component -@Profile(CS.MQTYPE.ACTIVE_MQ) -public class MqQueueReceive extends ActiveMQQueue { - - @Autowired private MqReceiveCommon mqReceiveCommon; - - public MqQueueReceive(){ - super(CS.MQ.QUEUE_MODIFY_MCH_USER_REMOVE); - } - - /** - * @author: pangxiaoyu - * @date: 2021/6/7 16:17 - * @describe: 接收 商户用户登录信息清除消息 - */ - @JmsListener(destination = CS.MQ.QUEUE_MODIFY_MCH_USER_REMOVE) - public void receive(String userIdStr) { - mqReceiveCommon.removeMchUser(userIdStr); - } - -} diff --git a/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/activemq/topic/MqTopicReceive.java b/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/activemq/topic/MqTopicReceive.java deleted file mode 100644 index ee7fa18..0000000 --- a/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/activemq/topic/MqTopicReceive.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). - *

- * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.gnu.org/licenses/lgpl.html - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jeequan.jeepay.mch.mq.activemq.topic; - -import com.jeequan.jeepay.core.constants.CS; -import com.jeequan.jeepay.mch.mq.receive.MqReceiveCommon; -import lombok.extern.slf4j.Slf4j; -import org.apache.activemq.command.ActiveMQTopic; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Profile; -import org.springframework.jms.annotation.JmsListener; -import org.springframework.stereotype.Component; - -/** - * 更新系统配置mq - * - * @author terrfly - * @modify zhuxiao - * @site https://www.jeepay.vip - * @date 2021-04-27 15:50 - */ -@Slf4j -@Component -@Profile(CS.MQTYPE.ACTIVE_MQ) -public class MqTopicReceive extends ActiveMQTopic{ - - @Autowired private MqReceiveCommon mqReceiveCommon; - - public MqTopicReceive(){ - super(CS.MQ.TOPIC_MODIFY_SYS_CONFIG); - } - - /** 接收 更新系统配置项的消息 **/ - @JmsListener(destination = CS.MQ.TOPIC_MODIFY_SYS_CONFIG, containerFactory = "jmsListenerContainer") - public void receive(String msg) { - mqReceiveCommon.initDbConfig(msg); - } - -} diff --git a/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/config/JMSConfig.java b/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/config/JMSConfig.java deleted file mode 100644 index b623c25..0000000 --- a/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/config/JMSConfig.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). - *

- * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.gnu.org/licenses/lgpl.html - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jeequan.jeepay.mch.mq.config; - -import org.springframework.context.annotation.Bean; -import org.springframework.jms.config.DefaultJmsListenerContainerFactory; -import org.springframework.jms.config.JmsListenerContainerFactory; -import org.springframework.stereotype.Component; - -import javax.jms.ConnectionFactory; - -/** - * JMS消息配置项 - * - * @author terrfly - * @modify zhuxiao - * @site https://www.jeepay.vip - * @date 2021-04-27 15:50 - */ -@Component -public class JMSConfig { - - /** 新增jmsListenerContainer, 用于接收topic类型的消息 **/ - @Bean - public JmsListenerContainerFactory jmsListenerContainer(ConnectionFactory factory){ - DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); - bean.setPubSubDomain(true); - bean.setConnectionFactory(factory); - return bean; - } -} diff --git a/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/config/RabbitMqConfig.java b/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/config/RabbitMqConfig.java deleted file mode 100644 index c41e493..0000000 --- a/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/config/RabbitMqConfig.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). - *

- * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.gnu.org/licenses/lgpl.html - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jeequan.jeepay.mch.mq.config; - -import com.jeequan.jeepay.core.constants.CS; -import org.springframework.amqp.core.Binding; -import org.springframework.amqp.core.BindingBuilder; -import org.springframework.amqp.core.DirectExchange; -import org.springframework.amqp.core.Queue; -import org.springframework.amqp.rabbit.annotation.EnableRabbit; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Profile; - -/** - * RabbitMq - * 队列交换机注册 - * @author xiaoyu - * @site https://www.jeepay.vip - * @date 2021/6/25 17:10 - */ -@Profile(CS.MQTYPE.RABBIT_MQ) -@Configuration -@EnableRabbit -public class RabbitMqConfig { - - - @Bean("modifyMchApp") - public Queue modifyMchApp() { - return new Queue(CS.MQ.TOPIC_MODIFY_MCH_APP,true); - } - - //创建 direct 交换机 - @Bean("directExchange") - DirectExchange directExchange() { - return new DirectExchange(CS.DIRECT_EXCHANGE,true,false); - } - - //绑定 将队列和交换机绑定, 并设置用于匹配键:TOPIC_MODIFY_MCH_APP - @Bean - Binding bindingMchApp(@Qualifier("modifyMchApp") Queue modifyMchApp, @Qualifier("directExchange") DirectExchange directExchange) { - return BindingBuilder.bind(modifyMchApp).to(directExchange).with(CS.MQ.TOPIC_MODIFY_MCH_APP); - } - -} diff --git a/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/rabbitmq/RabbitMqReceive.java b/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/rabbitmq/RabbitMqReceive.java deleted file mode 100644 index 21f9a2d..0000000 --- a/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/rabbitmq/RabbitMqReceive.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). - *

- * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.gnu.org/licenses/lgpl.html - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jeequan.jeepay.mch.mq.rabbitmq; - -import com.jeequan.jeepay.core.constants.CS; -import com.jeequan.jeepay.mch.mq.receive.MqReceiveCommon; -import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.rabbit.annotation.Exchange; -import org.springframework.amqp.rabbit.annotation.Queue; -import org.springframework.amqp.rabbit.annotation.QueueBinding; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Profile; -import org.springframework.stereotype.Component; - -/** - * 消息接收 - * @author pangxiaoyu - * @site https://www.jeepay.vip - * @date 2021-04-27 15:50 - */ -@Slf4j -@Component -@Profile(CS.MQTYPE.RABBIT_MQ) -public class RabbitMqReceive { - - @Autowired private MqReceiveCommon mqReceiveCommon; - - /** - * @author: pangxiaoyu - * @date: 2021/6/7 16:17 - * @describe: 接收 商户用户登录信息清除消息 - */ - @RabbitListener(queues = CS.MQ.QUEUE_MODIFY_MCH_USER_REMOVE) - public void receiveRemoveMchUser(String userIdStr) { - mqReceiveCommon.removeMchUser(userIdStr); - } - - /** 接收 更新系统配置项的消息 **/ - @RabbitListener(bindings = {@QueueBinding(value = @Queue(),exchange = @Exchange(name = CS.FANOUT_EXCHANGE_SYS_CONFIG,type = "fanout"))}) - public void receiveInitDbConfig(String msg) { - mqReceiveCommon.initDbConfig(msg); - } -} diff --git a/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/receive/MqReceiveCommon.java b/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/receive/MqReceiveCommon.java deleted file mode 100644 index 3d237ac..0000000 --- a/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/receive/MqReceiveCommon.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). - *

- * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.gnu.org/licenses/lgpl.html - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jeequan.jeepay.mch.mq.receive; - -import com.alibaba.fastjson.JSONArray; -import com.jeequan.jeepay.core.cache.RedisUtil; -import com.jeequan.jeepay.core.constants.CS; -import com.jeequan.jeepay.service.impl.SysConfigService; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import java.util.Collection; -import java.util.List; - -/** - * 处理公共接收消息方法 - * - * @author xiaoyu - * @site https://www.jeepay.vip - * @date 2021/6/25 17:10 - */ -@Slf4j -@Service -public class MqReceiveCommon { - - @Autowired private SysConfigService sysConfigService; - - public void removeMchUser(String userIdStr) { - log.info("成功接收删除商户用户登录的订阅通知, msg={}", userIdStr); - // 字符串转List - List userIdList = JSONArray.parseArray(userIdStr, Long.class); - // 删除redis用户缓存 - if(userIdList == null || userIdList.isEmpty()){ - log.info("用户ID为空"); - return ; - } - for (Long sysUserId : userIdList) { - Collection cacheKeyList = RedisUtil.keys(CS.getCacheKeyToken(sysUserId, "*")); - if(cacheKeyList == null || cacheKeyList.isEmpty()){ - continue; - } - for (String cacheKey : cacheKeyList) { - // 删除用户Redis信息 - RedisUtil.del(cacheKey); - continue; - } - } - log.info("无权限登录用户信息已清除"); - } - - public void initDbConfig(String msg) { - log.info("成功接收更新系统配置的订阅通知, msg={}", msg); - sysConfigService.initDBConfig(msg); - log.info("系统配置静态属性已重置"); - } -} diff --git a/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/rocketmq/RocketMqReceive.java b/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/rocketmq/RocketMqReceive.java deleted file mode 100644 index 989bac6..0000000 --- a/jeepay-merchant/src/main/java/com/jeequan/jeepay/mch/mq/rocketmq/RocketMqReceive.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). - *

- * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.gnu.org/licenses/lgpl.html - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jeequan.jeepay.mch.mq.rocketmq; - -import com.jeequan.jeepay.core.constants.CS; -import com.jeequan.jeepay.mch.mq.receive.MqReceiveCommon; -import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.spring.annotation.MessageModel; -import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; -import org.apache.rocketmq.spring.core.RocketMQListener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Profile; -import org.springframework.stereotype.Component; -import org.springframework.stereotype.Service; - -/** - * 消息接收 - * @author pangxiaoyu - * @site https://www.jeepay.vip - * @date 2021-04-27 15:50 - */ -@Slf4j -@Component -@Profile(CS.MQTYPE.ROCKET_MQ) -public class RocketMqReceive { - - @Autowired private MqReceiveCommon mqReceiveCommon; - - /** - * @author: pangxiaoyu - * @date: 2021/6/7 16:17 - * @describe: 接收 商户用户登录信息清除消息 - */ - @Service - @RocketMQMessageListener(topic = CS.MQ.QUEUE_MODIFY_MCH_USER_REMOVE, consumerGroup = CS.MQ.QUEUE_MODIFY_MCH_USER_REMOVE) - class receiveRemoveMchUser implements RocketMQListener { - @Override - public void onMessage(String userIdStr) { - mqReceiveCommon.removeMchUser(userIdStr); - } - } - - /** 接收 更新系统配置项的消息 **/ - @Service - @RocketMQMessageListener(topic = CS.MQ.TOPIC_MODIFY_SYS_CONFIG, consumerGroup = CS.MQ.TOPIC_MODIFY_SYS_CONFIG, messageModel = MessageModel.BROADCASTING) - class receiveInitDbConfig implements RocketMQListener { - @Override - public void onMessage(String msg) { - mqReceiveCommon.initDbConfig(msg); - } - } -} diff --git a/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/ActiveMqMessage.java b/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/ActiveMqMessage.java deleted file mode 100644 index 9b27def..0000000 --- a/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/ActiveMqMessage.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). - *

- * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.gnu.org/licenses/lgpl.html - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jeequan.jeepay.pay.mq; - -import com.jeequan.jeepay.core.constants.CS; -import com.jeequan.jeepay.core.mq.MqCommonService; -import com.jeequan.jeepay.pay.mq.config.MqThreadExecutor; -import com.jeequan.jeepay.pay.mq.receive.MqReceiveCommon; -import lombok.extern.slf4j.Slf4j; -import org.apache.activemq.ScheduledMessage; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Lazy; -import org.springframework.context.annotation.Profile; -import org.springframework.jms.annotation.JmsListener; -import org.springframework.jms.core.JmsTemplate; -import org.springframework.scheduling.annotation.Async; -import org.springframework.stereotype.Component; - -import javax.jms.Queue; -import javax.jms.TextMessage; - -/* - * 上游渠道订单轮询查单 - * 如:微信的条码支付,没有回调接口, 需要轮询查单完成交易结果通知。 - * -* -* @author terrfly -* @site https://www.jeepay.vip -* @date 2021/6/8 17:30 -*/ -@Slf4j -@Component -@Profile(CS.MQTYPE.ACTIVE_MQ) -public class ActiveMqMessage extends MqCommonService { - - @Autowired private JmsTemplate jmsTemplate; - - @Lazy - @Autowired - private MqReceiveCommon mqReceiveCommon; - - @Bean("activeChannelOrderQuery") - public Queue mqQueue4ChannelOrderQuery(){ - return new ActiveMQQueue(CS.MQ.QUEUE_CHANNEL_ORDER_QUERY); - } - - @Lazy - @Autowired - @Qualifier("activeChannelOrderQuery") - private Queue mqQueue4ChannelOrderQuery; - - @Bean("activePayOrderMchNotifyInner") - public Queue mqQueue4PayOrderMchNotifyInner(){ - return new ActiveMQQueue(CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY); - } - - @Lazy - @Autowired - @Qualifier("activePayOrderMchNotifyInner") - private Queue mqQueue4PayOrderMchNotifyInner; - - @Bean("activeMqSendModifyMchApp") - public ActiveMQTopic mqTopic4ModifyMchApp(){ - return new ActiveMQTopic(CS.MQ.TOPIC_MODIFY_MCH_APP); - } - - @Lazy - @Autowired - @Qualifier("activeMqSendModifyMchApp") - private ActiveMQTopic mqTopic4ModifyMchApp; - - /** - * 发送消息 - * @param msg - * @param sendType - */ - @Override - public void send(String msg, String sendType) { - if (sendType.equals(CS.MQ.MQ_TYPE_CHANNEL_ORDER_QUERY)) { - channelOrderQuery(msg); - }else if (sendType.equals(CS.MQ.MQ_TYPE_PAY_ORDER_MCH_NOTIFY)) { - payOrderMchNotify(msg); - }else if (sendType.equals(CS.MQ.MQ_TYPE_MODIFY_MCH_APP)) { // 商户应用修改 - this.jmsTemplate.convertAndSend(mqTopic4ModifyMchApp, msg); - } - } - - /** - * 发送延迟消息 - * @param msg - * @param delay - * @param sendType - */ - @Override - public void send(String msg, long delay, String sendType) { - if (sendType.equals(CS.MQ.MQ_TYPE_CHANNEL_ORDER_QUERY)) { - channelOrderQueryFixed(msg, delay); - }else if (sendType.equals(CS.MQ.MQ_TYPE_PAY_ORDER_MCH_NOTIFY)) { - payOrderMchNotifyFixed(msg, delay); - } - } - - /** 发送订单查询消息 **/ - public void channelOrderQuery(String msg) { - this.jmsTemplate.convertAndSend(mqQueue4ChannelOrderQuery, msg); - } - - /** 发送订单查询延迟消息 **/ - public void channelOrderQueryFixed(String msg, long delay) { - jmsTemplate.send(mqQueue4ChannelOrderQuery, session -> { - TextMessage tm = session.createTextMessage(msg); - tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); - tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1*1000); - tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 1); - return tm; - }); - } - - /** 发送订单回调消息 **/ - public void payOrderMchNotify(String msg) { - this.jmsTemplate.convertAndSend(mqQueue4PayOrderMchNotifyInner, msg); - } - - /** 发送订单回调延迟消息 **/ - public void payOrderMchNotifyFixed(String msg, long delay) { - jmsTemplate.send(mqQueue4PayOrderMchNotifyInner, session -> { - TextMessage tm = session.createTextMessage(msg); - tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); - tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1*1000); - tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 1); - return tm; - }); - } - - - /** 接收 查单消息 **/ - @JmsListener(destination = CS.MQ.QUEUE_CHANNEL_ORDER_QUERY) - public void receiveChannelOrderQuery(String msg) { - mqReceiveCommon.channelOrderQuery(msg); - } - - /** 接收 支付订单商户回调消息 **/ - @Async(MqThreadExecutor.EXECUTOR_PAYORDER_MCH_NOTIFY) - @JmsListener(destination = CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY) - public void receivePayOrderMchNotify(String msg) { - mqReceiveCommon.payOrderMchNotify(msg); - } - -} diff --git a/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/RabbitMqMessage.java b/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/RabbitMqMessage.java deleted file mode 100644 index e880530..0000000 --- a/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/RabbitMqMessage.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). - *

- * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.gnu.org/licenses/lgpl.html - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jeequan.jeepay.pay.mq; - -import com.jeequan.jeepay.core.constants.CS; -import com.jeequan.jeepay.core.mq.MqCommonService; -import com.jeequan.jeepay.pay.mq.receive.MqReceiveCommon; -import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Lazy; -import org.springframework.context.annotation.Profile; -import org.springframework.stereotype.Component; - -/** -* RabbitMQ -* 上游渠道订单轮询查单 -* 如:微信的条码支付,没有回调接口, 需要轮询查单完成交易结果通知。 -* -* -* @author xiaoyu -* @site https://www.jeepay.vip -* @date 2021/6/25 17:10 -*/ -@Slf4j -@Component -@Profile(CS.MQTYPE.RABBIT_MQ) -public class RabbitMqMessage extends MqCommonService { - - @Autowired private RabbitTemplate rabbitTemplate; - - @Lazy - @Autowired - private MqReceiveCommon mqReceiveCommon; - - @Override - public void send(String msg, String sendType) { - if (sendType.equals(CS.MQ.MQ_TYPE_CHANNEL_ORDER_QUERY)) { - channelOrderQuery(msg); - }else if (sendType.equals(CS.MQ.MQ_TYPE_PAY_ORDER_MCH_NOTIFY)) { - payOrderMchNotify(msg); - }else if (sendType.equals(CS.MQ.MQ_TYPE_MODIFY_MCH_APP)) { // 商户应用修改 - directModifyMchApp(msg); - } - } - - @Override - public void send(String msg, long delay, String sendType) { - if (sendType.equals(CS.MQ.MQ_TYPE_CHANNEL_ORDER_QUERY)) { - channelOrderQueryFixed(msg, delay); - }else if (sendType.equals(CS.MQ.MQ_TYPE_PAY_ORDER_MCH_NOTIFY)) { - payOrderMchNotifyFixed(msg, delay); - } - } - - /** 发送订单查询消息 **/ - public void channelOrderQuery(String msg) { - rabbitTemplate.convertAndSend(CS.MQ.QUEUE_CHANNEL_ORDER_QUERY, msg); - } - - /** 发送订单查询延迟消息 **/ - public void channelOrderQueryFixed(String msg, long delay) { - rabbitTemplate.convertAndSend(CS.DELAYED_EXCHANGE, CS.MQ.QUEUE_CHANNEL_ORDER_QUERY, msg, a ->{ - a.getMessageProperties().setDelay(Math.toIntExact(delay)); - return a; - }); - } - - /** 发送订单回调消息 **/ - public void payOrderMchNotify(String msg) { - rabbitTemplate.convertAndSend(CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY, msg); - } - - /** 发送订单回调延迟消息 **/ - public void payOrderMchNotifyFixed(String msg, long delay) { - rabbitTemplate.convertAndSend(CS.DELAYED_EXCHANGE, CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY, msg, a ->{ - a.getMessageProperties().setDelay(Math.toIntExact(delay)); - return a; - }); - } - - /** 发送商户应用修改消息 **/ - public void directModifyMchApp(String msg) { - rabbitTemplate.convertAndSend(CS.DIRECT_EXCHANGE, CS.MQ.TOPIC_MODIFY_MCH_APP, msg); - } - - - /** 接收 查单消息 **/ - @RabbitListener(queues = CS.MQ.QUEUE_CHANNEL_ORDER_QUERY) - public void receiveChannelOrderQuery(String msg) { - mqReceiveCommon.channelOrderQuery(msg); - } - - /** 接收 支付订单商户回调消息 **/ - @RabbitListener(queues = CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY) - public void receivePayOrderMchNotify(String msg) { - mqReceiveCommon.payOrderMchNotify(msg); - } -} diff --git a/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/RocketMqMessage.java b/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/RocketMqMessage.java deleted file mode 100644 index 0f1af61..0000000 --- a/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/RocketMqMessage.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). - *

- * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.gnu.org/licenses/lgpl.html - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jeequan.jeepay.pay.mq; - -import com.jeequan.jeepay.core.constants.CS; -import com.jeequan.jeepay.core.mq.MqCommonService; -import com.jeequan.jeepay.pay.mq.receive.MqReceiveCommon; -import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; -import org.apache.rocketmq.spring.core.RocketMQListener; -import org.apache.rocketmq.spring.core.RocketMQTemplate; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Lazy; -import org.springframework.context.annotation.Profile; -import org.springframework.messaging.support.MessageBuilder; -import org.springframework.stereotype.Component; -import org.springframework.stereotype.Service; - -/** -* 上游渠道订单轮询查单 -* 如:微信的条码支付,没有回调接口, 需要轮询查单完成交易结果通知。 -* -* -* @author xiaoyu -* @site https://www.jeepay.vip -* @date 2021/6/25 17:10 -*/ -@Slf4j -@Component -@Profile(CS.MQTYPE.ROCKET_MQ) -public class RocketMqMessage extends MqCommonService { - - @Autowired private RocketMQTemplate rocketMQTemplate; - - @Lazy - @Autowired - private MqReceiveCommon mqReceiveCommon; - - @Override - public void send(String msg, String sendType) { - if (sendType.equals(CS.MQ.MQ_TYPE_CHANNEL_ORDER_QUERY)) { - channelOrderQuery(msg); - }else if (sendType.equals(CS.MQ.MQ_TYPE_PAY_ORDER_MCH_NOTIFY)) { - payOrderMchNotify(msg); - }else if (sendType.equals(CS.MQ.MQ_TYPE_MODIFY_MCH_INFO)) { // 商户信息修改 - modifyMchInfo(msg); - } - } - - @Override - public void send(String msg, long delay, String sendType) { - if (sendType.equals(CS.MQ.MQ_TYPE_CHANNEL_ORDER_QUERY)) { - channelOrderQueryFixed(msg, delay); - }else if (sendType.equals(CS.MQ.MQ_TYPE_PAY_ORDER_MCH_NOTIFY)) { - payOrderMchNotifyFixed(msg, delay); - } - } - - /** 发送订单查询消息 **/ - public void channelOrderQuery(String msg) { - rocketMQTemplate.convertAndSend(CS.MQ.QUEUE_CHANNEL_ORDER_QUERY, msg); - } - - /** 发送订单查询延迟消息 **/ - public void channelOrderQueryFixed(String msg, long delay) { - rocketMQTemplate.asyncSend(CS.MQ.QUEUE_CHANNEL_ORDER_QUERY, MessageBuilder.withPayload(msg).build(), new SendCallback() { - @Override - public void onSuccess(SendResult var1) { - log.info("async onSucess SendResult :{}", var1); - } - @Override - public void onException(Throwable var1) { - log.info("async onException Throwable :{}", var1); - } - }, 300000, 2); - } - - /** 发送订单回调消息 **/ - public void payOrderMchNotify(String msg) { - rocketMQTemplate.convertAndSend(CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY, msg); - } - - /** 发送订单回调延迟消息 **/ - public void payOrderMchNotifyFixed(String msg, long delay) { - rocketMQTemplate.asyncSend(CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY, MessageBuilder.withPayload(msg).build(), new SendCallback() { - @Override - public void onSuccess(SendResult var1) { - log.info("async onSucess SendResult :{}", var1); - } - @Override - public void onException(Throwable var1) { - log.info("async onException Throwable :{}", var1); - } - }, 300000, 4); - } - - /** 接收 查单消息 **/ - @Service - @RocketMQMessageListener(topic = CS.MQ.QUEUE_CHANNEL_ORDER_QUERY, consumerGroup = CS.MQ.QUEUE_CHANNEL_ORDER_QUERY) - class receiveChannelOrderQuery implements RocketMQListener { - @Override - public void onMessage(String msg) { - mqReceiveCommon.channelOrderQuery(msg); - } - } - - - /** 接收 支付订单商户回调消息 **/ - @Service - @RocketMQMessageListener(topic = CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY, consumerGroup = CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY) - class receivePayOrderMchNotify implements RocketMQListener { - @Override - public void onMessage(String msg) { - mqReceiveCommon.payOrderMchNotify(msg); - } - } - - - /** 发送商户信息修改消息 **/ - public void modifyMchInfo(String msg) { - sendMsg(msg, CS.MQ.TOPIC_MODIFY_MCH_INFO); - } - - public void sendMsg(String msg, String group) { - // 这里的分组和消息名称未做区分 - rocketMQTemplate.getProducer().setProducerGroup(group); - this.rocketMQTemplate.convertAndSend(group, msg); - } - -} diff --git a/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/activemq/topic/MqTopicReceive.java b/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/activemq/topic/MqTopicReceive.java deleted file mode 100644 index fe33604..0000000 --- a/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/activemq/topic/MqTopicReceive.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). - *

- * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.gnu.org/licenses/lgpl.html - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jeequan.jeepay.pay.mq.activemq.topic; - -import com.jeequan.jeepay.core.constants.CS; -import com.jeequan.jeepay.pay.mq.receive.MqReceiveCommon; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Profile; -import org.springframework.jms.annotation.JmsListener; -import org.springframework.stereotype.Component; - -/* -* 接收mq消息 -* -* @author terrfly -* @site https://www.jeepay.vip -* @date 2021/6/8 17:31 -*/ -@Slf4j -@Component -@Profile(CS.MQTYPE.ACTIVE_MQ) -public class MqTopicReceive { - - @Autowired private MqReceiveCommon mqReceiveCommon; - - /** 接收 更新服务商信息的消息 **/ - @JmsListener(destination = CS.MQ.TOPIC_MODIFY_ISV_INFO, containerFactory = "jmsListenerContainer") - public void receiveModifyIsvInfo(String isvNo) { - mqReceiveCommon.modifyIsvInfo(isvNo); - } - - /** 接收 [商户配置信息] 的消息 - * 已知推送节点: - * 1. 更新商户基本资料和状态 - * 2. 删除商户时 - * **/ - @JmsListener(destination = CS.MQ.TOPIC_MODIFY_MCH_INFO, containerFactory = "jmsListenerContainer") - public void receiveModifyMchInfo(String mchNo) { - mqReceiveCommon.modifyMchInfo(mchNo); - } - - /** 接收 [商户应用支付参数配置信息] 的消息 - * 已知推送节点: - * 1. 更新商户应用配置 - * 2. 删除商户应用配置 - * **/ - @JmsListener(destination = CS.MQ.TOPIC_MODIFY_MCH_APP, containerFactory = "jmsListenerContainer") - public void receiveModifyMchApp(String mchNoAndAppId) { - mqReceiveCommon.modifyMchApp(mchNoAndAppId); - } - - /** 接收 更新系统配置项的消息 **/ - @JmsListener(destination = CS.MQ.TOPIC_MODIFY_SYS_CONFIG, containerFactory = "jmsListenerContainer") - public void receiveModifySysConfig(String msg) { - mqReceiveCommon.initDbConfig(msg); - } - - -} diff --git a/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/config/JMSConfig.java b/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/config/JMSConfig.java deleted file mode 100644 index 3d23a83..0000000 --- a/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/config/JMSConfig.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). - *

- * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.gnu.org/licenses/lgpl.html - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jeequan.jeepay.pay.mq.config; - -import com.jeequan.jeepay.core.constants.CS; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Profile; -import org.springframework.jms.config.DefaultJmsListenerContainerFactory; -import org.springframework.jms.config.JmsListenerContainerFactory; -import org.springframework.stereotype.Component; - -import javax.jms.ConnectionFactory; - -/* -* JMS消息配置项 -* -* @author terrfly -* @site https://www.jeepay.vip -* @date 2021/6/8 17:31 -*/ -@Component -@Profile(CS.MQTYPE.ACTIVE_MQ) -public class JMSConfig { - - /** 新增jmsListenerContainer, 用于接收topic类型的消息 **/ - @Bean - public JmsListenerContainerFactory jmsListenerContainer(ConnectionFactory factory){ - DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); - bean.setPubSubDomain(true); - bean.setConnectionFactory(factory); - return bean; - } -} diff --git a/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/config/MqThreadExecutor.java b/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/config/MqThreadExecutor.java deleted file mode 100644 index aea26c0..0000000 --- a/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/config/MqThreadExecutor.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). - *

- * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.gnu.org/licenses/lgpl.html - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jeequan.jeepay.pay.mq.config; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.scheduling.annotation.EnableAsync; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; - -import java.util.concurrent.Executor; -import java.util.concurrent.ThreadPoolExecutor; - -/* -* MQ 线程池配置 -* -* @author terrfly -* @site https://www.jeepay.vip -* @date 2021/6/8 17:33 -*/ -//@Configuration -//@EnableAsync -public class MqThreadExecutor { - - public static final String EXECUTOR_PAYORDER_MCH_NOTIFY = "mqQueue4PayOrderMchNotifyExecutor"; - - /* - * 功能描述: - * 支付结果通知到商户的异步执行器 (由于量大, 单独新建一个线程池处理, 之前的不做变动 ) - * 20, 300, 10, 60 该配置: 同一时间最大并发量300,(已经验证通过, 商户都可以收到请求消息) - * 缓存队列尽量减少,否则将堵塞在队列中无法执行。 corePoolSize 根据机器的配置进行添加。此处设置的为20 - */ - @Bean - public Executor mqQueue4PayOrderMchNotifyExecutor() { - ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(20); // 线程池维护线程的最少数量 - executor.setMaxPoolSize(300); // 线程池维护线程的最大数量 - executor.setQueueCapacity(10); // 缓存队列 - executor.setThreadNamePrefix("payOrderMchNotifyExecutor-"); - // rejection-policy:当pool已经达到max size的时候,如何处理新任务 - // CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行 - executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //对拒绝task的处理策略 - executor.setKeepAliveSeconds(60); // 允许的空闲时间 - executor.initialize(); - return executor; - } - - - -} diff --git a/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/config/RabbitMqConfig.java b/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/config/RabbitMqConfig.java deleted file mode 100644 index 9ef0b8c..0000000 --- a/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/config/RabbitMqConfig.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). - *

- * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.gnu.org/licenses/lgpl.html - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jeequan.jeepay.pay.mq.config; - -import com.jeequan.jeepay.core.constants.CS; -import org.springframework.amqp.core.*; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Profile; - -import java.util.HashMap; -import java.util.Map; - -/** - * RabbitMq - * 延迟消息队列绑定交换机 - * @author xiaoyu - * @site https://www.jeepay.vip - * @date 2021/6/25 17:10 - */ -@Configuration -@Profile(CS.MQTYPE.RABBIT_MQ) -public class RabbitMqConfig { - - @Bean("channelOrderQuery") - public Queue channelOrderQuery() { - return new Queue(CS.MQ.QUEUE_CHANNEL_ORDER_QUERY,true); - } - - @Bean("payOrderMchNotify") - public Queue payOrderMchNotify() { - return new Queue(CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY,true); - } - - //创建 custom 交换机 - @Bean - CustomExchange customExchange() { - Map args = new HashMap<>(); - args.put("x-delayed-type", "direct"); - return new CustomExchange(CS.DELAYED_EXCHANGE, "x-delayed-message", true, false, args); - } - - //绑定 将队列和交换机绑定, 并设置用于匹配键:QUEUE_CHANNEL_ORDER_QUERY - @Bean - Binding bindingChannelOrderQuery(@Qualifier("channelOrderQuery") Queue channelOrderQuery) { - return BindingBuilder.bind(channelOrderQuery).to(customExchange()).with(CS.MQ.QUEUE_CHANNEL_ORDER_QUERY).noargs(); - } - - //绑定 将队列和交换机绑定, 并设置用于匹配键:QUEUE_PAYORDER_MCH_NOTIFY - @Bean - Binding bindingPayOrderNotify(@Qualifier("payOrderMchNotify") Queue payOrderMchNotify) { - return BindingBuilder.bind(payOrderMchNotify).to(customExchange()).with(CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY).noargs(); - } - -} diff --git a/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/rabbitmq/RabbitMqDirectReceive.java b/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/rabbitmq/RabbitMqDirectReceive.java deleted file mode 100644 index c973645..0000000 --- a/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/rabbitmq/RabbitMqDirectReceive.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). - *

- * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.gnu.org/licenses/lgpl.html - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jeequan.jeepay.pay.mq.rabbitmq; - -import com.jeequan.jeepay.core.constants.CS; -import com.jeequan.jeepay.pay.mq.receive.MqReceiveCommon; -import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.rabbit.annotation.Exchange; -import org.springframework.amqp.rabbit.annotation.Queue; -import org.springframework.amqp.rabbit.annotation.QueueBinding; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Profile; -import org.springframework.stereotype.Component; - -/** -* 接收mq消息 -* -* @author xiaoyu -* @site https://www.jeepay.vip -* @date 2021/6/25 17:10 -*/ -@Slf4j -@Component -@Profile(CS.MQTYPE.RABBIT_MQ) -public class RabbitMqDirectReceive { - - @Autowired private MqReceiveCommon mqReceiveCommon; - - /** 接收 更新服务商信息的消息 **/ - @RabbitListener(queues = CS.MQ.TOPIC_MODIFY_ISV_INFO) - public void receiveModifyIsvInfo(String isvNo) { - mqReceiveCommon.modifyIsvInfo(isvNo); - } - - /** 接收 [商户配置信息] 的消息 - * 已知推送节点: - * 1. 更新商户基本资料和状态 - * 2. 删除商户时 - * **/ - @RabbitListener(queues = CS.MQ.TOPIC_MODIFY_MCH_INFO) - public void receiveModifyMchInfo(String mchNo) { - mqReceiveCommon.modifyMchInfo(mchNo); - } - - /** 接收 [商户应用支付参数配置信息] 的消息 - * 已知推送节点: - * 1. 更新商户应用配置 - * 2. 删除商户应用配置 - * **/ - @RabbitListener(queues = CS.MQ.TOPIC_MODIFY_MCH_APP) - public void receiveModifyMchApp(String mchNoAndAppId) { - mqReceiveCommon.modifyMchApp(mchNoAndAppId); - } - - - /** 接收 更新系统配置项的消息 **/ - @RabbitListener(bindings = {@QueueBinding(value = @Queue(),exchange = @Exchange(name = CS.FANOUT_EXCHANGE_SYS_CONFIG,type = "fanout"))}) - public void receiveModifySysConfig(String msg) { - mqReceiveCommon.initDbConfig(msg); - } - - -} diff --git a/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/receive/MqReceiveCommon.java b/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/receive/MqReceiveCommon.java deleted file mode 100644 index 210ab6c..0000000 --- a/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/receive/MqReceiveCommon.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). - *

- * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.gnu.org/licenses/lgpl.html - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jeequan.jeepay.pay.mq.receive; - -import cn.hutool.http.HttpException; -import cn.hutool.http.HttpUtil; -import com.alibaba.fastjson.JSONObject; -import com.jeequan.jeepay.core.constants.CS; -import com.jeequan.jeepay.core.entity.MchNotifyRecord; -import com.jeequan.jeepay.core.entity.PayOrder; -import com.jeequan.jeepay.core.mq.MqCommonService; -import com.jeequan.jeepay.pay.rqrs.msg.ChannelRetMsg; -import com.jeequan.jeepay.pay.service.ChannelOrderReissueService; -import com.jeequan.jeepay.pay.service.ConfigContextService; -import com.jeequan.jeepay.service.impl.MchNotifyRecordService; -import com.jeequan.jeepay.service.impl.PayOrderService; -import com.jeequan.jeepay.service.impl.SysConfigService; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -/** - * 处理公共接收消息方法 - * - * @author xiaoyu - * @site https://www.jeepay.vip - * @date 2021/6/25 17:10 - */ -@Slf4j -@Service -public class MqReceiveCommon { - - @Autowired - private SysConfigService sysConfigService; - @Autowired - private ConfigContextService configContextService; - @Autowired - private PayOrderService payOrderService; - @Autowired - private ChannelOrderReissueService channelOrderReissueService; - @Autowired - private MchNotifyRecordService mchNotifyRecordService; - @Autowired - private MqCommonService mqCommonService; - - /** 接收 [商户配置信息] 的消息 **/ - public void modifyMchInfo(String mchNo) { - log.info("成功接收 [商户配置信息] 的消息, msg={}", mchNo); - configContextService.initMchInfoConfigContext(mchNo); - log.info(" [商户配置信息] 已重置"); - } - - /** 接收 [商户应用支付参数配置信息] 的消息 **/ - public void modifyMchApp(String mchNoAndAppId) { - log.info("成功接收 [商户应用支付参数配置信息] 的消息, msg={}", mchNoAndAppId); - JSONObject jsonObject = (JSONObject) JSONObject.parse(mchNoAndAppId); - configContextService.initMchAppConfigContext(jsonObject.getString("mchNo"), jsonObject.getString("appId")); - log.info(" [商户应用支付参数配置信息] 已重置"); - } - - /** 重置ISV信息 **/ - public void modifyIsvInfo(String isvNo) { - log.info("成功接收 [ISV信息] 重置, msg={}", isvNo); - configContextService.initIsvConfigContext(isvNo); - log.info("[ISV信息] 已重置"); - } - - /** 接收商户订单回调通知 **/ - public void payOrderMchNotify(String msg) { - try { - log.info("接收商户通知MQ, msg={}", msg); - Long notifyId = Long.parseLong(msg); - MchNotifyRecord record = mchNotifyRecordService.getById(notifyId); - if(record == null || record.getState() != MchNotifyRecord.STATE_ING){ - log.info("查询通知记录不存在或状态不是通知中"); - return; - } - if( record.getNotifyCount() >= record.getNotifyCountLimit() ){ - log.info("已达到最大发送次数"); - return; - } - - //1. (发送结果最多6次) - Integer currentCount = record.getNotifyCount() + 1; - - String notifyUrl = record.getNotifyUrl(); - String res = ""; - try { - res = HttpUtil.createPost(notifyUrl).timeout(20000).execute().body(); - } catch (HttpException e) { - log.error("http error", e); - } - - if(currentCount == 1){ //第一次通知: 更新为已通知 - payOrderService.updateNotifySent(record.getOrderId()); - } - - //通知成功 - if("SUCCESS".equalsIgnoreCase(res)){ - mchNotifyRecordService.updateNotifyResult(notifyId, MchNotifyRecord.STATE_SUCCESS, res); - return; - } - - //通知次数 >= 最大通知次数时, 更新响应结果为异常, 不在继续延迟发送消息 - if( currentCount >= record.getNotifyCountLimit() ){ - mchNotifyRecordService.updateNotifyResult(notifyId, MchNotifyRecord.STATE_FAIL, res); - return; - } - - // 继续发送MQ 延迟发送 - mchNotifyRecordService.updateNotifyResult(notifyId, MchNotifyRecord.STATE_ING, res); - // 通知延时次数 - // 1 2 3 4 5 6 - // 0 30 60 90 120 150 - mqCommonService.send(msg, currentCount * 30 * 1000, CS.MQ.MQ_TYPE_PAY_ORDER_MCH_NOTIFY); - return; - }catch (Exception e) { - log.error(e.getMessage()); - return; - } - } - - /** 接收订单查单通知 **/ - public void channelOrderQuery(String msg) { - try { - String [] arr = msg.split(","); - String payOrderId = arr[0]; - int currentCount = Integer.parseInt(arr[1]); - log.info("接收轮询查单通知MQ, payOrderId={}, count={}", payOrderId, currentCount); - currentCount++ ; - - PayOrder payOrder = payOrderService.getById(payOrderId); - if(payOrder == null) { - log.warn("查询支付订单为空,payOrderId={}", payOrderId); - return; - } - - if(payOrder.getState() != PayOrder.STATE_ING) { - log.warn("订单状态不是支付中,不需查询渠道.payOrderId={}", payOrderId); - return; - } - - if (payOrder == null) return; - ChannelRetMsg channelRetMsg = channelOrderReissueService.processPayOrder(payOrder); - - //返回null 可能为接口报错等, 需要再次轮询 - if(channelRetMsg == null || channelRetMsg.getChannelState() == null || channelRetMsg.getChannelState().equals(ChannelRetMsg.ChannelState.WAITING)){ - - //最多查询6次 - if(currentCount <= 6){ - mqCommonService.send(buildMsg(payOrderId, currentCount), 5 * 1000, CS.MQ.MQ_TYPE_CHANNEL_ORDER_QUERY); //延迟5s再次查询 - }else{ - - //TODO 调用【撤销订单】接口 - - } - - }else{ //其他状态, 不需要再次轮询。 - } - }catch (Exception e) { - log.error(e.getMessage()); - return; - } - } - /** 接收系统配置修改通知 **/ - public void initDbConfig(String msg) { - log.info("成功接收更新系统配置的订阅通知, msg={}", msg); - sysConfigService.initDBConfig(msg); - log.info("系统配置静态属性已重置"); - } - - public static final String buildMsg(String payOrderId, int count){ - return payOrderId + "," + count; - } -} diff --git a/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/rocketmq/RocketMqReceive.java b/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/rocketmq/RocketMqReceive.java deleted file mode 100644 index 25812db..0000000 --- a/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/rocketmq/RocketMqReceive.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). - *

- * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.gnu.org/licenses/lgpl.html - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jeequan.jeepay.pay.mq.rocketmq; - -import com.jeequan.jeepay.core.constants.CS; -import com.jeequan.jeepay.pay.mq.receive.MqReceiveCommon; -import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.spring.annotation.MessageModel; -import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; -import org.apache.rocketmq.spring.core.RocketMQListener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Profile; -import org.springframework.stereotype.Component; -import org.springframework.stereotype.Service; - -/** - * 消息接收 - * @author pangxiaoyu - * @site https://www.jeepay.vip - * @date 2021-04-27 15:50 - */ -@Slf4j -@Component -@Profile(CS.MQTYPE.ROCKET_MQ) -public class RocketMqReceive { - - @Autowired private MqReceiveCommon mqReceiveCommon; - - /** 接收 更新服务商信息的消息 **/ - @Service - @RocketMQMessageListener(topic = CS.MQ.TOPIC_MODIFY_ISV_INFO, consumerGroup = CS.MQ.TOPIC_MODIFY_ISV_INFO) - class receiveModifyIsvInfo implements RocketMQListener { - @Override - public void onMessage(String isvNo) { - mqReceiveCommon.modifyIsvInfo(isvNo); - } - } - - /** 接收 [商户配置信息] 的消息 - * 已知推送节点: - * 1. 更新商户基本资料和状态 - * 2. 删除商户时 - * **/ - @Service - @RocketMQMessageListener(topic = CS.MQ.TOPIC_MODIFY_MCH_INFO, consumerGroup = CS.MQ.TOPIC_MODIFY_MCH_INFO) - class receiveModifyMchInfo implements RocketMQListener { - @Override - public void onMessage(String mchNo) { - mqReceiveCommon.modifyMchInfo(mchNo); - } - } - - /** 接收 [商户应用支付参数配置信息] 的消息 - * 已知推送节点: - * 1. 更新商户应用配置 - * 2. 删除商户应用配置 - * **/ - @Service - @RocketMQMessageListener(topic = CS.MQ.TOPIC_MODIFY_MCH_APP, consumerGroup = CS.MQ.TOPIC_MODIFY_MCH_APP) - class receiveModifyMchApp implements RocketMQListener { - @Override - public void onMessage(String mchNoAndAppId) { - mqReceiveCommon.modifyMchApp(mchNoAndAppId); - } - } - - /** 接收 更新系统配置项的消息 **/ - @Service - @RocketMQMessageListener(topic = CS.MQ.TOPIC_MODIFY_SYS_CONFIG, consumerGroup = CS.MQ.TOPIC_MODIFY_SYS_CONFIG, messageModel = MessageModel.BROADCASTING) - class receiveInitDbConfig implements RocketMQListener { - @Override - public void onMessage(String msg) { - mqReceiveCommon.initDbConfig(msg); - } - } -}