merlion.utils package

This package contains various utilities, including the TimeSeries class and utilities for resampling time series.

Submodules

merlion.utils.conj_priors module

Implementations of Bayesian conjugate priors & their online update rules.

ConjPrior([sample])

Abstract base class for a Bayesian conjugate prior.

BetaBernoulli([sample])

Beta-Bernoulli conjugate prior for binary data.

NormInvGamma([sample])

Normal-InverseGamma conjugate prior.

MVNormInvWishart([sample])

Multivariate Normal-InverseWishart conjugate prior.

BayesianLinReg([sample])

Bayesian Ordinary Linear Regression conjugate prior, which models a univariate input as a function of time.

BayesianMVLinReg([sample])

Bayesian multivariate linear regression conjugate prior, which models a multivariate input as a function of time.

class merlion.utils.conj_priors.ConjPrior(sample=None)

Bases: ABC

Abstract base class for a Bayesian conjugate prior. Can be used with either TimeSeries or numpy arrays directly.

Parameters

sample – a sample used to initialize the prior.

to_dict()
classmethod from_dict(state_dict)
static get_time_series_values(x)
Return type

ndarray

Returns

numpy array representing the input x

process_time_series(x)
Return type

Tuple[ndarray, ndarray]

Returns

(t, x), where t is a normalized list of timestamps, and x is a numpy array representing the input

abstract posterior(x, return_rv=False, log=True, return_updated=False)

Predictive posterior (log) PDF for new observations, or the scipy.stats random variable where applicable.

Parameters
  • x – value(s) to evaluate posterior at (None implies that we want to return the random variable)

  • return_rv – whether to return the random variable directly

  • log – whether to return the log PDF (instead of the PDF)

  • return_updated – whether to return an updated version of the conjugate prior as well

abstract update(x)

Update the conjugate prior based on new observations x.

abstract forecast(time_stamps)

Return a posterior predictive interval for the time stamps given.

Parameters

time_stamps – a list of time stamps

Return type

Tuple[TimeSeries, TimeSeries]

Returns

(forecast, stderr), where forecast is the expected posterior value and stderr is the standard error of that forecast.

class merlion.utils.conj_priors.ScalarConjPrior(sample=None)

Bases: ConjPrior, ABC

Abstract base class for a Bayesian conjugate prior for a scalar random variable.

Parameters

sample – a sample used to initialize the prior.

process_time_series(x)
Returns

(t, x), where t is a normalized list of timestamps, and x is a numpy array representing the input

static get_time_series_values(x)
Return type

ndarray

Returns

numpy array representing the input x

class merlion.utils.conj_priors.BetaBernoulli(sample=None)

Bases: ScalarConjPrior

Beta-Bernoulli conjugate prior for binary data. We assume the model

\[\begin{split}\begin{align*} X &\sim \mathrm{Bernoulli}(\theta) \\ \theta &\sim \mathrm{Beta}(\alpha, \beta) \end{align*}\end{split}\]

The update rule for data \(x_1, \ldots, x_n\) is

\[\begin{split}\begin{align*} \alpha &= \alpha + \sum_{i=1}^{n} \mathbb{I}[x_i = 1] \\ \beta &= \beta + \sum_{i=1}^{n} \mathbb{I}[x_i = 0] \end{align*}\end{split}\]
Parameters

sample – a sample used to initialize the prior.

posterior(x, return_rv=False, log=True, return_updated=False)

The posterior distribution of x is \(\mathrm{Bernoulli}(\alpha / (\alpha + \beta))\).

theta_posterior(theta, return_rv=False, log=True)

The posterior distribution of \(\theta\) is \(\mathrm{Beta}(\alpha, \beta)\).

update(x)

Update the conjugate prior based on new observations x.

forecast(time_stamps)

Return a posterior predictive interval for the time stamps given.

Parameters

time_stamps – a list of time stamps

Return type

Tuple[TimeSeries, TimeSeries]

Returns

(forecast, stderr), where forecast is the expected posterior value and stderr is the standard error of that forecast.

class merlion.utils.conj_priors.NormInvGamma(sample=None)

Bases: ScalarConjPrior

Normal-InverseGamma conjugate prior. Following Wikipedia and Murphy (2007), we assume the model

\[\begin{split}\begin{align*} X &\sim \mathcal{N}(\mu, \sigma^2) \\ \mu &\sim \mathcal{N}(\mu_0, \sigma^2 / n) \\ \sigma^2 &\sim \mathrm{InvGamma}(\alpha, \beta) \end{align*}\end{split}\]

The update rule for data \(x_1, \ldots, x_n\) is

\[\begin{split}\begin{align*} \bar{x} &= \frac{1}{n} \sum_{i = 1}^{n} x_i \\ \alpha &= \alpha + n/2 \\ \beta &= \beta + \frac{1}{2} \sum_{i = 1}^{n} (x_i - \bar{x})^2 + \frac{1}{2} (\mu_0 - \bar{x})^2 \\ \mu_0 &= \frac{n_0}{n_0 + n} \mu_0 + \frac{n}{n_0 + n} \bar{x} \\ n_0 &= n_0 + n \end{align*}\end{split}\]
Parameters

sample – a sample used to initialize the prior.

update(x)

Update the conjugate prior based on new observations x.

mu_posterior(mu, return_rv=False, log=True)

The posterior for \(\mu\) is \(\text{Student-t}_{2\alpha}(\mu_0, \beta / (n \alpha))\)

sigma2_posterior(sigma2, return_rv=False, log=True)

The posterior for \(\sigma^2\) is \(\text{InvGamma}(\alpha, \beta)\).

posterior(x, log=True, return_rv=False, return_updated=False)

The posterior for \(x\) is \(\text{Student-t}_{2\alpha}(\mu_0, (n+1) \beta / (n \alpha))\)

forecast(time_stamps)

Return a posterior predictive interval for the time stamps given.

Parameters

time_stamps – a list of time stamps

Return type

Tuple[TimeSeries, TimeSeries]

Returns

(forecast, stderr), where forecast is the expected posterior value and stderr is the standard error of that forecast.

class merlion.utils.conj_priors.MVNormInvWishart(sample=None)

Bases: ConjPrior

Multivariate Normal-InverseWishart conjugate prior. Multivariate equivalent of Normal-InverseGamma. Following Murphy (2007), we assume the model

\[\begin{split}\begin{align*} X &\sim \mathcal{N}_d(\mu, \Sigma) \\ \mu &\sim \mathcal{N}_d(\mu_0, \Sigma / n) \\ \Sigma &\sim \mathrm{InvWishart}_{\nu}(\Lambda) \end{align*}\end{split}\]

The update rule for data \(x_1, \ldots, x_n\) is

\[\begin{split}\begin{align*} \bar{x} &= \frac{1}{n} \sum_{i = 1}^{n} x_i \\ \nu &= \nu + n/2 \\ \Lambda &= \Lambda + \frac{n_0 n}{n_0 + n} (\mu_0 - \bar{x}) (\mu_0 - \bar{x})^T + \sum_{i = 1}^{n} (x_i - \bar{x}) (x_i - \bar{x})^T \\ \mu_0 &= \frac{n_0}{n_0 + n} \mu_0 + \frac{n}{n_0 + n} \bar{x} \\ n_0 &= n_0 + n \end{align*}\end{split}\]
Parameters

sample – a sample used to initialize the prior.

process_time_series(x)
Returns

(t, x), where t is a normalized list of timestamps, and x is a numpy array representing the input

update(x)

Update the conjugate prior based on new observations x.

mu_posterior(mu, return_rv=False, log=True)

The posterior for \(\mu\) is \(\text{Student-t}_{\nu-d+1}(\mu_0, \Lambda / (n (\nu - d + 1)))\)

Sigma_posterior(sigma2, return_rv=False, log=True)

The posterior for \(\Sigma\) is \(\text{InvWishart}_{\nu}(\Lambda^{-1})\)

posterior(x, return_rv=False, log=True, return_updated=False)

The posterior for \(x\) is \(\text{Student-t}_{\nu-d+1}(\mu_0, (n + 1) \Lambda / (n (\nu - d + 1)))\)

forecast(time_stamps, name='forecast')

Return a posterior predictive interval for the time stamps given.

Parameters

time_stamps – a list of time stamps

Return type

Tuple[TimeSeries, TimeSeries]

Returns

(forecast, stderr), where forecast is the expected posterior value and stderr is the standard error of that forecast.

class merlion.utils.conj_priors.BayesianLinReg(sample=None)

Bases: ConjPrior

Bayesian Ordinary Linear Regression conjugate prior, which models a univariate input as a function of time. Following Wikipedia, we assume the model

\[\begin{split}\begin{align*} x(t) &\sim \mathcal{N}(m t + b, \sigma^2) \\ w &\sim \mathcal{N}((m_0, b_0), \sigma^2 \Lambda_0^{-1}) \\ \sigma^2 &\sim \mathrm{InvGamma}(\alpha, \beta) \end{align*}\end{split}\]

Consider new data \((t_1, x_1), \ldots, (t_n, x_n)\). Let \(T \in \mathbb{R}^{n \times 2}\) be the matrix obtained by stacking the row vector of times with an all-ones row vector. Let \(w = (m, b) \in \mathbb{R}^{2}\) be the full weight vector. Let \(x \in \mathbb{R}^{n}\) denote all observed values. Then we have the update rule

\[\begin{split}\begin{align*} w_{OLS} &= (T^T T)^{-1} T^T x \\ \Lambda_n &= \Lambda_0 + T^T T \\ w_n &= (\Lambda_0 + T^T T)^{-1} (\Lambda_0 w_0 + T^T T w_{OLS}) \\ \alpha_n &= \alpha_0 + n / 2 \\ \beta_n &= \beta_0 + \frac{1}{2}(x^T x + w_0^T \Lambda_0 w_0 - w_n^T \Lambda_n w_n) \end{align*}\end{split}\]
Parameters

sample – a sample used to initialize the prior.

update(x)

Update the conjugate prior based on new observations x.

posterior_explicit(x, return_rv=False, log=True, return_updated=False)

Let \(\Lambda_n, \alpha_n, \beta_n\) be the posterior values obtained by updating the model on data \((t_1, x_1), \ldots, (t_n, x_n)\). The predictive posterior has PDF

\[\begin{align*} P((t, x)) &= \frac{1}{(2 \pi)^{-n/2}} \sqrt{\frac{\det \Lambda_0}{\det \Lambda_n}} \frac{\beta_0^{\alpha_0}}{\beta_n^{\alpha_n}}\frac{\Gamma(\alpha_n)}{\Gamma(\alpha_0)} \end{align*}\]
posterior(x, return_rv=False, log=True, return_updated=False)

Naive computation of the posterior using Bayes Rule, i.e.

\[\begin{split}\hat{\sigma}^2 &= \mathbb{E}[\sigma^2] \\ \hat{w} &= \mathbb{E}[w \mid \sigma^2 = \hat{\sigma}^2] \\ p(x \mid t) &= \frac{ p(w = \hat{w}, \sigma^2 = \hat{\sigma}^2) p(x \mid t, w = \hat{w}, \sigma^2 = \hat{\sigma}^2)}{ p(w = \hat{w}, \sigma^2 = \hat{\sigma}^2 \mid x, t)}\end{split}\]
forecast(time_stamps)

Return a posterior predictive interval for the time stamps given.

Parameters

time_stamps – a list of time stamps

Return type

Tuple[TimeSeries, TimeSeries]

Returns

(forecast, stderr), where forecast is the expected posterior value and stderr is the standard error of that forecast.

class merlion.utils.conj_priors.BayesianMVLinReg(sample=None)

Bases: ConjPrior

Bayesian multivariate linear regression conjugate prior, which models a multivariate input as a function of time. Following Wikipedia and Geisser (1965), we assume the model

\[\begin{split}\begin{align*} X(t) &\sim \mathcal{N}_{d}(m t + b, \Sigma) \\ (m, b) &\sim \mathcal{N}_{2d}((m_0, b_0), \Sigma \otimes \Lambda_0^{-1}) \\ \Sigma &\sim \mathrm{InvWishart}_{\nu}(V_0) \\ \end{align*}\end{split}\]

where \((m, b)\) is the concatenation of the vectors \(m\) and \(b\), \(\Lambda_0 \in \mathbb{R}^{2 \times 2}\), and \(\otimes\) is the Kronecker product. Consider new data \((t_1, x_1), \ldots, (t_n, x_n)\). Let \(T \in \mathbb{R}^{n \times 2}\) be the matrix obtained by stacking the row vector of times with an all-ones row vector. Let \(W = [m, b]^T \in \mathbb{R}^{2 \times d}\) be the full weight matrix. Let \(X \in \mathbb{R}^{n \times d}\) be the matrix of observed \(x\) values. Then we have the update rule

\[\begin{split}\begin{align*} \nu_n &= \nu_0 + n \\ W_n &= (\Lambda_0 + T^T T)^{-1}(\Lambda_0 W_0 + T^T X) \\ V_n &= V_0 + (X - TW_n)^T (X - TW_n) + (W_n - W_0)^T \Lambda_0 (W_n - W_0) \\ \Lambda_n &= \Lambda_0 + T^T T \\ \end{align*}\end{split}\]
Parameters

sample – a sample used to initialize the prior.

process_time_series(x)
Returns

(t, x), where t is a normalized list of timestamps, and x is a numpy array representing the input

update(x)

Update the conjugate prior based on new observations x.

posterior_explicit(x, return_rv=False, log=True, return_updated=False)

Let \(\Lambda_n, \nu_n, V_n\) be the posterior values obtained by updating the model on data \((t_1, x_1), \ldots, (t_n, x_n)\). The predictive posterior has PDF

\[\begin{align*} P((t, x)) &= \frac{1}{(2 \pi)^{-nd/2}} \sqrt{\frac{\det \Lambda_0}{\det \Lambda_n}} \frac{\det(V_0/2)^{\nu_0/2}}{\det(V_n/2)^{\nu_n/2}}\frac{\Gamma_d(\nu_n/2)}{\Gamma_d(\nu_0 / 2)} \end{align*}\]
posterior(x, return_rv=False, log=True, return_updated=False)

Naive computation of the posterior using Bayes Rule, i.e.

\[\begin{split}\hat{\Sigma} &= \mathbb{E}[\Sigma] \\ \hat{W} &= \mathbb{E}[W \mid \Sigma = \hat{\Sigma}] \\ p(X \mid t) &= \frac{ p(W = \hat{W}, \Sigma = \hat{\Sigma}) p(X \mid t, W = \hat{W}, \Sigma = \hat{\Sigma})}{ p(W = \hat{W}, \Sigma = \hat{\Sigma} \mid x, t)}\end{split}\]
forecast(time_stamps)

Return a posterior predictive interval for the time stamps given.

Parameters

time_stamps – a list of time stamps

Return type

Tuple[TimeSeries, TimeSeries]

Returns

(forecast, stderr), where forecast is the expected posterior value and stderr is the standard error of that forecast.

merlion.utils.istat module

class merlion.utils.istat.IStat(value=None, n=0)

Bases: object

An abstract base class for computing various statistics incrementally, with emphasis on recency-weighted variants.

Parameters
  • value (Optional[float]) – Initial value of the statistic. Defaults to None.

  • n (int) – Initial sample size. Defaults to 0.

property n
property value
abstract add(x)

Add a new value to update the statistic. :param x: new value to add to the sample.

abstract drop(x)

Drop a value to update the statistic. :param x: value to drop from the sample.

add_batch(batch)

Add a batch of new values to update the statistic. :type batch: List[float] :param batch: new values to add to the sample.

drop_batch(batch)

Drop a batch of new values to update the statistic. :type batch: List[float] :param batch: new values to add to the sample.

class merlion.utils.istat.Mean(value=None, n=0)

Bases: IStat

Class for incrementally computing the mean of a series of numbers.

Parameters
  • value (Optional[float]) – Initial value of the statistic. Defaults to None.

  • n (int) – Initial sample size. Defaults to 0.

property value
add(x)

Add a new value to update the statistic. :param x: new value to add to the sample.

drop(x)

Drop a value to update the statistic. :param x: value to drop from the sample.

class merlion.utils.istat.Variance(ex_value=None, ex2_value=None, n=0, ddof=1)

Bases: IStat

Class for incrementally computing the variance of a series of numbers.

Parameters
  • ex_value (Optional[float]) – Initial value of the first moment (mean).

  • ex2_value (Optional[float]) – Initial value of the second moment.

  • n (int) – Initial sample size.

  • ddof (int) – The delta degrees of freedom to use when correcting the estimate of the variance.

\[\text{Var}(x_i) = \text{E}(x_i^2) - \text{E}(x_i)^2\]
mean_class

alias of Mean

add(x)

Add a new value to update the statistic. :param x: new value to add to the sample.

drop(x)

Drop a value to update the statistic. :param x: value to drop from the sample.

property true_value
property corrected_value
property value
property sd
property se
class merlion.utils.istat.ExponentialMovingAverage(recency_weight=0.1, **kwargs)

Bases: Mean

Class for incrementally computing the exponential moving average of a series of numbers.

Parameters

recency_weight (float) – Recency weight to use when updating the exponential moving average.

Letting w be the recency weight,

\[\begin{split}\begin{align*} \text{EMA}_w(x_0) & = x_0 \\ \text{EMA}_w(x_t) & = w \cdot x_t + (1-w) \cdot \text{EMA}_w(x_{t-1}) \end{align*}\end{split}\]
property recency_weight
property value
drop(x)

Exponential Moving Average does not support dropping values

class merlion.utils.istat.RecencyWeightedVariance(recency_weight, **kwargs)

Bases: Variance

Class for incrementally computing the recency-weighted variance of a series of numbers.

Parameters

recency_weight (float) – Recency weight to use when updating the recency weighted variance.

Letting w be the recency weight,

\[\text{RWV}_w(x_t) = \text{EMA}_w({x^2_t}) - \text{EMA}_w(x_t)^2\]
mean_class

alias of ExponentialMovingAverage

property recency_weight
drop(x)

Recency Weighted Variance does not support dropping values

merlion.utils.misc module

class merlion.utils.misc.AutodocABCMeta(classname, bases, cls_dict)

Bases: ABCMeta

Metaclass used to ensure that inherited members of an abstract base class also inherit docstrings for inherited methods.

class merlion.utils.misc.ValIterOrderedDict

Bases: OrderedDict

OrderedDict whose iterator goes over self.values() instead of self.keys().

merlion.utils.misc.dynamic_import(import_path, alias=None)

Dynamically import a member from the specified module.

Parameters
  • import_path (str) – syntax ‘module_name:member_name’, e.g. ‘merlion.transform.normalize:PowerTransform’

  • alias (Optional[dict]) – dict which maps shortcuts for the registered classes, to their full import paths.

Returns

imported class

merlion.utils.misc.initializer(func)

Decorator for the __init__ method. Automatically assigns the parameters.

class merlion.utils.misc.ProgressBar(total, length=40, decimals=1, fill='█')

Bases: object

Parameters
  • total (int) – total iterations

  • length (int) – character length of bar

  • decimals (int) – positive number of decimals in percent complete

  • fill (str) – bar fill character

print(iteration, prefix, suffix, end='')
Parameters
  • iteration – current iteration

  • prefix – prefix string

  • suffix – suffix string

  • end – end character (e.g. "\r", "\r\n")

merlion.utils.resample module

class merlion.utils.resample.AlignPolicy(value)

Bases: Enum

Policies for aligning multiple univariate time series.

OuterJoin = 0
InnerJoin = 1
FixedReference = 2
FixedGranularity = 3
class merlion.utils.resample.AggregationPolicy(value)

Bases: Enum

Aggregation policies. Values are partial functions for pandas.core.resample.Resampler methods.

Mean = functools.partial(<function AggregationPolicy.<lambda>>)
Sum = functools.partial(<function AggregationPolicy.<lambda>>)
Median = functools.partial(<function AggregationPolicy.<lambda>>)
First = functools.partial(<function AggregationPolicy.<lambda>>)
Last = functools.partial(<function AggregationPolicy.<lambda>>)
Min = functools.partial(<function AggregationPolicy.<lambda>>)
Max = functools.partial(<function AggregationPolicy.<lambda>>)
class merlion.utils.resample.MissingValuePolicy(value)

Bases: Enum

Missing value imputation policies. Values are partial functions for pd.Series methods.

FFill = functools.partial(<function MissingValuePolicy.<lambda>>)

Fill gap with the first value before the gap.

BFill = functools.partial(<function MissingValuePolicy.<lambda>>)

Fill gap with the first value after the gap.

Nearest = functools.partial(<function MissingValuePolicy.<lambda>>, method='nearest')

Replace missing value with the value closest to it.

Interpolate = functools.partial(<function MissingValuePolicy.<lambda>>, method='time')

Fill in missing values by linear interpolation.

merlion.utils.resample.to_pd_datetime(timestamp)

Converts a timestamp (or list/iterable of timestamps) to pandas Datetime, truncated at the millisecond.

merlion.utils.resample.to_timestamp(t)

Converts a datetime to a Unix timestamp.

merlion.utils.resample.granularity_str_to_seconds(granularity)

Converts a string/float/int granularity (representing a timedelta) to the number of seconds it represents, truncated at the millisecond.

Return type

Optional[float]

merlion.utils.resample.get_gcd_timedelta(*time_stamp_lists)

Calculates all timedeltas present in any of the lists of time stamps given, and returns the GCD of all these timedeltas (up to units of milliseconds).

merlion.utils.resample.infer_granularity(time_stamps)

Infers the granularity of a list of time stamps

merlion.utils.resample.reindex_df(df, reference, missing_value_policy)

Reindexes a Datetime-indexed dataframe df to have the same time stamps as a reference sequence of timestamps. Imputes missing values with the given MissingValuePolicy.

merlion.utils.time_series module

class merlion.utils.time_series.UnivariateTimeSeries(time_stamps, values, name=None, freq='1h')

Bases: Series

Please read the tutorial before reading this API doc. This class is a time-indexed pd.Series which represents a univariate time series. For the most part, it supports all the same features as pd.Series, with the following key differences to iteration and indexing:

  1. Iterating over a UnivariateTimeSeries is implemented as

    for timestamp, value in univariate:
        # do stuff...
    

    where timestamp is a Unix timestamp, and value is the corresponding time series value.

  2. Integer index: u[i] yields the tuple (u.time_stamps[i], u.values[i])

  3. Slice index: u[i:j:k] yields a new UnivariateTimeSeries(u.time_stamps[i:j:k], u.values[i:j:k])

The class also supports the following additional features:

  1. univariate.time_stamps returns the list of Unix timestamps, and univariate.values returns the list of the time series values. You may access the pd.DatetimeIndex directly with univariate.index (or its np.ndarray representation with univariate.np_time_stamps), and the np.ndarray of values with univariate.np_values.

  2. univariate.concat(other) will concatenate the UnivariateTimeSeries other to the right end of univariate.

  3. left, right = univariate.bisect(t) will split the univariate at the given timestamp t.

  4. window = univariate.window(t0, tf) will return the subset of the time series occurring between timestamps t0 (inclusive) and tf (non-inclusive)

  5. series = univariate.to_pd() will convert the UnivariateTimeSeries into a regular pd.Series (for compatibility).

  6. univariate = UnivariateTimeSeries.from_pd(series) uses a time-indexed pd.Series to create a UnivariateTimeSeries object directly.

__getitem__(i)
Parameters

i (Union[int, slice]) – integer index or slice

Return type

Union[Tuple[float, float], UnivariateTimeSeries]

Returns

(self.time_stamps[i], self.values[i]) if i is an integer. UnivariateTimeSeries(self.time_series[i], self.values[i]) if i is a slice.

__iter__()

The i’th item in the iterator is the tuple (self.time_stamps[i], self.values[i]).

Parameters
  • time_stamps (Optional[Sequence[Union[int, float]]]) – a sequence of Unix timestamps. You may specify None if you only have values with no specific time stamps.

  • values (Sequence[float]) – a sequence of univariate values, where values[i] occurs at time time_stamps[i]

  • name (Optional[str]) – the name of the univariate time series

  • freq – if time_stamps is not provided, the univariate is assumed to be sampled at frequency freq. freq may be a string (e.g. "1h"), timedelta, or int/float (in units of seconds).

property np_time_stamps
Return type

np.ndarray

Returns

the numpy representation of this time series’s Unix timestamps

property np_values
Return type

np.ndarray

Returns

the numpy representation of this time series’s values

property time_stamps
Return type

List[float]

Returns

the list of Unix timestamps for the time series

property values
Return type

List[float]

Returns

the list of values for the time series.

property t0
Return type

float

Returns

the first timestamp in the univariate time series.

property tf
Return type

float

Returns

the final timestamp in the univariate time series.

is_empty()
Return type

bool

Returns

True if the univariate is empty, False if not.

copy(deep=True)

Copies the UnivariateTimeSeries. Simply a wrapper around the pd.Series.copy() method.

concat(other)

Concatenates the UnivariateTimeSeries other to the right of this one. :param UnivariateTimeSeries other: another UnivariateTimeSeries :rtype: UnivariateTimeSeries :return: concatenated univariate time series

bisect(t, t_in_left=False)

Splits the time series at the point where the given timestamp occurs.

Parameters
  • t (float) – a Unix timestamp or datetime object. Everything before time t is in the left split, and everything after time t is in the right split.

  • t_in_left (bool) – if True, t is in the left split. Otherwise, t is in the right split.

Return type

Tuple[UnivariateTimeSeries, UnivariateTimeSeries]

Returns

the left and right splits of the time series.

window(t0, tf, include_tf=False)
Parameters
  • t0 (float) – The timestamp/datetime at the start of the window (inclusive)

  • tf (float) – The timestamp/datetime at the end of the window (inclusive if include_tf is True, non-inclusive otherwise)

  • include_tf (bool) – Whether to include tf in the window.

Return type

UnivariateTimeSeries

Returns

The subset of the time series occurring between timestamps t0 (inclusive) and tf (included if include_tf is True, excluded otherwise).

to_dict()
Return type

Dict[float, float]

Returns

A dictionary representing the data points in the time series.

classmethod from_dict(obj, name=None)
Parameters
  • obj (Dict[float, float]) – A dictionary of timestamp - value pairs

  • name – the name to assign the output

Return type

UnivariateTimeSeries

Returns

the UnivariateTimeSeries represented by series.

to_pd()
Return type

Series

Returns

A pandas Series representing the time series, indexed by time.

classmethod from_pd(series, name=None, freq='1h')
Parameters
  • series (Series) – a pd.Series. If it has a``pd.DatetimeIndex``, we will use that index for the timestamps. Otherwise, we will create one at the specified frequency.

  • name – the name to assign the output

  • freq – if series is not indexed by time, this is the frequency at which we will assume it is sampled.

Return type

UnivariateTimeSeries

Returns

the UnivariateTimeSeries represented by series.

to_ts()
Return type

TimeSeries

Returns

A TimeSeries representing this univariate time series.

classmethod empty(name=None)
Return type

UnivariateTimeSeries

Returns

A Merlion UnivariateTimeSeries that has empty timestamps and values.

class merlion.utils.time_series.TimeSeries(univariates, *, check_aligned=True)

Bases: object

Please read the tutorial before reading this API doc. This class represents a general multivariate time series as a wrapper around a number of (optionally named) UnivariateTimeSeries. A TimeSeries object is initialized as time_series = TimeSeries(univariates), where univariates is either a list of UnivariateTimeSeries, or a dictionary mapping string names to their corresponding UnivariateTimeSeries objects.

Because the individual univariates need not be sampled at the same times, an important concept for TimeSeries is alignment. We say that a TimeSeries is aligned if all of its univariates have observations sampled at the exact set set of times.

One may access the UnivariateTimeSeries comprising this TimeSeries in four ways:

  1. Iterate over the individual univariates using

    for var in time_series.univariates:
        # do stuff with each UnivariateTimeSeries var
    
  2. Access an individual UnivariateTimeSeries by name as time_series.univariates[name]. If you supplied unnamed univariates to the constructor (i.e. using a list), the name of a univariate will just be its index in that list.

  3. Get the list of each univariate’s name with time_series.names.

  4. Iterate over named univariates as

    for name, var in time_series.items():
        # do stuff
    

    Note that this is equivalent to iterating over zip(time_series.names, time_series.univariates).

This class supports the following additional features as well:

  1. Interoperability with pandas

    • df = time_series.to_pd() yields a time-indexed pd.DataFrame, where each column (with the appropriate name) corresponds to a variable. Missing values are NaN.

    • time_series = TimeSeries.from_pd(df) takes a time-indexed pd.DataFrame and returns a corresponding TimeSeries object (missing values are handled appropriately). The order of time_series.univariates is the order of df.keys().

  2. Automated alignment: aligned = time_series.align() resamples each of time_series.univariates so that they all have the same timestamps. By default, this is done by taking the union of all timestamps present in any individual univariate time series, and imputing missing values via interpolation. See the method documentation for details on how you may configure the alignment policy.

  3. Transparent indexing and iteration for TimeSeries which have all univariates aligned (i.e. they all have the same timestamps)

    • Get the length and shape of the time series (equal to the number of observations in each individual univariate). Note that if the time series is not aligned, we will return the length/shape of an equivalent pandas dataframe and emit a warning.

    • Index time_series[i] = (times[i], (x1[i], ..., xn[i])) (assuming time_series has n aligned univariates with timestamps times, and xk = time_series.univariates[k-1].values). Slice returns a TimeSeries object and works as one would expect.

    • Assuming time_series has n variables, you may iterate with

      for t_i, (x1_i, ..., xn_i) in time_series:
          # do stuff
      

      Notably, this lets you call times, val_vectors = zip(*time_series)

  4. Time-based queries for any time series

    • Get the two sub TimeSeries before and after a timestamp t via left, right = time_series.bisect(t)

    • Get the sub TimeSeries between timestamps t0 (inclusive) and tf (non-inclusive) via window = time_series.window(t0, tf)

  5. Concatenation: two TimeSeries may be concatenated (in time) as time_series = time_series_1 + time_series_2.

__getitem__(i)

Only supported if all individual variable time series are sampled at the same time stamps.

Parameters

i (Union[int, slice]) – integer index or slice.

Return type

Union[Tuple[float, Tuple[float]], TimeSeries]

Returns

If i is an integer, returns the tuple (time_stamps[i], tuple(var.values[i] for var in self.univariates)). If i is a slice, returns the time series TimeSeries([var[i] for var in self.univariates])

__iter__()

Only supported if all individual variable time series are sampled at the same time stamps. The i’th item of the iterator is the tuple (time_stamps[i], tuple(var.values[i] for var in self.univariates)).

property names
Returns

The list of the names of the univariates.

items()
Returns

Iterator over (name, univariate) tuples.

property dim: int
Return type

int

Returns

The dimension of the time series (the number of variables).

property is_aligned: bool
Return type

bool

Returns

Whether all individual variable time series are sampled at the same time stamps, i.e. they are aligned.

property np_time_stamps
Return type

np.ndarray

Returns

the numpy representation of this time series’s Unix timestamps

property time_stamps
Return type

List[float]

Returns

the list of Unix timestamps for the time series

property t0: float
Return type

float

Returns

the first timestamp in the time series.

property tf: float
Return type

float

Returns

the final timestamp in the time series.

is_empty()
Return type

bool

Returns

whether the time series is empty

squeeze()
Return type

UnivariateTimeSeries

Returns

a UnivariateTimeSeries if the time series only has one univariate, otherwise returns itself, a TimeSeries

property shape: Tuple[int, int]
Return type

Tuple[int, int]

Returns

the shape of this time series, i.e. (self.dim, len(self))

bisect(t, t_in_left=False)

Splits the time series at the point where the given timestap t occurs.

Parameters
  • t (float) – a Unix timestamp or datetime object. Everything before time t is in the left split, and everything after time t is in the right split.

  • t_in_left (bool) – if True, t is in the left split. Otherwise, t is in the right split.

Return type

Tuple[TimeSeries, TimeSeries]

Returns

the left and right splits of the time series.

window(t0, tf, include_tf=False)
Parameters
  • t0 (float) – The timestamp/datetime at the start of the window (inclusive)

  • tf (float) – The timestamp/datetime at the end of the window (inclusive if include_tf is True, non-inclusive otherwise)

  • include_tf (bool) – Whether to include tf in the window.

Returns

The subset of the time series occurring between timestamps t0 (inclusive) and tf (included if include_tf is True, excluded otherwise).

Return type

TimeSeries

to_pd()
Return type

DataFrame

Returns

A pandas DataFrame (indexed by time) which represents this time series. Each variable corresponds to a column of the DataFrame. Timestamps which are present for one variable but not another, are represented with NaN.

classmethod from_pd(df, check_times=True, freq='1h')
Parameters
  • df (Union[Series, DataFrame, ndarray]) – A pandas DataFrame with a DatetimeIndex. Each column corresponds to a different variable of the time series, and the key of column (in sorted order) give the relative order of those variables (in the list self.univariates). Missing values should be represented with NaN. May also be a pandas Series for univariate time series.

  • check_times – whether to check that all times in the index are unique (up to the millisecond) and sorted.

  • freq – if df is not indexed by time, this is the frequency at which we will assume it is sampled.

Return type

TimeSeries

Returns

the TimeSeries object corresponding to df.

classmethod from_ts_list(ts_list, *, check_aligned=True)
Parameters
  • ts_list (Iterable[TimeSeries]) – iterable of time series we wish to form a multivariate time series with

  • check_aligned (bool) – whether to check if the output time series is aligned

Return type

TimeSeries

Returns

A multivariate TimeSeries created from all the time series in the inputs.

align(*, reference=None, granularity=None, origin=None, remove_non_overlapping=True, alignment_policy=None, aggregation_policy=AggregationPolicy.Mean, missing_value_policy=MissingValuePolicy.Interpolate)

Aligns all the univariate time series comprising this multivariate time series so that they all have the same time stamps.

Parameters
  • reference (Optional[Sequence[Union[int, float]]]) – A specific set of timestamps we want the resampled time series to contain. Required if alignment_policy is AlignPolicy.FixedReference. Overrides other alignment policies if specified.

  • granularity (Union[str, int, float, None]) – The granularity (in seconds) of the resampled time time series. Defaults to the GCD time difference between adjacent elements of reference (when available) or time_series (otherwise). Ignored if reference is given or alignment_policy is AlignPolicy.FixedReference. Overrides other alignment policies if specified.

  • origin (Optional[int]) – The first timestamp of the resampled time series. Only used if the alignment policy is AlignPolicy.FixedGranularity.

  • remove_non_overlapping – If True, we will only keep the portions of the univariates that overlap with each other. For example, if we have 3 univariates which span timestamps [0, 3600], [60, 3660], and [30, 3540], we will only keep timestamps in the range [60, 3540]. If False, we will keep all timestamps produced by the resampling.

  • alignment_policy (Optional[AlignPolicy]) –

    The policy we want to use to align the time time series.

    • AlignPolicy.FixedReference aligns each single-variable time series to reference, a user-specified sequence of timestamps.

    • AlignPolicy.FixedGranularity resamples each single-variable time series at the same granularity, aggregating windows and imputing missing values as desired.

    • AlignPolicy.OuterJoin returns a time series with the union of all timestamps present in any single-variable time series.

    • AlignPolicy.InnerJoin returns a time series with the intersection of all timestamps present in all single-variable time series.

  • aggregation_policy (AggregationPolicy) – The policy used to aggregate windows of adjacent observations when downsampling.

  • missing_value_policy (MissingValuePolicy) – The policy used to impute missing values created when upsampling.

Return type

TimeSeries

Returns

The resampled multivariate time series.

merlion.utils.time_series.ts_csv_load(file_name, ms=True, n_vars=None)
Parameters
  • file_name (str) – a csv file starting with the field timestamp followed by all the all variable names.

  • ms – whether the timestamps are in milliseconds (rather than seconds)

Return type

TimeSeries

Returns

A merlion TimeSeries object.

merlion.utils.time_series.ts_to_csv(time_series, file_name)
Parameters
  • time_series (TimeSeries) – the TimeSeries object to write to a csv.

  • file_name (str) – the name to assign the csv file.

merlion.utils.time_series.assert_equal_timedeltas(time_series, timedelta=None)

Checks that all time deltas in the time series are equal, either to each other, or a pre-specified timedelta (in seconds).