Forráskód Böngészése

Merge branch 'feature/1020-tencent-im' into saas

liujc 1 éve
szülő
commit
9c492f2bbd

+ 24 - 6
mec-application/src/main/java/com/ym/mec/web/controller/ImHistoryMessageController.java

@@ -20,9 +20,11 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.*;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 @RequestMapping("${app-config.url.web:}/imHistoryMessage")
 @Api(tags = "系统操作日志")
@@ -60,6 +62,7 @@ public class ImHistoryMessageController extends BaseController {
 //        historyMessageService.updateAllStatus(0);
             //计算总数据量
             int count = imGroupService.queryInfoCount();
+            log.info("总数据量:"+count);
             //计算调用次数
             int num = (int) Math.ceil(count / 100);
             for (int i = 0; i <= num; i++) {
@@ -77,6 +80,7 @@ public class ImHistoryMessageController extends BaseController {
                 } catch (Exception e) {
                     log.error("IM导入失败",e);
                 }
+                log.info("已导入:" + (i + 1) * size);
             }
         });
     }
@@ -90,15 +94,18 @@ public class ImHistoryMessageController extends BaseController {
             // 删除旧数据
 //        historyMessageTenantService.deleteOld();
 
+
+            // 100个线程的无界线程池
+            ExecutorService executorService = Executors.newFixedThreadPool(100);
             com.yonge.mongodb.PageInfo<HistoryMessage> info;
 //        historyMessageService.updateAllStatus(0);
             //计算总数据量
             long count = historyMessageService.querySyncCount();
             log.info("总数据量:"+count);
             //计算调用次数
-            long num = (int) Math.ceil(count / 100);
+            long num = (int) Math.ceil(count / 200);
             for (long i = 0; i <=num ; i++) {
-                int size = 100;
+                int size = 200;
                 //获取融云消息
                 //List<ImGroup> list = imGroupService.lambdaQuery().last("limit "+(page-1)*size+","+size).list();
                 info = historyMessageService.getImToTencent(1, size);
@@ -107,15 +114,26 @@ public class ImHistoryMessageController extends BaseController {
                     break;
                 }
                 //IM转换
-                Lists.partition(footer,10).parallelStream().forEach(historyMessages -> {
+                List<Future<?>> list = new ArrayList<>();
+                for (List<HistoryMessage> historyMessages : Lists.partition(footer, 10)) {
+                    list.add(executorService.submit(() -> {
+                        try {
+                            imGroupService.imToTencent(historyMessages);
+                        } catch (Exception e) {
+                            log.error("IM转换失败",e);
+                        }
+                    }));
+                }
+                for (Future<?> future : list) {
                     try {
-                        imGroupService.imToTencent(footer);
-                    } catch (Exception e) {
+                        future.get();
+                    } catch (Exception e){
                         log.error("IM转换失败",e);
                     }
-                });
+                }
                 log.info("已转换:" + (i + 1) * size);
             }
+            executorService.shutdown();
         });
     }
 

+ 84 - 18
mec-biz/src/main/java/com/ym/mec/biz/service/impl/ImGroupServiceImpl.java

@@ -78,6 +78,7 @@ import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
+import java.util.concurrent.*;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -891,8 +892,11 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
      */
     @Override
     public void groupTransfer(String startTime, String endTime) {
+
+        // 100个线程的无界线程池
+        ExecutorService executorService = Executors.newFixedThreadPool(100);
         int page = 1;
-        int size = 100;
+        int size = 400;
         QueryInfo queryInfo = new QueryInfo();
         queryInfo.setPage(page);
         queryInfo.setRows(size);
@@ -901,7 +905,7 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
         int total = imGroupPageInfo.getTotal();
         int importImGroupCount = 0;
-        int num = (int) Math.ceil(total / 100);
+        int num = (int) Math.ceil(total / 400);
         for (int i = 0; i <=num ; i++) {
 
             imGroupPageInfo = this.queryPage(queryInfo);
@@ -924,12 +928,25 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
             } catch (ParseException e) {
                 throw new BizException("时间区间参数错误,格式为:yyyy-MM-dd");
             }
-            Lists.partition(rows,10).parallelStream().forEach(this::groupTransfer);
+            List<Future<?>> list = new ArrayList<>();
+            for (List<ImGroup> imGroups : Lists.partition(rows, 20)) {
+                list.add(executorService.submit(() -> {
+                    groupTransfer(imGroups);
+                }));
+            }
+            for (Future<?> future : list) {
+                try {
+                    future.get();
+                } catch (Exception e){
+                    log.error("群迁移失败",e);
+                }
+            }
             importImGroupCount += rows.size();
             log.info("------------------------------- import im group --------------------------------------------");
             log.info("import im group success count:{}/{}", importImGroupCount, total);
         }
         log.info("-------------------- import im group finished and success! -------------------------------");
+        executorService.shutdown();
     }
 
     private void groupTransfer(List<ImGroup> records) {
@@ -1068,17 +1085,31 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
 
         int count = 0;
 
+        // 100个线程的无界线程池
+        ThreadPoolExecutor executorService =  new ThreadPoolExecutor(100, 100,
+            0L, TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<Runnable>());
+
         log.info("------------------------------- import user student ---------------------------");
         List<String[]> student = getStudent(page, rows);
         SysConfig studentAvatar = sysConfigService.findByParamName(SysConfigService.STUDENT_DEFAULT_AVATAR);
         while (!student.isEmpty()) {
-            student.parallelStream().forEach(next -> {
+            for (String[] next : student) {
                 String avatar = next[2];
                 if (StringUtils.isEmpty(avatar)) {
                     avatar = studentAvatar.getParanValue();
                 }
-                register(next[0], next[1], avatar);
-            });
+                String finalAvatar = avatar;
+                executorService.execute(() -> {
+                    register(next[0], next[1], finalAvatar);
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        log.error("线程等待异常", e);
+                    }
+                });
+            }
+
             count += student.size();
             log.info("import im student success count:{}", count);
             page++;
@@ -1092,13 +1123,23 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
         SysConfig staffAvatar = sysConfigService.findByParamName(SysConfigService.USER_DEFAULT_AVATAR);
         List<String[]> staff = getStaff(page, rows);
         while (!staff.isEmpty()) {
-            staff.parallelStream().forEach(next -> {
+
+            for (String[] next : staff) {
                 String avatar = next[2];
                 if (StringUtils.isEmpty(avatar)) {
                     avatar = staffAvatar.getParanValue();
                 }
-                register(next[0], next[1], avatar);
-            });
+                String finalAvatar = avatar;
+                executorService.execute(() -> {
+                    register(next[0], next[1], finalAvatar);
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        log.error("线程等待异常", e);
+                    }
+                });
+            }
+
             count += staff.size();
             log.info("import im staff success count:{}", count);
             page++;
@@ -1112,13 +1153,21 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
         SysConfig empAvatar = sysConfigService.findByParamName(SysConfigService.TEACHER_DEFAULT_AVATAR);
         List<String[]> emps = getEmp(page, rows);
         while (!emps.isEmpty()) {
-            emps.parallelStream().forEach(next -> {
+            for (String[] next : emps) {
                 String avatar = next[2];
                 if (StringUtils.isEmpty(avatar)) {
                     avatar = empAvatar.getParanValue();
                 }
-                register(next[0], next[1], avatar);
-            });
+                String finalAvatar = avatar;
+                executorService.execute(() -> {
+                    register(next[0], next[1], finalAvatar);
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        log.error("线程等待异常", e);
+                    }
+                });
+            }
             count += emps.size();
             log.info("import im emps success count:{}", count);
             page++;
@@ -1131,19 +1180,36 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
         SysConfig teacherAvatar = sysConfigService.findByParamName(SysConfigService.TEACHER_DEFAULT_AVATAR);
         List<String[]> teachers = getTeachers(page, rows);
         while (!teachers.isEmpty()) {
-            teachers.parallelStream().forEach(next -> {
+            for (String[] next : teachers) {
                 String avatar = next[2];
                 if (StringUtils.isEmpty(avatar)) {
                     avatar = teacherAvatar.getParanValue();
                 }
-                register(next[0], next[1], avatar);
-            });
+                String finalAvatar = avatar;
+                executorService.execute(() -> {
+                    register(next[0], next[1], finalAvatar);
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        log.error("线程等待异常", e);
+                    }
+                });
+            }
             count += teachers.size();
             log.info("import im teacher success count:{}", count);
             page++;
             teachers = getTeachers(page, rows);
         }
 
+        while (executorService.getQueue().size() > 0) {
+            try {
+                log.info("线程池中正在执行的任务数量:{},等待数量:{}", executorService.getActiveCount(),executorService.getQueue().size());
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                log.error("线程等待异常", e);
+            }
+        }
+        executorService.shutdown();
         log.info("------------------------------- import user success ---------------------------");
     }
 
@@ -1226,7 +1292,7 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
     public void importInfo(List<HistoryMessage> info) throws Exception {
 
         for (HistoryMessage i : info) {//判断消息类型
-            log.info("消息导入 HistoryMessage:{}", JSON.toJSONString(i));
+//            log.info("消息导入 HistoryMessage:{}", JSON.toJSONString(i));
             try {
                 Integer type = i.getTargetType();
                 if (type == 1) {
@@ -1336,7 +1402,7 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
                         //为已导入数据更改标识
                         updateStatus(i,1);
                         historyMessageService.updateMsg(i.getMsgUID(),i.getMsgSeq(),i.getMsgRandom(),i.getMsgTimeStamp());
-                        log.info("导入私聊消息成功:{}", i);
+                        log.info("导入私聊消息成功");
                     } catch (Exception e) {
                         updateStatus(i,2);
                         log.error("导入私聊IM消息失败 msg:{},entity:{}", list, i, e);
@@ -1413,7 +1479,7 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
                         imPluginContext.getPluginService().importGroupMessage(groupImportMessage);
                         //为已导入数据更改标识
                         updateStatus(i,1);
-                        log.info("导入消息成功:{}", i);
+                        log.info("导入消息成功");
                     } catch (Exception e) {
                         updateStatus(i,2);
                         log.error("导入群组IM消息失败 msg:{},entity:{}", list, i, e);