Voting resources, early voting, and poll worker information - VOTE. ... Adafruit is open and shipping.
0

favourite circuitpython task scheduler
Moderators: adafruit_support_bill, adafruit

Please be positive and constructive with your questions and comments.

favourite circuitpython task scheduler

by tcornall on Wed Sep 23, 2020 9:40 pm

Hi All.
This is my first post in this forum so forgive me if I have missed a previous post on this subject. I like circuitpython for a number of reasons, but one thing I miss is asyncio to help me manage multiple things happening at random times, like button presses or networky things. Also, hides all the polling (necessary given no interrupt handlers) and avoid wasted cpu time 'sleeping' or calls to blocking code.

Given that Circuitpython supports the 'yield' function, using generators to do this looks a likely candidate for a framework that I can live with. Here's a simple taste of that idea. (It runs on an ItsyBitsy M4 Express board but probably any Circuitpython board also.)
This grew out of examples from 'max' on https://forum.micropython.org/viewtopic.php?t=15 and others, such as David Beazley (google him and watch his yutube videos. Great stuff)
Anyone else have good ideas on how to hide all the polling and avoid blocking calls and wasted time 'sleeping'? (I do have a 'sleep' in the 'main' loop down the bottom but that is only because
the mu editor can't handle all the prints that spew out otherwise. You can get rid of it if you stop the prints or use a more capable serial terminal.)
Code: Select all | TOGGLE FULL SIZE
"""
Defines tasks as generators that yield
and a scheduler as a list of tasks. Very simple but capable of extension.
from 'max' on https://forum.micropython.org/viewtopic.php?t=15
See also https://www.youtube.com/watch?v=Y4Gt3Xjd7G8 for ideas on how to
deal with time in the scheduler.
"""
import time
import touchio
import digitalio
import board
from adafruit_debouncer import Debouncer

def countFrom(arg):
    """ a generator that prints an increasing counter
    """
    n = arg
    while True:
        # do work, which in this case is print an ascending count
        n += 1
        print('Task One!', n)
        # relinquish control
        yield n

"""
wait endTimeMs and then call a func and restart
uses time.monotonic and could 'yield from'
some iterable task passed
to it after the wait.
"""
def waitFunc(endTimeMs, func):
    n = 0
    start = time.monotonic() * 1000
    while True:
        # wait for a time and then call func
        n = time.monotonic() * 1000
        if n >= start + endTimeMs:
            print('Task Two!', n - start)
            func()
            start = n  # restart the wait
        # relinquish control
        yield None

"""
poll for debounced touchpin to just go True  and then
execute the callback and print what happened
"""
def upEdgeDebouncedTouchPin(pin, func):
    elapsed_time = 0
    start = time.monotonic() * 1000  # to know how long it all took
    touchie = Debouncer(touchio.TouchIn(pin)) #instantiate Debouncer
    old_v= False  #assume it is not being touched at start
    while True:
        # 'poll for a debounced touch on a pin and then run a callback'
        elapsed_time = time.monotonic() * 1000 - start
        touchie.update()  #THIS IS ESSENTIAL
        v = touchie.value #poll the Debouncer
        newTrue = True if v and v != old_v else False  #has it just gone True?
        old_v = v #remember for next time
        if newTrue:   # is it being touched?
            print('Touch on', pin, 'in elapsed time', elapsed_time, 'ms')
            func()  # call the callback.
            start = time.monotonic() * 1000  # restart the elapsed_time
        # relinquish control
        yield None

"""
Make a list of tasks (which must be non blocking and 'yield')
to demonstrate that waiting for some time/event can be done
in a non-blocking manner using generators
"""
taskQueue = [countFrom(1000), waitFunc(1000,
            lambda:print("hollo wordl from waitFunc")),
            upEdgeDebouncedTouchPin(board.A0, lambda:print("upEdgeDebouncedTouchPin"))]

# ************************** main loop here **********************
print('starting taskQueue now')
while True:
    for task in taskQueue:
        next(task)   # for each task, get its next()
                           # i.e. make it run up to its yield
        time.sleep(0.1)   # need this to stop mu from hanging
        # but if not printing in tasks or using faster serial term
        # (e.g. mobxterm) then don't need the sleep

tcornall
 
Posts: 4
Joined: Mon Mar 20, 2017 1:01 am

Re: favourite circuitpython task scheduler

by siddacious on Thu Sep 24, 2020 12:22 am

There's a long running thread in the CircuitPython github repo about concurrency and how best to handle it in CircuitPython that you'll probably have some input on:

https://github.com/adafruit/circuitpython/issues/1380

It's a bit long at this point but worth reading if you like chewing bubble gum and walking ;)

siddacious
 
Posts: 338
Joined: Fri Apr 21, 2017 3:09 pm

Re: favourite circuitpython task scheduler

by tcornall on Thu Sep 24, 2020 2:26 am

Ta, @siddacious will have a look. I think I've been thru it before and there were some nice (and similar) ideas there. deshipu's 'meanwhile' library looks interesting...

tcornall
 
Posts: 4
Joined: Mon Mar 20, 2017 1:01 am

Please be positive and constructive with your questions and comments.