
  • ML Engineering 101: A Thorough Explanation of The Error “DataLoader worker (pid(s) xxx) exited…


    Mengliu Zhao

    ML Engineering 101: A Thorough Explanation of The Error “DataLoader worker (pid(s) xxx) exited unexpectedly”

    A deep dive into PyTorch DataLoader with Multiprocessing

    As one of the many who use the PyTorch library on a day-to-day basis, I believe many ML engineers sooner or later encounter the error “DataLoader worker (pid(s) xxx) exited unexpectedly” during training.

    It’s frustrating.

    This error is often triggered when calling the DataLoader with num_workers > 0. Many online posts offer simple solutions like setting num_workers=0, which makes the current issue go away but creates new problems in practice.

    This post will show you some tricks that may help resolve the problem. I’m going to take a deeper dive into the torch.multiprocessing module and show you some useful virtual memory monitoring and leak-prevention techniques. In rare cases, the asynchronous memory allocation and release of torch.multiprocessing workers can still trigger the issue, even without a leak. The ultimate solution is to optimize virtual memory usage, understand the behaviour of torch.multiprocessing, and perform garbage collection in the __getitem__ method.

    Note: the platform I worked on is Ubuntu 20.04. To adapt to other platforms, many terminal commands need to be changed.

    Image source: https://pxhere.com/en/photo/1379760#google_vignette

    Brute-force Solution and the Cons

    If you search the web, most people who have hit the same issue will tell you the brute-force solution: just set num_workers=0 in the DataLoader, and the issue will be gone.

    It is the easiest solution if you have a small dataset and can tolerate the training time. However, the underlying issue is still there, and with a very large dataset, setting num_workers=0 results in very slow training, sometimes 10x slower. That’s why we need to look into the issue further and seek alternative solutions.

    Monitor Your Virtual Memory Usage

    What exactly happens when the dataloader worker exits?

    To see the latest system log messages, which usually contain a more detailed error message, run the following command in the terminal:

    dmesg -T

    Usually, you’ll see the real cause is “out of memory”. But why is there an out-of-memory issue? What specifically caused the extra memory consumption?

    When we set num_workers=0 in the DataLoader, a single main process runs the training script. It runs properly as long as a data batch fits into memory.

    However, when num_workers > 0, things are different. The DataLoader starts child worker processes, and each worker loads prefetch_factor batches in advance to speed things up, so up to prefetch_factor * num_workers batches can sit in memory at once. By default, prefetch_factor = 2. The prefetched data consumes the machine’s virtual memory (the good news is that it doesn’t eat up GPU memory, so you don’t need to shrink the batch size). So the first thing we need to do is monitor the system’s virtual memory usage.
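    As a rough back-of-the-envelope check, the prefetched data alone scales with num_workers * prefetch_factor * batch size * sample size. The helper names below are hypothetical. The estimate also ignores other per-worker overhead, such as each worker's copy of the Dataset object and the interpreter itself, so treat it as a lower bound:

```python
def prefetched_batches(num_workers: int, prefetch_factor: int = 2) -> int:
    """Batches that can be loaded ahead of the training loop across all workers."""
    # With num_workers=0 the main process loads each batch on demand: nothing is prefetched.
    return num_workers * prefetch_factor if num_workers > 0 else 0


def prefetch_memory_gb(num_workers: int, batch_size: int, sample_mb: float,
                       prefetch_factor: int = 2) -> float:
    """Rough lower bound on RAM (in GB) held by prefetched batches."""
    batches = prefetched_batches(num_workers, prefetch_factor)
    return batches * batch_size * sample_mb / 1024


# 8 workers x 2 prefetched batches = 16 batches; 16 x 32 samples x 5 MB = 2560 MB
print(prefetched_batches(8))           # 16
print(prefetch_memory_gb(8, 32, 5.0))  # 2.5
```

    Doubling num_workers doubles this figure. That explains why a setup that trains fine with a few workers can run out of memory once the worker count is raised.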

    One of the easiest ways to monitor memory usage is the psutil package. psutil.virtual_memory().percent reports the percentage of system memory currently in use:

    import psutil
    print(psutil.virtual_memory().percent)

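    You can also use the built-in tracemalloc module, which gives more detailed, per-line information about memory allocated through Python’s allocators in the current process. Tracing must be started before the code you want to profile:

    import tracemalloc

    tracemalloc.start()
    # ... run the code you want to profile ...
    snapshot = tracemalloc.take_snapshot()
    top_stats = snapshot.statistics('lineno')
    for stat in top_stats[:10]:
        print(stat)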
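    To hunt for a leak rather than a one-off peak, it can help to compare two snapshots taken some steps apart with Snapshot.compare_to. This method reports how much the allocations on each source line grew between the snapshots. The sketch below is minimal: leaky_step is a hypothetical stand-in for one loading or training step. Note that tracemalloc only sees allocations in the process where it runs, not in DataLoader worker processes:

```python
import tracemalloc


def top_growth(run_step, steps=5, limit=3):
    """Run `run_step` several times and return the source lines whose memory grew most."""
    tracemalloc.start()
    try:
        before = tracemalloc.take_snapshot()
        for _ in range(steps):
            run_step()
        after = tracemalloc.take_snapshot()
    finally:
        tracemalloc.stop()
    # Sorted from the largest to the smallest absolute size difference
    return after.compare_to(before, "lineno")[:limit]


_cache = []


def leaky_step():
    _cache.append(bytearray(1_000_000))  # simulated leak: keeps ~1 MB alive per call


for stat in top_growth(leaky_step):
    print(stat)
```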

    When physical RAM is full, the kernel moves inactive memory pages out to swap space (which is why swap counts as part of your virtual memory). To check swap usage, run:

    free -m

    To change your swap size temporarily during training (e.g., increase it to 16 GB), run the following in the terminal. These commands require root privileges:

    sudo swapoff -a
    sudo fallocate -l 16G /swapfile
    sudo chmod 600 /swapfile
    sudo mkswap /swapfile
    sudo swapon /swapfile

    /dev/shm (or, on some systems, /run/shm) is another file system worth monitoring. It is a RAM-backed tmpfs. On Linux, the tensors that DataLoader workers send to the main process are placed in shared memory, which lives there. Run the following to list your mounted file systems and their usage:

    df -h

    To resize it temporarily (e.g., increase to 16GB), simply run:

    sudo mount -o remount,size=16G /dev/shm
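    You may prefer to check from Python, for example to log the value alongside training metrics. In that case, shutil.disk_usage from the standard library reports the size and usage of the file system mounted at a given path, similar to df. The sketch below is minimal: shm_usage is a hypothetical helper name, and the path defaults to /dev/shm:

```python
import os
import shutil


def shm_usage(path: str = "/dev/shm"):
    """Return (used_gb, total_gb, percent_used) for the file system mounted at `path`."""
    usage = shutil.disk_usage(path)  # named tuple of bytes: total, used, free
    gb = 1024 ** 3
    percent = 100 * usage.used / usage.total if usage.total else 0.0
    return usage.used / gb, usage.total / gb, percent


if os.path.isdir("/dev/shm"):
    used, total, pct = shm_usage()
    print(f"/dev/shm: {used:.2f} of {total:.2f} GB used ({pct:.1f}%)")
```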

    Torch.multiprocessing Best Practices

    However, virtual memory is only one side of the story. What if the issue doesn’t go away after adjusting the swap disk?

    The other side of the story is the underlying behaviour of the torch.multiprocessing module. The official documentation lists a number of best-practice recommendations:

    Multiprocessing best practices – PyTorch 2.3 documentation

    But besides these, three more approaches should be considered, especially regarding memory usage.

    The first is shared memory leakage. A leak means that memory is not released properly after each run of a child worker, and you will observe it when you monitor memory usage at runtime. Memory consumption keeps increasing until it reaches the point of being “out of memory.” This is a classic memory leak.

    So what will cause the leakage?

    Let’s take a look at the DataLoader class itself:

    https://github.com/pytorch/pytorch/blob/main/torch/utils/data/dataloader.py

    Looking under the hood of DataLoader, we’ll see that when num_workers > 0, _MultiProcessingDataLoaderIter is used. Inside _MultiProcessingDataLoaderIter, torch.multiprocessing creates the worker queues. torch.multiprocessing offers two different strategies for sharing memory between processes: file_descriptor (the default on Linux) and file_system. The file_descriptor strategy keeps a file descriptor open for every shared-memory segment, so it can hit the system’s open-file limit. The file_system strategy requires no file descriptor caching, but it is prone to shared memory leaks.

    To check which sharing strategy your machine is using, add the following to your script:

    import torch.multiprocessing
    print(torch.multiprocessing.get_sharing_strategy())

    To get your system file descriptor limit (Linux), run the following command in the terminal:

    ulimit -n

    To switch your sharing strategy to file_descriptor:

    torch.multiprocessing.set_sharing_strategy('file_descriptor')

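    To count the file descriptors opened by your training process, run the following command. Replace <pid> with the process ID of your training script, because ls /proc/self/fd would count the descriptors of the ls command itself:

    ls /proc/<pid>/fd | wc -l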

    As long as your system’s file descriptor limit allows it, the file_descriptor strategy is recommended.
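    The same check can be done from inside the training script, where “self” really is the training process. The sketch below uses only the standard library, and its helper names are hypothetical. It counts descriptors for the calling process only: DataLoader workers are separate processes with their own descriptor tables.

```python
import os
import resource  # Unix-only standard-library module


def open_fd_count() -> int:
    """Count the file descriptors currently open in the calling process."""
    fd_dir = "/proc/self/fd" if os.path.isdir("/proc/self/fd") else "/dev/fd"
    # Listing the directory briefly opens a descriptor of its own, which may be counted too.
    return len(os.listdir(fd_dir))


def fd_limits():
    """Return (soft, hard) limits on open descriptors; `ulimit -n` shows the soft one."""
    return resource.getrlimit(resource.RLIMIT_NOFILE)


soft, hard = fd_limits()
print(f"open fds: {open_fd_count()}, soft limit: {soft}, hard limit: {hard}")
```

    If the count approaches the soft limit during training, you can either raise the limit or fall back to the file_system strategy.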

    The second is the worker start method: the choice between fork and spawn. Fork is the default start method on Linux. Child processes inherit the parent’s memory instead of starting a fresh interpreter, so fork is much faster. However, it can cause problems with CUDA tensors and with third-party libraries such as OpenCV used inside your DataLoader.

    To use the spawn method, pass multiprocessing_context="spawn" to the DataLoader.
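    With spawn, each worker receives a pickled copy of the Dataset. Any attribute that cannot be pickled, such as a lambda, a locally defined function or an open file handle, therefore makes worker startup fail. A quick way to find such attributes before training is to try pickling each one. The standard-library sketch below is minimal: the helper and class names are hypothetical, and it only checks the object’s own instance attributes:

```python
import pickle


def unpicklable_attributes(obj):
    """Return the names of instance attributes that pickle cannot serialize."""
    bad = []
    for name, value in vars(obj).items():
        try:
            pickle.dumps(value)
        except (pickle.PicklingError, AttributeError, TypeError):
            bad.append(name)
    return bad


class FilePathDataset:  # hypothetical Dataset-like class, for illustration only
    def __init__(self, paths):
        self.paths = list(paths)      # plain data: picklable
        self.transform = lambda x: x  # lambdas cannot be pickled


print(unpicklable_attributes(FilePathDataset(["a.png", "b.png"])))  # ['transform']
```

    Replacing the lambda with a function defined at module level makes the attribute picklable.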

    The third is to make your Dataset objects picklable/serializable.

    There is an excellent post that discusses the “copy-on-read” effect in forked processes in more detail: https://ppwwyyxx.com/blog/2022/Demystify-RAM-Usage-in-Multiprocess-DataLoader/

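    Simply put, it’s no longer a good approach to keep a Python list of filenames and load them in the __getitem__ method. Every time a worker touches one of those string objects, Python updates its reference count. That write lands on the memory page holding the object and forces the forked worker to copy the page (hence “copy-on-read”). Instead, store the list of filenames in a numpy array or pandas DataFrame for serialization purposes. And if you’re familiar with HuggingFace, using a CSV/dataframe is the recommended way to load a local dataset: https://huggingface.co/docs/datasets/v2.19.0/en/package_reference/loading_methods#datasets.load_dataset.example-2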
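    The sketch below shows the array-based alternative, assuming a flat list of path strings (the paths are hypothetical). np.array turns the list into a single fixed-width unicode buffer, so there are no per-item Python objects whose reference counts the workers would touch:

```python
import numpy as np

# Hypothetical file list; in practice it might come from os.listdir or a CSV file.
filenames = [f"images/{i:06d}.jpg" for i in range(100_000)]

# One contiguous fixed-width unicode array instead of 100,000 separate str objects.
paths = np.array(filenames)


def get_path(idx: int) -> str:
    # Build a fresh Python str only for the item being loaded.
    return str(paths[idx])


print(paths.dtype, paths.nbytes)  # dtype and total buffer size in bytes
print(get_path(42))               # images/000042.jpg
```

    The trade-off is that every entry is padded to the length of the longest path, at 4 bytes per character. More compact schemes exist, for example serializing the items into one shared byte buffer plus an array of offsets.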

    What If You Have a Really Slow Loader?

    Okay, now we have a better understanding of the multiprocessing module. But is it the end of the story?

    This may sound crazy, but if you have a large and heavy dataset (e.g., each data point > 5 MB), there is still a chance of encountering the issues above, and I’ll explain why. The secret is the asynchronous memory release of the multiprocessing workers.

    The trick is simple: hack into the torch library and print psutil.virtual_memory().percent before and after the line that fetches data from the data queue in the _MultiProcessingDataLoaderIter class:

    pytorch/torch/utils/data/dataloader.py at 70d8bc2da1da34915ce504614495c8cf19c85df2 · pytorch/pytorch

    Something like

    print("before clearing", psutil.virtual_memory().percent)
    data = self._data_queue.get(timeout=timeout)
    print("after", psutil.virtual_memory().percent)

    In my case, I started my DataLoader with num_workers=8 and observed something like the following:

    So memory usage keeps creeping up, but is it a memory leak? Not really. It’s simply because the dataloader workers load data faster than it is released, creating 8 jobs while releasing only 2. That is the root cause of the memory overflow. The solution is simple: just call the garbage collector at the beginning of your __getitem__ method:

    import gc

    def __getitem__(self, idx):
        gc.collect()  # reclaim unreachable objects left over from earlier items
        ...  # the rest of your loading code

    And now you’re good!
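    It helps to know what that call does. CPython frees most objects immediately through reference counting. gc.collect() additionally reclaims objects that are kept alive only by reference cycles, without waiting for the collector’s automatic thresholds. The standard-library sketch below shows a cyclic object that survives del until gc.collect() runs; Sample is a hypothetical stand-in for a loaded item. A full collection is not free, so it is worth timing its effect on your loader:

```python
import gc
import weakref


class Sample:
    """Stand-in for a loaded data item that ends up in a reference cycle."""
    def __init__(self):
        self.payload = bytearray(10_000_000)  # ~10 MB buffer
        self.self_ref = self  # cycle: the refcount never drops to zero on its own


def cycle_needs_gc():
    gc.disable()  # keep the automatic collector out of the way for this demo
    try:
        sample = Sample()
        probe = weakref.ref(sample)
        del sample
        still_alive = probe() is not None  # the cycle keeps the object (and its payload) alive
        gc.collect()
        freed = probe() is None            # the collector has reclaimed the cycle
        return still_alive, freed
    finally:
        gc.enable()


print(cycle_needs_gc())  # (True, True)
```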



    ML Engineering 101: A Thorough Explanation of The Error “DataLoader worker (pid(s) xxx) exited… was originally published in Towards Data Science on Medium, where people are continuing the conversation by highlighting and responding to this story.


  • Measuring The Intrinsic Causal Influence Of Your Marketing Campaigns

    Ryan O’Sullivan

    Causal AI, exploring the integration of causal reasoning into machine learning

    Photo by Melanie Deziel on Unsplash

    What is this series of articles about?

    Welcome to my series on Causal AI, where we will explore the integration of causal reasoning into machine learning models. Expect to explore a number of practical applications across different business contexts.

    In the last article we covered optimising non-linear treatment effects in pricing and promotions. This time round we will cover measuring the intrinsic causal influence of your marketing campaigns.

    If you missed the last article on non-linear treatment effects in pricing and promotions, check it out here:

    Optimising Non-Linear Treatment Effects in Pricing and Promotions

    Introduction

    In this article I will help you understand how you can measure the intrinsic causal influence of your marketing campaigns.

    The following aspects will be covered:

    • What are the challenges when it comes to marketing measurement?
    • What is intrinsic causal influence and how does it work?
    • A worked case study in Python showcasing how we can use intrinsic causal influence to give marketing campaigns the credit they deserve.

    The full notebook can be found here:

    causal_ai/notebooks/understanding the intrinsic causal influence of your marketing campaigns.ipynb at main · raz1470/causal_ai

    What are the challenges when it comes to marketing measurement?

    What are the different types of marketing campaigns?

    Organisations use marketing to grow their business by acquiring new customers and retaining existing ones. Marketing campaigns are often split into 3 main categories:

    • Brand
    • Performance
    • Retention

    Each one has its own unique challenges when it comes to measurement, and understanding these challenges is crucial.

    Brand campaigns

    The aim of brand campaigns is to raise awareness of your brand in new audiences. They are often run across TV and social media, with the latter often in the form of a video. They don’t usually have a direct call to action; the message is more along the lines of “our product will last you a lifetime”.

    The challenge with measuring TV is immediately obvious: we can’t track who has seen a TV ad! But we face similar challenges with social media. If I watch a video on Facebook and then organically visit the website and purchase the product the next day, it is very unlikely we will be able to tie these two events together.

    There is also a secondary challenge of a delayed effect. When raising awareness in new audiences, it might take days/weeks/months for them to get to the point where they consider purchasing your product.

    It is arguable that brand campaigns do all the hard work; however, when it comes to marketing measurement, they often get undervalued because of the challenges highlighted above.

    Performance campaigns

    In general, performance campaigns are aimed at customers who are in-market for your product. They run across paid search, social and affiliate channels, and they usually have a call to action, e.g. “click now to get 5% off your first purchase”.

    When it comes to performance campaigns, it isn’t immediately obvious why they are challenging to measure. It’s very likely that we will be able to link a customer clicking on a performance campaign to that customer purchasing the same day.

    But would they have clicked if they weren’t already familiar with the brand? How did they get familiar with the brand? If we hadn’t shown them the campaign, would they have purchased organically anyway? These are tough questions to answer from a data science perspective!

    Retention campaigns

    The other category of campaigns is retention. This is marketing aimed at retaining existing customers. We can usually run A/B tests to measure these campaigns.

    Acquisition marketing graph

    It is common to refer to brand and performance campaigns as acquisition marketing. As I mentioned earlier, brand and performance campaigns are challenging to measure: we often undervalue brand campaigns and overvalue performance campaigns.

    The graph below is a motivating (but simplified) example of how acquisition marketing works:

    User generated image

    How can we (fairly) estimate how much each node contributed to revenue? This is where intrinsic causal influence comes into the picture. Let’s dive into what it is in the next section!

    What is intrinsic causal influence and how does it work?

    Where does the concept come from?

    The concept was originally proposed in a paper back in 2020:

    Quantifying intrinsic causal contributions via structure preserving interventions

    It is implemented in the GCM module within the python package DoWhy:

    Quantifying Intrinsic Causal Influence – DoWhy documentation

    I personally found the concept quite hard to grasp initially, so in the next section let’s break it down step by step.

    Recap on causal graphs

    Before we try and understand intrinsic causal influence, it is important to have an understanding of causal graphs, structural causal models (SCM) and additive noise models (ANM). My article earlier in the series should help bring you up to speed:

    Using Causal Graphs to answer causal questions

    As a reminder, each node in a causal graph can be seen as the target of a model in which its direct parents are used as features. It is common to use an additive noise model for each non-root node:

    User generated image
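    In standard notation, with PA_j denoting the direct parents of node X_j and N_j its noise term, the additive noise model reads:

```latex
\begin{aligned}
X_j &= f_j(\mathrm{PA}_j) + N_j, \qquad N_j \perp\!\!\!\perp \mathrm{PA}_j && \text{(non-root node)} \\
X_j &= N_j && \text{(root node)}
\end{aligned}
```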

    What really is intrinsic causal influence?

    Now we have recapped causal graphs, let’s start to understand what intrinsic causal influence really is…

    The dictionary definition of intrinsic is “belonging naturally”. In my head I think of a funnel, where the stuff at the top of the funnel is doing the heavy lifting, and we want to attribute to it the causal influence it deserves.

    Let’s take the example graph below to help us start to unpick intrinsic causal influence further:

    User generated image
    • A, B and C are root nodes.
    • D is a non-root node, which we can model using its direct parents (A, B, C) and a noise term.
    • E is a non-root node, which, similar to D, we can model using its direct parents (A, B, C) and a noise term.
    • F is our target node, which we can model using its direct parents (D, E) and a noise term.

    Let’s focus on node D. It inherits some of its influence on node F from nodes A, B and C. The intrinsic part of its influence on node F comes from its noise term. Therefore, each node’s noise term can be used to estimate that node’s intrinsic causal influence on a target node. It’s worth noting that root nodes are made up of noise alone.
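    The idea that a node’s intrinsic part is its noise term can be illustrated numerically. In the minimal sketch below, all coefficients and scales are made up for illustration and the mechanism is assumed linear. D is generated from A, B and C plus its own noise, and regressing D on its parents and taking the residuals recovers that noise term almost exactly:

```python
import numpy as np

rng = np.random.default_rng(0)
n = 10_000
A, B, C = rng.normal(size=(3, n))          # root nodes: pure noise
noise_D = rng.normal(scale=0.5, size=n)    # D's intrinsic part
D = 1.0 * A + 2.0 * B - 1.0 * C + noise_D  # additive noise model: f(parents) + noise

# Fit D on its parents (plus an intercept); the residuals estimate D's noise term.
parents = np.column_stack([A, B, C, np.ones(n)])
coef, *_ = np.linalg.lstsq(parents, D, rcond=None)
recovered_noise_D = D - parents @ coef

print(coef.round(2))                                  # close to [1, 2, -1, 0]
print(np.corrcoef(recovered_noise_D, noise_D)[0, 1])  # very close to 1
```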

    In the case study, we will delve deeper into exactly how to calculate intrinsic causal influence.

    How can it help us measure our marketing campaigns?

    Hopefully you can already see the link between the acquisition marketing example and intrinsic causal influence. Can intrinsic causal influence help us stop undervaluing brand campaigns and overvaluing performance campaigns? Let’s find out in the case study!

    Case study

    Background

    It’s coming towards the end of the year and the Director of Marketing is coming under pressure from the Finance team to justify why she plans to spend so much on marketing next year. The Finance team use a last click model in which revenue is attributed to the last thing a customer clicked on. They question why they even need to spend anything on TV when everyone comes through organic or social channels!

    The Data Science team are tasked with estimating the intrinsic causal influence of each marketing channel.

    Setting up the Graph (DAG)

    We start by setting up a DAG using expert domain knowledge, re-using the marketing acquisition example from earlier:

    import numpy as np

    # Create node lookup for channels
    node_lookup = {0: 'Demand',
                   1: 'TV spend',
                   2: 'Social spend',
                   3: 'Organic clicks',
                   4: 'Social clicks',
                   5: 'Revenue'
                   }

    total_nodes = len(node_lookup)

    # Create adjacency matrix - this is the base for our graph
    graph_actual = np.zeros((total_nodes, total_nodes))

    # Create graph using expert domain knowledge
    graph_actual[0, 3] = 1.0  # Demand -> Organic clicks
    graph_actual[0, 4] = 1.0  # Demand -> Social clicks
    graph_actual[1, 3] = 1.0  # TV spend -> Organic clicks
    graph_actual[2, 3] = 1.0  # Social spend -> Organic clicks
    graph_actual[1, 4] = 1.0  # TV spend -> Social clicks
    graph_actual[2, 4] = 1.0  # Social spend -> Social clicks
    graph_actual[3, 5] = 1.0  # Organic clicks -> Revenue
    graph_actual[4, 5] = 1.0  # Social clicks -> Revenue

    In essence, the last click model which the finance team are using only uses the direct parents of revenue to measure marketing.

    User generated image

    Data generating process

    We create some samples of data following the data generating process of the DAG:

    • 3 root nodes made up of noise terms: Demand, TV spend and Social spend.
    • 2 non-root nodes, both inheriting influence from the 3 root nodes plus their own noise term: Organic clicks and Social clicks.
    • 1 target node, inheriting influence from the 2 non-root nodes plus a noise term: Revenue.

    import numpy as np
    import pandas as pd

    # Create dataframe with 1 column per node
    df = pd.DataFrame(columns=list(node_lookup.values()))

    # Setup data generating process
    df[node_lookup[0]] = np.random.normal(100000, 25000, size=(20000)) # Demand
    df[node_lookup[1]] = np.random.normal(100000, 20000, size=(20000)) # TV spend
    df[node_lookup[2]] = np.random.normal(100000, 25000, size=(20000)) # Social spend
    df[node_lookup[3]] = 0.75 * df[node_lookup[0]] + 0.50 * df[node_lookup[1]] + 0.25 * df[node_lookup[2]] + np.random.normal(loc=0, scale=2000, size=20000) # Organic clicks
    df[node_lookup[4]] = 0.30 * df[node_lookup[0]] + 0.50 * df[node_lookup[1]] + 0.70 * df[node_lookup[2]] + np.random.normal(100000, 25000, size=(20000)) # Social clicks
    df[node_lookup[5]] = df[node_lookup[3]] + df[node_lookup[4]] + np.random.normal(loc=0, scale=2000, size=20000) # Revenue

    Training the SCM

    Now we can train the SCM using the GCM module from the Python package DoWhy. We set up the data generating process with linear relationships, so we can use ridge regression as the causal mechanism for each non-root node:

    import networkx as nx
    from dowhy import gcm

    # Setup graph
    graph = nx.from_numpy_array(graph_actual, create_using=nx.DiGraph)
    graph = nx.relabel_nodes(graph, node_lookup)

    # Create SCM
    causal_model = gcm.InvertibleStructuralCausalModel(graph)

    causal_model.set_causal_mechanism('Demand', gcm.EmpiricalDistribution()) # Demand
    causal_model.set_causal_mechanism('TV spend', gcm.EmpiricalDistribution()) # TV spend
    causal_model.set_causal_mechanism('Social spend', gcm.EmpiricalDistribution()) # Social spend

    causal_model.set_causal_mechanism('Organic clicks', gcm.AdditiveNoiseModel(gcm.ml.create_ridge_regressor())) # Organic clicks
    causal_model.set_causal_mechanism('Social clicks', gcm.AdditiveNoiseModel(gcm.ml.create_ridge_regressor())) # Social clicks
    causal_model.set_causal_mechanism('Revenue', gcm.AdditiveNoiseModel(gcm.ml.create_ridge_regressor())) # Revenue

    gcm.fit(causal_model, df)

    Intrinsic causal influence

    We can easily compute the intrinsic causal influence using the GCM module. We do so and convert the contributions to percentages:

    # calculate intrinsic causal influence
    ici = gcm.intrinsic_causal_influence(causal_model, target_node='Revenue')

    def convert_to_percentage(value_dictionary):
        # Express each node's absolute contribution as a share of the total absolute contribution
        total_absolute_sum = np.sum([abs(v) for v in value_dictionary.values()])
        return {k: round(abs(v) / total_absolute_sum * 100, 1) for k, v in value_dictionary.items()}

    convert_to_percentage(ici)
    User generated image

    Let’s show these on a bar chart:

    import matplotlib.pyplot as plt
    import seaborn as sns

    # Convert dictionary to DataFrame (new name, so the simulated data in df is not overwritten)
    ici_df = pd.DataFrame(list(ici.items()), columns=['Node', 'Intrinsic Causal Influence'])

    # Create a bar plot
    plt.figure(figsize=(10, 6))
    sns.barplot(x='Node', y='Intrinsic Causal Influence', data=ici_df)

    # Rotate x labels for better readability
    plt.xticks(rotation=45)
    plt.title('Intrinsic causal influence on Revenue')
    plt.show()
    User generated image

    Are our results intuitive? If you take a look back at the data generating process code you will see they are! Pay close attention to what each non-root node is inheriting and what additional noise is being added.

    The intrinsic causal influence module is really easy to use, but it doesn’t help us understand the method behind it. To finish off, let’s explore the inner workings of intrinsic causal influence!

    Intrinsic causal influence — How does it work?

    We want to estimate how much the noise term of each node contributes to the target node:

    • It is worth remembering that root nodes are just made up of a noise term.
    • In the non-root nodes, we separate the noise term from what was inherited from the parents.
    • We also include the noise term from the target node – this could be interpreted as the contribution of unobserved confounders (although it could also be down to model misspecification).
    • The noise terms are then used to explain the variance in the target node – This can be seen as a model with noise terms as features and the target node as the outcome.
    • The model is used to estimate the conditional distribution of the target node given subsets of noise variables.
    • Shapley is then used to estimate the contribution of each noise term – If changing the noise term has little impact on the target, then the intrinsic causal influence will be very small.
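    To make the last two steps concrete, here is a simplified, self-contained sketch. Given already-recovered noise terms, it attributes the target’s variance to each noise term with exact Shapley values, using the R² of a linear fit on a subset of noise terms as the payoff. It assumes linear relationships, independent noise terms, a handful of nodes and explained variance as the measure. It is an illustration of the idea, not DoWhy’s implementation, which uses its own prediction model and attribution measure:

```python
from itertools import combinations
from math import factorial

import numpy as np


def explained_variance(noise, target, subset):
    """R^2 of a least-squares fit of the target on the chosen noise columns (0 for none)."""
    if not subset:
        return 0.0
    X = np.column_stack([noise[:, list(subset)], np.ones(len(target))])
    coef, *_ = np.linalg.lstsq(X, target, rcond=None)
    residual = target - X @ coef
    return 1.0 - residual.var() / target.var()


def shapley_attribution(noise, target):
    """Exact Shapley value of each noise column, with explained variance as the payoff."""
    n = noise.shape[1]
    phi = np.zeros(n)
    for i in range(n):
        others = [j for j in range(n) if j != i]
        for size in range(n):
            weight = factorial(size) * factorial(n - size - 1) / factorial(n)
            for subset in combinations(others, size):
                gain = (explained_variance(noise, target, subset + (i,))
                        - explained_variance(noise, target, subset))
                phi[i] += weight * gain
    return phi


rng = np.random.default_rng(0)
# Three independent noise terms, e.g. two upstream nodes' noise and the target's own noise.
noise = rng.normal(size=(5_000, 3))
target = 3.0 * noise[:, 0] + 1.0 * noise[:, 1] + 0.1 * noise[:, 2]
print(shapley_attribution(noise, target).round(3))
```

    The three values should come out close to the variance shares 9/10.01, 1/10.01 and 0.01/10.01, and they sum to the R² of the model using all noise terms. A noise term that barely moves the target gets a near-zero attribution.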

    Closing thoughts

    Today we covered how you can estimate the intrinsic causal influence of your marketing campaigns. Here are some closing thoughts:

    • Intrinsic causal influence is a powerful concept that could be applied across different use cases, not just marketing.
    • Understanding the inner workings will help you apply it more effectively.
    • Identifying the DAG and estimating the graph accurately is key to getting reasonable intrinsic causal influence estimates.
    • In the marketing acquisition example, you may want to think about adding lagged effects for brand marketing.

    Follow me if you want to continue this journey into Causal AI. In the next article we will explore how we can validate and calibrate our causal models using Synthetic Controls.


    Measuring The Intrinsic Causal Influence Of Your Marketing Campaigns was originally published in Towards Data Science on Medium, where people are continuing the conversation by highlighting and responding to this story.


  • PRISM-Rules in Python


    W Brett Kennedy

    A simple python rules-induction system

    This article is part of a series covering interpretable predictive models. Previous articles covered ikNN and Additive Decision Trees. PRISM is an existing algorithm (though I did create a Python implementation), and the focus in this series is on original algorithms, but I felt it was useful enough to warrant its own article as well. Although it is an old idea, I’ve found it to be competitive with most other interpretable models for classification and have used it quite a number of times.

    PRISM is relatively simple, but in machine learning, sometimes the most complicated solutions work best and sometimes the simplest. Where we wish for interpretable models, though, there is a strong benefit to simplicity.

    PRISM is a rules-induction tool. That is, it creates a set of rules to predict the target feature from the other features.

    Rules have at least a couple of very important purposes in machine learning. One is prediction. Similar to Decision Trees, linear regression, GAMs, ikNN, Additive Decision Trees, Decision Tables, and a small number of other tools, they can provide interpretable classification models.

    Rules can also be used simply as a technique to understand the data. In fact, even without labels, they can be used in an unsupervised manner, creating a set of rules to predict each feature from the others (treating each feature in the table in turn as a target column), which can highlight any strong patterns in the data.

    There are other tools for creating rules in Python, including the very strong imodels library. However, it can still be challenging to create a set of rules that are both accurate and comprehensible. Often, rules-induction systems are unable to create reasonably accurate models or, if they are able, do so only by creating many rules, and rules with many terms. For example, rules such as:

    IF color=’blue’ AND height < 3.4 AND width > 3.2 AND length > 33.21 AND temperature > 33.2 AND temperature < 44.2 AND width < 5.1 AND weight > 554.0 AND … THEN…

    Where rules have more than about five or ten terms, they can become difficult to follow. Given enough terms, rules can eventually become effectively uninterpretable. And where the set of rules includes more than a moderate number of rules, the rule set as a whole becomes difficult to follow (more so if each rule has many terms).

    PRISM Rules

    PRISM is a rules-induction system first proposed by Cendrowska [1] [2] and described in Principles of Data Mining [3].

    I was unable to find a python implementation and so created one. The main page for PRISM rules is: https://github.com/Brett-Kennedy/PRISM-Rules.

    PRISM supports generating rules both as a descriptive model: to describe patterns within a table (in the form of associations between the features); and as a predictive model. It very often produces a very concise, clean set of interpretable rules.

    As a predictive model, it provides both what are called global and local explanations (in the terminology used in Explainable AI (XAI)). That is, it is fully-interpretable and allows both understanding the model as a whole and the individual predictions.

    Testing multiple rules-induction systems, I very often find PRISM produces the cleanest set of rules. That said, no single system consistently works best, and it’s usually necessary to try a few rules-induction tools.

    The rules produced are in disjunctive normal form (an OR of ANDs), with each individual rule being the AND of one or more terms, with each term of the form Feature = Value, for some Value within the set of values for that feature. For example: the rules produced may be of the form:

    Rules for target value: ‘blue’:

    • IF feat_A = ‘hot’ AND feat_C = ‘round’ THEN ‘blue’
    • IF feat_A = ‘warm’ AND feat_C = ‘square’ THEN ‘blue’

    Rules for target value: ‘red’:

    • IF feat_A = ‘cold’ AND feat_C = ‘triangular’ THEN ‘red’
    • IF feat_A = ‘cool’ AND feat_C = ‘triangular’ THEN ‘red’

    The algorithm works strictly with categorical features, in both the X and Y columns. This implementation will, therefore, automatically bin any numeric columns to support the algorithm. By default, three equal-count bins (representing low, medium, and high values for the feature) are used, but this is configurable through the nbins parameter (more or fewer bins may be used).
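    Equal-count (quantile) binning can be sketched with numpy as shown below. This illustrates the idea and is not the PRISM-Rules code: equal_count_bins and its labels are hypothetical, and values that fall exactly on a cut point go to the upper bin:

```python
import numpy as np


def equal_count_bins(values, nbins=3):
    """Map numeric values to categorical labels using equal-count (quantile) bins."""
    values = np.asarray(values, dtype=float)
    labels = ["low", "medium", "high"] if nbins == 3 else [f"bin_{i}" for i in range(nbins)]
    # Interior cut points at the 1/nbins, 2/nbins, ... quantiles of the data
    cuts = np.quantile(values, np.linspace(0, 1, nbins + 1)[1:-1])
    # side="right": a value equal to a cut point is placed in the higher bin
    idx = np.searchsorted(cuts, values, side="right")
    return [labels[i] for i in idx]


print(equal_count_bins(range(9)))
# ['low', 'low', 'low', 'medium', 'medium', 'medium', 'high', 'high', 'high']
```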

    PRISM Algorithm

    For this section, we assume we are using PRISM as a predictive model, specifically as a classifier.

    The algorithm works by creating a set of rules for each class in the target column. For example, if executing on the Iris dataset, where there are three values in the target column (Setosa, Versicolour, and Virginica), there would be a set of rules related to Setosa, a set related to Versicolour, and a set related to Virginica.

    The generated rules should be read in a first-rule-to-fire manner, and so all rules are generated and presented in a sensible order (from most to least relevant for each target class). For example, examining the set of rules related to Setosa, we would have a set of rules that predict when an iris is Setosa, and these would be ordered from most to least predictive. Similarly for the sets of rules for the other two classes.
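    The sketch below shows this structure in plain Python, using the blue/red example rules from above. Each rule is a dict of Feature = Value terms that must all hold (the AND), rules are tried in order, and the first rule that fires determines the prediction. The names, the single flat ordering across classes and the None default are assumptions for illustration, not the PRISM-Rules API:

```python
# Example rules from above as (terms, predicted_class) pairs; every term must hold (AND).
RULES = [
    ({"feat_A": "hot", "feat_C": "round"}, "blue"),
    ({"feat_A": "warm", "feat_C": "square"}, "blue"),
    ({"feat_A": "cold", "feat_C": "triangular"}, "red"),
    ({"feat_A": "cool", "feat_C": "triangular"}, "red"),
]


def rule_matches(terms, row):
    """True if every Feature = Value term holds for this row (a dict of feature values)."""
    return all(row.get(feature) == value for feature, value in terms.items())


def predict(row, rules=RULES, default=None):
    """First-rule-to-fire: return the class of the first matching rule, else `default`."""
    for terms, label in rules:
        if rule_matches(terms, row):
            return label
    return default


print(predict({"feat_A": "warm", "feat_C": "square"}))  # blue
print(predict({"feat_A": "cool", "feat_C": "round"}))   # None
```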

    Generating the Rules

    We’ll describe here the algorithm PRISM uses to generate a set of rules for one class. With the Iris dataset, let’s say we’re about to generate the rules for the Setosa class.

    To start, PRISM finds the best rule available to predict that target value. This first rule for Setosa would predict as many of the Setosa records as possible. That is, we find the unique set of values in some subset of the other features that best predicts when a record will be Setosa. This is the first rule for Setosa.

    The first rule will, however, not cover all Setosa records, so we create additional rules to cover the remaining rows for Setosa (or as many as we can).

    As each rule is discovered, the rows matching that rule are removed, and the next rule is found to best describe the remaining rows for that target value.

    The rules may each have any number of terms.

    For each other value in the target column, we start again with the full dataset, removing rows as rules are discovered, and generating additional rules to explain the remaining rows for this target class value. So, after finding the rules for Setosa, PRISM would generate the rules for Versicolour, and then for Virginica.
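    The covering loop described above can be sketched in plain Python as follows. This is a simplified reading of the algorithm, not the PRISM-Rules source, and the function names are hypothetical. Rows are dicts of already-categorical features. Each new term is the Feature = Value pair with the highest accuracy p/t among the rows the rule still covers, where p counts target-class rows and t counts all covered rows; ties go to the larger p:

```python
def rule_matches(rule, row):
    """True if every Feature = Value term of the rule holds for the row."""
    return all(row.get(feat) == val for feat, val in rule.items())


def prism_rules_for_class(rows, labels, target_value):
    """Greedy PRISM-style covering for one class value."""
    remaining = list(range(len(rows)))  # each class starts again from the full dataset
    rules = []
    while any(labels[i] == target_value for i in remaining):
        rule, covered = {}, list(remaining)
        # Grow the rule until it covers only the target class (or no features are left).
        while any(labels[i] != target_value for i in covered):
            counts = {}  # (feature, value) -> [p, t]
            for i in covered:
                for feat, val in rows[i].items():
                    if feat not in rule:
                        p_t = counts.setdefault((feat, val), [0, 0])
                        p_t[0] += labels[i] == target_value
                        p_t[1] += 1
            if not counts:
                break  # every feature is already used in the rule
            feat, val = max(counts, key=lambda k: (counts[k][0] / counts[k][1], counts[k][0]))
            rule[feat] = val
            covered = [i for i in covered if rows[i].get(feat) == val]
        rules.append(rule)
        # Remove the rows this rule covers before searching for the next rule.
        remaining = [i for i in remaining if not rule_matches(rule, rows[i])]
    return rules


def prism(rows, labels):
    """Rules for every class value, in order of first appearance."""
    return {c: prism_rules_for_class(rows, labels, c) for c in dict.fromkeys(labels)}
```

    Calling prism(rows, labels) returns a dict that maps each class to its ordered list of rules, each rule being a dict of Feature = Value terms.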

    Coverage and Support

    This implementation enhances the algorithm as described in Principles of Data Mining by outputting statistics related to each rule, as some induced rules can be much more significant than others, and some substantially less so.

    As well, tracking simple statistics about each rule allows providing parameters to specify the minimum coverage for each rule (the minimum number of rows in the training data for which it applies); and the minimum support (the minimum probability of the target class matching the desired value for rows matching the rule). These help reduce noise (extra rules that add only small value to the descriptive or predictive power of the model), though can result in some target classes having few or no rules, potentially not covering all rows for one or more target column values. In these cases, users may wish to adjust these parameters.
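    The sketch below computes these two statistics and the corresponding filter, assuming they are computed over the full training set. The parameter names min_coverage and min_support are hypothetical stand-ins for the implementation’s own parameters:

```python
def rule_matches(rule, row):
    """True if every Feature = Value term of the rule holds for the row."""
    return all(row.get(feat) == val for feat, val in rule.items())


def coverage_and_support(rule, rows, labels, target_value):
    """Coverage: rows the rule applies to. Support: share of those rows with target_value."""
    matched = [lab for row, lab in zip(rows, labels) if rule_matches(rule, row)]
    coverage = len(matched)
    support = sum(lab == target_value for lab in matched) / coverage if coverage else 0.0
    return coverage, support


def filter_rules(rules, rows, labels, target_value, min_coverage=10, min_support=0.5):
    """Keep only the rules that meet both thresholds (names and defaults are illustrative)."""
    kept = []
    for rule in rules:
        coverage, support = coverage_and_support(rule, rows, labels, target_value)
        if coverage >= min_coverage and support >= min_support:
            kept.append(rule)
    return kept
```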

    Comparison to Decision Trees

    Decision trees are among the most common interpretable models, quite possibly the most common. When sufficiently small, they can be reasonably interpretable, perhaps as interpretable as any model type, and they can be reasonably accurate for many problems (though certainly not all). They do, though, have limitations as interpretable models, which PRISM was designed to address.

    Decision trees were not specifically designed to be interpretable; it is a convenient property of decision trees that they are as interpretable as they are. They, for example, often grow much larger than is easily comprehensible, often with repeated sub-trees as relationships to features have to be repeated many times within the trees to be properly captured.

    As well, the decision paths for individual predictions may include nodes that are irrelevant, or even misleading, to the final predictions, further reducing comprehensibility.

    The Cendrowska paper provides examples of simple sets of rules that cannot be represented easily by trees. For example:

    • Rule 1: IF a = 1 AND b = 1 THEN class = 1
    • Rule 2: IF c = 1 AND d = 1 THEN class = 1

    These lead to a surprisingly complex tree. In fact, this is a common pattern that results in overly complex decision trees: “where there are two (underlying) rules with no attribute in common, a situation that is likely to occur frequently in practice” [3].
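    To see where the extra complexity comes from, here is a small sketch assuming binary attributes a, b, c and d (values 0 or 1) and a default class of 0 for rows matching neither rule; both are assumptions for illustration. A tree that tests a first has to repeat the c/d test under every branch where the a AND b test fails, whereas the two rules state it once.

```python
from itertools import product


def rule_set(a, b, c, d):
    """The two modular rules; rows matching neither get the assumed default class 0."""
    if a == 1 and b == 1:
        return 1  # Rule 1
    if c == 1 and d == 1:
        return 1  # Rule 2
    return 0


def cd_subtree(c, d):
    """The c/d test, which the tree must replicate under each failing a/b branch."""
    if c == 1:
        return 1 if d == 1 else 0
    return 0


def equivalent_tree(a, b, c, d):
    if a == 1:
        if b == 1:
            return 1
        return cd_subtree(c, d)  # copy 1 of the c/d subtree
    return cd_subtree(c, d)      # copy 2 of the c/d subtree


# The two forms agree on all 16 combinations of inputs
print(all(rule_set(*x) == equivalent_tree(*x) for x in product((0, 1), repeat=4)))
```

    With attributes that take more than two values, the number of replicated subtrees grows accordingly.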

    Rules can often generate more interpretable models than can decision trees (though the opposite is also often true) and are useful to try with any project where interpretable models are beneficial. And, where the goal is not building a predictive model, but understanding the data, using multiple models may be advantageous to capture different elements of the data.

    Installation

    The project consists of a single Python file, which may be downloaded and included in any project using:

    from prism_rules import PrismRules

    Example using the Wine dataset from sklearn

    The GitHub page provides two example notebooks with simple but thorough examples of using the tool. The tool is, though, quite straightforward. To use it to generate rules, simply create a PrismRules object and call get_prism_rules() with a dataset, specifying the target column:

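    import pandas as pd
    from sklearn.datasets import load_wine
    from prism_rules import PrismRules

    # Load the Wine data into a DataFrame, with the target in column 'Y'
    data = load_wine()
    df = pd.DataFrame(data.data, columns=data.feature_names)
    df['Y'] = data['target']
    print(df.head())  # or display(df.head()) inside a Jupyter notebook

    # Induce the rules for each target value (the output is shown below)
    prism = PrismRules()
    _ = prism.get_prism_rules(df, 'Y')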

    Results

    This dataset has three values in the target column, so will generate three sets of rules:

    ................................................................
    Target: 0
    ................................................................

    proline = High AND alcohol = High
    Support: the target has value: '0' for 100.000% of the 39 rows matching the rule
    Coverage: the rule matches: 39 out of 59 rows for target value: 0. This is:
    66.102% of total rows for target value: 0
    21.910% of total rows in data

    proline = High AND alcalinity_of_ash = Low
    Support: The target has value: '0' for 100.000% of the 10 remaining rows matching the rule
    Coverage: The rule matches: 10 out of 20 rows remaining for target value: '0'. This is:
    50.000% of remaining rows for target value: '0'
    16.949% of total rows for target value: 0
    5.618% of total rows in data


    ................................................................
    Target: 1
    ................................................................
    color_intensity = Low AND alcohol = Low
    Support: the target has value: '1' for 100.000% of the 46 rows matching the rule
    Coverage: the rule matches: 46 out of 71 rows for target value: 1. This is:
    64.789% of total rows for target value: 1
    25.843% of total rows in data

    color_intensity = Low
    Support: The target has value: '1' for 78.571% of the 11 remaining rows matching the rule
    Coverage: The rule matches: 11 out of 25 rows remaining for target value: '1'. This is:
    44.000% of remaining rows for target value: '1'
    15.493% of total rows for target value: 1
    6.180% of total rows in data

    ................................................................
    Target: 2
    ................................................................
    flavanoids = Low AND color_intensity = Med
    Support: the target has value: '2' for 100.000% of the 16 rows matching the rule
    Coverage: the rule matches: 16 out of 48 rows for target value: 2. This is:
    33.333% of total rows for target value: 2
    8.989% of total rows in data

    flavanoids = Low AND alcohol = High
    Support: The target has value: '2' for 100.000% of the 10 remaining rows matching the rule
    Coverage: The rule matches: 10 out of 32 rows remaining for target value: '2'. This is:
    31.250% of remaining rows for target value: '2'
    20.833% of total rows for target value: 2
    5.618% of total rows in data

    flavanoids = Low AND color_intensity = High AND hue = Low
    Support: The target has value: '2' for 100.000% of the 21 remaining rows matching the rule
    Coverage: The rule matches: 21 out of 22 rows remaining for target value: '2'. This is:
    95.455% of remaining rows for target value: '2'
    43.750% of total rows for target value: 2
    11.798% of total rows in data

    For each rule, we see both the support and the coverage.

    The support indicates how strongly the data supports the rule: of the rows where the rule can be applied, the fraction for which its prediction is true. The first rule here is:

    proline = High AND alcohol = High
    Support: the target has value: '0' for 100.000% of the 39 rows matching the rule

    This indicates that of the 39 rows where proline = High (the feature proline has a high numeric value) and alcohol = High (the feature alcohol has a high numeric value), the target is 0 for 100% of them.

    The coverage indicates how many rows the rule covers. For the first rule, this is:

    Coverage: the rule matches: 39 out of 59 rows for target value: 0. This is:
    66.102% of total rows for target value: 0
    21.910% of total rows in data

    This indicates the coverage both as a row count and as a percentage, of the rows for the target value and of all rows in the data.
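    The percentages follow directly from the row counts in the output. The sketch below recomputes them for the first two rules for target 0, taking the class sizes (59, 71 and 48 rows, 178 in total) from the Wine output above. For the second rule, the "remaining" denominator is the target rows not already covered by the first rule.

```python
# Counts for the first Wine rule, taken from the output above
rule1_matched = 39   # rows with proline = High AND alcohol = High
rule1_correct = 39   # of those, rows with target 0
target_rows = 59     # rows with target 0
total_rows = 178     # all rows in the Wine data (59 + 71 + 48)

support = rule1_correct / rule1_matched
coverage_of_target = rule1_correct / target_rows
coverage_of_data = rule1_correct / total_rows
print(f"{support:.3%}")             # 100.000%
print(f"{coverage_of_target:.3%}")  # 66.102%
print(f"{coverage_of_data:.3%}")    # 21.910%

# Second rule: denominators shrink to the rows not covered by the first rule
rule2_correct = 10
remaining = target_rows - rule1_correct  # 20 rows left for target 0
print(f"{rule2_correct / remaining:.3%}")    # 50.000%
print(f"{rule2_correct / target_rows:.3%}")  # 16.949%
print(f"{rule2_correct / total_rows:.3%}")   # 5.618%
```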

    Example Generating Predictions

    To create predictions, we simply call predict() passing a dataframe with the same features as the dataframe used to fit the model (though the target column may optionally be omitted, as in this example).

    y_pred = prism.predict(df.drop(columns=['Y']))

    In this way, PRISM rules may be used equivalently to any other predictive model, such as Decision Trees, Random Forests, XGBoost, and so on.

    However, while generating predictions, some rows may match no rules. In this case, by default, the most common value in the target column during training (which can be seen by accessing prism.default_target) will be used. The predict() method also supports a parameter, leave_unknown. If this is set to True, then any records not matching any rules will be set to “NO PREDICTION”. In this case, the predictions will be returned as a string type, even if the original target column was numeric.
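    The fallback behaviour can be sketched as follows. This is not the library's code: it assumes rules are tried in the order they were induced and that the first match wins, and the helper names and rule representation are hypothetical. The two rules are taken from the Wine output above, and class 1 is used as the default because it is the most common class in that data (71 rows).

```python
NO_PREDICTION = "NO PREDICTION"


def predict_row(row, rules, default_target, leave_unknown=False):
    """Return the class of the first rule the row satisfies (hypothetical sketch).

    rules: list of (target_value, {attribute: value}) pairs, in induction order.
    """
    for target_value, terms in rules:
        if all(row.get(attr) == value for attr, value in terms.items()):
            return target_value
    return NO_PREDICTION if leave_unknown else default_target


def predict(rows, rules, default_target, leave_unknown=False):
    preds = [predict_row(r, rules, default_target, leave_unknown) for r in rows]
    if leave_unknown:
        # Mixing "NO PREDICTION" with numeric classes forces a string result
        preds = [str(p) for p in preds]
    return preds


rules_in_order = [
    (0, {"proline": "High", "alcohol": "High"}),
    (1, {"color_intensity": "Low", "alcohol": "Low"}),
]
new_rows = [
    {"proline": "High", "alcohol": "High", "color_intensity": "Med"},
    {"proline": "Low", "alcohol": "Med", "color_intensity": "High"},  # matches no rule
]
print(predict(new_rows, rules_in_order, default_target=1))
print(predict(new_rows, rules_in_order, default_target=1, leave_unknown=True))
```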

    Further examples are provided in the sample notebooks.

    Example with Numeric Data

    In this example, we use sklearn’s make_classification() method to create numeric data (other than the target column), which is then binned. This uses the default of three bins per numeric feature.

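    import pandas as pd
    from sklearn.datasets import make_classification
    from prism_rules import PrismRules

    # Synthetic binary classification data: 20 numeric features, 2 of them informative
    x, y = make_classification(
        n_samples=1000,
        n_features=20,
        n_informative=2,
        n_redundant=2,
        n_repeated=0,
        n_classes=2,
        n_clusters_per_class=1,
        class_sep=2,
        flip_y=0,
        random_state=0
    )

    # Numeric columns are binned by PrismRules before rules are induced
    df = pd.DataFrame(x)
    df['Y'] = y
    prism = PrismRules()
    _ = prism.get_prism_rules(df, 'Y')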

    Results

    The data is binned into low, medium, and high values for each column. The results are a set of rules per target class.
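    The binning step can be sketched with equal-frequency (quantile) bins. This is an assumption for illustration, as the library may use a different scheme, although with 1,000 rows equal-frequency bins of 334, 333 and 333 rows would be consistent with the 333-row rule in the results. The helper name is hypothetical, and in this simple version tied values may be split across bins.

```python
from collections import Counter


def bin_column(values, labels=("Low", "Med", "High")):
    """Hypothetical equal-frequency binning: rank the values, cut into len(labels) groups."""
    n_bins = len(labels)
    order = sorted(range(len(values)), key=lambda i: values[i])
    binned = [None] * len(values)
    for rank, i in enumerate(order):
        binned[i] = labels[rank * n_bins // len(values)]
    return binned


print(bin_column([5, 1, 9, 3, 7, 2]))  # ['Med', 'Low', 'High', 'Med', 'High', 'Low']
counts = Counter(bin_column(list(range(1000))))  # Low: 334, Med: 333, High: 333
```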

    Target: 0
    1 = High
    Support: the target has value: '0' for 100.000% of the 333 rows matching the rule
    Coverage: the rule matches: 333 out of 500 rows for target value: 0. This is:
    66.600% of total rows for target value: 0
    33.300% of total rows in data

    15 = Low AND 4 = Med
    Support: The target has value: '0' for 100.000% of the 63 remaining rows matching the rule
    Coverage: The rule matches: 63 out of 167 rows remaining for target value: '0'. This is:
    37.725% of remaining rows for target value: '0'
    12.600% of total rows for target value: 0
    6.300% of total rows in data

    4 = High AND 1 = Med
    Support: The target has value: '0' for 100.000% of the 47 remaining rows matching the rule
    Coverage: The rule matches: 47 out of 104 rows remaining for target value: '0'. This is:
    45.192% of remaining rows for target value: '0'
    9.400% of total rows for target value: 0
    4.700% of total rows in data

    Example from the book Principles of Data Mining

    This example is provided in one of the sample notebooks on the GitHub page.

    PRISM generated three rules:

    • IF tears = 1 THEN Target = 3
    • IF astig = 1 AND tears = 2 AND specRX = 2 THEN Target = 2
    • IF astig = 2 AND tears = 2 AND specRX = 1 THEN Target = 1

    Execution Time

    The algorithm is generally able to produce a set of rules in seconds or minutes, but if it is necessary to decrease the execution time of the algorithm, a sample of the data may be used in lieu of the full dataset. The algorithm generally works quite well on samples of the data, as the model is looking for general patterns as opposed to exceptions, and the patterns will be present in any sufficiently large sample.
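    Sampling can be done before calling get_prism_rules(). One option, sketched here with a hypothetical helper on plain Python rows, is to sample each target class separately so that small classes are not lost from the sample:

```python
import random
from collections import Counter, defaultdict


def stratified_sample(rows, target_col, frac, seed=0):
    """Hypothetical helper: sample roughly `frac` of the rows from each target class."""
    rng = random.Random(seed)
    by_class = defaultdict(list)
    for r in rows:
        by_class[r[target_col]].append(r)
    sample = []
    for cls_rows in by_class.values():
        k = max(1, round(len(cls_rows) * frac))  # keep at least one row per class
        sample.extend(rng.sample(cls_rows, k))
    return sample


rows = [{"x": i, "y": i % 3} for i in range(300)]
sample = stratified_sample(rows, "y", frac=0.1)
counts = Counter(r["y"] for r in sample)  # 10 rows from each of the 3 classes
```

    With a pandas DataFrame, df.groupby('Y').sample(frac=0.1, random_state=0) has the same effect.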

    Further notes on tuning the model are provided on the GitHub page.

    Conclusions

    Unfortunately, there are relatively few options available today for interpretable predictive models. As well, no one interpretable model will be sufficiently accurate or sufficiently interpretable for all datasets. Consequently, where interpretability is important, it can be worth testing multiple interpretable models, including Decision Trees, other rules-induction tools, GAMs, ikNN, Additive Decision Trees, and PRISM rules.

    PRISM Rules very often generates clean, interpretable rules, often with a high level of accuracy, though this will vary from project to project. Some tuning is necessary, as with other predictive models.

    References

    [1] Cendrowska, J. (1987) PRISM: An Algorithm for Inducing Modular Rules. International Journal of Man-Machine Studies, vol. 27, pp. 349–370.

    [2] Cendrowska, J. (1990) Knowledge Acquisition for Expert Systems: Inducing Modular Rules from Examples. PhD Thesis, The Open University.

    [3] Bramer, M. (2007) Principles of Data Mining. Springer.


    PRISM-Rules in Python was originally published in Towards Data Science on Medium, where people are continuing the conversation by highlighting and responding to this story.


  • How Does an Image-Text Foundation Model Work

    Wei Yi

    Learn how an image-text multi-modality model can perform image classification, image retrieval, and image captioning


  • Performance Insights from Sigma Rule Detections in Spark Streaming


    Jean-Claude Cote

    Utilizing Sigma rules for anomaly detection in cybersecurity logs: A study on performance optimization

    Photo by Ed Vazquez on Unsplash

    One of the roles of the Canadian Centre for Cyber Security (CCCS) is to detect anomalies and issue mitigations as quickly as possible.

    While putting our Sigma rule detections into production, we made an interesting observation in our Spark streaming application. Running a single large SQL statement expressing 1000 Sigma detection rules was slower than running five separate queries, each applying 200 Sigma rules. This was surprising, as running five queries forces Spark to read the source data five times rather than once. For further details, please refer to our series of articles:

    Anomaly Detection using Sigma Rules (Part 1): Leveraging Spark SQL Streaming

    Given the vast amount of telemetry data and detection rules we need to execute, every gain in performance yields significant cost savings. Therefore, we decided to investigate this peculiar observation, aiming to explain it and potentially discover additional opportunities to improve performance. We learned a few things along the way and wanted to share them with the broader community.

    Introduction

    Our hunch was that we were reaching a limit in Spark’s code generation. So, a little background on this topic is required. In 2014, Spark introduced code generation to evaluate expressions of the form (id > 1 and id > 2) and (id < 1000 or (id + id) = 12). This article from Databricks explains it very well: Exciting Performance Improvements on the Horizon for Spark SQL

    Two years later, Spark introduced Whole-Stage Code Generation. This optimization merges multiple operators together into a single Java function. Like expression code generation, Whole-Stage Code Generation eliminates virtual function calls and leverages CPU registers for intermediate data. However, rather than being at the expression level, it is applied at the operator level. Operators are the nodes in an execution plan. To find out more, read Apache Spark as a Compiler: Joining a Billion Rows per Second on a Laptop

    To summarize these articles, let’s generate the plan for this simple query:

    explain codegen
    select
    id,
    (id > 1 and id > 2) and (id < 1000 or (id + id) = 12) as test
    from
    range(0, 10000, 1, 32)

    In this simple query, we are using two operators: Range to generate rows and Select to perform a projection. We see these operators in the query’s physical plan. Notice the asterisk (*) beside the nodes and their associated [codegen id : 1]. This indicates that these two operators were merged into a single Java function using Whole-Stage Code Generation.

    == Physical Plan ==
    * Project (2)
    +- * Range (1)


    (1) Range [codegen id : 1]
    Output [1]: [id#36167L]
    Arguments: Range (0, 10000, step=1, splits=Some(32))

    (2) Project [codegen id : 1]
    Output [2]: [id#36167L, (((id#36167L > 1) AND (id#36167L > 2)) AND ((id#36167L < 1000) OR ((id#36167L + id#36167L) = 12))) AS test#36161]
    Input [1]: [id#36167L]

    The generated code clearly shows the two operators being merged.

    Generated code:
    /* 001 */ public Object generate(Object[] references) {
    /* 002 */ return new GeneratedIteratorForCodegenStage1(references);
    /* 003 */ }
    /* 004 */
    /* 005 */ // codegenStageId=1
    /* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
    /* 007 */ private Object[] references;
    /* 008 */ private scala.collection.Iterator[] inputs;
    /* 009 */ private boolean range_initRange_0;
    /* 010 */ private long range_nextIndex_0;
    /* 011 */ private TaskContext range_taskContext_0;
    /* 012 */ private InputMetrics range_inputMetrics_0;
    /* 013 */ private long range_batchEnd_0;
    /* 014 */ private long range_numElementsTodo_0;
    /* 015 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[3];
    /* 016 */
    /* 017 */ public GeneratedIteratorForCodegenStage1(Object[] references) {
    /* 018 */ this.references = references;
    /* 019 */ }
    /* 020 */
    /* 021 */ public void init(int index, scala.collection.Iterator[] inputs) {
    /* 022 */ partitionIndex = index;
    /* 023 */ this.inputs = inputs;
    /* 024 */
    /* 025 */ range_taskContext_0 = TaskContext.get();
    /* 026 */ range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics();
    /* 027 */ range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
    /* 028 */ range_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
    /* 029 */ range_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
    /* 030 */
    /* 031 */ }
    /* 032 */
    /* 033 */ private void project_doConsume_0(long project_expr_0_0) throws java.io.IOException {
    /* 034 */ // common sub-expressions
    /* 035 */
    /* 036 */ boolean project_value_4 = false;
    /* 037 */ project_value_4 = project_expr_0_0 > 1L;
    /* 038 */ boolean project_value_3 = false;
    /* 039 */
    /* 040 */ if (project_value_4) {
    /* 041 */ boolean project_value_7 = false;
    /* 042 */ project_value_7 = project_expr_0_0 > 2L;
    /* 043 */ project_value_3 = project_value_7;
    /* 044 */ }
    /* 045 */ boolean project_value_2 = false;
    /* 046 */
    /* 047 */ if (project_value_3) {
    /* 048 */ boolean project_value_11 = false;
    /* 049 */ project_value_11 = project_expr_0_0 < 1000L;
    /* 050 */ boolean project_value_10 = true;
    /* 051 */
    /* 052 */ if (!project_value_11) {
    /* 053 */ long project_value_15 = -1L;
    /* 054 */
    /* 055 */ project_value_15 = project_expr_0_0 + project_expr_0_0;
    /* 056 */
    /* 057 */ boolean project_value_14 = false;
    /* 058 */ project_value_14 = project_value_15 == 12L;
    /* 059 */ project_value_10 = project_value_14;
    /* 060 */ }
    /* 061 */ project_value_2 = project_value_10;
    /* 062 */ }
    /* 063 */ range_mutableStateArray_0[2].reset();
    /* 064 */
    /* 065 */ range_mutableStateArray_0[2].write(0, project_expr_0_0);
    /* 066 */
    /* 067 */ range_mutableStateArray_0[2].write(1, project_value_2);
    /* 068 */ append((range_mutableStateArray_0[2].getRow()));
    /* 069 */
    /* 070 */ }
    /* 071 */
    /* 072 */ private void initRange(int idx) {
    /* 073 */ java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
    /* 074 */ java.math.BigInteger numSlice = java.math.BigInteger.valueOf(32L);
    /* 075 */ java.math.BigInteger numElement = java.math.BigInteger.valueOf(10000L);
    /* 076 */ java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
    /* 077 */ java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
    /* 078 */ long partitionEnd;
    /* 079 */
    /* 080 */ java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
    /* 081 */ if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
    /* 082 */ range_nextIndex_0 = Long.MAX_VALUE;
    /* 083 */ } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
    /* 084 */ range_nextIndex_0 = Long.MIN_VALUE;
    /* 085 */ } else {
    /* 086 */ range_nextIndex_0 = st.longValue();
    /* 087 */ }
    /* 088 */ range_batchEnd_0 = range_nextIndex_0;
    /* 089 */
    /* 090 */ java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
    /* 091 */ .multiply(step).add(start);
    /* 092 */ if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
    /* 093 */ partitionEnd = Long.MAX_VALUE;
    /* 094 */ } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
    /* 095 */ partitionEnd = Long.MIN_VALUE;
    /* 096 */ } else {
    /* 097 */ partitionEnd = end.longValue();
    /* 098 */ }
    /* 099 */
    /* 100 */ java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
    /* 101 */ java.math.BigInteger.valueOf(range_nextIndex_0));
    /* 102 */ range_numElementsTodo_0 = startToEnd.divide(step).longValue();
    /* 103 */ if (range_numElementsTodo_0 < 0) {
    /* 104 */ range_numElementsTodo_0 = 0;
    /* 105 */ } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
    /* 106 */ range_numElementsTodo_0++;
    /* 107 */ }
    /* 108 */ }
    /* 109 */
    /* 110 */ protected void processNext() throws java.io.IOException {
    /* 111 */ // initialize Range
    /* 112 */ if (!range_initRange_0) {
    /* 113 */ range_initRange_0 = true;
    /* 114 */ initRange(partitionIndex);
    /* 115 */ }
    /* 116 */
    /* 117 */ while (true) {
    /* 118 */ if (range_nextIndex_0 == range_batchEnd_0) {
    /* 119 */ long range_nextBatchTodo_0;
    /* 120 */ if (range_numElementsTodo_0 > 1000L) {
    /* 121 */ range_nextBatchTodo_0 = 1000L;
    /* 122 */ range_numElementsTodo_0 -= 1000L;
    /* 123 */ } else {
    /* 124 */ range_nextBatchTodo_0 = range_numElementsTodo_0;
    /* 125 */ range_numElementsTodo_0 = 0;
    /* 126 */ if (range_nextBatchTodo_0 == 0) break;
    /* 127 */ }
    /* 128 */ range_batchEnd_0 += range_nextBatchTodo_0 * 1L;
    /* 129 */ }
    /* 130 */
    /* 131 */ int range_localEnd_0 = (int)((range_batchEnd_0 - range_nextIndex_0) / 1L);
    /* 132 */ for (int range_localIdx_0 = 0; range_localIdx_0 < range_localEnd_0; range_localIdx_0++) {
    /* 133 */ long range_value_0 = ((long)range_localIdx_0 * 1L) + range_nextIndex_0;
    /* 134 */
    /* 135 */ project_doConsume_0(range_value_0);
    /* 136 */
    /* 137 */ if (shouldStop()) {
    /* 138 */ range_nextIndex_0 = range_value_0 + 1L;
    /* 139 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localIdx_0 + 1);
    /* 140 */ range_inputMetrics_0.incRecordsRead(range_localIdx_0 + 1);
    /* 141 */ return;
    /* 142 */ }
    /* 143 */
    /* 144 */ }
    /* 145 */ range_nextIndex_0 = range_batchEnd_0;
    /* 146 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localEnd_0);
    /* 147 */ range_inputMetrics_0.incRecordsRead(range_localEnd_0);
    /* 148 */ range_taskContext_0.killTaskIfInterrupted();
    /* 149 */ }
    /* 150 */ }
    /* 151 */
    /* 152 */ }

    The project_doConsume_0 function contains the code to evaluate (id > 1 and id > 2) and (id < 1000 or (id + id) = 12). Notice how this code is generated to evaluate this specific expression. This is an illustration of expression code generation.

    The whole class is an operator with a processNext method. This generated operator performs both the Projection and the Range operations. Inside the while loop at line 117, we see the code to produce rows and a specific call (not a virtual function) to project_doConsume_0. This illustrates what Whole-Stage Code Generation does.
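    As a rough Python analogy (not Spark code, and with none of its performance characteristics), the difference between the Volcano model and Whole-Stage Code Generation is the difference between chaining per-row operator iterators and writing one loop that does the work of both operators, as processNext does above. The function names are hypothetical.

```python
def range_op(start, end):
    """Volcano-style Range operator: produces one row per call to next()."""
    for i in range(start, end):
        yield i


def project_op(child):
    """Volcano-style Project operator: pulls each row from its child operator."""
    for id_ in child:
        yield (id_, (id_ > 1 and id_ > 2) and (id_ < 1000 or (id_ + id_) == 12))


def fused_stage(start, end):
    """Whole-stage analogue: one loop generates each row and evaluates the projection inline."""
    out = []
    for id_ in range(start, end):
        test = (id_ > 1 and id_ > 2) and (id_ < 1000 or (id_ + id_) == 12)
        out.append((id_, test))
    return out


volcano = list(project_op(range_op(0, 10000)))
fused = fused_stage(0, 10000)
```

    Both produce the same rows; in Spark the gain comes from the fused version being compiled Java with no per-row virtual calls between operators, which this Python analogy does not reproduce.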

    Breaking Down the Performance

    Now that we have a better understanding of Spark’s code generation, let’s try to explain why breaking a query doing 1000 Sigma rules into smaller ones performs better. Let’s consider a SQL statement that evaluates two Sigma rules. These rules are straightforward: Rule1 matches events with an Imagepath ending in ‘schtasks.exe’, and Rule2 matches an Imagepath starting with ‘d:’.


    select /* #3 */
    Imagepath,
    CommandLine,
    PID,
    map_keys(map_filter(results_map, (k,v) -> v = TRUE)) as matching_rules
    from (
    select /* #2 */
    *,
    map('rule1', rule1, 'rule2', rule2) as results_map
    from (
    select /* #1 */
    *,
    (lower_Imagepath like '%schtasks.exe') as rule1,
    (lower_Imagepath like 'd:%') as rule2
    from (
    select
    lower(PID) as lower_PID,
    lower(CommandLine) as lower_CommandLine,
    lower(Imagepath) as lower_Imagepath,
    *
    from (
    select
    uuid() as PID,
    uuid() as CommandLine,
    uuid() as Imagepath,
    id
    from
    range(0, 10000, 1, 32)
    )
    )
    )
    )

    The select labeled #1 performs the detections and stores the results in new columns named rule1 and rule2. Select #2 regroups these columns under a single results_map, and finally select #3 transforms the map into an array of matching rules. It uses map_filter to keep only the entries of rules that actually matched, and then map_keys is used to convert the map entries into a list of matching rule names.
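    The logic of selects #1 to #3 can be emulated row by row in plain Python. This illustrates the semantics only, with hypothetical names; it is not how Spark evaluates the query. Note that LIKE '%schtasks.exe' treats the dot literally, so it is a plain ends-with test, which is why Spark can rewrite it as EndsWith.

```python
# Select #1: each Sigma rule becomes a boolean test on the lower-cased Imagepath
rules = {
    "rule1": lambda row: row["Imagepath"].lower().endswith("schtasks.exe"),
    "rule2": lambda row: row["Imagepath"].lower().startswith("d:"),
}


def matching_rules(row, rule_predicates):
    # Select #2: group the per-rule results into one map
    results_map = {name: pred(row) for name, pred in rule_predicates.items()}
    # Select #3: map_filter keeps entries whose value is TRUE, map_keys returns the names
    return [k for k, v in results_map.items() if v is True]


print(matching_rules({"Imagepath": r"D:\tools\SchTasks.exe"}, rules))  # ['rule1', 'rule2']
print(matching_rules({"Imagepath": r"C:\Windows\notepad.exe"}, rules))  # []
```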

    Let’s print out the Spark execution plan for this query:


    == Physical Plan ==
    Project (4)
    +- * Project (3)
    +- * Project (2)
    +- * Range (1)

    ...

    (4) Project
    Output [4]: [Imagepath#2, CommandLine#1, PID#0, map_keys(map_filter(map(rule1, EndsWith(lower_Imagepath#5, schtasks.exe), rule2, StartsWith(lower_Imagepath#5, d:)), lambdafunction(lambda v#12, lambda k#11, lambda v#12, false))) AS matching_rules#9]
    Input [4]: [lower_Imagepath#5, PID#0, CommandLine#1, Imagepath#2]

    Notice that node Project (4) is not code generated. Node 4 contains a lambda function; does that prevent whole-stage code generation? More on this later.

    This query is not quite what we want. We would like to produce a table of events with a column indicating the rule that was matched. Something like this:

    +--------------------+--------------------+--------------------+--------------+
    | Imagepath| CommandLine| PID| matched_rule|
    +--------------------+--------------------+--------------------+--------------+
    |09401675-dc09-4d0...|6b8759ee-b55a-486...|44dbd1ec-b4e0-488...| rule1|
    |e2b4a0fd-7b88-417...|46dd084d-f5b0-4d7...|60111cf8-069e-4b8...| rule1|
    |1843ee7a-a908-400...|d1105cec-05ef-4ee...|6046509a-191d-432...| rule2|
    +--------------------+--------------------+--------------------+--------------+

    That’s easy enough. We just need to explode the matching_rules column.


    select
    Imagepath,
    CommandLine,
    PID,
    matched_rule
    from (
    select
    *,
    explode(matching_rules) as matched_rule
    from (
    /* original statement */
    )
    )

    This produces two additional operators: Generate (6) and Project (7). However, there is also a new Filter (3).

    == Physical Plan ==
    * Project (7)
    +- * Generate (6)
    +- Project (5)
    +- * Project (4)
    +- Filter (3)
    +- * Project (2)
    +- * Range (1)

    ...

    (3) Filter
    Input [3]: [PID#34, CommandLine#35, Imagepath#36]
    Condition : (size(map_keys(map_filter(map(rule1, EndsWith(lower(Imagepath#36),
    schtasks.exe), rule2, StartsWith(lower(Imagepath#36), d:)),
    lambdafunction(lambda v#47, lambda k#46, lambda v#47, false))), true) > 0)
    ...

    (6) Generate [codegen id : 3]
    Input [4]: [PID#34, CommandLine#35, Imagepath#36, matching_rules#43]
    Arguments: explode(matching_rules#43), [PID#34, CommandLine#35, Imagepath#36], false, [matched_rule#48]

    (7) Project [codegen id : 3]
    Output [4]: [Imagepath#36, CommandLine#35, PID#34, matched_rule#48]
    Input [4]: [PID#34, CommandLine#35, Imagepath#36, matched_rule#48]

    The explode function generates rows for every element in the array. When the array is empty, explode does not produce any rows, effectively filtering out rows where the array is empty.

    Spark has an optimization rule that detects the explode function and produces this additional condition. The filter is an attempt by Spark to short-circuit processing as much as possible. The source code for this rule, named org.apache.spark.sql.catalyst.optimizer.InferFiltersFromGenerate, explains it like this:

    Infers filters from Generate, such that rows that would have been removed by this Generate can be removed earlier — before joins and in data sources.

    For more details on how Spark optimizes execution plans, please refer to David Vrba’s article Mastering Query Plans in Spark 3.0.
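    Why the inferred filter is safe can be checked with a small pure-Python emulation (hypothetical helpers, not Spark’s implementation): explode emits nothing for an empty array, so dropping those rows first cannot change the output; it only removes them earlier.

```python
def explode(rows, array_col, out_col):
    """Emulate SQL explode(): one output row per array element; empty arrays yield nothing."""
    for row in rows:
        for element in row[array_col]:
            out = dict(row)
            out[out_col] = element
            yield out


def inferred_filter(rows, array_col):
    """The extra Filter: keep only rows where size(array) > 0."""
    return (row for row in rows if len(row[array_col]) > 0)


events = [
    {"PID": "a", "matching_rules": ["rule1"]},
    {"PID": "b", "matching_rules": []},  # matched no rule
    {"PID": "c", "matching_rules": ["rule1", "rule2"]},
]
without_filter = list(explode(events, "matching_rules", "matched_rule"))
with_filter = list(explode(inferred_filter(events, "matching_rules"),
                           "matching_rules", "matched_rule"))
```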

    Another question arises: do we benefit from this additional filter? Notice this additional filter is not whole-stage code generated either, presumably because of the lambda function. Let’s try to express the same query but without using a lambda function.

    Instead, we can put the rule results in a map, explode the map, and filter out the rows, thereby bypassing the need for map_filter.


    select
    Imagepath,
    CommandLine,
    PID,
    matched_rule
    from (
    select
    *
    from (
    select
    *,
    explode(results_map) as (matched_rule, matched_result)
    from (
    /* original statement */
    )
    )
    where
    matched_result = TRUE
    )

    The select containing explode(results_map) explodes the map into two new columns. The matched_rule column holds the key, representing the rule name, while the matched_result column contains the result of the detection test. To filter the rows, we simply keep only those with a positive matched_result.
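    Both query shapes return the same (event, rule) pairs. A pure-Python emulation with hypothetical helpers shows the equivalence; using `is True` mirrors SQL, where a NULL rule result fails the `= TRUE` test in both versions.

```python
def explode_then_filter(rows):
    """explode(results_map) AS (matched_rule, matched_result), then WHERE matched_result = TRUE."""
    out = []
    for row in rows:
        for matched_rule, matched_result in row["results_map"].items():
            if matched_result is True:
                out.append((row["PID"], matched_rule))
    return out


def map_filter_then_explode(rows):
    """map_keys(map_filter(results_map, (k, v) -> v = TRUE)), then explode the key array."""
    out = []
    for row in rows:
        matching = [k for k, v in row["results_map"].items() if v is True]
        out.extend((row["PID"], rule) for rule in matching)
    return out


events = [
    {"PID": "p1", "results_map": {"rule1": True, "rule2": False}},
    {"PID": "p2", "results_map": {"rule1": False, "rule2": False}},
    {"PID": "p3", "results_map": {"rule1": True, "rule2": None}},  # None stands in for NULL
]
print(explode_then_filter(events))  # [('p1', 'rule1'), ('p3', 'rule1')]
```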

    The physical plan indicates that all nodes are whole-stage code generated into a single Java function, which is promising.


    == Physical Plan ==
    * Project (8)
    +- * Filter (7)
    +- * Generate (6)
    +- * Project (5)
    +- * Project (4)
    +- * Filter (3)
    +- * Project (2)
    +- * Range (1)

    Let’s conduct some tests to compare the performance of the query using map_filter and the one using explode then filter.

    We ran these tests on a machine with 4 CPUs, generating 1 million rows and evaluating 100 rules against each row, with each rule evaluating 5 expressions. Each test was run 5 times.

    On average:

    • map_filter took 42.6 seconds
    • explode_then_filter took 51.2 seconds

    So, map_filter is slightly faster even though it’s not using whole-stage code generation.

    However, in our production query, we execute many more Sigma rules — a total of 1000 rules. This includes 29 regex expressions, 529 equals, 115 starts-with, 2352 ends-with, and 5838 contains expressions. Let’s test our query again, but this time with a slight increase in the number of expressions, using 7 instead of 5 per rule. Upon doing this, we encountered an error in our logs:

    Caused by: org.codehaus.commons.compiler.InternalCompilerException: Code grows beyond 64 KB

    We tried increasing spark.sql.codegen.maxFields and spark.sql.codegen.hugeMethodLimit, but fundamentally, the JVM limits the bytecode of a single method to 64 KB. Additionally, by default the JVM’s JIT compiler does not compile methods larger than 8 KB of bytecode.

    However, the query still runs fine because Spark falls back to the Volcano execution model for the parts of the plan it cannot compile. Whole-stage code generation is, after all, just an optimization.

    Running the same test as before but with 7 expressions per rule rather than 5, explode_then_filter is much faster than map_filter.

    • map_filter took 68.3 seconds
    • explode_then_filter took 15.8 seconds

    Increasing the number of expressions causes parts of the explode_then_filter to no longer be whole-stage code generated. In particular, the Filter operator introduced by the rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromGenerate is too big to be incorporated into whole-stage code generation. Let’s see what happens if we exclude the InferFiltersFromGenerate rule:

    spark.sql("SET spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.InferFiltersFromGenerate")

    As expected, the physical plan of both queries no longer has an additional Filter operator.


    map_filter query:

    == Physical Plan ==
    * Project (6)
    +- * Generate (5)
    +- Project (4)
    +- * Project (3)
    +- * Project (2)
    +- * Range (1)

    explode_then_filter query:

    == Physical Plan ==
    * Project (7)
    +- * Filter (6)
    +- * Generate (5)
    +- * Project (4)
    +- * Project (3)
    +- * Project (2)
    +- * Range (1)

    Removing the rule indeed had a significant impact on performance:

    • map_filter took 22.49 seconds
    • explode_then_filter took 4.08 seconds

    Both queries benefited greatly from removing the rule. Given the improved performance, we decided to increase the number of Sigma rules to 500 and the complexity to 21 expressions per rule:

    Results:

    • map_filter took 195.0 seconds
    • explode_then_filter took 25.09 seconds

    Despite the increased complexity, both queries still perform well, with explode_then_filter running roughly 7.8 times faster than map_filter.

    Spark uses code generation at more than one level. Even where we don’t benefit from whole-stage code generation, we can still gain from expression code generation.

    Expression code generation doesn’t face the same limitations as whole-stage code generation: the code generated for very large expression trees can be split into smaller methods, and spark.sql.codegen.methodSplitThreshold controls when this splitting happens. We experimented with this property but didn’t observe significant improvements; the default setting seems satisfactory.

    Spark provides a debugging property named spark.sql.codegen.factoryMode, which can be set to FALLBACK, CODEGEN_ONLY, or NO_CODEGEN. We can turn off expression code generation by setting spark.sql.codegen.factoryMode=NO_CODEGEN, which results in a drastic performance degradation:

    With 500 rules and 21 expressions:

    • map_filter took 1581 seconds
    • explode_then_filter took 122.31 seconds

    Even though not all operators participate in whole-stage code generation, we still observe significant benefits from expression code generation.
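    To put a number on that benefit, the slowdown factor is the NO_CODEGEN time divided by the time with the default factoryMode, using the 500-rule, 21-expression timings reported above. This is plain arithmetic on those reported figures; the dictionary layout and the function name are illustrative only.

```python
# Timings in seconds for 500 rules x 21 expressions per rule, as reported above
# (InferFiltersFromGenerate excluded in both runs).
timings = {
    "map_filter": {"codegen": 195.0, "no_codegen": 1581.0},
    "explode_then_filter": {"codegen": 25.09, "no_codegen": 122.31},
}


def codegen_slowdown(query: str) -> float:
    """Factor by which a query slows down with spark.sql.codegen.factoryMode=NO_CODEGEN."""
    t = timings[query]
    return t["no_codegen"] / t["codegen"]


for name in timings:
    # map_filter is about 8.1x slower, explode_then_filter about 4.9x slower
    print(f"{name}: {codegen_slowdown(name):.1f}x slower without expression codegen")
```

    In other words, disabling expression code generation costs map_filter about 8.1x and explode_then_filter about 4.9x.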

    The Results

    Image by author

    With our best case of 25.1 seconds to evaluate 10,500 expressions (500 rules of 21 expressions each) on 1 million rows, we achieve a very respectable rate of about 104 million expression evaluations per second per CPU.
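    The arithmetic behind that rate is sketched below. One input is an assumption: this section does not state how many CPUs the benchmark used, and 4 is the CPU count that reproduces the quoted per-CPU figure.

```python
rules, expressions_per_rule = 500, 21
rows = 1_000_000
seconds = 25.09  # best explode_then_filter run reported above

total_expressions = rules * expressions_per_rule  # expressions evaluated per row
evaluations = total_expressions * rows            # expression evaluations in the whole run
per_second = evaluations / seconds

# ASSUMPTION: 4 CPUs. Inferred from the quoted per-CPU rate, not stated in this section.
cpus = 4
per_cpu = per_second / cpus

print(f"{total_expressions:,} expressions per row")
print(f"{per_second / 1e6:.1f} million evaluations/s in total")
print(f"{per_cpu / 1e6:.1f} million evaluations/s per CPU")
```

    This gives roughly 418.5 million evaluations per second overall, or about 104.6 million per CPU under the 4-CPU assumption.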

    The takeaway from this study is that when evaluating a large number of expressions, we benefit from rewriting queries that use map_filter to use an explode-then-filter approach instead. Additionally, the org.apache.spark.sql.catalyst.optimizer.InferFiltersFromGenerate rule does not seem beneficial in our use case, so we should exclude that rule from our queries.

    Does it Explain our Initial Observations?

    Implementing these lessons in our production jobs yielded significant benefits. However, even after these optimizations, splitting the large query into multiple smaller ones continued to provide advantages. Upon further investigation, we discovered that this was not solely due to code generation but had a simpler explanation.

    Spark streaming runs each micro-batch to completion and checkpoints its progress before starting the next micro-batch.

    During each micro-batch, Spark has to complete all of its tasks, typically 200. However, not all tasks are created equal. Spark uses a round-robin strategy to distribute rows among these tasks, which balances the number of rows but not their size. On occasion, some tasks therefore receive events with large attributes (for example, a very long command line), so certain tasks finish quickly while others take much longer. For example, here is the distribution of task execution times for one micro-batch: the median task time is 14 seconds, but the worst straggler takes 1.6 minutes!

    Image by author

    This sheds light on a different phenomenon. Because Spark waits on a few straggler tasks during each micro-batch, many CPUs sit idle, which explains why splitting the large query into multiple smaller ones resulted in faster overall performance.
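    The idle time caused by a single straggler can be estimated with a small scheduling sketch. It assumes greedy list scheduling (each task runs on whichever core frees up first), which is a simplification of Spark's task scheduler, and the workload (8 cores, 199 tasks at the 14-second median plus one 96-second straggler) is hypothetical, loosely based on the distribution above.

```python
import heapq


def micro_batch_stats(durations, cores):
    """Greedy list scheduling of one micro-batch.

    Returns (wall-clock time of the batch, idle core-seconds spent waiting for it).
    """
    free_at = [0.0] * cores  # min-heap: time at which each core becomes free
    for d in durations:
        start = heapq.heappop(free_at)    # earliest available core
        heapq.heappush(free_at, start + d)
    wall = max(free_at)                   # batch ends when its slowest core finishes
    idle = cores * wall - sum(durations)  # core time spent doing nothing
    return wall, idle


# Hypothetical batch: 8 cores, 199 tasks of 14 s and one 96 s (1.6 min) straggler.
wall, idle = micro_batch_stats([14.0] * 199 + [96.0], cores=8)
print(wall, idle)  # 432.0 574.0
```

    Without the straggler, the same 200 tasks would finish in 350 seconds with no idle time. The single slow task stretches the batch by 82 seconds, during which the other 7 cores have nothing to do (574 idle core-seconds).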

    This picture shows 5 smaller queries running in parallel inside the same Spark application. Batch3 is waiting on a straggler task while the other queries keep progressing.

    Image by author

    During these periods of waiting, Spark can utilize the idle CPUs to tackle other queries, thereby maximizing resource utilization and overall throughput.
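    The effect of running several smaller queries side by side can be illustrated with a tiny discrete-event simulation. This is a simplified model, not Spark's actual scheduler: tasks from all queries share one FIFO queue, each query releases its next micro-batch only after its current one has fully finished, and the hypothetical workload assumes that splitting one query into two halves the work of every task, stragglers included.

```python
import heapq
from collections import deque


def simulate(queries, cores):
    """Streaming queries sharing a fixed pool of cores.

    queries: one entry per query; each entry is a list of micro-batches, and each
    micro-batch is a list of task durations. A query's next micro-batch is released
    only once every task of its current micro-batch has finished.
    Returns the wall-clock time at which all queries are done.
    """
    ready = deque()                 # runnable tasks as (query_id, duration), FIFO
    remaining = [0] * len(queries)  # unfinished tasks in each query's current batch
    next_batch = [0] * len(queries) # index of the next batch to release, per query
    running = []                    # min-heap of (end_time, query_id)
    now, free = 0.0, cores

    def release(q):
        batch = queries[q][next_batch[q]]
        next_batch[q] += 1
        remaining[q] = len(batch)
        ready.extend((q, d) for d in batch)

    for q in range(len(queries)):
        release(q)

    while True:
        while free and ready:       # fill idle cores with runnable tasks
            q, d = ready.popleft()
            heapq.heappush(running, (now + d, q))
            free -= 1
        if not running:
            return now
        now, q = heapq.heappop(running)  # jump to the next task completion
        free += 1
        remaining[q] -= 1
        if remaining[q] == 0 and next_batch[q] < len(queries[q]):
            release(q)              # micro-batch done: start this query's next one


big = [4] * 7 + [16]   # hypothetical: 7 normal tasks + 1 straggler per micro-batch
small = [2] * 7 + [8]  # the same rules split across two queries: half the work per task
single = simulate([[big, big]], cores=4)
split = simulate([[small, small], [small, small]], cores=4)
print(single, split)   # 40.0 28.0
```

    Both runs do exactly the same total work, but in the split run the cores left idle by one query's straggler pick up tasks from the other query, so two micro-batches per query finish in 28 time units instead of 40.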

    Conclusion

    In this article, we provided an overview of Spark’s code generation process and discussed how built-in optimizations, such as the InferFiltersFromGenerate rule, may not always yield desirable results. We also demonstrated that refactoring a query from using lambda functions (map_filter) to a simple explode operation improved performance. Finally, we found that while splitting a large statement did boost performance, the primary factor driving these gains was the execution topology (straggler tasks leaving CPUs idle) rather than the queries themselves.


    Performance Insights from Sigma Rule Detections in Spark Streaming was originally published in Towards Data Science on Medium, where people are continuing the conversation by highlighting and responding to this story.
