Elasticsearch Development Notes

  • Post category: Other




1. Common ES tools

    1. Kibana: run short DSL queries, check system and cluster status, and debug cluster threads
    2. cerebro: cluster monitoring

    cerebro overview



1-1 Creating a connection with RestHighLevelClient

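  • Elasticsearch 7.x deprecates the TransportClient, so this document uses the Java High Level REST Client (RestHighLevelClient).

    Maven configuration:

    Adding only elasticsearch-rest-high-level-client is not enough in practice. Declare the matching artifacts below explicitly and keep them all at the same version. Mismatches often come from a dependency manager such as Spring Boot pinning a different Elasticsearch version.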
         <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>7.3.1</version>
        </dependency>
        <!-- Java High Level REST Client -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.plugin</groupId>
            <artifactId>lang-mustache-client</artifactId>
            <version>7.3.1</version>
        </dependency>
  • Configuring RestHighLevelClient:
import java.util.Arrays;
import java.util.Objects;

import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticsearchConfig {

    private static final String HTTP_SCHEME = "http";
    private static final int CONNECT_TIMEOUT_MILLIS = 1000;
    private static final int SOCKET_TIMEOUT_MILLIS = 30000;
    private static final int CONNECTION_REQUEST_TIMEOUT_MILLIS = 500;
    private static final int MAX_CONN_PER_ROUTE = 10;
    private static final int MAX_CONN_TOTAL = 30;

    // Injected from configuration; these property keys are only an example
    @Value("${elasticsearch.cluster-name}")
    private String clusterName;
    // Comma-separated host:port list, e.g. "10.0.0.1:9200,10.0.0.2:9200"
    @Value("${elasticsearch.cluster-nodes}")
    private String clusterNodes;

    @Bean
    public RestHighLevelClient restHighLevelClient() {
        // `assert` is a no-op unless the JVM runs with -ea, so validate explicitly
        if (StringUtils.isBlank(clusterName) || StringUtils.isBlank(clusterNodes)) {
            throw new IllegalStateException("clusterName and clusterNodes must be configured");
        }

        HttpHost[] httpHosts = Arrays.stream(clusterNodes.split(","))
                .map(this::toHttpHost)
                .filter(Objects::nonNull)
                .toArray(HttpHost[]::new);

        // Use either node IPs or a domain name. With a list of IPs the client round-robins
        // across the nodes automatically; with a single domain name you must set up
        // forwarding yourself (e.g. with nginx).
        RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
        // RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost("sobug-es.inhuawei.com", 80, HTTP_SCHEME));
        // RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost("10.44.150.108", 9200, HTTP_SCHEME));

        // Connect, socket and connection-pool-lease timeouts
        restClientBuilder.setRequestConfigCallback(builder -> builder
                .setConnectTimeout(CONNECT_TIMEOUT_MILLIS)
                .setSocketTimeout(SOCKET_TIMEOUT_MILLIS)
                .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MILLIS));

        // Connection limits of the underlying async HTTP client
        restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                .setMaxConnTotal(MAX_CONN_TOTAL)
                .setMaxConnPerRoute(MAX_CONN_PER_ROUTE));

        return new RestHighLevelClient(restClientBuilder);
    }

    // Parses "host:port" into an HttpHost; returns null for malformed entries
    private HttpHost toHttpHost(String s) {
        String[] address = s.trim().split(":");
        if (address.length == 2) {
            return new HttpHost(address[0], Integer.parseInt(address[1]), HTTP_SCHEME);
        }
        return null;
    }
}
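The client configuration relies on the low-level RestClient spreading requests across the node list built from clusterNodes. A single domain name instead leaves load distribution to a proxy such as nginx. The following dependency-free sketch illustrates the idea: it parses the "host:port,host:port" format and hands out nodes in round-robin order. NodeRotation and its methods are hypothetical names used only for this example. The real client also marks failed nodes and retries on other hosts, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of the Elasticsearch client. It models how a
// "host:port,host:port" setting becomes a node list that requests rotate through.
final class NodeRotation {

    static final class Node {
        final String host;
        final int port;

        Node(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    private final List<Node> nodes;
    private final AtomicInteger counter = new AtomicInteger();

    NodeRotation(List<Node> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("at least one node is required");
        }
        this.nodes = new ArrayList<>(nodes);
    }

    // Parses "host:port" entries and skips malformed ones, the same policy as
    // returning null from the host parser and filtering nulls out of the stream.
    static List<Node> parse(String clusterNodes) {
        List<Node> result = new ArrayList<>();
        for (String entry : clusterNodes.split(",")) {
            String[] parts = entry.trim().split(":");
            if (parts.length != 2 || parts[0].isEmpty()) {
                continue;
            }
            try {
                result.add(new Node(parts[0], Integer.parseInt(parts[1])));
            } catch (NumberFormatException e) {
                // non-numeric port: skip this entry
            }
        }
        return result;
    }

    // Round-robin selection; floorMod keeps the index valid after the counter overflows.
    Node next() {
        return nodes.get(Math.floorMod(counter.getAndIncrement(), nodes.size()));
    }
}
```

For example, parsing "10.0.0.1:9200,10.0.0.2:9200" and calling next() three times yields the first node, the second node, then the first node again.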



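1-2 Creating an index

When creating an index, specify its settings and mappings up front. Analyzers cannot be changed on an open index, and an existing field's mapping cannot be changed without reindexing.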

  • Using the DSL (3 primary shards, 1 replica each):
PUT /dts
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1,
    "analysis": {
      "analyzer": {
        "trigram_analyzer": {
          "tokenizer": "trigram_tokenizer",
          "filter": ["lowercase"]
        },
        "code_analyzer": {
          "tokenizer": "ik_smart",
          "filter": ["knsynonym"]
        },
        "code_searcher": {
          "tokenizer": "whitespace",
          "filter": ["knsynonym"]
        }
      },
      "tokenizer": {
        "trigram_tokenizer": {
          "type": "ngram",
          "min_gram": 3,
          "max_gram": 3
        }
      },
      "filter": {
        "knsynonym": {
          "type": "synonym",
          "synonyms_path": "analysis/synonym.txt",
          "ignore_case": true
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "dtsNo": {"type": "keyword"},
      "briefDesc": {"type": "text", "analyzer": "code_analyzer", "search_analyzer": "code_searcher"},
      "detailDesc": {"type": "text", "analyzer": "code_analyzer", "search_analyzer": "code_searcher"},
      "reasonDesc": {"type": "text", "analyzer": "code_analyzer", "search_analyzer": "code_searcher"},
      "productNo": {"type": "text"},
      "productxdtNo": {"type": "text"},
      "excludeNo": {"type": "text"},
      "starttime": {"type": "text"},
      "endTime": {"type": "text"},
      "lastHandler": {"type": "keyword"},
      "vVersion": {"type": "keyword"},
      "rVersion": {"type": "keyword"},
      "cVersion": {"type": "keyword"}
    }
  }
}
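The following simplified model, which is not Elasticsearch's implementation, gives a feel for what the two custom analysis pieces above produce. trigrams() mimics trigram_analyzer: an ngram tokenizer with min_gram = max_gram = 3, whose default token_chars keeps every character, followed by lowercase. expand() mimics the knsynonym filter for comma-separated equivalent-synonym lines with ignore_case enabled. The class name, the flattened token order (Elasticsearch places synonyms at the same position), and skipping "a => b" rules are simplifications specific to this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Simplified model of trigram_analyzer and the knsynonym filter; not Elasticsearch code.
final class AnalysisSketch {

    // ngram tokenizer (min_gram = max_gram = 3, default token_chars keeps every
    // character) followed by the lowercase filter. Input shorter than 3 yields no tokens.
    static List<String> trigrams(String text) {
        int[] cps = text.codePoints().toArray();
        List<String> grams = new ArrayList<>();
        for (int i = 0; i + 3 <= cps.length; i++) {
            grams.add(new String(cps, i, 3).toLowerCase(Locale.ROOT));
        }
        return grams;
    }

    // term -> its synonym group, built from lines such as "es, elasticsearch"
    private final Map<String, Set<String>> groups = new HashMap<>();

    AnalysisSketch(List<String> synonymLines) {
        for (String line : synonymLines) {
            String trimmed = line.trim();
            // Skip blanks, comments and explicit "a => b" rules (out of scope here)
            if (trimmed.isEmpty() || trimmed.startsWith("#") || trimmed.contains("=>")) {
                continue;
            }
            Set<String> group = new LinkedHashSet<>();
            for (String term : trimmed.split(",")) {
                group.add(term.trim().toLowerCase(Locale.ROOT)); // ignore_case: true
            }
            for (String term : group) {
                groups.put(term, group);
            }
        }
    }

    // expand = true (the default): each matching token is replaced by its whole group.
    // Elasticsearch stacks synonyms at one position; here they are simply listed in order.
    List<String> expand(List<String> tokens) {
        List<String> out = new ArrayList<>();
        for (String token : tokens) {
            Set<String> group = groups.get(token.toLowerCase(Locale.ROOT));
            if (group == null) {
                out.add(token);
            } else {
                out.addAll(group);
            }
        }
        return out;
    }
}
```

For instance, trigrams("Hello") gives hel, ell, llo. With the line "ES, Elasticsearch", the token "es" expands to both "es" and "elasticsearch". To check the real output of an analyzer, use the _analyze request shown in the extras section.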

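The corresponding Java code uses CreateIndexRequest. TransportClient is deprecated in 7.x, so the code below follows the 7.5.1 high-level client documentation instead: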

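// Imports for the 7.x high-level client (note the org.elasticsearch.client.indices package)
import java.io.IOException;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;

// Methods of a service class that holds restHighLevelClient and LOGGER (SLF4J).
// isIndexExists, isAliasExists, getIndexingAlias and TimeUtils.now() are helpers not shown here.
public boolean createIndex(String name, String source) {
    if (isIndexExists(name) || isAliasExists(name)) {
        return false;
    }
    // Create a timestamped physical index, then point the alias `name` and the indexing
    // alias at it. TimeUtils.now() must yield a lowercase, index-name-safe string.
    String index = name + "_" + TimeUtils.now();
    if (doCreateIndex(index, source)) {
        return createAlias(name, index) && createAlias(getIndexingAlias(name), index);
    }
    return false;
}

private boolean doCreateIndex(String index, String source) {
    CreateIndexRequest request = new CreateIndexRequest(index);
    request.source(source, XContentType.JSON); // settings + mappings JSON as in 1-2
    request.setTimeout(TimeValue.timeValueMinutes(2));
    // DEFAULT waits for the primary shards (unless index.write.wait_for_active_shards says otherwise);
    // ActiveShardCount.from(2) would also wait for one replica copy
    request.waitForActiveShards(ActiveShardCount.DEFAULT);
    try {
        CreateIndexResponse createIndexResponse = restHighLevelClient.indices()
                .create(request, RequestOptions.DEFAULT);
        return createIndexResponse.isAcknowledged();
    } catch (IOException e) {
        LOGGER.error("Failed to create index {}", index, e);
    }
    return false;
}

public boolean createAlias(String alias, String index) {
    IndicesAliasesRequest request = new IndicesAliasesRequest();
    // A single ADD action attaches the alias to the index
    request.addAliasAction(IndicesAliasesRequest.AliasActions.add().index(index).alias(alias));
    try {
        return restHighLevelClient.indices()
                .updateAliases(request, RequestOptions.DEFAULT).isAcknowledged();
    } catch (IOException e) {
        LOGGER.error("Failed to add alias {} to index {}", alias, index, e);
    }
    return false;
}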
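createIndex builds the physical index name as name + "_" + TimeUtils.now() and then points aliases at it, so the suffix has to satisfy Elasticsearch's index-name rules. TimeUtils is not shown in this document. The sketch below assumes a yyyyMMddHHmmss timestamp and checks the main naming rules before the name is used. IndexNaming is a hypothetical helper, not part of the client.

```java
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical helper: assumes TimeUtils.now() renders a yyyyMMddHHmmss timestamp.
final class IndexNaming {

    private static final DateTimeFormatter SUFFIX = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
    // Characters rejected in index names (':' is deprecated in 7.x and rejected in 8.x)
    private static final String FORBIDDEN = "\\/*?\"<>| ,#:";

    // e.g. ("dts", 2020-01-02T03:04:05) -> "dts_20200102030405"
    static String physicalIndex(String alias, LocalDateTime now) {
        String index = alias + "_" + now.format(SUFFIX);
        if (!isValidIndexName(index)) {
            throw new IllegalArgumentException("invalid index name: " + index);
        }
        return index;
    }

    // Main naming rules: lowercase only, no forbidden characters, must not start
    // with '-', '_' or '+', must not be "." or "..", at most 255 bytes.
    static boolean isValidIndexName(String name) {
        if (name.isEmpty() || name.equals(".") || name.equals("..")) {
            return false;
        }
        if (!name.equals(name.toLowerCase(Locale.ROOT))) {
            return false;
        }
        char first = name.charAt(0);
        if (first == '-' || first == '_' || first == '+') {
            return false;
        }
        for (char c : name.toCharArray()) {
            if (FORBIDDEN.indexOf(c) >= 0) {
                return false;
            }
        }
        return name.getBytes(StandardCharsets.UTF_8).length <= 255;
    }
}
```

A timestamp format such as "yyyy-MM-dd HH:mm:ss" would fail this check because of the space and colons. That is why a compact numeric suffix is the safer choice.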

  • Extras
 ### 1. View the cluster state
 GET _cluster/state
 ### 2. Check analyzer output (dts_issue is the index name, ik_smart the analyzer to test)
 GET dts_issue/_analyze
 {
    "analyzer":"ik_smart",
    "text":"我喜欢你"
 }
 ### 3. Partially update a document (7.x endpoint; the typed /_doc/1/_update form is deprecated)
 POST /testindex/_update/1
 {
   "doc": {
     "title": "hello world"
   }
 }
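A partial update does not replace the stored document. Instead, the fields in "doc" are merged into the existing _source: object fields are merged recursively, and other values (including arrays) are replaced. The sketch below models that merge on plain maps. PartialUpdate is a hypothetical name, and this is an illustration, not the server's code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of how the "doc" of a partial update is combined with the stored
// _source: nested objects are merged key by key, any other value in the partial doc
// (string, number, array) replaces the stored one. Not Elasticsearch's actual code.
final class PartialUpdate {

    @SuppressWarnings("unchecked")
    static Map<String, Object> merge(Map<String, Object> source, Map<String, Object> doc) {
        Map<String, Object> result = new LinkedHashMap<>(source); // never mutate the input
        for (Map.Entry<String, Object> e : doc.entrySet()) {
            Object stored = result.get(e.getKey());
            Object update = e.getValue();
            if (stored instanceof Map && update instanceof Map) {
                result.put(e.getKey(),
                        merge((Map<String, Object>) stored, (Map<String, Object>) update));
            } else {
                result.put(e.getKey(), update);
            }
        }
        return result;
    }
}
```

Applied to the request above, only "title" changes and every other stored field of document 1 is kept.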
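### 4. The index mapping parameter
The index parameter controls whether a field is indexed and therefore whether it can be queried. In older versions (2.x) it accepted three values:
no: the field is not added to the index, so it cannot be queried
not_analyzed: the value is indexed as-is without analysis, so only exact matches work
analyzed: the text is run through an analyzer and the resulting terms are indexed (full-text search)
From 5.x on (including 7.x) the parameter is a boolean (true/false). Exact-value strings use the keyword type instead of not_analyzed.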
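A toy inverted index makes the difference between the three legacy index values easy to see. In this sketch, ToyIndex is hypothetical and whitespace + lowercase splitting stands in for a real analyzer. An analyzed field matches single lowercase terms. A not_analyzed (keyword-style) field matches only the exact original value. A field whose index value is no produces no terms, so nothing can match it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Toy inverted index illustrating the three legacy "index" modes; this is not Lucene.
final class ToyIndex {

    enum Mode { ANALYZED, NOT_ANALYZED, NO }

    private final Mode mode;
    private final Map<String, Set<String>> postings = new HashMap<>();

    ToyIndex(Mode mode) {
        this.mode = mode;
    }

    // Terms produced when a value is indexed under the configured mode.
    List<String> terms(String value) {
        switch (mode) {
            case ANALYZED: {
                List<String> out = new ArrayList<>();
                for (String t : value.toLowerCase(Locale.ROOT).split("\\s+")) {
                    if (!t.isEmpty()) {
                        out.add(t);
                    }
                }
                return out;
            }
            case NOT_ANALYZED:
                return Collections.singletonList(value); // exact value, like a keyword field
            default:
                return Collections.emptyList();           // "no" (index: false in 5.x+)
        }
    }

    void add(String docId, String value) {
        for (String term : terms(value)) {
            postings.computeIfAbsent(term, k -> new HashSet<>()).add(docId);
        }
    }

    // Term lookup: the query term is compared as-is with the indexed terms.
    Set<String> termQuery(String term) {
        return postings.getOrDefault(term, Collections.emptySet());
    }
}
```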



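1-3 Bulk upload

Batch uploads use the BulkProcessor utility provided by Elasticsearch. It has to be configured before any documents are added.

Configuration:


The listener below records which bulk request (identified by its execution id) a failure occurred in.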

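// Imports for the 7.x high-level client
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

  public BulkProcessor buildBulkProcess() {

        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                LOGGER.debug("Executing bulk [{}] with {} requests",
                        executionId, request.numberOfActions());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    // buildFailureMessage() lists each failed item with its index, id and reason
                    LOGGER.warn("Bulk [{}] executed with failures: {}",
                            executionId, response.buildFailureMessage());
                } else {
                    // getTook() is the total bulk time; getIngestTookInMillis() covers only ingest pipelines
                    LOGGER.debug("Bulk [{}] completed in {} milliseconds",
                            executionId, response.getTook().getMillis());
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // The whole request failed, e.g. because of a connection error
                LOGGER.error("Failed to execute bulk [{}]", executionId, failure);
            }
        };
        return BulkProcessor.builder(
                        (bulkRequest, bulkListener) ->
                                restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkListener),
                        listener)
                .setBulkActions(500)                                  // flush after 500 actions
                .setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB))  // or after 1 MB of data
                .setConcurrentRequests(1)                             // one bulk in flight while the next accumulates
                .setFlushInterval(TimeValue.timeValueSeconds(10L))    // or every 10 seconds
                .setBackoffPolicy(BackoffPolicy                       // retry rejected bulks 3 times, 1 s apart
                        .constantBackoff(TimeValue.timeValueSeconds(1L), 3))
                .build();
    }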
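setBulkActions(500) and setBulkSize(1 MB) mean that a bulk request is sent as soon as either limit is reached, whichever comes first. The sketch below is a simplified, single-threaded model of that rule only. BatchBuffer is hypothetical and has no flush interval, concurrency, or retries.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of the count/size flush rule configured above. It is not
// BulkProcessor: there is no flush interval, concurrency or retry here.
final class BatchBuffer {

    private final int maxActions;
    private final long maxBytes;
    private final Consumer<List<String>> sink;
    private final List<String> pending = new ArrayList<>();
    private long pendingBytes;

    BatchBuffer(int maxActions, long maxBytes, Consumer<List<String>> sink) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
        this.sink = sink;
    }

    // Adds one document and flushes once either threshold is reached.
    void add(String json) {
        pending.add(json);
        pendingBytes += json.getBytes(StandardCharsets.UTF_8).length;
        if (pending.size() >= maxActions || pendingBytes >= maxBytes) {
            flush();
        }
    }

    // Sends whatever is pending; call it at shutdown as well, or the tail is lost.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(pending));
        pending.clear();
        pendingBytes = 0;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

The explicit final flush() reflects what the real processor needs at shutdown. Call bulkProcessor.awaitClose(...) or close() so that actions still buffered below the thresholds are sent.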

  • For the documents themselves, the 7.5.1 official documentation builds each source with XContentFactory.jsonBuilder():
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

// jsonBuilder() throws IOException, so the enclosing method must declare or handle it
for (ElasticTranfomerPo vo : data) {
            XContentBuilder xContentBuilder = XContentFactory.jsonBuilder();
            xContentBuilder.startObject();
            {
                xContentBuilder.field("dtsNo", vo.getDtsNo());
                xContentBuilder.field("briefDesc", vo.getBriefDesc());
                xContentBuilder.field("detailDesc", vo.getDetailDesc());
                xContentBuilder.field("reasonDesc", vo.getReasonDesc());
                xContentBuilder.field("productNo", vo.getProductNo());
                xContentBuilder.field("productxdtNo", vo.getProductxdtNo());
                xContentBuilder.field("dmodifyTime", vo.getDmodifyTime());
                xContentBuilder.field("modifier", vo.getModifier());
                xContentBuilder.field("rVersion", vo.getRVersion());
                // The field name must match the mapping ("vVersion"); a misspelled name
                // would be added as a new dynamic field instead
                xContentBuilder.field("vVersion", vo.getVVsersion());
                xContentBuilder.field("cVersion", vo.getCVersion());
                xContentBuilder.timeField("uploadTime", TimeUtils.now());
            }
            xContentBuilder.endObject();
            // dtsNo is the document id, so uploading the same record again overwrites it
            bulkProcessor.add(new IndexRequest(index).id(vo.getDtsNo())
                    .source(xContentBuilder));
        }
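Each IndexRequest added to the processor ends up in the newline-delimited JSON body of the _bulk endpoint. That body consists of an action line naming the index and id, followed by the document source, with each line terminated by a newline. BulkBody is a hypothetical helper that writes this format for flat string fields only. It can help when inspecting or replaying a failed batch by hand in Kibana; the client serializes real requests itself.

```java
import java.util.Map;

// Sketch of the _bulk NDJSON format for "index" actions: one action line, then one
// source line, each ending in '\n'. Handles flat string fields only.
final class BulkBody {

    static String indexAction(String index, String id, Map<String, String> source) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"index\":{\"_index\":").append(quote(index))
          .append(",\"_id\":").append(quote(id)).append("}}\n");
        sb.append('{');
        boolean first = true;
        for (Map.Entry<String, String> e : source.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            String value = e.getValue() == null ? "null" : quote(e.getValue());
            sb.append(quote(e.getKey())).append(':').append(value);
        }
        sb.append("}\n");
        return sb.toString();
    }

    // Minimal JSON string escaping: quotes, backslash and control characters.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }
}
```

The body must end with a newline. A final source line without one is a common cause of malformed _bulk requests sent by hand.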



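Copyright notice: this is an original article by qq_27217017, licensed under CC 4.0 BY-SA. When reposting, include a link to the original source and this notice.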