
Celery : Launch Chord Callback After Its Associated Body

When I launch a list of chord() calls, each containing a group of tasks and a callback, the callbacks are called only after all the groups of tasks have finished, even the tasks which are not

Solution 1:

Your test is biased: since you are using only one worker, time.sleep() will block that worker, meaning it stops processing further tasks even with concurrency set to 4.
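To see why a blocking sleep skews the test, here is a stdlib-only sketch (not Celery code): a ThreadPoolExecutor stands in for a worker, and each of its threads for one concurrency slot. The names blocking_task and run_batch are hypothetical. A task that calls time.sleep() holds its slot for the whole sleep, so once every slot is busy sleeping, nothing else can start.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_task(duration):
    # Stands in for a task body that calls time.sleep(): it keeps its
    # slot for the whole duration and cannot hand it to other work.
    time.sleep(duration)
    return duration


def run_batch(n_tasks, slots, duration=0.1):
    """Run n_tasks blocking tasks on a pool with `slots` threads.

    Returns the elapsed wall-clock time in seconds.
    """
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=slots) as pool:
        list(pool.map(blocking_task, [duration] * n_tasks))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"1 slot : {run_batch(9, 1):.2f}s")  # roughly 9 x 0.1s
    print(f"9 slots: {run_batch(9, 9):.2f}s")  # roughly 0.1s
```

With a single slot the batch takes about n_tasks × duration; with as many slots as tasks it takes about one duration, which is the effect that more workers or a cooperative pool would give.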

Is it possible to launch the FINISH_GROUP task right after its associated SHORT_TASKs have finished, without waiting for all the other, unrelated SHORT_TASKs, please?

Currently you are not waiting for the other short_tasks to finish; they are all scheduled for execution at the same time. Since you are using a sleep, finish_group will get called once all the short_tasks of its respective chord have ended.

Your current execution looks like:

| chord 1      | chord 2      | chord 3      |
|--------------|--------------|--------------|
| short_task 1 |              |              |
|              | short_task 1 |              |
|              |              | short_task 1 |
| short_task 2 |              |              |
|              | short_task 2 |              |
|              |              | short_task 2 |
| short_task 3 |              |              |
|              | short_task 3 |              |
|              |              | short_task 3 |
| finish_group |              |              |
|              | finish_group |              |
|              |              | finish_group |

(execution order: top to bottom)

If you remove the sleep, add more workers, or use gevent, it should look like this:

| chord 1          | chord 2          | chord 3          |
|------------------|------------------|------------------|
| short_task 1,2,3 | short_task 1,2,3 | short_task 1,2,3 |
| finish_group     | finish_group     | finish_group     |

Tasks on the same line will appear in the log in a slightly different order (depending on which worker picked them up first), but finish_group will still run last in its chord.
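The per-chord behaviour can be sketched without a broker. MiniChord and simulate below are hypothetical stdlib helpers, not Celery's API or its actual implementation: each chord counts down its own header tasks and queues its callback only when that count reaches zero. With one pool thread everything is serialized, and because each callback is queued behind the short_tasks already waiting, every finish_group runs after all of them. With nine threads all short_tasks start together, and each finish_group still runs last for its own chord.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class MiniChord:
    """Hypothetical helper, NOT Celery's API: run each header callable on
    `pool` and queue `callback(results)` once every header task of this
    chord (and only this chord) has finished."""

    def __init__(self, pool, header, callback):
        self.pool = pool
        self.callback = callback
        self.results = [None] * len(header)
        self.remaining = len(header)  # assumes a non-empty header
        self.lock = threading.Lock()
        self.done = threading.Event()
        for index, fn in enumerate(header):
            future = pool.submit(fn)
            future.add_done_callback(lambda f, i=index: self._part_done(i, f))

    def _part_done(self, index, future):
        # Store the result in header order and decrement this chord's counter.
        with self.lock:
            self.results[index] = future.result()
            self.remaining -= 1
            last = self.remaining == 0
        if last:
            # The callback joins the back of the pool's queue.
            self.pool.submit(self._run_callback)

    def _run_callback(self):
        self.callback(self.results)
        self.done.set()


def simulate(slots, chords=3, tasks_per_chord=3, duration=0.05):
    """Run `chords` chords on a pool with `slots` threads; return the event log."""
    log = []  # list.append is atomic in CPython, good enough for a sketch

    def short_task(nb, i):
        log.append(("start", nb, i))
        time.sleep(duration)  # blocks this pool thread, like time.sleep in a task
        log.append(("end", nb, i))
        return i

    def finish_group(results, nb):
        log.append(("finish", nb, tuple(results)))

    with ThreadPoolExecutor(max_workers=slots) as pool:
        running = []
        for nb in range(1, chords + 1):
            header = [lambda nb=nb, i=i: short_task(nb, i)
                      for i in range(tasks_per_chord)]
            running.append(
                MiniChord(pool, header, lambda res, nb=nb: finish_group(res, nb)))
        for mc in running:
            mc.done.wait()  # every callback must run before the pool shuts down
    return log


if __name__ == "__main__":
    for slots in (1, 9):
        print(f"--- {slots} slot(s) ---")
        for event in simulate(slots):
            print(*event)
```

Error handling is deliberately left out: if a header task raises, this sketch never fires the callback and simulate() would wait forever.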


Note that wrapping your tasks in a group is not necessary when using chord; a plain list of signatures works:

chord(
    short_tasks,
    finish_group.s(nb)
)

Same code but with gevent:

import gevent
from celery import Celery, chord

app = Celery('tasks', broker='redis://localhost/4', backend='redis://localhost/5')


@app.task()
def short_task(nb, i):
    print('TEST: start short_task({}, {})'.format(nb, i))
    # gevent.sleep() yields to other greenlets instead of blocking the worker
    gevent.sleep(1)
    print('TEST: end   short_task({}, {})'.format(nb, i))
    return i


@app.task(name='FINISH_GROUP')
def finish_group(results, nb):
    # Receives the results of every short_task in its own chord
    print('TEST: finish_group({}) -> {}'.format(nb, results))


@app.task
def main(total):
    for nb in range(1, total + 1):
        # .si() creates immutable signatures, so no parent result is passed in
        short_tasks = [short_task.si(nb, i) for i in range(3)]

        # A plain list works as the chord header; no group() needed
        chord(short_tasks, finish_group.s(nb)).apply_async()
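gevent is not in the standard library, so here is the same idea with asyncio swapped in. This is an analogy, not what `-P gevent` actually runs: both give cooperative, single-threaded concurrency where a sleep yields control instead of blocking. asyncio.gather plays the role of the chord, collecting one header's results before the callback runs; run_chord and main are hypothetical names.

```python
import asyncio
import time


async def short_task(nb, i, log, duration):
    log.append(f"start short_task({nb}, {i})")
    await asyncio.sleep(duration)  # yields to the event loop, like gevent.sleep
    log.append(f"end   short_task({nb}, {i})")
    return i


async def finish_group(results, nb, log):
    log.append(f"finish_group({nb}) -> {results}")


async def run_chord(nb, log, duration):
    # gather returns results in argument order and only resumes once all
    # three short_tasks of THIS chord are done; then the callback runs.
    results = await asyncio.gather(
        *(short_task(nb, i, log, duration) for i in range(3)))
    await finish_group(results, nb, log)


async def main(total, duration=1.0):
    log = []
    await asyncio.gather(*(run_chord(nb, log, duration)
                           for nb in range(1, total + 1)))
    return log


if __name__ == "__main__":
    start = time.perf_counter()
    for line in asyncio.run(main(3)):
        print("TEST:", line)
    print(f"elapsed: {time.perf_counter() - start:.2f}s")  # about 1s, not 9s
```

All nine sleeps overlap, so the whole run takes roughly one sleep duration, matching the timing in the log below.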

Launch with:

$ celery worker -A celery_test --loglevel=debug --concurrency=20 -P gevent 2>&1 | grep TEST

The output will be interleaved because the tasks run in parallel.

[2017-11-06 16:40:08,085] TEST: start short_task(1, 0)
[2017-11-06 16:40:08,088] TEST: start short_task(1, 1)
[2017-11-06 16:40:08,091] TEST: start short_task(1, 2)
[2017-11-06 16:40:08,092] TEST: start short_task(2, 0)
[2017-11-06 16:40:08,094] TEST: start short_task(2, 1)
[2017-11-06 16:40:08,096] TEST: start short_task(2, 2)
[2017-11-06 16:40:08,100] TEST: start short_task(3, 0)
[2017-11-06 16:40:08,101] TEST: start short_task(3, 1)
[2017-11-06 16:40:08,103] TEST: start short_task(3, 2)
# ^ all short_tasks have been started at the same time

[2017-11-06 16:40:09,085] TEST: end   short_task(1, 0)
[2017-11-06 16:40:09,089] TEST: end   short_task(1, 1)
[2017-11-06 16:40:09,093] TEST: end   short_task(1, 2)
[2017-11-06 16:40:09,106] TEST: end   short_task(2, 0)
[2017-11-06 16:40:09,106] TEST: end   short_task(2, 1)
[2017-11-06 16:40:09,107] TEST: end   short_task(2, 2)
[2017-11-06 16:40:09,107] TEST: end   short_task(3, 0)
[2017-11-06 16:40:09,108] TEST: end   short_task(3, 1)
[2017-11-06 16:40:09,108] TEST: end   short_task(3, 2)
# ^ total execution takes only 1 second since the 9 greenlets slept concurrently

[2017-11-06 16:40:09,115] TEST: finish_group(1) -> [0, 1, 2]
[2017-11-06 16:40:09,126] TEST: finish_group(2) -> [2, 1, 0]
[2017-11-06 16:40:09,128] TEST: finish_group(3) -> [0, 1, 2]
# ^ the order of results is mixed, depending on which greenlet finished first
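The mixed order in finish_group(2) is consistent with results being collected as the tasks finish rather than in header order. Which of the two a chord callback receives may depend on the Celery version and result backend, and this answer does not pin that down. The stdlib sketch below (threads instead of workers, hypothetical short_task timings) only contrasts the two collection strategies.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def short_task(i):
    # Hypothetical timings: a higher i sleeps less, so it finishes first.
    time.sleep((3 - i) * 0.1)
    return i


def collect(in_completion_order):
    """Run three short_tasks in parallel and gather their results."""
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [pool.submit(short_task, i) for i in range(3)]
        if in_completion_order:
            # as_completed yields each future as soon as it finishes
            return [f.result() for f in as_completed(futures)]
        # Iterating the original list keeps submission (header) order
        return [f.result() for f in futures]


if __name__ == "__main__":
    print(collect(True))   # typically [2, 1, 0]
    print(collect(False))  # always [0, 1, 2]
```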
