// HandleMessage implements the Handler interface. func(h *myMessageHandler) HandleMessage(m *nsq.Message) error { iflen(m.Body) == 0 { // Returning nil will automatically send a FIN command to NSQ to mark the message as processed. // In this case, a message with an empty body is simply ignored/discarded. returnnil }
// do whatever actual message processing is desired err := processMessage(m.Body)
// Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message. return err }
funcmain() { // Instantiate a consumer that will subscribe to the provided channel. config := nsq.NewConfig() consumer, err := nsq.NewConsumer("test", "channel", config) if err != nil { log.Fatal(err) }
// Set the Handler for messages received by this Consumer. Can be called multiple times. // See also AddConcurrentHandlers. consumer.AddHandler(&myMessageHandler{})
// Use nsqlookupd to discover nsqd instances. // See also ConnectToNSQD, ConnectToNSQDs, ConnectToNSQLookupds. err = consumer.ConnectToNSQLookupd("150.158.20.194:4161") if err != nil { log.Fatal(err) }
// wait for signal to exit sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) <-sigChan
// Gracefully stop the consumer. consumer.Stop() }