Elasticsearch实战-数据同步(解决es数据增量同步)

  • Post author:
  • Post category:其他

一、数据同步问题分析

之前测试的数据都是一次从mysql导入到es,随着时间的推移,每天都有可能发生增删改查,不可能每次都全量同步,所以需要考虑增量同步问题。
在这里插入图片描述

二、解决方案

1、同步调用

在这里插入图片描述

缺点:
耦合性高,服务之间会相互影响

2、异步通知

依赖消息队列的可靠性
在这里插入图片描述

3、监听binlog

在这里插入图片描述

4、方案对比

在这里插入图片描述

三、案例-利用MQ实现Mysql与Elasticsearch数据同步

在这里插入图片描述

1、导入hotel-admin项目

启动:端口8099
在这里插入图片描述

2、申明exchange、queue、RoutingKey

在这里插入图片描述

/**
 * 定义队列和exchange
 * @author edevp
 */
@Configuration
public class MqConfig {
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(MqConstants.EXCHANGE_NAME, true, false);
    }

    @Bean
    public Queue InsertQueue() {
        return new Queue(MqConstants.INSERT_QUEUE_NAME, true);
    }

    @Bean
    public Queue DeleteQueue() {
        return new Queue(MqConstants.DELETE_QUEUE_NAME, true);
    }

    @Bean
    public Binding insertBinding() {
        return BindingBuilder.bind(InsertQueue()).to(topicExchange()).with(MqConstants.INSERT_KEY);
    }

    @Bean
    public Binding deleteBinding() {
        return BindingBuilder.bind(DeleteQueue()).to(topicExchange()).with(MqConstants.DELETE_KEY);
    }

}

3、在hotel-admin中完成增删改查的消息推送

 @PostMapping
    public void saveHotel(@RequestBody Hotel hotel){

        hotelService.save(hotel);
        rabbitTemplate.convertAndSend(MqConstants.EXCHANGE_NAME,MqConstants.INSERT_KEY,hotel.getId());
    }

    @PutMapping()
    public void updateById(@RequestBody Hotel hotel){
        if (hotel.getId() == null) {
            throw new InvalidParameterException("id不能为空");
        }
        hotelService.updateById(hotel);
        rabbitTemplate.convertAndSend(MqConstants.EXCHANGE_NAME,MqConstants.INSERT_KEY,hotel.getId());
    }

    @DeleteMapping("/{id}")
    public void deleteById(@PathVariable("id") Long id) {

        hotelService.removeById(id);
        rabbitTemplate.convertAndSend(MqConstants.EXCHANGE_NAME,MqConstants.DELETE_KEY,id);
    }

4、在hotel-demo中完成消息监听并更新到es中


```java
/**
 * 消息监听
 * @author edevp
 **/
@Component
public class HotelListener {

    @Autowired
    private IHotelService hotelService;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = MqConstants.INSERT_QUEUE_NAME),
            exchange = @Exchange(name = MqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
            key = MqConstants.INSERT_KEY
    ))
    public void listenHotelInsert(Long hotelId){
        // 新增
        hotelService.saveById(hotelId);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = MqConstants.DELETE_QUEUE_NAME),
            exchange = @Exchange(name = MqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
            key = MqConstants.DELETE_KEY
    ))
    public void listenHotelDelete(Long hotelId){
        // 删除
        hotelService.deleteById(hotelId);
    }
}

# 四、代码仓库
hotel-admin:[https://gitee.com/edevp/hotel-admin](https://gitee.com/edevp/hotel-admin)(hotel-db-sync分支)
						
hotel-demo:[https://gitee.com/edevp/hotel-demo](https://gitee.com/edevp/hotel-demo)


版权声明:本文为Blueeyedboy521原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。