merlion.spark package

This module implements APIs to integrate Merlion with PySpark. The expected use case is leveraging distributed computing to train and run inference on multiple time series in parallel.

dataset

Utils for reading & writing pyspark datasets.

pandas_udf

Pyspark pandas UDFs for Merlion functions.

Submodules

merlion.spark.dataset module

Utils for reading & writing pyspark datasets.

merlion.spark.dataset.TSID_COL_NAME = '__ts_id'

Many functions in this module rely on the dataset having a column named TSID_COL_NAME. This column can be added manually using add_tsid_column, and its addition is handled automatically by read_dataset.

merlion.spark.dataset.read_dataset(spark, path, file_format='csv', time_col=None, index_cols=None, data_cols=None)

Reads a time series dataset as a pyspark DataFrame.

Parameters
  • spark (SparkSession) – The current SparkSession.

  • path (str) – The path at which the dataset is stored.

  • file_format (str) – The file format the dataset is stored in.

  • time_col (Optional[str]) – The name of the column which specifies timestamp. If None is provided, it is assumed to be the first column which is not an index column or pre-specified data column.

  • index_cols (Optional[List[str]]) – The columns used to index the various time series in the dataset. If None is provided, we assume the entire dataset is just a single time series.

  • data_cols (Optional[List[str]]) – The columns we will use for downstream time series tasks. If None is provided, we use all columns that are not a time or index column.

Return type

DataFrame

Returns

A pyspark dataframe with columns [time_col, *index_cols, *data_cols, TSID_COL_NAME] (in that order).
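A minimal sketch of reading a dataset; the file path and the column names (Date, Store, Dept, Weekly_Sales) are hypothetical:

    from pyspark.sql import SparkSession
    from merlion.spark.dataset import read_dataset

    spark = SparkSession.builder.appName("merlion-demo").getOrCreate()

    # Hypothetical CSV with columns: Date, Store, Dept, Weekly_Sales
    df = read_dataset(
        spark,
        path="data/sales.csv",
        file_format="csv",
        time_col="Date",
        index_cols=["Store", "Dept"],
        data_cols=["Weekly_Sales"],
    )
    # Columns are now [Date, Store, Dept, Weekly_Sales, __ts_id]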

merlion.spark.dataset.write_dataset(df, time_col, path, file_format='csv')

Writes the dataset to the specified path.

Parameters
  • df (DataFrame) – The dataframe to save. The dataframe must have a column TSID_COL_NAME indexing the time series in the dataset (this column is automatically added by read_dataset).

  • time_col (str) – The name of the column which specifies timestamp.

  • path (str) – The path to save the dataset at.

  • file_format (str) – The file format in which to save the dataset.
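Continuing the sketch above; the output path is hypothetical:

    from merlion.spark.dataset import write_dataset

    # df must already contain the __ts_id column (added automatically by read_dataset)
    write_dataset(df, time_col="Date", path="data/sales_out.csv", file_format="csv")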

merlion.spark.dataset.create_hier_dataset(spark, df, time_col=None, index_cols=None, agg_dict=None)

Aggregates the time series in the dataset & appends them to the original dataset.

Parameters
  • spark (SparkSession) – The current SparkSession.

  • df (DataFrame) – A pyspark dataframe containing all the data. The dataframe must have a column TSID_COL_NAME indexing the time series in the dataset (this column is automatically added by read_dataset).

  • time_col (Optional[str]) – The name of the column which specifies timestamp. If None is provided, it is assumed to be the first column which is not an index column or pre-specified data column.

  • index_cols (Optional[List[str]]) – The columns used to index the various time series in the dataset. If None is provided, we assume the entire dataset is just a single time series. These columns define the levels of the hierarchy. For example, if each time series represents sales and we have index_cols = ["store", "item"], we will first aggregate sales for all items sold at a particular store; then we will aggregate sales for all items at all stores.

  • agg_dict (Optional[Dict]) – A dictionary used to specify how different data columns should be aggregated. If a data column is not in the dict, we aggregate using sum by default.

Return type

Tuple[DataFrame, ndarray]

Returns

The dataset with additional time series corresponding to each level of the hierarchy, as well as a matrix specifying how the hierarchy is constructed.
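A sketch continuing the hypothetical example above: sales are first aggregated over all departments within each store, then over all stores.

    from merlion.spark.dataset import create_hier_dataset

    hier_df, hier_matrix = create_hier_dataset(
        spark, df, time_col="Date", index_cols=["Store", "Dept"]
    )
    # hier_df holds the original (leaf) time series plus the aggregated ones;
    # hier_matrix is the m-by-n matrix mapping the n leaf series to all m series.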

merlion.spark.dataset.add_tsid_column(spark, df, index_cols)

Adds the column TSID_COL_NAME to the dataframe, which assigns an integer ID to each time series in the dataset.

Parameters
  • spark (SparkSession) – The current SparkSession.

  • df (DataFrame) – A pyspark dataframe containing all the data.

  • index_cols (List[str]) – The columns used to index the various time series in the dataset.

Return type

DataFrame

Returns

The pyspark dataframe with an additional column TSID_COL_NAME added as the last column.
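A short sketch, reusing the hypothetical index columns from above:

    from merlion.spark.dataset import TSID_COL_NAME, add_tsid_column

    df = add_tsid_column(spark, df, index_cols=["Store", "Dept"])
    df.select("Store", "Dept", TSID_COL_NAME).distinct().show()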

merlion.spark.pandas_udf module

Pyspark pandas UDFs for Merlion functions. This module contains pandas UDFs that can be called via pyspark.sql.GroupedData.applyInPandas (i.e. on a grouped pyspark dataframe) to run Merlion forecasting, anomaly detection, and time series reconciliation in parallel.

merlion.spark.pandas_udf.forecast(pdf, index_cols, time_col, target_col, time_stamps, model, predict_on_train=False, agg_dict=None)

Pyspark pandas UDF for performing forecasting. Should be called on a pyspark dataframe grouped by time series ID, i.e. by index_cols.

Parameters
  • pdf (DataFrame) – The pandas.DataFrame containing the training data. Should be a single time series.

  • index_cols (List[str]) – The list of column names used to index all the time series in the dataset. Not used for modeling.

  • time_col (str) – The name of the column containing the timestamps.

  • target_col (str) – The name of the column whose value we wish to forecast.

  • time_stamps (Union[List[int], List[str]]) – The timestamps at which we would like to obtain a forecast.

  • model (Union[ForecasterBase, dict]) – The model (or model dict) we are using to obtain a forecast.

  • predict_on_train (bool) – Whether to return the model’s prediction on the training data.

  • agg_dict (Optional[dict]) – A dictionary used to specify how different data columns should be aggregated. If a non-target data column is not in agg_dict, we do not model it for aggregated time series.

Return type

DataFrame

Returns

A pandas.DataFrame with the forecast & its standard error (NaN if the model doesn’t have error bars). Columns are [*index_cols, time_col, target_col, target_col + "_err"].
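A minimal sketch of invoking this UDF with applyInPandas, reusing the hypothetical dataset from the read_dataset sketch above. The forecast horizon, the model dict ({"name": "DefaultForecaster"}), and the output field types are illustrative assumptions; adjust the schema to match your data.

    import functools
    from pyspark.sql.types import DateType, FloatType, StructField, StructType
    from merlion.spark.dataset import TSID_COL_NAME
    from merlion.spark.pandas_udf import forecast

    index_cols = ["Store", "Dept"]              # hypothetical index columns
    time_col, target_col = "Date", "Weekly_Sales"
    time_stamps = ["2012-11-02", "2012-11-09"]  # hypothetical forecast horizon
    group_cols = index_cols + [TSID_COL_NAME]

    # Output columns: [*group_cols, time_col, target_col, target_col + "_err"];
    # the output timestamp is typed as DateType here (an assumption).
    schema = StructType(
        [df.schema[c] for c in group_cols]
        + [
            StructField(time_col, DateType()),
            StructField(target_col, FloatType()),
            StructField(f"{target_col}_err", FloatType()),
        ]
    )

    forecast_df = df.groupBy(group_cols).applyInPandas(
        functools.partial(
            forecast,
            index_cols=group_cols,
            time_col=time_col,
            target_col=target_col,
            time_stamps=time_stamps,
            model={"name": "DefaultForecaster"},
        ),
        schema=schema,
    )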

merlion.spark.pandas_udf.anomaly(pdf, index_cols, time_col, train_test_split, model)

Pyspark pandas UDF for performing anomaly detection. Should be called on a pyspark dataframe grouped by time series ID, i.e. by index_cols.

Parameters
  • pdf (DataFrame) – The pandas.DataFrame containing the training and testing data. Should be a single time series.

  • index_cols (List[str]) – The list of column names used to index all the time series in the dataset. Not used for modeling.

  • time_col (str) – The name of the column containing the timestamps.

  • train_test_split (Union[int, str]) – The time at which the testing data starts.

  • model (Union[DetectorBase, dict]) – The model (or model dict) we are using to predict anomaly scores.

Return type

DataFrame

Returns

A pandas.DataFrame with the anomaly scores on the test data. Columns are [*index_cols, time_col, "anom_score"].
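A sketch in the same spirit as the forecast example; the train/test split time and the model dict ({"name": "DefaultDetector"}) are assumptions.

    import functools
    from pyspark.sql.types import FloatType, StructField, StructType
    from merlion.spark.dataset import TSID_COL_NAME
    from merlion.spark.pandas_udf import anomaly

    index_cols = ["Store", "Dept"]  # hypothetical index columns
    time_col = "Date"
    group_cols = index_cols + [TSID_COL_NAME]

    # Output columns: [*group_cols, time_col, "anom_score"]; we reuse the input
    # time column's field here, so adjust its type if your UDF output differs.
    schema = StructType(
        [df.schema[c] for c in group_cols]
        + [df.schema[time_col], StructField("anom_score", FloatType())]
    )

    anomaly_df = df.groupBy(group_cols).applyInPandas(
        functools.partial(
            anomaly,
            index_cols=group_cols,
            time_col=time_col,
            train_test_split="2012-11-02",  # hypothetical split time
            model={"name": "DefaultDetector"},
        ),
        schema=schema,
    )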

merlion.spark.pandas_udf.reconciliation(pdf, hier_matrix, target_col)

Pyspark pandas UDF for computing the minimum-trace hierarchical time series reconciliation, as described by Wickramasuriya et al. 2018. Should be called on a pyspark dataframe grouped by timestamp. Pyspark implementation of merlion.utils.hts.minT_reconciliation.

Parameters
  • pdf (DataFrame) – A pandas.DataFrame containing forecasted values & standard errors from m time series at a single timestamp. Each time series should be indexed by TSID_COL_NAME. The first n time series (in order of ID) correspond to leaves of the hierarchy, while the remaining m - n are weighted sums of the first n. This dataframe can be produced by calling forecast on the dataframe produced by merlion.spark.dataset.create_hier_dataset.

  • hier_matrix (ndarray) – An m-by-n matrix describing how the hierarchy is aggregated. The value of the k-th time series is np.dot(hier_matrix[k], pdf[:n]). This matrix can be produced by merlion.spark.dataset.create_hier_dataset.

  • target_col (str) – The name of the column whose value we wish to forecast.

Returns

A pandas.DataFrame which replaces the original forecasts & errors with reconciled forecasts & errors.

Note

Time series reconciliation is skipped if the given timestamp has missing values for any of the time series. This can happen for training timestamps if the training time series has missing data and forecast is called with predict_on_train=True.
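A sketch, assuming forecast_df and hier_matrix were produced as in the forecast and create_hier_dataset sketches above. Since reconciliation operates on all time series at a single timestamp, the dataframe is grouped by the time column, and the output schema matches the input.

    import functools
    from merlion.spark.pandas_udf import reconciliation

    recon_df = forecast_df.groupBy(time_col).applyInPandas(
        functools.partial(reconciliation, hier_matrix=hier_matrix, target_col=target_col),
        schema=forecast_df.schema,
    )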