1. Common ES syntax
- Kibana: run short DSL queries, check system and cluster state, and debug cluster threads
- cerebro: cluster monitoring
1-1 Creating a connection with RestHighLevelClient:
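Elasticsearch 7.x deprecates TransportClient, so the examples here use the Java High Level REST Client.
The Maven configuration is below. Adding only elasticsearch-rest-high-level-client is not enough; the related dependencies must be added as well: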
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.3.1</version>
</dependency>
<!-- Java High Level REST Client -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.3.1</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.3.1</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>lang-mustache-client</artifactId>
    <version>7.3.1</version>
</dependency>
- Configuring RestHighLevelClient:
private final static String HTTP_SCHEME = "http";
private final static int CONNECT_TIMEOUT_MILLIS = 1000;
private final static int SOCKET_TIMEOUT_MILLIS = 30000;
public final static int CONNECTION_REQUEST_TIMEOUT_MILLIS = 500;
public final static int MAX_CONN_PER_ROUTE = 10;
public final static int MAX_CONN_TOTAL = 30;

@Bean
public RestHighLevelClient restHighLevelClient() {
    assert (StringUtils.isNotEmpty(clusterName));
    assert (StringUtils.isNotEmpty(clusterNodes));
    // Parse "host:port,host:port" into HttpHost objects (for the commented-out builder below)
    HttpHost[] httpHosts = Arrays.stream(clusterNodes.split(","))
            .map(this::makeHttpHost)
            .filter(Objects::nonNull)
            .toArray(HttpHost[]::new);
    //RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
    // Note: either a domain name or IPs can be used here. With a list of IPs the client
    // round-robins across the nodes automatically; with a domain name you have to
    // configure the forwarding yourself, e.g. with nginx.
    RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost("sobug-es.inhuawei.com", 80, HTTP_SCHEME));
    //RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost("10.44.150.108",9200,HTTP_SCHEME));
    // Set the connect, socket, and connection-request timeouts
    restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
        @Override
        public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
            builder.setConnectTimeout(CONNECT_TIMEOUT_MILLIS);
            builder.setSocketTimeout(SOCKET_TIMEOUT_MILLIS);
            builder.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MILLIS);
            return builder;
        }
    });
    // Configure the async HTTP client's connection limits
    restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
        @Override
        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
            httpAsyncClientBuilder.setMaxConnTotal(MAX_CONN_TOTAL);
            httpAsyncClientBuilder.setMaxConnPerRoute(MAX_CONN_PER_ROUTE);
            return httpAsyncClientBuilder;
        }
    });
    return new RestHighLevelClient(restClientBuilder);
}

// Converts one "host:port" entry to an HttpHost; returns null for malformed entries
private HttpHost makeHttpHost(String s) {
    assert (StringUtils.isNotBlank(s));
    String[] address = s.split(":");
    if (address.length == 2) {
        return new HttpHost(address[0], Integer.parseInt(address[1]), HTTP_SCHEME);
    }
    return null;
}
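To make the node-list handling above concrete, here is a minimal standalone sketch that parses a comma-separated host:port string using only the JDK. The class name NodeListParser and the sample addresses are made up for illustration and are not part of the Elasticsearch client. Unlike the helper above, this sketch also skips entries whose port is not a number instead of throwing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the Elasticsearch client.
final class NodeListParser {

    /** A parsed "host:port" pair. */
    static final class Node {
        final String host;
        final int port;

        Node(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    /** Parses "host1:9200,host2:9200"; malformed entries are skipped. */
    static List<Node> parse(String clusterNodes) {
        List<Node> nodes = new ArrayList<>();
        if (clusterNodes == null) {
            return nodes;
        }
        for (String entry : clusterNodes.split(",")) {
            String[] address = entry.trim().split(":");
            if (address.length != 2 || address[0].isEmpty()) {
                continue; // not of the form host:port
            }
            try {
                nodes.add(new Node(address[0], Integer.parseInt(address[1])));
            } catch (NumberFormatException e) {
                // Port is not a number: skip this entry.
            }
        }
        return nodes;
    }

    public static void main(String[] args) {
        // Sample addresses are placeholders.
        System.out.println(parse("10.0.0.1:9200, 10.0.0.2:9200, bad-entry"));
    }
}
```

Each parsed pair could then be mapped to an HttpHost and handed to RestClient.builder as an array, which is what the commented-out builder line above does.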
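1-2 Creating an index
When creating an index, explicitly specify its settings and mappings; otherwise Elasticsearch falls back to default settings and dynamic mapping.
- Using the DSL: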
# 3 primary shards, 1 replica
PUT /dts
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1,
    "analysis": {
      "analyzer": {
        "trigram_analyzer": {
          "tokenizer": "trigram_tokenizer",
          "filter": ["lowercase"]
        },
        "code_analyzer": {
          "tokenizer": "ik_smart",
          "filter": ["knsynonym"]
        },
        "code_searcher": {
          "tokenizer": "whitespace",
          "filter": ["knsynonym"]
        }
      },
      "tokenizer": {
        "trigram_tokenizer": {
          "type": "ngram",
          "min_gram": 3,
          "max_gram": 3
        }
      },
      "filter": {
        "knsynonym": {
          "type": "synonym",
          "synonyms_path": "analysis/synonym.txt",
          "ignore_case": true
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "dtsNo": {"type": "keyword"},
      "briefDesc": {"type": "text", "analyzer": "code_analyzer", "search_analyzer": "code_searcher"},
      "detailDesc": {"type": "text", "analyzer": "code_analyzer", "search_analyzer": "code_searcher"},
      "reasonDesc": {"type": "text", "analyzer": "code_analyzer", "search_analyzer": "code_searcher"},
      "productNo": {"type": "text"},
      "productxdtNo": {"type": "text"},
      "excludeNo": {"type": "text"},
      "starttime": {"type": "text"},
      "endTime": {"type": "text"},
      "lastHandler": {"type": "keyword"},
      "vVersion": {"type": "keyword"},
      "rVersion": {"type": "keyword"},
      "cVersion": {"type": "keyword"}
    }
  }
}
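To show what the trigram_analyzer defined above is meant to produce, here is a rough JDK-only sketch of an ngram tokenizer with min_gram = max_gram = 3 followed by a lowercase filter. It is a simplified model, not Elasticsearch's implementation: it slides over Java chars rather than Unicode code points and assumes the tokenizer's default of keeping every character. The class name TrigramSketch and the sample input are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Simplified model of an ngram(3,3) tokenizer plus a lowercase filter.
final class TrigramSketch {

    /** Emits every 3-character window of the input, lowercased, in order. */
    static List<String> trigrams(String text) {
        List<String> grams = new ArrayList<>();
        String lower = text.toLowerCase(Locale.ROOT);
        for (int i = 0; i + 3 <= lower.length(); i++) {
            grams.add(lower.substring(i, i + 3));
        }
        return grams;
    }

    public static void main(String[] args) {
        System.out.println(trigrams("DTS2019")); // [dts, ts2, s20, 201, 019]
    }
}
```

Inputs shorter than three characters yield no tokens in this model, which is worth keeping in mind when a field analyzed this way is searched with very short terms.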
The corresponding Java code uses CreateIndexRequest (as of 7.5.1, TransportClient is abandoned in favor of the REST client):
public boolean createIndex(String name, String source) {
    // Refuse to create anything if an index or alias with this name already exists
    if (isIndexExists(name) || isAliasExists(name)) {
        return false;
    }
    // The physical index is <name>_<timestamp>; callers go through the aliases
    String index = name + "_" + TimeUtils.now();
    if (doCreateIndex(index, source)) {
        return createAlias(name, index) && createAlias(getIndexingAlias(name), index);
    }
    return false;
}

private boolean doCreateIndex(String index, String source) {
    CreateIndexRequest request = new CreateIndexRequest(index);
    // source is the JSON body holding settings and mappings
    request.source(source, XContentType.JSON);
    request.setTimeout(TimeValue.timeValueMinutes(2));
    // Use the default wait_for_active_shards behaviour
    request.waitForActiveShards(ActiveShardCount.DEFAULT);
    try {
        CreateIndexResponse createIndexResponse = restHighLevelClient.indices()
                .create(request, RequestOptions.DEFAULT);
        return createIndexResponse.isAcknowledged();
    } catch (IOException e) {
        // Pass the exception itself so the stack trace is logged
        LOGGER.error("Index create error", e);
    }
    return false;
}

public boolean createAlias(String alias, String index) {
    IndicesAliasesRequest request = new IndicesAliasesRequest();
    // A single ADD action that points the alias at the index
    request.addAliasAction(IndicesAliasesRequest.AliasActions.add().alias(alias).index(index));
    try {
        return restHighLevelClient.indices().updateAliases(request, RequestOptions.DEFAULT).isAcknowledged();
    } catch (IOException e) {
        LOGGER.error("Create alias error", e);
    }
    return false;
}
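The naming scheme in createIndex above (a timestamped physical index with two aliases pointing at it) can be sketched without Elasticsearch. Since TimeUtils.now() and getIndexingAlias are not shown in the article, the timestamp pattern yyyyMMddHHmmss and the _indexing suffix below are assumptions chosen purely for illustration, as is the class name IndexNamingSketch.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: the timestamp pattern and alias suffix are assumptions.
final class IndexNamingSketch {

    private static final DateTimeFormatter STAMP =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    /** Physical index name: <name>_<timestamp>. */
    static String physicalIndex(String name, LocalDateTime now) {
        return name + "_" + now.format(STAMP);
    }

    /** Hypothetical stand-in for getIndexingAlias(name). */
    static String indexingAlias(String name) {
        return name + "_indexing";
    }

    /** Maps each alias to the physical index it would point at. */
    static Map<String, String> aliasesFor(String name, LocalDateTime now) {
        String index = physicalIndex(name, now);
        Map<String, String> aliases = new LinkedHashMap<>();
        aliases.put(name, index);                // alias used for searching
        aliases.put(indexingAlias(name), index); // alias used for writing
        return aliases;
    }

    public static void main(String[] args) {
        System.out.println(aliasesFor("dts", LocalDateTime.of(2020, 1, 2, 3, 4, 5)));
    }
}
```

Because clients only ever see the alias names, the physical index behind them can presumably be replaced later by re-pointing the aliases, which is the usual motivation for this layout.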
- Extras
### 1. View cluster state
GET _cluster/state
### 2. Check analyzer output (dts_issue is the index name, ik_smart is the analyzer to test)
GET dts_issue/_analyze
{
  "analyzer": "ik_smart",
  "text": "我喜欢你"
}
### 3. Partially update a document (typeless 7.x endpoint)
POST /testindex/_update/1
{
  "doc": {
    "title": "hello world"
  }
}
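The effect of a partial update can be modelled with plain maps: the fields under "doc" are merged into the stored document, nested objects are merged recursively, and any other value replaces the old one. The sketch below is a JDK-only model of that merge, not the Elasticsearch implementation; the class name PartialUpdateSketch and the sample fields are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of how a partial "doc" is merged into an existing document.
final class PartialUpdateSketch {

    /** Returns a new map: source with the fields of doc merged in. */
    @SuppressWarnings("unchecked")
    static Map<String, Object> merge(Map<String, Object> source, Map<String, Object> doc) {
        Map<String, Object> merged = new LinkedHashMap<>(source);
        for (Map.Entry<String, Object> e : doc.entrySet()) {
            Object oldValue = merged.get(e.getKey());
            Object newValue = e.getValue();
            if (oldValue instanceof Map && newValue instanceof Map) {
                // Both sides are objects: merge them field by field.
                merged.put(e.getKey(), merge((Map<String, Object>) oldValue,
                                             (Map<String, Object>) newValue));
            } else {
                // Scalars, arrays, and type changes simply replace the old value.
                merged.put(e.getKey(), newValue);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> stored = new LinkedHashMap<>();
        stored.put("title", "old title");
        stored.put("views", 10);

        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("title", "hello world");

        System.out.println(merge(stored, doc)); // {title=hello world, views=10}
    }
}
```

Fields that the partial document does not mention, such as views here, are left untouched.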
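### 4. The index mapping parameter
The index parameter controls how a field is indexed and whether it can be queried. Before Elasticsearch 5.0 it took three values; from 5.0 on it is a boolean, and not_analyzed behaviour is expressed with the keyword field type.
no: the field is not added to the index, so it is neither indexed nor searchable
not_analyzed: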
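1-3 Bulk upload
Bulk uploads use BulkProcessor, a utility class provided by Elasticsearch. It must be configured before uploading.
Configuration:
The listener callbacks log each bulk request by its execution ID, so a failed upload can be traced to the request it occurred in.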
public BulkProcessor buildBulkProcess() {
    BulkProcessor.Listener listener = new BulkProcessor.Listener() {
        // Called just before a bulk request is sent
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            int numberOfActions = request.numberOfActions();
            LOGGER.debug("Executing bulk [{}] with {} requests",
                    executionId, numberOfActions);
        }

        // Called after a bulk request completes (individual items may still have failed)
        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            if (response.hasFailures()) {
                LOGGER.warn("Bulk [{}] executed with failures", executionId);
            } else {
                LOGGER.debug("Bulk [{}] completed in {} milliseconds",
                        executionId, response.getTook().getMillis());
            }
        }

        // Called when the whole bulk request failed
        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable throwable) {
            LOGGER.error("Failed to execute bulk [{}]", executionId, throwable);
        }
    };
    BulkProcessor bulkProcessor = BulkProcessor.builder(
            (bulkRequest, bulkResponseActionListener) ->
                    restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener),
            listener)
            .setBulkActions(500)                                 // flush after 500 actions
            .setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)) // or after 1 MB of data
            .setConcurrentRequests(1)                            // one bulk request in flight while the next accumulates
            .setFlushInterval(TimeValue.timeValueSeconds(10L))   // or every 10 seconds
            .setBackoffPolicy(BackoffPolicy
                    .constantBackoff(TimeValue.timeValueSeconds(1L), 3)) // retry up to 3 times, 1 s apart
            .build();
    return bulkProcessor;
}
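The flush thresholds configured above can be illustrated with a small JDK-only batcher that flushes once the number of pending actions or the pending byte count reaches its limit. This is a simplified model under stated assumptions, not BulkProcessor itself: it ignores the time-based flush, concurrency, and retries, and it measures size as the UTF-8 length of each document rather than Elasticsearch's own size estimate. The class name BatchingSketch is made up.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified model of count- and size-based flushing; not the real BulkProcessor.
final class BatchingSketch {

    private final int maxActions;
    private final long maxBytes;
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();
    private long pendingBytes = 0;

    BatchingSketch(int maxActions, long maxBytes) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
    }

    /** Queues one document and flushes if either threshold is reached. */
    void add(String json) {
        pending.add(json);
        pendingBytes += json.getBytes(StandardCharsets.UTF_8).length;
        if (pending.size() >= maxActions || pendingBytes >= maxBytes) {
            flush();
        }
    }

    /** Moves the pending documents into a completed batch. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushed.add(new ArrayList<>(pending));
        pending.clear();
        pendingBytes = 0;
    }

    List<List<String>> flushedBatches() {
        return flushed;
    }

    public static void main(String[] args) {
        BatchingSketch batcher = new BatchingSketch(3, 1024);
        for (int i = 0; i < 7; i++) {
            batcher.add("{\"n\":" + i + "}");
        }
        batcher.flush(); // flush the remainder, as closing the processor would
        System.out.println(batcher.flushedBatches().size()); // 3
    }
}
```

With the values used in the article, the same logic would trigger at 500 actions or 1 MB, whichever comes first, with the 10-second interval acting as a further trigger for slow streams.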
In the official 7.5.1 documentation, bulk uploads build each document with XContentFactory.jsonBuilder():
for (ElasticTranfomerPo vo : data) {
    // Build one JSON document per record
    XContentBuilder xContentBuilder = XContentFactory.jsonBuilder();
    xContentBuilder.startObject();
    {
        xContentBuilder.field("dtsNo", vo.getDtsNo());
        xContentBuilder.field("briefDesc", vo.getBriefDesc());
        xContentBuilder.field("detailDesc", vo.getDetailDesc());
        xContentBuilder.field("reasonDesc", vo.getReasonDesc());
        xContentBuilder.field("productNo", vo.getProductNo());
        xContentBuilder.field("productxdtNo", vo.getProductxdtNo());
        xContentBuilder.field("dmodifyTime", vo.getDmodifyTime());
        xContentBuilder.field("modifier", vo.getModifier());
        xContentBuilder.field("rVersion", vo.getRVersion());
        // Field name matches the "vVersion" property in the mapping
        xContentBuilder.field("vVersion", vo.getVVsersion());
        xContentBuilder.field("cVersion", vo.getCVersion());
        xContentBuilder.timeField("uploadTime", TimeUtils.now());
    }
    xContentBuilder.endObject();
    // Use dtsNo as the document ID so re-uploading overwrites instead of duplicating
    bulkProcessor.add(new IndexRequest(index).id(vo.getDtsNo())
            .source(xContentBuilder));
}
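Copyright notice: This is an original article by qq_27217017, released under the CC 4.0 BY-SA license. When reposting, please include a link to the original source and this notice.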