|  | @@ -0,0 +1,582 @@
 | 
	
		
			
				|  |  | +package com.yonge.cooleshow.biz.dal.service.impl;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +import com.alibaba.fastjson.JSON;
 | 
	
		
			
				|  |  | +import com.baomidou.mybatisplus.core.metadata.IPage;
 | 
	
		
			
				|  |  | +import com.baomidou.mybatisplus.core.toolkit.Wrappers;
 | 
	
		
			
				|  |  | +import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 | 
	
		
			
				|  |  | +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 | 
	
		
			
				|  |  | +import com.google.common.collect.Lists;
 | 
	
		
			
				|  |  | +import com.yonge.cooleshow.auth.config.CustomerServiceConfig;
 | 
	
		
			
				|  |  | +import com.yonge.cooleshow.biz.dal.config.RongCloudConfig;
 | 
	
		
			
				|  |  | +import com.yonge.cooleshow.biz.dal.entity.CustomerServiceBatchSending;
 | 
	
		
			
				|  |  | +import com.yonge.cooleshow.biz.dal.entity.CustomerServiceReceive;
 | 
	
		
			
				|  |  | +import com.yonge.cooleshow.biz.dal.entity.Subject;
 | 
	
		
			
				|  |  | +import com.yonge.cooleshow.biz.dal.entity.SysUser;
 | 
	
		
			
				|  |  | +import com.yonge.cooleshow.biz.dal.enums.ClientEnum;
 | 
	
		
			
				|  |  | +import com.yonge.cooleshow.biz.dal.enums.MK;
 | 
	
		
			
				|  |  | +import com.yonge.cooleshow.biz.dal.enums.im.EImReceiveType;
 | 
	
		
			
				|  |  | +import com.yonge.cooleshow.biz.dal.enums.im.EImSendStatus;
 | 
	
		
			
				|  |  | +import com.yonge.cooleshow.biz.dal.enums.im.EImSendType;
 | 
	
		
			
				|  |  | +import com.yonge.cooleshow.biz.dal.mapper.CustomerServiceBatchSendingMapper;
 | 
	
		
			
				|  |  | +import com.yonge.cooleshow.biz.dal.mapper.SysUserMapper;
 | 
	
		
			
				|  |  | +import com.yonge.cooleshow.biz.dal.service.CustomerServiceBatchSendingService;
 | 
	
		
			
				|  |  | +import com.yonge.cooleshow.biz.dal.service.CustomerServiceReceiveService;
 | 
	
		
			
				|  |  | +import com.yonge.cooleshow.biz.dal.service.SubjectService;
 | 
	
		
			
				|  |  | +import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerService;
 | 
	
		
			
				|  |  | +import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerServiceBatchSendingWrapper;
 | 
	
		
			
				|  |  | +import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerServiceReceiveWrapper;
 | 
	
		
			
				|  |  | +import com.yonge.toolset.base.exception.BizException;
 | 
	
		
			
				|  |  | +import com.yonge.toolset.base.util.ImUtil;
 | 
	
		
			
				|  |  | +import com.yonge.toolset.base.util.ThreadPool;
 | 
	
		
			
				|  |  | +import com.yonge.toolset.mybatis.support.PageUtil;
 | 
	
		
			
				|  |  | +import com.yonge.toolset.payment.util.DistributedLock;
 | 
	
		
			
				|  |  | +import io.rong.messages.BaseMessage;
 | 
	
		
			
				|  |  | +import io.rong.messages.ImgMessage;
 | 
	
		
			
				|  |  | +import io.rong.messages.TxtMessage;
 | 
	
		
			
				|  |  | +import io.rong.models.message.PrivateMessage;
 | 
	
		
			
				|  |  | +import io.rong.models.message.PushExt;
 | 
	
		
			
				|  |  | +import io.rong.models.response.ResponseResult;
 | 
	
		
			
				|  |  | +import lombok.extern.slf4j.Slf4j;
 | 
	
		
			
				|  |  | +import org.apache.commons.collections.CollectionUtils;
 | 
	
		
			
				|  |  | +import org.apache.commons.lang3.StringUtils;
 | 
	
		
			
				|  |  | +import org.joda.time.DateTime;
 | 
	
		
			
				|  |  | +import org.redisson.api.RedissonClient;
 | 
	
		
			
				|  |  | +import org.springframework.beans.factory.annotation.Autowired;
 | 
	
		
			
				|  |  | +import org.springframework.stereotype.Service;
 | 
	
		
			
				|  |  | +import org.springframework.transaction.annotation.Transactional;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +import java.text.MessageFormat;
 | 
	
		
			
				|  |  | +import java.util.Arrays;
 | 
	
		
			
				|  |  | +import java.util.Date;
 | 
	
		
			
				|  |  | +import java.util.List;
 | 
	
		
			
				|  |  | +import java.util.Map;
 | 
	
		
			
				|  |  | +import java.util.Objects;
 | 
	
		
			
				|  |  | +import java.util.Optional;
 | 
	
		
			
				|  |  | +import java.util.Random;
 | 
	
		
			
				|  |  | +import java.util.stream.Collectors;
 | 
	
		
			
				|  |  | +import java.util.stream.IntStream;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +/**
 | 
	
		
			
				|  |  | + * 客服群发
 | 
	
		
			
				|  |  | + * 2022-12-09 10:49:10
 | 
	
		
			
				|  |  | + */
 | 
	
		
			
				|  |  | +@Slf4j
 | 
	
		
			
				|  |  | +@Service
 | 
	
		
			
				|  |  | +public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<CustomerServiceBatchSendingMapper, CustomerServiceBatchSending> implements CustomerServiceBatchSendingService {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    @Autowired
 | 
	
		
			
				|  |  | +    private CustomerServiceReceiveService customerServiceReceiveService;
 | 
	
		
			
				|  |  | +    @Autowired
 | 
	
		
			
				|  |  | +    private SubjectService subjectService;
 | 
	
		
			
				|  |  | +    @Autowired
 | 
	
		
			
				|  |  | +    private CustomerServiceConfig customerServiceConfig;
 | 
	
		
			
				|  |  | +    @Autowired
 | 
	
		
			
				|  |  | +    private SysUserMapper sysUserMapper;
 | 
	
		
			
				|  |  | +    @Autowired
 | 
	
		
			
				|  |  | +    private RedissonClient redissonClient;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +	/**
 | 
	
		
			
				|  |  | +     * 查询详情
 | 
	
		
			
				|  |  | +     * @param id 详情ID
 | 
	
		
			
				|  |  | +     * @return CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +	@Override
 | 
	
		
			
				|  |  | +    public CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending detail(Long id) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        CustomerServiceBatchSending sending = baseMapper.selectById(id);
 | 
	
		
			
				|  |  | +        if (Objects.isNull(sending)) {
 | 
	
		
			
				|  |  | +            throw new BizException("无效的请求ID");
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending record = CustomerServiceBatchSendingWrapper
 | 
	
		
			
				|  |  | +                .CustomerServiceBatchSending.from(JSON.toJSONString(sending));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 填充声部,目标群体
 | 
	
		
			
				|  |  | +        List<CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending> records = Lists.newArrayList(record);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 返回消息封闭
 | 
	
		
			
				|  |  | +        getBatchSendingPaddingData(records);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 部分推送,查询用户信息
 | 
	
		
			
				|  |  | +        if (EImReceiveType.PORTION == sending.getReceiveType()) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            CustomerServiceReceiveWrapper.CustomerServiceReceiveQuery receiveQuery = CustomerServiceReceiveWrapper.CustomerServiceReceiveQuery
 | 
	
		
			
				|  |  | +                    .builder().batchSendingId(sending.getId()).build();
 | 
	
		
			
				|  |  | +            IPage<CustomerServiceReceiveWrapper.CustomerServiceReceive> page = customerServiceReceiveService.selectPage(PageUtil.getPage(1, 9999), receiveQuery);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            record.setReceives(page.getRecords());
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        return record;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * 分页查询
 | 
	
		
			
				|  |  | +     * @param page IPage<CustomerServiceBatchSending>
 | 
	
		
			
				|  |  | +     * @param query CustomerServiceBatchSendingWrapper.CustomerServiceBatchSendingQuery
 | 
	
		
			
				|  |  | +     * @return IPage<CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending>
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    @Override
 | 
	
		
			
				|  |  | +    public IPage<CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending> selectPage(IPage<CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending> page,
 | 
	
		
			
				|  |  | +                                                                                            CustomerServiceBatchSendingWrapper.CustomerServiceBatchSendingQuery query){
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 群发消息记录
 | 
	
		
			
				|  |  | +        List<CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending> sendings = baseMapper.selectPage(page, query);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if (CollectionUtils.isNotEmpty(sendings)) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            // 返回消息封闭
 | 
	
		
			
				|  |  | +            getBatchSendingPaddingData(sendings);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        return page.setRecords(sendings);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * 群发消息返回封装
 | 
	
		
			
				|  |  | +     * @param sendings List<CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending>
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    private void getBatchSendingPaddingData(List<CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending> sendings) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 声部信息
 | 
	
		
			
				|  |  | +        List<Long> subjectIds = sendings.stream()
 | 
	
		
			
				|  |  | +                .flatMap(x -> Arrays.stream(x.getSendSubject().split(",")))
 | 
	
		
			
				|  |  | +                .map(Long::parseLong).distinct().collect(Collectors.toList());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        Map<Long, String> subjectNameMap = subjectService.findBySubjectByIdList(subjectIds).stream()
 | 
	
		
			
				|  |  | +                .collect(Collectors.toMap(Subject::getId, Subject::getName, (o, n) -> n));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 创建用户
 | 
	
		
			
				|  |  | +        List<Long> userIds = sendings.stream()
 | 
	
		
			
				|  |  | +                .map(CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending::getCreateBy)
 | 
	
		
			
				|  |  | +                .filter(Objects::nonNull).distinct().collect(Collectors.toList());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        Map<Long, String> usernameMap = sysUserMapper.selectBatchIds(userIds).stream()
 | 
	
		
			
				|  |  | +                .collect(Collectors.toMap(SysUser::getId, SysUser::getUsername, (o, n) -> n));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 发送群体,发送声部
 | 
	
		
			
				|  |  | +        for (CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending item : sendings) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            // 创建用户
 | 
	
		
			
				|  |  | +            item.setCreateUser(usernameMap.getOrDefault(item.getCreateBy(), ""));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            List<Long> collect = Arrays.stream(Optional.ofNullable(item.getSendSubject()).orElse("").split(","))
 | 
	
		
			
				|  |  | +                    .map(Long::parseLong).distinct().collect(Collectors.toList());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            // 发送声部
 | 
	
		
			
				|  |  | +            String subjectName = subjectNameMap.entrySet().stream()
 | 
	
		
			
				|  |  | +                    .filter(x -> collect.contains(x.getKey()))
 | 
	
		
			
				|  |  | +                    .map(Map.Entry::getValue).collect(Collectors.joining(","));
 | 
	
		
			
				|  |  | +            item.setSubjectName(subjectName);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            // 发送群体
 | 
	
		
			
				|  |  | +            List<String> clientTypes = Arrays.stream(Optional.ofNullable(item.getTargetGroup()).orElse("").split(","))
 | 
	
		
			
				|  |  | +                    .collect(Collectors.toList());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            String targetGroup = Arrays.stream(ClientEnum.values())
 | 
	
		
			
				|  |  | +                    .filter(x -> clientTypes.contains(x.getCode())).map(ClientEnum::getMsg)
 | 
	
		
			
				|  |  | +                    .collect(Collectors.joining(","));
 | 
	
		
			
				|  |  | +            item.setTargetGroupName(targetGroup);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * 添加
 | 
	
		
			
				|  |  | +     * @param info CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending
 | 
	
		
			
				|  |  | +     * @return Boolean
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    @Transactional
 | 
	
		
			
				|  |  | +    @Override
 | 
	
		
			
				|  |  | +    public Boolean add(CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending info) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 发送客服ID
 | 
	
		
			
				|  |  | +        String customerService = customerServiceConfig.getCustomerService();
 | 
	
		
			
				|  |  | +        if (StringUtils.isNotEmpty(customerService)) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            List<String> collect = Arrays.stream(customerService.split(",")).collect(Collectors.toList());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Random rand = new Random();
 | 
	
		
			
				|  |  | +            String mobile = collect.get(rand.nextInt(collect.size()));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            // 查询发送客服ID
 | 
	
		
			
				|  |  | +            SysUser senderUser = sysUserMapper.selectOne(Wrappers.<SysUser>lambdaQuery()
 | 
	
		
			
				|  |  | +                    .eq(SysUser::getPhone, mobile));
 | 
	
		
			
				|  |  | +            if (Objects.isNull(senderUser)) {
 | 
	
		
			
				|  |  | +                throw new BizException("未匹配到客服人员");
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +            info.setSenderId(senderUser.getId());
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 推送消息
 | 
	
		
			
				|  |  | +        CustomerServiceBatchSending sending = JSON.parseObject(info.jsonString(), CustomerServiceBatchSending.class);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 保存推送群发消息
 | 
	
		
			
				|  |  | +        save(sending);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 部分推送用户
 | 
	
		
			
				|  |  | +       if (CollectionUtils.isNotEmpty(info.getReceives())) {
 | 
	
		
			
				|  |  | +           for (CustomerServiceReceiveWrapper.CustomerServiceReceive item : info.getReceives()) {
 | 
	
		
			
				|  |  | +               item.setBatchSendingId(sending.getId());
 | 
	
		
			
				|  |  | +           }
 | 
	
		
			
				|  |  | +           customerServiceReceiveService.saveBatch(JSON.parseArray(JSON.toJSONString(info.getReceives()),
 | 
	
		
			
				|  |  | +                   CustomerServiceReceive.class));
 | 
	
		
			
				|  |  | +       }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 更新群发消息人数
 | 
	
		
			
				|  |  | +        updateBatchSendingSendNumber(sending);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +       // 发送推送消息
 | 
	
		
			
				|  |  | +        if (EImSendType.IMMEDIATELY == info.getSendType()) {
 | 
	
		
			
				|  |  | +            asyncBatchSendingMessage(sending);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        return true;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * 更新群发消息人数
 | 
	
		
			
				|  |  | +     * @param info CustomerServiceBatchSending
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    private void updateBatchSendingSendNumber(CustomerServiceBatchSending info) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 发送条件,接收人数
 | 
	
		
			
				|  |  | +        CustomerServiceBatchSendingWrapper.CustomerServiceBatchSendingQuery query = CustomerServiceBatchSendingWrapper.CustomerServiceBatchSendingQuery
 | 
	
		
			
				|  |  | +                .builder()
 | 
	
		
			
				|  |  | +                .sendSubject(info.getSendSubject())
 | 
	
		
			
				|  |  | +                .targetGroup(info.getTargetGroup())
 | 
	
		
			
				|  |  | +                .build();
 | 
	
		
			
				|  |  | +        info.setCondition(query.jsonString()); // 发送条件
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 更新发送人数,接收人数
 | 
	
		
			
				|  |  | +        if (EImReceiveType.ALL == info.getReceiveType()) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            info.setSendNumber(0);
 | 
	
		
			
				|  |  | +            List<Integer> receiveNums = Lists.newCopyOnWriteArrayList();
 | 
	
		
			
				|  |  | +            // 条件匹配的老师、学生数量
 | 
	
		
			
				|  |  | +            query.getClientTypes().parallelStream().forEach(item -> {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                CustomerService.NotifyMessage receiveQuery = CustomerService.NotifyMessage.builder()
 | 
	
		
			
				|  |  | +                        .clientType(ClientEnum.valueOf(item))
 | 
	
		
			
				|  |  | +                        .subjectIds(Arrays.stream(info.getSendSubject().split(",")).collect(Collectors.toList()))
 | 
	
		
			
				|  |  | +                        .build();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                Page<CustomerService.MessageReceives> page = PageUtil.getPage(1, 10);
 | 
	
		
			
				|  |  | +                // 统计当前匹配用户数量
 | 
	
		
			
				|  |  | +                getBaseMapper().selectMessageReceives(page, receiveQuery);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                receiveNums.add((int) page.getTotal());
 | 
	
		
			
				|  |  | +            });
 | 
	
		
			
				|  |  | +            info.setSendNumber(receiveNums.stream().mapToInt(Integer::intValue).sum());
 | 
	
		
			
				|  |  | +        } else {
 | 
	
		
			
				|  |  | +            // 选择的老师、学生数量
 | 
	
		
			
				|  |  | +            int sendNumber = customerServiceReceiveService.count(Wrappers.<CustomerServiceReceive>lambdaQuery()
 | 
	
		
			
				|  |  | +                    .eq(CustomerServiceReceive::getBatchSendingId, info.getId()));
 | 
	
		
			
				|  |  | +            info.setSendNumber(sendNumber);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 更新群发消息人数
 | 
	
		
			
				|  |  | +        lambdaUpdate()
 | 
	
		
			
				|  |  | +                .eq(CustomerServiceBatchSending::getId, info.getId())
 | 
	
		
			
				|  |  | +                .set(CustomerServiceBatchSending::getSendNumber, info.getSendNumber())
 | 
	
		
			
				|  |  | +                .set(CustomerServiceBatchSending::getCondition, info.getCondition())
 | 
	
		
			
				|  |  | +                .update();
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * 更新
 | 
	
		
			
				|  |  | +     * @param info CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending
 | 
	
		
			
				|  |  | +     * @return Boolean
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    @Transactional
 | 
	
		
			
				|  |  | +    @Override
 | 
	
		
			
				|  |  | +    public Boolean update(CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending info){
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        CustomerServiceBatchSending sending = JSON.parseObject(info.jsonString(),
 | 
	
		
			
				|  |  | +                CustomerServiceBatchSending.class);
 | 
	
		
			
				|  |  | +        // 更新群发消息
 | 
	
		
			
				|  |  | +        this.updateById(sending);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if (CollectionUtils.isNotEmpty(info.getReceives())) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            // 先清除数据,后重新插入
 | 
	
		
			
				|  |  | +            customerServiceReceiveService.remove(Wrappers.<CustomerServiceReceive>lambdaQuery()
 | 
	
		
			
				|  |  | +                    .eq(CustomerServiceReceive::getBatchSendingId, sending.getId()));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            for (CustomerServiceReceiveWrapper.CustomerServiceReceive item : info.getReceives()) {
 | 
	
		
			
				|  |  | +                item.setBatchSendingId(sending.getId());
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +            customerServiceReceiveService.saveBatch(JSON.parseArray(JSON.toJSONString(info.getReceives()),
 | 
	
		
			
				|  |  | +                    CustomerServiceReceive.class));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        sending = getById(info.getId());
 | 
	
		
			
				|  |  | +        // 更新群发消息人数
 | 
	
		
			
				|  |  | +        updateBatchSendingSendNumber(sending);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 发送推送消息
 | 
	
		
			
				|  |  | +        if (EImSendType.IMMEDIATELY == sending.getSendType()) {
 | 
	
		
			
				|  |  | +            sendMessage(info.getId());
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        return true;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * 更新消息状态
 | 
	
		
			
				|  |  | +     *
 | 
	
		
			
				|  |  | +     * @param id         消息Id
 | 
	
		
			
				|  |  | +     * @param sendStatus EImSendStatus
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    @Override
 | 
	
		
			
				|  |  | +    public void status(Long id, EImSendStatus sendStatus) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        CustomerServiceBatchSending record = getById(id);
 | 
	
		
			
				|  |  | +        if (Objects.isNull(record)) {
 | 
	
		
			
				|  |  | +            throw new BizException("无效的请求Id");
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 校验消息发送状态
 | 
	
		
			
				|  |  | +        if (EImSendStatus.SEND == record.getSendStatus()) {
 | 
	
		
			
				|  |  | +            throw new BizException("当前消息已发送");
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 当前群发消息状态一致
 | 
	
		
			
				|  |  | +        if (record.getSendStatus() == sendStatus) {
 | 
	
		
			
				|  |  | +            throw new BizException("群发消息状态一致");
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 更新群发消息状态
 | 
	
		
			
				|  |  | +        lambdaUpdate()
 | 
	
		
			
				|  |  | +                .eq(CustomerServiceBatchSending::getId, id)
 | 
	
		
			
				|  |  | +                .set(CustomerServiceBatchSending::getSendStatus, sendStatus)
 | 
	
		
			
				|  |  | +                .update();
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * 发送消息
 | 
	
		
			
				|  |  | +     *
 | 
	
		
			
				|  |  | +     * @param id 群发消息ID
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    @Override
 | 
	
		
			
				|  |  | +    public void sendMessage(Long id) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        CustomerServiceBatchSending info = getById(id);
 | 
	
		
			
				|  |  | +        if (Objects.isNull(info)) {
 | 
	
		
			
				|  |  | +            throw new BizException("无效的群发消息ID");
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 发送消息
 | 
	
		
			
				|  |  | +        asyncBatchSendingMessage(info);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * 异步发送消息
 | 
	
		
			
				|  |  | +     * @param info CustomerServiceBatchSending
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    private void asyncBatchSendingMessage(CustomerServiceBatchSending info) {
 | 
	
		
			
				|  |  | +        String lockName = MessageFormat.format("batchSending:{0}", String.valueOf(info.getId()));
 | 
	
		
			
				|  |  | +        // 消息状态判定,且不能重复发送
 | 
	
		
			
				|  |  | +        DistributedLock.of(redissonClient).runIfLockCanGet(lockName, () -> {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            List<BaseMessage> messages = getReceiveMessage(info);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            // 异步发送消息且同步更新已发送人数,稍后可在页面刷新查看已发送用户数
 | 
	
		
			
				|  |  | +            ThreadPool.getExecutor().submit(() -> {
 | 
	
		
			
				|  |  | +                // 接收消息用户数
 | 
	
		
			
				|  |  | +                List<Integer> receiveNums = Lists.newCopyOnWriteArrayList();
 | 
	
		
			
				|  |  | +                // 分页数限制
 | 
	
		
			
				|  |  | +                int limit = 500;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                try {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    if (EImReceiveType.ALL == info.getReceiveType()) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                        // 推送消息给匹配用户
 | 
	
		
			
				|  |  | +                        List<String> targetGroupes = Arrays.stream(info.getTargetGroup().split(",")).collect(Collectors.toList());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                        targetGroupes.parallelStream().forEach(clientType -> {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                            CustomerService.NotifyMessage receiveQuery = CustomerService.NotifyMessage.builder()
 | 
	
		
			
				|  |  | +                                    .clientType(ClientEnum.valueOf(clientType))
 | 
	
		
			
				|  |  | +                                    .subjectIds(Arrays.stream(info.getSendSubject().split(",")).collect(Collectors.toList()))
 | 
	
		
			
				|  |  | +                                    .build();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                            Page<CustomerService.MessageReceives> page = PageUtil.getPage(1, 10);
 | 
	
		
			
				|  |  | +                            // 统计当前匹配用户数量
 | 
	
		
			
				|  |  | +                            getBaseMapper().selectMessageReceives(page, receiveQuery);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                            // 计算总页数
 | 
	
		
			
				|  |  | +                            int pages = (int) (page.getTotal() - 1) / limit + 1;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                            List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                            for (Integer pageNum : collect) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                                List<String> receiveUserIds = getBaseMapper().selectMessageReceives(PageUtil.getPage(pageNum, limit), receiveQuery).stream()
 | 
	
		
			
				|  |  | +                                        .map(x -> String.valueOf(x.getUserId()))
 | 
	
		
			
				|  |  | +                                        .collect(Collectors.toList());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                                if (ClientEnum.STUDENT.match(clientType)) {
 | 
	
		
			
				|  |  | +                                    receiveUserIds = receiveUserIds.stream()
 | 
	
		
			
				|  |  | +                                            .map(x -> MessageFormat.format("{0}:STUDENT", x))
 | 
	
		
			
				|  |  | +                                            .collect(Collectors.toList());
 | 
	
		
			
				|  |  | +                                }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                                batchSendCustomerServiceMessage(info, messages, receiveNums, receiveUserIds);
 | 
	
		
			
				|  |  | +                            }
 | 
	
		
			
				|  |  | +                        });
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    } else {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                        // 查询条件
 | 
	
		
			
				|  |  | +                        CustomerServiceReceiveWrapper.CustomerServiceReceiveQuery receiveQuery = CustomerServiceReceiveWrapper.CustomerServiceReceiveQuery
 | 
	
		
			
				|  |  | +                                .builder().batchSendingId(info.getId()).build();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                        Page<CustomerServiceReceiveWrapper.CustomerServiceReceive> page = PageUtil.getPage(1, 10);
 | 
	
		
			
				|  |  | +                        // 推送消息给指定用户
 | 
	
		
			
				|  |  | +                        customerServiceReceiveService.selectPage(page, receiveQuery);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                        // 计算总页数
 | 
	
		
			
				|  |  | +                        int pages = (int) (page.getTotal() - 1) / limit + 1;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                        List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                        for (Integer pageNum : collect) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                            List<String> receiveUserIds = customerServiceReceiveService.selectPage(PageUtil.getPage(pageNum, limit), receiveQuery).getRecords().stream()
 | 
	
		
			
				|  |  | +                                    .map(x -> {
 | 
	
		
			
				|  |  | +                                        if (ClientEnum.STUDENT.match(x.getClientType())) {
 | 
	
		
			
				|  |  | +                                            return MessageFormat.format("{0}:STUDENT", String.valueOf(x.getUserId()));
 | 
	
		
			
				|  |  | +                                        }
 | 
	
		
			
				|  |  | +                                        return String.valueOf(x.getUserId());
 | 
	
		
			
				|  |  | +                                    })
 | 
	
		
			
				|  |  | +                                    .collect(Collectors.toList());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                            batchSendCustomerServiceMessage(info, messages, receiveNums, receiveUserIds);
 | 
	
		
			
				|  |  | +                        }
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    // 更新消息状态为已发送
 | 
	
		
			
				|  |  | +                    lambdaUpdate()
 | 
	
		
			
				|  |  | +                            .eq(CustomerServiceBatchSending::getId, info.getId())
 | 
	
		
			
				|  |  | +                            .set(CustomerServiceBatchSending::getSendStatus, EImSendStatus.SEND)
 | 
	
		
			
				|  |  | +                            .set(CustomerServiceBatchSending::getReceiveNumber, receiveNums.stream().mapToInt(Integer::intValue).sum())
 | 
	
		
			
				|  |  | +                            .set(CustomerServiceBatchSending::getSendTime, DateTime.now().toDate())
 | 
	
		
			
				|  |  | +                            .update();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                } catch (Exception e) {
 | 
	
		
			
				|  |  | +                    log.error("sendMessage id={}", info.getId(), e);
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            });
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        });
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * 批量发送客服消息
 | 
	
		
			
				|  |  | +     * @param info CustomerServiceBatchSending
 | 
	
		
			
				|  |  | +     * @param messages 推送消息类型
 | 
	
		
			
				|  |  | +     * @param receiveNums 接收消息用户数
 | 
	
		
			
				|  |  | +     * @param receiveUserIds 接收消息用户Id
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    private static void batchSendCustomerServiceMessage(CustomerServiceBatchSending info, List<BaseMessage> messages,
 | 
	
		
			
				|  |  | +                                                        List<Integer> receiveNums, List<String> receiveUserIds) {
 | 
	
		
			
				|  |  | +        // 拓展消息
 | 
	
		
			
				|  |  | +        PushExt pushExt = PushExt.build(info.getTitle(), 1,
 | 
	
		
			
				|  |  | +                new PushExt.HW("channelId", "NORMAL"), new PushExt.VIVO("1"),
 | 
	
		
			
				|  |  | +                new PushExt.APNs("", ""),
 | 
	
		
			
				|  |  | +                new PushExt.OPPO(""));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        String senderId = String.valueOf(info.getSenderId());
 | 
	
		
			
				|  |  | +        PrivateMessage privateMessage;
 | 
	
		
			
				|  |  | +        ResponseResult privateResult;
 | 
	
		
			
				|  |  | +        for (List<String> item : Lists.partition(receiveUserIds, 100)) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            try {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                for (BaseMessage message : messages) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    // 发送用户IM通知消息
 | 
	
		
			
				|  |  | +                    privateMessage = new PrivateMessage()
 | 
	
		
			
				|  |  | +                            .setSenderId(senderId)
 | 
	
		
			
				|  |  | +                            .setTargetId(item.toArray(new String[0]))
 | 
	
		
			
				|  |  | +                            .setObjectName(message.getType())
 | 
	
		
			
				|  |  | +                            .setContent(message)
 | 
	
		
			
				|  |  | +                            .setPushExt(pushExt)
 | 
	
		
			
				|  |  | +                            .setIsIncludeSender(0);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    privateResult = RongCloudConfig.rongCloud.message.msgPrivate.send(privateMessage);
 | 
	
		
			
				|  |  | +                    log.info("batchSendCustomerServiceMessage senderId={}, ret={}", senderId, privateResult.getCode());
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                receiveNums.add(item.size());
 | 
	
		
			
				|  |  | +            } catch (Exception e) {
 | 
	
		
			
				|  |  | +                log.error("batchSendCustomerServiceMessage senderId={}", senderId, e);
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * 推送消息类型
 | 
	
		
			
				|  |  | +     * @param info CustomerServiceBatchSending
 | 
	
		
			
				|  |  | +     * @return List<BaseMessage>
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    private static List<BaseMessage> getReceiveMessage(CustomerServiceBatchSending info) {
 | 
	
		
			
				|  |  | +        List<BaseMessage> messages = Lists.newArrayList();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if (StringUtils.isNotEmpty(info.getImgMessage())) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            //String suffix = info.getImgMessage().substring(info.getImgMessage().lastIndexOf("."));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            // 发送图片消息
 | 
	
		
			
				|  |  | +            ImgMessage imgMessage = new ImgMessage(ImUtil.imageToBase64(info.getImgMessage(), "png"), "", info.getImgMessage());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            messages.add(imgMessage);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if (StringUtils.isNotEmpty(info.getTextMessage())) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            // 发送文本消息
 | 
	
		
			
				|  |  | +            TxtMessage txtMessage = new TxtMessage(info.getTextMessage(), "");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            messages.add(txtMessage);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        return messages;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * 定时发送消息
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    @Override
 | 
	
		
			
				|  |  | +    public void scheduleSendMessage() {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        DistributedLock.of(redissonClient).runIfLockCanGet("scheduleSendMessage:LOCK", () -> {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            // 定时发送时间
 | 
	
		
			
				|  |  | +            // 开始时间
 | 
	
		
			
				|  |  | +            Date startTime = DateTime.now().minusMinutes(2).toDate();
 | 
	
		
			
				|  |  | +            // 结束时间
 | 
	
		
			
				|  |  | +            Date endTime = DateTime.now().toDate();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            List<CustomerServiceBatchSending> batchSendings = lambdaQuery()
 | 
	
		
			
				|  |  | +                    .between(CustomerServiceBatchSending::getSendTime, startTime, endTime)
 | 
	
		
			
				|  |  | +                    .eq(CustomerServiceBatchSending::getSendType, EImSendType.SCHEDULED)
 | 
	
		
			
				|  |  | +                    .eq(CustomerServiceBatchSending::getSendStatus, EImSendStatus.WAIT)
 | 
	
		
			
				|  |  | +                    .list();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            if (CollectionUtils.isNotEmpty(batchSendings)) {
 | 
	
		
			
				|  |  | +                log.info("scheduleSendMessage time={}, size={}", DateTime.now().toString(MK.TIME_PATTERN), batchSendings.size());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                for (List<CustomerServiceBatchSending> item : Lists.partition(batchSendings, 10)) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    item.parallelStream().forEach(x -> sendMessage(x.getId()));
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        });
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +}
 |