Towards AWS

Where Cloud Experts, Heroes, Builders, and Developers share their stories, experiences, and solutions.

EventBridge Pipes Using Terraform

In this post, I will show you how to implement a simple event-driven application using EventBridge Pipes and Terraform. At the time of writing, EventBridge Pipes resources are not available in the Terraform AWS Provider, so I use the Terraform AWS Cloud Control Provider (awscc) instead. You can follow this link to learn more about EventBridge Pipes.

Use Case

Let’s say we have an application that handles queries from different types of customers such as Gold, Silver and Platinum. Currently, these queries are published to a centralized SQS queue and processed by a consumer application.

Now we have a new requirement: select only the Platinum customers' queries and reprocess them to include the customer support agent's details before sending them to the consumer service. (Refer to the above diagram)

Components of EventBridge Pipes

The following diagram shows the main components of the pipe that we are going to implement in this example.

Source

This is the starting point of the pipe. A pipe can receive events from various sources such as SQS, DynamoDB, etc. In this example, the pipe receives events from an SQS queue (customer-request).

resource "aws_sqs_queue" "customer_request_sqs" {
  name = "customer-request"
}

Filtering

Since we only need the Platinum customer events from the source, we can add the optional “Filtering” step to the pipe. You can use Amazon EventBridge event patterns to write the filtering criteria. As you can see in the code below, I use the customer_type field in the SQS message body to select Platinum customer events.

filter_criteria = {
  filters = [{ pattern = "{ \"body\": { \"customer_type\": [\"Platinum\"] }}" }]
}
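To make the filter's behaviour concrete, here is a minimal JavaScript sketch of how a pattern like this one selects messages. The matchesPattern and isSelected helpers are hypothetical and only cover exact-value matching on nested fields. This is not EventBridge's actual implementation, and the sketch assumes the SQS message body is valid JSON.

```javascript
// Hypothetical helper: a simplified model of exact-value event pattern matching.
// A pattern leaf is an array of allowed values; nested objects are matched recursively.
function matchesPattern(pattern, event) {
  return Object.entries(pattern).every(([key, expected]) => {
    const actual = event == null ? undefined : event[key];
    if (Array.isArray(expected)) {
      return expected.includes(actual);
    }
    return typeof actual === "object" && actual !== null && matchesPattern(expected, actual);
  });
}

// Simplified model of an SQS record: parse the JSON body before matching
// (an assumption made for this sketch).
function isSelected(pattern, record) {
  let body;
  try {
    body = JSON.parse(record.body);
  } catch {
    return false; // a non-JSON body cannot match a pattern on body fields
  }
  return matchesPattern(pattern, { ...record, body });
}

const pattern = { body: { customer_type: ["Platinum"] } };
const platinum = { body: JSON.stringify({ id: "003", customer_type: "Platinum" }) };
const gold = { body: JSON.stringify({ id: "001", customer_type: "Gold" }) };

console.log(isSelected(pattern, platinum)); // true
console.log(isSelected(pattern, gold)); // false
```

Only records for which the pattern matches continue to the enrichment step; the rest are dropped by the pipe.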

Enrichment

We can use the enrichment step of EventBridge Pipes to enhance the data from the source before sending it to the target. In this example, the enrichment step performs the following tasks.

  1. Transform events before sending them to an enrichment target.
  2. Include additional data into the event payload using an enrichment Lambda function.

1. Transform an input event

The following is a sample customer request event. Note that the createdDate field is named in camelCase, unlike the other fields, so we would like to rename it to created_date before sending the event to the enrichment Lambda function.

{
  "id": "003",
  "customer_type": "Platinum",
  "query": "Need technical support",
  "severity": "High",
  "createdDate": "2023-02-25"
}

We can use the EventBridge Pipes enrichment input transformer to reshape the event as shown below.

{
  "id": "<$.body.id>",
  "customer_type": "<$.body.customer_type>",
  "query": "<$.body.query>",
  "severity": "<$.body.severity>",
  "created_date": "<$.body.createdDate>"
}
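The effect of this template can be illustrated with a small JavaScript sketch. The resolvePath and applyTemplate functions are hypothetical: they only resolve simple dotted paths such as <$.body.id> and do not reflect how EventBridge Pipes implements input transformation. The sketch also assumes the SQS body has already been parsed into an object.

```javascript
// Hypothetical helper: resolve a simple dotted JSON path such as "$.body.id".
function resolvePath(event, path) {
  return path
    .replace(/^\$\./, "")
    .split(".")
    .reduce((value, key) => (value == null ? undefined : value[key]), event);
}

// Replace every <$.path> placeholder in the template with the value found in the event.
function applyTemplate(template, event) {
  return template.replace(/<(\$\.[^>]+)>/g, (_, path) => String(resolvePath(event, path)));
}

const template = JSON.stringify({
  id: "<$.body.id>",
  customer_type: "<$.body.customer_type>",
  query: "<$.body.query>",
  severity: "<$.body.severity>",
  created_date: "<$.body.createdDate>",
});

// Source event with the SQS body already parsed (assumption for this sketch).
const sourceEvent = {
  body: {
    id: "003",
    customer_type: "Platinum",
    query: "Need technical support",
    severity: "High",
    createdDate: "2023-02-25",
  },
};

const transformed = JSON.parse(applyTemplate(template, sourceEvent));
console.log(transformed);
```

The resulting object carries created_date instead of createdDate, which is the shape the enrichment Lambda function receives in this example.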

2. Include additional data into the event payload

In this example, we want to include the customer support agent's details in the event before sending it to the consumer service. Therefore, we need to reprocess the event and generate a new one. To do that, we can use a Lambda function as the EventBridge Pipes enrichment target.

module "enrich_customer_request_lambda" {
  source = "terraform-aws-modules/lambda/aws"

  function_name          = "enrich-customer-request"
  source_path            = "${path.module}/lambda/enrich-customer-request"
  handler                = "index.handler"
  runtime                = "nodejs18.x"
  local_existing_package = "${path.module}/lambda/enrich-customer-request/index.zip"
  create_role            = false
  lambda_role            = aws_iam_role.enrich_customer_request_lambda_iam_role.arn
}

Enrichment Lambda handler

As you can see in the code below, I create a new event that includes the customer support agent's details and return it to the pipe. The pipe then sends the enriched events to the target Lambda function.

export const handler = async (event) => {
  console.log("Filtered platinum customer events : %O", event);

  const enrichedEvents = [];

  if (event && event.length > 0) {
    event.forEach((eventObj) => {
      // Copy the transformed event and append the support agent's details.
      const updatedEvent = {
        ...eventObj,
        agent_id: "0001",
        agent_name: "Mark John",
        agent_email: "markj@abc.com",
      };
      enrichedEvents.push(updatedEvent);
    });
  }

  // The returned array is handed back to the pipe.
  return enrichedEvents;
};

Target

This is the final step of the pipe, which processes the data coming from the enrichment step. In this example, we use a Lambda function as the target.

module "process_customer_request_lambda" {
  source = "terraform-aws-modules/lambda/aws"

  function_name          = "process-customer-request"
  source_path            = "${path.module}/lambda/process-customer-request"
  handler                = "index.handler"
  runtime                = "nodejs18.x"
  local_existing_package = "${path.module}/lambda/process-customer-request/index.zip"
  create_role            = false
  lambda_role            = aws_iam_role.process_customer_request_lambda_iam_role.arn
}

Target Lambda handler

export const handler = async (event) => {
  console.log("Enriched platinum customer events : %O", event);

  if (event && event.length > 0) {
    event.forEach((eventObject) => {
      console.log(eventObject);
    });
  }
  return event;
};

Combining All Steps

The following awscc_pipes_pipe resource creates the pipe with all the steps discussed above. You can download the complete source code from this link.

resource "awscc_pipes_pipe" "pipe" {
  name     = "pipe-customer-request"
  role_arn = aws_iam_role.pipe_iam_role.arn

  source = aws_sqs_queue.customer_request_sqs.arn

  source_parameters = {
    sqs = {
      sqs_queue_parameters = {
        batch_size = 10
      }
    }

    filter_criteria = {
      filters = [{ pattern = "{ \"body\": { \"customer_type\": [\"Platinum\"] }}" }]
    }
  }

  enrichment = module.enrich_customer_request_lambda.lambda_function_arn
  enrichment_parameters = {
    input_template = "{\"id\": \"<$.body.id>\", \"customer_type\": \"<$.body.customer_type>\", \"query\": \"<$.body.query>\",\"severity\": \"<$.body.severity>\", \"created_date\" : \"<$.body.createdDate>\"}"
  }

  target = module.process_customer_request_lambda.lambda_function_arn
}

Testing

Find the sample file customer-request.json in the source code and run the command below. It publishes five customer requests to SQS (2 Platinum, 1 Gold and 2 Silver).

aws sqs send-message-batch \
  --queue-url https://sqs.us-east-1.amazonaws.com/<AWS Account>/customer-request \
  --region us-east-1 \
  --entries file://customer-request.json
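The contents of customer-request.json are not shown in this post. As a rough sketch, the following JavaScript builds a batch in the shape that the --entries option expects: an array of objects, each with an Id and a MessageBody string. Apart from request 003, which is the sample event shown earlier, the individual requests are made up for illustration and are not the actual sample data from the source code.

```javascript
// Illustrative requests only: 2 Platinum, 1 Gold and 2 Silver, mirroring the mix described above.
const requests = [
  { id: "001", customer_type: "Gold", query: "Billing question", severity: "Low", createdDate: "2023-02-25" },
  { id: "002", customer_type: "Silver", query: "Password reset", severity: "Low", createdDate: "2023-02-25" },
  { id: "003", customer_type: "Platinum", query: "Need technical support", severity: "High", createdDate: "2023-02-25" },
  { id: "004", customer_type: "Silver", query: "Update contact details", severity: "Medium", createdDate: "2023-02-25" },
  { id: "005", customer_type: "Platinum", query: "Service outage", severity: "High", createdDate: "2023-02-25" },
];

// Each batch entry needs an Id that is unique within the batch and a string MessageBody.
const entries = requests.map((request) => ({
  Id: request.id,
  MessageBody: JSON.stringify(request),
}));

// Print the JSON so it can be redirected into a file such as customer-request.json.
console.log(JSON.stringify(entries, null, 2));
```

With a batch like this, only the two Platinum messages would be expected to pass the filter and reach the enrichment and target functions.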

Enrichment Lambda function logs

The following log event shows the transformed event that EventBridge Pipes sent to the Enrichment Lambda function.

Target Lambda function logs

The following log event shows the enriched event that came from the Enrichment step.

Some Important Points

  • If you use a customer managed AWS KMS key (CMK) to encrypt the source, you must explicitly grant the “kms:Decrypt” permission to the pipe’s execution role.
  • If you encounter an error like the one below, delete the pipe and rerun “terraform apply”. This is a known issue with the awscc_pipes_pipe resource.
 noSuchPath in source, path provided: //SourceParameters/FilterCriteria/Filters/0/Pattern

Published in Towards AWS
