瀏覽代碼

群导入

liujc 1 年之前
父節點
當前提交
ce761e920a

+ 4 - 2
mec-application/src/main/java/com/ym/mec/web/controller/ImGroupController.java

@@ -322,9 +322,11 @@ public class ImGroupController extends BaseController {
 
 	@GetMapping("/groupTransfer")
 	public Object groupTransfer(@RequestParam(required = false) String startTime,
-								@RequestParam(required = false) String endTime) {
+								@RequestParam(required = false) String endTime,
+								@RequestParam(required = false) Integer type
+                                ) {
 		ExecutorService executor = Executors.newSingleThreadExecutor();
-		executor.execute(() -> imGroupService.groupTransfer(startTime, endTime));
+		executor.execute(() -> imGroupService.groupTransfer(startTime, endTime,type));
 		return succeed();
 	}
 

+ 1 - 1
mec-biz/src/main/java/com/ym/mec/biz/service/ImGroupService.java

@@ -181,7 +181,7 @@ public interface ImGroupService extends BaseService<String, ImGroup> {
 	/**
 	 * 群迁移
 	 */
-	void groupTransfer(String startTime,String endTime);
+	void groupTransfer(String startTime,String endTime,Integer type);
 
 	/**
 	 * 导入用户

+ 45 - 41
mec-biz/src/main/java/com/ym/mec/biz/service/impl/ImGroupServiceImpl.java

@@ -890,10 +890,14 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
      * 群迁移
      */
     @Override
-    public void groupTransfer(String startTime, String endTime) {
+    public void groupTransfer(String startTime, String endTime,Integer type) {
 
         // 100个线程的无界线程池
-        ExecutorService executorService = Executors.newFixedThreadPool(100);
+
+        // 100个线程的无界线程池
+        ThreadPoolExecutor executorService =  new ThreadPoolExecutor(100, 100,
+            0L, TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<Runnable>());
         int page = 1;
         int size = 100;
         QueryInfo queryInfo = new QueryInfo();
@@ -927,54 +931,54 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
             } catch (ParseException e) {
                 throw new BizException("时间区间参数错误,格式为:yyyy-MM-dd");
             }
-            List<Future<?>> list = new ArrayList<>();
 
-            rows.forEach(imGroup -> {
+            if (type == 1) {
+                rows.forEach(imGroup -> {
+
+                    executorService.submit(() -> {
+                        log.info("群销毁开始:{}", imGroup.getId());
+                        // 先删除群组
+                        try {
+                            // 解散群
+                            imPluginContext.getPluginService().groupDismiss(imGroup.getId(), new ArrayList<>());
+                        } catch (Exception e) {
+                            log.error(String.format("群迁移删除群聊失败:%s", e.getMessage()), e);
+                        }
+                        try {
+                            Thread.sleep(1000);
+                        } catch (InterruptedException e) {
+                            log.error("线程等待异常", e);
+                        }
+                    });
+                });
+                page ++;
+            } else {
 
-                list.add(executorService.submit(() -> {
-                    log.info("群销毁开始:{}", imGroup.getId());
-                    // 先删除群组
-                    try {
-                        // 解散群
-                        imPluginContext.getPluginService().groupDismiss(imGroup.getId(), new ArrayList<>());
-                    } catch (Exception e) {
-                        log.error(String.format("群迁移删除群聊失败:%s", e.getMessage()), e);
-                    }
-                }));
-            });
-            for (Future<?> future : list) {
-                try {
-                    future.get();
-                } catch (Exception e){
-                    log.error("群销毁失败",e);
+                for (ImGroup row : rows) {
+                    executorService.execute(() -> {
+                        groupTransfer(Lists.newArrayList(row));
+                        try {
+                            Thread.sleep(1000);
+                        } catch (InterruptedException e) {
+                            log.error("线程等待异常", e);
+                        }
+                    });
                 }
-            }
 
-            try {
-                Thread.sleep(1500);
-            } catch (InterruptedException e) {
-                log.error("线程休眠失败",e);
-            }
-
-
-            list.clear();
-            for (ImGroup row : rows) {
-                list.add(executorService.submit(() -> {
-                    groupTransfer(Lists.newArrayList(row));
-                }));
-            }
-
-            for (Future<?> future : list) {
-                try {
-                    future.get();
-                } catch (Exception e){
-                    log.error("群迁移失败",e);
-                }
             }
             importImGroupCount += rows.size();
             log.info("------------------------------- import im group --------------------------------------------");
             log.info("import im group success count:{}/{}", importImGroupCount, total);
         }
+
+        while (executorService.getQueue().size() > 0) {
+            try {
+                log.info("线程池中正在执行的任务数量:{},等待数量:{}", executorService.getActiveCount(),executorService.getQueue().size());
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                log.error("线程等待异常", e);
+            }
+        }
         log.info("-------------------- import im group finished and success! -------------------------------");
         executorService.shutdown();
     }