Notifying fire alarm using SQS

Ballerina is an open-source programming language that helps developers integrate systems easily through connectors. This guide focuses on using the Amazon SQS connector to send and consume alert notifications. You can find other integration modules in the wso2-ballerina GitHub repository.

What you'll build

The following diagram illustrates the scenario:

Message flow diagram image

Consider a scenario where a fire alarm sends fire alerts to an Amazon SQS queue. Because occasional message duplication is acceptable here, an Amazon SQS standard queue is used. A fire alarm listener polls the queue. When fire alerts are available, it consumes the messages and then deletes them from the queue. The consumed messages are shown to the user in the console.

Here, the fire alarm sends alerts periodically from one Ballerina worker, while the listener polls the queue from another worker. Both sent and received messages are printed to the console.

As there can be multiple alert messages available in the queue, the listener is configured to consume more than one message at a time.

Before you begin

  • Download and install Ballerina Integrator for your operating system

    Note: This installs Ballerina language version 1.0.1, a compiler plugin to help mitigate errors, and snippet .jar files for custom auto-completion.

  • Install Oracle JDK 1.8.*
  • Install VS Code

    Tip: For a better development experience, use VS Code (which is the recommended editor for Ballerina Integrator).

  • Install the Ballerina Integrator Extension from the VS Code marketplace

  • An Amazon Web Services (AWS) account with access to Amazon SQS

Get the code

You can download the ZIP file and extract the contents to get the project code.


Implementation

1. Create a new project.

```bash
$ ballerina new notifying-fire-alarm-using-sqs
```

2. Create a module inside the project directory.

```bash
$ ballerina add alert_notification_using_amazonsqs
```

To implement the scenario in this guide, you can use the following package structure:

  notifying-fire-alarm-using-sqs
  ├── Ballerina.toml
  └── src
      └── alert_notification_using_amazonsqs
          ├── Module.md
          ├── create_notification_queue.bal
          ├── notify_fire.bal
          ├── listen_to_fire_alarm.bal
          └── main.bal

Now that you have created the project structure, the next step is to develop the integration scenario.

3. Set up credentials for Amazon SQS

Create a new access key, which includes a new secret access key.

  • To create a new secret access key for your root account, use the security credentials page. Expand the Access Keys section, and then click Create New Root Key.

  • To create a new secret access key for an IAM user, open the IAM console. Click Users in the Details pane, click the appropriate IAM user, and then click Create Access Key on the Security Credentials tab.

Download the newly created credentials when prompted to do so in the key creation wizard.

4. Add the project configuration file.

Create a ballerina.conf file in the root directory of the project and add the Amazon SQS settings, including the credentials obtained in the previous step, using the following keys.

ACCESS_KEY_ID="<Amazon SQS key ID>"
SECRET_ACCESS_KEY="<Amazon SQS secret key>"
REGION="<Amazon SQS region>"
ACCOUNT_NUMBER="<Amazon Account number>"

Once a queue has been created, you can find the ACCOUNT_NUMBER under the Details tab in the SQS Management Console, as shown below.

SQS Console

5. Write the integration.

Take a look at the code samples below to understand how to implement the integration scenario.

The following code creates a new queue in Amazon SQS using the configuration provided in the ballerina.conf file. To create a queue, initialize the amazonsqs:Client with the configuration parameters and invoke its createQueue method.

create_notification_queue.bal

import ballerina/config;
import wso2/amazonsqs;

function createNotificationQueue(string queueName) returns @tainted string|error {
    // Amazon SQS client configuration
    amazonsqs:Configuration configuration = {
        accessKey: config:getAsString("ACCESS_KEY_ID"),
        secretKey: config:getAsString("SECRET_ACCESS_KEY"),
        region: config:getAsString("REGION"),
        accountNumber: config:getAsString("ACCOUNT_NUMBER")
    };

    // Amazon SQS client
    amazonsqs:Client sqsClient = new(configuration);

    // Create SQS Standard Queue for notifications
    string|error queueURL = sqsClient->createQueue(queueName, {});
    return queueURL;
}
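
Each source file in this guide builds its own client from the same four configuration keys. As a minimal sketch of one way to avoid that repetition, the client could be built in a single helper function. The name newSqsClient is hypothetical and not part of the original project; the sketch assumes the ballerina.conf keys shown earlier.

```ballerina
import ballerina/config;
import wso2/amazonsqs;

// Hypothetical helper (not in the original project): builds an Amazon SQS
// client from the keys defined in ballerina.conf.
function newSqsClient() returns amazonsqs:Client {
    amazonsqs:Configuration configuration = {
        accessKey: config:getAsString("ACCESS_KEY_ID"),
        secretKey: config:getAsString("SECRET_ACCESS_KEY"),
        region: config:getAsString("REGION"),
        accountNumber: config:getAsString("ACCOUNT_NUMBER")
    };
    amazonsqs:Client sqsClient = new(configuration);
    return sqsClient;
}
```

The listings in this guide keep the configuration inline so that each file can be read on its own.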

The following code generates fire alert notifications periodically and sends them to the SQS queue created above. You can replace the while loop with custom logic that triggers the fire alarm. Create a client in the same way as in create_notification_queue.bal and invoke the sendMessage method to send an alert message to the SQS queue.

notify_fire.bal

import ballerina/config;
import ballerina/log;
import ballerina/runtime;
import wso2/amazonsqs;

// periodicallySendFireNotifications periodically sends fire alerts to the Amazon SQS queue
function periodicallySendFireNotifications(string queueResourcePath) {

    // Amazon SQS client configuration
    amazonsqs:Configuration configuration = {
        accessKey: config:getAsString("ACCESS_KEY_ID"),
        secretKey: config:getAsString("SECRET_ACCESS_KEY"),
        region: config:getAsString("REGION"),
        accountNumber: config:getAsString("ACCOUNT_NUMBER")
    };

    // Amazon SQS client
    amazonsqs:Client sqsClient = new(configuration);

    while (true) {

        // Wait for 5 seconds
        runtime:sleep(5000);

        // Send a fire notification to the queue
        amazonsqs:OutboundMessage|error response = sqsClient->sendMessage("There is a fire!",
            queueResourcePath, {});
        // When the response is valid
        if (response is amazonsqs:OutboundMessage) {
            log:printInfo("Sent an alert to the queue. MessageID: " + response.messageId);
        } else {
            log:printError("Error occurred while trying to send an alert to the SQS queue!");
        }
    }

}
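
If the alert is triggered by custom logic rather than a timer, the send step can be isolated in a function that sends a single message. The following is a rough sketch under that assumption. The function name sendFireAlert and its parameters are hypothetical; the sendMessage call has the same shape as in notify_fire.bal.

```ballerina
import ballerina/log;
import wso2/amazonsqs;

// Hypothetical helper (not in the original project): sends one alert and
// reports whether the send succeeded. The caller supplies an initialized
// client and the queue resource path.
function sendFireAlert(amazonsqs:Client sqsClient, string queueResourcePath,
                       string alertText) returns boolean {
    amazonsqs:OutboundMessage|error response = sqsClient->sendMessage(alertText,
        queueResourcePath, {});
    if (response is amazonsqs:OutboundMessage) {
        log:printInfo("Sent an alert to the queue. MessageID: " + response.messageId);
        return true;
    } else {
        log:printError("Error occurred while trying to send an alert to the SQS queue!");
        return false;
    }
}
```

Returning a boolean lets the triggering code decide whether to retry or raise the alarm through another channel.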

The following code listens to the SQS queue. If there are any notifications, it receives them and then deletes them from the queue. The sleep call in the while loop sets the polling interval. Create the client in the same way as in create_notification_queue.bal and invoke the receiveMessage method. The MaxNumberOfMessages entry in the attributes map limits the number of messages received per API invocation. Each received message carries a receiptHandle value in the response. Once a message has been read, delete it by passing that receipt handle to the deleteMessage method.

listen_to_fire_alarm.bal

import ballerina/config;
import ballerina/log;
import ballerina/runtime;
import wso2/amazonsqs;

// listenToFireAlarm, which listens to the Amazon SQS queue for fire notifications with polling
function listenToFireAlarm(string queueResourcePath) {

    // Amazon SQS client configuration
    amazonsqs:Configuration configuration = {
        accessKey: config:getAsString("ACCESS_KEY_ID"),
        secretKey: config:getAsString("SECRET_ACCESS_KEY"),
        region: config:getAsString("REGION"),
        accountNumber: config:getAsString("ACCOUNT_NUMBER")
    };

    // Amazon SQS client
    amazonsqs:Client sqsClient = new(configuration);
    string receivedReceiptHandler = "";

    // Receive a message from the queue
    map<string> attributes = {};
    // MaxNumberOfMessages, the maximum number of messages that can be received per request
    attributes["MaxNumberOfMessages"] = "10";
    // VisibilityTimeout, time (in seconds) a received message stays hidden from other receive requests;
    // delete the message within this window
    attributes["VisibilityTimeout"] = "2";
    // WaitTimeSeconds, time (in seconds) a receive request waits for messages to arrive before returning
    attributes["WaitTimeSeconds"] = "1";

    while(true) {

        // Wait for 5 seconds
        runtime:sleep(5000);
        // Receive messages from the queue
        amazonsqs:InboundMessage[]|error response = sqsClient->receiveMessage(queueResourcePath, attributes);

        // When the response is not an error
        if (response is amazonsqs:InboundMessage[]) {

            // When there are messages available in the queue
            if (response.length() > 0) {
                log:printInfo("************** Received fire alerts! ******************");
                int deleteMessageCount = response.length();
                log:printInfo("Going to delete " + deleteMessageCount.toString() + " messages from queue.");

                // Iterate on each message
                foreach var eachResponse in response {

                    // Keep receipt handle for deleting the message from the queue
                    receivedReceiptHandler = eachResponse.receiptHandle;

                    // Delete the received message from the queue
                    boolean|error deleteResponse = sqsClient->deleteMessage(queueResourcePath, receivedReceiptHandler);

                    // When the delete operation succeeded
                    if (deleteResponse is boolean && deleteResponse) {
                        log:printInfo("Deleted the fire alert \"" + eachResponse.body + "\" from the queue.");
                    } else {
                        log:printError("Error occurred while deleting the message.");
                    }
                }
            } else {
                log:printInfo("Queue is empty. No messages to be deleted.");
            }

        } else {
            log:printError("Error occurred while receiving the message.");
        }
    }
}
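
The listing above uses a VisibilityTimeout of 2 seconds and a WaitTimeSeconds of 1 second. With such a short visibility window, a message that is not deleted within two seconds becomes visible again and may be received a second time. As a hedged sketch of alternative settings, the function below builds an attributes map with a longer window and long polling. The function name longPollingAttributes is hypothetical, and the values are illustrative rather than recommendations from the original guide. Amazon SQS accepts 1 to 10 for MaxNumberOfMessages and 0 to 20 for WaitTimeSeconds.

```ballerina
// Hypothetical alternative settings (not the values used in this guide).
function longPollingAttributes() returns map<string> {
    map<string> attributes = {};
    // Receive up to 10 messages per request (the maximum Amazon SQS allows)
    attributes["MaxNumberOfMessages"] = "10";
    // Keep received messages hidden for 30 seconds while they are processed
    attributes["VisibilityTimeout"] = "30";
    // Wait up to 20 seconds for messages to arrive before returning
    attributes["WaitTimeSeconds"] = "20";
    return attributes;
}
```

With long polling, the receive call itself waits for messages, so the separate sleep in the loop could likely be shortened.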

In the following code, the main function defines the workers that create the queue, send messages to the queue, and receive and delete messages from the queue. In a real deployment, each worker can be replaced with the relevant code: the queueCreator code should be called once to set up the queue, the fireNotifier code can be called from the side that triggers the fire alarm, and the fireListener code should reside in the side that polls for alarms.

main.bal

import ballerina/log;
import wso2/amazonsqs;

// Executes the workers in the guide
public function main(string... args) {

    // queueCreator, creates a new queue
    worker queueCreator {
        log:printInfo("queueCreator started ....");
        string queueResourcePath = "";
        // Create the queue
        string|error queueURL = createNotificationQueue("fireNotifications");
        // When the queue creation operation is successful
        if (queueURL is string) {
            // Extract the SQS queue resource path from the URL
            queueResourcePath = amazonsqs:splitString(queueURL, "amazonaws.com", 1);
            log:printInfo("Queue Resource Path: " + queueResourcePath);
        } else {
            log:printError("Error occurred while creating the queue.");
        }

        // Send the resource path of the queue to the fire notifier
        queueResourcePath -> fireNotifier;
        // Send the resource path of the queue to the fire listener
        queueResourcePath -> fireListener;
    }

    // Fire notifier worker which publishes to the SQS queue
    worker fireNotifier {
        log:printInfo("fireNotifier started ....");
        // Get the resource path from the queue creator
        string queueResourcePath = <- queueCreator;
        // Starts to periodically send fire alerts
        periodicallySendFireNotifications(queueResourcePath);
    }

    // Fire listener which listens to the SQS queue
    worker fireListener {
        log:printInfo("fireListener started ....");
        // Get the resource path from the queue creator
        string queueResourcePath = <- queueCreator;
        // Starts to listen for the fire alerts via polling
        listenToFireAlarm(queueResourcePath);
    }

    // Wait for the queue creator worker to finish
    wait queueCreator;

}
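
The queueCreator worker passes a resource path, not the full queue URL, to the other workers. Amazon SQS queue URLs usually have the form https://sqs.<region>.amazonaws.com/<account number>/<queue name>, so the resource path is presumably the part after amazonaws.com. The sketch below illustrates that result using only string functions from the Ballerina language library. It is an assumption about the shape of the result, not the implementation of amazonsqs:splitString, and the name resourcePathOf is hypothetical.

```ballerina
// Hypothetical helper, shown only to illustrate the expected result shape.
function resourcePathOf(string queueUrl) returns string {
    string marker = "amazonaws.com";
    int? index = queueUrl.indexOf(marker);
    if (index is int) {
        // Keep everything that follows the marker
        return queueUrl.substring(index + marker.length(), queueUrl.length());
    }
    // Marker not found: no resource path can be derived
    return "";
}
```

For a URL such as https://sqs.us-east-2.amazonaws.com/123456789012/fireNotifications (a made-up account number), this sketch would yield /123456789012/fireNotifications.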

Testing

First let’s build the module. Navigate to the project root directory and execute the following command.

  $ ballerina build alert_notification_using_amazonsqs

This creates the executable. Now run the alert_notification_using_amazonsqs.jar file created in the above step.

  $ java -jar target/bin/alert_notification_using_amazonsqs.jar

The console shows the SQS queue being created, fire alerts being sent to the queue, and the alerts being consumed and then deleted from the queue.
