Parallel Processing In Foreach Loop
Solution 1:
The best option for this would be to use threads. Because of the Global Interpreter Lock (GIL), threads in Python cannot use multiple CPUs in parallel, but they can execute concurrently while there are blocking operations (such as network I/O). Processes, although they can truly run in parallel, are slow to start and communicate with, and are better suited to large CPU-bound workloads. Also, as you indicate in your question, processes can sometimes be difficult to launch.
You can use the somewhat-secret (i.e. undocumented but actually well-known) multiprocessing.pool.ThreadPool class. If you are going to be doing this many times, you can create a pool once at the beginning and reuse it. You just need to make sure pool.close() and maybe also pool.join() are called when the program exits.
from multiprocessing.pool import ThreadPool
# Global/class variables
NUM_THREADS = 5
pool = ThreadPool(NUM_THREADS)
# Inside some function/method
return pool.map(lambda movie: tmdb.get_movie(movie['detail']['ids']['tmdb_id']), movies)
# On exit
pool.close() # Prevents more jobs from being submitted
pool.join() # Waits until all jobs are finished
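An equivalent, fully documented alternative is concurrent.futures.ThreadPoolExecutor from the standard library. Here is a minimal sketch, assuming the same tmdb object and movies list as above; the with block shuts the pool down automatically, so no explicit close()/join() is needed:

from concurrent.futures import ThreadPoolExecutor

NUM_THREADS = 5

with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
    # executor.map() returns an iterator, so materialize it into a list.
    results = list(executor.map(
        lambda movie: tmdb.get_movie(movie['detail']['ids']['tmdb_id']),
        movies))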
Solution 2:
Your question is very broad and leaves out many details, so here's an outline of what would need to be done. To avoid the PicklingError, the database is opened in each process, which can be done by using an initializer function (named start_process() in the example code below).
Note: Due to the overhead involved in initializing the database to do one query, @jdehesa's multi-threading approach would likely be the better tactic in this situation (threading generally makes sharing a global variable less costly). Alternatively, you could make the get_movie() interface function process more than one id each time it's called (i.e. "batches" of them), as sketched after the first example below.
class Database:
    """ Mock implementation. """
    def __init__(self, *args, **kwargs):
        pass  # Open/connect to database.

    def get_movie(self, id):
        return 'id_%s_foobar' % id

import multiprocessing as mp

def start_process(*args):
    """ Pool initializer: runs once in each worker process. """
    global tmdb
    tmdb = Database(*args)

def get_movie(id):
    tmdb_movie = tmdb.get_movie(id)
    return tmdb_movie

if __name__ == '__main__':
    collection = [{'detail': {'ids': {'tmdb_id': 1}}},
                  {'detail': {'ids': {'tmdb_id': 2}}},
                  {'detail': {'ids': {'tmdb_id': 3}}}]

    pool_size = mp.cpu_count()
    with mp.Pool(processes=pool_size, initializer=start_process,
                 initargs=('init info',)) as pool:
        movies = pool.map(get_movie, (movie['detail']['ids']['tmdb_id']
                                      for movie in collection))
    print(movies)  # -> ['id_1_foobar', 'id_2_foobar', 'id_3_foobar']
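If you wanted to reduce the per-query overhead by batching, as suggested in the note above, one hypothetical way to do it is sketched below. chunked() and get_movies() are illustrative names, not part of the original code, and the fragment reuses pool, collection, and the tmdb global from the example above:

def chunked(seq, size):
    """ Yield successive slices of seq, each at most size items long. """
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

def get_movies(ids):
    """ Hypothetical batch interface: one task handles many ids. """
    return [tmdb.get_movie(id) for id in ids]

# Inside the __main__ block, with the same pool as above:
ids = [movie['detail']['ids']['tmdb_id'] for movie in collection]
batches = pool.map(get_movies, chunked(ids, 2))  # Two ids per task.
movies = [title for batch in batches for title in batch]  # Flatten the batches.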
A multiprocessing alternative, which would allow the database to be shared to some degree by multiple processes without connecting to it each time, would be to define a custom multiprocessing.Manager() that opens the database once and provides an interface to get the info for one (or more) movies given their id(s). This is also discussed in the Sharing state between processes section (in the Server Process subsection) of the online documentation. The built-in Manager supports a number of container types: lists, dicts, and Queues.
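For comparison, those built-in containers need no custom subclass; here is a minimal sketch of using them:

import multiprocessing as mp

if __name__ == '__main__':
    with mp.Manager() as manager:
        shared_list = manager.list([1, 2, 3])  # Proxy to a list in the manager process.
        shared_dict = manager.dict()           # Proxy to a dict.
        task_queue = manager.Queue()           # Proxy to a queue.Queue.
        shared_list.append(4)                  # Call is forwarded to the manager.
        print(list(shared_list))  # -> [1, 2, 3, 4]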
Below is example code showing how to create your own custom manager for the database. If you uncomment the calls to print(), you'll see that only one Database instance is created:
class Database:
    """ Mock implementation. """
    def __init__(self, *args, **kwargs):
        # print('Database.__init__')
        pass  # Open/connect to database.

    def get_movie(self, id):
        return 'id_%s_foobar' % id

import multiprocessing as mp
from multiprocessing.managers import BaseManager

class DB_Proxy(object):
    """ Shared Database instance proxy class. """
    def __init__(self, *args, **kwargs):
        self.database = Database(*args, **kwargs)

    def get_movie(self, id):
        # print('DB_Proxy.get_movie')
        tmdb_movie = self.database.get_movie(id)
        return tmdb_movie

class MyManager(BaseManager): pass  # Custom Manager.
MyManager.register('DB_Proxy', DB_Proxy)

if __name__ == '__main__':
    collection = [{'detail': {'ids': {'tmdb_id': 1}}},
                  {'detail': {'ids': {'tmdb_id': 2}}},
                  {'detail': {'ids': {'tmdb_id': 3}}}]

    manager = MyManager()
    manager.start()  # Starts the manager's server process.
    db_proxy = manager.DB_Proxy('init info')

    pool_size = mp.cpu_count()
    with mp.Pool(pool_size) as pool:
        movies = pool.map(db_proxy.get_movie,
                          (movie['detail']['ids']['tmdb_id']
                           for movie in collection))
    print(movies)  # -> ['id_1_foobar', 'id_2_foobar', 'id_3_foobar']
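Note that every db_proxy.get_movie() call is forwarded to the manager's server process, so access to the single Database instance is serialized and each lookup costs an inter-process round trip; that is the price of opening the connection only once.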