|
@@ -3,32 +3,55 @@ package com.yonge.cooleshow.biz.dal.service.impl;
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
import com.baomidou.mybatisplus.core.metadata.IPage;
|
|
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
|
|
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
|
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
|
|
import com.google.common.collect.Lists;
|
|
|
+import com.yonge.cooleshow.auth.config.CustomerServiceConfig;
|
|
|
+import com.yonge.cooleshow.biz.dal.config.RongCloudConfig;
|
|
|
import com.yonge.cooleshow.biz.dal.entity.CustomerServiceBatchSending;
|
|
|
import com.yonge.cooleshow.biz.dal.entity.CustomerServiceReceive;
|
|
|
import com.yonge.cooleshow.biz.dal.entity.Subject;
|
|
|
+import com.yonge.cooleshow.biz.dal.entity.SysUser;
|
|
|
import com.yonge.cooleshow.biz.dal.enums.ClientEnum;
|
|
|
+import com.yonge.cooleshow.biz.dal.enums.im.EImReceiveType;
|
|
|
import com.yonge.cooleshow.biz.dal.enums.im.EImSendStatus;
|
|
|
+import com.yonge.cooleshow.biz.dal.enums.im.EImSendType;
|
|
|
import com.yonge.cooleshow.biz.dal.mapper.CustomerServiceBatchSendingMapper;
|
|
|
+import com.yonge.cooleshow.biz.dal.mapper.SysUserMapper;
|
|
|
import com.yonge.cooleshow.biz.dal.service.CustomerServiceBatchSendingService;
|
|
|
import com.yonge.cooleshow.biz.dal.service.CustomerServiceReceiveService;
|
|
|
import com.yonge.cooleshow.biz.dal.service.SubjectService;
|
|
|
+import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerService;
|
|
|
import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerServiceBatchSendingWrapper;
|
|
|
import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerServiceReceiveWrapper;
|
|
|
import com.yonge.toolset.base.exception.BizException;
|
|
|
+import com.yonge.toolset.base.util.ImUtil;
|
|
|
+import com.yonge.toolset.base.util.ThreadPool;
|
|
|
+import com.yonge.toolset.mybatis.support.PageUtil;
|
|
|
+import com.yonge.toolset.payment.util.DistributedLock;
|
|
|
+import io.rong.messages.BaseMessage;
|
|
|
+import io.rong.messages.ImgMessage;
|
|
|
+import io.rong.messages.TxtMessage;
|
|
|
+import io.rong.models.message.PrivateMessage;
|
|
|
+import io.rong.models.message.PushExt;
|
|
|
+import io.rong.models.response.ResponseResult;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.collections.CollectionUtils;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.redisson.api.RedissonClient;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import org.springframework.transaction.annotation.Transactional;
|
|
|
|
|
|
+import java.text.MessageFormat;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Objects;
|
|
|
import java.util.Optional;
|
|
|
+import java.util.Random;
|
|
|
import java.util.stream.Collectors;
|
|
|
+import java.util.stream.IntStream;
|
|
|
|
|
|
/**
|
|
|
* 客服群发
|
|
@@ -42,6 +65,12 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
|
|
|
private CustomerServiceReceiveService customerServiceReceiveService;
|
|
|
@Autowired
|
|
|
private SubjectService subjectService;
|
|
|
+ @Autowired
|
|
|
+ private CustomerServiceConfig customerServiceConfig;
|
|
|
+ @Autowired
|
|
|
+ private SysUserMapper sysUserMapper;
|
|
|
+ @Autowired
|
|
|
+ private RedissonClient redissonClient;
|
|
|
|
|
|
/**
|
|
|
* 查询详情
|
|
@@ -133,9 +162,26 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
|
|
|
@Override
|
|
|
public Boolean add(CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending info) {
|
|
|
|
|
|
+ // 发送客服ID
|
|
|
+ String customerService = customerServiceConfig.getCustomerService();
|
|
|
+ if (StringUtils.isNotEmpty(customerService)) {
|
|
|
+
|
|
|
+ List<String> collect = Arrays.stream(customerService.split(",")).collect(Collectors.toList());
|
|
|
+
|
|
|
+ Random rand = new Random();
|
|
|
+ String mobile = collect.get(rand.nextInt(collect.size()));
|
|
|
+
|
|
|
+ // 查询发送客服ID
|
|
|
+ SysUser senderUser = sysUserMapper.selectOne(Wrappers.<SysUser>lambdaQuery()
|
|
|
+ .eq(SysUser::getPhone, mobile));
|
|
|
+ if (Objects.isNull(senderUser)) {
|
|
|
+ throw new BizException("未匹配到客服人员");
|
|
|
+ }
|
|
|
+ info.setSenderId(senderUser.getId());
|
|
|
+ }
|
|
|
+
|
|
|
// 推送消息
|
|
|
- CustomerServiceBatchSending sending = JSON.parseObject(info.jsonString(),
|
|
|
- CustomerServiceBatchSending.class);
|
|
|
+ CustomerServiceBatchSending sending = JSON.parseObject(info.jsonString(), CustomerServiceBatchSending.class);
|
|
|
|
|
|
// 保存推送群发消息
|
|
|
save(sending);
|
|
@@ -149,10 +195,67 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
|
|
|
CustomerServiceReceive.class));
|
|
|
}
|
|
|
|
|
|
+ // 更新群发消息人数
|
|
|
+ updateBatchSendingSendNumber(sending);
|
|
|
+
|
|
|
+ // 发送推送消息
|
|
|
+ if (EImSendType.IMMEDIATELY == info.getSendType()) {
|
|
|
+ sendMessage(info.getId());
|
|
|
+ }
|
|
|
+
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
+ * 更新群发消息人数
|
|
|
+ * @param info CustomerServiceBatchSending
|
|
|
+ */
|
|
|
+ private void updateBatchSendingSendNumber(CustomerServiceBatchSending info) {
|
|
|
+
|
|
|
+ // 发送条件,接收人数
|
|
|
+ CustomerServiceBatchSendingWrapper.CustomerServiceBatchSendingQuery query = CustomerServiceBatchSendingWrapper.CustomerServiceBatchSendingQuery
|
|
|
+ .builder()
|
|
|
+ .sendSubject(info.getSendSubject())
|
|
|
+ .targetGroup(info.getTargetGroup())
|
|
|
+ .build();
|
|
|
+ info.setCondition(query.jsonString()); // 发送条件
|
|
|
+
|
|
|
+ // 更新发送人数,接收人数
|
|
|
+ if (EImReceiveType.ALL == info.getReceiveType()) {
|
|
|
+
|
|
|
+ info.setSendNumber(0);
|
|
|
+ List<Integer> receiveNums = Lists.newCopyOnWriteArrayList();
|
|
|
+ // 条件匹配的老师、学生数量
|
|
|
+ query.getClientTypes().parallelStream().forEach(item -> {
|
|
|
+
|
|
|
+ CustomerService.NotifyMessage receiveQuery = CustomerService.NotifyMessage.builder()
|
|
|
+ .clientType(ClientEnum.valueOf(item))
|
|
|
+ .subjectIds(Arrays.stream(info.getSendSubject().split(",")).collect(Collectors.toList()))
|
|
|
+ .build();
|
|
|
+
|
|
|
+ Page<CustomerService.MessageReceives> page = PageUtil.getPage(1, 10);
|
|
|
+ // 统计当前匹配用户数量
|
|
|
+ getBaseMapper().selectMessageReceives(page, receiveQuery);
|
|
|
+
|
|
|
+ receiveNums.add((int) page.getTotal());
|
|
|
+ });
|
|
|
+ info.setSendNumber(receiveNums.stream().mapToInt(Integer::intValue).sum());
|
|
|
+ } else {
|
|
|
+ // 选择的老师、学生数量
|
|
|
+ int sendNumber = customerServiceReceiveService.count(Wrappers.<CustomerServiceReceive>lambdaQuery()
|
|
|
+ .eq(CustomerServiceReceive::getBatchSendingId, info.getId()));
|
|
|
+ info.setSendNumber(sendNumber);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 更新群发消息人数
|
|
|
+ lambdaUpdate()
|
|
|
+ .eq(CustomerServiceBatchSending::getId, info.getId())
|
|
|
+ .set(CustomerServiceBatchSending::getSendNumber, info.getSendNumber())
|
|
|
+ .set(CustomerServiceBatchSending::getCondition, info.getCondition())
|
|
|
+ .update();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
* 更新
|
|
|
* @param info CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending
|
|
|
* @return Boolean
|
|
@@ -179,6 +282,15 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
|
|
|
CustomerServiceReceive.class));
|
|
|
}
|
|
|
|
|
|
+ sending = getById(info.getId());
|
|
|
+ // 更新群发消息人数
|
|
|
+ updateBatchSendingSendNumber(sending);
|
|
|
+
|
|
|
+ // 发送推送消息
|
|
|
+ if (EImSendType.IMMEDIATELY == sending.getSendType()) {
|
|
|
+ sendMessage(info.getId());
|
|
|
+ }
|
|
|
+
|
|
|
return true;
|
|
|
}
|
|
|
|
|
@@ -212,4 +324,188 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
|
|
|
.set(CustomerServiceBatchSending::getSendStatus, sendStatus)
|
|
|
.update();
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送消息
|
|
|
+ *
|
|
|
+ * @param id 群发消息ID
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void sendMessage(Long id) {
|
|
|
+
|
|
|
+ CustomerServiceBatchSending info = getById(id);
|
|
|
+ if (Objects.isNull(info)) {
|
|
|
+ throw new BizException("无效的群发消息ID");
|
|
|
+ }
|
|
|
+
|
|
|
+ String lockName = MessageFormat.format("batchSending:{0}", String.valueOf(info.getId()));
|
|
|
+ // 消息状态判定,且不能重复发送
|
|
|
+ DistributedLock.of(redissonClient).runIfLockCanGet(lockName, () -> {
|
|
|
+
|
|
|
+ List<BaseMessage> messages = getReceiveMessage(info);
|
|
|
+
|
|
|
+ // 异步发送消息且同步更新已发送人数,稍后可在页面刷新查看已发送用户数
|
|
|
+ ThreadPool.getExecutor().submit(() -> {
|
|
|
+ // 接收消息用户数
|
|
|
+ List<Integer> receiveNums = Lists.newCopyOnWriteArrayList();
|
|
|
+ // 分页数限制
|
|
|
+ int limit = 500;
|
|
|
+
|
|
|
+ try {
|
|
|
+
|
|
|
+ if (EImReceiveType.ALL == info.getReceiveType()) {
|
|
|
+
|
|
|
+ // 推送消息给匹配用户
|
|
|
+ List<String> targetGroupes = Arrays.stream(info.getTargetGroup().split(",")).collect(Collectors.toList());
|
|
|
+
|
|
|
+ targetGroupes.parallelStream().forEach(clientType -> {
|
|
|
+
|
|
|
+ CustomerService.NotifyMessage receiveQuery = CustomerService.NotifyMessage.builder()
|
|
|
+ .clientType(ClientEnum.valueOf(clientType))
|
|
|
+ .subjectIds(Arrays.stream(info.getSendSubject().split(",")).collect(Collectors.toList()))
|
|
|
+ .build();
|
|
|
+
|
|
|
+ Page<CustomerService.MessageReceives> page = PageUtil.getPage(1, 10);
|
|
|
+ // 统计当前匹配用户数量
|
|
|
+ getBaseMapper().selectMessageReceives(page, receiveQuery);
|
|
|
+
|
|
|
+ // 计算总页数
|
|
|
+ int pages = (int) (page.getTotal() - 1) / limit + 1;
|
|
|
+
|
|
|
+ List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList());
|
|
|
+
|
|
|
+ for (Integer pageNum : collect) {
|
|
|
+
|
|
|
+ List<String> receiveUserIds = getBaseMapper().selectMessageReceives(PageUtil.getPage(pageNum, limit), receiveQuery).stream()
|
|
|
+ .map(x -> String.valueOf(x.getUserId()))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
+ if (ClientEnum.STUDENT.match(clientType)) {
|
|
|
+ receiveUserIds = receiveUserIds.stream()
|
|
|
+ .map(x -> MessageFormat.format("{0}:STUDENT", x))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ }
|
|
|
+
|
|
|
+ batchSendCustomerServiceMessage(info, messages, receiveNums, receiveUserIds);
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ } else {
|
|
|
+
|
|
|
+ // 查询条件
|
|
|
+ CustomerServiceReceiveWrapper.CustomerServiceReceiveQuery receiveQuery = CustomerServiceReceiveWrapper.CustomerServiceReceiveQuery
|
|
|
+ .builder().build();
|
|
|
+
|
|
|
+ Page<CustomerServiceReceive> page = PageUtil.getPage(1, 10);
|
|
|
+ // 推送消息给指定用户
|
|
|
+ customerServiceReceiveService.selectPage(page, receiveQuery);
|
|
|
+
|
|
|
+ // 计算总页数
|
|
|
+ int pages = (int) (page.getTotal() - 1) / limit + 1;
|
|
|
+
|
|
|
+ List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList());
|
|
|
+
|
|
|
+ for (Integer pageNum : collect) {
|
|
|
+
|
|
|
+ List<String> receiveUserIds = customerServiceReceiveService.selectPage(PageUtil.getPage(pageNum, limit), receiveQuery).getRecords().stream()
|
|
|
+ .map(x -> {
|
|
|
+ if (ClientEnum.STUDENT.match(x.getClientType())) {
|
|
|
+ return MessageFormat.format("{0}:STUDENT", String.valueOf(x.getUserId()));
|
|
|
+ }
|
|
|
+ return String.valueOf(x.getUserId());
|
|
|
+ })
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
+ batchSendCustomerServiceMessage(info, messages, receiveNums, receiveUserIds);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 更新消息状态为已发送
|
|
|
+ lambdaUpdate()
|
|
|
+ .eq(CustomerServiceBatchSending::getId, info.getId())
|
|
|
+ .set(CustomerServiceBatchSending::getSendStatus, EImSendStatus.SEND)
|
|
|
+ .set(CustomerServiceBatchSending::getReceiveNumber, receiveNums.stream().mapToInt(Integer::intValue).sum())
|
|
|
+ .update();
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("sendMessage id={}", id, e);
|
|
|
+ }
|
|
|
+
|
|
|
+ });
|
|
|
+
|
|
|
+ });
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 批量发送客服消息
|
|
|
+ * @param info CustomerServiceBatchSending
|
|
|
+ * @param messages 推送消息类型
|
|
|
+ * @param receiveNums 接收消息用户数
|
|
|
+ * @param receiveUserIds 接收消息用户Id
|
|
|
+ */
|
|
|
+ private static void batchSendCustomerServiceMessage(CustomerServiceBatchSending info, List<BaseMessage> messages,
|
|
|
+ List<Integer> receiveNums, List<String> receiveUserIds) {
|
|
|
+ // 拓展消息
|
|
|
+ PushExt pushExt = PushExt.build(info.getTitle(), 1,
|
|
|
+ new PushExt.HW("channelId", "NORMAL"), new PushExt.VIVO("1"),
|
|
|
+ new PushExt.APNs("", ""),
|
|
|
+ new PushExt.OPPO(""));
|
|
|
+
|
|
|
+ String senderId = String.valueOf(info.getSenderId());
|
|
|
+ PrivateMessage privateMessage;
|
|
|
+ ResponseResult privateResult;
|
|
|
+ for (List<String> item : Lists.partition(receiveUserIds, 100)) {
|
|
|
+
|
|
|
+ try {
|
|
|
+
|
|
|
+ for (BaseMessage message : messages) {
|
|
|
+
|
|
|
+ // 发送用户IM通知消息
|
|
|
+ privateMessage = new PrivateMessage()
|
|
|
+ .setSenderId(senderId)
|
|
|
+ .setTargetId(item.toArray(new String[0]))
|
|
|
+ .setObjectName(message.getType())
|
|
|
+ .setContent(message)
|
|
|
+ .setPushExt(pushExt)
|
|
|
+ .setIsIncludeSender(0);
|
|
|
+
|
|
|
+ privateResult = RongCloudConfig.rongCloud.message.msgPrivate.send(privateMessage);
|
|
|
+ log.info("batchSendCustomerServiceMessage senderId={}, ret={}", senderId, privateResult.getCode());
|
|
|
+ }
|
|
|
+
|
|
|
+ receiveNums.add(item.size());
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("batchSendCustomerServiceMessage senderId={}", senderId, e);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 推送消息类型
|
|
|
+ * @param info CustomerServiceBatchSending
|
|
|
+ * @return List<BaseMessage>
|
|
|
+ */
|
|
|
+ private static List<BaseMessage> getReceiveMessage(CustomerServiceBatchSending info) {
|
|
|
+ List<BaseMessage> messages = Lists.newArrayList();
|
|
|
+
|
|
|
+ if (StringUtils.isNotEmpty(info.getImgMessage())) {
|
|
|
+
|
|
|
+ // 发送图片消息
|
|
|
+ ImgMessage imgMessage = new ImgMessage(ImUtil.imageToBase64(info.getImgMessage(), "png"), "", info.getImgUrl());
|
|
|
+
|
|
|
+ messages.add(imgMessage);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (StringUtils.isNotEmpty(info.getTextMessage())) {
|
|
|
+
|
|
|
+ // 发送文本消息
|
|
|
+ TxtMessage txtMessage = new TxtMessage(info.getTextMessage(), "");
|
|
|
+
|
|
|
+ messages.add(txtMessage);
|
|
|
+ }
|
|
|
+ return messages;
|
|
|
+ }
|
|
|
+
|
|
|
}
|