来吧展示
这里两边的 ES 在核对期间没有停止写入，导致出现少量数据差异，数量也对不上。
注意事项
迁移和核对数据期间，尽量开启两边服务器的防火墙（阻断业务写入），避免新写入的数据造成不必要的差异。
生产者使用 ES 的滚动（Scroll）API 遍历旧索引的全部数据；
消费者从队列中取出文档，按 _id 调用新 ES 的 API 查询对应文档，并对两边的 _source 做 MD5 对比。
代码还有些小缺陷，但无伤大雅，思路可以借鉴；数据量大时可以改造成多生产者、多消费者模型。自测 10 分钟可以扫描完 107518 条数据。
POM依赖
<dependencies>
<!-- Only used to serialize objects to JSON (toJSONString). 1.2.83 is the patched 1.x release (autoType bypass, CVE-2022-25845, affects <= 1.2.80). -->
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency>
<dependency><groupId>commons-logging</groupId><artifactId>commons-logging</artifactId><version>1.2</version></dependency>
<!-- The high-level REST client needs the elasticsearch core artifact at the same version: keep these two in sync. -->
<dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>6.8.2</version></dependency>
<dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>6.8.2</version></dependency>
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.9.9</version></dependency>
<!-- Test-only: keep JUnit off the runtime classpath. -->
<dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version><scope>test</scope></dependency>
</dependencies>
ItemDataContrast.java
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.action.search.*;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class ItemDataContrast {
private final static String LEFT_IP ="迁移的ES机器IP";
private final static int LEFT_PORT =新机器端口;
private final static String LEFT_USER ="迁移的ES机器账号";
private final static String LEFT_PWD ="迁移的ES机器密码";
private static final String INDEX_NAME = "对比的索引";
private final static String RIGHT_IP ="新机器IP";
private final static int RIGHT_PORT =新机器端口;
private final static String RIGHT_USER ="新机器账号";
private final static String RIGHT_PWD ="新机器密码";
private static final int OFFSET =10000;
private static int leftTotalHits =0;
private static int countItem = 0;
private static LinkedHashMap<String,Object> map = new LinkedHashMap<>();
private static RestHighLevelClient leftClient;
private static RestHighLevelClient rightclient;
private static BlockingQueue<SearchHit> queue = new LinkedBlockingQueue<>();
private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy年MM月dd日 HH时mm分ss秒");
public static void main(String[] args) {
HttpHost httpHost = new HttpHost(LEFT_IP, LEFT_PORT);
Header[] defaultHeaders = {new BasicHeader("charset", "utf-8"),new BasicHeader("content-type", "application/json")};
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials(LEFT_USER, LEFT_PWD));
RestClientBuilder builder = RestClient.builder(httpHost).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(
HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
});
builder.setDefaultHeaders(defaultHeaders);
leftClient = new RestHighLevelClient(builder);
// 创建生产者和消费者
Producer producer = new Producer();
Consumer consumer = new Consumer();
FileUtil.writeFile("logs","ItemData.log",Thread.currentThread().getName()+" _"+Thread.currentThread().getId()+"_开始核对");
// 启动生产者和消费者线程
Thread producerThread = new Thread(producer);
producerThread.start();
for (int i = 0;i<10;i++) {
Thread consumerThread = new Thread(consumer);
consumerThread.start();
}
}
static class Producer implements Runnable {
private static int count = 1;
@Override
public void run() {
try {
SearchRequest searchRequest = new SearchRequest(INDEX_NAME);
searchRequest.scroll(TimeValue.timeValueMinutes(1L));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
searchSourceBuilder.size(OFFSET);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = leftClient.search(searchRequest, RequestOptions.DEFAULT);
leftTotalHits = Integer.parseInt(String.valueOf(searchResponse.getHits().totalHits));
FileUtil.writeFile("logs","ItemData.log",Thread.currentThread().getName()+"_"+LEFT_IP+": 总数:"+leftTotalHits);
String scrollId = searchResponse.getScrollId();
SearchHit[] searchHits = searchResponse.getHits().getHits();
FileUtil.writeFile("logs","ItemData.log",Thread.currentThread().getName()+" _"+Thread.currentThread().getId()+"_开始第"+count+"次遍历 生产者数据:"+OFFSET + "获取到的数据:"+searchHits.length);
while (searchHits != null && searchHits.length > 0) {
count++;
for (SearchHit hit : searchHits) {
countItem++;
// 处理单个文档
queue.put(hit);
}
Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
scrollRequest.scroll(scroll);
searchResponse = leftClient.scroll(scrollRequest, RequestOptions.DEFAULT);
scrollId = searchResponse.getScrollId();
searchHits = searchResponse.getHits().getHits();
FileUtil.writeFile("logs","ItemData.log",Thread.currentThread().getName()+" _"+Thread.currentThread().getId()+"_开始第"+count+"次遍历 生产者数据:"+OFFSET + "获取到的数据:"+searchHits.length);
}
// 清除滚动
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
ClearScrollResponse clearScrollResponse = leftClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
System.out.println("生产者结束");
FileUtil.writeFile("logs","ItemData.log",Thread.currentThread().getName()+" _"+Thread.currentThread().getId()+"_生产者任务完成");
} catch (Exception e) {
FileUtil.writeFile("logs","ItemData.log",JSONObject.toJSONString(e));
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
static class Consumer implements Runnable {
static {
HttpHost httpHost = new HttpHost(RIGHT_IP, RIGHT_PORT);
Header[] defaultHeaders = {new BasicHeader("charset", "utf-8"),new BasicHeader("content-type", "application/json")};
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials(RIGHT_USER, RIGHT_PWD));
RestClientBuilder builder = RestClient.builder(httpHost).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(
HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
});
builder.setDefaultHeaders(defaultHeaders);
rightclient = new RestHighLevelClient(builder);
}
@Override
public void run() {
try {
while (true) {
SearchHit hit = queue.take();
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
SearchHit[] rightSearchHits = getData(rightclient,INDEX_NAME,"_id", hit.getId());
JSONArray jsonArray;
if(null==map.get("diff_ids")){
jsonArray = new JSONArray();
}else {
jsonArray = (JSONArray) map.get("diff_ids");
}
if(rightSearchHits==null || rightSearchHits.length==0){
if(null==map.get("diff_ids")){
jsonArray = new JSONArray();
}else {
jsonArray = (JSONArray) map.get("diff_ids");
}
JSONObject object = new JSONObject();
object.put("id",hit.getId());
object.put("msg",RIGHT_IP+"无数据");
jsonArray.add(object);
map.put("diff_ids",jsonArray);
FileUtil.writeFile("logs","ItemDataError.log",Thread.currentThread().getName()+" _"+Thread.currentThread().getId()+"_核对数据错误:"+object.toJSONString());
//右边无数据直接跳过
continue;
}
String left = JSONObject.toJSONString(sourceAsMap);
String leftMd5 = md5(left);
boolean flag = false;
String right = "";
for (SearchHit searchHitR:rightSearchHits) {
right = JSONObject.toJSONString(searchHitR.getSourceAsMap());
String rightMd5 = md5(right);
if(leftMd5.equals(rightMd5)){
flag = true;
break;
}
}
if(!flag){
JSONObject diff = new JSONObject();
diff.put("id",hit.getId());
diff.put("msg",LEFT_IP+"_"+RIGHT_IP+"_MD5差异");
diff.put("left",left);
diff.put("right",right);
jsonArray.add(diff);
map.put("diff_ids",jsonArray);
FileUtil.writeFile("logs","ItemDataError.log",Thread.currentThread().getName()+" _"+Thread.currentThread().getId()+"_MD5差异:"+diff.toJSONString());
}
Map<String,Object> notDiffMap;
if(null==map.get("notDiff")){
notDiffMap = new HashMap<>();
}else {
notDiffMap = (Map<String, Object>) map.get("notDiff");
}
JSONObject notDiffOBJ = new JSONObject();
notDiffOBJ.put("id",hit.getId());
notDiffOBJ.put("type",hit.getType());
notDiffMap.put(hit.getId()+"_"+hit.getType(),notDiffOBJ);
FileUtil.writeFile("logs","ItemDataSuccess.log",Thread.currentThread().getName()+" _"+Thread.currentThread().getId()+"核对数据正确:"+notDiffOBJ.toJSONString());
map.put("notDiff",notDiffMap);
System.out.println(queue.size() + "___"+countItem+"___"+leftTotalHits);
if (queue.size()==0 ) {
FileUtil.writeFile("logs","ItemData.log",Thread.currentThread().getName()+" _"+Thread.currentThread().getId()+"_正确数据:"+notDiffMap.size());
FileUtil.writeFile("logs","ItemData.log",Thread.currentThread().getName()+" _"+Thread.currentThread().getId()+"_差异如下:"+jsonArray.size());
FileUtil.writeFile("logs","ItemData.log",Thread.currentThread().getName()+" _"+Thread.currentThread().getId()+"_数据完毕: 扫描行数"+countItem);
FileUtil.writeFile("logs","ItemData.log",Thread.currentThread().getName()+" _"+Thread.currentThread().getId()+"_数据完毕: 总行数"+leftTotalHits);
if(countItem==leftTotalHits){
FileUtil.writeFile("logs","ItemData.log",Thread.currentThread().getName()+" _"+Thread.currentThread().getId()+"_消费者完成对比");
break;
}
}
}
System.out.println("消费者结束");
} catch (Exception e) {
e.printStackTrace();
FileUtil.writeFile("logs","ItemData.log",JSONObject.toJSONString(e));
Thread.currentThread().interrupt();
}
}
public static String md5(String message) {
try {
// 创建MD5消息摘要对象
MessageDigest md = MessageDigest.getInstance("MD5");
// 计算消息的摘要
byte[] digest = md.digest(message.getBytes());
// 将摘要转换为十六进制字符串
String hexString = bytesToHex(digest);
return hexString;
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
FileUtil.writeFile("logs","ItemData.log",JSONObject.toJSONString(e));
}
return null;
}
public static String bytesToHex(byte[] bytes) {
StringBuilder hexString = new StringBuilder();
for (byte b : bytes) {
String hex = Integer.toHexString(0xff & b);
if (hex.length() == 1) {
hexString.append('0');
}
hexString.append(hex);
}
return hexString.toString();
}
public static SearchHit[] getData(RestHighLevelClient client,String indexName,String key,String value) {
//1 创建搜索文档请求
SearchRequest searchRequest=new SearchRequest(indexName); //请求索引
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.matchQuery(key,value));
searchRequest.source(builder);
SearchHit[] hits=null;
try{
// 2 执行检索
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
// 3 分析响应结果
//遍历数据
hits = response.getHits().getHits();
return hits;
} catch (Exception e) {
e.printStackTrace();
FileUtil.writeFile("logs","ItemData.log",JSONObject.toJSONString(e));
}
return hits;
}
}
}
FileUtil.java
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * Minimal append-only text logger.
 *
 * <p>Each call appends one line of the form {@code "yyyy-MM-dd HH:mm:ss SSS:<context>\r\n"}.
 * Writes are serialized on the class lock because callers may log to the same file from many
 * threads. Files are written with {@link FileWriter}, i.e. the platform default charset.
 */
public class FileUtil {
    /** Directory used whenever a caller passes a null or blank directory. */
    private static final String DEFAULT_DIRECTORY = "D:" + File.separator + "usr" + File.separator + "log";

    /**
     * Creates {@code directory} (and any missing parents) if it does not exist yet.
     *
     * @param directory directory to create; null or blank selects {@code D:/usr/log}
     * @return {@code true} if the directory exists when this method returns
     */
    public static boolean mkdir(String directory) {
        File dir = new File(resolveDirectory(directory));
        // mkdirs() returns false when another thread created the directory in the meantime,
        // so re-check rather than trusting its return value alone.
        return dir.isDirectory() || dir.mkdirs() || dir.isDirectory();
    }

    /**
     * Appends one timestamped line to {@code directory/fileName}, creating the directory and the
     * file when needed.
     *
     * <p>Best effort: if the directory cannot be created, the problem is reported on stderr and
     * the line is dropped. Other I/O failures are handled as in {@link #write}.
     */
    public static void writeFile(String directory, String fileName, String context) {
        if (!mkdir(directory)) {
            System.err.println("FileUtil: cannot create directory " + resolveDirectory(directory));
            return;
        }
        write(directory, fileName, context);
    }

    /**
     * Appends {@code "<timestamp>:<context>\r\n"} to {@code directory/fileName}.
     *
     * <p>The file is created if missing; the directory is not. An {@link IOException} is printed
     * and swallowed (best-effort logging).
     */
    public static synchronized void write(String directory, String fileName, String context) {
        File target = new File(resolveDirectory(directory), fileName);
        // Local instance because SimpleDateFormat is not thread-safe.
        // "SSS" is milliseconds ("sss" would print the seconds a second time).
        String strTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS").format(new Date());
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(target, true))) {
            writer.write(strTime + ":" + context + "\r\n");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Returns {@code directory}, or {@link #DEFAULT_DIRECTORY} when it is null or blank. */
    private static String resolveDirectory(String directory) {
        return (directory == null || directory.trim().isEmpty()) ? DEFAULT_DIRECTORY : directory;
    }
}
评论区