MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布订阅范式的消息协议。它工作在 TCP/IP协议之上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件 。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
本次使用ubuntu进行安装及测试。
1、安装mosquitto
start_mqtt.sh
#!/bin/bash
mosquitto -c /etc/mosquitto/mosquitto.conf
passwordfile
root:$7$101$TT7lGYnNAZtouJSH$h0cjkMMso70mCtSk1FRam6gMR+tt78qgyhQ+nSNZe1Jjs6XWLLvmUQdwrTbVxim0FxP0H3t1OivDLxDWt0dL3w==
可以在启动容器后,进入容器执行命令
mosquitto_passwd -c /etc/mosquitto/passwdfile username
dockerfile
# 使用Ubuntu作为基础镜像
FROM ubuntu:latest
# 安装mosquitto、mosquitto-clients以及cmake(用于编译某些插件)
RUN apt-get update && \
apt-get install -y mosquitto mosquitto-clients cmake && \
rm -rf /var/lib/apt/lists/*
RUN mkdir /root/sh
COPY ./conf/mosquitto.conf /etc/mosquitto/
COPY ./conf/passwdfile /etc/mosquitto/
#RUN chmod 0700 /etc/mosquitto/passwdfile
COPY ./start_mqtt.sh /root/sh/start.sh
RUN chmod +x /root/sh/start.sh
# 暴露标准的MQTT端口1883
EXPOSE 1883
# 启动mosquitto服务
ENTRYPOINT ["/root/sh/start.sh"]
#CMD ["mosquitto", "-c", "/etc/mosquitto/mosquitto.conf"]
2、安装nginx(选用)
nginx.conf
load_module /usr/lib/nginx/modules/ngx_stream_module.so;
worker_processes 1;
events {
worker_connections 1024;
}
# 关闭守护进程模式
daemon off;
http {
include mime.types;
default_type application/octet-stream;
sendfile on;
keepalive_timeout 65;
server_tokens off;
# 开启gzip
gzip on;
# 低于1kb的资源不压缩
gzip_min_length 1k;
# 设置压缩所需要的缓冲区大小
gzip_buffers 4 16k;
# 压缩级别【1-9】,越大压缩率越高,同时消耗cpu资源也越多,建议设置在4左右。
gzip_comp_level 4;
# 需要压缩哪些响应类型的资源,缺少的类型自己补。
gzip_types text/css text/javascript application/javascript;
# 配置禁用gzip条件,支持正则。此处表示ie6及以下不启用gzip(因为ie低版本不支持)
gzip_disable "MSIE [1-6]\.";
# 是否添加“Vary: Accept-Encoding”响应头,
gzip_vary on;
# 设置gzip压缩针对的HTTP协议版本,没做负载的可以不用
# gzip_http_version 1.0;
include /etc/nginx/conf.d/*.conf;
}
stream {
include /etc/nginx/conf.d/*.stream;
}
*.stream
upstream backend {
server 172.199.0.6:1883;
}
server {
listen 11883;
proxy_pass backend;
}
dockerfile
# 使用Ubuntu作为基础镜像
FROM ubuntu:latest
LABEL authors="lin@kunyuan.tech"
# 安装所需的依赖
RUN apt-get update && apt-get install -y nginx
RUN apt-get install -y libnginx-mod-stream
RUN mkdir /root/sh
VOLUME /etc/nginx
COPY ./conf/nginx.conf /etc/nginx/nginx.conf
COPY ./conf/conf.d /etc/nginx/conf.d
COPY ./html /home/html
WORKDIR /home/html
# 暴露端口
EXPOSE 80
ENTRYPOINT ["nginx"]
3、启动
docker-compose
version : '3.8'
networks:
ky-networks:
driver: bridge
ipam:
config:
- subnet: 172.199.0.0/24
services:
ky-nginx:
container_name: ky-nginx
restart: always
build:
context: ./nginx
dockerfile: dockerfile
ports:
- "1111:80"
- "11883:11883"
volumes:
- ./nginx/conf/nginx.conf:/etc/nginx/nginx.conf
- ./nginx/conf/conf.d:/etc/nginx/conf.d
- ./nginx/html:/home/html
networks:
ky-networks:
ipv4_address: 172.199.0.4
ky-mqtt:
container_name: ky-mqtt
restart: always
build:
context: ./mqtt
dockerfile: dockerfile
ports:
- "1883:1883"
volumes:
- ./mqtt/conf/mosquitto.conf:/etc/mosquitto/mosquitto.conf
- ./mqtt/conf/passwdfile:/etc/mosquitto/passwdfile
networks:
ky-networks:
ipv4_address: 172.199.0.6
4、测试
#Send
import random
import time
from paho.mqtt import client as mqtt_client
broker = 'ky.room58.cn'
port = 11883
topic = "/python/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 1000)}'
def connect_mqtt():
def on_connect(client, userdata, flags, rc, pr):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2, client_id)
# 设置用户名和密码
client.username_pw_set("root", "123456")
client.on_connect = on_connect
client.connect(broker, port)
return client
def publish(client):
msg_count = 0
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
if __name__ == '__main__':
run()
#Get
import random
from paho.mqtt import client as mqtt_client
broker = 'ky.room58.cn'
port = 11883
topic = "/python/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 100)}'
def connect_mqtt() -> mqtt_client:
def on_connect(client, userdata, flags, rc, pr):
if rc == 0:
print("Connected to MQTT Broker!")
subscribe(client)
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2, client_id)
client.username_pw_set("root", "123456")
client.on_connect = on_connect
client.connect(broker, port)
return client
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic)
client.on_message = on_message
def run():
client = connect_mqtt()
#subscribe(client)
client.loop_forever()
if __name__ == '__main__':
run()
结果