diff --git a/doc.zih.tu-dresden.de/docs/software/data_analytics_with_python.md b/doc.zih.tu-dresden.de/docs/software/data_analytics_with_python.md
index a1974c5d288b275a33f621044209ec0e90ce201d..bc9ac622530f2b355adef7337fb5d49447d79be1 100644
--- a/doc.zih.tu-dresden.de/docs/software/data_analytics_with_python.md
+++ b/doc.zih.tu-dresden.de/docs/software/data_analytics_with_python.md
@@ -85,15 +85,20 @@ For more examples of using pandarallel check out
 
 ### Dask
 
-[Dask](https://dask.org/) is a flexible and open-source library for parallel computing in Python.
-It replaces some Python data structures with parallel versions in order to provide advanced
-parallelism for analytics, enabling performance at scale for some of the popular tools. For
-instance: Dask arrays replace NumPy arrays, Dask dataframes replace Pandas dataframes.
+[Dask](https://dask.org/) is a flexible and open-source library
+for parallel computing in Python.
+It replaces some Python data structures with parallel versions
+in order to provide advanced parallelism for analytics,
+enabling performance at scale for some of the popular tools.
+For instance: Dask arrays replace NumPy arrays,
+Dask dataframes replace Pandas dataframes.
 Furthermore, Dask-ML scales machine learning APIs like Scikit-Learn and XGBoost.
 
 Dask is composed of two parts:
 
-- Dynamic task scheduling optimized for computation and interactive computational workloads.
+- Dynamic task scheduling optimized for computation and interactive
+  computational workloads.
 - Big Data collections like parallel arrays, data frames, and lists that extend common interfaces
   like NumPy, Pandas, or Python iterators to larger-than-memory or distributed environments.
   These parallel collections run on top of dynamic task schedulers.
@@ -110,9 +115,10 @@ Dask supports several user interfaces:
 - Delayed: Parallel function evaluation
 - Futures: Real-time parallel function evaluation
 
-#### Dask Usage
+#### Dask Modules on ZIH Systems
 
-On ZIH systems, Dask is available as a module. Check available versions and load your preferred one:
+On ZIH systems, Dask is available as a module.
+Check available versions and load your preferred one:
 
 ```console
 marie@compute$ module spider dask
@@ -129,34 +135,252 @@ marie@compute$ python -c "import dask; print(dask.__version__)"
 2021.08.1
 ```
 
-The preferred and simplest way to run Dask on ZIH system is using
-[dask-jobqueue](https://jobqueue.dask.org/).
+The preferred way is to use Dask as a separate module, as described above.
+However, you can also use it as part of the **Anaconda** module, e.g.: `module load Anaconda3`.
 
-**TODO** create better example with jobqueue
+#### Scheduling with Dask
+
+Whenever you use functions on Dask collections (Dask Array, Dask Bag, etc.), Dask models these as
+single tasks forming larger task graphs in the background without you noticing.
+After Dask generates these task graphs,
+it needs to execute them on parallel hardware.
+This is the job of a task scheduler.
+Please use the distributed scheduler for your
+Dask computations on the cluster and avoid the single-machine scheduler.
+
+##### Distributed Scheduler
+
+There are a variety of ways to set up the distributed scheduler.
+Most of them rely on the `dask.distributed` scheduler.
+To use the `dask.distributed` scheduler, you must set up a `Client`:
+
+```python
+from dask.distributed import Client
+client = Client(...)  # connect to a distributed cluster and override the default scheduler
+df.x.sum().compute()  # a computation on an existing Dask DataFrame `df` now runs on the cluster
+```
+
+The idea behind Dask is to scale Python and distribute computations among workers (multiple
+machines, jobs).
+The preferred and simplest way to run Dask on ZIH systems today,
+both for new and experienced users,
+is to use **[dask-jobqueue](https://jobqueue.dask.org/)**.
+
+However, Dask-jobqueue is slightly oriented toward
+interactive analysis, and it might be better to use tools like
+**[Dask-mpi](https://docs.dask.org/en/latest/setup/hpc.html#using-mpi)**
+for routine batch production workloads.
+
+##### Dask-mpi
+
+You can launch a Dask cluster using
+`mpirun` or `mpiexec` and the `dask-mpi` command line executable.
+This depends on the [mpi4py library](#mpi4py-mpi-for-python).
+For more detailed information, please check
+[the official documentation](https://docs.dask.org/en/latest/setup/hpc.html#using-mpi).
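+
+The following sketch illustrates this approach; the file name, the number of
+MPI ranks and the way MPI is started are only placeholders and have to be
+adapted to your job setup:
+
+```python
+# dask_mpi_example.py (hypothetical file name)
+# run e.g. with: mpirun -np 4 python dask_mpi_example.py
+from dask_mpi import initialize
+from dask.distributed import Client
+
+import dask.array as da
+
+initialize()       # rank 0 becomes the scheduler, rank 1 runs this client code,
+                   # and all remaining ranks become workers
+client = Client()  # connect to the scheduler started by initialize()
+
+# a small example computation executed on the MPI-launched workers
+x = da.random.random((10000, 10000), chunks=(1000, 1000))
+print(x.mean().compute())
+```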
+
+##### Dask-jobqueue
+
+[Dask-jobqueue](https://jobqueue.dask.org/) is the standard way
+to use Dask for most users.
+It allows an easy deployment of Dask Distributed on HPC systems
+with Slurm or other job queuing systems.
+
+Dask-jobqueue is available as an extension
+of the Dask module (which can be loaded by `module load dask`).
+
+The availability of a particular package (such as Dask-jobqueue)
+in a module can be checked with the
+`module whatis <name_of_the_module>` command, e.g. `module whatis dask`.
+
+Moreover, it is possible to install and use `dask-jobqueue`
+in your local Python environments.
+You can install Dask-jobqueue with `pip` or `conda`.
+
+###### Example of Using Dask-Jobqueue with SLURMCluster
+
+[Dask-jobqueue](https://jobqueue.dask.org/en/latest/howitworks.html#workers-vs-jobs)
+allows submitting jobs on the ZIH system
+from within your Python code and scaling computations over these jobs.
+It creates a Dask scheduler in the Python process
+where the cluster object is instantiated.
+Please check the following example of defining the cluster object
+for the partition `alpha` (a queue in Dask terms) on the ZIH system:
+
+```python
+from dask_jobqueue import SLURMCluster
+
+cluster = SLURMCluster(queue='alpha',
+                       cores=8,
+                       processes=2,
+                       project='p_marie',
+                       memory="8GB",
+                       walltime="00:30:00")
+```
+
+The parameters above specify the characteristics of a
+single job or a single compute node,
+rather than the characteristics of your computation as a whole.
+They have not actually launched any jobs yet.
+For the full computation, you then ask for a number of
+workers using the scale command, e.g.: `cluster.scale(2)`;
+Dask-jobqueue submits the corresponding Slurm jobs for you.
+Thus, you have to create a `SLURMCluster` with `dask_jobqueue`,
+scale it and use it for your computations. Here is an example:
 
 ```python
-from dask.distributed import Client, progress
-client = Client(n_workers=4, threads_per_worker=1)
-client
+from dask.distributed import Client
+from dask_jobqueue import SLURMCluster
+from dask import delayed
+
+cluster = SLURMCluster(queue='alpha',
+                       cores=8,
+                       processes=2,
+                       project='p_marie',
+                       memory="80GB",
+                       walltime="00:30:00",
+                       extra=['--resources gpu=1'])
+
+cluster.scale(2)          # scale the cluster to 2 workers
+client = Client(cluster)  # connect a client (displaying it shows the number of workers)
 ```
 
-### mpi4py - MPI for Python
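+Once the cluster is scaled and the client is connected, computations started
+in this Python process run on the allocated workers. A minimal sketch using
+the `delayed` interface (the toy function below is only an illustration):
+
+```python
+from dask import delayed
+
+@delayed
+def square(x):
+    return x ** 2
+
+# build a small task graph and execute it on the workers created above
+total = delayed(sum)([square(i) for i in range(10)])
+print(total.compute())  # the client created above is used automatically
+```
+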
+Please have a look at the `extra` parameter in the `SLURMCluster` call above.
+It can be used to request special hardware
+that the scheduler is not aware of, for example, GPUs.
+Please don't forget to specify the name of your project.
+
+The Python code for setting up and scaling the Slurm cluster
+can be run via `srun`
+(but remember that using `srun` directly on the shell
+blocks the shell and launches an interactive job),
+via batch jobs, or via
+[JupyterHub](../access/jupyterhub.md) with Dask loaded
+(as a module or in a Python virtual environment).
+
+!!! note
+    The job that runs this setup code (de facto an interface job) should be simple and light.
+    Please don't use a lot of resources for that.
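+
+A batch job that only runs this interface code can therefore be very small.
+The following sketch assumes hypothetical resource values and a hypothetical
+script name `dask_jobqueue_setup.py`; Dask-jobqueue then submits the worker
+jobs itself from within the Python script:
+
+```bash
+#!/bin/bash
+#SBATCH --time=01:00:00
+#SBATCH --ntasks=1
+#SBATCH --cpus-per-task=1
+#SBATCH --mem-per-cpu=2000
+
+module load dask          # or activate your own Python environment with dask-jobqueue
+
+python dask_jobqueue_setup.py   # the script that creates and scales the SLURMCluster
+```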
+
+The following example shows how to use
+Dask with `dask-jobqueue`, `SLURMCluster` and `dask.array`
+for the Monte-Carlo estimation of Pi.
+
+??? example "Example of using SLURMCluster"
+
+    ```python
+    # use of dask-jobqueue for the estimation of Pi by the Monte-Carlo method
+
+    from time import time, sleep
+    from dask.distributed import Client
+    from dask_jobqueue import SLURMCluster
+    import subprocess as sp
+
+    import dask.array as da
+    import numpy as np
+
+    # setting up the dashboard
+
+    uid = int( sp.check_output('id -u', shell=True).decode('utf-8').replace('\n','') )
+    portdash = 10001 + uid
+
+    # create a Slurm cluster, please specify your project
+
+    cluster = SLURMCluster(queue='alpha', cores=2, project='p_marie', memory="8GB", walltime="00:30:00", extra=['--resources gpu=1'], scheduler_options={"dashboard_address": f":{portdash}"})
+
+    # scale the cluster to 2 workers (Dask-jobqueue submits the corresponding Slurm jobs)
+
+    cluster.scale(2)
+
+    # wait for Slurm to allocate the resources
+
+    sleep(120)
+
+    # connect a client and check the resources
+
+    client = Client(cluster)
+    client
+
+    # actual calculation with a Monte-Carlo method
+
+    def calc_pi_mc(size_in_bytes, chunksize_in_bytes=200e6):
+        """Calculate Pi using a Monte Carlo estimate."""
+
+        size = int(size_in_bytes / 8)
+        chunksize = int(chunksize_in_bytes / 8)
+
+        xy = da.random.uniform(0, 1, size=(size // 2, 2), chunks=(chunksize // 2, 2))
+
+        in_circle = ((xy ** 2).sum(axis=-1) < 1)
+        pi = 4 * in_circle.mean()
+
+        return pi
+
+    def print_pi_stats(size, pi, time_delta, num_workers):
+        """Print pi, calculate offset from true value, and print some stats."""
+        print(f"{size / 1e9} GB\n"
+              f"\tMC pi: {pi : 13.11f}"
+              f"\tErr: {abs(pi - np.pi) : 10.3e}\n"
+              f"\tWorkers: {num_workers}"
+              f"\t\tTime: {time_delta : 7.3f}s")
+
+    # loop over different volumes of double-precision random numbers and estimate Pi
+
+    for size in (1e9 * n for n in (1, 10, 100)):
+
+        start = time()
+        pi = calc_pi_mc(size).compute()
+        elaps = time() - start
+
+        print_pi_stats(size, pi, time_delta=elaps, num_workers=len(cluster.scheduler.workers))
 
-Message Passing Interface (MPI) is a standardized and portable message-passing standard, designed to
-function on a wide variety of parallel computing architectures. The Message Passing Interface (MPI)
-is a library specification that allows HPC to pass information between its various nodes and
-clusters. MPI is designed to provide access to advanced parallel hardware for end-users, library
-writers and tool developers.
+    # scale the cluster to twice its size and re-run the experiments
+
+    new_num_workers = 2 * len(cluster.scheduler.workers)
 
-mpi4py (MPI for Python) provides bindings of the MPI standard for the Python programming
-language, allowing any Python program to exploit multiple processors.
+    print(f"Scaling from {len(cluster.scheduler.workers)} to {new_num_workers} workers.")
 
-mpi4py is based on MPI-2 C++ bindings. It supports almost all MPI calls. This implementation is
-popular on Linux clusters and in the SciPy community. Operations are primarily methods of
-communicator objects. It supports communication of pickle-able Python objects. mpi4py provides
-optimized communication of NumPy arrays.
+    cluster.scale(new_num_workers)
 
-mpi4py is included in the SciPy-bundle modules on the ZIH system.
+    sleep(120)
+
+    client   # display the cluster information again (interactive sessions)
+
+    # re-run the same experiments with the doubled cluster
+
+    for size in (1e9 * n for n in (1, 10, 100)):
+
+        start = time()
+        pi = calc_pi_mc(size).compute()
+        elaps = time() - start
+
+        print_pi_stats(size, pi, time_delta=elaps, num_workers=len(cluster.scheduler.workers))
+    ```
+
+Please check with the `sinfo` command that the resources
+you want to allocate in the example above are actually available.
+The script does not work without available cluster resources.
+
+### Mpi4py - MPI for Python
+
+The Message Passing Interface (MPI) is a standardized and portable
+message-passing system designed to function on a wide variety
+of parallel computing architectures.
+
+Mpi4py (MPI for Python) provides bindings of the MPI standard for
+the Python programming language,
+allowing any Python program to exploit multiple processors.
+
+Mpi4py is based on MPI-2 C++ bindings. It supports almost all MPI calls.
+It supports communication of pickle-able Python objects.
+Mpi4py provides optimized communication of NumPy arrays.
+
+Mpi4py is included in the SciPy-bundle modules on the ZIH system.
 
 ```console
 marie@compute$ module load SciPy-bundle/2020.11-foss-2020b
@@ -169,7 +393,7 @@ mpi4py 3.0.3
 [...]
 ```
 
-Other versions of the package can be found with
+Other versions of the package can be found with:
 
 ```console
 marie@compute$ module spider mpi4py
@@ -186,7 +410,7 @@ marie@compute$ module spider mpi4py
 Names marked by a trailing (E) are extensions provided by another module.
 -----------------------------------------------------------------------------------------------------------------------------------------
-   For detailed information about a specific "mpi4py" package (including how to load the modules) use the module's full name.
+   For detailed information about a specific "mpi4py" package (including how to load the modules), use the module's full name.
    Note that names that have a trailing (E) are extensions provided by other modules.
    For example:
@@ -194,7 +418,11 @@ Names marked by a trailing (E) are extensions provided by another module.
 -----------------------------------------------------------------------------------------------------------------------------------------
 ```
 
-Check if mpi4py is running correctly
+Moreover, it is possible to install mpi4py in your local conda
+environment.
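+
+A possible way to do this is sketched below; the module names and the
+environment path are only examples (taken from the batch script further down),
+and building mpi4py with `pip` additionally requires an MPI compiler wrapper
+(`mpicc`) from a loaded MPI module. Alternatively, a pre-built package can be
+installed with `conda install -c conda-forge mpi4py`:
+
+```console
+marie@compute$ module load modenv/ml PythonAnaconda/3.6
+marie@compute$ conda create --prefix /home/marie/conda-virtual-environment/kernel2 python=3.6
+marie@compute$ eval "$(conda shell.bash hook)"
+marie@compute$ conda activate /home/marie/conda-virtual-environment/kernel2
+marie@compute$ pip install mpi4py
+```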
+
+The following example can be used to verify that mpi4py is running correctly:
 
 ```python
 from mpi4py import MPI
@@ -202,4 +430,22 @@ comm = MPI.COMM_WORLD
 print("%d of %d" % (comm.Get_rank(), comm.Get_size()))
 ```
 
-**TODO** verify mpi4py installation
+For the multi-node case, use a batch script similar to this one:
+
+```bash
+#!/bin/bash
+#SBATCH --nodes=2
+#SBATCH --partition=ml
+#SBATCH --ntasks-per-node=2
+#SBATCH --cpus-per-task=1
+
+module load modenv/ml
+module load PythonAnaconda/3.6
+
+eval "$(conda shell.bash hook)"
+# specify the path of your virtual environment
+conda activate /home/marie/conda-virtual-environment/kernel2 && srun python mpi4py_test.py
+```
+
+For the verification of the multi-node case,
+you can use the Python code from above
+(the installation check) as the test file `mpi4py_test.py`.
diff --git a/doc.zih.tu-dresden.de/wordlist.aspell b/doc.zih.tu-dresden.de/wordlist.aspell
index 85cb0d7f13cde7edffa73f5b44d3a4acdf721027..7d651ac573f26d6f4bb4c1aa8c15718e799f1716 100644
--- a/doc.zih.tu-dresden.de/wordlist.aspell
+++ b/doc.zih.tu-dresden.de/wordlist.aspell
@@ -5,6 +5,7 @@ Altix
 Amber
 Amdahl's
 analytics
+Analytics
 anonymized
 Ansys
 APIs
@@ -42,6 +43,7 @@ CUDA
 cuDNN
 CXFS
 dask
+Dask
 dataframes
 DataFrames
 datamover
@@ -69,6 +71,7 @@ env
 EPYC
 Espresso
 ESSL
+facto
 fastfs
 FFT
 FFTW
@@ -165,6 +168,7 @@ modenv
 Montecito
 mountpoint
 mpi
+Mpi
 mpicc
 mpiCC
 mpicxx
@@ -280,6 +284,7 @@ SHA
 SHMEM
 SLES
 Slurm
+SLURMCluster
 SMP
 SMT
 squeue
@@ -340,4 +345,4 @@ Xming
 yaml
 zih
 ZIH
-ZIH's
\ No newline at end of file
+ZIH's