Browse Source

消息群发

liujc 8 months ago
parent
commit
165967e2da

+ 5 - 0
cooleshow-user/user-biz/src/main/java/com/yonge/cooleshow/biz/dal/dto/search/TeacherSearch.java

@@ -12,6 +12,7 @@ import lombok.Data;
 import org.springframework.format.annotation.DateTimeFormat;
 
 import java.util.Date;
+import java.util.List;
 
 /**
  * @Author: liweifan
@@ -41,6 +42,10 @@ public class TeacherSearch extends QueryInfo{
     
     @ApiModelProperty("声部编号")
     private Long subjectId;
+
+
+    @ApiModelProperty("声部编号")
+    private List<Long> subjectIds;
 	
     @ApiModelProperty("是否结算")
 	private Boolean isSettlement;

+ 9 - 0
cooleshow-user/user-biz/src/main/java/com/yonge/cooleshow/biz/dal/entity/CustomerServiceBatchSending.java

@@ -56,10 +56,19 @@ public class CustomerServiceBatchSending implements Serializable {
 	@TableField(value = "send_status_")
     private EImSendStatus sendStatus;
 
+    @ApiModelProperty("是否发送")
+    @TableField(value = "send_flag_")
+    private Boolean sendFlag;
+
     @ApiModelProperty("发送时间") 
 	@TableField(value = "send_time_")
     private Date sendTime;
 
+
+    @ApiModelProperty("发送完成时间")
+    @TableField(value = "send_end_time_")
+    private Date sendEndTime;
+
     @ApiModelProperty("发送条件")
     @TableField(value = "condition_")
     private String condition;

+ 1 - 0
cooleshow-user/user-biz/src/main/java/com/yonge/cooleshow/biz/dal/enums/im/EImSendStatus.java

@@ -12,6 +12,7 @@ import lombok.Getter;
 public enum EImSendStatus implements BaseEnum<String, EImSendStatus> {
 
     WAIT("待发送"),
+    SENDING("发送中"),
     SEND("已发送"),
     DISABLE("已停用"),
     EXPIRE("已失效"),

+ 192 - 141
cooleshow-user/user-biz/src/main/java/com/yonge/cooleshow/biz/dal/service/impl/CustomerServiceBatchSendingServiceImpl.java

@@ -25,6 +25,7 @@ import com.yonge.cooleshow.biz.dal.enums.im.EImSendStatus;
 import com.yonge.cooleshow.biz.dal.enums.im.EImSendType;
 import com.yonge.cooleshow.biz.dal.mapper.CustomerServiceBatchSendingMapper;
 import com.yonge.cooleshow.biz.dal.mapper.SysUserMapper;
+import com.yonge.cooleshow.biz.dal.redisson.RedissonMessageService;
 import com.yonge.cooleshow.biz.dal.service.CustomerServiceBatchSendingService;
 import com.yonge.cooleshow.biz.dal.service.CustomerServiceReceiveService;
 import com.yonge.cooleshow.biz.dal.service.ImGroupService;
@@ -34,6 +35,7 @@ import com.yonge.cooleshow.biz.dal.service.TeacherService;
 import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerService;
 import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerServiceBatchSendingWrapper;
 import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerServiceReceiveWrapper;
+import com.yonge.cooleshow.biz.dal.wrapper.liveroom.LiveRoomWrapper;
 import com.yonge.cooleshow.common.constant.SysConfigConstant;
 import com.yonge.toolset.base.exception.BizException;
 import com.yonge.toolset.base.util.ImUtil;
@@ -48,6 +50,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.joda.time.DateTime;
+import org.redisson.api.RBucket;
+import org.redisson.api.RLock;
 import org.redisson.api.RedissonClient;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -96,6 +100,8 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
     @Autowired
     private SysConfigService sysConfigService;
 
+    @Autowired
+    private RedissonMessageService redissonMessageService;
 	/**
      * 查询详情
      * @param id 详情ID
@@ -453,154 +459,198 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
      * @param info CustomerServiceBatchSending
      */
     private void asyncBatchSendingMessage(CustomerServiceBatchSending info) {
-        String lockName = MessageFormat.format("batchSending:{0}", String.valueOf(info.getId()));
-        // 消息状态判定,且不能重复发送
-        DistributedLock.of(redissonClient).runIfLockCanGet(lockName, () -> {
-
-            // 异步发送消息且同步更新已发送人数,稍后可在页面刷新查看已发送用户数
-            ThreadPool.getExecutor().submit(() -> {
-                // 接收消息用户数
-                List<Integer> receiveNums = Lists.newCopyOnWriteArrayList();
-                // 分页数限制
-                int limit = 500;
-
-                try {
-
-                    // 消息发送数量,默认只发送1类消息(文字或图片)
-                    int messageNum = 1;
-                    if (StringUtils.isNoneBlank(info.getTextMessage(), info.getImgMessage())) {
-                        // 同时发送图片和文字消息
-                        messageNum += 1;
+
+        // 改状态为发送中,标记为发送中但是未发送
+        if (info.getId() == null) {
+            return;
+        }
+
+        CustomerServiceBatchSending byId = getById(info.getId());
+        if (byId == null) {
+            return;
+        }else if (byId.getSendStatus() == EImSendStatus.SEND) {
+            return;
+        } else if (byId.getSendStatus() == EImSendStatus.SENDING && byId.getSendFlag()) {
+            return;
+        }
+
+        CustomerServiceBatchSending customerServiceBatchSending = new CustomerServiceBatchSending();
+        customerServiceBatchSending.setId(info.getId());
+        customerServiceBatchSending.setSendFlag(false);
+        customerServiceBatchSending.setSendTime(new Date());
+        customerServiceBatchSending.setSendStatus(EImSendStatus.SENDING);
+        updateById(customerServiceBatchSending);
+
+        // 如果有发送中,不再发送
+        Integer count = lambdaQuery()
+            .eq(CustomerServiceBatchSending::getSendStatus, EImSendStatus.SENDING)
+            .eq(CustomerServiceBatchSending::getSendFlag, 1)
+            .count();
+        if (count > 0) {
+            return;
+        }
+
+        // 异步发送消息且同步更新已发送人数,稍后可在页面刷新查看已发送用户数
+        ThreadPool.getExecutor().submit(() -> {
+
+            // 加锁, 同时只能有一个在发送
+
+            String lockName = MessageFormat.format("batchSending:{0}", String.valueOf(info.getId()));
+            // 消息状态判定,且不能重复发送
+
+
+            DistributedLock.of(redissonClient).runIfLockCanGet(lockName, () -> sendMessage(info), 30L, TimeUnit.MINUTES);
+
+
+        });
+    }
+
+    private void sendMessage(CustomerServiceBatchSending info) {
+
+        lambdaUpdate()
+            .eq(CustomerServiceBatchSending::getId, info.getId())
+            .set(CustomerServiceBatchSending::getSendFlag, 1)
+            .update();
+
+        // 接收消息用户数
+        List<Integer> receiveNums = Lists.newCopyOnWriteArrayList();
+        // 分页数限制
+        int limit = 500;
+
+        try {
+
+            // 消息发送数量,默认只发送1类消息(文字或图片)
+            int messageNum = 1;
+            if (StringUtils.isNoneBlank(info.getTextMessage(), info.getImgMessage())) {
+                // 同时发送图片和文字消息
+                messageNum += 1;
+            }
+
+            int messageSendLimit; // 默认发送10000/分钟
+            // 群发消息频率限制
+            SysConfig config = sysConfigService.findByParamName(SysConfigConstant.MESSAGE_SEND_LIMIT);
+            // 群发消息频率限制不能超过10000/分钟,超过当前值默认取值为10000
+            if (config != null && config.getParamValue().matches("\\d+")) {
+                int sendLimit = Integer.parseInt(config.getParamValue());
+                if (sendLimit > 0 && sendLimit < 10000) {
+                    // 消息发送频率限制
+                    messageSendLimit = Integer.parseInt(config.getParamValue());
+                } else {
+                    // 默认发送10000/分钟
+                    messageSendLimit = 10000;
+                }
+            } else {
+                // 默认发送10000/分钟
+                messageSendLimit = 10000;
+            }
+
+            int finalMessageNum = messageNum;
+            if (EImReceiveType.ALL == info.getReceiveType()) {
+
+                // 推送消息给匹配用户
+                List<String> targetGroups = Arrays.stream(info.getTargetGroup().split(",")).collect(Collectors.toList());
+
+                targetGroups.forEach(clientType -> {
+                    boolean tenantFlag = false;
+                    switch (clientType) {
+                        case "TEACHER":
+                            clientType = "TEACHER";
+                            break;
+                        case "STUDENT":
+                            clientType = "STUDENT";
+                            break;
+                        case "TENANT_TEACHER":
+                            clientType = "TEACHER";
+                            tenantFlag = true;
+                            break;
+                        case "TENANT_STUDENT":
+                            clientType = "STUDENT";
+                            tenantFlag = true;
+                            break;
                     }
 
-                    int messageSendLimit; // 默认发送10000/分钟
-                    // 群发消息频率限制
-                    SysConfig config = sysConfigService.findByParamName(SysConfigConstant.MESSAGE_SEND_LIMIT);
-                    // 群发消息频率限制不能超过10000/分钟,超过当前值默认取值为10000
-                    if (config != null && config.getParamValue().matches("\\d+")) {
-                        int sendLimit = Integer.parseInt(config.getParamValue());
-                        if (sendLimit > 0 && sendLimit < 10000) {
-                            // 消息发送频率限制
-                            messageSendLimit = Integer.parseInt(config.getParamValue());
-                        } else {
-                            // 默认发送10000/分钟
-                            messageSendLimit = 10000;
-                        }
-                    } else {
-                        // 默认发送10000/分钟
-                        messageSendLimit = 10000;
+                    CustomerService.NotifyMessage receiveQuery = CustomerService.NotifyMessage.builder()
+                        .clientType(ClientEnum.valueOf(clientType))
+                        .tenantFlag(tenantFlag)
+                        .lockFlag(0)
+                        .delFlag(0)
+                        //.subjectIds(Arrays.stream(info.getSendSubject().split(",")).collect(Collectors.toList()))
+                        .build();
+                    if (StringUtils.isNotBlank(info.getSendSubject())) {
+                        receiveQuery.setSubjectIds(Arrays.stream(info.getSendSubject().split(",")).collect(Collectors.toList()));
                     }
 
-                    int finalMessageNum = messageNum;
-                    if (EImReceiveType.ALL == info.getReceiveType()) {
-
-                        // 推送消息给匹配用户
-                        List<String> targetGroups = Arrays.stream(info.getTargetGroup().split(",")).collect(Collectors.toList());
-
-                        targetGroups.forEach(clientType -> {
-                            boolean tenantFlag = false;
-                            switch (clientType) {
-                                case "TEACHER":
-                                    clientType = "TEACHER";
-                                    break;
-                                case "STUDENT":
-                                    clientType = "STUDENT";
-                                    break;
-                                case "TENANT_TEACHER":
-                                    clientType = "TEACHER";
-                                    tenantFlag = true;
-                                    break;
-                                case "TENANT_STUDENT":
-                                    clientType = "STUDENT";
-                                    tenantFlag = true;
-                                    break;
-                            }
-
-                            CustomerService.NotifyMessage receiveQuery = CustomerService.NotifyMessage.builder()
-                                .clientType(ClientEnum.valueOf(clientType))
-                                .tenantFlag(tenantFlag)
-                                .lockFlag(0)
-                                .delFlag(0)
-                                //.subjectIds(Arrays.stream(info.getSendSubject().split(",")).collect(Collectors.toList()))
-                                .build();
-                            if (StringUtils.isNotBlank(info.getSendSubject())) {
-                                receiveQuery.setSubjectIds(Arrays.stream(info.getSendSubject().split(",")).collect(Collectors.toList()));
-                            }
-
-                            Page<CustomerService.MessageReceives> page = PageUtil.getPage(1, 10);
-                            // 统计当前匹配用户数量
-                            getBaseMapper().selectMessageReceives(page, receiveQuery);
-
-                            // 计算总页数
-                            int pages = (int) (page.getTotal() - 1) / limit + 1;
-
-                            List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList());
-
-                            // 线程休眠计算器
-                            int sleepCount = 0;
-                            for (Integer pageNum : collect) {
-
-                                List<String> receiveUserIds = getBaseMapper().selectMessageReceives(PageUtil.getPage(pageNum, limit), receiveQuery).stream()
-                                        .map(x -> String.valueOf(x.getUserId()))
-                                        .collect(Collectors.toList());
-
-                                String finalClientType = clientType;
-                                receiveUserIds = receiveUserIds.stream()
-                                        .map(x -> imGroupService.getImUserId(x, finalClientType))
-                                        .collect(Collectors.toList());
-
-                                // 累加线程休眠计算器,若大于指定阀值时,线程休眠1分钟
-                                sleepCount = messageSendSleepCondition(info, sleepCount, receiveUserIds.size(), finalMessageNum, messageSendLimit);
-
-                                batchSendCustomerServiceMessage(info, receiveNums, receiveUserIds);
-                            }
-                        });
-
-                    } else {
-
-                        // 查询条件
-                        CustomerServiceReceiveWrapper.CustomerServiceReceiveQuery receiveQuery = CustomerServiceReceiveWrapper.CustomerServiceReceiveQuery
-                                .builder().batchSendingId(info.getId()).build();
-
-                        Page<CustomerServiceReceiveWrapper.CustomerServiceReceive> page = PageUtil.getPage(1, 10);
-                        // 推送消息给指定用户
-                        customerServiceReceiveService.selectPage(page, receiveQuery);
-
-                        // 计算总页数
-                        int pages = (int) (page.getTotal() - 1) / limit + 1;
-
-                        List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList());
-
-                        // 线程休眠计算器
-                        int sleepCount = 0;
-                        for (Integer pageNum : collect) {
-
-                            List<String> receiveUserIds = customerServiceReceiveService.selectPage(PageUtil.getPage(pageNum, limit), receiveQuery).getRecords().stream()
-                                    .map(x -> imGroupService.getImUserId(String.valueOf(x.getUserId()), x.getClientType()))
-                                    .collect(Collectors.toList());
-
-                            // 累加线程休眠计算器,若大于指定阀值时,线程休眠1分钟
-                            sleepCount = messageSendSleepCondition(info, sleepCount, receiveUserIds.size(), finalMessageNum, messageSendLimit);
-
-                            batchSendCustomerServiceMessage(info, receiveNums, receiveUserIds);
-                        }
+                    Page<CustomerService.MessageReceives> page = PageUtil.getPage(1, 10);
+                    // 统计当前匹配用户数量
+                    getBaseMapper().selectMessageReceives(page, receiveQuery);
+
+                    // 计算总页数
+                    int pages = (int) (page.getTotal() - 1) / limit + 1;
+
+                    List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList());
+
+                    // 线程休眠计算器
+                    int sleepCount = 0;
+                    for (Integer pageNum : collect) {
+
+                        List<String> receiveUserIds = getBaseMapper().selectMessageReceives(PageUtil.getPage(pageNum, limit), receiveQuery).stream()
+                                .map(x -> String.valueOf(x.getUserId()))
+                                .collect(Collectors.toList());
+
+                        String finalClientType = clientType;
+                        receiveUserIds = receiveUserIds.stream()
+                                .map(x -> imGroupService.getImUserId(x, finalClientType))
+                                .collect(Collectors.toList());
+
+                        // 累加线程休眠计算器,若大于指定阀值时,线程休眠1分钟
+                        sleepCount = messageSendSleepCondition(info, sleepCount, receiveUserIds.size(), finalMessageNum, messageSendLimit);
+
+                        batchSendCustomerServiceMessage(info, receiveNums, receiveUserIds);
                     }
+                });
+
+            } else {
 
-                    // 更新消息状态为已发送
-                    lambdaUpdate()
-                            .eq(CustomerServiceBatchSending::getId, info.getId())
-                            .set(CustomerServiceBatchSending::getSendStatus, EImSendStatus.SEND)
-                            .set(CustomerServiceBatchSending::getReceiveNumber, receiveNums.stream().mapToInt(Integer::intValue).sum())
-                            .set(CustomerServiceBatchSending::getSendTime, DateTime.now().toDate())
-                            .update();
+                // 查询条件
+                CustomerServiceReceiveWrapper.CustomerServiceReceiveQuery receiveQuery = CustomerServiceReceiveWrapper.CustomerServiceReceiveQuery
+                        .builder().batchSendingId(info.getId()).build();
 
-                } catch (Exception e) {
-                    log.error("sendMessage id={}", info.getId(), e);
+                Page<CustomerServiceReceiveWrapper.CustomerServiceReceive> page = PageUtil.getPage(1, 10);
+                // 推送消息给指定用户
+                customerServiceReceiveService.selectPage(page, receiveQuery);
+
+                // 计算总页数
+                int pages = (int) (page.getTotal() - 1) / limit + 1;
+
+                List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList());
+
+                // 线程休眠计算器
+                int sleepCount = 0;
+                for (Integer pageNum : collect) {
+
+                    List<String> receiveUserIds = customerServiceReceiveService.selectPage(PageUtil.getPage(pageNum, limit), receiveQuery).getRecords().stream()
+                            .map(x -> imGroupService.getImUserId(String.valueOf(x.getUserId()), x.getClientType()))
+                            .collect(Collectors.toList());
+
+                    // 累加线程休眠计算器,若大于指定阀值时,线程休眠1分钟
+                    sleepCount = messageSendSleepCondition(info, sleepCount, receiveUserIds.size(), finalMessageNum, messageSendLimit);
+
+                    batchSendCustomerServiceMessage(info, receiveNums, receiveUserIds);
                 }
+            }
 
-            });
 
-        }, 30L, TimeUnit.MINUTES);
+        } catch (Exception e) {
+            log.error("sendMessage id={}", info.getId(), e);
+        } finally {
+            // 更新消息状态为已发送
+            lambdaUpdate()
+                .eq(CustomerServiceBatchSending::getId, info.getId())
+                .set(CustomerServiceBatchSending::getSendStatus, EImSendStatus.SEND)
+                .set(CustomerServiceBatchSending::getReceiveNumber, receiveNums.stream().mapToInt(Integer::intValue).sum())
+                .set(CustomerServiceBatchSending::getSendEndTime, DateTime.now().toDate())
+                .update();
+        }
     }
 
     /**
@@ -788,9 +838,10 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
             Date endTime = DateTime.now().toDate();
 
             List<CustomerServiceBatchSending> batchSendings = lambdaQuery()
-                    .between(CustomerServiceBatchSending::getSendTime, startTime, endTime)
-                    .eq(CustomerServiceBatchSending::getSendType, EImSendType.SCHEDULED)
-                    .eq(CustomerServiceBatchSending::getSendStatus, EImSendStatus.WAIT)
+                    .lt(CustomerServiceBatchSending::getSendTime, endTime)
+//                    .eq(CustomerServiceBatchSending::getSendType, EImSendType.SCHEDULED)
+                    .in(CustomerServiceBatchSending::getSendStatus, EImSendStatus.WAIT,EImSendStatus.SENDING)
+                    .ne(CustomerServiceBatchSending::getSendFlag,1)
                     .list();
 
             if (CollectionUtils.isNotEmpty(batchSendings)) {

+ 2 - 0
cooleshow-user/user-biz/src/main/resources/config/mybatis/CustomerServiceBatchSendingMapper.xml

@@ -15,12 +15,14 @@
         , t.send_type_ AS sendType
         , t.send_status_ AS sendStatus
         , t.send_time_ AS sendTime
+        , t.send_end_time_ AS sendEndTime
         , t.condition_ AS `condition`
         , t.title_ AS title
         , t.text_message_ AS textMessage
         , t.img_message_ AS imgMessage
         , t.img_url_ AS imgUrl
         , t.sender_id_ AS senderId
+        , t.send_flag_ AS sendFlag
         , t.create_by_ AS createBy
         , t.create_time_ AS createTime
     </sql>

+ 7 - 0
cooleshow-user/user-biz/src/main/resources/config/mybatis/TeacherMapper.xml

@@ -178,6 +178,13 @@
             <if test="param.subjectId != null">
                 and find_in_set(#{param.subjectId},t.subject_id_)
             </if>
+            <if test="param.subjectIds != null and param.subjectIds.size() != 0">
+                and (
+                <foreach collection="param.subjectIds" item="subjectId" separator=" or " open="(" close=")">
+                    find_in_set(#{subjectId},t.subject_id_)
+                </foreach>
+                )
+            </if>
             <if test="param.isSettlement != null">
                 and t.is_settlement_ = #{param.isSettlement}
             </if>