Browse Source

Merge branch 'feature/1020-tencent-im' into test

liujc 2 years ago
parent
commit
5e537237c5

+ 4 - 2
mec-application/src/main/java/com/ym/mec/web/controller/ImGroupController.java

@@ -322,9 +322,11 @@ public class ImGroupController extends BaseController {
 
 	@GetMapping("/groupTransfer")
 	public Object groupTransfer(@RequestParam(required = false) String startTime,
-								@RequestParam(required = false) String endTime) {
+								@RequestParam(required = false) String endTime,
+								@RequestParam(required = false) Integer type
+                                ) {
 		ExecutorService executor = Executors.newSingleThreadExecutor();
-		executor.execute(() -> imGroupService.groupTransfer(startTime, endTime));
+		executor.execute(() -> imGroupService.groupTransfer(startTime, endTime,type));
 		return succeed();
 	}
 

+ 2 - 27
mec-application/src/main/java/com/ym/mec/web/controller/ImHistoryMessageController.java

@@ -22,6 +22,7 @@ import org.springframework.web.bind.annotation.*;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -63,7 +64,6 @@ public class ImHistoryMessageController extends BaseController {
         executor.execute(() -> {
 
             // 100个线程的无界线程池
-            ExecutorService executorService = Executors.newFixedThreadPool(100);
             com.yonge.mongodb.PageInfo<HistoryMessage> info;
 //        historyMessageService.updateAllStatus(0);
             //计算总数据量
@@ -84,37 +84,12 @@ public class ImHistoryMessageController extends BaseController {
                 try {
                     // 过滤 groupId为空的
                     footer = footer.stream().filter(o -> StringUtils.isNotBlank(o.getGroupId())).collect(Collectors.toList());
-                    // 根据 groupId 分组
-                    Map<String, List<HistoryMessage>> map = footer.stream().collect(Collectors.groupingBy(HistoryMessage::getGroupId));
-
-                    List<Future<?>> list = new ArrayList<>();
-                    for (Map.Entry<String, List<HistoryMessage>> entry : map.entrySet()) {
-                        String k = entry.getKey();
-                        List<HistoryMessage> v = entry.getValue();
-
-                        list.add(executorService.submit(() -> {
-                            try {
-                                imGroupService.importInfo(v);
-                            } catch (Exception e) {
-
-                                log.error("群聊导入失败",e);
-                            }
-
-                        }));
-                    }
-                    for (Future<?> future : list) {
-                        try {
-                            future.get();
-                        } catch (Exception e){
-                            log.error("群聊导入失败",e);
-                        }
-                    }
+                    imGroupService.importInfo(footer);
                 } catch (Exception e) {
                     log.error("IM导入失败",e);
                 }
                 log.info("已导入:" + (i + 1) * size);
             }
-            executorService.shutdown();
         });
     }
 

+ 1 - 1
mec-biz/src/main/java/com/ym/mec/biz/service/ImGroupService.java

@@ -181,7 +181,7 @@ public interface ImGroupService extends BaseService<String, ImGroup> {
 	/**
 	 * 群迁移
 	 */
-	void groupTransfer(String startTime,String endTime);
+	void groupTransfer(String startTime,String endTime,Integer type);
 
 	/**
 	 * 导入用户

+ 70 - 49
mec-biz/src/main/java/com/ym/mec/biz/service/impl/ImGroupServiceImpl.java

@@ -6,6 +6,7 @@ import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.core.toolkit.IdWorker;
 import com.google.common.collect.Lists;
+import com.microsvc.toolkit.middleware.common.http.DateUtil;
 import com.microsvc.toolkit.middleware.common.http.ImageUtil;
 import com.microsvc.toolkit.middleware.im.ImPluginContext;
 import com.microsvc.toolkit.middleware.im.message.GroupMemberWrapper;
@@ -886,23 +887,27 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
      * 群迁移
      */
     @Override
-    public void groupTransfer(String startTime, String endTime) {
+    public void groupTransfer(String startTime, String endTime,Integer type) {
 
         // 100个线程的无界线程池
-        ExecutorService executorService = Executors.newFixedThreadPool(100);
+
+        // 100个线程的无界线程池
+        ThreadPoolExecutor executorService =  new ThreadPoolExecutor(100, 100,
+            0L, TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<Runnable>());
         int page = 1;
         int size = 100;
         QueryInfo queryInfo = new QueryInfo();
         queryInfo.setPage(page);
         queryInfo.setRows(size);
-        queryInfo.setImportFlag(false);
+//        queryInfo.setImportFlag(false);
         PageInfo<ImGroup> imGroupPageInfo = this.queryPage(queryInfo);
         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
         int total = imGroupPageInfo.getTotal();
         int importImGroupCount = 0;
         int num = (int) Math.ceil(total / 100);
         for (int i = 0; i <=num ; i++) {
-
+            queryInfo.setPage(page);
             imGroupPageInfo = this.queryPage(queryInfo);
             List<ImGroup> rows = imGroupPageInfo.getRows();
             try {
@@ -923,54 +928,54 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
             } catch (ParseException e) {
                 throw new BizException("时间区间参数错误,格式为:yyyy-MM-dd");
             }
-            List<Future<?>> list = new ArrayList<>();
 
-            rows.forEach(imGroup -> {
+            if (type == 1) {
+                rows.forEach(imGroup -> {
+
+                    executorService.submit(() -> {
+                        log.info("群销毁开始:{}", imGroup.getId());
+                        // 先删除群组
+                        try {
+                            // 解散群
+                            imPluginContext.getPluginService().groupDismiss(imGroup.getId(), new ArrayList<>());
+                        } catch (Exception e) {
+                            log.error(String.format("群迁移删除群聊失败:%s", e.getMessage()), e);
+                        }
+                        try {
+                            Thread.sleep(1000);
+                        } catch (InterruptedException e) {
+                            log.error("线程等待异常", e);
+                        }
+                    });
+                });
+            } else {
 
-                list.add(executorService.submit(() -> {
-                    log.info("群销毁开始:{}", imGroup.getId());
-                    // 先删除群组
-                    try {
-                        // 解散群
-                        imPluginContext.getPluginService().groupDismiss(imGroup.getId(), new ArrayList<>());
-                    } catch (Exception e) {
-                        log.error(String.format("群迁移删除群聊失败:%s", e.getMessage()), e);
-                    }
-                }));
-            });
-            for (Future<?> future : list) {
-                try {
-                    future.get();
-                } catch (Exception e){
-                    log.error("群销毁失败",e);
+                for (ImGroup row : rows) {
+                    executorService.execute(() -> {
+                        groupTransfer(Lists.newArrayList(row));
+                        try {
+                            Thread.sleep(1000);
+                        } catch (InterruptedException e) {
+                            log.error("线程等待异常", e);
+                        }
+                    });
                 }
-            }
 
-            try {
-                Thread.sleep(1500);
-            } catch (InterruptedException e) {
-                log.error("线程休眠失败",e);
-            }
-
-
-            list.clear();
-            for (ImGroup row : rows) {
-                list.add(executorService.submit(() -> {
-                    groupTransfer(Lists.newArrayList(row));
-                }));
-            }
-
-            for (Future<?> future : list) {
-                try {
-                    future.get();
-                } catch (Exception e){
-                    log.error("群迁移失败",e);
-                }
             }
+            page ++;
             importImGroupCount += rows.size();
             log.info("------------------------------- import im group --------------------------------------------");
             log.info("import im group success count:{}/{}", importImGroupCount, total);
         }
+
+        while (executorService.getQueue().size() > 0) {
+            try {
+                log.info("线程池中正在执行的任务数量:{},等待数量:{}", executorService.getActiveCount(),executorService.getQueue().size());
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                log.error("线程等待异常", e);
+            }
+        }
         log.info("-------------------- import im group finished and success! -------------------------------");
         executorService.shutdown();
     }
@@ -1056,7 +1061,11 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
                             return data;
                         }).collect(Collectors.toList());
                 importGroupMember.setMemberList(members);
-                imPluginContext.getPluginService().importGroupMember(importGroupMember);
+                List<List<MessageWrapper.ImportGroupMemberData>> partition = Lists.partition(members, 88);
+                for (List<MessageWrapper.ImportGroupMemberData> importGroupMemberData : partition) {
+                    importGroupMember.setMemberList(importGroupMemberData);
+                    imPluginContext.getPluginService().importGroupMember(importGroupMember);
+                }
                 imGroupDao.updateImportStatusSuccess(imGroup.getId());
             } catch (Exception e) {
                 log.error(String.format("群迁移失败,失败群组:%s", JSON.toJSONString(imGroup)));
@@ -1436,7 +1445,7 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
                     privateImportMessage.setTencentMessageBody(list);
                     privateImportMessage.setCloudCustomData(jsonObject.getString("extra"));
 
-                    log.info("导入私聊消息:{}", JSON.toJSONString(privateImportMessage));
+//                    log.info("导入私聊消息:{}", JSON.toJSONString(privateImportMessage));
                     try {
                         imPluginContext.getPluginService().importPrivateMessage(privateImportMessage);
                         //为已导入数据更改标识
@@ -1445,7 +1454,7 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
                         log.info("导入私聊消息成功");
                     } catch (Exception e) {
                         updateStatus(i,2);
-                        log.error("导入私聊IM消息失败 msg:{},entity:{}", list, i, e);
+                        log.error("导入私聊IM消息失败 msg:{},entity:{}", JSON.toJSONString(privateImportMessage), i, e);
                     }
                 } else if (type == 3) {
                     //群组会话
@@ -1471,7 +1480,7 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
 
                     data1.setFromAccount(imUserId);
                     //设置随机数
-                    data1.setRandom(new Random().nextInt());
+                    data1.setRandom(Math.abs(new Random().nextInt()));
                     //设置发送时间
 
                     String time = i.getDateTime();
@@ -1514,7 +1523,7 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
                     list.add(data1);
                     //导入消息列表
                     groupImportMessage.setMsgList(list);
-                    log.info("导入群聊消息:{}", JSON.toJSONString(groupImportMessage));
+//                    log.info("导入群聊消息:{}", JSON.toJSONString(groupImportMessage));
                     try {
                         imPluginContext.getPluginService().importGroupMessage(groupImportMessage);
                         //为已导入数据更改标识
@@ -1522,7 +1531,7 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
                         log.info("导入消息成功");
                     } catch (Exception e) {
                         updateStatus(i,2);
-                        log.error("导入群组IM消息失败 msg:{},entity:{}", list, i, e);
+                        log.error("导入群组IM消息失败 msg:{},entity:{}", JSON.toJSONString(groupImportMessage), i, e);
                     }
                 }
             }catch (Exception e) {
@@ -1758,6 +1767,18 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
         if (CollectionUtils.isEmpty(rows)) {
             return pageInfo;
         }
+
+        for (HistoryMessageTencentWrapper.HistoryMessageTencent row : rows) {
+            // 时间戳 转换
+            Long msgTime = row.getMsgTimestamp();
+            if (msgTime != null) {
+                String dateTime = String.valueOf(msgTime);
+                if (dateTime.length() == 10) {
+                    dateTime = dateTime + "000";
+                }
+                row.setMsgTime(Long.parseLong(DateUtil.format(new Date(Long.parseLong(dateTime)), "yyyyMMddHH")));
+            }
+        }
         // 发送人信息
         List<Integer> fromAccountList = rows.stream()
             .map(HistoryMessageTencentWrapper.HistoryMessageTencent::getFromAccount)

+ 1 - 1
mec-common/audit-log/src/main/java/com/yonge/log/service/impl/HistoryMessageTenantServiceImpl.java

@@ -143,7 +143,7 @@ public class HistoryMessageTenantServiceImpl extends BaseServiceImplWithMongo<St
 
 
         query.addCriteria(criteria);
-        query.with(Sort.by(Sort.Order.asc("msg_timestamp"))); // 以升序方式按字段排序
+        query.with(Sort.by(Sort.Order.desc("msg_timestamp"))); // 以升序方式按字段排序
 
         return query;
     }