S3 Flights data sent by Lambda functions through Kinesis Streams into AWS OpenSearch

In this research we will show how to send data from an S3 bucket into AWS OpenSearch using Kinesis Data Streams and Kinesis Data Firehose. A Lambda function will be used to ingest the data and send it to the Kinesis stream.

This is our first time using this type of integration between these AWS services, so the data used will come from a flight tickets project, and what we want to see is how to ingest it into OpenSearch. In future research the data may come from SES events or WAF instead.

The architecture below shows a fully segregated pipeline for the case where we have multiple types of data that must be kept separate at the pipeline stage, before reaching OpenSearch. Each type will have its own index and index pattern.

Other architectures could also be built: one with a common producer and separate Kinesis streams, or one with a fully shared pipeline for cases where we don’t need to separate data inside the pipeline and the filtering can be done exclusively in OpenSearch.

Starting from the S3 data, which is structured as a JSON array of objects of the form:

[
    {
        "price": 68,
        "outbound": "2023-11-23T12:05:00",
        "inbound": "2023-11-23T20:15:00",
        "checkedBagPrice": 96.81,
        "handBagPrice": 163.416,
        "totalPriceWCheckedBag": 164.81,
        "totalPriceWHandBag": 231.416,
        "bookingUrl": "x",
        "minSeatPrice": 0,
        "seatsAvailable": []
    },
    {
        "price": 74,
        "outbound": "2023-11-23T12:05:00",
        "inbound": "2023-11-24T08:15:00",
        "checkedBagPrice": 114.03,
        "handBagPrice": 166.152,
        "totalPriceWCheckedBag": 188.03,
        "totalPriceWHandBag": 240.152,
        "bookingUrl": "x",
        "minSeatPrice": 0,
        "seatsAvailable": []
    }
]
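Looking at the sample records, the total price fields appear to be simple sums (for example, totalPriceWCheckedBag = price + checkedBagPrice). This is an assumption inferred from the two records above, not something the data format documents; a quick sanity check:

```python
# Assumption: the derived total fields are price + the respective bag price.
records = [
    {"price": 68, "checkedBagPrice": 96.81, "handBagPrice": 163.416,
     "totalPriceWCheckedBag": 164.81, "totalPriceWHandBag": 231.416},
    {"price": 74, "checkedBagPrice": 114.03, "handBagPrice": 166.152,
     "totalPriceWCheckedBag": 188.03, "totalPriceWHandBag": 240.152},
]

for r in records:
    # Compare with a small tolerance, since these are floating-point sums
    assert abs(r["price"] + r["checkedBagPrice"] - r["totalPriceWCheckedBag"]) < 1e-9
    assert abs(r["price"] + r["handBagPrice"] - r["totalPriceWHandBag"]) < 1e-9
print("derived price fields are consistent")
```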

For the Lambda code:

import boto3
from datetime import datetime
import json

# Initialize the Boto3 clients
s3_client = boto3.client('s3')
kinesis_client = boto3.client('kinesis')

# S3 Bucket and Object Key
bucket_name = 'flights-crawler-bucket'
file_key = 'cluj-napoca-london/cluj-napoca-london.json'

# Kinesis Stream Name
stream_name = 'flights'

def lambda_handler(event, context):
    # Assuming the Lambda is triggered by an S3 event
    # If triggered differently, you'll need to adjust how you get file_key
    # for record in event['Records']:
    #     file_key = record['s3']['object']['key']

    # Retrieve the JSON file from S3
    response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
    file_content = response['Body'].read()
    data = json.loads(file_content)
    
    # Exclude the last object from the list
    data_to_process = data[:-1]
    
    for item in data_to_process:
        # Add a timestamp
        item['timestamp'] = datetime.utcnow().isoformat() + 'Z'  # Adding 'Z' to indicate UTC time
        
        # Remove the bookingUrl
        item.pop('bookingUrl', None)
        
        # Convert the item back to a JSON string format
        data_string = json.dumps(item)
        
        # Send the data to Kinesis
        response = kinesis_client.put_record(
            StreamName=stream_name,
            Data=data_string.encode('utf-8'),  # Kinesis records are binary blobs
            PartitionKey='partition-key'  # Use an appropriate partition key
        )
        print(response)

    return {
        'statusCode': 200,
        'body': json.dumps('Data processing complete')
    }
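The per-record transformation (adding a timestamp, dropping bookingUrl) can be exercised locally without any AWS calls. A minimal sketch, where transform is a hypothetical helper mirroring the loop body of the handler:

```python
import json
from datetime import datetime

def transform(item):
    # Hypothetical helper mirroring the per-record processing in the handler
    item = dict(item)  # copy, to avoid mutating the caller's object
    item['timestamp'] = datetime.utcnow().isoformat() + 'Z'  # 'Z' marks UTC
    item.pop('bookingUrl', None)  # strip the field we don't want indexed
    return item

sample = {"price": 68, "bookingUrl": "x", "minSeatPrice": 0}
out = transform(sample)

assert 'bookingUrl' not in out
assert out['timestamp'].endswith('Z')
print(json.dumps(out))
```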

First we need to create the Kinesis stream:

And link an Amazon Firehose stream to it:

The Amazon Firehose stream will use a service role which needs to point to the AWS OpenSearch cluster and to have permissions for pushing data into the cluster.
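As a sketch, the permissions portion of that service role could look along these lines. The region, account ID, and domain name below are placeholders, and the exact action list depends on your setup:

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "es:DescribeDomain",
                "es:DescribeDomains",
                "es:DescribeDomainConfig",
                "es:ESHttpPost",
                "es:ESHttpPut",
                "es:ESHttpGet"
            ],
            "Resource": [
                "arn:aws:es:eu-west-1:123456789012:domain/flights-domain",
                "arn:aws:es:eu-west-1:123456789012:domain/flights-domain/*"
            ]
        }
    ]
}
```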

For OpenSearch we used a simplified cluster, also for cost optimisation:

The AWS OpenSearch price is likely to increase significantly with the instance type and with its replication across nodes and the Availability Zones where the instances are deployed.

Inside OpenSearch we need to configure a role and map it to the AWS role (the one that was created for the Amazon Firehose stream), and also give some specific permissions to the role.

And the mapping between the AWS service role and the OpenSearch role:
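With fine-grained access control, that mapping can also be applied through the Security API instead of the console. A sketch, where the OpenSearch role name and the IAM role ARN are placeholders:

```json
PUT _plugins/_security/api/rolesmapping/firehose-delivery-role
{
    "backend_roles": [
        "arn:aws:iam::123456789012:role/firehose-service-role"
    ]
}
```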

Once the role is created we can proceed to the indices part, where from Amazon Firehose we have an index such as “cluj-napoca-london” with a rotation per day. This means that in OpenSearch we will send to something like “cluj-napoca-london-09-02-2024”, and every day to another index, in order not to create conflicts between data. The logic for this research is to ingest and push the data every day into a new index.
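The daily naming scheme can be sketched as follows, assuming the day-month-year layout of the example above (the actual format depends on the Firehose index rotation settings):

```python
from datetime import date

def daily_index(base, day):
    # Daily-rotated index name, in the day-month-year layout used above
    return f"{base}-{day.strftime('%d-%m-%Y')}"

print(daily_index("cluj-napoca-london", date(2024, 2, 9)))
# cluj-napoca-london-09-02-2024
```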

If everything is good, in the Indices screen we have:

And in order to search those documents, we need to define an index pattern with the timestamp as the default time field:

And if we go to the Discover panel, all the documents in that index will be shown:

And finally, we can create a dashboard for prices linked to the inbound dates:

In