Many of the examples and use cases for txtai focus on transforming text. Makes sense as txt is even in the name! But that doesn't mean txtai only works with text.
This article will cover examples of how to efficiently process tensors using txtai workflows.
Install dependencies
Install txtai and all dependencies. We will install the api, pipeline and workflow optional extras packages, along with the datasets package.
pip install txtai[api,pipeline,workflow] datasets
Transform large tensor arrays
The first section attempts to apply a simple transform to a very large memory-mapped array (2,000,000 x 1024).
import numpy as np
import torch
# Generate large memory-mapped array
rows, cols = 2000000, 1024
data = np.memmap("data.npy", dtype=np.float32, mode="w+", shape=(rows, cols))
del data
# Open memory-mapped array
data = np.memmap("data.npy", dtype=np.float32, shape=(rows, cols))
# Create tensor
tensor = torch.from_numpy(data).to("cuda:0")
# Apply tanh transform to tensor
torch.tanh(tensor).shape
RuntimeError: CUDA out of memory. Tried to allocate 7.63 GiB (GPU 0; 11.17 GiB total capacity; 7.63 GiB already allocated; 3.04 GiB free; 7.63 GiB reserved in total by PyTorch) If reserved memory is >> allocated memory try setting max_split_size_mb to avoid fragmentation. See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF
ls -l --block-size=MB data.npy
-rw-r--r-- 1 root root 8192MB Dec 6 23:24 data.npy
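Not surprisingly, this runs out of CUDA memory. The array needs 2,000,000 * 1024 * 4 bytes, about 8.2 GB (7.63 GiB). The error shows that the input tensor fit on the GPU, but the tanh output needs a second 7.63 GiB allocation, and the two together exceed the 11.17 GiB available.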
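The memory arithmetic can be checked with a few lines of plain Python. This is only a sketch of the calculation; the 11.17 GiB capacity is taken from the error message above, and the assumption is that the input and the tanh output are both resident on the GPU at the same time.

```python
# Size of a 2,000,000 x 1024 float32 array
rows, cols, itemsize = 2_000_000, 1024, 4
nbytes = rows * cols * itemsize

gb = nbytes / 10**9   # decimal gigabytes, as reported by ls
gib = nbytes / 2**30  # binary gibibytes, as reported by PyTorch

# tanh allocates a new tensor of the same size as its input
total_gib = 2 * gib
capacity_gib = 11.17  # from the CUDA error message

print(f"{nbytes:,} bytes = {gb:.2f} GB = {gib:.2f} GiB")
print(f"input + output = {total_gib:.2f} GiB, capacity = {capacity_gib} GiB")
```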
One of the great things about NumPy and PyTorch arrays is that they can be sliced without having to copy data. Additionally, PyTorch has methods to work directly on NumPy arrays without copying data. In other words, NumPy arrays and PyTorch tensors can share the same memory. This opens the door to efficient processing of tensor data in place.
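The slicing behavior is easy to verify on a small scale. The sketch below uses only NumPy and a small temporary memory-mapped file as a stand-in for the large array; the file name and sizes are illustrative assumptions. torch.from_numpy extends the same memory sharing to CPU tensors.

```python
import os
import tempfile

import numpy as np

# Small stand-in for the large memory-mapped array (sizes are illustrative)
path = os.path.join(tempfile.mkdtemp(), "small.npy")
data = np.memmap(path, dtype=np.float32, mode="w+", shape=(1000, 8))

# Basic slicing returns a view onto the same buffer, not a copy
view = data[100:200]
shares = np.shares_memory(data, view)

# Writing through the view is visible in the parent array
view[0, 0] = 1.0
value = float(data[100, 0])

print(shares, value)
```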
Let's try applying a simple tanh transform in batches over the array.
def process(x):
    print(x.shape)
    return torch.tanh(torch.from_numpy(x).to("cuda:0")).cpu().numpy()

# Split into 250,000 rows per call
batch = 250000
count = 0
for x in range(0, len(data), batch):
    for row in process(data[x : x + batch]):
        count += 1
print(count)
(250000, 1024)
(250000, 1024)
(250000, 1024)
(250000, 1024)
(250000, 1024)
(250000, 1024)
(250000, 1024)
(250000, 1024)
2000000
Iterating over the data array and selecting slices to operate on allows the transform to complete successfully! Each torch.from_numpy call builds a view of a portion of the existing large NumPy data array, so only one batch at a time is copied to the GPU.
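Choosing a batch size comes down to the same arithmetic as before. The helper below is a hypothetical sketch, not part of txtai or PyTorch; it assumes the working set is one input and one output tensor of equal size, and ignores any other memory the GPU is using.

```python
def rows_per_batch(budget_bytes, cols, itemsize=4, copies=2):
    """Largest row count whose working set fits in budget_bytes.

    copies=2 assumes one input and one output tensor are live at once.
    """
    return budget_bytes // (cols * itemsize * copies)

# Memory used by one 250,000-row float32 batch with 1024 columns
batch_bytes = 250_000 * 1024 * 4
print(f"{batch_bytes / 2**30:.2f} GiB per batch")

# Rows that fit in an assumed 4 GiB budget
max_rows = rows_per_batch(4 * 2**30, 1024)
print(max_rows)
```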
Enter workflows
The next section takes the same array and shows how workflows can apply transformations to tensors.
from txtai.workflow import Task, Workflow
# Create workflow with a single task calling process for each batch
task = Task(process)
workflow = Workflow([task], batch)
# Run workflow
count = 0
for row in workflow(data):
    count += 1
print(count)
(250000, 1024)
(250000, 1024)
(250000, 1024)
(250000, 1024)
(250000, 1024)
(250000, 1024)
(250000, 1024)
(250000, 1024)
2000000
Workflows process the data in the same fashion as the code in the previous section. On top of that, workflows can handle text, images, video, audio, documents, tensors and more. Workflow graphs can also be connected together to handle complex use cases.
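Conceptually, the batching behavior can be pictured with a few lines of plain Python. MiniWorkflow and batches below are hypothetical names used for illustration only; this is a simplified sketch of the idea, not txtai's implementation.

```python
def batches(data, size):
    """Yield consecutive slices of at most `size` elements."""
    for start in range(0, len(data), size):
        yield data[start : start + size]


class MiniWorkflow:
    """Hypothetical stand-in for a workflow; not txtai's implementation."""

    def __init__(self, tasks, batch):
        self.tasks = tasks
        self.batch = batch

    def __call__(self, data):
        for chunk in batches(data, self.batch):
            # Feed each batch through the tasks in order
            for task in self.tasks:
                chunk = task(chunk)
            # Yield results one element at a time
            yield from chunk


def double(xs):
    return [x * 2 for x in xs]


def increment(xs):
    return [x + 1 for x in xs]


workflow = MiniWorkflow([double, increment], batch=4)
results = list(workflow(list(range(10))))
print(results)
```

Because the call is a generator, results stream out batch by batch and the full output never needs to be held in memory at once.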
Workflows with PyTorch models
The next example applies a PyTorch model to the same data. The model applies a series of transforms and outputs a single float per row.
from torch import nn
class Model(nn.Module):
    def __init__(self):
        super().__init__()
        self.relu = nn.ReLU()
        self.linear1 = nn.Linear(1024, 512)
        self.dropout = nn.Dropout(0.5)
        self.norm = nn.LayerNorm(512)
        self.linear2 = nn.Linear(512, 1)

    def forward(self, inputs):
        outputs = self.relu(inputs)
        outputs = self.linear1(outputs)
        outputs = self.dropout(outputs)
        outputs = self.norm(outputs)
        outputs = self.linear2(outputs)
        return outputs
model = Model().to("cuda:0")
# Switch to evaluation mode so dropout is disabled during inference
model.eval()
def process(x):
    with torch.no_grad():
        outputs = model(torch.from_numpy(x).to("cuda:0")).cpu().numpy()
        print(outputs.shape)
        return outputs
# Create workflow with a single task calling model for each batch
task = Task(process)
workflow = Workflow([task], batch)
# Run workflow
count = 0
for row in workflow(data):
    count += 1
print(count)
(250000, 1)
(250000, 1)
(250000, 1)
(250000, 1)
(250000, 1)
(250000, 1)
(250000, 1)
(250000, 1)
2000000
Once again the data can be processed in batches using workflows, even with a more complex model. Let's try a more interesting example.
Workflows in parallel
Workflows consist of a series of tasks. Each task can produce one or more outputs per input element. Multi-output tasks have options available to merge the data for downstream tasks.
The following example builds a workflow with a task having three separate actions. Each action takes text as an input and applies a sentiment classifier. This is followed by a task that merges the three outputs for each row using a mean transform. Essentially, this workflow builds an ensemble sentiment classifier that averages the outputs of three models.
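The merge-then-mean step can be sketched in plain Python before bringing in real models. Everything here is a hypothetical illustration: make_action returns fixed logits in place of a classifier, and merge_columns assumes the three outputs for each row are joined column-wise into one wide row, which may differ from how a given task is configured.

```python
import math
from itertools import chain


def sigmoid(x):
    return 1.0 / (1.0 + math.exp(-x))


def make_action(negative, positive):
    """Hypothetical classifier stub returning fixed [negative, positive] logits per text."""
    return lambda texts: [[negative, positive] for _ in texts]


def merge_columns(outputs):
    """Join the per-action rows for each input into one wide row."""
    return [list(chain.from_iterable(rows)) for rows in zip(*outputs)]


actions = [make_action(-1.0, 3.0), make_action(0.0, 1.0), make_action(-2.0, 2.0)]
texts = ["a fine film", "a dull film"]

# Run every action on the same inputs, then merge and average per row
outputs = [action(texts) for action in actions]
merged = merge_columns(outputs)
scores = [sum(sigmoid(v) for v in row) / len(row) for row in merged]

print(len(merged[0]), scores)
```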
import time
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForSequenceClassification
class Tokens:
    def __init__(self, texts):
        tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased-finetuned-sst-2-english")
        tokens = tokenizer(texts, padding=True, return_tensors="pt").to("cuda:0")
        self.inputs, self.attention = tokens["input_ids"], tokens["attention_mask"]

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, value):
        return (self.inputs[value], self.attention[value])

class Classify:
    def __init__(self, model):
        self.model = model

    def __call__(self, tokens):
        with torch.no_grad():
            inputs, attention = tokens
            outputs = self.model(input_ids=inputs, attention_mask=attention)
            outputs = outputs["logits"]
        return outputs
# Load reviews from the rotten tomatoes dataset
ds = load_dataset("rotten_tomatoes")
texts = ds["train"]["text"]
tokens = Tokens(texts)
model1 = AutoModelForSequenceClassification.from_pretrained("M-FAC/bert-tiny-finetuned-sst2")
model1 = model1.to("cuda:0")
model2 = AutoModelForSequenceClassification.from_pretrained("howey/electra-base-sst2")
model2 = model2.to("cuda:0")
model3 = AutoModelForSequenceClassification.from_pretrained("philschmid/MiniLM-L6-H384-uncased-sst2")
model3 = model3.to("cuda:0")
task1 = Task([Classify(model1), Classify(model2), Classify(model3)])
task2 = Task([lambda x: torch.sigmoid(x).mean(axis=1).cpu().numpy()])
workflow = Workflow([task1, task2], 250)
start = time.time()
for x in workflow(tokens):
    pass
print(f"Took {time.time() - start} seconds")
Took 84.73194456100464 seconds
Note that while the task actions are parallel, that doesn't necessarily mean the operations are concurrent. In the case above, the actions are executed sequentially.
Workflows have an additional option to run task actions concurrently. The two supported modes are "thread" and "process". I/O bound actions will do better with multithreading and CPU bound actions will do better with multiprocessing. More can be read in the txtai documentation.
task1 = Task([Classify(model1), Classify(model2), Classify(model3)], concurrency="thread")
task2 = Task([lambda x: torch.sigmoid(x).mean(axis=1).cpu().numpy()])
workflow = Workflow([task1, task2], 250)
start = time.time()
for x in workflow(tokens):
    pass
print(f"Took {time.time() - start} seconds")
Took 85.21102929115295 seconds
In this case, concurrency doesn't improve performance. While the GIL is a factor, a bigger factor is that the GPU is already fully loaded. This method would be more beneficial if the system had a second GPU or the primary GPU had idle cycles.
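For contrast, here is a sketch of a case where threads do help. It uses only the Python standard library rather than txtai, and slow_action is a hypothetical I/O-bound action in which a sleep stands in for a network or disk wait.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_action(batch):
    """Hypothetical I/O-bound action: the sleep stands in for a network or disk wait."""
    time.sleep(0.2)
    return [len(x) for x in batch]


actions = [slow_action, slow_action, slow_action]
batch = ["a", "bb", "ccc"]

# Run the actions one after another
start = time.perf_counter()
sequential = [action(batch) for action in actions]
sequential_time = time.perf_counter() - start

# Run the actions in separate threads
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(actions)) as pool:
    threaded = list(pool.map(lambda action: action(batch), actions))
threaded_time = time.perf_counter() - start

print(f"sequential: {sequential_time:.2f}s, threaded: {threaded_time:.2f}s")
```

The waits overlap in the threaded run because sleeping releases the GIL, which is why I/O-bound actions benefit while a saturated GPU does not.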
Wrapping up
This article covered efficient ways to work with large tensor arrays in PyTorch and NumPy and showed how txtai workflows can process that data in batches. It purposely didn't cover embeddings and pipelines, to demonstrate how workflows can stand on their own.