使用命令
启动
bin/elasticsearch
bin/elasticsearch -Ehttp.port=8200 -Epath.data=node2 指定端口号 制定data2
bin/elasticsearch -Ehttp.port=7200 -Epath.data=node3 指定端口号 制定data3
查看集群
http://localhost:9200/_cat/nodes?v
查看集群详情
http://localhost:9200/_cluster/stats
elasticsearch术语
Index索引 :相同属性的文档集合
Type索引中的数据类型:索引可以定义一个或者多个类型,文档必须属于一个类型
Document:文档数据 被索引的基本数据单位
Field字段 文档属性 现在只有_doc
- linear : 线性函数是条直线,一旦直线与横轴0香蕉,所有其他值的评分都是0
-
text
:被分析索引的字符串类型 -
keyword
:不能被分析只能被精确匹配的字符串类型 -
date
:日期时间类型 可以配合format一起使用 - 数字类型:long,integer,short,double等
- boolean**类型:true,false
-
array
类型:[“one”,“two”]等 -
object
类型:json嵌套{“property1”:“value1”,“property2”:“value2”} -
ip
:ip类型 -
geo_point
:地理位置类型
分片:每个索引都有多个分片,每个分片是一个Lucene索引,通过 lucene,我们可以将已有的数据建立索引,lucene 会在本地磁盘上面,给我们组织索引的数据结构
倒排索引:倒排索引就是关键词到文档 ID 的映射,每个关键词都对应着一系列的文件,这些文件中都出现了关键词
倒排索引项主要包含如下信息:
1.文档id用于获取原始信息
2.单词频率(TF,Term Frequency),记录该单词在该文档中出现的次数,用于后续相关性算分
3.位置(Posting),记录单词在文档中的分词位置(多个),用于做词语搜索(Phrase Query)
4.偏移(Offset),记录单词在文档的开始和结束位置,用于高亮显示
倒排索引、正排索引的区别:
倒排和正排的区别在于是通过id找内容还是通过内容找id, 分片和备份的区别在于主库和备库的区别。
Translog:translog是用来恢复数据的。
es读写原理
es 写数据过程:
客户端选择一个coordinating node 发送请求过去,对 document 进行路由,将请求转发给对应的 node,primary node和所有 replica node写完之后,响应结果给客户端
es 读数据过程:
通过
doc id
来查询,会根据
doc id
进行 hash,判断出来当时把
doc id
分配到了哪个 shard 上面去。客户端发送请求到任意一个 node,成为
coordinate node
。
round-robin
随机轮询算法,在
primary shard
以及其所有 replica 中随机选择一个,让读请求负载均衡
es搜索:
客户端发送请求到一个
coordinate node
。协调节点将搜索请求转发到所有的 shard 对应的
primary shard
或
replica shard
,每个 shard 将自己的搜索出的
doc id
返回给协调节点,由协调节点进行数据的合并、排序、分页等操作,产出最终结果。由协调节点根据
doc id
去各个节点上拉取实际的
document
数据,最终返回给客户端。
数据清洗
1、根据人、商品、订单等宽表,在es中创建对应属性的索引
2、用编码语言连接数据库,执行对应索引条件的sql,将元数据归类
3、数据归类时,需要对数据执行清洗,如果使用分词器(analyzer、ik中文分词器)、模糊查询、范围查询、分片
4、我们使用java jar包查询对应es数据时,会通过批量查询在 mysql等关系型数据库中取出真正的数据
数据分层
1、APP:数据大盘
2、DW:数据宽表join
DWS:数据集市(宽表),高度汇总数据
DWM:轻度汇总数据
DWD:原始数据粒度
ODS:营销数据、订单数据、商品数据
ETL-数据整合
ETL是将业务系统的数据经过抽取(Extract)、清洗转换(Transform)之后加载(Load)到数据仓库的过程
性别指标:根据性别实现人数查询
注册渠道、是否关注指标
热度指标:点击排名前十
环比指标实现:每月30天的各天开通人数比率
提醒指标实现:查询对应券过期时应该提醒的人数
索引
索引操作
es分片: number_of_shards:数据分片数和 number_of_replicas数据备份数
//指定id创建索引
PUT /employee/_doc/1
//指定id查询所以
GET/employee/_doc/1
//删除索引文档
delete/employee/_doc/1
//结构化索引
PUT/employee/{"settings":{},"mappings":{}}
//查询 match 查询支持开箱即用的模糊匹配
GET/*/* {"query":{ "match": {"name":"ES"} }}
// Bool查询 must等于 并 ,should等于 与
GET /movie/_search { "query":{ "bool": { "should": [ { "match": { "title":"basketball with cartoom aliens"}}, { "match": { "overview":"basketball with cartoom aliens"}} ] } } }
//filter过滤查询
GET /movie/_search { "query":{ "bool":{ "filter":{ "term":{"title":"steve"} } } } }
ES的写入调优:
采用批处理的bulk写入,减少副本数。待完全写入后恢复副本数
分词器
analyzer api分词器默认分词器、
IK
中文分词器
analyze分析——》字符标点过滤器 ——〉字符处理器 ——》分词处理
English analyze分析 ——〉处理量词、符号——》字符处理器 ——〉分词处理
//标准分词
GET _analyze?pretty { "analyzer": "standard", "text":"中华人名共和国国歌" }
//最大化分词 索引时使用
GET _analyze?pretty { "analyzer": "ik_max_word", "text":"中华人名共和国国歌" }
//智能分词 查询使用
GET _analyze?pretty { "analyzer": "ik_smart", "text":"中华人名共和国国歌" }
定制化分词
//同义词
重建索引fiter过滤
距离索引
//距离分词
GET /shop/_search { "query":{ "match": {"name":"凯悦"} },"_source": "*", "script_fields":{ "distance":{ "script":{ "source":"haversin(lat, lon, doc['location'].lat, doc['location'].lon)", "lang":"expression", "params":{"lat":31.37,"lon":127.12} } } } }
//距离排序
"sort": [ { "_geo_distance": { "location": { "lat": 31.37, "lon": 127.12 },"order": "desc", "unit": "km", "distance_type": "arc" } } ]
function_score:距离索引分数计算
衰减函数 linear、exp、gauss
- linear : 线性函数是条直线,一旦直线与横轴0香蕉,所有其他值的评分都是0
- exp : 指数函数是先剧烈衰减然后变缓
- guass(最常用) : 高斯函数则是钟形的,他的衰减速率是先缓慢,然后变快,最后又放缓
Spark Java操作
连接到ES
SparkConf conf = new SparkConf().setAppName("hot word").setMaster("local[2]");`
//spark新增数据
JavaEsSpark.saveToEs(userJavaRDD,"/user/_doc");
//spark查询数据
JavaPairRDD<String, Map<String, Object>> pairRDD = JavaEsSpark.esRDD(jsc, "/user/_doc", query);
//创建RDD,全称Resilient Distributed Datasets(弹性分布式数据集),是Spark对数据的抽象。
SparkConf conf = new SparkConf().setAppName("hot word").setMaster("local[2]");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> linesRdd = jsc.textFile("hdfs://namenode:8020/SogouQ.sample.txt");
//mapToPair() RDD算子创建键值对
JavaPairRDD<String, Integer> pairRDD = linesRdd.mapToPair(new PairFunction<String, String, Integer>() {
// string -- (string,integer)
@Override
//重写call()接口,Tuple2 生成键值对
public Tuple2<String, Integer> call(String s) throws Exception {
String word = s.split("\t")[2];
return new Tuple2<>(word, 1);
}
});
//reduceByKey() RDD算子相同key合并汇聚
JavaPairRDD<String, Integer> wordCount = listRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {}
//swap() 交换元素位置
//top(10) 返回RDD中的最大的k个元素
//take(10) 获取RDD中的前num个元素
//takeOrdered() RDD中最小的k个元素,其中排序规则用户可以自定义ord。
//操作行记录
Row(rank_click(0).toInt, rank_click(1).toInt)
Scala
//用\t分隔符取第三个元素
val hitCount = lines.map(x => x.split("\t")(3))
//让空格左边的元素和右边的元素相等
.map(word => word.split(" ")(0).equals(word.split(" ")(1)))
//取出if条件=ture的所有计数
.filter(x => x == true).count()
Row(rank_click(0).toInt, rank_click(1).toInt)
//Dataset 用数据集填充DateSet
Dataset<Row> dataset = session.sql("select member_channel as memberChannel, count(id) as channelCount " +
" from i_member.t_member group by member_channel");
//SparkSession sql查询
val ss = SparkSession.builder().getOrCreate()
val df = ss.createDataFrame(row,structType)
df.createOrReplaceTempView("tb")
val re = df.sqlContext.sql("select count(if(t.rank=t.click,1,null)) as hit" +
", count(1) as total from tb as t ")
re.show()
QureyDSL
QueryDSL 是基于各种 ORM 框架以及 Sql 之上的一个通用的查询框架,QueryDSL 的查询,类是于 SQL 查询
RestHighLevelClient
客户端组件RestHighLevelClient的使用,其封装了操作es的crud方法,底层原理就是模拟各种es需要的请求,如put,delete,get等方式;本篇主要分享常用查询,希望能给大家带来好的帮助;
分页查询、条件查询、超时设置、排序、指定返回列、模拟一个post获取es数据
SearchRequest
BoolQueryBuilder
数据血缘,元信息
LogicalPlan
canal
mysql -> bingo -> canal client -> Kafka ->TMP HDFS -> HDFS ->
利用主从备份机制,模仿一台slave机器,读取二进制文件binlog实时监控数据,然后利用管道返回结果
java接入CanalConnector,来连接canal,CanalEntry.RowChange,监控行变化
ES调优
机器调优
1、公司 es 的集群架构,索引数据大小,分片有多少,以及一些调优手段
比如:ES 集群架构 13 个节点,索引根据通道不同共 20+索引,根据日期,每日递增 20+,索引:10 分片,每日递增 1 亿+数据,
每个通道每天索引大小控制:150GB 之内。
2、部署调优,业务调优
(1)关闭缓存 swap;
(2)堆内存设置为:Min(节点内存/2, 32GB);
(3)设置最大文件句柄数;
(4)线程池+队列大小根据业务需要做调整;
(5)磁盘存储 raid 方式——存储有条件使用 RAID10,增加单节点性能以及避免单节点存储故障。
索引层面调优
1.1、设计阶段调优
(1)根据业务增量需求,采取基于日期模板创建索引,通过 roll over API 滚动索引;
(2)使用别名进行索引管理;
(3) 每天凌晨定时对索引做 force_merge 操作,以释放空间;
(4)采取冷热分离机制,热数据存储到 SSD,提高检索效率;冷数据定期进行 shrink操作,以缩减存储;
(5)采取 curator 进行索引的生命周期管理;
(5) 仅针对需要分词的字段,合理的设置分词器;
(6)Mapping 阶段充分结合各个字段的属性,是否需要检索、是否需要存储等。………
1.2、写入调优
(1)写入前副本数设置为 0;
(2)写入前关闭 refresh_interval 设置为-1,禁用刷新机制;
(3)写入过程中:采取 bulk 批量写入;
(4)写入后恢复副本数和刷新间隔;
(5)尽量使用自动生成的 id。
1.3、查询调优
(1)禁用 wildcard;
(2)禁用批量 terms(成百上千的场景);
(3)充分利用倒排索引机制,能 keyword 类型尽量 keyword;
(4)数据量大时候,可以先基于时间敲定索引再检索;
(5)设置合理的路由机制。
ES在ABtest案例
题目描述:如何管理AB测试复杂的路由策略
实际:项目中的ABtest,将uid进行hash 取模,路由到A、B方案。在map中过滤,给出对应方案
优化:可以自己开发并设计一个实验平台,存放试验标签,例如标签A代表用userId区分什么内容,标签B代表用客户端ip区分什么内容,在实时查询的时候根据试验标签去请求实验平台并将用户id,ip传过去,做逻辑解耦,也方便调整,避免所有的代码都写在搜索服务业务内
机器学习ALS、LR、GBDT
千人千面的实现,需要根据场景去推荐,无限逼近用户的真实选择
常用的算法有ALS、LR、GBDT
1、ALS是最小二乘法,他利用矩阵分解的结果无限逼近现有数据,得到隐含特征。就是根据user、商品表的特征加权,与user的加权属性进行匹配。
//过拟合:增大数据规模,减少RANK,增大正则化的系数
ALS als = new ALS().setMaxIter(10).setRank(5).setRegParam(0.01).
setUserCol("userId").setItemCol("shopId").setRatingCol("rating");
//模型训练
ALSModel alsModel = als.fit(trainingData);
//模型评测
Dataset<Row> predictions = alsModel.transform(testingData);
//rmse 均方根误差,预测值与真实值的偏差的平方除以观测次数,开个根号
RegressionEvaluator evaluator = new RegressionEvaluator().setMetricName("rmse")
.setLabelCol("rating").setPredictionCol("prediction");
double rmse = evaluator.evaluate(predictions);
。
。
。
晚点补充