Python

Installation

pip install git+https://github.com/decadentsoup/rota-python

Defining a new task

Tasks are processes that can be run synchronously from within the same code base or asynchronously from any code base. They are defined with the @rota.task decorator.

import smtplib

import rota

@rota.task
def send_email(to_address, subject, body):
    with smtplib.SMTP("smtp.example.com") as smtp:
        smtp.sendmail("noreply@example.com", to_address, f"Subject: {subject}\n\n{body}")

Running your program as a worker

In order for your tasks to be registered with Rota, you need to start your program as a worker. This is not the only way to make tasks available to Rota: you could instead define the above task as a serverless function and register it using Infrastructure as Code, but that is not covered here.

To start a Python worker, you need to define a Python module that defines the tasks and/or imports other modules that define tasks. Assuming you have a module called my_tasks, you can run it like so:

python-rota worker my_tasks

If you want to use the same module both as an executable command and as the module you pass to python-rota worker, make sure to only execute your command code when __name__ == "__main__".

Take, for example, this code:

import rota

@rota.task
def example(message):
    print(message)

name = input("Name: ")
print(f"Hello, {name}!")

If you run this as a worker, Python executes name = input("Name: ") while the module is loading, so the worker cannot finish initializing until someone types a name.

However, if you wrap it like so:

if __name__ == "__main__":
    name = input("Name: ")
    print(f"Hello, {name}!")

Then, when you start the worker, Python imports the file as a module (so __name__ is the module’s name rather than "__main__") and skips those two lines. Meanwhile, if you run the file directly with python mymodule.py, they’ll execute just fine!
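To see the difference concretely, the sketch below writes a tiny hypothetical module to a temporary directory, then loads it twice: once by importing it (which we assume is roughly what python-rota worker does with the module name you give it) and once by running it as a script with runpy. Only the script run sees __name__ == "__main__". Everything here is standard library; none of it is Rota’s own code.

```python
import importlib.util
import pathlib
import runpy
import tempfile

# Hypothetical module: records how it was loaded and only runs its
# "command" code when executed as a script.
SOURCE = '''\
LOADED_AS = __name__
RAN_COMMAND = False

if __name__ == "__main__":
    RAN_COMMAND = True
'''

with tempfile.TemporaryDirectory() as tmp:
    path = pathlib.Path(tmp) / "my_tasks.py"
    path.write_text(SOURCE)

    # Import it the way a worker would (assumption): __name__ is "my_tasks".
    spec = importlib.util.spec_from_file_location("my_tasks", path)
    imported = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(imported)

    # Run it the way `python my_tasks.py` would: __name__ is "__main__".
    as_script = runpy.run_path(str(path), run_name="__main__")

print(imported.LOADED_AS, imported.RAN_COMMAND)          # my_tasks False
print(as_script["LOADED_AS"], as_script["RAN_COMMAND"])  # __main__ True
```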

Run a task synchronously

To run the task synchronously, call it like a normal function.

send_email("test@example.com", "Test Message", "This is a test message!")

Run a task asynchronously

To run the task asynchronously, bind its arguments with params and then call schedule.

send_email.params("test@example.com", "Test Message", "This is a test message!").schedule()
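One way to picture the difference between calling a task and scheduling it is a decorator that leaves the function directly callable but also attaches a params method returning a "bound call" object with a schedule method. The sketch below is a hypothetical, in-memory illustration of that pattern; the names task, BoundCall and QUEUE are made up here and are not Rota’s implementation, and a real client would send the call to Rota rather than appending it to a list.

```python
import functools
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Tuple


@dataclass
class BoundCall:
    """A function plus the arguments it should eventually be called with."""
    func: Callable[..., Any]
    args: Tuple[Any, ...] = ()
    kwargs: Dict[str, Any] = field(default_factory=dict)

    def schedule(self) -> None:
        # Hypothetical: a real client would send this to Rota; here we just enqueue it.
        QUEUE.append(self)


# Hypothetical in-memory stand-in for Rota's queue (not part of the rota package).
QUEUE: List[BoundCall] = []


def task(func: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical decorator: calling the function still runs it synchronously,
    while .params(...) builds a call that can be scheduled for later."""
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        return func(*args, **kwargs)

    def params(*args: Any, **kwargs: Any) -> BoundCall:
        return BoundCall(func, args, kwargs)

    wrapper.params = params
    return wrapper


@task
def add(a, b):
    return a + b


print(add(2, 3))             # 5: runs immediately, like a normal function
add.params(2, 3).schedule()  # runs nothing yet; the call is queued
print(len(QUEUE))            # 1

# Later, a worker would take the call off the queue and execute it.
call = QUEUE.pop(0)
print(call.func(*call.args, **call.kwargs))  # 5
```

Separating "bind the arguments" from "decide when to run" is what lets the same bound call be scheduled immediately, with a delay, or as part of a larger workflow.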

Running a task defined somewhere else

If you want to run a task defined somewhere else, you can either use rota.remote or generate function definitions with python-rota generate.

To schedule an arbitrary task without any code generation:

rota.remote("send_text").params(to_number="+1XXX5550100", text="This is a test message!").schedule()

To generate a module defining all the tasks currently registered in Rota you can use:

python-rota generate -o tasks.py

Then you can use that module like so:

from .tasks import send_text

send_text(to_number="+1XXX5550100", text="This is a test message!")

In previous sections, we used positional arguments even though Rota requires keyword arguments. This works because the tasks are defined in the same code base, so the rota module can automatically detect the parameter names.

However, when using remote tasks this information is not available and you must use keyword arguments.
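A plausible way for a library to turn positional arguments into keyword arguments is to bind them against the function’s signature with the standard library’s inspect module. The sketch below assumes that technique (whether rota does exactly this is not stated here); send_email is a stand-in with the same parameters as the earlier example. A remote task is only known by its name string, so there is no signature to bind against, which is why keyword arguments are required there.

```python
import inspect


def send_email(to_address, subject, body):
    """Stand-in with the same signature as the send_email task above."""
    ...


def to_keyword_args(func, *args, **kwargs) -> dict:
    """Map positional arguments onto parameter names using func's signature.

    Raises TypeError if the arguments don't fit the signature.
    """
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()  # include defaulted parameters explicitly
    return dict(bound.arguments)


print(to_keyword_args(send_email, "test@example.com", "Test Message", body="Hi!"))
# {'to_address': 'test@example.com', 'subject': 'Test Message', 'body': 'Hi!'}
```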

Running a task at a later time

In the above examples, the task is being scheduled as soon as possible. However, we often want to delay the execution of tasks until some time has passed, e.g. if we’re waiting for an API’s rate limit to reset.

To execute a task five seconds in the future, you can do:

from datetime import timedelta

(
    send_email
    .params("test@example.com", "Test Message", "This is a test message!")
    .schedule(timedelta(seconds=5))
)

If you want to specify an exact date/time, you can instead use:

from datetime import datetime, time, timedelta
from zoneinfo import ZoneInfo

eastern = ZoneInfo("America/New_York")

# Tomorrow at 12:00 PM in the Eastern Time Zone.
tomorrow_noon = datetime.combine(
    datetime.now(eastern).date() + timedelta(days=1),
    time(12, tzinfo=eastern),
)

(
    send_email
    .params("test@example.com", "Test Message", "This is a test message!")
    .schedule(tomorrow_noon)
)

When you pass in a timedelta, it is converted into a datetime before being sent to Rota, so the delay means “5 seconds after I called schedule”, not “5 seconds after Rota received the request”.
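The conversion described above can be sketched as a small helper that resolves whatever was passed to schedule into an absolute UTC time at the moment of the call. The helper name resolve_run_at is hypothetical, and so are two design choices: normalizing to UTC and rejecting naive datetimes (which would otherwise be ambiguous about their time zone). This is not Rota’s actual code.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Union


def resolve_run_at(when: Optional[Union[datetime, timedelta]] = None) -> datetime:
    """Turn a schedule() argument into an absolute, timezone-aware UTC datetime."""
    now = datetime.now(timezone.utc)  # captured when schedule() is called
    if when is None:
        return now  # "as soon as possible"
    if isinstance(when, timedelta):
        return now + when  # relative to the caller's clock, not the server's
    if when.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime")
    return when.astimezone(timezone.utc)


run_at = resolve_run_at(timedelta(seconds=5))
print(run_at.tzinfo)  # UTC
```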

Splitting up work for batch processing

Often there is a large number of data points to process, and processing them one at a time in a loop takes a long time.

Let’s say for example you want to verify every single user in your database. You might do something like:

def verify_all_users():
    for user in get_users():
        user.verify()

As the number of users grows, this function takes longer and longer. Just making it a task isn’t enough, since that single task would still take a long time to finish. Instead, you can create a task that handles a batch of users and schedule many of them to execute in parallel:

from itertools import batched  # Python 3.12+

import rota

@rota.task
def verify_users(users):
    for user in users:
        user.verify()

def verify_all_users():
    # Schedule one task per batch of 1,000 users.
    for users_batch in batched(get_users(), 1000):
        verify_users.params(users_batch).schedule()

We also support a number of convenience routines to help make batching easier:

def verify_all_users():
    (
        verify_users
        .batch_params(users=get_users())
        .batch_size(1000)
        .schedule()
    )
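itertools.batched only exists in Python 3.12 and later. On older versions, the same chunking can be written with itertools.islice, as in the sketch below. The name chunked is hypothetical, and it is only an assumption that batch_size splits the input into consecutive chunks like this; the exact behavior of Rota’s batching helpers is not documented here.

```python
from itertools import islice
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
    """Yield consecutive tuples of at most `size` items (like itertools.batched)."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    # islice pulls the next `size` items; an empty tuple means we're done.
    while batch := tuple(islice(iterator, size)):
        yield batch


print(list(chunked(range(7), 3)))  # [(0, 1, 2), (3, 4, 5), (6,)]
```

Because chunked consumes an iterator lazily, it also works on generators such as a database cursor without loading every user into memory first.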

Chaining together tasks with workflows

Rota’s most advanced feature is the ability to quickly and easily create “workflows”, which are chains of tasks whose results can feed into each other.

Take this scenario: You made a product where users can send letters to groups of people. In order to make this happen, you must:

  • Go through every address they want to send a letter to and make sure it is a valid, real address. We’ll call this step Validating.
  • Notify the sender of how many letters will be sent and how many will not because of bad addresses. We’ll call this step Prereporting.
  • Generate the letters as PDFs and send them to the printers. We’ll call this step Printing.
  • Notify the mailroom staff that they need to send out the letters we just printed. We’ll call this step Notifying.
  • Notify the sender that letters are printed and sending, and inform them of any that failed to print. We’ll call this step Postreporting.

Here’s one giant task that can take hours for large sends:

def send_letters(user, letters):
    # Validating
    num_to_send = 0
    num_invalid = 0
    letters_to_send = []
    for letter in letters:
        letter.address.verify()
        if letter.address.valid:
            num_to_send += 1
            letters_to_send.append(letter)
        else:
            num_invalid += 1

    # Prereporting
    send_email(user.email, "Prereport", f"Will send: {num_to_send}\nWill not send: {num_invalid}")

    # Printing
    num_success = 0
    num_failure = 0
    for letter in letters_to_send:
        try:
            pdf_blob = letter.generate_pdf()
            printer.print(pdf_blob)
        except Exception:
            num_failure += 1
        else:
            num_success += 1

    # Notifying
    send_email("mailroom@example.com", "Letters Ready", f"You need to send {num_success} letters.")

    # Postreporting
    send_email(user.email, "Postreport", f"{num_success} out of {num_to_send} letters printed successfully and are being sent by our mailroom staff. {num_failure} letters failed to print.")

Simple, but since everything happens sequentially in one task, it’s slow! We can’t just divide this into a bunch of independent tasks, because values are passed from one step to the next. At the same time, we’d like to get parallelism with as little effort as possible.

We can do this by chaining these tasks together:

def send_letters(user, letters):
    validating    = rota.step("Validating",     validate.batch_params(letters=letters).batch_size(1000))
    prereporting  = rota.step("Prereporting",   prereport.params(user, validating.output.num_to_send, validating.output.num_invalid))
    printing      = rota.step("Printing",       printout.batch_params(letters_to_send=validating.output.letters_to_send).batch_size(500))
    notifying     = rota.step("Notifying",      notify.params(printing.output.num_success))
    postreporting = rota.step("Postreporting",  postreport.params(user, printing.output.num_success, validating.output.num_to_send, printing.output.num_failure))

    rota.workflow(validating, prereporting, printing, notifying, postreporting).schedule()

@rota.task
def validate(letters):
    num_to_send = 0
    num_invalid = 0
    letters_to_send = []

    for letter in letters:
        letter.address.verify()
        if letter.address.valid:
            num_to_send += 1
            letters_to_send.append(letter)
        else:
            num_invalid += 1

    return {"num_to_send": num_to_send, "num_invalid": num_invalid, "letters_to_send": letters_to_send}

@rota.task
def prereport(user, num_to_send, num_invalid):
    send_email(user.email, "Prereport", f"Will send: {num_to_send}\nWill not send: {num_invalid}")

@rota.task
def printout(letters_to_send):
    num_success = 0
    num_failure = 0

    for letter in letters_to_send:
        try:
            pdf_blob = letter.generate_pdf()
            printer.print(pdf_blob)
        except Exception:
            num_failure += 1
        else:
            num_success += 1

    return {"num_success": num_success, "num_failure": num_failure}

@rota.task
def notify(num_success):
    send_email("mailroom@example.com", "Letters Ready", f"You need to send {num_success} letters.")

# Named postreport (not postreporting) so the task isn't shadowed by the
# postreporting step variable inside send_letters.
@rota.task
def postreport(user, num_success, num_to_send, num_failure):
    send_email(user.email, "Postreport", f"{num_success} out of {num_to_send} letters printed successfully and are being sent by our mailroom staff. {num_failure} letters failed to print.")

The magic here is that workflows are dependency graphs capable of automatically detecting which tasks depend on which others and passing the outputs (return values) from completed tasks as parameters to others.
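To illustrate how a dependency graph can be derived from step arguments, here is a small, purely local sketch: step.output.some_key produces a placeholder, each step’s dependencies are the steps whose placeholders appear in its arguments, and graphlib.TopologicalSorter orders the steps so every placeholder can be replaced with the real upstream value before the step runs. Step, OutputRef, run_workflow and the stand-in tasks are hypothetical names for illustration only; this is not how Rota is implemented, and it runs steps one at a time rather than distributing them.

```python
from dataclasses import dataclass, field
from graphlib import TopologicalSorter
from typing import Any, Callable, Dict, Set


@dataclass(frozen=True)
class OutputRef:
    """Placeholder for one key of another step's dict return value."""
    step: str
    key: str


class _Outputs:
    """Lets `step.output.num_to_send` produce an OutputRef, mirroring the syntax above."""
    def __init__(self, step_name: str) -> None:
        self._step_name = step_name

    def __getattr__(self, key: str) -> OutputRef:
        if key.startswith("_"):
            raise AttributeError(key)
        return OutputRef(self._step_name, key)


@dataclass
class Step:
    name: str
    func: Callable[..., Any]
    kwargs: Dict[str, Any] = field(default_factory=dict)

    @property
    def output(self) -> _Outputs:
        return _Outputs(self.name)

    def dependencies(self) -> Set[str]:
        # A step depends on every step whose output appears among its arguments.
        return {v.step for v in self.kwargs.values() if isinstance(v, OutputRef)}


def run_workflow(*steps: Step) -> Dict[str, Any]:
    """Run steps in dependency order, feeding upstream outputs into downstream arguments."""
    by_name = {step.name: step for step in steps}
    graph = {step.name: step.dependencies() for step in steps}
    outputs: Dict[str, Any] = {}
    # static_order() yields a step only after every step it depends on.
    for name in TopologicalSorter(graph).static_order():
        step = by_name[name]
        kwargs = {
            key: outputs[value.step][value.key] if isinstance(value, OutputRef) else value
            for key, value in step.kwargs.items()
        }
        outputs[name] = step.func(**kwargs)
    return outputs


# Hypothetical stand-ins for the validate/prereport tasks above.
def validate(letters):
    letters_to_send = [letter for letter in letters if letter]
    return {
        "num_to_send": len(letters_to_send),
        "num_invalid": len(letters) - len(letters_to_send),
        "letters_to_send": letters_to_send,
    }


def prereport(num_to_send, num_invalid):
    return f"Will send: {num_to_send}, will not send: {num_invalid}"


validating = Step("Validating", validate, {"letters": ["1 Main St", "", "2 Oak Ave"]})
prereporting = Step("Prereporting", prereport, {
    "num_to_send": validating.output.num_to_send,
    "num_invalid": validating.output.num_invalid,
})

# Steps can be listed in any order; the dependency graph decides execution order.
print(run_workflow(prereporting, validating)["Prereporting"])
# Will send: 2, will not send: 1
```

A distributed engine could use TopologicalSorter’s get_ready() and done() methods instead of static_order() to start every step whose dependencies have finished at the same time, which is where the parallelism comes from.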

Additionally, the workflow will be visible in the UI as a graph showing which steps are waiting on which others, making it easy to check each step’s current status.

Timeouts and fallbacks

Let’s go back to the workflow example and say we have found that validating addresses can sometimes be too slow, and in order to meet our agreements with our customers we need to give up after a certain amount of time and unblock the rest of the flow.

We can do this like so:

from datetime import timedelta

validating = (
    rota.step("Validating", validate.batch_params(letters=letters).batch_size(1000))
    .skip_after(timedelta(minutes=2))
)

If validation is still running after two minutes, the step is skipped and the workflow moves on to the next steps.

However, there’s a problem: many of the other steps depend on the output of this task. When the workflow is run like this, the other tasks will fail with a “dependency skipped” error.

You can resolve this by setting a fallback output, like this:

validating = (
    rota.step("Validating", validate.batch_params(letters=letters).batch_size(1000))
    .skip_after(timedelta(minutes=2))
    .fallback_output(num_to_send=len(letters), num_invalid=0, letters_to_send=letters)
)

Now if two minutes pass, the task is skipped and the fallback values are used in their stead. None of the letters will be considered invalid and we’ll try to send all of them. Don’t worry, the mailroom will compost the bad letters!
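The skip-and-fallback behavior is enforced by Rota itself, but its semantics can be approximated locally with concurrent.futures: wait up to a deadline for a result and substitute a fallback value if none arrives. The sketch below is a hypothetical, single-process illustration (run_with_fallback and slow_validate are made-up names). Note that a Python thread cannot be forcibly stopped, so the slow call is abandoned rather than killed; whether Rota cancels the skipped task’s work is not stated here.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout
from typing import Any, Callable


def run_with_fallback(func: Callable[[], Any], timeout_s: float, fallback: Any) -> Any:
    """Return func()'s result, or `fallback` if it takes longer than timeout_s seconds."""
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(func)
    try:
        return future.result(timeout=timeout_s)
    except FutureTimeout:
        return fallback  # the slow call keeps running in the background, unobserved
    finally:
        # Don't block the caller waiting for a slow call to finish.
        executor.shutdown(wait=False)


def slow_validate():
    time.sleep(0.5)  # pretend address validation is slow
    return {"num_invalid": 3}


print(run_with_fallback(slow_validate, 0.1, {"num_invalid": 0}))  # {'num_invalid': 0}
```

As in the workflow example, the fallback has the same shape as the real output, so downstream consumers don’t need to know which one they received.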