一.本文基于 【ES】RestHighLevelClient连接阿里ElasticSearch7.10(一) 基础环境之上。记录对于es处于java中的相关业务需要做按月索引的实现
https://blog.csdn.net/ssdlleave/article/details/107846094?spm=1001.2014.3001.5501
方式:根据es模版自动创建索引。指定mapping类型
按月创建 根据创建时间 cretetime字段按月划分。追加索引名称后自动创建索引实现按月索引
定义es gis-location模版kibana添加
POST _template/template-gis-location
{
“template”: “gis-location”,
“order”: 2,
“index_patterns”: [“gis-location*”],
“aliases”: {
“gis-location”: {}
},
“settings”: {
“index”: {
“number_of_shards”: “6”,
“number_of_replicas”: “1”
}
},
“mappings”: {
“dynamic_templates”: [{
“strings”: {
“match_mapping_type”: “string”,
“mapping”: {
“type”: “keyword”
}
}
}],
“properties”: {
“id”: {
“type”: “keyword”
},
“truckNumber”: {
“type”: “keyword”
},
“parkingTime”: {
“type”: “long”
},
“travelStatus”: {
“type”: “integer”
},
“sourceType”: {
“type”: “integer”
},
“accStatus”: {
“type”: “keyword”
},
“mileage”: {
“type”: “keyword”
},
“speed”: {
“type”: “keyword”
},
“provinceCode”: {
“type”: “keyword”
},
“cityCode”: {
“type”: “keyword”
},
“areaCode”: {
“type”: “keyword”
},
“provinceName”: {
“type”: “keyword”
},
“cityName”: {
“type”: “keyword”
},
“areaName”: {
“type”: “keyword”
},
“locationDetails”: {
“type”: “keyword”
},
“latitude”: {
“type”: “double”
},
“longitude”: {
“type”: “double”
},
“location”: {
“type”: “geo_point”
},
“enable”: {
“type”: “integer”
},
“updateUserNo”: {
“type”: “keyword”
},
“update_time”: {
“type”: “long”
},
“createUserNo”: {
“type”: “keyword”
},
“createTime”: {
“type”: “long”
}
}
}
}
二 代码相关分享
1.相关注解定义
Document 文档注解 注意:(type属性是为了兼容7.*版本以下设置。 7.*之后版本type无法定义默认都为_doc类型)
import java.lang.annotation.*;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface Document {
/**
* 索引名称
*/
public String index();
/**
* 索引类型
*/
public String type();
/**
* 分片数
*/
public int shardNum();
/**
* 副本数
*/
public int replicaNum();
/**
* @Description:
* 分区索引转换格式(默认为不分区)
* 日期格式:年:-YYYY 月:-YYYYMM 周:-YYYYMMWW 日:-YYYYMMDD
* @return
*/
public PARTITION_TYPE indexPartition() default PARTITION_TYPE.NULL;
/**
* @Description: 定义分区索引类型
*/
public static enum PARTITION_TYPE {NULL, DAY, WEEK, MONTH, YEAR};
}
ESearchTypeColumn 字段类型注解
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 构建为elasticsearch
* 方便使用的jsonBuilder对象
*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ESearchTypeColumn {
/**
* 字段类型
*
* @return
*/
String type() default "string";
/**
* 是否分词
*
* @return
*/
boolean analyze() default false;
}
id 主键注解
import java.lang.annotation.*;
/**
* @Description: 主键ID
*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface Id {
}
IndexPartitionTime 分区索引时间注解
import java.lang.annotation.*;
/**
* @Description: 分区索引时间
*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface IndexPartitionTime {
}
Routing 路由注解
import java.lang.annotation.*;
/**
* @Description: 路由值
*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface Routing {
}
Version 版本号注解
import java.lang.annotation.*;
/**
* @Description: 版本号
* @author ss
* @date
*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface Version {
}
2.注解定义完成后 添加一个文档
定义AbstractElasticsearchDocument 基础文档类型
/**
* 基础文档类型
*/
public abstract class AbstractElasticsearchDocument {
}
import com.jianan.gis.info.entity.GeoPoint;
import com.jianan.gis.info.es.support.*;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.math.BigDecimal;
import java.util.Date;
/**
* 车辆位置记录文档
*/
@Data
@ApiModel(value = "GisLocationDocument对象", description = "GisLocationDocument对象")
@Document(index = "gis-location", type = "_doc", shardNum = 3, replicaNum = 0, indexPartition = Document.PARTITION_TYPE.MONTH)
public class GisLocationDocument extends AbstractElasticsearchDocument{
/**
*
*/
private static final long serialVersionUID = 3668112997477936631L;
@Id
@ApiModelProperty(value = "id")
private String id;// 流水号
/**
* 车牌号码
*/
@ESearchTypeColumn
@Routing
@ApiModelProperty(value = "车牌号码")
private String truckNumber;
/**
* 停车时长
*/
@ESearchTypeColumn
@ApiModelProperty(value = "停车时长")
private Long parkingTime;
/**
* 行驶状态(0-离线 1-开火静止 2-停车静止 3-行驶中)
*/
@ESearchTypeColumn
@ApiModelProperty(value = "行驶状态(0-离线 1-开火静止 2-停车静止 3-行驶中)")
private Integer travelStatus;
/**
*数据来源
*/
@ESearchTypeColumn
@ApiModelProperty(value = "数据来源")
private Integer sourceType;
/**
*acc状态
*/
@ESearchTypeColumn
@ApiModelProperty(value = "acc状态")
private Integer accStatus;
/**
*里程(KM)
*/
@ESearchTypeColumn
@ApiModelProperty(value = "里程(KM)")
private BigDecimal mileage;
/**
*总里程(KM)
*/
@ESearchTypeColumn
@ApiModelProperty(value = "总里程(KM)")
private BigDecimal mileageSum;
/**
* 方向
*/
@ESearchTypeColumn
@ApiModelProperty(value = "方向")
private String direction;
/**
*速度(KM/h)
*/
@ESearchTypeColumn
@ApiModelProperty(value = "速度(KM/h)")
private BigDecimal speed;
/**
*省
*/
@ESearchTypeColumn
@ApiModelProperty(value = "省code")
private String provinceCode;
/**
*市
*/
@ESearchTypeColumn
@ApiModelProperty(value = "市code")
private String cityCode;
/**
*区
*/
@ESearchTypeColumn
@ApiModelProperty(value = "区code")
private String areaCode;
/**
*省
*/
@ESearchTypeColumn
@ApiModelProperty(value = "省")
private String provinceName;
/**
*市
*/
@ESearchTypeColumn
@ApiModelProperty(value = "市")
private String cityName;
/**
*区
*/
@ESearchTypeColumn
@ApiModelProperty(value = "区")
private String areaName;
/**
*位置详情
*/
@ESearchTypeColumn
@ApiModelProperty(value = "位置详情")
private String locationDetails;
/**
*GPS地图纬度
*/
@ESearchTypeColumn
@ApiModelProperty(value = "GPS地图纬度")
private double latitude;
/**
*GPS地图经度
*/
@ESearchTypeColumn
@ApiModelProperty(value = "GPS地图经度")
private double longitude;
/**
* 坐标位置 [ -73.983, 40.719 ]
*/
@ESearchTypeColumn
@ApiModelProperty(value = "坐标位置")
private GeoPoint location;
/**
*是否删除(0-已删除,1-正常)
*/
@ESearchTypeColumn
@ApiModelProperty(value = "是否删除(0-已删除,1-正常)")
private Integer enable;
/**
*创建人
*/
@ESearchTypeColumn
@ApiModelProperty(value = "创建人")
private String createUserNo;
/**
*创建时间
*/
@IndexPartitionTime
@ESearchTypeColumn
@ApiModelProperty(value = "创建时间")
private Date createTime;
/**
*修改人
*/
@ESearchTypeColumn
@ApiModelProperty(value = "修改人")
private String updateUserNo;
/**
*修改时间
*/
@ESearchTypeColumn
@ApiModelProperty(value = "修改时间")
private Date updateTime;
}
GeoPoint为es地址位置坐标
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
/**
*
* lon:经度 lat:纬度
* 字符串形式以半角逗号分割,如 "lat,lon" 。
*
*
* 对象形式显式命名为 lat 和 lon 。
*
*
* 数组形式表示为 [lon,lat] 。
*/
@Data
public class GeoPoint {
/**
* 纬度
*/
@ApiModelProperty(value = "纬度")
private double lat;
/**
* 经度
*/
@ApiModelProperty(value = "经度")
private double lon;
public GeoPoint(double lat, double lon) {
this.lat = lat;
this.lon = lon;
}
}
2.相关文档配置获取解决方法定义
ElasticsearchDocumentConfig
@Configurable
public class ElasticsearchDocumentConfig {
/**
* 索引名称
*/
private String indexName;
/**
* 索引类型
*/
private String type;
/**
* 分片数
*/
private int shardNum;
/**
* 副本数
*/
private int replicaNum;
/**
* 主键ID
*/
private String id;
/**
* 主键属性名称
*/
private String idPropertiesName;
/**
* 路由值
*/
private String routing;
/**
* 版本号
*/
private Long version;
/**
* 分区类型
*/
private PARTITION_TYPE partitionType;
/**
* 分区时间
*/
private Date partitionTime;
/**
* 别名
*/
private String aliasName;
/**
* @return indexName
*/
public String getIndexName() {
// 通过配置分区时间和分区类型,获取索引名称
if (partitionTime != null && partitionType != PARTITION_TYPE.NULL) {
return indexName + ElasticsearchDocumentResolver.getPartitionIndexSuffix(partitionTime, partitionType);
}
return indexName;
}
/**
* @param indexName
* the indexName to set
*/
public void setIndexName(String indexName) {
this.indexName = indexName;
}
/**
* @return type
*/
public String getType() {
return type;
}
/**
* @param type
* the type to set
*/
public void setType(String type) {
this.type = type;
}
/**
* @return shardNum
*/
public int getShardNum() {
return shardNum;
}
/**
* @param shardNum
* the shardNum to set
*/
public void setShardNum(int shardNum) {
this.shardNum = shardNum;
}
/**
* @return replicaNum
*/
public int getReplicaNum() {
return replicaNum;
}
/**
* @param replicaNum
* the replicaNum to set
*/
public void setReplicaNum(int replicaNum) {
this.replicaNum = replicaNum;
}
/**
* @return id
*/
public String getId() {
return id;
}
/**
* @param id
* the id to set
*/
public void setId(String id) {
this.id = id;
}
/**
* @return idPropertiesName
*/
public String getIdPropertiesName() {
return idPropertiesName;
}
/**
* @param idPropertiesName
* the idPropertiesName to set
*/
public void setIdPropertiesName(String idPropertiesName) {
this.idPropertiesName = idPropertiesName;
}
/**
* @return routing
*/
public String getRouting() {
return routing;
}
/**
* @param routing
* the routing to set
*/
public void setRouting(String routing) {
this.routing = routing;
}
/**
* @return version
*/
public Long getVersion() {
return version;
}
/**
* @param version
* the version to set
*/
public void setVersion(Long version) {
this.version = version;
}
/**
* @return partitionType
*/
public PARTITION_TYPE getPartitionType() {
return partitionType;
}
/**
* @param partitionType
* the partitionType to set
*/
public void setPartitionType(PARTITION_TYPE partitionType) {
this.partitionType = partitionType;
}
/**
* @return partitionTime
*/
public Date getPartitionTime() {
return partitionTime;
}
/**
* @param partitionTime
* the partitionTime to set
*/
public void setPartitionTime(Date partitionTime) {
this.partitionTime = partitionTime;
}
/**
* @Description: 计算文档存放分片ID
* 路由值
* 分片数
* @author raowenbin
* @date 2019年3月18日 下午5:09:23
*/
public String getShardId() {
if (StringUtil.isEmpty(this.routing)) {
return null;
}
// 取值在hash之后再与有多少个shared的数量取模
return String.valueOf(this.routing.hashCode() % this.shardNum);
}
public String getAliasName() {
return this.indexName;
}
}
ElasticsearchDocumentResolver
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.jianan.gis.info.es.core.ElasticsearchDocumentConfig;
import com.jianan.gis.info.es.document.AbstractElasticsearchDocument;
import com.jianan.gis.info.es.support.*;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import java.beans.PropertyDescriptor;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.*;
public class ElasticsearchDocumentResolver {
/**
* 获取文档配置
* @param T 文档类型
* @return 配置项
*/
public static ElasticsearchDocumentConfig getDocumentConfig(AbstractElasticsearchDocument T) {
// 构造文档配置项
ElasticsearchDocumentConfig docConfig = new ElasticsearchDocumentConfig();
// 此处要用反射将字段中的注解解析出来
Class<? extends AbstractElasticsearchDocument> clz = T.getClass();
// 判断类上是否有次注解
boolean clzHasAnno = clz.isAnnotationPresent(Document.class);
if (clzHasAnno) {
// 获取类上的注解
Document annotation = clz.getAnnotation(Document.class);
if (annotation != null) {
// 设置索引配置
docConfig.setIndexName(annotation.index());
docConfig.setType(annotation.type());
docConfig.setShardNum(annotation.shardNum());
docConfig.setReplicaNum(annotation.replicaNum());
docConfig.setPartitionType(annotation.indexPartition());
}
}
// 设置ID
Object id = getAnnotationVaue(T, docConfig, Id.class);
if (id != null) {
docConfig.setId(id.toString());
}
// 设置路由值
Object routing = getAnnotationVaue(T, docConfig, Routing.class);
if (routing != null) {
docConfig.setRouting(routing.toString());
}
// 设置版本
Object version = getAnnotationVaue(T, docConfig, Version.class);
if (version != null) {
docConfig.setVersion(Long.valueOf(version.toString()));
}
// 设置分区时间
Object partitionTime = getAnnotationVaue(T, docConfig, IndexPartitionTime.class);
if (partitionTime != null) {
docConfig.setPartitionTime((Date) partitionTime);
}
// 返回值
return docConfig;
}
/**
* 获取文档配置索引和类型
* @param clazz
* 文档类型
* @return 配置项
*/
public static <T extends AbstractElasticsearchDocument> ElasticsearchDocumentConfig getDocumentConfig(
Class<T> clazz) {
// 构造文档配置项
ElasticsearchDocumentConfig config = new ElasticsearchDocumentConfig();
try {
// 获取索引配置
config = getDocumentConfig(clazz.newInstance());
return config;
} catch (Exception e) {
// 处理异常返回
return config;
}
}
/**
* @Description: 获取Annotation注解对应值
* @param object
* 对象
* @param annoClz
* annotation类型
* @return
* @author raowenbin
* @param docConfig
* @date 2019年3月18日 下午6:44:04
*/
public static Object getAnnotationVaue(Object object, ElasticsearchDocumentConfig docConfig,
Class<? extends Annotation> annoClz) {
// 获取字段
Field[] fields = object.getClass().getDeclaredFields();
for (Field field : fields) {
// 字段是否包含注解
if (field.isAnnotationPresent(annoClz)) {
// 字段名称
String fieldName = field.getName();
if (annoClz.equals(Id.class)) {
docConfig.setIdPropertiesName(fieldName);
}
char[] chars = fieldName.toCharArray();
chars[0] -= 32;
try {
// 反射获取字段对应值
Method method = object.getClass().getMethod("get" + String.valueOf(chars), null);
return method.invoke(object);
} catch (Exception e) {
return null;
}
}
}
return null;
}
public static XContentBuilder getXContentBuilderKeyValue(Object o) {
try {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
List<Field> fieldList = new ArrayList<Field>();
@SuppressWarnings("rawtypes")
Class tempClass = o.getClass();
while (tempClass != null) {// 当父类为null的时候说明到达了最上层的父类(Object类).
fieldList.addAll(Arrays.asList(tempClass.getDeclaredFields()));
tempClass = tempClass.getSuperclass();// 得到父类,然后赋给自己
}
for (Field field : fieldList) {
if (field.isAnnotationPresent(ESearchTypeColumn.class)) {
PropertyDescriptor ordescriptor = new PropertyDescriptor(field.getName(), o.getClass());
Object value = ordescriptor.getReadMethod().invoke(o);
if (value != null) {
builder.field(field.getName(), value.toString());
}
}
}
builder.endObject();
return builder;
} catch (Exception e) {
// log.error("获取object key-value失败,{}", e.getMessage());
}
return null;
}
/**
* Object转换成json字符串
*/
public static String toJsonString(Object obj, SerializerFeature seralizerClass) {
// 对象为空不转换
if (obj == null) {
return "null";
}
// 判定为空字符串时,返回空
if (obj instanceof String && StringUtils.isBlank((String) obj)) {
return "";
}
// Object转换为Json字符串
try {
return JSON.toJSONString(obj, seralizerClass);
} catch (Exception e) {
return "null";
}
}
/**
* @Description: 计算分区索引后缀
* @return
*/
public static String getPartitionIndexSuffix(Date partitionTime, Document.PARTITION_TYPE partitionType) {
Calendar calendar = Calendar.getInstance();
calendar.setTime(partitionTime);
int year = calendar.get(Calendar.YEAR);
int month = calendar.get(Calendar.MONTH) + 1;
int day = calendar.get(Calendar.DAY_OF_MONTH);
// 计算周数,默认1号至7号算第一周 最大支持4周
int week = (day - 1) / 7 + 1 > 4 ? 4 : (day - 1) / 7 + 1;
StringBuffer buffer = new StringBuffer().append('-');
switch (partitionType) {
case NULL:
return "";
case YEAR:
buffer.append(year);
break;
case MONTH:
buffer.append(year);
buffer.append(String.format("%02d", month));
break;
case WEEK:
buffer.append(year);
buffer.append(String.format("%02d", month));
buffer.append(String.format("%02d", week));
break;
case DAY:
buffer.append(year);
buffer.append(String.format("%02d", month));
buffer.append(String.format("%02d", day));
break;
default:
return "";
}
return buffer.toString();
}
}
3.定义基础dao接口
import com.fasterxml.jackson.core.JsonProcessingException;
import com.jianan.gis.info.es.document.AbstractElasticsearchDocument;
import org.elasticsearch.index.query.QueryBuilder;
import java.io.IOException;
import java.util.List;
/**
* ES DAO接口
* @param <DOCUMENT>
*/
public interface ElasticsearchRepository <DOCUMENT extends AbstractElasticsearchDocument> {
/**
* 保存文档
* @param D
* @return
*/
String saveForObject(DOCUMENT D);
}
4.添加ES基础操作默认实现
@Repository
public class DefaultRestElasticsearchRepository<DOCUMENT extends AbstractElasticsearchDocument>
implements ElasticsearchRepository<DOCUMENT> {
@Autowired
protected RestHighLevelClient restHighLevelClient;
@Override
public String saveForObject(DOCUMENT document, boolean replaceFlag) {
// 获取文档配置
ElasticsearchDocumentConfig config = ElasticsearchDocumentResolver.getDocumentConfig(document);
// LOG.info("####打印对象信息:{}、打印索引信息:{}", FastJsonUtils.toJsonString(document),FastJsonUtils.toJsonString(config));
// 必须传入ID,分区索引必须传分区时间
if (StringUtil.isEmpty(config.getId()) || (config.getPartitionType() != null
&& config.getPartitionType() != Document.PARTITION_TYPE.NULL && config.getPartitionTime() == null)) {
//LOG.warn("创建文档传入参数异常,参数内容:" + FastJsonUtils.toJsonString(document));
return null;
}
String source = JSON.toJSONString(document);
// 构建索引请求参数
try {
String indexName = config.getIndexName();
IndexRequest request = new IndexRequest();
request.index(indexName);
request.type(config.getType());
request.id(config.getId());
request.source(source, XContentType.JSON);
if (replaceFlag){
request.opType(DocWriteRequest.OpType.UPDATE);
}else {
request.opType(DocWriteRequest.OpType.CREATE);
}
IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT);
// 发起保存文档请求
return response.getId();
} catch (IOException e) {
// 版本冲突控制
throw new ElasticsearchException(e);
}
}
}
调用保存
注入方法内调用即可
@Autowired
private GisLocationRestElasticSearchRepository gisLocationElasticSearchRepository;
GisLocationDocument gisLocationDocument=new GisLocationDocument();
// String truckNumber="沪A"+String.format("%04d",new Random().nextInt(9999));
gisLocationDocument.setId(IdPrefixGenerateUtil.getId(truckNumber));
gisLocationDocument.setTruckNumber(truckNumber);
gisLocationDocument.setParkingTime(parkingTime);
gisLocationDocument.setTravelStatus(travelStatus);
gisLocationDocument.setLocation(new GeoPoint(latitude,longitude));
gisLocationDocument.setLatitude(latitude);
gisLocationDocument.setLongitude(longitude);
gisLocationDocument.setCreateTime(new Date());
gisLocationElasticSearchRepository.saveForObject(gisLocationDocument);
kibana查看索引。
查询接口 由于保存的无法知道具体的月份索引
索引统一使用模版定义的别名去查询 如果模版未定义 查询可能提示找不到索引异常
/** * 根据车牌获取最新车辆位置信息 * @param trackNumber * @return */ public GisCarLocationVO getCarLocationByTrackNumber(String trackNumber){ // 获取文档配置 这里获取的是注解的值,即为 gis-location ElasticsearchDocumentConfig config = ElasticsearchDocumentResolver.getDocumentConfig(GisLocationDocument.class); String indexName = config.getIndexName(); SearchRequest searchRequest = new SearchRequest(config.getIndexName()); searchRequest.types(config.getType()); //创建一个要搜索的索引库 SearchSourceBuilder searchRequestBuilder = new SearchSourceBuilder(); //创建组合查询 BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); boolQueryBuilder.should(QueryBuilders.matchQuery("truckNumber",trackNumber)); //设置查询的类型 // searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH); searchRequestBuilder.query(boolQueryBuilder); //分页 searchRequestBuilder.from(0); searchRequestBuilder.size(1); searchRequestBuilder.sort("createTime", SortOrder.DESC); searchRequest.source(searchRequestBuilder); SearchResponse searchResponse =null; try { searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); return null; } SearchHits hits = searchResponse.getHits(); //获取Hits中json对象数据 SearchHit[] hits1 = hits.getHits(); for (int i=0;i<hits1.length;i++){ //获取Map对象 String sourceAsMap = hits1[i].getSourceAsString(); if (StringUtil.isNotBlank(sourceAsMap)) { GisCarLocationVO gisCarLocationVO= FastJsonUtils.toObject(sourceAsMap, GisCarLocationVO.class); return gisCarLocationVO; } } return null; }
即可查询。