merlion.utils package
This package contains various utilities, including the TimeSeries
class and
utilities for resampling time series.
Submodules
merlion.utils.conj_priors module
Implementations of Bayesian conjugate priors & their online update rules.

Abstract base class for a Bayesian conjugate prior. 

BetaBernoulli conjugate prior for binary data. 

NormalInverseGamma conjugate prior. 

Multivariate NormalInverseWishart conjugate prior. 

Bayesian Ordinary Linear Regression conjugate prior, which models a univariate input as a function of time. 

Bayesian multivariate linear regression conjugate prior, which models a multivariate input as a function of time. 
 class merlion.utils.conj_priors.ConjPrior(sample=None)
Bases:
ABC
Abstract base class for a Bayesian conjugate prior. Can be used with either
TimeSeries
ornumpy
arrays directly. Parameters
sample – a sample used to initialize the prior.
 to_dict()
 classmethod from_dict(state_dict)
 static get_time_series_values(x)
 Return type
ndarray
 Returns
numpy array representing the input
x
 process_time_series(x)
 Return type
Tuple
[ndarray
,ndarray
] Returns
(t, x)
, wheret
is a normalized list of timestamps, andx
is anumpy
array representing the input
 abstract posterior(x, return_rv=False, log=True, return_updated=False)
Predictive posterior (log) PDF for new observations, or the
scipy.stats
random variable where applicable. Parameters
x – value(s) to evaluate posterior at (
None
implies that we want to return the random variable)return_rv – whether to return the random variable directly
log – whether to return the log PDF (instead of the PDF)
return_updated – whether to return an updated version of the conjugate prior as well
 abstract update(x)
Update the conjugate prior based on new observations x.
 abstract forecast(time_stamps)
Return a posterior predictive interval for the time stamps given.
 Parameters
time_stamps – a list of time stamps
 Return type
Tuple
[TimeSeries
,TimeSeries
] Returns
(forecast, stderr)
, whereforecast
is the expected posterior value andstderr
is the standard error of that forecast.
 class merlion.utils.conj_priors.ScalarConjPrior(sample=None)
Bases:
ConjPrior
,ABC
Abstract base class for a Bayesian conjugate prior for a scalar random variable.
 Parameters
sample – a sample used to initialize the prior.
 process_time_series(x)
 Returns
(t, x)
, wheret
is a normalized list of timestamps, andx
is anumpy
array representing the input
 static get_time_series_values(x)
 Return type
ndarray
 Returns
numpy array representing the input
x
 class merlion.utils.conj_priors.BetaBernoulli(sample=None)
Bases:
ScalarConjPrior
BetaBernoulli conjugate prior for binary data. We assume the model
\[\begin{split}\begin{align*} X &\sim \mathrm{Bernoulli}(\theta) \\ \theta &\sim \mathrm{Beta}(\alpha, \beta) \end{align*}\end{split}\]The update rule for data \(x_1, \ldots, x_n\) is
\[\begin{split}\begin{align*} \alpha &= \alpha + \sum_{i=1}^{n} \mathbb{I}[x_i = 1] \\ \beta &= \beta + \sum_{i=1}^{n} \mathbb{I}[x_i = 0] \end{align*}\end{split}\] Parameters
sample – a sample used to initialize the prior.
 posterior(x, return_rv=False, log=True, return_updated=False)
The posterior distribution of x is \(\mathrm{Bernoulli}(\alpha / (\alpha + \beta))\).
 theta_posterior(theta, return_rv=False, log=True)
The posterior distribution of \(\theta\) is \(\mathrm{Beta}(\alpha, \beta)\).
 update(x)
Update the conjugate prior based on new observations x.
 forecast(time_stamps)
Return a posterior predictive interval for the time stamps given.
 Parameters
time_stamps – a list of time stamps
 Return type
Tuple
[TimeSeries
,TimeSeries
] Returns
(forecast, stderr)
, whereforecast
is the expected posterior value andstderr
is the standard error of that forecast.
 class merlion.utils.conj_priors.NormInvGamma(sample=None)
Bases:
ScalarConjPrior
NormalInverseGamma conjugate prior. Following Wikipedia and Murphy (2007), we assume the model
\[\begin{split}\begin{align*} X &\sim \mathcal{N}(\mu, \sigma^2) \\ \mu &\sim \mathcal{N}(\mu_0, \sigma^2 / n) \\ \sigma^2 &\sim \mathrm{InvGamma}(\alpha, \beta) \end{align*}\end{split}\]The update rule for data \(x_1, \ldots, x_n\) is
\[\begin{split}\begin{align*} \bar{x} &= \frac{1}{n} \sum_{i = 1}^{n} x_i \\ \alpha &= \alpha + n/2 \\ \beta &= \beta + \frac{1}{2} \sum_{i = 1}^{n} (x_i  \bar{x})^2 + \frac{1}{2} (\mu_0  \bar{x})^2 \\ \mu_0 &= \frac{n_0}{n_0 + n} \mu_0 + \frac{n}{n_0 + n} \bar{x} \\ n_0 &= n_0 + n \end{align*}\end{split}\] Parameters
sample – a sample used to initialize the prior.
 update(x)
Update the conjugate prior based on new observations x.
 mu_posterior(mu, return_rv=False, log=True)
The posterior for \(\mu\) is \(\text{Studentt}_{2\alpha}(\mu_0, \beta / (n \alpha))\)
 sigma2_posterior(sigma2, return_rv=False, log=True)
The posterior for \(\sigma^2\) is \(\text{InvGamma}(\alpha, \beta)\).
 posterior(x, log=True, return_rv=False, return_updated=False)
The posterior for \(x\) is \(\text{Studentt}_{2\alpha}(\mu_0, (n+1) \beta / (n \alpha))\)
 forecast(time_stamps)
Return a posterior predictive interval for the time stamps given.
 Parameters
time_stamps – a list of time stamps
 Return type
Tuple
[TimeSeries
,TimeSeries
] Returns
(forecast, stderr)
, whereforecast
is the expected posterior value andstderr
is the standard error of that forecast.
 class merlion.utils.conj_priors.MVNormInvWishart(sample=None)
Bases:
ConjPrior
Multivariate NormalInverseWishart conjugate prior. Multivariate equivalent of NormalInverseGamma. Following Murphy (2007), we assume the model
\[\begin{split}\begin{align*} X &\sim \mathcal{N}_d(\mu, \Sigma) \\ \mu &\sim \mathcal{N}_d(\mu_0, \Sigma / n) \\ \Sigma &\sim \mathrm{InvWishart}_{\nu}(\Lambda) \end{align*}\end{split}\]The update rule for data \(x_1, \ldots, x_n\) is
\[\begin{split}\begin{align*} \bar{x} &= \frac{1}{n} \sum_{i = 1}^{n} x_i \\ \nu &= \nu + n/2 \\ \Lambda &= \Lambda + \frac{n_0 n}{n_0 + n} (\mu_0  \bar{x}) (\mu_0  \bar{x})^T + \sum_{i = 1}^{n} (x_i  \bar{x}) (x_i  \bar{x})^T \\ \mu_0 &= \frac{n_0}{n_0 + n} \mu_0 + \frac{n}{n_0 + n} \bar{x} \\ n_0 &= n_0 + n \end{align*}\end{split}\] Parameters
sample – a sample used to initialize the prior.
 process_time_series(x)
 Returns
(t, x)
, wheret
is a normalized list of timestamps, andx
is anumpy
array representing the input
 update(x)
Update the conjugate prior based on new observations x.
 mu_posterior(mu, return_rv=False, log=True)
The posterior for \(\mu\) is \(\text{Studentt}_{\nud+1}(\mu_0, \Lambda / (n (\nu  d + 1)))\)
 Sigma_posterior(sigma2, return_rv=False, log=True)
The posterior for \(\Sigma\) is \(\text{InvWishart}_{\nu}(\Lambda^{1})\)
 posterior(x, return_rv=False, log=True, return_updated=False)
The posterior for \(x\) is \(\text{Studentt}_{\nud+1}(\mu_0, (n + 1) \Lambda / (n (\nu  d + 1)))\)
 forecast(time_stamps, name='forecast')
Return a posterior predictive interval for the time stamps given.
 Parameters
time_stamps – a list of time stamps
 Return type
Tuple
[TimeSeries
,TimeSeries
] Returns
(forecast, stderr)
, whereforecast
is the expected posterior value andstderr
is the standard error of that forecast.
 class merlion.utils.conj_priors.BayesianLinReg(sample=None)
Bases:
ConjPrior
Bayesian Ordinary Linear Regression conjugate prior, which models a univariate input as a function of time. Following Wikipedia, we assume the model
\[\begin{split}\begin{align*} x(t) &\sim \mathcal{N}(m t + b, \sigma^2) \\ w &\sim \mathcal{N}((m_0, b_0), \sigma^2 \Lambda_0^{1}) \\ \sigma^2 &\sim \mathrm{InvGamma}(\alpha, \beta) \end{align*}\end{split}\]Consider new data \((t_1, x_1), \ldots, (t_n, x_n)\). Let \(T \in \mathbb{R}^{n \times 2}\) be the matrix obtained by stacking the row vector of times with an allones row vector. Let \(w = (m, b) \in \mathbb{R}^{2}\) be the full weight vector. Let \(x \in \mathbb{R}^{n}\) denote all observed values. Then we have the update rule
\[\begin{split}\begin{align*} w_{OLS} &= (T^T T)^{1} T^T x \\ \Lambda_n &= \Lambda_0 + T^T T \\ w_n &= (\Lambda_0 + T^T T)^{1} (\Lambda_0 w_0 + T^T T w_{OLS}) \\ \alpha_n &= \alpha_0 + n / 2 \\ \beta_n &= \beta_0 + \frac{1}{2}(x^T x + w_0^T \Lambda_0 w_0  w_n^T \Lambda_n w_n) \end{align*}\end{split}\] Parameters
sample – a sample used to initialize the prior.
 update(x)
Update the conjugate prior based on new observations x.
 posterior_explicit(x, return_rv=False, log=True, return_updated=False)
Let \(\Lambda_n, \alpha_n, \beta_n\) be the posterior values obtained by updating the model on data \((t_1, x_1), \ldots, (t_n, x_n)\). The predictive posterior has PDF
\[\begin{align*} P((t, x)) &= \frac{1}{(2 \pi)^{n/2}} \sqrt{\frac{\det \Lambda_0}{\det \Lambda_n}} \frac{\beta_0^{\alpha_0}}{\beta_n^{\alpha_n}}\frac{\Gamma(\alpha_n)}{\Gamma(\alpha_0)} \end{align*}\]
 posterior(x, return_rv=False, log=True, return_updated=False)
Naive computation of the posterior using Bayes Rule, i.e.
\[\begin{split}\hat{\sigma}^2 &= \mathbb{E}[\sigma^2] \\ \hat{w} &= \mathbb{E}[w \mid \sigma^2 = \hat{\sigma}^2] \\ p(x \mid t) &= \frac{ p(w = \hat{w}, \sigma^2 = \hat{\sigma}^2) p(x \mid t, w = \hat{w}, \sigma^2 = \hat{\sigma}^2)}{ p(w = \hat{w}, \sigma^2 = \hat{\sigma}^2 \mid x, t)}\end{split}\]
 forecast(time_stamps)
Return a posterior predictive interval for the time stamps given.
 Parameters
time_stamps – a list of time stamps
 Return type
Tuple
[TimeSeries
,TimeSeries
] Returns
(forecast, stderr)
, whereforecast
is the expected posterior value andstderr
is the standard error of that forecast.
 class merlion.utils.conj_priors.BayesianMVLinReg(sample=None)
Bases:
ConjPrior
Bayesian multivariate linear regression conjugate prior, which models a multivariate input as a function of time. Following Wikipedia and Geisser (1965), we assume the model
\[\begin{split}\begin{align*} X(t) &\sim \mathcal{N}_{d}(m t + b, \Sigma) \\ (m, b) &\sim \mathcal{N}_{2d}((m_0, b_0), \Sigma \otimes \Lambda_0^{1}) \\ \Sigma &\sim \mathrm{InvWishart}_{\nu}(V_0) \\ \end{align*}\end{split}\]where \((m, b)\) is the concatenation of the vectors \(m\) and \(b\), \(\Lambda_0 \in \mathbb{R}^{2 \times 2}\), and \(\otimes\) is the Kronecker product. Consider new data \((t_1, x_1), \ldots, (t_n, x_n)\). Let \(T \in \mathbb{R}^{n \times 2}\) be the matrix obtained by stacking the row vector of times with an allones row vector. Let \(W = [m, b]^T \in \mathbb{R}^{2 \times d}\) be the full weight matrix. Let \(X \in \mathbb{R}^{n \times d}\) be the matrix of observed \(x\) values. Then we have the update rule
\[\begin{split}\begin{align*} \nu_n &= \nu_0 + n \\ W_n &= (\Lambda_0 + T^T T)^{1}(\Lambda_0 W_0 + T^T X) \\ V_n &= V_0 + (X  TW_n)^T (X  TW_n) + (W_n  W_0)^T \Lambda_0 (W_n  W_0) \\ \Lambda_n &= \Lambda_0 + T^T T \\ \end{align*}\end{split}\] Parameters
sample – a sample used to initialize the prior.
 process_time_series(x)
 Returns
(t, x)
, wheret
is a normalized list of timestamps, andx
is anumpy
array representing the input
 update(x)
Update the conjugate prior based on new observations x.
 posterior_explicit(x, return_rv=False, log=True, return_updated=False)
Let \(\Lambda_n, \nu_n, V_n\) be the posterior values obtained by updating the model on data \((t_1, x_1), \ldots, (t_n, x_n)\). The predictive posterior has PDF
\[\begin{align*} P((t, x)) &= \frac{1}{(2 \pi)^{nd/2}} \sqrt{\frac{\det \Lambda_0}{\det \Lambda_n}} \frac{\det(V_0/2)^{\nu_0/2}}{\det(V_n/2)^{\nu_n/2}}\frac{\Gamma_d(\nu_n/2)}{\Gamma_d(\nu_0 / 2)} \end{align*}\]
 posterior(x, return_rv=False, log=True, return_updated=False)
Naive computation of the posterior using Bayes Rule, i.e.
\[\begin{split}\hat{\Sigma} &= \mathbb{E}[\Sigma] \\ \hat{W} &= \mathbb{E}[W \mid \Sigma = \hat{\Sigma}] \\ p(X \mid t) &= \frac{ p(W = \hat{W}, \Sigma = \hat{\Sigma}) p(X \mid t, W = \hat{W}, \Sigma = \hat{\Sigma})}{ p(W = \hat{W}, \Sigma = \hat{\Sigma} \mid x, t)}\end{split}\]
 forecast(time_stamps)
Return a posterior predictive interval for the time stamps given.
 Parameters
time_stamps – a list of time stamps
 Return type
Tuple
[TimeSeries
,TimeSeries
] Returns
(forecast, stderr)
, whereforecast
is the expected posterior value andstderr
is the standard error of that forecast.
merlion.utils.istat module
 class merlion.utils.istat.IStat(value=None, n=0)
Bases:
object
An abstract base class for computing various statistics incrementally, with emphasis on recencyweighted variants.
 Parameters
value (
Optional
[float
]) – Initial value of the statistic. Defaults to None.n (
int
) – Initial sample size. Defaults to 0.
 property n
 property value
 abstract add(x)
Add a new value to update the statistic. :param x: new value to add to the sample.
 abstract drop(x)
Drop a value to update the statistic. :param x: value to drop from the sample.
 add_batch(batch)
Add a batch of new values to update the statistic. :type batch:
List
[float
] :param batch: new values to add to the sample.
 drop_batch(batch)
Drop a batch of new values to update the statistic. :type batch:
List
[float
] :param batch: new values to add to the sample.
 class merlion.utils.istat.Mean(value=None, n=0)
Bases:
IStat
Class for incrementally computing the mean of a series of numbers.
 Parameters
value (
Optional
[float
]) – Initial value of the statistic. Defaults to None.n (
int
) – Initial sample size. Defaults to 0.
 property value
 add(x)
Add a new value to update the statistic. :param x: new value to add to the sample.
 drop(x)
Drop a value to update the statistic. :param x: value to drop from the sample.
 class merlion.utils.istat.Variance(ex_value=None, ex2_value=None, n=0, ddof=1)
Bases:
IStat
Class for incrementally computing the variance of a series of numbers.
 Parameters
ex_value (
Optional
[float
]) – Initial value of the first moment (mean).ex2_value (
Optional
[float
]) – Initial value of the second moment.n (
int
) – Initial sample size.ddof (
int
) – The delta degrees of freedom to use when correcting the estimate of the variance.
\[\text{Var}(x_i) = \text{E}(x_i^2)  \text{E}(x_i)^2\] add(x)
Add a new value to update the statistic. :param x: new value to add to the sample.
 drop(x)
Drop a value to update the statistic. :param x: value to drop from the sample.
 property true_value
 property corrected_value
 property value
 property sd
 property se
 class merlion.utils.istat.ExponentialMovingAverage(recency_weight=0.1, **kwargs)
Bases:
Mean
Class for incrementally computing the exponential moving average of a series of numbers.
 Parameters
recency_weight (
float
) – Recency weight to use when updating the exponential moving average.
Letting
w
be the recency weight,\[\begin{split}\begin{align*} \text{EMA}_w(x_0) & = x_0 \\ \text{EMA}_w(x_t) & = w \cdot x_t + (1w) \cdot \text{EMA}_w(x_{t1}) \end{align*}\end{split}\] property recency_weight
 property value
 drop(x)
Exponential Moving Average does not support dropping values
 class merlion.utils.istat.RecencyWeightedVariance(recency_weight, **kwargs)
Bases:
Variance
Class for incrementally computing the recencyweighted variance of a series of numbers.
 Parameters
recency_weight (
float
) – Recency weight to use when updating the recency weighted variance.
Letting
w
be the recency weight,\[\text{RWV}_w(x_t) = \text{EMA}_w({x^2_t})  \text{EMA}_w(x_t)^2\] mean_class
alias of
ExponentialMovingAverage
 property recency_weight
 drop(x)
Recency Weighted Variance does not support dropping values
merlion.utils.misc module
 class merlion.utils.misc.AutodocABCMeta(classname, bases, cls_dict)
Bases:
ABCMeta
Metaclass used to ensure that inherited members of an abstract base class also inherit docstrings for inherited methods.
 class merlion.utils.misc.ModelConfigMeta(classname, bases, cls_dict)
Bases:
type
Metaclass used to ensure that the function signatures for model
Config
initializers contain all relevant parameters, including those specified in the superclass. Also update docstrings accordingly.For example, the only parameter of the base class
Config
istransform
.ForecasterConfig
adds the parametersmax_forecast_steps
andtarget_seq_index
. BecauseConfig
inherits from this metaclass, we can declareclass ForecasterConfig(Config): def __init__(self, max_forecast_steps: int = None, target_seq_index: int = None, **kwargs): ...
and have the function signature for
ForecasterConfig
’s initializer include the parametertransform
, even though we never declared it explicitly. Additionally, the docstring fortransform
is inherited from the base class.
 merlion.utils.misc.combine_signatures(sig1, sig2)
Utility function which combines the signatures of two functions.
 merlion.utils.misc.parse_init_docstring(docstring)
 class merlion.utils.misc.ValIterOrderedDict
Bases:
OrderedDict
OrderedDict whose iterator goes over self.values() instead of self.keys().
 merlion.utils.misc.dynamic_import(import_path, alias=None)
Dynamically import a member from the specified module.
 Parameters
import_path (
str
) – syntax ‘module_name:member_name’, e.g. ‘merlion.transform.normalize:PowerTransform’alias (
Optional
[dict
]) – dict which maps shortcuts for the registered classes, to their full import paths.
 Returns
imported class
 merlion.utils.misc.initializer(func)
Decorator for the __init__ method. Automatically assigns the parameters.
 class merlion.utils.misc.ProgressBar(total, length=40, decimals=1, fill='█')
Bases:
object
 Parameters
total (
int
) – total iterationslength (
int
) – character length of bardecimals (
int
) – positive number of decimals in percent completefill (
str
) – bar fill character
 print(iteration, prefix, suffix, end='')
 Parameters
iteration – current iteration
prefix – prefix string
suffix – suffix string
end – end character (e.g.
"\r"
,"\r\n"
)
merlion.utils.resample module
 class merlion.utils.resample.AlignPolicy(value)
Bases:
Enum
Policies for aligning multiple univariate time series.
 OuterJoin = 0
 InnerJoin = 1
 FixedReference = 2
 FixedGranularity = 3
 class merlion.utils.resample.AggregationPolicy(value)
Bases:
Enum
Aggregation policies. Values are partial functions for pandas.core.resample.Resampler methods.
 Mean = functools.partial(<function AggregationPolicy.<lambda>>)
 Sum = functools.partial(<function AggregationPolicy.<lambda>>)
 Median = functools.partial(<function AggregationPolicy.<lambda>>)
 First = functools.partial(<function AggregationPolicy.<lambda>>)
 Last = functools.partial(<function AggregationPolicy.<lambda>>)
 Min = functools.partial(<function AggregationPolicy.<lambda>>)
 Max = functools.partial(<function AggregationPolicy.<lambda>>)
 class merlion.utils.resample.MissingValuePolicy(value)
Bases:
Enum
Missing value imputation policies. Values are partial functions for
pd.Series
methods. FFill = functools.partial(<function MissingValuePolicy.<lambda>>)
Fill gap with the first value before the gap.
 BFill = functools.partial(<function MissingValuePolicy.<lambda>>)
Fill gap with the first value after the gap.
 Nearest = functools.partial(<function MissingValuePolicy.<lambda>>, method='nearest')
Replace missing value with the value closest to it.
 Interpolate = functools.partial(<function MissingValuePolicy.<lambda>>, method='time')
Fill in missing values by linear interpolation.
 merlion.utils.resample.to_pd_datetime(timestamp)
Converts a timestamp (or list/iterable of timestamps) to pandas Datetime, truncated at the millisecond.
 merlion.utils.resample.to_timestamp(t)
Converts a datetime to a Unix timestamp.
 merlion.utils.resample.granularity_str_to_seconds(granularity)
Converts a string/float/int granularity (representing a timedelta) to the number of seconds it represents, truncated at the millisecond.
 Return type
Optional
[float
]
 merlion.utils.resample.get_gcd_timedelta(*time_stamp_lists)
Calculates all timedeltas present in any of the lists of time stamps given, and returns the GCD of all these timedeltas (up to units of milliseconds).
 merlion.utils.resample.infer_granularity(time_stamps)
Infers the granularity of a list of time stamps
 merlion.utils.resample.reindex_df(df, reference, missing_value_policy)
Reindexes a Datetimeindexed dataframe
df
to have the same time stamps as a reference sequence of timestamps. Imputes missing values with the givenMissingValuePolicy
.
merlion.utils.time_series module
 class merlion.utils.time_series.UnivariateTimeSeries(time_stamps, values, name=None, freq='1h')
Bases:
Series
Please read the tutorial before reading this API doc. This class is a timeindexed
pd.Series
which represents a univariate time series. For the most part, it supports all the same features aspd.Series
, with the following key differences to iteration and indexing:Iterating over a
UnivariateTimeSeries
is implemented asfor timestamp, value in univariate: # do stuff...
where
timestamp
is a Unix timestamp, andvalue
is the corresponding time series value.Integer index:
u[i]
yields the tuple(u.time_stamps[i], u.values[i])
Slice index:
u[i:j:k]
yields a newUnivariateTimeSeries(u.time_stamps[i:j:k], u.values[i:j:k])
The class also supports the following additional features:
univariate.time_stamps
returns the list of Unix timestamps, andunivariate.values
returns the list of the time series values. You may access thepd.DatetimeIndex
directly withunivariate.index
(or itsnp.ndarray
representation withunivariate.np_time_stamps
), and thenp.ndarray
of values withunivariate.np_values
.univariate.concat(other)
will concatenate the UnivariateTimeSeriesother
to the right end ofunivariate
.left, right = univariate.bisect(t)
will split the univariate at the given timestampt
.window = univariate.window(t0, tf)
will return the subset of the time series occurring between timestampst0
(inclusive) andtf
(noninclusive)series = univariate.to_pd()
will convert theUnivariateTimeSeries
into a regularpd.Series
(for compatibility).univariate = UnivariateTimeSeries.from_pd(series)
uses a timeindexedpd.Series
to create aUnivariateTimeSeries
object directly.
 __getitem__(i)
 Parameters
i (
Union
[int
,slice
]) – integer index or slice Return type
Union[Tuple[float, float], UnivariateTimeSeries]
 Returns
(self.time_stamps[i], self.values[i])
ifi
is an integer.UnivariateTimeSeries(self.time_series[i], self.values[i])
ifi
is a slice.
 __iter__()
The i’th item in the iterator is the tuple
(self.time_stamps[i], self.values[i])
.
 Parameters
time_stamps (
Optional
[Sequence
[Union
[int
,float
]]]) – a sequence of Unix timestamps. You may specifyNone
if you only havevalues
with no specific time stamps.values (
Sequence
[float
]) – a sequence of univariate values, wherevalues[i]
occurs at timetime_stamps[i]
name (
Optional
[str
]) – the name of the univariate time seriesfreq – if
time_stamps
is not provided, the univariate is assumed to be sampled at frequencyfreq
.freq
may be a string (e.g."1h"
), timedelta, orint
/float
(in units of seconds).
 property np_time_stamps
 Return type
np.ndarray
 Returns
the
numpy
representation of this time series’s Unix timestamps
 property np_values
 Return type
np.ndarray
 Returns
the
numpy
representation of this time series’s values
 property time_stamps
 Return type
List[float]
 Returns
the list of Unix timestamps for the time series
 property values
 Return type
List[float]
 Returns
the list of values for the time series.
 property t0
 Return type
float
 Returns
the first timestamp in the univariate time series.
 property tf
 Return type
float
 Returns
the final timestamp in the univariate time series.
 is_empty()
 Return type
bool
 Returns
True if the univariate is empty, False if not.
 copy(deep=True)
Copies the
UnivariateTimeSeries
. Simply a wrapper around thepd.Series.copy()
method.
 concat(other)
Concatenates the
UnivariateTimeSeries
other
to the right of this one. :param UnivariateTimeSeries other: anotherUnivariateTimeSeries
:rtype: UnivariateTimeSeries :return: concatenated univariate time series
 bisect(t, t_in_left=False)
Splits the time series at the point where the given timestamp occurs.
 Parameters
t (
float
) – a Unix timestamp or datetime object. Everything before timet
is in the left split, and everything after timet
is in the right split.t_in_left (
bool
) – ifTrue
,t
is in the left split. Otherwise,t
is in the right split.
 Return type
 Returns
the left and right splits of the time series.
 window(t0, tf, include_tf=False)
 Parameters
t0 (
float
) – The timestamp/datetime at the start of the window (inclusive)tf (
float
) – The timestamp/datetime at the end of the window (inclusive ifinclude_tf
isTrue
, noninclusive otherwise)include_tf (
bool
) – Whether to includetf
in the window.
 Return type
 Returns
The subset of the time series occurring between timestamps
t0
(inclusive) andtf
(included ifinclude_tf
isTrue
, excluded otherwise).
 to_dict()
 Return type
Dict
[float
,float
] Returns
A dictionary representing the data points in the time series.
 classmethod from_dict(obj, name=None)
 Parameters
obj (
Dict
[float
,float
]) – A dictionary of timestamp  value pairsname – the name to assign the output
 Return type
 Returns
the
UnivariateTimeSeries
represented by series.
 to_pd()
 Return type
Series
 Returns
A pandas Series representing the time series, indexed by time.
 classmethod from_pd(series, name=None, freq='1h')
 Parameters
series (
Series
) – apd.Series
. If it has a``pd.DatetimeIndex``, we will use that index for the timestamps. Otherwise, we will create one at the specified frequency.name – the name to assign the output
freq – if
series
is not indexed by time, this is the frequency at which we will assume it is sampled.
 Return type
 Returns
the
UnivariateTimeSeries
represented by series.
 to_ts()
 Return type
 Returns
A
TimeSeries
representing this univariate time series.
 classmethod empty(name=None)
 Return type
 Returns
A Merlion
UnivariateTimeSeries
that has empty timestamps and values.
 class merlion.utils.time_series.TimeSeries(univariates, *, check_aligned=True)
Bases:
object
Please read the tutorial before reading this API doc. This class represents a general multivariate time series as a wrapper around a number of (optionally named)
UnivariateTimeSeries
. ATimeSeries
object is initialized astime_series = TimeSeries(univariates)
, whereunivariates
is either a list ofUnivariateTimeSeries
, or a dictionary mapping string names to their correspondingUnivariateTimeSeries
objects.Because the individual
univariates
need not be sampled at the same times, an important concept forTimeSeries
is alignment. We say that aTimeSeries
is aligned if all of its univariates have observations sampled at the exact set set of times.One may access the
UnivariateTimeSeries
comprising thisTimeSeries
in four ways:Iterate over the individual univariates using
for var in time_series.univariates: # do stuff with each UnivariateTimeSeries var
Access an individual
UnivariateTimeSeries
by name astime_series.univariates[name]
. If you supplied unnamed univariates to the constructor (i.e. using a list), the name of a univariate will just be its index in that list.Get the list of each univariate’s name with
time_series.names
.Iterate over named univariates as
for name, var in time_series.items(): # do stuff
Note that this is equivalent to iterating over
zip(time_series.names, time_series.univariates)
.
This class supports the following additional features as well:
Interoperability with
pandas
df = time_series.to_pd()
yields a timeindexedpd.DataFrame
, where each column (with the appropriate name) corresponds to a variable. Missing values areNaN
.time_series = TimeSeries.from_pd(df)
takes a timeindexedpd.DataFrame
and returns a correspondingTimeSeries
object (missing values are handled appropriately). The order oftime_series.univariates
is the order ofdf.keys()
.
Automated alignment:
aligned = time_series.align()
resamples each oftime_series.univariates
so that they all have the same timestamps. By default, this is done by taking the union of all timestamps present in any individual univariate time series, and imputing missing values via interpolation. See the method documentation for details on how you may configure the alignment policy.Transparent indexing and iteration for
TimeSeries
which have all univariates aligned (i.e. they all have the same timestamps)Get the length and shape of the time series (equal to the number of observations in each individual univariate). Note that if the time series is not aligned, we will return the length/shape of an equivalent
pandas
dataframe and emit a warning.Index
time_series[i] = (times[i], (x1[i], ..., xn[i]))
(assumingtime_series
hasn
aligned univariates with timestampstimes
, andxk = time_series.univariates[k1].values
). Slice returns aTimeSeries
object and works as one would expect.Assuming
time_series
hasn
variables, you may iterate withfor t_i, (x1_i, ..., xn_i) in time_series: # do stuff
Notably, this lets you call
times, val_vectors = zip(*time_series)
Timebased queries for any time series
Get the two sub
TimeSeries
before and after a timestampt
vialeft, right = time_series.bisect(t)
Get the sub
TimeSeries
between timestampst0
(inclusive) andtf
(noninclusive) viawindow = time_series.window(t0, tf)
Concatenation: two
TimeSeries
may be concatenated (in time) astime_series = time_series_1 + time_series_2
.
 __getitem__(i)
Only supported if all individual variable time series are sampled at the same time stamps.
 Parameters
i (
Union
[int
,slice
]) – integer index or slice. Return type
Union[Tuple[float, Tuple[float]], TimeSeries]
 Returns
If
i
is an integer, returns the tuple(time_stamps[i], tuple(var.values[i] for var in self.univariates))
. Ifi
is a slice, returns the time seriesTimeSeries([var[i] for var in self.univariates])
 __iter__()
Only supported if all individual variable time series are sampled at the same time stamps. The i’th item of the iterator is the tuple
(time_stamps[i], tuple(var.values[i] for var in self.univariates))
.
 property names
 Returns
The list of the names of the univariates.
 items()
 Returns
Iterator over
(name, univariate)
tuples.
 property dim: int
 Return type
int
 Returns
The dimension of the time series (the number of variables).
 property is_aligned: bool
 Return type
bool
 Returns
Whether all individual variable time series are sampled at the same time stamps, i.e. they are aligned.
 property np_time_stamps
 Return type
np.ndarray
 Returns
the
numpy
representation of this time series’s Unix timestamps
 property time_stamps
 Return type
List[float]
 Returns
the list of Unix timestamps for the time series
 property t0: float
 Return type
float
 Returns
the first timestamp in the time series.
 property tf: float
 Return type
float
 Returns
the final timestamp in the time series.
 is_empty()
 Return type
bool
 Returns
whether the time series is empty
 squeeze()
 Return type
 Returns
a
UnivariateTimeSeries
if the time series only has one univariate, otherwise returns itself, aTimeSeries
 property shape: Tuple[int, int]
 Return type
Tuple
[int
,int
] Returns
the shape of this time series, i.e.
(self.dim, len(self))
 bisect(t, t_in_left=False)
Splits the time series at the point where the given timestap
t
occurs. Parameters
t (
float
) – a Unix timestamp or datetime object. Everything before timet
is in the left split, and everything after timet
is in the right split.t_in_left (
bool
) – ifTrue
,t
is in the left split. Otherwise,t
is in the right split.
 Return type
Tuple[TimeSeries, TimeSeries]
 Returns
the left and right splits of the time series.
 window(t0, tf, include_tf=False)
 Parameters
t0 (
float
) – The timestamp/datetime at the start of the window (inclusive)tf (
float
) – The timestamp/datetime at the end of the window (inclusive ifinclude_tf
isTrue
, noninclusive otherwise)include_tf (
bool
) – Whether to includetf
in the window.
 Returns
The subset of the time series occurring between timestamps
t0
(inclusive) andtf
(included ifinclude_tf
isTrue
, excluded otherwise). Return type
 to_pd()
 Return type
DataFrame
 Returns
A pandas DataFrame (indexed by time) which represents this time series. Each variable corresponds to a column of the DataFrame. Timestamps which are present for one variable but not another, are represented with NaN.
 classmethod from_pd(df, check_times=True, freq='1h')
 Parameters
df (
Union
[Series
,DataFrame
,ndarray
]) – A pandas DataFrame with a DatetimeIndex. Each column corresponds to a different variable of the time series, and the key of column (in sorted order) give the relative order of those variables (in the list self.univariates). Missing values should be represented withNaN
. May also be a pandas Series for univariate time series.check_times – whether to check that all times in the index are unique (up to the millisecond) and sorted.
freq – if
df
is not indexed by time, this is the frequency at which we will assume it is sampled.
 Return type
 Returns
the
TimeSeries
object corresponding todf
.
 classmethod from_ts_list(ts_list, *, check_aligned=True)
 Parameters
ts_list (Iterable[TimeSeries]) – iterable of time series we wish to form a multivariate time series with
check_aligned (bool) – whether to check if the output time series is aligned
 Return type
 Returns
A multivariate
TimeSeries
created from all the time series in the inputs.
 align(*, reference=None, granularity=None, origin=None, remove_non_overlapping=True, alignment_policy=None, aggregation_policy=AggregationPolicy.Mean, missing_value_policy=MissingValuePolicy.Interpolate)
Aligns all the univariate time series comprising this multivariate time series so that they all have the same time stamps.
 Parameters
reference (
Optional
[Sequence
[Union
[int
,float
]]]) – A specific set of timestamps we want the resampled time series to contain. Required ifalignment_policy
isAlignPolicy.FixedReference
. Overrides other alignment policies if specified.granularity (
Union
[str
,int
,float
,None
]) – The granularity (in seconds) of the resampled time time series. Defaults to the GCD time difference between adjacent elements ofreference
(when available) ortime_series
(otherwise). Ignored ifreference
is given oralignment_policy
isAlignPolicy.FixedReference
. Overrides other alignment policies if specified.origin (
Optional
[int
]) – The first timestamp of the resampled time series. Only used if the alignment policy is AlignPolicy.FixedGranularity.remove_non_overlapping – If
True
, we will only keep the portions of the univariates that overlap with each other. For example, if we have 3 univariates which span timestamps [0, 3600], [60, 3660], and [30, 3540], we will only keep timestamps in the range [60, 3540]. IfFalse
, we will keep all timestamps produced by the resampling.alignment_policy (
Optional
[AlignPolicy
]) –The policy we want to use to align the time time series.
AlignPolicy.FixedReference
aligns each singlevariable time series toreference
, a userspecified sequence of timestamps.AlignPolicy.FixedGranularity
resamples each singlevariable time series at the same granularity, aggregating windows and imputing missing values as desired.AlignPolicy.OuterJoin
returns a time series with the union of all timestamps present in any singlevariable time series.AlignPolicy.InnerJoin
returns a time series with the intersection of all timestamps present in all singlevariable time series.
aggregation_policy (
AggregationPolicy
) – The policy used to aggregate windows of adjacent observations when downsampling.missing_value_policy (
MissingValuePolicy
) – The policy used to impute missing values created when upsampling.
 Return type
 Returns
The resampled multivariate time series.
 merlion.utils.time_series.ts_csv_load(file_name, ms=True, n_vars=None)
 Parameters
file_name (
str
) – a csv file starting with the field timestamp followed by all the all variable names.ms – whether the timestamps are in milliseconds (rather than seconds)
 Return type
 Returns
A merlion
TimeSeries
object.
 merlion.utils.time_series.ts_to_csv(time_series, file_name)
 Parameters
time_series (
TimeSeries
) – theTimeSeries
object to write to a csv.file_name (
str
) – the name to assign the csv file.
 merlion.utils.time_series.assert_equal_timedeltas(time_series, timedelta=None)
Checks that all time deltas in the time series are equal, either to each other, or a prespecified timedelta (in seconds).