from tsai.data.core import TSCategorize
from tsai.data.external import get_UCR_data
from tsai.data.preprocessing import TSStandardize
Time Series Data Augmentation
Functions used to transform TSTensors (Data Augmentation)
dsid = 'NATOPS'
X, y, splits = get_UCR_data(dsid, return_split=False)
tfms = [None, TSCategorize()]
batch_tfms = TSStandardize()
dls = get_ts_dls(X, y, tfms=tfms, splits=splits, batch_tfms=batch_tfms, bs=128)
xb, yb = next(iter(dls.train))
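The batch drawn above is reused by every test in this notebook. Note that all the tfms below are called with split_idx=0, which marks the batch as coming from the training split; with split_idx=1 (validation) these augmentations are skipped. A quick, illustrative sanity check (not part of the original tests; [batch_size x n_vars x seq_len] is tsai's TSTensor layout):
# xb is a TSTensor batch and yb its labels; layout is [batch_size x n_vars x seq_len]
print(xb.shape, yb.shape)
# augmentations are train-only by default: with split_idx=1 the batch is returned unchanged
test_eq(TSMagAddNoise()(xb, split_idx=1), xb)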
TSIdentity
TSIdentity (magnitude=None, **kwargs)
Applies the identity tfm to a TSTensor batch
test_eq(TSIdentity()(xb, split_idx=0).shape, xb.shape)
TSShuffle_HLs
TSShuffle_HLs (magnitude=1.0, ex=None, **kwargs)
Randomly shuffles HIs/LOs of an OHLC TSTensor batch
test_eq(TSShuffle_HLs()(xb, split_idx=0).shape, xb.shape)
TSShuffleSteps
TSShuffleSteps (magnitude=1.0, ex=None, **kwargs)
Randomly shuffles consecutive sequence datapoints in batch
t = TSTensor(torch.arange(11).float())
tt_ = []
for _ in range(1000):
    tt = TSShuffleSteps()(t, split_idx=0)
    test_eq(len(set(tt.tolist())), len(t))
    test_ne(tt, t)
    tt_.extend([t for i,t in enumerate(tt) if t!=i])
x, y = np.unique(tt_, return_counts=True)  # visualize the distribution: counts should be roughly equal for all steps and about half for the first and last items
plt.bar(x, y)
TSGaussianNoise
TSGaussianNoise (magnitude=0.5, additive=True, ex=None, **kwargs)
Applies additive or multiplicative gaussian noise
test_eq(TSGaussianNoise(.1, additive=True)(xb, split_idx=0).shape, xb.shape)
test_eq(TSGaussianNoise(.1, additive=False)(xb, split_idx=0).shape, xb.shape)
TSMagMulNoise
TSMagMulNoise (magnitude=1, ex=None, **kwargs)
Applies multiplicative noise on the y-axis for each step of a TSTensor batch
TSMagAddNoise
TSMagAddNoise (magnitude=1, ex=None, **kwargs)
Applies additive noise on the y-axis for each step of a TSTensor batch
test_eq(TSMagAddNoise()(xb, split_idx=0).shape, xb.shape)
test_eq(TSMagMulNoise()(xb, split_idx=0).shape, xb.shape)
test_ne(TSMagAddNoise()(xb, split_idx=0), xb)
test_ne(TSMagMulNoise()(xb, split_idx=0), xb)
random_cum_linear_generator
random_cum_linear_generator (o, magnitude=0.1)
random_cum_noise_generator
random_cum_noise_generator (o, magnitude=0.1, noise=None)
random_cum_curve_generator
random_cum_curve_generator (o, magnitude=0.1, order=4, noise=None)
random_curve_generator
random_curve_generator (o, magnitude=0.1, order=4, noise=None)
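These four generators produce the smooth (or piecewise-linear) random curves that the warping and noise tfms below (e.g. TSMagWarp, TSTimeWarp, TSTimeNoise, TSWindowWarp) sample from. A minimal, illustrative sketch of calling them directly; the only assumption made here is that each returns a 1-D curve with one value per time step of the input:
# Illustrative only: draw one random curve per generator and check its length.
for gen in (random_curve_generator, random_cum_curve_generator,
            random_cum_linear_generator, random_cum_noise_generator):
    curve = gen(xb, magnitude=.1)
    print(gen.__name__, len(curve))  # assumed: len(curve) == xb.shape[-1]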
TSTimeNoise
TSTimeNoise (magnitude=0.1, ex=None, **kwargs)
Applies noise to each step in the x-axis of a TSTensor batch based on a smooth random curve
test_eq(TSTimeNoise()(xb, split_idx=0).shape, xb.shape)
test_ne(TSTimeNoise()(xb, split_idx=0), xb)
TSMagWarp
TSMagWarp (magnitude=0.02, ord=4, ex=None, **kwargs)
Applies warping to the y-axis of a TSTensor batch based on a smooth random curve
test_eq(TSMagWarp()(xb, split_idx=0).shape, xb.shape)
test_ne(TSMagWarp()(xb, split_idx=0), xb)
TSTimeWarp
TSTimeWarp (magnitude=0.1, ord=6, ex=None, **kwargs)
Applies time warping to the x-axis of a TSTensor batch based on a smooth random curve
test_eq(TSTimeWarp()(xb, split_idx=0).shape, xb.shape)
test_ne(TSTimeWarp()(xb, split_idx=0), xb)
TSWindowWarp
TSWindowWarp (magnitude=0.1, ex=None, **kwargs)
Applies window slicing to the x-axis of a TSTensor batch using a random linear curve (see https://halshs.archives-ouvertes.fr/halshs-01357973/document)
test_eq(TSWindowWarp()(xb, split_idx=0).shape, xb.shape)
TSMagScalePerVar
TSMagScalePerVar (magnitude=0.5, ex=None, **kwargs)
Applies per_var scaling to the y-axis of a TSTensor batch based on a scalar
TSMagScale
TSMagScale (magnitude=0.5, ex=None, **kwargs)
Applies scaling to the y-axis of a TSTensor batch based on a scalar
test_eq(TSMagScale()(xb, split_idx=0).shape, xb.shape)
test_eq(TSMagScalePerVar()(xb, split_idx=0).shape, xb.shape)
test_ne(TSMagScale()(xb, split_idx=0), xb)
test_ne(TSMagScalePerVar()(xb, split_idx=0), xb)
test_interpolate
test_interpolate (mode='linear')
# Run the test
test_interpolate('linear')
linear interpolation is not supported by mps. You can try a different mode
Error: The operator 'aten::upsample_linear1d.out' is not currently implemented for the MPS device. If you want this op to be added in priority during the prototype phase of this feature, please comment on https://github.com/pytorch/pytorch/issues/77764. As a temporary fix, you can set the environment variable `PYTORCH_ENABLE_MPS_FALLBACK=1` to use the CPU as a fallback for this op. WARNING: this will be slower than running natively on MPS.
False
test_interpolate('nearest')
True
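The failure above only affects mode='linear' on Apple's MPS backend; 'nearest' works, which is why the remaining tests are guarded with test_interpolate('nearest'). If linear interpolation is really needed on such a machine, the PyTorch message itself suggests a CPU fallback; an illustrative workaround (slower, and the variable generally has to be set before PyTorch initializes the MPS backend):
import os
# Fallback suggested by the PyTorch error above: run unsupported ops on the CPU.
os.environ['PYTORCH_ENABLE_MPS_FALLBACK'] = '1'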
TSRandomResizedCrop
TSRandomResizedCrop (magnitude=0.1, size=None, scale=None, ex=None, mode='nearest', **kwargs)
Randomly amplifies a sequence focusing on a random section of the steps
if test_interpolate('nearest'):
    test_eq(TSRandomResizedCrop(.5)(xb, split_idx=0).shape, xb.shape)
    test_ne(TSRandomResizedCrop(size=.8, scale=(.5, 1))(xb, split_idx=0).shape, xb.shape)
    test_ne(TSRandomResizedCrop(size=20, scale=(.5, 1))(xb, split_idx=0).shape, xb.shape)
TSWindowSlicing
TSWindowSlicing (magnitude=0.1, ex=None, mode='nearest', **kwargs)
Randomly extracts and resizes a ts slice (see https://halshs.archives-ouvertes.fr/halshs-01357973/document)
if test_interpolate('nearest'):
    test_eq(TSWindowSlicing()(xb, split_idx=0).shape, xb.shape)
    test_ne(TSWindowSlicing()(xb, split_idx=0), xb)
TSRandomZoomOut
TSRandomZoomOut (magnitude=0.1, ex=None, mode='nearest', **kwargs)
Randomly compresses a sequence on the x-axis
if test_interpolate('nearest'):
    test_eq(TSRandomZoomOut(.5)(xb, split_idx=0).shape, xb.shape)
TSRandomTimeScale
TSRandomTimeScale (magnitude=0.1, ex=None, mode='nearest', **kwargs)
Randomly amplifies/compresses a sequence on the x-axis, keeping the same length
if test_interpolate('nearest'):
    test_eq(TSRandomTimeScale(.5)(xb, split_idx=0).shape, xb.shape)
TSRandomTimeStep
TSRandomTimeStep (magnitude=0.02, ex=None, mode='nearest', **kwargs)
Compresses a sequence on the x-axis by randomly selecting sequence steps and interpolating to previous size
if test_interpolate('nearest'):
    test_eq(TSRandomTimeStep()(xb, split_idx=0).shape, xb.shape)
TSResampleSteps
TSResampleSteps (step_pct=1.0, same_seq_len=True, magnitude=None, **kwargs)
Transform that randomly selects and sorts sequence steps (with replacement) maintaining the sequence length
test_eq(TSResampleSteps(step_pct=.9, same_seq_len=False)(xb, split_idx=0).shape[-1], round(.9*xb.shape[-1]))
test_eq(TSResampleSteps(step_pct=.9, same_seq_len=True)(xb, split_idx=0).shape[-1], xb.shape[-1])
TSBlur
TSBlur (magnitude=1.0, ex=None, filt_len=None, **kwargs)
Blurs a sequence applying a filter of type [1, 0, 1]
test_eq(TSBlur(filt_len=7)(xb, split_idx=0).shape, xb.shape)
test_ne(TSBlur()(xb, split_idx=0), xb)
TSSmooth
TSSmooth (magnitude=1.0, ex=None, filt_len=None, **kwargs)
Smoothens a sequence applying a filter of type [1, 5, 1]
test_eq(TSSmooth(filt_len=7)(xb, split_idx=0).shape, xb.shape)
test_ne(TSSmooth()(xb, split_idx=0), xb)
TSFreqDenoise
TSFreqDenoise (magnitude=0.1, ex=None, wavelet='db4', level=2, thr=None, thr_mode='hard', pad_mode='per', **kwargs)
Denoises a sequence applying a wavelet decomposition method
maddest
maddest (d, axis=None)
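maddest has no docstring or example here. Reading the name and signature, it appears to be a mean-absolute-deviation style helper used by TSFreqDenoise to estimate the wavelet threshold; a small, illustrative call under that assumption:
# Illustrative only: a robust spread estimate of a 1-D array (assumed behaviour).
a = np.array([1., 1., 1., 10.])
print(maddest(a))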
try: import pywt
except ImportError: pass
if 'pywt' in dir():
    test_eq(TSFreqDenoise()(xb, split_idx=0).shape, xb.shape)
    test_ne(TSFreqDenoise()(xb, split_idx=0), xb)
TSRandomFreqNoise
TSRandomFreqNoise (magnitude=0.1, ex=None, wavelet='db4', level=2, mode='constant', **kwargs)
Applies random noise using a wavelet decomposition method
if 'pywt' in dir():
    test_eq(TSRandomFreqNoise()(xb, split_idx=0).shape, xb.shape)
TSRandomResizedLookBack
TSRandomResizedLookBack (magnitude=0.1, mode='nearest', **kwargs)
Selects a random number of sequence steps starting from the end and returns an output of the same shape
if test_interpolate('nearest'):
    for i in range(100):
        o = TSRandomResizedLookBack()(xb, split_idx=0)
        test_eq(o.shape[-1], xb.shape[-1])
TSRandomLookBackOut
TSRandomLookBackOut (magnitude=0.1, **kwargs)
Selects a random number of sequence steps starting from the end and sets them to zero
for i in range(100):
    o = TSRandomLookBackOut()(xb, split_idx=0)
    test_eq(o.shape[-1], xb.shape[-1])
TSVarOut
TSVarOut (magnitude=0.05, ex=None, **kwargs)
Sets the value of a random number of variables to zero
test_eq(TSVarOut()(xb, split_idx=0).shape, xb.shape)
TSCutOut
TSCutOut (magnitude=0.05, ex=None, **kwargs)
Sets a random section of the sequence to zero
test_eq(TSCutOut()(xb, split_idx=0).shape, xb.shape)
TSTimeStepOut
TSTimeStepOut (magnitude=0.05, ex=None, **kwargs)
Sets random sequence steps to zero
test_eq(TSTimeStepOut()(xb, split_idx=0).shape, xb.shape)
TSRandomCropPad
TSRandomCropPad (magnitude=0.05, ex=None, **kwargs)
Crops a section of the sequence of a random length
test_eq(TSRandomCropPad()(xb, split_idx=0).shape, xb.shape)
TSMaskOut
TSMaskOut (magnitude=0.1, compensate:bool=False, ex=None, **kwargs)
Applies a random mask
test_eq(TSMaskOut()(xb, split_idx=0).shape, xb.shape)
test_ne(TSMaskOut()(xb, split_idx=0), xb)
TSInputDropout
TSInputDropout (magnitude=0.0, ex=None, **kwargs)
Applies input dropout with requires_grad=False
test_eq(TSInputDropout(.1)(xb, split_idx=0).shape, xb.shape)
test_ne(TSInputDropout(.1)(xb, split_idx=0), xb)
TSTranslateX
TSTranslateX (magnitude=0.1, ex=None, **kwargs)
Moves a selected sequence window a random number of steps
test_eq(TSTranslateX()(xb, split_idx=0).shape, xb.shape)
TSRandomShift
TSRandomShift (magnitude=0.02, ex=None, **kwargs)
Shifts and splits a sequence
test_eq(TSRandomShift()(xb, split_idx=0).shape, xb.shape)
TSHorizontalFlip
TSHorizontalFlip (magnitude=1.0, ex=None, **kwargs)
Flips the sequence along the x-axis
test_eq(TSHorizontalFlip()(xb, split_idx=0).shape, xb.shape)
test_ne(TSHorizontalFlip()(xb, split_idx=0), xb)
TSRandomTrend
TSRandomTrend (magnitude=0.1, ex=None, **kwargs)
Randomly rotates the sequence along the z-axis
test_eq(TSRandomTrend()(xb, split_idx=0).shape, xb.shape)
TSVerticalFlip
TSVerticalFlip (magnitude=1.0, ex=None, **kwargs)
Applies a negative value to the time sequence
test_eq(TSVerticalFlip()(xb, split_idx=0).shape, xb.shape)
test_ne(TSVerticalFlip()(xb, split_idx=0), xb)
TSResize
TSResize (magnitude=-0.5, size=None, ex=None, mode='nearest', **kwargs)
Resizes the sequence length of a time series
if test_interpolate('nearest'):
    for sz in np.linspace(.2, 2, 10): test_eq(TSResize(sz)(xb, split_idx=0).shape[-1], int(round(xb.shape[-1]*(1+sz))))
    test_ne(TSResize(1)(xb, split_idx=0), xb)
TSRandomSize
TSRandomSize (magnitude=0.1, ex=None, mode='nearest', **kwargs)
Randomly resizes the sequence length of a time series
if test_interpolate('nearest'):
    seq_len_ = []
    for i in range(100):
        o = TSRandomSize(.5)(xb, split_idx=0)
        seq_len_.append(o.shape[-1])
    test_lt(min(seq_len_), xb.shape[-1])
    test_gt(max(seq_len_), xb.shape[-1])
TSRandomLowRes
TSRandomLowRes (magnitude=0.5, ex=None, mode='nearest', **kwargs)
Randomly resizes the sequence length of a time series to a lower resolution
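TSRandomLowRes has no test cell in this notebook. A minimal sketch of one, under the assumption that, like the other resizing tfms, it preserves the batch and variable dimensions and only ever reduces the sequence length:
if test_interpolate('nearest'):
    for _ in range(10):
        o = TSRandomLowRes()(xb, split_idx=0)
        test_eq(o.shape[:2], xb.shape[:2])  # batch size and n_vars assumed unchanged
        assert o.shape[-1] <= xb.shape[-1]  # assumed: resolution is only reduced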
TSDownUpScale
TSDownUpScale (magnitude=0.5, ex=None, mode='nearest', **kwargs)
Downscales a time series and upscales it again to previous sequence length
if test_interpolate('nearest'):
    test_eq(TSDownUpScale()(xb, split_idx=0).shape, xb.shape)
TSRandomDownUpScale
TSRandomDownUpScale (magnitude=0.5, ex=None, mode='nearest', **kwargs)
Randomly downscales a time series and upscales it again to previous sequence length
if test_interpolate('nearest'):
    test_eq(TSRandomDownUpScale()(xb, split_idx=0).shape, xb.shape)
    test_ne(TSDownUpScale()(xb, split_idx=0), xb)
    test_eq(TSDownUpScale()(xb, split_idx=1), xb)
TSRandomConv
TSRandomConv (magnitude=0.05, ex=None, ks=[1, 3, 5, 7], **kwargs)
Applies a convolution with a random kernel and random weights with requires_grad=False
for i in range(5):
    o = TSRandomConv(magnitude=0.05, ex=None, ks=[1, 3, 5, 7])(xb, split_idx=0)
    test_eq(o.shape, xb.shape)
TSRandom2Value
TSRandom2Value (magnitude=0.1, sel_vars=None, sel_steps=None, static=False, value=nan, **kwargs)
Randomly sets selected variables of type TSTensor to a predefined value (default: np.nan)
t = TSTensor(torch.ones(2, 3, 10))
TSRandom2Value(magnitude=0.5, sel_vars=None, sel_steps=None, static=False, value=0)(t, split_idx=0).data
tensor([[[0., 0., 1., 0., 1., 1., 0., 1., 1., 0.],
[1., 1., 0., 1., 1., 1., 1., 1., 1., 0.],
[1., 1., 1., 1., 1., 0., 0., 1., 1., 1.]],
[[1., 1., 1., 1., 1., 0., 1., 1., 0., 1.],
[0., 0., 0., 0., 0., 1., 0., 1., 0., 1.],
[0., 1., 0., 1., 0., 0., 0., 1., 0., 0.]]])
t = TSTensor(torch.ones(2, 3, 10))
TSRandom2Value(magnitude=0.5, sel_vars=[1], sel_steps=slice(-5, None), static=False, value=0)(t, split_idx=0).data
tensor([[[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1., 0., 1., 0., 0., 0.],
[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]],
[[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1., 0., 1., 0., 0., 0.],
[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]]])
t = TSTensor(torch.ones(2, 3, 10))
TSRandom2Value(magnitude=0.5, sel_vars=[1], sel_steps=None, static=True, value=0)(t, split_idx=0).data
tensor([[[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
[0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]],
[[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]]])
t = TSTensor(torch.ones(2, 3, 10))
TSRandom2Value(magnitude=1, sel_vars=1, sel_steps=None, static=False, value=0)(t, split_idx=0).data
tensor([[[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
[0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]],
[[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
[0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]]])
t = TSTensor(torch.ones(2, 3, 10))
TSRandom2Value(magnitude=1, sel_vars=[1,2], sel_steps=None, static=False, value=0)(t, split_idx=0).data
tensor([[[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
[0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]],
[[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
[0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]]])
t = TSTensor(torch.ones(2, 3, 10))
TSRandom2Value(magnitude=1, sel_vars=1, sel_steps=[1,3,5], static=False, value=0)(t, split_idx=0).data
tensor([[[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
[1., 0., 1., 0., 1., 0., 1., 1., 1., 1.],
[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]],
[[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
[1., 0., 1., 0., 1., 0., 1., 1., 1., 1.],
[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]]])
t = TSTensor(torch.ones(2, 3, 10))
TSRandom2Value(magnitude=1, sel_vars=[1,2], sel_steps=[1,3,5], static=False, value=0)(t, split_idx=0).data
tensor([[[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
[1., 0., 1., 0., 1., 0., 1., 1., 1., 1.],
[1., 0., 1., 0., 1., 0., 1., 1., 1., 1.]],
[[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
[1., 0., 1., 0., 1., 0., 1., 1., 1., 1.],
[1., 0., 1., 0., 1., 0., 1., 1., 1., 1.]]])
t = TSTensor(torch.ones(2,3,4))
TSRandom2Value(magnitude=.5, sel_vars=[0,2])(t, split_idx=0).data
tensor([[[1., nan, nan, 1.],
[1., 1., 1., 1.],
[1., nan, 1., 1.]],
[[nan, 1., 1., nan],
[1., 1., 1., 1.],
[nan, nan, 1., 1.]]])
t = TSTensor(torch.ones(2,3,4))
TSRandom2Value(magnitude=.5, sel_steps=slice(2, None))(t, split_idx=0).data
tensor([[[1., 1., 1., nan],
[1., 1., nan, 1.],
[1., 1., nan, nan]],
[[1., 1., nan, 1.],
[1., 1., nan, nan],
[1., 1., nan, 1.]]])
t = TSTensor(torch.ones(2,3,100))
test_gt(np.isnan(TSRandom2Value(magnitude=.5)(t, split_idx=0)).sum().item(), 0)
t = TSTensor(torch.ones(2,3,100))
test_gt(np.isnan(TSRandom2Value(magnitude=.5, sel_vars=[0,2])(t, split_idx=0)[:, [0,2]]).sum().item(), 0)
t = TSTensor(torch.ones(2,3,100))
test_eq(np.isnan(TSRandom2Value(magnitude=.5, sel_vars=[0,2])(t, split_idx=0)[:, 1]).sum().item(), 0)
TSMask2Value
TSMask2Value (mask_fn, value=nan, sel_vars=None, **kwargs)
Randomly sets selected variables of type TSTensor to a predefined value (default: np.nan)
t = TSTensor(torch.ones(2,3,100))
def _mask_fn(o, r=.15, value=np.nan):
    return torch.rand_like(o) > (1-r)
test_gt(np.isnan(TSMask2Value(_mask_fn)(t, split_idx=0)).sum().item(), 0)
TSSelfDropout
TSSelfDropout (p:float=1.0, nm:str=None, before_call:callable=None, **kwargs)
Applies dropout to a tensor with nan values by rotating axis=0 inplace
 | Type | Default | Details
---|---|---|---
p | float | 1.0 | Probability of applying Transform
nm | str | None | 
before_call | callable | None | Optional batchwise preprocessing function
kwargs | | | 
self_mask
self_mask (o)
t = TSTensor(torch.ones(2,3,100))
mask = torch.rand_like(t) > .7
t[mask] = np.nan
nan_perc = np.isnan(t).float().mean().item()
t2 = TSSelfDropout()(t, split_idx=0)
test_gt(torch.isnan(t2).float().mean().item(), nan_perc)
nan_perc, torch.isnan(t2).float().mean().item()
(0.30000001192092896, 0.49000000953674316)
RandAugment
RandAugment (tfms:list, N:int=1, M:int=3, **kwargs)
A transform that randomizes its state (via before_call) at each __call__
test_ne(RandAugment(TSMagAddNoise, N=5, M=10)(xb, split_idx=0), xb)
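In practice RandAugment is passed as a batch_tfm so that, for each training batch, N of the listed tfms are applied at strength M. An illustrative sketch reusing the data loaded at the top of this notebook; the particular tfms and the N/M values chosen here are ours, not library defaults:
# Illustrative only: standardize, then apply 2 of the 3 listed shape-preserving tfms per batch.
aug_dls = get_ts_dls(X, y, tfms=[None, TSCategorize()], splits=splits,
                     batch_tfms=[TSStandardize(), RandAugment([TSMagAddNoise, TSMagWarp, TSTimeWarp], N=2, M=5)],
                     bs=128)
xb_aug, _ = next(iter(aug_dls.train))
test_eq(xb_aug.shape[1:], xb.shape[1:])  # n_vars and seq_len are unchanged by these tfms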
TestTfm
TestTfm (tfm, magnitude=1.0, ex=None, **kwargs)
Utility class to test the output of selected tfms during training
get_tfm_name
get_tfm_name (tfm)
test_eq(get_tfm_name(partial(TSMagScale()))==get_tfm_name((partial(TSMagScale()), 0.1, .05))==get_tfm_name(TSMagScale())==get_tfm_name((TSMagScale(), 0.1, .05)), True)
all_TS_randaugs_names = [get_tfm_name(t) for t in all_TS_randaugs]