liujc 1 год назад
Родитель
Сommit
798add2aa5

+ 88 - 64
mec-application/src/main/java/com/ym/mec/web/controller/ImHistoryMessageController.java

@@ -1,5 +1,6 @@
 package com.ym.mec.web.controller;
 
+import com.google.common.collect.Lists;
 import com.yonge.log.wrapper.HistoryMessageTencentWrapper;
 import com.ym.mec.biz.service.ImGroupService;
 import com.ym.mec.biz.service.UploadFileService;
@@ -20,6 +21,8 @@ import org.springframework.web.bind.annotation.*;
 
 import java.io.File;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 @RequestMapping("${app-config.url.web:}/imHistoryMessage")
 @Api(tags = "系统操作日志")
@@ -27,93 +30,114 @@ import java.util.List;
 @RestController
 public class ImHistoryMessageController extends BaseController {
 
-	@Autowired
-	private HistoryMessageService historyMessageService;
-	@Autowired
-	private UploadFileService uploadFileService;
+    @Autowired
+    private HistoryMessageService historyMessageService;
+    @Autowired
+    private UploadFileService uploadFileService;
 
-	@Autowired
-	private ImGroupService imGroupService;
+    @Autowired
+    private ImGroupService imGroupService;
 
     @Autowired
     private HistoryMessageTenantService historyMessageTenantService;
 
-	@GetMapping("/save")
-	public Object saveImHistoryMessage(String fileDir) throws Exception {
-		File file = new File(fileDir);
-		UploadReturnBean uploadReturnBean = uploadFileService.uploadImHistoryMsgFile(file);
-		historyMessageService.saveImHistoryMessage(file);
-		return succeed();
-	}
+    @GetMapping("/save")
+    public Object saveImHistoryMessage(String fileDir) throws Exception {
+        File file = new File(fileDir);
+        UploadReturnBean uploadReturnBean = uploadFileService.uploadImHistoryMsgFile(file);
+        historyMessageService.saveImHistoryMessage(file);
+        return succeed();
+    }
+
 
+    @ApiOperation("IM导入消息")
+    @PostMapping(value = "/ImportIM")
+    public void ImportIM() {
 
-	@ApiOperation("IM导入消息")
-	@PostMapping(value = "/ImportIM")
-	public void ImportIM() throws Exception {
-		com.yonge.mongodb.PageInfo<HistoryMessage> info;
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        executor.execute(() -> {
+            com.yonge.mongodb.PageInfo<HistoryMessage> info;
 //        historyMessageService.updateAllStatus(0);
-		//计算总数据量
-		int count = imGroupService.queryInfoCount();
-		//计算调用次数
-		int num = (int) Math.ceil(count / 100);
-		for (int i = 0; i <=num ; i++) {
-			int size =100;
-			//获取融云消息
-			//List<ImGroup> list = imGroupService.lambdaQuery().last("limit "+(page-1)*size+","+size).list();
-			info = imGroupService.getRongYunInfo(1,size);
-			List<HistoryMessage> footer = info.getRows();
-            if (CollectionUtils.isEmpty(footer)) {
-                break;
+            //计算总数据量
+            int count = imGroupService.queryInfoCount();
+            //计算调用次数
+            int num = (int) Math.ceil(count / 100);
+            for (int i = 0; i <= num; i++) {
+                int size = 100;
+                //获取融云消息
+                //List<ImGroup> list = imGroupService.lambdaQuery().last("limit "+(page-1)*size+","+size).list();
+                info = imGroupService.getRongYunInfo(1, size);
+                List<HistoryMessage> footer = info.getRows();
+                if (CollectionUtils.isEmpty(footer)) {
+                    break;
+                }
+                //IM导入
+                try {
+                    imGroupService.importInfo(footer);
+                } catch (Exception e) {
+                    log.error("IM导入失败",e);
+                }
             }
-			//IM导入
-			imGroupService.importInfo(footer);
-		}
-	}
+        });
+    }
 
     @ApiOperation("融云im To 腾讯")
     @PostMapping(value = "/imToTencent")
     public void imToTencent() throws Exception {
 
-        // 删除旧数据
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        executor.execute(() -> {
+            // 删除旧数据
 //        historyMessageTenantService.deleteOld();
 
-        com.yonge.mongodb.PageInfo<HistoryMessage> info;
+            com.yonge.mongodb.PageInfo<HistoryMessage> info;
 //        historyMessageService.updateAllStatus(0);
-        //计算总数据量
-        long count = historyMessageService.querySyncCount();
-        log.info("总数据量:"+count);
-        //计算调用次数
-        long num = (int) Math.ceil(count / 100);
-        for (long i = 0; i <=num ; i++) {
-            int size =100;
-            //获取融云消息
-            //List<ImGroup> list = imGroupService.lambdaQuery().last("limit "+(page-1)*size+","+size).list();
-            info = historyMessageService.getImToTencent(1,size);
-            List<HistoryMessage> footer = info.getRows();
-            if (CollectionUtils.isEmpty(footer)) {
-                break;
+            //计算总数据量
+            long count = historyMessageService.querySyncCount();
+            log.info("总数据量:"+count);
+            //计算调用次数
+            long num = (int) Math.ceil(count / 100);
+            for (long i = 0; i <=num ; i++) {
+                int size = 100;
+                //获取融云消息
+                //List<ImGroup> list = imGroupService.lambdaQuery().last("limit "+(page-1)*size+","+size).list();
+                info = historyMessageService.getImToTencent(1, size);
+                List<HistoryMessage> footer = info.getRows();
+                if (CollectionUtils.isEmpty(footer)) {
+                    break;
+                }
+                //IM转换
+                Lists.partition(footer,10).parallelStream().forEach(historyMessages -> {
+                    try {
+                        imGroupService.imToTencent(footer);
+                    } catch (Exception e) {
+                        log.error("IM转换失败",e);
+                    }
+                });
+                log.info("已转换:" + (i + 1) * size);
             }
-            //IM转换
-            imGroupService.imToTencent(footer);
-            log.info("已转换:"+(i+1)*size);
-        }
+        });
     }
 
     @ApiOperation("初始化消息体")
     @PostMapping(value = "/initMsgBodyJson")
     public void initMsgBodyJson() throws Exception {
-        //计算总数据量
-        int count = historyMessageTenantService.initMsgBodyJsonCount();
-        log.info("总数据量:"+count);
-        //计算调用次数
-        int num = (int) Math.ceil(count / 100);
-        for (int i = 0; i <=num ; i++) {
-            int size =100;
-            //获取融云消息
-            //List<ImGroup> list = imGroupService.lambdaQuery().last("limit "+(page-1)*size+","+size).list();
-            historyMessageTenantService.initMsgBodyJson(1,size);
-            log.info("已转换:"+(i+1)*size);
-        }
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        executor.execute(() -> {
+            //计算总数据量
+            int count = historyMessageTenantService.initMsgBodyJsonCount();
+            log.info("总数据量:" + count);
+            //计算调用次数
+            int num = (int) Math.ceil(count / 100);
+            for (int i = 0; i <= num; i++) {
+                int size = 100;
+                //获取融云消息
+                //List<ImGroup> list = imGroupService.lambdaQuery().last("limit "+(page-1)*size+","+size).list();
+                historyMessageTenantService.initMsgBodyJson(1, size);
+                log.info("已转换:" + (i + 1) * size);
+            }
+        });
     }
 
     @ApiOperation("历史聊天记录查询")

+ 11 - 11
mec-biz/src/main/java/com/ym/mec/biz/service/impl/ImGroupServiceImpl.java

@@ -924,7 +924,7 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
             } catch (ParseException e) {
                 throw new BizException("时间区间参数错误,格式为:yyyy-MM-dd");
             }
-            groupTransfer(rows);
+            Lists.partition(rows,20).parallelStream().forEach(this::groupTransfer);
             importImGroupCount += rows.size();
             log.info("------------------------------- import im group --------------------------------------------");
             log.info("import im group success count:{}/{}", importImGroupCount, total);
@@ -1064,7 +1064,7 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
     @Override
     public void importUser() {
         int page = 1;
-        int rows = 200;
+        int rows = 100;
 
         int count = 0;
 
@@ -1072,13 +1072,13 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
         List<String[]> student = getStudent(page, rows);
         SysConfig studentAvatar = sysConfigService.findByParamName(SysConfigService.STUDENT_DEFAULT_AVATAR);
         while (!student.isEmpty()) {
-            for (String[] next : student) {
+            student.parallelStream().forEach(next -> {
                 String avatar = next[2];
                 if (StringUtils.isEmpty(avatar)) {
                     avatar = studentAvatar.getParanValue();
                 }
                 register(next[0], next[1], avatar);
-            }
+            });
             count += student.size();
             log.info("import im student success count:{}", count);
             page++;
@@ -1092,13 +1092,13 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
         SysConfig staffAvatar = sysConfigService.findByParamName(SysConfigService.USER_DEFAULT_AVATAR);
         List<String[]> staff = getStaff(page, rows);
         while (!staff.isEmpty()) {
-            for (String[] next : staff) {
+            staff.parallelStream().forEach(next -> {
                 String avatar = next[2];
                 if (StringUtils.isEmpty(avatar)) {
                     avatar = staffAvatar.getParanValue();
                 }
                 register(next[0], next[1], avatar);
-            }
+            });
             count += staff.size();
             log.info("import im staff success count:{}", count);
             page++;
@@ -1112,13 +1112,13 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
         SysConfig empAvatar = sysConfigService.findByParamName(SysConfigService.TEACHER_DEFAULT_AVATAR);
         List<String[]> emps = getEmp(page, rows);
         while (!emps.isEmpty()) {
-            for (String[] next : emps) {
+            emps.parallelStream().forEach(next -> {
                 String avatar = next[2];
                 if (StringUtils.isEmpty(avatar)) {
                     avatar = empAvatar.getParanValue();
                 }
                 register(next[0], next[1], avatar);
-            }
+            });
             count += emps.size();
             log.info("import im emps success count:{}", count);
             page++;
@@ -1131,13 +1131,13 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
         SysConfig teacherAvatar = sysConfigService.findByParamName(SysConfigService.TEACHER_DEFAULT_AVATAR);
         List<String[]> teachers = getTeachers(page, rows);
         while (!teachers.isEmpty()) {
-            for (String[] next : teachers) {
+            teachers.parallelStream().forEach(next -> {
                 String avatar = next[2];
                 if (StringUtils.isEmpty(avatar)) {
                     avatar = teacherAvatar.getParanValue();
                 }
                 register(next[0], next[1], avatar);
-            }
+            });
             count += teachers.size();
             log.info("import im teacher success count:{}", count);
             page++;
@@ -1579,7 +1579,7 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
             String content = jsonObject.getString("content");
 
             // http开头的链接 扩展字段没有值 放在扩展字段里
-            if (content.startsWith("http")) {
+            if (StringUtils.isNotBlank(content) && content.startsWith("http")) {
                 String extra = jsonObject.getString("extra");
                 if (StringUtils.isEmpty(extra)) {
                     jsonObject.put("extra", content);