# Load library modules
from machine import Pin, SoftI2C
import network
import dht
import time
from umqtt.simple import MQTTClient
import ssd1306
# Initialize the OLED: a software I2C bus (SCL on pin 22, SDA on pin 21)
# driving an SSD1306 display created with dimensions 128 x 64.
i2c = SoftI2C(scl=Pin(22), sda=Pin(21))
oled = ssd1306.SSD1306_I2C(128, 64, i2c)
# Module-level display strings. They persist between MQTT callbacks so the
# line whose topic did not change can be redrawn with its last known value.
temp_display_text = "Temp: -- C"
hum_display_text = "Hum: -- %"
# Connect to Wi-Fi
def connect_wifi(ssid, password, timeout_s=None):
    """Activate the station interface and block until it is connected.

    Args:
        ssid: Name of the access point to join.
        password: Passphrase for the access point.
        timeout_s: Maximum number of seconds to wait for the connection.
            ``None`` (the default) waits indefinitely.

    Raises:
        OSError: If ``timeout_s`` is given and the interface is still not
            connected after that many seconds.
    """
    poll_interval_ms = 100

    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        print('Connecting to network...')
        wlan.connect(ssid, password)
        waited_ms = 0
        while not wlan.isconnected():
            if timeout_s is not None and waited_ms >= timeout_s * 1000:
                raise OSError('Wi-Fi connection timed out')
            # Sleep between polls instead of spinning on `pass`, so the
            # wait does not monopolise the CPU while the link comes up.
            time.sleep(poll_interval_ms / 1000)
            waited_ms += poll_interval_ms
    print('Network connected:', wlan.ifconfig())
# MQTT subscription callback
def mqtt_sub_callback(topic, msg):
    """Handle one received MQTT message and refresh the OLED.

    Updates the cached temperature or humidity line when the topic matches
    one of the two sensor topics, then redraws both lines so the value that
    did not change stays on screen.

    Args:
        topic: Topic of the received message; decoded with ``.decode()``.
        msg: Payload of the received message; decoded with ``.decode()``.
    """
    global temp_display_text, hum_display_text
    print((topic, msg))

    topic_name = topic.decode()
    if topic_name == "sensor/temperature":
        temp_display_text = "Temp: {} C".format(msg.decode())
    elif topic_name == "sensor/humidity":
        hum_display_text = "Hum: {} %".format(msg.decode())

    # Clear the screen, then draw the temperature line at y=0 and the
    # humidity line at y=16.
    oled.fill(0)
    for y_pos, line in ((0, temp_display_text), (16, hum_display_text)):
        oled.text(line, 0, y_pos)
    oled.show()
# Create the MQTT client and connect it to the MQTT broker
def setup_mqtt(client_id="客户端名称", server="服务器地址",
               user="改为你的账号", password="改为你的密码", port=0):
    """Create an MQTT client, connect it, and subscribe to the sensor topics.

    The string defaults are the template placeholders from the original
    script and must be replaced (or overridden by the caller) with real
    broker settings before this can connect.

    Args:
        client_id: Client identifier presented to the broker.
        server: Broker host name or address.
        user: Broker account name.
        password: Broker account password.
        port: Broker TCP port. ``0`` leaves the choice to ``umqtt.simple``,
            which then uses its default MQTT port. This replaces the
            original undefined placeholder identifier, which raised
            ``NameError`` as soon as the function was called.

    Returns:
        The connected ``MQTTClient``, with ``mqtt_sub_callback`` installed
        and subscribed to ``sensor/temperature`` and ``sensor/humidity``.
    """
    mqtt_client = MQTTClient(client_id, server, user=user,
                             password=password, port=port)
    # The callback must be set before subscribing so that no incoming
    # message is received without a handler.
    mqtt_client.set_callback(mqtt_sub_callback)
    mqtt_client.connect()
    mqtt_client.subscribe(b"sensor/temperature")
    mqtt_client.subscribe(b"sensor/humidity")
    print("Connected to MQTT Broker and subscribed to topics")
    return mqtt_client
# Initialize the DHT11 sensor on pin 4.
dht_sensor = dht.DHT11(Pin(4))
# Main loop
def main():
    """Connect to Wi-Fi and MQTT, then publish sensor readings forever.

    Each cycle reads the DHT11, prints and publishes the temperature and
    humidity, and then spends the roughly two-second refresh interval
    polling for inbound MQTT messages. The client is subscribed to the same
    topics it publishes to, so those messages come back through the
    subscription callback, which updates the OLED.

    Sensor errors and MQTT errors are reported separately, and neither stops
    the loop. This function never returns.
    """
    ssid = 'wifi名称'
    password = '密码'
    # Topics are loop-invariant; bytes matches the subscribe() calls.
    temp_topic = b"sensor/temperature"
    hum_topic = b"sensor/humidity"

    connect_wifi(ssid, password)
    mqtt_client = setup_mqtt()
    while True:
        try:
            dht_sensor.measure()
            temp = dht_sensor.temperature()
            hum = dht_sensor.humidity()
        except OSError as e:
            print('Failed to read sensor', e)
        else:
            print('Temperature: %3.1f C' % temp)
            print('Humidity: %3.1f %%' % hum)
            try:
                mqtt_client.publish(temp_topic, str(temp))
                mqtt_client.publish(hum_topic, str(hum))
            except OSError as e:
                # Network failures also surface as OSError; report them as
                # such rather than as a sensor fault.
                print('Failed to publish readings', e)

        # Two messages are published per cycle, and check_msg() is
        # understood to handle at most one pending message per call
        # (see the umqtt.simple documentation). Polling once per cycle would
        # let inbound messages pile up, so poll throughout the interval
        # (20 x 0.1 s, about 2 s) instead of sleeping through it.
        try:
            for _ in range(20):
                mqtt_client.check_msg()
                time.sleep(0.1)
        except OSError as e:
            print('Failed to poll MQTT messages', e)
            # Keep the refresh interval even when polling is not possible.
            time.sleep(2)
# Start the main loop only when this file is run as the top-level script.
if __name__ == "__main__":
    main()