docker compose搭建NSQ

Docker compose 搭建 NSQ

如果希望快速在本地尝试可以查看官方文档 quick_start

编辑docker-compose

假定我们的公网IP为160.168.20.194基于官方文档的 docker-compose 编写差异化配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
version: '3'
services:
nsqlookupd:
image: nsqio/nsq
command: /nsqlookupd
# 映射nsqlookupd端口
ports:
- "4160:4160"
- "4161:4161"
nsqd:
image: nsqio/nsq
#启动命令设置 在公网部署启用--broadcast-address=ip
command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 --broadcast-address=160.168.20.194
depends_on:
- nsqlookupd
# 映射nsqd端口
ports:
- "4150:4150"
- "4151:4151"
nsqadmin:
image: nsqio/nsq
command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
depends_on:
- nsqlookupd
# 映射nsqadmin端口
ports:
- "4171:4171"

启动docker compose

在docker-compose.yml相同目录下执行命令启动

1
docker-compose up -d

查看运行状态

1
docker-compose ps

查看运行容器的日志

1
docker-compose logs

查看 nsq_admin http://160.168.20.194:4171/

生产消息

基于官方文档编写一个生产者,每隔五秒发一次”hello+当前时间“

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

package main

import (
"fmt"
"github.com/nsqio/go-nsq"
"time"
)

// NSQ Producer Demo
var producer *nsq.Producer

// 初始化生产者
func initProducer(str string) (err error) {
config := nsq.NewConfig()
producer, err = nsq.NewProducer(str, config)
if err != nil {
fmt.Printf("create producer failed, err:%v\n", err)
return err
}
return nil
}

func main() {
nsqAddress := "160.168.20.194:4150"
err := initProducer(nsqAddress)
if err != nil {
fmt.Printf("init producer failed, err:%v\n", err)
return
}
for {
s := "hello" + time.Now().UTC().String()
messageBody := []byte(s)
topicName := "test"
err = producer.Publish(topicName, messageBody)
fmt.Println()
time.Sleep(time.Duration(5) * time.Second)
}
}

消费消息

另外起一个消费者接受消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package main

import (
"fmt"
"github.com/nsqio/go-nsq"
"log"
"os"
"os/signal"
"syscall"
)

type myMessageHandler struct{}

// HandleMessage implements the Handler interface.
func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
if len(m.Body) == 0 {
// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
// In this case, a message with an empty body is simply ignored/discarded.
return nil
}

// do whatever actual message processing is desired
err := processMessage(m.Body)

// Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
return err
}

func processMessage(body []byte) error {
fmt.Println(string(body))
return nil
}

func main() {
// Instantiate a consumer that will subscribe to the provided channel.
config := nsq.NewConfig()
consumer, err := nsq.NewConsumer("test", "channel", config)
if err != nil {
log.Fatal(err)
}

// Set the Handler for messages received by this Consumer. Can be called multiple times.
// See also AddConcurrentHandlers.
consumer.AddHandler(&myMessageHandler{})

// Use nsqlookupd to discover nsqd instances.
// See also ConnectToNSQD, ConnectToNSQDs, ConnectToNSQLookupds.
err = consumer.ConnectToNSQLookupd("150.158.20.194:4161")
if err != nil {
log.Fatal(err)
}

// wait for signal to exit
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan

// Gracefully stop the consumer.
consumer.Stop()
}

docker compose搭建NSQ
http://example.com/2023/04/21/program/docker compose 搭建 NSQ/
作者
hao88
发布于
2023年4月21日
许可协议