merlion.utils package
This package contains various utilities, including the TimeSeries class and
utilities for resampling time series.
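| Module | Description |
| --- | --- |
| merlion.utils.time_series | Implementation of TimeSeries class. |
| merlion.utils.resample | Code for resampling time series. |
| merlion.utils.data_io | Utils for data I/O. |
| merlion.utils.hts | Aggregation for hierarchical time series. |
| merlion.utils.ts_generator | Generators for synthetic time series. |
| merlion.utils.conj_priors | Implementations of Bayesian conjugate priors & their online update rules. |
| merlion.utils.istat | Incremental computation of time series statistics. |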
merlion.utils.time_series
Implementation of TimeSeries class.
- class merlion.utils.time_series.UnivariateTimeSeries(time_stamps, values, name=None, freq='1h')
- Bases: `pd.Series`
- Please read the tutorial before reading this API doc. This class is a time-indexed `pd.Series` which represents a univariate time series. For the most part, it supports all the same features as `pd.Series`, with the following key differences to iteration and indexing:
- Iterating over a `UnivariateTimeSeries` is implemented as `for timestamp, value in univariate: ...`, where `timestamp` is a Unix timestamp and `value` is the corresponding time series value.
- Integer index: `u[i]` yields the tuple `(u.time_stamps[i], u.values[i])`.
- Slice index: `u[i:j:k]` yields a new `UnivariateTimeSeries(u.time_stamps[i:j:k], u.values[i:j:k])`.
- The class also supports the following additional features:
- `univariate.time_stamps` returns the list of Unix timestamps, and `univariate.values` returns the list of the time series values. You may access the `pd.DatetimeIndex` directly with `univariate.index` (or its `np.ndarray` representation with `univariate.np_time_stamps`), and the `np.ndarray` of values with `univariate.np_values`.
- `univariate.concat(other)` will concatenate the `UnivariateTimeSeries` `other` to the right end of `univariate`.
- `left, right = univariate.bisect(t)` will split the univariate at the given timestamp `t`.
- `window = univariate.window(t0, tf)` will return the subset of the time series occurring between timestamps `t0` (inclusive) and `tf` (non-inclusive).
- `series = univariate.to_pd()` will convert the `UnivariateTimeSeries` into a regular `pd.Series` (for compatibility).
- `univariate = UnivariateTimeSeries.from_pd(series)` uses a time-indexed `pd.Series` to create a `UnivariateTimeSeries` object directly.
 - __getitem__(i)
- Parameters
- i (`Union[int, slice]`) – integer index or slice
- Return type
- Union[Tuple[float, float], UnivariateTimeSeries] 
- Returns
- `(self.time_stamps[i], self.values[i])` if `i` is an integer; `UnivariateTimeSeries(self.time_stamps[i], self.values[i])` if `i` is a slice.
 
 - __iter__()
- The i’th item in the iterator is the tuple `(self.time_stamps[i], self.values[i])`.
 - Parameters
- time_stamps (`Optional[Sequence[Union[int, float]]]`) – a sequence of Unix timestamps. You may specify `None` if you only have `values` with no specific timestamps.
- values (`Sequence[float]`) – a sequence of univariate values, where `values[i]` occurs at time `time_stamps[i]`.
- name (`Optional[str]`) – the name of the univariate time series.
- freq – if `time_stamps` is not provided, the univariate is assumed to be sampled at frequency `freq`. `freq` may be a string (e.g. `"1h"`), timedelta, or `int`/`float` (in units of seconds).
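The indexing and iteration contract described above can be illustrated without Merlion. The sketch below uses a hypothetical `MiniUnivariate` class written only for this example; it is not Merlion's implementation (which subclasses `pd.Series`), and its handling of `time_stamps=None` (starting at t = 0 and stepping by `freq` seconds) is an assumption.

```python
from typing import Iterator, List, Optional, Sequence, Tuple, Union


class MiniUnivariate:
    """Hypothetical stand-in that mimics UnivariateTimeSeries indexing and iteration."""

    def __init__(self, time_stamps: Optional[Sequence[float]], values: Sequence[float],
                 freq: float = 3600.0):
        values = list(values)
        if time_stamps is None:
            # Assumption: start at t = 0 and step by `freq` seconds ("1h" -> 3600 s).
            time_stamps = [i * freq for i in range(len(values))]
        self.time_stamps: List[float] = list(time_stamps)
        self.values: List[float] = values

    def __len__(self) -> int:
        return len(self.values)

    def __getitem__(self, i: Union[int, slice]) -> Union[Tuple[float, float], "MiniUnivariate"]:
        if isinstance(i, slice):
            # A slice yields a new series over the sliced timestamps and values.
            return MiniUnivariate(self.time_stamps[i], self.values[i])
        # An integer index yields a (timestamp, value) tuple.
        return self.time_stamps[i], self.values[i]

    def __iter__(self) -> Iterator[Tuple[float, float]]:
        # Iteration yields (timestamp, value) pairs, not bare values.
        return iter(zip(self.time_stamps, self.values))


u = MiniUnivariate([0, 60, 120, 180], [1.0, 2.0, 3.0, 4.0])
print(u[1])                # (60, 2.0)
print(u[::2].time_stamps)  # [0, 120]
for timestamp, value in u:
    pass  # do stuff with each (timestamp, value) pair
```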
 
 - property np_time_stamps
- Return type
- np.ndarray 
- Returns
- the `numpy` representation of this time series’s Unix timestamps
 
 - property np_values
- Return type
- np.ndarray 
- Returns
- the `numpy` representation of this time series’s values
 
 - property time_stamps
- Return type
- List[float] 
- Returns
- the list of Unix timestamps for the time series 
 
 - property values
- Return type
- List[float] 
- Returns
- the list of values for the time series. 
 
 - property t0
- Return type
- float 
- Returns
- the first timestamp in the univariate time series. 
 
 - property tf
- Return type
- float 
- Returns
- the final timestamp in the univariate time series. 
 
 - is_empty()
- Return type
- bool 
- Returns
- True if the univariate is empty, False if not. 
 
 - copy(deep=True)
- Copies the `UnivariateTimeSeries`. Simply a wrapper around the `pd.Series.copy()` method.
 - concat(other)
- Concatenates the `UnivariateTimeSeries` `other` to the right of this one.
- Parameters
- other (`UnivariateTimeSeries`) – another `UnivariateTimeSeries`
- Return type
- UnivariateTimeSeries
- Returns
- concatenated univariate time series
 - bisect(t, t_in_left=False)
- Splits the time series at the point where the given timestamp occurs.
- Parameters
- t (`float`) – a Unix timestamp or datetime object. Everything before time `t` is in the left split, and everything after time `t` is in the right split.
- t_in_left (`bool`) – if `True`, `t` is in the left split. Otherwise, `t` is in the right split.
 
- Return type
- Tuple[UnivariateTimeSeries, UnivariateTimeSeries]
- Returns
- the left and right splits of the time series. 
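A minimal sketch of the split rule, assuming sorted numeric Unix timestamps; `bisect_series` is a hypothetical helper built on Python's `bisect` module, not Merlion's code. The only difference between the two modes is which side an exact match of `t` lands on.

```python
from bisect import bisect_left, bisect_right
from typing import List, Tuple

Split = Tuple[List[float], List[float]]


def bisect_series(time_stamps: List[float], values: List[float], t: float,
                  t_in_left: bool = False) -> Tuple[Split, Split]:
    """Split sorted (time_stamps, values) at time t, mimicking the documented bisect()."""
    # bisect_right sends an exact match of t to the left; bisect_left sends it right.
    k = bisect_right(time_stamps, t) if t_in_left else bisect_left(time_stamps, t)
    left = (time_stamps[:k], values[:k])
    right = (time_stamps[k:], values[k:])
    return left, right


ts, vs = [0, 60, 120, 180], [1.0, 2.0, 3.0, 4.0]
left, right = bisect_series(ts, vs, 120)
print(left[0], right[0])  # [0, 60] [120, 180]
left, right = bisect_series(ts, vs, 120, t_in_left=True)
print(left[0], right[0])  # [0, 60, 120] [180]
```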
 
 - window(t0, tf, include_tf=False)
- Parameters
- t0 (`float`) – the timestamp/datetime at the start of the window (inclusive)
- tf (`float`) – the timestamp/datetime at the end of the window (inclusive if `include_tf` is `True`, non-inclusive otherwise)
- include_tf (`bool`) – whether to include `tf` in the window.
 
- Return type
- UnivariateTimeSeries
- Returns
- The subset of the time series occurring between timestamps `t0` (inclusive) and `tf` (included if `include_tf` is `True`, excluded otherwise).
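The half-open window can be sketched the same way. `window_series` is a hypothetical helper that assumes sorted numeric timestamps rather than datetimes.

```python
from bisect import bisect_left, bisect_right
from typing import List, Tuple


def window_series(time_stamps: List[float], values: List[float], t0: float, tf: float,
                  include_tf: bool = False) -> Tuple[List[float], List[float]]:
    """Keep points with t0 <= t < tf (or t0 <= t <= tf if include_tf), like window()."""
    start = bisect_left(time_stamps, t0)  # t0 is always inclusive
    end = bisect_right(time_stamps, tf) if include_tf else bisect_left(time_stamps, tf)
    return time_stamps[start:end], values[start:end]


ts, vs = [0, 60, 120, 180], [1.0, 2.0, 3.0, 4.0]
print(window_series(ts, vs, 60, 180))                   # ([60, 120], [2.0, 3.0])
print(window_series(ts, vs, 60, 180, include_tf=True))  # ([60, 120, 180], [2.0, 3.0, 4.0])
```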
 
 - to_dict()
- Return type
- Dict[float, float]
- Returns
- A dictionary representing the data points in the time series. 
 
 - classmethod from_dict(obj, name=None)
- Parameters
- obj (`Dict[float, float]`) – a dictionary of timestamp-value pairs
- name – the name to assign the output
 
- Return type
- UnivariateTimeSeries
- Returns
- the `UnivariateTimeSeries` represented by `obj`.
 
 - to_pd()
- Return type
- Series
- Returns
- A pandas Series representing the time series, indexed by time. 
 
 - classmethod from_pd(series, name=None, freq='1h')
- Parameters
- series (`Union[Series, DataFrame]`) – a `pd.Series`. If it has a `pd.DatetimeIndex`, we will use that index for the timestamps. Otherwise, we will create one at the specified frequency.
- name – the name to assign the output
- freq – if `series` is not indexed by time, this is the frequency at which we will assume it is sampled.
 
- Return type
- UnivariateTimeSeries
- Returns
- the `UnivariateTimeSeries` represented by `series`.
 
 - to_ts(name=None)
- Parameters
- name – a name to assign the univariate when converting it to a time series. Can override the existing name.
- Return type
- TimeSeries
- Returns
- A `TimeSeries` representing this univariate time series.
 
 - classmethod empty(name=None)
- Return type
- UnivariateTimeSeries
- Returns
- A Merlion `UnivariateTimeSeries` that has empty timestamps and values.
 
 
- class merlion.utils.time_series.TimeSeries(univariates, *, freq='1h', check_aligned=True)
- Bases: `object`
- Please read the tutorial before reading this API doc. This class represents a general multivariate time series as a wrapper around a number of (optionally named) `UnivariateTimeSeries`. A `TimeSeries` object is initialized as `time_series = TimeSeries(univariates)`, where `univariates` is either a list of `UnivariateTimeSeries`, or a dictionary mapping string names to their corresponding `UnivariateTimeSeries` objects.
- Because the individual `univariates` need not be sampled at the same times, an important concept for `TimeSeries` is alignment. We say that a `TimeSeries` is aligned if all of its univariates have observations sampled at exactly the same set of times.
- One may access the `UnivariateTimeSeries` comprising this `TimeSeries` in four ways:
- Iterate over the individual univariates using `for var in time_series.univariates: ...`, doing something with each `UnivariateTimeSeries` `var`.
- Access an individual `UnivariateTimeSeries` by name as `time_series.univariates[name]`. If you supplied unnamed univariates to the constructor (i.e. using a list), the name of a univariate will just be its index in that list.
- Get the list of each univariate’s name with `time_series.names`.
- Iterate over named univariates as `for name, var in time_series.items(): ...`. Note that this is equivalent to iterating over `zip(time_series.names, time_series.univariates)`.
- This class supports the following additional features as well:
- Interoperability with `pandas`:
- `df = time_series.to_pd()` yields a time-indexed `pd.DataFrame`, where each column (with the appropriate name) corresponds to a variable. Missing values are `NaN`.
- `time_series = TimeSeries.from_pd(df)` takes a time-indexed `pd.DataFrame` and returns a corresponding `TimeSeries` object (missing values are handled appropriately). The order of `time_series.univariates` is the order of `df.keys()`.
 
- Automated alignment: `aligned = time_series.align()` resamples each of `time_series.univariates` so that they all have the same timestamps. By default, this is done by taking the union of all timestamps present in any individual univariate time series, and imputing missing values via interpolation. See the method documentation for details on how you may configure the alignment policy.
- Transparent indexing and iteration for `TimeSeries` which have all univariates aligned (i.e. they all have the same timestamps):
- Get the length and shape of the time series (equal to the number of observations in each individual univariate). Note that if the time series is not aligned, we will return the length/shape of an equivalent `pandas` dataframe and emit a warning.
- Index `time_series[i] = (times[i], (x1[i], ..., xn[i]))` (assuming `time_series` has `n` aligned univariates with timestamps `times`, and `xk = time_series.univariates[k-1].values`). Slicing returns a `TimeSeries` object and works as one would expect.
- Assuming `time_series` has `n` variables, you may iterate with `for t_i, (x1_i, ..., xn_i) in time_series: ...`. Notably, this lets you call `times, val_vectors = zip(*time_series)`.
 
- Time-based queries for any time series:
- Get the two sub-`TimeSeries` before and after a timestamp `t` via `left, right = time_series.bisect(t)`.
- Get the sub-`TimeSeries` between timestamps `t0` (inclusive) and `tf` (non-inclusive) via `window = time_series.window(t0, tf)`.
 
- Concatenation: two `TimeSeries` may be concatenated (in time) as `time_series = time_series_1 + time_series_2`.
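For aligned data, the iteration protocol and the `zip(*time_series)` idiom behave like the pure-Python sketch below. `iterate_aligned` is a hypothetical helper that takes one shared list of timestamps plus a dict of equally long value lists; it only shows the shape of the yielded tuples and is not Merlion's implementation.

```python
from typing import Dict, Iterator, List, Tuple


def iterate_aligned(time_stamps: List[float],
                    univariates: Dict[str, List[float]]) -> Iterator[Tuple[float, Tuple[float, ...]]]:
    """Yield (t_i, (x1_i, ..., xn_i)) for aligned univariates sharing `time_stamps`."""
    columns = list(univariates.values())
    for i, t in enumerate(time_stamps):
        yield t, tuple(col[i] for col in columns)


times = [0, 60, 120]
univariates = {"a": [1.0, 2.0, 3.0], "b": [10.0, 20.0, 30.0]}
for t_i, (a_i, b_i) in iterate_aligned(times, univariates):
    pass  # do stuff with each timestamp and value vector

# The unzip idiom: separate the timestamps from the per-time value vectors.
times_out, val_vectors = zip(*iterate_aligned(times, univariates))
print(times_out)    # (0, 60, 120)
print(val_vectors)  # ((1.0, 10.0), (2.0, 20.0), (3.0, 30.0))
```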
 - __getitem__(i)
- Only supported if all individual variable time series are sampled at the same timestamps.
- Parameters
- i (`Union[int, slice]`) – integer index or slice.
- Return type
- Union[Tuple[float, Tuple[float]], TimeSeries] 
- Returns
- If `i` is an integer, returns the tuple `(time_stamps[i], tuple(var.values[i] for var in self.univariates))`. If `i` is a slice, returns the time series `TimeSeries([var[i] for var in self.univariates])`.
 
 - __iter__()
- Only supported if all individual variable time series are sampled at the same timestamps. The i’th item of the iterator is the tuple `(time_stamps[i], tuple(var.values[i] for var in self.univariates))`.
 - property names
- Returns
- The list of the names of the univariates. 
 
 - items()
- Returns
- Iterator over `(name, univariate)` tuples.
 
 - property dim: int
- Returns
- The dimension of the time series (the number of variables). 
 
 - rename(mapper)
- Parameters
- mapper (`Union[Iterable[str], Mapping[str, str], Callable[[str], str]]`) – dict-like or function transformations to apply to the univariate names. Can also be an iterable of new univariate names.
- Returns
- the time series with renamed univariates. 
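The three accepted forms of `mapper` can be sketched as follows. `rename_names` is a hypothetical helper operating on a plain list of names; leaving names that are missing from a dict-like mapper unchanged is an assumption borrowed from `pandas` conventions, not documented Merlion behavior.

```python
from collections import abc
from typing import Callable, Iterable, List, Mapping, Union

Mapper = Union[Iterable[str], Mapping[str, str], Callable[[str], str]]


def rename_names(names: List[str], mapper: Mapper) -> List[str]:
    """Apply a rename()-style mapper to a list of univariate names (illustrative only)."""
    if callable(mapper):
        # Function form: transform every name.
        return [mapper(name) for name in names]
    if isinstance(mapper, abc.Mapping):
        # Dict-like form: names absent from the mapping are kept (assumption).
        return [mapper.get(name, name) for name in names]
    # Iterable form: a full list of replacement names, one per univariate.
    new_names = list(mapper)
    if len(new_names) != len(names):
        raise ValueError("expected one new name per univariate")
    return new_names


names = ["a", "b"]
print(rename_names(names, ["temp", "humidity"]))  # ['temp', 'humidity']
print(rename_names(names, {"a": "temp"}))         # ['temp', 'b']
print(rename_names(names, lambda n: f"var_{n}"))  # ['var_a', 'var_b']
```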
 
 - property is_aligned: bool
- Returns
- Whether all individual variable time series are sampled at the same time stamps, i.e. they are aligned. 
 
 - property index
 - property np_time_stamps
- Return type
- np.ndarray 
- Returns
- the `numpy` representation of this time series’s Unix timestamps
 
 - property time_stamps
- Return type
- List[float] 
- Returns
- the list of Unix timestamps for the time series 
 
 - property t0: float
- Return type
- float 
- Returns
- the first timestamp in the time series. 
 
 - property tf: float
- Return type
- float 
- Returns
- the final timestamp in the time series. 
 
 - is_empty()
- Return type
- bool
- Returns
- whether the time series is empty 
 
 - squeeze()
- Return type
- Union[UnivariateTimeSeries, TimeSeries]
- Returns
- a `UnivariateTimeSeries` if the time series is univariate; otherwise returns itself, a `TimeSeries`.
 
 - property shape: Tuple[int, int]
- Returns
- the shape of this time series, i.e. `(self.dim, len(self))`
 
 - concat(other, axis=0)
- Concatenates the `TimeSeries` `other` on the time axis if `axis = 0` or the variable axis if `axis = 1`.
- Return type
- TimeSeries
- Returns
- concatenated time series
 - bisect(t, t_in_left=False)
- Splits the time series at the point where the given timestamp `t` occurs.
- Parameters
- t (`float`) – a Unix timestamp or datetime object. Everything before time `t` is in the left split, and everything after time `t` is in the right split.
- t_in_left (`bool`) – if `True`, `t` is in the left split. Otherwise, `t` is in the right split.
 
- Return type
- Tuple[TimeSeries, TimeSeries] 
- Returns
- the left and right splits of the time series. 
 
 - window(t0, tf, include_tf=False)
- Parameters
- t0 (`float`) – the timestamp/datetime at the start of the window (inclusive)
- tf (`float`) – the timestamp/datetime at the end of the window (inclusive if `include_tf` is `True`, non-inclusive otherwise)
- include_tf (`bool`) – whether to include `tf` in the window.
 
- Return type
- TimeSeries
- Returns
- The subset of the time series occurring between timestamps `t0` (inclusive) and `tf` (included if `include_tf` is `True`, excluded otherwise).
 
 - to_pd()
- Return type
- DataFrame
- Returns
- A pandas DataFrame (indexed by time) which represents this time series. Each variable corresponds to a column of the DataFrame. Timestamps which are present for one variable but not another are represented with NaN.
 
 - to_csv(file_name, **kwargs)
 - classmethod from_pd(df, check_times=True, drop_nan=True, freq='1h')
- Parameters
- df (`Union[Series, DataFrame, ndarray]`) – a `pandas.DataFrame` with a `DatetimeIndex`. Each column corresponds to a different variable of the time series, and the column keys (in sorted order) give the relative order of those variables in `self.univariates`. Missing values should be represented with `NaN`. May also be a `pandas.Series` for single-variable time series.
- check_times – whether to check that all times in the index are unique (up to the millisecond) and sorted.
- drop_nan – whether to drop all `NaN` entries before creating the time series. Specifying `False` is useful if you wish to impute the values on your own.
- freq – if `df` is not indexed by time, this is the frequency at which we will assume it is sampled.
 
- Return type
- TimeSeries
- Returns
- the `TimeSeries` object corresponding to `df`.
 
 - classmethod from_ts_list(ts_list, *, check_aligned=True)
- Parameters
- ts_list (Iterable[TimeSeries]) – iterable of time series we wish to form a multivariate time series with 
- check_aligned (bool) – whether to check if the output time series is aligned 
 
- Return type
- TimeSeries
- Returns
- A multivariate `TimeSeries` created from all the time series in the inputs.
 
 - align(*, reference=None, granularity=None, origin=None, remove_non_overlapping=True, alignment_policy=None, aggregation_policy=AggregationPolicy.Mean, missing_value_policy=MissingValuePolicy.Interpolate)
- Aligns all the univariates comprising this multivariate time series so that they all have the same timestamps.
- Parameters
- reference (`Optional[Sequence[Union[int, float]]]`) – a specific set of timestamps we want the resampled time series to contain. Required if `alignment_policy` is `AlignPolicy.FixedReference`. Overrides other alignment policies if specified.
- granularity (`Union[str, int, float, None]`) – the granularity (in seconds) of the resampled time series. Defaults to the GCD of the time differences between adjacent timestamps of the time series. Ignored if `reference` is given or `alignment_policy` is `AlignPolicy.FixedReference`. Overrides other alignment policies if specified.
- origin (`Optional[int]`) – the first timestamp of the resampled time series. Only used if the alignment policy is `AlignPolicy.FixedGranularity`.
- remove_non_overlapping – if `True`, we will only keep the portions of the univariates that overlap with each other. For example, if we have 3 univariates which span timestamps [0, 3600], [60, 3660], and [30, 3540], we will only keep timestamps in the range [60, 3540]. If `False`, we will keep all timestamps produced by the resampling.
- alignment_policy (`Optional[AlignPolicy]`) – the policy we want to use to align the time series:
- `AlignPolicy.FixedReference` aligns each single-variable time series to `reference`, a user-specified sequence of timestamps.
- `AlignPolicy.FixedGranularity` resamples each single-variable time series at the same granularity, aggregating windows and imputing missing values as desired.
- `AlignPolicy.OuterJoin` returns a time series with the union of all timestamps present in any single-variable time series.
- `AlignPolicy.InnerJoin` returns a time series with the intersection of all timestamps present in all single-variable time series.
 
- aggregation_policy (`AggregationPolicy`) – the policy used to aggregate windows of adjacent observations when downsampling.
- missing_value_policy (`MissingValuePolicy`) – the policy used to impute missing values created when upsampling.
 
- Return type
- TimeSeries
- Returns
- The resampled multivariate time series. 
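The timestamp sets chosen by the different alignment policies, and by `remove_non_overlapping`, can be sketched with plain sets. All helper names below are hypothetical, integer Unix timestamps are assumed (so `math.gcd` applies), and the sketch only picks timestamps; it does not aggregate or impute values the way `align()` does.

```python
from functools import reduce
from math import gcd
from typing import List, Sequence, Tuple

Stamps = Sequence[Sequence[int]]  # one sorted list of integer Unix timestamps per univariate


def outer_join(stamps: Stamps) -> List[int]:
    """AlignPolicy.OuterJoin: union of the timestamps of every univariate."""
    return sorted(set().union(*stamps))


def inner_join(stamps: Stamps) -> List[int]:
    """AlignPolicy.InnerJoin: timestamps present in every univariate."""
    return sorted(set(stamps[0]).intersection(*stamps[1:]))


def overlap_range(stamps: Stamps) -> Tuple[int, int]:
    """Span kept when remove_non_overlapping=True: latest start to earliest end."""
    return max(s[0] for s in stamps), min(s[-1] for s in stamps)


def gcd_granularity(stamps: Stamps) -> int:
    """Default granularity: GCD of the gaps between adjacent timestamps."""
    gaps = [b - a for s in stamps for a, b in zip(s, s[1:])]
    return reduce(gcd, gaps)


def fixed_granularity_grid(origin: int, end: int, granularity: int) -> List[int]:
    """AlignPolicy.FixedGranularity: a regular grid of timestamps starting at `origin`."""
    return list(range(origin, end + 1, granularity))


x = [0, 60, 120, 180]
y = [60, 120, 240]
print(outer_join([x, y]))     # [0, 60, 120, 180, 240]
print(inner_join([x, y]))     # [60, 120]
print(overlap_range([x, y]))  # (60, 180)
print(fixed_granularity_grid(*overlap_range([x, y]), gcd_granularity([x, y])))  # [60, 120, 180]
```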
 
 
- merlion.utils.time_series.assert_equal_timedeltas(time_series, granularity, offset=None)
- Checks that all time deltas in the time series are equal, either to each other, or a pre-specified timedelta (in seconds). 
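A minimal sketch of the check, assuming numeric Unix timestamps and exact equality; `check_equal_timedeltas` is a hypothetical name, and it ignores the `offset` argument of the real function, whose semantics are not described here.

```python
from typing import Optional, Sequence


def check_equal_timedeltas(time_stamps: Sequence[float],
                           granularity: Optional[float] = None) -> None:
    """Raise AssertionError unless all adjacent gaps are equal (to `granularity`, if given)."""
    gaps = [b - a for a, b in zip(time_stamps, time_stamps[1:])]
    if not gaps:
        return  # zero or one timestamp: nothing to compare
    expected = gaps[0] if granularity is None else granularity
    bad = sorted({g for g in gaps if g != expected})
    if bad:
        raise AssertionError(f"expected every gap to be {expected} s, found {bad}")


check_equal_timedeltas([0, 60, 120, 180])             # passes: every gap is 60 s
check_equal_timedeltas([0, 60, 120], granularity=60)  # passes
```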
merlion.utils.resample
Code for resampling time series.
- class merlion.utils.resample.AlignPolicy(value)
- Bases: `Enum`
- Policies for aligning multiple univariate time series.
- OuterJoin = 0
 - InnerJoin = 1
 - FixedReference = 2
 - FixedGranularity = 3
 
- class merlion.utils.resample.AggregationPolicy(value)
- Bases: `Enum`
- Aggregation policies. Values are partial functions for `pandas.core.resample.Resampler` methods.
- Mean = functools.partial(<function AggregationPolicy.<lambda>>)
 - Sum = functools.partial(<function AggregationPolicy.<lambda>>)
 - Median = functools.partial(<function AggregationPolicy.<lambda>>)
 - First = functools.partial(<function AggregationPolicy.<lambda>>)
 - Last = functools.partial(<function AggregationPolicy.<lambda>>)
 - Min = functools.partial(<function AggregationPolicy.<lambda>>)
 - Max = functools.partial(<function AggregationPolicy.<lambda>>)
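Downsampling with an aggregation policy amounts to bucketing observations into windows and reducing each bucket. The sketch below is hypothetical pure Python, not the `pandas` resampler that `AggregationPolicy` wraps; labeling each window by its start time and anchoring windows at t = 0 are assumptions, and `pandas` supports other labeling conventions.

```python
from statistics import mean, median
from typing import Callable, Dict, List, Sequence

# Hypothetical reducers named after the AggregationPolicy members.
AGGREGATIONS: Dict[str, Callable[[Sequence[float]], float]] = {
    "Mean": mean, "Sum": sum, "Median": median,
    "First": lambda xs: xs[0], "Last": lambda xs: xs[-1],
    "Min": min, "Max": max,
}


def downsample(time_stamps: Sequence[int], values: Sequence[float], granularity: int,
               policy: str = "Mean") -> Dict[int, float]:
    """Bucket points into windows of `granularity` seconds and reduce each window."""
    buckets: Dict[int, List[float]] = {}
    for t, v in zip(time_stamps, values):
        # Each window is labeled by its start time (assumption of this sketch).
        buckets.setdefault(t - t % granularity, []).append(v)
    agg = AGGREGATIONS[policy]
    return {start: agg(vs) for start, vs in sorted(buckets.items())}


ts = [0, 30, 60, 90, 120]
vs = [1.0, 3.0, 5.0, 7.0, 9.0]
print(downsample(ts, vs, 60))         # {0: 2.0, 60: 6.0, 120: 9.0}
print(downsample(ts, vs, 60, "Max"))  # {0: 3.0, 60: 7.0, 120: 9.0}
```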
 
- class merlion.utils.resample.MissingValuePolicy(value)
- Bases: `Enum`
- Missing value imputation policies. Values are partial functions for `pd.Series` methods.
- FFill = functools.partial(<function MissingValuePolicy.<lambda>>)
- Fill gap with the first value before the gap. 
 - BFill = functools.partial(<function MissingValuePolicy.<lambda>>)
- Fill gap with the first value after the gap. 
 - Nearest = functools.partial(<function MissingValuePolicy.<lambda>>, method='nearest')
- Replace missing value with the value closest to it. 
 - Interpolate = functools.partial(<function MissingValuePolicy.<lambda>>, method='time')
- Fill in missing values by linear interpolation, weighted by the time between observations.
 - ZFill = functools.partial(<function MissingValuePolicy.<lambda>>, to_replace=nan, value=0)
- Replace missing values with zeros. 
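Pure-Python sketches of four of these policies follow, using `None` for missing values. They are illustrative only: the real policies call `pd.Series` methods, and leaving leading or trailing gaps unfilled in `interpolate` is a simplification of this sketch that may not match `pandas` edge handling. `Nearest` is omitted.

```python
from typing import List, Optional, Sequence

Values = List[Optional[float]]


def ffill(xs: Sequence[Optional[float]]) -> Values:
    """FFill: fill a gap with the last value before it."""
    out: Values = []
    last: Optional[float] = None
    for x in xs:
        last = x if x is not None else last
        out.append(last)
    return out


def bfill(xs: Sequence[Optional[float]]) -> Values:
    """BFill: fill a gap with the first value after it."""
    return ffill(list(xs)[::-1])[::-1]


def zfill(xs: Sequence[Optional[float]]) -> Values:
    """ZFill: replace missing values with zero."""
    return [0.0 if x is None else x for x in xs]


def interpolate(ts: Sequence[float], xs: Sequence[Optional[float]]) -> Values:
    """Interpolate: time-weighted linear interpolation between known neighbours."""
    known = [(t, x) for t, x in zip(ts, xs) if x is not None]
    out: Values = []
    for t, x in zip(ts, xs):
        if x is not None:
            out.append(x)
            continue
        before = [(tk, xk) for tk, xk in known if tk < t]
        after = [(tk, xk) for tk, xk in known if tk > t]
        if not before or not after:
            out.append(None)  # edge gap: left unfilled in this sketch
            continue
        (t0, x0), (t1, x1) = before[-1], after[0]
        out.append(x0 + (x1 - x0) * (t - t0) / (t1 - t0))
    return out


ts = [0, 10, 20, 30]
xs = [1.0, None, None, 4.0]
print(ffill(xs))            # [1.0, 1.0, 1.0, 4.0]
print(bfill(xs))            # [1.0, 4.0, 4.0, 4.0]
print(zfill(xs))            # [1.0, 0.0, 0.0, 4.0]
print(interpolate(ts, xs))  # [1.0, 2.0, 3.0, 4.0]
```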
 
- merlion.utils.resample.to_pd_datetime(timestamp)
- Converts a timestamp (or list/iterable of timestamps) to pandas Datetime, truncated at the millisecond. 
- merlion.utils.resample.to_offset(dt)
- Converts a time gap to a `pd.Timedelta` if possible, otherwise a `pd.DateOffset`.
- merlion.utils.resample.to_timestamp(t)
- Converts a datetime to a Unix timestamp. 
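A standard-library sketch of the two conversions, assuming naive datetimes are UTC; `to_unix` and `from_unix_ms` are hypothetical names, and the real functions work with `pandas` objects rather than `datetime.datetime`. Building the result from integer milliseconds avoids float rounding when truncating.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_unix(dt: datetime) -> float:
    """Datetime -> Unix timestamp (naive datetimes are assumed to be UTC here)."""
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return (dt - EPOCH).total_seconds()


def from_unix_ms(ts: float) -> datetime:
    """Unix timestamp -> UTC datetime, truncated (not rounded) to the millisecond."""
    ms = int(ts * 1000)  # int() truncates toward zero; fine for times after 1970
    return EPOCH + timedelta(milliseconds=ms)


print(to_unix(datetime(2021, 1, 1)))  # 1609459200.0
print(from_unix_ms(1609459200.1239))  # 2021-01-01 00:00:00.123000+00:00
```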
- merlion.utils.resample.granularity_str_to_seconds(granularity)
- Converts a string/float/int granularity (representing a timedelta) to the number of seconds it represents, truncated at the millisecond.
- Return type
- Optional[float]
 
- merlion.utils.resample.get_date_offset(time_stamps, reference)
- Returns the date offset one must add to `time_stamps` so its last timestamp aligns with that of `reference`.
- Return type
- DateOffset
 
- merlion.utils.resample.infer_granularity(time_stamps, return_offset=False)
- Infers the granularity of a list of time stamps. 
- merlion.utils.resample.reindex_df(df, reference, missing_value_policy)
- Reindexes a Datetime-indexed dataframe `df` to have the same timestamps as a reference sequence of timestamps. Imputes missing values with the given `MissingValuePolicy`.
merlion.utils.data_io
Utils for data I/O.
- merlion.utils.data_io.df_to_time_series(df, time_col=None, timestamp_unit='s', data_cols=None)
- Converts a general `pandas.DataFrame` to a `TimeSeries` object.
- Parameters
- df (`DataFrame`) – the dataframe to process
- time_col (`Optional[str]`) – the name of the column specifying time. If `None` is specified, the existing index is used if it is a `DatetimeIndex`. Otherwise, the first column is used.
- timestamp_unit – if the time column is in Unix timestamps, this is the unit of the timestamp.
- data_cols (`Union[str, List[str], None]`) – the columns representing the actual data values of interest.
 
- Return type
- TimeSeries
 
- merlion.utils.data_io.data_io_decorator(func)
- Decorator to standardize docstrings for data I/O functions. 
- merlion.utils.data_io.csv_to_time_series(file_name: str, time_col: str = None, timestamp_unit='s', data_cols: Union[str, List[str]] = None) → TimeSeries
- Reads a CSV file and converts it to a `TimeSeries` object.
- Parameters
- time_col – the name of the column specifying time. If `None` is specified, the existing index is used if it is a `DatetimeIndex`. Otherwise, the first column is used.
- timestamp_unit – if the time column is in Unix timestamps, this is the unit of the timestamp. 
- data_cols – the columns representing the actual data values of interest. 
 
 
merlion.utils.hts
Aggregation for hierarchical time series.
- merlion.utils.hts.minT_reconciliation(forecasts, errs, sum_matrix, n_leaves)
- Computes the minimum trace reconciliation for hierarchical time series, as described by Wickramasuriya et al. 2018. This algorithm assumes that we have a number of time series aggregated at various levels (the aggregation tree is described by `sum_matrix`), and we obtain independent forecasts at each level of the hierarchy. Minimum trace reconciliation finds the optimal way to adjust (reconcile) the forecasts to reduce the variance of the estimation.
- Parameters
- forecasts (`List[TimeSeries]`) – forecast for each aggregation level of the hierarchy
- errs (`List[TimeSeries]`) – standard errors of forecasts for each level of the hierarchy. While not strictly necessary, reconciliation performs better if all forecasts are accompanied by uncertainty estimates.
- sum_matrix (`ndarray`) – matrix describing how the hierarchy is aggregated
- n_leaves (`int`) – the number of leaf forecasts (i.e. the number of forecasts at the most disaggregated level of the hierarchy). We assume that the leaf forecasts are last in the lists `forecasts` & `errs`, and that `sum_matrix` reflects this fact.
 
- Return type
- List[TimeSeries]
- Returns
- reconciled forecasts for each aggregation level of the hierarchy 
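The reconciliation step can be sketched with NumPy for a single time step. Assuming a diagonal error covariance \(W = \mathrm{diag}(\text{errs}^2)\) (the weighted-least-squares special case of minimum trace reconciliation; Merlion's own covariance handling may differ), the reconciled forecasts are \(S (S^T W^{-1} S)^{-1} S^T W^{-1} \hat{y}\). `mint_diagonal` is a hypothetical helper, and rows are ordered with the leaves last, as the parameter description requires.

```python
import numpy as np


def mint_diagonal(forecasts: np.ndarray, errs: np.ndarray, sum_matrix: np.ndarray) -> np.ndarray:
    """Reconcile one time step of hierarchical forecasts with a diagonal error covariance.

    forecasts, errs: shape (m,), ordered like the rows of sum_matrix (leaves last).
    sum_matrix: shape (m, n_leaves), mapping leaf values to every node of the hierarchy.
    """
    S = sum_matrix
    W_inv = np.diag(1.0 / np.square(errs))  # inverse of diag(errs^2)
    # Generalized least squares projection: P = (S^T W^-1 S)^-1 S^T W^-1
    P = np.linalg.solve(S.T @ W_inv @ S, S.T @ W_inv)
    return S @ (P @ forecasts)  # coherent: aggregates equal sums of leaves


S = np.array([[1, 1], [1, 0], [0, 1]], dtype=float)  # total, leaf A, leaf B (leaves last)
y_hat = np.array([10.0, 4.0, 5.0])                    # incoherent: 4 + 5 != 10
y_tilde = mint_diagonal(y_hat, np.ones(3), S)
print(y_tilde)  # approximately [29/3, 13/3, 16/3]
```

With equal errors this reduces to an ordinary least-squares projection onto the coherent subspace, so the 1-unit discrepancy is spread evenly across the three nodes.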
 
merlion.utils.ts_generator
Generators for synthetic time series.
- class merlion.utils.ts_generator.TimeSeriesGenerator(f, n, x0=0.0, step=1.0, scale=1.0, noise=<built-in method normal of numpy.random.mtrand.RandomState object>, distort=<built-in function add>, name=None, t0='1970 00:00:00', tdelta='5min')
- Bases: `object`
- An abstract base class for generating synthetic time series data. Generates a 1-dimensional grid x(0), x(1), …, x(n-1), where x(i) = x0 + i * step. Then generates a time series y(0), y(1), …, y(n-1), where y(i) = f(x(i)) + noise.
- Parameters
- n (`int`) – The number of points to be generated.
- x0 (`float`) – The initial value to use to form the 1-dimensional grid that will be used to compute the synthetic values.
- step (`float`) – The step size to use when forming the 1-dimensional grid.
- scale (`float`) – A scalar to use to either inflate or deflate the synthetic data.
- noise (`Callable[[], float]`) – A function that generates a random value when called.
- distort (`Callable[[float, float], float]`) – A function mapping two real numbers to one real number which will be used to inject noise into the time series.
- name (`Optional[str]`) – The name to assign the univariate that will be generated.
- t0 (`str`) – Initial timestamp to use when wrapping the generated values into a `TimeSeries` object.
- tdelta (`str`) – the time delta to use when wrapping the generated values into a `TimeSeries` object.
 
 - property n
 - property x0
 - property step
 - y(x)
 - generate(return_ts=True)
- Generates synthetic time series data and returns it as a list or as a `TimeSeries` object.
- Return type
- Union[List[float], TimeSeries]
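The grid-then-evaluate scheme described for `TimeSeriesGenerator` can be sketched in plain Python. `generate` here is a hypothetical function, not the class method; exactly how `scale` combines with `f` and the noise term is an assumption (it multiplies `f(x)` before `distort` injects the noise), and `t0`/`tdelta` are `datetime` objects instead of strings.

```python
import operator
import random
from datetime import datetime, timedelta
from typing import Callable, List, Tuple


def generate(f: Callable[[float], float], n: int, x0: float = 0.0, step: float = 1.0,
             scale: float = 1.0,
             noise: Callable[[], float] = lambda: random.gauss(0.0, 1.0),
             distort: Callable[[float, float], float] = operator.add,
             t0: datetime = datetime(1970, 1, 1),
             tdelta: timedelta = timedelta(minutes=5)) -> Tuple[List[datetime], List[float]]:
    """Sample y(i) = distort(scale * f(x(i)), noise()) on the grid x(i) = x0 + i * step."""
    xs = [x0 + i * step for i in range(n)]
    # Assumption: `scale` multiplies f(x) before `distort` injects the noise term.
    ys = [distort(scale * f(x), noise()) for x in xs]
    times = [t0 + i * tdelta for i in range(n)]
    return times, ys


times, ys = generate(lambda x: 2 * x, n=3, noise=lambda: 0.0)  # noise-free for clarity
print(ys)        # [0.0, 2.0, 4.0]
print(times[1])  # 1970-01-01 00:05:00
```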
 
 
- class merlion.utils.ts_generator.GeneratorComposer(generators, per_generator_noise=False, **kwargs)
- Bases: `TimeSeriesGenerator`
- A class for generating synthetic time series by composing other `TimeSeriesGenerator` objects.
- Parameters
- n – The number of points to be generated.
- x0 – The initial value to use to form the 1-dimensional grid that will be used to compute the synthetic values.
- step – The step size to use when forming the 1-dimensional grid. 
- scale – A scalar to use to either inflate or deflate the synthetic data. 
- noise – A function that generates a random value when called. 
- distort – A function mapping two real numbers to one real number which will be used to inject noise into the time series. 
- name – The name to assign the univariate that will be generated. 
- t0 – Initial timestamp to use when wrapping the generated values into a TimeSeries object. 
- tdelta – the time delta to use when wrapping the generated values into a TimeSeries object. 
 
 - property generators
 
- class merlion.utils.ts_generator.GeneratorConcatenator(string_outputs=True, **kwargs)
- Bases: `GeneratorComposer`
- A class for generating synthetic time series data that undergoes fundamental changes to its behavior at certain points in time. For example, with this class one could generate a time series that begins as linear and then becomes stationary.
- For example, let f = 0 for the 3 steps 0, 1, 2 and g = 2 * x for the next three steps 3, 4, 5. generate() returns:
- [0, 0, 0, 6, 8, 10] if string_outputs is False
- [0, 0, 0, 2, 4, 6] if string_outputs is True.
- Parameters
- string_outputs – if True, ensure that the end and beginning of each pair of consecutive time series are connected. For example, let there be two generating functions f and g belonging to consecutive generators. If True, adjust g by a constant c such that f(x) = g(x) at the last point x that f uses to generate its series.
 - property generators
 - y(x)
- A Generator Sequence has no method y. 
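The `string_outputs` rule can be checked against the worked example above with a noise-free sketch. `concatenate` is a hypothetical helper taking `(function, number_of_steps)` pairs; it is not Merlion's implementation. The shift is computed against the already-shifted previous segment so that chains of more than two segments stay connected.

```python
from typing import Callable, List, Optional, Sequence, Tuple

Segment = Tuple[Callable[[float], float], int]  # (generating function, number of steps)


def concatenate(segments: Sequence[Segment], x0: float = 0.0, step: float = 1.0,
                string_outputs: bool = True) -> List[float]:
    """Chain noise-free segments over one shared x grid that keeps counting upward."""
    out: List[float] = []
    x = x0
    prev_g: Optional[Callable[[float], float]] = None
    last_x = x0
    for f, n in segments:
        # With string_outputs, shift f by c so it meets the previous segment at last_x.
        c = prev_g(last_x) - f(last_x) if (string_outputs and prev_g is not None) else 0.0
        g = lambda v, f=f, c=c: f(v) + c
        for _ in range(n):
            out.append(g(x))
            last_x = x
            x += step
        prev_g = g
    return out


f = lambda x: 0.0    # constant segment for x = 0, 1, 2
g = lambda x: 2 * x  # linear segment for x = 3, 4, 5
print(concatenate([(f, 3), (g, 3)], string_outputs=False))  # [0.0, 0.0, 0.0, 6.0, 8.0, 10.0]
print(concatenate([(f, 3), (g, 3)], string_outputs=True))   # [0.0, 0.0, 0.0, 2.0, 4.0, 6.0]
```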
 
merlion.utils.conj_priors
Implementations of Bayesian conjugate priors & their online update rules.
| Class | Description |
| --- | --- |
| ConjPrior | Abstract base class for a Bayesian conjugate prior. |
| BetaBernoulli | Beta-Bernoulli conjugate prior for binary data. |
| NormInvGamma | Normal-InverseGamma conjugate prior. |
| MVNormInvWishart | Multivariate Normal-InverseWishart conjugate prior. |
| BayesianLinReg | Bayesian Ordinary Linear Regression conjugate prior, which models a univariate input as a function of time. |
| BayesianMVLinReg | Bayesian multivariate linear regression conjugate prior, which models a multivariate input as a function of time. |
- class merlion.utils.conj_priors.ConjPrior(sample=None)
- Bases: `ABC`
- Abstract base class for a Bayesian conjugate prior. Can be used with either `TimeSeries` or `numpy` arrays directly.
- Parameters
- sample – a sample used to initialize the prior. 
 - to_dict()
 - abstract property n_params: int
 - classmethod from_dict(state_dict)
 - static get_time_series_values(x)
- Return type
- ndarray
- Returns
- numpy array representing the input `x`
 
 - process_time_series(x)
- Return type
- Tuple[ndarray, ndarray]
- Returns
- `(t, x)`, where `t` is a normalized list of timestamps, and `x` is a `numpy` array representing the input
 
 - abstract posterior(x, return_rv=False, log=True, return_updated=False)
- Predictive posterior (log) PDF for new observations, or the `scipy.stats` random variable where applicable.
- Parameters
- x – value(s) to evaluate posterior at (`None` implies that we want to return the random variable)
- return_rv – whether to return the random variable directly 
- log – whether to return the log PDF (instead of the PDF) 
- return_updated – whether to return an updated version of the conjugate prior as well 
 
 
 - abstract update(x)
- Update the conjugate prior based on new observations x. 
 - abstract forecast(time_stamps)
- Return a posterior predictive interval for the timestamps given.
- Parameters
- time_stamps – a list of timestamps
- Return type
- Tuple[TimeSeries, TimeSeries]
- Returns
- `(forecast, stderr)`, where `forecast` is the expected posterior value and `stderr` is the standard error of that forecast.
 
 
- class merlion.utils.conj_priors.ScalarConjPrior(sample=None)
- Bases: `ConjPrior`, `ABC`
- Abstract base class for a Bayesian conjugate prior for a scalar random variable.
- Parameters
- sample – a sample used to initialize the prior. 
 - process_time_series(x)
- Returns
- `(t, x)`, where `t` is a normalized list of timestamps, and `x` is a `numpy` array representing the input
 
 - static get_time_series_values(x)
- Return type
- ndarray
- Returns
- numpy array representing the input `x`
 
 
- class merlion.utils.conj_priors.BetaBernoulli(sample=None)
- Bases: `ScalarConjPrior`
- Beta-Bernoulli conjugate prior for binary data. We assume the model
\[
\begin{aligned}
X &\sim \mathrm{Bernoulli}(\theta) \\
\theta &\sim \mathrm{Beta}(\alpha, \beta)
\end{aligned}
\]
- The update rule for data \(x_1, \ldots, x_n\) is
\[
\begin{aligned}
\alpha &= \alpha + \sum_{i=1}^{n} \mathbb{I}[x_i = 1] \\
\beta &= \beta + \sum_{i=1}^{n} \mathbb{I}[x_i = 0]
\end{aligned}
\]
- Parameters
- sample – a sample used to initialize the prior. 
 - property n_params: int
 - posterior(x, return_rv=False, log=True, return_updated=False)
- The posterior distribution of x is \(\mathrm{Bernoulli}(\alpha / (\alpha + \beta))\). 
 - theta_posterior(theta, return_rv=False, log=True)
- The posterior distribution of \(\theta\) is \(\mathrm{Beta}(\alpha, \beta)\). 
 - update(x)
- Update the conjugate prior based on new observations x. 
 - forecast(time_stamps)
- Return a posterior predictive interval for the timestamps given.
- Parameters
- time_stamps – a list of timestamps
- Return type
- Tuple[TimeSeries, TimeSeries]
- Returns
- `(forecast, stderr)`, where `forecast` is the expected posterior value and `stderr` is the standard error of that forecast.
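The Beta-Bernoulli update rule reduces to counting ones and zeros. A minimal sketch with hypothetical function names and an assumed uniform Beta(1, 1) starting prior (the prior used when `sample` is `None` is not stated here):

```python
from typing import Iterable, Tuple


def beta_bernoulli_update(alpha: float, beta: float, xs: Iterable[int]) -> Tuple[float, float]:
    """Conjugate update: alpha counts observed ones, beta counts observed zeros."""
    xs = list(xs)
    ones = sum(1 for x in xs if x == 1)
    zeros = sum(1 for x in xs if x == 0)
    return alpha + ones, beta + zeros


def predictive_prob_one(alpha: float, beta: float) -> float:
    """Posterior predictive P(X = 1) = alpha / (alpha + beta)."""
    return alpha / (alpha + beta)


alpha, beta = beta_bernoulli_update(1.0, 1.0, [1, 1, 0])  # assumed Beta(1, 1) prior
print(alpha, beta, predictive_prob_one(alpha, beta))     # 3.0 2.0 0.6
```

Because the update only adds counts, feeding the data in several batches gives the same posterior as one batch, which is what makes the online update rule valid.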
 
 
- class merlion.utils.conj_priors.NormInvGamma(sample=None)
- Bases: `ScalarConjPrior`
- Normal-InverseGamma conjugate prior. Following Wikipedia and Murphy (2007), we assume the model
\[
\begin{aligned}
X &\sim \mathcal{N}(\mu, \sigma^2) \\
\mu &\sim \mathcal{N}(\mu_0, \sigma^2 / n) \\
\sigma^2 &\sim \mathrm{InvGamma}(\alpha, \beta)
\end{aligned}
\]
- The update rule for data \(x_1, \ldots, x_n\) is
\[
\begin{aligned}
\bar{x} &= \frac{1}{n} \sum_{i = 1}^{n} x_i \\
\alpha &= \alpha + n/2 \\
\beta &= \beta + \frac{1}{2} \sum_{i = 1}^{n} (x_i - \bar{x})^2 + \frac{1}{2} \frac{n_0 n}{n_0 + n} (\mu_0 - \bar{x})^2 \\
\mu_0 &= \frac{n_0}{n_0 + n} \mu_0 + \frac{n}{n_0 + n} \bar{x} \\
n_0 &= n_0 + n
\end{aligned}
\]
- Parameters
- sample – a sample used to initialize the prior. 
 - property n_params: int
 - update(x)
- Update the conjugate prior based on new observations x. 
 - mu_posterior(mu, return_rv=False, log=True)
- The posterior for \(\mu\) is \(\text{Student-t}_{2\alpha}(\mu_0, \beta / (n \alpha))\) 
 - sigma2_posterior(sigma2, return_rv=False, log=True)
- The posterior for \(\sigma^2\) is \(\text{InvGamma}(\alpha, \beta)\). 
 - posterior(x, log=True, return_rv=False, return_updated=False)
- The posterior for \(x\) is \(\text{Student-t}_{2\alpha}(\mu_0, (n+1) \beta / (n \alpha))\) 
 - forecast(time_stamps)
- Return a posterior predictive interval for the time stamps given. - Parameters
- time_stamps – a list of time stamps 
- Return type
- Tuple[TimeSeries, TimeSeries]
- Returns
- (forecast, stderr), where forecast is the expected posterior value and stderr is the standard error of that forecast.
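A minimal sketch of the Normal-InverseGamma update rule, assuming numpy and scipy. `NormInvGammaSketch`, its default hyperparameters, and `posterior_rv` are hypothetical names, not Merlion's API. The one subtle point is ordering: \(\beta\) must be updated with the old \(\mu_0\) and \(n_0\), before either is overwritten. The predictive distribution uses the Student-t form given for `posterior`, taking the count as the accumulated \(n_0\) and reading the second Student-t argument as a squared scale.

```python
import numpy as np
from scipy import stats


class NormInvGammaSketch:
    """Hypothetical Normal-InverseGamma prior illustrating the update rule above."""

    def __init__(self, mu_0=0.0, n_0=1.0, alpha=1.0, beta=1.0):
        self.mu_0 = float(mu_0)    # prior mean of mu
        self.n_0 = float(n_0)      # number of observations behind mu_0
        self.alpha = float(alpha)  # InvGamma shape
        self.beta = float(beta)    # InvGamma scale

    def update(self, x) -> None:
        x = np.asarray(x, dtype=float).ravel()
        n = x.size
        xbar = x.mean()
        self.alpha += n / 2
        # beta uses the *old* mu_0 and n_0, so it is updated before them.
        shrink = self.n_0 * n / (self.n_0 + n)
        self.beta += 0.5 * np.sum((x - xbar) ** 2) + 0.5 * shrink * (self.mu_0 - xbar) ** 2
        self.mu_0 = (self.n_0 * self.mu_0 + n * xbar) / (self.n_0 + n)
        self.n_0 += n

    def posterior_rv(self):
        # Predictive for a new x: Student-t_{2 alpha}(mu_0, (n_0 + 1) beta / (n_0 alpha)).
        scale = np.sqrt((self.n_0 + 1) * self.beta / (self.n_0 * self.alpha))
        return stats.t(df=2 * self.alpha, loc=self.mu_0, scale=scale)
```

Because the update is conjugate, feeding `[1, 2]` and then `[3]` yields the same hyperparameters as a single update on `[1, 2, 3]`, which makes a convenient sanity check.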
 
 
- class merlion.utils.conj_priors.MVNormInvWishart(sample=None)
- Bases: ConjPrior
- Multivariate Normal-InverseWishart conjugate prior, the multivariate equivalent of Normal-InverseGamma. Following Murphy (2007), we assume the model \[\begin{split}\begin{align*} X &\sim \mathcal{N}_d(\mu, \Sigma) \\ \mu &\sim \mathcal{N}_d(\mu_0, \Sigma / n_0) \\ \Sigma &\sim \mathrm{InvWishart}_{\nu}(\Lambda) \end{align*}\end{split}\]
- The update rule for new data \(x_1, \ldots, x_n\), where \(n_0\) is the number of observations already reflected in the prior, is \[\begin{split}\begin{align*} \bar{x} &= \frac{1}{n} \sum_{i = 1}^{n} x_i \\ \nu &= \nu + n \\ \Lambda &= \Lambda + \frac{n_0 n}{n_0 + n} (\mu_0 - \bar{x}) (\mu_0 - \bar{x})^T + \sum_{i = 1}^{n} (x_i - \bar{x}) (x_i - \bar{x})^T \\ \mu_0 &= \frac{n_0}{n_0 + n} \mu_0 + \frac{n}{n_0 + n} \bar{x} \\ n_0 &= n_0 + n \end{align*}\end{split}\]
- Parameters
- sample – a sample used to initialize the prior. 
 - property n_params
 - process_time_series(x)
- Returns
- (t, x), where t is a normalized list of timestamps, and x is a numpy array representing the input.
 
 - update(x)
- Update the conjugate prior based on new observations x. 
 - mu_posterior(mu, return_rv=False, log=True)
- The posterior for \(\mu\) is \(\text{Student-t}_{\nu-d+1}(\mu_0, \Lambda / (n_0 (\nu - d + 1)))\).
 - Sigma_posterior(sigma2, return_rv=False, log=True)
- The posterior for \(\Sigma\) is \(\text{InvWishart}_{\nu}(\Lambda^{-1})\) 
 - posterior(x, return_rv=False, log=True, return_updated=False)
- The posterior for \(x\) is \(\text{Student-t}_{\nu-d+1}(\mu_0, (n_0 + 1) \Lambda / (n_0 (\nu - d + 1)))\).
 - forecast(time_stamps, name='forecast')
- Return a posterior predictive interval for the time stamps given. - Parameters
- time_stamps – a list of time stamps 
- Return type
- Tuple[TimeSeries, TimeSeries]
- Returns
- (forecast, stderr), where forecast is the expected posterior value and stderr is the standard error of that forecast.
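A minimal sketch of the Normal-InverseWishart update, assuming numpy and SciPy 1.6 or later (for `scipy.stats.multivariate_t`). `MVNormInvWishartSketch`, its defaults, and `posterior_rv` are hypothetical and not Merlion's API; inputs are assumed to be an `(n, d)` array with one observation per row. As in the scalar case, \(\Lambda\) is updated with the old \(\mu_0\) and \(n_0\), and the predictive uses the multivariate Student-t given for `posterior`, with the count taken as the accumulated \(n_0\).

```python
import numpy as np
from scipy import stats


class MVNormInvWishartSketch:
    """Hypothetical Normal-InverseWishart prior illustrating the update rule above."""

    def __init__(self, mu_0, n_0=1.0, nu=None, Lambda=None):
        self.mu_0 = np.asarray(mu_0, dtype=float)
        d = self.mu_0.size
        self.n_0 = float(n_0)
        self.nu = float(d + 2) if nu is None else float(nu)
        self.Lambda = np.eye(d) if Lambda is None else np.asarray(Lambda, dtype=float)

    def update(self, x) -> None:
        d = self.mu_0.size
        x = np.asarray(x, dtype=float).reshape(-1, d)  # (n, d): one row per observation
        n = x.shape[0]
        xbar = x.mean(axis=0)
        dx = x - xbar
        dmu = self.mu_0 - xbar
        # Lambda uses the old mu_0 and n_0, so it is updated first.
        shrink = self.n_0 * n / (self.n_0 + n)
        self.Lambda = self.Lambda + shrink * np.outer(dmu, dmu) + dx.T @ dx
        self.nu += n
        self.mu_0 = (self.n_0 * self.mu_0 + n * xbar) / (self.n_0 + n)
        self.n_0 += n

    def posterior_rv(self):
        # Predictive: Student-t_{nu-d+1}(mu_0, (n_0 + 1) Lambda / (n_0 (nu - d + 1)))
        d = self.mu_0.size
        df = self.nu - d + 1
        shape = (self.n_0 + 1) * self.Lambda / (self.n_0 * df)
        return stats.multivariate_t(loc=self.mu_0, shape=shape, df=df)
```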
 
 
- class merlion.utils.conj_priors.BayesianLinReg(sample=None)
- Bases: ConjPrior
- Bayesian Ordinary Linear Regression conjugate prior, which models a univariate input as a function of time. Following Wikipedia, we assume the model \[\begin{split}\begin{align*} x(t) &\sim \mathcal{N}(m t + b, \sigma^2) \\ w &\sim \mathcal{N}((m_0, b_0), \sigma^2 \Lambda_0^{-1}) \\ \sigma^2 &\sim \mathrm{InvGamma}(\alpha, \beta) \end{align*}\end{split}\]
- Consider new data \((t_1, x_1), \ldots, (t_n, x_n)\). Let \(T \in \mathbb{R}^{n \times 2}\) be the matrix obtained by stacking the column vector of times with an all-ones column vector. Let \(w = (m, b) \in \mathbb{R}^{2}\) be the full weight vector. Let \(x \in \mathbb{R}^{n}\) denote all observed values. Then we have the update rule \[\begin{split}\begin{align*} w_{OLS} &= (T^T T)^{-1} T^T x \\ \Lambda_n &= \Lambda_0 + T^T T \\ w_n &= (\Lambda_0 + T^T T)^{-1} (\Lambda_0 w_0 + T^T T w_{OLS}) \\ \alpha_n &= \alpha_0 + n / 2 \\ \beta_n &= \beta_0 + \frac{1}{2}(x^T x + w_0^T \Lambda_0 w_0 - w_n^T \Lambda_n w_n) \end{align*}\end{split}\]
- Parameters
- sample – a sample used to initialize the prior. 
 - property n_params: int
 - update(x)
- Update the conjugate prior based on new observations x. 
 - posterior_explicit(x, return_rv=False, log=True, return_updated=False)
- Let \(\Lambda_n, \alpha_n, \beta_n\) be the posterior values obtained by updating the model on data \((t_1, x_1), \ldots, (t_n, x_n)\). The predictive posterior has PDF \[\begin{align*} P((t, x)) &= \frac{1}{(2 \pi)^{n/2}} \sqrt{\frac{\det \Lambda_0}{\det \Lambda_n}} \frac{\beta_0^{\alpha_0}}{\beta_n^{\alpha_n}}\frac{\Gamma(\alpha_n)}{\Gamma(\alpha_0)} \end{align*}\]
 - posterior(x, return_rv=False, log=True, return_updated=False)
- Naive computation of the posterior using Bayes' rule, i.e. \[\begin{split}\hat{\sigma}^2 &= \mathbb{E}[\sigma^2] \\ \hat{w} &= \mathbb{E}[w \mid \sigma^2 = \hat{\sigma}^2] \\ p(x \mid t) &= \frac{ p(w = \hat{w}, \sigma^2 = \hat{\sigma}^2) p(x \mid t, w = \hat{w}, \sigma^2 = \hat{\sigma}^2)}{ p(w = \hat{w}, \sigma^2 = \hat{\sigma}^2 \mid x, t)}\end{split}\]
 - forecast(time_stamps)
- Return a posterior predictive interval for the time stamps given. - Parameters
- time_stamps – a list of time stamps 
- Return type
- Tuple[TimeSeries, TimeSeries]
- Returns
- (forecast, stderr), where forecast is the expected posterior value and stderr is the standard error of that forecast.
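A minimal sketch of the Bayesian linear regression update, assuming numpy and scipy; `BayesianLinRegSketch` and its defaults are hypothetical, not Merlion's API. Since \(T^T T w_{OLS} = T^T x\), the sketch computes \(w_n = \Lambda_n^{-1}(\Lambda_0 w_0 + T^T x)\) directly, which avoids inverting \(T^T T\) (singular when there is a single observation). `update` also returns the log marginal likelihood of the batch, following the expression given for `posterior_explicit` with the standard normalizing factor \((2\pi)^{-n/2}\).

```python
import numpy as np
from scipy.special import gammaln


class BayesianLinRegSketch:
    """Hypothetical Bayesian linear regression of x on time (illustration only)."""

    def __init__(self, w_0=(0.0, 0.0), Lambda_0=None, alpha_0=1.0, beta_0=1.0):
        self.w = np.asarray(w_0, dtype=float)  # prior mean of (m, b)
        self.Lambda = np.eye(2) if Lambda_0 is None else np.asarray(Lambda_0, dtype=float)
        self.alpha = float(alpha_0)
        self.beta = float(beta_0)

    def update(self, t, x) -> float:
        """Update on (t_i, x_i) pairs; return the batch's log marginal likelihood."""
        t = np.asarray(t, dtype=float).ravel()
        x = np.asarray(x, dtype=float).ravel()
        n = x.size
        T = np.column_stack([t, np.ones(n)])  # rows are (t_i, 1)
        Lambda_n = self.Lambda + T.T @ T
        w_n = np.linalg.solve(Lambda_n, self.Lambda @ self.w + T.T @ x)
        alpha_n = self.alpha + n / 2
        beta_n = self.beta + 0.5 * (x @ x + self.w @ self.Lambda @ self.w - w_n @ Lambda_n @ w_n)
        _, logdet_0 = np.linalg.slogdet(self.Lambda)
        _, logdet_n = np.linalg.slogdet(Lambda_n)
        log_evidence = (
            -0.5 * n * np.log(2 * np.pi)
            + 0.5 * (logdet_0 - logdet_n)
            + self.alpha * np.log(self.beta) - alpha_n * np.log(beta_n)
            + gammaln(alpha_n) - gammaln(self.alpha)
        )
        self.w, self.Lambda, self.alpha, self.beta = w_n, Lambda_n, alpha_n, beta_n
        return float(log_evidence)
```

By the chain rule of probability, the log evidence of one batch equals the sum of the log evidences of its sub-batches processed in order, which is a useful check on the formula.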
 
 
- class merlion.utils.conj_priors.BayesianMVLinReg(sample=None)
- Bases: ConjPrior
- Bayesian multivariate linear regression conjugate prior, which models a multivariate input as a function of time. Following Wikipedia and Geisser (1965), we assume the model \[\begin{split}\begin{align*} X(t) &\sim \mathcal{N}_{d}(m t + b, \Sigma) \\ (m, b) &\sim \mathcal{N}_{2d}((m_0, b_0), \Sigma \otimes \Lambda_0^{-1}) \\ \Sigma &\sim \mathrm{InvWishart}_{\nu}(V_0) \\ \end{align*}\end{split}\]
- where \((m, b)\) is the concatenation of the vectors \(m\) and \(b\), \(\Lambda_0 \in \mathbb{R}^{2 \times 2}\), and \(\otimes\) is the Kronecker product. Consider new data \((t_1, x_1), \ldots, (t_n, x_n)\). Let \(T \in \mathbb{R}^{n \times 2}\) be the matrix obtained by stacking the column vector of times with an all-ones column vector. Let \(W = [m, b]^T \in \mathbb{R}^{2 \times d}\) be the full weight matrix. Let \(X \in \mathbb{R}^{n \times d}\) be the matrix of observed \(x\) values. Then we have the update rule \[\begin{split}\begin{align*} \nu_n &= \nu_0 + n \\ W_n &= (\Lambda_0 + T^T T)^{-1}(\Lambda_0 W_0 + T^T X) \\ V_n &= V_0 + (X - TW_n)^T (X - TW_n) + (W_n - W_0)^T \Lambda_0 (W_n - W_0) \\ \Lambda_n &= \Lambda_0 + T^T T \\ \end{align*}\end{split}\]
- Parameters
- sample – a sample used to initialize the prior. 
 - property n_params: int
 - process_time_series(x)
- Returns
- (t, x), where t is a normalized list of timestamps, and x is a numpy array representing the input.
 
 - update(x)
- Update the conjugate prior based on new observations x. 
 - posterior_explicit(x, return_rv=False, log=True, return_updated=False)
- Let \(\Lambda_n, \nu_n, V_n\) be the posterior values obtained by updating the model on data \((t_1, x_1), \ldots, (t_n, x_n)\). The predictive posterior has PDF \[\begin{align*} P((t, x)) &= \frac{1}{(2 \pi)^{nd/2}} \left(\frac{\det \Lambda_0}{\det \Lambda_n}\right)^{d/2} \frac{\det(V_0/2)^{\nu_0/2}}{\det(V_n/2)^{\nu_n/2}}\frac{\Gamma_d(\nu_n/2)}{\Gamma_d(\nu_0 / 2)} \end{align*}\]
 - posterior(x, return_rv=False, log=True, return_updated=False)
- Naive computation of the posterior using Bayes' rule, i.e. \[\begin{split}\hat{\Sigma} &= \mathbb{E}[\Sigma] \\ \hat{W} &= \mathbb{E}[W \mid \Sigma = \hat{\Sigma}] \\ p(X \mid t) &= \frac{ p(W = \hat{W}, \Sigma = \hat{\Sigma}) p(X \mid t, W = \hat{W}, \Sigma = \hat{\Sigma})}{ p(W = \hat{W}, \Sigma = \hat{\Sigma} \mid X, t)}\end{split}\]
 - forecast(time_stamps)
- Return a posterior predictive interval for the time stamps given. - Parameters
- time_stamps – a list of time stamps 
- Return type
- Tuple[TimeSeries, TimeSeries]
- Returns
- (forecast, stderr), where forecast is the expected posterior value and stderr is the standard error of that forecast.
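A minimal sketch of the multivariate regression update, assuming numpy; `BayesianMVLinRegSketch`, its default hyperparameters, and `expected_sigma` are hypothetical, not Merlion's API. Row 0 of \(W\) holds the slopes \(m\) and row 1 the intercepts \(b\), so \(T W\) gives \(m t_i + b\) in row \(i\). Note that the \(V_n\) update uses \(\Lambda_0\) (the prior precision) in its last term, so it is computed before \(\Lambda\) is overwritten. `expected_sigma` uses the inverse-Wishart mean \(V / (\nu - d - 1)\).

```python
import numpy as np


class BayesianMVLinRegSketch:
    """Hypothetical multivariate Bayesian linear regression on time (illustration only)."""

    def __init__(self, d, W_0=None, Lambda_0=None, nu_0=None, V_0=None):
        self.W = np.zeros((2, d)) if W_0 is None else np.asarray(W_0, dtype=float)
        self.Lambda = np.eye(2) if Lambda_0 is None else np.asarray(Lambda_0, dtype=float)
        self.nu = float(d + 2) if nu_0 is None else float(nu_0)
        self.V = np.eye(d) if V_0 is None else np.asarray(V_0, dtype=float)

    def update(self, t, X) -> None:
        t = np.asarray(t, dtype=float).ravel()
        n = t.size
        X = np.asarray(X, dtype=float).reshape(n, -1)  # (n, d) matrix of observations
        T = np.column_stack([t, np.ones(n)])           # rows are (t_i, 1)
        Lambda_n = self.Lambda + T.T @ T
        W_n = np.linalg.solve(Lambda_n, self.Lambda @ self.W + T.T @ X)
        R = X - T @ W_n      # residuals under the posterior mean weights
        dW = W_n - self.W    # shift of the weights away from the prior mean
        self.V = self.V + R.T @ R + dW.T @ self.Lambda @ dW  # uses the old Lambda_0
        self.nu += n
        self.W, self.Lambda = W_n, Lambda_n

    def expected_sigma(self):
        d = self.V.shape[0]
        return self.V / (self.nu - d - 1)
```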
 
 
merlion.utils.istat
Incremental computation of time series statistics.
- class merlion.utils.istat.IStat(value=None, n=0)
- Bases: object
- An abstract base class for computing various statistics incrementally, with emphasis on recency-weighted variants. - Parameters
- value (Optional[float]) – Initial value of the statistic. Defaults to None.
- n (int) – Initial sample size. Defaults to 0.
 
 - property n
 - property value
 - abstract add(x)
- Add a new value to update the statistic. - Parameters
- x – new value to add to the sample.
 - abstract drop(x)
- Drop a value to update the statistic. - Parameters
- x – value to drop from the sample.
 - add_batch(batch)
- Add a batch of new values to update the statistic. - Parameters
- batch (List[float]) – new values to add to the sample.
 - drop_batch(batch)
- Drop a batch of values to update the statistic. - Parameters
- batch (List[float]) – values to drop from the sample.
 
- class merlion.utils.istat.Mean(value=None, n=0)
- Bases: IStat
- Class for incrementally computing the mean of a series of numbers. - Parameters
- value (Optional[float]) – Initial value of the statistic. Defaults to None.
- n (int) – Initial sample size. Defaults to 0.
 
 - property value
 - add(x)
- Add a new value to update the statistic. - Parameters
- x – new value to add to the sample.
 - drop(x)
- Drop a value to update the statistic. - Parameters
- x – value to drop from the sample.
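A minimal sketch of the `add` / `drop` / `add_batch` pattern described for `IStat` and `Mean`, using the running-mean recurrences \(\bar{x}_n = \bar{x}_{n-1} + (x - \bar{x}_{n-1})/n\) when adding and \(\bar{x}_{n-1} = (n \bar{x}_n - x)/(n - 1)\) when dropping. `IStatSketch` and `MeanSketch` are hypothetical names; Merlion's own classes may differ, for example in how they treat an empty sample.

```python
from abc import ABC, abstractmethod
from typing import Iterable, Optional


class IStatSketch(ABC):
    """Hypothetical incremental-statistic base class (illustration only)."""

    def __init__(self, value: Optional[float] = None, n: int = 0):
        self._value = value
        self._n = n

    @property
    def n(self) -> int:
        return self._n

    @property
    def value(self) -> Optional[float]:
        return self._value

    @abstractmethod
    def add(self, x: float) -> None: ...

    @abstractmethod
    def drop(self, x: float) -> None: ...

    def add_batch(self, batch: Iterable[float]) -> None:
        for x in batch:
            self.add(x)

    def drop_batch(self, batch: Iterable[float]) -> None:
        for x in batch:
            self.drop(x)


class MeanSketch(IStatSketch):
    def add(self, x: float) -> None:
        self._n += 1
        if self._value is None:
            self._value = float(x)
        else:
            self._value += (x - self._value) / self._n

    def drop(self, x: float) -> None:
        if self._n == 0:
            raise ValueError("cannot drop from an empty sample")
        if self._n == 1:
            self._value, self._n = None, 0  # the sample is now empty
            return
        self._value = (self._n * self._value - x) / (self._n - 1)
        self._n -= 1
```

For instance, adding `[2, 4, 6]` gives a mean of 4.0, and then dropping 2 gives 5.0 over the remaining two values.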
 
- class merlion.utils.istat.Variance(ex_value=None, ex2_value=None, n=0, ddof=1)
- Bases: IStat
- Class for incrementally computing the variance of a series of numbers. - Parameters
- ex_value (Optional[float]) – Initial value of the first moment (mean).
- ex2_value (Optional[float]) – Initial value of the second moment.
- n (int) – Initial sample size.
- ddof (int) – The delta degrees of freedom to use when correcting the estimate of the variance.
 
 The variance is computed from the running first and second moments as \[\text{Var}(x_i) = \text{E}(x_i^2) - \text{E}(x_i)^2\]
 - add(x)
- Add a new value to update the statistic. - Parameters
- x – new value to add to the sample.
 - drop(x)
- Drop a value to update the statistic. - Parameters
- x – value to drop from the sample.
 - property true_value
 - property corrected_value
 - property value
 - property sd
 - property se
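A minimal sketch of incremental variance from the first two moments, \(\text{E}(x^2) - \text{E}(x)^2\), with the usual \(n / (n - \text{ddof})\) correction. The meanings given here to `true_value`, `corrected_value`, `sd` and `se` (population variance, ddof-corrected variance, its square root, and \(\text{sd}/\sqrt{n}\)) are assumptions, since the properties are undocumented above; `VarianceSketch` is a hypothetical name. One caveat of the moment form is catastrophic cancellation when the mean is large relative to the spread; Welford's algorithm is the more stable choice for adding values, at the cost of a less direct `drop`.

```python
import math
from typing import Optional


class VarianceSketch:
    """Hypothetical incremental variance via running moments (illustration only)."""

    def __init__(self, ddof: int = 1):
        self.n = 0
        self.ex = 0.0   # running mean of x
        self.ex2 = 0.0  # running mean of x**2
        self.ddof = ddof

    def add(self, x: float) -> None:
        self.n += 1
        self.ex += (x - self.ex) / self.n
        self.ex2 += (x * x - self.ex2) / self.n

    def drop(self, x: float) -> None:
        if self.n == 0:
            raise ValueError("cannot drop from an empty sample")
        if self.n == 1:
            self.n, self.ex, self.ex2 = 0, 0.0, 0.0
            return
        self.ex = (self.n * self.ex - x) / (self.n - 1)
        self.ex2 = (self.n * self.ex2 - x * x) / (self.n - 1)
        self.n -= 1

    @property
    def true_value(self) -> Optional[float]:
        # Population variance E[x^2] - E[x]^2
        return None if self.n == 0 else self.ex2 - self.ex ** 2

    @property
    def corrected_value(self) -> Optional[float]:
        # Rescale by n / (n - ddof); undefined when n <= ddof
        if self.n <= self.ddof:
            return None
        return self.true_value * self.n / (self.n - self.ddof)

    @property
    def sd(self) -> Optional[float]:
        v = self.corrected_value
        return None if v is None else math.sqrt(max(v, 0.0))

    @property
    def se(self) -> Optional[float]:
        s = self.sd
        return None if s is None else s / math.sqrt(self.n)
```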
 
- class merlion.utils.istat.ExponentialMovingAverage(recency_weight=0.1, **kwargs)
- Bases: Mean
- Class for incrementally computing the exponential moving average of a series of numbers. - Parameters
- recency_weight (float) – Recency weight to use when updating the exponential moving average.
 - Letting \(w\) be the recency weight, \[\begin{split}\begin{align*} \text{EMA}_w(x_0) & = x_0 \\ \text{EMA}_w(x_t) & = w \cdot x_t + (1-w) \cdot \text{EMA}_w(x_{t-1}) \end{align*}\end{split}\]
 - property recency_weight
 - property value
 - drop(x)
- Exponential Moving Average does not support dropping values 
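A minimal sketch of the EMA recurrence above, seeding with the first value and then blending each new value with weight \(w\). `EMASketch` is a hypothetical name; it is not derived from Merlion's `Mean` subclass and may differ from the library in details such as validation.

```python
class EMASketch:
    """Hypothetical exponential moving average following the recurrence above."""

    def __init__(self, recency_weight: float = 0.1):
        if not 0 < recency_weight <= 1:
            raise ValueError("recency_weight must be in (0, 1]")
        self.recency_weight = recency_weight
        self.value = None
        self.n = 0

    def add(self, x: float) -> None:
        self.n += 1
        if self.value is None:
            self.value = float(x)  # EMA_w(x_0) = x_0
        else:
            w = self.recency_weight
            self.value = w * x + (1 - w) * self.value

    def drop(self, x: float) -> None:
        # Past values are folded into a single number, so one cannot be removed.
        raise NotImplementedError("Exponential Moving Average does not support dropping values")
```

With \(w = 0.5\), adding 0, 4, 8 gives 0, then 2, then 5.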
 
- class merlion.utils.istat.RecencyWeightedVariance(recency_weight, **kwargs)
- Bases: Variance
- Class for incrementally computing the recency-weighted variance of a series of numbers. - Parameters
- recency_weight (float) – Recency weight to use when updating the recency weighted variance.
 - Letting \(w\) be the recency weight, \[\text{RWV}_w(x_t) = \text{EMA}_w({x^2_t}) - \text{EMA}_w(x_t)^2\]
 - mean_class
- alias of ExponentialMovingAverage
 - property recency_weight
 - drop(x)
- Recency Weighted Variance does not support dropping values
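A minimal sketch of \(\text{RWV}_w = \text{EMA}_w(x^2) - \text{EMA}_w(x)^2\), keeping two EMAs that share the same recency weight. `RecencyWeightedVarianceSketch` is a hypothetical name, and it assumes no ddof correction is applied, which the text above does not specify. Because the EMA weights are non-negative and sum to one, the result is never negative (Jensen's inequality), up to rounding.

```python
from typing import Optional


class RecencyWeightedVarianceSketch:
    """Hypothetical recency-weighted variance built from two EMAs (illustration only)."""

    def __init__(self, recency_weight: float):
        self.recency_weight = recency_weight
        self.ema_x: Optional[float] = None   # EMA of x
        self.ema_x2: Optional[float] = None  # EMA of x**2

    def add(self, x: float) -> None:
        if self.ema_x is None:
            self.ema_x, self.ema_x2 = float(x), float(x) ** 2
        else:
            w = self.recency_weight
            self.ema_x = w * x + (1 - w) * self.ema_x
            self.ema_x2 = w * x * x + (1 - w) * self.ema_x2

    @property
    def value(self) -> Optional[float]:
        if self.ema_x is None:
            return None
        return self.ema_x2 - self.ema_x ** 2

    def drop(self, x: float) -> None:
        raise NotImplementedError("Recency Weighted Variance does not support dropping values")
```

With \(w = 0.5\) and inputs 0, 4, 8, the two EMAs end at 5 and 36, giving a recency-weighted variance of 11.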