侧边栏壁纸
博主头像
MDZZW博主等级

曾经也是帅哥,如今只是肉多

  • 累计撰写 28 篇文章
  • 累计创建 26 个标签
  • 累计收到 7 条评论

Elasticsearch 集群迁移后数据逐行对比

MDZZW
2023-09-14 / 0 评论 / 1 点赞 / 625 阅读 / 2,178 字
温馨提示:
本文最后更新于 2023-09-14,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

来吧展示

这里对比期间两边的 ES 都没有停止写入，所以出现了少量数据差异，总数也对不上。

image-1694665584825

image-1694665215626

image

image-1694665290370

注意事项

迁移和对比数据时，尽量开启两边服务器的防火墙、阻断业务写入，避免新写入的数据造成不必要的差异。

生产者使用 ES 滚动（Scroll）API 遍历全部数据；
消费者从队列中取出文档，按 _id 调用新 ES 的 API 查询，再对两边的数据做 MD5 对比。
代码还有些小缺陷，但无伤大雅，思路可以借鉴；数据量大时可以改造成多生产者、多消费者模型。自测 10 分钟可以扫描完 107518 条数据。

POM依赖

<dependencies>
  <!-- 1.2.83 fixes the autoType bypass (CVE-2022-25845) present in 1.2.70; the serialization API used here is unchanged. -->
  <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency>
  <dependency><groupId>commons-logging</groupId><artifactId>commons-logging</artifactId><version>1.2</version></dependency>
  <dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>6.8.2</version></dependency>
  <dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>6.8.2</version></dependency>
  <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.9.9</version></dependency>
  <!-- Only needed to compile/run tests; keep it off the runtime classpath. -->
  <dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version><scope>test</scope></dependency>
</dependencies>

ItemDataContrast.java

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.action.search.*;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Row-by-row comparison of one Elasticsearch index between the migration source ("left")
 * cluster and the target ("right") cluster.
 *
 * <p>A single {@link Producer} scrolls through every document of {@link #INDEX_NAME} on the
 * left cluster and puts each hit on {@link #queue}. {@link #CONSUMER_COUNT} {@link Consumer}
 * threads take hits, look up the same {@code _id} on the right cluster and compare an MD5 of
 * the key-sorted JSON of both {@code _source}s. Differences go to {@code logs/ItemDataError.log},
 * matches to {@code logs/ItemDataSuccess.log}, and a final summary to {@code logs/ItemData.log}.
 */
public class ItemDataContrast {
    // ---- Connection settings: placeholders, fill them in before running. ----
    private final static String LEFT_IP ="迁移的ES机器IP";
    // NOTE(review): this must be the port of the SOURCE (left) cluster; the placeholder name
    // below reuses the one used for the new machine.
    private final static int LEFT_PORT =新机器端口;
    private final static String LEFT_USER ="迁移的ES机器账号";
    private final static String LEFT_PWD ="迁移的ES机器密码";
    private static final String INDEX_NAME = "对比的索引";

    private final static String RIGHT_IP ="新机器IP";
    private final static int RIGHT_PORT =新机器端口;
    private final static String RIGHT_USER ="新机器账号";
    private final static String RIGHT_PWD ="新机器密码";

    /** Page size of every scroll request. */
    private static final int OFFSET = 10000;
    /** Number of consumer threads comparing documents. */
    private static final int CONSUMER_COUNT = 10;
    /** How long the scroll context is kept alive between two scroll requests. */
    private static final TimeValue SCROLL_KEEP_ALIVE = TimeValue.timeValueMinutes(1L);
    /** How long a consumer waits for a hit before re-checking whether the producer is done. */
    private static final long POLL_TIMEOUT_SECONDS = 1L;

    private static final String LOG_DIR = "logs";
    private static final String LOG_FILE = "ItemData.log";
    private static final String ERROR_LOG_FILE = "ItemDataError.log";
    private static final String SUCCESS_LOG_FILE = "ItemDataSuccess.log";

    /** Total hit count reported by the left cluster; written once by the producer. */
    private static volatile long leftTotalHits = 0;
    /** Number of hits the producer has queued so far. */
    private static final AtomicLong countItem = new AtomicLong();
    /** Set by the producer when it stops (normally or on error) so consumers can exit. */
    private static volatile boolean producerFinished = false;

    /** Missing or differing documents; shared by all consumers, hence a concurrent queue. */
    private static final Queue<JSONObject> diffs = new ConcurrentLinkedQueue<>();
    /** Matching documents keyed by {@code id_type}; shared by all consumers. */
    private static final Map<String, JSONObject> matched = new ConcurrentHashMap<>();

    private static RestHighLevelClient leftClient;
    private static RestHighLevelClient rightclient;
    private static final BlockingQueue<SearchHit> queue = new LinkedBlockingQueue<>();

    /**
     * Connects to both clusters, runs one producer and {@link #CONSUMER_COUNT} consumers,
     * waits until all of them have finished, writes the summary and closes both clients.
     */
    public static void main(String[] args) {
        leftClient = buildClient(LEFT_IP, LEFT_PORT, LEFT_USER, LEFT_PWD);
        rightclient = buildClient(RIGHT_IP, RIGHT_PORT, RIGHT_USER, RIGHT_PWD);
        FileUtil.writeFile(LOG_DIR, LOG_FILE, threadTag() + "_开始核对");

        List<Thread> workers = new ArrayList<>();
        workers.add(new Thread(new Producer()));
        Consumer consumer = new Consumer();
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            workers.add(new Thread(consumer));
        }
        for (Thread worker : workers) {
            worker.start();
        }

        try {
            // Join every worker so the summary is written exactly once, after the queue is drained.
            for (Thread worker : workers) {
                worker.join();
            }
            writeSummary();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            closeQuietly(leftClient);
            closeQuietly(rightclient);
        }
    }

    /** Builds a REST client for one cluster using basic authentication and JSON default headers. */
    private static RestHighLevelClient buildClient(String ip, int port, String user, String pwd) {
        Header[] defaultHeaders = {new BasicHeader("charset", "utf-8"), new BasicHeader("content-type", "application/json")};
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, pwd));
        RestClientBuilder builder = RestClient.builder(new HttpHost(ip, port))
                .setHttpClientConfigCallback(
                        httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
        builder.setDefaultHeaders(defaultHeaders);
        return new RestHighLevelClient(builder);
    }

    /** Closes a client, printing (not propagating) any I/O error; null is ignored. */
    private static void closeQuietly(RestHighLevelClient client) {
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Returns "threadName _threadId", the prefix used by every log line. */
    private static String threadTag() {
        Thread current = Thread.currentThread();
        return current.getName() + " _" + current.getId();
    }

    /** Writes the final counters to ItemData.log once all workers have finished. */
    private static void writeSummary() {
        String tag = threadTag();
        FileUtil.writeFile(LOG_DIR, LOG_FILE, tag + "_正确数据:" + matched.size());
        FileUtil.writeFile(LOG_DIR, LOG_FILE, tag + "_差异如下:" + diffs.size());
        FileUtil.writeFile(LOG_DIR, LOG_FILE, tag + "_数据完毕: 扫描行数" + countItem.get());
        FileUtil.writeFile(LOG_DIR, LOG_FILE, tag + "_数据完毕: 总行数" + leftTotalHits);
        if (countItem.get() == leftTotalHits) {
            FileUtil.writeFile(LOG_DIR, LOG_FILE, tag + "_消费者完成对比");
        }
    }

    /**
     * Serialises a {@code _source} map to JSON with the keys of every nested object sorted, so
     * documents with identical content yield identical strings regardless of map iteration order.
     */
    static String canonicalJson(Map<String, Object> source) {
        return JSONObject.toJSONString(canonicalize(source));
    }

    /** Recursively copies maps into sorted {@link TreeMap}s; lists keep their element order. */
    private static Object canonicalize(Object value) {
        if (value instanceof Map) {
            Map<String, Object> sorted = new TreeMap<>();
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
                sorted.put(String.valueOf(entry.getKey()), canonicalize(entry.getValue()));
            }
            return sorted;
        }
        if (value instanceof List) {
            List<Object> items = new ArrayList<>();
            for (Object item : (List<?>) value) {
                items.add(canonicalize(item));
            }
            return items;
        }
        return value;
    }

    /** Scrolls through the whole index on the left cluster and queues every hit. */
    static class Producer implements Runnable {
        @Override
        public void run() {
            String scrollId = null;
            try {
                SearchRequest searchRequest = new SearchRequest(INDEX_NAME);
                searchRequest.scroll(SCROLL_KEEP_ALIVE);
                searchRequest.source(new SearchSourceBuilder()
                        .query(QueryBuilders.matchAllQuery())
                        .size(OFFSET));

                SearchResponse searchResponse = leftClient.search(searchRequest, RequestOptions.DEFAULT);
                leftTotalHits = searchResponse.getHits().totalHits;
                FileUtil.writeFile(LOG_DIR, LOG_FILE, Thread.currentThread().getName() + "_" + LEFT_IP + ": 总数:" + leftTotalHits);

                scrollId = searchResponse.getScrollId();
                SearchHit[] searchHits = searchResponse.getHits().getHits();
                int page = 1;
                logPage(page, searchHits);

                while (searchHits != null && searchHits.length > 0) {
                    page++;
                    for (SearchHit hit : searchHits) {
                        countItem.incrementAndGet();
                        queue.put(hit);
                    }

                    SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
                    scrollRequest.scroll(new Scroll(SCROLL_KEEP_ALIVE));
                    searchResponse = leftClient.scroll(scrollRequest, RequestOptions.DEFAULT);
                    scrollId = searchResponse.getScrollId();
                    searchHits = searchResponse.getHits().getHits();
                    logPage(page, searchHits);
                }
                System.out.println("生产者结束");
                FileUtil.writeFile(LOG_DIR, LOG_FILE, threadTag() + "_生产者任务完成");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                FileUtil.writeFile(LOG_DIR, LOG_FILE, JSONObject.toJSONString(e));
            } catch (Exception e) {
                FileUtil.writeFile(LOG_DIR, LOG_FILE, JSONObject.toJSONString(e));
                e.printStackTrace();
            } finally {
                // Release the scroll context even on failure.
                clearScroll(scrollId);
                // Always signal completion; otherwise consumers would wait forever after an error.
                producerFinished = true;
            }
        }

        /** Logs the size of one scroll page. */
        private static void logPage(int page, SearchHit[] hits) {
            int fetched = hits == null ? 0 : hits.length;
            FileUtil.writeFile(LOG_DIR, LOG_FILE, threadTag() + "_开始第" + page + "次遍历 生产者数据:" + OFFSET + "获取到的数据:" + fetched);
        }

        /** Clears the scroll context on the left cluster; a null id (search never ran) is ignored. */
        private static void clearScroll(String scrollId) {
            if (scrollId == null) {
                return;
            }
            try {
                ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
                clearScrollRequest.addScrollId(scrollId);
                leftClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
            } catch (IOException e) {
                e.printStackTrace();
                FileUtil.writeFile(LOG_DIR, LOG_FILE, JSONObject.toJSONString(e));
            }
        }
    }

    /** Takes hits from the queue and compares each with the same document on the right cluster. */
    static class Consumer implements Runnable {

        @Override
        public void run() {
            try {
                while (true) {
                    SearchHit hit = queue.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                    if (hit == null) {
                        // Exit only when the producer is done AND nothing is left; the flag is set
                        // after the producer's last put, so an empty queue afterwards is final.
                        if (producerFinished && queue.isEmpty()) {
                            break;
                        }
                        continue;
                    }
                    try {
                        compare(hit);
                    } catch (RuntimeException e) {
                        // One bad document must not stop this consumer.
                        e.printStackTrace();
                        FileUtil.writeFile(LOG_DIR, LOG_FILE, JSONObject.toJSONString(e));
                    }
                    System.out.println(queue.size() + "___" + countItem.get() + "___" + leftTotalHits);
                }
                System.out.println("消费者结束");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                FileUtil.writeFile(LOG_DIR, LOG_FILE, JSONObject.toJSONString(e));
            }
        }

        /**
         * Compares one left-cluster hit with the right cluster and records it either as a
         * difference (missing, lookup failure, MD5 mismatch) or as a match, never both.
         */
        private static void compare(SearchHit hit) {
            String tag = threadTag();
            SearchHit[] rightSearchHits;
            try {
                rightSearchHits = searchByKey(rightclient, INDEX_NAME, "_id", hit.getId());
            } catch (IOException | RuntimeException e) {
                // A failed lookup is not the same as missing data, so record it distinctly.
                JSONObject failure = new JSONObject();
                failure.put("id", hit.getId());
                failure.put("msg", RIGHT_IP + "查询失败:" + e.getMessage());
                diffs.add(failure);
                FileUtil.writeFile(LOG_DIR, ERROR_LOG_FILE, tag + "_核对数据错误:" + failure.toJSONString());
                return;
            }

            if (rightSearchHits == null || rightSearchHits.length == 0) {
                JSONObject object = new JSONObject();
                object.put("id", hit.getId());
                object.put("msg", RIGHT_IP + "无数据");
                diffs.add(object);
                FileUtil.writeFile(LOG_DIR, ERROR_LOG_FILE, tag + "_核对数据错误:" + object.toJSONString());
                return;
            }

            String left = canonicalJson(hit.getSourceAsMap());
            String leftMd5 = md5(left);
            String right = "";
            boolean same = false;
            for (SearchHit candidate : rightSearchHits) {
                right = canonicalJson(candidate.getSourceAsMap());
                if (leftMd5.equals(md5(right))) {
                    same = true;
                    break;
                }
            }

            if (!same) {
                JSONObject diff = new JSONObject();
                diff.put("id", hit.getId());
                diff.put("msg", LEFT_IP + "_" + RIGHT_IP + "_MD5差异");
                diff.put("left", left);
                diff.put("right", right);
                diffs.add(diff);
                FileUtil.writeFile(LOG_DIR, ERROR_LOG_FILE, tag + "_MD5差异:" + diff.toJSONString());
                return;
            }

            JSONObject notDiffOBJ = new JSONObject();
            notDiffOBJ.put("id", hit.getId());
            notDiffOBJ.put("type", hit.getType());
            matched.put(hit.getId() + "_" + hit.getType(), notDiffOBJ);
            FileUtil.writeFile(LOG_DIR, SUCCESS_LOG_FILE, tag + "核对数据正确:" + notDiffOBJ.toJSONString());
        }

        /**
         * Returns the lowercase hex MD5 of the UTF-8 bytes of {@code message}, or null if the
         * MD5 algorithm is unavailable.
         */
        public static String md5(String message) {
            try {
                MessageDigest md = MessageDigest.getInstance("MD5");
                // Explicit UTF-8 so the digest does not depend on the platform default charset.
                return bytesToHex(md.digest(message.getBytes(StandardCharsets.UTF_8)));
            } catch (NoSuchAlgorithmException e) {
                e.printStackTrace();
                FileUtil.writeFile("logs","ItemData.log",JSONObject.toJSONString(e));
            }
            return null;
        }

        /** Converts bytes to a lowercase, zero-padded hexadecimal string. */
        public static String bytesToHex(byte[] bytes) {
            StringBuilder hexString = new StringBuilder();
            for (byte b : bytes) {
                String hex = Integer.toHexString(0xff & b);
                if (hex.length() == 1) {
                    hexString.append('0');
                }
                hexString.append(hex);
            }
            return hexString.toString();
        }

        /**
         * Searches {@code indexName} for documents whose {@code key} matches {@code value}.
         *
         * @return the hits of the first result page, or null if the request failed (logged)
         */
        public static SearchHit[] getData(RestHighLevelClient client, String indexName, String key, String value) {
            try {
                return searchByKey(client, indexName, key, value);
            } catch (Exception e) {
                e.printStackTrace();
                FileUtil.writeFile("logs","ItemData.log",JSONObject.toJSONString(e));
                return null;
            }
        }

        /** Same query as {@link #getData} but propagates failures to the caller. */
        private static SearchHit[] searchByKey(RestHighLevelClient client, String indexName, String key, String value)
                throws IOException {
            SearchRequest searchRequest = new SearchRequest(indexName);
            searchRequest.source(new SearchSourceBuilder().query(QueryBuilders.matchQuery(key, value)));
            return client.search(searchRequest, RequestOptions.DEFAULT).getHits().getHits();
        }
    }
}

FileUtil.java


import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Minimal append-only file logger used by the comparison tool. Every line is prefixed with a
 * {@code yyyy-MM-dd HH:mm:ss SSS} timestamp and terminated with CRLF.
 */
public class FileUtil {

    /** Directory used when a caller passes a null or blank directory. */
    private static final String DEFAULT_DIRECTORY = "D:" + File.separator + "usr" + File.separator + "log";

    /**
     * Creates {@code directory} (including missing parents) if it does not exist yet.
     *
     * @param directory directory to create; null or blank falls back to {@code D:/usr/log}
     * @return true if the directory exists when this method returns
     */
    public static boolean mkdir(String directory) {
        File dir = new File(resolveDirectory(directory));
        // mkdirs() also returns false when another thread created the directory first,
        // so existence is re-checked afterwards.
        return dir.isDirectory() || dir.mkdirs() || dir.isDirectory();
    }

    /**
     * Appends one timestamped line to {@code directory/fileName}, creating the directory and the
     * file when needed.
     *
     * @param directory target directory; null or blank falls back to {@code D:/usr/log}
     * @param fileName  file name inside the directory
     * @param context   text of the line (without line terminator)
     */
    public static void writeFile(String directory, String fileName, String context) {
        String dir = resolveDirectory(directory);
        if (!mkdir(dir)) {
            System.err.println("Cannot create log directory: " + dir);
            return;
        }
        // FileWriter in append mode creates a missing file itself; the former separate
        // createNewFile() step silently dropped the line when another thread won the race.
        write(dir, fileName, context);
    }

    /**
     * Appends {@code context} as one timestamped CRLF-terminated line. Synchronized so that
     * lines written concurrently by several threads of this JVM never interleave.
     *
     * @param directory existing directory
     * @param fileName  file name inside the directory
     * @param context   text of the line
     */
    public static synchronized void write(String directory, String fileName, String context) {
        // "SSS" = milliseconds; the former "sss" printed the seconds a second time.
        String strTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS").format(new Date());
        try (BufferedWriter bufferedWriter =
                     new BufferedWriter(new FileWriter(directory + File.separator + fileName, true))) {
            bufferedWriter.write(strTime + ":" + context + "\r\n");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Maps a null or blank directory to {@link #DEFAULT_DIRECTORY}. */
    private static String resolveDirectory(String directory) {
        return (directory == null || directory.trim().length() == 0) ? DEFAULT_DIRECTORY : directory;
    }
}
1

评论区