|  | @@ -78,6 +78,7 @@ import java.time.LocalDateTime;
 | 
	
		
			
				|  |  |  import java.time.ZoneId;
 | 
	
		
			
				|  |  |  import java.time.format.DateTimeFormatter;
 | 
	
		
			
				|  |  |  import java.util.*;
 | 
	
		
			
				|  |  | +import java.util.concurrent.*;
 | 
	
		
			
				|  |  |  import java.util.function.Function;
 | 
	
		
			
				|  |  |  import java.util.stream.Collectors;
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -891,8 +892,11 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
 | 
	
		
			
				|  |  |       */
 | 
	
		
			
				|  |  |      @Override
 | 
	
		
			
				|  |  |      public void groupTransfer(String startTime, String endTime) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 100个线程的无界线程池
 | 
	
		
			
				|  |  | +        ExecutorService executorService = Executors.newFixedThreadPool(100);
 | 
	
		
			
				|  |  |          int page = 1;
 | 
	
		
			
				|  |  | -        int size = 100;
 | 
	
		
			
				|  |  | +        int size = 400;
 | 
	
		
			
				|  |  |          QueryInfo queryInfo = new QueryInfo();
 | 
	
		
			
				|  |  |          queryInfo.setPage(page);
 | 
	
		
			
				|  |  |          queryInfo.setRows(size);
 | 
	
	
		
			
				|  | @@ -901,7 +905,7 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
 | 
	
		
			
				|  |  |          SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
 | 
	
		
			
				|  |  |          int total = imGroupPageInfo.getTotal();
 | 
	
		
			
				|  |  |          int importImGroupCount = 0;
 | 
	
		
			
				|  |  | -        int num = (int) Math.ceil(total / 100);
 | 
	
		
			
				|  |  | +        int num = (int) Math.ceil(total / 400);
 | 
	
		
			
				|  |  |          for (int i = 0; i <=num ; i++) {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              imGroupPageInfo = this.queryPage(queryInfo);
 | 
	
	
		
			
				|  | @@ -924,12 +928,25 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
 | 
	
		
			
				|  |  |              } catch (ParseException e) {
 | 
	
		
			
				|  |  |                  throw new BizException("时间区间参数错误,格式为:yyyy-MM-dd");
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  | -            Lists.partition(rows,10).parallelStream().forEach(this::groupTransfer);
 | 
	
		
			
				|  |  | +            List<Future<?>> list = new ArrayList<>();
 | 
	
		
			
				|  |  | +            for (List<ImGroup> imGroups : Lists.partition(rows, 20)) {
 | 
	
		
			
				|  |  | +                list.add(executorService.submit(() -> {
 | 
	
		
			
				|  |  | +                    groupTransfer(imGroups);
 | 
	
		
			
				|  |  | +                }));
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +            for (Future<?> future : list) {
 | 
	
		
			
				|  |  | +                try {
 | 
	
		
			
				|  |  | +                    future.get();
 | 
	
		
			
				|  |  | +                } catch (Exception e){
 | 
	
		
			
				|  |  | +                    log.error("群迁移失败",e);
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  |              importImGroupCount += rows.size();
 | 
	
		
			
				|  |  |              log.info("------------------------------- import im group --------------------------------------------");
 | 
	
		
			
				|  |  |              log.info("import im group success count:{}/{}", importImGroupCount, total);
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |          log.info("-------------------- import im group finished and success! -------------------------------");
 | 
	
		
			
				|  |  | +        executorService.shutdown();
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      private void groupTransfer(List<ImGroup> records) {
 | 
	
	
		
			
				|  | @@ -1068,17 +1085,31 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          int count = 0;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +        // 100个线程的无界线程池
 | 
	
		
			
				|  |  | +        ThreadPoolExecutor executorService =  new ThreadPoolExecutor(100, 100,
 | 
	
		
			
				|  |  | +            0L, TimeUnit.MILLISECONDS,
 | 
	
		
			
				|  |  | +            new LinkedBlockingQueue<Runnable>());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |          log.info("------------------------------- import user student ---------------------------");
 | 
	
		
			
				|  |  |          List<String[]> student = getStudent(page, rows);
 | 
	
		
			
				|  |  |          SysConfig studentAvatar = sysConfigService.findByParamName(SysConfigService.STUDENT_DEFAULT_AVATAR);
 | 
	
		
			
				|  |  |          while (!student.isEmpty()) {
 | 
	
		
			
				|  |  | -            student.parallelStream().forEach(next -> {
 | 
	
		
			
				|  |  | +            for (String[] next : student) {
 | 
	
		
			
				|  |  |                  String avatar = next[2];
 | 
	
		
			
				|  |  |                  if (StringUtils.isEmpty(avatar)) {
 | 
	
		
			
				|  |  |                      avatar = studentAvatar.getParanValue();
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  | -                register(next[0], next[1], avatar);
 | 
	
		
			
				|  |  | -            });
 | 
	
		
			
				|  |  | +                String finalAvatar = avatar;
 | 
	
		
			
				|  |  | +                executorService.execute(() -> {
 | 
	
		
			
				|  |  | +                    register(next[0], next[1], finalAvatar);
 | 
	
		
			
				|  |  | +                    try {
 | 
	
		
			
				|  |  | +                        Thread.sleep(1000);
 | 
	
		
			
				|  |  | +                    } catch (InterruptedException e) {
 | 
	
		
			
				|  |  | +                        log.error("线程等待异常", e);
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                });
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |              count += student.size();
 | 
	
		
			
				|  |  |              log.info("import im student success count:{}", count);
 | 
	
		
			
				|  |  |              page++;
 | 
	
	
		
			
				|  | @@ -1092,13 +1123,23 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
 | 
	
		
			
				|  |  |          SysConfig staffAvatar = sysConfigService.findByParamName(SysConfigService.USER_DEFAULT_AVATAR);
 | 
	
		
			
				|  |  |          List<String[]> staff = getStaff(page, rows);
 | 
	
		
			
				|  |  |          while (!staff.isEmpty()) {
 | 
	
		
			
				|  |  | -            staff.parallelStream().forEach(next -> {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            for (String[] next : staff) {
 | 
	
		
			
				|  |  |                  String avatar = next[2];
 | 
	
		
			
				|  |  |                  if (StringUtils.isEmpty(avatar)) {
 | 
	
		
			
				|  |  |                      avatar = staffAvatar.getParanValue();
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  | -                register(next[0], next[1], avatar);
 | 
	
		
			
				|  |  | -            });
 | 
	
		
			
				|  |  | +                String finalAvatar = avatar;
 | 
	
		
			
				|  |  | +                executorService.execute(() -> {
 | 
	
		
			
				|  |  | +                    register(next[0], next[1], finalAvatar);
 | 
	
		
			
				|  |  | +                    try {
 | 
	
		
			
				|  |  | +                        Thread.sleep(1000);
 | 
	
		
			
				|  |  | +                    } catch (InterruptedException e) {
 | 
	
		
			
				|  |  | +                        log.error("线程等待异常", e);
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                });
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |              count += staff.size();
 | 
	
		
			
				|  |  |              log.info("import im staff success count:{}", count);
 | 
	
		
			
				|  |  |              page++;
 | 
	
	
		
			
				|  | @@ -1112,13 +1153,21 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
 | 
	
		
			
				|  |  |          SysConfig empAvatar = sysConfigService.findByParamName(SysConfigService.TEACHER_DEFAULT_AVATAR);
 | 
	
		
			
				|  |  |          List<String[]> emps = getEmp(page, rows);
 | 
	
		
			
				|  |  |          while (!emps.isEmpty()) {
 | 
	
		
			
				|  |  | -            emps.parallelStream().forEach(next -> {
 | 
	
		
			
				|  |  | +            for (String[] next : emps) {
 | 
	
		
			
				|  |  |                  String avatar = next[2];
 | 
	
		
			
				|  |  |                  if (StringUtils.isEmpty(avatar)) {
 | 
	
		
			
				|  |  |                      avatar = empAvatar.getParanValue();
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  | -                register(next[0], next[1], avatar);
 | 
	
		
			
				|  |  | -            });
 | 
	
		
			
				|  |  | +                String finalAvatar = avatar;
 | 
	
		
			
				|  |  | +                executorService.execute(() -> {
 | 
	
		
			
				|  |  | +                    register(next[0], next[1], finalAvatar);
 | 
	
		
			
				|  |  | +                    try {
 | 
	
		
			
				|  |  | +                        Thread.sleep(1000);
 | 
	
		
			
				|  |  | +                    } catch (InterruptedException e) {
 | 
	
		
			
				|  |  | +                        log.error("线程等待异常", e);
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                });
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  |              count += emps.size();
 | 
	
		
			
				|  |  |              log.info("import im emps success count:{}", count);
 | 
	
		
			
				|  |  |              page++;
 | 
	
	
		
			
				|  | @@ -1131,19 +1180,36 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
 | 
	
		
			
				|  |  |          SysConfig teacherAvatar = sysConfigService.findByParamName(SysConfigService.TEACHER_DEFAULT_AVATAR);
 | 
	
		
			
				|  |  |          List<String[]> teachers = getTeachers(page, rows);
 | 
	
		
			
				|  |  |          while (!teachers.isEmpty()) {
 | 
	
		
			
				|  |  | -            teachers.parallelStream().forEach(next -> {
 | 
	
		
			
				|  |  | +            for (String[] next : teachers) {
 | 
	
		
			
				|  |  |                  String avatar = next[2];
 | 
	
		
			
				|  |  |                  if (StringUtils.isEmpty(avatar)) {
 | 
	
		
			
				|  |  |                      avatar = teacherAvatar.getParanValue();
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  | -                register(next[0], next[1], avatar);
 | 
	
		
			
				|  |  | -            });
 | 
	
		
			
				|  |  | +                String finalAvatar = avatar;
 | 
	
		
			
				|  |  | +                executorService.execute(() -> {
 | 
	
		
			
				|  |  | +                    register(next[0], next[1], finalAvatar);
 | 
	
		
			
				|  |  | +                    try {
 | 
	
		
			
				|  |  | +                        Thread.sleep(1000);
 | 
	
		
			
				|  |  | +                    } catch (InterruptedException e) {
 | 
	
		
			
				|  |  | +                        log.error("线程等待异常", e);
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                });
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  |              count += teachers.size();
 | 
	
		
			
				|  |  |              log.info("import im teacher success count:{}", count);
 | 
	
		
			
				|  |  |              page++;
 | 
	
		
			
				|  |  |              teachers = getTeachers(page, rows);
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +        while (executorService.getQueue().size() > 0) {
 | 
	
		
			
				|  |  | +            try {
 | 
	
		
			
				|  |  | +                log.info("线程池中正在执行的任务数量:{},等待数量:{}", executorService.getActiveCount(),executorService.getQueue().size());
 | 
	
		
			
				|  |  | +                Thread.sleep(1000);
 | 
	
		
			
				|  |  | +            } catch (InterruptedException e) {
 | 
	
		
			
				|  |  | +                log.error("线程等待异常", e);
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        executorService.shutdown();
 | 
	
		
			
				|  |  |          log.info("------------------------------- import user success ---------------------------");
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -1226,7 +1292,7 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
 | 
	
		
			
				|  |  |      public void importInfo(List<HistoryMessage> info) throws Exception {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          for (HistoryMessage i : info) {//判断消息类型
 | 
	
		
			
				|  |  | -            log.info("消息导入 HistoryMessage:{}", JSON.toJSONString(i));
 | 
	
		
			
				|  |  | +//            log.info("消息导入 HistoryMessage:{}", JSON.toJSONString(i));
 | 
	
		
			
				|  |  |              try {
 | 
	
		
			
				|  |  |                  Integer type = i.getTargetType();
 | 
	
		
			
				|  |  |                  if (type == 1) {
 | 
	
	
		
			
				|  | @@ -1336,7 +1402,7 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
 | 
	
		
			
				|  |  |                          //为已导入数据更改标识
 | 
	
		
			
				|  |  |                          updateStatus(i,1);
 | 
	
		
			
				|  |  |                          historyMessageService.updateMsg(i.getMsgUID(),i.getMsgSeq(),i.getMsgRandom(),i.getMsgTimeStamp());
 | 
	
		
			
				|  |  | -                        log.info("导入私聊消息成功:{}", i);
 | 
	
		
			
				|  |  | +                        log.info("导入私聊消息成功");
 | 
	
		
			
				|  |  |                      } catch (Exception e) {
 | 
	
		
			
				|  |  |                          updateStatus(i,2);
 | 
	
		
			
				|  |  |                          log.error("导入私聊IM消息失败 msg:{},entity:{}", list, i, e);
 | 
	
	
		
			
				|  | @@ -1413,7 +1479,7 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
 | 
	
		
			
				|  |  |                          imPluginContext.getPluginService().importGroupMessage(groupImportMessage);
 | 
	
		
			
				|  |  |                          //为已导入数据更改标识
 | 
	
		
			
				|  |  |                          updateStatus(i,1);
 | 
	
		
			
				|  |  | -                        log.info("导入消息成功:{}", i);
 | 
	
		
			
				|  |  | +                        log.info("导入消息成功");
 | 
	
		
			
				|  |  |                      } catch (Exception e) {
 | 
	
		
			
				|  |  |                          updateStatus(i,2);
 | 
	
		
			
				|  |  |                          log.error("导入群组IM消息失败 msg:{},entity:{}", list, i, e);
 |