物联网数据通信 mqtt - mosquitto

2020-11-05

前段时间在跟进智慧农场项目,数据服务与硬件对接,需要用到 mqtt 解决方案 mosquitto。通过 golang 建立数据服务,获取硬件数据。


1. 流程

  • 硬件往 mqtt broker 指定 topic 上 pub 数据。
  • 数据服务系统 订阅(sub)对应的 topic,接收数据。

  • 硬件设备。
  • 工作流程。

2. mosquitto

mosquitto 是一款实现了消息推送协议 MQTT v3.1 的开源消息代理软件,提供轻量级的,支持可发布/可订阅的消息推送模式,使设备对设备之间的短消息通信变得简单,比如现在应用广泛的低功耗传感器,手机、嵌入式计算机、微型控制器等移动设备。

上述 mosquitto 介绍文字来源 百度百科


2.1. 安装

1
2
3
4
5
# macos 安装
brew install mosquitto

# centos 安装
yum install mosquitto

2.2. 修改配置

1
2
3
4
5
6
7
# vim /usr/local/etc/mosquitto/mosquitto.conf

# 修改 mosquitto 服务 ip
bind_address 127.0.0.1

# 修改 mosquitto 服务端口。
port 1883

2.3. 运行

  • MacOS
1
2
3
4
5
# 启动服务。
brew services start mosquitto

# 关闭服务。
brew services stop mosquitto
  • Centos7
1
2
3
4
5
# 启动服务。
systemctl start mosquitto

# 关闭服务。
systemctl stop mosquitto

2.4. 测试

  • 订阅 topic
1
mosquitto_sub -t news
  • 发布 topic
1
mosquitto_pub -t news -m "hello"

3. 机器对外开放端口

  • MacOS
1
2
3
4
5
6
7
8
# 打开防火墙设置。
# sudo vim /etc/pf.conf

# 添加开放端口。
pass in proto tcp from any to any port 1883

# 刷新端口设置。
sudo pfctl -f /etc/pf.conf
  • Centos7
1
2
3
4
5
6
7
8
# 修改防火墙,添加开放端口。
vi /etc/sysconfig/iptables

# 添加开放的端口。
-A INPUT -m state --state NEW -m tcp -p tcp --dport 1883 -j ACCEPT

# 刷新防火墙设置。
systemctl restart iptables.service

4. golang 测试

通过 mosquitto client,订阅设备给 mosquitto 发布的数据。

4.1. 环境搭建

golang 运行环境在跑 mqtt client 以前已经成功搭建,现在搭建缺失的环节。

  • 获取 client。
1
go get github.com/eclipse/paho.mqtt.golang
  • 如果上面命令获取失败,可以修改配置,再操作。
1
2
export GO111MODULE=on
export GOPROXY=https://goproxy.io
  • 如果 golang.org 文件夹下缺失文件,下载对应包。
1
2
git clone https://github.com/golang/tools.git
git clone -v  https://github.com/golang/net.git

4.2. 源码

硬件通过 msgpack 封装了协议。

demo 借鉴了这个帖子 《以mosquitto为服务, 用golang实现简单的mqtt发布和订阅》,谢谢!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package main

import (
    "bytes"
    "fmt"
    "reflect"
    "time"

    MQTT "github.com/eclipse/paho.mqtt.golang"
    "github.com/vmihailenco/msgpack"
)

var (
    /* 发布和订阅的 topic。 */
    mqttTopic   = "news"
    /* mqtt 服务 ip 和 端口。 */
    mqttHostURL = "tcp://127.0.0.1:1883"
)

/* 订阅数据回调 */
func subCallBackFunc(c MQTT.Client, msg MQTT.Message) {
    if !c.IsConnected() {
        return
    }

    /* 硬件通过 msgpack 封包,解析 msgpack。 */
    var out map[string]interface{}
    err := msgpack.Unmarshal(msg.Payload(), &out)
    if err != nil {
        panic(err)
    }

    fmt.Println("mqtt topic: ", msg.Topic())
    fmt.Printf("version: [%v], msg id: [%v], time: [%v], ip: [%v], mac[%v]\n",
        out["v"], out["mid"], out["time"], out["ip"], out["mac"])
    ...
}

/* client 连接 mqtt broker。 */
func connMQTT(broker, user, passwd string) (bool, MQTT.Client) {
    opts := MQTT.NewClientOptions()
    opts.AddBroker(broker)
    opts.SetUsername(user)
    opts.SetPassword(passwd)

    mc := MQTT.NewClient(opts)
    if token := mc.Connect(); token.Wait() && token.Error() != nil {
        return false, mc
    }

    return true, mc
}

/* 订阅。 */
func subscribe() {
    ok, mc := connMQTT(mqttHostURL, "", "")
    if !ok {
        fmt.Println("sub mqtt failed!")
        return
    }
    /* 订阅对应的 topic 信息。 */
    mc.Subscribe(mqttTopic, 0x00, subCallBackFunc)
}

func main() {
    subscribe()
    for {
        time.Sleep(time.Second)
    }
}

5. 参考