Simple use of lightweight task queue RQ

Original link: http://www.nosuchfield.com/2022/05/19/Simple-use-of-the-lightweight-task-queue-RQ/

RQ (Redis Queue) is a lightweight Python task queue. Here is a record of its basic use.

First install RQ (the Python version used here is 3.8.0)

 pip install rq==1.10.1
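
RQ stores its queues and jobs in Redis, so a redis-server needs to be running (here assumed to be a local instance on the default port, db 0). As a quick sanity check before going further, a minimal sketch:

 from redis import Redis

 # Assumes a local redis-server on the default port; adjust host/port/db as needed
 redis_conn = Redis('127.0.0.1', db=0)
 print(redis_conn.ping())  # True if the server is reachable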

Then create the following files

 .
 ├── __init__.py
 ├── jobs.py
 └── run.py

In __init__.py, we connect to redis-server and create two queues: default and queue_1

 from redis import Redis
 from rq import Queue

 redis_conn = Redis('127.0.0.1', db=0)
 rq_default_queue = Queue('default', connection=redis_conn)
 rq_queue_1 = Queue('queue_1', connection=redis_conn)
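
As a quick aside (not in the original post), these Queue objects can be inspected directly, for example to see how many jobs are waiting. A small sketch, assuming the module above is importable as rq_demo:

 from rq_demo import rq_default_queue, rq_queue_1

 print(rq_default_queue.name)   # 'default'
 print(len(rq_default_queue))   # number of jobs currently waiting in the queue
 print(rq_queue_1.job_ids)      # IDs of enqueued jobs (empty list if nothing is queued)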

Then we define the tasks that need to be executed in jobs.py

 import requests
 from redis import Redis

 redis_cli = Redis('127.0.0.1', db=0)

 # Callback invoked when a job finishes successfully
 def report_success(job, connection, result, *args, **kwargs):
     print(result)

 # Job that counts the number of words on a web page
 def count_words_at_url(url):
     resp = requests.get(url)
     return len(resp.text.split())

 # Job that returns the keys currently stored in Redis
 def get_redis_keys():
     return redis_cli.keys()
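
Since the jobs are plain Python functions, they can also be called synchronously for a quick test before involving a worker. A small sketch (not part of the original post):

 from jobs import count_words_at_url, get_redis_keys

 # Runs in the current process; no queue or worker involved
 print(count_words_at_url('http://nvie.com'))
 print(get_redis_keys())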

After creating the task queues and the jobs themselves, we can start a worker and have it listen on two queues: default and queue_1

 rq worker queue_1 default --with-scheduler
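
The worker can also be started from Python instead of the CLI. The following is a sketch based on RQ's Worker API, not a command from the original post:

 from redis import Redis
 from rq import Worker

 redis_conn = Redis('127.0.0.1', db=0)

 # Listen on the same two queues as the CLI command above
 worker = Worker(['queue_1', 'default'], connection=redis_conn)
 worker.work(with_scheduler=True)  # blocks, processing jobs until interrupted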

Then we can enqueue jobs in run.py for the worker to execute. The first enqueue call registers report_success as a success callback and uses Retry(max=3, interval=[1, 3, 6]) to retry the job up to three times (after 1, 3, and 6 seconds) if it fails

 from datetime import timedelta
 import time
 from rq import Retry
 from rq_demo import rq_default_queue, rq_queue_1
 from jobs import count_words_at_url, get_redis_keys, report_success

 rq_default_queue.enqueue(count_words_at_url, 'https://www.jd.com', on_success=report_success, retry=Retry(max=3, interval=[1, 3, 6]))

 job1 = rq_default_queue.enqueue(count_words_at_url, 'http://nvie.com')
 print(job1.result)
 time.sleep(2)
 print(job1.result)

 job2 = rq_queue_1.enqueue_in(timedelta(seconds=5), get_redis_keys)
 print(job2.result)
 time.sleep(6)
 print(job2.result)

The execution result is as follows (job1.result is None on the first read because the worker has not finished the job yet)

 None
 330
 None
 [b'rq:workers:queue_1', b'rq:worker:93618c280dbb445d92380fc26a33bc93', b'rq:job:909c68e7-a517-469a-ad33-c7f350e4f1dd', b'rq:scheduler-lock:default', b'rq:failed:default', b'rq:wip:queue_1', b'rq:job:f098f9f7-82b2-4a4f-b482-6d4cffd9b09e', b'rq:job:e006b2b5-69cf-4240-a0f2-202d31d71ad9', b'rq:scheduler-lock:queue_1', b'rq:finished:default', b'rq:queues', b'rq:job:aaaaf0e1-3673-4652-8109-20c46ebb89e0', b'rq:job:75b3839e-b8bb-45d8-bfe0-64ce63a502ab', b'rq:job:3c013ebb-7e68-4af2-9291-2abc7f8f0ba3', b'rq:job:8a40b7af-6b10-46a7-9b20-c9af734c0e26', b'rq:clean_registries:default', b'rq:job:43db8266-c1d0-4256-b355-00135e6a67c6', b'rq:clean_registries:queue_1', b'rq:job:e0f68454-c599-45db-b572-a7364f911b4f', b'rq:job:f9adc85e-400a-4e33-95cc-6f68c4d8a0d2', b'rq:workers', b'rq:workers:default']

The output of the worker is as follows

 ➜ rq_demo git:(master) ./rq.sh
 17:28:32 Worker rq:worker:93618c280dbb445d92380fc26a33bc93: started, version 1.10.1
 17:28:32 Subscribing to channel rq:pubsub:93618c280dbb445d92380fc26a33bc93
 17:28:32 *** Listening on queue_1, default...
 17:28:32 Trying to acquire locks for default, queue_1
 17:28:32 Cleaning registries for queue: queue_1
 17:28:32 Cleaning registries for queue: default
 17:28:38 default: jobs.count_words_at_url('https://www.jd.com') (f098f9f7-82b2-4a4f-b482-6d4cffd9b09e)
 3700
 17:28:39 default: Job OK (f098f9f7-82b2-4a4f-b482-6d4cffd9b09e)
 17:28:39 Result is kept for 500 seconds
 17:28:39 default: jobs.count_words_at_url('http://nvie.com') (43db8266-c1d0-4256-b355-00135e6a67c6)
 17:28:40 default: Job OK (43db8266-c1d0-4256-b355-00135e6a67c6)
 17:28:40 Result is kept for 500 seconds
 17:28:45 queue_1: jobs.get_redis_keys() (75b3839e-b8bb-45d8-bfe0-64ce63a502ab)
 17:28:45 queue_1: Job OK (75b3839e-b8bb-45d8-bfe0-64ce63a502ab)
 17:28:45 Result is kept for 500 seconds

You can see that the jobs were executed. Of course, we can also start multiple workers to consume jobs from the queues at the same time, which improves the throughput of task execution.
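
The worker log above shows that results are kept for 500 seconds, so a finished job can also be looked up later by its ID. A minimal sketch, where saved_job_id is a placeholder for an ID previously returned by enqueue() (e.g. job2.id):

 from redis import Redis
 from rq.job import Job

 redis_conn = Redis('127.0.0.1', db=0)

 saved_job_id = '...'  # placeholder for an ID returned by enqueue(), e.g. job2.id
 job = Job.fetch(saved_job_id, connection=redis_conn)
 print(job.get_status())  # 'queued', 'started', 'finished', 'failed', ...
 print(job.result)        # None until the worker has finished the job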

The code involved in this article is on my GitHub.
