Code: Select all
# SPDX-FileCopyrightText: 2021 Ladyada for Adafruit Industries
# SPDX-FileCopyrightText: 2022 Kattni Rembor for Adafruit Industries
# SPDX-License-Identifier: MIT
import time
import ssl
from random import randint
import microcontroller
import socketpool
import wifi
import board
import neopixel
import adafruit_minimqtt.adafruit_minimqtt as MQTT
from adafruit_io.adafruit_io import IO_MQTT
from analogio import AnalogIn
analog_in_5 = AnalogIn(board.D5)
analog_in_6 = AnalogIn(board.D6)
try:
from data import secrets
except ImportError:
print("WiFi and Adafruit IO credentials are kept in secrets.py - please add them there!")
raise
# Add your Adafruit IO Username and Key to secrets.py
# (visit io.adafruit.com if you need to create an account,
# or if you need to obtain your Adafruit IO key.)
aio_username = secrets["aio_username"]
aio_key = secrets["aio_key"]
# WiFi
try:
print("Connecting to %s" % secrets["ssid"])
wifi.radio.connect(secrets["ssid"], secrets["password"])
print("Connected to %s!" % secrets["ssid"])
# Wi-Fi connectivity fails with error messages, not specific errors, so this except is broad.
except Exception as e: # pylint: disable=broad-except
print("Failed to connect to WiFi. Error:", e, "\nBoard will hard reset in 30 seconds.")
time.sleep(30)
microcontroller.reset()
# Initialise NeoPixel
pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.3)
# Define callback functions which will be called when certain events happen.
def connected(client):
print("Connected to Adafruit IO! Listening for NeoPixel changes...")
# Subscribe to Adafruit IO feed called "neopixel"
client.subscribe("neopixel")
def message(client, feed_id, payload): # pylint: disable=unused-argument
print("Feed {0} received new value: {1}".format(feed_id, payload))
if feed_id == "neopixel":
pixel.fill(int(payload[1:], 16))
# Create a socket pool
pool = socketpool.SocketPool(wifi.radio)
# Initialize a new MQTT Client object
mqtt_client = MQTT.MQTT(
broker="io.adafruit.com",
username=secrets["aio_username"],
password=secrets["aio_key"],
socket_pool=pool,
ssl_context=ssl.create_default_context(),
)
# Initialize Adafruit IO MQTT "helper"
io = IO_MQTT(mqtt_client)
# Set up the callback methods above
io.on_connect = connected
io.on_message = message
try:
# If Adafruit IO is not connected...
if not io.is_connected:
# Connect the client to the MQTT broker.
print("Connecting to Adafruit IO...")
io.connect()
# Explicitly pump the message loop.
io.loop()
# Publish when first booted
io.publish("random-boot", "BOOT")
# Adafruit IO fails with internal error types and WiFi fails with specific messages.
# This except is broad to handle any possible failure.
except Exception as e: # pylint: disable=broad-except
print("Failed to get or send data, or connect. Error:", e,
"\nBoard will hard reset in 30 seconds.")
time.sleep(30)
microcontroller.reset()
timestamp = 0
while True:
number_5 = analog_in_5.value
time.sleep(0.1)
number_6 = analog_in_6.value
time.sleep(0.1)
try:
# If Adafruit IO is not connected...
if not io.is_connected:
# Connect the client to the MQTT broker.
print("Connecting to Adafruit IO...")
io.connect()
# Explicitly pump the message loop.
io.loop()
# Print and publish analog readings to Adafruit IO every 10 seconds.
if (time.monotonic() - timestamp) >= 10:
#random_number = "{}".format(randint(0, 255))
#print("Current 'random' number: {}".format(random_number))
print(number_5, number_6)
io.publish("analog-in-5", number_5)
io.publish("analog-in-6", number_6)
timestamp = time.monotonic()
# Adafruit IO fails with internal error types and WiFi fails with specific messages.
# This except is broad to handle any possible failure.
except Exception as e: # pylint: disable=broad-except
print("Failed to get or send data, or connect. Error:", e,
"\nBoard will hard reset in 30 seconds.")
time.sleep(30)
microcontroller.reset()
But then crashes with the following error.
Is it possible to read from the analog channel on an ESP32-S3 while connected to WiFi and publishing to Adafruit IO? Any ideas what the issue could be?