merlion.spark package
This module implements APIs to integrate Merlion with PySpark. The expected use case is to use distributed computing to train and run inference on multiple time series in parallel.
merlion.spark.dataset | Utils for reading & writing pyspark datasets.
merlion.spark.pandas_udf | Pyspark pandas UDFs for Merlion functions.
Submodules
merlion.spark.dataset module
Utils for reading & writing pyspark datasets.
- merlion.spark.dataset.TSID_COL_NAME = '__ts_id'
Many functions in this module rely on the dataset having a column named TSID_COL_NAME. This column can be added manually using add_tsid_column, and read_dataset adds it automatically.
- merlion.spark.dataset.read_dataset(spark, path, file_format='csv', time_col=None, index_cols=None, data_cols=None)
Reads a time series dataset as a pyspark DataFrame.
- Parameters
spark (SparkSession) – The current SparkSession.
path (str) – The path at which the dataset is stored.
file_format (str) – The file format the dataset is stored in.
time_col (Optional[str]) – The name of the column which specifies timestamp. If None is provided, it is assumed to be the first column which is not an index column or pre-specified data column.
index_cols (Optional[List[str]]) – The columns used to index the various time series in the dataset. If None is provided, we assume the entire dataset is just a single time series.
data_cols (Optional[List[str]]) – The columns we will use for downstream time series tasks. If None is provided, we use all columns that are not a time or index column.
- Return type
DataFrame
- Returns
A pyspark dataframe with columns [time_col, *index_cols, *data_cols, TSID_COL_NAME] (in that order).
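The defaulting rules for time_col, index_cols and data_cols can be illustrated without Spark. The sketch below is not Merlion's implementation: resolve_columns is a hypothetical helper that only mirrors the rules stated above for a plain list of column names, and it returns the documented output column order.

```python
from typing import List, Optional, Sequence

# Value documented for merlion.spark.dataset.TSID_COL_NAME.
TSID_COL_NAME = "__ts_id"


def resolve_columns(
    columns: Sequence[str],
    time_col: Optional[str] = None,
    index_cols: Optional[List[str]] = None,
    data_cols: Optional[List[str]] = None,
) -> List[str]:
    """Hypothetical helper mirroring the documented defaults of read_dataset."""
    # No index columns means the whole dataset is treated as one time series.
    index_cols = list(index_cols or [])
    if time_col is None:
        # First column that is neither an index column nor a pre-specified data column.
        reserved = set(index_cols) | set(data_cols or [])
        time_col = next(c for c in columns if c not in reserved)
    if data_cols is None:
        # All columns that are not the time column or an index column.
        data_cols = [c for c in columns if c != time_col and c not in index_cols]
    return [time_col, *index_cols, *data_cols, TSID_COL_NAME]


order = resolve_columns(
    ["store", "item", "date", "sales", "price"], index_cols=["store", "item"]
)
print(order)
```

Here "date" is inferred as the time column because it is the first column that is not an index column, and "sales" and "price" become the data columns.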
- merlion.spark.dataset.write_dataset(df, time_col, path, file_format='csv')
Writes the dataset at the specified path.
- Parameters
df (DataFrame) – The dataframe to save. The dataframe must have a column TSID_COL_NAME indexing the time series in the dataset (this column is automatically added by read_dataset).
time_col (str) – The name of the column which specifies timestamp.
path (str) – The path to save the dataset at.
file_format (str) – The file format in which to save the dataset.
- merlion.spark.dataset.create_hier_dataset(spark, df, time_col=None, index_cols=None, agg_dict=None)
Aggregates the time series in the dataset & appends them to the original dataset.
- Parameters
spark (SparkSession) – The current SparkSession.
df (DataFrame) – A pyspark dataframe containing all the data. The dataframe must have a column TSID_COL_NAME indexing the time series in the dataset (this column is automatically added by read_dataset).
time_col (Optional[str]) – The name of the column which specifies timestamp. If None is provided, it is assumed to be the first column which is not an index column or pre-specified data column.
index_cols (Optional[List[str]]) – The columns used to index the various time series in the dataset. If None is provided, we assume the entire dataset is just a single time series. These columns define the levels of the hierarchy. For example, if each time series represents sales and we have index_cols = ["store", "item"], we will first aggregate sales for all items sold at a particular store; then we will aggregate sales for all items at all stores.
agg_dict (Optional[Dict]) – A dictionary used to specify how different data columns should be aggregated. If a data column is not in the dict, we aggregate using sum by default.
- Return type
Tuple[DataFrame, ndarray]
- Returns
The dataset with additional time series corresponding to each level of the hierarchy, as well as a matrix specifying how the hierarchy is constructed.
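To make the returned hierarchy matrix concrete, here is a minimal sketch for index_cols = ["store", "item"] with sum aggregation. It is not Merlion's code: build_hier_matrix and aggregate are hypothetical helpers, and the row order (leaves first, then one row per store, then the grand total) is an assumption chosen to match the description that the first n series are leaves and the rest are weighted sums of them.

```python
from typing import List, Tuple


def build_hier_matrix(leaves: List[Tuple[str, str]]) -> List[List[float]]:
    """Hypothetical sketch: m-by-n summing matrix for (store, item) leaves."""
    n = len(leaves)
    # Rows 0..n-1: each leaf series maps to itself (identity block).
    matrix = [[1.0 if i == j else 0.0 for j in range(n)] for i in range(n)]
    # One row per store: sum over all items sold at that store.
    stores = list(dict.fromkeys(store for store, _ in leaves))
    for store in stores:
        matrix.append([1.0 if s == store else 0.0 for s, _ in leaves])
    # Final row: sum over all items at all stores.
    matrix.append([1.0] * n)
    return matrix


def aggregate(matrix: List[List[float]], leaf_values: List[float]) -> List[float]:
    """Value of every series in the hierarchy given the n leaf values."""
    return [sum(w * v for w, v in zip(row, leaf_values)) for row in matrix]


leaves = [("A", "x"), ("A", "y"), ("B", "x")]
hier_matrix = build_hier_matrix(leaves)
print(aggregate(hier_matrix, [10.0, 5.0, 7.0]))
```

With other aggregations in agg_dict (a mean, for example) the non-leaf rows would carry different weights; this sketch covers only the default sum.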
- merlion.spark.dataset.add_tsid_column(spark, df, index_cols)
Adds the column TSID_COL_NAME to the dataframe, which assigns an integer ID to each time series in the dataset.
- Parameters
spark (SparkSession) – The current SparkSession.
df (DataFrame) – A pyspark dataframe containing all the data.
index_cols (List[str]) – The columns used to index the various time series in the dataset.
- Return type
DataFrame
- Returns
The pyspark dataframe with an additional column TSID_COL_NAME added as the last column.
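The idea of the ID column can be sketched on plain Python records: every distinct combination of index column values receives one integer. This is an illustration only; add_tsid is a hypothetical helper, and assigning IDs in order of first appearance is an assumption, not a statement about how Merlion numbers the series.

```python
from typing import Any, Dict, List

# Value documented for merlion.spark.dataset.TSID_COL_NAME.
TSID_COL_NAME = "__ts_id"


def add_tsid(rows: List[Dict[str, Any]], index_cols: List[str]) -> List[Dict[str, Any]]:
    """Hypothetical sketch: append an integer time series ID as the last key."""
    ids: Dict[tuple, int] = {}
    out = []
    for row in rows:
        key = tuple(row[c] for c in index_cols)
        # A new combination of index values gets the next unused integer.
        tsid = ids.setdefault(key, len(ids))
        out.append({**row, TSID_COL_NAME: tsid})
    return out


rows = [
    {"store": "A", "date": "2024-01-01", "sales": 10},
    {"store": "B", "date": "2024-01-01", "sales": 7},
    {"store": "A", "date": "2024-01-02", "sales": 12},
]
for record in add_tsid(rows, ["store"]):
    print(record)
```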
merlion.spark.pandas_udf module
Pyspark pandas UDFs for Merlion functions.
This module contains pandas UDFs that can be called via pyspark.sql.GroupedData.applyInPandas (that is, on a grouped pyspark dataframe) to run Merlion forecasting, anomaly detection, and time series reconciliation in parallel.
- merlion.spark.pandas_udf.forecast(pdf, index_cols, time_col, target_col, time_stamps, model, predict_on_train=False, agg_dict=None)
Pyspark pandas UDF for performing forecasting. Should be called on a pyspark dataframe grouped by time series ID, i.e. by index_cols.
- Parameters
pdf (DataFrame) – The pandas.DataFrame containing the training data. Should be a single time series.
index_cols (List[str]) – The list of column names used to index all the time series in the dataset. Not used for modeling.
time_col (str) – The name of the column containing the timestamps.
target_col (str) – The name of the column whose value we wish to forecast.
time_stamps (Union[List[int], List[str]]) – The timestamps at which we would like to obtain a forecast.
model (Union[ForecasterBase, dict]) – The model (or model dict) we are using to obtain a forecast.
predict_on_train (bool) – Whether to return the model’s prediction on the training data.
agg_dict (Optional[dict]) – A dictionary used to specify how different data columns should be aggregated. If a non-target data column is not in agg_dict, we do not model it for aggregated time series.
- Return type
DataFrame
- Returns
A pandas.DataFrame with the forecast & its standard error (NaN if the model doesn’t have error bars). Columns are [*index_cols, time_col, target_col, target_col + "_err"].
- merlion.spark.pandas_udf.anomaly(pdf, index_cols, time_col, train_test_split, model)
Pyspark pandas UDF for performing anomaly detection. Should be called on a pyspark dataframe grouped by time series ID, i.e. by index_cols.
- Parameters
pdf (DataFrame) – The pandas.DataFrame containing the training and testing data. Should be a single time series.
index_cols (List[str]) – The list of column names used to index all the time series in the dataset. Not used for modeling.
time_col (str) – The name of the column containing the timestamps.
train_test_split (Union[int, str]) – The time at which the testing data starts.
model (Union[DetectorBase, dict]) – The model (or model dict) we are using to predict anomaly scores.
- Return type
DataFrame
- Returns
A pandas.DataFrame with the anomaly scores on the test data. Columns are [*index_cols, time_col, "anom_score"].
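The contract of this UDF (one time series in, test-period scores out) can be sketched on plain Python records. Everything below is a stand-in: score_group is a hypothetical function, value_col is a hypothetical parameter that the real UDF does not take, and the z-score is a placeholder for whatever Merlion detector is passed as model. The sketch only shows how train_test_split divides the data and which columns come back.

```python
from statistics import mean, pstdev
from typing import Any, Dict, List


def score_group(
    rows: List[Dict[str, Any]],
    index_cols: List[str],
    time_col: str,
    value_col: str,  # hypothetical; not a parameter of the real UDF
    train_test_split: Any,
) -> List[Dict[str, Any]]:
    """Hypothetical sketch: fit on rows before the split, score rows from it on."""
    train = [r[value_col] for r in rows if r[time_col] < train_test_split]
    mu = mean(train)
    sigma = pstdev(train) or 1.0  # avoid dividing by zero on constant data
    scored = []
    for r in rows:
        if r[time_col] >= train_test_split:  # testing data starts at the split
            record = {c: r[c] for c in index_cols}
            record[time_col] = r[time_col]
            record["anom_score"] = abs(r[value_col] - mu) / sigma
            scored.append(record)
    return scored


values = [1, 3, 1, 3, 2, 7]
rows = [{"store": "A", "t": t, "y": y} for t, y in enumerate(values)]
for record in score_group(rows, ["store"], "t", "y", train_test_split=4):
    print(record)
```

The split works with any timestamps that compare consistently, such as integers or ISO-formatted date strings.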
- merlion.spark.pandas_udf.reconciliation(pdf, hier_matrix, target_col)
Pyspark pandas UDF for computing the minimum-trace hierarchical time series reconciliation, as described by Wickramasuriya et al. 2018. Should be called on a pyspark dataframe grouped by timestamp. Pyspark implementation of merlion.utils.hts.minT_reconciliation.
- Parameters
pdf (DataFrame) – A pandas.DataFrame containing forecasted values & standard errors from m time series at a single timestamp. Each time series should be indexed by TSID_COL_NAME. The first n time series (in order of ID) correspond to leaves of the hierarchy, while the remaining m - n are weighted sums of the first n. This dataframe can be produced by calling forecast on the dataframe produced by merlion.spark.dataset.create_hier_dataset.
hier_matrix (ndarray) – An m-by-n matrix describing how the hierarchy is aggregated. The value of the k-th time series is np.dot(hier_matrix[k], pdf[:n]). This matrix can be produced by merlion.spark.dataset.create_hier_dataset.
target_col (str) – The name of the column whose value we wish to forecast.
- Returns
A pandas.DataFrame which replaces the original forecasts & errors with reconciled forecasts & errors.
Note
Time series reconciliation is skipped if the given timestamp has missing values for any of the time series. This can happen for training timestamps if the training time series has missing data and forecast is called with predict_on_train=True.
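Reconciliation matters because forecasts produced independently for each series usually violate the relation that the k-th series equals the dot product of hier_matrix[k] with the n leaf values. The sketch below does not perform minimum-trace reconciliation; coherence_gaps and has_missing are hypothetical helpers that measure how far a set of forecasts at one timestamp is from that relation, and detect the missing-value case described in the note.

```python
import math
from typing import List, Optional


def coherence_gaps(hier_matrix: List[List[float]], values: List[float]) -> List[float]:
    """Hypothetical sketch: forecast minus the value implied by the leaf forecasts."""
    n = len(hier_matrix[0])
    leaves = values[:n]  # the first n series are the leaves of the hierarchy
    return [
        v - sum(w * x for w, x in zip(row, leaves))
        for row, v in zip(hier_matrix, values)
    ]


def has_missing(values: List[Optional[float]]) -> bool:
    """True if any series has no forecast at this timestamp."""
    return any(v is None or math.isnan(v) for v in values)


# Two leaves and their sum; the aggregate was forecast as 17, not 10 + 5.
hier_matrix = [[1, 0], [0, 1], [1, 1]]
forecasts = [10.0, 5.0, 17.0]
if not has_missing(forecasts):
    print(coherence_gaps(hier_matrix, forecasts))
```

Reconciled forecasts would have a gap of zero for every series.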