Skip to content

Commit

Permalink
添加根据发送者id过滤的功能
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaopei0418 committed Apr 29, 2022
1 parent 60cd060 commit ce49d29
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 9 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
apply plugin: 'io.spring.dependency-management'

group = 'com.github'
version = '8.0'
version = '9.0'
sourceCompatibility = '1.8'

repositories {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class DistributionProp {

private Map<String, String> msgtypeDistribution;

private Map<String, String> senderIdDistribution;

private Map<String, Integer> percentageDistribution;

//比重总数
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,14 @@ protected void handleMessageInternal(Message<?> message) {
playload = sm.getBytes(CommonConstant.CHARSET);
}
String dxpid = DistributionUtils.getDxpIdByMessage(sm);
String senderId = DistributionUtils.getSenderIdByMessage(sm);
String msgtype = DistributionUtils.getMessageType(sm);
String queueName = DistributionUtils.getDestinationQueueName(this.distributionProp, dxpid, msgtype);
String queueName = DistributionUtils.getDestinationQueueName(this.distributionProp, dxpid, msgtype, senderId);
logger.info("search queueName is [" + queueName + "]");
if (queueName.indexOf("|||") != -1) {
String dir = queueName.replaceAll("\\|\\|\\|", "");
distributionMessageGateway.writeToFile(new File(dir), playload);
logger.info("dxpId=[" + dxpid + "] messageType=[" + msgtype + "] write to dir=[" + dir + "] use["
logger.info("senderId=[" + senderId + "] dxpId=[" + dxpid + "] messageType=[" + msgtype + "] write to dir=[" + dir + "] use["
+ ((double) (System.nanoTime() - startTime) / 1000000.0) + "]ms");
return;
} else if (queueName.indexOf("||") != -1) {
Expand All @@ -92,7 +93,7 @@ protected void handleMessageInternal(Message<?> message) {
SendMessageThread.getExecutorService().execute(
null != useJmsTemplate ? new SendMessageThread(useJmsTemplate, playload, queue, messagePostProcessor)
: new RabbitSendMessageThread(userRabbitmqTemplate, sm, queueName));
logger.info("cache size [" + IntegrationConfiguration.CACHE_QUEUE.size() + "] dxpId=[" + dxpid + "] messageType=["
logger.info("cache size [" + IntegrationConfiguration.CACHE_QUEUE.size() + "] senderId=[" + senderId + "] dxpId=[" + dxpid + "] messageType=["
+ msgtype + "] ccsid=[" + useCcsid + "] distributionQueue=[" + queueName + "] use["
+ ((double) (System.nanoTime() - startTime) / 1000000.0) + "]ms");
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class DistributionUtils {

private final static Log logger = LogFactory.getLog(DistributionUtils.class);

public static String getDestinationQueueName(DistributionProp distributionProp, String dxpid, String msgtype) {
public static String getDestinationQueueName(DistributionProp distributionProp, String dxpid, String msgtype, String senderId) {
String result = null;

if (distributionProp.getConditionMutualExclusion()) {
Expand All @@ -25,6 +25,9 @@ public static String getDestinationQueueName(DistributionProp distributionProp,
} else if (null != distributionProp.getMsgtypeDistribution() && !distributionProp.getMsgtypeDistribution().isEmpty()) {
logger.debug("msgtype distribution");
result = distributionProp.getMsgtypeDistribution().get(msgtype);
} else if (null != distributionProp.getSenderIdDistribution() && !distributionProp.getSenderIdDistribution().isEmpty()) {
logger.debug("sender distribution");
result = distributionProp.getSenderIdDistribution().get(senderId);
} else if (null != distributionProp.getPercentageDistribution() && !distributionProp.getPercentageDistribution().isEmpty()) {
logger.debug("percentage distribution");
if (distributionProp.isUpdate()) {
Expand Down Expand Up @@ -68,6 +71,11 @@ public static String getDestinationQueueName(DistributionProp distributionProp,
result = distributionProp.getMsgtypeDistribution().get(msgtype);
}

if (null == result && null != distributionProp.getSenderIdDistribution() && !distributionProp.getSenderIdDistribution().isEmpty()) {
logger.debug("senderId distribution");
result = distributionProp.getSenderIdDistribution().get(senderId);
}

if (null == result && null != distributionProp.getPercentageDistribution() && !distributionProp.getPercentageDistribution().isEmpty()) {
logger.debug("percentage distribution");
if (distributionProp.isUpdate()) {
Expand Down Expand Up @@ -173,6 +181,18 @@ public static String getDxpIdByMessage(String message) {
return null;
}

public static String getSenderIdByMessage(String message) {
if (null == message) {
return null;
}
Pattern pattern = Pattern.compile("<SenderId>(.+)</SenderId>");
Matcher matcher = pattern.matcher(message);
if (matcher.find()) {
return matcher.group(1);
}
return null;
}

public static String getMessageType(String message) {
if (null == message) {
return null;
Expand Down
14 changes: 10 additions & 4 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,22 @@ distribution:
rabbitOtherOutputQueue:
conditionMutualExclusion: true
# 优先级第一 map key:dxpid, value: 队列名称 如: {DXPENT0000011951: DXP_11951_ENT},
# 加"|数字" 是 otherOutputQueue 中的索引, 加"||数字"是 rabbitOtherOutputQueue中的索引,rabbitOutputQueue为第一个,
# 加"|数字" 是 otherOutputQueue 中的索引, 加"||数字"是 rabbitOtherOutputQueue中的索引,rabbitOtherOutputQueue为第一个,
# 加|||则前面为输入目录名,直接把数据写入到相应目录
dxpidDistribution: {}
# 优先级第二 map key:msgtype, value: 队列名称 如: {CEB312Message: DXP_11951_ENT}
msgtypeDistribution: {}
# 优先级第三 map key: 队列名称, value: 比重 如: {GGFW_TO_ENT: 1, GGFW_TO_ENT_INVT: 1}
# 优先级第三 map key:dxpid, value: 队列名称 如: {DXPENT0000011951: DXP_11951_ENT},
# 加"|数字" 是 otherOutputQueue 中的索引, 加"||数字"是 rabbitOtherOutputQueue中的索引,rabbitOtherOutputQueue为第一个,
# 加|||则前面为输入目录名,直接把数据写入到相应目录
senderIdDistribution: {}
# 优先级第四 map key: 队列名称, value: 比重 如: {GGFW_TO_ENT: 1, GGFW_TO_ENT_INVT: 1}
percentageDistribution: {}
# 前面三个没有,才执行最后一个 列表格式,随机分发
# 前面四个没有,才执行最后一个 列表格式,随机分发
randomDistribution:
# 所有没有区配到,就转发到默认队列,队列后面加一个:号,走second队列管理器, 两个::号走third队列管理器
# 所有没有区配到,就转发到默认队列
# 加"|数字" 是 otherOutputQueue 中的索引, 加"||数字"是 rabbitOtherOutputQueue中的索引,rabbitOtherOutputQueue为第一个,
# 加|||则前面为输入目录名,直接把数据写入到相应目录
defaultQueue: GGFW_TO_ENT
filePrefix: FILE_RECEIVE_
fileSuffix: .xml
Expand Down

0 comments on commit ce49d29

Please sign in to comment.