浏览代码

Merge branch 'feature/1020-tencent-im' into saas

liujc 1 年之前
父节点
当前提交
c65d8f3fc7

+ 69 - 2
mec-application/src/main/java/com/ym/mec/web/controller/ImHistoryMessageController.java

@@ -16,15 +16,18 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.*;
 
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
 @RequestMapping("${app-config.url.web:}/imHistoryMessage")
 @Api(tags = "系统操作日志")
@@ -58,10 +61,74 @@ public class ImHistoryMessageController extends BaseController {
 
         ExecutorService executor = Executors.newSingleThreadExecutor();
         executor.execute(() -> {
+
+            // 100个线程的无界线程池
+            ExecutorService executorService = Executors.newFixedThreadPool(100);
+            com.yonge.mongodb.PageInfo<HistoryMessage> info;
+//        historyMessageService.updateAllStatus(0);
+            //计算总数据量
+            int count = imGroupService.queryInfoCount(3);
+            log.info("总数据量:"+count);
+            //计算调用次数
+            int num = (int) Math.ceil(count / 100);
+            for (int i = 0; i <= num; i++) {
+                int size = 100;
+                //获取融云消息
+                //List<ImGroup> list = imGroupService.lambdaQuery().last("limit "+(page-1)*size+","+size).list();
+                info = imGroupService.getRongYunInfo(1, size,3);
+                List<HistoryMessage> footer = info.getRows();
+                if (CollectionUtils.isEmpty(footer)) {
+                    break;
+                }
+                //IM导入
+                try {
+                    // 过滤 groupId为空的
+                    footer = footer.stream().filter(o -> StringUtils.isNotBlank(o.getGroupId())).collect(Collectors.toList());
+                    // 根据 groupId 分组
+                    Map<String, List<HistoryMessage>> map = footer.stream().collect(Collectors.groupingBy(HistoryMessage::getGroupId));
+
+                    List<Future<?>> list = new ArrayList<>();
+                    for (Map.Entry<String, List<HistoryMessage>> entry : map.entrySet()) {
+                        String k = entry.getKey();
+                        List<HistoryMessage> v = entry.getValue();
+
+                        list.add(executorService.submit(() -> {
+                            try {
+                                imGroupService.importInfo(v);
+                            } catch (Exception e) {
+
+                                log.error("群聊导入失败",e);
+                            }
+
+                        }));
+                    }
+                    for (Future<?> future : list) {
+                        try {
+                            future.get();
+                        } catch (Exception e){
+                            log.error("群聊导入失败",e);
+                        }
+                    }
+                } catch (Exception e) {
+                    log.error("IM导入失败",e);
+                }
+                log.info("已导入:" + (i + 1) * size);
+            }
+            executorService.shutdown();
+        });
+    }
+
+
+    @ApiOperation("IM导入消息")
+    @PostMapping(value = "/ImportIMPrivate")
+    public void ImportIMPrivate() {
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        executor.execute(() -> {
             com.yonge.mongodb.PageInfo<HistoryMessage> info;
 //        historyMessageService.updateAllStatus(0);
             //计算总数据量
-            int count = imGroupService.queryInfoCount();
+            int count = imGroupService.queryInfoCount(1);
             log.info("总数据量:"+count);
             //计算调用次数
             int num = (int) Math.ceil(count / 100);
@@ -69,7 +136,7 @@ public class ImHistoryMessageController extends BaseController {
                 int size = 100;
                 //获取融云消息
                 //List<ImGroup> list = imGroupService.lambdaQuery().last("limit "+(page-1)*size+","+size).list();
-                info = imGroupService.getRongYunInfo(1, size);
+                info = imGroupService.getRongYunInfo(1, size,1);
                 List<HistoryMessage> footer = info.getRows();
                 if (CollectionUtils.isEmpty(footer)) {
                     break;

+ 2 - 5
mec-biz/src/main/java/com/ym/mec/biz/service/ImGroupService.java

@@ -3,13 +3,10 @@ package com.ym.mec.biz.service;
 import com.alibaba.fastjson.JSONObject;
 import com.microsvc.toolkit.middleware.im.message.GroupMemberWrapper;
 import com.microsvc.toolkit.middleware.im.message.TencentRequest;
-import com.microsvc.toolkit.middleware.rtc.RTCRoomPluginService;
-import com.ym.mec.biz.dal.dto.im.BasicUserInfo;
 import com.ym.mec.biz.dal.dto.ImGroupDto;
 import com.ym.mec.biz.dal.dto.ImGroupMemberDto;
 import com.ym.mec.biz.dal.dto.NameDto;
 import com.ym.mec.biz.dal.entity.ImGroup;
-import com.ym.mec.biz.dal.entity.ImHistoryMessage;
 import com.ym.mec.biz.dal.enums.school.ESchoolStaffType;
 import com.ym.mec.biz.dal.wrapper.ImGroupWrapper;
 import com.ym.mec.common.service.BaseService;
@@ -195,12 +192,12 @@ public interface ImGroupService extends BaseService<String, ImGroup> {
 	 * 查询总数
 	 * @return
 	 */
-	int queryInfoCount();
+	int queryInfoCount(int targetType);
 
 	/**
 	 * 读取融云数据库信息
 	 */
-	com.yonge.mongodb.PageInfo getRongYunInfo(int page, int size);
+	com.yonge.mongodb.PageInfo getRongYunInfo(int page, int size, int targetType);
 
 	/**
 	 * 导入IM腾讯云

+ 4 - 5
mec-biz/src/main/java/com/ym/mec/biz/service/impl/ImGroupServiceImpl.java

@@ -12,7 +12,6 @@ import com.microsvc.toolkit.middleware.im.message.GroupMemberWrapper;
 import com.microsvc.toolkit.middleware.im.message.MessageWrapper;
 import com.microsvc.toolkit.middleware.im.message.TencentRequest;
 import com.ym.mec.auth.api.entity.SysUser;
-import com.ym.mec.auth.api.enums.SysUserType;
 import com.ym.mec.biz.dal.dao.ClassGroupTeacherMapperDao;
 import com.ym.mec.biz.dal.dao.ImGroupDao;
 import com.ym.mec.biz.dal.dao.ImGroupMemberDao;
@@ -1259,8 +1258,8 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
     }
 
     @Override
-    public int queryInfoCount() {
-        long count = historyMessageService.selectCount(0,new DateTime().plusMonths(-2).toDate());
+    public int queryInfoCount(int targetType) {
+        long count = historyMessageService.selectCount(0,new DateTime().plusMonths(-2).toDate(),targetType);
         return (int) count;
     }
 
@@ -1322,8 +1321,8 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
 
 
     @Override
-    public com.yonge.mongodb.PageInfo<HistoryMessage> getRongYunInfo( int page, int size) {
-        com.yonge.mongodb.PageInfo<HistoryMessage> pageInfo = historyMessageService.selectPage(page, size, 0,new DateTime().plusMonths(-2).toDate());
+    public com.yonge.mongodb.PageInfo<HistoryMessage> getRongYunInfo(int page, int size, int targetType) {
+        com.yonge.mongodb.PageInfo<HistoryMessage> pageInfo = historyMessageService.selectPage(page, size, 0,new DateTime().plusMonths(-2).toDate(),targetType);
         //List<HistoryMessage> imHistoryMessageslist = imGroupDao.selectAll(result,size);
         return pageInfo;
     }

+ 2 - 4
mec-common/audit-log/src/main/java/com/yonge/log/service/HistoryMessageService.java

@@ -5,9 +5,7 @@ import com.yonge.mongodb.PageInfo;
 import com.yonge.mongodb.service.BaseServiceWithMongo;
 
 import java.io.File;
-import java.time.LocalDateTime;
 import java.util.Date;
-import java.util.List;
 
 public interface HistoryMessageService extends BaseServiceWithMongo<String, HistoryMessage> {
 
@@ -18,9 +16,9 @@ public interface HistoryMessageService extends BaseServiceWithMongo<String, Hist
      */
     void saveImHistoryMessage(File file) throws Exception;
 
-    long selectCount(int status, Date date);
+    long selectCount(int status, Date date, int targetType);
 
-    PageInfo<HistoryMessage> selectPage(int page, int size, int status, Date date);
+    PageInfo<HistoryMessage> selectPage(int page, int size, int status, Date date, int targetType);
 
     void updateStatus(String id, Integer status);
 

+ 7 - 8
mec-common/audit-log/src/main/java/com/yonge/log/service/impl/HistoryMessageServiceImpl.java

@@ -1,22 +1,18 @@
 package com.yonge.log.service.impl;
 
 import com.alibaba.fastjson.JSONObject;
-import com.mongodb.operation.AggregateOperation;
 import com.yonge.log.dal.dao.HistoryMessageDao;
 import com.yonge.log.dal.model.HistoryMessage;
-import com.yonge.log.dal.model.HistoryMessageTencent;
 import com.yonge.log.service.HistoryMessageService;
 import com.yonge.mongodb.PageInfo;
 import com.yonge.mongodb.dao.BaseDaoWithMongo;
 import com.yonge.mongodb.service.impl.BaseServiceImplWithMongo;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.mongodb.core.MongoTemplate;
-import org.springframework.data.mongodb.core.aggregation.Aggregation;
 import org.springframework.data.mongodb.core.query.Criteria;
 import org.springframework.data.mongodb.core.query.Query;
 import org.springframework.data.mongodb.core.query.Update;
 import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -33,7 +29,6 @@ import java.util.Set;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
-import java.util.*;
 import java.util.stream.Collectors;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
@@ -64,7 +59,7 @@ public class HistoryMessageServiceImpl extends BaseServiceImplWithMongo<String,
     }
 
     @Override
-    public long selectCount(int status, Date date) {
+    public long selectCount(int status, Date date, int targetType) {
         Query query = new Query();
 
         // 查询status 为null or 0
@@ -77,12 +72,14 @@ public class HistoryMessageServiceImpl extends BaseServiceImplWithMongo<String,
 
 
         Criteria criteria = Criteria.where("dateTime").gt(dateFormat).orOperator(Criteria.where("status").is(0),Criteria.where("status").is(null));
+        criteria.and("targetType").is(targetType);
+        criteria.and("classname").in("RC:ImgMsg", "RC:GIFMsg", "RC:HQVCMsg", "RC:FileMsg", "RC:SightMsg", "RC:LBSMsg", "RC:ImgTextMsg","RC:TxtMsg");
         query.addCriteria(criteria);
         return mongoTemplate.count(query,HistoryMessage.class);
     }
 
     @Override
-    public PageInfo<HistoryMessage> selectPage(int page, int size, int status, Date date) {
+    public PageInfo<HistoryMessage> selectPage(int page, int size, int status, Date date, int targetType) {
         PageInfo<HistoryMessage> pageInfo = new PageInfo<HistoryMessage>(page, size);
 
         // date 转 LocalDateTime
@@ -90,12 +87,14 @@ public class HistoryMessageServiceImpl extends BaseServiceImplWithMongo<String,
 
         String dateFormat = ldt.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
         List<HistoryMessage> dataList = null;
-        long total = this.selectCount(status,date);
+        long total = this.selectCount(status,date, targetType);
         if (total > 0) {
             pageInfo.setTotal(total);
 
             Query query = new Query();
             Criteria criteria = Criteria.where("dateTime").gt(dateFormat).orOperator(Criteria.where("status").is(0),Criteria.where("status").is(null));
+            criteria.and("targetType").is(targetType);
+            criteria.and("classname").in("RC:ImgMsg", "RC:GIFMsg", "RC:HQVCMsg", "RC:FileMsg", "RC:SightMsg", "RC:LBSMsg", "RC:ImgTextMsg","RC:TxtMsg");
             query.addCriteria(criteria);
             query.skip(pageInfo.getOffset()).limit(pageInfo.getLimit());
             dataList = mongoTemplate.find(query, HistoryMessage.class);