How to create an AWS Kinesis Data Stream from scratch

Anish Mahapatra
6 min read · Nov 21, 2023


This blog will give you a walkthrough of how to create an AWS Kinesis Data Stream from scratch.

Photo by Daniel Eledut on Unsplash

Head over to the AWS Home Console, search for IAM in the search bar, and select IAM Roles. Then hit Create role.

Select Trusted Entity

Search for Lambda under “Use Case” and select it.

Click the orange Next button to go to the Add Permissions step.

Add Permissions

Here, add the following permissions:

  • AmazonKinesisFullAccess
  • AmazonS3FullAccess
  • CloudWatchFullAccess

Name, Review and Create

Add the following:

  • Role name: first-data-stream-role

Review, leave everything else at the defaults, and hit Create role!

The Role should then be created.
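If you would rather script this step, here is a minimal sketch using the AWS SDK for JavaScript v2 (the same aws-sdk the producer code below uses). Run it with credentials that are allowed to manage IAM; the role name and policies match the console steps above.

const AWS = require("aws-sdk");
const iam = new AWS.IAM();

// Trust policy: Lambda is the trusted entity that can assume this role.
const trustPolicy = {
    Version: "2012-10-17",
    Statement: [{
        Effect: "Allow",
        Principal: { Service: "lambda.amazonaws.com" },
        Action: "sts:AssumeRole"
    }]
};

async function createRole() {
    await iam.createRole({
        RoleName: "first-data-stream-role",
        AssumeRolePolicyDocument: JSON.stringify(trustPolicy)
    }).promise();

    // Attach the same three managed policies as in the console walkthrough.
    const policyArns = [
        "arn:aws:iam::aws:policy/AmazonKinesisFullAccess",
        "arn:aws:iam::aws:policy/AmazonS3FullAccess",
        "arn:aws:iam::aws:policy/CloudWatchFullAccess"
    ];
    for (const PolicyArn of policyArns) {
        await iam.attachRolePolicy({ RoleName: "first-data-stream-role", PolicyArn }).promise();
    }
}

createRole().catch(console.error);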

Create Lambda Function

Search for Lambda in the search bar.

Hit Create function.

Create Producer Function

  • Function Name: producer
  • Runtime: Node.js; the producer code later in this post is JavaScript and requires the v2 aws-sdk, which Lambda bundles up to the nodejs16.x runtime
  • Architecture: x86_64
  • Change default execution role > Use an existing role > first-data-stream-role
  • Hit Create Function!

Producer has now been successfully created.
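This step can also be scripted. A sketch, assuming the handler code has been zipped into a local producer.zip (a hypothetical file name) and that you pass in the ARN of first-data-stream-role:

const AWS = require("aws-sdk");
const fs = require("fs");
const lambda = new AWS.Lambda({ region: "us-east-1" });

async function createProducerFunction(roleArn) {
    // roleArn is the ARN of first-data-stream-role from the previous step.
    await lambda.createFunction({
        FunctionName: "producer",
        Runtime: "nodejs16.x", // last runtime that bundles the v2 aws-sdk
        Architectures: ["x86_64"],
        Role: roleArn,
        Handler: "index.handler",
        Code: { ZipFile: fs.readFileSync("producer.zip") } // hypothetical local zip of the handler
    }).promise();
}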

Create Consumer 1 & Consumer 2 Functions

Repeat the same steps twice more to create consumer1 and consumer2, reusing the first-data-stream-role execution role.

Review that all three functions now exist: producer, consumer1, consumer2.

Create S3 bucket

Search for S3 in the search bar.

Select Create Bucket

Create a bucket with a globally unique name. I have named mine anishkinesis.

AWS Region: Select the one that is closest to you.

Note: the region of the Lambda function and the S3 bucket has to match. To keep things standard, let's select US East (us-east-1) for now.

The bucket will show up in the S3 Buckets Tab.
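The scripted equivalent is a one-liner; a sketch (bucket names are global, so swap in your own):

const AWS = require("aws-sdk");
const s3 = new AWS.S3({ region: "us-east-1" });

// In us-east-1 no CreateBucketConfiguration is needed; for any other region,
// add { CreateBucketConfiguration: { LocationConstraint: "<region>" } }.
s3.createBucket({ Bucket: "anishkinesis" }).promise()
    .then(() => console.log("bucket created"))
    .catch(console.error);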

Select the bucket you made and go to the Properties tab.

Create Event Notification on S3 Bucket

Press Ctrl + F on the page and search for Event notifications. Here, select “Create event notification”.

General Configuration

Input a name for the event notification; the optional prefix and suffix filters can be left blank.

Event Types

Under Event types, check “All object create events”.

Destination

Next, Ctrl + F and search for the Destination section.

Choose Lambda function as the destination, and select producer.

Note: in case producer is not available, go to the Lambda Functions tab in AWS and check the function’s region. I hit an error where the S3 bucket and Lambda function regions did not match.
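If you would rather wire this notification up from code, here is a sketch with the v2 SDK. One caveat: the console grants S3 permission to invoke the function for you; from the SDK you must add that permission yourself. producerArn (the ARN from the producer function’s overview page) and the bucket name are assumptions from my setup:

const AWS = require("aws-sdk");
const s3 = new AWS.S3({ region: "us-east-1" });
const lambda = new AWS.Lambda({ region: "us-east-1" });

async function wireNotification(producerArn) {
    // Allow S3 to invoke the producer function (the console does this for you).
    await lambda.addPermission({
        FunctionName: "producer",
        StatementId: "allow-s3-invoke",
        Action: "lambda:InvokeFunction",
        Principal: "s3.amazonaws.com",
        SourceArn: "arn:aws:s3:::anishkinesis"
    }).promise();

    // "All object create events" maps to the s3:ObjectCreated:* event type.
    await s3.putBucketNotificationConfiguration({
        Bucket: "anishkinesis",
        NotificationConfiguration: {
            LambdaFunctionConfigurations: [{
                Events: ["s3:ObjectCreated:*"],
                LambdaFunctionArn: producerArn
            }]
        }
    }).promise();
}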

Create Kinesis Data Streams

Search for Kinesis on the search bar.

Select Create data stream and name it datastream1; the producer code later in this post writes to a stream with this exact name.

Since we have 2 consumers, select 2 shards.

Hit Create, and the Kinesis Data Stream should now be created.

Now, go to Configuration

Select Edit Encryption

Select AWS-managed CMK.
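The equivalent from code, as a sketch; the stream name matters because the producer writes to it:

const AWS = require("aws-sdk");
const kinesis = new AWS.Kinesis({ region: "us-east-1" });

async function createStream() {
    // Two shards, as chosen in the console above.
    await kinesis.createStream({ StreamName: "datastream1", ShardCount: 2 }).promise();

    // Wait until the stream is ACTIVE, then turn on the AWS-managed key.
    await kinesis.waitFor("streamExists", { StreamName: "datastream1" }).promise();
    await kinesis.startStreamEncryption({
        StreamName: "datastream1",
        EncryptionType: "KMS",
        KeyId: "alias/aws/kinesis" // the AWS-managed CMK selected above
    }).promise();
}

createStream().catch(console.error);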

Write Code for the producer

Now we will write the producer code in JavaScript. (Don’t worry if you don’t know JS; this code can be reused for all data flows.)

Go to the Lambda function for producer and you will see the code window below. Paste the following code and hit Deploy. You will then see a change in the function overview: the S3 bucket we attached is now present as a trigger.

const AWS = require("aws-sdk");

// Point the SDK at the region where your Kinesis stream lives.
// I used eu-north-1; change this to your own region
// (us-east-1 if you followed the note above).
AWS.config.update({
    region: "eu-north-1"
});

const s3 = new AWS.S3();
const kinesis = new AWS.Kinesis();

exports.handler = async (event) => {
    console.log(JSON.stringify(event));

    // The S3 event notification tells us which object was created.
    const bucketName = event.Records[0].s3.bucket.name;
    const keyName = event.Records[0].s3.object.key;
    const params = {
        Bucket: bucketName,
        Key: keyName
    };

    // Read the new object and forward its contents to Kinesis.
    await s3.getObject(params).promise().then(async (data) => {
        const dataString = data.Body.toString();
        const payload = {
            data: dataString
        };
        await sendToKinesis(payload, keyName);
    }, error => {
        console.error(error);
    });
};

async function sendToKinesis(payload, partitionKey) {
    const params = {
        Data: JSON.stringify(payload),
        PartitionKey: partitionKey, // the object key decides which shard the record lands on
        StreamName: "datastream1"
    };

    await kinesis.putRecord(params).promise().then(response => {
        console.log(response);
    }, error => {
        console.error(error);
    });
}
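Before uploading anything, you can smoke-test the producer from the Lambda Test tab with a minimal S3-style event. The bucket and key below are placeholders from my setup; point them at a real object in your bucket:

{
    "Records": [
        {
            "s3": {
                "bucket": { "name": "anishkinesis" },
                "object": { "key": "sample.txt" }
            }
        }
    ]
}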

Consumer1 Code

Write the code for consumer1

export const handler = async (event) => {
    console.log(JSON.stringify(event));
    for (const record of event.Records) {
        // Kinesis delivers the record payload base64-encoded.
        const data = JSON.parse(Buffer.from(record.kinesis.data, 'base64').toString());
        // e.g. send emails to clients, publish the data to social media
        console.log('consumer #1', data);
    }
};
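A sketch for consumer2, which follows the same pattern with only the log label changed; this is where you would plug in a second, independent side effect:

export const handler = async (event) => {
    console.log(JSON.stringify(event));
    for (const record of event.Records) {
        // Kinesis delivers the record payload base64-encoded.
        const data = JSON.parse(Buffer.from(record.kinesis.data, 'base64').toString());
        console.log('consumer #2', data);
    }
};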

Deploy the changes. Then, go to Add trigger in the consumer1 function overview.

Select Kinesis.

Point the trigger at the datastream1 stream and leave the remaining settings (batch size, starting position) at their defaults. Repeat the same trigger setup for consumer2.
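The same trigger can be created from code; a sketch, where streamArn is the ARN shown on the datastream1 details page:

const AWS = require("aws-sdk");
const lambda = new AWS.Lambda({ region: "us-east-1" });

async function addKinesisTrigger(streamArn, functionName) {
    await lambda.createEventSourceMapping({
        EventSourceArn: streamArn,
        FunctionName: functionName, // "consumer1", then again for "consumer2"
        StartingPosition: "LATEST", // only read records written after the trigger exists
        BatchSize: 100              // the console default
    }).promise();
}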

Upload file to S3 Bucket

Type S3 in the search panel and go to your bucket.

Make a .txt file with any 2–3 lines of your choice. A sample is shown below.

This is a sample script for Data Engineering
You can reach out to me on LinkedIn
https://www.linkedin.com/in/anishmahapatra/

Go to your bucket, then drag and drop the file, or hit the Upload button and choose the file.
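Or upload it from code; a sketch assuming the sample file sits next to the script and is named sample.txt (a hypothetical name):

const AWS = require("aws-sdk");
const fs = require("fs");
const s3 = new AWS.S3({ region: "us-east-1" });

s3.putObject({
    Bucket: "anishkinesis",
    Key: "sample.txt",
    Body: fs.readFileSync("sample.txt")
}).promise()
    .then(() => console.log("uploaded"))
    .catch(console.error);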

You will be able to see the file in the bucket now. The upload fires the S3 event notification, the producer forwards the file contents to Kinesis, and both consumers log the payload; you can verify the whole chain in the CloudWatch log groups of producer, consumer1, and consumer2.

