Elasticsearch概念理解及工作使用总结

  • Post author:
  • Post category:其他


使用命令

启动

bin/elasticsearch

bin/elasticsearch -Ehttp.port=8200 -Epath.data=node2 指定端口号 制定data2

bin/elasticsearch -Ehttp.port=7200 -Epath.data=node3 指定端口号 制定data3

查看集群

http://localhost:9200/_cat/nodes?v

查看集群详情

http://localhost:9200/_cluster/stats



elasticsearch术语

Index索引 :相同属性的文档集合

Type索引中的数据类型:索引可以定义一个或者多个类型,文档必须属于一个类型

Document:文档数据 被索引的基本数据单位

Field字段 文档属性 现在只有_doc

  • linear : 线性函数是条直线,一旦直线与横轴0香蕉,所有其他值的评分都是0

  • text

    :被分析索引的字符串类型

  • keyword

    :不能被分析只能被精确匹配的字符串类型

  • date

    :日期时间类型 可以配合format一起使用
  • 数字类型:long,integer,short,double等
  • boolean**类型:true,false

  • array

    类型:[“one”,“two”]等

  • object

    类型:json嵌套{“property1”:“value1”,“property2”:“value2”}

  • ip

    :ip类型

  • geo_point

    :地理位置类型

分片:每个索引都有多个分片,每个分片是一个Lucene索引,通过 lucene,我们可以将已有的数据建立索引,lucene 会在本地磁盘上面,给我们组织索引的数据结构

倒排索引:倒排索引就是关键词到文档 ID 的映射,每个关键词都对应着一系列的文件,这些文件中都出现了关键词

倒排索引项主要包含如下信息:

1.文档id用于获取原始信息

2.单词频率(TF,Term Frequency),记录该单词在该文档中出现的次数,用于后续相关性算分

3.位置(Posting),记录单词在文档中的分词位置(多个),用于做词语搜索(Phrase Query)

4.偏移(Offset),记录单词在文档的开始和结束位置,用于高亮显示

倒排索引、正排索引的区别:

倒排和正排的区别在于是通过id找内容还是通过内容找id, 分片和备份的区别在于主库和备库的区别。

Translog:translog是用来恢复数据的。



es读写原理

es 写数据过程:

客户端选择一个coordinating node 发送请求过去,对 document 进行路由,将请求转发给对应的 node,primary node和所有 replica node写完之后,响应结果给客户端

es 读数据过程:

通过

doc id

来查询,会根据

doc id

进行 hash,判断出来当时把

doc id

分配到了哪个 shard 上面去。客户端发送请求到任意一个 node,成为

coordinate node



round-robin

随机轮询算法,在

primary shard

以及其所有 replica 中随机选择一个,让读请求负载均衡

es搜索:

客户端发送请求到一个

coordinate node

。协调节点将搜索请求转发到所有的 shard 对应的

primary shard



replica shard

,每个 shard 将自己的搜索出的

doc id

返回给协调节点,由协调节点进行数据的合并、排序、分页等操作,产出最终结果。由协调节点根据

doc id

去各个节点上拉取实际的

document

数据,最终返回给客户端。



数据清洗

1、根据人、商品、订单等宽表,在es中创建对应属性的索引

2、用编码语言连接数据库,执行对应索引条件的sql,将元数据归类

3、数据归类时,需要对数据执行清洗,如果使用分词器(analyzer、ik中文分词器)、模糊查询、范围查询、分片

4、我们使用java jar包查询对应es数据时,会通过批量查询在 mysql等关系型数据库中取出真正的数据



数据分层

1、APP:数据大盘

2、DW:数据宽表join

DWS:数据集市(宽表),高度汇总数据

DWM:轻度汇总数据

DWD:原始数据粒度

ODS:营销数据、订单数据、商品数据



ETL-数据整合

ETL是将业务系统的数据经过抽取(Extract)、清洗转换(Transform)之后加载(Load)到数据仓库的过程

性别指标:根据性别实现人数查询

注册渠道、是否关注指标

热度指标:点击排名前十

环比指标实现:每月30天的各天开通人数比率

提醒指标实现:查询对应券过期时应该提醒的人数



索引



索引操作

es分片: number_of_shards:数据分片数和 number_of_replicas数据备份数

//指定id创建索引
PUT /employee/_doc/1
//指定id查询所以
GET/employee/_doc/1
//删除索引文档
delete/employee/_doc/1
//结构化索引
PUT/employee/{"settings":{},"mappings":{}}

//查询	match 查询支持开箱即用的模糊匹配		
GET/*/* {"query":{ "match": {"name":"ES"} }}

// Bool查询		must等于 并 ,should等于 与
GET /movie/_search { "query":{ "bool": { "should": [ { "match": { "title":"basketball with cartoom aliens"}}, { "match": { "overview":"basketball with cartoom aliens"}} ] } } }

//filter过滤查询
GET /movie/_search { "query":{ "bool":{ "filter":{ "term":{"title":"steve"} } } } }

ES的写入调优:

采用批处理的bulk写入,减少副本数。待完全写入后恢复副本数



分词器

analyzer api分词器默认分词器、

IK

中文分词器

analyze分析——》字符标点过滤器 ——〉字符处理器 ——》分词处理

English analyze分析 ——〉处理量词、符号——》字符处理器 ——〉分词处理

//标准分词
GET _analyze?pretty { "analyzer": "standard", "text":"中华人名共和国国歌" }
//最大化分词  索引时使用
GET _analyze?pretty { "analyzer": "ik_max_word", "text":"中华人名共和国国歌" }
//智能分词		查询使用
GET _analyze?pretty { "analyzer": "ik_smart", "text":"中华人名共和国国歌" }

定制化分词
//同义词
重建索引fiter过滤



距离索引

//距离分词
GET /shop/_search { "query":{ "match": {"name":"凯悦"} },"_source": "*", "script_fields":{ "distance":{ "script":{ "source":"haversin(lat, lon, doc['location'].lat, doc['location'].lon)", "lang":"expression", "params":{"lat":31.37,"lon":127.12} } } } }

//距离排序
"sort": [ { "_geo_distance": { "location": { "lat": 31.37, "lon": 127.12 },"order": "desc", "unit": "km", "distance_type": "arc" } } ]



function_score:距离索引分数计算

衰减函数 linear、exp、gauss

  • linear : 线性函数是条直线,一旦直线与横轴0香蕉,所有其他值的评分都是0
  • exp : 指数函数是先剧烈衰减然后变缓
  • guass(最常用) : 高斯函数则是钟形的,他的衰减速率是先缓慢,然后变快,最后又放缓



Spark Java操作

				连接到ES
SparkConf conf = new SparkConf().setAppName("hot word").setMaster("local[2]");`
//spark新增数据
JavaEsSpark.saveToEs(userJavaRDD,"/user/_doc");
//spark查询数据
JavaPairRDD<String, Map<String, Object>> pairRDD = JavaEsSpark.esRDD(jsc, "/user/_doc", query);

//创建RDD,全称Resilient Distributed Datasets(弹性分布式数据集),是Spark对数据的抽象。
SparkConf conf = new SparkConf().setAppName("hot word").setMaster("local[2]");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> linesRdd = jsc.textFile("hdfs://namenode:8020/SogouQ.sample.txt");

//mapToPair()   RDD算子创建键值对 
JavaPairRDD<String, Integer> pairRDD = linesRdd.mapToPair(new PairFunction<String, String, Integer>() {
    // string -- (string,integer)
    @Override
    //重写call()接口,Tuple2 生成键值对
    public Tuple2<String, Integer> call(String s) throws Exception {
        String word = s.split("\t")[2];
        return new Tuple2<>(word, 1);
    }
});

//reduceByKey()  RDD算子相同key合并汇聚
JavaPairRDD<String, Integer> wordCount = listRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {}

//swap()		交换元素位置
//top(10)			返回RDD中的最大的k个元素
//take(10)		获取RDD中的前num个元素
//takeOrdered()				RDD中最小的k个元素,其中排序规则用户可以自定义ord。

//操作行记录
Row(rank_click(0).toInt, rank_click(1).toInt)

Scala

   //用\t分隔符取第三个元素
   val hitCount = lines.map(x => x.split("\t")(3))
   //让空格左边的元素和右边的元素相等
      .map(word => word.split(" ")(0).equals(word.split(" ")(1)))
      //取出if条件=ture的所有计数
      .filter(x => x == true).count()
      
Row(rank_click(0).toInt, rank_click(1).toInt)
      
//Dataset			用数据集填充DateSet
Dataset<Row> dataset = session.sql("select member_channel as memberChannel, count(id) as channelCount " +
                " from i_member.t_member group by member_channel");
                
//SparkSession			sql查询
    val ss = SparkSession.builder().getOrCreate()
    val df = ss.createDataFrame(row,structType)
    df.createOrReplaceTempView("tb")
    val re = df.sqlContext.sql("select count(if(t.rank=t.click,1,null)) as hit" +
      ", count(1) as total from tb as t ")
    re.show()



QureyDSL

QueryDSL 是基于各种 ORM 框架以及 Sql 之上的一个通用的查询框架,QueryDSL 的查询,类是于 SQL 查询



RestHighLevelClient

客户端组件RestHighLevelClient的使用,其封装了操作es的crud方法,底层原理就是模拟各种es需要的请求,如put,delete,get等方式;本篇主要分享常用查询,希望能给大家带来好的帮助;

分页查询、条件查询、超时设置、排序、指定返回列、模拟一个post获取es数据

SearchRequest

BoolQueryBuilder

数据血缘,元信息

LogicalPlan



canal

mysql -> bingo -> canal client -> Kafka ->TMP HDFS -> HDFS ->

利用主从备份机制,模仿一台slave机器,读取二进制文件binlog实时监控数据,然后利用管道返回结果

java接入CanalConnector,来连接canal,CanalEntry.RowChange,监控行变化



ES调优

机器调优

1、公司 es 的集群架构,索引数据大小,分片有多少,以及一些调优手段

比如:ES 集群架构 13 个节点,索引根据通道不同共 20+索引,根据日期,每日递增 20+,索引:10 分片,每日递增 1 亿+数据,

每个通道每天索引大小控制:150GB 之内。

2、部署调优,业务调优

(1)关闭缓存 swap;

(2)堆内存设置为:Min(节点内存/2, 32GB);

(3)设置最大文件句柄数;

(4)线程池+队列大小根据业务需要做调整;

(5)磁盘存储 raid 方式——存储有条件使用 RAID10,增加单节点性能以及避免单节点存储故障。

索引层面调优

1.1、设计阶段调优

(1)根据业务增量需求,采取基于日期模板创建索引,通过 roll over API 滚动索引;

(2)使用别名进行索引管理;

(3) 每天凌晨定时对索引做 force_merge 操作,以释放空间;

(4)采取冷热分离机制,热数据存储到 SSD,提高检索效率;冷数据定期进行 shrink操作,以缩减存储;

(5)采取 curator 进行索引的生命周期管理;

(5) 仅针对需要分词的字段,合理的设置分词器;

(6)Mapping 阶段充分结合各个字段的属性,是否需要检索、是否需要存储等。………

1.2、写入调优

(1)写入前副本数设置为 0;

(2)写入前关闭 refresh_interval 设置为-1,禁用刷新机制;

(3)写入过程中:采取 bulk 批量写入;

(4)写入后恢复副本数和刷新间隔;

(5)尽量使用自动生成的 id。

1.3、查询调优

(1)禁用 wildcard;

(2)禁用批量 terms(成百上千的场景);

(3)充分利用倒排索引机制,能 keyword 类型尽量 keyword;

(4)数据量大时候,可以先基于时间敲定索引再检索;

(5)设置合理的路由机制。



ES在ABtest案例

题目描述:如何管理AB测试复杂的路由策略

实际:项目中的ABtest,将uid进行hash 取模,路由到A、B方案。在map中过滤,给出对应方案

优化:可以自己开发并设计一个实验平台,存放试验标签,例如标签A代表用userId区分什么内容,标签B代表用客户端ip区分什么内容,在实时查询的时候根据试验标签去请求实验平台并将用户id,ip传过去,做逻辑解耦,也方便调整,避免所有的代码都写在搜索服务业务内



机器学习ALS、LR、GBDT

千人千面的实现,需要根据场景去推荐,无限逼近用户的真实选择

常用的算法有ALS、LR、GBDT

1、ALS是最小二乘法,他利用矩阵分解的结果无限逼近现有数据,得到隐含特征。就是根据user、商品表的特征加权,与user的加权属性进行匹配。

//过拟合:增大数据规模,减少RANK,增大正则化的系数
ALS als = new ALS().setMaxIter(10).setRank(5).setRegParam(0.01).
        setUserCol("userId").setItemCol("shopId").setRatingCol("rating");
      
//模型训练
ALSModel alsModel = als.fit(trainingData); 

//模型评测
Dataset<Row> predictions = alsModel.transform(testingData);

//rmse 均方根误差,预测值与真实值的偏差的平方除以观测次数,开个根号
RegressionEvaluator evaluator = new RegressionEvaluator().setMetricName("rmse")
                .setLabelCol("rating").setPredictionCol("prediction");
double rmse = evaluator.evaluate(predictions);

晚点补充



版权声明:本文为Leeand原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。