Start spark3.3 kafka3.2 streaming from scratch

I have not used spark for long days. Today I tried to setup spark to integrate with kafka streaming, but I found the ecosystem has changed a bit.

The main branches for spark and kafka are,

Though scala 2.13 has been released for long days, but both spark and kafka were developed mainly by scala 2.12. When downloading the software, we should choose the version who uses scala 2.12.

First, we want to install scala 2.12. My OS is ubuntu 20.04, x64. Following the steps below to install java, sdkman, scala and sbt.

 $ sudo apt install openjdk-11-jre
$ curl -s "https://get.sdkman.io" | bash
$ sdk install scala 2.12.15
$ sdk install sbt

Next, download spark and kafka from their official sites. Unpackage and move them to /opt directory. So in /opt dir I have,

 $ ls /opt
kafka spark

Put these settings in .bash_profile file under user’s home dir,

 source "/home/pyh/.sdkman/bin/sdkman-init.sh"

export SPARK_HOME=/opt/spark
export JAVA_HOME=/usr
export PATH=/opt/kafka/bin:/opt/spark/bin:$PATH

I have a script “kafka.sh” for managing the kafka service, whose content are follows.

 #!/bin/bash

ACT=$1
TOP=$2
PRE="/opt/kafka"

if [ -z $ACT ];then
echo "$0 action [topic]"
exit
fi

if [ -z $TOP ];then
TOP="quickstart-events"
fi

if [ "$ACT" == "produce" ];then
$PRE/bin/kafka-console-producer.sh --topic $TOP --bootstrap-server localhost:9092

elif [ "$ACT" == "consume" ];then
$PRE/bin/kafka-console-consumer.sh --topic $TOP --from-beginning --bootstrap-server localhost:9092

elif [ "$ACT" == "create" ];then
$PRE/bin/kafka-topics.sh --create --partitions 2 --replication-factor 1 --topic $TOP --bootstrap-server localhost:9092

elif [ "$ACT" == "desc" ];then
$PRE/bin/kafka-topics.sh --describe --topic $TOP --bootstrap-server localhost:9092

elif [ "$ACT" == "startzk" ];then
$PRE/bin/zookeeper-server-start.sh $PRE/config/zookeeper.properties

elif [ "$ACT" == "start" ];then
$PRE/bin/kafka-server-start.sh $PRE/config/server.properties

fi

I use this script to start a kafka process and create a topic,

 $ kafka.sh startzk  # startup zookeeper
$ kafka.sh start # startup kafka
$ kafka.sh create mytest # create a topic

The commands above should be run in three separated terminals. They create a topic named “mytest”.

Now, I produce the messages to kafka by a ruby script.

 $ cat produce.rb 
require 'kafka'

kafka = Kafka.new("localhost:9092", client_id: "ruby-client", resolve_seed_brokers: true)

producer = kafka.producer(required_acks: :all,max_buffer_size: 50_000)

1000.times do
message = rand.to_s
producer.produce(message, key: "key1", topic: "mytest")
end

producer.deliver_messages

To keep publishing messages continuously, we can do it in bash shell.

 $ while [ 1 ];do ruby produce.rb ;sleep 5;done

For now the messages have been published to kafka successfully. I have to read the streaming from kafka into spark and process the messages within spark.

To setup a scala project,

 $ mkdir myproject
$ cd myproject
$ mkdir -p src/main/scala
$ touch build.sbt
$ touch src/main/scala/sparkafka.scala

Here are the content of build.sbt,

 name := "sparkafka"

version := "0.1"

scalaVersion := "2.12.15"

libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.12" % "3.3.0"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.3.0"

And the source code in sparkafka.scala,

 import org.apache.spark.sql.SparkSession

object Sparkafka {
def main(args:Array[String]):Unit = {

val spark = SparkSession.builder.appName("Mykafka").getOrCreate()

val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "mytest")
.load()

import spark.implicits._

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]

val myCount = df.groupBy("key").count()

val query = myCount.writeStream
.outputMode("complete")
.format("console")
.start()

query.awaitTermination()

}
}

In myproject directory, I run the following command to compile and package the project.

 $ sbt package
...
[success] Total time: 4 s, completed Jul 7, 2022, 4:39:19 PM

Go to spark’s configuration dir, change the log level in log4j2.properties to error.

 $ cat /opt/spark/conf/log4j2.properties |grep error
rootLogger.level = error

The last step, submit the job to spark. In myproject dir, run the command below,

 $ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 --class "Sparkafka" --master local[2] target/scala-2.12/sparkafka_2.12-0.1.jar

Here are the output in terminal,

 -------------------------------------------
Batch: 1
-------------------------------------------
+-------------+-----+
| key|count|
+-------------+-----+
|[6B 65 79 31]| 2000|
+-------------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-------------+-----+
| key|count|
+-------------+-----+
|[6B 65 79 31]| 4000|
+-------------+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+-------------+-----+
| key|count|
+-------------+-----+
|[6B 65 79 31]| 5000|
+-------------+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+-------------+-----+
| key|count|
+-------------+-----+
|[6B 65 79 31]| 7000|
+-------------+-----+

Finally we got the jobs run correctly. For production deployment, we read the streaming from kafka, after aggregation we would write the results into a storage such as redis or mysql, instead of writing to terminal as in this sample.