共计 1705 个字符,预计需要花费 5 分钟才能阅读完成。
通过HTTP方式提交 生产 消息
使用GO SDK方式通过TCP提交消息和消费消息
直接上代码了
package main
import (
"fmt"
"github.com/nsqio/go-nsq"
"io/ioutil"
"log"
"strconv"
"time"
)
var nullLogger = log.New(ioutil.Discard, "", log.LstdFlags)
var nsqAddr = "192.168.0.101:4150"
var nsqTopic = "test"
// 生产者
func sendMessage() {
config := nsq.NewConfig() // 1. 创建生产者
producer, err := nsq.NewProducer(nsqAddr, config)
if err != nil {
log.Fatalln("连接失败: ("+nsqAddr+")", err)
}
errPing := producer.Ping() // 2. 生产者ping
if errPing != nil {
log.Fatalln("无法ping通: " + nsqAddr, errPing)
}
producer.SetLogger(nullLogger, nsq.LogLevelInfo) // 3. 设置不输出info级别的日志
for i := 0; i < 5; i++ { // 4. 生产者发布消息
message := "消息发送测试 " + time.Now().Format("") + " " + strconv.Itoa(i+10000)
err2 := producer.Publish(nsqTopic, []byte(message)) // 注意one-test 对应消费者consumer.go 保持一致
if err2 != nil {
log.Panic("生产者推送消息失败!")
}
}
// 延迟任务,延迟 15 秒
err = producer.DeferredPublish(nsqTopic, 15*time.Second, []byte("延迟任务"))
if err != nil {
return
}
producer.Stop() // 5. 生产者停止执行
}
// 消费者
func doConsumerTask() {
// 1. 创建消费者
config := nsq.NewConfig()
q, errNewCsmr := nsq.NewConsumer(nsqTopic, "ch-one-test", config)
if errNewCsmr != nil {
fmt.Printf("fail to new consumer!, topic=%s, channel=%s", nsqTopic, "ch-one-test")
}
// 2. 添加处理消息的方法
q.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
log.Printf("message: %v", string(message.Body))
message.Finish()
return nil
}))
// 3. 通过http请求来发现nsqd生产者和配置的topic(推荐使用这种方式)
lookupAddr := []string{
"192.168.0.101:4161",
}
err := q.ConnectToNSQLookupds(lookupAddr)
//err := q.ConnectToNSQD(nsqAddr)
if err != nil {
log.Panic("[ConnectToNSQLookupds] Could not find nsqd!")
}
// 4. 接收消费者停止通知
<-q.StopChan
// 5. 获取统计结果
stats := q.Stats()
fmt.Sprintf("message received %d, finished %d, requeued:%s, connections:%s",
stats.MessagesReceived, stats.MessagesFinished, stats.MessagesRequeued, stats.Connections)
}
func main() {
sendMessage() // 生产消息
//doConsumerTask() // 消费消息
}
正文完