Browse Source

消息导入

liujc 1 năm trước cách đây
mục cha
commit
d454e0f24f

+ 7 - 2
mec-application/src/main/java/com/ym/mec/web/controller/ImHistoryMessageController.java

@@ -10,6 +10,7 @@ import com.yonge.log.dal.model.HistoryMessage;
 import com.yonge.log.service.HistoryMessageService;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
+import org.apache.commons.collections.CollectionUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PostMapping;
@@ -45,16 +46,20 @@ public class ImHistoryMessageController extends BaseController {
 	@PostMapping(value = "/ImportIM")
 	public void ImportIM() throws Exception {
 		com.yonge.mongodb.PageInfo<HistoryMessage> info;
+//        historyMessageService.updateAllStatus(0);
 		//计算总数据量
 		int count = imGroupService.queryInfoCount();
 		//计算调用次数
 		int num = (int) Math.ceil(count / 100);
-		for (int i = 1; i <=num ; i++) {
+		for (int i = 0; i <=num ; i++) {
 			int size =100;
 			//获取融云消息
 			//List<ImGroup> list = imGroupService.lambdaQuery().last("limit "+(page-1)*size+","+size).list();
 			info = imGroupService.getRongYunInfo(1,size);
-			List<HistoryMessage> footer = info.getFooter();
+			List<HistoryMessage> footer = info.getRows();
+            if (CollectionUtils.isEmpty(footer)) {
+                break;
+            }
 			//IM导入
 			imGroupService.importInfo(footer);
 		}

+ 3 - 3
mec-biz/src/main/java/com/ym/mec/biz/service/im/impl/ImGroupCoreServiceImpl.java

@@ -122,9 +122,9 @@ public class ImGroupCoreServiceImpl implements ImGroupCoreService {
      */
     @Override
     public String analysisImUserId(String imUserId) {
-        if (StringUtils.isNotBlank(imConfig.getAppPrefix()) && imUserId.startsWith(imConfig.getAppPrefix())) {
-            return imUserId.replace(imConfig.getAppPrefix() + "_", "").split("_")[0];
-        }
+//        if (StringUtils.isNotBlank(imConfig.getAppPrefix()) && imUserId.startsWith(imConfig.getAppPrefix())) {
+//            return imUserId.replace(imConfig.getAppPrefix() + "_", "").split("_")[0];
+//        }
         return imUserId;
     }
 

+ 11 - 5
mec-biz/src/main/java/com/ym/mec/biz/service/impl/ImGroupServiceImpl.java

@@ -67,6 +67,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.joda.time.DateTime;
+import org.joda.time.LocalDateTime;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
@@ -1183,7 +1184,7 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
     public void importInfo(List<HistoryMessage> info) throws Exception {
 
         for (HistoryMessage i : info) {//判断消息类型
-            log.info("消息导入:{}", i);
+            log.info("消息导入 HistoryMessage:{}", JSON.toJSONString(i));
             try {
                 Integer type = i.getTargetType();
                 if (type == 1) {
@@ -1295,6 +1296,9 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
                         TencentRequest.MessageBody body2 = getTimImageElem(jsonObject.getString("imageUri"));
                         list.add(body1);
                         list.add(body2);
+                    } else {
+                        updateStatus(i,1);
+                        continue;
                     }
 
                /* body.setMsgType(i.getClassname());
@@ -1320,6 +1324,8 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
                 }*/
                     privateImportMessage.setTencentMessageBody(list);
                     privateImportMessage.setCloudCustomData(jsonObject.getString("extra"));
+
+                    log.info("导入私聊消息:{}", JSON.toJSONString(privateImportMessage));
                     try {
                         imPluginContext.getPluginService().importPrivateMessage(privateImportMessage);
                         //为已导入数据更改标识
@@ -1435,6 +1441,7 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
                         int conversationType = jsonObject.getInteger("conversationType");
                         getTimRelayElem();
                     } else {
+                        updateStatus(i,1);
                         continue;
                     }
                 /*body.setMsgContent(i.getContent());
@@ -1454,6 +1461,7 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
                     list.add(data1);
                     //导入消息列表
                     groupImportMessage.setMsgList(list);
+                    log.info("导入群聊消息:{}", JSON.toJSONString(groupImportMessage));
                     try {
                         imPluginContext.getPluginService().importGroupMessage(groupImportMessage);
                         //为已导入数据更改标识
@@ -1516,9 +1524,7 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
      * @param info
      */
     public void updateStatus(HistoryMessage info,int status) {
-        HistoryMessage historyMessage = new HistoryMessage();
-        historyMessage.setStatus(status);
-        historyMessageDao.update(info.getMsgUID(), historyMessage);
+        historyMessageService.updateStatus(info.getMsgUID(), status);
     }
 
 
@@ -1715,7 +1721,7 @@ public class ImGroupServiceImpl extends BaseServiceImpl<String, ImGroup> impleme
     //url路径改造
     private static String urlTypeChange(String url){
         if (url.isEmpty()){
-            throw new BizException("链接地址为空!!!");
+            log.warn("url为空");
         }
         String replace;
         if (url.startsWith("https://ks3-cn-beijing.ksyuncs.com")){

+ 5 - 0
mec-common/audit-log/src/main/java/com/yonge/log/service/HistoryMessageService.java

@@ -5,6 +5,7 @@ import com.yonge.mongodb.PageInfo;
 import com.yonge.mongodb.service.BaseServiceWithMongo;
 
 import java.io.File;
+import java.time.LocalDateTime;
 import java.util.Date;
 
 public interface HistoryMessageService extends BaseServiceWithMongo<String, HistoryMessage> {
@@ -19,4 +20,8 @@ public interface HistoryMessageService extends BaseServiceWithMongo<String, Hist
     long selectCount(int status, Date date);
 
     PageInfo<HistoryMessage> selectPage(int page, int size, int status, Date date);
+
+    void updateStatus(String id, Integer status);
+
+    void updateAllStatus(int status);
 }

+ 42 - 2
mec-common/audit-log/src/main/java/com/yonge/log/service/impl/HistoryMessageServiceImpl.java

@@ -1,6 +1,7 @@
 package com.yonge.log.service.impl;
 
 import com.alibaba.fastjson.JSONObject;
+import com.mongodb.operation.AggregateOperation;
 import com.yonge.log.dal.dao.HistoryMessageDao;
 import com.yonge.log.dal.model.HistoryMessage;
 import com.yonge.log.service.HistoryMessageService;
@@ -9,8 +10,10 @@ import com.yonge.mongodb.dao.BaseDaoWithMongo;
 import com.yonge.mongodb.service.impl.BaseServiceImplWithMongo;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.data.mongodb.core.aggregation.Aggregation;
 import org.springframework.data.mongodb.core.query.Criteria;
 import org.springframework.data.mongodb.core.query.Query;
+import org.springframework.data.mongodb.core.query.Update;
 import org.springframework.stereotype.Service;
 
 import java.io.BufferedReader;
@@ -25,6 +28,10 @@ import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.*;
 import java.util.stream.Collectors;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
@@ -60,7 +67,14 @@ public class HistoryMessageServiceImpl extends BaseServiceImplWithMongo<String,
 
         // 查询status 为null or 0
         // 查询时间大于date
-        Criteria criteria = Criteria.where("dateTime").gt(date).orOperator(Criteria.where("status").is(0),Criteria.where("status").is(null));
+
+        // date 转 LocalDateTime
+        LocalDateTime ldt = LocalDateTime.ofInstant(date.toInstant(), ZoneId.systemDefault());
+
+        String dateFormat = ldt.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
+
+
+        Criteria criteria = Criteria.where("dateTime").gt(dateFormat).orOperator(Criteria.where("status").is(0),Criteria.where("status").is(null));
         query.addCriteria(criteria);
         return mongoTemplate.count(query,HistoryMessage.class);
     }
@@ -69,13 +83,17 @@ public class HistoryMessageServiceImpl extends BaseServiceImplWithMongo<String,
     public PageInfo<HistoryMessage> selectPage(int page, int size, int status, Date date) {
         PageInfo<HistoryMessage> pageInfo = new PageInfo<HistoryMessage>(page, size);
 
+        // date 转 LocalDateTime
+        LocalDateTime ldt = LocalDateTime.ofInstant(date.toInstant(), ZoneId.systemDefault());
+
+        String dateFormat = ldt.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
         List<HistoryMessage> dataList = null;
         long total = this.selectCount(status,date);
         if (total > 0) {
             pageInfo.setTotal(total);
 
             Query query = new Query();
-            Criteria criteria = Criteria.where("dateTime").gt(date).orOperator(Criteria.where("status").is(0),Criteria.where("status").is(null));
+            Criteria criteria = Criteria.where("dateTime").gt(dateFormat).orOperator(Criteria.where("status").is(0),Criteria.where("status").is(null));
             query.addCriteria(criteria);
             query.skip(pageInfo.getOffset()).limit(pageInfo.getLimit());
             dataList = mongoTemplate.find(query, HistoryMessage.class);
@@ -87,6 +105,28 @@ public class HistoryMessageServiceImpl extends BaseServiceImplWithMongo<String,
         return pageInfo;
     }
 
+    @Override
+    public void updateStatus(String id, Integer status) {
+        Criteria criteria = Criteria.where("_id").is(id);
+        Query query = new Query(criteria);
+
+        Update update = new Update();
+        update.set("status", status); // 设置新的 status 值
+
+        mongoTemplate.updateMulti(query, update, HistoryMessage.class);
+    }
+
+    @Override
+    public void updateAllStatus(int status) {
+
+        Query query = new Query();
+
+        Update update = new Update();
+        update.set("status", status); // 设置新的 status 值
+
+        mongoTemplate.updateMulti(query, update, HistoryMessage.class);
+    }
+
 //	public static void main(String[] args) throws Exception {
 //		doNioReadFile1(new File("/Users/chenxiaoyu/Documents/77fe9ce6-7d91-4568-afe3-9e8ac351e87f.zip"));
 //	}