Chapter 6. Scalability and Distributed Training

The examples we have seen in the previous chapters are relatively simple, toy-scale examples. They fit and run within the memory and compute constraints of a single machine. Most enterprises have larger datasets and more complex requirements that need to scale beyond one machine. In this chapter, we will look at the architecture and techniques that help enterprises scale Snorkel.

When we think about scaling Snorkel, we are essentially looking to run labeling functions distributed across several machines, typically managed as part of a cluster.

In this chapter, we will start by exploring Apache Spark as the core technology that allows us to scale Snorkel. Instead of custom-engineering a solution that would need to factor in the infrastructure and plumbing needed to scale, Apache Spark provides this out of the box and is already a popular option for big-data and production deployments within enterprises.

We will use the NLP-based Fake news example from Chapter 3 and see how we can scale it with Apache Spark. During this journey, we will understand the code and design changes that are needed to allow us to achieve a Fake news implementation that we can distribute and deploy at a massive scale.

Bigger models create pressure on the underlying systems for training and inference. Here we can learn from the experience of well-established software engineering paradigms such as high-performance computing (HPC) and web-scale architecture.

HPC is the study and use of very fast supercomputers that rely on parallel processing techniques running across multiple nodes, allowing them to solve complex computational challenges. HPC typically includes both the engineering and the operational aspects of building, deploying, and managing these systems. We will touch on some of these system architectures but leave the details as an exercise for you to explore.

Before we get into details of Apache Spark and explore how we can use that to help scale Snorkel, let us discuss the need for scalability and distributed training.

The need for scalability

Weak supervision enables ML practitioners to generate massive training datasets and, in turn, build higher quality ML models. Model training routines, therefore, need to be able to scale to incorporate and handle as much training data as possible. As outlined in Chapter 1, the memory and compute requirements to programmatically label and train models on massive training sets mean the data cannot fit on one machine. In other words, some kind of scaling is needed. Commonly there are two options for scaling - horizontal and vertical - as shown in Figure 6-1.

Vertical scaling (also known as scaling up) means adding more hardware to the existing infrastructure, often using a more powerful machine than the one we are using right now, allowing the system to handle more load. Scaling up works for a while, until you hit the largest machine available on your platform. At the time of writing, public cloud providers offer single servers with up to 120 virtual CPUs and 448 GB of RAM (such as Azure's HBv3-series).

Horizontal scaling, or scaling out, is adding new machines (i.e., nodes) to the infrastructure, allowing the system to handle more load by distributing it across these nodes, typically managed as part of a cluster. For highly parallelizable workloads, like applying labeling functions to a partitioned dataset, there is almost no limit to scaling horizontally, as you can keep adding commodity servers as compute nodes.

Horizontal vs Vertical scaling
Figure 6-1. Horizontal vs Vertical scaling

When scaling out big-data workloads, we have two layers to consider - the storage layer and the compute layer. Whilst in principle these two layers don't have to be interdependent, in practice they are correlated and need to be considered in the context of each other.

The storage layer deals with the actual distributed storage (or distributed file system) and the associated I/O constraints and trade-offs across multiple nodes and subsystems. The compute layer, on the other hand, is where the data is processed once loaded. When scaling weak supervision, both of these layers become critical and are possible contention points.

In the context of machine learning, the scalability characteristics of training and prediction (i.e., inference) are often different. For training, we typically need to load the entire training dataset in memory, making vertical scaling the simplest option.

For inference, on the other hand, we only need batches of data instances loaded at a time. Modern frameworks support distributed training routines and training on streaming datasets, making it possible to train on horizontally scaled architectures at the cost of infrastructural complexity. We take a closer look in the next section.

Distributed training

In distributed training, the workload to train a model is split up and shared among multiple nodes (i.e., machines), called worker nodes. These worker nodes work in parallel to speed up model training.

Distributed training can be used for traditional ML models, but is better suited for compute and time-intensive tasks, like training deep neural networks, data programming, and generating weak labels.

Applying labeling functions to large datasets lends itself naturally to horizontally scaled infrastructure. Each labeling function runs independently of the others, with the outputs collated back at the end.

There are two fundamental approaches - data parallelism and model parallelism - outlined below.

  • Data parallelism - In a compute cluster, the data is divided into chunks (i.e., partitions) that are spread across the available worker nodes in that cluster. Each node gets a copy of the model, operates on its subset of the data, and computes its own predictions and errors. To arrive at a single consistent model, the parameters and gradients from each node need to be synchronized across the cluster (see the sketch following this list).

  • Model parallelism - Different segments of a model run in parallel on different nodes, each operating on the same data. It is important to note that the model segments are evaluated concurrently, so the order of evaluation should not matter. The algorithm's ability to parallelize tasks dictates the scalability of this approach. Fundamentally, model parallelism is more complex to implement than data parallelism.
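To make the data-parallel idea concrete, here is a minimal sketch that partitions a toy dataset, applies a labeling function to each partition in a separate worker process, and collates the results. It uses Python's multiprocessing purely for illustration; the statements and the label_contains_hoax function are hypothetical, and later in the chapter Spark takes care of this partitioning and scheduling for us.

from multiprocessing import Pool

import numpy as np

ABSTAIN, FAKE, REAL = -1, 0, 1


def label_contains_hoax(statement):
    # A toy labeling function: vote FAKE if the word "hoax" appears, else abstain.
    return FAKE if "hoax" in statement.lower() else ABSTAIN


def label_partition(statements):
    # Each worker applies the labeling function to its own partition of the data.
    return [label_contains_hoax(s) for s in statements]


if __name__ == "__main__":
    statements = ["This claim is a hoax", "The budget grew by 3%",
                  "Total hoax, ignore it", "Unemployment fell last year"]
    # Split the data into as many partitions as we have workers.
    partitions = np.array_split(statements, 2)
    with Pool(processes=2) as pool:
        # The workers run in parallel; map collates the results back in order.
        results = pool.map(label_partition, partitions)
    labels = [label for chunk in results for label in chunk]
    print(labels)  # [0, -1, 0, -1]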

Note

We cannot think about model parallelism without understanding the details of parallel algorithms and parallel programming. Each of these topics is complex in its own right, with entire books written on the subject, and getting into their details is outside the scope of this book.

In addition, to avoid confusion and keep to the topic of weak supervision, we do not outline hybrid parallelism, which combines data and model parallelism to try to get the best of both worlds.

Demystifying Parallel and Distributed Deep Learning (Ben-Nun, Hoefler 2018), Integrated Model, Batch and Domain Parallelism in Training Neural Networks (Gholami, Azad, et al. 2018), and Efficient Large-Scale Language Model Training on GPU Clusters (Narayanan, Phanishayee, et al. 2021) are suggested reading for readers who are curious about the details of distributed training and parallelism.

In this chapter, we will focus on data parallelism and use Apache Spark as the runtime that allows us to implement it. To do this, we need to understand a little bit about Spark's design principles and architecture. In the next section, we will look at Apache Spark and how the Spark runtime can help us scale weak supervision within an enterprise.

Apache Spark - An Introduction

Apache Spark is a unified analytics engine for large-scale compute and data processing that can be deployed both on-premises and in the cloud. Spark (Zaharia, Xin, Wendell, et al. 2016) was inspired by Google's MapReduce implementation (Dean, Ghemawat 2004), and whilst it is designed for large-scale data processing, it has been extended to support streaming and real-time workloads.

Apache Spark is an ideal runtime for scaling data programming and ML, as it provides in-memory storage for intermediate computations and a composable API design. This composable design allows us to incorporate different ML libraries and interact with real-time data and graph processing engines.

Spark has the following four principles that form the cornerstone of weak supervision scalability design:

  • Speed - Spark features an in-memory implementation, optimization across multiple cores, efficient parallel processing and multithreading, and directed acyclic graph (DAG) based query optimization.

  • Ease of use - Spark abstracts out the complexities of distributed computing into a logical data structure called the Resilient Distributed Dataset (RDD). The RDD is an immutable distributed collection that supports two kinds of operations: transformations and actions (a short example follows this list). RDDs are available across the supported programming languages and underpin higher-level data structures such as DataFrames and Datasets.

  • Modularity - Spark supports different workloads consistently across multiple programming languages. One doesn't need to switch languages or learn new APIs specific to a workload; the unified processing engine brings it all together across SQL, R, Python, Java, and Scala, to name a few.

  • Extensibility - Spark decouples storage from compute and focuses on being a fast parallel computation engine. This decoupling allows for a rich ecosystem of third-party packages supporting a myriad of data sources, from traditional RDBMSs, Hadoop, MongoDB, Cassandra, etc., to the more recent cloud storage abstractions such as Azure Storage, AWS S3, and Kinesis. This extensibility is very helpful when using Snorkel, where we write labeling functions that incorporate varied unstructured data sources.
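As a minimal illustration of the RDD programming model mentioned in the list above, the snippet below builds an RDD, applies transformations (map and filter), and then triggers execution with an action (collect). The numbers are made up for illustration, and spark refers to an existing SparkSession, which Databricks creates for every notebook automatically.

# Transformations are lazy; nothing runs until an action is invoked.
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5], numSlices=2)

squared = rdd.map(lambda x: x * x)            # transformation - returns a new RDD
evens = squared.filter(lambda x: x % 2 == 0)  # another lazy transformation

print(evens.collect())  # action - triggers the distributed computation: [4, 16]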

Unsurprisingly, Spark has become a popular choice among enterprises and developers who work with big data. Not only does it allow us to tackle large datasets, but it does so using programming languages and APIs that are familiar to many.

Note

Going deep into Apache Spark is outside the scope of this book. If you are interested in more details on Spark's application stack and runtime, the authors can highly recommend O'Reilly's Learning Spark, 2nd Edition, having used it in a personal capacity.

At a high level, a typical Spark application stack has four distinct components (see Figure 6-2) that support the design principles outlined earlier and the core features we use when writing a Spark application. The APIs exposed via these components get converted into a DAG that is executed by the core engine.

Spark stack
Figure 6-2. Spark Stack

As we think about designing labeling functions with Snorkel, we need to be aware of Spark's application constraints and runtime. In the next section, we will look at Spark's runtime and application design, and how we can deploy and use it to scale.

Spark Application Design

A Spark application can run independently on one machine or across several machines as part of a cluster. Spark ships with its own standalone cluster manager and also supports plugging in other cluster managers, such as YARN and Mesos. A Spark application creates a Spark session (called a SparkSession) via the Spark driver.

For any Spark application, the main entry point for execution is the Spark driver. As shown in Figure 6-3, the Spark driver is responsible for all communication with the cluster, from requesting more resources to executing operations on the nodes. The SparkSession, in effect, is the unified conduit to all of the data and operations that Spark supports.

The SparkSession coordinates and transforms all the operations into underlying computations, and distributes and schedules those as tasks across the different nodes in the cluster. In this process, the SparkSession uses the cluster manager to request the resources (such as memory and CPU) it needs. A minimal example of obtaining a session is shown below.
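The snippet below is a minimal sketch of how a SparkSession is typically obtained in PySpark. On Databricks, a session named spark is already created for each notebook, so this is mostly relevant when running Spark yourself; the application name here is an arbitrary placeholder.

from pyspark.sql import SparkSession

# The driver obtains (or creates) the session that coordinates work on the cluster.
spark = (
    SparkSession.builder
        .appName("fake-news-labeling")  # hypothetical application name
        .getOrCreate()
)

print(spark.version)                          # the Spark runtime version
print(spark.sparkContext.defaultParallelism)  # a hint of the parallelism available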

Spark Application Architecture
Figure 6-3. Spark Application Architecture

As we called out earlier, in this chapter we will take the NLP-based fake news example from Chapter 3 and deploy it on Spark. During that process, we will walk through the design and implementation changes that allow us to run at scale.

We could deploy a cluster of many machines and configure Apache Spark (and the relevant clusters, networks, etc.) but that would take time and management resources. Instead, we opt to use a cloud instance of Spark running on Azure Databricks.

Databricks vs. Apache Spark

Apache Spark is an open-source distributed computing framework that is built on Scala and came out of the University of California, Berkeley.

Databricks is a company founded by the original creators of Apache Spark; its commercial offering adds features on top of open-source Spark as part of the paid product. Databricks also has cloud-hosted options that make it very easy for most people to get started.

For a more detailed comparison between the two options, see the Spark and Databricks comparison.

In the next section, we will see how to set up Azure Databricks as our runtime and instance to deploy our example and show how we can scale Snorkel.

Using Azure Databricks to Scale

Azure Databricks is a cloud-hosted Databricks platform that is integrated with the rest of Azure, supports one-click deployments, and provides a Databricks dashboard, as shown in Figure 6-4.

Using a hosted Databricks implementation helps us concentrate more on the task at hand, scaling Snorkel and weak supervision, and less on the technical aspects of the cluster, nodes, network, security, etc. All of these are critical, but they are not something we can get into in detail in the context of this book.

Azure Databricks Overview
Figure 6-4. Azure Databricks Overview

Databricks is one of the options available out of the box to deploy on Azure. On one hand, deploying a new instance of Databricks using the Azure portal is quite straightforward; Figure 6-5 shows the instance we spun up for this chapter. On the other hand, several steps of preparatory work are needed to configure Databricks before we can use it for Snorkel.

Chapter 6 - Azure Databricks deployment
Figure 6-5. Chapter 6 - Azure Databricks deployment

To deploy our instance, all we need is an active Azure subscription. The quickstart guide provides detailed step-by-step instructions on how to deploy and get going with Databricks on Azure.

Cloud options for Databricks

From our perspective, we are using Azure mostly for convenience; other cloud providers would work too. In addition to Azure, Databricks has cloud-hosted options on the other major cloud providers - AWS, Google, and Alibaba. All of these allow you to deploy, manage, and scale a data platform securely and conveniently.

Once deployed, we will use the Databricks workspace shown in Figure 6-6. The workspace is our entry point to Databricks and what we will use for most of our interactions with it. In this interface we will, among other things, create a new notebook (or import one from the book's GitHub repository, https://bit.ly/PracticalWeakSupervisionBook), spin up a new cluster to run our code, and set up jobs for automation.

Note

Detailed and specific configuration options for Azure Databricks are outside the scope of this book. If Databricks is new to you, we recommend reading the Azure Databricks documentation for details and best practices.

Azure Databricks Workspace
Figure 6-6. Azure Databricks Workspace

Cluster Setup for Weak Supervision

First, we need to configure a cluster. When configuring the cluster, in addition to node details we also need to outline the packages and related dependencies that need to be deployed on each node. As part of the cluster configuration, we need to choose the Databricks runtime version, a cluster policy, and behavior at runtime.

Figure 6-7 shows the configuration that we are using to run Snorkel and what we will use when labeling the Fake news dataset from Chapter 3.

For more details on all the configuration options on Azure Databricks see the Cluster configuration articles.

Azure Databricks Cluster configuration
Figure 6-7. Azure Databricks Cluster configuration

Cluster configuration

To run and label the fake news dataset on Spark, a few dependencies are needed. As outlined earlier in the chapter, for a cluster to be able to scale up and down, these dependencies need to be consistent across all nodes.

To get our deployment running on Spark, we will configure the libraries listed below. This configuration is effectively the same as doing a pip install on a single machine and installs the latest versions available. If there is a dependency on a specific version, that can be pinned, as shown in the “TensorFlow library” note.

  • nltk

  • snorkel

  • tldextract

  • bs4

  • elephas

  • keras

To install these libraries for the cluster, we choose PyPI in the Libraries option and enter the package name, as shown in Figure 6-8. In our case, all of these are available in the default PyPI index, and we don't need to specify a repository.

Install library for an Azure Databricks Cluster
Figure 6-8. Library installation for the cluster

Figure 6-9 shows the status of libraries for the cluster once everything is installed and ready to use.

Dependencies installed in the Azure Databricks Cluster
Figure 6-9. Library installation for the cluster

Be aware that the GUI does not validate the libraries for either compatibility or correctness. If there are any typos or incompatibilities, the library installer will still try to install those libraries and will eventually fail. This can affect the availability of some of the nodes as they get spun up.
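As an aside, recent Databricks runtimes also support notebook-scoped installs with the %pip magic, which can be a convenient way to experiment before committing to cluster-level libraries. This is an assumption about your runtime version, and the cluster Libraries UI remains the approach we use in this chapter, since every node (and every notebook) needs the same dependencies.

%pip install nltk snorkel tldextract bs4 elephas keras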

TensorFlow library

In some cases, we had to install TensorFlow explicitly, even though it is already part of the Databricks runtime and should not need to be installed separately. The example below pins the CPU-optimized 2.4 release of TensorFlow on the cluster.

tensorflow-cpu==2.4.*

Fake news detection dataset on Databricks

Back in Chapter 3, we set up the fake news detection (FakeNewsNet) example and got it running end-to-end on a single machine. Now we'll see how we can scale FakeNewsNet from one machine to a cluster running Databricks.

As a reminder, the fake news detection dataset contains three files - train, validation, and test. We are only using the training file (fnn_train.csv). At the time of writing, this file contains 15,212 records, and whilst this is labeled data, we will be ignoring that and treating it as if it wasn’t.

The FakeNewsNet dataset contains the following columns:

Table 6-1. FNN columns
Column                    Description
id                        An identifier for each sample, representing the PolitiFact website ID for the article
date                      The time of publishing
speaker                   The person or organization to whom this statement is attributed
statement                 The claim published by the speaker
sources                   The sources that have been used to analyze each statement
paragraph_based_content   The paragraphs from which the statement is taken
fullText_based_content    The full text from which the paragraphs were extracted

We will also be using the fake news detection (LIAR) dataset from earlier. This dataset was collected by crawling PolitiFact, and the labels for the claims come from the six Truth-O-Meter ratings:

Table 6-2. Truth-O-Meter ratings
Rating          Description
TRUE            Statement is true.
MOSTLY TRUE     Statement is accurate but needs additional clarification.
HALF TRUE       Statement is only partially accurate.
MOSTLY FALSE    Statement ignores facts that might create a different impression.
FALSE           Statement is mostly inaccurate.
PANTS ON FIRE   Statement is inaccurate.

From a dataset perspective, the changes needed are minimal, and we can reuse most things. Given that we are not running this locally, we need to upload the FakeNewsNet and LIAR datasets to the Databricks cluster.

In Spark, there are many ways to connect to different data sources. In our case, we will upload the data to the cluster using the GUI option that Databricks offers, as shown in Figure 6-10.

Note

If you don’t see the GUI option to upload the data, this is a setting that needs to be enabled in the advanced setting as part of the Admin console. See file upload interface documentation for details.

Uploading Fake news net training data to Databricks
Figure 6-10. Uploading fake news net (FNN) data to Databricks

We upload both the LIAR and the FNN datasets into their respective folders. Once completed, we should see the data available to load in the cluster, as shown in Figure 6-11.

FNN and LIAR training data
Figure 6-11. FNN and LIAR training data

Labeling Functions for Snorkel

When it comes to the labeling functions themselves, there is no change from earlier when running them at scale with Spark. The label constants the labeling functions vote with, including the option to abstain, also remain the same.

ABSTAIN = -1
FAKE = 0
REAL = 1

Table 6-3 outlines the labeling functions from earlier that we will reuse.

Table 6-3. The labeling functions

Labeling Function    Comments
label_rp()           Crowdsourcing from RealClearPolitics
label_wp()           Crowdsourcing from the Washington Post
label_gb()           Crowdsourcing from Glenn Beck
label_snopes()       Snopes fact-check ratings
label_politifact()   PolitiFact
factcheckqa()        FactCheck.org
factcheckafpqa()     AFP Fact Check
speaker()            Transfer learning with the LIAR dataset

As a reminder, the factcheckqa() and factcheckafpqa() functions score the sentiment of the FactCheck.org and AFP Fact Check claims respectively, returning REAL if the sentiments are mostly positive, FAKE if they are mostly negative, and ABSTAIN otherwise.

def factcheck_sentiment(row, columnName):
    label = str(row[columnName])
    score = 0
    if(label):
        claims = label[1:-1].split(',')
        for claim in claims:
            print(claim)
            sentiment = sid.polarity_scores(claim)
            print(sentiment)
            if(sentiment["neg"] > sentiment["pos"]):
                score -=1
            elif(sentiment["pos"] > sentiment["neg"]):
                score +=1
        if(score > 0):
            return REAL
        elif (score < 0):
            return FAKE
        else:
            return ABSTAIN
    return ABSTAIN

@labeling_function()
def factcheckqa_sentiment(row):
    return factcheck_sentiment(row, "www.factcheck.org")

@labeling_function()
def factcheckafpqa_sentiment(row):
    return factcheck_sentiment(row, "factcheck.afp.com")
@labeling_function()
def label_snopes(row):
    label = str(row["www.snopes.com"])
    if label is not None:
        if ('real' in label):
            return REAL
        else:
            return FAKE
    else:
        return ABSTAIN

truth_o_meter = {
    "true": 4,
    "mostly-true": 3,
    "half-true": 2,
    "barely-true": 1,
    "mostly-false": -1,
    "false": -2,
    "pants-fire": -3
}

@labeling_function()
def label_politifact(row):
    total_score = 0
    labels = row["www.politifact.com"]
    if(labels):
        labels = str(row["www.politifact.com"]).split(',')
        # The last label has the newline character
        if(len(labels) > 0):
            labels[-1] = labels[-1][:-2]
        for label in labels:
            label = label.strip()
            if(label in truth_o_meter):
                total_score += truth_o_meter[label]
    if(total_score > 0):
        return REAL
    if(total_score < 0):
        return FAKE

    return ABSTAIN

Setting up dependencies

The first thing we need to do is set up the dependencies, including the packages needed. These are mostly the same as earlier, with a few exceptions needed for the Databricks runtime.

import pandas as pd
import nltk
import collections
import itertools
from collections import Counter
import tldextract
import numpy as np

# web
from urllib.request import Request, urlopen
from bs4 import BeautifulSoup
import time
import json

We also import the Snorkel packages that are needed. The one difference here is the use of SparkLFApplier. This is part of Snorkel's labeling package and expects a Spark RDD as input. Unlike a pandas DataFrame, which is designed to run on one machine, a Spark RDD is designed to run on a cluster comprising many nodes, allowing it to scale.

# snorkel
from snorkel.labeling import labeling_function
from snorkel.labeling.apply.spark import SparkLFApplier
from snorkel.labeling.model import MajorityLabelVoter
from snorkel.labeling import LFAnalysis
from snorkel.labeling.model import LabelModel

We also set up the variables pointing to the data files on DBFS that we will use later.

# data filenames
TRAINING_dataFile = "/dbfs/FileStore/fnn/fnn_train.csv"

## training dataset
WORKING_dataFile = "/dbfs/FileStore/test/working_fnn_train.csv"
WORKING2_dataFile = "/dbfs/FileStore/test/working_processed_fnn_train.csv"
CLEANED_dataFile = "/dbfs/FileStore/test/working_processed_fnn_train_clean.csv"

TRAINED_dataFile = "/dbfs/FileStore/test/data_nlp.csv"

# Load the Liar dataset
LIAR_dataFile = "/dbfs/FileStore/liar/liar_train.csv" #training dataset

We also initialize the sentiment analysis package, which is used by some of the labeling functions.

# Download the VADER lexicon explicitly; on Apache Spark, a call later will fail without it
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.sentiment.vader import SentimentIntensityAnalyzer

nltk.download('vader_lexicon')
sid = SentimentIntensityAnalyzer()

Spark Dataframes vs. Pandas Dataframes

In addition to RDD’s (outlined earlier in this chapter), Spark also has a set of DataFrame APIs that are an extension to the RDD APIs. DataFrames represent data in a tabular fashion and are conceptually equivalent to a table in an RDMS, or a data frame in Python, or an excel spreadsheet. For those using Pandas, these DataFrame APIs would seem similar.

However, under the covers the two are quite different, and the similarity can be deceiving. A pandas DataFrame is designed to work on one machine, within the memory and compute constraints of that machine. A Spark DataFrame, on the other hand, is a distributed representation designed to scale across large clusters with petabytes of data, and it implements vastly different code generation and optimization techniques.

Pandas DataFrames employ eager execution, with code executed immediately; Spark DataFrames, on the other hand, implement lazy execution and don't compute anything until needed. This results in very different behaviors: pandas data is mutable, while Spark data is immutable. For example, in pandas we can assign a new column with the [] operator, but in Spark we need to use the .withColumn() method, which returns a new DataFrame (a small comparison follows this sidebar).

It is important to note that even though the two sets of APIs are designed to be similar, they are not interchangeable. If the intent is for Spark to be the runtime for a production deployment, it makes sense to start out using Spark locally; the code and logic changes needed to account for immutability between the two runtimes can be quite a challenge.
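To make the mutability and laziness differences concrete, here is a small comparison of adding a column in each API. The column and sample data are made up for illustration, and spark refers to the session that Databricks provides.

import pandas as pd
from pyspark.sql import functions as F

pdf = pd.DataFrame({"statement": ["claim one", "a longer claim"]})

# pandas: mutable and eager - the [] assignment mutates pdf immediately
pdf["statement_length"] = pdf["statement"].str.len()

# Spark: immutable and lazy - withColumn returns a *new* DataFrame, and nothing
# is computed until an action such as show() or count() is called
sdf = spark.createDataFrame(pdf[["statement"]])
sdf = sdf.withColumn("statement_length", F.length("statement"))
sdf.show()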

Loading the data

Now that we have wired up the dependencies, we are ready to load our data, which was already uploaded to the cluster as outlined earlier. We will load the FNN data with pandas from the cluster's file system (DBFS). We also need to specify the encoding, as the data contains double-wide characters that otherwise cause failures in Spark.

# Load the file
# Use pandas to read CSV
data = pd.read_csv(TRAINING_dataFile, encoding='utf-8')

# peek at the data
data.head(3)

Figure 6-12 shows a peek at the structure of the FNN dataset that was loaded.

FNN training dataset
Figure 6-12. FNN training dataset
Note

It might seem better to use Spark's built-in DataFrame reader to load the files, for example with spark.read.csv:

data = spark.read.csv(fakeNewsFile_training,
            header=True, inferSchema=True,
            multiLine=True, escape='"')
data.cache()

However, the FNN dataset, with its nested dictionaries, breaks the parsing, and single records get loaded as multiple rows. Specifically, the two fields that spill into multiple rows are paragraph_based_content and fullText_based_content.

Pandas is better at parsing this and handling the idiosyncrasies of this dataset.

Handling Nulls

One important distinction between pandas and Spark is the handling of nulls in the data. In pandas a null is expressed as NaN; in Spark it needs to be expressed as None. This conversion from NaN to None is not automatic and is something we need to handle ourselves. Without it, applying the labeling functions with Snorkel later will fail.

# Best to avoid pandas in spark but we trying to keep it consistent
# We replace the 'NaN' with 'None' for Spark
data = data.where(cond=data.notna(), other=None)


As we did earlier in Chapter 3, we take the label_fnn column and convert its value to a number. We need a number instead of a string for validation.

data["label_numeric"] = data.apply(
     lambda row: 1 if row["label_fnn"]=='real' else 0, axis=1)

#peek at the data
data.head(3)

Note that the result in Figure 6-13 shows only a subset of the columns for brevity. The output below, however, shows the full structure of the DataFrame.

Data columns (total 17 columns):
 #   Column                                     Non-Null Count  Dtype
---  ------                                     --------------  -----
 0   id                                         15212 non-null  int64
 1   date                                       15212 non-null  object
 2   speaker                                    15212 non-null  object
 3   statement                                  15212 non-null  object
 4   sources                                    15212 non-null  object
 5   paragraph_based_content                    15212 non-null  object
 6   fullText_based_content                     15212 non-null  object
 7   label_fnn                                  15212 non-null  object
 8   label_numeric                              15212 non-null  int64
 9   www.politifact.com                         4688 non-null   object
 10  www.snopes.com                             423 non-null    object
 11  www.twitter.com                            3 non-null      object
 12  www.factcheck.org                          461 non-null    object
 13  factcheck.afp.com                          21 non-null     object
 14  www.washingtonpost.com/news/fact-checker/  158 non-null    object
 15  www.realclearpolitics.com                  120 non-null    object
 16  www.glennbeck.com                          22 non-null     object
Converting label to numbers
Figure 6-13. Converting label to numbers

Fact-checking sites

The following remain the same as we saw in Chapter 3:

  • The fact-checking sites

  • The use of urllib package to handle the URLs and read the sites

  • The BeautifulSoup python package to parse the content of the site

As a reminder, the fact-checking sites we used earlier, which are part of the list of sources, are:

  • www.politifact.com

  • www.snopes.com

  • www.factcheck.org

  • factcheck.afp.com

  • www.washingtonpost.com/news/fact-checker/

  • www.realclearpolitics.com

  • www.glennbeck.com

We do need to tweak the fact_checking_sites mapping and its helpers to accommodate the different null handling and incorporate None.

# Contacts a URL, downloads the website's content, and parses it
def get_parsed_html(url):
    req = Request(url, headers={'User-Agent': 'Mozilla/5.0'})
    webpage = urlopen(req).read()
    parsed_html = BeautifulSoup(webpage)
    return parsed_html

fact_checking_sites = {
    "www.politifact.com" : get_poitifact_image_alt,
    "www.snopes.com": get_snopes_image_alt,
    "www.twitter.com":  extract_twitter_name,
    "www.factcheck.org": get_factcheck_first_paragraph,
    "factcheck.afp.com": get_factcheck_afp_title,
    "www.washingtonpost.com/news/fact-checker/": None,
    "www.realclearpolitics.com": None,
    "www.glennbeck.com": None,
}

The functions that check the specifics of each site haven't changed from earlier. Below are a couple of examples; note that we have not repeated all of the functions here, for brevity.

# www.politifact.com
def get_poitifact_image_alt(url):
    result = "abstain"
    try:
        parsed_html = get_parsed_html(url)
        div = parsed_html.body.find('div', attrs={'class':'m-statement__meter'})
        result = div.find("img", attrs={'class':'c-image__original'})["alt"]
    except Exception as e:
        print(e)
    return result

# www.twitter.com
def extract_twitter_name(url):
    domain = "https://twitter.com/"
    sub = url[url.find(domain) + len(domain):]
    index = sub.find('/')
    if(index == -1):
        return sub
    else:
        return sub[:index]

Once all of the data is loaded, cleaned, and parsed, we only keep rows that have at least one fact-checking source. The following filters out any rows where all of these columns are null.

data2 = data[data["www.politifact.com"].notnull()
             | data["www.snopes.com"].notnull()
             | data["www.factcheck.org"].notnull()
             | data["factcheck.afp.com"].notnull()
             | data["www.realclearpolitics.com"].notnull()
             | data["www.glennbeck.com"].notnull()
             | data["www.washingtonpost.com/news/fact-checker/"].notnull()
             | data["www.twitter.com"].notnull()]

Transfer Learning using the LIAR dataset

As in Chapter 3, we use the LIAR dataset and crowdsourcing to understand patterns of fake versus real statements for each speaker and use these patterns to rate future statements. The underlying premise is that speakers who publish a higher rate of statements judged to be false are more likely to continue doing so, compared with speakers who have a higher rate of statements classified as true.

We read the LIAR dataset that was uploaded earlier and, for Spark's runtime, address missing values by marking them as None.

# Load the Liar dataset
data3 = pd.read_csv(LIAR_dataFile)

# Clean up the NaN's
data3 = data3.where(cond=data3.notna(), other=None)

# Take a peek at the data to ensure all is correct
data3.head(3)
Preview of the fake news detection (LIAR) dataset
Figure 6-14. Preview of the “fake news detection (LIAR)” dataset

The labels, of course, are the Truth-O-Meter ratings of the claims. We only leverage the claims rated “true” and “mostly-true”, or “false” and “pants-fire”. The labeling function is the same as earlier: if more than 60% of a speaker's history consists of falsehoods, the labeling function votes for the article to be FAKE, and vice versa if the percentage of true claims is higher than 60% (a sketch of how these per-speaker ratios can be built follows the labeling function below).

@labeling_function()
def speaker(row):
    speaker = row["speaker"]
    if(speaker in true_percent and true_percent[speaker] > 0.6):
        return REAL
    if(speaker in false_percent and false_percent[speaker] > 0.6):
        return FAKE
    return ABSTAIN
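The speaker() function relies on two dictionaries, true_percent and false_percent, built from the LIAR data. The exact construction lives in the Chapter 3 notebook; below is a minimal sketch of the idea, assuming data3 has speaker and label columns holding the Truth-O-Meter ratings shown above (the column names are assumptions, so adjust them to match the CSV).

from collections import Counter

true_labels = {"true", "mostly-true"}
false_labels = {"false", "pants-fire"}

true_counts, false_counts, totals = Counter(), Counter(), Counter()

for _, liar_row in data3.iterrows():
    liar_speaker = liar_row["speaker"]           # assumed column name
    liar_label = str(liar_row["label"]).strip()  # assumed column name
    totals[liar_speaker] += 1
    if liar_label in true_labels:
        true_counts[liar_speaker] += 1
    elif liar_label in false_labels:
        false_counts[liar_speaker] += 1

# Per-speaker ratios of true and false claims, as used by the speaker() function
true_percent = {s: true_counts[s] / totals[s] for s in totals}
false_percent = {s: false_counts[s] / totals[s] for s in totals}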

Weak classifiers - generating agreement

Snorkel uses a multi-task approach (where each task is a labeling function) and evaluates the effectiveness of each task by observing the conflicts between them. As such, the first step in using Snorkel to label the data is to aggregate the votes from the labeling functions and resolve any conflicts. We will start by splitting the data into training and validation sets with an 80/20 split.

# Calculate the data size to split for an 80-20 ratio for training & testing
data_size = data.shape[0]
train_data_size = int(data_size*0.8)

We collect the labeling functions in a list called lfs. We then use the SparkLFApplier, which executes the labeling functions and records a REAL, FAKE, or ABSTAIN vote from each labeling function for every data example.

There are a few important differences when Snorkel runs on Spark compared with running it on a single machine. First, we need to use a different class: the SparkLFApplier instead of the PandasLFApplier. The SparkLFApplier needs a Spark RDD as input, where each row is a data point. And as we saw earlier in the chapter, Spark and pandas DataFrames are quite different.

Before we can use the SparkLFApplier, we need to get an RDD from a pandas DataFrame. Whilst this might seem unusual, it is not that uncommon when requirements change and we need to scale out using Spark.

One way to get an RDD from a pandas DataFrame is to follow a two-step process: first we convert it to a Spark DataFrame, which despite the name is very different from a pandas DataFrame, and then from that Spark DataFrame we create an RDD. The resulting RDD is passed into the SparkLFApplier.apply() function, which returns a matrix of labels as emitted by the respective labeling functions.

data = data.sample(frac = 1, random_state=1)

lfs = [
        label_rp,
        label_wp,
        label_snopes,
        label_politifact,
        factcheckqa_sentiment,
        factcheckafpqa_sentiment,
        speaker
      ]

# Convert pandas DF's to Spark DF's and then to RDDs
rdd_data = spark.createDataFrame(data).rdd

# Split the data in an 80-20 ratio for train and test
df_train = data[:train_data_size]
df_train = df_train.where((pd.notnull(df_train)), None)

df_valid = data[train_data_size:]
df_valid = df_valid.where((pd.notnull(df_valid)), None)

# invoke Snorkel
applier = SparkLFApplier(lfs=lfs)
L_train = applier.apply(rdd_data)
LFAnalysis(L=L_train, lfs=lfs).lf_summary()

Creating an RDD in Spark

If you use one of the popular ways to create an RDD, the parallelize() function on the SparkContext, it will fail:

# This will create an RDD as expected, but
# it will fail when called with Snorkel later.
rdd_data = spark.sparkContext.parallelize(df_train)

Therefore, we recommend using the createDataFrame() function to create the RDD; that works with Snorkel and does not fail.

# Creating an RDD using this will work.
rdd_data = spark.createDataFrame(data).rdd
rdd_train = spark.createDataFrame(df_train).rdd
rdd_valid = spark.createDataFrame(df_valid).rdd

When we run this in our cluster on the data prepared earlier, we get the results shown in Figure 6-15. To help understand the output of the labeling functions, we use LFAnalysis, a Snorkel utility that reports how our labeling functions perform relative to each other and helps us evaluate their quality.

SparkLFApplier Result - Training Set
Figure 6-15. SparkLFApplier Result - Training set

As a reminder, Coverage shows the fraction of data points that a labeling function labels (i.e., does not abstain on). Overlap is the percentage of data points where that labeling function and at least one other emit a non-abstain label. And finally, Conflict is the percentage of data points where the labeling function disagrees with at least one other non-abstaining labeling function.
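If you want to sanity-check these numbers directly, coverage and overlap can be recomputed from the label matrix with a few lines of NumPy. This is only a back-of-the-envelope check of what LFAnalysis already reports, using the L_train matrix returned by the applier above.

import numpy as np

non_abstain = L_train != ABSTAIN     # boolean matrix: did each LF vote on each row?

coverage = non_abstain.mean(axis=0)  # fraction of data points each LF labeled
votes_per_row = non_abstain.sum(axis=1)

# Overlap: rows where an LF voted and at least one other LF voted as well
overlaps = (non_abstain & (votes_per_row > 1)[:, None]).mean(axis=0)

print(dict(zip([lf.name for lf in lfs], np.round(coverage, 3))))
print(dict(zip([lf.name for lf in lfs], np.round(overlaps, 3))))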

Looking at the LFAnalysis report in Figure 6-15, we have surprisingly high coverage for label_rp, label_wp, and label_snopes, and as a result they also have higher overlaps and conflicts.

Type Conversions needed for Spark runtime

Before we can train the model, we need to convert a few data types and shape the dataset so that it can run on Spark.

If you recall, when we split the data into an 80/20 ratio for the training and testing sets, the data type was still a pandas DataFrame (pandas.core.frame.DataFrame), and the training dataset looks like Figure 6-16.

Training dataset
Figure 6-16. Training dataset

For us to train using Snorkel, we need to convert all of the labeling function source columns to strings. If we don't do this, when we try to run the training later we will get an Arrow optimization error, but the real cause is a type conversion failure.

An example of this failure in our test run is shown below.

Error: /databricks/spark/python/pyspark/sql/pandas/conversion.py:315:

UserWarning: createDataFrame attempted Arrow optimization because
'spark.sql.execution.arrow.pyspark.enabled' is set to true; however,
failed by the reason below: Unable to convert the field www.snopes.com.
If this column is not necessary, you may consider dropping it or
converting to primitive type before the conversion.
Direct cause: Unsupported type in conversion from Arrow:
null Attempting non-optimization as
'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.

The underlying reason for the failure is that the column holds mixed types, which Spark cannot merge into a single schema; converting everything to strings resolves this.

TypeError: field www.politifact.com: Can not merge
    type <class 'pyspark.sql.types.DoubleType'> and
             <class 'pyspark.sql.types.StringType'>

Apache Arrow

Apache Arrow is a column-oriented in-memory format that can represent both hierarchical and flat data for efficient analytic workloads. A typical RDBMS is a row-oriented implementation where data is stored by rows. A column-oriented structure is one where data is stored by columns, rather than rows.

Internally, Spark uses Apache Arrow to transfer data between Python and the JVM. In open-source Spark, Arrow usage is disabled by default and needs to be enabled explicitly (Databricks enables it by default, as the warning above shows). We also need to have Apache Arrow (PyArrow) installed on all Spark cluster nodes. Arrow can also be used with other libraries, such as NumPy and pandas.
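On an open-source Spark cluster, the Arrow path can be toggled with a configuration flag, as sketched below; on Databricks it is already enabled, as the earlier warning showed.

# Enable Arrow-based columnar data transfer between the JVM and Python workers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Optionally fall back to the non-Arrow path when a conversion is unsupported
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "true")

print(spark.conf.get("spark.sql.execution.arrow.pyspark.enabled"))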

We need to perform the following steps for both the training and validation datasets:

  • For all the columns we added (which map to the output of a labeling function), we explicitly convert the type to String.

  • We ensure the null is handled correctly and represented as None.

  • And finally, we convert the pandas dataframe to RDDs using spark.createDataFrame as outlined earlier.

We can convert the data types of the columns as outlined below. The pandas .astype() method is essentially a cast that transforms pandas objects to the specified type and is handy when we want to convert a set of columns, as we do here.

# Training dataset
df_train[['www.twitter.com',
          'www.politifact.com',
          'www.snopes.com',
          'www.factcheck.org',
          'factcheck.afp.com',
          'www.washingtonpost.com/news/fact-checker/',
          'www.realclearpolitics.com',
          'www.glennbeck.com']] = \
    df_train[['www.twitter.com',
              'www.politifact.com',
              'www.snopes.com',
              'www.factcheck.org',
              'factcheck.afp.com',
              'www.washingtonpost.com/news/fact-checker/',
              'www.realclearpolitics.com',
              'www.glennbeck.com']].astype(str)

# Validation dataset
df_valid[['www.twitter.com',
          'www.politifact.com',
          'www.snopes.com',
          'www.factcheck.org',
          'factcheck.afp.com',
          'www.washingtonpost.com/news/fact-checker/',
          'www.realclearpolitics.com',
          'www.glennbeck.com']] = \
    df_valid[['www.twitter.com',
              'www.politifact.com',
              'www.snopes.com',
              'www.factcheck.org',
              'factcheck.afp.com',
              'www.washingtonpost.com/news/fact-checker/',
              'www.realclearpolitics.com',
              'www.glennbeck.com']].astype(str)

Now, with the cleaned-up data types, we can convert these into RDDs.

# Training dataset conversion
rdd_train = spark.createDataFrame(df_train).rdd

# Validation dataset conversion
rdd_valid = spark.createDataFrame(df_valid).rdd

After the conversion, we are now finally ready to train the model.

label_model = LabelModel()

# seed == ultimate question of life
label_model.fit(L_train=L_train, n_epochs=100, log_freq=100, seed=42)

# Apply the labeling functions to the validation set before predicting on it
L_valid = applier.apply(rdd_valid)

preds_train_label = label_model.predict(L=L_train)
preds_valid_label = label_model.predict(L=L_valid)

Y_valid = df_valid["label_numeric"].values
LFAnalysis(L_valid, lfs).lf_summary(Y_valid)

With the training done, we should analyze the validation set. We do this using LFAnalysis, similar to how we did it earlier for the training set. The output of Snorkel's analysis for the validation set is shown in Figure 6-17.

SparkLFApplier Result - Validation set
Figure 6-17. SparkLFApplier Result - Validation set

We also use Snorkel’s LabelModel.score() function get industry-standard metrics as shown below. This allows us to do a more apples-to-apples comparison between iterations and other models.

f1_micro = label_model.score(L_valid, Y_valid, metrics=["f1_micro"])
accuracy = label_model.score(L_valid, Y_valid, metrics=["accuracy"])
recall = label_model.score(L_valid, Y_valid, metrics=["recall"])
precision = label_model.score(L_valid, Y_valid, metrics=["precision"])

print("{} {} {} {}".format(f1_micro, accuracy, recall, precision))

As we can see below, in our Spark run the metrics we are tracking are quite decent, in the mid-to-high seventies, with recall at almost 80%. These are quite similar to the metrics we saw earlier in Chapter 3.

{'f1_micro': 0.7504401408450704}
{'accuracy': 0.7504401408450704}
{'recall': 0.7962798937112489}
{'precision': 0.7273462783171522}

Finally, we combine the predictions for the training and validation sets, attach them to the dataset, and save the labeled data to storage.

snorkel_predictions = np.concatenate((preds_train_label,preds_valid_label))
data["snorkel_labels"] = snorkel_predictions

data.to_csv(TRAINED_dataFile, header=True)

Summary

In this chapter, we explored the need for scalability and saw the difference between scaling up and scaling out. We introduced Apache Spark and saw how its application design allows us to scale relatively easily. We used a managed Spark offering on Azure (Azure Databricks) to keep things simple and allow us to bootstrap quickly.

We took the fake news dataset and the implementation from Chapter 3 and scaled it from one machine to a Spark-managed cluster running on many machines. Along the way, we also learned about the configuration setups and changes needed to allow our fake news sample to scale, many of which are nuances of the different runtime and execution environments.

We saw the key code differences needed for Snorkel to work on Spark. We also saw that pandas and Spark DataFrames are quite different, including in their handling of nulls. These differences are critical enough to make or break our logic and code, and it is important to be aware of them when designing the solution up front.

The ability to scale is critical and required by most enterprises to use weak supervision in production. The recent advancements in data programming, transfer learning, and combining them with Snorkel have the potential for tremendous business success, and we cannot wait for you to come along with us on this journey.
