![micropython温度湿度上传与mqtt通讯并在oled显示代码二](http://01.pics/zb_users/cache/thumbs/6b534dc32c8b8d42c30bbc69394e2fd7-400-300-1.png)
未分类
micropython温度湿度上传与mqtt通讯并在oled显示代码二
mjdyz 发表于2024-09-02 浏览34 评论0
因为之前的代码在测试运行中出现传感器故障以后就一直报错导致无法正常运行,在原代码的基础上加入了错误重启功能,除wifi连接外其他的错误出现设备将自动重启。每一次代码的更新将会进行长期的运行测试,为此我开通服务器对后期各项开发进行检测展示,测试平台地址http://116.205.128.129:1881/ 账号密码为:test 当然在服务器上我还开了很多服务,具体端口自己猜。
未分类
python温度上传与mqtt通讯订阅上传并在oled显示代码
mjdyz 发表于2024-08-29 浏览54 评论0
载入库文件 from machine import Pin, SoftI2C import network import dht import time from umqtt.simple import MQTTClient import ssd1306 # 初始化OLED i2c = SoftI2C(scl=Pin(22), sda=Pin(21)) oled = ssd1306.SSD1306_I2C(128, 64, i2c) # 全局变量用于持久化显示信息 temp_display_text = "Temp: -- C" hum_display_text = "Hum: -- %" # 连接Wi-Fi def connect_wifi(ssid, password): wlan = network.WLAN(network.STA_IF) wlan.active(True) if not wlan.isconnected(): print('Connecting to network...') wlan.connect(ssid, password) while not wlan.isconnected(): pass # 等待连接 print('Network connected:', wlan.ifconfig()) # MQTT订阅回调函数 def mqtt_sub_callback(topic, msg): global temp_display_text, hum_display_text print((topic, msg)) if topic.decode() == "sensor/temperature": temp_display_text = "Temp: {} C".format(msg.decode()) elif topic.decode() == "sensor/humidity": hum_display_text = "Hum: {} %".format(msg.decode()) oled.fill(0) # 清除屏幕 oled.text(temp_display_text, 0, 0) oled.text(hum_display_text, 0, 16) oled.show() # 设置MQTT客户端并连接到MQTT Broker def setup_mqtt(): mqtt_client = MQTTClient("客户端名称", "服务器地址", user="改为你的账号", password="改为你的密码", port=端口号) mqtt_client.set_callback(mqtt_sub_callback) mqtt_client.connect() mqtt_client.subscribe(b"sensor/temperature") mqtt_client.subscribe(b"sensor/humidity") print("Connected to MQTT Broker and subscribed to topics") return mqtt_client # DHT11传感器初始化 dht_sensor = dht.DHT11(Pin(4)) # 主循环函数 def main(): ssid = 'wifi名称' password = '密码' connect_wifi(ssid, password) mqtt_client = setup_mqtt() while True: try: dht_sensor.measure() temp = dht_sensor.temperature() hum = dht_sensor.humidity() temp_topic = "sensor/temperature" hum_topic = "sensor/humidity" mqtt_client.publish(temp_topic, str(temp)) mqtt_client.publish(hum_topic, str(hum)) # 检查是否有新的消息,并发布从传感器中读取的数据 mqtt_client.check_msg() print('Temperature: %3.1f C' % temp) print('Humidity: %3.1f %%' % hum) except OSError as e: print('Failed to read sensor', e) time.sleep(2) # 每隔两秒刷新一次数据 if __name__ == "__main__": main()
未分类
未命名
mjdyz 发表于2024-08-05 浏览59 评论0
# 欢迎来到01.pics —— 电气与物联网技术的新天地
亲爱的访客们,