rocketmq支持
This commit is contained in:
parent
e4c6280d4f
commit
ccc0c49423
|
|
@ -128,28 +128,28 @@ public class CS {
|
|||
public interface MQ{
|
||||
|
||||
/** 更新配置的通知消息 **/
|
||||
String TOPIC_MODIFY_SYS_CONFIG = "topic.modify.sys.config";
|
||||
String TOPIC_MODIFY_SYS_CONFIG = "topic-modify-sys-config";
|
||||
|
||||
/** 更新商户配置信息 **/
|
||||
String TOPIC_MODIFY_MCH_INFO = "topic.modify.mch.info";
|
||||
String TOPIC_MODIFY_MCH_INFO = "topic-modify-mch-info";
|
||||
|
||||
/** 更新商户应用配置信息 **/
|
||||
String TOPIC_MODIFY_MCH_APP = "topic.modify.mch.app";
|
||||
String TOPIC_MODIFY_MCH_APP = "topic-modify-mch-app";
|
||||
|
||||
/** 更新服务商配置信息 **/
|
||||
String TOPIC_MODIFY_ISV_INFO = "topic.modify.isv.info";
|
||||
String TOPIC_MODIFY_ISV_INFO = "topic-modify-isv-info";
|
||||
|
||||
/** 支付订单 商户通知MQ **/
|
||||
String QUEUE_PAYORDER_MCH_NOTIFY = "queue.payorder.mch.notify";
|
||||
String QUEUE_PAYORDER_MCH_NOTIFY = "queue-payorder-mch-notify";
|
||||
|
||||
/** 轮询查单 MQ **/
|
||||
String QUEUE_CHANNEL_ORDER_QUERY = "queue.channel.order.query";
|
||||
String QUEUE_CHANNEL_ORDER_QUERY = "queue-channel-order-query";
|
||||
|
||||
/** 清除商户登录用户信息 **/
|
||||
String QUEUE_MODIFY_MCH_USER_REMOVE = "queue.modify.mch.user.remove";
|
||||
String QUEUE_MODIFY_MCH_USER_REMOVE = "queue-modify-mch-user-remove";
|
||||
|
||||
/** 用于activemq 发布订阅模式交换机**/
|
||||
String FANOUT_MODIFY_SYS_CONFIG = "fanout.modify.sys.config";
|
||||
String FANOUT_MODIFY_SYS_CONFIG = "fanout-modify-sys-config";
|
||||
|
||||
/** MQ消息类型 **/
|
||||
String MQ_TYPE_MODIFY_MCH_APP = "modify.mch.app";
|
||||
|
|
@ -170,6 +170,7 @@ public class CS {
|
|||
public static class MQTYPE{
|
||||
public static final String ACTIVE_MQ = "activeMQ";
|
||||
public static final String RABBIT_MQ = "rabbitMQ";
|
||||
public static final String ROCKET_MQ = "rocketMQ";
|
||||
}
|
||||
|
||||
//菜单类型
|
||||
|
|
|
|||
|
|
@ -95,6 +95,13 @@
|
|||
<artifactId>spring-boot-starter-amqp</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- 添加对rocketMQ的支持 -->
|
||||
<dependency>
|
||||
<groupId>org.apache.rocketmq</groupId>
|
||||
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
||||
<version>${rocketmq.spring.boot.starter.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- 引入 jeepay-sdk-java -->
|
||||
<dependency>
|
||||
<groupId>com.jeequan</groupId>
|
||||
|
|
|
|||
|
|
@ -0,0 +1,116 @@
|
|||
/*
|
||||
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
|
||||
* <p>
|
||||
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.gnu.org/licenses/lgpl.html
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.jeequan.jeepay.mgr.mq;
|
||||
|
||||
import com.jeequan.jeepay.core.constants.CS;
|
||||
import com.jeequan.jeepay.core.mq.MqCommonService;
|
||||
import com.jeequan.jeepay.service.impl.SysConfigService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.MessageModel;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Profile;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author pangxiaoyu
|
||||
* @site https://www.jeepay.vip
|
||||
* @date 2021-06-07 07:15
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@Profile(CS.MQTYPE.ROCKET_MQ)
|
||||
public class RocketMqSend extends MqCommonService {
|
||||
|
||||
@Autowired private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
@Autowired private SysConfigService sysConfigService;
|
||||
|
||||
@Override
|
||||
public void send(String msg, String sendType) {
|
||||
if (sendType.equals(CS.MQ.MQ_TYPE_PAY_ORDER_MCH_NOTIFY)) { // 商户订单回调
|
||||
payOrderMchNotify(msg);
|
||||
}else if (sendType.equals(CS.MQ.MQ_TYPE_MODIFY_ISV_INFO)) { // 服务商信息修改
|
||||
modifyIsvInfo(msg);
|
||||
}else if (sendType.equals(CS.MQ.MQ_TYPE_MODIFY_MCH_APP)) { // 商户应用修改
|
||||
modifyMchApp(msg);
|
||||
}else if (sendType.equals(CS.MQ.MQ_TYPE_MODIFY_MCH_INFO)) { // 商户信息修改
|
||||
modifyMchInfo(msg);
|
||||
}else if (sendType.equals(CS.MQ.MQ_TYPE_MODIFY_SYS_CONFIG)) { // 系统配置修改
|
||||
modifySysConfig(msg);
|
||||
}else if (sendType.equals(CS.MQ.MQ_TYPE_MCH_LOGIN_USER_REMOVE)) { // 商户登录用户清除信息
|
||||
mchLoginUserRemove(msg);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(String msg, long delay, String sendType) {
|
||||
|
||||
}
|
||||
|
||||
/** 发送商户订单回调消息 **/
|
||||
public void payOrderMchNotify(String msg) {
|
||||
sendMsg(msg, CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY);
|
||||
}
|
||||
|
||||
/** 发送服务商信息修改消息 **/
|
||||
public void modifyIsvInfo(String msg) {
|
||||
sendMsg(msg, CS.MQ.TOPIC_MODIFY_ISV_INFO);
|
||||
}
|
||||
|
||||
/** 发送商户应用修改消息 **/
|
||||
public void modifyMchApp(String msg) {
|
||||
sendMsg(msg, CS.MQ.TOPIC_MODIFY_MCH_APP);
|
||||
}
|
||||
|
||||
/** 发送商户信息修改消息 **/
|
||||
public void modifyMchInfo(String msg) {
|
||||
sendMsg(msg, CS.MQ.TOPIC_MODIFY_MCH_INFO);
|
||||
}
|
||||
|
||||
/** 发送系统配置修改消息 **/
|
||||
public void modifySysConfig(String msg) {
|
||||
sendMsg(msg, CS.MQ.TOPIC_MODIFY_SYS_CONFIG);
|
||||
}
|
||||
|
||||
/** 发送商户登录用户清除信息消息 **/
|
||||
public void mchLoginUserRemove(String msg) {
|
||||
sendMsg(msg, CS.MQ.QUEUE_MODIFY_MCH_USER_REMOVE);
|
||||
}
|
||||
|
||||
public void sendMsg(String msg, String group) {
|
||||
// 这里的分组和消息名称未做区分
|
||||
rocketMQTemplate.getProducer().setProducerGroup(group);
|
||||
this.rocketMQTemplate.convertAndSend(group, msg);
|
||||
}
|
||||
|
||||
/** 接收 更新系统配置项的消息 **/
|
||||
@Service
|
||||
@RocketMQMessageListener(topic = CS.MQ.TOPIC_MODIFY_SYS_CONFIG, consumerGroup = CS.MQ.TOPIC_MODIFY_SYS_CONFIG, messageModel = MessageModel.BROADCASTING)
|
||||
class RocketMqReceive implements RocketMQListener<String> {
|
||||
@Override
|
||||
public void onMessage(String msg) {
|
||||
log.info("成功接收更新系统配置的订阅通知, msg={}", msg);
|
||||
sysConfigService.initDBConfig(msg);
|
||||
log.info("系统配置静态属性已重置");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -95,6 +95,13 @@
|
|||
<artifactId>spring-boot-starter-amqp</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- 添加对rocketMQ的支持 -->
|
||||
<dependency>
|
||||
<groupId>org.apache.rocketmq</groupId>
|
||||
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
||||
<version>${rocketmq.spring.boot.starter.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- 引入 jeepay-sdk-java -->
|
||||
<dependency>
|
||||
<groupId>com.jeequan</groupId>
|
||||
|
|
|
|||
|
|
@ -28,8 +28,6 @@ import org.springframework.jms.core.JmsTemplate;
|
|||
import org.springframework.stereotype.Component;
|
||||
|
||||
/*
|
||||
* 更改商户应用信息
|
||||
*
|
||||
* @author terrfly
|
||||
* @site https://www.jeepay.vip
|
||||
* @date 2021/6/8 17:10
|
||||
|
|
|
|||
|
|
@ -25,7 +25,6 @@ import org.springframework.stereotype.Component;
|
|||
|
||||
/**
|
||||
* RabbitMq
|
||||
* 商户应用修改推送
|
||||
* @author xiaoyu
|
||||
* @site https://www.jeepay.vip
|
||||
* @date 2021/6/25 17:10
|
||||
|
|
|
|||
|
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
|
||||
* <p>
|
||||
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.gnu.org/licenses/lgpl.html
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.jeequan.jeepay.mch.mq;
|
||||
|
||||
import com.jeequan.jeepay.core.constants.CS;
|
||||
import com.jeequan.jeepay.core.mq.MqCommonService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Profile;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* RocketMQ
|
||||
* @author xiaoyu
|
||||
* @site https://www.jeepay.vip
|
||||
* @date 2021/6/25 17:10
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@Profile(CS.MQTYPE.ROCKET_MQ)
|
||||
public class RocketMqSend extends MqCommonService {
|
||||
|
||||
@Autowired private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
/** 推送消息到各个节点 **/
|
||||
@Override
|
||||
public void send(String msg, String sendType) {
|
||||
if (sendType.equals(CS.MQ.MQ_TYPE_MODIFY_MCH_APP)) { // 商户应用修改
|
||||
topicModifyMchApp(msg);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(String msg, long delay, String sendType) {
|
||||
|
||||
}
|
||||
|
||||
/** 发送商户应用修改信息 **/
|
||||
public void topicModifyMchApp(String msg) {
|
||||
rocketMQTemplate.getProducer().setProducerGroup(CS.MQ.TOPIC_MODIFY_MCH_APP);
|
||||
rocketMQTemplate.convertAndSend(CS.MQ.TOPIC_MODIFY_MCH_APP, msg);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
|
||||
* <p>
|
||||
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.gnu.org/licenses/lgpl.html
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.jeequan.jeepay.mch.mq.rocketmq;
|
||||
|
||||
import com.jeequan.jeepay.core.constants.CS;
|
||||
import com.jeequan.jeepay.mch.mq.receive.MqReceiveCommon;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.MessageModel;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Profile;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* 消息接收
|
||||
* @author pangxiaoyu
|
||||
* @site https://www.jeepay.vip
|
||||
* @date 2021-04-27 15:50
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@Profile(CS.MQTYPE.ROCKET_MQ)
|
||||
public class RocketMqReceive {
|
||||
|
||||
@Autowired private MqReceiveCommon mqReceiveCommon;
|
||||
|
||||
/**
|
||||
* @author: pangxiaoyu
|
||||
* @date: 2021/6/7 16:17
|
||||
* @describe: 接收 商户用户登录信息清除消息
|
||||
*/
|
||||
@Service
|
||||
@RocketMQMessageListener(topic = CS.MQ.QUEUE_MODIFY_MCH_USER_REMOVE, consumerGroup = CS.MQ.QUEUE_MODIFY_MCH_USER_REMOVE)
|
||||
class receiveRemoveMchUser implements RocketMQListener<String> {
|
||||
@Override
|
||||
public void onMessage(String userIdStr) {
|
||||
mqReceiveCommon.removeMchUser(userIdStr);
|
||||
}
|
||||
}
|
||||
|
||||
/** 接收 更新系统配置项的消息 **/
|
||||
@Service
|
||||
@RocketMQMessageListener(topic = CS.MQ.TOPIC_MODIFY_SYS_CONFIG, consumerGroup = CS.MQ.TOPIC_MODIFY_SYS_CONFIG, messageModel = MessageModel.BROADCASTING)
|
||||
class receiveInitDbConfig implements RocketMQListener<String> {
|
||||
@Override
|
||||
public void onMessage(String msg) {
|
||||
mqReceiveCommon.initDbConfig(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -79,6 +79,13 @@
|
|||
<artifactId>spring-boot-starter-amqp</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- 添加对rocketMQ的支持 -->
|
||||
<dependency>
|
||||
<groupId>org.apache.rocketmq</groupId>
|
||||
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
||||
<version>${rocketmq.spring.boot.starter.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!--wx_pay https://github.com/wechat-group/WxJava -->
|
||||
<dependency>
|
||||
<groupId>com.github.binarywang</groupId>
|
||||
|
|
|
|||
|
|
@ -0,0 +1,130 @@
|
|||
/*
|
||||
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
|
||||
* <p>
|
||||
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.gnu.org/licenses/lgpl.html
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.jeequan.jeepay.pay.mq;
|
||||
|
||||
import com.jeequan.jeepay.core.constants.CS;
|
||||
import com.jeequan.jeepay.core.mq.MqCommonService;
|
||||
import com.jeequan.jeepay.pay.mq.receive.MqReceiveCommon;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.producer.SendCallback;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.context.annotation.Profile;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* 上游渠道订单轮询查单
|
||||
* 如:微信的条码支付,没有回调接口, 需要轮询查单完成交易结果通知。
|
||||
*
|
||||
*
|
||||
* @author xiaoyu
|
||||
* @site https://www.jeepay.vip
|
||||
* @date 2021/6/25 17:10
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@Profile(CS.MQTYPE.ROCKET_MQ)
|
||||
public class RocketMqMessage extends MqCommonService {
|
||||
|
||||
@Autowired private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
@Lazy
|
||||
@Autowired
|
||||
private MqReceiveCommon mqReceiveCommon;
|
||||
|
||||
@Override
|
||||
public void send(String msg, String sendType) {
|
||||
if (sendType.equals(CS.MQ.MQ_TYPE_CHANNEL_ORDER_QUERY)) {
|
||||
channelOrderQuery(msg);
|
||||
}else if (sendType.equals(CS.MQ.MQ_TYPE_PAY_ORDER_MCH_NOTIFY)) {
|
||||
payOrderMchNotify(msg);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(String msg, long delay, String sendType) {
|
||||
if (sendType.equals(CS.MQ.MQ_TYPE_CHANNEL_ORDER_QUERY)) {
|
||||
channelOrderQueryFixed(msg, delay);
|
||||
}else if (sendType.equals(CS.MQ.MQ_TYPE_PAY_ORDER_MCH_NOTIFY)) {
|
||||
payOrderMchNotifyFixed(msg, delay);
|
||||
}
|
||||
}
|
||||
|
||||
/** 发送订单查询消息 **/
|
||||
public void channelOrderQuery(String msg) {
|
||||
rocketMQTemplate.convertAndSend(CS.MQ.QUEUE_CHANNEL_ORDER_QUERY, msg);
|
||||
}
|
||||
|
||||
/** 发送订单查询延迟消息 **/
|
||||
public void channelOrderQueryFixed(String msg, long delay) {
|
||||
rocketMQTemplate.asyncSend(CS.MQ.QUEUE_CHANNEL_ORDER_QUERY, MessageBuilder.withPayload(msg).build(), new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult var1) {
|
||||
log.info("async onSucess SendResult :{}", var1);
|
||||
}
|
||||
@Override
|
||||
public void onException(Throwable var1) {
|
||||
log.info("async onException Throwable :{}", var1);
|
||||
}
|
||||
}, 300000, 2);
|
||||
}
|
||||
|
||||
/** 发送订单回调消息 **/
|
||||
public void payOrderMchNotify(String msg) {
|
||||
rocketMQTemplate.convertAndSend(CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY, msg);
|
||||
}
|
||||
|
||||
/** 发送订单回调延迟消息 **/
|
||||
public void payOrderMchNotifyFixed(String msg, long delay) {
|
||||
rocketMQTemplate.asyncSend(CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY, MessageBuilder.withPayload(msg).build(), new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult var1) {
|
||||
log.info("async onSucess SendResult :{}", var1);
|
||||
}
|
||||
@Override
|
||||
public void onException(Throwable var1) {
|
||||
log.info("async onException Throwable :{}", var1);
|
||||
}
|
||||
}, 300000, 4);
|
||||
}
|
||||
|
||||
/** 接收 查单消息 **/
|
||||
@Service
|
||||
@RocketMQMessageListener(topic = CS.MQ.QUEUE_CHANNEL_ORDER_QUERY, consumerGroup = CS.MQ.QUEUE_CHANNEL_ORDER_QUERY)
|
||||
class receiveChannelOrderQuery implements RocketMQListener<String> {
|
||||
@Override
|
||||
public void onMessage(String msg) {
|
||||
mqReceiveCommon.channelOrderQuery(msg);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/** 接收 支付订单商户回调消息 **/
|
||||
@Service
|
||||
@RocketMQMessageListener(topic = CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY, consumerGroup = CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY)
|
||||
class receivePayOrderMchNotify implements RocketMQListener<String> {
|
||||
@Override
|
||||
public void onMessage(String msg) {
|
||||
mqReceiveCommon.payOrderMchNotify(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -30,7 +30,6 @@ import com.jeequan.jeepay.service.impl.PayOrderService;
|
|||
import com.jeequan.jeepay.service.impl.SysConfigService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
|
|
@ -59,21 +58,24 @@ public class MqReceiveCommon {
|
|||
|
||||
/** 接收 [商户配置信息] 的消息 **/
|
||||
public void modifyMchInfo(String mchNo) {
|
||||
log.info("接收 [商户配置信息] 的消息, msg={}", mchNo);
|
||||
log.info("成功接收 [商户配置信息] 的消息, msg={}", mchNo);
|
||||
configContextService.initMchInfoConfigContext(mchNo);
|
||||
log.info(" [商户配置信息] 已重置");
|
||||
}
|
||||
|
||||
/** 接收 [商户应用支付参数配置信息] 的消息 **/
|
||||
public void modifyMchApp(String mchNoAndAppId) {
|
||||
log.info("接收 [商户应用支付参数配置信息] 的消息, msg={}", mchNoAndAppId);
|
||||
log.info("成功接收 [商户应用支付参数配置信息] 的消息, msg={}", mchNoAndAppId);
|
||||
JSONObject jsonObject = (JSONObject) JSONObject.parse(mchNoAndAppId);
|
||||
configContextService.initMchAppConfigContext(jsonObject.getString("mchNo"), jsonObject.getString("appId"));
|
||||
log.info(" [商户应用支付参数配置信息] 已重置");
|
||||
}
|
||||
|
||||
/** 重置ISV信息 **/
|
||||
public void modifyIsvInfo(String isvNo) {
|
||||
log.info("重置ISV信息, msg={}", isvNo);
|
||||
log.info("成功接收 [ISV信息] 重置, msg={}", isvNo);
|
||||
configContextService.initIsvConfigContext(isvNo);
|
||||
log.info("[ISV信息] 已重置");
|
||||
}
|
||||
|
||||
/** 接收商户订单回调通知 **/
|
||||
|
|
@ -109,11 +111,13 @@ public class MqReceiveCommon {
|
|||
//通知成功
|
||||
if("SUCCESS".equalsIgnoreCase(res)){
|
||||
mchNotifyRecordService.updateNotifyResult(notifyId, MchNotifyRecord.STATE_SUCCESS, res);
|
||||
return;
|
||||
}
|
||||
|
||||
//通知次数 >= 最大通知次数时, 更新响应结果为异常, 不在继续延迟发送消息
|
||||
if( currentCount >= record.getNotifyCountLimit() ){
|
||||
mchNotifyRecordService.updateNotifyResult(notifyId, MchNotifyRecord.STATE_FAIL, res);
|
||||
return;
|
||||
}
|
||||
|
||||
// 继续发送MQ 延迟发送
|
||||
|
|
|
|||
|
|
@ -0,0 +1,89 @@
|
|||
/*
|
||||
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
|
||||
* <p>
|
||||
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.gnu.org/licenses/lgpl.html
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.jeequan.jeepay.pay.mq.rocketmq;
|
||||
|
||||
import com.jeequan.jeepay.core.constants.CS;
|
||||
import com.jeequan.jeepay.pay.mq.receive.MqReceiveCommon;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.MessageModel;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Profile;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* 消息接收
|
||||
* @author pangxiaoyu
|
||||
* @site https://www.jeepay.vip
|
||||
* @date 2021-04-27 15:50
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@Profile(CS.MQTYPE.ROCKET_MQ)
|
||||
public class RocketMqReceive {
|
||||
|
||||
@Autowired private MqReceiveCommon mqReceiveCommon;
|
||||
|
||||
/** 接收 更新服务商信息的消息 **/
|
||||
@Service
|
||||
@RocketMQMessageListener(topic = CS.MQ.TOPIC_MODIFY_ISV_INFO, consumerGroup = CS.MQ.TOPIC_MODIFY_ISV_INFO)
|
||||
class receiveModifyIsvInfo implements RocketMQListener<String> {
|
||||
@Override
|
||||
public void onMessage(String isvNo) {
|
||||
mqReceiveCommon.modifyIsvInfo(isvNo);
|
||||
}
|
||||
}
|
||||
|
||||
/** 接收 [商户配置信息] 的消息
|
||||
* 已知推送节点:
|
||||
* 1. 更新商户基本资料和状态
|
||||
* 2. 删除商户时
|
||||
* **/
|
||||
@Service
|
||||
@RocketMQMessageListener(topic = CS.MQ.TOPIC_MODIFY_MCH_INFO, consumerGroup = CS.MQ.TOPIC_MODIFY_MCH_INFO)
|
||||
class receiveModifyMchInfo implements RocketMQListener<String> {
|
||||
@Override
|
||||
public void onMessage(String mchNo) {
|
||||
mqReceiveCommon.modifyMchInfo(mchNo);
|
||||
}
|
||||
}
|
||||
|
||||
/** 接收 [商户应用支付参数配置信息] 的消息
|
||||
* 已知推送节点:
|
||||
* 1. 更新商户应用配置
|
||||
* 2. 删除商户应用配置
|
||||
* **/
|
||||
@Service
|
||||
@RocketMQMessageListener(topic = CS.MQ.TOPIC_MODIFY_MCH_APP, consumerGroup = CS.MQ.TOPIC_MODIFY_MCH_APP)
|
||||
class receiveModifyMchApp implements RocketMQListener<String> {
|
||||
@Override
|
||||
public void onMessage(String mchNoAndAppId) {
|
||||
mqReceiveCommon.modifyMchApp(mchNoAndAppId);
|
||||
}
|
||||
}
|
||||
|
||||
/** 接收 更新系统配置项的消息 **/
|
||||
@Service
|
||||
@RocketMQMessageListener(topic = CS.MQ.TOPIC_MODIFY_SYS_CONFIG, consumerGroup = CS.MQ.TOPIC_MODIFY_SYS_CONFIG, messageModel = MessageModel.BROADCASTING)
|
||||
class receiveInitDbConfig implements RocketMQListener<String> {
|
||||
@Override
|
||||
public void onMessage(String msg) {
|
||||
mqReceiveCommon.initDbConfig(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
1
pom.xml
1
pom.xml
|
|
@ -44,6 +44,7 @@
|
|||
<spring.security.version>5.4.6</spring.security.version> <!-- 用于core的scope依赖 -->
|
||||
<jjwt.version>0.9.1</jjwt.version>
|
||||
<binarywang.weixin.java.version>4.1.0</binarywang.weixin.java.version>
|
||||
<rocketmq.spring.boot.starter.version>2.0.3</rocketmq.spring.boot.starter.version>
|
||||
|
||||
</properties>
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue