diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 4adc0ec6a75c2ea391815aa6c1eb260591ac3ed9..0d4efde55a9c87bf15179a72ef1a10d804ff6dfb 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -164,6 +164,22 @@ Run Spark example:
         - test-spark-pi.example
     - if: ($CI_PIPELINE_SOURCE == "schedule") && ($JUST_REMOVE != "yes")
 
+Run Kafka example:
+  needs: ["Install module Kafka"]
+  extends: .extensible_test_job
+  script:
+    - make test-kafka-from-ci
+  rules:
+    - if: ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "main") && ($CI_PIPELINE_SOURCE == "merge_request_event")
+      changes:
+        - Makefile
+        - BigDataFrameworkConfigure.eb.template
+        - bigdataframeworkconfigure/*
+        - Kafka.eb
+        - Kafka.patch
+        - test-kafka.example
+    - if: ($CI_PIPELINE_SOURCE == "schedule") && ($JUST_REMOVE != "yes")
+
 Test framework-configure.sh:
   needs: ["Install module BigDataFrameworkConfigure", "Install module Flink"]
   extends: .extensible_job
diff --git a/Kafka.patch b/Kafka.patch
index 34a35f1a4f93bb9c9446eb174ccbe5c41989ab35..cfd88884978c91522da5790b020a156fcec3912e 100644
--- a/Kafka.patch
+++ b/Kafka.patch
@@ -142,10 +142,10 @@ diff -Naur a/config/kraft/server.properties b/config/kraft/server.properties
 diff -Naur a/config/meta.conf b/config/meta.conf
 --- a/config/meta.conf
 +++ b/config/meta.conf
-@@ -0,0 +1,4 @@
+@@ -0,0 +1,5 @@
 +export FRAMEWORK_MASTER_FILE=
 +export FRAMEWORK_SLAVE_FILE=slaves
-+export FRAMEWORK_CONF_FILES="server.properties zookeeper.properties kraft/broker.properties kraft/server.properties kraft/controller.properties setup-kafka-env.sh"
++export FRAMEWORK_CONF_FILES="server.properties zookeeper.properties kraft/broker.properties kraft/server.properties kraft/controller.properties setup-kafka-env.sh producer.properties consumer.properties"
 +export FRAMEWORK_BIN_DIR="$KAFKA_BIN_DIR"
 +export FRAMEWORK_NAME="KAFKA"
 diff -Naur a/config/producer.properties b/config/producer.properties
@@ -210,3 +210,15 @@ diff -Naur a/config/zookeeper.properties b/config/zookeeper.properties
  # the port at which the clients will connect
  clientPort=2181
  # disable the per-ip limit on the number of connections since this is a non-production config
+diff -Naur a/config/consumer.properties b/config/consumer.properties
+--- a/config/consumer.properties
++++ b/config/consumer.properties
+@@ -16,7 +16,7 @@
+ 
+ # list of brokers used for bootstrapping knowledge about the rest of the cluster
+ # format: host1:port1,host2:port2 ...
+-bootstrap.servers=localhost:9092
++bootstrap.servers=FRAMEWORK_MASTER_NODE:9092
+ 
+ # consumer group id
+ group.id=test-consumer-group
diff --git a/Makefile b/Makefile
index d6014c7492917485b568bea88355520d3522451b..9e3fbdeb2ebf8c1a3770e724bdbd2ba788c4ed1a 100644
--- a/Makefile
+++ b/Makefile
@@ -54,6 +54,10 @@ test-spark-from-ci:
 	export WORKSPACE_PATH=`ws_find -F horse ${workspace_name}`; \
 	./test-spark-pi.example $${WORKSPACE_PATH}
 
+test-kafka-from-ci:
+	export WORKSPACE_PATH=`ws_find -F horse ${workspace_name}`; \
+	./test-kafka.example $${WORKSPACE_PATH}
+
 check-toolchain-dependency:
 	./check-all-easybuild-use-same-toolchain.sh
 
diff --git a/test-kafka.example b/test-kafka.example
new file mode 100755
index 0000000000000000000000000000000000000000..0dc61e57282ec915a43136e2d3b667fa272e5bd4
--- /dev/null
+++ b/test-kafka.example
@@ -0,0 +1,122 @@
+set -u
+set -e
+set -v
+
+export LANG=C
+WORKSPACE_PATH=$1
+echo "WORKSPACE_PATH is $WORKSPACE_PATH"
+
+module load development/24.04 GCC/13.2.0
+
+module use $WORKSPACE_PATH/${LMOD_SYSTEM_NAME}/modules/all/Compiler/GCC/13.2.0/
+
+module load Kafka
+
+# configuration
+export FRAMEWORK_DATA_DIR=$WORKSPACE_PATH/example-kafka-data-dir
+MASTERNODE=$(scontrol show hostnames $SLURM_JOB_NODELIST | head -1)
+PORT="9092"
+BOOTSTRAP_SERVER="$MASTERNODE:$PORT"
+KAFKA_TOPIC=test1
+KAFKA_TOPIC_TWO=test2
+TIMEOUT_DURATION_SEC=30
+NUM_RECORDS=600
+
+echo "Initializing framework configuration"
+source framework-configure.sh --framework kafka --destination $PWD/real-kafka-conf
+
+myExitHandler () {
+    echo "myExitHandler called"
+
+    while [[ "$(jps | grep -c 'Kafka')" -gt 0 ]]; do
+        # Stopping Kafka server
+        kafka-server-stop.sh -daemon $PWD/real-kafka-conf/kafka/server.properties
+    done
+    while [[ "$(jps | grep -c 'QuorumPeerMain')" -gt 0 ]]; do
+        # Stopping Zookeeper
+        zookeeper-server-stop.sh -daemon $PWD/real-kafka-conf/kafka/zookeeper.properties
+    done
+}
+
+# register cleanup hook
+trap myExitHandler EXIT
+trap myExitHandler ERR
+
+# Starting Zookeeper
+TIMEOUT_START=$(date +%s)
+echo "Waiting for Zookeeper to start."
+while [[ "$(jps | grep -c 'QuorumPeerMain')" -lt 1 ]]; do
+    TIMEOUT_CURR=$(date +%s)
+    zookeeper-server-start.sh -daemon $PWD/real-kafka-conf/kafka/zookeeper.properties
+    sleep 5s
+    if (( TIMEOUT_CURR - TIMEOUT_START > TIMEOUT_DURATION_SEC )); then echo "Fail: Timeout reached."; exit 1; fi
+done
+echo "Zookeeper successfully started."
+
+# Starting Kafka broker
+TIMEOUT_START=$(date +%s)
+echo "Waiting for Kafka broker to start."
+while [[ "$(jps | grep -c 'Kafka')" -lt 1 ]]; do
+    TIMEOUT_CURR=$(date +%s)
+    kafka-server-start.sh -daemon $PWD/real-kafka-conf/kafka/server.properties
+    sleep 5s
+    if (( TIMEOUT_CURR - TIMEOUT_START > TIMEOUT_DURATION_SEC )); then echo "Fail: Timeout reached."; exit 1; fi
+done
+echo "Kafka broker successfully started."
+
+sleep 10s
+
+echo "Checking if Zookeeper and Kafka processes are still running."
+if [[ "$(jps | grep -c 'QuorumPeerMain')" -lt 1 ]] || [[ "$(jps | grep -c 'Kafka')" -lt 1 ]]; then
+    echo "Fail: Required processes are not running."
+    exit 1
+fi
+
+echo "Creating Kafka topics."
+kafka-topics.sh --create --topic $KAFKA_TOPIC --bootstrap-server "$BOOTSTRAP_SERVER" --partitions 1 --replication-factor 1
+kafka-topics.sh --create --topic $KAFKA_TOPIC_TWO --bootstrap-server "$BOOTSTRAP_SERVER" --partitions 1 --replication-factor 1
+
+NUM_TOPIC=$(kafka-topics.sh --list --bootstrap-server "$BOOTSTRAP_SERVER" | wc -l)
+if [[ $NUM_TOPIC -ne 2 ]]; then
+    echo "Fail: Expected 2 topics, but $NUM_TOPIC topics are present."
+    exit 1
+fi
+
+echo "Producer performance test started on topic \"$KAFKA_TOPIC\"."
+kafka-producer-perf-test.sh \
+    --topic $KAFKA_TOPIC \
+    --num-records $NUM_RECORDS \
+    --throughput 10 \
+    --record-size 200 \
+    --producer.config $PWD/real-kafka-conf/kafka/producer.properties
+echo "Producer performance test complete."
+
+NUM_RECORDS_RESULT=$(kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "$BOOTSTRAP_SERVER" --topic $KAFKA_TOPIC --time -1)
+NUM_RECORDS_RESULT=$(echo "$NUM_RECORDS_RESULT" | awk -F':' '{print $3}')
+if [[ $NUM_RECORDS_RESULT -ne $NUM_RECORDS ]]; then
+    echo "Fail: Expected number of records ($NUM_RECORDS) in Kafka topic differs from the actual value ($NUM_RECORDS_RESULT)."
+fi
+
+echo "Deleting Kafka topic \"$KAFKA_TOPIC\"."
+kafka-topics.sh --delete --topic $KAFKA_TOPIC --bootstrap-server "$BOOTSTRAP_SERVER"
+
+NUM_TOPIC=$(kafka-topics.sh --list --bootstrap-server "$BOOTSTRAP_SERVER" | wc -l)
+if [[ $NUM_TOPIC -ne 1 ]]; then
+    echo "Fail: Expected 1 topic, but $NUM_TOPIC topics are present."
+    exit 1
+fi
+
+echo "Stopping Kafka server"
+kafka-server-stop.sh -daemon $PWD/real-kafka-conf/kafka/server.properties
+echo "Stopping Zookeeper"
+zookeeper-server-stop.sh -daemon $PWD/real-kafka-conf/kafka/zookeeper.properties
+
+sleep 5s
+
+echo "Checking if Zookeeper and Kafka processes are still running."
+if [[ "$(jps | grep -c 'QuorumPeerMain')" -gt 0 ]] || [[ "$(jps | grep -c 'Kafka')" -gt 0 ]]; then
+    echo "Fail: Required processes are not stopped."
+    exit 1
+fi
+
+exit 0
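
Note (supplementary, not part of the diff): test-kafka.example reads $SLURM_JOB_NODELIST via scontrol and takes the workspace path as its first argument, so it can only run inside a Slurm job. A minimal sketch of a manual run; the allocation parameters and workspace name below are illustrative placeholders, not values taken from this change:

    # request an interactive shell on a compute node (placeholder resources)
    srun --nodes=1 --ntasks=1 --time=00:30:00 --pty bash
    # resolve the workspace path the same way the Makefile target does
    WORKSPACE_PATH=$(ws_find -F horse <your_workspace_name>)
    ./test-kafka.example "$WORKSPACE_PATH"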