AWS Database Blog

Predictive Analytics with Time-series Machine Learning on Amazon Timestream

Capacity planning for large applications can be difficult due to constantly changing requirements and the dynamic nature of modern infrastructures. Traditional reactive approaches, such as relying on static thresholds for DevOps metrics like CPU and memory utilization, fall short in such environments. In this post, we show how you can perform predictive analysis on aggregated DevOps data (CPU, memory, transactions per second) stored in Amazon Timestream by using Amazon SageMaker built-in algorithms. This enables proactive capacity planning to prevent potential business interruptions. You can use the same approach to run machine learning with SageMaker on any time-series data stored in Timestream.

Timestream is a fast, scalable, and serverless time-series database service that makes it simple to store and analyze trillions of events per day. Timestream automatically scales up or down to adjust capacity and performance, so that you don’t have to manage the underlying infrastructure.

SageMaker is a fully managed machine learning (ML) service. With SageMaker, data scientists and developers can quickly and effortlessly build and train ML models, and then directly deploy them into a production-ready hosted environment. It provides an integrated Jupyter authoring notebook instance for quick access to your data sources for exploration and analysis, so you don’t have to manage servers. It also provides common ML algorithms that are optimized to run efficiently against extremely large data in a distributed environment.

Solution overview

DevOps teams can use Timestream to store metrics, logs, and other time-series data. You can then query this data to gain insights into your systems’ behaviors. Timestream can handle high volumes of incoming data with low latency, which enables teams to perform real-time analytics. DevOps teams can analyze performance metrics and other operational data in real time to facilitate quick decision-making.
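
For example, the following is a minimal sketch (not part of the hands-on lab) of how you could pull recent hourly CPU aggregates from Timestream with the boto3 query client. The database and table names are assumptions that match the sample resources created later in this post.

import boto3

# Timestream query client (the hands-on lab later uses the AWS SDK for pandas instead)
query_client = boto3.client("timestream-query")

# Average CPU per 1-hour bin for the sample telemetry table (assumed names)
query = """
SELECT bin(time, 1h) AS hour, round(avg(cpu_avg), 2) AS cpu
FROM "Demo_Predictive_Analysis"."Telemetry_Aggregated_Data"
GROUP BY bin(time, 1h)
ORDER BY hour DESC
LIMIT 24
"""

# Paginate through the result set and print one row per hourly bin
for page in query_client.get_paginator("query").paginate(QueryString=query):
    for row in page["Rows"]:
        print([datum.get("ScalarValue") for datum in row["Data"]])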

The following reference architecture shows how you can use Timestream for DevOps use cases.

The solution consists of the following key components:

  1. A Timestream database and table that store the sample aggregated DevOps data (CPU, memory, and TPS)
  2. A SageMaker notebook instance that prepares the data, trains the DeepAR model, and runs predictions
  3. An S3 bucket that stores the training and test datasets and the model output

Prerequisites

To follow this post, you should be familiar with the key concepts of Timestream, SageMaker, Amazon Simple Storage Service (Amazon S3), AWS Identity and Access Management (IAM), and Python. This post also includes a hands-on lab for using an AWS CloudFormation template and Jupyter notebook to provision and interact with the relevant AWS services. An AWS account with the necessary IAM privileges is required.

Launch the hands-on lab

Complete the following steps to launch the hands-on lab:

  1. Launch the CloudFormation stack:
    Launch Stack
    Note: This solution creates AWS resources that incur costs on your account. Make sure you delete the stack when you're done.
  2. Provide a stack name and keep all other options as default.
    This stack creates a Timestream database and table, and ingests sample aggregated DevOps data. It also creates a SageMaker notebook instance and S3 bucket.
  3. When the stack is complete, make a note of the notebook instance and S3 bucket name listed on the stack’s Outputs tab on the AWS CloudFormation console.
    We use the SageMaker notebook instance to prepare data from Timestream, train an ML model, and run predictions.
  4. To access the notebook instance, navigate to the SageMaker console and choose Notebook instances in the navigation pane.
  5. Open the instance created by the CloudFormation stack.
  6. When the status of the notebook is InService, choose Open Jupyter.
    The following example shows a notebook instance called TimeseriesDataAnalysis.

  7. Choose timestream_predictive_analysis.ipynb and mark it as trusted.

Prepare data for analysis

You can now start analyzing the data and preparing it for training by running the cells in the notebook. Complete the following steps:

  1. The following code sets up a SageMaker session and creates Amazon S3 and Timestream clients. It also installs the AWS SDK for pandas (awswrangler) library, which extends pandas to AWS by connecting DataFrames with AWS data and analytics services, providing quick integration with Timestream and many other AWS services.
    import time
    import numpy as np
    import pandas as pd
    import json
    import matplotlib.pyplot as plt
    import boto3
    import sagemaker
    from sagemaker import get_execution_role
    from IPython import display
    %pip install awswrangler
    import awswrangler as wr
    
    np.random.seed(1)
    
    # Setting up Sagemaker session
    prefix = "sagemaker/DEMO-deepar"
    sagemaker_session = sagemaker.Session()
    role = get_execution_role()
    bucket = sagemaker_session.default_bucket()
    
    # setting S3 bucket path to upload training datasets 
    s3_data_path = f"{bucket}/{prefix}/data"
    s3_output_path = f"{bucket}/{prefix}/output"
    print(s3_data_path)
    print(s3_output_path)
    
    # Setting up S3 client 
    s3_client = boto3.client('s3') 
    
    # Timestream Configurations.  
    DB_NAME = "Demo_Predictive_Analysis" # <--- specify the database created in Amazon Timestream
    TABLE_NAME = "Telemetry_Aggregated_Data" # <--- specify the table created in Amazon Timestream
    
    timestream_client = boto3.client('timestream-query')
  2. Make a note of the S3 bucket paths printed at the end of the previous step.
    You can delete these buckets after the analysis is complete.
  3. Query data from Timestream:
    query = """
    SELECT *
    FROM "Demo_Predictive_Analysis"."Telemetry_Aggregated_Data" 
    """
    
    result = wr.timestream.query(sql=query,pagination_config={'PageSize': 1000})
    display.display(result)

  4. Visualize the time-series data:
    labels = ['cpu', 'memory', 'tps']
    
    cpu_series = pd.Series(data = result['cpu_avg'].values, index = pd.to_datetime(result['time']))
    memory_series = pd.Series(data = result['memory_avg'].values, index = pd.to_datetime(result['time']))
    tps_series = pd.Series(data = result['tps_avg'].values, index = pd.to_datetime(result['time']))
    
    ## collect all series in list
    time_series = []
    time_series.append(cpu_series)
    time_series.append(memory_series)
    time_series.append(tps_series)
    
    for k in range(len(time_series)):
    
        print(f'-------------------------------------------\n\tGraph {labels[k]}')
        time_series[k].plot(label = labels[k])
        plt.legend(loc='lower right')
        plt.show()

    The following is a plot of CPU usage.

    The following is a plot of memory usage.

    The following is a plot of transactions per second (TPS).

    The solution uses the SageMaker DeepAR forecasting algorithm, which has been chosen because of its effectiveness in predicting one-dimensional time-series data using recurrent neural networks (RNN). DeepAR stands out for its ability to adapt to diverse time-series patterns, making it a versatile and powerful choice. It employs a supervised learning approach, using labeled historical data for training, and takes advantage of the strengths of RNN architecture to capture temporal dependencies in sequential data.

  5. Set the following DeepAR hyperparameters, which configure the training job:
    freq = "H" ## time measured in hours
    prediction_length = 48
    context_length = 72
    data_length = 400
    num_ts = 2
    period = 24 
    
    hyperparameters = {
        "time_freq": freq,
        "context_length": str(context_length),
        "prediction_length": str(prediction_length),
        "num_cells": "40",
        "num_layers": "3",
        "likelihood": "gaussian",
        "epochs": "20",
        "mini_batch_size": "32",
        "learning_rate": "0.001",
        "dropout_rate": "0.05",
        "early_stopping_patience": "10",
    }
    

    Looking at the earlier graphs, you'll notice that the pattern looks similar for all three metrics. Because of this, we use only the CPU metric for training. However, we can use the trained model to predict the other metrics as well. If the data patterns were different, we would have to train on each dataset separately and predict accordingly.

    We have about 16 days of data with a 24-hour cycle. We train the model on the first 14 days using a 3-day (72-hour) context window, and we use the last 2 days (48 hours) to test our predictions.

  6. The training data is the first part of the series, up to the last 2 days (48 hours):
    time_series_training = []
    for ts in time_series:
        time_series_training.append(ts[:-prediction_length])
    time_series[0].plot(label="test", title = "cpu")
    time_series_training[0].plot(label="train", ls=":")
    plt.legend()
    plt.show()

    The following plot shows the full series overlaid with the training data (dotted line); the last 48 hours are held out for testing.

  7. This next step formats the data according to the DeepAR input format so that it can be used for training the model. The step then saves the dataset to Amazon S3.
    def series_to_obj(ts, cat=None):
        obj = {"start": str(ts.index[0]), "target": list(ts)}
        if cat is not None:
            obj["cat"] = cat
        return obj
    
    def series_to_jsonline(ts, cat=None):
        return json.dumps(series_to_obj(ts, cat))
    
    encoding = "utf-8"
    FILE_TRAIN = "train.json"
    FILE_TEST = "test.json"
    with open(FILE_TRAIN, "wb") as f:
        for ts in time_series_training:
            f.write(series_to_jsonline(ts).encode(encoding))
            f.write("\n".encode(encoding))
    
    with open(FILE_TEST, "wb") as f:
        for ts in time_series:
            f.write(series_to_jsonline(ts).encode(encoding))
            f.write("\n".encode(encoding))
    s3 = boto3.client("s3")
    s3.upload_file(FILE_TRAIN, bucket, prefix + "/data/train/" + FILE_TRAIN)
    s3.upload_file(FILE_TEST, bucket, prefix + "/data/test/" + FILE_TEST)

You can validate the files test.json and train.json by navigating to the Amazon S3 console and looking for the bucket that was created earlier (for example, s3://sagemaker-<region>-<account_number>/sagemaker/DEMO-deepar/data).
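
Alternatively, the following is a minimal sketch for verifying the uploads from the notebook itself, reusing the s3_client, bucket, and prefix variables defined earlier.

# List the objects under the data prefix to confirm that train.json and test.json were uploaded
response = s3_client.list_objects_v2(Bucket=bucket, Prefix=f"{prefix}/data/")
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])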

Train the model with the DeepAR forecasting algorithm

This step trains the model using a generic estimator. It launches an ML instance (instance type ml.c4.xlarge) using a SageMaker image containing the DeepAR algorithm:

image_uri = sagemaker.image_uris.retrieve("forecasting-deepar", boto3.Session().region_name)
estimator = sagemaker.estimator.Estimator(
    sagemaker_session=sagemaker_session,
    image_uri=image_uri,
    role=role,
    instance_count=1,
    instance_type="ml.c4.xlarge",
    base_job_name="DEMO-deepar",
    output_path=f"s3://{s3_output_path}",
)
estimator.set_hyperparameters(**hyperparameters)

data_channels = {"train": f"s3://{s3_data_path}/train/", "test": f"s3://{s3_data_path}/test/"}

estimator.fit(inputs=data_channels)

Wait for the model training to finish (around 5 minutes) before running predictions.

When the training job is complete, you will see the following response.
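
If you prefer to poll the status programmatically instead of watching the cell output, the following is a minimal sketch that describes the training job through the SageMaker API, reusing the estimator created above.

import boto3

# Look up the status of the most recent training job launched by the estimator
sm_client = boto3.client("sagemaker")
job_name = estimator.latest_training_job.name
status = sm_client.describe_training_job(TrainingJobName=job_name)["TrainingJobStatus"]
print(f"Training job {job_name}: {status}")  # InProgress, Completed, or Failed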

Generate predictive insights

With the model training phase successfully complete, the next step is to launch a prediction instance by deploying an endpoint.

  1. Deploy the endpoint with the following code:
    job_name = estimator.latest_training_job.name
    
    endpoint_name = sagemaker_session.endpoint_from_job(
        job_name=job_name,
        initial_instance_count=1,
        instance_type="ml.m4.xlarge",
        image_uri=image_uri,
        role=role,
    )

    Launching the instance can take some time. Initially, only a single hyphen (-) is shown in the output. Wait until the status line ends with an exclamation mark (!).

  2. Use the following helper class to run predictions:
    class DeepARPredictor(sagemaker.predictor.RealTimePredictor):
        def set_prediction_parameters(self, freq, prediction_length):
            """Set the time frequency and prediction length parameters. This method **must** be called
            before being able to use `predict`.
    
            Parameters:
            freq -- string indicating the time frequency
            prediction_length -- integer, number of predicted time points
    
            Return value: none.
            """
            self.freq = freq
            self.prediction_length = prediction_length
    
        def predict(
            self,
            ts,
            cat=None,
            encoding="utf-8",
            num_samples=100,
            quantiles=["0.1", "0.5", "0.9"],
            content_type="application/json",
        ):
            """Requests the prediction of for the time-series listed in `ts`, each with the (optional)
            corresponding category listed in `cat`.
    
            Parameters:
            ts -- list of `pandas.Series` objects, the time-series to predict
            cat -- list of integers (default: None)
            encoding -- string, encoding to use for the request (default: "utf-8")
            num_samples -- integer, number of samples to compute at prediction time (default: 100)
            quantiles -- list of strings specifying the quantiles to compute (default: ["0.1", "0.5", "0.9"])
    
            Return value: list of `pandas.DataFrame` objects, each containing the predictions
            """
            prediction_times = [x.index[-1] + pd.Timedelta(1, unit=self.freq) for x in ts]
            req = self.__encode_request(ts, cat, encoding, num_samples, quantiles)
            res = super(DeepARPredictor, self).predict(req, initial_args={"ContentType": content_type})
            return self.__decode_response(res, prediction_times, encoding)
    
        def __encode_request(self, ts, cat, encoding, num_samples, quantiles):
            instances = [series_to_obj(ts[k], cat[k] if cat else None) for k in range(len(ts))]
            configuration = {
                "num_samples": num_samples,
                "output_types": ["quantiles"],
                "quantiles": quantiles,
            }
            http_request_data = {"instances": instances, "configuration": configuration}
            return json.dumps(http_request_data).encode(encoding)
    
        def __decode_response(self, response, prediction_times, encoding):
            response_data = json.loads(response.decode(encoding))
            list_of_df = []
            for k in range(len(prediction_times)):
                prediction_index = pd.date_range(
                    start=prediction_times[k], freq=self.freq, periods=self.prediction_length
                )
                list_of_df.append(
                    pd.DataFrame(
                        data=response_data["predictions"][k]["quantiles"], index=prediction_index
                    )
                )
            return list_of_df
        
    predictor = DeepARPredictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker_session)
    predictor.set_prediction_parameters(freq, prediction_length)
    
    list_of_df = predictor.predict(time_series_training[:3], content_type="application/json")
    actual_data = time_series[:3]
  3. Finally, you can visualize the results:
    for k in range(len(list_of_df)):
        print(f'-------------------------------------------\n\tPrediction {labels[k]}')
        plt.figure(figsize=(12, 6))
        actual_data[k][-prediction_length - context_length :].plot(label=f'target - {labels[k]}')
        p10 = list_of_df[k]["0.1"]
        p90 = list_of_df[k]["0.9"]
        plt.fill_between(p10.index, p10, p90, color="y", alpha=0.5, label="80% confidence interval")
        list_of_df[k]["0.5"].plot(label="prediction median")
        plt.legend()
        plt.show()

    The following plot shows our CPU prediction.

    The following plot shows our memory prediction.

    The following plot shows our TPS prediction.

  4. Delete the endpoint:
    sagemaker_session.delete_endpoint(endpoint_name)

The outcome of our prediction closely matches the test data. We can use these predictions to plan capacity. You can follow the steps in this post to seamlessly extend this solution to predict other time-series data stored in Timestream, providing a flexible approach for applying accurate predictions to a wide range of real-world time-series datasets.

Aggregations in Timestream

In general, it’s a best practice to aggregate time-series data to a lower frequency before training a model. Training on raw, high-frequency data can make training slow and the model less accurate.

With the Timestream scheduled query feature, you can aggregate data and store it in a different Timestream table. You can use scheduled queries for business reports that summarize end-user activity from your applications, so you can train ML models for personalization. You can also use scheduled queries for alarms that detect anomalies, network intrusions, or fraudulent activity, so you can take immediate remedial action. The following is a sample SQL query that can be run as a scheduled query to aggregate (downsample) data into 1-hour intervals:

select
microservice_name,
region,
'aggregate_host_metric' as measure_name,
bin(time, 1h) as time,
round(avg(memory),2) as memory_avg,
round(avg(tps),2) as tps_avg,
round(avg(cpu),2) as cpu_avg
from "Demo"."source_metrics"
group by microservice_name, region, bin(time, 1h)
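
The following is a minimal sketch of how a query like this could be registered as a scheduled query with boto3. The SNS topic, IAM role, error-report bucket, and source table are placeholders for resources in your own account, and the target mapping shown is an assumption based on the sample schema in this post; refer to the Timestream documentation for the full set of options.

import boto3

query_client = boto3.client("timestream-query")

# Placeholder ARNs and names; replace with resources from your own account
sns_topic_arn = "arn:aws:sns:us-east-1:111122223333:scheduled-query-notifications"
execution_role_arn = "arn:aws:iam::111122223333:role/TimestreamScheduledQueryRole"
error_report_bucket = "my-timestream-error-reports"

# Aggregate the most recent hour of raw metrics on each run
aggregation_query = """
SELECT microservice_name, region,
       bin(time, 1h) AS time,
       round(avg(memory), 2) AS memory_avg,
       round(avg(tps), 2) AS tps_avg,
       round(avg(cpu), 2) AS cpu_avg
FROM "Demo"."source_metrics"
WHERE time BETWEEN @scheduled_runtime - 1h AND @scheduled_runtime
GROUP BY microservice_name, region, bin(time, 1h)
"""

query_client.create_scheduled_query(
    Name="HourlyDevOpsAggregation",
    QueryString=aggregation_query,
    ScheduleConfiguration={"ScheduleExpression": "rate(1 hour)"},
    NotificationConfiguration={"SnsConfiguration": {"TopicArn": sns_topic_arn}},
    TargetConfiguration={
        "TimestreamConfiguration": {
            "DatabaseName": "Demo_Predictive_Analysis",
            "TableName": "Telemetry_Aggregated_Data",
            "TimeColumn": "time",
            "DimensionMappings": [
                {"Name": "microservice_name", "DimensionValueType": "VARCHAR"},
                {"Name": "region", "DimensionValueType": "VARCHAR"},
            ],
            "MultiMeasureMappings": {
                "TargetMultiMeasureName": "aggregate_host_metric",
                "MultiMeasureAttributeMappings": [
                    {"SourceColumn": "memory_avg", "MeasureValueType": "DOUBLE"},
                    {"SourceColumn": "tps_avg", "MeasureValueType": "DOUBLE"},
                    {"SourceColumn": "cpu_avg", "MeasureValueType": "DOUBLE"},
                ],
            },
        }
    },
    ScheduledQueryExecutionRoleArn=execution_role_arn,
    ErrorReportConfiguration={"S3Configuration": {"BucketName": error_report_bucket}},
)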

Clean up

To avoid incurring charges, use the AWS Management Console to delete the resources that you created while running this exercise:

  1. Delete the SageMaker resources and S3 buckets created outside of the CloudFormation stack.
  2. Empty the S3 bucket you created, so you don’t face issues while deleting the stack (a programmatic option is sketched after this list).
  3. Delete the CloudFormation stack you created for this solution.
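
If you prefer to empty a bucket from the notebook instead of the console, the following is a minimal sketch. It reuses the bucket variable (the SageMaker default bucket) from earlier; repeat the call with the bucket name noted from the stack outputs.

import boto3

# Delete every object so the bucket (and the stack that owns it) can be removed cleanly
s3_resource = boto3.resource("s3")
s3_resource.Bucket(bucket).objects.all().delete()  # `bucket` was defined earlier in the notebook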

Conclusion

In this post, we showed you how to improve capacity planning by running predictive analysis on DevOps time-series data stored in Timestream using the SageMaker DeepAR algorithm. By combining the capabilities of SageMaker and Timestream, you can make predictions and gain valuable insights for your time-series datasets.

For more information about Timestream aggregation, refer to Queries with aggregate functions. For advanced time-series analytical functions, see Time-series functions. To learn more about using the DeepAR algorithm, refer to Best practices for using the DeepAR Algorithm.

We welcome your feedback. If you have questions or suggestions, leave them in the comment section.


About the Authors

Balwanth Reddy Bobilli is a Timestream Specialist Solutions Architect at AWS based out of Utah. Prior to joining AWS, he worked at Goldman Sachs as a Cloud Database Architect. He is passionate about databases and cloud computing. He has great experience in building secure, scalable, and resilient solutions in cloud, specifically with cloud databases.

Norbert Funke is a Sr. Timestream Specialist Solutions Architect at AWS based out of New York. Prior to joining AWS, he was working for a data consulting company owned by PwC on data architecture and data analytics.

Renuka Uttarala is a Senior Leader at AWS since 2019, leading global teams specializing in data services architecture solutions. She has 20+ years of IT industry experience specialized in Advanced Analytics and Data Science fields. Prior to joining AWS, she worked in product development, enterprise architecture, and solution engineering leadership roles at various companies, including HCL Technologies, Amdocs Openet, Warner Bros. Discovery, and Oracle Corporation.