|  | @@ -12,10 +12,10 @@ import com.microsvc.toolkit.middleware.im.ImPluginContext;
 | 
											
												
													
														|  |  import com.microsvc.toolkit.middleware.im.impl.TencentCloudImPlugin;
 |  |  import com.microsvc.toolkit.middleware.im.impl.TencentCloudImPlugin;
 | 
											
												
													
														|  |  import com.microsvc.toolkit.middleware.im.message.MessageWrapper;
 |  |  import com.microsvc.toolkit.middleware.im.message.MessageWrapper;
 | 
											
												
													
														|  |  import com.microsvc.toolkit.middleware.im.message.TencentRequest;
 |  |  import com.microsvc.toolkit.middleware.im.message.TencentRequest;
 | 
											
												
													
														|  | -import com.yonge.cooleshow.auth.config.CustomerServiceConfig;
 |  | 
 | 
											
												
													
														|  |  import com.yonge.cooleshow.biz.dal.entity.CustomerServiceBatchSending;
 |  |  import com.yonge.cooleshow.biz.dal.entity.CustomerServiceBatchSending;
 | 
											
												
													
														|  |  import com.yonge.cooleshow.biz.dal.entity.CustomerServiceReceive;
 |  |  import com.yonge.cooleshow.biz.dal.entity.CustomerServiceReceive;
 | 
											
												
													
														|  |  import com.yonge.cooleshow.biz.dal.entity.Subject;
 |  |  import com.yonge.cooleshow.biz.dal.entity.Subject;
 | 
											
												
													
														|  | 
 |  | +import com.yonge.cooleshow.biz.dal.entity.SysConfig;
 | 
											
												
													
														|  |  import com.yonge.cooleshow.biz.dal.entity.SysUser;
 |  |  import com.yonge.cooleshow.biz.dal.entity.SysUser;
 | 
											
												
													
														|  |  import com.yonge.cooleshow.biz.dal.entity.Teacher;
 |  |  import com.yonge.cooleshow.biz.dal.entity.Teacher;
 | 
											
												
													
														|  |  import com.yonge.cooleshow.biz.dal.enums.ClientEnum;
 |  |  import com.yonge.cooleshow.biz.dal.enums.ClientEnum;
 | 
											
										
											
												
													
														|  | @@ -25,10 +25,16 @@ import com.yonge.cooleshow.biz.dal.enums.im.EImSendStatus;
 | 
											
												
													
														|  |  import com.yonge.cooleshow.biz.dal.enums.im.EImSendType;
 |  |  import com.yonge.cooleshow.biz.dal.enums.im.EImSendType;
 | 
											
												
													
														|  |  import com.yonge.cooleshow.biz.dal.mapper.CustomerServiceBatchSendingMapper;
 |  |  import com.yonge.cooleshow.biz.dal.mapper.CustomerServiceBatchSendingMapper;
 | 
											
												
													
														|  |  import com.yonge.cooleshow.biz.dal.mapper.SysUserMapper;
 |  |  import com.yonge.cooleshow.biz.dal.mapper.SysUserMapper;
 | 
											
												
													
														|  | -import com.yonge.cooleshow.biz.dal.service.*;
 |  | 
 | 
											
												
													
														|  | 
 |  | +import com.yonge.cooleshow.biz.dal.service.CustomerServiceBatchSendingService;
 | 
											
												
													
														|  | 
 |  | +import com.yonge.cooleshow.biz.dal.service.CustomerServiceReceiveService;
 | 
											
												
													
														|  | 
 |  | +import com.yonge.cooleshow.biz.dal.service.ImGroupService;
 | 
											
												
													
														|  | 
 |  | +import com.yonge.cooleshow.biz.dal.service.SubjectService;
 | 
											
												
													
														|  | 
 |  | +import com.yonge.cooleshow.biz.dal.service.SysConfigService;
 | 
											
												
													
														|  | 
 |  | +import com.yonge.cooleshow.biz.dal.service.TeacherService;
 | 
											
												
													
														|  |  import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerService;
 |  |  import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerService;
 | 
											
												
													
														|  |  import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerServiceBatchSendingWrapper;
 |  |  import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerServiceBatchSendingWrapper;
 | 
											
												
													
														|  |  import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerServiceReceiveWrapper;
 |  |  import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerServiceReceiveWrapper;
 | 
											
												
													
														|  | 
 |  | +import com.yonge.cooleshow.common.constant.SysConfigConstant;
 | 
											
												
													
														|  |  import com.yonge.toolset.base.exception.BizException;
 |  |  import com.yonge.toolset.base.exception.BizException;
 | 
											
												
													
														|  |  import com.yonge.toolset.base.util.ImUtil;
 |  |  import com.yonge.toolset.base.util.ImUtil;
 | 
											
												
													
														|  |  import com.yonge.toolset.base.util.ThreadPool;
 |  |  import com.yonge.toolset.base.util.ThreadPool;
 | 
											
										
											
												
													
														|  | @@ -37,9 +43,7 @@ import com.yonge.toolset.payment.util.DistributedLock;
 | 
											
												
													
														|  |  import io.rong.messages.BaseMessage;
 |  |  import io.rong.messages.BaseMessage;
 | 
											
												
													
														|  |  import io.rong.messages.ImgMessage;
 |  |  import io.rong.messages.ImgMessage;
 | 
											
												
													
														|  |  import io.rong.messages.TxtMessage;
 |  |  import io.rong.messages.TxtMessage;
 | 
											
												
													
														|  | -import io.rong.models.message.PrivateMessage;
 |  | 
 | 
											
												
													
														|  |  import io.rong.models.message.PushExt;
 |  |  import io.rong.models.message.PushExt;
 | 
											
												
													
														|  | -import io.rong.models.response.ResponseResult;
 |  | 
 | 
											
												
													
														|  |  import lombok.extern.slf4j.Slf4j;
 |  |  import lombok.extern.slf4j.Slf4j;
 | 
											
												
													
														|  |  import org.apache.commons.collections.CollectionUtils;
 |  |  import org.apache.commons.collections.CollectionUtils;
 | 
											
												
													
														|  |  import org.apache.commons.lang3.StringUtils;
 |  |  import org.apache.commons.lang3.StringUtils;
 | 
											
										
											
												
													
														|  | @@ -50,7 +54,15 @@ import org.springframework.stereotype.Service;
 | 
											
												
													
														|  |  import org.springframework.transaction.annotation.Transactional;
 |  |  import org.springframework.transaction.annotation.Transactional;
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |  import java.text.MessageFormat;
 |  |  import java.text.MessageFormat;
 | 
											
												
													
														|  | -import java.util.*;
 |  | 
 | 
											
												
													
														|  | 
 |  | +import java.util.ArrayList;
 | 
											
												
													
														|  | 
 |  | +import java.util.Arrays;
 | 
											
												
													
														|  | 
 |  | +import java.util.Date;
 | 
											
												
													
														|  | 
 |  | +import java.util.List;
 | 
											
												
													
														|  | 
 |  | +import java.util.Map;
 | 
											
												
													
														|  | 
 |  | +import java.util.Objects;
 | 
											
												
													
														|  | 
 |  | +import java.util.Optional;
 | 
											
												
													
														|  | 
 |  | +import java.util.Random;
 | 
											
												
													
														|  | 
 |  | +import java.util.concurrent.TimeUnit;
 | 
											
												
													
														|  |  import java.util.function.Function;
 |  |  import java.util.function.Function;
 | 
											
												
													
														|  |  import java.util.stream.Collectors;
 |  |  import java.util.stream.Collectors;
 | 
											
												
													
														|  |  import java.util.stream.IntStream;
 |  |  import java.util.stream.IntStream;
 | 
											
										
											
												
													
														|  | @@ -67,8 +79,7 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
 | 
											
												
													
														|  |      private CustomerServiceReceiveService customerServiceReceiveService;
 |  |      private CustomerServiceReceiveService customerServiceReceiveService;
 | 
											
												
													
														|  |      @Autowired
 |  |      @Autowired
 | 
											
												
													
														|  |      private SubjectService subjectService;
 |  |      private SubjectService subjectService;
 | 
											
												
													
														|  | -    @Autowired
 |  | 
 | 
											
												
													
														|  | -    private CustomerServiceConfig customerServiceConfig;
 |  | 
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  |      @Autowired
 |  |      @Autowired
 | 
											
												
													
														|  |      private SysUserMapper sysUserMapper;
 |  |      private SysUserMapper sysUserMapper;
 | 
											
												
													
														|  |      @Autowired
 |  |      @Autowired
 | 
											
										
											
												
													
														|  | @@ -82,6 +93,9 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
 | 
											
												
													
														|  |      @Autowired
 |  |      @Autowired
 | 
											
												
													
														|  |      private ImGroupService imGroupService;
 |  |      private ImGroupService imGroupService;
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | 
 |  | +    @Autowired
 | 
											
												
													
														|  | 
 |  | +    private SysConfigService sysConfigService;
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  |  	/**
 |  |  	/**
 | 
											
												
													
														|  |       * 查询详情
 |  |       * 查询详情
 | 
											
												
													
														|  |       * @param id 详情ID
 |  |       * @param id 详情ID
 | 
											
										
											
												
													
														|  | @@ -424,12 +438,38 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |                  try {
 |  |                  try {
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | 
 |  | +                    // 消息发送数量,默认只发送1类消息(文字或图片)
 | 
											
												
													
														|  | 
 |  | +                    int messageNum = 1;
 | 
											
												
													
														|  | 
 |  | +                    if (StringUtils.isNoneBlank(info.getTextMessage(), info.getImgMessage())) {
 | 
											
												
													
														|  | 
 |  | +                        // 同时发送图片和文字消息
 | 
											
												
													
														|  | 
 |  | +                        messageNum += 1;
 | 
											
												
													
														|  | 
 |  | +                    }
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +                    int messageSendLimit; // 默认发送10000/分钟
 | 
											
												
													
														|  | 
 |  | +                    // 群发消息频率限制
 | 
											
												
													
														|  | 
 |  | +                    SysConfig config = sysConfigService.findByParamName(SysConfigConstant.MESSAGE_SEND_LIMIT);
 | 
											
												
													
														|  | 
 |  | +                    // 群发消息频率限制不能超过10000/分钟,超过当前值默认取值为10000
 | 
											
												
													
														|  | 
 |  | +                    if (config != null && config.getParamValue().matches("\\d+")) {
 | 
											
												
													
														|  | 
 |  | +                        int sendLimit = Integer.parseInt(config.getParamValue());
 | 
											
												
													
														|  | 
 |  | +                        if (sendLimit > 0 && sendLimit < 10000) {
 | 
											
												
													
														|  | 
 |  | +                            // 消息发送频率限制
 | 
											
												
													
														|  | 
 |  | +                            messageSendLimit = Integer.parseInt(config.getParamValue());
 | 
											
												
													
														|  | 
 |  | +                        } else {
 | 
											
												
													
														|  | 
 |  | +                            // 默认发送10000/分钟
 | 
											
												
													
														|  | 
 |  | +                            messageSendLimit = 10000;
 | 
											
												
													
														|  | 
 |  | +                        }
 | 
											
												
													
														|  | 
 |  | +                    } else {
 | 
											
												
													
														|  | 
 |  | +                        // 默认发送10000/分钟
 | 
											
												
													
														|  | 
 |  | +                        messageSendLimit = 10000;
 | 
											
												
													
														|  | 
 |  | +                    }
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +                    int finalMessageNum = messageNum;
 | 
											
												
													
														|  |                      if (EImReceiveType.ALL == info.getReceiveType()) {
 |  |                      if (EImReceiveType.ALL == info.getReceiveType()) {
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |                          // 推送消息给匹配用户
 |  |                          // 推送消息给匹配用户
 | 
											
												
													
														|  | -                        List<String> targetGroupes = Arrays.stream(info.getTargetGroup().split(",")).collect(Collectors.toList());
 |  | 
 | 
											
												
													
														|  | 
 |  | +                        List<String> targetGroups = Arrays.stream(info.getTargetGroup().split(",")).collect(Collectors.toList());
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | -                        targetGroupes.parallelStream().forEach(clientType -> {
 |  | 
 | 
											
												
													
														|  | 
 |  | +                        targetGroups.forEach(clientType -> {
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |                              CustomerService.NotifyMessage receiveQuery = CustomerService.NotifyMessage.builder()
 |  |                              CustomerService.NotifyMessage receiveQuery = CustomerService.NotifyMessage.builder()
 | 
											
												
													
														|  |                                      .clientType(ClientEnum.valueOf(clientType))
 |  |                                      .clientType(ClientEnum.valueOf(clientType))
 | 
											
										
											
												
													
														|  | @@ -445,6 +485,8 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |                              List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList());
 |  |                              List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList());
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | 
 |  | +                            // 线程休眠计算器
 | 
											
												
													
														|  | 
 |  | +                            int sleepCount = 0;
 | 
											
												
													
														|  |                              for (Integer pageNum : collect) {
 |  |                              for (Integer pageNum : collect) {
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |                                  List<String> receiveUserIds = getBaseMapper().selectMessageReceives(PageUtil.getPage(pageNum, limit), receiveQuery).stream()
 |  |                                  List<String> receiveUserIds = getBaseMapper().selectMessageReceives(PageUtil.getPage(pageNum, limit), receiveQuery).stream()
 | 
											
										
											
												
													
														|  | @@ -455,6 +497,9 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
 | 
											
												
													
														|  |                                          .map(x -> imGroupService.getImUserId(x, clientType))
 |  |                                          .map(x -> imGroupService.getImUserId(x, clientType))
 | 
											
												
													
														|  |                                          .collect(Collectors.toList());
 |  |                                          .collect(Collectors.toList());
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | 
 |  | +                                // 累加线程休眠计算器,若大于指定阀值时,线程休眠1分钟
 | 
											
												
													
														|  | 
 |  | +                                sleepCount = messageSendSleepCondition(info, sleepCount, receiveUserIds.size(), finalMessageNum, messageSendLimit);
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  |                                  batchSendCustomerServiceMessage(info, receiveNums, receiveUserIds);
 |  |                                  batchSendCustomerServiceMessage(info, receiveNums, receiveUserIds);
 | 
											
												
													
														|  |                              }
 |  |                              }
 | 
											
												
													
														|  |                          });
 |  |                          });
 | 
											
										
											
												
													
														|  | @@ -474,12 +519,17 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |                          List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList());
 |  |                          List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList());
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | 
 |  | +                        // 线程休眠计算器
 | 
											
												
													
														|  | 
 |  | +                        int sleepCount = 0;
 | 
											
												
													
														|  |                          for (Integer pageNum : collect) {
 |  |                          for (Integer pageNum : collect) {
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |                              List<String> receiveUserIds = customerServiceReceiveService.selectPage(PageUtil.getPage(pageNum, limit), receiveQuery).getRecords().stream()
 |  |                              List<String> receiveUserIds = customerServiceReceiveService.selectPage(PageUtil.getPage(pageNum, limit), receiveQuery).getRecords().stream()
 | 
											
												
													
														|  |                                      .map(x -> imGroupService.getImUserId(String.valueOf(x.getUserId()), x.getClientType()))
 |  |                                      .map(x -> imGroupService.getImUserId(String.valueOf(x.getUserId()), x.getClientType()))
 | 
											
												
													
														|  |                                      .collect(Collectors.toList());
 |  |                                      .collect(Collectors.toList());
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | 
 |  | +                            // 累加线程休眠计算器,若大于指定阀值时,线程休眠1分钟
 | 
											
												
													
														|  | 
 |  | +                            sleepCount = messageSendSleepCondition(info, sleepCount, receiveUserIds.size(), finalMessageNum, messageSendLimit);
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  |                              batchSendCustomerServiceMessage(info, receiveNums, receiveUserIds);
 |  |                              batchSendCustomerServiceMessage(info, receiveNums, receiveUserIds);
 | 
											
												
													
														|  |                          }
 |  |                          }
 | 
											
												
													
														|  |                      }
 |  |                      }
 | 
											
										
											
												
													
														|  | @@ -498,7 +548,35 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |              });
 |  |              });
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | -        });
 |  | 
 | 
											
												
													
														|  | 
 |  | +        }, 30L, TimeUnit.MINUTES);
 | 
											
												
													
														|  | 
 |  | +    }
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +    /**
 | 
											
												
													
														|  | 
 |  | +     * 消息发送休眠条件
 | 
											
												
													
														|  | 
 |  | +     * @param info CustomerServiceBatchSending
 | 
											
												
													
														|  | 
 |  | +     * @param sleepCount 休眠次数统计
 | 
											
												
													
														|  | 
 |  | +     * @param receiveSize 发送人数
 | 
											
												
													
														|  | 
 |  | +     * @param finalMessageNum 发送消息数
 | 
											
												
													
														|  | 
 |  | +     * @param messageSendLimit 消息休眠频率限制
 | 
											
												
													
														|  | 
 |  | +     * @return int
 | 
											
												
													
														|  | 
 |  | +     */
 | 
											
												
													
														|  | 
 |  | +    private static int messageSendSleepCondition(CustomerServiceBatchSending info, int sleepCount,
 | 
											
												
													
														|  | 
 |  | +                                                 int receiveSize, int finalMessageNum, int messageSendLimit) {
 | 
											
												
													
														|  | 
 |  | +        // 累加线程休眠计算器,若大于指定阀值时,线程休眠1分钟
 | 
											
												
													
														|  | 
 |  | +        sleepCount += receiveSize * finalMessageNum;
 | 
											
												
													
														|  | 
 |  | +        if (sleepCount > messageSendLimit) {
 | 
											
												
													
														|  | 
 |  | +            try {
 | 
											
												
													
														|  | 
 |  | +                // 线程休眠1分钟
 | 
											
												
													
														|  | 
 |  | +                Thread.sleep(1000 * 60);
 | 
											
												
													
														|  | 
 |  | +                log.info("messageSendSleepCondition THREAD_SLEEP END, sleepCount={}, receiveSize={}, finalMessageNum={}, messageSendLimit={}",
 | 
											
												
													
														|  | 
 |  | +                    sleepCount, receiveSize, finalMessageNum, messageSendLimit);
 | 
											
												
													
														|  | 
 |  | +            } catch (InterruptedException e) {
 | 
											
												
													
														|  | 
 |  | +                log.error("messageSendSleepCondition THREAD_SLEEP EX id={}", info.getId(), e);
 | 
											
												
													
														|  | 
 |  | +            }
 | 
											
												
													
														|  | 
 |  | +            // 重置线程休眠计算器
 | 
											
												
													
														|  | 
 |  | +            sleepCount = 0;
 | 
											
												
													
														|  | 
 |  | +        }
 | 
											
												
													
														|  | 
 |  | +        return sleepCount;
 | 
											
												
													
														|  |      }
 |  |      }
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |      /**
 |  |      /**
 | 
											
										
											
												
													
														|  | @@ -563,7 +641,6 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
 | 
											
												
													
														|  |      /**
 |  |      /**
 | 
											
												
													
														|  |       * 推送消息类型
 |  |       * 推送消息类型
 | 
											
												
													
														|  |       * @param info CustomerServiceBatchSending
 |  |       * @param info CustomerServiceBatchSending
 | 
											
												
													
														|  | -     * @return List<BaseMessage>
 |  | 
 | 
											
												
													
														|  |       */
 |  |       */
 | 
											
												
													
														|  |      private static void getReceiveMessage(CustomerServiceBatchSending info, List<BaseMessage> messages, List<TencentRequest.MessageBody> tencentMessages) {
 |  |      private static void getReceiveMessage(CustomerServiceBatchSending info, List<BaseMessage> messages, List<TencentRequest.MessageBody> tencentMessages) {
 | 
											
												
													
														|  |  
 |  |  
 |