成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

干貨分享:利用Java多線程技術導入數據到Elasticsearch

新聞 前端
作者花了3天的時間,利用java線程池框架Executors中的FixedThreadPool線程池重寫了MTE導入工具,單臺服務器導入效率提高十幾倍(合理調整線程數據,效率更高)。

 前言

干貨分享:利用java多線程技術導入數據到Elasticsearch

近期接到一個任務,需要改造現有從mysql往Elasticsearch導入數據MTE(mysqlToEs)小工具,由于之前采用單線程導入,千億數據需要兩周左右的時間才能導入完成,導入效率非常低。所以樓主花了3天的時間,利用java線程池框架Executors中的FixedThreadPool線程池重寫了MTE導入工具,單臺服務器導入效率提高十幾倍(合理調整線程數據,效率更高)。

關鍵技術棧

  • Elasticsearch
  • jdbc
  • ExecutorService\Thread
  • sql

工具說明

maven依賴

  1. <dependency> 
  2.  <groupId>mysql</groupId> 
  3.  <artifactId>mysql-connector-java</artifactId> 
  4.  <version>${mysql.version}</version> 
  5. </dependency> 
  6. <dependency> 
  7.  <groupId>org.elasticsearch</groupId> 
  8.  <artifactId>elasticsearch</artifactId> 
  9.  <version>${elasticsearch.version}</version> 
  10. </dependency> 
  11. <dependency> 
  12.  <groupId>org.elasticsearch.client</groupId> 
  13.  <artifactId>transport</artifactId> 
  14.  <version>${elasticsearch.version}</version> 
  15. </dependency> 
  16. <dependency> 
  17.  <groupId>org.projectlombok</groupId> 
  18.  <artifactId>lombok</artifactId> 
  19.  <version>${lombok.version}</version> 
  20. </dependency> 
  21. <dependency> 
  22.  <groupId>com.alibaba</groupId> 
  23.  <artifactId>fastjson</artifactId> 
  24.  <version>${fastjson.version}</version> 
  25. </dependency> 

java線程池設置

默認線程池大小為21個,可調整。其中POR為處理流程已辦數據線程池,ROR為處理流程已閱數據線程池。

  1. private static int THREADS = 21
  2. public static ExecutorService POR = Executors.newFixedThreadPool(THREADS); 
  3. public static ExecutorService ROR = Executors.newFixedThreadPool(THREADS); 

定義已辦生產者線程/已閱生產者線程:ZlPendProducer/ZlReadProducer

  1. public class ZlPendProducer implements Runnable { 
  2.  ... 
  3.  @Override 
  4.  public void run() { 
  5.  System.out.println(threadName + "::啟動..."); 
  6.  for (int j = 0; j < Const.TBL.TBL_PEND_COUNT; j++) 
  7.  try { 
  8.  .... 
  9.  int size = 1000
  10.  for (int i = 0; i < count; i += size) { 
  11.  if (i + size > count) { 
  12.  //作用為size***沒有100條數據則剩余幾條newList中就裝幾條 
  13.  size = count - i; 
  14.  } 
  15.  String sql = "select * from " + tableName + " limit " + i + ", " + size; 
  16.  System.out.println(tableName + "::sql::" + sql); 
  17.  rs = statement.executeQuery(sql); 
  18.  List<HistPendingEntity> lst = new ArrayList<>(); 
  19.  while (rs.next()) { 
  20.  HistPendingEntity p = PendUtils.getHistPendingEntity(rs); 
  21.  lst.add(p); 
  22.  } 
  23.  MteExecutor.POR.submit(new ZlPendConsumer(lst)); 
  24.  Thread.sleep(2000); 
  25.  } 
  26.  .... 
  27.  } catch (Exception e) { 
  28.  e.printStackTrace(); 
  29.  } 
  30.  } 
  31. public class ZlReadProducer implements Runnable { 
  32.  ...已閱生產者處理邏輯同已辦生產者 

定義已辦消費者線程/已閱生產者線程:ZlPendConsumer/ZlReadConsumer

  1. public class ZlPendConsumer implements Runnable { 
  2.  private String threadName; 
  3.  private List<HistPendingEntity> lst; 
  4.  public ZlPendConsumer(List<HistPendingEntity> lst) { 
  5.  this.lst = lst; 
  6.  } 
  7.  @Override 
  8.  public void run() { 
  9.  ... 
  10.  lst.forEach(v -> { 
  11.  try { 
  12.  String json = new Gson().toJson(v); 
  13.  EsClient.addDataInJSON(json, Const.ES.HistPendDB_Index, Const.ES.HistPendDB_type, v.getPendingId(), null); 
  14.  Const.COUNTER.LD_P.incrementAndGet(); 
  15.  } catch (Exception e) { 
  16.  e.printStackTrace(); 
  17.  System.out.println("err::PendingId::" + v.getPendingId()); 
  18.  } 
  19.  }); 
  20.  ... 
  21.  } 
  22. public class ZlReadConsumer implements Runnable { 
  23.  //已閱消費者處理邏輯同已辦消費者 

定義導入Elasticsearch數據監控線程:Monitor

監控線程-Monitor為了計算每分鐘導入Elasticsearch的數據總條數,利用監控線程,可以調整線程池的線程數的大小,以便利用多線程更快速的導入數據。

  1. public void monitorToES() { 
  2.  new Thread(() -> { 
  3.  while (true) { 
  4.  StringBuilder sb = new StringBuilder(); 
  5.  sb.append("已辦表數::").append(Const.TBL.TBL_PEND_COUNT) 
  6.  .append("::已辦總數::").append(Const.COUNTER.LD_P_TOTAL) 
  7.  .append("::已辦入庫總數::").append(Const.COUNTER.LD_P); 
  8.  sb.append("~~~~已閱表數::").append(Const.TBL.TBL_READ_COUNT); 
  9.  sb.append("::已閱總數::").append(Const.COUNTER.LD_R_TOTAL) 
  10.  .append("::已閱入庫總數::").append(Const.COUNTER.LD_R); 
  11.  if (ldPrevPendCount == 0 && ldPrevReadCount == 0) { 
  12.  ldPrevPendCount = Const.COUNTER.LD_P.get(); 
  13.  ldPrevReadCount = Const.COUNTER.LD_R.get(); 
  14.  start = System.currentTimeMillis(); 
  15.  } else { 
  16.  long end = System.currentTimeMillis(); 
  17.  if ((end - start) / 1000 >= 60) { 
  18.  start = end; 
  19.  sb.append("\n#########################################\n"); 
  20.  sb.append("已辦每分鐘TPS::" + (Const.COUNTER.LD_P.get() - ldPrevPendCount) + "條"); 
  21.  sb.append("::已閱每分鐘TPS::" + (Const.COUNTER.LD_R.get() - ldPrevReadCount) + "條"); 
  22.  ldPrevPendCount = Const.COUNTER.LD_P.get(); 
  23.  ldPrevReadCount = Const.COUNTER.LD_R.get(); 
  24.  } 
  25.  } 
  26.  System.out.println(sb.toString()); 
  27.  try { 
  28.  Thread.sleep(3000); 
  29.  } catch (InterruptedException e) { 
  30.  e.printStackTrace(); 
  31.  } 
  32.  } 
  33.  }).start(); 

初始化Elasticsearch:EsClient

  1. String cName = meta.get("cName");//es集群名字 
  2. String esNodes = meta.get("esNodes");//es集群ip節點 
  3. Settings esSetting = Settings.builder() 
  4.  .put("cluster.name", cName) 
  5.  .put("client.transport.sniff"true)//增加嗅探機制,找到ES集群 
  6.  .put("thread_pool.search.size"5)//增加線程池個數,暫時設為5 
  7.  .build(); 
  8. String[] nodes = esNodes.split(","); 
  9. client = new PreBuiltTransportClient(esSetting); 
  10. for (String node : nodes) { 
  11.  if (node.length() > 0) { 
  12.  String[] hostPort = node.split(":"); 
  13.  client.addTransportAddress(new TransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1]))); 
  14.  } 

初始化數據庫連接

  1. conn = DriverManager.getConnection(url, user, password); 

啟動參數

  1. nohup java -jar mte.jar ES-Cluster2019 node1:9300,node2:9300,node3:9300 root 123456! jdbc:mysql://ip:3306/mte 130 130 >> ./mte.log 2>&1 & 

參數說明

ES-Cluster2019 為Elasticsearch集群名字

node1:9300,node2:9300,node3:9300為es的節點IP

130 130為已辦已閱分表的數據

程序入口:MteMain

干貨分享:利用java多線程技術導入數據到Elasticsearch
 
 
  1. // 監控線程 
  2. Monitor monitorService = new Monitor(); 
  3. monitorService.monitorToES(); 
  4. // 已辦生產者線程 
  5. Thread pendProducerThread = new Thread(new ZlPendProducer(conn, "ZlPendProducer")); 
  6. pendProducerThread.start(); 
  7. // 已閱生產者線程 
  8. Thread readProducerThread = new Thread(new ZlReadProducer(conn, "ZlReadProducer")); 
  9. readProducerThread.start(); 
責任編輯:張燕妮 來源: 頭條科技
相關推薦

2021-04-28 08:00:16

多線程高并發操作

2018-05-30 16:55:47

阿里Java多線程

2019-09-16 12:55:27

HBaseKafka數據

2024-07-03 08:02:19

MySQL數據搜索

2012-01-12 10:09:30

Java

2016-12-21 14:14:51

SQOOP數據庫HDFS

2016-11-11 11:11:25

2009-03-12 10:52:43

Java線程多線程

2010-07-15 15:21:07

Perl線程

2010-07-16 13:21:26

Perl哈希表

2016-07-27 16:45:12

大數據IT

2009-10-23 09:26:09

VB.NET多線程

2009-04-27 13:15:04

多線程方法run()

2019-08-15 11:11:38

Java數據庫設計

2009-06-29 17:49:47

Java多線程

2021-12-26 18:22:30

Java線程多線程

2010-05-25 14:54:18

2024-10-24 17:13:55

WinformUI多線程

2023-10-18 15:19:56

2009-07-21 17:09:47

ASP.NET多線程
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 日韩不卡在线观看 | 成人免费视频网站在线观看 | 亚洲一区不卡在线 | 国产福利在线 | 亚洲国产精品视频 | 日日操夜夜操天天操 | 欧美日韩高清一区 | 日韩成人在线播放 | 欧美电影免费观看高清 | 华丽的挑战在线观看 | 日韩精品在线观看一区二区 | 亚洲精品一 | 一级毛片视频 | 午夜爱爱毛片xxxx视频免费看 | 日韩免费av | 亚洲欧美中文日韩在线v日本 | 欧美日韩黄色一级片 | 亚洲av毛片 | 黄色毛片在线观看 | 密色视频| 午夜视频一区 | 四虎在线播放 | 日韩高清一区二区 | 亚洲一区国产精品 | 久久人爽爽人爽爽 | 97国产一区二区精品久久呦 | 国产日韩欧美一区 | 狠狠干网站 | 日韩中文字幕免费在线观看 | 午夜在线视频 | caoporn视频在线 | av不卡一区 | 国内精品伊人久久久久网站 | 麻豆av在线| 97伦理最新伦理 | 一级黄色片网址 | 成人不卡视频 | 国产欧美精品一区二区 | 免费a级毛片在线播放 | 中文在线播放 | 欧美日韩精品在线一区 |