In the previous article we showed how to autoscale the entire earthquake notification ecosystem using custom-made scripts.
In this one we'll use Terraform to provision the Lambdas and their configuration, and for the Confluent Kafka topics we'll take the custom script from the last article to the next level of automation.
The cherry on top will be a full provisioning pipeline using GitHub Actions to upscale/downscale the system based on the DynamoDB item count.

Initial configuration and stage declarations
Starting with the code of the Lambda function that will orchestrate the entire scaling:
import base64
import json

import boto3
import requests

# Define the scaling stages
scaling_stages = {
    (0, 100): (2, 2, 2),            # 2 topics, 2 threads, 2 provisioned concurrency when the item count is between 0 and 100
    (101, 200): (4, 4, 4),          # 4 topics, 4 threads, 4 provisioned concurrency when the item count is between 101 and 200
    (201, float('inf')): (6, 6, 6), # 6 topics, 6 threads, 6 provisioned concurrency when the item count is above 200
}

# Initialize the DynamoDB client
dynamodb = boto3.client('dynamodb')

# GitHub repository details
GITHUB_REPO = "molayq/lambda_earthquake_v1"
GITHUB_FILE_PATH = "locals.tf"
GITHUB_TOKEN = ""

# Confluent Kafka details
CONFLUENT_API_KEY = ""
CONFLUENT_API_SECRET = ""
CONFLUENT_CLUSTER_ID = ""
CONFLUENT_BASE_URL = f"https://x.eu-central-1.aws.confluent.cloud/kafka/v3/clusters/{CONFLUENT_CLUSTER_ID}/topics"
In the code above you can see the scaling_stages dictionary, which defines a scaling level for each interval of subscriber counts. This is what lets the system decide automatically whether it needs to upscale or downscale.
The rest of the configuration covers GitHub and Confluent; the important piece is the GitHub token, which can be generated from the repository settings. You may wonder why a locals.tf file is being updated, but we'll get to that in the following lines.
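A quick aside: hardcoding the token and the Confluent credentials in the source is convenient for a demo, but a safer pattern (an assumption on my side, not part of the original code) is to read them from Lambda environment variables:
import os

# Safer alternative (assumption): pull the secrets from Lambda environment variables
GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN", "")
CONFLUENT_API_KEY = os.environ.get("CONFLUENT_API_KEY", "")
CONFLUENT_API_SECRET = os.environ.get("CONFLUENT_API_SECRET", "")
CONFLUENT_CLUSTER_ID = os.environ.get("CONFLUENT_CLUSTER_ID", "")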
Moving on to the lambda handler we have:
def lambda_handler(event, context):
    # Get the item count from DynamoDB
    item_count = get_dynamodb_item_count('earthquake_notifications')

    # Get the current scaling stage
    topics, threads, concurrency = get_scaling_stage(item_count)
get_scaling_stage returns the number of topics, threads and provisioned concurrency for the current stage, so we know which stage the system should be in.
# Function to get the current scaling stage based on item count
def get_scaling_stage(item_count):
    for (lower_bound, upper_bound), settings in scaling_stages.items():
        if lower_bound <= item_count <= upper_bound:
            return settings
    return scaling_stages[max(scaling_stages.keys())]  # Return the highest stage if the item count exceeds all thresholds
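To make the stage mapping concrete, here is a quick illustrative check (not part of the Lambda itself) of how a few subscriber counts resolve against scaling_stages:
# Illustrative only: how different item counts map to stages
for count in (0, 100, 101, 150, 201, 5000):
    print(count, get_scaling_stage(count))
# 0    (2, 2, 2)
# 100  (2, 2, 2)
# 101  (4, 4, 4)
# 150  (4, 4, 4)
# 201  (6, 6, 6)
# 5000 (6, 6, 6)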
And the get_dynamodb_item_count function:
# Function to get the item count from the DynamoDB table
def get_dynamodb_item_count(table_name):
    response = dynamodb.scan(
        TableName=table_name,
        Select='COUNT'
    )
    return response['Count']
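One caveat worth noting: a single Scan call with Select='COUNT' only counts the items it reads in one page (roughly 1 MB of scanned data). For this table that is probably fine, but if the subscriber table grows, a paginated variant along these lines would give the full count (a sketch, not part of the original code):
# Sketch: paginated count for tables larger than one Scan page
def get_dynamodb_item_count_paginated(table_name):
    total = 0
    params = {'TableName': table_name, 'Select': 'COUNT'}
    while True:
        response = dynamodb.scan(**params)
        total += response['Count']
        last_key = response.get('LastEvaluatedKey')
        if not last_key:
            return total
        params['ExclusiveStartKey'] = last_key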
GitHub file store
Next we will keep a file in GitHub where the important values (topics, threads, concurrency) are stored. Think of it as a watcher file that always knows which stage the system is in at a given moment. If no changes are needed, the file is not updated. It is also a Terraform file, so the Terraform automation code can use it to provision the desired state.
    # Generate the new content for locals.tf
    new_content = f"""
locals {{
  kafka_topics = {json.dumps(["earthquake_notifications_" + str(i + 1) for i in range(topics)])}
  thread_count = {threads}
  provisioned_concurrency = {concurrency}
}}
"""

    # Get the current state from GitHub
    file_info = get_file_info(GITHUB_REPO, GITHUB_FILE_PATH)
    sha = file_info['sha']
    current_content = base64.b64decode(file_info['content']).decode('utf-8')
Here is get_file_info:
# Helper function to get file information from GitHub
def get_file_info(repo, file_path):
    github_token = GITHUB_TOKEN  # Ensure the GITHUB_TOKEN is set before the Lambda runs
    headers = {
        "Authorization": f"token {github_token}",
        "Content-Type": "application/json"
    }
    response = requests.get(
        f"https://api.github.com/repos/{repo}/contents/{file_path}",
        headers=headers
    )
    if response.status_code == 200:
        return response.json()
    else:
        raise Exception(f"GitHub API error: {response.status_code} - {response.text}")
Locals.tf file comparison for scaling
Next we determine whether the system needs to change stage (upscale or downscale) or whether everything is already as it should be.
    # Determine if updates are needed
    if new_content.strip() != current_content.strip():
        # Update the locals.tf file on GitHub
        update_response = update_github_file(new_content, sha)
        print(f"Updated locals.tf with new scaling values: {update_response.status_code}")

        # Determine topics to add or remove based on scaling
        existing_topics = get_existing_confluent_topics()
        desired_topics = ["earthquake_notifications_" + str(i + 1) for i in range(topics)]
        topics_to_add = list(set(desired_topics) - set(existing_topics))
        topics_to_remove = list(set(existing_topics) - set(desired_topics))

        # Update Confluent Kafka topics
        if topics_to_add:
            update_confluent_topics(add=True, topics_to_change=topics_to_add)
        if topics_to_remove:
            update_confluent_topics(add=False, topics_to_change=topics_to_remove)
    else:
        print("No changes to the scaling parameters are needed.")

    return {
        'statusCode': 200,
        'body': json.dumps('Lambda execution completed')
    }
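To see the set arithmetic in action on a downscale, here is a purely hypothetical example: if four topics currently exist in Confluent but the new stage only needs two, the logic above schedules two deletions and no creations:
# Hypothetical downscale from 4 topics to 2 (illustrative only)
existing_topics = ["earthquake_notifications_" + str(i + 1) for i in range(4)]
desired_topics = ["earthquake_notifications_" + str(i + 1) for i in range(2)]
print(sorted(set(desired_topics) - set(existing_topics)))  # [] -> nothing to add
print(sorted(set(existing_topics) - set(desired_topics)))  # ['earthquake_notifications_3', 'earthquake_notifications_4'] -> to remove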
With update_github_file:
def update_github_file(new_content, sha):
    url = f"https://api.github.com/repos/{GITHUB_REPO}/contents/{GITHUB_FILE_PATH}"
    headers = {
        "Authorization": f"token {GITHUB_TOKEN}",
        "Content-Type": "application/json"
    }
    data = {
        "message": "Update scaling parameters based on DynamoDB item count",
        "committer": {
            "name": "x",
            "email": "x@coco.com"
        },
        "content": base64.b64encode(new_content.encode('utf-8')).decode('utf-8'),
        "sha": sha,
        "branch": "main"
    }
    response = requests.put(url, headers=headers, data=json.dumps(data))
    return response
This updates the file in GitHub, essentially a commit and push, but done through the GitHub REST API with requests.
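The GitHub Contents API answers with 200 (update) or 201 (create) on success and with a conflict error when the supplied sha is stale, so it may be worth failing loudly if the write does not go through; a defensive sketch of mine, not part of the original flow:
# Sketch: fail loudly if the GitHub update did not go through
def update_github_file_or_fail(new_content, sha):
    update_response = update_github_file(new_content, sha)
    if update_response.status_code not in (200, 201):
        raise Exception(f"Failed to update {GITHUB_FILE_PATH}: "
                        f"{update_response.status_code} - {update_response.text}")
    return update_response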
To get the existing Confluent Kafka topics we have get_existing_confluent_topics:
def get_existing_confluent_topics():
    # Basic Authentication
    basic_auth = base64.b64encode(f"{CONFLUENT_API_KEY}:{CONFLUENT_API_SECRET}".encode("utf-8")).decode("utf-8")
    headers = {
        "Authorization": f"Basic {basic_auth}",
        "Content-Type": "application/json"
    }
    response = requests.get(CONFLUENT_BASE_URL, headers=headers)
    if response.status_code == 200:
        topics_data = response.json()
        return [topic['topic_name'] for topic in topics_data['data']]
    else:
        print(f"Error fetching topics: {response.status_code} - {response.text}")
        return []
And update_confluent_topics, which updates the topics when the system needs to scale:
def update_confluent_topics(add=True, topics_to_change=None):
    # Basic Authentication
    basic_auth = base64.b64encode(f"{CONFLUENT_API_KEY}:{CONFLUENT_API_SECRET}".encode("utf-8")).decode("utf-8")
    headers = {
        "Authorization": f"Basic {basic_auth}",
        "Content-Type": "application/json"
    }
    for topic in topics_to_change:
        if add:
            payload = json.dumps({
                "topic_name": topic,
                "partitions_count": 6,
                "replication_factor": 3
            })
            response = requests.post(CONFLUENT_BASE_URL, headers=headers, data=payload)
            print(f"Added topic {topic}: {response.status_code} - {response.text}")
        else:
            response = requests.delete(f"{CONFLUENT_BASE_URL}/{topic}", headers=headers)
            print(f"Removed topic {topic}: {response.status_code} - {response.text}")
So far we have an orchestration mechanism that determines the scaling stage and decides whether the system needs to change, by comparing the locals.tf file in GitHub with what the code logic produces. The Lambda function and the provisioning code work together precisely because the scaling state lives in a file that is embedded in the provisioning code itself.
In the next part we'll show how Terraform, triggered by the GitHub Actions pipeline, uses the information now stored in locals.tf.
**This project is fully registered as intellectual property, please do not reproduce any of the presented information.