merlion.spark package

This module implements APIs to integrate Merlion with PySpark. The expected use case is to use distributed computing to train and run inference on multiple time series in parallel.

There are two ways to use the PySpark API: directly invoking the Spark apps spark_apps/anomaly.py and spark_apps/forecast.py from the command line with either python or spark-submit, or using the Dockerfile to serve a Spark application on a Kubernetes cluster with spark-on-k8s. To understand the expected arguments for these apps, call python spark_apps/anomaly.py -h or python spark_apps/forecast.py -h.

Setting up the spark-on-k8s-operator

We will now cover how to serve these Spark apps using the spark-on-k8s-operator. For all methods, we expect that you have installed Merlion from source by cloning our git repo.

Next, you need to create a Kubernetes cluster. For local development, we recommend Minikube. However, you can also use Kubernetes clusters managed by major cloud providers, e.g. Google’s GKE or Amazon’s EKS. Setting up these clusters is beyond the scope of this document, so we defer to the official documentation for each platform.

Once your Kubernetes cluster is set up, you need to use Helm to install the spark-on-k8s-operator. A full quick start guide can be found in the operator’s documentation, but the key steps are to call

$ helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
$ kubectl create namespace spark-apps
$ helm install spark-operator spark-operator/spark-operator \
  --namespace spark-operator --create-namespace --set sparkJobNamespace=spark-apps

This will create a Kubernetes namespace spark-apps in which all your Spark applications will run, and it will use Helm to install the spark-on-k8s-operator (which manages all running PySpark apps as Kubernetes custom resources) in the namespace spark-operator.

Then, build the provided Dockerfile from the root directory of Merlion. If you are using Minikube, first point your shell to Minikube’s Docker daemon so that the image is visible to the cluster:

$ eval $(minikube -p minikube docker-env)
$ docker build -t merlion-spark -f docker/spark-on-k8s/Dockerfile .

If you are working on the cloud, you will need to publish the built Docker image to the appropriate registry, e.g. Google’s gcr.io or Amazon’s ECR.

If you require any additional Java dependencies (e.g. to communicate with a Google GCS bucket or AWS S3 bucket), we recommend you obtain the jars locally with a package manager like Maven, and add a line to the Dockerfile which copies those jars to a specific path, e.g. /opt/spark/extra-jars. Then, you can update the spec.sparkConf block of your Spark app configuration (see below) as follows:

spec:
  sparkConf:
    spark.driver.extraClassPath: "local:///opt/spark/extra-jars/*"
    spark.executor.extraClassPath: "local:///opt/spark/extra-jars/*"

Specifying a Spark App

Once your cluster is set up, you can submit a YAML file specifying your spark application as a Kubernetes custom resource. We provide templates for both forecasting and anomaly detection in k8s-spec/forecast.yml and k8s-spec/anomaly.yml respectively. Both of these use the walmart_mini.csv dataset, which contains the weekly sales of 10 different products at 2 different stores.

You can change the Docker image used by changing the spec.image in the YAML file. You can modify the amount of computational resources allocated to the Spark driver and executor by modifying spec.driver and spec.executor respectively. The arguments to the main application file (spark_apps/anomaly.py or spark_apps/forecast.py) are specified as a YAML list under spec.arguments. These should be modified according to your use case. By adding the appropriate Java dependencies and modifying the spec.sparkConf, you can directly read and write files on cloud storage buckets. While this topic is beyond the scope of this document, we refer an interested reader to Spark’s Hadoop config, Hadoop’s AWS S3 connector, and the GCS connector for more information.

More detailed information about specifying a Spark application can be found in the spark-on-k8s-operator’s detailed API documentation.

API Documentation

The API documentation of Merlion’s PySpark connectors (merlion.spark) is below.

  • dataset – Utils for reading & writing pyspark datasets.

  • pandas_udf – Pyspark pandas UDFs for Merlion functions.

merlion.spark.dataset

Utils for reading & writing pyspark datasets.

merlion.spark.dataset.TSID_COL_NAME = '__ts_id'

Many functions in this module rely on the dataset having a column named TSID_COL_NAME. This column can be added manually using add_tsid_column, and it is added automatically by read_dataset.

merlion.spark.dataset.read_dataset(spark, path, file_format='csv', time_col=None, index_cols=None, data_cols=None)

Reads a time series dataset as a pyspark Dataframe.

Parameters
  • spark (SparkSession) – The current SparkSession.

  • path (str) – The path at which the dataset is stored.

  • file_format (str) – The file format the dataset is stored in.

  • time_col (Optional[str]) – The name of the column which specifies the timestamp. If None is provided, it is assumed to be the first column which is not an index column or pre-specified data column.

  • index_cols (Optional[List[str]]) – The columns used to index the various time series in the dataset. If None is provided, we assume the entire dataset is just a single time series.

  • data_cols (Optional[List[str]]) – The columns we will use for downstream time series tasks. If None is provided, we use all columns that are not a time or index column.

Return type

DataFrame

Returns

A pyspark dataframe with columns [time_col, *index_cols, *data_cols, TSID_COL_NAME] (in that order).
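The default column handling of read_dataset can be illustrated in plain pandas. The sketch below is a hypothetical helper (resolve_columns is not part of merlion.spark) that only encodes the rules stated above: time_col defaults to the first column that is neither an index column nor a pre-specified data column, and data_cols defaults to every remaining column. The toy frame and its column names are illustrative assumptions.

```python
from typing import List, Optional, Tuple

import pandas as pd


def resolve_columns(
    df: pd.DataFrame,
    time_col: Optional[str] = None,
    index_cols: Optional[List[str]] = None,
    data_cols: Optional[List[str]] = None,
) -> Tuple[str, List[str], List[str]]:
    """Hypothetical helper mirroring the documented column defaults of read_dataset."""
    index_cols = list(index_cols or [])  # no index columns: a single time series
    if time_col is None:
        # First column that is neither an index column nor a pre-specified data column.
        excluded = set(index_cols) | set(data_cols or [])
        time_col = next(c for c in df.columns if c not in excluded)
    if data_cols is None:
        # All columns that are not the time column or an index column.
        data_cols = [c for c in df.columns if c != time_col and c not in index_cols]
    return time_col, index_cols, list(data_cols)


# Toy data; "store" and "item" identify the time series.
df = pd.DataFrame(
    {"store": [1, 1], "date": ["2021-01-01", "2021-01-08"], "item": ["a", "a"], "sales": [1.0, 2.0]}
)
time_col, index_cols, data_cols = resolve_columns(df, index_cols=["store", "item"])
# Documented column order (read_dataset also appends TSID_COL_NAME last).
print(df[[time_col, *index_cols, *data_cols]].columns.tolist())  # ['date', 'store', 'item', 'sales']
```

Note that "date" is chosen as the time column even though it is not the first column, because "store" is an index column.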

merlion.spark.dataset.write_dataset(df, time_col, path, file_format='csv')

Writes the dataset at the specified path.

Parameters
  • df (DataFrame) – The dataframe to save. The dataframe must have a column TSID_COL_NAME indexing the time series in the dataset (this column is automatically added by read_dataset).

  • time_col (str) – The name of the column which specifies the timestamp.

  • path (str) – The path to save the dataset at.

  • file_format (str) – The file format in which to save the dataset.

merlion.spark.dataset.create_hier_dataset(spark, df, time_col=None, index_cols=None, agg_dict=None)

Aggregates the time series in the dataset & appends them to the original dataset.

Parameters
  • spark (SparkSession) – The current SparkSession.

  • df (DataFrame) – A pyspark dataframe containing all the data. The dataframe must have a column TSID_COL_NAME indexing the time series in the dataset (this column is automatically added by read_dataset).

  • time_col (Optional[str]) – The name of the column which specifies the timestamp. If None is provided, it is assumed to be the first column which is not an index column or pre-specified data column.

  • index_cols (Optional[List[str]]) – The columns used to index the various time series in the dataset. If None is provided, we assume the entire dataset is just a single time series. These columns define the levels of the hierarchy. For example, if each time series represents sales and we have index_cols = ["store", "item"], we will first aggregate sales for all items sold at a particular store; then we will aggregate sales for all items at all stores.

  • agg_dict (Optional[Dict]) – A dictionary used to specify how different data columns should be aggregated. If a data column is not in the dict, we aggregate using sum by default.

Return type

Tuple[DataFrame, ndarray]

Returns

The dataset with additional time series corresponding to each level of the hierarchy, as well as a matrix specifying how the hierarchy is constructed.
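To make the returned hierarchy matrix concrete, here is a sketch that builds one for index_cols = ["store", "item"] with NumPy and pandas. It assumes the leaves come first (as the reconciliation UDF below expects), followed by one row per store and then the grand total; build_hier_matrix is a hypothetical name, and the exact ordering of aggregate rows in create_hier_dataset may differ.

```python
from typing import List

import numpy as np
import pandas as pd


def build_hier_matrix(leaves: pd.DataFrame, index_cols: List[str]) -> np.ndarray:
    """Hypothetical sketch: rows are time series (leaves first, then aggregates),
    columns are leaves; row k marks which leaves are summed to form series k."""
    leaves = leaves[index_cols].drop_duplicates().sort_values(index_cols).reset_index(drop=True)
    n = len(leaves)
    rows = [np.eye(n)]  # each leaf is its own series
    # Drop index columns from the right: first aggregate over the last column
    # (e.g. all items at each store), finishing with the grand total.
    for level in range(len(index_cols) - 1, -1, -1):
        keep = index_cols[:level]
        if not keep:
            rows.append(np.ones((1, n)))  # total over all leaves
            continue
        for _, group in leaves.groupby(keep, sort=True):
            row = np.zeros((1, n))
            row[0, group.index.to_numpy()] = 1.0  # index labels equal row positions
            rows.append(row)
    return np.vstack(rows)


leaves = pd.DataFrame({"store": [1, 1, 2, 2], "item": ["a", "b", "a", "b"]})
H = build_hier_matrix(leaves, ["store", "item"])
leaf_values = np.array([3.0, 4.0, 5.0, 6.0])  # toy values for (1,a), (1,b), (2,a), (2,b)
print(H @ leaf_values)  # leaves, then store 1, store 2, and the grand total
```

Multiplying the matrix by the leaf values yields every series in the hierarchy (here 7, 11 and 18 for the aggregates), which is exactly the relation the reconciliation step relies on.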

merlion.spark.dataset.add_tsid_column(spark, df, index_cols)

Adds the column TSID_COL_NAME to the dataframe, which assigns an integer ID to each time series in the dataset.

Parameters
  • spark (SparkSession) – The current SparkSession.

  • df (DataFrame) – A pyspark dataframe containing all the data.

  • index_cols (List[str]) – The columns used to index the various time series in the dataset.

Return type

DataFrame

Returns

The pyspark dataframe with an additional column TSID_COL_NAME added as the last column.
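The idea behind add_tsid_column (one integer per distinct combination of index values) can be sketched in pandas with GroupBy.ngroup. This is a hypothetical single-machine analogue, not the Spark implementation, and the sorted ID order it produces is an assumption.

```python
from typing import List

import pandas as pd

TSID_COL_NAME = "__ts_id"  # same name as merlion.spark.dataset.TSID_COL_NAME


def add_tsid_column_pandas(df: pd.DataFrame, index_cols: List[str]) -> pd.DataFrame:
    """Hypothetical pandas analogue of add_tsid_column: one integer ID per time series."""
    out = df.copy()
    if index_cols:
        # Each distinct combination of index-column values is one time series;
        # with sort=True, IDs follow the sorted order of those combinations.
        out[TSID_COL_NAME] = out.groupby(index_cols, sort=True).ngroup()
    else:
        out[TSID_COL_NAME] = 0  # no index columns: the whole dataset is one series
    return out  # the new column is appended last, as documented


df = pd.DataFrame(
    {"time": [1, 1, 1, 2], "store": [2, 1, 1, 2], "item": ["a", "a", "b", "a"], "sales": [5.0, 3.0, 4.0, 6.0]}
)
with_ids = add_tsid_column_pandas(df, ["store", "item"])
print(with_ids[TSID_COL_NAME].tolist())  # prints [2, 0, 1, 2]
```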

merlion.spark.pandas_udf

Pyspark pandas UDFs for Merlion functions. This module contains pandas UDFs that can be called via pyspark.sql.DataFrame.applyInPandas to run Merlion forecasting, anomaly detection, and time series reconciliation in parallel.

merlion.spark.pandas_udf.forecast(pdf, index_cols, time_col, target_col, time_stamps, model, predict_on_train=False, agg_dict=None)

Pyspark pandas UDF for performing forecasting. Should be called on a pyspark dataframe grouped by time series ID, i.e. by index_cols.

Parameters
  • pdf (DataFrame) – The pandas.DataFrame containing the training data. Should be a single time series.

  • index_cols (List[str]) – The list of column names used to index all the time series in the dataset. Not used for modeling.

  • time_col (str) – The name of the column containing the timestamps.

  • target_col (str) – The name of the column whose value we wish to forecast.

  • time_stamps (Union[List[int], List[str]]) – The timestamps at which we would like to obtain a forecast.

  • model (Union[ForecasterBase, dict]) – The model (or model dict) we are using to obtain a forecast.

  • predict_on_train (bool) – Whether to return the model’s prediction on the training data.

  • agg_dict (Optional[dict]) – A dictionary used to specify how different data columns should be aggregated. If a non-target data column is not in agg_dict, we do not model it for aggregated time series.

Return type

DataFrame

Returns

A pandas.DataFrame with the forecast & its standard error (NaN if the model doesn’t have error bars). Columns are [*index_cols, time_col, target_col, target_col + "_err"].
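The input/output contract of this UDF can be sketched without Spark or Merlion. In PySpark, a function like this would typically have its extra arguments bound (for example with functools.partial) and be passed to df.groupBy(index_cols).applyInPandas(func, schema=...); below, a plain pandas loop over groups stands in for Spark. The "model" is a hypothetical naive last-value forecaster (naive_forecast_udf is not a Merlion function) that only reproduces the documented output columns, with NaN errors as for a model without error bars.

```python
import numpy as np
import pandas as pd


def naive_forecast_udf(pdf, index_cols, time_col, target_col, time_stamps):
    """Hypothetical stand-in for merlion.spark.pandas_udf.forecast: same output columns,
    but the 'model' just repeats the last observed value and has no error bars."""
    pdf = pdf.sort_values(time_col)
    out = pd.DataFrame(index=range(len(time_stamps)))
    for c in index_cols:
        out[c] = pdf[c].iloc[0]  # index values are constant within one time series
    out[time_col] = list(time_stamps)
    out[target_col] = pdf[target_col].iloc[-1]
    out[target_col + "_err"] = np.nan  # NaN because this model has no error bars
    return out[[*index_cols, time_col, target_col, target_col + "_err"]]


train = pd.DataFrame({"store": [1, 1, 2, 2], "time": [1, 2, 1, 2], "sales": [3.0, 4.0, 5.0, 6.0]})
# Emulates train.groupBy("store").applyInPandas(...): the UDF sees one series at a time.
result = pd.concat(
    [naive_forecast_udf(g, ["store"], "time", "sales", [3, 4]) for _, g in train.groupby("store")],
    ignore_index=True,
)
print(result)
```

Because each call only sees one time series, Spark is free to run the groups on different executors, which is what makes this pattern parallel.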

merlion.spark.pandas_udf.anomaly(pdf, index_cols, time_col, train_test_split, model, predict_on_train=False)

Pyspark pandas UDF for performing anomaly detection. Should be called on a pyspark dataframe grouped by time series ID, i.e. by index_cols.

Parameters
  • pdf (DataFrame) – The pandas.DataFrame containing the training and testing data. Should be a single time series.

  • index_cols (List[str]) – The list of column names used to index all the time series in the dataset. Not used for modeling.

  • time_col (str) – The name of the column containing the timestamps.

  • train_test_split (Union[int, str]) – The time at which the testing data starts.

  • model (Union[DetectorBase, dict]) – The model (or model dict) we are using to predict anomaly scores.

  • predict_on_train (bool) – Whether to return the model’s prediction on the training data.

Return type

DataFrame

Returns

A pandas.DataFrame with the anomaly scores on the test data. Columns are [*index_cols, time_col, "anom_score"].
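A similar sketch shows how train_test_split divides each series and what the output looks like. The scorer here is a hypothetical z-score against training statistics standing in for a Merlion detector, zscore_anomaly_udf is not a Merlion function, and the sketch assumes a single data column; as above, a pandas loop over groups stands in for applyInPandas.

```python
import pandas as pd


def zscore_anomaly_udf(pdf, index_cols, time_col, train_test_split):
    """Hypothetical stand-in for merlion.spark.pandas_udf.anomaly with the same output
    columns. The 'model' is a z-score against training statistics, not a Merlion detector."""
    # Assumes exactly one data column besides the index and time columns.
    value_col = next(c for c in pdf.columns if c not in index_cols and c != time_col)
    pdf = pdf.sort_values(time_col)
    train = pdf[pdf[time_col] < train_test_split]  # training data: before the split
    test = pdf[pdf[time_col] >= train_test_split]  # testing data starts at the split
    mean, std = train[value_col].mean(), train[value_col].std(ddof=0)
    scores = (test[value_col] - mean).abs() / (std if std > 0 else 1.0)
    out = test[[*index_cols, time_col]].copy()
    out["anom_score"] = scores.to_numpy()
    return out.reset_index(drop=True)


data = pd.DataFrame(
    {"store": [1] * 6, "time": [1, 2, 3, 4, 5, 6], "sales": [1.0, 3.0, 1.0, 3.0, 2.0, 10.0]}
)
scores = pd.concat(
    [zscore_anomaly_udf(g, ["store"], "time", 5) for _, g in data.groupby("store")],
    ignore_index=True,
)
print(scores)  # only the test timestamps 5 and 6 are scored; the spike at time 6 scores highest
```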

merlion.spark.pandas_udf.reconciliation(pdf, hier_matrix, target_col)

Pyspark pandas UDF for computing the minimum-trace hierarchical time series reconciliation, as described by Wickramasuriya et al. 2018. Should be called on a pyspark dataframe grouped by timestamp. Pyspark implementation of merlion.utils.hts.minT_reconciliation.

Parameters
  • pdf (DataFrame) – A pandas.DataFrame containing forecasted values & standard errors from m time series at a single timestamp. Each time series should be indexed by TSID_COL_NAME. The first n time series (in order of ID) correspond to leaves of the hierarchy, while the remaining m - n are weighted sums of the first n. This dataframe can be produced by calling forecast on the dataframe produced by merlion.spark.dataset.create_hier_dataset.

  • hier_matrix (ndarray) – An m-by-n matrix describing how the hierarchy is aggregated. The value of the k-th time series is np.dot(hier_matrix[k], pdf[:n]). This matrix can be produced by merlion.spark.dataset.create_hier_dataset.

  • target_col (str) – The name of the column whose value we wish to forecast.

Returns

A pandas.DataFrame which replaces the original forecasts & errors with reconciled forecasts & errors.

Note

Time series reconciliation is skipped if the given timestamp has missing values for any of the time series. This can happen for training timestamps if the training time series has missing data and forecast is called with predict_on_train=True.
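The per-timestamp computation can be sketched with NumPy. This sketch assumes the error covariance W is diagonal with entries equal to the squared standard errors, a common simplification of minimum-trace reconciliation; Merlion’s minT_reconciliation may estimate the covariance differently. It takes plain arrays rather than the grouped pandas.DataFrame, and mint_reconcile is a hypothetical name.

```python
import numpy as np


def mint_reconcile(forecasts, errors, hier_matrix):
    """Sketch of minimum-trace reconciliation at one timestamp, assuming a diagonal
    error covariance W = diag(errors**2) with strictly positive errors.

    forecasts, errors: length-m arrays ordered by time series ID (leaves first).
    hier_matrix: the m-by-n summing matrix S.
    """
    forecasts = np.asarray(forecasts, dtype=float)
    errors = np.asarray(errors, dtype=float)
    if np.isnan(forecasts).any() or np.isnan(errors).any():
        return forecasts, errors  # skip reconciliation when any value is missing
    S = np.asarray(hier_matrix, dtype=float)
    W_inv = np.diag(1.0 / errors**2)
    # Covariance of the reconciled leaves: (S^T W^-1 S)^-1
    leaf_cov = np.linalg.inv(S.T @ W_inv @ S)
    # Reconciled leaves (S^T W^-1 S)^-1 S^T W^-1 y_hat, then aggregated with S.
    reconciled = S @ (leaf_cov @ S.T @ W_inv @ forecasts)
    # Standard errors: square root of the diagonal of S (S^T W^-1 S)^-1 S^T.
    reconciled_err = np.sqrt(np.diag(S @ leaf_cov @ S.T))
    return reconciled, reconciled_err


# Two leaves plus their total; the base forecasts are incoherent (3 + 4 != 9).
S = np.array([[1, 0], [0, 1], [1, 1]])
rec, rec_err = mint_reconcile([3.0, 4.0, 9.0], [1.0, 1.0, 1.0], S)
print(rec)  # leaves 11/3 and 14/3 now sum to the total 25/3
```

With equal errors, the discrepancy of 2 between the leaves and the total is split evenly across all three series, which is the intuition behind the weighted least-squares form of the estimator.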