目录
时序数据库
倒排索引
Elasticsearch简介
Elasticsearch的应用实现搜索
时序数据库
最简单的定义就是数据格式里包含 timestamp 字段的数据。几乎所有的数据都可以打上一个 timestamp 字段。时间序列数据更重要的一个属性是如何去查询它。在查询的时候,对于时间序列我们总是会带上一个时间范围去过滤数据。同时查询的结果里也总是会包含 timestamp 字段。(系列参考
https://www.elastic.co/cn/blog/frame-of-reference-and-roaring-bitmaps
)
想要在在查询阶段做数据的聚合和转换,需要能够支持以下三点:
(1)用索引检索出行号:能够从上亿条数据中快速过滤出几百万的数据。
(2)从主存储按行号加载:能够快速加载这过滤出的几百万条数据到内存里。
(3)分布式计算:能够把这些数据按照 GROUP BY 和 SELECT 的要求计算出最终的结果集。
-
检索:这是搜索引擎最擅长的领域。代表产品是
Lucene
。其核心技术是基于高效率数据结构和算法的倒排索引。 -
加载:这是分析型数据库最擅长的领域。代表产品是
C-store
和
Monetdb
。其核心技术是按列组织的磁盘存储结构。 -
分布式计算:这是大数据计算引擎最擅长的领域。代表产品是
Hadoop
和
spark
。其核心技术是 sharding 和 map/reduce 等等。
Elasticsearch在检索、加载与分布式计算都做得比较好。Elasticsearch 是通过 Lucene 的倒排索引技术实现比关系型数据库更快的过滤。特别是它对多条件的过滤支持非常好。
Elasticsearch 现在的主要目标市场已经从
站内搜索
转移到
了监控与日志数据的收集存储和分析
,也就是ELK。Elasticsearch 现在主要的应用场景有三块。站内搜索,主要和 Solr 竞争,属于后起之秀。NoSQL json文档数据库,主要抢占 Mongo 的市场,它在读写性能上优于 Mongo,同时也支持地理位置查询,还方便地理位置和文本混合查询,属于歪打正着。监控,统计以及日志类时间序的数据的存储和分析以及可视化,这方面是引领者。
倒排索引
Mysql 只有 term dictionary 这一层,是以 b-tree 排序的方式存储在磁盘上的。检索一个 term 需要若干次的 random access 的磁盘操作。而 Lucene 在 term dictionary 的基础上添加了 term index 来加速检索,term index 以树的形式缓存在内存中。从 term index 查到对应的 term dictionary 的 block 位置之后,再去磁盘上找 term,大大减少了磁盘的 random access 次数。term index不需要存下所有的term,而仅仅是他们的一些前缀与Term Dictionary的block之间的映射关系,再结合FST(Finite State Transducers)的压缩技术,可以使term index缓存到内存中。从term index查到对应的term dictionary的block位置之后,再去磁盘上找term,大大减少了磁盘随机读的次数。
压缩技巧
:Bitmap是一种数据结构,用0/1表示某个值是否存在,比如10这个值就对应第10位,对应的bit值是1,这样用一个字节就可以代表8个文档id,旧版本(5.0之前)的Lucene就是用这样的方式来压缩的,但这样的压缩方式仍然不够高效,如果有1亿个文档,那么需要12.5MB的存储空间,这仅仅是对应一个索引字段(我们往往会有很多个索引字段)。于是有人想出了Roaring bitmaps这样更高效的数据结构。Bitmap的缺点是存储空间随着文档个数线性增长,Roaring bitmaps需要打破这个魔咒就一定要用到某些指数特性:将posting list按照65535为界限分块,比如第一块所包含的文档id范围在0~65535之间,第二块的id范围是65536~131071,以此类推。再用<商,余数>的组合表示每一组id,这样每组里的id范围都在0~65535内了,剩下的就好办了,既然每组id不会变得无限大,那么我们就可以通过最有效的方式对这里的id存储。
程序员的世界里除了1024外,65535也是一个经典值,因为它=2^16-1,正好是用2个字节能表示的最大数,一个short的存储单位,注意到上图里的最后一行“If a block has more than 4096 values, encode as a bit set, and otherwise as a simple array using 2 bytes per value”,如果是大块,用节省点用bitset存,小块用2个字节一个short[]更方便。那为什么用4096来区分大块还是小块呢?short最多存4096(2*8*1024)个数字,4096 = 65536/8/2, 大于4096只能用bitmap,小于4096没必要做转换,直接short就可以了。磁盘一次寻道可以顺序把一个小块的内容都读出来,再大一位就超过1KB了,需要两次读。
联合索引
查询:
(1)利用跳表(Skip list)的数据结构快速做“与”运算。如果使用跳表,对最短的posting list中的每个id,逐个在另外两个posting list中查找看是否存在,最后得到交集的结果。
(2)利用上面提到的bitset按位“与”。如果使用bitset,就很直观了,直接按位与,得到的结果就是最后的交集。
Elasticsearch简介
一个分布式的、Restful风格(
REST
指的是一组架构约束条件和原则
)的搜索引擎;支持对各种数据类型的数据的检索;搜索速度快,可以提供实时的搜索服务;便于水平扩展,每秒可以处理PB级数据。
搜索数据需要把数据存一份到Elasticsearch中:索引;类型;文档;字段 与 数据库中 库;表;行;列 相对应(6版本后有改变):类型被弱化了,逐渐舍弃(全部用_doc保留);索引就越来越对应的是表。集群、节点、分片、副本; 分布式部署提高整体性能;节点:每一台服务器;分片提高并发能力;副本提高可用性;
使用Elasticsearch
进行索引时需要注意:
(1)不需要索引的字段,一定要明确定义出来,因为默认是自动建索引的。
(2)同样的道理,对于String类型的字段,不需要analysis的也需要明确定义出来,因为默认也是会analysis的。
(3)选择有规律的ID很重要,随机性太大的ID(比如java的UUID)不利于查询。
Elasticsearch的应用实现搜索
Elasticsearch和redis底层都基于Netty,两者在
启动的时候会有冲突
:NettyRuntime类下的availableProcessors方法:如果已设置,就会抛异常。Elasticsearch下的一个包中的类Netty4Utils中setAvailableProcessors方法调用了availableProcessors方法;解决:需要设置属性”es.set.netty.runtime.available.processors”为false,在web应用程序启动的时候设置(在es工作之前);
@SpringBootApplication
public class CommunityApplication {
@PostConstruct
public void init() {
// 解决netty启动冲突问题
// see Netty4Utils.setAvailableProcessors()
System.setProperty("es.set.netty.runtime.available.processors", "false");
}
public static void main(String[] args) {
SpringApplication.run(CommunityApplication.class, args);
}
}
先进行相关配置
1、修改elasticsearch中elasticsearch.yml配置文件:cluster.name:liu
Path.data : C:\Users\liuze\Desktop\MyClassExcisenewProject\logData\elasticsearch\data
Path.logs: C:\Users\liuze\Desktop\MyClassExcisenewProject\logData\elasticsearch\logs
2、配置系统变量/bin
3、安装中文搜索插件:将所有文件必须解压到plugins目录下的ik文件夹;
实现站内搜索功能
1、在实体类中进行es索引映射的
配置
;
2、在dao层增加
接口
(相当于访问es服务器也相当于一个数据库),注:不使用mapper注解,因为mapper注解是myBatis专有的;使用@Repository注解;接口继承于Spring内置的ElasticSearchRepository;
3、搜索
服务
:将帖子保存至Elasticsearch服务器;从Elasticsearch服务器删除数据;从Elasticsearch服务器搜索帖子;
4、使用
消息队列:触发事件、消费事件。
发布帖子时,将帖子异步的提交到Elasticsearch服务器;增加评论时,将帖子异步提交到Elasticsearch服务器;在消费组件中增加一个方法,消费帖子发布事件;
5、显示
结果
:在控制器中处理搜索请求,在HTML上显示搜索结果
DiscussPostRepository
@Repository
public interface DiscussPostRepository extends ElasticsearchRepository<DiscussPost, Integer> {
}
ElasticsearchService
@Service
public class ElasticsearchService {
@Autowired
private DiscussPostRepository discussRepository;
@Autowired
private ElasticsearchTemplate elasticTemplate;
public void saveDiscussPost(DiscussPost post) {
discussRepository.save(post);
}
public void deleteDiscussPost(int id) {
discussRepository.deleteById(id);
}
public Page<DiscussPost> searchDiscussPost(String keyword, int current, int limit) {
SearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.multiMatchQuery(keyword, "title", "content"))
.withSort(SortBuilders.fieldSort("type").order(SortOrder.DESC))
.withSort(SortBuilders.fieldSort("score").order(SortOrder.DESC))
.withSort(SortBuilders.fieldSort("createTime").order(SortOrder.DESC))
.withPageable(PageRequest.of(current, limit))
.withHighlightFields(
new HighlightBuilder.Field("title").preTags("<em>").postTags("</em>"),
new HighlightBuilder.Field("content").preTags("<em>").postTags("</em>")
).build();
//返回搜索结果
return elasticTemplate.queryForPage(searchQuery, DiscussPost.class, new SearchResultMapper() {
@Override
public <T> AggregatedPage<T> mapResults(SearchResponse response, Class<T> aClass, Pageable pageable) {
SearchHits hits = response.getHits();
if (hits.getTotalHits() <= 0) {
return null;
}
List<DiscussPost> list = new ArrayList<>();
for (SearchHit hit : hits) {
DiscussPost post = new DiscussPost();
String id = hit.getSourceAsMap().get("id").toString();
post.setId(Integer.valueOf(id));
String userId = hit.getSourceAsMap().get("userId").toString();
post.setUserId(Integer.valueOf(userId));
String title = hit.getSourceAsMap().get("title").toString();
post.setTitle(title);
String content = hit.getSourceAsMap().get("content").toString();
post.setContent(content);
String status = hit.getSourceAsMap().get("status").toString();
post.setStatus(Integer.valueOf(status));
String createTime = hit.getSourceAsMap().get("createTime").toString();
post.setCreateTime(new Date(Long.valueOf(createTime)));
String commentCount = hit.getSourceAsMap().get("commentCount").toString();
post.setCommentCount(Integer.valueOf(commentCount));
// 处理高亮显示的结果
HighlightField titleField = hit.getHighlightFields().get("title");
if (titleField != null) {
post.setTitle(titleField.getFragments()[0].toString());
}
HighlightField contentField = hit.getHighlightFields().get("content");
if (contentField != null) {
post.setContent(contentField.getFragments()[0].toString());
}
list.add(post);
}
return new AggregatedPageImpl(list, pageable,
hits.getTotalHits(), response.getAggregations(), response.getScrollId(), hits.getMaxScore());
}
});
}
}
消费事件
//=======================消费传来的触发消息:根据消息向elasticsearch服务器增加数据==============================
// 消费发帖事件
@KafkaListener(topics = {TOPIC_PUBLISH})
public void handlePublishMessage(ConsumerRecord record) {
if (record == null || record.value() == null) {
logger.error("消息的内容为空!");
return;
}
Event event = JSONObject.parseObject(record.value().toString(), Event.class);
if (event == null) {
logger.error("消息格式错误!");
return;
}
//得到的消息没有问题,就开始处理事件
//先查找到帖子,再将帖子保存到elasticsearch服务器
DiscussPost post = discussPostService.findDiscussPostById(event.getEntityId());
elasticsearchService.saveDiscussPost(post);
}
//@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@消费删帖事件
@KafkaListener(topics = {TOPIC_DELETE})
public void handleDeleteMessage(ConsumerRecord record) {
if (record == null || record.value() == null) {
logger.error("消息的内容为空!");
return;
}
Event event = JSONObject.parseObject(record.value().toString(), Event.class);
if (event == null) {
logger.error("消息格式错误!");
return;
}
//得到的消息没有问题,就开始处理事件
//先查找到帖子,再将帖子保存到elasticsearch服务器
elasticsearchService.deleteDiscussPost(event.getEntityId());
}
SearchController
@Controller
public class SearchController implements CommunityConstant {
//搜索到帖子后,还要展现帖子的作者,帖子的赞的情况
@Autowired
private ElasticsearchService elasticsearchService;
@Autowired
private UserService userService;
@Autowired
private LikeService likeService;
// search?keyword=xxx
@RequestMapping(path = "/search", method = RequestMethod.GET)
public String search(String keyword, Page page, Model model) {
// 搜索帖子
org.springframework.data.domain.Page<DiscussPost> searchResult =
elasticsearchService.searchDiscussPost(keyword, page.getCurrent() - 1, page.getLimit());
// 聚合数据
List<Map<String, Object>> discussPosts = new ArrayList<>();
if (searchResult != null) {
for (DiscussPost post : searchResult) {
Map<String, Object> map = new HashMap<>();
// 帖子
map.put("post", post);
// 作者
map.put("user", userService.findUserById(post.getUserId()));
// 点赞数量
map.put("likeCount", likeService.findEntityLikeCount(ENTITY_TYPE_POST, post.getId()));
discussPosts.add(map);
}
}
model.addAttribute("discussPosts", discussPosts);
model.addAttribute("keyword", keyword);
// 分页信息
page.setPath("/search?keyword=" + keyword);
page.setRows(searchResult == null ? 0 : (int) searchResult.getTotalElements());
return "/site/search";
}
}