Golang MQTT的使用 实现发布订阅
Eclipse Paho MQTT Go Client 为 Eclipse Paho 项目下的 Go 语言版客户端库,该库能够连接到 MQTT Broker 以发布消息,订阅主题并接收已发布的消息,支持完全异步的操作模式。
官方源代码地址: github.com/eclipse/paho.mqtt.golang
Go应用使用mqtt通信协议的时候, 是作为client端使用的, server端自然需要一个服务来承载, 有很多软件提供MQTT协议支持, 比如mosquitto mqtt, emqx, smqtt, rabbitmq mqtt, pulsar mop mqtt等等.
MQTT服务安装
Golang使用MQTT
#下载MQTT依赖包
go get github.com/eclipse/paho.mqtt.golang
Go 语言的 Paho MQTT 连接 EMQX Broker例子,并进行消息收发
package main
import (
"fmt"
uuid "github.com/satori/go.uuid"
"os"
"strings"
"time"
"github.com/eclipse/paho.mqtt.golang"
)
var fallbackFun mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
//打印收到的topic和内容
fmt.Printf("topic: %s\n", msg.Topic())
fmt.Printf("msg: %s\n", msg.Payload())
//收到的topic $test/msg/liang/get
//处理发布的topic $abcd/msg/liang/set
split := strings.Split(msg.Topic(), "/")
liang := split[2]
setTopic := "$abcd/msg/" + liang + "/set"
//处理要发送的数据
data := "{\"msg\":\"hello\"}"
// 处理完之后 发布消息 $abcd/msg/liang/set
client.Publish(setTopic, 0, false, data)
}
func main() {
clientId := uuid.NewV4()
opts := mqtt.NewClientOptions().AddBroker("tcp://192.168.1.8:1883").SetClientID(fmt.Sprintf("%s", clientId))
opts.SetKeepAlive(60 * time.Second)
// 设置消息回调处理函数 fallbackFun
opts.SetDefaultPublishHandler(fallbackFun)
opts.SetPingTimeout(1 * time.Second)
c := mqtt.NewClient(opts)
if token := c.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
// 订阅主题 $test/msg/#
if token := c.Subscribe("$test/msg/#", 0, nil); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
select {
}
// 断开连接
c.Disconnect(250)
time.Sleep(1 * time.Second)
}
参考链接:
https://www.emqx.io/docs/zh/v4.4/development/go.html#mqtt-go-%E4%BD%BF%E7%94%A8%E7%A4%BA%E4%BE%8B
https://blog.justwe.site/post/tcp-mqtt/
文章评论