
Parallel Processing In Foreach Loop

Hello, I have a situation where I am calling some API to get a list of movies. For each record in the list, I call another API. I would like to make that for loop parallel for better performance.

Solution 1:

The best option for this would be to use threads. Threads in Python cannot use CPUs in parallel, but they can execute concurrently while there are blocking operations (such as waiting on network I/O). Processes, although they can truly run in parallel, are slow to start and communicate with, and are better suited to large CPU-bound workloads. Also, as you indicate in your question, processes can sometimes be difficult to launch.

You can use the somewhat-secret (i.e. undocumented but actually well-known) multiprocessing.pool.ThreadPool class. If you are going to be doing this many times, you can create a pool once at the beginning and reuse it. Just make sure pool.close() and, if you want to wait for outstanding work, pool.join() are called when the program exits.

from multiprocessing.pool import ThreadPool

# Global/class variables    
NUM_THREADS = 5
pool = ThreadPool(NUM_THREADS)

# Inside some function/method
return pool.map(lambda movie: tmdb.get_movie(movie['detail']['ids']['tmdb_id']), movies)

# On exit
pool.close()  # Prevents more jobs from being submitted
pool.join()   # Waits until all jobs have finished
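If you would rather stick to documented APIs, concurrent.futures.ThreadPoolExecutor offers the same pattern. Below is a minimal sketch of the equivalent call; it assumes the same tmdb client and movies list as the snippet above, and fetch_all is just an illustrative wrapper name:

from concurrent.futures import ThreadPoolExecutor

NUM_THREADS = 5

def fetch_all(movies):
    # Exiting the with-block shuts the pool down and waits for all
    # jobs to finish, equivalent to pool.close() plus pool.join().
    with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
        return list(executor.map(
            lambda movie: tmdb.get_movie(movie['detail']['ids']['tmdb_id']),
            movies))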

Solution 2:

Your question is very broad and leaves out many details, so here's an outline of what would need to be done. To avoid the PicklingError, the database is opened in each process, which can be done by using an initializer function (named start_process() in the example code below).

Note: Due to the overhead involved in initializing the database just to do one query, @jdehesa's multi-threading approach would likely be the better tactic in this situation (threading generally makes sharing a global variable less costly). Alternatively, you could make the get_movie() interface function process more than one id each time it's called (i.e. "batches" of them); a sketch of that batching idea follows the example below.

class Database:
    """ Mock implementation. """
    def __init__(self, *args, **kwargs):
        pass  # Open/connect to database.

    def get_movie(self, id):
        return 'id_%s_foobar' % id


import multiprocessing as mp

def start_process(*args):
    global tmdb
    tmdb = Database(*args)

def get_movie(id):
    tmdb_movie = tmdb.get_movie(id)
    return tmdb_movie

if __name__ == '__main__':

    collection = [{'detail': {'ids': {'tmdb_id': 1}}},
                  {'detail': {'ids': {'tmdb_id': 2}}},
                  {'detail': {'ids': {'tmdb_id': 3}}}]

    pool_size = mp.cpu_count()
    with mp.Pool(processes=pool_size, initializer=start_process,
                 initargs=('init info',)) as pool:
        movies = pool.map(get_movie, (movie['detail']['ids']['tmdb_id']
                                        for movie in collection))

    print(movies)  # -> ['id_1_foobar', 'id_2_foobar', 'id_3_foobar']
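The batching alternative mentioned in the note above is a small change to this example: give each worker a list of ids instead of a single one, so the cost of opening the database is amortized over several queries. Here's a rough sketch that reuses start_process(), collection, and pool_size from the code above; the chunked() helper and the batch size of 2 are illustrative assumptions:

def get_movies(ids):
    # Batch version of get_movie(): one initialized database
    # instance serves a whole list of ids.
    return [tmdb.get_movie(id) for id in ids]

def chunked(seq, size):
    # Split seq into successive lists of at most `size` items.
    return [seq[i:i + size] for i in range(0, len(seq), size)]

# Inside the __main__ block, replacing the earlier pool.map() call:
ids = [movie['detail']['ids']['tmdb_id'] for movie in collection]
with mp.Pool(processes=pool_size, initializer=start_process,
             initargs=('init info',)) as pool:
    batches = pool.map(get_movies, chunked(ids, 2))

movies = [m for batch in batches for m in batch]  # Flatten the batches.
print(movies)  # -> ['id_1_foobar', 'id_2_foobar', 'id_3_foobar']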

A multiprocessing alternative, which would allow the database to be shared to some degree by multiple processes without each of them connecting to it separately, is to define a custom multiprocessing.Manager() that opens the database once and provides an interface for getting the info for one (or more) movies given their id(s). This is also discussed in the Sharing state between processes section (in the Server process subsection) of the online documentation. The built-in Manager supports a number of container types, such as lists, dicts, and Queues.

Below is example code showing how to create your own custom manager for the database. If you uncomment the calls to print(), you'll see that only one Database instance is created:

class Database:
    """ Mock implementation. """
    def __init__(self, *args, **kwargs):
#        print('Database.__init__')
        pass  # Open/connect to database.

    def get_movie(self, id):
        return 'id_%s_foobar' % id


from functools import partial
import multiprocessing as mp
from multiprocessing.managers import BaseManager


class DB_Proxy(object):
    """ Shared Database instance proxy class. """
    def __init__(self, *args, **kwargs):
        self.database = Database(*args, **kwargs)

    def get_movie(self, id):
#        print('DB_Proxy.get_movie')
        tmdb_movie = self.database.get_movie(id)
        return tmdb_movie


class MyManager(BaseManager): pass  # Custom Manager

MyManager.register('DB_Proxy', DB_Proxy)


if __name__ == '__main__':

    collection = [{'detail': {'ids': {'tmdb_id': 1}}},
                  {'detail': {'ids': {'tmdb_id': 2}}},
                  {'detail': {'ids': {'tmdb_id': 3}}}]

    manager = MyManager()
    manager.start()

    db_proxy = manager.DB_Proxy('init info')

    pool_size = mp.cpu_count()
    with mp.Pool(pool_size) as pool:
        movies = pool.map(db_proxy.get_movie,
                          (movie['detail']['ids']['tmdb_id']
                            for movie in collection))

    print(movies)  # -> ['id_1_foobar', 'id_2_foobar', 'id_3_foobar']
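Keep in mind that every db_proxy.get_movie() call is forwarded to the single manager server process, so although the pool workers run in parallel, the queries themselves are funneled through that one process; that serialization is the price of keeping a single open database connection.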
