From ce49d292009c586476130729b5914c94c8a63c29 Mon Sep 17 00:00:00 2001 From: zhaopei Date: Sat, 30 Apr 2022 00:29:43 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=A0=B9=E6=8D=AE=E5=8F=91?= =?UTF-8?q?=E9=80=81=E8=80=85id=E8=BF=87=E6=BB=A4=E7=9A=84=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle | 2 +- .../config/DistributionProp.java | 2 ++ .../DistributionSendingMessageHandler.java | 7 +++--- .../utils/DistributionUtils.java | 22 ++++++++++++++++++- src/main/resources/application.yml | 14 ++++++++---- 5 files changed, 38 insertions(+), 9 deletions(-) diff --git a/build.gradle b/build.gradle index ab5db75..c847019 100644 --- a/build.gradle +++ b/build.gradle @@ -6,7 +6,7 @@ plugins { apply plugin: 'io.spring.dependency-management' group = 'com.github' -version = '8.0' +version = '9.0' sourceCompatibility = '1.8' repositories { diff --git a/src/main/java/com/github/distributionmessage/config/DistributionProp.java b/src/main/java/com/github/distributionmessage/config/DistributionProp.java index eb05d65..95cf315 100644 --- a/src/main/java/com/github/distributionmessage/config/DistributionProp.java +++ b/src/main/java/com/github/distributionmessage/config/DistributionProp.java @@ -35,6 +35,8 @@ public class DistributionProp { private Map msgtypeDistribution; + private Map senderIdDistribution; + private Map percentageDistribution; //比重总数 diff --git a/src/main/java/com/github/distributionmessage/handler/DistributionSendingMessageHandler.java b/src/main/java/com/github/distributionmessage/handler/DistributionSendingMessageHandler.java index a2badf7..848a813 100644 --- a/src/main/java/com/github/distributionmessage/handler/DistributionSendingMessageHandler.java +++ b/src/main/java/com/github/distributionmessage/handler/DistributionSendingMessageHandler.java @@ -64,13 +64,14 @@ protected void handleMessageInternal(Message message) { playload = sm.getBytes(CommonConstant.CHARSET); } String dxpid = DistributionUtils.getDxpIdByMessage(sm); + String senderId = DistributionUtils.getSenderIdByMessage(sm); String msgtype = DistributionUtils.getMessageType(sm); - String queueName = DistributionUtils.getDestinationQueueName(this.distributionProp, dxpid, msgtype); + String queueName = DistributionUtils.getDestinationQueueName(this.distributionProp, dxpid, msgtype, senderId); logger.info("search queueName is [" + queueName + "]"); if (queueName.indexOf("|||") != -1) { String dir = queueName.replaceAll("\\|\\|\\|", ""); distributionMessageGateway.writeToFile(new File(dir), playload); - logger.info("dxpId=[" + dxpid + "] messageType=[" + msgtype + "] write to dir=[" + dir + "] use[" + logger.info("senderId=[" + senderId + "] dxpId=[" + dxpid + "] messageType=[" + msgtype + "] write to dir=[" + dir + "] use[" + ((double) (System.nanoTime() - startTime) / 1000000.0) + "]ms"); return; } else if (queueName.indexOf("||") != -1) { @@ -92,7 +93,7 @@ protected void handleMessageInternal(Message message) { SendMessageThread.getExecutorService().execute( null != useJmsTemplate ? new SendMessageThread(useJmsTemplate, playload, queue, messagePostProcessor) : new RabbitSendMessageThread(userRabbitmqTemplate, sm, queueName)); - logger.info("cache size [" + IntegrationConfiguration.CACHE_QUEUE.size() + "] dxpId=[" + dxpid + "] messageType=[" + logger.info("cache size [" + IntegrationConfiguration.CACHE_QUEUE.size() + "] senderId=[" + senderId + "] dxpId=[" + dxpid + "] messageType=[" + msgtype + "] ccsid=[" + useCcsid + "] distributionQueue=[" + queueName + "] use[" + ((double) (System.nanoTime() - startTime) / 1000000.0) + "]ms"); } catch (Exception e) { diff --git a/src/main/java/com/github/distributionmessage/utils/DistributionUtils.java b/src/main/java/com/github/distributionmessage/utils/DistributionUtils.java index 897784d..eb1aae5 100644 --- a/src/main/java/com/github/distributionmessage/utils/DistributionUtils.java +++ b/src/main/java/com/github/distributionmessage/utils/DistributionUtils.java @@ -15,7 +15,7 @@ public class DistributionUtils { private final static Log logger = LogFactory.getLog(DistributionUtils.class); - public static String getDestinationQueueName(DistributionProp distributionProp, String dxpid, String msgtype) { + public static String getDestinationQueueName(DistributionProp distributionProp, String dxpid, String msgtype, String senderId) { String result = null; if (distributionProp.getConditionMutualExclusion()) { @@ -25,6 +25,9 @@ public static String getDestinationQueueName(DistributionProp distributionProp, } else if (null != distributionProp.getMsgtypeDistribution() && !distributionProp.getMsgtypeDistribution().isEmpty()) { logger.debug("msgtype distribution"); result = distributionProp.getMsgtypeDistribution().get(msgtype); + } else if (null != distributionProp.getSenderIdDistribution() && !distributionProp.getSenderIdDistribution().isEmpty()) { + logger.debug("sender distribution"); + result = distributionProp.getSenderIdDistribution().get(senderId); } else if (null != distributionProp.getPercentageDistribution() && !distributionProp.getPercentageDistribution().isEmpty()) { logger.debug("percentage distribution"); if (distributionProp.isUpdate()) { @@ -68,6 +71,11 @@ public static String getDestinationQueueName(DistributionProp distributionProp, result = distributionProp.getMsgtypeDistribution().get(msgtype); } + if (null == result && null != distributionProp.getSenderIdDistribution() && !distributionProp.getSenderIdDistribution().isEmpty()) { + logger.debug("senderId distribution"); + result = distributionProp.getSenderIdDistribution().get(senderId); + } + if (null == result && null != distributionProp.getPercentageDistribution() && !distributionProp.getPercentageDistribution().isEmpty()) { logger.debug("percentage distribution"); if (distributionProp.isUpdate()) { @@ -173,6 +181,18 @@ public static String getDxpIdByMessage(String message) { return null; } + public static String getSenderIdByMessage(String message) { + if (null == message) { + return null; + } + Pattern pattern = Pattern.compile("(.+)"); + Matcher matcher = pattern.matcher(message); + if (matcher.find()) { + return matcher.group(1); + } + return null; + } + public static String getMessageType(String message) { if (null == message) { return null; diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 5519585..297ad6a 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -17,16 +17,22 @@ distribution: rabbitOtherOutputQueue: conditionMutualExclusion: true # 优先级第一 map key:dxpid, value: 队列名称 如: {DXPENT0000011951: DXP_11951_ENT}, - # 加"|数字" 是 otherOutputQueue 中的索引, 加"||数字"是 rabbitOtherOutputQueue中的索引,rabbitOutputQueue为第一个, + # 加"|数字" 是 otherOutputQueue 中的索引, 加"||数字"是 rabbitOtherOutputQueue中的索引,rabbitOtherOutputQueue为第一个, # 加|||则前面为输入目录名,直接把数据写入到相应目录 dxpidDistribution: {} # 优先级第二 map key:msgtype, value: 队列名称 如: {CEB312Message: DXP_11951_ENT} msgtypeDistribution: {} - # 优先级第三 map key: 队列名称, value: 比重 如: {GGFW_TO_ENT: 1, GGFW_TO_ENT_INVT: 1} + # 优先级第三 map key:dxpid, value: 队列名称 如: {DXPENT0000011951: DXP_11951_ENT}, + # 加"|数字" 是 otherOutputQueue 中的索引, 加"||数字"是 rabbitOtherOutputQueue中的索引,rabbitOtherOutputQueue为第一个, + # 加|||则前面为输入目录名,直接把数据写入到相应目录 + senderIdDistribution: {} + # 优先级第四 map key: 队列名称, value: 比重 如: {GGFW_TO_ENT: 1, GGFW_TO_ENT_INVT: 1} percentageDistribution: {} - # 前面三个没有,才执行最后一个 列表格式,随机分发 + # 前面四个没有,才执行最后一个 列表格式,随机分发 randomDistribution: - # 所有没有区配到,就转发到默认队列,队列后面加一个:号,走second队列管理器, 两个::号走third队列管理器 + # 所有没有区配到,就转发到默认队列 + # 加"|数字" 是 otherOutputQueue 中的索引, 加"||数字"是 rabbitOtherOutputQueue中的索引,rabbitOtherOutputQueue为第一个, + # 加|||则前面为输入目录名,直接把数据写入到相应目录 defaultQueue: GGFW_TO_ENT filePrefix: FILE_RECEIVE_ fileSuffix: .xml