| 
					
				 | 
			
			
				@@ -12,10 +12,10 @@ import com.microsvc.toolkit.middleware.im.ImPluginContext; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.microsvc.toolkit.middleware.im.impl.TencentCloudImPlugin; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.microsvc.toolkit.middleware.im.message.MessageWrapper; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.microsvc.toolkit.middleware.im.message.TencentRequest; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import com.yonge.cooleshow.auth.config.CustomerServiceConfig; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.yonge.cooleshow.biz.dal.entity.CustomerServiceBatchSending; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.yonge.cooleshow.biz.dal.entity.CustomerServiceReceive; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.yonge.cooleshow.biz.dal.entity.Subject; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.yonge.cooleshow.biz.dal.entity.SysConfig; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.yonge.cooleshow.biz.dal.entity.SysUser; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.yonge.cooleshow.biz.dal.entity.Teacher; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.yonge.cooleshow.biz.dal.enums.ClientEnum; 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -25,10 +25,16 @@ import com.yonge.cooleshow.biz.dal.enums.im.EImSendStatus; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.yonge.cooleshow.biz.dal.enums.im.EImSendType; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.yonge.cooleshow.biz.dal.mapper.CustomerServiceBatchSendingMapper; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.yonge.cooleshow.biz.dal.mapper.SysUserMapper; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import com.yonge.cooleshow.biz.dal.service.*; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.yonge.cooleshow.biz.dal.service.CustomerServiceBatchSendingService; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.yonge.cooleshow.biz.dal.service.CustomerServiceReceiveService; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.yonge.cooleshow.biz.dal.service.ImGroupService; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.yonge.cooleshow.biz.dal.service.SubjectService; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.yonge.cooleshow.biz.dal.service.SysConfigService; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.yonge.cooleshow.biz.dal.service.TeacherService; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerService; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerServiceBatchSendingWrapper; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerServiceReceiveWrapper; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.yonge.cooleshow.common.constant.SysConfigConstant; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.yonge.toolset.base.exception.BizException; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.yonge.toolset.base.util.ImUtil; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.yonge.toolset.base.util.ThreadPool; 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -37,9 +43,7 @@ import com.yonge.toolset.payment.util.DistributedLock; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import io.rong.messages.BaseMessage; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import io.rong.messages.ImgMessage; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import io.rong.messages.TxtMessage; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import io.rong.models.message.PrivateMessage; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import io.rong.models.message.PushExt; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import io.rong.models.response.ResponseResult; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import lombok.extern.slf4j.Slf4j; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import org.apache.commons.collections.CollectionUtils; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import org.apache.commons.lang3.StringUtils; 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -50,7 +54,15 @@ import org.springframework.stereotype.Service; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import org.springframework.transaction.annotation.Transactional; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import java.text.MessageFormat; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import java.util.*; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.ArrayList; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.Arrays; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.Date; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.List; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.Map; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.Objects; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.Optional; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.Random; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.concurrent.TimeUnit; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import java.util.function.Function; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import java.util.stream.Collectors; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import java.util.stream.IntStream; 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -67,8 +79,7 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     private CustomerServiceReceiveService customerServiceReceiveService; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     @Autowired 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     private SubjectService subjectService; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    @Autowired 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    private CustomerServiceConfig customerServiceConfig; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     @Autowired 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     private SysUserMapper sysUserMapper; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     @Autowired 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -82,6 +93,9 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     @Autowired 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     private ImGroupService imGroupService; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @Autowired 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private SysConfigService sysConfigService; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 	/** 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				      * 查询详情 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				      * @param id 详情ID 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -424,12 +438,38 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 try { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    // 消息发送数量,默认只发送1类消息(文字或图片) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    int messageNum = 1; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    if (StringUtils.isNoneBlank(info.getTextMessage(), info.getImgMessage())) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        // 同时发送图片和文字消息 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        messageNum += 1; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    int messageSendLimit; // 默认发送10000/分钟 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    // 群发消息频率限制 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    SysConfig config = sysConfigService.findByParamName(SysConfigConstant.MESSAGE_SEND_LIMIT); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    // 群发消息频率限制不能超过10000/分钟,超过当前值默认取值为10000 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    if (config != null && config.getParamValue().matches("\\d+")) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        int sendLimit = Integer.parseInt(config.getParamValue()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        if (sendLimit > 0 && sendLimit < 10000) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                            // 消息发送频率限制 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                            messageSendLimit = Integer.parseInt(config.getParamValue()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                            // 默认发送10000/分钟 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                            messageSendLimit = 10000; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        // 默认发送10000/分钟 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        messageSendLimit = 10000; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    int finalMessageNum = messageNum; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                     if (EImReceiveType.ALL == info.getReceiveType()) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                         // 推送消息给匹配用户 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        List<String> targetGroupes = Arrays.stream(info.getTargetGroup().split(",")).collect(Collectors.toList()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        List<String> targetGroups = Arrays.stream(info.getTargetGroup().split(",")).collect(Collectors.toList()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        targetGroupes.parallelStream().forEach(clientType -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        targetGroups.forEach(clientType -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                             CustomerService.NotifyMessage receiveQuery = CustomerService.NotifyMessage.builder() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                     .clientType(ClientEnum.valueOf(clientType)) 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -445,6 +485,8 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                             List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                            // 线程休眠计算器 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                            int sleepCount = 0; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                             for (Integer pageNum : collect) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                 List<String> receiveUserIds = getBaseMapper().selectMessageReceives(PageUtil.getPage(pageNum, limit), receiveQuery).stream() 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -455,6 +497,9 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                         .map(x -> imGroupService.getImUserId(x, clientType)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                         .collect(Collectors.toList()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                // 累加线程休眠计算器,若大于指定阀值时,线程休眠1分钟 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                sleepCount = messageSendSleepCondition(info, sleepCount, receiveUserIds.size(), finalMessageNum, messageSendLimit); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                 batchSendCustomerServiceMessage(info, receiveNums, receiveUserIds); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                             } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                         }); 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -474,12 +519,17 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                         List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        // 线程休眠计算器 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        int sleepCount = 0; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                         for (Integer pageNum : collect) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                             List<String> receiveUserIds = customerServiceReceiveService.selectPage(PageUtil.getPage(pageNum, limit), receiveQuery).getRecords().stream() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                     .map(x -> imGroupService.getImUserId(String.valueOf(x.getUserId()), x.getClientType())) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                     .collect(Collectors.toList()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                            // 累加线程休眠计算器,若大于指定阀值时,线程休眠1分钟 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                            sleepCount = messageSendSleepCondition(info, sleepCount, receiveUserIds.size(), finalMessageNum, messageSendLimit); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                             batchSendCustomerServiceMessage(info, receiveNums, receiveUserIds); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                     } 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -498,7 +548,35 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        }, 30L, TimeUnit.MINUTES); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /** 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+     * 消息发送休眠条件 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+     * @param info CustomerServiceBatchSending 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+     * @param sleepCount 休眠次数统计 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+     * @param receiveSize 发送人数 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+     * @param finalMessageNum 发送消息数 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+     * @param messageSendLimit 消息休眠频率限制 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+     * @return int 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+     */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private static int messageSendSleepCondition(CustomerServiceBatchSending info, int sleepCount, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                                 int receiveSize, int finalMessageNum, int messageSendLimit) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // 累加线程休眠计算器,若大于指定阀值时,线程休眠1分钟 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        sleepCount += receiveSize * finalMessageNum; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if (sleepCount > messageSendLimit) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            try { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                // 线程休眠1分钟 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                Thread.sleep(1000 * 60); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                log.info("messageSendSleepCondition THREAD_SLEEP END, sleepCount={}, receiveSize={}, finalMessageNum={}, messageSendLimit={}", 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    sleepCount, receiveSize, finalMessageNum, messageSendLimit); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } catch (InterruptedException e) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                log.error("messageSendSleepCondition THREAD_SLEEP EX id={}", info.getId(), e); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            // 重置线程休眠计算器 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            sleepCount = 0; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        return sleepCount; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     /** 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -563,7 +641,6 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     /** 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				      * 推送消息类型 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				      * @param info CustomerServiceBatchSending 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-     * @return List<BaseMessage> 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				      */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     private static void getReceiveMessage(CustomerServiceBatchSending info, List<BaseMessage> messages, List<TencentRequest.MessageBody> tencentMessages) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 |