How to Run Your First Task with RQ, Redis, and Python

September 09, 2020
Written by
Diane Phan
Twilion

As a developer, it can be very useful to learn how to run functions in the background while being able to monitor the queue in another tab or different system. This is incredibly helpful when managing heavy workloads that might not work efficiently when called all at once, or when making large numbers of calls to a database that returns data slowly over time rather than all at once.

In this tutorial we will implement an RQ queue in Python with the help of Redis to schedule and execute tasks in a timely manner.

demo of the RQ queue running on the terminal

Tutorial Requirements

  • Python 3.6 or newer. If your operating system does not provide a Python interpreter, you can go to python.org to download an installer.

Let’s talk about task queues

Task queues are a great way to allow tasks to work asynchronously outside of the main application flow. There are many task queues in Python to assist you in your project, but today we’ll be discussing a solution known as RQ.

RQ, also known as Redis Queue, is a Python library that allows developers to enqueue jobs to be processed in the background with workers. The RQ workers pick up jobs from the queue and execute them in the background. Because it relies on nothing more than a connection to Redis, it’s no surprise that this library is super lightweight and approachable for those getting started for the first time.

By using this particular task queue, it is possible to process jobs in the background with little to no hassle.
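
To make the idea of a queue and a worker concrete, here is a minimal sketch that uses only Python's standard library. It is not how RQ works internally (RQ stores its jobs in Redis and runs them in separate worker processes); the worker function and the jobs and results names are made up for illustration.

```python
import queue
import threading

def worker(jobs, results):
    """Pull (func, args) pairs off the queue until a None sentinel arrives."""
    while True:
        item = jobs.get()
        if item is None:
            break
        func, args = item
        results.append(func(*args))

jobs = queue.Queue()
results = []

# The worker runs in the background while the main flow keeps going.
thread = threading.Thread(target=worker, args=(jobs, results))
thread.start()

jobs.put((pow, (2, 10)))
jobs.put((sum, ([1, 2, 3],)))
jobs.put(None)  # sentinel that tells the worker to stop
thread.join()

print(results)  # [1024, 6]
```

The main flow only adds work to the queue; the worker decides when each job actually runs. RQ follows the same split, with Redis holding the queue.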

Set up the environment

Create a project directory in your terminal called “rq-test” to follow along.

$ mkdir rq-test
$ cd rq-test

Create a virtual environment, then copy and paste the commands to install rq and related packages. If you are using a Unix or macOS system, enter the following commands:

$ python3 -m venv venv
$ source venv/bin/activate
(venv) $ pip install rq

If you are on a Windows machine, enter the following commands in a prompt window:

$ python -m venv venv
$ venv\Scripts\activate
(venv) $ pip install rq

RQ requires a Redis installation on your machine. One way to get it is to download the source with wget and build it with make, as shown in the commands that follow. Redis was on version 6.0.6 at the time this article was published.

If you are using a Unix or macOS system, enter these commands to install Redis. This is my personal favorite way to install Redis, but there are alternatives below:

$ wget http://download.redis.io/releases/redis-6.0.6.tar.gz
$ tar xzf redis-6.0.6.tar.gz
$ cd redis-6.0.6
$ make

If you have Homebrew installed, you can type brew install redis in the terminal and refer to this GitHub gist to install Redis on the Mac. For developers using Ubuntu Linux, the command sudo apt-get install redis would get the job done as well.

Run the Redis server in a separate terminal window on the default port with the command src/redis-server from the directory where it's installed.

For Windows users, you would have to follow a separate tutorial to run Redis on Windows. Download the latest zip file on GitHub and extract the contents. Run the redis-server.exe file that was extracted from the zip file to start the Redis server.

The output should look similar to the following after running Redis:

55154:C 25 Aug 2020 16:41:18.968 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
55154:C 25 Aug 2020 16:41:18.969 # Redis version=6.0.6, bits=64, commit=c10e5f1e, modified=1, pid=55154, just started
55154:C 25 Aug 2020 16:41:18.969 # Warning: no config file specified, using the default config. In order to specify a config file use src/redis-server /path/to/redis.conf
55154:M 25 Aug 2020 16:41:18.970 * Increased maximum number of open files to 10032 (it was originally set to 2560).
                _._                                                  
           _.-``__ ''-._                                             
      _.-``    `.  `_.  ''-._           Redis 6.0.6 (c10e5f1e/1) 64 bit
  .-`` .-

Build out the tasks

In this case, a task for Redis Queue is merely a Python function. For this article, we’ll tell the task to print a message to the terminal once per second for “x” seconds to demonstrate the use of RQ.

Copy and paste the following code to a file named “tasks.py” in your directory.

from datetime import datetime, timedelta
import time

def print_task(seconds):
    print("Starting task")
    for num in range(seconds):
        print(num, ". Hello World!")
        time.sleep(1)
    print("Task completed")

def print_numbers(seconds):
    print("Starting num task")
    for num in range(seconds):
        print(num)
        time.sleep(1)
    print("Task to print_numbers completed")

These are simple tasks that print out numbers and text on the terminal so that we can see if the tasks are executed properly. Calling time.sleep(1) from the Python time library suspends the task for one second on each pass through the loop, which stretches out the total run time so that we can watch each task's progress.
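
Since a task is just a Python function, you can call it directly to check it before any queue is involved. The sketch below is a variation on print_task; the delay parameter is a hypothetical addition (not part of the tutorial's tasks.py) that makes a quick check faster than one second per line.

```python
import time

def print_task(seconds, delay=1.0):
    # delay is a hypothetical parameter added for this sketch only
    print("Starting task")
    for num in range(seconds):
        print(num, ". Hello World!")
        time.sleep(delay)
    print("Task completed")

# Call the task like any other function, with a short pause between lines.
print_task(2, delay=0.1)
```

If the function misbehaves here, it will misbehave in a worker too, so this is a cheap first check.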

Feel free to alter this code after the tutorial and create your own tasks. Some other popular tasks are sending a fax message or email by connecting to your email client.

Create your queue

Create another file in the root directory and name it “app.py”. Copy and paste the following code:

from datetime import datetime, timedelta
import time
from redis import Redis
from rq import Queue
import tasks

queue = Queue(connection=Redis())

def queue_tasks():
    queue.enqueue(tasks.print_task, 5)
    queue.enqueue_in(timedelta(seconds=10), tasks.print_numbers, 5)

def main():
    queue_tasks()

if __name__ == "__main__":
    main()

The queue object sets up a connection to Redis and initializes a queue based on that connection. This queue can hold all the jobs required to run in the background with workers.

As seen in the code, the tasks.print_task function is added using the enqueue function. This means that the task is placed on the queue right away and will be executed as soon as a worker is available.
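
The enqueue call also returns a job object that you can inspect. Here is a small sketch; enqueue_and_report is a hypothetical helper name, and it relies on the job's id attribute and get_status() method from RQ.

```python
def enqueue_and_report(queue, func, *args):
    """Enqueue func(*args) and return the new job's id and current status."""
    job = queue.enqueue(func, *args)
    return job.id, job.get_status()

# Usage, assuming Redis is running locally and tasks.py is importable:
#   from redis import Redis
#   from rq import Queue
#   import tasks
#   queue = Queue(connection=Redis())
#   print(enqueue_and_report(queue, tasks.print_task, 5))
```

Right after enqueueing, the status would normally show that the job is still waiting in the queue, since no worker has picked it up yet.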

The enqueue_in function is another nifty RQ function because it expects a timedelta in order to schedule the specified job. In this case, seconds is specified, but this argument can be changed according to the time schedule expected for your usage. Check out other ways to schedule a job on this GitHub README.
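
As one example of another way to schedule, RQ also provides enqueue_at, which takes an absolute datetime instead of a delay. The sketch below schedules the same function both ways; schedule_both_ways is a hypothetical helper, and a timezone-aware UTC datetime is used as an assumption to avoid ambiguity about local time.

```python
from datetime import datetime, timedelta, timezone

def schedule_both_ways(queue, func, delay, *args):
    """Schedule func twice: once relative to now, once at an absolute time."""
    relative_job = queue.enqueue_in(delay, func, *args)
    run_at = datetime.now(timezone.utc) + delay
    absolute_job = queue.enqueue_at(run_at, func, *args)
    return relative_job, absolute_job, run_at

# Usage, assuming Redis is running locally and tasks.py is importable:
#   from redis import Redis
#   from rq import Queue
#   import tasks
#   queue = Queue(connection=Redis())
#   schedule_both_ways(queue, tasks.print_numbers, timedelta(seconds=10), 5)
```

Both scheduled jobs only run if a worker is started with the scheduler enabled, as done later with rq worker --with-scheduler.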

Since we are testing out the RQ queue, I have enqueued both the tasks.print_task and tasks.print_numbers functions so that we can see their output on the terminal. The last argument passed in to each call is 5, which RQ hands to the respective function as its seconds argument. In this case, we are expecting to see print_task() print "Hello World!" five times and print_numbers() print 5 numbers in order.

If you have created any additional task, be sure to import your tasks at the top of the file so that all the tasks in your Python file can be accessed.

Run the queue

For the purposes of this article, the gif demo below will show a perfect execution of the tasks in queue so no exceptions will be raised.

The Redis server should still be running in a tab from earlier in the tutorial at this point. If it stopped, run the command src/redis-server inside the redis-6.0.6 folder on one tab, or for developers with a Windows machine, start redis-server.exe. Open another tab solely to run the RQ worker and its scheduler with the command rq worker --with-scheduler.

This should be the output after running the command above.

09:05:05 Worker rq:worker:1c0a81e7e160458283ebe1e21d92a26e: started, version 1.5.0
09:05:05 *** Listening on default...
09:05:05 Cleaning registries for queue: default
INFO:rq.worker:Cleaning registries for queue: default

The worker command activated a worker process in order to connect to Redis and look for any jobs assigned to the queue from the code in app.py.

Lastly, open a third tab in the terminal for the root project directory. Start up the virtual environment again with the command source venv/bin/activate (or venv\Scripts\activate on Windows). Then type python app.py to run the project.

Go back to the tab that is running rq worker --with-scheduler. Wait 5 more seconds after the first task is executed to see the next task. Although the live demo gif below wasn’t able to capture the best timing due to having to run the program and record at the same time, you can see that there was a pause before the second task started and that both tasks were completed within 15 seconds.

demo of the RQ queue running on the terminal

Here’s the sample output inside of the rq worker tab:

20:17:15 Worker rq:worker:9b4bf20b70694bfa9ad2621721136b06: started, version 1.5.1
20:17:15 *** Listening on default...
20:17:15: Trying to acquire locks for default
20:17:15 Cleaning registries for queue: default
20:17:15: Scheduler for default started with PID 93149
20:17:28 default: tasks.print_task(5) (eb70f0fd-c01a-4826-95a4-575919b8d8dd)
Starting task
0 . Hello World!
1 . Hello World!
2 . Hello World!
3 . Hello World!
4 . Hello World!
Task completed
20:17:33 default: Job OK (eb70f0fd-c01a-4826-95a4-575919b8d8dd)
20:17:33 Result is kept for 500 seconds
20:17:38 default: tasks.print_numbers(5) (d4d6fbd4-de7f-49ea-bc96-85a10c2a08c0)
Starting num task
0
1
2
3
4
Task to print_numbers completed
20:17:43 default: Job OK (d4d6fbd4-de7f-49ea-bc96-85a10c2a08c0)
20:17:43 Result is kept for 500 seconds

As seen in the output above, the result of each task is kept for 500 seconds, which is the default. If the tasks written in tasks.py had a line to return anything, that return value could be retrieved during this window. A developer can alter the return value's time to live by passing in a result_ttl parameter when adding tasks to the queue.
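
Here is a rough sketch of what passing result_ttl could look like. The add_numbers task and the enqueue_with_ttl helper are hypothetical names for illustration; result_ttl itself is the RQ keyword argument mentioned above.

```python
def add_numbers(a, b):
    """A task that returns a value instead of printing."""
    return a + b

def enqueue_with_ttl(queue, func, *args, ttl_seconds=60):
    """Enqueue func(*args) and keep its return value for ttl_seconds."""
    return queue.enqueue(func, *args, result_ttl=ttl_seconds)

# Usage, assuming Redis is running locally and add_numbers lives in tasks.py
# (workers need to import the function from a module, not from __main__):
#   from redis import Redis
#   from rq import Queue
#   import tasks
#   queue = Queue(connection=Redis())
#   job = enqueue_with_ttl(queue, tasks.add_numbers, 2, 3, ttl_seconds=30)
#   # Once a worker has finished the job, job.result should hold 5
#   # until the 30 seconds run out.
```

A short time to live keeps Redis tidy when results are only needed briefly; a longer one helps when another process collects the results later.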

Handle exceptions and try again

If a job were to fail, you can always set up a log to keep track of the error messages, or you can use the RQ queue to enqueue and retry failed jobs. By using RQ's FailedJobRegistry class, you can keep track of the jobs that failed during runtime. The RQ documentation discusses how it handles the exceptions and how data regarding the job can help the developer figure out how to resubmit the job.
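
As a minimal sketch of resubmitting failed jobs, the helper below walks a failed-job registry and puts each job back on its queue. requeue_failed_jobs is a hypothetical name; get_job_ids() and requeue() are the registry methods it relies on.

```python
def requeue_failed_jobs(registry):
    """Put every job in a failed-job registry back on its queue."""
    job_ids = registry.get_job_ids()
    for job_id in job_ids:
        registry.requeue(job_id)
    return job_ids

# Usage, assuming Redis is running locally:
#   from redis import Redis
#   from rq import Queue
#   from rq.registry import FailedJobRegistry
#   queue = Queue(connection=Redis())
#   registry = FailedJobRegistry(queue=queue)
#   print(requeue_failed_jobs(registry))
```

The returned list of ids gives you something to log, so you can tell which jobs were given another chance.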

However, RQ also lets you handle exceptions in your own way by injecting your own logic into the RQ workers. This may be a helpful option if you are executing many tasks in your project and those that fail are not worth retrying.
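
A custom handler is a function that receives the job and the exception details. The sketch below records each failure in a list; record_failure and seen_failures are hypothetical names, and returning True is meant to let any later handlers in the chain run as well, as described in the RQ documentation.

```python
seen_failures = []

def record_failure(job, exc_type, exc_value, traceback):
    """Remember which job failed and why, then let other handlers run."""
    seen_failures.append((job.id, exc_type.__name__, str(exc_value)))
    return True

# Usage, assuming Redis is running locally:
#   from redis import Redis
#   from rq import Queue, Worker
#   redis = Redis()
#   worker = Worker([Queue(connection=redis)], connection=redis,
#                   exception_handlers=[record_failure])
#   worker.work()
```

In a real project the handler might write to a log file or notify someone instead of appending to a list.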

Force a failed task to retry

Since this is an introductory article to run your first task with RQ, let's try to purposely fail one of the tasks from earlier to test out RQ's Retry object.

Go to the tasks.py file and alter the print_task() function so that random numbers can be generated and determine if the function will be executed or not. We will be using the random Python library to assist us in generating numbers. Don't forget to include the import random at the top of the file.

Copy and paste the following lines of code to change the print_task() function in the tasks.py file.

import random

def print_task(seconds):
    print("Starting task")
    random_num = random.randrange(1, 3, 1)
    # optional print statement to see the numbers 
    # print("the randomly generated number = ", random_num) 
    if random_num == 2:
        raise RuntimeError('Sorry, I failed! Let me try again.')
    else:
        for num in range(seconds):
            print(num, ". Hello World!")
            time.sleep(1)
    print("Task completed")

Go back to the app.py file to change the queue. Replace the existing queue.enqueue(tasks.print_task, 5) line with queue.enqueue(tasks.print_task, 5, retry=Retry(max=2)), and leave the enqueue_in line for tasks.print_numbers as it is.

The Retry class ships with rq, so make sure you add from rq import Retry at the top of the file in order to use this functionality. Retry accepts max and interval arguments that specify how many times, and after what delay, a failed job will be retried. In the newly changed line, we pass in the function we want to run (tasks.print_task), the argument 5, which stands for the seconds of execution, and lastly the maximum number of times we want the queue to retry.

The tasks in queue should now look like this:

def queue_tasks():
    queue.enqueue(tasks.print_task, 5, retry=Retry(max=2))
    queue.enqueue_in(timedelta(seconds=10), tasks.print_numbers, 5)
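
The interval argument is not used in the tutorial's code, so here is a hedged sketch of what it could look like. enqueue_with_backoff is a hypothetical helper, and the values 3 and [10, 30, 60] are arbitrary examples: up to three retries, spaced 10, 30, and 60 seconds apart.

```python
def enqueue_with_backoff(queue, func, *args):
    """Enqueue func(*args) so failures are retried after 10, 30, then 60 seconds."""
    # Imported here so the sketch can be loaded even where RQ is not installed.
    from rq import Retry
    return queue.enqueue(func, *args, retry=Retry(max=3, interval=[10, 30, 60]))

# Usage, assuming Redis is running locally and tasks.py is importable:
#   from redis import Redis
#   from rq import Queue
#   import tasks
#   queue = Queue(connection=Redis())
#   enqueue_with_backoff(queue, tasks.print_task, 5)
```

Delayed retries are handled by the scheduler, so the worker needs to be started with rq worker --with-scheduler for the intervals to take effect.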

Each time print_task runs, there is a 50/50 chance that it will execute properly, since we're only generating a 1 or a 2 and the "Hello World!" lines are only printed if a 1 is generated. Otherwise a RuntimeError is raised and the queue retries the task immediately. Because of max=2, the task is retried at most twice; if every attempt fails, the job is marked as failed and ends up in the FailedJobRegistry.
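
To get a feel for the odds, here is a small worked calculation. It assumes that Retry(max=2) allows up to two retries after the first attempt (three attempts in total) and that each attempt independently succeeds half of the time; success_probability is a hypothetical helper name.

```python
def success_probability(p_success, max_retries):
    """Chance that at least one of (1 + max_retries) attempts succeeds."""
    attempts = 1 + max_retries
    return 1 - (1 - p_success) ** attempts

print(success_probability(0.5, 2))  # 0.875
```

Under those assumptions the job eventually prints "Hello World!" about 87.5% of the time, and fails all three attempts the remaining 12.5% of the time.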

What’s next for task queues?

Congratulations! You have successfully learned and implemented the basics of scheduling tasks in the RQ queue. Perhaps now you can tell the worker command to add a task that prints out an infinite number of "Congratulations" messages in a timely manner!

Let me know what you have been building by reaching out to me over email!

Diane Phan is a developer on the Developer Voices team. She loves to help programmers tackle difficult challenges that might prevent them from bringing their projects to life. She can be reached at dphan [at] twilio.com or LinkedIn.