Go语言学习
  • README
  • Go 基础
    • go语言介绍
    • go语言主要特性
    • go内置类型和函数
    • init函数和main函数
    • 下划线
    • iota
    • 字符串
    • 数据类型:数组与切片
    • 数据类型:byte、rune与字符串
    • 变量的5种创建方式
    • 数据类型:字典
    • 指针
    • 数据类型:指针
    • 类型断言
    • 流程控制:defer延迟执行
    • 异常机制:panic和recover
    • 函数
    • go依赖管理
    • go中值传递、引用传递、指针传递区别
  • 标准库
    • Go net/http包
  • 数据结构
    • 哈希表
      • 为什么对 map 的 key 或 value 进行取址操作是不允许的
  • Gin
    • gin 快速开始
    • gin-swagger用法
  • Go 进阶
    • Go 指针
    • Go 中的 GC 演变是怎样的?
    • Go 的堆和栈
  • 面向对象
    • make 和 new 的区别
    • new(T) 和 &T{} 有什么区别?
  • 并发编程
    • Channel
    • Go语言 CSP 并发模型
    • GMP 模型原理
      • GMP 模型为什么要有 P ?
    • Go 协程池(goroutine pool)
    • Go语言常见的并发模式
    • Go并发实践:主动停止goroutine
  • 最佳实践
    • 发布Go语言模块
  • 软件包
    • 常用的GoLang包工具
    • Go的UUID生成
    • 现代化的命令行框架Cobra
    • 配置解析神器Viper
    • Go发送邮件gomail
    • Go反射框架Fx
    • NSQ消息队列的使用
    • Go爬虫框架colly
    • grpc-go 的安装和使用
Powered by GitBook
On this page

Was this helpful?

  1. 软件包

NSQ消息队列的使用

NSQ 是由国外的一个短链服务商bitly使用golang开发的一个消息队列系统,正好使用到了这个东西,在这里简单的记录下。

获取客户端

nsq的golang客户端是官方版本的

go get github.com/nsqio/go-nsq

即可

简单的消费者和生产者使用

该客户端有原始的command函数用于一些基础操作,也有consumer和producer的封装,我这里是直接使用了封装了。

  • consumer

消费者比较简单,只要监听队列消息,并处理就可以了,下面是一个简单的例子。

type NSQHandler struct {
}

func (this *NSQHandler) HandleMessage(message *nsq.Message) error {
    log.Println("recv:", string(message.Body))
    return nil
}

func testNSQ() {
    waiter := sync.WaitGroup{}
    waiter.Add(1)

    go func() {
        defer waiter.Done()

        consumer, err := nsq.NewConsumer("test", "ch1", nsq.NewConfig())
        if nil != err {
            log.Println(err)
            return
        }

        consumer.AddHandler(&NSQHandler{})
        err = consumer.ConnectToNSQD("10.100.156.207:4150")
        if nil != err {
            log.Println(err)
            return
        }

        select {}
    }()

    waiter.Wait()
}

创建好consumer后,只需要自己创建一个struct并实现HandleMessage方法即可,当有消息时候,再去处理消息。

需要注意的是,AddHandler的回调是在别的routine中执行的,并且可以添加多个handler用于处理消息,这里可能需要注意下线程的同步问题。

  • producer

生产者也和消费者差不多,首先需要创建一个producer

func (this *MsgQueue) Init(addr string) error {
    var err error
    this.addr = addr

    //  try to connect
    cfg := nsq.NewConfig()
    this.producer, err = nsq.NewProducer(addr, cfg)
    if nil != err {
        return err
    }

    //  try to ping
    err = this.producer.Ping()
    if nil != err {
        this.producer.Stop()
        this.producer = nil
        return err
    }

    return nil
}

producer封装了较多的方法,分为同步和异步两种。带Async后缀的,都是异步的。

同步是收到了nsq的回应后再返回的函数,所以可能会堵塞,而异步的操作,则调用方需要传入一个chan用于接收结果,当有结果返回或者是超时的情况下,相应的内容会写到该chan中。

在这里我用了同步的api,毕竟消息队列假如出了什么问题,那么整个服务就不可用了,而且同步改异步也不会太麻烦,以后可以做下修改。

publish的方法也很简单,提供一个topic和数据就行了。

PreviousGo反射框架FxNextGo爬虫框架colly

Last updated 2 years ago

Was this helpful?