Commit a1ba4d02 authored by Taras Lazariv

Merge branch '218-apache-flink' into 'preview'

remove deprecated python virtual environment from Flink/Spark in Jupyter

Closes #218

See merge request !439
parents ad9b2c5d 7bad7015
@@ -32,7 +32,6 @@ The steps are:

Apache Spark can be used in [interactive](#interactive-jobs) and [batch](#batch-jobs) jobs as well
as via [Jupyter notebooks](#jupyter-notebook). All three ways are outlined in the following.

## Interactive Jobs
@@ -238,50 +237,34 @@ example below:

## Jupyter Notebook

You can run Jupyter notebooks with Spark and Flink on the ZIH systems in a similar way as described
on the [JupyterHub](../access/jupyterhub.md) page.

### Spawning a Notebook

Go to [https://taurus.hrsk.tu-dresden.de/jupyter](https://taurus.hrsk.tu-dresden.de/jupyter).
In the tab "Advanced", go to the field "Preload modules" and select one of the following Spark or
Flink modules:

=== "Spark"
    ```
    Spark/3.0.1-Hadoop-2.7-Java-1.8-Python-3.7.4-GCCcore-8.3.0
    ```
=== "Flink"
    ```
    Flink/1.12.3-Java-1.8.0_161-OpenJDK-Python-3.7.4-GCCcore-8.3.0
    ```

When your Jupyter instance is started, you can set up Spark/Flink. Since the setup in the notebook
requires more steps than in an interactive session, we have created example notebooks that you can
use as a starting point for convenience: [SparkExample.ipynb](misc/SparkExample.ipynb) and
[FlinkExample.ipynb](misc/FlinkExample.ipynb).
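As a quick check that the preloaded module is actually visible inside your notebook (a minimal
sketch, assuming the selected module provides `SPARK_HOME` respectively `FLINK_ROOT_DIR`, the
variables used in the example notebooks), you can print the corresponding environment variable in
a cell:

```python
import os

# SPARK_HOME is used by the Spark example notebook, FLINK_ROOT_DIR by the Flink one;
# an empty line means the corresponding module was not preloaded
print(os.environ.get('SPARK_HOME', ''))
print(os.environ.get('FLINK_ROOT_DIR', ''))
```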
!!! warning

    The notebooks only work with the Spark or Flink module mentioned above. When using other
    Spark/Flink modules, you might have to perform additional or different steps to get
    Spark/Flink running.
!!! note
......
%% Cell type:code id: tags:
``` python
import sys
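# install the PyFlink package into the user site-packages (only needed once per user)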
!{sys.executable} -m pip install apache-flink --user
```
%% Cell type:code id: tags:
``` python
%%bash
echo $FLINK_ROOT_DIR
echo $JAVA_HOME
hostname
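# copy the Flink configuration shipped with the module into a writable directory (done only once)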
if [ ! -d $HOME/jupyter-flink-conf ]
then
cp -r $FLINK_ROOT_DIR/conf $HOME/jupyter-flink-conf
chmod -R u+w $HOME/jupyter-flink-conf
fi
```
%% Cell type:code id: tags:
``` python
import sys
import os
# job-specific Flink configuration, filled in by framework-configure.sh below
os.environ['FLINK_CONF_DIR'] = os.environ['HOME'] + '/cluster-conf-' + os.environ['SLURM_JOBID'] + '/flink'
# make the user-installed apache-flink package visible to the Flink workers; derive the
# site-packages path from the running Python so it matches the "pip install --user" above
os.environ['PYTHONPATH'] = (os.environ['PYTHONPATH'] + ':' + os.environ['HOME']
                            + '/.local/lib/python{}.{}/site-packages'.format(*sys.version_info[:2]))
```
%% Cell type:code id: tags:
``` python
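# adapt the copied Flink configuration to the current Slurm allocation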
!SHELL=/bin/bash bash framework-configure.sh flink $HOME/jupyter-flink-conf
```
%% Cell type:code id: tags:
``` python
# start the Flink cluster inside the current allocation
exitcode = os.system('start-cluster.sh')
if not exitcode:
    print("started Flink cluster successfully")
```
%% Cell type:code id: tags:
``` python
%%bash
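# create a small text file as input for the word count example below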
echo "This is a short story for you. In this story nothing is happening. Have a nice day!" > myFlinkTestFile
```
%% Cell type:code id: tags:
``` python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FileSource
from pyflink.datastream.connectors import StreamFormat
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.common.typeinfo import Types
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(2)
# set the Python executable for the workers
env.set_python_executable(sys.executable)

# define the source
ds = env.from_source(source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
                                                                "myFlinkTestFile").process_static_file_set().build(),
                     watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
                     source_name="file_source")

def split(line):
    yield from line.split()

# compute word count
ds = ds.flat_map(split) \
       .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
       .key_by(lambda i: i[0]) \
       .reduce(lambda i, j: (i[0], i[1] + j[1])) \
       .map(lambda i: print(i))
# submit for execution
env.execute()
```
%% Cell type:code id: tags:
``` python
%%bash
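# shut down the Flink cluster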
stop-cluster.sh
```
%% Cell type:code id: tags:
``` python
!ps -ef | grep -i java
```
%% Cell type:code id: tags:
``` python
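# kill any remaining Java processes left over from the Flink cluster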
!pkill -f "java"
```
%% Cell type:code id: tags:
``` python
%%bash
echo $SPARK_HOME
echo $JAVA_HOME
hostname
if [ ! -d $HOME/jupyter-spark-conf ]
then
cp -r $SPARK_HOME/conf $HOME/jupyter-spark-conf
chmod -R u+w $HOME/jupyter-spark-conf
echo "ml `ml -t list Spark` 2>/dev/null" >> $HOME/jupyter-spark-conf/spark-env.sh
fi
```
%% Cell type:code id: tags:
``` python
import sys
import os
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['SPARK_CONF_DIR'] = os.environ['HOME'] + '/cluster-conf-' + os.environ['SLURM_JOBID'] + '/spark'
```
%% Cell type:code id: tags:
``` python
!SHELL=/bin/bash bash framework-configure.sh spark $HOME/jupyter-spark-conf
```
%% Cell type:code id: tags:
``` python
!start-all.sh
```
%% Cell type:code id: tags:
``` python
import platform
import pyspark
from pyspark import SparkContext
```
%% Cell type:code id: tags:
``` python
sc = SparkContext(master="spark://"+platform.node()+":7077", appName="RDD basic functions App")
```
%% Cell type:code id: tags:
``` python
# your Spark workflow code here, the following is just an example:
datafile = sc.textFile("SparkExample.ipynb")
firstTenItems = datafile.take(10)
for item in firstTenItems:
    print(item)
```
%% Cell type:code id: tags:
``` python
sc.stop()
```
%% Cell type:code id: tags:
``` python
!stop-all.sh
```
%% Cell type:code id: tags:
``` python
!ps -ef | grep -i java
```
%% Cell type:code id: tags:
``` python
!pkill -f "pyspark-shell"
```
......
@@ -87,6 +87,7 @@ filesystem
filesystems
flink
Flink
FlinkExample
FMA
foreach
Fortran
@@ -145,6 +146,7 @@ inode
Instrumenter
IOPS
IPs
ipynb
ISA
Itanium
jobqueue
@@ -322,6 +324,7 @@ Slurm
SLURMCluster
SMP
SMT
SparkExample
spython
squeue
srun
......