Parallelization

In practice, parallelization is essential and can significantly speed up optimization. For population-based algorithms, the evaluation of a set of solutions can be parallelized easily, because each solution can be evaluated independently of all others.

Vectorized Matrix Operations

One way is to use NumPy matrix operations, which have been used for almost all test problems implemented in pymoo. By default, elementwise_evaluation is set to False, which means that _evaluate receives a whole set of solutions at once. Thus, x is a matrix where each row is an individual and each column a variable.

[1]:
import numpy as np

from pymoo.core.problem import Problem

class MyProblem(Problem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5, **kwargs)

    def _evaluate(self, x, out, *args, **kwargs):
         out["F"] = np.sum(x ** 2, axis=1)

The axis=1 argument lets NumPy compute the sum row by row in a single vectorized operation, which is significantly faster than looping over the individuals in Python.
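To make this concrete, here is a small standalone sketch (not part of pymoo) showing that the vectorized call returns the same values as an explicit Python loop over the rows:

import numpy as np

# 5 individuals with 10 variables each
X = np.random.random((5, 10))

# vectorized: one row-wise reduction over the whole population
F_vectorized = np.sum(X ** 2, axis=1)

# equivalent, but much slower for large populations: a Python loop
F_loop = np.array([np.sum(x ** 2) for x in X])

assert np.allclose(F_vectorized, F_loop)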

[2]:
from pymoo.algorithms.soo.nonconvex.ga import GA
from pymoo.optimize import minimize

res = minimize(MyProblem(), GA())
print('Time:', res.exec_time)
Time: 0.8583278656005859

Starmap Interface

In general, pymoo allows passing a starmap object to be used for parallelization. The starmap interface is defined by the Python standard library's multiprocessing.Pool.starmap function, which makes the parallelization backend flexible and easy to exchange.

IMPORTANT: Please note that the problem needs to use elementwise evaluation (set elementwise_evaluation=True or inherit from ElementwiseProblem), which means that each call of _evaluate takes care of exactly one solution.

[1]:
from pymoo.core.problem import ElementwiseProblem

class MyProblem(ElementwiseProblem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5, **kwargs)

    def _evaluate(self, x, out, *args, **kwargs):
         out["F"] = (x ** 2).sum()

Then, we can pass a starmap object to be used for parallelization.

Threads

[2]:
from pymoo.core.problem import starmap_parallelized_eval
from multiprocessing.pool import ThreadPool

# the number of threads to be used
n_threads = 8

# initialize the pool
pool = ThreadPool(n_threads)

# define the problem by passing the starmap interface of the thread pool
problem = MyProblem(runner=pool.starmap, func_eval=starmap_parallelized_eval)
[3]:
from pymoo.algorithms.soo.nonconvex.pso import PSO
from pymoo.optimize import minimize

res = minimize(problem, PSO(), seed=1, termination=("n_gen", 100))
print('Threads:', res.exec_time)
Threads: 0.6641809940338135
[4]:
pool.close()
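
As a usage note, the thread pool can also be managed with a context manager, which closes the pool automatically when the block is left; a minimal sketch under the same setup as above:

from multiprocessing.pool import ThreadPool

from pymoo.algorithms.soo.nonconvex.pso import PSO
from pymoo.core.problem import starmap_parallelized_eval
from pymoo.optimize import minimize

# the pool is terminated automatically when the with-block is left
with ThreadPool(8) as pool:
    problem = MyProblem(runner=pool.starmap, func_eval=starmap_parallelized_eval)
    res = minimize(problem, PSO(), seed=1, termination=("n_gen", 100))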

Processes

[5]:
import multiprocessing

# the number of processes to be used
n_processes = 8
pool = multiprocessing.Pool(n_processes)
problem = MyProblem(runner=pool.starmap, func_eval=starmap_parallelized_eval)
[6]:
res = minimize(problem, PSO(), seed=1, termination=("n_gen", 100))
print('Processes:', res.exec_time)
Processes: 5.308876037597656
[7]:
pool.close()

Note: Here, the overhead of serializing and transferring the data between processes is clearly visible.
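
Process-based parallelization only pays off when a single evaluation is expensive enough to amortize this overhead. The following sketch illustrates the idea with a hypothetical ExpensiveProblem (not part of pymoo) whose evaluation is artificially delayed; with such a per-evaluation cost, the process pool can be expected to outperform serial evaluation:

import time

from pymoo.core.problem import ElementwiseProblem

class ExpensiveProblem(ElementwiseProblem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5, **kwargs)

    def _evaluate(self, x, out, *args, **kwargs):
        # simulate an expensive simulation call; the serialization cost
        # becomes negligible compared to the evaluation itself
        time.sleep(0.01)
        out["F"] = (x ** 2).sum()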

Dask

A more advanced option is to distribute the evaluation function to a couple of workers. Several frameworks support such a distribution of code; for our framework, we recommend using Dask.

Documentation on how to set up a cluster is available in the Dask documentation. You first start a scheduler somewhere and then connect workers to it. Finally, a client object connects to the scheduler and distributes the jobs automatically for you.
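
For instance, the client can be pointed at an already running scheduler instead of spawning a local one; the address below is only a placeholder for your own cluster:

from dask.distributed import Client

# connect to an existing scheduler (replace the address with your own)
client = Client("tcp://192.0.2.1:8786")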

[1]:
from dask.distributed import Client
client = Client()
client.restart()
print("STARTED")
STARTED
[9]:
import numpy as np
from pymoo.core.problem import ElementwiseProblem, dask_parallelized_eval

# create the problem and set the parallelization to dask
# (the client created in the cell above is reused here)
problem = MyProblem(runner=client, func_eval=dask_parallelized_eval)

res = minimize(problem, PSO(), seed=1, termination=("n_gen", 100))
print('Dask:', res.exec_time)
Dask: 111.35524010658264
[10]:
client.close()

Note: Here, the overhead of transferring data to the Dask workers dominates. However, if your problem is computationally more expensive, this overhead becomes negligible.

Custom Parallelization

If you need more control over the parallelization process, we provide an example of fully customized parallelization below. The _evaluate function receives the whole set of solutions to be evaluated, because by default elementwise_evaluation is disabled.

Threads

Thus, a thread pool can be initialized beforehand and then used to speed up the evaluation. The code below does directly what happens internally when using the starmap interface of pymoo (with an inline function definition and without some of the overhead, which is why it is slightly faster).

[11]:
from multiprocessing.pool import ThreadPool

from pymoo.core.problem import Problem

pool = ThreadPool(8)

class MyProblem(Problem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5, **kwargs)

    def _evaluate(self, X, out, *args, **kwargs):

        # define the function
        def my_eval(x):
            return (x ** 2).sum()

        # prepare the parameters for the pool
        params = [[X[k]] for k in range(len(X))]

        # calculate the function values in a parallelized manner and wait until done
        F = pool.starmap(my_eval, params)

        # store the function values and return them.
        out["F"] = np.array(F)

problem = MyProblem()
[12]:
res = minimize(problem, PSO(), seed=1, termination=("n_gen", 100))
print('Threads:', res.exec_time)
Threads: 0.6479101181030273
[13]:
pool.close()

Dask

[14]:
import numpy as np
from dask.distributed import Client

from pymoo.algorithms.soo.nonconvex.pso import PSO
from pymoo.core.problem import Problem
from pymoo.optimize import minimize

client = Client(processes=False)

class MyProblem(Problem):

    def __init__(self, *args, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5,
                         elementwise_evaluation=False, *args, **kwargs)

    def _evaluate(self, X, out, *args, **kwargs):
        def fun(x):
            return np.sum(x ** 2)

        jobs = [client.submit(fun, x) for x in X]
        out["F"] = np.row_stack([job.result() for job in jobs])

[15]:
problem = MyProblem()

res = minimize(problem, PSO(), seed=1, termination=("n_gen", 100))
print('Dask:', res.exec_time)

client.close()
Dask: 11.151088953018188
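
As a variation on the same pattern, the per-solution submissions can be batched with client.map and client.gather. The class below is a hypothetical rewrite (MyBatchedProblem is not a pymoo API) that reuses the numpy import and the Dask client from the cell above:

class MyBatchedProblem(Problem):

    def __init__(self, *args, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5,
                         elementwise_evaluation=False, *args, **kwargs)

    def _evaluate(self, X, out, *args, **kwargs):
        def fun(x):
            return np.sum(x ** 2)

        # submit all evaluations in one call and gather the results together,
        # instead of creating and resolving the futures one by one
        futures = client.map(fun, list(X))
        out["F"] = np.row_stack(client.gather(futures))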