merlion.spark package
This module implements APIs to integrate Merlion with PySpark. The expected use case is to use distributed computing to train and run inference on multiple time series in parallel.
There are two ways to use the PySpark API: directly invoking the Spark apps spark_apps/anomaly.py and spark_apps/forecast.py from the command line with either python or spark-submit, or using the Dockerfile to serve a Spark application on a Kubernetes cluster with spark-on-k8s.
To understand the expected arguments for these apps, call python spark_apps/anomaly.py -h or python spark_apps/forecast.py -h.
Setting up the spark-on-k8s-operator
We will now cover how to serve these Spark apps using the spark-on-k8s-operator. For all methods, we expect that you have installed Merlion from source by cloning our git repo.
Next, you need to create a Kubernetes cluster. For local development, we recommend Minikube. However, you can also use Kubernetes clusters managed by major cloud providers, e.g. Google’s GKE or Amazon’s EKS. Setting up these clusters is beyond the scope of this document, so we defer to the linked resources.
Once your Kubernetes cluster is set up, you need to use Helm to install the spark-on-k8s-operator. A full quick start guide for the operator can be found here, but the key steps are to call
$ helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
$ kubectl create namespace spark-apps
$ helm install spark-operator spark-operator/spark-operator \
--namespace spark-operator --create-namespace --set sparkJobNamespace=spark-apps
This will create a Kubernetes namespace spark-apps from which all your Spark applications will run, and it will use Helm to install the spark-on-k8s-operator (which manages all running PySpark apps as Kubernetes custom resources) in the namespace spark-operator.
Then, you can build the provided Dockerfile with docker build -t merlion-spark -f docker/spark-on-k8s/Dockerfile . from the root directory of Merlion.
If you are using Minikube, make sure to point your shell to Minikube’s Docker daemon with
eval $(minikube -p minikube docker-env)
before building the image.
If you are working on the cloud, you will need to publish the built Docker image to the appropriate registry, e.g.
Google’s gcr.io or Amazon’s ECR.
If you require any additional Java dependencies (e.g. to communicate with a Google GCS bucket or AWS S3 bucket), we recommend you obtain the jars locally with a package manager like Maven, and add a line to the Dockerfile which copies those jars to a specific path, e.g. /opt/spark/extra-jars.
Then, you can update the spec.sparkConf block of your Spark app configuration (see below) as follows:
spec:
  sparkConf:
    spark.driver.extraClassPath: "local:///opt/spark/extra-jars/*"
    spark.executor.extraClassPath: "local:///opt/spark/extra-jars/*"
Specifying a Spark App
Once your cluster is set up, you can submit a YAML file specifying your Spark application as a Kubernetes custom resource. We provide templates for both forecasting and anomaly detection in k8s-spec/forecast.yml and k8s-spec/anomaly.yml respectively. Both of these use the walmart_mini.csv dataset, which contains the weekly sales of 10 different products at 2 different stores.
You can change the Docker image used by changing the spec.image in the YAML file. You can modify the amount of computational resources allocated to the Spark driver and executor by modifying spec.driver and spec.executor respectively. The arguments to the main application file (spark_apps/anomaly.py or spark_apps/forecast.py) are specified as a YAML list under spec.arguments. These should be modified according to your use case.
By adding the appropriate Java dependencies and modifying the spec.sparkConf, you can directly read and write files on cloud storage buckets. While this topic is beyond the scope of this document, we refer the interested reader to Spark’s Hadoop config, Hadoop’s AWS S3 connector, and the GCS connector for more information.
More detailed information about specifying a Spark application can be found in the spark-on-k8s-operator’s detailed API documentation.
API Documentation
The API documentation of Merlion’s PySpark connectors (merlion.spark) is below.
- merlion.spark.dataset: Utils for reading & writing pyspark datasets.
- merlion.spark.pandas_udf: Pyspark pandas UDFs for Merlion functions.
merlion.spark.dataset
Utils for reading & writing pyspark datasets.
- merlion.spark.dataset.TSID_COL_NAME = '__ts_id'
Many functions in this module rely on the dataset having a column named TSID_COL_NAME. This column can be added manually using add_tsid_column, and its addition is handled automatically by read_dataset.
- merlion.spark.dataset.read_dataset(spark, path, file_format='csv', time_col=None, index_cols=None, data_cols=None)
Reads a time series dataset as a pyspark Dataframe.
- Parameters
  - spark (SparkSession) – The current SparkSession.
  - path (str) – The path at which the dataset is stored.
  - file_format (str) – The file format the dataset is stored in.
  - time_col (Optional[str]) – The name of the column which specifies timestamp. If None is provided, it is assumed to be the first column which is not an index column or pre-specified data column.
  - index_cols (Optional[List[str]]) – The columns used to index the various time series in the dataset. If None is provided, we assume the entire dataset is just a single time series.
  - data_cols (Optional[List[str]]) – The columns we will use for downstream time series tasks. If None is provided, we use all columns that are not a time or index column.
- Return type
DataFrame
- Returns
  A pyspark dataframe with columns [time_col, *index_cols, *data_cols, TSID_COL_NAME] (in that order).
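As a usage sketch (the file path and column names below are assumptions based on the walmart_mini.csv dataset; they are not part of the API):

from pyspark.sql import SparkSession

from merlion.spark.dataset import TSID_COL_NAME, read_dataset

spark = SparkSession.builder.appName("merlion-spark-demo").getOrCreate()

# Path & column names are assumptions for the walmart_mini.csv dataset.
df = read_dataset(
    spark,
    path="data/walmart/walmart_mini.csv",
    file_format="csv",
    time_col="Date",
    index_cols=["Store", "Dept"],
    data_cols=["Weekly_Sales"],
)
# Resulting columns: [Date, Store, Dept, Weekly_Sales, __ts_id]
df.show(5)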
- merlion.spark.dataset.write_dataset(df, time_col, path, file_format='csv')
Writes the dataset at the specified path.
- Parameters
  - df (DataFrame) – The dataframe to save. The dataframe must have a column TSID_COL_NAME indexing the time series in the dataset (this column is automatically added by read_dataset).
  - time_col (str) – The name of the column which specifies timestamp.
  - path (str) – The path to save the dataset at.
  - file_format (str) – The file format in which to save the dataset.
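Continuing the read_dataset sketch above, writing the dataframe back out might look like this (the output path is hypothetical):

from merlion.spark.dataset import write_dataset

# Save the dataframe returned by read_dataset; the TSID column it added satisfies
# the requirement that TSID_COL_NAME be present.
write_dataset(df, time_col="Date", path="output/walmart_processed", file_format="csv")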
- merlion.spark.dataset.create_hier_dataset(spark, df, time_col=None, index_cols=None, agg_dict=None)
Aggregates the time series in the dataset & appends them to the original dataset.
- Parameters
  - spark (SparkSession) – The current SparkSession.
  - df (DataFrame) – A pyspark dataframe containing all the data. The dataframe must have a column TSID_COL_NAME indexing the time series in the dataset (this column is automatically added by read_dataset).
  - time_col (Optional[str]) – The name of the column which specifies timestamp. If None is provided, it is assumed to be the first column which is not an index column or pre-specified data column.
  - index_cols (Optional[List[str]]) – The columns used to index the various time series in the dataset. If None is provided, we assume the entire dataset is just a single time series. These columns define the levels of the hierarchy. For example, if each time series represents sales and we have index_cols = ["store", "item"], we will first aggregate sales for all items sold at a particular store; then we will aggregate sales for all items at all stores.
  - agg_dict (Optional[Dict]) – A dictionary used to specify how different data columns should be aggregated. If a data column is not in the dict, we aggregate using sum by default.
- Return type
  Tuple[DataFrame, ndarray]
- Returns
  The dataset with additional time series corresponding to each level of the hierarchy, as well as a matrix specifying how the hierarchy is constructed.
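As a sketch, continuing the read_dataset example above (column names are assumptions for the walmart_mini.csv dataset):

from merlion.spark.dataset import create_hier_dataset

# Adds aggregated time series (e.g. per-store totals and a grand total) on top of the
# original store/department series; hier_matrix records how each aggregate is formed.
hier_df, hier_matrix = create_hier_dataset(
    spark, df, time_col="Date", index_cols=["Store", "Dept"]
)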
- merlion.spark.dataset.add_tsid_column(spark, df, index_cols)
Adds the column TSID_COL_NAME to the dataframe, which assigns an integer ID to each time series in the dataset.
- Parameters
  - spark (SparkSession) – The current SparkSession.
  - df (DataFrame) – A pyspark dataframe containing all the data.
  - index_cols (List[str]) – The columns used to index the various time series in the dataset.
- Return type
  DataFrame
- Returns
  The pyspark dataframe with an additional column TSID_COL_NAME added as the last column.
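For example, if you have a raw pyspark dataframe that was not loaded via read_dataset, you can add the ID column manually. In the sketch below, raw_df and its index columns are hypothetical:

from merlion.spark.dataset import TSID_COL_NAME, add_tsid_column

# raw_df is a hypothetical pyspark dataframe that already has "Store" and "Dept" columns.
df_with_id = add_tsid_column(spark, raw_df, index_cols=["Store", "Dept"])
# df_with_id now has TSID_COL_NAME ('__ts_id') as its last column.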
merlion.spark.pandas_udf
Pyspark pandas UDFs for Merlion functions.
This module contains pandas UDFs that can be called via pyspark.sql.GroupedData.applyInPandas (i.e. DataFrame.groupBy(...).applyInPandas) to run Merlion forecasting, anomaly detection, and time series reconciliation in parallel.
- merlion.spark.pandas_udf.forecast(pdf, index_cols, time_col, target_col, time_stamps, model, predict_on_train=False, agg_dict=None)
Pyspark pandas UDF for performing forecasting. Should be called on a pyspark dataframe grouped by time series ID, i.e. by index_cols.
- Parameters
  - pdf (DataFrame) – The pandas.DataFrame containing the training data. Should be a single time series.
  - index_cols (List[str]) – The list of column names used to index all the time series in the dataset. Not used for modeling.
  - time_col (str) – The name of the column containing the timestamps.
  - target_col (str) – The name of the column whose value we wish to forecast.
  - time_stamps (Union[List[int], List[str]]) – The timestamps at which we would like to obtain a forecast.
  - model (Union[ForecasterBase, dict]) – The model (or model dict) we are using to obtain a forecast.
  - predict_on_train (bool) – Whether to return the model’s prediction on the training data.
  - agg_dict (Optional[dict]) – A dictionary used to specify how different data columns should be aggregated. If a non-target data column is not in agg_dict, we do not model it for aggregated time series.
- Return type
  DataFrame
- Returns
  A pandas.DataFrame with the forecast & its standard error (NaN if the model doesn’t have error bars). Columns are [*index_cols, time_col, target_col, target_col + "_err"].
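A minimal usage sketch, continuing the read_dataset example from merlion.spark.dataset above. The column names, forecast horizon, model dict, and output schema dtypes are assumptions rather than part of the documented API:

from functools import partial

import pyspark.sql.types as pst

from merlion.spark.dataset import TSID_COL_NAME
from merlion.spark.pandas_udf import forecast

index_cols = ["Store", "Dept", TSID_COL_NAME]
time_col, target_col = "Date", "Weekly_Sales"

# Output schema follows the documented return columns; the dtypes are assumptions.
output_schema = pst.StructType(
    [df.schema[c] for c in index_cols]
    + [
        pst.StructField(time_col, pst.DateType()),
        pst.StructField(target_col, pst.FloatType()),
        pst.StructField(f"{target_col}_err", pst.FloatType()),
    ]
)

# Group by time series ID & apply the UDF. The model dict format is an assumption.
forecast_df = df.groupBy(index_cols).applyInPandas(
    partial(
        forecast,
        index_cols=index_cols,
        time_col=time_col,
        target_col=target_col,
        time_stamps=["2012-11-02", "2012-11-09"],  # hypothetical forecast horizon
        model={"name": "DefaultForecaster"},
    ),
    schema=output_schema,
)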
- merlion.spark.pandas_udf.anomaly(pdf, index_cols, time_col, train_test_split, model, predict_on_train=False)
Pyspark pandas UDF for performing anomaly detection. Should be called on a pyspark dataframe grouped by time series ID, i.e. by index_cols.
- Parameters
  - pdf (DataFrame) – The pandas.DataFrame containing the training and testing data. Should be a single time series.
  - index_cols (List[str]) – The list of column names used to index all the time series in the dataset. Not used for modeling.
  - time_col (str) – The name of the column containing the timestamps.
  - train_test_split (Union[int, str]) – The time at which the testing data starts.
  - model (Union[DetectorBase, dict]) – The model (or model dict) we are using to predict anomaly scores.
  - predict_on_train (bool) – Whether to return the model’s prediction on the training data.
- Return type
  DataFrame
- Returns
  A pandas.DataFrame with the anomaly scores on the test data. Columns are [*index_cols, time_col, "anom_score"].
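A usage sketch in the same style as the forecast example above; the column names, train/test split, model dict, and output schema dtypes are assumptions:

from functools import partial

import pyspark.sql.types as pst

from merlion.spark.dataset import TSID_COL_NAME
from merlion.spark.pandas_udf import anomaly

index_cols = ["Store", "Dept", TSID_COL_NAME]

# Output schema follows the documented return columns; the dtypes are assumptions.
output_schema = pst.StructType(
    [df.schema[c] for c in index_cols]
    + [pst.StructField("Date", pst.DateType()), pst.StructField("anom_score", pst.FloatType())]
)

anomaly_df = df.groupBy(index_cols).applyInPandas(
    partial(
        anomaly,
        index_cols=index_cols,
        time_col="Date",
        train_test_split="2012-11-02",  # hypothetical split point
        model={"name": "DefaultDetector"},  # assumed model dict format
    ),
    schema=output_schema,
)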
- merlion.spark.pandas_udf.reconciliation(pdf, hier_matrix, target_col)
Pyspark pandas UDF for computing the minimum-trace hierarchical time series reconciliation, as described by Wickramasuriya et al. 2018. Should be called on a pyspark dataframe grouped by timestamp. This is the pyspark implementation of merlion.utils.hts.minT_reconciliation.
- Parameters
  - pdf (DataFrame) – A pandas.DataFrame containing forecasted values & standard errors from m time series at a single timestamp. Each time series should be indexed by TSID_COL_NAME. The first n time series (in order of ID) correspond to leaves of the hierarchy, while the remaining m - n are weighted sums of the first n. This dataframe can be produced by calling forecast on the dataframe produced by merlion.spark.dataset.create_hier_dataset.
  - hier_matrix (ndarray) – An m-by-n matrix describing how the hierarchy is aggregated. The value of the k-th time series is np.dot(hier_matrix[k], pdf[:n]). This matrix can be produced by merlion.spark.dataset.create_hier_dataset.
  - target_col (str) – The name of the column whose value we wish to forecast.
- Returns
  A pandas.DataFrame which replaces the original forecasts & errors with reconciled forecasts & errors.
Note
Time series reconciliation is skipped if the given timestamp has missing values for any of the time series. This can happen for training timestamps if the training time series has missing data and forecast is called with predict_on_train=True.
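As a sketch, reconciliation can be applied to the output of the forecast UDF. Here we assume forecast_df was produced by applying forecast to the hierarchical dataset returned by create_hier_dataset (so that hier_matrix is available), and that the reconciled output keeps the same columns as the input forecasts:

from functools import partial

from merlion.spark.pandas_udf import reconciliation

# Group by timestamp & reconcile each timestamp's forecasts across the hierarchy.
reconciled_df = forecast_df.groupBy("Date").applyInPandas(
    partial(reconciliation, hier_matrix=hier_matrix, target_col="Weekly_Sales"),
    schema=forecast_df.schema,
)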