Skip to content
Snippets Groups Projects

remove deprecated python virtual environment from Flink/Spark in Jupyter

Merged Elias Werner requested to merge 218-apache-flink into preview
Files
4
%% Cell type:code id: tags:
``` python
# Install PyFlink into the user's site-packages, using the exact interpreter
# that backs this Jupyter kernel (so the package lands where the kernel looks).
import sys
!{sys.executable} -m pip install apache-flink --user
```
%% Cell type:code id: tags:
``` python
%%bash
# Show where Flink and Java are installed, and which node this kernel runs on.
echo $FLINK_ROOT_DIR
echo $JAVA_HOME
hostname
# Create a writable per-user copy of the default Flink configuration, once.
# Paths are quoted so the directory test, copy and chmod survive values
# containing spaces (unquoted expansions undergo word splitting).
if [ ! -d "$HOME/jupyter-flink-conf" ]
then
    cp -r "$FLINK_ROOT_DIR/conf" "$HOME/jupyter-flink-conf"
    chmod -R u+w "$HOME/jupyter-flink-conf"
fi
```
%% Cell type:code id: tags:
``` python
import sys
import os
import site

# Point Flink at the per-SLURM-job configuration directory prepared by the
# cluster start scripts.
os.environ['FLINK_CONF_DIR'] = os.environ['HOME'] + '/cluster-conf-' + os.environ['SLURM_JOBID'] + '/flink'

# Make the `pip install --user` packages (see the install cell) visible to the
# Python workers.  site.getusersitepackages() resolves the per-user
# site-packages directory for the *running* interpreter instead of a
# hard-coded "python3.6" path, and os.environ.get() tolerates PYTHONPATH
# being unset (plain indexing would raise KeyError).
_user_site = site.getusersitepackages()
_existing = os.environ.get('PYTHONPATH')
os.environ['PYTHONPATH'] = (_existing + ':' + _user_site) if _existing else _user_site
```
%% Cell type:code id: tags:
``` python
!SHELL=/bin/bash bash framework-configure.sh flink $HOME/jupyter-flink-conf
```
%% Cell type:code id: tags:
``` python
# Launch the standalone Flink cluster; os.system() returns the shell's exit
# status, where 0 means success.
exit_status = os.system('start-cluster.sh')
if exit_status == 0:
    print("started Flink cluster successfully")
else:
    # Surface the failure instead of staying silent, so the user does not
    # proceed against a cluster that never came up.
    print("starting Flink cluster failed with exit status", exit_status)
```
%% Cell type:code id: tags:
``` python
%%bash
# Write a small text file to serve as input for the word-count example.
echo "This is a short story for you. In this story nothing is happening. Have a nice day!" > myFlinkTestFile
```
%% Cell type:code id: tags:
``` python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FileSource
from pyflink.datastream.connectors import StreamFormat
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.common.typeinfo import Types

# Streaming word count over the test file written above.
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(2)
# set the Python executable for the workers (must match the kernel interpreter
# so the --user-installed apache-flink package is importable there)
env.set_python_executable(sys.executable)

# define the source: read "myFlinkTestFile" line by line as a static
# (bounded) file set
ds = env.from_source(source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
                                                                "myFlinkTestFile").process_static_file_set().build(),
                     watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
                     source_name="file_source")


def split(line):
    # Yield each whitespace-separated word of the line.
    yield from line.split()


# compute word count: emit (word, 1), group by word, sum the counts, then
# print each running result on the worker
ds = ds.flat_map(split) \
    .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
    .key_by(lambda i: i[0]) \
    .reduce(lambda i, j: (i[0], i[1] + j[1])) \
    .map(lambda i: print(i))

# submit for execution
env.execute()
```
%% Cell type:code id: tags:
``` python
%%bash
# Shut down the standalone Flink cluster.
stop-cluster.sh
```
%% Cell type:code id: tags:
``` python
!ps -ef | grep -i java
```
%% Cell type:code id: tags:
``` python
!pkill -f "java"
```
Loading