camel+activemq入门整合

  • Post author:
  • Post category:其他


摘要:

Apache Camel是Apache基金会下的一个开源项目,它是一个基于规则路由和中介引擎,提供企业集成模式的Java对象的实现,通过应用程序接口(或称为陈述式的Java领域特定语言(DSL))来配置路由和中介的规则。领域特定语言意味着Apache Camel支持你在的集成开发工具中使用平常的,类型安全的,可自动补全的Java代码来编写路由规则,而不需要大量的XML配置文件。同时,也支持在spring中使用XML配置定义路由和中介规则。

activemq有两种队列模式

queue:一对一,消息是异步的,但是只能消费一次

topic:一对多,可以多个消费者,但是要提前订阅,否则看不到消息

下面开始整合

所需的依赖(maven)

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.14.3</version>
        </dependency>

我们先新建一个处理业务逻辑,产生消息的类Message,只需实现Processor接口即可

import org.apache.camel.Exchange;
import org.apache.camel.Processor;

import java.text.SimpleDateFormat;
import java.util.Date;

public class Message implements Processor {

    public void process(Exchange exchange) throws Exception {
        SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String format = simpleDateFormat.format(new Date());
        exchange.getOut().setBody(format);
    }

}

好,我们在看怎么配置路由发送消息到队列

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;

import javax.jms.ConnectionFactory;

public class TestCamel {
    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext(); // 1. 创建 CamelContext.

        ConnectionFactory connectionfactory =
                new ActiveMQConnectionFactory("smx","smx","tcp://localhost:61616");

        //添加组件标签
        context.addComponent("activemq", JmsComponent.jmsComponentAutoAcknowledge(connectionfactory));

        //业务逻辑处理
        final Message message =new Message();

        context.addRoutes(new RouteBuilder() {
            public void configure() {
                // 2. 为路由配置组件或终端节点.
                from("timer://foo?fixedRate=true&period=1s&repeatCount=10").process(message).to("activemq:topic:testCamel");
             //接收消息之后发送到某个类里做逻辑处理,把处理结果又可以返回某个队列里边做处理
                from("activemq:topic:testCamel").bean(HelloWord.class, "hello(String,String)").to("activemq:queue:testQueue");
                }
        });

        // 3. 添加路由到CamelContext
        context.setTracing(true);
        context.start(); // 4. 启动CamelContext.
        Thread.sleep(Integer.MAX_VALUE);  // 为了保持CamelContext处于工作状态,这里需要sleep主线程
        context.stop(); // 最后停止CamelContext
    }
}

上面的ActiveMQConnectionFactory是传入的用户名和密码,是自己定的,后面的连接端口是activemq默认开出一个端口号

下面来测试

直接运行main方法,前提是你的activemq服务器已开启

运行之后,会有10个消息被传送到了testCamel队列里头,为了大家直接看结果,我在底下又把这10条消息消费到了HelloWord类的hello方法里头

import org.apache.camel.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HelloWord {

    private static Logger custom_l = LoggerFactory.getLogger("custom_l");

    public String hello(@Headers String topic, String body) {
        custom_l.debug(body);
        return "hello camel+activemq";
    }
}

就这样,10条消息 就被我写到custom_l日志文件中去了

这里写图片描述

细心的博友会发现,我HelloWord类还返回了一个值,这个是你处理了消息之后又把结果发送到了testQueue队列里了,同理又可以订阅!

转于:

http://blog.csdn.net/fengchen0123456789/article/details/77365899