Bits and Pieces

Insightful articles, step-by-step tutorials, and the latest news on full-stack composable software development

Follow publication

Async Flow Control using Queues in Practise for Web Applications

Lakindu Hewawasam
Bits and Pieces
Published in
10 min readDec 20, 2023

What is Queue Based Load Levelling?

When Should I Use Asynchronous Flow Control?

How To Implement Asynchronous Flow Control?

The Problem — Linear Scaling

The Solution — Using Async Flow Control

Let’s Code It Out

Step 01: Pre-requisites

{

"UserId": "XXX",
"Account": "XXX",
"Arn": "arn:aws:iam::XXX:user/XXX"
}

Step 02: Creating the Pulumi Project

pulumi new aws-typescript

Step 03: Provisioning the Queue

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
import * as awsx from "@pulumi/awsx";

const queue = new aws.sqs.Queue("message-queue", {
delaySeconds: 10, // 10 second message delay
messageRetentionSeconds: 1209600, // 14 days
sqsManagedSseEnabled: true,
visibilityTimeoutSeconds: 30,
});

Step 04: Defining the Lambda Functions

const sendMessageToQueueFunction = new aws.lambda.CallbackFunction(
"sendMessageToQueueFunction",
{
memorySize: 2024,
timeout: 200,
callback: async () => {
const sqs = new AWS_SDK.SQS({ region: "us-east-1" });
const queueUrl = queue.id.get();

// Simulate an irregular load by sending about 100 events with random delays
for (let i = 0; i < 100; i++) {
// Introduce a random delay (between 1 and 10 seconds) before sending each event
const delaySeconds = Math.floor(Math.random() * 10) + 1;
await new Promise((resolve) =>
setTimeout(resolve, delaySeconds * 1000)
);

// Send a message to the SQS queue
await sqs
.sendMessage({ QueueUrl: queueUrl, MessageBody: `Event ${i + 1}` })
.promise();
}
},
}
);


const processQueueFunction = new aws.lambda.CallbackFunction(
"processQueueFunction",
{
callback: async (event: aws.sqs.QueueEvent) => {
await Promise.all(
event.Records.map(async (record) => {
console.log(JSON.stringify({ record }));

const sqs = new AWS_SDK.SQS({ region: "us-east-1" });
await sqs
.deleteMessage({
QueueUrl: queue.url.get(),
ReceiptHandle: record.receiptHandle,
})
.promise();
})
);
},
memorySize: 2400,
timeout: 30,
}
);
queue.onEvent("onPush", processQueueFunction);

Step 05: Deploying the Infrastructure

pulumi up

Step 06: Testing the Infrastructure

Wrapping Up

Free

Distraction-free reading. No ads.

Organize your knowledge with lists and highlights.

Tell your story. Find your audience.

Membership

Read member-only stories

Support writers you read most

Earn money for your writing

Listen to audio narrations

Read offline with the Medium app

Insightful articles, step-by-step tutorials, and the latest news on full-stack composable software development

Responses (2)

Write a response

For real life scenarios I think there is one specific case that often becomes problematic with this approach. If your peak load just goes slightly over the limits it will even out quite quickly and only cause minor delays in message delivery which…

--

This guide provide a practical solution for handling irregular event flows and improving scalability, also adeptly addresses potential rate limit issues. Great content

--