go【第十二篇】消息队列

  • Post author:
  • Post category:其他



消息队列是架构级解耦方案,常用于流量削峰、应用解耦、异步处理

消息队列之NSQ

NSQ是目前比较流行的一个分布式的消息队列,本文主要介绍了NSQ及Go语言如何操作NSQ。

NSQ介绍

NSQ是Go语言编写的一个开源的实时分布式内存消息队列,其性能十分优异。 NSQ的优势有以下优势:

NSQ提倡分布式和分散的拓扑,没有单点故障,支持容错和高可用性,并提供可靠的消息交付保证
NSQ支持横向扩展,没有任何集中式代理。
NSQ易于配置和部署,并且内置了管理界面。

安装


官方下载页面

根据自己的平台下载并解压即可。

启动nsqd:	nsqd -lookupd-tcp-address=127.0.0.1:4160
	[nsqd] 2019/07/18 16:02:50.968403 INFO: nsqd v1.1.0 (built w/go1.10.3)
	[nsqd] 2019/07/18 16:02:51.013659 INFO: ID: 826
	[nsqd] 2019/07/18 16:02:51.014577 INFO: NSQ: persisting topic/channel metadata to nsqd.dat
	[nsqd] 2019/07/18 16:02:51.035655 INFO: HTTP: listening on [::]:4151
	[nsqd] 2019/07/18 16:02:51.035655 INFO: LOOKUP(127.0.0.1:4160): adding peer
	[nsqd] 2019/07/18 16:02:51.038262 INFO: LOOKUP connecting to 127.0.0.1:4160
	[nsqd] 2019/07/18 16:02:51.035655 INFO: TCP: listening on [::]:4150

启动nsqd:	nsqlookupd
	[nsqlookupd] 2019/07/18 15:59:34.219793 INFO: nsqlookupd v1.1.0 (built w/go1.10.3)
	[nsqlookupd] 2019/07/18 15:59:34.279192 INFO: TCP: listening on [::]:4160
	[nsqlookupd] 2019/07/18 15:59:34.279192 INFO: HTTP: listening on [::]:4161
	
运行nsqadmin管理:	nsqadmin -lookupd-http-address=127.0.0.1:4161
	[nsqadmin] 2019/07/18 15:59:54.169512 INFO: nsqadmin v1.1.0 (built w/go1.10.3)
	[nsqadmin] 2019/07/18 15:59:54.213611 INFO: HTTP: listening on [::]:4171
	
创建topic
	http://127.0.0.1:4171/lookup
	topic_demo4
	
	
	

Go操作NSQ


// nsq_producer/main.go
package main

import (
    "bufio"
    "fmt"
    "os"
    "strings"

    "github.com/nsqio/go-nsq"
)

// NSQ Producer Demo

var producer *nsq.Producer

// 初始化生产者
func initProducer(str string) (err error) {
    config := nsq.NewConfig()
    producer, err = nsq.NewProducer(str, config)
    if err != nil {
        fmt.Printf("create producer failed, err:%v\n", err)
        return err
    }
    return nil
}

func main() {
    nsqAddress := "127.0.0.1:4150" // nsqd的地址
    err := initProducer(nsqAddress)
    if err != nil {
        fmt.Printf("init producer failed, err:%v\n", err)
        return
    }

    reader := bufio.NewReader(os.Stdin) // 从标准输入读取
    for {
        data, err := reader.ReadString('\n')
        if err != nil {
            fmt.Printf("read string from stdin failed, err:%v\n", err)
            continue
        }
        data = strings.TrimSpace(data) // 去掉输入内容前后的空格
        if strings.ToUpper(data) == "Q" { // 输入Q退出
            break
        }
        // 向 'topic_demo' publish 数据
        err = producer.Publish("topic_demo", []byte(data))
        if err != nil {
            fmt.Printf("publish msg to nsq failed, err:%v\n", err)
            continue
        }
    }
}


生产者


// nsq_consumer/main.go
package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/nsqio/go-nsq"
)

// NSQ Consumer Demo

// MyHandler 是一个消费者类型
type MyHandler struct {
    Title string
}

// HandleMessage 是需要实现的处理消息的方法
func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) {
    fmt.Printf("%s recv from %v, msg:%v\n", m.Title, msg.NSQDAddress, string(msg.Body))
    return
}

// 初始化消费者
func initConsumer(topic string, channel string, address string) (err error) {
    config := nsq.NewConfig()
    config.LookupdPollInterval = 15 * time.Second // 15秒查询一次有没有新的nsqd节点加入进来
    c, err := nsq.NewConsumer(topic, channel, config)
    if err != nil {
        fmt.Printf("create consumer failed, err:%v\n", err)
        return
    }
    // 创建一个结构体对象
    consumer := &MyHandler{
        Title: "鸣人一号",
    }
    c.AddHandler(consumer)

    // if err := c.ConnectToNSQD(address); err != nil { // 直接连NSQD
    if err := c.ConnectToNSQLookupd(address); err != nil { // 通过lookupd查询
        return err
    }
    return nil

}

func main() {
    err := initConsumer("topic_demo", "xxx", "127.0.0.1:4161")
    if err != nil {
        fmt.Printf("init consumer failed, err:%v\n", err)
        return
    }
    c := make(chan os.Signal)        // 定义一个信号的通道
    signal.Notify(c, syscall.SIGINT) // 转发键盘中断信号到c
    <-c                              // 阻塞
}


消费者

转载于:https://www.cnblogs.com/hyit/articles/11189838.html