Simple Multicore programming in Python
I recently wrote a Python program to perform some performance calculations over the results from a regression model. The program was not too complex, but the sheer number of calculations to be performed made it quite slow and resulted in a runtime of several hours. In my search for a way to utilize the multiple cores of the machine the program was running on, I came across the Ray package from https://www.ray.io/, which makes it very easy to distribute a single-core process onto multiple cores.
Let’s say we have a long-running worker function called calculate_metrics() which performs the performance calculations we are after over a Pandas dataframe. The actual contents of these calculations are not really the point right now; this example simply shows a way to parallelize the workload. An extremely simplified version of the program looks as follows:
def main():
    # Retrieve dataframe from database
    df = retrieve_data()
    metrics = calculate_metrics(df)
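The retrieve_data() helper is not shown here; a minimal sketch of what it might look like is given below. The CSV file name is purely hypothetical, and a database query via pd.read_sql() would work just as well.

import pandas as pd

def retrieve_data():
    # Hypothetical example: load the regression results from a CSV export.
    # In the real program this could just as well be a database query.
    return pd.read_csv('regression_results.csv')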
The retrieve_data() function retrieves the large dataset from the database, so the calculation time will vary depending on its size, but everything runs on a single core. Now, in order to split this workload onto multiple cores, the approach will be as follows:
- Utilize a unique identifier in the dataframe to split it up into 4 equal parts
- Call the calculate_metrics() function on each of the dataframe parts in parallel.
- Take the error metrics results from each of the calculate_metrics() instances and merge them together
Now obviously this approach assumes that each row of the dataframe can be processed individually and that there are no dependencies between rows that end up in different chunks. I will be using the Ray package to distribute the workload over a fixed number of 4 cores, which you can tune for your own use case. Ray simply requires that you decorate your worker function with the @ray.remote decorator to enable it to run on multiple cores. The improved program looks as follows:
import ray
import pandas as pd
import numpy as np

#
# Calculate error metrics from the given data frame
#
@ray.remote
def calculate_metrics(pandas_df):
    # Metrics calculation logic goes here,
    # returning a dataframe of error metrics
    ...

def main():
    # Retrieve dataframe from database or CSV file
    df = retrieve_data()

    # The OID column is the unique identifier for each row
    # Use it to split the dataframe into four equal parts
    valid_oids = df.OID.unique()
    valid_oids_split = np.array_split(valid_oids, 4)
    df_1 = df[df['OID'].isin(valid_oids_split[0])]
    df_2 = df[df['OID'].isin(valid_oids_split[1])]
    df_3 = df[df['OID'].isin(valid_oids_split[2])]
    df_4 = df[df['OID'].isin(valid_oids_split[3])]

    # Start Ray and disable forwarding of worker logs to the console
    ray.init(log_to_driver=False)

    # Call calculate_metrics() four times in parallel
    print('Calculating metrics')
    futures = [calculate_metrics.remote(df_1),
               calculate_metrics.remote(df_2),
               calculate_metrics.remote(df_3),
               calculate_metrics.remote(df_4)]

    # Wait for all four tasks to finish and merge their results
    metrics1, metrics2, metrics3, metrics4 = ray.get(futures)
    df_final = pd.concat([metrics1, metrics2, metrics3, metrics4])
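The calculate_metrics() body is left as a placeholder above. To make it a bit more concrete, one possible shape of such a function is sketched below; the column names (actual, predicted) and the specific metrics (MAE and RMSE per OID) are assumptions for illustration only, not the calculations from the original program.

import numpy as np
import pandas as pd
import ray

@ray.remote
def calculate_metrics(pandas_df):
    # Hypothetical example: compute MAE and RMSE for each OID,
    # assuming the dataframe has 'actual' and 'predicted' columns
    rows = []
    for oid, group in pandas_df.groupby('OID'):
        errors = group['actual'] - group['predicted']
        rows.append({'OID': oid,
                     'mae': errors.abs().mean(),
                     'rmse': np.sqrt((errors ** 2).mean())})
    return pd.DataFrame(rows)

Note that each .remote() call returns immediately with an object reference; ray.get() blocks until all four tasks have finished and returns their results in the order the futures were submitted.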
This shows that as long as you have a way to split up your workload into equal chunks and assign them to separate instances of a function, you are perfectly fine using Python combined with the Ray package. The decorator mechanism is a very elegant way of adding parallelization to existing code without having to modify the actual calculations within the code itself.
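If you want to avoid hardcoding four chunks, the same split/submit/merge pattern can be written as a loop. The sketch below reuses the @ray.remote calculate_metrics() from above and takes the number of chunks as a parameter; this is just one way to structure it, not something Ray requires.

import numpy as np
import pandas as pd
import ray

def calculate_metrics_parallel(df, num_chunks=4):
    # Split the unique OIDs into roughly equal groups
    oid_chunks = np.array_split(df['OID'].unique(), num_chunks)
    # Submit one remote task per chunk
    futures = [calculate_metrics.remote(df[df['OID'].isin(oids)])
               for oids in oid_chunks]
    # Block until all tasks are done and merge the results
    return pd.concat(ray.get(futures))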