Kaggle实战Pytorch Forecasting => TemporalFusionTransformer 保姆级0基础代码逐行解析
代码地址:
Pytorch Forecasting => TemporalFusionTransformer
DataFrame 是 pandas 库中的一种数据结构,用于存储和处理二维表格数据。它类似于电子表格或 SQL 表,具有行和列。每列可以具有不同的数据类型(例如整数、浮点数、字符串等),并且可以通过行标签和列标签进行索引。DataFrame 提供了许多用于数据清洗、转换、分析和可视化的方法。
Loading Data
df_train = pd.read_csv(path / 'train.csv', parse_dates=['date'], infer_datetime_format=True)
df_test = pd.read_csv(path / 'test.csv', parse_dates=['date'], infer_datetime_format=True)
这段代码使用 pandas 库中的 read_csv 函数读取了训练集和测试集文件
它使用了两个参数:‘parse_dates’ 和 ‘infer_datetime_format’。
‘parse_dates’ 参数指定了应将哪些列解析为日期
‘infer_datetime_format’ 参数则指示函数尝试自动推断日期格式。
文件路径由变量 ‘path’ 指定。
最后,读取的数据被存储在名为 ‘df_train’ 的变量中。
‘parse_dates’ 参数用于指定应将哪些列解析为日期。
‘infer_datetime_format’ 参数用于指定是否尝试推断日期时间字符串的格式。如果将其设置为 True,并且启用了 ‘parse_dates’,则 pandas 会尝试推断日期列中日期时间字符串的格式。如果可以推断出来,它会切换到一种更快的解析方法。在某些情况下,这可以将解析速度提高 5-10 倍。
‘parse_dates’ 和 ‘infer_datetime_format’ 这两个参数都会影响如何解析 CSV 文件中的日期时间数据。如果启用了 ‘parse_dates’,则 pandas 会尝试将指定的列解析为日期时间数据。如果同时启用了 ‘infer_datetime_format’,则 pandas 会尝试推断日期时间字符串的格式,从而加快解析速度。这两个参数都可以帮助确保 CSV 文件中的日期时间数据被正确解析并存储在 DataFrame 中。
题外话
df['Date-Time'] = pd.to_datetime(df['Date-Time'])
df['timestamp'] = df['Date-Time'].values.astype(np.int64)
这两行代码将 DataFrame 中名为 ‘Date-Time’ 的列转换为日期时间数据类型,并将其存储在新的 ‘Date-Time’ 列中。然后,它创建了一个名为 ‘timestamp’ 的新列,其中包含 ‘Date-Time’ 列中日期时间数据的时间戳。时间戳是指从 1970 年 1 月 1 日(UTC)以来经过的纳秒数,它们通常用于表示日期时间数据。
将日期时间数据转换为时间戳有几个目的。首先,时间戳是一种数字表示形式,可以更容易地进行数学运算和比较。其次,时间戳通常用于存储和传输日期时间数据,因为它们占用的空间更少且格式统一。最后,许多函数和库都需要或支持使用时间戳来表示日期时间数据,因此将日期时间数据转换为时间戳可以提高代码的兼容性和可用性。
prediction_steps = df_test['date'].nunique()
nunique()是一个pandas函数,用于计算数据框或序列中唯一值的数量。例如,在上面的代码中,df_test[‘date’].nunique()将返回’date’列中不同日期的数量。这个函数通常用于了解数据框或序列中有多少个不同的类别。
df_train = pd.merge(df_train, stores, on='store_nbr', how='left')
这行代码是用Python编写的。它使用pandas库中的merge函数将两个数据框df_train和stores合并在一起。合并是基于’store_nbr’列进行的,这意味着两个数据框中’store_nbr’列的值将被用作匹配键。合并方式为’left’,这意味着结果数据框将保留df_train中的所有行,即使在stores中没有匹配的行也是如此。最后,合并后的数据框被赋值给df_train变量。
holidays = pd.read_csv(path / 'holidays_events.csv', parse_dates=['date'], infer_datetime_format=True)
#Keep only not transferrred holidays
holidays = holidays.loc[holidays['transferred'] == False]
holidays_nat = holidays[holidays['locale']=='National'].drop_duplicates(subset='date')
holidays_reg = holidays[holidays['locale']=='Regional'].drop_duplicates(subset='date')
holidays_loc = holidays[holidays['locale']=='Local'].drop_duplicates(subset='date')
df_train = pd.merge(df_train, holidays_nat[['date','description']], on='date', how='left').rename(columns={'description': 'holiday_nat'})
df_train = pd.merge(df_train, holidays_reg[['date', 'locale_name', 'description']], left_on=['date', 'state'], right_on=['date', 'locale_name'], how='left').rename(columns={'description': 'holiday_reg'}).drop(columns=['locale_name'])
df_train = pd.merge(df_train, holidays_loc[['date', 'locale_name', 'description']], left_on=['date', 'city'], right_on=['date', 'locale_name'], how='left').rename(columns={'description': 'holiday_loc'}).drop(columns=['locale_name'])
df_train[["holiday_nat", "holiday_reg", "holiday_loc"]] = df_train[["holiday_nat", "holiday_reg", "holiday_loc"]].fillna("No")
df_train.shape
这段代码是用Python编写的。它首先使用pandas库中的read_csv函数从’holidays_events.csv’文件中读取数据,并将其存储在名为holidays的数据框中。在读取过程中,’date’列被解析为日期格式。
接下来,代码使用布尔索引来选择holidays数据框中’transferred’列值为False的行,并将结果存储回holidays变量中。
然后,代码根据’locale’列的值将holidays数据框分成三个子数据框:holidays_nat、holidays_reg和holidays_loc,分别包含’locale’值为’National’、’Regional’和’Local’的行。在创建这些子数据框时,代码还使用了drop_duplicates函数来删除’date’列中重复的行。
接下来,代码使用pandas库中的merge函数将这三个子数据框与df_train数据框合并在一起。合并是基于’date’列进行的,并且合并方式为’left’,这意味着结果数据框将保留df_train中的所有行。合并后,新列被重命名为’holiday_nat’、‘holiday_reg’和’holiday_loc’,并且不需要的列被删除。
最后,代码使用fillna函数将’holiday_nat’、’holiday_reg’和’holiday_loc’列中的缺失值替换为”No”,并显示出df_train数据框的形状。
同理合并了oil和Transactions
df_train = pd.merge(df_train, oil, on=‘date’, how=‘left’)
df_train = pd.merge(df_train, transactions, on=[‘store_nbr’, ‘date’], how=‘left’)
合并EarthQuake数据做了点处理
from scipy.stats import skewnorm
earthquake = pd.DataFrame()
earthquake["date"] = pd.date_range("2016-04-17","2016-05-16")
earthquake['earthquake_effect'] = [2*skewnorm.pdf(i/20, 0.5) for i in range(len(earthquake))]
df_train = pd.merge(df_train, earthquake, on='date', how='left')
df_train['earthquake_effect'].fillna(0, inplace=True)
这段代码是用Python编写的。它首先从scipy库中导入skewnorm模块,然后创建一个名为earthquake的空数据框。接下来,代码使用pandas库中的date_range函数生成一个日期范围,并将其存储在earthquake数据框的’date’列中。
然后,代码使用列表推导式和skewnorm.pdf函数计算出一个值列表,并将其存储在earthquake数据框的’earthquake_effect’列中。
接下来,代码使用pandas库中的merge函数将earthquake数据框与df_train数据框合并在一起。合并是基于’date’列进行的,并且合并方式为’left’,这意味着结果数据框将保留df_train中的所有行。合并后,代码使用fillna函数将’earthquake_effect’列中的缺失值替换为0。
import plotly.express as px
px.line(df_train[(df_train['date'] > pd.to_datetime("2016-03-16"))&(df_train['date'] < pd.to_datetime("2016-06-16"))&(df_train['store_nbr']==2)&(df_train['family']=='AUTOMOTIVE')], x='date', y=['earthquake_effect', 'sales'])
这是一行用Python编写的代码,它使用Plotly Express库来创建折线图。图表的数据来自一个名为df_train的DataFrame,它被过滤以仅包含date列在2016年3月16日和6月16日之间,store_nbr列等于2且family列等于’AUTOMOTIVE’的行。图表的x轴将显示date列的值,y轴将显示earthquake_effect和sales列的值。
pd.to_datetime是Pandas库中的一个函数,它用于将输入转换为日期时间格式。在您提供的代码中,它被用来将字符串”2016-03-16″和”2016-06-16″转换为日期时间对象,以便与df_train中的date列进行比较。这样可以确保只选择在这两个日期之间的行。
日期时间对象是Python中的一种数据类型,用于表示日期和时间。它通常包含年、月、日、小时、分钟和秒等信息。可以使用Python内置的datetime模块来创建和操作日期时间对象。例如,可以使用datetime.datetime(2022, 10, 30, 16, 13, 49)来创建一个表示2022年10月30日16点13分49秒的日期时间对象。
Payday
def get_distance_from_paydays(date):
end_of_month = date.daysinmonth
distance_to_1st = 0 if date.day >=15 else 15 - date.day
distance_to15th = 0 if date.day < 15 else end_of_month - date.day
return distance_to_1st + distance_to15th
df_train['days_from_payday'] = df_train['date'].apply(get_distance_from_paydays)
这段代码定义了一个名为get_distance_from_paydays的函数,它接受一个日期作为输入,并计算该日期与当月1日和15日的距离。如果日期在15日之后,则距离1日的天数为0,距离15日的天数为当月总天数减去日期的天数。如果日期在15日之前,则距离1日的天数为15减去日期的天数,距离15日的天数为0。函数返回这两个距离之和。
然后,代码使用apply方法将get_distance_from_paydays函数应用于DataFrame df_train中的date列,并将结果存储在新列days_from_payday中。
Derivates from sales
df_train['average_sales_by_family'] = df_train.groupby(["date", 'family'], observed=True).sales.transform('mean')
df_train['average_sales_by_store'] = df_train.groupby(["date", 'store_nbr'], observed=True).sales.transform('mean')
这段代码使用groupby方法将DataFrame df_train按照date和family列分组,并计算每组的销售额平均值。然后,使用transform方法将这些平均值应用于原始DataFrame中的每一行,并将结果存储在新列average_sales_by_family中。
接下来,代码再次使用groupby方法将DataFrame df_train按照date和store_nbr列分组,并计算每组的销售额平均值。然后,使用transform方法将这些平均值应用于原始DataFrame中的每一行,并将结果存储在新列average_sales_by_store中。
groupby方法用于将DataFrame按照一个或多个列的值进行分组。这样可以对每个组执行聚合操作,例如计算每组的平均值、最大值、最小值等。在您提供的代码中,groupby方法被用来按照date和family列以及date和store_nbr列对DataFrame进行分组,以便计算每个日期和家庭组合以及每个日期和商店编号组合的销售额平均值。
Casting and preparing for Pytorch Forecasting TimeSeriesDataSet
df_train['dcoilwtico'] = df_train['dcoilwtico'].interpolate().fillna(method='bfill')
df_train['transactions'] = df_train['transactions'].interpolate().fillna(method='bfill')
df_train['dayofweek'] = df_train['date'].dt.dayofweek.astype('str').astype('category')
df_train['month'] = df_train['date'].dt.month.astype('str').astype('category')
df_train['dayofyear'] = df_train['date'].dt.dayofyear.astype('str').astype('category')
for cat_col in ['holiday_nat', 'holiday_reg', 'holiday_loc','city','state' , 'store_type', 'store_cluster', 'store_nbr', 'family']:
df_train[cat_col] = df_train[cat_col].astype(str).astype('category')
df_train['time_idx'] = (df_train['date'].dt.date - df_train['date'].dt.date.min()).dt.days
这段代码是用来处理数据的。它首先使用插值法填充了’dcoilwtico’和’transactions’列中的缺失值。然后,它将’date’列中的日期转换为星期几、月份和一年中的第几天,并将它们存储为类别变量。接下来,它将’holiday_nat’、‘holiday_reg’、‘holiday_loc’、‘city’、‘state’、‘store_type’、‘store_cluster’、‘store_nbr’和’family’列转换为类别变量。最后,它创建了一个新列’time_idx’,表示每个日期与最小日期之间的天数差。
将列转换为类别类型的目的是为了节省内存并提高性能。类别类型通常用于具有重复值的列,例如性别、国家或城市等。它通过将每个唯一值存储为整数并使用整数数组来表示数据来实现这一点。这样可以大大减少内存占用,并且在进行某些操作时,例如分组或排序,速度也会更快。
将列转换为类别类型可以节省内存,因为它使用整数编码来表示数据。对于每个唯一的类别值,Pandas会分配一个整数代码。然后,它使用这些整数代码来表示数据,而不是原始的字符串值。由于整数通常占用的空间比字符串少,所以这样可以大大减少内存占用。此外,由于整数编码是连续的,因此它们可以更有效地存储在内存中。
对象数据类型通常用于存储字符串,但也可以存储任何Python对象。它使用指针数组来存储数据,每个指针都指向内存中的一个对象。由于字符串的长度可能不同,因此它们不能连续存储在内存中,这会导致额外的开销。
类别数据类型用于表示具有有限数量不同值的数据。它通过将每个唯一值映射到一个整数代码来实现这一点,并使用整数数组来表示数据。这样可以大大减少内存占用,并且在进行某些操作时,例如分组或排序,速度也会更快。
总之,对象数据类型适用于存储任意类型的数据,而类别数据类型适用于具有重复值的列。
并不是所有列都适合转换为类别类型。类别类型最适用于具有重复值的列,例如性别、国家或城市等。如果列中的唯一值数量接近总行数,则将其转换为类别类型可能不会节省太多内存,甚至可能会增加内存占用。此外,如果列中的值经常更改,则维护类别编码可能会带来额外的开销。
因此,在决定是否将列转换为类别类型时,应考虑数据的特点以及您打算如何使用它。
Training/Tuning/Evaluating TFT
利用Pytorch Forecasting来进行TFT训练
from pytorch_forecasting import TimeSeriesDataSet, Baseline, TemporalFusionTransformer
from pytorch_forecasting.data import GroupNormalizer
这段代码从pytorch_forecasting库中导入了TimeSeriesDataSet、Baseline和TemporalFusionTransformer类,以及GroupNormalizer类。
TimeSeriesDataSet类用于创建时间序列数据集,它提供了许多用于预处理和转换时间序列数据的功能。
Baseline类是一个简单的基线模型,可以用来预测时间序列数据。
TemporalFusionTransformer类是一个基于Transformer的深度学习模型,用于预测时间序列数据。
GroupNormalizer类用于对数据进行分组归一化,以便在训练模型时更好地处理不同范围的特征。
这些类可以用来构建、训练和评估时间序列预测模型。
max_prediction_length = prediction_steps
max_encoder_length = 60 # Go back 60 Days
training_cutoff = df_train["time_idx"].max() - max_prediction_length
这段代码定义了三个变量:max_prediction_length、max_encoder_length和training_cutoff。
max_prediction_length表示预测步长,即预测未来多少天的数据。
max_encoder_length表示编码器长度,即在训练模型时使用过去多少天的数据。
training_cutoff表示训练数据的截止时间索引。它通过从数据帧df_train中的最大时间索引减去预测步长来计算。
这些变量用于定义时间序列预测模型的输入和输出长度,以及确定训练数据的范围。
training = TimeSeriesDataSet(
df_train[lambda x: x.time_idx <= training_cutoff],
time_idx="time_idx",
target="sales",
group_ids=["store_nbr", "family"],
min_encoder_length=max_encoder_length // 2, # keep encoder length long (as it is in the validation set)
max_encoder_length=max_encoder_length,
min_prediction_length=1,
max_prediction_length=max_prediction_length,
static_categoricals=["store_nbr",
"family",
"city",
"state",
"store_cluster",
"store_type"],
time_varying_known_categoricals=["holiday_nat",
"holiday_reg",
"holiday_loc",
"month",
"dayofweek",
"dayofyear"],
time_varying_known_reals=["time_idx", "onpromotion", 'days_from_payday', 'dcoilwtico', "earthquake_effect"
],
time_varying_unknown_categoricals=[],
time_varying_unknown_reals=[
"sales",
"transactions",
"average_sales_by_family",
"average_sales_by_store",
],
target_normalizer=GroupNormalizer(
groups=["store_nbr", "family"], transformation="softplus"
), # use softplus and normalize by group
add_relative_time_idx=True,
add_target_scales=True,
add_encoder_length=True,
allow_missing_timesteps=True
)
# create validation set (predict=True) which means to predict the last max_prediction_length points in time
# for each series
validation = TimeSeriesDataSet.from_dataset(training, df_train, predict=True, stop_randomization=True)
这段代码用于创建训练集和验证集。
首先,它使用TimeSeriesDataSet类来创建一个训练集。它从数据帧df_train中选择time_idx列小于等于training_cutoff的行作为训练数据。然后,它指定了一些参数来定义数据集,包括时间索引、目标变量、分组变量、编码器长度、预测长度、静态类别变量、已知的时间变化类别变量、已知的时间变化实数变量、未知的时间变化类别变量和未知的时间变化实数变量等。此外,它还指定了目标归一化器,并添加了一些额外的特征,例如相对时间索引、目标比例和编码器长度等。
接下来,它使用TimeSeriesDataSet.from_dataset()方法从训练集创建一个验证集。它使用predict=True参数来指定预测每个序列的最后max_prediction_length个时间点,并使用stop_randomization=True参数来禁用随机化。
这些数据集可用于训练和评估时间序列预测模型。
training_cutoff变量表示训练数据的截止时间索引。它通过从数据帧df_train中的最大时间索引减去预测步长来计算。这样可以确保训练数据不包括需要预测的时间点,从而避免数据泄露。
在创建训练集时,只有time_idx列小于等于training_cutoff的行才会被用作训练数据。
max_prediction_length变量表示预测步长,即预测未来多少天的数据。它用于定义时间序列预测模型的输出长度。
在创建训练集和验证集时,可以指定min_prediction_length和max_prediction_length参数来定义预测长度的范围。在训练模型时,这些参数会影响目标序列的长度。
这段代码定义了几种不同类型的变量,用于创建时间序列数据集。
static_categoricals:静态类别变量,表示在整个时间序列中不会改变的类别变量。
time_varying_known_categoricals:已知的时间变化类别变量,表示在时间序列中会改变的类别变量,且它们的值在预测时是已知的。
time_varying_known_reals:已知的时间变化实数变量,表示在时间序列中会改变的实数变量,且它们的值在预测时是已知的。
time_varying_unknown_categoricals:未知的时间变化类别变量,表示在时间序列中会改变的类别变量,但它们的值在预测时是未知的。
time_varying_unknown_reals:未知的时间变化实数变量,表示在时间序列中会改变的实数变量,但它们的值在预测时是未知的。
这些参数用于定义数据集中包含哪些特征以及它们如何使用。例如,在训练模型时,已知的特征可以用作输入,而未知的特征可以用作目标。
create dataloaders for model
batch_size = 64 # set this between 32 to 128
train_dataloader = training.to_dataloader(train=True, batch_size=batch_size, num_workers=2, drop_last=True)
val_dataloader = validation.to_dataloader(train=False, batch_size=batch_size * 10, num_workers=2,drop_last=True)
这段代码创建了两个数据加载器,一个用于训练数据集,另一个用于验证数据集。数据加载器负责将数据分批次地提供给模型进行训练或验证。
batch_size 变量设置为 64,这意味着每个批次将包含 64 个样本。train_dataloader 是用于训练数据集的数据加载器,它的批次大小为 batch_size。val_dataloader 是用于验证数据集的数据加载器,它的批次大小为 batch_size * 10,即 640。
num_workers 参数设置为 2,这意味着将使用两个工作进程来预处理数据。drop_last 参数设置为 True,这意味着如果最后一个批次的样本数量不足 batch_size,则将其丢弃。
Baseline
import torch
# calculate baseline mean absolute error, i.e. predict next value as the last available value from the history
actuals = torch.cat([y for x, (y, weight) in iter(val_dataloader)])
baseline_predictions = Baseline().predict(val_dataloader)
(actuals - baseline_predictions).abs().mean().item()
这段代码计算了基线平均绝对误差。基线预测是指将下一个值预测为历史数据中的最后一个可用值。
首先,代码从 val_dataloader 中提取了实际值并将其连接在一起。然后,使用 Baseline 类的 predict 方法对验证数据集进行基线预测。最后,计算实际值与基线预测之间的绝对误差的平均值。
计算基线平均绝对误差的目的是为了评估模型的性能。基线预测提供了一个简单的参考,可以用来衡量模型预测的准确性。如果模型的预测结果比基线预测更准确,那么说明模型是有效的。否则,模型可能需要进一步改进。
已收到消息. 计算基线平均绝对误差的目的是为了评估模型的性能。基线预测提供了一个简单的参考,可以用来衡量模型预测的准确性。如果模型的预测结果比基线预测更准确,那么说明模型是有效的。否则,模型可能需要进一步改进。
Baseline 类是一个用于生成基线预测的类。在这段代码中,它的 predict 方法被用来对验证数据集进行基线预测。
import pytorch_lightning as pl
from pytorch_forecasting.metrics import QuantileLoss
这段代码导入了 pytorch_lightning 和 pytorch_forecasting.metrics 模块中的 QuantileLoss 类。
pytorch_lightning 是一个用于深度学习的开源库,它提供了一种简化深度学习训练过程的方法。
QuantileLoss 是 pytorch_forecasting.metrics 模块中的一个类,它用于计算分位数损失。分位数损失是一种常用于回归问题的损失函数,它可以用来衡量预测值与实际值之间的差异。
Hyperparameter Optimization
import pickle
from pytorch_forecasting.models.temporal_fusion_transformer.tuning import optimize_hyperparameters
# create study
study = optimize_hyperparameters(
train_dataloader,
val_dataloader,
model_path="optuna_test",
n_trials=50,
max_epochs=20,
gradient_clip_val_range=(0.01, 1.0),
hidden_size_range=(8, 64),
hidden_continuous_size_range=(8, 64),
attention_head_size_range=(1, 4),
learning_rate_range=(0.001, 0.1),
dropout_range=(0.1, 0.3),
trainer_kwargs=dict(limit_train_batches=30, log_every_n_steps=15, gpus=1),
reduce_on_plateau_patience=4,
use_learning_rate_finder=False, # use Optuna to find ideal learning rate or use in-built learning rate finder
timeout=4200
)
# show best hyperparameters
print(study.best_trial.params)
这段代码使用了PyTorch Forecasting库中的optimize_hyperparameters函数来优化超参数。它创建了一个研究,用于在给定的训练和验证数据加载器、模型路径、试验次数、最大时期和其他超参数范围的情况下,优化超参数。最后,它显示了最佳试验的超参数。
超参数优化是确定使模型性能最大化的超参数正确组合的过程。它在一个训练过程中运行多个试验。每一次试验都是训练程序的完整执行,并在指定的范围内选择超参数设置值。这个过程一旦完成,就会给你一组最适合模型的超参数值,以获得最佳结果
超参数是在机器学习的上下文中,在开始学习过程之前设置值的参数,而不是通过训练得到的参数数据 。通常情况下,需要对超参数进行优化,给学习机选择一组最优超参数,以提高学习的性能和效果。
超参数优化是确定使模型性能最大化的超参数正确组合的过程。它在一个训练过程中运行多个试验。每一次试验都是训练程序的完整执行,并在指定的范围内选择超参数设置值。这个过程一旦完成,就会给你一组最适合模型的超参数值,以获得最佳结果。这是任何机器学习项目中的一个比较重要的步骤,因为其影响了模型的最佳结果
Retrain A full Model
#Early Stopping
MIN_DELTA = 1e-4
PATIENCE = 10
#PL Trainer
MAX_EPOCHS = 150
GPUS = 1
GRADIENT_CLIP_VAL=study.best_trial.params['gradient_clip_val']
LIMIT_TRAIN_BATCHES=30
#Fusion Transformer
LR = study.best_trial.params['learning_rate']
HIDDEN_SIZE = study.best_trial.params['hidden_size']
DROPOUT = study.best_trial.params['dropout']
ATTENTION_HEAD_SIZE = study.best_trial.params['attention_head_size']
HIDDEN_CONTINUOUS_SIZE = study.best_trial.params['hidden_continuous_size']
OUTPUT_SIZE=7
REDUCE_ON_PLATEAU_PATIENCE=5
这段代码定义了一些超参数和变量。其中,MIN_DELTA和PATIENCE是早停的参数,MAX_EPOCHS、GPUS、GRADIENT_CLIP_VAL和LIMIT_TRAIN_BATCHES是PL训练器的参数。LR、HIDDEN_SIZE、DROPOUT、ATTENTION_HEAD_SIZE、HIDDEN_CONTINUOUS_SIZE、OUTPUT_SIZE和REDUCE_ON_PLATEAU_PATIENCE是Fusion Transformer的参数。这些参数的值是从之前优化超参数的结果中获取的。
PL Trainer很可能是指PyTorch Lightning的Trainer类。它是一个用于训练PyTorch Lightning模型的类,可以自动完成许多训练过程中的任务,如梯度计算、设备选择、日志记录等。它可以通过传递参数来进行设置,以满足不同的训练需求
Fusion Transformer(融合变换器)是一种针对多步预测任务的Transformer模型,并且具有很好的可解释性。它能够处理多个数据源,包括静态协变量、已知未来输入和其他仅在历史上观察到的外生时间序列,而不需要任何关于它们如何与目标交互的先验信息。
from pytorch_lightning.callbacks import EarlyStopping, LearningRateMonitor
# configure network and trainer
early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=MIN_DELTA, patience=PATIENCE, verbose=False, mode="min")
lr_logger = LearningRateMonitor() # log the learning rate
trainer = pl.Trainer(
max_epochs=MAX_EPOCHS,
gpus=GPUS,
weights_summary="top",
gradient_clip_val=GRADIENT_CLIP_VAL,
limit_train_batches=LIMIT_TRAIN_BATCHES,#oment in for training, running valiation every 30 batches
#fast_dev_run=True, # comment in to check that networkor dataset has no serious bugs
callbacks=[lr_logger, early_stop_callback],
log_every_n_steps=10
)
tft = TemporalFusionTransformer.from_dataset(
training,
learning_rate=LR,
hidden_size=HIDDEN_SIZE,
attention_head_size=ATTENTION_HEAD_SIZE,
dropout=DROPOUT,
hidden_continuous_size=HIDDEN_CONTINUOUS_SIZE,
output_size=OUTPUT_SIZE,# 7 quantiles by default
loss=QuantileLoss(),
log_interval=10, # uncomment for learning rate finder and otherwise, e.g. to 10 for logging every 10 batches
reduce_on_plateau_patience=REDUCE_ON_PLATEAU_PATIENCE,
)
print(f"Number of parameters in network: {tft.size()/1e3:.1f}k")
这段代码导入了PyTorch Lightning的EarlyStopping和LearningRateMonitor回调,并配置了网络和训练器。它定义了一个早停回调,用于在验证损失不再改善时停止训练,以及一个学习率记录器,用于记录学习率。然后,它实例化了一个Trainer对象,并传递了一些参数,如最大训练轮数、GPU数量、梯度裁剪值等。最后,它实例化了一个TemporalFusionTransformer对象,并从数据集中进行初始化,传递了一些参数,如学习率、隐藏层大小、注意力头大小等。
LearningRateMonitor是PyTorch Lightning的一个回调,用于在训练过程中记录学习率。它可以记录每个优化器的学习率,并将其记录到日志中,以便在训练过程中跟踪学习率的变化。
要使用LearningRateMonitor,首先需要从pytorch_lightning.callbacks模块中导入它。然后,可以实例化一个LearningRateMonitor对象,并将其添加到Trainer的回调列表中。例如:
from pytorch_lightning.callbacks import LearningRateMonitor
lr_monitor = LearningRateMonitor()
trainer = pl.Trainer(callbacks=[lr_monitor])
这样,在训练过程中,学习率就会被自动记录到日志中。
import tensorflow as tf
import tensorboard as tb
tf.io.gfile = tb.compat.tensorflow_stub.io.gfile
这段代码导入了TensorFlow和TensorBoard库,并将tf.io.gfile重定向到tb.compat.tensorflow_stub.io.gfile。这样做的目的是为了解决TensorFlow和TensorBoard版本不兼容的问题,使得TensorBoard能够正常使用。
train
# fit network
trainer.fit(
tft,
train_dataloaders=train_dataloader,
val_dataloaders=val_dataloader,
)
这段代码使用Trainer的fit方法来训练网络。它将TemporalFusionTransformer对象、训练数据加载器和验证数据加载器作为参数传递给fit方法。这样,Trainer就会使用给定的数据加载器和模型进行训练,并在每个训练轮结束时使用验证数据集进行验证。
Evaluation
# load the best model according to the validation loss
# (given that we use early stopping, this is not necessarily the last epoch)
best_model_path = trainer.checkpoint_callback.best_model_path
best_tft = TemporalFusionTransformer.load_from_checkpoint(best_model_path)
这段代码从Trainer的checkpoint_callback属性中获取最佳模型的路径,然后使用TemporalFusionTransformer的load_from_checkpoint方法从该路径加载最佳模型。由于使用了早停,最佳模型并不一定是最后一个训练轮的模型,而是在验证损失最小时的模型
# calcualte mean absolute error on validation set
actuals = torch.cat([y[0] for x, y in iter(val_dataloader)])
predictions = best_tft.predict(val_dataloader)
(actuals - predictions).abs().mean()
这段代码计算了最佳模型在验证集上的平均绝对误差。它首先使用torch.cat方法将验证数据集中的所有目标值拼接在一起,然后使用best_tft的predict方法对验证数据集进行预测。最后,它计算了实际值和预测值之间的绝对误差,并取了平均值。
# raw predictions are a dictionary from which all kind of information including quantiles can be extracted
raw_predictions, x = best_tft.predict(val_dataloader, mode="raw", return_x=True)
这段代码使用best_tft的predict方法以“raw”模式对验证数据集进行预测,并返回原始预测和输入数据。原始预测是一个字典,其中包含了各种信息,包括分位数。这样,可以从原始预测中提取所需的信息。
for idx in range(5): # plot 10 examples
best_tft.plot_prediction(x, raw_predictions, idx=idx, add_loss_to_title=True);
这段代码使用best_tft的plot_prediction方法绘制了5个预测示例。它传递了输入数据、原始预测、索引和add_loss_to_title参数。这样,就可以在图表的标题中添加损失信息。
要绘制预测示例,可以使用PyTorch Lightning模型的plot_prediction方法(如果该模型提供了这个方法)。这个方法通常需要传递输入数据、原始预测和要绘制的数据点的索引。
predictions, x = best_tft.predict(val_dataloader, return_x=True)
predictions_vs_actuals = best_tft.calculate_prediction_actual_by_variable(x, predictions)
best_tft.plot_prediction_actual_by_variable(predictions_vs_actuals);
这段代码使用了名为 best_tft 的对象的 predict 方法来预测 val_dataloader 中的数据。然后,它使用 calculate_prediction_actual_by_variable 方法来计算预测值与实际值之间的差异。最后,它使用 plot_prediction_actual_by_variable 方法来绘制预测值与实际值之间的差异。
Prediction
df_test = pd.merge(df_test, stores, on='store_nbr')
这段代码使用 pandas 库中的 merge 函数将 df_test 和 stores 两个数据框合并在一起。合并是基于两个数据框中的 store_nbr 列进行的。结果将被存储在 df_test 中
# select last 30 days from data (max_encoder_length is 24)
encoder_data = df_train[lambda x: x.time_idx > x.time_idx.max() - max_encoder_length]
last_data = df_train[df_train['time_idx'].isin([idx - prediction_steps for idx in df_test['time_idx'].unique()])]
last_data['time_idx'] = last_data['time_idx'] + prediction_steps
decoder_data = pd.merge(df_test[[col for col in df_test.columns if 'sales' not in col]],
last_data[['time_idx','store_nbr', 'family', 'sales', 'average_sales_by_family', 'average_sales_by_store' , 'transactions']],
on = ['time_idx', 'store_nbr', 'family',]
)
# combine encoder and decoder data
new_prediction_data = pd.concat([encoder_data, decoder_data], ignore_index=True)
这段代码是用来从数据中选择最后30天的数据(max_encoder_length为24)。首先,它使用lambda函数从df_train中选择time_idx大于最大time_idx减去max_encoder_length的数据作为encoder_data。
然后,它从df_train中选择time_idx在df_test的time_idx减去prediction_steps的唯一值中的数据作为last_data。接着,它将last_data的time_idx加上prediction_steps。
接下来,它将df_test中不包含’sales’的列与last_data中的’time_idx’、‘store_nbr’、‘family’、‘sales’、‘average_sales_by_family’、‘average_sales_by_store’和’transactions’列合并,基于’time_idx’、’store_nbr’和’family’列,作为decoder_data。
最后,它将encoder_data和decoder_data连接起来,忽略索引,作为new_prediction_data。
在这段代码中,encoder_data 是从 df_train 中选择的数据,它包含了 time_idx 大于最大 time_idx 减去 max_encoder_length 的数据。而 decoder_data 是通过将 df_test 中不包含 ‘sales’ 的列与 last_data 中的某些列合并而得到的。这两个数据集的来源和构造方式不同。
这段代码的目的是从 df_train 和 df_test 数据中提取数据,构造出 encoder_data 和 decoder_data,并将它们连接起来形成 new_prediction_data。根据代码中的变量名和操作,可以推断出这可能是为了进行时间序列预测而准备的数据。
pd.merge 和 pd.concat 都是用来合并数据的函数。
pd.merge 是用来根据一个或多个键将两个数据集的行连接起来的。它可以处理不同类型的合并,如一对一、多对一和多对多。在这段代码中,它用于将 df_test 中不包含 ‘sales’ 的列与 last_data 中的某些列合并,基于 ‘time_idx’、‘store_nbr’ 和 ‘family’ 列。
而 pd.concat 是用来沿着特定轴连接 Pandas 对象的。它可以处理不同类型的连接,如连接、交叉和并集。在这段代码中,它用于将 encoder_data 和 decoder_data 连接起来,忽略索引,形成 new_prediction_data。
根据这段代码,我们可以推断出 new_prediction_data 是由 encoder_data 和 decoder_data 连接而成的。encoder_data 包含了过去的数据,而 decoder_data 包含了未来需要预测的时间点的数据。
在时间序列预测中,通常使用过去的数据来预测未来的数据。
因此, new_prediction_data 被用作TFT输入数据,用于训练时间序列预测模型。
new_raw_predictions, new_x = best_tft.predict(new_prediction_data, mode="raw", return_x=True)
for idx in range(10): # plot 10 examples
best_tft.plot_prediction(new_x, new_raw_predictions, idx=idx, show_future_observed=False);
这段代码使用最佳的TFT模型来预测新数据。它首先使用“best_tft.predict”函数对“new_prediction_data”进行预测,并返回原始预测值和新的x值。然后,它使用“best_tft.plot_prediction”函数绘制前10个预测结果的图形,但不显示未来观察值。
new_raw_predictions 和 new_x 是 best_tft.predict 函数的输出。根据代码,best_tft.predict 函数接受 new_prediction_data 作为输入,并以“raw”模式进行预测。new_raw_predictions 可能包含原始预测值,而 new_x 可能包含与预测相关的新x值。
interpretation = best_tft.interpret_output(new_raw_predictions, reduction="sum")
best_tft.plot_interpretation(interpretation)
这段代码是在使用一个名为 best_tft 的对象的 interpret_output 方法来解释新的原始预测。解释结果通过对解释值求和来进行归约。然后,使用 best_tft 对象的 plot_interpretation 方法绘制解释结果。这些代码可能与机器学习模型的解释性有关。
Reformat predictions
predictions = best_tft.predict(new_prediction_data, mode="prediction", return_x=False)
在使用一个名为 best_tft 的对象的 predict 方法来对 new_prediction_data 进行预测。预测模式被设置为 prediction,并且不返回输入数据。
predictions = pd.DataFrame(predictions.numpy()).T
predictions['date'] = sorted(df_test['date'].unique())
predictions = pd.melt(predictions, id_vars=['date'])
predictions = predictions.sort_values(['date', 'variable']).reset_index(drop=True)
df_test[['date', 'id', 'store_nbr', 'family']].sort_values(['date', 'store_nbr', 'family']).reset_index(drop=True)
df_test = df_test.join(predictions['value'])
这段代码将 predictions 转换为一个 DataFrame,并将其转置。然后,它将 date 列添加到 predictions 中,其中 date 列的值来自 df_test 中唯一的日期值。接下来,它使用 pd.melt 函数将 predictions 重塑为长格式,并根据日期和变量对其进行排序。最后,它对 df_test 按照日期、商店编号和家庭进行排序,并将 predictions 中的 value 列加入到 df_test 中。
import plotly.graph_objects as go
import plotly.express as px
list_colors = px.colors.qualitative.Plotly
def plot_train_prediction(df_train, df_predictions, store="3", n_families=10, date_begin="2017-07-15", pred_time_col='date' , pred_col='value'):
df_train_viz = df_train[(df_train['date'] > pd.to_datetime("2017-07-15"))&(df_train['store_nbr']==store)]
fig = go.Figure()
for i, family in enumerate(df_train_viz['family'].unique()[:10]):
train = df_train_viz[df_train_viz['family']==family]
pred = df_predictions[(df_predictions['family']==family)&(df_predictions['store_nbr']==store)]
fig.add_trace(go.Scatter(x =train["date"], y=train["sales"], mode='lines', name=f'{family}_train', line=dict(color=list_colors[i])))
fig.add_trace(go.Scatter(x =pred[pred_time_col], y=pred[pred_col], mode='lines', name=f'{family}_pred', line=dict(color=list_colors[i])))
fig.show()
plot_train_prediction(df_train, df_test, store="4")
这段文本是一个 Python 代码片段,它定义了一个名为 plot_train_prediction 的函数。该函数使用 plotly 库中的 graph_objects 和 express 模块来绘制训练数据和预测数据的图形。函数接受多个参数,包括 df_train(训练数据),df_predictions(预测数据),store(商店编号),n_families(家庭数量),date_begin(开始日期),pred_time_col(预测时间列)和 pred_col(预测值列)。函数内部使用了 for 循环来遍历前 10 个家庭,并为每个家庭添加两条轨迹,一条表示训练数据,另一条表示预测数据。最后,使用 fig.show() 显示图形。在代码的最后一行,调用了 plot_train_prediction 函数并传入了相应的参数。
df_test[['id', 'value']].rename(columns={"value": "sales"}).to_csv('submission.csv', index=False)
这段文本是一个 Python 代码片段,它使用了 pandas 库中的 DataFrame 对象。代码首先从 df_test DataFrame 中选择 id 和 value 列,然后使用 rename 函数将 value 列重命名为 sales。最后,使用 to_csv 函数将结果保存到名为 submission.csv 的文件中,且不包含索引列。