Parallelisation

Open in Colab

import sys
IN_COLAB = 'google.colab' in sys.modules
if IN_COLAB:
    %pip install dask[dataframe] ray

What is it?

Parallelisation divides a large problem into many smaller ones and solves them simultaneously.

This splits up the time and/or memory requirements of the work.

The resulting smaller tasks are centrally managed by a scheduler.

The work can be split across:

  • Multiple processes (cores)

    • A process is an instance of a computer program.

    • This program is being executed by one or many threads.

    • Useful for compute-bound problems.

  • Multiple threads (parts of processes)

    • A thread of execution is the smallest sequence of programmed instructions.

    • Useful for memory-bound problems.

[Figure: a process containing multiple threads]

Image source

Parallelising Python?

Python itself is not designed for massive scalability and controls threads pre-emptively using the Global Interpreter Lock (GIL).

(The GIL prevents Python bytecode from running on multiple threads simultaneously.)

This has led many libraries to work around this using C/C++ backends.

Some options include:

  • multiprocessing for creating a pool of asynchronous workers.

  • joblib for creating lightweight pipelines that help with embarrassingly parallel tasks.

  • asyncio for concurrent programs, especially ones that are input/output bound.

  • concurrent.futures for launching parallel tasks.

These options work well for the CPU cores on your machine, though not really beyond that.
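As a minimal, hedged sketch of one of these (the function name and worker count are just illustrative), concurrent.futures can spread a compute-bound function over several processes:

from concurrent.futures import ProcessPoolExecutor


def slow_square(x):
    """A stand-in for a compute-bound task."""
    return x ** 2


if __name__ == "__main__":
    # each input is sent to a separate worker process, side-stepping the GIL
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(slow_square, range(8)))
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49]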

Dask

Dask has great features, excellent documentation, and a familiar API.

It works through creating and computing task graphs.

Task graphs have nodes (functions) and edges (objects).

For example, the task graph might be:

[Figure: an example Dask task graph]

Image source
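As a minimal sketch of how such a graph is built lazily (the inc and add functions are just illustrative), dask.delayed wraps ordinary functions into graph nodes:

from dask import delayed


@delayed
def inc(x):
    return x + 1


@delayed
def add(x, y):
    return x + y


# build the task graph; nothing is computed yet
a = inc(1)
b = inc(2)
total = add(a, b)

# the scheduler only executes the graph when asked
print(total.compute())  # 5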

These task graphs are executed by a scheduler.

Note

This is separate to the HPC’s scheduler.

The resources used by this scheduler are managed by a cluster.

There are two main types of Dask scheduler which can deploy jobs:

  • Single machine

    • Cluster manager: LocalCluster()

    • Simpler.

    • For your laptop or a local server.

    • Test things out here first.

  • Distributed

    • Cluster manager: SGECluster(), SLURMCluster()

    • More complex.

    • For a cluster on a high performance computer (HPC) (e.g., SGE, SLURM), Kubernetes, or cloud.

    • Once working correctly on a single machine, move over to distributed.

To simplify things, you can think of the scheduler and the cluster manager as the same thing (i.e., the Dask manager). You then connect to this Dask manager using a client to make workers.

[Figure: the client, scheduler (cluster manager), and workers]

Image source

Single machine

if not IN_COLAB:
    from dask.distributed import Client, LocalCluster
    cluster = LocalCluster()  # the Dask manager itself
    client = Client(cluster)  # connect to this Dask manager to see what it's doing
# if not in Colab, we can render the client and cluster information
client

Client

Client-6cb677ce-9af9-11ec-9039-6045bda294e4

Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

Cluster Info

If you instead wanted multiple threads, you could specify this in the Client instance:

client = Client(processes=False, threads_per_worker=4, n_workers=1)

Attention

Always close down the client and cluster at the end.

client.close()
cluster.close()

Applications

There are many ways you can use Dask.

For example, there are Dask Arrays, DataFrames, Bags, Delayed functions, and Futures.

Here, our example is going to do some maths on arrays. Don't worry about what the maths is. Just focus on the Dask bits.

import numpy as np
import dask.array as da

Warning

Parallelised code often introduces overheads e.g., the Dask manager.

For small jobs, these can take up a large relative portion of the resources, making the job inefficient. Hence, it is worthwhile ensuring that the job is big enough that these overheads are small in comparison. The examples here in this tutorial are on the small side purely for teaching purposes.

x = da.random.random(
    (10_000, 10_000, 10), 
    chunks=(1_000, 1_000, 5))
y = da.random.random(
    (10_000, 10_000, 10), 
    chunks=(1_000, 1_000, 5))
z = (da.arcsin(x) + da.arccos(y)).sum(axis=(1, 2))

No work has happened yet.

All that has been done is that a task graph was created. This contains the future work that we will eventually compute.

So, if I take a look at z, I can see that it will eventually be a NumPy array, but at the moment it is lots of Dask chunks that are lazily loaded.

z
        Array       Chunk
Bytes   78.12 kiB   7.81 kiB
Shape   (10000,)    (1000,)
Count   1310 Tasks  10 Chunks
Type    float64     numpy.ndarray

The first thing to do is often to take a look at the task graph.

This is done by calling the .visualize() method on the Dask object.

This may help find potential bottlenecks.

For example:

z.visualize()
dot: graph is too large for cairo-renderer bitmaps. Scaling by 0.333945 to fit
[Figure: the rendered task graph for z]

These can often be too large for the Jupyter cell.

Instead, we can look at the copy saved in this folder under mydask.png.

We can see that this large graph has lots of parallel tasks!
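If you would rather choose the output location yourself, .visualize() accepts a filename (a hedged one-liner; Graphviz needs to be installed, and the filename here is just an example):

z.visualize(filename="task-graph.png")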

Before we compute anything, it’s helpful to set up Dask’s live dashboard.

Local profiling and diagnostics

Many of the profiling tools we looked at earlier don’t work well with parallel code.

Dask provides its own useful tools.

Interactive dashboard

The Dask dashboard provides live feedback and diagnostics in many plots and tables using Bokeh.

First, get the Dask dashboard address (normally http://localhost:8787/status):

print(client.dashboard_link)
http://127.0.0.1:8787/status

Either load that link in a browser or copy the address into the Dask Lab Extension.

The Dask Lab Extension should now show all of the client and cluster information.

You can place some of these panels on the right-hand side, e.g.:

  • Task Stream

  • Progress

  • Workers

Task Stream

  • Each row is the activity on a process (core) or thread within the cluster over time.

  • Each rectangle is one task.

  • The different colours represent different types of work:

    • Idle (white)

    • Serialisation (grey)

    • Communication (red)

    • Disk input/output (orange)

    • Other tasks e.g., computation (other colours)

[Animation: the Task Stream panel] Image source

Progress

  • Progresses from left to right, showing the number of tasks complete and remaining.

  • Same colours as above, with grey for ready to run, darker colours for data in memory, and lighter colours for complete tasks released from memory.

[Animation: the Progress panel] Image source

Workers

  • Shows the CPU percentage use (also per worker).

  • Shows the memory percentage use (also per worker).

Note

Remember, this is Dask’s view of the world, so it may be different to the view from your laptop or HPC.

There is lots of other useful information, such as (video demonstration):

  • Profiler

    • Statistical (sampling every 10 ms) capture of the call stack from the Dask scheduler's perspective.

[Animation: the statistical profiler panel]

Image source

  • System

  • Logs

  • Individual workers

  • etc.

Now, when you .compute() the task graph, you can view the progress in these dashboard panels.

result = z.compute()
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker

And now the result is converted to a NumPy array in memory.

type(result)
numpy.ndarray
result
array([157034.51615065, 157314.03067803, 157032.42473922, ...,
       157067.68038372, 157327.71828624, 156917.61075045])

Question

What did our Dask Dashboard show?

If you want a shareable report of this interactive dashboard, you can use the performance report context manager.

from dask.distributed import performance_report
with performance_report(filename="dask-report.html"):
    result = z.compute()

You can then view this performance report here or in a browser:

from IPython.display import display, HTML
display(HTML(filename='dask-report.html'))
Dask Performance Report

This can also be shared with colleagues:

For example, for this report click here.

Static profilers

There are also a range of simpler, static profilers, such as the ProgressBar, Profiler, ResourceProfiler, and CacheProfiler from dask.diagnostics.
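As a minimal sketch (these tools work with the local single-machine schedulers rather than the distributed Client, so the scheduler is set explicitly here):

import dask.array as da
from dask.diagnostics import ProgressBar, Profiler, ResourceProfiler

x = da.random.random((5_000, 5_000), chunks=(1_000, 1_000))

# ProgressBar prints progress; Profiler records task timings;
# ResourceProfiler records CPU and memory use
with ProgressBar(), Profiler() as prof, ResourceProfiler() as rprof:
    x.sum().compute(scheduler="threads")

# inspect the first few recorded task timings
print(prof.results[:3])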

When you’re happy that everything is correct on a single machine, you can move over to distributed.

Attention

Remember to close down the local client and cluster when you’re finished.

if not IN_COLAB:   
    client.close()
    cluster.close()

Distributed

Dask-MPI

Dask-MPI uses the mpi4py package and MPI to deploy the Dask workers (MPI launches the workers; it is not used for their communication).

MPI (Message Passing Interface) is where many systems send and receive messages (sometimes containing data) between processes with their own (private) memory.

It’s suitable for problems that require distributed memory. For example, if your computations are mostly generic Python code (i.e., non-numeric) that don’t release the GIL.

MPI is parallelism between nodes (multi-process).

[Figure: MPI processes with private memory passing messages]

Image source

mpi4py provides MPI for Python, allowing Python applications to use multiple processes.
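A minimal sketch of mpi4py itself (the filename hello_mpi.py is just an example; run it with something like mpirun -np 4 python hello_mpi.py):

from mpi4py import MPI

comm = MPI.COMM_WORLD   # the communicator containing all of the processes
rank = comm.Get_rank()  # this process's id (0, 1, 2, ...)
size = comm.Get_size()  # the total number of processes

print(f"Hello from rank {rank} of {size}")

# pass a message from rank 0 to rank 1 (each rank has its own private memory)
if rank == 0 and size > 1:
    comm.send({"payload": 42}, dest=1, tag=0)
elif rank == 1:
    data = comm.recv(source=0, tag=0)
    print(f"Rank 1 received: {data}")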

To use MPI in batch jobs, you can specify the number of processes using -pe ib.

For example, for 2 processes:

#$ -pe ib 2

When you need to share memory across chunks, you can use OpenMP (Open Multi-Processing).

It’s suitable for problems that are mostly numeric (e.g., NumPy and Pandas) that release the GIL entirely.

OpenMP is parallelism within nodes (multi-thread).

[Figure: OpenMP threads sharing memory within a node]

Image source

To use OpenMP in batch jobs, you can specify the number of threads using -pe smp.

For example, for 2 threads:

#$ -pe smp 2
Submission script

First, initialise a Dask MPI session and connect a client to it.

The initialisation launches the Dask Scheduler on MPI rank 0 (first process), the user’s Client code on MPI rank 1 (second process), and the Dask Workers on MPI ranks 2 and above (all the other processes).

So, for 8 processes, there will be 6 workers.

from dask_mpi import initialize
from dask.distributed import Client

initialize()
client = Client()

Then, add in your Dask work.

Here, we’re making the example from earlier bigger.

This increases efficiency, by reducing the relative size of Dask’s overheads (scheduler and client).

Ensure that the .compute() call is within the performance_report context manager, to capture the diagnostics.

import dask.array as da
from dask.distributed import performance_report


def example_function():
    x = da.random.random(
        (100_000, 100_000, 10),
        chunks=(10_000, 10_000, 5))
    y = da.random.random(
        (100_000, 100_000, 10),
        chunks=(10_000, 10_000, 5))
    z = (da.arcsin(x) + da.arccos(y)).sum(axis=(1, 2))

    with performance_report(filename="dask-report_mpi.html"):
        result = z.compute()


if __name__ == "__main__":
    example_function()

The full script can be found in example_dask_mpi_sge.py.

Batch job

First, setup the resource request from the HPC scheduler.

Note

Remember, this is separate to the Dask scheduler.

#!/bin/bash -l
#$ -cwd -V
#$ -l h_rt=01:00:00
#$ -pe smp 8
#$ -l h_vmem=48G

Then, load an MPI module e.g.,:

module load intel openmpi

Warning

Issues can arise from oversubscribing threads.

To avoid this issue, ensure the following environment variables are set within your batch job.

You could also add these to your ~/.bashrc file.

# ensure linear algebra libraries using 1 thread
# https://docs.dask.org/en/stable/array-best-practices.html#avoid-oversubscribing-threads
export OMP_NUM_THREADS=1
export MKL_NUM_THREADS=1
export OPENBLAS_NUM_THREADS=1

Load the conda environment e.g.,:

conda activate swd6_hpp

Ensure that the number of cores matches between the requested resources at the top and the mpirun call, e.g.:

#$ -pe smp 8

mpirun -np 8 python example_dask_mpi_sge.py

The full script can be found in example_dask_mpi_sge.bash.

Distributed profiling and diagnostics

View the Dask performance report here.

You can see the 6 workers, and the diagnostics look similar to those for the local machine.

Question

How can we check that the job used the CPU cores efficiently?

Dask-Jobqueue

Dask-Jobqueue supports a variety of resource managers (e.g., SGE, SLURM).

We recommend you use it for batch jobs on the HPC, rather than interactive jobs, though it does have lots of nice features for interactive work (e.g., adaptive dynamic scaling of workers).

Setup the config

The example below is for an SGE scheduler (ARC).

This is the default config for a worker. These settings can be overridden in the Python script when you create the workers.

The full config can be found in ~/.config/dask/jobqueue.yaml.

Here are examples for different HPC systems.

jobqueue:

  sge:
    name: dask-worker

    # Dask worker options
    cores: 1                   # Total number of cores per job
    memory: '1 GB'            # Total amount of memory per job
    processes: 1                # Number of Python processes per job

    interface: ib0                       # Network interface to use like eth0 or ib0
    death-timeout: 60                    # Number of seconds to wait if a worker can not find a scheduler
    local-directory: null                # Location of fast local storage like /scratch or $TMPDIR

    # SGE resource manager options
    shebang: "#!/usr/bin/env bash"
    queue: null
    project: null
    walltime: '01:00:00'
    extra: []
    env-extra: []
    job-extra: []
    log-directory: null

    resource-spec: null

distributed:
  worker:
    memory:
      target: false     # don't spill to disk
      spill: false      # don't spill to disk
      pause: 0.80       # pause worker execution at 80% memory use
      terminate: 0.95   # restart the worker at 95% memory use
Submission script

The distributed client and cluster are created within the Python script.

The resource requirements per worker are defined, along with the number of workers.

Here, the scheduler and client run on a separate job to the workers.

This is in contrast to Dask-MPI, where all of the work is in one job for all processes. For Dask-MPI, the scheduler and client are assigned to MPI ranks 0 and 1, which reduces the total worker count by 2.

The full script can be found in example_dask_jobqueue_sge.py file.

import os

from dask.distributed import Client
from dask_jobqueue import SGECluster


def setup_client_and_cluster(
    number_processes=1, number_jobs=1, walltime="00:01:00", memory=1
):
    """
    Setup Dask client and cluster.
    Ensure that the number of workers is the right amount
    for your job and will be fully utilised.
    """
    print("Setting up Dask client and cluster ...")
    # number of workers used for number of partitions
    number_workers = number_processes * number_jobs
    # these are the requirements for a single worker
    cluster = SGECluster(
        interface="ib0",
        walltime=walltime,
        memory=f"{memory} G",
        resource_spec=f"h_vmem={memory}G",
        scheduler_options={"dashboard_address": ":2727"},
        job_extra=[
            "-V",  # export all environment variables
            f"-pe smp {number_processes}",
            f"-l disk={memory}G",
        ],
        local_directory=os.sep.join([
            os.environ.get("PWD"),
            "dask-worker-space"]),
    )
    client = Client(cluster)
    cluster.scale(jobs=number_jobs)
    print("The resources of each worker are: ")
    print(cluster.job_script())
    return client, cluster


def main():
    client, cluster = setup_client_and_cluster(
        number_processes=1,
        number_jobs=8,
        walltime="01:00:00",
        memory=24,
    )

    print("Main processing ...")
    example_function()  # as defined in the Dask-MPI example earlier
    print("Finished processing.")

    client.close()
    cluster.close()
    print("Closed client and cluster.")
Batch job

We will submit the scheduler and client to the login nodes, and the workers to the compute nodes.

Therefore, this submission script (below) just sets the environment variables, activates conda, and runs Python.

The full script can be found in example_dask_jobqueue_sge.bash.

#!/bin/bash -l

# ensure linear algebra libraries using 1 thread
# https://docs.dask.org/en/stable/array-best-practices.html#avoid-oversubscribing-threads
export OMP_NUM_THREADS=1
export MKL_NUM_THREADS=1
export OPENBLAS_NUM_THREADS=1

conda activate swd6_hpp

python example_dask_jobqueue_sge.py

To run this job on the login node, first ensure that your job is executable:

chmod 700 example_dask_jobqueue_sge.bash

Then you could just execute it:

. example_dask_jobqueue_sge.bash

However, if your session gets disconnected (e.g., if your internet connection drops), then the Dask Scheduler will die too.

To work around this we can use tmux.

tmux creates persistent sessions, which enables the Dask scheduler to keep running even if your connection is lost.

To use tmux, you first create a new session:

tmux new -s dask_scheduler

Then from within the tmux session, go back to the directory with your script and execute it:

. example_dask_jobqueue_sge.bash

You can detach from a tmux session using CTRL + b, then d.

You can list your tmux sessions using:

tmux ls

You can reattach to your tmux session using:

tmux attach-session -t dask_scheduler
Distributed profiling and diagnostics

As for Dask-MPI, you can view the Dask performance report here.

The Dask-Jobqueue method creates and schedules the workers within the Python script. This additional overhead takes extra time relative to Dask-MPI.

The other difference is that here, for Dask-Jobqueue, all 8 requested workers were used directly for the computation.

Question

How well did this job use the resources (use the output from qacct below, which is from one of the workers)?

$ qacct -j 3526684
==============================================================
qname        feps-cpu.q          
hostname     d13s0b1.arc4.leeds.ac.uk
group        EAR                 
owner        earlacoa            
project      feps-cpu            
department   defaultdepartment   
jobname      dask-worker         
jobnumber    3526684             
taskid       undefined
account      sge                 
priority     0                   
qsub_time    Fri Feb 25 16:25:14 2022
start_time   Fri Feb 25 16:25:35 2022
end_time     Fri Feb 25 16:38:21 2022
granted_pe   smp                 
slots        1                   
failed       100 : assumedly after job
exit_status  137                  (Killed)
ru_wallclock 766s
ru_utime     0.040s
ru_stime     0.045s
ru_maxrss    4.879KB
ru_ixrss     0.000B
ru_ismrss    0.000B
ru_idrss     0.000B
ru_isrss     0.000B
ru_minflt    15248               
ru_majflt    0                   
ru_nswap     0                   
ru_inblock   0                   
ru_oublock   16                  
ru_msgsnd    0                   
ru_msgrcv    0                   
ru_nsignals  0                   
ru_nvcsw     208                 
ru_nivcsw    38                  
cpu          728.840s
mem          6.774TBs
io           43.651MB
iow          0.000s
maxvmem      18.891GB
arid         undefined
ar_sub_time  undefined
category     -U admiralty,feps-cpu,feps-gpu -l disk=48G,env=centos7,h_rt=3600,h_vmem=48G,node_type=40core-192G,project=feps-cpu -pe smp 1

Note

When the work is complete, the Dask scheduler kills the Dask workers. Hence, the killed exit status (137) and the failure code (100: assumedly after job) are okay here. You can check the saved results and Dask diagnostics to verify this.

Other general features

Functions become Tasks

if not IN_COLAB:
    from dask.distributed import Client
    client = Client()
def double(x):
    return x * 2


double(5)
10

Submitting this function (task) to the client runs it in a background thread or process:

future = client.submit(double, 10)
future
Future: double status: pending, type: NoneType, key: double-5ce1a66df78558f9e317fc38030e6ab8

Return the result of this future object using .result():

future.result()
20

You can also map a function (task) to many inputs using Dask Bags.

Dask Bags allow operations like map, filter, fold, and groupby on collections of generic Python objects.

import dask.bag as db

You can create Dask Bags from a sequence, reading from files, or from delayed objects.

bag = db.from_sequence(range(10))
bag
dask.bag<from_sequence, npartitions=10>
doubled_bag = bag.map(double)
doubled_bag
dask.bag<double, npartitions=10>

As earlier, call .compute() to execute the computation (i.e., the mapped function):

doubled_bag.compute()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
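The other Bag operations work in the same lazy way. As a small sketch of filter and fold (the lambdas are just illustrative):

# keep only the even numbers, then sum them with a fold (a binary reduction)
evens = bag.filter(lambda x: x % 2 == 0)
total = evens.fold(lambda acc, x: acc + x, initial=0)

print(evens.compute())  # [0, 2, 4, 6, 8]
print(total.compute())  # 20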

Classes become Actors

Actors enable stateful computations.

They are pointers to remote objects.

You can call methods on these remote objects.

class Counter:
    """ A simple class to manage an incrementing counter """
    n = 0

    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1
        return self.n
    
    def read(self):
        return self.n

Create an actor on a worker using the actor=True keyword argument:

future = client.submit(Counter, actor=True)

Get back a pointer to that object:

counter = future.result()
counter
<Actor: Counter, key=Counter-64679ec0-6e0c-47d9-8285-9de4e60fe7e1>

Then call the remote method on that pointer:

future = counter.increment()
future
<ActorFuture>

And return the result:

future.result()
1
if not IN_COLAB:
    client.close()

Ray

Another great library for parallel computing in Python is Ray.

Ray provides a simple, universal API for building distributed applications.

Ray will automatically detect the available GPUs and CPUs on the machine.

You can also specify required resources.

First, initialise Ray.

import ray
ray.init()
E0225 15:57:14.684472870 1690940 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
E0225 15:57:14.700983177 1690940 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
E0225 15:57:14.715875414 1690940 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
{'node_ip_address': '192.168.0.37',
 'raylet_ip_address': '192.168.0.37',
 'redis_address': '192.168.0.37:6379',
 'object_store_address': '/tmp/ray/session_2022-02-25_15-57-13_632236_1690940/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2022-02-25_15-57-13_632236_1690940/sockets/raylet',
 'webui_url': None,
 'session_dir': '/tmp/ray/session_2022-02-25_15-57-13_632236_1690940',
 'metrics_export_port': 63570,
 'gcs_address': '192.168.0.37:37291',
 'node_id': '926837b8972c63a6d9edd9594bf8473a1143f872c878d5ddc01746b0'}

Ray has many great features, similar to Dask.

These include parallelising functions (as tasks) and classes (as actors), covered below.

Functions (tasks) in Ray

Parallelise functions by adding the @ray.remote decorator.

@ray.remote
def increment(x):
    return x + 1

Then instead of calling it normally, use the .remote() method.

futures = [increment.remote(x) for x in range(4)]

This yields a future object reference that you can retrieve with ray.get(object).

print(ray.get(futures))
[1, 2, 3, 4]
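Earlier, we noted that you can also specify required resources. As a hedged sketch (heavy_increment is just an illustrative name), the @ray.remote decorator accepts resource requests such as num_cpus:

@ray.remote(num_cpus=2)
def heavy_increment(x):
    # Ray will only schedule this task on a worker with 2 free CPUs
    return x + 1


future = heavy_increment.remote(10)
print(ray.get(future))  # 11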

Classes (actors) in Ray

Classes (actors) are parallelised in the same way as functions (tasks) in Ray i.e., by using the @ray.remote decorator:

@ray.remote
class Counter:
    """ A simple class to manage an incrementing counter """
    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1
        return self.n
    
    def read(self):
        return self.n

Similar to before, construct an actor instance using .remote():

counters = [Counter.remote() for i in range(4)]

On each actor, call the remote methods (increment and read), and get the future objects:

[counter.increment.remote() for counter in counters]

futures = [counter.read.remote() for counter in counters]

print(ray.get(futures))
[1, 1, 1, 1]

Attention

When finished, remember to shut down the Ray connection.

ray.shutdown()

Ray is a little unintuitive to run on an HPC (e.g., SLURM guide), as you need things like the compute node IP addresses.

For now, it’s best to stick to using Ray in Jupyter Notebooks.

That is unless you’re doing distributed deep learning (examples repository).

For HPC, you can use Dask.

Other key features of Ray include its libraries for hyperparameter tuning (Ray Tune), reinforcement learning (RLlib), distributed training (Ray Train), and model serving (Ray Serve).

Exercises

Exercise 1

Why does parallelisation speed up code?

Exercise 2

What are there multiple of to split the work over?

Exercise 3

If you need to share memory, would you use MPI or OpenMP?

Exercise 4

Which Dask library can be tailored to a variety of resource managers (e.g., SGE, SLURM)?

Exercise 5

Which of the 3 examples below is most efficient, and why?

Note, the chunks keyword argument is the size of each chunk.

Example 1: Many, small chunks.

x = da.random.random(10_000_000, chunks=(1_000,))
y = x.sum().compute()

Example 2: Fewer, large chunks.

x = da.random.random(10_000_000, chunks=(100_000,))
y = x.sum().compute()

Example 3: Use NumPy.

x = np.random.random(10_000_000)
y = x.sum()

Exercise 6

How well did the job below use the HPC resources?

$ qacct -j 3524073
==============================================================
qname        feps-cpu.q          
hostname     d9s9b4.arc4.leeds.ac.uk
group        EAR                 
owner        earlacoa            
project      feps-cpu            
department   defaultdepartment   
jobname      example_bodo_mpi_sge.bash
jobnumber    3524073             
taskid       undefined
account      sge                 
priority     0                   
qsub_time    Thu Feb 24 12:48:24 2022
start_time   Thu Feb 24 12:48:34 2022
end_time     Thu Feb 24 12:48:55 2022
granted_pe   smp                 
slots        8                   
failed       0    
exit_status  0                   
ru_wallclock 21s
ru_utime     139.030s
ru_stime     7.635s
ru_maxrss    1.687MB
ru_ixrss     0.000B
ru_ismrss    0.000B
ru_idrss     0.000B
ru_isrss     0.000B
ru_minflt    764941              
ru_majflt    2                   
ru_nswap     0                   
ru_inblock   0                   
ru_oublock   80                  
ru_msgsnd    0                   
ru_msgrcv    0                   
ru_nsignals  0                   
ru_nvcsw     38884               
ru_nivcsw    665                 
cpu          146.665s
mem          112.727GBs
io           166.518MB
iow          0.000s
maxvmem      12.973GB
arid         undefined
ar_sub_time  undefined
category     -U admiralty,feps-cpu,feps-gpu -l env=centos7,h_rt=600,h_vmem=24G,node_type=40core-192G,project=feps-cpu -pe smp 8

Exercise 7

How well did this job use the HPC resources?

If it wasn’t ideal, what went wrong and what might fix it?

$ qacct -j 3524046
==============================================================
qname        feps-cpu.q          
hostname     d9s9b4.arc4.leeds.ac.uk
group        EAR                 
owner        earlacoa            
project      feps-cpu            
department   defaultdepartment   
jobname      example_bodo_mpi_sge.bash
jobnumber    3524046             
taskid       undefined
account      sge                 
priority     0                   
qsub_time    Thu Feb 24 12:34:54 2022
start_time   Thu Feb 24 12:35:08 2022
end_time     Thu Feb 24 12:37:14 2022
granted_pe   smp                 
slots        8                   
failed       0    
exit_status  0                   
ru_wallclock 126s
ru_utime     125.250s
ru_stime     6.542s
ru_maxrss    1.689MB
ru_ixrss     0.000B
ru_ismrss    0.000B
ru_idrss     0.000B
ru_isrss     0.000B
ru_minflt    758663              
ru_majflt    2                   
ru_nswap     0                   
ru_inblock   0                   
ru_oublock   80                  
ru_msgsnd    0                   
ru_msgrcv    0                   
ru_nsignals  0                   
ru_nvcsw     35039               
ru_nivcsw    30366               
cpu          131.792s
mem          102.207GBs
io           166.212MB
iow          0.000s
maxvmem      13.432GB
arid         undefined
ar_sub_time  undefined
category     -U admiralty,feps-cpu,feps-gpu -l env=centos7,h_rt=600,h_vmem=24G,node_type=40core-192G,project=feps-cpu -pe smp 8

Key Points

Important

  • Use Dask or Ray to parallelise your numerical work.

  • Test locally (single machine) first before moving to a distributed machine (high-performance computer), ensuring that the code is parallelising correctly.

  • Use diagnostics to understand your parallel code (e.g., Dask’s dashboard, qacct).

Further information

Good practices

  • Start small.

  • Test out ideas locally first, on your laptop (or another single machine).

  • Avoid very large chunks / partitions / task graphs (a good first option is to allow auto-chunking).

  • Only use parallelisation (e.g., Dask) when needed, then move back to normal Python ( / NumPy / Pandas).

  • Persist data in memory (RAM) where you can, as it is faster than accessing it from disk.

  • Load data with the parallel library (e.g., Dask), rather than just passing data to it to manage.

  • Call .compute() once, on lots of computations (see the sketch after this list).

  • Avoid global state.

  • Don’t modify the data in place.

  • Avoid moving large pieces of data around (i.e., bring analysis to the data on the cluster).

  • More information is available in the Dask best practices documentation.
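A minimal sketch of two of these practices together (persisting intermediate data in memory and calling compute once for several results); the array sizes are just illustrative:

import dask
import dask.array as da

x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))

# persist: keep the intermediate result in (distributed) memory for reuse
x = x.persist()

col_means = x.mean(axis=0)
col_stds = x.std(axis=0)

# one compute call for several results lets Dask share the intermediate work
means, stds = dask.compute(col_means, col_stds)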

Other options

  • Dask on Ray

    • Use Ray as a backend for Dask tasks.

    • Dask dispatches tasks to Ray for scheduling and execution.

  • Modin

    • Swap out the library import and use the same API.

    • Uses Ray or Dask to easily speed up your Pandas code.

    • To use Modin, simply replace the import and use the Pandas API as normal (see the sketch after this list).

  • Mars

    • A tensor-based unified framework for large-scale data computation which scales numpy, pandas, scikit-learn and many other libraries.

    • Swap out the library import, use the same API, and add .execute().

    • Mars Tensor for NumPy.

    • Mars DataFrame for Pandas.

    • Mars can also use Ray as the backend (instructions).

  • Polars

    • Fast DataFrame library for Rust and Python.

  • RayDP

    • Combines your Spark and Ray clusters, making it easy to do large-scale data processing using the PySpark API and seamlessly use that data to train your models using TensorFlow and PyTorch.

  • IPython Parallel

    • For stateful remote control of several running IPython sessions.

  • Hadoop Streaming

    • For batch processing using MapReduce from Hadoop.

  • PySpark

  • streamparse

  • Using eliot with Dask
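As a minimal sketch of the Modin import swap mentioned above (the CSV filename is just an example):

# replace "import pandas as pd" with the Modin equivalent
import modin.pandas as pd

df = pd.read_csv("example.csv")
print(df.describe())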

Resources