Lazily Loading ML Models for Scoring with PySpark

Jeshua Bratman
Abnormal Security Engineering Blog
5 min read · Dec 11, 2020


Authors: Jeshua Bratman and Vineet Edupuganti

Our core email attack detection product at Abnormal works by processing each incoming message, applying a series of classification models, and ultimately deciding whether the message might be an attack. This detection system runs as an online distributed service processing millions of messages per day.

Rescoring

One key component in our pipeline is called rescoring. This data pipeline loads historical examples of email attacks in order to evaluate how accurately our detection system catches them. Rescoring allows us to easily measure how changes to code and classification models impact the performance of the product.

High-level steps in rescoring:

  1. Re-process messages
  2. Re-extract every attribute and feature
  3. Re-run every detection model
  4. Evaluate precision, recall, and other statistics

Crucially, we need this batch evaluation pipeline to resemble our online decisioning flow as closely as possible, using the exact same code paths for the scoring module but run over large sets of batch data using Spark. If the code were not identical to our real-time scoring system, we would not be able to trust that the rescoring results reflect true detection performance in our online system.

Lazy evaluation for rescoring models

PySpark has been a very flexible batch-processing tool, but one challenge with the architecture above surfaces when using certain optimized ML libraries. In this article, we’ll focus in particular on TensorFlow models (using Keras) and nearest-neighbor models (using Spotify’s Annoy library), neither of which plays nicely with Spark.

The straightforward approach would be:

  1. Load models into memory in the Spark driver equivalently to how our real-time scorer loads models
  2. Broadcast these models to the Spark executors
  3. Run each model across examples in Spark executors

However, for both TensorFlow and Annoy, the loaded models live outside the Python process’s memory space and therefore cannot be broadcast using PySpark (which simply attempts to pickle the Python object).

Our simple solution is to broadcast only the bytes of the model and load them into memory in a lazy fashion.

class ModelWrapper:
    def __init__(self, model_data):
        # Store only the raw model bytes so the wrapper can be pickled
        # and broadcast to the Spark executors.
        self.model_data = model_data
        self.initialized = False

    def predict(self, features):
        if not self.initialized:
            self._load_model()
            self.initialized = True
        # Predict using the loaded model.
        ...

    def _load_model(self):
        # Initialize TensorFlow graph & session.
        # Load model into memory from self.model_data.
        ...

This means the model is not actually loaded until the first time it is used to make a prediction.
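To make this concrete, here is a minimal usage sketch assuming a ModelWrapper like the one above. The names model_bytes, messages_rdd, and row.features are illustrative, not part of our production code:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Only the raw bytes inside the wrapper are pickled and broadcast;
# the heavy model state is created lazily on each executor.
model_wrapper = ModelWrapper(model_bytes)
broadcast_model = sc.broadcast(model_wrapper)

def score_partition(rows):
    # The first predict() call on this executor loads the model;
    # subsequent calls reuse the already-initialized model.
    wrapper = broadcast_model.value
    for row in rows:
        yield wrapper.predict(row.features)

scores = messages_rdd.mapPartitions(score_partition)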

Example 1: Keras + TensorFlow models

We use this lazy loading method, in particular, for Keras/TensorFlow models. Since the TensorFlow graph lives in memory outside of the Python process, we must ensure the graph and session are created on the Spark executors. The models are then loaded and used identically between the batch and real-time pipelines. Here’s how it looks:

The raw data we pass around for Keras models is simply:

  1. The Keras JSON description
  2. Trained weights in the .h5 format

Sketch of loading Keras models from JSON & H5:

import io

import h5py
from keras.models import model_from_json
from keras.engine import saving

# model_json: the Keras JSON description; model_bytes: the trained .h5 weights.
model = model_from_json(model_json)
h5_file = h5py.File(io.BytesIO(model_bytes), mode="r")
if "layer_names" not in h5_file.attrs and "model_weights" in h5_file:
    h5_file = h5_file["model_weights"]
saving.load_weights_from_hdf5_group(h5_file, model.layers, reshape=False)
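Putting the pieces together, a lazy Keras wrapper in the spirit of the ModelWrapper above might look roughly like the following sketch. It assumes the TF 1.x-style graph/session API (tf.compat.v1); the class and attribute names are illustrative:

import io

import h5py
import tensorflow as tf
from keras.engine import saving
from keras.models import model_from_json

class KerasModelWrapper:
    def __init__(self, model_json, weight_bytes):
        # Only picklable data is stored, so the wrapper can be broadcast.
        self.model_json = model_json      # Keras JSON description
        self.weight_bytes = weight_bytes  # trained weights in .h5 format
        self.model = None
        self.graph = None
        self.session = None

    def predict(self, features):
        if self.model is None:
            self._load_model()  # first call on this executor loads the model
        with self.graph.as_default(), self.session.as_default():
            return self.model.predict(features)

    def _load_model(self):
        # Create a dedicated TensorFlow graph and session on the executor.
        self.graph = tf.Graph()
        self.session = tf.compat.v1.Session(graph=self.graph)
        with self.graph.as_default(), self.session.as_default():
            self.model = model_from_json(self.model_json)
            h5_file = h5py.File(io.BytesIO(self.weight_bytes), mode="r")
            if "layer_names" not in h5_file.attrs and "model_weights" in h5_file:
                h5_file = h5_file["model_weights"]
            saving.load_weights_from_hdf5_group(
                h5_file, self.model.layers, reshape=False)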

Example 2: Nearest Neighbor store using Annoy

Just as with our Keras and TensorFlow models, we need to handle serialization and lazy loading when working with Annoy objects. Properly leveraging Annoy is an important requirement for our work on intelligent signatures: it gives us the ability to perform efficient nearest-neighbor lookups over large quantities of data (i.e., given an embedding, find the entries in our nearest-neighbor store with the most similar embeddings).

Originally developed by Spotify and used for music recommendations, Annoy takes an approximate approach, building an index from a set of trees to enable fast lookups. Additionally, the library lets you build indices offline and share them across processes as memory-mapped files. Despite these benefits, there are a few challenges with leveraging Annoy objects in production, as we describe below.
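For readers unfamiliar with the library, here is a minimal Annoy sketch; the dimension, metric, and random data are purely illustrative:

import random

from annoy import AnnoyIndex

dimension = 64  # embedding size (illustrative)
index = AnnoyIndex(dimension, "angular")

# Build the index offline from a set of embeddings.
for item_id in range(1000):
    embedding = [random.gauss(0, 1) for _ in range(dimension)]
    index.add_item(item_id, embedding)
index.build(10)  # number of trees; more trees means better accuracy, slower build

# Given an embedding, find the 5 entries with the most similar embeddings.
query = [random.gauss(0, 1) for _ in range(dimension)]
neighbor_ids = index.get_nns_by_vector(query, 5)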

Serialization

After building the Annoy index, we need to be able to save it to a distributed file store so we can load it in the same manner as our other models. However, a limitation of the Annoy library is that it only supports saving to disk; other serialization methods (Pickle, for example) will not work. As such, we use an indirect approach: we save the index to a temporary file, read the corresponding bytes back, and store them as part of a larger serialized thrift object. To read the index, we load the bytes from our online file store, write them to a temporary file, and load the index with the Annoy API. See below for the corresponding code snippets.

import tempfile

import annoy

def get_annoy_bytes(annoy_index):
    # Annoy can only save to disk, so round-trip through a temporary file.
    with tempfile.NamedTemporaryFile(suffix=".ann") as fp:
        fname = fp.name
        annoy_index.save(fname)
        with open(fname, "rb") as f:
            annoy_bytes = f.read()
    return annoy_bytes

def read_annoy_bytes(dimension, distance, annoy_bytes):
    annoy_index = annoy.AnnoyIndex(dimension, distance)
    with tempfile.NamedTemporaryFile(suffix=".ann") as fp:
        fname = fp.name
        fp.write(annoy_bytes)
        fp.flush()  # ensure the bytes are on disk before Annoy reads the file
        annoy_index.load(fname)
    return annoy_index

Lazy loading

In addition to serialization, we face the same challenge as with the Keras models: Annoy objects cannot be broadcast using PySpark. As such, we employ the lazy loading mechanism described above. With this approach, we initialize the Annoy index to None so the wrapper can be broadcast without memory errors. The first time a given worker tries to score a message, the lazy init function is called, which loads the Annoy index object; after that first load, no additional read operations are required.
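A minimal sketch of that wrapper, reusing the read_annoy_bytes helper above; the class name and get_nearest method are illustrative:

class AnnoyIndexWrapper:
    def __init__(self, dimension, distance, annoy_bytes):
        # Only the raw bytes are stored, so the wrapper can be broadcast.
        self.dimension = dimension
        self.distance = distance
        self.annoy_bytes = annoy_bytes
        self.annoy_index = None  # not loaded until the first lookup

    def get_nearest(self, embedding, n):
        if self.annoy_index is None:
            # Lazy init: the first lookup on this worker loads the index.
            self.annoy_index = read_annoy_bytes(
                self.dimension, self.distance, self.annoy_bytes)
        return self.annoy_index.get_nns_by_vector(embedding, n)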

Large-scale ML at Abnormal

In summary, this post provides a quick glimpse into a few of the challenges of running large-scale ML systems in production, namely leveraging libraries like Keras and Annoy in conjunction with big-data frameworks like PySpark. We hope the lazy loading and serialization methods we've described are useful as you face similar challenges working with these tools.

To learn more about the exciting work that Abnormal is doing, check out the rest of our blog here. And if developing machine learning models and software systems to stop cybercrime interests you, yes, we're hiring!
