一、数据同步问题分析
之前测试的数据都是一次从mysql导入到es,随着时间的推移,每天都有可能发生增删改查,不可能每次都全量同步,所以需要考虑增量同步问题。
二、解决方案
1、同步调用
缺点:
耦合性高,服务之间会相互影响
2、异步通知
依赖消息队列的可靠性
3、监听binlog
4、方案对比
三、案例-利用MQ实现Mysql与Elasticsearch数据同步
1、导入hotel-admin项目
启动:端口8099
2、申明exchange、queue、RoutingKey
/**
* 定义队列和exchange
* @author edevp
*/
@Configuration
public class MqConfig {
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(MqConstants.EXCHANGE_NAME, true, false);
}
@Bean
public Queue InsertQueue() {
return new Queue(MqConstants.INSERT_QUEUE_NAME, true);
}
@Bean
public Queue DeleteQueue() {
return new Queue(MqConstants.DELETE_QUEUE_NAME, true);
}
@Bean
public Binding insertBinding() {
return BindingBuilder.bind(InsertQueue()).to(topicExchange()).with(MqConstants.INSERT_KEY);
}
@Bean
public Binding deleteBinding() {
return BindingBuilder.bind(DeleteQueue()).to(topicExchange()).with(MqConstants.DELETE_KEY);
}
}
3、在hotel-admin中完成增删改查的消息推送
@PostMapping
public void saveHotel(@RequestBody Hotel hotel){
hotelService.save(hotel);
rabbitTemplate.convertAndSend(MqConstants.EXCHANGE_NAME,MqConstants.INSERT_KEY,hotel.getId());
}
@PutMapping()
public void updateById(@RequestBody Hotel hotel){
if (hotel.getId() == null) {
throw new InvalidParameterException("id不能为空");
}
hotelService.updateById(hotel);
rabbitTemplate.convertAndSend(MqConstants.EXCHANGE_NAME,MqConstants.INSERT_KEY,hotel.getId());
}
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
hotelService.removeById(id);
rabbitTemplate.convertAndSend(MqConstants.EXCHANGE_NAME,MqConstants.DELETE_KEY,id);
}
4、在hotel-demo中完成消息监听并更新到es中
```java
/**
* 消息监听
* @author edevp
**/
@Component
public class HotelListener {
@Autowired
private IHotelService hotelService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = MqConstants.INSERT_QUEUE_NAME),
exchange = @Exchange(name = MqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
key = MqConstants.INSERT_KEY
))
public void listenHotelInsert(Long hotelId){
// 新增
hotelService.saveById(hotelId);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = MqConstants.DELETE_QUEUE_NAME),
exchange = @Exchange(name = MqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
key = MqConstants.DELETE_KEY
))
public void listenHotelDelete(Long hotelId){
// 删除
hotelService.deleteById(hotelId);
}
}
# 四、代码仓库
hotel-admin:[https://gitee.com/edevp/hotel-admin](https://gitee.com/edevp/hotel-admin)(hotel-db-sync分支)
hotel-demo:[https://gitee.com/edevp/hotel-demo](https://gitee.com/edevp/hotel-demo)
版权声明:本文为Blueeyedboy521原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。