远程读取elasticSearch数据库并导出数据「建议收藏」

远程读取elasticSearch数据库并导出数据「建议收藏」packageorg.elasticsearch.esTest;importjava.awt.List;importjava.io.BufferedWriter;importjava.io.File;importjava.io.FileWriter;importjava.io.IOException;importjava.util.ArrayList;importjava

大家好,又见面了,我是你们的朋友全栈君。

最近刚开完题,毕设是使用机器学习算法对电磁数据中的异常进行检测。所有的电磁数据都存储在分布式数据库es中,所以第一步需要导出数据,这两天写了点这部分的程序,已经导出部分数据。

package org.elasticsearch.esTest;

import java.awt.List;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.ExecutionException;
//maven管理依赖
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;

import org.elasticsearch.index.query.QueryBuilders.*;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.search.SearchHits;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

/** * Hello world! * */
public class EsClient {
    static File trace = new File("E:/es data/emcas-2018.01.04_trace.txt");
    static File warning = new File("E:/es data/emcas-2018.01.04_warning.txt");
    static File other = new File("E:/es data/emcas-2018.01.04_other.txt");

    public static Client  getClient() throws IOException {
         Settings settings = ImmutableSettings.settingsBuilder()
                    .put("cluster.name", "estest1")
                    .build();
            TransportClient client = new TransportClient(settings).addTransportAddress(  
                    new InetSocketTransportAddress("10.10.41.153", 9300));
//          FileWriter fw = new FileWriter(article);
//          BufferedWriter bfw = new BufferedWriter(fw);
            return client; 
    }


   public static HashSet<String> write2File(Client client) throws IOException{
       long start = System.currentTimeMillis();
       int scrollSize = 1000;
       SearchResponse response = null; 
       FileWriter fw_trace1 = new FileWriter(trace);
       BufferedWriter bfw1 = new BufferedWriter(fw_trace1);

       FileWriter fw_warning1 = new FileWriter(warning);
       BufferedWriter bfw2 = new BufferedWriter(fw_warning1);

       FileWriter fw_other1 = new FileWriter(trace);
       BufferedWriter bfw3 = new BufferedWriter(fw_other1);

//     ArrayList<Integer>collectid = new ArrayList<Integer>();
       HashSet collectid = new HashSet();

       int i =0;
       while (response == null || response.getHits().hits().length != 0 && i <=1) {            
//         if(i % 100 == 0){
//             fw = new FileWriter(autoCreateFile(i/10+1));
//             BufferedWriter bfw1 = new BufferedWriter(fw); 
//             bfw = bfw1;
//             System.out.println("这是第"+i/10+"万条数据");
//         }           
           try{
           response = client.prepareSearch("emcas-2017.10.16")
                    .setQuery(QueryBuilders.matchAllQuery())
                    .setSize(scrollSize)                            
                    .setFrom(i*scrollSize)
//                  .setFrom(0)
                    .execute()
                    .actionGet();
           }
           catch (IndexMissingException e) {
             System.out.println("not found");
        }   




           SearchHits hits = response.getHits();

           int trace_count = 0;
           int warning_count =0;
           int other_count = 0;

           for(int j = 0 ; j < hits.getHits().length; j++){ 
   
               String jsonstr = hits.getHits()[j]
                       .getSourceAsString(); 
               JSONObject json_1 = JSON.parseObject(jsonstr);
               System.out.println(json_1);



               if(json_1.get("eventType").equals("trace")){
                   trace_count++;
                   collectid.add(json_1.get("collectorId"));
                   if(trace_count % 100000 == 0){
                       FileWriter fw_trace2 = new FileWriter(autoCreateFile(trace_count/100000));
                       BufferedWriter bfw_trace = new BufferedWriter(fw_trace2);
                       bfw1 = bfw_trace;
                   }
                   bfw1.write(json_1.toString()+'\r');
                   bfw1.flush();
               }else if(json_1.get("eventType").equals("warning")){
                   warning_count++;
                   if(warning_count % 100 == 0){
                       FileWriter fw_warning2 = new FileWriter(autoCreateFile(warning_count/100));
                       BufferedWriter bfw_warning2 = new BufferedWriter(fw_warning2);
                       bfw2 = bfw_warning2;
                   }
                   bfw2.write(json_1.toString()+'\r');
                   bfw2.flush();
               }else{
                   other_count++;
                   if(other_count % 100 == 0){
                       FileWriter fw_other2 = new FileWriter(autoCreateFile(other_count/100));
                       BufferedWriter bfw_other2 = new BufferedWriter(fw_other2);
                       bfw3 = bfw_other2;
                   }
                   bfw3.write(json_1.toString()+'\r');
                   bfw3.flush();
               }
              }                                           
           i++;
       }            
            bfw1.close();
            bfw2.close();
            bfw3.close();
            fw_other1.close();
            fw_trace1.close();
            fw_warning1.close();
            long end = System.currentTimeMillis();
            long totalTime = end - start;
            System.out.println("总耗时:"+totalTime);
            return collectid;
      }



   public static File autoCreateFile(int i ) throws IOException {
       File file = new File("E:/es data/"+i+".txt"); 
       file.createNewFile();
       return file;
   }




    public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {
        EsClient instance = new EsClient();
        Client client = instance.getClient();
        HashSet hashSet = new HashSet();
        hashSet = write2File(client);
        for (Object object : hashSet) {
            System.out.println(object);
        }
        System.out.println(hashSet.size()+"size!!!!!!!!");
//      GetResponse response = client.prepareGet("emcas-2017.10.18","trace","AV8tK5NeSBmsIUk260HQ")
//      GetResponse response = client.prepareGet("emcas-2017.10.18","status","4")
//              .execute()
//              .actionGet(); 
//      System.out.println(response.getSource()); 
//用于计算es数据库中一个index下docs的总记录数
//      SearchResponse response2 = client.prepareSearch("emcas-2018.01.04")
//                  .setQuery(QueryBuilders.matchAllQuery())
//                  .setSize(0)                             
//                  .execute()
//                  .actionGet();
//      SearchHits hits = response2.getHits();
//      long hitscount = hits.getTotalHits();
//      System.out.println(hitscount);
    }
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/134374.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • React框架开发使用部分常见问题

    React框架开发使用部分常见问题

    2021年7月3日
    94
  • VMware Tools安装步骤(windows10)[通俗易懂]

    VMware Tools安装步骤(windows10)[通俗易懂]VMwareTools是VMware虚拟机中的一个工具,其主要作用是能够使鼠标在虚拟机和主机之前流畅地切换,并且能够共享剪贴板。我们可以通过VMwareTools将主机的文件复制粘贴到虚拟机。同时使得Ubuntu界面完全填充VMware界面在Windows10环境下,在电脑上安装VMware和Ubuntu的具体步骤可以看此篇博客:Windows环境下,在VMware中安装Ubuntu的详细步骤本文讲述了在VMware16.0.0,Ubuntu21.10环境下,VMwareTools的安装步骤。1

    2022年5月9日
    801
  • JRTPLIB_刘伯温传简介

    JRTPLIB_刘伯温传简介jrtplib是一个基于C++、面向对象的RTP封装库,最新的版本是3.9.1(2011年11月)。为了与RFC3550相兼容,3.x.x版本经过完全重写,现在它提供了一些非常有用的组件,这些组件为构建各种各样的RTP应用程序开发提供了有用的帮助。较旧的2.x版本依然可用,但是不兼容RFC3550。1.特性    jrtplib支持定义于RFC3550中的RTP协议,它使得发送和接

    2022年7月28日
    1
  • 树莓派3B+ 软件源更改

    树莓派3B+ 软件源更改树莓派3B+软件源更改由于树莓派软件官方源在国外,所以连接不稳定,且速度慢,所以安装初次进入系统后,一定要修改一下软件源。国内软件源有很多,在这里,我推荐自己常使用的:中国科学技术大学Raspbianhttp://mirrors.ustc.edu.cn/raspbian/raspbian/1.替换脚本下面脚本请直接复制到终端

    2022年6月25日
    23
  • 在IDEA上Git的入门使用(IDEA+Git)[通俗易懂]

    在IDEA上Git的入门使用(IDEA+Git)[通俗易懂]前言:Git是目前最常用的版本控制系统,而IDEA又是目前日渐流行的ide,因此现在来介绍在IDEA上Git的入门使用。 准备:Git、IDEA、GitHub账号开始之前先创建一个简单的测试项目 将代码交由Git管理    VCS ——&gt; EnableVersionControlIntegration…    ——&gt; 选择要使…

    2022年6月16日
    42
  • Java finalize方法使用

    Java finalize方法使用《JAVA编程思想》:java提供finalize()方法,垃圾回收器准备释放内存的时候,会先调用finalize()。         (1).对象不一定会被回收。      (2).垃圾回收不是析构函数。      (3).垃圾回收只与内存有关。

    2022年9月19日
    0

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号