RocketMQ搭建步骤
开发环境
- 64位 centos7(虚拟机,1G内存)
- 64位 jdk1.8
- maven 3.5.0
- Git
- tomcat(用于启动rocketmq-console)
- rocketmq 3.2.6(最好选择maven仓库中已有的版本,保持客户端依赖的jar包和服务器版本一致)
- rocketmq-console
环境变量配置
vi /etc/profile 打开文件配置如下:
JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk
JRE_HOME=$JAVA_HOME/jre
CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
M2_HOME=/usr/maven/
ROCKETMQ_HOME=/usr/rocketmq
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$M2_HOME/bin:$ROCKETMQ_HOME/bin
export JAVA_HOME JRE_HOME CLASS_PATH M2_HOME ROCKETMQ_HOME PATH
export NAMESRV_ADDR=127.0.0.1:9876
source /etc/profile 使配置文件立即生效
防火墙配置
宿主机需要远程访问虚拟机的rocketmq服务和web服务,需要开放相关的端口号,简单粗暴的方式是直接关闭防火墙
service iptables stop 关闭防火墙
service iptables status 查看防火墙的状态
service iptables start 启动防火墙
或者为了安全,只开放特定的端口号,如8080、9876、10911等等,此处不再赘述。
安装、启动RocketMQ
1.下载和安装
cd /usr
wget https://github.com/alibaba/RocketMQ/releases/download/v3.2.6/alibaba-rocketmq-3.2.6.tar.gz
tar -zxvf alibaba-rocketmq-3.2.6.tar.gz
mv alibaba-rocketmq-3.2.6 rocketmq
cd rocketmq/bin 进入rocketmq核心命令文件目录
2.设置可执行权限
chmod +x mqadmin mqbroker mqfiltersrv mqshutdown mqnamesrv
3.修改jvm参数
vim修改runserver.sh和runbroker.sh的jvm参数如下(根据虚拟机内存大小设置,超出内存大小可能会报错):
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=320m"
4.启动nameserver
nohup sh mqnamesrv &
5.配置broker
(1)创建broker配置文件
mkdir ../conf/me-2m-2s-async/
sh mqbroker -m >../conf/me-2m-2s-async/broker.p
(2)修改brokerIP
vi ../conf/me-2m-2s-async/broker.p
brokerIP1=192.168.x.x 显示指定为虚拟机的外网IP,不要用localhost和127.0.0.1,因为远程主机会根据brokerIP1指定的地址去访问broker
6.启动broker
nohup sh mqbroker -n localhost:9876 -c ../conf/me-2m-2s-async/broker.p &
7.检查nameserver和broker是否启动成功
执行jps,输出以下进程表示启动成功
8464 NamesrvStartup
8618 BrokerStartup
或者,查看nuhup.out日志文件,有如下信息表示启动成功
The Name Server boot success.
The broker[localhost.localdomain, 192.168.x.x:10911] boot success. and name server is localhost:9876
或者,启动rocketmq自带的Producer和Consumer程序,若可正常发送和消费消息,则表示服务启动成功
bash tools.sh com.alibaba.rocketmq.example.quickstart.Producer #生产者
bash tools.sh com.alibaba.rocketmq.example.quickstart.Consumer #消费者
8.关闭nameserver和broker的方法
sh mqshutdown broker
sh mqshutdown namesrv
安装、启动rocketmq-console
wget https://github.com/duomu/rocketmq-console/raw/master/rocketmq-console.war 下载
将rocketmq-console.war放在/usr/tomcat/webapps目录下
sh /usr/tomcat/bin/startup.sh 启动tomcat
虚拟机本地访问
http://localhost:8080/rocketmq-console
,显示如下页面表示启动成功
宿主机远程访问
http://192.168.x.x:8080/rocketmq-console
,若无法访问,请检查防火墙是否关闭或者是否开放了8080端口号。
编写测试程序
在宿主机(windows)上编写如下测试程序:
依赖配置
//此处只列出mq相关的依赖
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.2.6</version>
</dependency>
创建生产者
package com.fuscent.infoquery.practice.rocketmq;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import org.apache.log4j.Logger;
/**
* @author:duomu
* @date:2017/8/4 18:09
*/
public class MqProducer {
private static Logger logger = Logger.getLogger(MqProducer.class);
public static void main(String[] args) {
DefaultMQProducer producer = new DefaultMQProducer("Producer");
producer.setNamesrvAddr("192.168.229.132:9876");
try {
producer.start();
logger.info("producer启动成功");
for (int i = 0; i < 5; i++) {
Message msg = new Message("TopicA", "tagA", "OrderID188", "Hello world".getBytes());
SendResult result = producer.send(msg);
logger.info("id:" + result.getMsgId() + " result:" + result.getSendStatus());
}
} catch (Exception e) {
logger.error("发送消息失败,Exception error:" + e);
} finally {
producer.shutdown();
}
}
}
创建消费者
package com.fuscent.infoquery.practice.rocketmq;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;
import org.apache.log4j.Logger;
import java.util.List;
/**
* @author:duomu
* @date:2017/8/4 18:09
*/
public class MqConsumer {
private static Logger logger = Logger.getLogger(MqConsumer.class);
public static void main(String[] args) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer_yll");
consumer.setNamesrvAddr("192.168.229.132:9876");
try {
consumer.subscribe("TopicA", "tagA||tagB");//可订阅多个tag,但是一个消息只能有一个tag
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
Message msg = list.get(0);
logger.info(msg.toString());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
logger.info("consumer启动成功");
} catch (MQClientException e) {
logger.error("消费者订阅消息失败,error:" + e);
}
}
}
测试生成者和消费者
启动生成者
启动消费者
总结
前人栽树,后人乘凉,在baidu+google了n篇文章后,终于把rocketmq搭建成功了,虽然只是单机配置,但是把该踩的坑都踩了,集群搭建应该只是多配几台服务而已,后续再研究啦~~~
坑1
在github上下载了最新的rocketmq4.1.0,后来发现maven中央仓库还没有4.1.0的rocketmq-client依赖包,后来下载了3.5.8,也没有调成功,索性下载一个比较早期的版本,选了3.2.6,我们公司用的3.2.4,比我们公司的早一点点应该不会太差。。。
坑2
nameserver和broker启动成功,宿主机上的生产者发送消息失败,报如下错误,且指向错误码33/44/50:
com.alibaba.rocketmq.client.exception.MQClientException:Send [1] times, still failed, cost [75]ms,...
出现这个问题首先要查看虚拟机本地的producer是否可以正常发送消息,如果本地收发消息正常,那么一定远程访问的过程中出了问题,可能是端口号没开放,也可能是IP地址映射有问题。
对于端口号,我已经确定了n遍,防火墙是关闭的,最初还没有考虑到IP地址的问题,所以百思不得其解,从阿里官方渠道获取了错误33/44/50的解决方案,试了一下也没用,把rocketmq3.2.6源码里面的Producer跑了一下也是报那个错误,错误44的说明里写着可能是producer没有正确连接到NameServer,我知道没有连接成功,可是防火墙我都关闭了还能有什么原因呢。
捣鼓了大半天,就卡在这个问题上了,我想我一定是漏掉了什么,反反复复看38/44/50的错误说明,直到看到错误50说明里面的这一句话:
然后我注意到下面这个嵌套错误,debug了一下,也没看出什么,当时我还以为这个ip是虚拟机的局域网ip
接着就baidu+google,偶然google出一篇思路别具一格的文章,说rocketmq自动识别网络出错,要把其他网络关掉,我之前学习docker的时候的确在虚拟机上配了docker的网络。
然后就尝试关掉docker的网络(172.17.0.1),可是关掉了还是照样报上面的错误啊。。。
真的没有办法了,今天早晨来了突然想到,能够访问外网ip不能访问局域网ip,ping一下看看吧,果然局域网ip ping不通,由于对网络、虚拟机了解的不深,我就去求教网络童鞋了,问宿主机怎么能够访问虚拟机的局域网ip(我用的NAT模式),网络童鞋说你用桥接模式吧,当时心中暗喜,心想吼吼我的大难题就要这么简单的解决了,网络童鞋走后,我就试了一下,麻蛋为什么用桥接模式分了新的ip(172.16.2.129),还是报上面那个172.17.0.1的错误。。
第一次搭rocketmq,想尽快调通,基本上都是用的默认配置,而且默认配置一般不会有问题啊,自己写配置才容易出错,然鹅万能的百度告诉我我之前先入为主的观念是错的,我想这应该是终极解决方案了吧。。
原来broker自动寻的地址是172.17.0.1,而且深深的刻在了默认配置文件里,虽然我关掉了这个网络,配置文件里还是这个地址,然后我重新写了个配置文件,强制指定broker所在的机器ip为192.168.x.x,重启服务,大功告成!
和局域网ip能否ping通无关,我把网络连接改回了NAT模式,感谢网络童鞋的帮忙,我要好好补一下网络和虚拟机的知识了。。。
参考资料
附上最有价值的几个~~
http://rocketmq.apache.org/docs/quick-start/
官方资料,搭建mq之前最好把User Guide都看一遍
https://firsh.me/2017/07/19/rocketmq-p-c/
https://my.oschina.net/xcafe/blog/814135
坑2的终极解决方案
http://www.cnblogs.com/badboyf/p/6611774.html