{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "### Integrated-gradient on IMDB dataset (PyTorch)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This is an example of the integrated-gradient method on text classification with a PyTorch model. If using this explainer, please cite the original work: https://github.com/ankurtaly/Integrated-Gradients." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import pandas as pd\n", "import torch\n", "import torch.nn as nn\n", "import sklearn\n", "from sklearn.datasets import fetch_20newsgroups\n", "\n", "from omnixai.data.text import Text\n", "from omnixai.preprocessing.text import Word2Id\n", "from omnixai.explainers.tabular.agnostic.L2X.utils import Trainer, InputData, DataLoader\n", "from omnixai.explainers.nlp.specific.ig import IntegratedGradientText" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We apply a simple CNN model for this text classification task. Note that the method `forward` has two inputs `inputs` (token ids) and `masks` (the sentence masks). For `IntegratedGradientText`, the first input of the model must be the token ids." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "class TextModel(nn.Module):\n", "\n", " def __init__(self, num_embeddings, num_classes, **kwargs):\n", " super().__init__()\n", " self.num_embeddings = num_embeddings\n", " self.embedding_size = kwargs.get(\"embedding_size\", 50)\n", " self.embedding = nn.Embedding(self.num_embeddings, self.embedding_size)\n", " self.embedding.weight.data.normal_(mean=0.0, std=0.01)\n", " \n", " hidden_size = kwargs.get(\"hidden_size\", 100)\n", " kernel_sizes = kwargs.get(\"kernel_sizes\", [3, 4, 5])\n", " if type(kernel_sizes) == int:\n", " kernel_sizes = [kernel_sizes]\n", "\n", " self.activation = nn.ReLU()\n", " self.conv_layers = nn.ModuleList([\n", " nn.Conv1d(self.embedding_size, hidden_size, k, padding=k // 2) for k in kernel_sizes])\n", " self.dropout = nn.Dropout(0.2)\n", " self.output_layer = nn.Linear(len(kernel_sizes) * hidden_size, num_classes)\n", "\n", " def forward(self, inputs, masks):\n", " embeddings = self.embedding(inputs)\n", " x = embeddings * masks.unsqueeze(dim=-1)\n", " x = x.permute(0, 2, 1)\n", " x = [self.activation(layer(x).max(2)[0]) for layer in self.conv_layers]\n", " outputs = self.output_layer(self.dropout(torch.cat(x, dim=1)))\n", " if outputs.shape[1] == 1:\n", " outputs = outputs.squeeze(dim=1)\n", " return outputs" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We use a `Text` object to represent a batch of texts/sentences. The package `omnixai.preprocessing.text` provides some transforms related to text data such as `Tfidf` and `Word2Id`." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "# Load the training and test datasets\n", "train_data = pd.read_csv('/home/ywz/data/imdb/labeledTrainData.tsv', sep='\\t')\n", "n = int(0.8 * len(train_data))\n", "x_train = Text(train_data[\"review\"].values[:n])\n", "y_train = train_data[\"sentiment\"].values[:n].astype(int)\n", "x_test = Text(train_data[\"review\"].values[n:])\n", "y_test = train_data[\"sentiment\"].values[n:].astype(int)\n", "class_names = [\"negative\", \"positive\"]\n", "# The transform for converting words/tokens to IDs\n", "transform = Word2Id().fit(x_train)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The preprocessing function converts a batch of texts into token IDs and the masks. The outputs of the preprocessing function must fit the inputs of the model." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "max_length = 256\n", "device = \"cuda\" if torch.cuda.is_available() else \"cpu\"\n", "\n", "def preprocess(X: Text):\n", " samples = transform.transform(X)\n", " max_len = 0\n", " for i in range(len(samples)):\n", " max_len = max(max_len, len(samples[i]))\n", " max_len = min(max_len, max_length)\n", " inputs = np.zeros((len(samples), max_len), dtype=int)\n", " masks = np.zeros((len(samples), max_len), dtype=np.float32)\n", " for i in range(len(samples)):\n", " x = samples[i][:max_len]\n", " inputs[i, :len(x)] = x\n", " masks[i, :len(x)] = 1\n", " return inputs, masks" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We now train the CNN model and evaluate its performance." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " |████████████████████████████████████████| 100.0% Complete, Loss 0.0008\n" ] } ], "source": [ "model = TextModel(\n", " num_embeddings=transform.vocab_size,\n", " num_classes=len(class_names)\n", ").to(device)\n", "\n", "Trainer(\n", " optimizer_class=torch.optim.AdamW,\n", " learning_rate=1e-3,\n", " batch_size=128,\n", " num_epochs=10,\n", ").train(\n", " model=model,\n", " loss_func=nn.CrossEntropyLoss(),\n", " train_x=transform.transform(x_train),\n", " train_y=y_train,\n", " padding=True,\n", " max_length=max_length,\n", " verbose=True\n", ")" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Test accuracy: 0.8458027386386188\n" ] } ], "source": [ "model.eval()\n", "data = transform.transform(x_test)\n", "data_loader = DataLoader(\n", " dataset=InputData(data, [0] * len(data), max_length),\n", " batch_size=32,\n", " collate_fn=InputData.collate_func,\n", " shuffle=False\n", ")\n", "outputs = []\n", "for inputs in data_loader:\n", " value, mask, target = inputs\n", " y = model(value.to(device), mask.to(device))\n", " outputs.append(y.detach().cpu().numpy())\n", "outputs = np.concatenate(outputs, axis=0)\n", "predictions = np.argmax(outputs, axis=1)\n", "print('Test accuracy: {}'.format(\n", " sklearn.metrics.f1_score(y_test, predictions, average='binary')))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To initialize `IntegratedGradientText`, we need to set the following parameters:\n", "\n", " - `model`: The model to explain, whose type is `tf.keras.Model` or `torch.nn.Module`.\n", " - `embedding_layer`: The embedding layer in the model, which can be `tf.keras.layers.Layer` or `torch.nn.Module`.\n", " - `preprocess_function`: The pre-processing function that converts the raw input data into the inputs of `model`. The first output of `preprocess_function` should be the token ids.\n", " - `mode`: The task type, e.g., `classification` or `regression`.\n", " - `id2token`: The mapping from token ids to tokens." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
Instance 0: Class positive
\n", "
what a great movie if you have no taste

\n", "
Instance 1: Class positive
\n", "
it was a fantastic performance

\n", "
Instance 2: Class positive
\n", "
best film ever

\n", "
Instance 3: Class positive
\n", "
such a great show

\n", "
Instance 4: Class negative
\n", "
it was a horrible movie

\n", "
Instance 5: Class negative
\n", "
i never watched something as bad

" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "explainer = IntegratedGradientText(\n", " model=model,\n", " embedding_layer=model.embedding,\n", " preprocess_function=preprocess,\n", " id2token=transform.id_to_word\n", ")\n", "x = Text([\n", " \"What a great movie! if you have no taste.\",\n", " \"it was a fantastic performance!\",\n", " \"best film ever\",\n", " \"such a great show!\",\n", " \"it was a horrible movie\",\n", " \"i've never watched something as bad\"\n", "])\n", "explanations = explainer.explain(x)\n", "explanations.ipython_plot(class_names=class_names)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.5" } }, "nbformat": 4, "nbformat_minor": 2 }