主要做课程上线功能:
点击课程上架,操作步骤: 整合ES与RabbitMQ
1.把选中的id传入后台,修改上线状态;
2.把修改状态的对象存入ES索引库;
3.通过rabbitMQ给用户推送营销消息;
1.把选中的id传入后台,修改上线状态;第一步昨天已总结;
2.把修改状态的对象存入ES索引库;
思路:
1.单独建立一个微服务用来做es查询保存操作,导入依赖,配置yml;
2.编写一个接口,交给spring管理,继承ElasticsearchRepository接口,泛型写doc文档实体类和Long;
3.编写controller,注入自己编写的接口CourseRepository,调用方法;
4.编写feign,把接口暴露给课程服务使用;
5.课程微服务直接es微服务的feign接口使用 即可;
注意:doc文档实体类上加注解
@Document(indexName= "hrm", type = "course") indexname为es索引库名,type为es索引表,类似于mysql的建库建表;
1.单独建立一个微服务用来做es查询保存操作,导入依赖,配置yml;
<!--ES的包--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-elasticsearch</artifactId> </dependency>spring: data: elasticsearch: cluster-name: elasticsearch #集群名称 cluster-nodes: 127.0.0.1:9300 #9200是图形界面端,9300代码你的端口 application: name: service-es #服务名 #配置数据库链接信息
2.编写一个接口,交给spring管理,继承ElasticsearchRepository接口,泛型写doc文档实体类和Long;
@Component public interface CourseRepository extends ElasticsearchRepository<CourseDoc,Long> { }
3.编写controller,注入自己编写的接口CourseRepository,调用方法;
@RestController @RequestMapping("/es") public class CourseEsController { @Autowired private CourseRepository courseRepository; @PostMapping("/course") public JSONResult CourseEs(@RequestBody List<CourseDoc>docList){ courseRepository.saveAll(docList); return JSONResult.success(); }
4.编写feign,把接口暴露给课程服务使用;
@FeignClient(value = "service-es",fallbackFactory = FallbackFactoryCourse.class) public interface CourseFeign { @PostMapping("/es/course") public JSONResult CourseEs(@RequestBody List<CourseDoc> docList);
5.课程微服务直接es微服务的feign接口使用 即可;
5.1创建一个list来存放前端选中的数据;
5.2.通过前端选中的id找出课程对象与课程市场对象,用
BeanUtils.copyProperties(起始数据,目标数据)方法给doc文档类赋值;
5.3返回list即可;
@Override public JSONResult onLineCourse(CourseQuery query) { //上下线 //思路 //1.通过前端传过来的id拿到对象; //2.拿到对象给对象设置状态; //3.修改返回即可; //4.把上线的课程存入es索引库 //5.通过rabbitMQ发送消息给用户 Long[] ids = query.getIds(); List<CourseDoc> list = new ArrayList<>(); CourseDoc courseDoc = new CourseDoc(); for (Long id : ids) { Course course = courseMapper.selectById(id); if(course.getStatus()!=0){ throw new MyException("当前课程已经上线啦~"); } course.setStatus(1); courseMapper.updateById(course); CourseMarket courseMarket = courseMarketMapper.selectById(course.getId()); System.out.println(courseMarket); BeanUtils.copyProperties(courseMarket,courseDoc); BeanUtils.copyProperties(course,courseDoc); list.add(courseDoc); courseFeign.CourseEs(list); }
3.通过rabbitMQ给用户推送营销消息;
springboot整合RabbitMQ思路:
1.导入依赖,添加yml配置;
<!--spirngboot集成rabbitmq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>#RabbitMQ相关 放在第二层 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtualHost: / publisher-confirms: true #消息发送到交换机后的回调 publisher-returns: true #消息由交换机发到队列失败后的回调 template: mandatory: true # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃
2.rabbitmq配置类,配置队列名与交换机机名,还有队列 与交换机的绑定;
以及初始化RabbitAdmin对象
@Component public class ConfigRabbitMQ { //创建交换机 @Bean public Exchange getExchange(){ return ExchangeBuilder.topicExchange("课程交换机名").build(); } //创建队列 @Bean public Queue getQueue(){ return new Queue("课程队列名",true,false,false); } //绑定交换机与队列 @Bean public Binding getBinding(){ return BindingBuilder.bind(getQueue()).to(getExchange()).with("课程路由键").noargs(); } //初始化RabbitAdmin对象 @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类 rabbitAdmin.setAutoStartup(true); //下面设置目的:项目启动时,就创建交换机和队列 //创建交换机 rabbitAdmin.declareExchange(getExchange()); //创建对列 rabbitAdmin.declareQueue(getQueue()); return rabbitAdmin; } }
第二个配置类,配置rabbitmq的序列化配置,以及两个回调函数(
confirms,returns
);
/** * @description: 做序列化 */ @Configuration public class RabbitMQConverterConfig implements RabbitListenerConfigurer{ //以下配置RabbitMQ消息服务 @Autowired public ConnectionFactory connectionFactory; @Bean public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() { DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); // 这里的转换器设置实现了 通过 @Payload 注解 自动反序列化message body factory.setMessageConverter(new MappingJackson2MessageConverter()); return factory; } @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory()); } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setConcurrentConsumers(3); factory.setMaxConcurrentConsumers(10); //设置手动签收 factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } @Bean public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory); // 这里的转换器设置实现了发送消息时自动序列化消息对象为message body template.setMessageConverter(jsonMessageConverter()); template.setMandatory(true); return template; } }
3.使用RabbitTemplate发送队列消息;
rabbitTemplate.convertAndSend("课程交换机名","课程路由键","这是新上线的队列");
高频面试题;如何保持RabbitMQ消息不丢失?
1、需要设置Confirm和Return回调方法进行处理
2、然后搞一张消息发送的记录表,里面包含如下字段:交换机名称、routingkey、消息内容、消息状态、重试次数等字段
3、发送消息的时候,将状态置为发送中,重试次数给个默认值(可以从配置表中取),如果Confirm回调里的ack是false,那么我们就需要将状态更新为发送失败,否则更新为发送成功(0:发送中;1:生产者到交换机失败;2:交换机到队列失败;3:成功)
4、Return回调方法只要被触发,说明消息肯定发送失败了,直接将状态改为发送失败
5、搞一个定时任务,定时去扫描该表的所有状态为发送失败的记录,重试次数大于0的消息,重新进行消息发送,每发送一次,重试次数字段减一,直到0为止
6、当重试次数等于0时,说明发送了很多次还是失败,此时需要发短信或邮件告知运维人员,进行人工干预了
原理理解图: