NSQ消息队列使用

173次阅读
没有评论

共计 1705 个字符,预计需要花费 5 分钟才能阅读完成。

通过HTTP方式提交 生产 消息

NSQ消息队列使用

使用GO SDK方式通过TCP提交消息和消费消息

直接上代码了

package main

import (
    "fmt"
    "github.com/nsqio/go-nsq"
    "io/ioutil"
    "log"
    "strconv"
    "time"
)

var nullLogger = log.New(ioutil.Discard, "", log.LstdFlags)
var nsqAddr = "192.168.0.101:4150"
var nsqTopic = "test"
// 生产者
func sendMessage() {
    config := nsq.NewConfig() // 1. 创建生产者
    producer, err := nsq.NewProducer(nsqAddr, config)
    if err != nil {
        log.Fatalln("连接失败: ("+nsqAddr+")", err)
    }

    errPing := producer.Ping() // 2. 生产者ping
    if errPing != nil {
        log.Fatalln("无法ping通: " + nsqAddr, errPing)
    }

    producer.SetLogger(nullLogger, nsq.LogLevelInfo) // 3. 设置不输出info级别的日志

    for i := 0; i < 5; i++ { // 4. 生产者发布消息
        message := "消息发送测试 " + time.Now().Format("") + " " + strconv.Itoa(i+10000)
        err2 := producer.Publish(nsqTopic, []byte(message)) // 注意one-test 对应消费者consumer.go 保持一致
        if err2 != nil {
            log.Panic("生产者推送消息失败!")
        }
    }
    // 延迟任务,延迟 15 秒
    err = producer.DeferredPublish(nsqTopic, 15*time.Second, []byte("延迟任务"))
    if err != nil {
        return
    }

    producer.Stop() // 5. 生产者停止执行
}
// 消费者
func doConsumerTask() {
    // 1. 创建消费者
    config := nsq.NewConfig()
    q, errNewCsmr := nsq.NewConsumer(nsqTopic, "ch-one-test", config)
    if errNewCsmr != nil {
        fmt.Printf("fail to new consumer!, topic=%s, channel=%s", nsqTopic, "ch-one-test")
    }

    // 2. 添加处理消息的方法
    q.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
        log.Printf("message: %v", string(message.Body))
        message.Finish()
        return nil
    }))

    // 3. 通过http请求来发现nsqd生产者和配置的topic(推荐使用这种方式)
    lookupAddr := []string{
        "192.168.0.101:4161",
    }
    err := q.ConnectToNSQLookupds(lookupAddr)
    //err := q.ConnectToNSQD(nsqAddr)
    if err != nil {
        log.Panic("[ConnectToNSQLookupds] Could not find nsqd!")
    }

    // 4. 接收消费者停止通知
    <-q.StopChan

    // 5. 获取统计结果
    stats := q.Stats()
    fmt.Sprintf("message received %d, finished %d, requeued:%s, connections:%s",
        stats.MessagesReceived, stats.MessagesFinished, stats.MessagesRequeued, stats.Connections)
}
func main() {
    sendMessage() // 生产消息
    //doConsumerTask() // 消费消息
}
正文完
 0
Eric chan
版权声明:本站原创文章,由 Eric chan 于2021-05-23发表,共计1705字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。