DeepTime: A Meta-Learning Model for Time Series Forecasting
DeepTime is a deep time-index model that incorporates meta-learning. It forecasts the future through a meta-learning formulation in order to handle common problems in time series, namely covariate shift and conditional distribution shift (non-stationarity). The model is a good example of the synergy between time-index models and a meta-learning formulation for time series forecasting.

DeepTime Components
There are three types of layers in DeepTime:
- Ridge regression
- Multilayer perceptron (MLP)
- Random Fourier features
Let's look at what each of these layers does.
Ridge Regression
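The original post left this section empty apart from the heading. In the DeepTime paper, the ridge regressor is the final layer: rather than being trained by gradient descent, its weights are solved in closed form over the lookback window, which is what makes the inner loop of the meta-learning formulation fast. A minimal sketch of that closed-form solve (the function and variable names are illustrative, not taken from the official repository):

import torch

def ridge_closed_form(feats: torch.Tensor, targets: torch.Tensor, reg_coeff: float = 1.0) -> torch.Tensor:
    # Closed-form ridge solution over the lookback window:
    #   W* = (F^T F + lambda * I)^{-1} F^T Y
    # feats:   (lookback, d) features the MLP produced for each time index
    # targets: (lookback, m) observed series values in the lookback window
    d = feats.shape[-1]
    gram = feats.T @ feats + reg_coeff * torch.eye(d)
    return torch.linalg.solve(gram, feats.T @ targets)

Forecasting the horizon is then just a matrix product of the horizon's time-index features with W*, so adapting to a new series costs one linear solve instead of any gradient steps.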
Multilayer Perceptron (MLP)
These are linear layers of the kind used in any neural network, each followed by a ReLU activation. They are well suited to mapping a time index to the value of the time series at that index. The formula is as follows:
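The formula appeared as an image in the original post; in standard notation, a K-layer ReLU MLP mapping a (normalized) time index τ to its output can be written as (a reconstruction, not the paper's exact figure):

$$
\begin{aligned}
\mathbf{z}^{(0)} &= \tau \\
\mathbf{z}^{(k)} &= \mathrm{ReLU}\big(\mathbf{W}^{(k)}\mathbf{z}^{(k-1)} + \mathbf{b}^{(k)}\big), \quad k = 1, \dots, K-1 \\
f(\tau) &= \mathbf{W}^{(K)}\mathbf{z}^{(K-1)} + \mathbf{b}^{(K)}
\end{aligned}
$$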
Random Fourier Features Layer
Random Fourier features let the MLP learn high-frequency patterns. An ordinary random Fourier features layer requires tuning its scale hyperparameter for every task and dataset (just to avoid over- or under-fitting); the authors avoid this search by concatenating Fourier basis functions with a range of scale parameters, as sketched below.
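A minimal PyTorch sketch of such a concatenated random Fourier features layer (the class name and the particular scale set are assumptions for illustration, not the authors' code):

import torch
import torch.nn as nn

class ConcatRandomFourierFeatures(nn.Module):
    def __init__(self, in_dim: int, feats_per_scale: int,
                 scales=(0.01, 0.1, 1.0, 5.0, 10.0, 20.0, 50.0, 100.0)):
        super().__init__()
        # One fixed (non-trainable) Gaussian projection per scale; concatenating
        # several scales removes the need to tune a single scale per dataset.
        self.register_buffer('B', torch.cat(
            [torch.randn(in_dim, feats_per_scale) * s for s in scales], dim=-1))

    def forward(self, tau: torch.Tensor) -> torch.Tensor:
        # tau: (..., in_dim) normalized time indices in [0, 1]
        proj = 2 * torch.pi * (tau @ self.B)
        return torch.cat([torch.sin(proj), torch.cos(proj)], dim=-1)

# Usage: featurize 96 normalized time indices for the downstream MLP.
rff = ConcatRandomFourierFeatures(in_dim=1, feats_per_scale=32)
tau = torch.linspace(0, 1, 96).unsqueeze(-1)   # (96, 1)
print(rff(tau).shape)                          # torch.Size([96, 512])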
The DeepTime Architecture
For each task, a time series is sampled and split into a lookback window (green in the paper's figure) and a forecast window (blue). The two windows are then passed through two meta models that share information with each other and are tied to the meta parameters (see the sketch after this paragraph). After training the model on this architecture, the loss function is computed and minimized.
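In code, sampling a task amounts to slicing one series into these two windows (a simplified sketch; the function name is illustrative):

import torch

def sample_task(series: torch.Tensor, lookback: int, horizon: int, start: int = 0):
    # Lookback window: what the ridge regressor adapts on (the "support set").
    x = series[start:start + lookback]
    # Forecast window: what the meta loss is computed against (the "query set").
    y = series[start + lookback:start + lookback + horizon]
    return x, y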
Differences from Other Time Series Forecasting Models
DeepTime is a time-index model, like Prophet or Gaussian processes, whereas recent prominent models such as N-HiTS, Autoformer, DeepAR, and Informer are historical-value models.
By a time-index model we mean that predictions are a direct function of time: the model takes the features of the current time index as input. A historical-value model, by contrast, predicts the future from previous observations. The formulas below make the distinction concrete.
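In rough notation (an illustrative formulation, not the paper's exact one):

$$
\hat{y}_t = f(t) \quad \text{(time-index model)}
\qquad
\hat{y}_{t+1} = f(y_t, y_{t-1}, \dots, y_{t-L+1}) \quad \text{(historical-value model)}
$$

Prophet fits f directly as a function of calendar time, while an autoregressive model such as DeepAR conditions on the last L observed values.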
DeepTime incorporates a meta-learning formulation, which means the model learns how to learn. And because it is a time-index model, it can achieve better sample efficiency within that meta-learning setup.
It uses direct multi-step (DMS) estimation: a DMS model predicts several future points in a single shot. Under iterated multi-step (IMS) forecasting, by contrast, the model predicts only the next value and then feeds that prediction back in to predict the following point, as ARIMA, DeepAR, and similar models do. The sketch below contrasts the two strategies.
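A toy sketch of the two strategies (the model callables are placeholders, not DeepTime's actual API):

import torch

def dms_forecast(model, history: torch.Tensor, horizon: int) -> torch.Tensor:
    # Direct multi-step: a single forward pass emits all `horizon` steps at once.
    return model(history)                         # expected shape: (horizon,)

def ims_forecast(model, history: torch.Tensor, horizon: int) -> torch.Tensor:
    # Iterated multi-step: predict one step, append it to the input window,
    # and repeat, so prediction errors can compound over the horizon.
    window = history.clone()
    preds = []
    for _ in range(horizon):
        next_val = model(window)                  # expected shape: (1,)
        preds.append(next_val)
        window = torch.cat([window[1:], next_val])
    return torch.cat(preds)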
What Does Meta-Learning Bring to Time Series Forecasting?
- Better task generalization
- Consistency with the assumption that nearby time steps follow a locally stationary distribution
- A built-in assumption that similar time points have similar characteristics
How the Model Predicts
At each training step, the data is split into two windows, and the first window is used to predict the second. For simplicity, PyTorch Lightning is used here to streamline the training process.
import math
import random

import gin
import numpy as np
import pytorch_lightning as pl
import torch
import torch.nn.functional as F
from torch import optim

from models import get_model
from utils import Checkpoint, default_device, to_tensor


@gin.configurable
class DeepTimeTrainer(pl.LightningModule):

    def __init__(self,
                 lr,
                 lambda_lr,
                 weight_decay,
                 warmup_epochs,
                 random_seed,
                 T_max,
                 eta_min,
                 dim_size,
                 datetime_feats,
                 ):
        gin.parse_config_file('/home/reza/Projects/PL_DeepTime/DeepTime/config/config.gin')
        super(DeepTimeTrainer, self).__init__()
        self.lr = lr
        self.lambda_lr = lambda_lr
        self.weight_decay = weight_decay
        self.warmup_epochs = warmup_epochs
        self.random_seed = random_seed
        self.T_max = T_max
        self.eta_min = eta_min

        self.model = get_model(
            model_type='deeptime',
            dim_size=dim_size,
            datetime_feats=datetime_feats
        )

    def on_fit_start(self):
        # Seed every RNG so runs are reproducible
        torch.manual_seed(self.random_seed)
        np.random.seed(self.random_seed)
        random.seed(self.random_seed)

    def training_step(self, batch, batch_idx):
        x, y, x_time, y_time = map(to_tensor, batch)
        forecast = self.model(x, x_time, y_time)
        if isinstance(forecast, tuple):
            # for models which require reconstruction + forecast loss
            loss = F.mse_loss(forecast[0], x) + \
                   F.mse_loss(forecast[1], y)
        else:
            loss = F.mse_loss(forecast, y)
        self.log('train_loss', loss, prog_bar=True, on_epoch=True)
        return {'loss': loss, 'train_loss': loss}

    def training_epoch_end(self, outputs):
        avg_train_loss = torch.stack([x["train_loss"] for x in outputs]).mean()
        self.log('avg_train_loss', avg_train_loss, on_epoch=True, sync_dist=True)

    def validation_step(self, batch, batch_idx):
        x, y, x_time, y_time = map(to_tensor, batch)
        forecast = self.model(x, x_time, y_time)
        if isinstance(forecast, tuple):
            # for models which require reconstruction + forecast loss
            loss = F.mse_loss(forecast[0], x) + \
                   F.mse_loss(forecast[1], y)
        else:
            loss = F.mse_loss(forecast, y)
        self.log('val_loss', loss, prog_bar=True, on_epoch=True)
        return {'val_loss': loss}

    def validation_epoch_end(self, outputs):
        return outputs

    def test_step(self, batch, batch_idx):
        x, y, x_time, y_time = map(to_tensor, batch)
        forecast = self.model(x, x_time, y_time)
        if isinstance(forecast, tuple):
            # for models which require reconstruction + forecast loss
            loss = F.mse_loss(forecast[0], x) + \
                   F.mse_loss(forecast[1], y)
        else:
            loss = F.mse_loss(forecast, y)
        self.log('test_loss', loss, prog_bar=True, on_epoch=True)
        return {'test_loss': loss}

    def test_epoch_end(self, outputs):
        return outputs

    @gin.configurable
    def configure_optimizers(self):
        # Split parameters into three groups: the ridge-regression lambda,
        # parameters without weight decay (biases, norms), and the rest.
        group1 = []  # lambda
        group2 = []  # no decay
        group3 = []  # decay
        no_decay_list = ('bias', 'norm',)
        for param_name, param in self.model.named_parameters():
            if '_lambda' in param_name:
                group1.append(param)
            elif any(mod in param_name for mod in no_decay_list):
                group2.append(param)
            else:
                group3.append(param)
        optimizer = optim.Adam([
            {'params': group1, 'weight_decay': 0, 'lr': self.lambda_lr, 'scheduler': 'cosine_annealing'},
            {'params': group2, 'weight_decay': 0, 'scheduler': 'cosine_annealing_with_linear_warmup'},
            {'params': group3, 'scheduler': 'cosine_annealing_with_linear_warmup'}
        ], lr=self.lr, weight_decay=self.weight_decay)

        # Build one LR-multiplier function per parameter group. The default
        # arguments bind lr/eta_max at definition time; without them, every
        # lambda would close over the values from the last loop iteration.
        scheduler_fns = []
        for param_group in optimizer.param_groups:
            scheduler = param_group['scheduler']
            if scheduler == 'none':
                fn = lambda T_cur: 1
            elif scheduler == 'cosine_annealing':
                lr = eta_max = param_group['lr']
                fn = lambda T_cur, lr=lr, eta_max=eta_max: (
                    self.eta_min + 0.5 * (eta_max - self.eta_min)
                    * (1.0 + math.cos((T_cur - self.warmup_epochs) / (self.T_max - self.warmup_epochs) * math.pi))
                ) / lr
            elif scheduler == 'cosine_annealing_with_linear_warmup':
                lr = eta_max = param_group['lr']
                fn = lambda T_cur, lr=lr, eta_max=eta_max: (
                    T_cur / self.warmup_epochs if T_cur < self.warmup_epochs else (
                        self.eta_min + 0.5 * (eta_max - self.eta_min)
                        * (1.0 + math.cos((T_cur - self.warmup_epochs) / (self.T_max - self.warmup_epochs) * math.pi))
                    ) / lr
                )
            else:
                raise ValueError(f'No such scheduler, {scheduler}')
            scheduler_fns.append(fn)
        scheduler = optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=scheduler_fns)
        return {'optimizer': optimizer, 'lr_scheduler': scheduler}

    def forward(self, batch, z_0=None):
        # NOTE: this method assumes an `n_time_out` attribute (the forecast
        # horizon), which is expected to be supplied via the gin config.
        z_0 = None
        Y = batch['Y'].to(default_device)
        sample_mask = batch['sample_mask'].to(default_device)
        available_mask = batch['available_mask'].to(default_device)

        # Forecasting: mask out the last n_time_out steps so the model
        # has to predict them.
        forecasting_mask = available_mask.clone()
        if self.n_time_out > 0:
            forecasting_mask[:, 0, -self.n_time_out:] = 0

        Y, Y_hat, z = self.model(Y=Y, mask=forecasting_mask, idxs=None, z_0=z_0)

        if self.n_time_out > 0:
            Y = Y[:, :, -self.n_time_out:]
            Y_hat = Y_hat[:, :, -self.n_time_out:]
            sample_mask = sample_mask[:, :, -self.n_time_out:]

        return Y, Y_hat, sample_mask, z
The authors ran extensive experiments on synthetic and real-world datasets, showing that DeepTime is highly competitive: it achieved the best results in 20 of 24 MSE-based multivariate forecasting benchmarks.
If you are interested, you can check out the source code: https://github.com/salesforce/DeepTime