一、github克隆rocketmq项目并构建
添加环境变量
JAVA_HOME=/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home
git clone https://github.com/apache/rocketmq.git
cd rocketmq
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/rocketmq-4.7.1/rocketmq-4.7.1
安装遇到报错,参考解决方法:
二、启动名称服务器
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success. serializeType=JSON
遇到报错
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Unrecognized VM option 'UseCMSCompactAtFullCollection'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
java版本太高,相关命令已删除,不兼容。
解决办法安装java8,电脑里存在java多个版本把当前版本指向java8
// 查看mac中安装的java版本
ls /Library/Java/JavaVirtualMachines
// 本机版本如下
adoptopenjdk-8.jdk openjdk-13.0.1.jdk
// 查看当前指向的java版本
echo $JAVA_HOME
// 修改环境变量指向低版本java
export JAVA_HOME=/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home
然后再次启动正常
三、启动broker
添加环境变量
ROCKETMQ_HOME=/Users/chenpeng/software/rocketmq/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1
nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &
tail -f ~/logs/rocketmqlogs/broker.log
四、发送和接受消息
在发送/接收消息之前,我们需要告知客户端名称服务器所在的位置。
RocketMQ 提供了多种方法来实现这一点。
为了简单起见,我们使用环境变量 NAMESRV_ADDR
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
五、关闭服务
sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK
sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK
六、go代码示例
使用官方包https://github.com/apache/rocketmq-client-go
go get github.com/apache/rocketmq-client-go/v2
包里路径examples中有代码实例,这里贴一下
Consummer
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
)
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
// 多个consumer消费同一个group,同一个topic instance要设置唯一值,否则报错
// consumer.WithInstance("instance name"),
)
err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range msgs {
fmt.Printf("subscribe callback: %v \n", msgs[i])
}
return consumer.ConsumeSuccess, nil
})
if err != nil {
fmt.Println(err.Error())
}
// Note: start after subscribe
err = c.Start()
if err != nil {
fmt.Println(err.Error())
os.Exit(-1)
}
time.Sleep(time.Hour)
err = c.Shutdown()
if err != nil {
fmt.Printf("shutdown Consumer error: %s", err.Error())
}
}
注:多个consumer消费同一个group,同一个topic instance要设置唯一值,否则报错
Producer
package main
import (
"context"
"fmt"
"os"
"strconv"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
// Package main implements a simple producer to send message.
func main() {
p, _ := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithRetry(2),
)
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
topic := "test"
for i := 0; i < 10; i++ {
msg := &primitive.Message{
Topic: topic,
Body: []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)),
}
res, err := p.SendSync(context.Background(), msg)
if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Printf("send message success: result=%s\n", res.String())
}
}
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
}
}
7、官网文档
版权声明:本文为chen_peng7原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。