Photo by Ben White on Unsplash

Real-time Anomaly Detection: How Amazon Kinesis Can Help You Stay Ahead of the Game

Mahesh
18 min read · Apr 24, 2023

Stay Ahead of the Game with Anomaly Detection: Using Amazon Kinesis Data Firehose and Analytics for Real-Time Insights

Anomaly detection is a critical task in data processing, especially when it comes to streaming data. As more and more data is generated in real-time, it’s important to detect any unusual patterns or outliers that may indicate a problem or opportunity.

The traditional approach of batch processing cannot keep up with the volume, velocity, and variety of streaming data generated by modern applications. This is where continuous analytics comes into play.

Continuous analytics is an essential part of modern data processing, especially when it comes to analysing streaming data in real-time. With the growth of the internet of things (IoT), the need for continuous analytics has become even more critical, as the data generated by these devices needs to be analysed in real-time to detect any anomalies or trends.

Today we will discuss how you can use Amazon Kinesis Data Firehose and Amazon Kinesis Data Analytics Application (KDA) for near real-time detection of anomalies in your data streams.

Amazon Kinesis Data Firehose: Streaming Data Pipeline

Amazon Kinesis Data Firehose is a fully managed service that allows you to capture, transform, and load streaming data from various sources such as databases, log files, social media platforms, and IoT devices into destinations like Amazon S3, Amazon Redshift, and Amazon Elasticsearch Service. With Amazon Kinesis Data Firehose, you don’t have to write any code to build or maintain a custom pipeline. You simply specify the source of your stream data, define your transformation rules, and configure your destination endpoint(s).

Amazon Kinesis Data Analytics Application: Real-Time Analysis of Streaming Data

Once your streaming data is being ingested into Amazon S3 through Amazon Kinesis Data Firehose, it can be further analyzed in real time using Amazon Kinesis Data Analytics Application. KDA is a scalable and durable service that helps you analyze large volumes of streaming data. It enables you to continuously process streams of data in real-time and discover patterns, correlations, trends, outliers, and other insights. KDA provides built-in functions for filtering, mapping, aggregating, scaling, and enriching your data streams as they arrive, allowing you to perform complex calculations on raw data directly within the service itself.

Amazon Kinesis Data Firehose and Amazon Kinesis Data Analytics are two services that can be used together with services like Amazon Simple Notification Service to provide continuous analytics on streaming data.

Here is the high level system design:

Prerequisites:

  1. Access to AWS account and its services
  2. Some Python and SQL knowledge
  3. Some knowledge of infrastructure and ML

Summary:

  1. Set up the required IAM roles and policies
  2. Create a Lambda function that takes input from Firehose and returns a prediction
  3. Create a Firehose delivery stream
  4. Create a Lambda function that publishes notifications to SNS
  5. Create the Kinesis Data Analytics app
  6. Put records on the Firehose delivery stream
  7. Review and release resources

Here is how you can do it:

0. Dataset

All machine learning solutions begin with a dataset.

Not this one.

Here I focus mainly on the system architecture, because I feel many new engineers lack this skill.

So we will insert a dummy dataset wherever needed. And for our problem, we define an anomaly as a negative user sentiment.

So if we detect from the input review text that a user’s sentiment is positive, then this is not an anomaly for our system. We will create notifications only for negative sentiments/labels/reviews/outputs.
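To make that definition concrete, here is a minimal sketch of the rule. The function names `is_anomaly` and `to_star_rating` are my own for illustration; the `NEGATIVE` label and the 0/1 encoding match what the prediction Lambda later in this post produces.

```python
def is_anomaly(label: str) -> bool:
    """Return True when a sentiment label counts as an anomaly (negative review)."""
    return label.strip().upper() == "NEGATIVE"


def to_star_rating(label: str) -> int:
    """Encode the label the way the pipeline does: 0 = negative, 1 = anything else."""
    return 0 if is_anomaly(label) else 1


print(is_anomaly("NEGATIVE"), to_star_rating("POSITIVE"))
```

Only records for which `is_anomaly` is true should end up as notifications.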

1. Setup

Setting up IAM (Identity and Access Management) and permissions in this project is extremely important. Because AWS denies access by default, you need to create IAM roles with limited permissions that allow users to perform only specific tasks, such as training a model or deploying an endpoint.

By setting up IAM policies and permissions, you can control who has access to your Amazon SageMaker resources, such as notebooks, data, models, and endpoints. You can specify what actions users or applications are allowed to perform on these resources, and under what conditions. I suggest least-privilege access for all your users and regular purging of unused permissions and resources.

import json
import time

import boto3
import pandas as pd
import sagemaker
from botocore.exceptions import ClientError

sess = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

sts = boto3.Session().client(service_name="sts", region_name=region)
iam = boto3.Session().client(service_name="iam", region_name=region)
lam = boto3.Session().client(service_name="lambda", region_name=region)
firehose = boto3.Session().client(service_name="firehose", region_name=region)
sns = boto3.Session().client(service_name="sns", region_name=region)
kinesis_analytics = boto3.Session().client(
    service_name="kinesisanalytics", region_name=region
)

Let’s create a Kinesis Role:

Please note that the `ADOAWS_*` prefix in role and function names stands for Anomaly Detection On AWS.

iam_kinesis_role_name = "ADOAWS_Kinesis"

assume_role_policy_doc = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": "kinesis.amazonaws.com"},
"Action": "sts:AssumeRole",
},
{
"Effect": "Allow",
"Principal": {"Service": "firehose.amazonaws.com"},
"Action": "sts:AssumeRole",
},
{
"Effect": "Allow",
"Principal": {"Service": "kinesisanalytics.amazonaws.com"},
"Action": "sts:AssumeRole",
},
],
}

try:
    iam_role_kinesis = iam.create_role(
        RoleName=iam_kinesis_role_name,
        AssumeRolePolicyDocument=json.dumps(assume_role_policy_doc),
        Description="ADOAWS Kinesis Role",
    )
    print("Role successfully created.")
except ClientError as e:
    if e.response["Error"]["Code"] == "EntityAlreadyExists":
        iam_role_kinesis = iam.get_role(RoleName=iam_kinesis_role_name)
        print("Role already exists.")
    else:
        # Re-raise: without the role, the cells below cannot work.
        print("Unexpected error: %s" % e)
        raise

time.sleep(30)

iam_role_kinesis_name = iam_role_kinesis["Role"]["RoleName"]
print(f"Role Name: {iam_role_kinesis_name}")

iam_role_kinesis_arn = iam_role_kinesis["Role"]["Arn"]
print(f"Role ARN: {iam_role_kinesis_arn}")

account_id = sts.get_caller_identity()["Account"]

You should see an output like the one below upon successful execution.

Successful Response
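We build a trust policy like the one above twice in this post (once for Kinesis, once for Lambda), so a small helper can cut the repetition. This is only a sketch: `make_trust_policy` is a name I made up, and it produces the same document shape as `assume_role_policy_doc`.

```python
import json


def make_trust_policy(services):
    """Build an assume-role (trust) policy that lets each AWS service assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": service},
                "Action": "sts:AssumeRole",
            }
            for service in services
        ],
    }


kinesis_trust = make_trust_policy(
    [
        "kinesis.amazonaws.com",
        "firehose.amazonaws.com",
        "kinesisanalytics.amazonaws.com",
    ]
)
print(json.dumps(kinesis_trust, indent=2))
```

The result can be passed to `iam.create_role` through `json.dumps`, exactly like the hand-written document.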

Give your stream a name:

stream_name = "adows-kinesis-data-stream"

Firehose name:

firehose_name = "dsoaws-kinesis-data-firehose"

Lambda Function name that will be connected to Amazon SNS:

lambda_fn_name_sns = "PushNotificationToSNS"

Create a Kinesis Policy:

kinesis_policy_doc = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:AbortMultipartUpload",
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:PutObject",
],
"Resource": [f"arn:aws:s3:::{bucket}", f"arn:aws:s3:::{bucket}/*"],
},
{
"Effect": "Allow",
"Action": ["logs:PutLogEvents"],
"Resource": [f"arn:aws:logs:{region}:{account_id}:log-group:/*"],
},
{
"Effect": "Allow",
"Action": [
"kinesis:*",
],
"Resource": [f"arn:aws:kinesis:{region}:{account_id}:stream/{stream_name}"],
},
{
"Effect": "Allow",
"Action": [
"firehose:*",
],
"Resource": [
f"arn:aws:firehose:{region}:{account_id}:deliverystream/{firehose_name}"
],
},
{
"Effect": "Allow",
"Action": [
"kinesisanalytics:*",
],
"Resource": ["*"],
},
{
"Sid": "UseLambdaFunction",
"Effect": "Allow",
"Action": ["lambda:InvokeFunction", "lambda:GetFunctionConfiguration"],
"Resource": ["*"],
},
{
"Effect": "Allow",
"Action": "iam:PassRole",
"Resource": ["arn:aws:iam::*:role/service-role/kinesis*"],
},
],
}

response = iam.put_role_policy(
RoleName=iam_role_kinesis_name,
PolicyName="ADOAWS_KinesisPolicy",
PolicyDocument=json.dumps(kinesis_policy_doc),
)

time.sleep(30)

print(json.dumps(response, indent=4, sort_keys=True, default=str))

Feel free to change variables like `region` and `bucket` to suit your setup. I wrote this code in SageMaker Studio, so it uses the default SageMaker bucket in S3.

(You can change the bucket in the Setup section)

Create a Lambda IAM Role:

iam_lambda_role_name = "ADOAWS_Lambda"

assume_role_policy_doc = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": "lambda.amazonaws.com"},
"Action": "sts:AssumeRole",
},
{
"Effect": "Allow",
"Principal": {"Service": "kinesisanalytics.amazonaws.com"},
"Action": "sts:AssumeRole",
},
],
}

try:
    iam_role_lambda = iam.create_role(
        RoleName=iam_lambda_role_name,
        AssumeRolePolicyDocument=json.dumps(assume_role_policy_doc),
        Description="ADOAWS Lambda Role",
    )
    print("Role successfully created.")
except ClientError as e:
    if e.response["Error"]["Code"] == "EntityAlreadyExists":
        iam_role_lambda = iam.get_role(RoleName=iam_lambda_role_name)
        print("Role already exists")
    else:
        # Re-raise: without the role, the cells below cannot work.
        print("Unexpected error: %s" % e)
        raise

time.sleep(30)

iam_role_lambda_name = iam_role_lambda["Role"]["RoleName"]
print(f"Role Name: {iam_role_lambda_name}")

iam_role_lambda_arn = iam_role_lambda["Role"]["Arn"]
print(f"Role ARN: {iam_role_lambda_arn}")

Successful Response

Attach AWS Lambda Policy to the IAM role you created above:

lambda_policy_doc = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "UseLambdaFunction",
"Effect": "Allow",
"Action": ["lambda:InvokeFunction", "lambda:GetFunctionConfiguration"],
"Resource": f"arn:aws:lambda:{region}:{account_id}:function:*",
},
{"Effect": "Allow", "Action": "cloudwatch:*", "Resource": "*"},
{"Effect": "Allow", "Action": "sns:*", "Resource": "*"},
{
"Effect": "Allow",
"Action": "logs:CreateLogGroup",
"Resource": f"arn:aws:logs:{region}:{account_id}:*",
},
{"Effect": "Allow", "Action": "sagemaker:InvokeEndpoint", "Resource": "*"},
{
"Effect": "Allow",
"Action": ["logs:CreateLogStream", "logs:PutLogEvents"],
"Resource": f"arn:aws:logs:{region}:{account_id}:log-group:/aws/lambda/*",
},
],
}

response = iam.put_role_policy(
RoleName=iam_role_lambda_name,
PolicyName="ADOAWS_LambdaPolicy",
PolicyDocument=json.dumps(lambda_policy_doc),
)

time.sleep(30)

print(json.dumps(response, indent=4, sort_keys=True, default=str))

Now we have the required permissions set up. If you face any errors, it could be because your IAM user or role does not have permission for those resources.

In that case, you can either add permissions incrementally until it works, or grant all permissions for the resources mentioned above and remove them one by one until something breaks. The choice is yours 🤪
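If you go the purge-them-one-by-one route, it helps to know where the broad grants are. Below is a minimal sketch that lists the statements in a policy document using wildcard actions or resources. `find_wildcards` is a hypothetical helper of mine, not part of boto3, and it only looks at the `Action` and `Resource` keys.

```python
def find_wildcards(policy_doc):
    """List statements whose actions end in '*' or whose resources are '*'."""
    findings = []
    for index, statement in enumerate(policy_doc.get("Statement", [])):
        actions = statement.get("Action", [])
        resources = statement.get("Resource", [])
        # Both keys may hold a single string or a list of strings.
        if isinstance(actions, str):
            actions = [actions]
        if isinstance(resources, str):
            resources = [resources]
        broad_actions = [a for a in actions if a == "*" or a.endswith(":*")]
        broad_resources = [r for r in resources if r == "*"]
        if broad_actions or broad_resources:
            findings.append(
                {
                    "statement": index,
                    "actions": broad_actions,
                    "resources": broad_resources,
                }
            )
    return findings


example_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow", "Action": "sns:*", "Resource": "*"},
        {"Effect": "Allow", "Action": ["logs:PutLogEvents"], "Resource": ["arn:aws:logs:us-east-1:123456789012:log-group:/aws/lambda/*"]},
    ],
}
for finding in find_wildcards(example_policy):
    print(finding)
```

Running it on `kinesis_policy_doc` and `lambda_policy_doc` gives you a to-do list of grants to tighten once the demo works.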

2. Create Lambda to return predictions

Creating First Lambda Function

Records from our Firehose will be sent to this Lambda for predictions. After prediction, the records go to the Kinesis Analytics app for anomaly detection.

If the Kinesis Analytics app detects an anomaly, it will trigger another Lambda, which will push a notification to Amazon SNS (probably at 3 A.M. 😮‍💨)

To make predictions in this Lambda, we can either:

1. Train a machine learning anomaly detection model and deploy it at an endpoint which this Lambda will invoke at run time,

OR

2. Use the pre-trained Hugging Face DistilBERT model for sentiment analysis and save time in our dummy example

I went ahead with option 2. But for real anomaly detection use cases, option 1 is the better choice. I have given some reference code for that too at the end of this section.

:: Option 2 (used in this demo)

Create a .zip file for the Python dependencies (Lambda Layer)

Lambda layers for Python expect the dependencies to live in a directory called python, so we create one.

!mkdir src
!touch src/get_pred_from_kinesis.py

# If your environment does not have zip, install it first (conda is recommended):
# !conda install -c conda-forge zip -y
# OR
# !pip install zip

Now we install the transformers library, which will be published as a layer for our Lambda function:

!rm -rf layer/python
!mkdir -p layer/python
!pip install -q --target layer/python transformers
!cd layer && zip -q --recurse-paths layer.zip .

Let’s load the zip file as binary:

with open("layer/layer.zip", "rb") as f:
    layer = f.read()

Publish the layer (we will attach it to our lambda in a bit):

transformers_lambda_layer_name = "transformers-python-sdk-layer"
layer_response = lam.publish_layer_version(
LayerName=transformers_lambda_layer_name,
Content={"ZipFile": layer},
Description="Layer with 'pip install transformers'",
CompatibleRuntimes=["python3.9"],
)

layer_version_arn = layer_response["LayerVersionArn"]

print(
f"Lambda layer {transformers_lambda_layer_name} successfully created with LayerVersionArn {layer_version_arn}."
)

If this doesn’t work due to the upload limit, you can manually upload the zip file from the AWS Lambda layers UI and set the variable layer_version_arn to your ARN.
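Before publishing, you can check whether the layer is likely to hit that limit. The sketch below compares the zip and the unpacked directory against the Lambda quotas as I understand them at the time of writing (50 MB for a zipped direct upload, 250 MB unzipped including layers); please verify them against the current AWS documentation. `directory_size_bytes` and `check_layer` are names I made up.

```python
import os

# Assumed quotas; confirm against the AWS Lambda quotas page.
ZIPPED_DIRECT_UPLOAD_LIMIT = 50 * 1024 * 1024
UNZIPPED_LIMIT = 250 * 1024 * 1024


def directory_size_bytes(path):
    """Sum the sizes of all regular files below path."""
    total = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            file_path = os.path.join(root, name)
            if not os.path.islink(file_path):
                total += os.path.getsize(file_path)
    return total


def check_layer(zip_path, unzipped_dir):
    """Report the layer sizes and whether they fit the assumed quotas."""
    zipped = os.path.getsize(zip_path)
    unzipped = directory_size_bytes(unzipped_dir)
    return {
        "zipped_bytes": zipped,
        "unzipped_bytes": unzipped,
        "fits_direct_upload": zipped <= ZIPPED_DIRECT_UPLOAD_LIMIT,
        "fits_unzipped_limit": unzipped <= UNZIPPED_LIMIT,
    }
```

In this post's layout you would call `check_layer("layer/layer.zip", "layer/python")` right after the zip step.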

Create a .zip file for our Python code (Lambda Function)

These are the contents of our src/get_pred_from_kinesis.py file:

"""Python Lambda Handler function."""
import base64

from transformers import pipeline

# Loaded once per container, so warm invocations reuse the model.
sent_pipeline = pipeline(
    "sentiment-analysis",
    model="distilbert-base-uncased-finetuned-sst-2-english",
)


def lambda_handler(event, context) -> dict:
    """Lambda handler function."""
    outputs = []

    for record in event["records"]:
        payload = base64.b64decode(record["data"])
        text = payload.decode("utf-8")

        # Input columns: review_id, star_rating, review_body
        split_inputs = text.rstrip("\n").split("\t")
        review_body = split_inputs[2]

        predictions = sent_pipeline(review_body)

        for pred in predictions:
            # Output columns: review_id, star_rating (0 = negative, 1 = positive), review_body
            output_data = "{}\t{}\t{}\n".format(
                split_inputs[0], 0 if pred["label"] == "NEGATIVE" else 1, review_body
            )
            output_data_encoded = output_data.encode("utf-8")
            output_record = {
                "recordId": record["recordId"],
                "result": "Ok",
                "data": base64.b64encode(output_data_encoded).decode("utf-8"),
            }
            outputs.append(output_record)

    return {"records": outputs}

If the above code gives you errors in AWS Lambda, try other options, such as removing the transformers layer and hardcoding predictions, to see if the rest of the pipeline works.
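Here is a rough sketch of that hardcoded variant, which you can also run locally before touching Lambda. `stub_predict`, `make_firehose_event` and the extra `predict` parameter are my own names for illustration; the event shape (a `records` list with `recordId` and base64 `data`) is the one the handler above already relies on.

```python
import base64


def stub_predict(review_body):
    """Hardcoded stand-in for the transformers pipeline: everything is negative."""
    return [{"label": "NEGATIVE", "score": 1.0}]


def make_firehose_event(lines):
    """Build a test event shaped like the Firehose transformation input."""
    return {
        "records": [
            {
                "recordId": str(index),
                "data": base64.b64encode(line.encode("utf-8")).decode("utf-8"),
            }
            for index, line in enumerate(lines)
        ]
    }


def lambda_handler(event, context, predict=stub_predict):
    """Same record contract as the real handler, with a pluggable predictor."""
    outputs = []
    for record in event["records"]:
        text = base64.b64decode(record["data"]).decode("utf-8")
        # Columns: review_id, star_rating, review_body
        review_id, _star_rating, review_body = text.rstrip("\n").split("\t", 2)
        label = predict(review_body)[0]["label"]
        star_rating = 0 if label == "NEGATIVE" else 1
        output_data = f"{review_id}\t{star_rating}\t{review_body}\n"
        outputs.append(
            {
                "recordId": record["recordId"],
                "result": "Ok",
                "data": base64.b64encode(output_data.encode("utf-8")).decode("utf-8"),
            }
        )
    return {"records": outputs}


event = make_firehose_event(["1682300000\t\tThis product broke in a day.\n"])
result = lambda_handler(event, None)
print(base64.b64decode(result["records"][0]["data"]).decode("utf-8"))
```

If this version works end to end in Lambda, the problem is in the layer or the model download, not in the record handling.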

!zip src/GetPredFromKinesis.zip src/get_pred_from_kinesis.py

Load the zip file as binary:

with open("src/GetPredFromKinesis.zip", "rb") as f:
    code = f.read()

Create The Lambda Function

pred_lambda_fn_name = "GetPredFromKinesis"
try:
    response = lam.create_function(
        FunctionName=f"{pred_lambda_fn_name}",
        Runtime="python3.9",
        Role=f"{iam_role_lambda_arn}",
        Handler="src/get_pred_from_kinesis.lambda_handler",
        Code={"ZipFile": code},
        Layers=[layer_version_arn],
        Description="Get sentiment prediction using transformers on review input text.",
        # max timeout supported by Firehose is 5min
        Timeout=300,
        MemorySize=128,
        Publish=True,
    )
    print(f"Lambda Function {pred_lambda_fn_name} successfully created.")
except ClientError as e:
    if e.response["Error"]["Code"] == "ResourceConflictException":
        response = lam.update_function_code(
            FunctionName=f"{pred_lambda_fn_name}",
            ZipFile=code,
            Publish=True,
            DryRun=False,
        )
        print(f"Updating existing Lambda Function {pred_lambda_fn_name}. This is OK.")
    else:
        print(f"Error: {e}")

# Response : Lambda Function GetPredFromKinesis successfully created.
response = lam.get_function(FunctionName=pred_lambda_fn_name)

pred_lambda_fn_arn = response["Configuration"]["FunctionArn"]
print(pred_lambda_fn_arn)

# Response: arn:aws:lambda:us-east-1:XXXXXXXXXXXX:function:GetPredFromKinesis

:: Option 1 (NOT used in this demo)

Train a SageMaker Model and create an endpoint

For your production use cases you will probably need an anomaly detection model trained on your data.

You can use any tool, but since we have only been using AWS so far, below is an example of training and deploying an XGBoost classifier on SageMaker.

Please note that this has not been tested:

Using XGBoost Classifier on sagemaker:

import boto3
import sagemaker
from sagemaker import get_execution_role, image_uris

# Set up a SageMaker session
sagemaker_session = sagemaker.Session()

# Get the IAM role used by SageMaker to access AWS resources
role = get_execution_role()

# Define the name of the S3 bucket to be used for data storage
bucket_name = 'my-sentiment-analysis-bucket'

# Define the location of the training and validation data
train_data_location = f"s3://{bucket_name}/train"
validation_data_location = f"s3://{bucket_name}/validation"

# Define the location where the trained model will be stored
model_artifact_location = f"s3://{bucket_name}/models"

# Set up a SageMaker estimator using the Amazon SageMaker XGBoost algorithm
# (SageMaker Python SDK v2 names, matching the TrainingInput class used below)
container = image_uris.retrieve(
    framework="xgboost",
    region=boto3.Session().region_name,
    version="1.5-1",  # assumed version; pick one available in your region
)
estimator = sagemaker.estimator.Estimator(
    image_uri=container,
    role=role,
    instance_count=1,  # This is bad
    instance_type="ml.m4.xlarge",
    output_path=model_artifact_location,
    sagemaker_session=sagemaker_session,
)

# Set up hyperparameters
estimator.set_hyperparameters(
    objective="binary:logistic",
    num_round=100,
)

# Set up data channels
train_data = sagemaker.inputs.TrainingInput(
    train_data_location,
    content_type="text/csv",
)
validation_data = sagemaker.inputs.TrainingInput(
    validation_data_location,
    content_type="text/csv",
)

data_channels = {'train': train_data, 'validation': validation_data}

# Start the training job
estimator.fit(inputs=data_channels)

There are plenty of examples available online to help you with this. I recommend adding SageMaker Clarify, SageMaker Lineage Tracking and SageMaker Hyperparameter Tuning to your mix to help you get more out of your MDLC (model development life cycle).

training_job_name = estimator.latest_training_job.name
print("Training Job Name: {}".format(training_job_name))

%%time

estimator.latest_training_job.wait(logs=False)

Display Training Job Metrics

estimator.training_job_analytics.dataframe()

In this example code, we first set up a SageMaker session and get the IAM role used by SageMaker to access AWS resources. We then define the name of the S3 bucket to be used for data storage, as well as the location of the training and validation data, and the location where the trained model will be stored.

We set up a SageMaker estimator using the Amazon SageMaker XGBoost algorithm, and specify hyperparameters such as the objective function and the number of rounds. We also set up data channels for the training and validation data, and start the training job.

This is just a basic example, and there are many other hyperparameters and algorithms that can be used for sentiment analysis on SageMaker. However, this code should give you a starting point for training your own sentiment analysis model on Amazon SageMaker.

Deploy an Endpoint (we are still in ::option 1 😅)

# Reuses the `estimator` object from the training step above

# Define the name of the SageMaker endpoint
endpoint_name = 'my-sentiment-analysis-endpoint'

# Create the endpoint; SageMaker creates the model and the endpoint
# configuration as part of this call
xgb_predictor = estimator.deploy(
    initial_instance_count=1,  # This is an abomination
    instance_type='ml.m4.xlarge',
    endpoint_name=endpoint_name,
)

In this example code, we define the name of the SageMaker endpoint and create it by calling estimator.deploy(), passing in the initial_instance_count, instance_type, and endpoint_name parameters.

SageMaker creates the model and the endpoint configuration (the number and type of instances to use for the endpoint) as part of this call, so we do not need to create them separately.

Once the endpoint is created, we can use it to make predictions on new data by calling xgb_predictor.predict().

Configure Lambda With Endpoint

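# lambda_fn_name_invoke_ep is the name of the Lambda function that invokes the
# endpoint. It is not created in this demo, so define it before running this.
response = lam.update_function_configuration(
    FunctionName=lambda_fn_name_invoke_ep,
    Environment={"Variables": {"ENDPOINT_NAME": endpoint_name}},
)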

3. Create a Kinesis Data Firehose Delivery Stream

I am guilty of copying this code from the examples available online 🥸

But feel free to change settings like buffering, CloudWatch log groups, and the S3 prefix as per your needs.

Note that `DeliveryStreamType=DirectPut` is used in this example.

try:
    response = firehose.create_delivery_stream(
        DeliveryStreamName=firehose_name,
        DeliveryStreamType="DirectPut",
        ExtendedS3DestinationConfiguration={
            "RoleARN": iam_role_kinesis_arn,
            "BucketARN": f"arn:aws:s3:::{bucket}",
            "Prefix": "kinesis-data-firehose/",
            "ErrorOutputPrefix": "kinesis-data-firehose-error/",
            "BufferingHints": {"SizeInMBs": 1, "IntervalInSeconds": 60},
            "CompressionFormat": "UNCOMPRESSED",
            "CloudWatchLoggingOptions": {
                "Enabled": True,
                "LogGroupName": "/aws/kinesisfirehose/dsoaws-kinesis-data-firehose",
                "LogStreamName": "S3Delivery",
            },
            "ProcessingConfiguration": {
                "Enabled": True,
                "Processors": [
                    {
                        "Type": "Lambda",
                        "Parameters": [
                            {
                                "ParameterName": "LambdaArn",
                                "ParameterValue": f"{pred_lambda_fn_arn}:$LATEST",
                            },
                            {"ParameterName": "BufferSizeInMBs", "ParameterValue": "1"},
                            {
                                "ParameterName": "BufferIntervalInSeconds",
                                "ParameterValue": "60",
                            },
                        ],
                    }
                ],
            },
            "S3BackupMode": "Enabled",
            "S3BackupConfiguration": {
                "RoleARN": iam_role_kinesis_arn,
                "BucketARN": f"arn:aws:s3:::{bucket}",
                "Prefix": "kinesis-data-firehose-source-record/",
                "ErrorOutputPrefix": "!{firehose:error-output-type}/",
                "BufferingHints": {"SizeInMBs": 1, "IntervalInSeconds": 60},
                "CompressionFormat": "UNCOMPRESSED",
            },
            # Note: this key repeats the one set above. In a Python dict literal the
            # last value wins, so logging ends up disabled. Keep only the one you want.
            "CloudWatchLoggingOptions": {
                "Enabled": False,
            },
        },
    )
    print(f"Delivery stream {firehose_name} successfully created.")
    print(json.dumps(response, indent=4, sort_keys=True, default=str))
except ClientError as e:
    if e.response["Error"]["Code"] == "ResourceInUseException":
        print(f"Delivery stream {firehose_name} already exists.")
    else:
        print("Unexpected error: %s" % e)

# Response: Delivery stream dsoaws-kinesis-data-firehose successfully created. {...}

Now we wait until the stream is in active status.

# This will take a while
status = ""
while status != "ACTIVE":
    r = firehose.describe_delivery_stream(DeliveryStreamName=firehose_name)
    description = r.get("DeliveryStreamDescription")
    status = description.get("DeliveryStreamStatus")
    time.sleep(5)

print(f"Delivery Stream {firehose_name} is active")

# Response: Delivery Stream dsoaws-kinesis-data-firehose is active
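The loop above waits forever if the stream never becomes active (for example, when creation fails). A minimal sketch of the same polling idea with a timeout is below. `wait_for_status` and its parameters are my own; `sleep` and `clock` are injectable only so the helper can be tried without actually waiting.

```python
import time


def wait_for_status(get_status, target, timeout_s=600, interval_s=5,
                    sleep=time.sleep, clock=time.monotonic):
    """Poll get_status() until it returns target, or raise TimeoutError."""
    deadline = clock() + timeout_s
    while True:
        status = get_status()
        if status == target:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"Status is still {status!r} after {timeout_s} seconds")
        sleep(interval_s)


# Local dry run with a fake status source instead of a real AWS call.
fake_statuses = iter(["CREATING", "CREATING", "ACTIVE"])
print(wait_for_status(lambda: next(fake_statuses), "ACTIVE", sleep=lambda s: None))
```

For Firehose, `get_status` would be a small function that calls `firehose.describe_delivery_stream(DeliveryStreamName=firehose_name)` and returns `["DeliveryStreamDescription"]["DeliveryStreamStatus"]`. The same helper fits the Kinesis Data Analytics wait later on.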

Save the ARN for further use.

firehose_arn = r["DeliveryStreamDescription"]["DeliveryStreamARN"]
print(firehose_arn)

4. Create Lambda Destination SNS

Remember, in step 2 we said that if the Kinesis Analytics app detects an anomaly, it will trigger another Lambda which will push a notification to Amazon SNS.

This is that Lambda.

Let’s create an SNS Topic

response = sns.create_topic(
Name="review_anomaly_scores",
)
print(response)
sns_topic_arn = response["TopicArn"]
print(sns_topic_arn)

Now we will create the Lambda function

Our Lambda function code lives in the `src/push_notification_to_sns.py` file, so we create that first.

!touch src/push_notification_to_sns.py

Following are the contents of our Lambda function file:

import base64
import os

import boto3

SNS_TOPIC_ARN = os.environ["SNS_TOPIC_ARN"]

sns = boto3.client("sns")

print("Loading function")


def lambda_handler(event, context):
    output = []
    success = 0
    failure = 0
    highest_score = 0

    print(f"event: {event}")
    r = event["records"]
    print(f"records: {r}")
    print(f"type_records: {type(r)}")

    for record in event["records"]:
        try:
            payload = base64.b64decode(record["data"])
            print(f"payload: {payload}")
            text = payload.decode("utf-8")
            print(f"text: {text}")
            score = float(text)
            if (score != 0) and (score > highest_score):
                highest_score = score
                print(f"New highest_score: {highest_score}")
                # Uncomment the lines below to publish every new highest score to the SNS topic.
                # sns.publish(
                #     TopicArn=SNS_TOPIC_ARN,
                #     Message='New anomaly score: {}'.format(text),
                #     Subject='New Reviews Anomaly Score Detected'
                # )
            output.append({"recordId": record["recordId"], "result": "Ok"})
            success += 1
        except Exception as e:
            print(e)
            output.append({"recordId": record["recordId"], "result": "DeliveryFailed"})
            failure += 1
    # Publish once per batch, with the highest score seen in it.
    if highest_score != 0:
        sns.publish(
            TopicArn=SNS_TOPIC_ARN,
            Message=f"New anomaly score: {highest_score}",
            Subject="New Reviews Anomaly Score Detected",
        )
    print(
        f"Successfully delivered {success} records, failed to deliver {failure} records"
    )
    return {"records": output}
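The score-picking part of this handler can be tried locally without SNS or boto3. The sketch below pulls it into a pure function. `highest_anomaly_score` is a name I made up; it keeps the handler's behaviour of starting from 0 and skipping records it cannot parse.

```python
import base64


def highest_anomaly_score(records):
    """Return the highest positive score in a batch, or 0.0 if there is none."""
    highest = 0.0
    for record in records:
        try:
            score = float(base64.b64decode(record["data"]).decode("utf-8"))
        except (KeyError, ValueError):
            # Missing data, bad base64 or non-numeric text: skip the record.
            continue
        highest = max(highest, score)
    return highest


def encode(text):
    """Base64-encode text the way the analytics output arrives."""
    return base64.b64encode(text.encode("utf-8")).decode("utf-8")


sample_records = [
    {"recordId": "1", "data": encode("0.8")},
    {"recordId": "2", "data": encode("2.5\n")},
    {"recordId": "3", "data": encode("oops")},
]
print(highest_anomaly_score(sample_records))
```

A notification is only worth sending when the returned value is above 0, which is the check the handler performs before `sns.publish`.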

Zip the code

!zip src/PushNotificationToSNS.zip src/push_notification_to_sns.py

Load the zip as binary

with open("src/PushNotificationToSNS.zip", "rb") as f:
    code = f.read()

And now we create the Lambda function itself:

try:
    response = lam.create_function(
        FunctionName=f"{lambda_fn_name_sns}",
        Runtime="python3.9",
        Role=f"{iam_role_lambda_arn}",
        Handler="src/push_notification_to_sns.lambda_handler",
        Code={"ZipFile": code},
        Description="Deliver output records from Kinesis Analytics application to SNS.",
        Timeout=300,
        MemorySize=128,
        Publish=True,
    )
    print(f"Lambda Function {lambda_fn_name_sns} successfully created.")

except ClientError as e:
    if e.response["Error"]["Code"] == "ResourceConflictException":
        response = lam.update_function_code(
            FunctionName=f"{lambda_fn_name_sns}",
            ZipFile=code,
            Publish=True,
            DryRun=False,
        )
        print(f"Updating existing Lambda Function {lambda_fn_name_sns}. This is OK.")
    else:
        print(f"Error: {e}")

# Response: Lambda Function PushNotificationToSNS successfully created.
response = lam.get_function(FunctionName=lambda_fn_name_sns)

lambda_fn_arn_sns = response["Configuration"]["FunctionArn"]
print(lambda_fn_arn_sns)

Update Lambda Function with SNS Topic ARN

response = lam.update_function_configuration(
FunctionName=lambda_fn_name_sns,
Environment={"Variables": {"SNS_TOPIC_ARN": sns_topic_arn}},
)

5. Create Kinesis Data Analytics App

This is an important step for our anomaly detection service.

Amazon Kinesis Data Analytics is a fully managed service that makes it easy to analyze streaming data in real-time with SQL or Java. Kinesis Data Analytics enables you to process and analyze data streams from various sources such as Kinesis Data Streams, Kinesis Data Firehose, and other sources.

Kinesis Data Analytics provides an interactive SQL editor for creating and running SQL queries on streaming data in real-time. The service also supports Java code for more complex stream processing tasks. Kinesis Data Analytics automatically provisions and scales the necessary compute resources for processing and analyzing data streams.

You can also run an Apache Flink application on Kinesis Data Analytics to process streaming data.

A word of caution to the reader:

Between the time I began thinking about writing this demo and the time I actually finished it, AWS changed the Kinesis Analytics App UI and moved SQL apps under legacy 😔

The code below still works at the time of publishing, but I do not know when it will stop working. There is a plethora of resources available online to help you on any topic, and you can also ask ChatGPT 🤖

kinesis_data_analytics_app_name = "adoaws-kinesis-data-analytics-sql-app"
in_app_stream_name = "SOURCE_SQL_STREAM_001" # Default
print(in_app_stream_name)

Create Application

sql_code = """ \
CREATE OR REPLACE STREAM "ANOMALY_SCORE_SQL_STREAM" (anomaly_score DOUBLE); \
CREATE OR REPLACE PUMP "ANOMALY_SCORE_STREAM_PUMP" AS \
INSERT INTO "ANOMALY_SCORE_SQL_STREAM" \
SELECT STREAM anomaly_score \
FROM TABLE(RANDOM_CUT_FOREST( \
CURSOR(SELECT STREAM "star_rating" \
FROM "{}" \
) \
) \
); \
""".format(
in_app_stream_name
)
try:
    response = kinesis_analytics.create_application(
        ApplicationName=kinesis_data_analytics_app_name,
        Inputs=[
            {
                "NamePrefix": "SOURCE_SQL_STREAM",
                "KinesisFirehoseInput": {
                    "ResourceARN": f"{firehose_arn}",
                    "RoleARN": f"{iam_role_kinesis_arn}",
                },
                "InputProcessingConfiguration": {
                    "InputLambdaProcessor": {
                        "ResourceARN": f"{pred_lambda_fn_arn}",
                        "RoleARN": f"{iam_role_lambda_arn}",
                    }
                },
                "InputSchema": {
                    "RecordFormat": {
                        "RecordFormatType": "CSV",
                        "MappingParameters": {
                            "CSVMappingParameters": {
                                "RecordRowDelimiter": "\n",
                                "RecordColumnDelimiter": "\t",
                            }
                        },
                    },
                    "RecordColumns": [
                        {
                            "Name": "review_id",
                            "Mapping": "review_id",
                            "SqlType": "VARCHAR(14)",
                        },
                        {
                            "Name": "star_rating",
                            "Mapping": "star_rating",
                            "SqlType": "INTEGER",
                        },
                        {
                            "Name": "review_body",
                            "Mapping": "review_body",
                            "SqlType": "VARCHAR(65535)",
                        },
                    ],
                },
            },
        ],
        Outputs=[
            {
                "Name": "ANOMALY_SCORE_SQL_STREAM",
                "LambdaOutput": {
                    "ResourceARN": f"{lambda_fn_arn_sns}",
                    "RoleARN": f"{iam_role_kinesis_arn}",
                },
                "DestinationSchema": {"RecordFormatType": "CSV"},
            },
        ],
        ApplicationCode=sql_code,
    )
    print(f"SQL application {kinesis_data_analytics_app_name} successfully created.")
    print(json.dumps(response, indent=4, sort_keys=True, default=str))
except ClientError as e:
    if e.response["Error"]["Code"] == "ResourceInUseException":
        print(f"SQL App {kinesis_data_analytics_app_name} already exists.")
    else:
        print("Unexpected error: %s" % e)

You will notice that the schema defines three columns:

  1. review_id: just an identifier for the record
  2. star_rating: this will be our Lambda’s prediction
  3. review_body: this will be our input

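To see how one tab-separated record maps onto these three columns, here is a small sketch that parses a line the way the schema describes it. `parse_review_record` is a hypothetical helper of mine; the 14-character limit mirrors the `VARCHAR(14)` type declared for `review_id`.

```python
def parse_review_record(line):
    """Split one TSV record into the three schema columns and validate them."""
    fields = line.rstrip("\n").split("\t")
    if len(fields) != 3:
        raise ValueError(f"Expected 3 tab-separated columns, got {len(fields)}")
    review_id, star_rating, review_body = fields
    if len(review_id) > 14:
        raise ValueError("review_id is longer than VARCHAR(14) allows")
    return {
        "review_id": review_id,
        "star_rating": int(star_rating),  # INTEGER column: 0 or 1 in this demo
        "review_body": review_body,
    }


print(parse_review_record("1682300000\t0\tThis product broke in a day.\n"))
```

A record that fails this check locally is unlikely to map cleanly inside the analytics app either, so this is a cheap first test of your Lambda's output.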
response = kinesis_analytics.describe_application(
ApplicationName=kinesis_data_analytics_app_name
)
print(json.dumps(response, indent=4, sort_keys=True, default=str))

Let’s note the input_id which will be used when starting the Data Analytics app:

input_id = response["ApplicationDetail"]["InputDescriptions"][0]["InputId"]
print(input_id)

Start the Kinesis Data Analytics App

try:
    response = kinesis_analytics.start_application(
        ApplicationName=kinesis_data_analytics_app_name,
        InputConfigurations=[
            {
                "Id": input_id,
                "InputStartingPositionConfiguration": {"InputStartingPosition": "NOW"},
            }
        ],
    )
    print(json.dumps(response, indent=4, sort_keys=True, default=str))
except ClientError as e:
    if e.response["Error"]["Code"] == "ResourceInUseException":
        print(f"Application {kinesis_data_analytics_app_name} is already starting.")
    else:
        print(f"Error: {e}")

Wait for Kinesis Data Analytics App to be in RUNNING state

response = kinesis_analytics.describe_application(ApplicationName=kinesis_data_analytics_app_name)

%%time

import time

app_status = response["ApplicationDetail"]["ApplicationStatus"]
print("Application status {}".format(app_status))

while app_status != "RUNNING":
    time.sleep(5)
    response = kinesis_analytics.describe_application(ApplicationName=kinesis_data_analytics_app_name)
    app_status = response["ApplicationDetail"]["ApplicationStatus"]
    print("Application status {}".format(app_status))

print("Application status {}".format(app_status))

When I checked recently, Amazon had improved the Kinesis Data Analytics app to be more robust and API friendly.

This will help you monitor what is happening inside the app as well as get better control like stopping and deleting the app through UI itself (earlier UI deletion was buggy 🫤).

6. Put reviews on Kinesis Data Firehose

Moment of truth, let’s put some records onto Firehose. We will put only bad records because we are impatient 😅 and too lazy to get any real dataset.

firehoses = firehose.list_delivery_streams(DeliveryStreamType="DirectPut")

firehose_response = firehose.describe_delivery_stream(DeliveryStreamName=firehose_name)

%%time

step = 1

for _ in range(0, 10, step):

    timestamp = int(time.time())

    df_anomalies = pd.DataFrame(
        [
            {
                "review_id": str(timestamp),
                "review_body": "This make me want to puke till eternity.",
            },
        ],
        columns=["review_id", "star_rating", "review_body"],
    )

    reviews_tsv_anomalies = df_anomalies.to_csv(sep="\t", header=None, index=False)

    response = firehose.put_record(
        Record={"Data": reviews_tsv_anomalies.encode("utf-8")},
        DeliveryStreamName=firehose_name,
    )

If you have a real anomaly detection model, the records will change to match your schema. You can also use the IMDB dataset of 50K movie reviews to play with your sentiment analysis model. Keep your AWS cost in mind though 🫣.
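With a larger dataset, sending one record per call gets slow. Firehose also has a batch call, `put_record_batch`, which as far as I know accepts up to 500 records per request. Here is a minimal sketch of preparing such batches. `to_tsv_line` and `chunk_records` are names I made up, and the actual AWS call is left out so the snippet runs anywhere.

```python
def to_tsv_line(review_id, star_rating, review_body):
    """Format one record as review_id, star_rating, review_body separated by tabs."""
    # Collapse tabs and newlines in the text so they cannot break the columns.
    clean_body = " ".join(str(review_body).split())
    rating = "" if star_rating is None else str(star_rating)
    return f"{review_id}\t{rating}\t{clean_body}\n"


def chunk_records(lines, batch_size=500):
    """Yield lists of {'Data': bytes} entries, at most batch_size per list."""
    if not 1 <= batch_size <= 500:
        raise ValueError("batch_size must be between 1 and 500")
    batch = []
    for line in lines:
        batch.append({"Data": line.encode("utf-8")})
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch


lines = [to_tsv_line(i, None, "This is awful.") for i in range(1201)]
print([len(batch) for batch in chunk_records(lines)])
```

Each batch can then go to `firehose.put_record_batch(DeliveryStreamName=firehose_name, Records=batch)`. Check `FailedPutCount` in the response, because a batch call can partially fail.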

Fellow engineer tip: set up AWS Budgets on your personal account to prevent any unwanted surprises.

If everything worked as intended, then:

  1. In your Kinesis S3 bucket (mine was the SageMaker default), you should see three folders containing records with predictions, source records (without predictions), and Firehose errors.
  2. You should also see the PushNotificationToSNS Lambda function logs in CloudWatch (indicating records reached the end).
  3. Check the GetPredFromKinesis Lambda CloudWatch log groups for any errors.
  4. You can also monitor the PushNotificationToSNS Lambda function in the Lambda Functions UI for invocations.

If you have any subscribers for your SNS Topic, after confirming those subscribers you should see some action there too.

7. Release resources

This is as important as any other step.

Please release resources like your SageMaker Studio instances (if you are using SageMaker Studio), and delete the Kinesis Data Analytics app and the Firehose delivery stream.

You can also delete the SNS topic and Lambda functions (they don’t cost anything at rest, but if you don’t plan to use them, you can delete them).
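A rough sketch of that cleanup is below. `release_resources` is my own helper, not something from this demo or from boto3. It takes the clients created in the Setup section as arguments and, as far as I know, the legacy SQL app API needs the app's `CreateTimestamp` to delete it, so the helper looks that up first. It leaves the IAM roles, the Lambda layer and the S3 objects untouched.

```python
def release_resources(firehose, kinesis_analytics, sns, lam, *,
                      firehose_name, app_name, topic_arn, function_names):
    """Delete the analytics app, the delivery stream, the topic and the functions."""
    app = kinesis_analytics.describe_application(ApplicationName=app_name)
    kinesis_analytics.delete_application(
        ApplicationName=app_name,
        CreateTimestamp=app["ApplicationDetail"]["CreateTimestamp"],
    )
    firehose.delete_delivery_stream(DeliveryStreamName=firehose_name)
    sns.delete_topic(TopicArn=topic_arn)
    for name in function_names:
        lam.delete_function(FunctionName=name)
```

With the variables from this post, the call would be `release_resources(firehose, kinesis_analytics, sns, lam, firehose_name=firehose_name, app_name=kinesis_data_analytics_app_name, topic_arn=sns_topic_arn, function_names=[pred_lambda_fn_name, lambda_fn_name_sns])`.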

Other use cases

  1. You can track the average review rating of your products to detect behavioural drift
  2. Use real-time feedback from social media on newly launched products or events to gauge public sentiment
  3. Detect users with malicious intent or people who are trying to abuse the system in near real-time
  4. And many more…

Hi 👋, I am Mahesh.

I am a Machine Learning Engineer and I write about how statistics and machine learning can help businesses.

Connect with me on LinkedIn: https://www.linkedin.com/in/maheshrajput
