How to set up Kafka on Ubuntu
sudo apt-get update
java -version
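Kafka needs a Java runtime, which is what java -version checks for. If it reports that Java is missing, install a JDK first (default-jdk pulls in your Ubuntu release's default OpenJDK):
sudo apt-get install -y default-jdk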
sudo wget https://downloads.apache.org/kafka/3.6.0/kafka_2.12-3.6.0.tgz
Extract the archive:
sudo tar xzf kafka_2.12-3.6.0.tgz
Create a new directory and move the extracted files into it. Since /usr/local/kafka already exists after the mkdir, move the contents of the extracted folder rather than the folder itself; otherwise the scripts end up one level too deep for the systemd units below.
sudo mkdir -p /usr/local/kafka
sudo mv kafka_2.12-3.6.0/* /usr/local/kafka/
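As a quick sanity check, the bin and config folders should now sit directly under /usr/local/kafka (the systemd units below depend on this):
ls /usr/local/kafka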
Create the systemd unit file for the ZooKeeper service:
sudo nano /etc/systemd/system/zookeeper.service
Paste the lines below:
[Unit]
Description=Apache Zookeeper service
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/local/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
Hit Ctrl+O to save, press Enter to confirm, then Ctrl+X to exit.
sudo systemctl daemon-reload
Create the systemd unit file for the Kafka service:
sudo nano /etc/systemd/system/kafka.service
Paste the lines below:
[Unit]
Description=Apache Kafka Service
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
Environment="JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre"
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
[Install]
WantedBy=multi-user.target
Note: Modify the JAVA_HOME value if the path of your Java installation is different.
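If you are not sure what that path is, one way to find it on Ubuntu is to resolve the java binary (the exact output depends on which JDK you installed):
readlink -f "$(which java)"
If this prints something like /usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/bin/java, then JAVA_HOME is everything before /bin/java.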
sudo systemctl daemon-reload
sudo systemctl start zookeeper
sudo systemctl start kafka
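Optionally, enable both services so they come back up after a reboot:
sudo systemctl enable zookeeper
sudo systemctl enable kafka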
Let's check if everything is up and running; in both outputs you want to see "Active: active (running)". If a service fails to start, it is most likely a path or permission issue: re-read the unit files you pasted after each sudo nano command, make sure the paths match your system, and use ChatGPT/Google if you get stuck.
sudo systemctl status zookeeper
sudo systemctl status kafka
Now, make sure you are in the Kafka folder:
cd /usr/local/kafka
Make a New Topic!
sudo bin/kafka-topics.sh --create --topic web_logs --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4
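You can check the partition and replication settings with the describe command:
sudo bin/kafka-topics.sh --describe --topic web_logs --bootstrap-server localhost:9092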
List all the topics
sudo bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Start the producer and the consumer, each in its own terminal:
sudo bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic web_logs
sudo bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic web_logs --from-beginning
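Each line you type into the producer terminal is sent as one message and should show up in the consumer terminal; press Ctrl+C in either one to stop it.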
Great, now you can run the code below in a Jupyter Notebook. It applies to my configuration:
- Spark 3.5.0
- Kafka 3.6.0 (the kafka_2.12-3.6.0 build installed above)
- Scala 2.12
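If pyspark is not available in the environment your notebook runs in, one option is to install it to match the Spark version above (an assumption about your setup; adjust the version to yours):
pip install pyspark==3.5.0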
Run this in a Jupyter Notebook:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, IntegerType

# Create a SparkSession with the Kafka connector on the classpath
spark = SparkSession.builder \
    .appName("WebLogAnalysis") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .getOrCreate()

# Define the schema to parse JSON logs (adjust the fields and types according to your log structure)
log_schema = StructType() \
    .add("timestamp", StringType()) \
    .add("ip", StringType()) \
    .add("url", StringType()) \
    .add("status_code", IntegerType())

# Subscribe to the Kafka topic and read the data
kafka_params = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "web_logs",
    "startingOffsets": "latest"
}

df = spark \
    .readStream \
    .format("kafka") \
    .options(**kafka_params) \
    .load()

# Cast the Kafka value to a string and parse it with the defined schema
parsed_logs = df \
    .select(from_json(col("value").cast("string"), log_schema).alias("parsed_log")) \
    .select("parsed_log.*")

# Example analysis: count HTTP response codes
http_codes_count = parsed_logs \
    .groupBy("status_code") \
    .count()

# Display the streaming HTTP response code counts
query = http_codes_count \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()
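Note that startingOffsets is "latest", so the query only sees messages produced after it starts: run this cell first, then switch to the producer terminal.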
Then, you can start the producer:
sudo bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic web_logs
{"timestamp": "2023-11-17T10:10:00", "ip": "192.168.1.3", "url": "example.com/about", "status_code": 200}
{"timestamp": "2023-11-17T10:15:00", "ip": "192.168.1.4", "url": "example.com/contact", "status_code": 500}
{"timestamp": "2023-11-17T10:20:00", "ip": "192.168.1.5", "url": "example.com/products", "status_code": 200}
{"timestamp": "2023-11-17T10:25:00", "ip": "192.168.1.6", "url": "example.com/services", "status_code": 404}
{"timestamp": "2023-11-17T10:30:00", "ip": "192.168.1.7", "url": "example.com/blog", "status_code": 200}
{"timestamp": "2023-11-17T10:35:00", "ip": "192.168.1.8", "url": "example.com/portfolio", "status_code": 303}
{"timestamp": "2023-11-17T10:40:00", "ip": "192.168.1.9", "url": "example.com/news", "status_code": 404}
{"timestamp": "2023-11-17T10:45:00", "ip": "192.168.1.10", "url": "example.com/login", "status_code": 200}
Code for a Business Scenario
This example reads stock ticks from a stock-ticker topic and computes a per-symbol moving average of the price over a 5-minute window, updated every 30 seconds.
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, current_timestamp
from pyspark.sql.types import StructType, StringType, FloatType

# Define the schema of the incoming JSON data
stockSchema = StructType() \
    .add("symbol", StringType()) \
    .add("price", FloatType())

# Create a Spark session with the Kafka connector on the classpath
spark = SparkSession.builder \
    .appName("RealTimeStockTickerProcessing") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .getOrCreate()

# Read from Kafka
rawStream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "stock-ticker") \
    .load()

# Deserialize the JSON data and select the fields
tickerData = rawStream \
    .select(from_json(col("value").cast("string"), stockSchema).alias("data")) \
    .select("data.*")

# Create a 'timestamp' column based on the current processing time
tickerDataWithTimestamp = tickerData.withColumn("timestamp", current_timestamp())

# Calculate the moving average over a 5-minute window, sliding every 30 seconds
movingAvg = tickerDataWithTimestamp.groupBy(
    col("symbol"),
    window(col("timestamp"), "5 minutes", "30 seconds")
).avg("price")

# Start the computation
query = movingAvg.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", False) \
    .start()

# Await termination
query.awaitTermination()
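One design note: without a watermark, the update-mode aggregation above keeps state for every window it has ever seen, which is fine for a demo but grows without bound. For a longer-running job you could bound the state with a watermark (a sketch, reusing the same timestamp column):

movingAvg = tickerDataWithTimestamp \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        col("symbol"),
        window(col("timestamp"), "5 minutes", "30 seconds")
    ).avg("price")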
Then feed it some ticks from the console producer:
sudo bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic stock-ticker
{"symbol": "AAPL", "price": 150.25}
{"symbol": "GOOGL", "price": 2700.50}
{"symbol": "MSFT", "price": 305.75}