|  | @@ -25,6 +25,7 @@ import com.yonge.cooleshow.biz.dal.enums.im.EImSendStatus;
 | 
	
		
			
				|  |  |  import com.yonge.cooleshow.biz.dal.enums.im.EImSendType;
 | 
	
		
			
				|  |  |  import com.yonge.cooleshow.biz.dal.mapper.CustomerServiceBatchSendingMapper;
 | 
	
		
			
				|  |  |  import com.yonge.cooleshow.biz.dal.mapper.SysUserMapper;
 | 
	
		
			
				|  |  | +import com.yonge.cooleshow.biz.dal.redisson.RedissonMessageService;
 | 
	
		
			
				|  |  |  import com.yonge.cooleshow.biz.dal.service.CustomerServiceBatchSendingService;
 | 
	
		
			
				|  |  |  import com.yonge.cooleshow.biz.dal.service.CustomerServiceReceiveService;
 | 
	
		
			
				|  |  |  import com.yonge.cooleshow.biz.dal.service.ImGroupService;
 | 
	
	
		
			
				|  | @@ -34,6 +35,7 @@ import com.yonge.cooleshow.biz.dal.service.TeacherService;
 | 
	
		
			
				|  |  |  import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerService;
 | 
	
		
			
				|  |  |  import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerServiceBatchSendingWrapper;
 | 
	
		
			
				|  |  |  import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerServiceReceiveWrapper;
 | 
	
		
			
				|  |  | +import com.yonge.cooleshow.biz.dal.wrapper.liveroom.LiveRoomWrapper;
 | 
	
		
			
				|  |  |  import com.yonge.cooleshow.common.constant.SysConfigConstant;
 | 
	
		
			
				|  |  |  import com.yonge.toolset.base.exception.BizException;
 | 
	
		
			
				|  |  |  import com.yonge.toolset.base.util.ImUtil;
 | 
	
	
		
			
				|  | @@ -48,6 +50,8 @@ import lombok.extern.slf4j.Slf4j;
 | 
	
		
			
				|  |  |  import org.apache.commons.collections.CollectionUtils;
 | 
	
		
			
				|  |  |  import org.apache.commons.lang3.StringUtils;
 | 
	
		
			
				|  |  |  import org.joda.time.DateTime;
 | 
	
		
			
				|  |  | +import org.redisson.api.RBucket;
 | 
	
		
			
				|  |  | +import org.redisson.api.RLock;
 | 
	
		
			
				|  |  |  import org.redisson.api.RedissonClient;
 | 
	
		
			
				|  |  |  import org.springframework.beans.factory.annotation.Autowired;
 | 
	
		
			
				|  |  |  import org.springframework.stereotype.Service;
 | 
	
	
		
			
				|  | @@ -96,6 +100,8 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
 | 
	
		
			
				|  |  |      @Autowired
 | 
	
		
			
				|  |  |      private SysConfigService sysConfigService;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    @Autowired
 | 
	
		
			
				|  |  | +    private RedissonMessageService redissonMessageService;
 | 
	
		
			
				|  |  |  	/**
 | 
	
		
			
				|  |  |       * 查询详情
 | 
	
		
			
				|  |  |       * @param id 详情ID
 | 
	
	
		
			
				|  | @@ -453,154 +459,198 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
 | 
	
		
			
				|  |  |       * @param info CustomerServiceBatchSending
 | 
	
		
			
				|  |  |       */
 | 
	
		
			
				|  |  |      private void asyncBatchSendingMessage(CustomerServiceBatchSending info) {
 | 
	
		
			
				|  |  | -        String lockName = MessageFormat.format("batchSending:{0}", String.valueOf(info.getId()));
 | 
	
		
			
				|  |  | -        // 消息状态判定,且不能重复发送
 | 
	
		
			
				|  |  | -        DistributedLock.of(redissonClient).runIfLockCanGet(lockName, () -> {
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -            // 异步发送消息且同步更新已发送人数,稍后可在页面刷新查看已发送用户数
 | 
	
		
			
				|  |  | -            ThreadPool.getExecutor().submit(() -> {
 | 
	
		
			
				|  |  | -                // 接收消息用户数
 | 
	
		
			
				|  |  | -                List<Integer> receiveNums = Lists.newCopyOnWriteArrayList();
 | 
	
		
			
				|  |  | -                // 分页数限制
 | 
	
		
			
				|  |  | -                int limit = 500;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                try {
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                    // 消息发送数量,默认只发送1类消息(文字或图片)
 | 
	
		
			
				|  |  | -                    int messageNum = 1;
 | 
	
		
			
				|  |  | -                    if (StringUtils.isNoneBlank(info.getTextMessage(), info.getImgMessage())) {
 | 
	
		
			
				|  |  | -                        // 同时发送图片和文字消息
 | 
	
		
			
				|  |  | -                        messageNum += 1;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 改状态为发送中,标记为发送中但是未发送
 | 
	
		
			
				|  |  | +        if (info.getId() == null) {
 | 
	
		
			
				|  |  | +            return;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        CustomerServiceBatchSending byId = getById(info.getId());
 | 
	
		
			
				|  |  | +        if (byId == null) {
 | 
	
		
			
				|  |  | +            return;
 | 
	
		
			
				|  |  | +        }else if (byId.getSendStatus() == EImSendStatus.SEND) {
 | 
	
		
			
				|  |  | +            return;
 | 
	
		
			
				|  |  | +        } else if (byId.getSendStatus() == EImSendStatus.SENDING && byId.getSendFlag()) {
 | 
	
		
			
				|  |  | +            return;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        CustomerServiceBatchSending customerServiceBatchSending = new CustomerServiceBatchSending();
 | 
	
		
			
				|  |  | +        customerServiceBatchSending.setId(info.getId());
 | 
	
		
			
				|  |  | +        customerServiceBatchSending.setSendFlag(false);
 | 
	
		
			
				|  |  | +        customerServiceBatchSending.setSendTime(new Date());
 | 
	
		
			
				|  |  | +        customerServiceBatchSending.setSendStatus(EImSendStatus.SENDING);
 | 
	
		
			
				|  |  | +        updateById(customerServiceBatchSending);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 如果有发送中,不再发送
 | 
	
		
			
				|  |  | +        Integer count = lambdaQuery()
 | 
	
		
			
				|  |  | +            .eq(CustomerServiceBatchSending::getSendStatus, EImSendStatus.SENDING)
 | 
	
		
			
				|  |  | +            .eq(CustomerServiceBatchSending::getSendFlag, 1)
 | 
	
		
			
				|  |  | +            .count();
 | 
	
		
			
				|  |  | +        if (count > 0) {
 | 
	
		
			
				|  |  | +            return;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 异步发送消息且同步更新已发送人数,稍后可在页面刷新查看已发送用户数
 | 
	
		
			
				|  |  | +        ThreadPool.getExecutor().submit(() -> {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            // 加锁, 同时只能有一个在发送
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            String lockName = MessageFormat.format("batchSending:{0}", String.valueOf(info.getId()));
 | 
	
		
			
				|  |  | +            // 消息状态判定,且不能重复发送
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            DistributedLock.of(redissonClient).runIfLockCanGet(lockName, () -> sendMessage(info), 30L, TimeUnit.MINUTES);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        });
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    private void sendMessage(CustomerServiceBatchSending info) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        lambdaUpdate()
 | 
	
		
			
				|  |  | +            .eq(CustomerServiceBatchSending::getId, info.getId())
 | 
	
		
			
				|  |  | +            .set(CustomerServiceBatchSending::getSendFlag, 1)
 | 
	
		
			
				|  |  | +            .update();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 接收消息用户数
 | 
	
		
			
				|  |  | +        List<Integer> receiveNums = Lists.newCopyOnWriteArrayList();
 | 
	
		
			
				|  |  | +        // 分页数限制
 | 
	
		
			
				|  |  | +        int limit = 500;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        try {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            // 消息发送数量,默认只发送1类消息(文字或图片)
 | 
	
		
			
				|  |  | +            int messageNum = 1;
 | 
	
		
			
				|  |  | +            if (StringUtils.isNoneBlank(info.getTextMessage(), info.getImgMessage())) {
 | 
	
		
			
				|  |  | +                // 同时发送图片和文字消息
 | 
	
		
			
				|  |  | +                messageNum += 1;
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            int messageSendLimit; // 默认发送10000/分钟
 | 
	
		
			
				|  |  | +            // 群发消息频率限制
 | 
	
		
			
				|  |  | +            SysConfig config = sysConfigService.findByParamName(SysConfigConstant.MESSAGE_SEND_LIMIT);
 | 
	
		
			
				|  |  | +            // 群发消息频率限制不能超过10000/分钟,超过当前值默认取值为10000
 | 
	
		
			
				|  |  | +            if (config != null && config.getParamValue().matches("\\d+")) {
 | 
	
		
			
				|  |  | +                int sendLimit = Integer.parseInt(config.getParamValue());
 | 
	
		
			
				|  |  | +                if (sendLimit > 0 && sendLimit < 10000) {
 | 
	
		
			
				|  |  | +                    // 消息发送频率限制
 | 
	
		
			
				|  |  | +                    messageSendLimit = Integer.parseInt(config.getParamValue());
 | 
	
		
			
				|  |  | +                } else {
 | 
	
		
			
				|  |  | +                    // 默认发送10000/分钟
 | 
	
		
			
				|  |  | +                    messageSendLimit = 10000;
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            } else {
 | 
	
		
			
				|  |  | +                // 默认发送10000/分钟
 | 
	
		
			
				|  |  | +                messageSendLimit = 10000;
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            int finalMessageNum = messageNum;
 | 
	
		
			
				|  |  | +            if (EImReceiveType.ALL == info.getReceiveType()) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                // 推送消息给匹配用户
 | 
	
		
			
				|  |  | +                List<String> targetGroups = Arrays.stream(info.getTargetGroup().split(",")).collect(Collectors.toList());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                targetGroups.forEach(clientType -> {
 | 
	
		
			
				|  |  | +                    boolean tenantFlag = false;
 | 
	
		
			
				|  |  | +                    switch (clientType) {
 | 
	
		
			
				|  |  | +                        case "TEACHER":
 | 
	
		
			
				|  |  | +                            clientType = "TEACHER";
 | 
	
		
			
				|  |  | +                            break;
 | 
	
		
			
				|  |  | +                        case "STUDENT":
 | 
	
		
			
				|  |  | +                            clientType = "STUDENT";
 | 
	
		
			
				|  |  | +                            break;
 | 
	
		
			
				|  |  | +                        case "TENANT_TEACHER":
 | 
	
		
			
				|  |  | +                            clientType = "TEACHER";
 | 
	
		
			
				|  |  | +                            tenantFlag = true;
 | 
	
		
			
				|  |  | +                            break;
 | 
	
		
			
				|  |  | +                        case "TENANT_STUDENT":
 | 
	
		
			
				|  |  | +                            clientType = "STUDENT";
 | 
	
		
			
				|  |  | +                            tenantFlag = true;
 | 
	
		
			
				|  |  | +                            break;
 | 
	
		
			
				|  |  |                      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -                    int messageSendLimit; // 默认发送10000/分钟
 | 
	
		
			
				|  |  | -                    // 群发消息频率限制
 | 
	
		
			
				|  |  | -                    SysConfig config = sysConfigService.findByParamName(SysConfigConstant.MESSAGE_SEND_LIMIT);
 | 
	
		
			
				|  |  | -                    // 群发消息频率限制不能超过10000/分钟,超过当前值默认取值为10000
 | 
	
		
			
				|  |  | -                    if (config != null && config.getParamValue().matches("\\d+")) {
 | 
	
		
			
				|  |  | -                        int sendLimit = Integer.parseInt(config.getParamValue());
 | 
	
		
			
				|  |  | -                        if (sendLimit > 0 && sendLimit < 10000) {
 | 
	
		
			
				|  |  | -                            // 消息发送频率限制
 | 
	
		
			
				|  |  | -                            messageSendLimit = Integer.parseInt(config.getParamValue());
 | 
	
		
			
				|  |  | -                        } else {
 | 
	
		
			
				|  |  | -                            // 默认发送10000/分钟
 | 
	
		
			
				|  |  | -                            messageSendLimit = 10000;
 | 
	
		
			
				|  |  | -                        }
 | 
	
		
			
				|  |  | -                    } else {
 | 
	
		
			
				|  |  | -                        // 默认发送10000/分钟
 | 
	
		
			
				|  |  | -                        messageSendLimit = 10000;
 | 
	
		
			
				|  |  | +                    CustomerService.NotifyMessage receiveQuery = CustomerService.NotifyMessage.builder()
 | 
	
		
			
				|  |  | +                        .clientType(ClientEnum.valueOf(clientType))
 | 
	
		
			
				|  |  | +                        .tenantFlag(tenantFlag)
 | 
	
		
			
				|  |  | +                        .lockFlag(0)
 | 
	
		
			
				|  |  | +                        .delFlag(0)
 | 
	
		
			
				|  |  | +                        //.subjectIds(Arrays.stream(info.getSendSubject().split(",")).collect(Collectors.toList()))
 | 
	
		
			
				|  |  | +                        .build();
 | 
	
		
			
				|  |  | +                    if (StringUtils.isNotBlank(info.getSendSubject())) {
 | 
	
		
			
				|  |  | +                        receiveQuery.setSubjectIds(Arrays.stream(info.getSendSubject().split(",")).collect(Collectors.toList()));
 | 
	
		
			
				|  |  |                      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -                    int finalMessageNum = messageNum;
 | 
	
		
			
				|  |  | -                    if (EImReceiveType.ALL == info.getReceiveType()) {
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                        // 推送消息给匹配用户
 | 
	
		
			
				|  |  | -                        List<String> targetGroups = Arrays.stream(info.getTargetGroup().split(",")).collect(Collectors.toList());
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                        targetGroups.forEach(clientType -> {
 | 
	
		
			
				|  |  | -                            boolean tenantFlag = false;
 | 
	
		
			
				|  |  | -                            switch (clientType) {
 | 
	
		
			
				|  |  | -                                case "TEACHER":
 | 
	
		
			
				|  |  | -                                    clientType = "TEACHER";
 | 
	
		
			
				|  |  | -                                    break;
 | 
	
		
			
				|  |  | -                                case "STUDENT":
 | 
	
		
			
				|  |  | -                                    clientType = "STUDENT";
 | 
	
		
			
				|  |  | -                                    break;
 | 
	
		
			
				|  |  | -                                case "TENANT_TEACHER":
 | 
	
		
			
				|  |  | -                                    clientType = "TEACHER";
 | 
	
		
			
				|  |  | -                                    tenantFlag = true;
 | 
	
		
			
				|  |  | -                                    break;
 | 
	
		
			
				|  |  | -                                case "TENANT_STUDENT":
 | 
	
		
			
				|  |  | -                                    clientType = "STUDENT";
 | 
	
		
			
				|  |  | -                                    tenantFlag = true;
 | 
	
		
			
				|  |  | -                                    break;
 | 
	
		
			
				|  |  | -                            }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                            CustomerService.NotifyMessage receiveQuery = CustomerService.NotifyMessage.builder()
 | 
	
		
			
				|  |  | -                                .clientType(ClientEnum.valueOf(clientType))
 | 
	
		
			
				|  |  | -                                .tenantFlag(tenantFlag)
 | 
	
		
			
				|  |  | -                                .lockFlag(0)
 | 
	
		
			
				|  |  | -                                .delFlag(0)
 | 
	
		
			
				|  |  | -                                //.subjectIds(Arrays.stream(info.getSendSubject().split(",")).collect(Collectors.toList()))
 | 
	
		
			
				|  |  | -                                .build();
 | 
	
		
			
				|  |  | -                            if (StringUtils.isNotBlank(info.getSendSubject())) {
 | 
	
		
			
				|  |  | -                                receiveQuery.setSubjectIds(Arrays.stream(info.getSendSubject().split(",")).collect(Collectors.toList()));
 | 
	
		
			
				|  |  | -                            }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                            Page<CustomerService.MessageReceives> page = PageUtil.getPage(1, 10);
 | 
	
		
			
				|  |  | -                            // 统计当前匹配用户数量
 | 
	
		
			
				|  |  | -                            getBaseMapper().selectMessageReceives(page, receiveQuery);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                            // 计算总页数
 | 
	
		
			
				|  |  | -                            int pages = (int) (page.getTotal() - 1) / limit + 1;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                            List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList());
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                            // 线程休眠计算器
 | 
	
		
			
				|  |  | -                            int sleepCount = 0;
 | 
	
		
			
				|  |  | -                            for (Integer pageNum : collect) {
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                                List<String> receiveUserIds = getBaseMapper().selectMessageReceives(PageUtil.getPage(pageNum, limit), receiveQuery).stream()
 | 
	
		
			
				|  |  | -                                        .map(x -> String.valueOf(x.getUserId()))
 | 
	
		
			
				|  |  | -                                        .collect(Collectors.toList());
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                                String finalClientType = clientType;
 | 
	
		
			
				|  |  | -                                receiveUserIds = receiveUserIds.stream()
 | 
	
		
			
				|  |  | -                                        .map(x -> imGroupService.getImUserId(x, finalClientType))
 | 
	
		
			
				|  |  | -                                        .collect(Collectors.toList());
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                                // 累加线程休眠计算器,若大于指定阀值时,线程休眠1分钟
 | 
	
		
			
				|  |  | -                                sleepCount = messageSendSleepCondition(info, sleepCount, receiveUserIds.size(), finalMessageNum, messageSendLimit);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                                batchSendCustomerServiceMessage(info, receiveNums, receiveUserIds);
 | 
	
		
			
				|  |  | -                            }
 | 
	
		
			
				|  |  | -                        });
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                    } else {
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                        // 查询条件
 | 
	
		
			
				|  |  | -                        CustomerServiceReceiveWrapper.CustomerServiceReceiveQuery receiveQuery = CustomerServiceReceiveWrapper.CustomerServiceReceiveQuery
 | 
	
		
			
				|  |  | -                                .builder().batchSendingId(info.getId()).build();
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                        Page<CustomerServiceReceiveWrapper.CustomerServiceReceive> page = PageUtil.getPage(1, 10);
 | 
	
		
			
				|  |  | -                        // 推送消息给指定用户
 | 
	
		
			
				|  |  | -                        customerServiceReceiveService.selectPage(page, receiveQuery);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                        // 计算总页数
 | 
	
		
			
				|  |  | -                        int pages = (int) (page.getTotal() - 1) / limit + 1;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                        List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList());
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                        // 线程休眠计算器
 | 
	
		
			
				|  |  | -                        int sleepCount = 0;
 | 
	
		
			
				|  |  | -                        for (Integer pageNum : collect) {
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                            List<String> receiveUserIds = customerServiceReceiveService.selectPage(PageUtil.getPage(pageNum, limit), receiveQuery).getRecords().stream()
 | 
	
		
			
				|  |  | -                                    .map(x -> imGroupService.getImUserId(String.valueOf(x.getUserId()), x.getClientType()))
 | 
	
		
			
				|  |  | -                                    .collect(Collectors.toList());
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                            // 累加线程休眠计算器,若大于指定阀值时,线程休眠1分钟
 | 
	
		
			
				|  |  | -                            sleepCount = messageSendSleepCondition(info, sleepCount, receiveUserIds.size(), finalMessageNum, messageSendLimit);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                            batchSendCustomerServiceMessage(info, receiveNums, receiveUserIds);
 | 
	
		
			
				|  |  | -                        }
 | 
	
		
			
				|  |  | +                    Page<CustomerService.MessageReceives> page = PageUtil.getPage(1, 10);
 | 
	
		
			
				|  |  | +                    // 统计当前匹配用户数量
 | 
	
		
			
				|  |  | +                    getBaseMapper().selectMessageReceives(page, receiveQuery);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    // 计算总页数
 | 
	
		
			
				|  |  | +                    int pages = (int) (page.getTotal() - 1) / limit + 1;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    // 线程休眠计算器
 | 
	
		
			
				|  |  | +                    int sleepCount = 0;
 | 
	
		
			
				|  |  | +                    for (Integer pageNum : collect) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                        List<String> receiveUserIds = getBaseMapper().selectMessageReceives(PageUtil.getPage(pageNum, limit), receiveQuery).stream()
 | 
	
		
			
				|  |  | +                                .map(x -> String.valueOf(x.getUserId()))
 | 
	
		
			
				|  |  | +                                .collect(Collectors.toList());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                        String finalClientType = clientType;
 | 
	
		
			
				|  |  | +                        receiveUserIds = receiveUserIds.stream()
 | 
	
		
			
				|  |  | +                                .map(x -> imGroupService.getImUserId(x, finalClientType))
 | 
	
		
			
				|  |  | +                                .collect(Collectors.toList());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                        // 累加线程休眠计算器,若大于指定阀值时,线程休眠1分钟
 | 
	
		
			
				|  |  | +                        sleepCount = messageSendSleepCondition(info, sleepCount, receiveUserIds.size(), finalMessageNum, messageSendLimit);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                        batchSendCustomerServiceMessage(info, receiveNums, receiveUserIds);
 | 
	
		
			
				|  |  |                      }
 | 
	
		
			
				|  |  | +                });
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            } else {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -                    // 更新消息状态为已发送
 | 
	
		
			
				|  |  | -                    lambdaUpdate()
 | 
	
		
			
				|  |  | -                            .eq(CustomerServiceBatchSending::getId, info.getId())
 | 
	
		
			
				|  |  | -                            .set(CustomerServiceBatchSending::getSendStatus, EImSendStatus.SEND)
 | 
	
		
			
				|  |  | -                            .set(CustomerServiceBatchSending::getReceiveNumber, receiveNums.stream().mapToInt(Integer::intValue).sum())
 | 
	
		
			
				|  |  | -                            .set(CustomerServiceBatchSending::getSendTime, DateTime.now().toDate())
 | 
	
		
			
				|  |  | -                            .update();
 | 
	
		
			
				|  |  | +                // 查询条件
 | 
	
		
			
				|  |  | +                CustomerServiceReceiveWrapper.CustomerServiceReceiveQuery receiveQuery = CustomerServiceReceiveWrapper.CustomerServiceReceiveQuery
 | 
	
		
			
				|  |  | +                        .builder().batchSendingId(info.getId()).build();
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -                } catch (Exception e) {
 | 
	
		
			
				|  |  | -                    log.error("sendMessage id={}", info.getId(), e);
 | 
	
		
			
				|  |  | +                Page<CustomerServiceReceiveWrapper.CustomerServiceReceive> page = PageUtil.getPage(1, 10);
 | 
	
		
			
				|  |  | +                // 推送消息给指定用户
 | 
	
		
			
				|  |  | +                customerServiceReceiveService.selectPage(page, receiveQuery);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                // 计算总页数
 | 
	
		
			
				|  |  | +                int pages = (int) (page.getTotal() - 1) / limit + 1;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                // 线程休眠计算器
 | 
	
		
			
				|  |  | +                int sleepCount = 0;
 | 
	
		
			
				|  |  | +                for (Integer pageNum : collect) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    List<String> receiveUserIds = customerServiceReceiveService.selectPage(PageUtil.getPage(pageNum, limit), receiveQuery).getRecords().stream()
 | 
	
		
			
				|  |  | +                            .map(x -> imGroupService.getImUserId(String.valueOf(x.getUserId()), x.getClientType()))
 | 
	
		
			
				|  |  | +                            .collect(Collectors.toList());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    // 累加线程休眠计算器,若大于指定阀值时,线程休眠1分钟
 | 
	
		
			
				|  |  | +                    sleepCount = messageSendSleepCondition(info, sleepCount, receiveUserIds.size(), finalMessageNum, messageSendLimit);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    batchSendCustomerServiceMessage(info, receiveNums, receiveUserIds);
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            });
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        }, 30L, TimeUnit.MINUTES);
 | 
	
		
			
				|  |  | +        } catch (Exception e) {
 | 
	
		
			
				|  |  | +            log.error("sendMessage id={}", info.getId(), e);
 | 
	
		
			
				|  |  | +        } finally {
 | 
	
		
			
				|  |  | +            // 更新消息状态为已发送
 | 
	
		
			
				|  |  | +            lambdaUpdate()
 | 
	
		
			
				|  |  | +                .eq(CustomerServiceBatchSending::getId, info.getId())
 | 
	
		
			
				|  |  | +                .set(CustomerServiceBatchSending::getSendStatus, EImSendStatus.SEND)
 | 
	
		
			
				|  |  | +                .set(CustomerServiceBatchSending::getReceiveNumber, receiveNums.stream().mapToInt(Integer::intValue).sum())
 | 
	
		
			
				|  |  | +                .set(CustomerServiceBatchSending::getSendEndTime, DateTime.now().toDate())
 | 
	
		
			
				|  |  | +                .update();
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      /**
 | 
	
	
		
			
				|  | @@ -788,9 +838,10 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
 | 
	
		
			
				|  |  |              Date endTime = DateTime.now().toDate();
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              List<CustomerServiceBatchSending> batchSendings = lambdaQuery()
 | 
	
		
			
				|  |  | -                    .between(CustomerServiceBatchSending::getSendTime, startTime, endTime)
 | 
	
		
			
				|  |  | -                    .eq(CustomerServiceBatchSending::getSendType, EImSendType.SCHEDULED)
 | 
	
		
			
				|  |  | -                    .eq(CustomerServiceBatchSending::getSendStatus, EImSendStatus.WAIT)
 | 
	
		
			
				|  |  | +                    .lt(CustomerServiceBatchSending::getSendTime, endTime)
 | 
	
		
			
				|  |  | +//                    .eq(CustomerServiceBatchSending::getSendType, EImSendType.SCHEDULED)
 | 
	
		
			
				|  |  | +                    .in(CustomerServiceBatchSending::getSendStatus, EImSendStatus.WAIT,EImSendStatus.SENDING)
 | 
	
		
			
				|  |  | +                    .ne(CustomerServiceBatchSending::getSendFlag,1)
 | 
	
		
			
				|  |  |                      .list();
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              if (CollectionUtils.isNotEmpty(batchSendings)) {
 |