Kafka, Spark Streaming Demo - Read from Topic and Write to Topic

Anish Mahapatra
3 min read · Nov 20, 2023


In this demo, we will read a topic and perform a transformation from lower case to upper case.

Photo by Robert Zunikoff on Unsplash

Pre-Requisites

  • Ensure you have Zookeeper and Kafka running on your system
  • Ensure you have the relevant version of Spark installed on your system
  • Run Zookeeper and Kafka (a reminder of the start commands follows), and create the two topics listed below
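If Zookeeper and Kafka are not already running, the usual start commands for a standard Kafka binary distribution look like this (run each in its own terminal from your Kafka folder; adjust the config paths if your setup differs):

bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties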

In case you are having issues with the setup, you can refer to my other blogs.

The two topics we need are src-topic and dest-topic.

Commands to create the topics

Create src-topic, the topic that will be the source. Make sure you are in your Kafka folder in the terminal; otherwise, change directory (cd) to wherever your Kafka folder is.

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic src-topic

Create dest-topic, the topic that will be the destination.

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic dest-topic

Then, confirm that the topics src-topic and dest-topic exist by listing them out.

bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
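If the topics were created correctly, the output should include both names (along with any other topics you may already have), for example:

dest-topic
src-topic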

Great, we have now created our topics successfully.

Let’s go ahead and run our code now. You can run it in a Jupyter Notebook; the streaming query uses a checkpoint folder called checkpoint_folder, which you can create up front as shown below.
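Spark will create the checkpoint location if it does not already exist, but if you prefer to create it explicitly in the notebook first, a minimal cell like this (assuming you want the folder in the notebook's working directory) does the job:

import os

# Create the checkpoint directory used by the streaming query below
os.makedirs("checkpoint_folder", exist_ok=True)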

from pyspark.sql import SparkSession
from pyspark.sql.functions import upper

# Create a Spark Session
spark = SparkSession.builder \
    .appName("KafkaReadTransformWriteToKafka") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .getOrCreate()

# Create a DataFrame that reads from the input Kafka topic name src-topic
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "src-topic")
    .load()
    .selectExpr("CAST(value AS STRING) AS message")
)

# Apply transformation - converting lowercase messages to uppercase
transformed_df = df.withColumn("transformedMessage", upper(df.message))

# Write the transformed messages to another Kafka topic
query = (
    transformed_df
    .selectExpr("CAST(transformedMessage AS STRING) AS value")
    .writeStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "dest-topic")
    .option("checkpointLocation", "checkpoint_folder")  # Specify the checkpoint location
    .start()
)

query.awaitTermination()
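Note that query.awaitTermination() blocks the notebook cell for as long as the stream runs. If you would rather keep the cell free while testing, a small variation (a sketch using the standard StreamingQuery API) is to skip awaitTermination() and poll the query instead:

# Inspect the running streaming query instead of blocking on awaitTermination()
print(query.status)        # current state of the query
print(query.lastProgress)  # metrics for the most recent micro-batch

# Stop the stream when you are done testing
# query.stop()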

Once you run the notebook successfully, start the console producer for src-topic and the console consumer for dest-topic. We will produce lowercase text in src-topic, have Spark convert it to uppercase, and receive it in dest-topic.

In one terminal window, run the following to start the console producer for src-topic:

bin/kafka-console-producer.sh --topic src-topic --bootstrap-server localhost:9092

In another terminal, run the following to start the console consumer for dest-topic:

bin/kafka-console-consumer.sh --topic dest-topic --bootstrap-server localhost:9092

Now, you should have two terminals side by side with the producer and consumer running.

Let’s try it.

Type out sample text in the producer terminal and see it being transformed into UPPERCASE in the consumer terminal.
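For example, a line typed into the producer terminal should appear in uppercase in the consumer terminal (sample input and output shown below):

Producer terminal (input):  hello from kafka and spark
Consumer terminal (output): HELLO FROM KAFKA AND SPARK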

That’s it! You successfully used Spark Structured Streaming to read from one Kafka topic, transform the messages, and write them to another.

