How to set up Kafka on Ubuntu

Anish Mahapatra
4 min readNov 19, 2023
Photo by Scott Webb on Unsplash
sudo apt-get update
java -version
sudo wget https://downloads.apache.org/kafka/3.6.0/kafka_2.12-3.6.0.tgz

Un-tar the file

 sudo tar xzf kafka_2.12-3.6.0.tgz

Create new directory

sudo mkdir -p /usr/local/kafka
sudo mv kafka_2.12-3.6.0 /usr/local/kafka

Create the systemd unit file for zookeeper service

sudo nano /etc/systemd/system/zookeeper.service

Paste the below lines

[Unit]
Description=Apache Zookeeper service
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/local/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

Hit Ctrl + O (Save) and press Enter

Hit Ctrl + X (Exit)

sudo systemctl daemon-reload

Create the systemd unit file for kafka service.

sudo nano /etc/systemd/system/kafka.service

Paste Below Lines

[Unit]
Description=Apache Kafka Service
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service

[Service]
Type=simple
Environment="JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre"
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh

[Install]
WantedBy=multi-user.target

Note: Modify the value of JAVA_HOME value,If the path of your Java installation is different.

sudo systemctl daemon-reload
sudo systemctl start zookeeper
sudo systemctl start kafka

Let’s check if it’s up and running. In case you have issues — most likely it’s the path or permission issues. Use ChatGPT/Google to address this. Also, take the time to read what is written in the code you pasted below the sudo nano command and see if it matches your system and location.

sudo systemctl status zookeeper
sudo systemctl status kafka

Now, make sure you are in the Kafka Folder

Run

Make a New Topic!

sudo bin/kafka-topics.sh --create --topic web_logs --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4

List all the topics

sudo bin/kafka-topics.sh --list --bootstrap-server localhost:9092

Start the Producer & Consumer

sudo bin/kafka-console-producer.sh --broker-list localhost:9092 --topic anishmahapatra
sudo bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic anishmahapatra --from-beginning

Great, now you can run the code below on Jupyter Notebook. This would apply to my below configurations:

  • Spark 3.5.0
  • Kafka 2.12–3.4.0
  • Scala 2.12

Run on Jupyter Notebook

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, IntegerType

# Create a SparkSession
spark = SparkSession.builder \
.appName("WebLogAnalysis") \
.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
.getOrCreate()

# Define the schema to parse JSON logs (adjust the fields and types according to your log structure)
log_schema = StructType() \
.add("timestamp", StringType()) \
.add("ip", StringType()) \
.add("url", StringType()) \
.add("status_code", IntegerType())

# Subscribe to Kafka topic and read the data
kafka_params = {
"kafka.bootstrap.servers": "localhost:9092",
"subscribe": "web_logs",
"startingOffsets": "latest"
}

df = spark \
.readStream \
.format("kafka") \
.options(**kafka_params) \
.load()

# Parse the value column using the defined schema
logs = df.selectExpr("CAST(value AS STRING)").alias("logs")
parsed_logs = logs \
.select(from_json("logs.value", log_schema).alias("parsed_log")) \
.select("parsed_log.*")

# Example analysis: Count HTTP response codes
http_codes_count = parsed_logs \
.groupBy("status_code") \
.count()

# Display the streaming HTTP response code counts
query = http_codes_count \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()

query.awaitTermination()

Then, you can start the producer:

sudo bin/kafka-console-producer.sh --broker-list localhost:9092 --topic web_logs
{"timestamp": "2023-11-17T10:10:00", "ip": "192.168.1.3", "url": "example.com/about", "status_code": 200}
{"timestamp": "2023-11-17T10:15:00", "ip": "192.168.1.4", "url": "example.com/contact", "status_code": 500}
{"timestamp": "2023-11-17T10:20:00", "ip": "192.168.1.5", "url": "example.com/products", "status_code": 200}

{"timestamp": "2023-11-17T10:25:00", "ip": "192.168.1.6", "url": "example.com/services", "status_code": 404}
{"timestamp": "2023-11-17T10:30:00", "ip": "192.168.1.7", "url": "example.com/blog", "status_code": 200}
{"timestamp": "2023-11-17T10:35:00", "ip": "192.168.1.8", "url": "example.com/portfolio", "status_code": 303}
{"timestamp": "2023-11-17T10:40:00", "ip": "192.168.1.9", "url": "example.com/news", "status_code": 404}
{"timestamp": "2023-11-17T10:45:00", "ip": "192.168.1.10", "url": "example.com/login", "status_code": 200}

Code for Business Scenario

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, current_timestamp
from pyspark.sql.types import StructType, StringType, FloatType

# Define the schema of the incoming JSON data
stockSchema = StructType() \
.add("symbol", StringType()) \
.add("price", FloatType())

# Create a Spark Session
spark = SparkSession.builder \
.appName("RealTimeStockTickerProcessing") \
.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
.getOrCreate()

# Read from Kafka
rawStream = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "stock-ticker") \
.load()

# Deserialize JSON data and select the fields
tickerData = rawStream.select(from_json(col("value").cast("string"), stockSchema).alias("data")).select("data.*")

# Create a 'timestamp' column based on current time
tickerDataWithTimestamp = tickerData.withColumn("timestamp", current_timestamp())

# Calculate the moving average
movingAvg = tickerDataWithTimestamp.groupBy(
col("symbol"),
window(col("timestamp"), "5 minutes", "30 seconds")
).avg("price")

# Start the computation
query = movingAvg.writeStream \
.outputMode("update") \
.format("console") \
.option("truncate", False) \
.start()

# Await termination
query.awaitTermination()
sudo bin/kafka-console-producer.sh --broker-list localhost:9092 --topic stock-ticker
{"symbol": "AAPL", "price": 150.25}
{"symbol": "GOOGL", "price": 2700.50}
{"symbol": "MSFT", "price": 305.75}

--

--

Anish Mahapatra

Senior AI & ML Engineer | Fortune 500 | Senior Technical Writer - Google me. Anish Mahapatra | https://www.linkedin.com/in/anishmahapatra/