项目理解(六)Elasticsearch倒排索引与搜索应用

  • Post author:
  • Post category:其他



目录



时序数据库



倒排索引



Elasticsearch简介



Elasticsearch的应用实现搜索



时序数据库

最简单的定义就是数据格式里包含 timestamp 字段的数据。几乎所有的数据都可以打上一个 timestamp 字段。时间序列数据更重要的一个属性是如何去查询它。在查询的时候,对于时间序列我们总是会带上一个时间范围去过滤数据。同时查询的结果里也总是会包含 timestamp 字段。(系列参考

https://www.elastic.co/cn/blog/frame-of-reference-and-roaring-bitmaps

想要在在查询阶段做数据的聚合和转换,需要能够支持以下三点:

(1)用索引检索出行号:能够从上亿条数据中快速过滤出几百万的数据。

(2)从主存储按行号加载:能够快速加载这过滤出的几百万条数据到内存里。

(3)分布式计算:能够把这些数据按照 GROUP BY 和 SELECT 的要求计算出最终的结果集。

  • 检索:这是搜索引擎最擅长的领域。代表产品是

    Lucene

    。其核心技术是基于高效率数据结构和算法的倒排索引。
  • 加载:这是分析型数据库最擅长的领域。代表产品是

    C-store



    Monetdb

    。其核心技术是按列组织的磁盘存储结构。
  • 分布式计算:这是大数据计算引擎最擅长的领域。代表产品是

    Hadoop



    spark

    。其核心技术是 sharding 和 map/reduce 等等。

Elasticsearch在检索、加载与分布式计算都做得比较好。Elasticsearch 是通过 Lucene 的倒排索引技术实现比关系型数据库更快的过滤。特别是它对多条件的过滤支持非常好。

Elasticsearch 现在的主要目标市场已经从

站内搜索

转移到

了监控与日志数据的收集存储和分析

,也就是ELK。

Elasticsearch 现在主要的应用场景有三块。站内搜索,主要和 Solr 竞争,属于后起之秀。NoSQL json文档数据库,主要抢占 Mongo 的市场,它在读写性能上优于 Mongo,同时也支持地理位置查询,还方便地理位置和文本混合查询,属于歪打正着。监控,统计以及日志类时间序的数据的存储和分析以及可视化,这方面是引领者。


倒排索引

Mysql 只有 term dictionary 这一层,是以 b-tree 排序的方式存储在磁盘上的。检索一个 term 需要若干次的 random access 的磁盘操作。而 Lucene 在 term dictionary 的基础上添加了 term index 来加速检索,term index 以树的形式缓存在内存中。从 term index 查到对应的 term dictionary 的 block 位置之后,再去磁盘上找 term,大大减少了磁盘的 random access 次数。term index不需要存下所有的term,而仅仅是他们的一些前缀与Term Dictionary的block之间的映射关系,再结合FST(Finite State Transducers)的压缩技术,可以使term index缓存到内存中。从term index查到对应的term dictionary的block位置之后,再去磁盘上找term,大大减少了磁盘随机读的次数。


压缩技巧

:Bitmap是一种数据结构,用0/1表示某个值是否存在,比如10这个值就对应第10位,对应的bit值是1,这样用一个字节就可以代表8个文档id,旧版本(5.0之前)的Lucene就是用这样的方式来压缩的,但这样的压缩方式仍然不够高效,如果有1亿个文档,那么需要12.5MB的存储空间,这仅仅是对应一个索引字段(我们往往会有很多个索引字段)。于是有人想出了Roaring bitmaps这样更高效的数据结构。Bitmap的缺点是存储空间随着文档个数线性增长,Roaring bitmaps需要打破这个魔咒就一定要用到某些指数特性:将posting list按照65535为界限分块,比如第一块所包含的文档id范围在0~65535之间,第二块的id范围是65536~131071,以此类推。再用<商,余数>的组合表示每一组id,这样每组里的id范围都在0~65535内了,剩下的就好办了,既然每组id不会变得无限大,那么我们就可以通过最有效的方式对这里的id存储。

程序员的世界里除了1024外,65535也是一个经典值,因为它=2^16-1,正好是用2个字节能表示的最大数,一个short的存储单位,注意到上图里的最后一行“If a block has more than 4096 values, encode as a bit set, and otherwise as a simple array using 2 bytes per value”,如果是大块,用节省点用bitset存,小块用2个字节一个short[]更方便。那为什么用4096来区分大块还是小块呢?short最多存4096(2*8*1024)个数字,4096 = 65536/8/2, 大于4096只能用bitmap,小于4096没必要做转换,直接short就可以了。磁盘一次寻道可以顺序把一个小块的内容都读出来,再大一位就超过1KB了,需要两次读。


联合索引

查询:

(1)利用跳表(Skip list)的数据结构快速做“与”运算。如果使用跳表,对最短的posting list中的每个id,逐个在另外两个posting list中查找看是否存在,最后得到交集的结果。

(2)利用上面提到的bitset按位“与”。如果使用bitset,就很直观了,直接按位与,得到的结果就是最后的交集。


Elasticsearch简介

一个分布式的、Restful风格(

REST


指的是一组架构约束条件和原则

)的搜索引擎;支持对各种数据类型的数据的检索;搜索速度快,可以提供实时的搜索服务;便于水平扩展,每秒可以处理PB级数据。

搜索数据需要把数据存一份到Elasticsearch中:索引;类型;文档;字段 与 数据库中 库;表;行;列 相对应(6版本后有改变):类型被弱化了,逐渐舍弃(全部用_doc保留);索引就越来越对应的是表。集群、节点、分片、副本; 分布式部署提高整体性能;节点:每一台服务器;分片提高并发能力;副本提高可用性;


使用Elasticsearch

进行索引时需要注意:

(1)不需要索引的字段,一定要明确定义出来,因为默认是自动建索引的。

(2)同样的道理,对于String类型的字段,不需要analysis的也需要明确定义出来,因为默认也是会analysis的。

(3)选择有规律的ID很重要,随机性太大的ID(比如java的UUID)不利于查询。


Elasticsearch的应用实现搜索

Elasticsearch和redis底层都基于Netty,两者在

启动的时候会有冲突

:NettyRuntime类下的availableProcessors方法:如果已设置,就会抛异常。Elasticsearch下的一个包中的类Netty4Utils中setAvailableProcessors方法调用了availableProcessors方法;解决:需要设置属性”es.set.netty.runtime.available.processors”为false,在web应用程序启动的时候设置(在es工作之前);

@SpringBootApplication
public class CommunityApplication {
	@PostConstruct
	public void init() {
		// 解决netty启动冲突问题
		// see Netty4Utils.setAvailableProcessors()
		System.setProperty("es.set.netty.runtime.available.processors", "false");
	}
	public static void main(String[] args) {
		SpringApplication.run(CommunityApplication.class, args);
	}
}


先进行相关配置

1、修改elasticsearch中elasticsearch.yml配置文件:cluster.name:liu

Path.data :  C:\Users\liuze\Desktop\MyClassExcisenewProject\logData\elasticsearch\data

Path.logs: C:\Users\liuze\Desktop\MyClassExcisenewProject\logData\elasticsearch\logs

2、配置系统变量/bin

3、安装中文搜索插件:将所有文件必须解压到plugins目录下的ik文件夹;


实现站内搜索功能

1、在实体类中进行es索引映射的

配置

2、在dao层增加

接口

(相当于访问es服务器也相当于一个数据库),注:不使用mapper注解,因为mapper注解是myBatis专有的;使用@Repository注解;接口继承于Spring内置的ElasticSearchRepository;

3、搜索

服务

:将帖子保存至Elasticsearch服务器;从Elasticsearch服务器删除数据;从Elasticsearch服务器搜索帖子;

4、使用

消息队列:触发事件、消费事件。

发布帖子时,将帖子异步的提交到Elasticsearch服务器;增加评论时,将帖子异步提交到Elasticsearch服务器;在消费组件中增加一个方法,消费帖子发布事件;

5、显示

结果

:在控制器中处理搜索请求,在HTML上显示搜索结果

DiscussPostRepository

@Repository
public interface DiscussPostRepository extends ElasticsearchRepository<DiscussPost, Integer> {

}

ElasticsearchService

@Service
public class ElasticsearchService {

    @Autowired
    private DiscussPostRepository discussRepository;

    @Autowired
    private ElasticsearchTemplate elasticTemplate;

    public void saveDiscussPost(DiscussPost post) {
        discussRepository.save(post);
    }

    public void deleteDiscussPost(int id) {
        discussRepository.deleteById(id);
    }

    public Page<DiscussPost> searchDiscussPost(String keyword, int current, int limit) {
        SearchQuery searchQuery = new NativeSearchQueryBuilder()
                .withQuery(QueryBuilders.multiMatchQuery(keyword, "title", "content"))
                .withSort(SortBuilders.fieldSort("type").order(SortOrder.DESC))
                .withSort(SortBuilders.fieldSort("score").order(SortOrder.DESC))
                .withSort(SortBuilders.fieldSort("createTime").order(SortOrder.DESC))
                .withPageable(PageRequest.of(current, limit))
                .withHighlightFields(
                        new HighlightBuilder.Field("title").preTags("<em>").postTags("</em>"),
                        new HighlightBuilder.Field("content").preTags("<em>").postTags("</em>")
                ).build();

        //返回搜索结果
        return elasticTemplate.queryForPage(searchQuery, DiscussPost.class, new SearchResultMapper() {
            @Override
            public <T> AggregatedPage<T> mapResults(SearchResponse response, Class<T> aClass, Pageable pageable) {
                SearchHits hits = response.getHits();
                if (hits.getTotalHits() <= 0) {
                    return null;
                }

                List<DiscussPost> list = new ArrayList<>();
                for (SearchHit hit : hits) {
                    DiscussPost post = new DiscussPost();

                    String id = hit.getSourceAsMap().get("id").toString();
                    post.setId(Integer.valueOf(id));

                    String userId = hit.getSourceAsMap().get("userId").toString();
                    post.setUserId(Integer.valueOf(userId));

                    String title = hit.getSourceAsMap().get("title").toString();
                    post.setTitle(title);

                    String content = hit.getSourceAsMap().get("content").toString();
                    post.setContent(content);

                    String status = hit.getSourceAsMap().get("status").toString();
                    post.setStatus(Integer.valueOf(status));

                    String createTime = hit.getSourceAsMap().get("createTime").toString();
                    post.setCreateTime(new Date(Long.valueOf(createTime)));

                    String commentCount = hit.getSourceAsMap().get("commentCount").toString();
                    post.setCommentCount(Integer.valueOf(commentCount));

                    // 处理高亮显示的结果
                    HighlightField titleField = hit.getHighlightFields().get("title");
                    if (titleField != null) {
                        post.setTitle(titleField.getFragments()[0].toString());
                    }

                    HighlightField contentField = hit.getHighlightFields().get("content");
                    if (contentField != null) {
                        post.setContent(contentField.getFragments()[0].toString());
                    }

                    list.add(post);
                }

                return new AggregatedPageImpl(list, pageable,
                        hits.getTotalHits(), response.getAggregations(), response.getScrollId(), hits.getMaxScore());
            }
        });
    }

}

消费事件

    //=======================消费传来的触发消息:根据消息向elasticsearch服务器增加数据==============================
    // 消费发帖事件
    @KafkaListener(topics = {TOPIC_PUBLISH})
    public void handlePublishMessage(ConsumerRecord record) {
        if (record == null || record.value() == null) {
            logger.error("消息的内容为空!");
            return;
        }

        Event event = JSONObject.parseObject(record.value().toString(), Event.class);
        if (event == null) {
            logger.error("消息格式错误!");
            return;
        }

        //得到的消息没有问题,就开始处理事件
        //先查找到帖子,再将帖子保存到elasticsearch服务器
        DiscussPost post = discussPostService.findDiscussPostById(event.getEntityId());
        elasticsearchService.saveDiscussPost(post);
    }

    //@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@消费删帖事件
    @KafkaListener(topics = {TOPIC_DELETE})
    public void handleDeleteMessage(ConsumerRecord record) {
        if (record == null || record.value() == null) {
            logger.error("消息的内容为空!");
            return;
        }

        Event event = JSONObject.parseObject(record.value().toString(), Event.class);
        if (event == null) {
            logger.error("消息格式错误!");
            return;
        }

        //得到的消息没有问题,就开始处理事件
        //先查找到帖子,再将帖子保存到elasticsearch服务器
        elasticsearchService.deleteDiscussPost(event.getEntityId());
    }

SearchController

@Controller
public class SearchController implements CommunityConstant {
    //搜索到帖子后,还要展现帖子的作者,帖子的赞的情况
    @Autowired
    private ElasticsearchService elasticsearchService;
    @Autowired
    private UserService userService;
    @Autowired
    private LikeService likeService;
    // search?keyword=xxx
    @RequestMapping(path = "/search", method = RequestMethod.GET)
    public String search(String keyword, Page page, Model model) {
        // 搜索帖子
        org.springframework.data.domain.Page<DiscussPost> searchResult =
                elasticsearchService.searchDiscussPost(keyword, page.getCurrent() - 1, page.getLimit());
        // 聚合数据
        List<Map<String, Object>> discussPosts = new ArrayList<>();
        if (searchResult != null) {
            for (DiscussPost post : searchResult) {
                Map<String, Object> map = new HashMap<>();
                // 帖子
                map.put("post", post);
                // 作者
                map.put("user", userService.findUserById(post.getUserId()));
                // 点赞数量
                map.put("likeCount", likeService.findEntityLikeCount(ENTITY_TYPE_POST, post.getId()));

                discussPosts.add(map);
            }
        }
        model.addAttribute("discussPosts", discussPosts);
        model.addAttribute("keyword", keyword);
        // 分页信息
        page.setPath("/search?keyword=" + keyword);
        page.setRows(searchResult == null ? 0 : (int) searchResult.getTotalElements());
        return "/site/search";
    }

}



版权声明:本文为liuzewei2015原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。