Celery: Launch Chord Callback After Its Associated Body
Solution 1:
Your test is biased: you are using only one worker, and time.sleep() blocks the worker that runs it. While it sleeps, that worker is not processing any other tasks, even with concurrency at 4.
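To see why a blocking sleep distorts the test, here is a minimal sketch that uses asyncio as a stand-in for a single worker with a cooperative pool. It is an analogy, not Celery code, and the names `blocking_task`, `cooperative_task` and `run` are hypothetical. A blocking `time.sleep()` holds the event loop, so the tasks run one after another, while a cooperative `asyncio.sleep()` lets them overlap.

```python
import asyncio
import time


async def blocking_task(delay):
    # time.sleep() never yields to the event loop, so nothing else runs meanwhile
    time.sleep(delay)


async def cooperative_task(delay):
    # asyncio.sleep() yields control, so other tasks run while this one waits
    await asyncio.sleep(delay)


async def run(task_fn, n=3, delay=0.2):
    # Start n tasks "at the same time" and measure the total wall-clock time
    start = time.perf_counter()
    await asyncio.gather(*(task_fn(delay) for _ in range(n)))
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(run(blocking_task))
cooperative_elapsed = asyncio.run(run(cooperative_task))
print(f"blocking: {blocking_elapsed:.2f}s, cooperative: {cooperative_elapsed:.2f}s")
```

With three 0.2 s tasks, the blocking version should take roughly 0.6 s and the cooperative one roughly 0.2 s; `gevent.sleep()` plays the same cooperative role for greenlets.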
Is it possible to launch the FINISH_GROUP task just after its associated SHORT_TASKs have finished, without waiting for all the other, unrelated SHORT_TASKs?
Currently you are not waiting for the other short_tasks to finish; they are all scheduled for execution at the same time. Because you are using a blocking sleep, the short_tasks finish late, and each finish_group is called once all the short_tasks of its own chord have ended.
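The trigger rule can be illustrated with a simplified model (a hypothetical `ToyChord` class, not Celery's actual implementation): each chord keeps a count of its pending header tasks, and the last one of *that* chord to finish fires *that* chord's callback, regardless of what other chords are doing.

```python
class ToyChord:
    """Hypothetical, simplified chord: a countdown of pending header tasks."""

    def __init__(self, chord_id, size, on_done):
        self.chord_id = chord_id
        self.pending = size
        self.results = []
        self.on_done = on_done

    def task_finished(self, result):
        self.results.append(result)  # this toy model collects results in completion order
        self.pending -= 1
        if self.pending == 0:
            # The last header task of this chord triggers this chord's callback
            self.on_done(self.chord_id, self.results)


log = []


def finish_group(chord_id, results):
    log.append(("finish_group", chord_id, list(results)))


chords = {nb: ToyChord(nb, 3, finish_group) for nb in (1, 2)}

# A hypothetical completion order that interleaves the two chords
completions = [(1, 0), (2, 0), (1, 1), (1, 2), (2, 1), (2, 2)]
for nb, i in completions:
    log.append(("short_task", nb, i))
    chords[nb].task_finished(i)

for entry in log:
    print(entry)
```

Here finish_group(1) runs right after short_task(1, 2), before chord 2's remaining tasks have finished.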
Your current execution looks like this (execution order runs from top to bottom):
| chord 1      | chord 2      | chord 3      |
|--------------|--------------|--------------|
| short_task 1 |              |              |
|              | short_task 1 |              |
|              |              | short_task 1 |
| short_task 2 |              |              |
|              | short_task 2 |              |
|              |              | short_task 2 |
| short_task 3 |              |              |
|              | short_task 3 |              |
|              |              | short_task 3 |
| finish_group |              |              |
|              | finish_group |              |
|              |              | finish_group |
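The timing in the table above can be reproduced with a toy scheduler. This is a sketch under stated assumptions: the helper `callback_ready_times` is hypothetical, every short_task is assumed to take one time unit, and tasks are assumed to be pulled from a single FIFO queue. It shows that a callback only waits for its own header; with one worker it is delayed because unrelated tasks sit ahead of it in the queue, not because the chord waits for them.

```python
def callback_ready_times(order, workers):
    """Time at which each chord's callback can start, assuming every short_task
    takes 1 time unit and `workers` slots pull tasks from one FIFO queue."""
    ready = {}
    for position, (chord_id, _task) in enumerate(order):
        # With equal durations, the task at this position runs in batch position // workers
        finished_at = position // workers + 1
        ready[chord_id] = max(ready.get(chord_id, 0), finished_at)
    return ready


# Round-robin queue order, as in the table: task 1 of chords 1-3, then task 2, ...
round_robin = [(chord_id, task) for task in (1, 2, 3) for chord_id in (1, 2, 3)]
# Alternative order where each chord's tasks are queued together
grouped = [(chord_id, task) for chord_id in (1, 2, 3) for task in (1, 2, 3)]

print(callback_ready_times(round_robin, workers=1))  # {1: 7, 2: 8, 3: 9}
print(callback_ready_times(round_robin, workers=9))  # {1: 1, 2: 1, 3: 1}
print(callback_ready_times(grouped, workers=1))  # {1: 3, 2: 6, 3: 9}
```

With nine slots, every callback becomes ready after one time unit, which matches the parallel layout described next.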
If you remove the sleep, add more workers, or use gevent, it should look like this:
| chord 1 | chord 2 | chord 3 |
|------------------|------------------|------------------|
| short_task 1,2,3 | short_task 1,2,3 | short_task 1,2,3 |
| finish_group | finish_group | finish_group |
Tasks on the same line will appear in the log in a slightly different order (depending on which worker picked them up first), but each finish_group will still come last in its chord.
Notice that wrapping your tasks in a group is not necessary when using chord; you can pass the list of signatures directly:
chord(
    short_tasks,
    finish_group.s(nb)
)
Same code but with gevent:
# celery_test.py (the module name must match the -A argument used below)
import gevent
from celery import Celery, chord

app = Celery('tasks', broker='redis://localhost/4', backend='redis://localhost/5')


@app.task()
def short_task(nb, i):
    print('TEST: start short_task({}, {})'.format(nb, i))
    gevent.sleep(1)  # cooperative sleep: yields to other greenlets instead of blocking
    print('TEST: end short_task({}, {})'.format(nb, i))
    return i


@app.task(name='FINISH_GROUP')
def finish_group(results, nb):
    # Chord callback: receives the list of results from its own header tasks
    print('TEST: finish_group({}) -> {}'.format(nb, results))


@app.task
def main(total):
    for nb in range(1, total + 1):
        short_tasks = [short_task.si(nb, i) for i in range(3)]
        # Each chord's callback fires once its own three short_tasks are done
        chord(short_tasks, finish_group.s(nb)).apply_async()
Launch with:
$ celery worker -A celery_test --loglevel=debug --concurrency=20 -P gevent 2>&1 | grep TEST
The output will be interleaved because the tasks run in parallel:
[2017-11-06 16:40:08,085] TEST: start short_task(1, 0)
[2017-11-06 16:40:08,088] TEST: start short_task(1, 1)
[2017-11-06 16:40:08,091] TEST: start short_task(1, 2)
[2017-11-06 16:40:08,092] TEST: start short_task(2, 0)
[2017-11-06 16:40:08,094] TEST: start short_task(2, 1)
[2017-11-06 16:40:08,096] TEST: start short_task(2, 2)
[2017-11-06 16:40:08,100] TEST: start short_task(3, 0)
[2017-11-06 16:40:08,101] TEST: start short_task(3, 1)
[2017-11-06 16:40:08,103] TEST: start short_task(3, 2)
# ^ all short_tasks were started at the same time
[2017-11-06 16:40:09,085] TEST: end short_task(1, 0)
[2017-11-06 16:40:09,089] TEST: end short_task(1, 1)
[2017-11-06 16:40:09,093] TEST: end short_task(1, 2)
[2017-11-06 16:40:09,106] TEST: end short_task(2, 0)
[2017-11-06 16:40:09,106] TEST: end short_task(2, 1)
[2017-11-06 16:40:09,107] TEST: end short_task(2, 2)
[2017-11-06 16:40:09,107] TEST: end short_task(3, 0)
[2017-11-06 16:40:09,108] TEST: end short_task(3, 1)
[2017-11-06 16:40:09,108] TEST: end short_task(3, 2)
# ^ total execution takes only about 1 second, since the 9 greenlets slept concurrently
[2017-11-06 16:40:09,115] TEST: finish_group(1) -> [0, 1, 2]
[2017-11-06 16:40:09,126] TEST: finish_group(2) -> [2, 1, 0]
[2017-11-06 16:40:09,128] TEST: finish_group(3) -> [0, 1, 2]
# ^ the order of the results varies depending on which greenlet finished first
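The mixed result order can be reproduced outside Celery with a small asyncio sketch. The coroutine `toy_short_task` is hypothetical and is rigged so that higher indices finish sooner: collecting results as they complete gives a mixed order, whereas reading them by position keeps the header order. If finish_group depends on ordering, the fact that short_task already returns its index i lets the callback sort the list itself.

```python
import asyncio


async def toy_short_task(i):
    # Higher indices sleep less, so they finish first
    await asyncio.sleep((3 - i) * 0.05)
    return i


async def collect():
    tasks = [asyncio.ensure_future(toy_short_task(i)) for i in range(3)]
    # Results in the order the tasks complete
    completion_order = [await t for t in asyncio.as_completed(tasks)]
    # Results in the order the tasks were created (all are done by now)
    by_position = [t.result() for t in tasks]
    return completion_order, by_position


completion_order, by_position = asyncio.run(collect())
print(completion_order)  # [2, 1, 0]
print(by_position)  # [0, 1, 2]
```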