摘要:
Apache Camel是Apache基金会下的一个开源项目,它是一个基于规则路由和中介引擎,提供企业集成模式的Java对象的实现,通过应用程序接口(或称为陈述式的Java领域特定语言(DSL))来配置路由和中介的规则。领域特定语言意味着Apache Camel支持你在的集成开发工具中使用平常的,类型安全的,可自动补全的Java代码来编写路由规则,而不需要大量的XML配置文件。同时,也支持在spring中使用XML配置定义路由和中介规则。
activemq有两种队列模式
queue:一对一,消息是异步的,但是只能消费一次
topic:一对多,可以多个消费者,但是要提前订阅,否则看不到消息
下面开始整合
所需的依赖(maven)
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.14.3</version>
</dependency>
我们先新建一个处理业务逻辑,产生消息的类Message,只需实现Processor接口即可
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import java.text.SimpleDateFormat;
import java.util.Date;
public class Message implements Processor {
public void process(Exchange exchange) throws Exception {
SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String format = simpleDateFormat.format(new Date());
exchange.getOut().setBody(format);
}
}
好,我们在看怎么配置路由发送消息到队列
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;
import javax.jms.ConnectionFactory;
public class TestCamel {
public static void main(String[] args) throws Exception {
CamelContext context = new DefaultCamelContext(); // 1. 创建 CamelContext.
ConnectionFactory connectionfactory =
new ActiveMQConnectionFactory("smx","smx","tcp://localhost:61616");
//添加组件标签
context.addComponent("activemq", JmsComponent.jmsComponentAutoAcknowledge(connectionfactory));
//业务逻辑处理
final Message message =new Message();
context.addRoutes(new RouteBuilder() {
public void configure() {
// 2. 为路由配置组件或终端节点.
from("timer://foo?fixedRate=true&period=1s&repeatCount=10").process(message).to("activemq:topic:testCamel");
//接收消息之后发送到某个类里做逻辑处理,把处理结果又可以返回某个队列里边做处理
from("activemq:topic:testCamel").bean(HelloWord.class, "hello(String,String)").to("activemq:queue:testQueue");
}
});
// 3. 添加路由到CamelContext
context.setTracing(true);
context.start(); // 4. 启动CamelContext.
Thread.sleep(Integer.MAX_VALUE); // 为了保持CamelContext处于工作状态,这里需要sleep主线程
context.stop(); // 最后停止CamelContext
}
}
上面的ActiveMQConnectionFactory是传入的用户名和密码,是自己定的,后面的连接端口是activemq默认开出一个端口号
下面来测试
直接运行main方法,前提是你的activemq服务器已开启
运行之后,会有10个消息被传送到了testCamel队列里头,为了大家直接看结果,我在底下又把这10条消息消费到了HelloWord类的hello方法里头
import org.apache.camel.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HelloWord {
private static Logger custom_l = LoggerFactory.getLogger("custom_l");
public String hello(@Headers String topic, String body) {
custom_l.debug(body);
return "hello camel+activemq";
}
}
就这样,10条消息 就被我写到custom_l日志文件中去了
细心的博友会发现,我HelloWord类还返回了一个值,这个是你处理了消息之后又把结果发送到了testQueue队列里了,同理又可以订阅!
转于:
http://blog.csdn.net/fengchen0123456789/article/details/77365899