Logistic Regression#

Logistic regression is often used in both predictive and inferential analysis. A logistic regression model is trained on a dataset to learn how certain independent variables relate to the probability of a binary outcome occurring. The model is then applied to cases where the independent variables are known but the outcome is unknown, and the probability of the outcome occurring can be estimated.

This article shows how to use the Spark ML functions to generate a logistic regression model in PySpark and sparklyr, including a discussion of the steps necessary to prepare the data and potential issues to consider. We will also show how to access standard errors and confidence intervals for inferential analysis and how to assemble these steps into a pipeline.

Preparing the data#

We will be using the Spark ML functions GeneralizedLinearRegression/ml_generalized_linear_regression. The syntax used to call these functions is broadly similar to that of logistic regression functions in Python or R. However, there may be some additional steps to take in preparing the data before the model can be run successfully.

We can read the data in as follows:

#import packages

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import yaml

import pandas as pd

#local session 
spark = (SparkSession.builder.master("local[2]").appName("logistic-regression").getOrCreate())

with open("../../../config.yaml") as f:
    config = yaml.safe_load(f)

# Set the data path
rescue_path_parquet = config["rescue_clean_path"]

# Read in the data
rescue = spark.read.parquet(rescue_path_parquet)

rescue.limit(5).toPandas()

Let’s say we want to use the rescue data to build a logistic regression model that predicts whether an animal is a cat or not. We need to create a new column for our target variable, is_cat, which takes the value 1 if an animal is a cat and 0 if it is not.

# Create is_cat column to contain target variable and select relevant predictors
rescue_cat = rescue.withColumn('is_cat', 
                               F.when(F.col('animal_group')=="Cat", 1)
                               .otherwise(0)).select("typeofincident", 
                              "engine_count", 
                              "job_hours", 
                              "hourly_cost", 
                              "total_cost", 
                              "originofcall", 
                              "propertycategory",
                              "specialservicetypecategory",
                              "incident_duration",
                              "is_cat")

# Check created column
rescue_cat.limit(20).toPandas()

# Check data types
rescue_cat.printSchema()

Examining the dataset with printSchema/glimpse(), we can see that some of the columns we have selected (such as engine_count) are stored as strings when they should be numeric. We can convert all of these to numeric values by running the following:

# Convert engine_count, job_hours, hourly_cost, total_cost and incident_duration columns to numeric
rescue_cat = (
  rescue_cat.withColumn("engine_count", F.col("engine_count").cast("double"))
            .withColumn("job_hours", F.col("job_hours").cast("double"))
            .withColumn("hourly_cost", F.col("hourly_cost").cast("double"))
            .withColumn("total_cost", F.col("total_cost").cast("double"))
            .withColumn("incident_duration", F.col("incident_duration").cast("double")))


# Check data types are now correct
rescue_cat.printSchema()

Missing values#

You may already be familiar with functions such as LogisticRegression or glm for performing logistic regression in python and R. Some of these commonly used functions are quite user-friendly and will automatically account for issues such as missing values in the data. However, the Spark ML functions in PySpark and sparklyr require the user to correct for these issues before running the regression functions.

# Get the count of missing values for each column
missing_summary = (
    rescue_cat
    .select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in rescue_cat.columns])
)

# Show the summary
missing_summary.show(vertical = True)

# The missing values all fall on the same rows; we can check this by filtering for nulls in one of the columns:
rescue_cat.filter(rescue_cat.total_cost.isNull()).limit(38).toPandas()

# For simplicity, we will just filter out these rows:
rescue_cat = rescue_cat.na.drop()

# Double check we have no nulls left:
(rescue_cat
    .select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in rescue_cat.columns])
    .show(vertical = True))

Now we have dealt with incorrect data types and missing values, we are ready to run our logistic regression.

Running a logistic regression model#

When running a logistic regression model in sparklyr using ml_generalized_linear_regression, it is sufficient to prepare the data and deal with missing values as shown above before running the model. The function will automatically encode categorical variables without user input.

However, using the Spark ML functions for logistic regression in PySpark requires the user to encode any categorical variables in the dataset so they can be represented numerically before running the regression model. Typically this is done using a method called one-hot encoding. One category is selected as the reference category and new columns are created in the data for each of the other categories, assigning a 1 or 0 for each row depending on whether that entry belongs to that category or not.
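As a toy illustration of the idea (using pandas purely for demonstration; this is not part of the Spark workflow), one-hot encoding a small categorical column, with one category dropped as the reference, might look like this:

import pandas as pd

# A small example column with three categories
toy = pd.DataFrame({"colour": ["red", "green", "blue", "green"]})

# drop_first=True drops the first category alphabetically ("blue"),
# which then acts as the reference category
pd.get_dummies(toy["colour"], prefix="colour", drop_first=True)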

This is done implicitly by the ml_generalized_linear_regression function in sparklyr, but in PySpark we will need to make use of the StringIndexer and OneHotEncoderEstimator feature transformers to prepare the data first. Additionally, running the regression model in PySpark requires us to assemble all of the predictor variables into a single features column using the VectorAssembler feature transformer.

Note that in Spark 3.x, the OneHotEncoderEstimator feature transformer in PySpark has been renamed OneHotEncoder, so the PySpark code below will need to be amended accordingly.
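If you want the same code to run on either version, one option is to branch the import on the Spark version (a small sketch; the rest of this page uses the Spark 2.x name):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Alias the Spark 3.x OneHotEncoder to the older name so the later code runs unchanged
# (a simple string comparison is enough to distinguish 2.x from 3.x here)
if spark.version >= "3.0":
    from pyspark.ml.feature import OneHotEncoder as OneHotEncoderEstimator
else:
    from pyspark.ml.feature import OneHotEncoderEstimator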

Python Example

First, we will need to encode the categorical variables. This can be done in two stages. The first uses the StringIndexer feature transformer to convert each categorical column into numeric data. It assigns a numerical index to each unique category. By default, it will choose the most frequent category in the data to label first (starting from 0). The next most frequent category will be assigned 1, then 2, and so on.

Note that in Spark 2.x the StringIndexer does not support multiple input columns, so we will have to apply it to each categorical column individually (Spark 3.x adds multi-column support via the inputCols argument).

# Importing the required libraries - replace OneHotEncoderEstimator with OneHotEncoder if using Spark >= 3.0
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoderEstimator

## First we call the StringIndexer separately for each categorical variable

# Indexing the specialservicetypecategory column
serviceIdx = StringIndexer(inputCol='specialservicetypecategory',
                               outputCol='serviceIndex')

# Indexing the originofcall column
callIdx = StringIndexer(inputCol='originofcall',
                               outputCol='callIndex')

# Indexing the propertycategory column
propertyIdx = StringIndexer(inputCol='propertycategory',
                               outputCol='propertyIndex')
                               
# Apply indexing to each column one by one

rescue_cat_indexed = serviceIdx.fit(rescue_cat).transform(rescue_cat)
rescue_cat_indexed = callIdx.fit(rescue_cat_indexed).transform(rescue_cat_indexed)
rescue_cat_indexed = propertyIdx.fit(rescue_cat_indexed).transform(rescue_cat_indexed)

# Check that this has worked correctly
rescue_cat_indexed.select('is_cat', 'specialservicetypecategory', 'originofcall',
                          'propertycategory', 'serviceIndex', 'callIndex', 
                          'propertyIndex').show(10, truncate = False)
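If you want to see exactly which index has been assigned to which category, the fitted StringIndexerModel keeps the category labels in index order in its labels attribute (a short sketch reusing the serviceIdx indexer defined above; with the default ordering the most frequent category appears first):

# Fit the indexer and inspect the index-to-category mapping
service_model = serviceIdx.fit(rescue_cat)
service_model.labels
# e.g. ['Other Animal Assistance', 'Animal Rescue From Height',
#       'Animal Rescue From Below Ground', 'Animal Rescue From Water']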

Next we use OneHotEncoderEstimator to perform one-hot encoding. This converts each of the indexed categorical columns into a column of vectors indicating which category each record belongs to.

# Apply OneHotEncoderEstimator to each categorical column simultaneously
# Replace OneHotEncoderEstimator with OneHotEncoder if using Spark >= 3.0
encoder = OneHotEncoderEstimator(inputCols = ['serviceIndex', 'callIndex', 'propertyIndex'], 
                                 outputCols = ['serviceVec', 'callVec', 'propertyVec'])

rescue_cat_ohe = encoder.fit(rescue_cat_indexed).transform(rescue_cat_indexed)

# Check that this has worked correctly 
rescue_cat_ohe.select('is_cat', 'specialservicetypecategory', 'originofcall',
                          'propertycategory', 'serviceVec', 'callVec', 
                          'propertyVec').show(10, truncate = False)

The format of the vectors generated by the one-hot encoders can be difficult to understand at first glance as it is not as intuitive as simply creating new columns for categories and assigning a 1 or 0 to indicate which category they belong to. The one-hot encoder feature transformer essentially maps the column indices we generated with the string indexer to a column of binary vectors.

Each vector is written as three parts: (length, [index], [value]). The first part gives the length of the vector, which is equal to the number of categories in the encoded variable minus one (the reference category). The second part gives the index of the category the entry belongs to, as generated previously with the string indexer; this is empty (“[]”) if the entry belongs to the reference category. The final part holds the binary value “1.0”, indicating that the entry belongs to the index given in the second part; it is also empty (“[]”) if the entry belongs to the reference category.

As an example, let’s take a closer look at the first and fourth entries for serviceVec in the output table shown above. The first number in the vector for both entries is “3” since this is equal to the number of categories in the specialservicetypecategory column minus the reference category. In this case, the reference category is Animal Rescue From Water and the other categories have been assigned an index of 0 (Other Animal Assistance), 1 (Animal Rescue From Height), or 2 (Animal Rescue From Below Ground) by the string indexer. Therefore, the top row has a serviceVec of (3,[0], [1.0]). This means it belongs to the 0th index of the specialservicetypecategory column, Other Animal Assistance. The fourth row on the other hand is assigned (3,[],[]), where the final two elements of the vector are empty since it belongs to the reference category rather than any of our indexed categories. Further information can be found in the OneHotEncoder documentation.
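To make the sparse format more concrete, these vectors can be expanded into dense arrays (a small illustrative sketch using pyspark.ml.linalg, with the values taken from the example above):

from pyspark.ml.linalg import SparseVector

# (3, [0], [1.0]): a length-3 vector with a 1.0 at index 0, i.e. "Other Animal Assistance"
SparseVector(3, [0], [1.0]).toArray()   # array([1., 0., 0.])

# (3, [], []): all zeros, i.e. the reference category "Animal Rescue From Water"
SparseVector(3, [], []).toArray()       # array([0., 0., 0.])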

Then, we need to vectorise all our predictors into a new column called “features”, which will serve as the input to the model. We must also rename our target variable column, “is_cat”, to “label”:

# Call 'VectorAssembler' to vectorise all predictor columns in dataset
assembler = VectorAssembler(inputCols=['engine_count', 'job_hours', 'hourly_cost',
                                       'callVec', 'propertyVec', 'serviceVec'],
                            outputCol = "features")
 
# Apply vectorisation                                      
rescue_cat_vectorised = assembler.transform(rescue_cat_ohe)

# Rename "is_cat" target variable column to "label" ready to pass to the regression model
rescue_cat_final = rescue_cat_vectorised.withColumnRenamed("is_cat", "label").select("label", "features")

We will use the GeneralizedLinearRegression function from pyspark.ml to carry out our regression. The model can be imported and set up like so:

# Import GeneralizedLinearRegression
from pyspark.ml.regression import GeneralizedLinearRegression

# Define model - specify family and link as shown for logistic regression
glr = GeneralizedLinearRegression(family="binomial", link="logit")

Once this has been done we can run the model like so:

# Run model
model = glr.fit(rescue_cat_final)

# Get model results
model_output = model.transform(rescue_cat_final)
model_output.show(10)
R Example

We can run a logistic regression model in `sparklyr` by using the `ml_logistic_regression` function:
# Run the model
glm_out <- sparklyr::ml_logistic_regression(rescue_cat, 
                                            formula = "is_cat ~ engine_count + job_hours + hourly_cost + originofcall + propertycategory + specialservicetypecategory")

Where we have specified the variables to be used in the regression using the formula syntax target variable ~ predictor1 + predictor2 + predictor3 +....

Model predictions can be accessed using ml_predict:

# Get model predictions
sparklyr::ml_predict(glm_out) %>% 
  print(n = 20, width = Inf)

Inferential Analysis#

The Spark ML functions available in PySpark and sparklyr were largely developed with predictive analysis in mind. This means that they have been built primarily to specify a regression model and retrieve the prediction (as we have done above), with little information in between.

When conducting analysis at ONS, we are often not interested in predicting unknown outcomes, but instead understanding the relationship between the independent variables and the probability of the outcome. This is what is referred to as inferential analysis in this section.

Python Example

The regression coefficients from the model above can be accessed by applying the `summary` method:
# Get model summary
summary = model.summary

# Show summary
summary

This provides the regression coefficients, standard errors, t-values and p-values for each feature variable. Unfortunately, we have not yet found or built functionality in PySpark to calculate exact confidence intervals. Instead, we can approximate the 95% confidence intervals by multiplying the standard errors by 1.96 and subtracting or adding this to the estimate. As the PySpark regression functions are only needed on datasets with a large number of observations, this approximation is sufficient. To make it clear which confidence intervals correspond to which features, the example below also shows how to reconstruct the summary above into a Pandas dataframe that also contains the confidence intervals:

# Get model output
model_output = model.transform(rescue_cat_final)

# Get feature names from the model output metadata
# Numeric and binary (categorical) metadata are accessed separately
numeric_metadata = model_output.select("features").schema[0].metadata.get('ml_attr').get('attrs').get('numeric')
binary_metadata = model_output.select("features").schema[0].metadata.get('ml_attr').get('attrs').get('binary')

# Merge the numeric and binary metadata lists to get all the feature names
merge_list = numeric_metadata + binary_metadata

# Convert the feature name list to a Pandas dataframe
full_summary = pd.DataFrame(merge_list)

# Get the regression coefficients from the model
full_summary['coefficients'] = model.coefficients

# The intercept coefficient needs to be added in separately since it is not part of the features metadata
# Define a new row for the intercept coefficient and get value from model
intercept = pd.DataFrame({'name':'intercept', 'coefficients':model.intercept}, index = [0])

# Add new row to the top of the full_summary dataframe
full_summary = pd.concat([intercept,full_summary.loc[:]]).reset_index(drop=True)

# Add standard errors, t-values and p-values from the summary into the full_summary dataframe
# Note: Spark returns these with the intercept as the *last* element, so move it to the
# front to line up with the intercept row added at the top of the dataframe
full_summary['std_error'] = [summary.coefficientStandardErrors[-1]] + list(summary.coefficientStandardErrors[:-1])
full_summary['tvalues'] = [summary.tValues[-1]] + list(summary.tValues[:-1])
full_summary['pvalues'] = [summary.pValues[-1]] + list(summary.pValues[:-1])

# Manually calculate upper and lower confidence bounds and add into dataframe
full_summary['upper_ci'] = full_summary['coefficients'] + (1.96*full_summary['std_error'])
full_summary['lower_ci'] = full_summary['coefficients'] - (1.96*full_summary['std_error'])

# View final model summary
full_summary
R Example

The regression coefficients from the model above can be displayed by using the `tidy` function from the `broom` package:
# Run the model
glm_out <- sparklyr::ml_logistic_regression(rescue_cat, 
                                            formula = "is_cat ~ engine_count + job_hours + hourly_cost + originofcall + propertycategory + specialservicetypecategory")

# View coefficients
broom::tidy(glm_out)

This does not provide a lot of information. The ml_logistic_regression function only produces regression coefficients and does not produce standard errors. This means that confidence intervals cannot be calculated.

Instead, we must use the function ml_generalized_linear_regression. Here, we need to specify the family and link arguments as “binomial” and “logit” respectively in order to run a logistic regression model that produces standard errors.

# Run model with ml_generalized_linear_regression
glm_out <- sparklyr::ml_generalized_linear_regression(rescue_cat, 
                                            formula = "is_cat ~ engine_count + job_hours + hourly_cost + originofcall + propertycategory + specialservicetypecategory", 
                                            family = "binomial", 
                                            link = "logit")

# View a tibble of coefficients, std. error, p-values
broom::tidy(glm_out)

Individual statistics of interest can also be accessed directly using ml_summary. For example, to get the standard errors:

glm_out$summary$coefficient_standard_errors()

Other statistics can also be accessed individually in this way by replacing “coefficient_standard_errors” with the statistic of interest from the list below:

GeneralizedLinearRegressionTrainingSummary 
Access the following via `$` or `ml_summary()`. 
- aic() 
- degrees_of_freedom() 
- deviance() 
- dispersion() 
- null_deviance() 
- num_instances() 
- prediction_col 
- predictions 
- rank 
- residual_degree_of_freedom() 
- residual_degree_of_freedom_null() 
- residuals() 
- coefficient_standard_errors() 
- num_iterations 
- solver 
- p_values() 
- t_values()

Unfortunately, we have not yet found or built functionality in sparklyr to calculate exact confidence intervals. Instead, we can approximate the 95% confidence intervals by multiplying the standard errors by 1.96 and subtracting or adding this to the estimate. As the sparklyr regression functions are only needed on datasets with a large number of observations, this approximation is sufficient.

broom::tidy(glm_out) %>%
  dplyr::mutate(lower_ci = estimate - (1.96 * std.error),
                upper_ci = estimate + (1.96 * std.error))

Things to watch out for#

Singularity issue#

Some functions for carrying out logistic regression, such as glm when using R, are able to detect when a predictor in the model only takes a singular value and drop these columns from the analysis. However, the equivalent functions in PySpark and sparklyr do not have this functionality, so we need to be careful to check that variables included in the model take more than one value in order to avoid errors.
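A quick way to guard against this is to check how many distinct values each column takes before building the model; any predictor with only one distinct value should be excluded (a small check using the rescue_cat dataframe from earlier):

# Count the distinct values in every column of rescue_cat
(rescue_cat
 .select([F.countDistinct(F.col(c)).alias(c) for c in rescue_cat.columns])
 .show(vertical = True))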

Python Example
# Add the `typeofincident` categorical column into analysis 
# Setup column indexing
incidentIdx = StringIndexer(inputCol='typeofincident',
                               outputCol='incidentIndex')

# Call the string indexer 
rescue_cat_singular_indexed = incidentIdx.fit(rescue_cat_indexed).transform(rescue_cat_indexed)

# Setup one-hot encoding
encoder_singular = OneHotEncoderEstimator(inputCols = ['incidentIndex'], 
                                 outputCols = ['incidentVec'])
                      
# The following returns an error "The input column incidentIndex should have at least two distinct values":
rescue_cat_singular_ohe = encoder_singular.fit(rescue_cat_singular_indexed).transform(rescue_cat_singular_indexed)

Running the code above will produce an error since the typeofincident column only contains one value, Special Service. The regression model will not run unless typeofincident is excluded from the model features.

Similarly, a column containing only a single numeric value will also cause problems. However, instead of returning an error at the encoding stage, the model will run but will be unable to return a summary:

# Convert typeofincident column into numeric value
rescue_cat_singular = rescue_cat_ohe.withColumn('typeofincident', F.when(F.col('typeofincident')=="Special Service", 1)
                               .otherwise(0))

# Setup the vectorassembler to include this variable in the features column
assembler = VectorAssembler(inputCols=['typeofincident', 'engine_count', 'job_hours', 'hourly_cost', 
                                       'callVec', 'propertyVec', 'serviceVec'], 
                           outputCol = "features")

rescue_cat_vectorised_sing = assembler.transform(rescue_cat_singular)


rescue_cat_final_sing = rescue_cat_vectorised_sing.withColumnRenamed("is_cat", "label").select("label", "features")

# Run the model
model_sing = glr.fit(rescue_cat_final_sing)

# Return model summary (will give an error)
summary_sing = model_sing.summary

summary_sing
R Example
glm_singular <- sparklyr::ml_generalized_linear_regression(rescue_cat, 
                                                 formula = "is_cat ~ typeofincident + engine_count + job_hours + hourly_cost + originofcall + propertycategory + specialservicetypecategory", 
                                                 family = "binomial", 
                                                 link = "logit")

Running the code above will produce an error since the typeofincident column only contains one value, Special Service. The regression model will not run unless typeofincident is removed from the formula.

Selecting reference categories#

When including categorical variables as independent variables in a regression, one of the categories must act as the reference category. The coefficients for all other categories are relative to the reference category.

By default, the OneHotEncoderEstimator/ml_generalized_linear_regression functions will select the least common category as the reference category when one-hot encoding the categories so that they can be represented numerically. For example, specialservicetypecategory has four unique values: Animal Rescue From Below Ground, Animal Rescue From Height, Animal Rescue From Water, and Other Animal Assistance.

rescue_cat.groupBy("specialservicetypecategory").count().orderBy("count").show(truncate = False)
+-------------------------------+-----+
|specialservicetypecategory     |count|
+-------------------------------+-----+
|Animal Rescue From Water       |343  |
|Animal Rescue From Below Ground|593  |
|Animal Rescue From Height      |2123 |
|Other Animal Assistance        |2801 |
+-------------------------------+-----+

Since there are only 343 instances of “Animal Rescue From Water” in the data, this has been automatically selected as the reference category in the example above. Regression coefficients in the previous section are therefore shown relative to the “Animal Rescue From Water” reference category.

Selecting a reference category can be particularly useful for inferential analysis. For example, in the rescue_cat dataset, we might want to select “Other Animal Assistance” as our reference category instead because it is the largest special service type and could serve as a useful reference point.

Python Example

To do this, we can make use of the additional stringOrderType argument for the StringIndexer function we used previously to index our categorical variables. This allows us to specify the indexing order. By default, this argument is set to "frequencyDesc", so “Animal Rescue From Water” would be ordered last and is subsequently dropped by the OneHotEncoderEstimator function and set as the reference category. If we want “Other Animal Assistance” to be dropped instead, we need to ensure it is placed last in the indexing order. In this case, specifying stringOrderType = "frequencyAsc" would be sufficient to achieve this. However, to keep this example as general as possible, a convenient way of ensuring any chosen reference category is ordered last is to add an appropriate prefix to it, such as “000_”, and order the categories in descending alphabetical order ("alphabetDesc"):

# Add "000_" prefix to selected reference categories

rescue_cat_reindex = (rescue_cat
                      .withColumn('specialservicetypecategory', 
                                       F.when(F.col('specialservicetypecategory')=="Other Animal Assistance", "000_Other Animal Assistance")
                                       .otherwise(F.col('specialservicetypecategory')))
                      .withColumn('originofcall', 
                                       F.when(F.col('originofcall') == "Person (mobile)", "000_Person (mobile)")
                                       .otherwise(F.col('originofcall')))
                      .withColumn('propertycategory', 
                                       F.when(F.col('propertycategory') == "Dwelling", "000_Dwelling")
                                       .otherwise(F.col('propertycategory'))))

# Check prefix additions 
rescue_cat_reindex.select('specialservicetypecategory', 'originofcall', 'propertycategory').show(20)

# Use stringOrderType argument of StringIndexer

# Re-indexing the specialservicetypecategory column
serviceIdx = StringIndexer(inputCol='specialservicetypecategory',
                               outputCol='serviceIndex', 
                               stringOrderType = "alphabetDesc")

# Indexing the originofcall column
callIdx = StringIndexer(inputCol='originofcall',
                               outputCol='callIndex',
                               stringOrderType = "alphabetDesc")

# Indexing the propertycategory column
propertyIdx = StringIndexer(inputCol='propertycategory',
                               outputCol='propertyIndex', 
                               stringOrderType = "alphabetDesc")

# Call indexing for each column one by one

rescue_cat_indexed = serviceIdx.fit(rescue_cat_reindex).transform(rescue_cat_reindex)
rescue_cat_indexed = callIdx.fit(rescue_cat_indexed).transform(rescue_cat_indexed)
rescue_cat_indexed = propertyIdx.fit(rescue_cat_indexed).transform(rescue_cat_indexed)

Once this is done we can run the regression again and get the coefficients relative to the chosen reference categories:

# Encode re-indexed columns
encoder = OneHotEncoderEstimator(inputCols = ['serviceIndex', 'callIndex', 'propertyIndex'], 
                                 outputCols = ['serviceVec', 'callVec', 'propertyVec'])

rescue_cat_ohe = encoder.fit(rescue_cat_indexed).transform(rescue_cat_indexed)


# Vectorize all our predictors into a new column called "features" 

assembler = VectorAssembler(inputCols=['engine_count', 'job_hours', 'hourly_cost', 
                                       'callVec', 'propertyVec', 'serviceVec'], 
                           outputCol = "features")

rescue_cat_vectorised = assembler.transform(rescue_cat_ohe)

# Rename target variable "is_cat" to "label" ready to run regression model
rescue_cat_final = rescue_cat_vectorised.withColumnRenamed("is_cat", "label").select("label", "features")

# Run the model again
model = glr.fit(rescue_cat_final)

# Show summary
model.summary
R Example

To do this, we can make use of the one-hot encoding concept and two of sparklyr’s Feature Transformers: ft_string_indexer and ft_one_hot_encoder. These have limited functionality, so we must first manipulate the data so that the chosen reference category is ordered last by ft_string_indexer, ensuring that it is the category dropped when applying ft_one_hot_encoder. A convenient way of doing this is to order the categories in descending alphabetical order and to make sure our chosen category will be ordered last by adding an appropriate prefix to it, for example 000_:

rescue_cat_ohe <- rescue_cat %>%
  dplyr::mutate(specialservicetypecategory = ifelse(specialservicetypecategory == "Other Animal Assistance",
                                                    "000_Other Animal Assistance",
                                                    specialservicetypecategory)) %>%
  sparklyr::ft_string_indexer(input_col = "specialservicetypecategory", 
                              output_col = "specialservicetypecategory_idx",
                              string_order_type = "alphabetDesc") %>%
  sparklyr::ft_one_hot_encoder(input_cols = c("specialservicetypecategory_idx"), 
                               output_cols = c("specialservicetypecategory_ohe"), 
                               drop_last = TRUE) %>%
  sparklyr::sdf_separate_column(column = "specialservicetypecategory_ohe",
                                into = c("specialservicetypecategory_Animal Rescue From Water", 
                                         "specialservicetypecategory_Animal Rescue From Below Ground", 
                                         "specialservicetypecategory_Animal Rescue From Height")) %>% 
  sparklyr::select(-ends_with(c("_ohe", "_idx")),
                   -specialservicetypecategory)

sdf_separate_column has been used to separate encoded variables into individual columns and the intermediate columns with “_ohe” and “_idx” suffixes have been dropped once the process is complete. This can be repeated for every categorical variable as desired.

# originofcall
rescue_cat_ohe <- rescue_cat_ohe %>%
  dplyr::mutate(originofcall = ifelse(originofcall == "Person (mobile)",
                                      "000_Person (mobile)",
                                      originofcall )) %>%
  sparklyr::ft_string_indexer(input_col = "originofcall", 
                              output_col = "originofcall_idx",
                              string_order_type = "alphabetDesc") %>%
  sparklyr::ft_one_hot_encoder(input_cols = c("originofcall_idx"), 
                               output_cols = c("originofcall_ohe"), 
                               drop_last = TRUE) %>%
  sparklyr::sdf_separate_column(column = "originofcall_ohe",
                                into = c("originofcall_Ambulance", 
                                         "originofcall_Police", 
                                         "originofcall_Coastguard",
                                         "originofcall_Person (land line)",
                                         "originofcall_Not Known",
                                         "originofcall_Person (running Call)",
                                         "originofcall_Other Frs")) 
# propertycategory
rescue_cat_ohe <- rescue_cat_ohe %>%
  dplyr::mutate(propertycategory = ifelse(propertycategory == "Dwelling",
                                          "000_Dwelling",
                                          propertycategory)) %>%
  sparklyr::ft_string_indexer(input_col = "propertycategory", 
                              output_col = "propertycategory_idx",
                              string_order_type = "alphabetDesc") %>%
  sparklyr::ft_one_hot_encoder(input_cols = c("propertycategory_idx"), 
                               output_cols = c("propertycategory_ohe"), 
                               drop_last = TRUE) %>%
  sparklyr::sdf_separate_column(column = "propertycategory_ohe",
                                into = c("propertycategory_Outdoor", 
                                         "propertycategory_Road Vehicle", 
                                         "propertycategory_Non Residential",
                                         "propertycategory_Boat",
                                         "propertycategory_Outdoor Structure",
                                         "propertycategory_Other Residential"))


# remove _idx and _ohe intermediate columns and original data columns
# also remove `typeofincident` to avoid singularity error
rescue_cat_ohe <- rescue_cat_ohe %>% 
  sparklyr::select(-ends_with(c("_ohe", "_idx")),
                   -originofcall,
                   -propertycategory,
                   -typeofincident)

Once this is done we can run the regression again and get the coefficients relative to the chosen reference categories:

# Run regression with one-hot encoded variables with chosen reference categories
glm_ohe <- sparklyr::ml_generalized_linear_regression(rescue_cat_ohe, 
                                                      formula = "is_cat ~ .", 
                                                      family = "binomial", 
                                                      link = "logit")

# Get coefficients and confidence intervals
broom::tidy(glm_ohe) %>%
  dplyr::mutate(lower_ci = estimate - (1.96 * std.error),
                upper_ci = estimate + (1.96 * std.error))

Multicollinearity#

A key assumption of linear and logistic regression models is that the feature columns are independent of one another. Including features that are strongly correlated with each other in the model violates this assumption, so we need to identify these and remove them to get a valid result.

A useful way of identifying these variables is to generate a correlation matrix of all the features in the model. This can be done using the Correlation.corr function from pyspark.ml.stat in PySpark, or the ml_corr function in sparklyr.

In PySpark, note that the correlation function operates on a column of vectors rather than on a dataframe of individual columns. We will therefore apply it to the “features” column of the rescue_cat_final dataset produced after the indexing, encoding and vector assembly stages of the model setup.

from pyspark.ml.stat import Correlation

# Select feature column vector
features_vector = rescue_cat_final.select("features")

# Generate correlation matrix
matrix = Correlation.corr(features_vector, "features").collect()[0][0]

# Convert matrix into a useful format
corr_matrix = matrix.toArray().tolist() 

# Get list of features to assign to matrix columns and indices
features = pd.DataFrame(merge_list)['name'].values.tolist()

# Final correlation matrix
corr_matrix_df = pd.DataFrame(data=corr_matrix, columns = features, index = features) 

corr_matrix_df

Looking closely at the total_cost column, we can see that there is very strong correlation between that column and job_hours, with a correlation coefficient very close to 1. Additionally, there is quite strong correlation between the total_cost column and the engine_count column, as well as a moderate correlation with both hourly_cost and the “Animal Rescue From Water” category in the specialservicetypecategory column. This is to be expected: if we take a closer look at total_cost, we can see it is always equal to hourly_cost multiplied by job_hours.

rescue_cat.select("job_hours", "hourly_cost", "total_cost").orderBy("job_hours", ascending=False).limit(30).toPandas()
	job_hours	hourly_cost	total_cost
0	12.0	326.0	3912.0
1	12.0	290.0	3480.0
2	10.0	298.0	2980.0
3	9.0	260.0	2340.0
4	9.0	260.0	2340.0
5	9.0	260.0	2340.0
6	9.0	295.0	2655.0
7	8.0	260.0	2080.0
8	8.0	333.0	2664.0
9	7.0	328.0	2296.0
10	7.0	260.0	1820.0
11	7.0	290.0	2030.0
12	7.0	260.0	1820.0
13	7.0	290.0	2030.0
14	7.0	295.0	2065.0
15	7.0	326.0	2282.0
16	6.0	298.0	1788.0
17	6.0	260.0	1560.0
18	6.0	333.0	1998.0
19	6.0	260.0	1560.0
20	6.0	298.0	1788.0
21	5.0	260.0	1300.0
22	5.0	260.0	1300.0
23	5.0	260.0	1300.0
24	5.0	260.0	1300.0
25	5.0	260.0	1300.0
26	5.0	255.0	1275.0
27	5.0	260.0	1300.0
28	5.0	260.0	1300.0
29	5.0	260.0	1300.0
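We can also confirm this relationship programmatically by counting the rows where the identity does not hold (a small check reusing the rescue_cat dataframe; we would expect the count to be 0):

# Count rows where total_cost differs from job_hours * hourly_cost
(rescue_cat
 .filter(F.abs(F.col("total_cost") - (F.col("job_hours") * F.col("hourly_cost"))) > 0.01)
 .count())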

Therefore, we need to drop this feature from our model since it is not independent of the other variables. You may also want to remove other variables from the model based on the correlation matrix. For example, there is also quite high correlation between the job_hours column and several other features. Deciding which features to remove from your model may require some experimentation, but in general a correlation coefficient above 0.7 between two or more predictors indicates multicollinearity, and some of these should be removed from the model to ensure validity.
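As a sketch of what this looks like in practice (reusing the encoded rescue_cat_ohe dataframe and the glr model definition from the sections above), we can reassemble the features column without job_hours and total_cost and refit the model, which produces a summary along the lines of the output below:

# Reassemble the features column without job_hours (total_cost is also excluded)
assembler_reduced = VectorAssembler(inputCols=['engine_count', 'hourly_cost',
                                               'callVec', 'propertyVec', 'serviceVec'],
                                    outputCol = "features")

rescue_cat_reduced = (assembler_reduced
                      .transform(rescue_cat_ohe)
                      .withColumnRenamed("is_cat", "label")
                      .select("label", "features"))

# Refit the logistic regression on the reduced feature set
model_reduced = glr.fit(rescue_cat_reduced)
model_reduced.summary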

Coefficients:
             Feature Estimate   Std Error  T Value P Value
         (Intercept)   1.0885      0.3722   2.9246  0.0034
        engine_count  -0.6804      0.2214  -3.0729  0.0021
         hourly_cost  -0.0007      0.0010  -0.7175  0.4730
      callVec_Police  -0.9668      0.2257  -4.2829  0.0000
callVec_Person (r...   0.6470      1.5530   0.4166  0.6770
callVec_Person (l...   0.1338      0.0572   2.3375  0.0194
   callVec_Other Frs  -0.6789      0.3657  -1.8565  0.0634
   callVec_Not Known -25.0264 356123.9993  -0.0001  0.9999
  callVec_Coastguard -26.0423 356123.9993  -0.0001  0.9999
   callVec_Ambulance  -0.0626      1.4188  -0.0441  0.9648
propertyVec_Road ...   0.2017      0.1470   1.3720  0.1701
propertyVec_Outdo...  -1.4310      0.1159 -12.3504  0.0000
 propertyVec_Outdoor  -0.7482      0.0677 -11.0556  0.0000
propertyVec_Other...  -0.2765      0.4561  -0.6062  0.5444
propertyVec_Non R...  -0.9055      0.0921  -9.8298  0.0000
    propertyVec_Boat   0.7267      1.4248   0.5100  0.6100
serviceVec_Animal...  -0.9946      0.1600  -6.2167  0.0000
serviceVec_Animal...   0.4235      0.0614   6.9004  0.0000
serviceVec_Animal...   0.4458      0.0959   4.6498  0.0000

(Dispersion parameter for binomial family taken to be 1.0000)
    Null deviance: 8123.3546 on 5841 degrees of freedom
Residual deviance: 7535.0784 on 5841 degrees of freedom

Spark ML Pipelines#

We can also combine all of the analysis steps outlined above into a single workflow using Spark’s ML Pipelines. Data manipulation, feature transformation and modelling can be combined into a Pipeline/ml_pipeline() object and saved.

The output of this in sparklyr is in Scala code, meaning that it can be added to scheduled Spark ML jobs without any dependencies in R. However, it should be noted that we have not yet found a simple way of generating a summary coefficient table as shown in the sections above when using a pipeline for analysis. Although coefficients, p-values etc. can be obtained, it is difficult to map these to the corresponding features for inferential analysis. As a result, pipelines for logistic regression in sparklyr are currently best used for predictive analysis only.

PySpark pipelines do not have this problem and we can access the coefficients in a similar way to that shown above even when using a pipeline for analysis.

Python Example

As shown above, we need to prepare the data, index categorical variables, encode them, assemble features into a single vector column and run the regression to perform our analysis. The data preparation step can be done before setting up the pipeline (in this case we will re-use the rescue_cat_reindex dataframe from the Selecting reference categories section of this guide as a shortcut), so our pipeline steps will be as follows:

  1. String indexer for specialservicetypecategory

  2. String indexer for originofcall

  3. String indexer for propertycategory

  4. One hot encoder for specialservicetypecategory, originofcall and propertycategory

  5. Vector assembler to generate a single features column that contains a vector of features

  6. Logistic regression model

We will define each of these steps first to assemble the pipeline:

from pyspark.ml import Pipeline

# Rename "is_cat" to "label" before setting up pipeline stages
rescue_cat_reindex = rescue_cat_reindex.withColumnRenamed("is_cat", "label")

# 1. Indexing the specialservicetypecategory column
serviceIdx = StringIndexer(inputCol='specialservicetypecategory',
                               outputCol='serviceIndex', 
                               stringOrderType = "alphabetDesc")

# 2. Indexing the originofcall column
callIdx = StringIndexer(inputCol='originofcall',
                               outputCol='callIndex',
                               stringOrderType = "alphabetDesc")

# 3. Indexing the propertycategory column
propertyIdx = StringIndexer(inputCol='propertycategory',
                               outputCol='propertyIndex', 
                               stringOrderType = "alphabetDesc")

# 4. One-hot encoding
encoder = OneHotEncoderEstimator(inputCols = ['serviceIndex', 'callIndex', 'propertyIndex'], 
                                 outputCols = ['serviceVec', 'callVec', 'propertyVec'])

# 5. Vector assembler
assembler = VectorAssembler(inputCols=['engine_count', 'hourly_cost', 
                                       'callVec', 'propertyVec', 'serviceVec'], 
                           outputCol = "features")

# 6. Regression model
glr = GeneralizedLinearRegression(family="binomial", link="logit")

# Creating the pipeline
pipe = Pipeline(stages=[serviceIdx, callIdx, propertyIdx,
                        encoder, assembler, glr])
                        
# View the pipeline stages
pipe.getStages()

The pipeline can be fitted to the rescue_cat_reindex dataset using the fit method, and predictions can be accessed using transform:

fit_model = pipe.fit(rescue_cat_reindex)

# Save model results
results = fit_model.transform(rescue_cat_reindex)
  
# Showing the results
results.show()

To get the summary table with regression coefficients, we will need to access the regression model stage of the pipeline directly using stages:

# Get coefficients summary table
summary = fit_model.stages[-1].summary

summary

The confidence intervals can then be calculated and the summary can be converted into a pandas dataframe as shown in the “Inferential Analysis” section if desired.
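For example, a minimal sketch of approximating 95% confidence intervals directly from the pipeline’s fitted regression stage (note that Spark returns the intercept as the last element of the standard error array):

# Access the fitted GeneralizedLinearRegressionModel from the final pipeline stage
glr_model = fit_model.stages[-1]
pipe_summary = glr_model.summary

# Keep the intercept last so the estimates line up with the standard errors returned by Spark
estimates = list(glr_model.coefficients) + [glr_model.intercept]

ci_df = pd.DataFrame({"estimate": estimates,
                      "std_error": pipe_summary.coefficientStandardErrors})
ci_df["lower_ci"] = ci_df["estimate"] - 1.96 * ci_df["std_error"]
ci_df["upper_ci"] = ci_df["estimate"] + 1.96 * ci_df["std_error"]

ci_df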

You can save a pipeline or pipeline model using the .write().overwrite().save() method:

# Save pipeline

pipe.write().overwrite().save("rescue_pipeline")

# Save the pipeline model

fit_model.write().overwrite().save("rescue_model")

They can then be re-loaded using the load() method, and the re-loaded pipeline can be used to re-fit the same model to new data by supplying the new Spark dataframe you wish to fit.

# Load saved pipeline
reloaded_pipeline = Pipeline.load("rescue_pipeline")

# Re-fit to a subset of rescue data as an example of how pipelines can be re-used
new_model = reloaded_pipeline.fit(rescue_cat_reindex.sample(withReplacement=None,
                      fraction=0.1, seed = 99))
                      
# View new model summary
new_model.stages[-1].summary
# Close the spark session
spark.stop()
R Example

As above, we still need to generate the is_cat column from the original data, select our predictors and remove missing values. In order to select our reference categories we can also add the “000_” prefix to these categories at this stage. All of this data preparation can later be called in the pipeline in a single step using the ft_dplyr_transformer feature transformer. This function converts dplyr code into a SQL feature transformer that can then be used in a pipeline. We will set up the transformation as follows:

rescue_cat <- rescue %>% 
# generate is_cat column
  dplyr::mutate(is_cat = ifelse(animal_group == "Cat", 1, 0)) %>% 
  sparklyr::select(engine_count, 
                   hourly_cost, 
                   originofcall, 
                   propertycategory,
                   specialservicetypecategory,
                   is_cat) %>%
# ensure numeric columns have the correct type
  dplyr::mutate(across(c(engine_count, hourly_cost), 
                   ~as.numeric(.))) %>%
# remove missing values
  sparklyr::filter(!is.na(engine_count)) %>%
# select reference categories and ensure they will be ordered last by the indexer
  dplyr::mutate(specialservicetypecategory = ifelse(specialservicetypecategory == "Other Animal Assistance",
                         "000_Other Animal Assistance",
                         specialservicetypecategory)) %>%
  dplyr::mutate(originofcall = ifelse(originofcall == "Person (mobile)",
                         "000_Person (mobile)",
                          originofcall )) %>%
  dplyr::mutate(propertycategory = ifelse(propertycategory == "Dwelling",
                         "000_Dwelling",
                         propertycategory))

The resulting pipeline stage is produced from the dplyr code:

ft_dplyr_transformer(sc, rescue_cat)

Creating the Pipeline#

Our sparklyr pipeline will have a total of 9 steps:

  1. SQL transformer - resulting from the ft_dplyr_transformer() transformation

  2. String indexer for specialservicetypecategory

  3. String indexer for originofcall

  4. String indexer for propertycategory

  5. One hot encoder for specialservicetypecategory

  6. One hot encoder for originofcall

  7. One hot encoder for propertycategory

  8. Vector assembler - to generate a single features column that contains a vector of feature values (ft_vector_assembler())

  9. Logistic regression model

Unfortunately, in Spark 2.x there is no way of combining multiple string indexers or one hot encoders for different columns, so we need a pipeline step for each column we need to index and encode. For Spark 3.x the ft_one_hot_encoder_estimator() function can be used in place of ft_one_hot_encoder(), which allows the selection of multiple columns.

The pipeline can be constructed as follows:

rescue_pipeline <- ml_pipeline(sc) %>%
  sparklyr::ft_dplyr_transformer(rescue_cat) %>%
  sparklyr::ft_string_indexer(input_col = "specialservicetypecategory", 
                    output_col = "specialservicetypecategory_idx",
                    string_order_type = "alphabetDesc") %>% 
  sparklyr::ft_string_indexer(input_col = "originofcall", 
                    output_col = "originofcall_idx",
                    string_order_type = "alphabetDesc") %>%
  sparklyr::ft_string_indexer(input_col = "propertycategory", 
                    output_col = "propertycategory_idx",
                    string_order_type = "alphabetDesc") %>%
  sparklyr::ft_one_hot_encoder(input_cols = c("specialservicetypecategory_idx"), 
                     output_cols = c("specialservicetypecategory_ohe"), 
                     drop_last = TRUE) %>%
  sparklyr::ft_one_hot_encoder(input_cols = c("originofcall_idx"), 
                     output_cols = c("originofcall_ohe"), 
                     drop_last = TRUE) %>%
  sparklyr::ft_one_hot_encoder(input_cols = c("propertycategory_idx"), 
                     output_cols = c("propertycategory_ohe"), 
                     drop_last = TRUE)  %>%
  sparklyr::ft_vector_assembler(input_cols = c("engine_count", "hourly_cost", "originofcall_ohe", "propertycategory_ohe",
                                              "specialservicetypecategory_ohe"), 
                                output_col = "features") %>%
  ml_generalized_linear_regression(features_col = "features",
                                   label_col = "is_cat",
                                   family = "binomial", 
                                   link = "logit") 
                                   
# View the pipeline
rescue_pipeline

The pipeline can be fitted to the rescue dataset by applying ml_fit() and predictions can be accessed using ml_transform().

# Fit the pipeline 
fitted_pipeline <- ml_fit(rescue_pipeline, rescue)

# View the fitted pipeline - notice the output now shows the model coefficients
fitted_pipeline

# Get predictions
predictions <- ml_transform(fitted_pipeline, rescue) 

It is possible to generate a list of model coefficients, p-values etc. using the ml_stage function to access the logistic regression stage of the model, combined with ml_summary as we did before. However, it is not easy to map the coefficients back to the features that they correspond to.

# Access the logistic regression stage of the pipeline
model_details <- ml_stage(fitted_pipeline, 'generalized_linear_regression')

# Get coefficients and summary statistics from model
summary <- tibble(coefficients = c(model_details$intercept, model_details$coefficients), 
                  std_errors = model_details$summary$coefficient_standard_errors(), 
                  p_values = model_details$summary$p_values()) %>%
  dplyr::mutate(lower_ci = coefficients - (1.96 * std_errors),
                upper_ci = coefficients + (1.96 * std_errors))
                
# View the summary
summary

You can save a pipeline or pipeline model using the ml_save command:

# Save pipeline
ml_save(
  rescue_pipeline,
  "rescue_pipeline",
  overwrite = TRUE
)

# Save pipeline model
ml_save(
  fitted_pipeline,
  "rescue_model",
  overwrite = TRUE
)

They can then be re-loaded using the ml_load() command, and the re-loaded pipeline can be used to re-fit the same model to new data by supplying the new Spark dataframe you wish to fit.

# Reload our saved pipeline
reloaded_pipeline <- ml_load(sc, "rescue_pipeline")

# Re-fit to a subset of rescue data as an example of how pipelines can be re-used
new_model <- ml_fit(reloaded_pipeline, sample_frac(rescue, 0.1))
# Close the spark session
spark_disconnect(sc)

Further resources#

PySpark references and documentation:

Sparklyr references and documentation:

Acknowledgements#

Thanks to Ted Dolby for providing guidance on logistic regression in sparklyr which was adapted to produce this page.