项目展示

使用时间序列机器学习进行预测分析

2026-01-27 13:08:57

使用时间序列机器学习进行预测分析基于 Amazon Timestream

作者 Balwanth Bobilli Norbert Funke Renuka Uttarala发布日期 2024年2月16日分类 高级 (300)、Amazon SageMaker、Amazon Timestream、技术指南永久链接评论

关键要点

传统的反应式容量规划方式在动态环境中效果有限,需采用预测分析以实现主动监测。Amazon Timestream 是一种高效、可扩展的无服务器时间序列数据库,便于存储和分析海量事件。通过 Amazon SageMaker 的内置算法,用户可以轻松进行时间序列数据的预测分析。本文详细介绍了如何使用 SageMaker 和 Timestream 平台进行数据分析与模型训练。

在现代基础设施的动态变化中,大型应用程序的容量规划常常面临挑战。传统的反应式方法例如,基于静态阈值来监控 CPU 和内存等 DevOps 指标在这种环境中变得不足。本文将展示如何通过使用 Amazon Timestream 和 Amazon SageMaker 的内置算法,对聚合的 DevOps 数据CPU、内存、每秒事务数进行预测分析,从而实现主动的容量规划,预防潜在的业务中断。这种方法适用于在 Timestream 中存储的任何时间序列数据的机器学习运算。

Timestream 是一个快速、可扩展、无服务器的时间序列数据库服务,能够轻松存储和分析每天产生的数万亿事件。Timestream 会自动进行扩展与收缩,从而调整容量和性能,大大简化了底层基础设施的管理。

而 SageMaker 是一项完全托管的机器学习ML服务。借助 SageMaker,数据科学家和开发者可以快速构建和训练 ML 模型,并直接将其部署到一个准备好的生产环境中。它提供了一个综合的 Jupyter 创作 notebook 实例,快速访问数据源进行探索和分析,不需要用户管理服务器。同时它提供了一些优化的 ML 算法,能够在分布式环境下高效地处理极大数据量。

解决方案概述

DevOps 团队可以使用 Timestream 存储度量、日志及其他时间序列数据。然后可以查询这些数据,获取系统行为的洞察。Timestream 能以较低延迟处理大量数据,这使团队可以进行实时分析,以快速做出决策。

以下参考架构展示了如何将 Timestream 用于 DevOps 用例:

解决方案的主要组件包括:

从云中和本地应用程序以及服务器收集的遥测数据,使用开源采集代理如 Prometheus 和 Telegraf进行引入。还可以通过 Amazon Managed Streaming for Apache Kafka Amazon MSK和 Amazon Kinesis 通过 Amazon Managed Service for Apache Flink 进行数据引入。数据采集后,您可以使用可视化工具,如 Grafana 和 Amazon QuickSight,以及其他使用 JDBC 和 ODBC 驱动程序的工具来分析和展示数据。使用 SageMaker 进行预测分析,详细信息请参考 Amazon SageMaker。

前提条件

阅读本文之前,您应了解 Timestream、SageMaker、Amazon Simple Storage Service (Amazon S3)、AWS 身份和访问管理 (IAM) 和 Python 的基本概念。本文包括一个关于如何使用 AWS CloudFormation 模板和 Jupyter notebook 的实践实验,以便配置和与相关的 AWS 服务进行交互。您需要一个具有必要 IAM 权限的 AWS 账户。

启动实践实验

完成以下步骤以启动实践实验:

启动 CloudFormation 堆栈:注意:该解决方案创建了会在您的账户中产生费用的 AWS 资源,因此确保在完成后删除堆栈。

提供堆栈名称并保持所有选项为默认值。 此堆栈创建一个 Timestream 数据库和表,并引入示例聚合的 DevOps 数据。同时,它还创建一个 SageMaker notebook 实例 和 S3 存储桶。

当堆栈完成时,记下 AWS CloudFormation 控制台的 Outputs 标签中列出的 notebook 实例和 S3 存储桶名称。 我们使用 SageMaker notebook 实例准备来自 Timestream 的数据、训练 ML 模型并进行预测。

要访问 notebook 实例,请导航至 SageMaker 控制台并在导航面板中选择 Notebook instances。

打开由 CloudFormation 堆栈创建的实例。当 notebook 状态为 InService 时,选择 Open Jupyter。 以下示例显示了一个名为 TimeseriesDataAnalysis 的 notebook 实例。

选择 timestreampredictiveanalysisipynb 并标记为信任。

准备数据用于分析

现在,您可以开始分析数据并通过运行 notebook 中的单元格为 训练 做准备。请完成以下步骤:

以下代码设置了一个 SageMaker 会话,并创建了 Amazon S3 和 Timestream 客户端。同时,还安装了 Amazon SageMaker Data Wrangler 库,该库扩展了 pandas 库在 AWS 上的功能,可连接 DataFrame 和 AWS 数据及分析服务,便于快速整合 Timestream 和其他 AWS 服务。

python import time import numpy as np import pandas as pd import json import matplotlibpyplot as plt import boto3 import sagemaker from sagemaker import getexecutionrole from IPython import display pip install awswrangler import awswrangler as wr

nprandomseed(1)

# 设置 SageMaker 会话 prefix = sagemaker/DEMOdeepar sagemakersession = sagemakerSession() role = getexecutionrole() bucket = sagemakersessiondefaultbucket()

# 设置 S3 存储桶路径以上传训练数据集 s3datapath = f{bucket}/{prefix}/data s3outputpath = f{bucket}/{prefix}/output print(s3datapath) print(s3outputpath)

# 设置 S3 客户端 s3client = boto3client(s3)

# Timestream 配置 DBNAME = DemoPredictiveAnalysis # lt 指定在 Amazon Timestream 中创建的数据库 TABLENAME = TelemetryAggregatedData # lt 指定在 Amazon Timestream 中创建的表

timestreamclient = boto3client(timestreamquery)

记下此步骤结束时 S3 存储桶路径的输出。 在分析完成后,您可以删除这些存储桶。

从 Timestream 查询数据:

python query = SELECT FROM DemoPredictiveAnalysisTelemetryAggregatedData

result = wrtimestreamquery(sql=query paginationconfig={PageSize 1000}) displaydisplay(result)

可视化时间序列数据:

python labels = [cpu memory tps]

cpuseries = pdSeries(data=result[cpuavg]values index=pdtodatetime(result[time])) memoryseries = pdSeries(data=result[memoryavg]values index=pdtodatetime(result[time])) tpsseries = pdSeries(data=result[tpsavg]values index=pdtodatetime(result[time]))

# 将所有系列收集到列表中 timeseries = [cpuseries memoryseries tpsseries]

for k in range(len(timeseries)) print(fntGraph {labels[k]}) timeseries[k]plot(label=labels[k]) pltlegend(loc=lower right) pltshow()

以下是 CPU 使用率的图示。

以下是内存使用率的图示。

以下是每秒事务数 (TPS) 的图示。

轻蜂加速

解决方案使用了 SageMaker DeepAR 预测算法,选择此算法是因为其在使用递归神经网络 (RNN) 预测一维时间序列数据方面的有效性。DeepAR 能够适应不同的时间序列模式,是一种多用途且强大的选择。它采用监督学习方法,使用带标签的历史数据进行训练,并利用 RNN 结构的优势来捕获顺序数据中的时间依赖关系。

使用以下 DeepAR 超参数 来初始化 ML 实例:

python freq = H # 时间单位为小时 predictionlength = 48 contextlength = 72 datalength = 400 numts = 2 period = 24

hyperparameters = { timefreq freq contextlength str(contextlength) predictionlength str(predictionlength) numcells 40 numlayers 3 likelihood gaussian epochs 20 minibatchsize 32 learningrate 0001 dropoutrate 005 earlystoppingpatience 10 }

从之前的图表中您可以看到,三个指标的模式相似。因此,我们只使用 CPU 指标进行训练。然而,我们可以使用训练好的模型预测 CPU 之外的其他指标。如果数据模式不同,则必须分别训练每个数据集并相应地进行预测。

我们有大约16天的数据,以24小时为周期。我们用前14天的数据训练模型,使用3天72小时的上下文窗口,并用最后2天48小时验证我们的预测。

使用时间序列机器学习进行预测分析 训练数据是到最后两天48小时的数据的第一部分:

python timeseriestraining = [] for ts in timeseries timeseriestrainingappend(ts[predictionlength]) timeseries[0]plot(label=test title=cpu) timeseriestraining[0]plot(label=train ls=) pltlegend() pltshow()

以下图示显示了数据并将其与测试数据叠加。

接下来的步骤将数据格式化为 DeepAR 输入格式,以便用于训练模型。然后将数据集保存到 Amazon S3 中。

python def seriestoobj(ts cat=None) obj = {start str(tsindex[0]) target list(ts)} if cat is not None obj[cat] = cat return obj

def seriestojsonline(ts cat=None) return jsondumps(seriestoobj(ts cat))

encoding = utf8 FILETRAIN = trainjson FILETEST = testjson with open(FILETRAIN wb) as f for ts in timeseriestraining fwrite(seriestojsonline(ts)encode(encoding)) fwrite(nencode(encoding))

with open(FILETEST wb) as f for ts in timeseries fwrite(seriestojsonline(ts)encode(encoding)) fwrite(nencode(encoding))

s3 = boto3client(s3) s3uploadfile(FILETRAIN bucket prefix /data/train/ FILETRAIN) s3uploadfile(FILETEST bucket prefix /data/test/ FILETRAIN)

您可以通过导航到 Amazon S3 控制台,查看之前创建的存储桶例如,s3//sagemaker/sagemaker/DEMOdeepar/data来验证文件 testjson 和 trainjson。

使用 DeepAR 预测算法训练模型

此步骤将 训练模型,使用通用估算器并启动一个 ML 实例实例类型为 mlc4xlarge,使用包含 DeepAR 算法的 SageMaker 镜像:

pythonimageuri = sagemakerimageurisretrieve(forecastingdeepar boto3Session()regionname)estimator = sagemakerestimatorEstimator( sagemakersession=sagemakersession imageuri=imageuri role=role instancecount=1 instancetype=mlc4xlarge basejobname=DEMOdeepar outputpath=fs3//{s3outputpath})estimatorsethyperparameters(hyperparameters)

datachannels = {train fs3//{s3datapath}/train/ test fs3//{s3datapath}/test/}

estimatorfit(inputs=data

使用 AWS IoT Core 和 LoRaWAN 冷链传感器减少食物浪费和食源性疾病 亚马逊网络服
使用 AWS IoT Core 和 LoRaWAN 冷链传感器减少食物浪费和食源性疾病 亚马逊网络服
2026-01-27
在 Amazon API Gateway 实施多区域故障转移 计算博客
在 Amazon API Gateway 实施多区域故障转移 计算博客
2026-01-27