瀏覽代碼

fix 群发消息频率限制

Eric 10 月之前
父節點
當前提交
efd0e94ec1

+ 5 - 0
cooleshow-common/src/main/java/com/yonge/cooleshow/common/constant/SysConfigConstant.java

@@ -445,4 +445,9 @@ public interface SysConfigConstant {
      * 群成员人数限制
      */
     String GROUP_MEMBER_LIMIT = "group_member_limit";
+
+    /**
+     * 消息发送频率限制
+     */
+    String MESSAGE_SEND_LIMIT = "message_send_limit";
 }

+ 77 - 10
cooleshow-user/user-biz/src/main/java/com/yonge/cooleshow/biz/dal/service/impl/CustomerServiceBatchSendingServiceImpl.java

@@ -12,10 +12,10 @@ import com.microsvc.toolkit.middleware.im.ImPluginContext;
 import com.microsvc.toolkit.middleware.im.impl.TencentCloudImPlugin;
 import com.microsvc.toolkit.middleware.im.message.MessageWrapper;
 import com.microsvc.toolkit.middleware.im.message.TencentRequest;
-import com.yonge.cooleshow.auth.config.CustomerServiceConfig;
 import com.yonge.cooleshow.biz.dal.entity.CustomerServiceBatchSending;
 import com.yonge.cooleshow.biz.dal.entity.CustomerServiceReceive;
 import com.yonge.cooleshow.biz.dal.entity.Subject;
+import com.yonge.cooleshow.biz.dal.entity.SysConfig;
 import com.yonge.cooleshow.biz.dal.entity.SysUser;
 import com.yonge.cooleshow.biz.dal.entity.Teacher;
 import com.yonge.cooleshow.biz.dal.enums.ClientEnum;
@@ -25,10 +25,16 @@ import com.yonge.cooleshow.biz.dal.enums.im.EImSendStatus;
 import com.yonge.cooleshow.biz.dal.enums.im.EImSendType;
 import com.yonge.cooleshow.biz.dal.mapper.CustomerServiceBatchSendingMapper;
 import com.yonge.cooleshow.biz.dal.mapper.SysUserMapper;
-import com.yonge.cooleshow.biz.dal.service.*;
+import com.yonge.cooleshow.biz.dal.service.CustomerServiceBatchSendingService;
+import com.yonge.cooleshow.biz.dal.service.CustomerServiceReceiveService;
+import com.yonge.cooleshow.biz.dal.service.ImGroupService;
+import com.yonge.cooleshow.biz.dal.service.SubjectService;
+import com.yonge.cooleshow.biz.dal.service.SysConfigService;
+import com.yonge.cooleshow.biz.dal.service.TeacherService;
 import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerService;
 import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerServiceBatchSendingWrapper;
 import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerServiceReceiveWrapper;
+import com.yonge.cooleshow.common.constant.SysConfigConstant;
 import com.yonge.toolset.base.exception.BizException;
 import com.yonge.toolset.base.util.ImUtil;
 import com.yonge.toolset.base.util.ThreadPool;
@@ -37,9 +43,7 @@ import com.yonge.toolset.payment.util.DistributedLock;
 import io.rong.messages.BaseMessage;
 import io.rong.messages.ImgMessage;
 import io.rong.messages.TxtMessage;
-import io.rong.models.message.PrivateMessage;
 import io.rong.models.message.PushExt;
-import io.rong.models.response.ResponseResult;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -50,7 +54,14 @@ import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
 import java.text.MessageFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Random;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -67,8 +78,7 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
     private CustomerServiceReceiveService customerServiceReceiveService;
     @Autowired
     private SubjectService subjectService;
-    @Autowired
-    private CustomerServiceConfig customerServiceConfig;
+
     @Autowired
     private SysUserMapper sysUserMapper;
     @Autowired
@@ -82,6 +92,9 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
     @Autowired
     private ImGroupService imGroupService;
 
+    @Autowired
+    private SysConfigService sysConfigService;
+
 	/**
      * 查询详情
      * @param id 详情ID
@@ -424,12 +437,31 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
 
                 try {
 
+                    // 消息发送数量,默认只发送1类消息(文字或图片)
+                    int messageNum = 1;
+                    if (StringUtils.isNoneBlank(info.getTextMessage(), info.getImgMessage())) {
+                        // 同时发送图片和文字消息
+                        messageNum += 1;
+                    }
+
+                    int messageSendLimit; // 默认发送11000/分钟
+                    // 群发消息频率限制
+                    SysConfig config = sysConfigService.findByParamName(SysConfigConstant.MESSAGE_SEND_LIMIT);
+                    // 群发消息频率限制不能超过11000/分钟,超过当前值默认取值为11000
+                    if (config != null && config.getParamValue().matches("\\d+")
+                        && Integer.parseInt(config.getParamValue()) < 11000) {
+                        messageSendLimit = Integer.parseInt(config.getParamValue());
+                    } else {
+                        messageSendLimit = 11000;
+                    }
+
+                    int finalMessageNum = messageNum;
                     if (EImReceiveType.ALL == info.getReceiveType()) {
 
                         // 推送消息给匹配用户
-                        List<String> targetGroupes = Arrays.stream(info.getTargetGroup().split(",")).collect(Collectors.toList());
+                        List<String> targetGroups = Arrays.stream(info.getTargetGroup().split(",")).collect(Collectors.toList());
 
-                        targetGroupes.parallelStream().forEach(clientType -> {
+                        targetGroups.parallelStream().forEach(clientType -> {
 
                             CustomerService.NotifyMessage receiveQuery = CustomerService.NotifyMessage.builder()
                                     .clientType(ClientEnum.valueOf(clientType))
@@ -445,6 +477,8 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
 
                             List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList());
 
+                            // 线程休眠计算器
+                            int sleepCount = 0;
                             for (Integer pageNum : collect) {
 
                                 List<String> receiveUserIds = getBaseMapper().selectMessageReceives(PageUtil.getPage(pageNum, limit), receiveQuery).stream()
@@ -455,6 +489,8 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
                                         .map(x -> imGroupService.getImUserId(x, clientType))
                                         .collect(Collectors.toList());
 
+                                sleepCount = messageSendSleepCondition(info, sleepCount, receiveUserIds.size(), finalMessageNum, messageSendLimit);
+
                                 batchSendCustomerServiceMessage(info, receiveNums, receiveUserIds);
                             }
                         });
@@ -474,12 +510,17 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
 
                         List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList());
 
+                        // 线程休眠计算器
+                        int sleepCount = 0;
                         for (Integer pageNum : collect) {
 
                             List<String> receiveUserIds = customerServiceReceiveService.selectPage(PageUtil.getPage(pageNum, limit), receiveQuery).getRecords().stream()
                                     .map(x -> imGroupService.getImUserId(String.valueOf(x.getUserId()), x.getClientType()))
                                     .collect(Collectors.toList());
 
+                            // 累加线程休眠计算器,若大于指定阀值时,线程休眠1分钟
+                            sleepCount = messageSendSleepCondition(info, sleepCount, receiveUserIds.size(), finalMessageNum, messageSendLimit);
+
                             batchSendCustomerServiceMessage(info, receiveNums, receiveUserIds);
                         }
                     }
@@ -502,6 +543,33 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
     }
 
     /**
+     * 消息发送休眠条件
+     * @param info CustomerServiceBatchSending
+     * @param sleepCount 休眠次数统计
+     * @param receiveSize 发送人数
+     * @param finalMessageNum 发送消息数
+     * @param messageSendLimit 消息休眠频率限制
+     * @return int
+     */
+    private static int messageSendSleepCondition(CustomerServiceBatchSending info, int sleepCount,
+                                                 int receiveSize, int finalMessageNum, int messageSendLimit) {
+        // 累加线程休眠计算器,若大于指定阀值时,线程休眠1分钟
+        sleepCount += receiveSize * finalMessageNum;
+        if (sleepCount > messageSendLimit) {
+            try {
+                // 线程休眠1分钟
+                Thread.sleep(1000 * 60);
+                log.info("messageSendSleepCondition THREAD_SLEEP END");
+            } catch (InterruptedException e) {
+                log.error("messageSendSleepCondition THREAD_SLEEP EX id={}", info.getId(), e);
+            }
+            // 重置线程休眠计算器
+            sleepCount = 0;
+        }
+        return sleepCount;
+    }
+
+    /**
      * 批量发送客服消息
      * @param info CustomerServiceBatchSending
      * @param receiveNums 接收消息用户数
@@ -563,7 +631,6 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
     /**
      * 推送消息类型
      * @param info CustomerServiceBatchSending
-     * @return List<BaseMessage>
      */
     private static void getReceiveMessage(CustomerServiceBatchSending info, List<BaseMessage> messages, List<TencentRequest.MessageBody> tencentMessages) {