Menu

PySpark Exercises – 101 PySpark Exercises for Data Analysis

101 PySpark exercises are designed to challenge your logical muscle and to help internalize data manipulation with python’s favorite package for data analysis. The questions are of 3 levels of difficulties with L1 being the easiest to L3 being the hardest.

You might also like to try out:

  1. 101 Pandas Exercises for Data Analysis
  2. 101 NumPy Exercises for Data Analysis

1. How to import PySpark and check the version?

Difficulty Level: L1

Show Solution

import findspark
findspark.init()

# Creating a SparkSession: A SparkSession is the entry point for using the PySpark DataFrame and SQL API.
# To create a SparkSession, use the following code
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PySpark 101 Exercises").getOrCreate()

# Get version details
print(spark.version)

#> 3.3.2

2. How to convert the index of a PySpark DataFrame into a column?

Difficulty Level: L1

Hint: The PySpark DataFrame doesn’t have an explicit concept of an index like Pandas DataFrame. However, if you have a DataFrame and you’d like to add a new column that is basically a row number.

Input:

# Assuming df is your DataFrame
df = spark.createDataFrame([
("Alice", 1),
("Bob", 2),
("Charlie", 3),
], ["Name", "Value"])

df.show()

+-------+-----+
| Name|Value|
+-------+-----+
| Alice| 1|
| Bob| 2|
|Charlie| 3|
+-------+-----+

Expected Output:

+-------+-----+-----+
| Name|Value|index|
+-------+-----+-----+
| Alice| 1| 0|
| Bob| 2| 1|
|Charlie| 3| 2|
+-------+-----+-----+
Show Solution
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id

# Define window specification
w = Window.orderBy(monotonically_increasing_id())

# Add index
df = df.withColumn("index", row_number().over(w) - 1)

df.show()

+-------+-----+-----+
| Name|Value|index|
+-------+-----+-----+
| Alice| 1| 0|
| Bob| 2| 1|
|Charlie| 3| 2|
+-------+-----+-----+

3. How to combine many lists to form a PySpark DataFrame?

Difficulty Level: L1

Create a PySpark DataFrame from list1 and list2

Hint: For Creating DataFrame from multiple lists, first create an RDD (Resilient Distributed Dataset) from those lists and then convert the RDD to a DataFrame.

Input:

# Define your lists
list1 = ["a", "b", "c", "d"]
list2 = [1, 2, 3, 4]
Show Solution
# Create an RDD from the lists and convert it to a DataFrame
rdd = spark.sparkContext.parallelize(list(zip(list1, list2)))
df = rdd.toDF(["Column1", "Column2"])

# Show the DataFrame
df.show()


+-------+-------+
|Column1|Column2|
+-------+-------+
| a| 1|
| b| 2|
| c| 3|
| d| 4|
+-------+-------+

4. How to get the items of list A not present in list B?

Difficulty Level: L2

Get the items of list_A not present in list_B in PySpark, you can use the subtract operation on RDDs (Resilient Distributed Datasets).

Input:

list_A = [1, 2, 3, 4, 5]
list_B = [4, 5, 6, 7, 8]

Expected Output:

#> [1, 2, 3]
Show Solution
sc = spark.sparkContext

# Convert lists to RDD
rdd_A = sc.parallelize(list_A)
rdd_B = sc.parallelize(list_B)

# Perform subtract operation
result_rdd = rdd_A.subtract(rdd_B)

# Collect result
result_list = result_rdd.collect()
print(result_list)
 #> [1, 2, 3]

5. How to get the items not common to both list A and list B?

Difficulty Level: L2

Get all items of list_A and list_B not common to both.

Input:

list_A = [1, 2, 3, 4, 5]
list_B = [4, 5, 6, 7, 8]
Show Solution
sc = spark.sparkContext

# Convert lists to RDD
rdd_A = sc.parallelize(list_A)
rdd_B = sc.parallelize(list_B)

# Perform subtract operation
result_rdd_A = rdd_A.subtract(rdd_B)
result_rdd_B = rdd_B.subtract(rdd_A)

# Union the two RDDs
result_rdd = result_rdd_A.union(result_rdd_B)

# Collect result
result_list = result_rdd.collect()

print(result_list)
[1, 2, 3, 8, 6, 7]

6. How to get the minimum, 25th percentile, median, 75th, and max of a numeric column?

Difficulty Level: L2

Compute the minimum, 25th percentile, median, 75th, and maximum of column Age

input

# Create a sample DataFrame
data = [("A", 10), ("B", 20), ("C", 30), ("D", 40), ("E", 50), ("F", 15), ("G", 28), ("H", 54), ("I", 41), ("J", 86)]
df = spark.createDataFrame(data, ["Name", "Age"])

df.show()
+----+---+
|Name|Age|
+----+---+
| A| 10|
| B| 20|
| C| 30|
| D| 40|
| E| 50|
| F| 15|
| G| 28|
| H| 54|
| I| 41|
| J| 86|
+----+---+
Show Solution
# Calculate percentiles
quantiles = df.approxQuantile("Age", [0.0, 0.25, 0.5, 0.75, 1.0], 0.01)

print("Min: ", quantiles[0])
print("25th percentile: ", quantiles[1])
print("Median: ", quantiles[2])
print("75th percentile: ", quantiles[3])
print("Max: ", quantiles[4])
Min: 10.0
25th percentile: 20.0
Median: 30.0
75th percentile: 50.0
Max: 86.0

7. How to get frequency counts of unique items of a column?

Difficulty Level: L1

Calculte the frequency counts of each unique value

Input

from pyspark.sql import Row

# Sample data
data = [
Row(name='John', job='Engineer'),
Row(name='John', job='Engineer'),
Row(name='Mary', job='Scientist'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Scientist'),
Row(name='Sam', job='Doctor'),
]

# create DataFrame
df = spark.createDataFrame(data)

# show DataFrame
df.show()
+----+---------+
|name| job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam| Doctor|
+----+---------+
Show Solution
df.groupBy("job").count().show()

+---------+-----+
| job|count|
+---------+-----+
| Engineer| 4|
|Scientist| 2|
| Doctor| 1|
+---------+-----+

8. How to keep only top 2 most frequent values as it is and replace everything else as ‘Other’?

Difficulty Level: L3

Input

from pyspark.sql import Row

# Sample data
data = [
Row(name='John', job='Engineer'),
Row(name='John', job='Engineer'),
Row(name='Mary', job='Scientist'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Scientist'),
Row(name='Sam', job='Doctor'),
]

# create DataFrame
df = spark.createDataFrame(data)

# show DataFrame
df.show()
+----+---------+
|name| job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam| Doctor|
+----+---------+
Show Solution
from pyspark.sql.functions import col, when

# Get the top 2 most frequent jobs
top_2_jobs = df.groupBy('job').count().orderBy('count', ascending=False).limit(2).select('job').rdd.flatMap(lambda x: x).collect()

# Replace all but the top 2 most frequent jobs with 'Other'
df = df.withColumn('job', when(col('job').isin(top_2_jobs), col('job')).otherwise('Other'))

# show DataFrame
df.show()

+----+---------+
|name| job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam| Other|
+----+---------+

9. How to Drop rows with NA values specific to a particular column?

Difficulty Level: L1

input

# Assuming df is your DataFrame
df = spark.createDataFrame([
("A", 1, None),
("B", None, "123" ),
("B", 3, "456"),
("D", None, None),
], ["Name", "Value", "id"])

df.show()
+----+-----+----+
|Name|Value| id|
+----+-----+----+
| A| 1|null|
| B| null| 123|
| B| 3| 456|
| D| null|null|
+----+-----+----+
Show Solution
df_2 = df.dropna(subset=['Value'])

df_2.show()

+----+-----+----+
|Name|Value| id|
+----+-----+----+
| A| 1|null|
| B| 3| 456|
+----+-----+----+

10. How to rename columns of a PySpark DataFrame using two lists – one containing the old column names and the other containing the new column names?

Difficulty Level: L2

Input

# suppose you have the following DataFrame
df = spark.createDataFrame([(1, 2, 3), (4, 5, 6)], ["col1", "col2", "col3"])

# old column names
old_names = ["col1", "col2", "col3"]

# new column names
new_names = ["new_col1", "new_col2", "new_col3"]

df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| 2| 3|
| 4| 5| 6|
+----+----+----+
Show Solution
# renaming
for old_name, new_name in zip(old_names, new_names):
df = df.withColumnRenamed(old_name, new_name)

df.show()

+--------+--------+--------+
|new_col1|new_col2|new_col3|
+--------+--------+--------+
| 1| 2| 3|
| 4| 5| 6|
+--------+--------+--------+

11. How to bin a numeric list to 10 groups of equal size?

Difficulty Level: L2

Input

from pyspark.sql.functions import rand
from pyspark.ml.feature import Bucketizer

# Create a DataFrame with a single column "values" filled with random numbers
num_items = 100
df = spark.range(num_items).select(rand(seed=42).alias("values"))

df.show(5)
+-------------------+
| values|
+-------------------+
| 0.619189370225301|
| 0.5096018842446481|
| 0.8325259388871524|
|0.26322809041172357|
| 0.6702867696264135|
+-------------------+
only showing top 5 rows
Show Solution
# Define the bucket boundaries
num_buckets = 10
quantiles = df.stat.approxQuantile("values", [i/num_buckets for i in range(num_buckets+1)], 0.01)

# Create the Bucketizer
bucketizer = Bucketizer(splits=quantiles, inputCol="values", outputCol="buckets")

# Apply the Bucketizer
df_buck = bucketizer.transform(df)

#Frequency table
df_buck.groupBy("buckets").count().show()

# Show the original and bucketed values
df_buck.show(5)

+-------+-----+
|buckets|count|
+-------+-----+
| 8.0| 10|
| 0.0| 8|
| 7.0| 10|
| 1.0| 10|
| 4.0| 10|
| 3.0| 10|
| 2.0| 10|
| 6.0| 10|
| 5.0| 10|
| 9.0| 12|
+-------+-----+

+-------------------+-------+
| values|buckets|
+-------------------+-------+
| 0.619189370225301| 5.0|
| 0.5096018842446481| 3.0|
| 0.8325259388871524| 8.0|
|0.26322809041172357| 2.0|
| 0.6702867696264135| 6.0|
+-------------------+-------+
only showing top 5 rows

12. How to create contigency table?

Difficulty Level: L1

Input

# Example DataFrame
data = [("A", "X"), ("A", "Y"), ("A", "X"), ("B", "Y"), ("B", "X"), ("C", "X"), ("C", "X"), ("C", "Y")]
df = spark.createDataFrame(data, ["category1", "category2"])

df.show()
+---------+---------+
|category1|category2|
+---------+---------+
| A| X|
| A| Y|
| A| X|
| B| Y|
| B| X|
| C| X|
| C| X|
| C| Y|
+---------+---------+
Show Solution
# Frequency
df.cube("category1").count().show()

# Contingency table
df.crosstab('category1', 'category2').show()

+---------+-----+
|category1|count|
+---------+-----+
| null| 8|
| A| 3|
| B| 2|
| C| 3|
+---------+-----+

+-------------------+---+---+
|category1_category2| X| Y|
+-------------------+---+---+
| A| 2| 1|
| B| 1| 1|
| C| 2| 1|
+-------------------+---+---+

13. How to find the numbers that are multiples of 3 from a column?

Difficulty Level: L2

Input

from pyspark.sql.functions import rand

# Generate a DataFrame with a single column "id" with 10 rows
df = spark.range(10)

# Generate a random float between 0 and 1, scale and shift it to get a random integer between 1 and 10
df = df.withColumn("random", ((rand(seed=42) * 10) + 1).cast("int"))

# Show the DataFrame
df.show()
+---+------+
| id|random|
+---+------+
| 0| 7|
| 1| 6|
| 2| 9|
| 3| 7|
| 4| 3|
| 5| 8|
| 6| 9|
| 7| 8|
| 8| 3|
| 9| 8|
+---+------+
Show Solution
from pyspark.sql.functions import col, when

# Assuming df is your DataFrame and "your_column" is the column with the numbers
df = df.withColumn("is_multiple_of_3", when(col("random") % 3 == 0, 1).otherwise(0))

df.show()

+---+------+----------------+
| id|random|is_multiple_of_3|
+---+------+----------------+
| 0| 7| 0|
| 1| 6| 1|
| 2| 9| 1|
| 3| 7| 0|
| 4| 3| 1|
| 5| 8| 0|
| 6| 9| 1|
| 7| 8| 0|
| 8| 3| 1|
| 9| 8| 0|
+---+------+----------------+

14. How to extract items at given positions from a column?

Difficulty Level: L2

Input

from pyspark.sql.functions import rand

# Generate a DataFrame with a single column "id" with 10 rows
df = spark.range(10)

# Generate a random float between 0 and 1, scale and shift it to get a random integer between 1 and 10
df = df.withColumn("random", ((rand(seed=42) * 10) + 1).cast("int"))

# Show the DataFrame
df.show()

pos = [0, 4, 8, 5]
+---+------+
| id|random|
+---+------+
| 0| 7|
| 1| 6|
| 2| 9|
| 3| 7|
| 4| 3|
| 5| 8|
| 6| 9|
| 7| 8|
| 8| 3|
| 9| 8|
+---+------+
Show Solution
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id

pos = [0, 4, 8, 5]

# Define window specification
w = Window.orderBy(monotonically_increasing_id())

# Add index
df = df.withColumn("index", row_number().over(w) - 1)

df.show()

# Filter the DataFrame based on the specified positions
df_filtered = df.filter(df.index.isin(pos))

df_filtered.show()

+---+------+-----+
| id|random|index|
+---+------+-----+
| 0| 7| 0|
| 1| 6| 1|
| 2| 9| 2|
| 3| 7| 3|
| 4| 3| 4|
| 5| 8| 5|
| 6| 9| 6|
| 7| 8| 7|
| 8| 3| 8|
| 9| 8| 9|
+---+------+-----+

+---+------+-----+
| id|random|index|
+---+------+-----+
| 0| 7| 0|
| 4| 3| 4|
| 5| 8| 5|
| 8| 3| 8|
+---+------+-----+

15. How to stack two DataFrames vertically ?

Difficulty Level: L1

Input

# Create DataFrame for region A
df_A = spark.createDataFrame([("apple", 3, 5), ("banana", 1, 10), ("orange", 2, 8)], ["Name", "Col_1", "Col_2"])
df_A.show()

# Create DataFrame for region B
df_B = spark.createDataFrame([("apple", 3, 5), ("banana", 1, 15), ("grape", 4, 6)], ["Name", "Col_1", "Col_3"])
df_B.show()
+------+-----+-----+
| Name|Col_1|Col_2|
+------+-----+-----+
| apple| 3| 5|
|banana| 1| 10|
|orange| 2| 8|
+------+-----+-----+

+------+-----+-----+
| Name|Col_1|Col_3|
+------+-----+-----+
| apple| 3| 5|
|banana| 1| 15|
| grape| 4| 6|
+------+-----+-----+
Show Solution
df_A.union(df_B).show()

+------+-----+-----+
| Name|Col_1|Col_2|
+------+-----+-----+
| apple| 3| 5|
|banana| 1| 10|
|orange| 2| 8|
| apple| 3| 5|
|banana| 1| 15|
| grape| 4| 6|
+------+-----+-----+

16. How to compute the mean squared error on a truth and predicted columns?

Difficulty Level: L2

Input

# Assume you have a DataFrame df with two columns "actual" and "predicted"
# For the sake of example, we'll create a sample DataFrame
data = [(1, 1), (2, 4), (3, 9), (4, 16), (5, 25)]
df = spark.createDataFrame(data, ["actual", "predicted"])

df.show()
+------+---------+
|actual|predicted|
+------+---------+
| 1| 1|
| 2| 4|
| 3| 9|
| 4| 16|
| 5| 25|
+------+---------+
Show Solution
# Calculate the squared differences
df = df.withColumn("squared_error", pow((col("actual") - col("predicted")), 2))

# Calculate the mean squared error
mse = df.agg({"squared_error": "avg"}).collect()[0][0]

print(f"Mean Squared Error (MSE) = {mse}")

Mean Squared Error (MSE) = 116.8

17. How to convert the first character of each element in a series to uppercase?

Difficulty Level: L1

# Suppose you have the following DataFrame
data = [("john",), ("alice",), ("bob",)]
df = spark.createDataFrame(data, ["name"])

df.show()
+-----+
| name|
+-----+
| john|
|alice|
| bob|
+-----+
Show Solution
from pyspark.sql.functions import initcap

# Convert the first character to uppercase
df = df.withColumn("name", initcap(df["name"]))

df.show()

+-----+
| name|
+-----+
| John|
|Alice|
| Bob|
+-----+

18. How to compute summary statistics for all columns in a dataframe

Difficulty Level: L1

# For the sake of example, we'll create a sample DataFrame
data = [('James', 34, 55000),
('Michael', 30, 70000),
('Robert', 37, 60000),
('Maria', 29, 80000),
('Jen', 32, 65000)]

df = spark.createDataFrame(data, ["name", "age" , "salary"])

df.show()
+-------+---+------+
| name|age|salary|
+-------+---+------+
| James| 34| 55000|
|Michael| 30| 70000|
| Robert| 37| 60000|
| Maria| 29| 80000|
| Jen| 32| 65000|
+-------+---+------+
Show Solution
# Summary statistics
summary = df.summary()

# Show the summary statistics
summary.show()

+-------+------+------------------+-----------------+
|summary| name| age| salary|
+-------+------+------------------+-----------------+
| count| 5| 5| 5|
| mean| null| 32.4| 66000.0|
| stddev| null|3.2093613071762417|9617.692030835673|
| min| James| 29| 55000|
| 25%| null| 30| 60000|
| 50%| null| 32| 65000|
| 75%| null| 34| 70000|
| max|Robert| 37| 80000|
+-------+------+------------------+-----------------+

19. How to calculate the number of characters in each word in a column?

Difficulty Level: L1

# Suppose you have the following DataFrame
data = [("john",), ("alice",), ("bob",)]
df = spark.createDataFrame(data, ["name"])

df.show()
+-----+
| name|
+-----+
| john|
|alice|
| bob|
+-----+
Show Solution
from pyspark.sql import functions as F

df = df.withColumn('word_length', F.length(df.name))
df.show()

+-----+-----------+
| name|word_length|
+-----+-----------+
| john| 4|
|alice| 5|
| bob| 3|
+-----+-----------+

20 How to compute difference of differences between consecutive numbers of a column?

Difficulty Level: L2

# For the sake of example, we'll create a sample DataFrame
data = [('James', 34, 55000),
('Michael', 30, 70000),
('Robert', 37, 60000),
('Maria', 29, 80000),
('Jen', 32, 65000)]

df = spark.createDataFrame(data, ["name", "age" , "salary"])

df.show()
+-------+---+------+
| name|age|salary|
+-------+---+------+
| James| 34| 55000|
|Michael| 30| 70000|
| Robert| 37| 60000|
| Maria| 29| 80000|
| Jen| 32| 65000|
+-------+---+------+
Show Solution
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Define window specification
df = df.withColumn("id", F.monotonically_increasing_id())
window = Window.orderBy("id")

# Generate the lag of the variable
df = df.withColumn("prev_value", F.lag(df.salary).over(window))

# Compute the difference with lag
df = df.withColumn("diff", F.when(F.isnull(df.salary - df.prev_value), 0)
.otherwise(df.salary - df.prev_value)).drop("id")

df.show()

+-------+---+------+----------+------+
| name|age|salary|prev_value| diff|
+-------+---+------+----------+------+
| James| 34| 55000| null| 0|
|Michael| 30| 70000| 55000| 15000|
| Robert| 37| 60000| 70000|-10000|
| Maria| 29| 80000| 60000| 20000|
| Jen| 32| 65000| 80000|-15000|
+-------+---+------+----------+------+

21. How to get the day of month, week number, day of year and day of week from a date strings?

Difficulty Level: L2

# example data
data = [("2023-05-18","01 Jan 2010",), ("2023-12-31", "01 Jan 2010",)]
df = spark.createDataFrame(data, ["date_str_1", "date_str_2"])

df.show()
+----------+-----------+
|date_str_1| date_str_2|
+----------+-----------+
|2023-05-18|01 Jan 2010|
|2023-12-31|01 Jan 2010|
+----------+-----------+
Show Solution
from pyspark.sql.functions import to_date, dayofmonth, weekofyear, dayofyear, dayofweek

# Convert date string to date format
df = df.withColumn("date_1", to_date(df.date_str_1, 'yyyy-MM-dd'))
df = df.withColumn("date_2", to_date(df.date_str_2, 'dd MMM yyyy'))

df = df.withColumn("day_of_month", dayofmonth(df.date_1))\
.withColumn("week_number", weekofyear(df.date_1))\
.withColumn("day_of_year", dayofyear(df.date_1))\
.withColumn("day_of_week", dayofweek(df.date_1))

df.show()

+----------+-----------+----------+----------+------------+-----------+-----------+-----------+
|date_str_1| date_str_2| date_1| date_2|day_of_month|week_number|day_of_year|day_of_week|
+----------+-----------+----------+----------+------------+-----------+-----------+-----------+
|2023-05-18|01 Jan 2010|2023-05-18|2010-01-01| 18| 20| 138| 5|
|2023-12-31|01 Jan 2010|2023-12-31|2010-01-01| 31| 52| 365| 1|
+----------+-----------+----------+----------+------------+-----------+-----------+-----------+

22. How to convert year-month string to dates corresponding to the 4th day of the month?

Difficulty Level: L2

# example dataframe
df = spark.createDataFrame([('Jan 2010',), ('Feb 2011',), ('Mar 2012',)], ['MonthYear'])

df.show()
+---------+
|MonthYear|
+---------+
| Jan 2010|
| Feb 2011|
| Mar 2012|
+---------+
Show Solution
from pyspark.sql.functions import expr, col

# convert YearMonth to date (default to first day of the month)
df = df.withColumn('Date', expr("to_date(MonthYear, 'MMM yyyy')"))

df.show()

# replace day with 4
df = df.withColumn('Date', expr("date_add(date_sub(Date, day(Date) - 1), 3)"))

df.show()

+---------+----------+
|MonthYear| Date|
+---------+----------+
| Jan 2010|2010-01-01|
| Feb 2011|2011-02-01|
| Mar 2012|2012-03-01|
+---------+----------+

+---------+----------+
|MonthYear| Date|
+---------+----------+
| Jan 2010|2010-01-04|
| Feb 2011|2011-02-04|
| Mar 2012|2012-03-04|
+---------+----------+

23 How to filter words that contain atleast 2 vowels from a series?

Difficulty Level: L3

# example dataframe
df = spark.createDataFrame([('Apple',), ('Orange',), ('Plan',) , ('Python',) , ('Money',)], ['Word'])

df.show()
+------+
| Word|
+------+
| Apple|
|Orange|
| Plan|
|Python|
| Money|
+------+
Show Solution
from pyspark.sql.functions import col, length, translate

# Filter words that contain at least 2 vowels
df_filtered = df.where((length(col('Word')) - length(translate(col('Word'), 'AEIOUaeiou', ''))) >= 2)
df_filtered.show()

+------+
| Word|
+------+
| Apple|
|Orange|
| Money|
+------+

24. How to filter valid emails from a list?

Difficulty Level: L3

# Create a list
data = ['buying books at amazom.com', '[email protected]', '[email protected]', '[email protected]']

# Convert the list to DataFrame
df = spark.createDataFrame(data, "string")
df.show(truncate =False)
+--------------------------+
|value |
+--------------------------+
|buying books at amazom.com|
|[email protected] |
|[email protected] |
|[email protected] |
+--------------------------+
Show Solution
# Define a regular expression pattern for emails
pattern = "^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"

# Apply filter operation to keep only valid emails
df_filtered = df.filter(F.col("value").rlike(pattern))

# Show the DataFrame
df_filtered.show()

+-----------------+
| value|
+-----------------+
|[email protected]|
| [email protected]|
|[email protected]|
+-----------------+

25. How to Pivot PySpark DataFrame?

Convert region categories to Columns and sum the revenue

Difficulty Level: L3

# Sample data
data = [
(2021, 1, "US", 5000),
(2021, 1, "EU", 4000),
(2021, 2, "US", 5500),
(2021, 2, "EU", 4500),
(2021, 3, "US", 6000),
(2021, 3, "EU", 5000),
(2021, 4, "US", 7000),
(2021, 4, "EU", 6000),
]

# Create DataFrame
columns = ["year", "quarter", "region", "revenue"]
df = spark.createDataFrame(data, columns)
df.show()
+----+-------+------+-------+
|year|quarter|region|revenue|
+----+-------+------+-------+
|2021| 1| US| 5000|
|2021| 1| EU| 4000|
|2021| 2| US| 5500|
|2021| 2| EU| 4500|
|2021| 3| US| 6000|
|2021| 3| EU| 5000|
|2021| 4| US| 7000|
|2021| 4| EU| 6000|
+----+-------+------+-------+
Show Solution
# Execute the pivot operation
pivot_df = df.groupBy("year", "quarter").pivot("region").sum("revenue")

pivot_df.show()

+----+-------+----+----+
|year|quarter| EU| US|
+----+-------+----+----+
|2021| 2|4500|5500|
|2021| 1|4000|5000|
|2021| 3|5000|6000|
|2021| 4|6000|7000|
+----+-------+----+----+

26. How to get the mean of a variable grouped by another variable?

Difficulty Level: L3

# Sample data
data = [("1001", "Laptop", 1000),
("1002", "Mouse", 50),
("1003", "Laptop", 1200),
("1004", "Mouse", 30),
("1005", "Smartphone", 700)]

# Create DataFrame
columns = ["OrderID", "Product", "Price"]
df = spark.createDataFrame(data, columns)

df.show()
+-------+----------+-----+
|OrderID| Product|Price|
+-------+----------+-----+
| 1001| Laptop| 1000|
| 1002| Mouse| 50|
| 1003| Laptop| 1200|
| 1004| Mouse| 30|
| 1005|Smartphone| 700|
+-------+----------+-----+
Show Solution
from pyspark.sql.functions import mean

# GroupBy and aggregate
result = df.groupBy("Product").agg(mean("Price").alias("Total_Sales"))

# Show results
result.show()

+----------+-----------+
| Product|Total_Sales|
+----------+-----------+
| Laptop| 1100.0|
| Mouse| 40.0|
|Smartphone| 700.0|
+----------+-----------+

27. How to compute the euclidean distance between two columns?

Difficulty Level: L3

Compute the euclidean distance between series (points) p and q, without using a packaged formula.

# Define your series
data = [(1, 10), (2, 9), (3, 8), (4, 7), (5, 6), (6, 5), (7, 4), (8, 3), (9, 2), (10, 1)]

# Convert list to DataFrame
df = spark.createDataFrame(data, ["series1", "series2"])

df.show()
+-------+-------+
|series1|series2|
+-------+-------+
| 1| 10|
| 2| 9|
| 3| 8|
| 4| 7|
| 5| 6|
| 6| 5|
| 7| 4|
| 8| 3|
| 9| 2|
| 10| 1|
+-------+-------+
Show Solution
from pyspark.sql.functions import expr
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Convert series to vectors
vecAssembler = VectorAssembler(inputCols=["series1", "series2"], outputCol="vectors")
df = vecAssembler.transform(df)

# Calculate squared differences
df = df.withColumn("squared_diff", expr("POW(series1 - series2, 2)"))

# Sum squared differences and take square root
df.agg(expr("SQRT(SUM(squared_diff))").alias("euclidean_distance")).show()

+------------------+
|euclidean_distance|
+------------------+
| 18.16590212458495|
+------------------+

28. How to replace missing spaces in a string with the least frequent character?

Difficulty Level: L3

Replace the spaces in my_str with the least frequent characte

#Sample DataFrame
df = spark.createDataFrame([('dbc deb abed gade',),], ["string"])
df.show()
+-----------------+
| string|
+-----------------+
|dbc deb abed gade|
+-----------------+

Desired output

+-----------------+-----------------+
| string| modified_string|
+-----------------+-----------------+
|dbc deb abed gade|dbccdebcabedcgade|
+-----------------+-----------------+
Show Solution
from pyspark.sql.functions import udf, explode
from pyspark.sql.types import StringType, ArrayType
from collections import Counter

def least_freq_char_replace_spaces(s):
counter = Counter(s.replace(" ", ""))
least_freq_char = min(counter, key = counter.get)
return s.replace(' ', least_freq_char)

udf_least_freq_char_replace_spaces = udf(least_freq_char_replace_spaces, StringType())

df = spark.createDataFrame([('dbc deb abed gade',)], ["string"])
df.withColumn('modified_string', udf_least_freq_char_replace_spaces(df['string'])).show()

+-----------------+-----------------+
| string| modified_string|
+-----------------+-----------------+
|dbc deb abed gade|dbccdebcabedcgade|
+-----------------+-----------------+

29. How to create a TimeSeries starting ‘2000-01-01’ and 10 weekends (saturdays) after that having random numbers as values?

Difficulty Level: L3

Desired output

values can be random

+----------+--------------+
| date|random_numbers|
+----------+--------------+
|2000-01-01| 8|
|2000-01-08| 3|
|2000-01-15| 8|
|2000-01-22| 5|
|2000-01-29| 4|
|2000-02-05| 6|
|2000-02-12| 8|
|2000-02-19| 1|
|2000-02-26| 9|
|2000-03-04| 3|
+----------+--------------+
Show Solution
from pyspark.sql.functions import expr, explode, sequence, rand

# Start date and end date (start + 10 weekends)
start_date = '2000-01-01'
end_date = '2000-03-04' # Calculated manually: 10 weekends (Saturdays) from start date

# Create a DataFrame with one row containing a sequence from start_date to end_date with a 1 day step
df = spark.range(1).select(
explode(
sequence(
expr(f"date '{start_date}'"),
expr(f"date '{end_date}'"),
expr("interval 1 day")
)
).alias("date")
)

# Filter out the weekdays (retain weekends)
df = df.filter(expr("dayofweek(date) = 7")) # 7 corresponds to Saturday in Spark

# Add the random numbers column
#df = df.withColumn("random_numbers", rand()*10)
df = df.withColumn("random_numbers", ((rand(seed=42) * 10) + 1).cast("int"))

# Show the DataFrame
df.show()

+----------+--------------+
| date|random_numbers|
+----------+--------------+
|2000-01-01| 8|
|2000-01-08| 3|
|2000-01-15| 8|
|2000-01-22| 5|
|2000-01-29| 4|
|2000-02-05| 6|
|2000-02-12| 8|
|2000-02-19| 1|
|2000-02-26| 9|
|2000-03-04| 3|
+----------+--------------+

30. How to get the nrows, ncolumns, datatype of a dataframe?

Difficiulty Level: L1

Get the number of rows, columns, datatype and summary statistics of each column of the Churn_Modelling dataset. Also get the numpy array and list equivalent of the dataframe

url = "https://raw.githubusercontent.com/selva86/datasets/master/Churn_Modelling.csv"

spark.sparkContext.addFile(url)

df = spark.read.csv(SparkFiles.get("Churn_Modelling.csv"), header=True, inferSchema=True)

#df = spark.read.csv("C:/Users/RajeshVaddi/Documents/MLPlus/DataSets/Churn_Modelling.csv", header=True, inferSchema=True)

df.show(5, truncate=False)
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId|Surname |CreditScore|Geography|Gender|Age|Tenure|Balance |NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|1 |15634602 |Hargrave|619 |France |Female|42 |2 |0.0 |1 |1 |1 |101348.88 |1 |
|2 |15647311 |Hill |608 |Spain |Female|41 |1 |83807.86 |1 |0 |1 |112542.58 |0 |
|3 |15619304 |Onio |502 |France |Female|42 |8 |159660.8 |3 |1 |0 |113931.57 |1 |
|4 |15701354 |Boni |699 |France |Female|39 |1 |0.0 |2 |0 |0 |93826.63 |0 |
|5 |15737888 |Mitchell|850 |Spain |Female|43 |2 |125510.82|1 |1 |1 |79084.1 |0 |
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
only showing top 5 rows
Show Solution
# For number of rows
nrows = df.count()
print("Number of Rows: ", nrows)

# For number of columns
ncols = len(df.columns)
print("Number of Columns: ", ncols)

# For data types of each column
datatypes = df.dtypes
print("Data types: ", datatypes)

Number of Rows: 10000
Number of Columns: 14
Data types: [('RowNumber', 'int'), ('CustomerId', 'int'), ('Surname', 'string'), ('CreditScore', 'int'), ('Geography', 'string'), ('Gender', 'string'), ('Age', 'int'), ('Tenure', 'int'), ('Balance', 'double'), ('NumOfProducts', 'int'), ('HasCrCard', 'int'), ('IsActiveMember', 'int'), ('EstimatedSalary', 'double'), ('Exited', 'int')]

31. How to rename a specific columns in a dataframe?

Difficiulty Level: L2

Input

# Suppose you have the following DataFrame
df = spark.createDataFrame([('Alice', 1, 30),('Bob', 2, 35)], ["name", "age", "qty"])

df.show()

# Rename lists for specific columns
old_names = ["qty", "age"]
new_names = ["user_qty", "user_age"]
+-----+---+---+
| name|age|qty|
+-----+---+---+
|Alice| 1| 30|
| Bob| 2| 35|
+-----+---+---+
Show Solution
old_names = ["qty", "age"]
new_names = ["user_qty", "user_age"]

# You can then rename the columns like this:
for old_name, new_name in zip(old_names, new_names):
df = df.withColumnRenamed(old_name, new_name)

df.show()

+-----+--------+--------+
| name|user_age|user_qty|
+-----+--------+--------+
|Alice| 1| 30|
| Bob| 2| 35|
+-----+--------+--------+

32. How to check if a dataframe has any missing values and count of missing values in each column?

Difficulty Level: L2

Input

# Assuming df is your DataFrame
df = spark.createDataFrame([
("A", 1, None),
("B", None, "123" ),
("B", 3, "456"),
("D", None, None),
], ["Name", "Value", "id"])

df.show()
+----+-----+----+
|Name|Value| id|
+----+-----+----+
| A| 1|null|
| B| null| 123|
| B| 3| 456|
| D| null|null|
+----+-----+----+
Show Solution
from pyspark.sql.functions import col, sum

missing = df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns))
has_missing = any(row.asDict().values() for row in missing.collect())
print(has_missing)

missing_count = missing.collect()[0].asDict()
print(missing_count)

True
{'Name': 0, 'Value': 2, 'id': 2}

33 How to replace missing values of multiple numeric columns with the mean?

Difficulty Level: L2

Input

df = spark.createDataFrame([
("A", 1, None),
("B", None, 123 ),
("B", 3, 456),
("D", 6, None),
], ["Name", "var1", "var2"])

df.show()
+----+----+----+
|Name|var1|var2|
+----+----+----+
| A| 1|null|
| B|null| 123|
| B| 3| 456|
| D| 6|null|
+----+----+----+
Show Solution
from pyspark.ml.feature import Imputer

column_names = ["var1", "var2"]

# Initialize the Imputer
imputer = Imputer(inputCols= column_names, outputCols= column_names, strategy="mean")

# Fit the Imputer
model = imputer.fit(df)

#Transform the dataset
imputed_df = model.transform(df)

imputed_df.show(5)

+----+----+----+
|Name|var1|var2|
+----+----+----+
| A| 1| 289|
| B| 3| 123|
| B| 3| 456|
| D| 6| 289|
+----+----+----+

34. How to change the order of columns of a dataframe?

Difficulty Level: L1

Input

# Sample data
data = [("John", "Doe", 30), ("Jane", "Doe", 25), ("Alice", "Smith", 22)]

# Create DataFrame from the data
df = spark.createDataFrame(data, ["First_Name", "Last_Name", "Age"])

# Show the DataFrame
df.show()
+----------+---------+---+
|First_Name|Last_Name|Age|
+----------+---------+---+
| John| Doe| 30|
| Jane| Doe| 25|
| Alice| Smith| 22|
+----------+---------+---+
Show Solution
new_order = ["Age", "First_Name", "Last_Name"]

# Reorder the columns
df = df.select(*new_order)

# Show the DataFrame with reordered columns
df.show()

+---+----------+---------+
|Age|First_Name|Last_Name|
+---+----------+---------+
| 30| John| Doe|
| 25| Jane| Doe|
| 22| Alice| Smith|
+---+----------+---------+

35. How to format or suppress scientific notations in a PySpark DataFrame?

# Assuming you have a DataFrame df and the column you want to format is 'your_column'
df = spark.createDataFrame([(1, 0.000000123), (2, 0.000023456), (3, 0.000345678)], ["id", "your_column"])

df.show()
+---+-----------+
| id|your_column|
+---+-----------+
| 1| 1.23E-7|
| 2| 2.3456E-5|
| 3| 3.45678E-4|
+---+-----------+
Show Solution
from pyspark.sql.functions import format_number

# Determine the number of decimal places you want
decimal_places = 10

df = df.withColumn("your_column", format_number("your_column", decimal_places))
df.show()

+---+------------+
| id| your_column|
+---+------------+
| 1|0.0000001230|
| 2|0.0000234560|
| 3|0.0003456780|
+---+------------+

36. How to format all the values in a dataframe as percentages?

Difficulty Level: L2

Input

# Sample data
data = [(0.1, .08), (0.2, .06), (0.33, .02)]
df = spark.createDataFrame(data, ["numbers_1", "numbers_2"])

df.show()
+---------+---------+
|numbers_1|numbers_2|
+---------+---------+
| 0.1| 0.08|
| 0.2| 0.06|
| 0.33| 0.02|
+---------+---------+
Show Solution
from pyspark.sql.functions import concat, col, lit

columns = ["numbers_1", "numbers_2"]

for col_name in columns:
df = df.withColumn(col_name, concat((col(col_name) * 100).cast('decimal(10, 2)'), lit("%")))

df.show()

+---------+---------+
|numbers_1|numbers_2|
+---------+---------+
| 10.00%| 8.00%|
| 20.00%| 6.00%|
| 33.00%| 2.00%|
+---------+---------+

37. How to filter every nth row in a dataframe?

Difficulty Level: L2

Input

# Sample data
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Dave", 4), ("Eve", 5),
("Frank", 6), ("Grace", 7), ("Hannah", 8), ("Igor", 9), ("Jack", 10)]

# Create DataFrame
df = spark.createDataFrame(data, ["Name", "Number"])

df.show()
+-------+------+
| Name|Number|
+-------+------+
| Alice| 1|
| Bob| 2|
|Charlie| 3|
| Dave| 4|
| Eve| 5|
| Frank| 6|
| Grace| 7|
| Hannah| 8|
| Igor| 9|
| Jack| 10|
+-------+------+
Show Solution
# Define window
window = Window.orderBy(monotonically_increasing_id())

# Add row_number to DataFrame
df = df.withColumn("rn", row_number().over(window))

n = 5 # filter every 5th row

# Filter every nth row
df = df.filter((df.rn % n) == 0)

df.show()

+----+------+---+
|Name|Number| rn|
+----+------+---+
| Eve| 5| 5|
|Jack| 10| 10|
+----+------+---+

38 How to get the row number of the nth largest value in a column?

Difficulty Level: L2

Input

from pyspark.sql import Row

# Sample Data
data = [
Row(id=1, column1=5),
Row(id=2, column1=8),
Row(id=3, column1=12),
Row(id=4, column1=1),
Row(id=5, column1=15),
Row(id=6, column1=7),
]

df = spark.createDataFrame(data)
df.show()
+---+-------+
| id|column1|
+---+-------+
| 1| 5|
| 2| 8|
| 3| 12|
| 4| 1|
| 5| 15|
| 6| 7|
+---+-------+
Show Solution
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, row_number

window = Window.orderBy(desc("column1"))
df = df.withColumn("row_number", row_number().over(window))

n = 3 # We're interested in the 3rd largest value.
row = df.filter(df.row_number == n).first()

if row:
print("Row number:", row.row_number)
print("Column value:", row.column1)

Row number: 3
Column value: 8

39. How to get the last n rows of a dataframe with row sum > 100?

Difficulty Level: L2

Input

# Sample data
data = [(10, 25, 70),
(40, 5, 20),
(70, 80, 100),
(10, 2, 60),
(40, 50, 20)]

# Create DataFrame
df = spark.createDataFrame(data, ["col1", "col2", "col3"])

# Display original DataFrame
df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 10| 25| 70|
| 40| 5| 20|
| 70| 80| 100|
| 10| 2| 60|
| 40| 50| 20|
+----+----+----+
Show Solution
from pyspark.sql import functions as F
from functools import reduce

# Add 'row_sum' column
df = df.withColumn('row_sum', reduce(lambda a, b: a+b, [F.col(x) for x in df.columns]))

# Display DataFrame with 'row_sum'
df.show()

# Filter rows where 'row_sum' > 100
df = df.filter(F.col('row_sum') > 100)

# Display filtered DataFrame
df.show()

# Add 'id' column
df = df.withColumn('id', F.monotonically_increasing_id())

# Get the last 2 rows
df_last_2 = df.sort(F.desc('id')).limit(2)

# Display the last 2 rows
df_last_2.show()

+----+----+----+-------+
|col1|col2|col3|row_sum|
+----+----+----+-------+
| 10| 25| 70| 105|
| 40| 5| 20| 65|
| 70| 80| 100| 250|
| 10| 2| 60| 72|
| 40| 50| 20| 110|
+----+----+----+-------+

+----+----+----+-------+
|col1|col2|col3|row_sum|
+----+----+----+-------+
| 10| 25| 70| 105|
| 70| 80| 100| 250|
| 40| 50| 20| 110|
+----+----+----+-------+

+----+----+----+-------+-----------+
|col1|col2|col3|row_sum| id|
+----+----+----+-------+-----------+
| 40| 50| 20| 110|25769803776|
| 70| 80| 100| 250|17179869184|
+----+----+----+-------+-----------+

40. How to create a column containing the minimum by maximum of each row?

Difficulty Level: L2

Input

# Sample Data
data = [(1, 2, 3), (4, 5, 6), (7, 8, 9), (10, 11, 12)]

# Create DataFrame
df = spark.createDataFrame(data, ["col1", "col2", "col3"])

df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| 2| 3|
| 4| 5| 6|
| 7| 8| 9|
| 10| 11| 12|
+----+----+----+
Show Solution
from pyspark.sql.functions import udf, array
from pyspark.sql.types import FloatType

# Define UDF
def min_max_ratio(row):
return float(min(row)) / max(row)

min_max_ratio_udf = udf(min_max_ratio, FloatType())

# Apply UDF to create new column
df = df.withColumn('min_by_max', min_max_ratio_udf(array(df.columns)))

df.show()

+----+----+----+----------+
|col1|col2|col3|min_by_max|
+----+----+----+----------+
| 1| 2| 3|0.33333334|
| 4| 5| 6| 0.6666667|
| 7| 8| 9| 0.7777778|
| 10| 11| 12| 0.8333333|
+----+----+----+----------+

41. How to create a column that contains the penultimate value in each row?

Difficulty Level: L2

Create a new column ‘penultimate’ which has the second largest value of each row of df

Input

data = [(10, 20, 30),
(40, 60, 50),
(80, 70, 90)]

df = spark.createDataFrame(data, ["Column1", "Column2", "Column3"])

df.show()
+-------+-------+-------+
|Column1|Column2|Column3|
+-------+-------+-------+
| 10| 20| 30|
| 40| 60| 50|
| 80| 70| 90|
+-------+-------+-------+
Show Solution
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, IntegerType

# Define UDF to sort array in descending order
sort_array_desc = F.udf(lambda arr: sorted(arr), ArrayType(IntegerType()))

# Create array from columns, sort in descending order and get the penultimate value
df = df.withColumn("row_as_array", sort_array_desc(F.array(df.columns)))
df = df.withColumn("Penultimate", df['row_as_array'].getItem(1))
df = df.drop('row_as_array')

df.show()

+-------+-------+-------+-----------+
|Column1|Column2|Column3|Penultimate|
+-------+-------+-------+-----------+
| 10| 20| 30| 20|
| 40| 60| 50| 50|
| 80| 70| 90| 80|
+-------+-------+-------+-----------+

42. How to normalize all columns in a dataframe?

Difficulty Level: L2

Normalize all columns of df by subtracting the column mean and divide by standard deviation.

Range all columns of df such that the minimum value in each column is 0 and max is 1.

Input

# create a sample dataframe
data = [(1, 2, 3),
(2, 3, 4),
(3, 4, 5),
(4, 5, 6)]

df = spark.createDataFrame(data, ["Col1", "Col2", "Col3"])

df.show()
+----+----+----+
|Col1|Col2|Col3|
+----+----+----+
| 1| 2| 3|
| 2| 3| 4|
| 3| 4| 5|
| 4| 5| 6|
+----+----+----+
Show Solution
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.functions import col

# define the list of columns to be normalized
input_cols = ["Col1", "Col2", "Col3"]

# initialize VectorAssembler with input and output column names
assembler = VectorAssembler(inputCols=input_cols, outputCol="features")

# transform the data
df_assembled = assembler.transform(df)

# initialize StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)

# fit and transform the data
scalerModel = scaler.fit(df_assembled)
df_normalized = scalerModel.transform(df_assembled)

# if you want to drop the original 'features' column
df_normalized = df_normalized.drop('features')

df_normalized.show(truncate=False)

+----+----+----+-------------------------------------------------------------+
|Col1|Col2|Col3|scaled_features |
+----+----+----+-------------------------------------------------------------+
|1 |2 |3 |[-1.161895003862225,-1.161895003862225,-1.161895003862225] |
|2 |3 |4 |[-0.3872983346207417,-0.3872983346207417,-0.3872983346207417]|
|3 |4 |5 |[0.3872983346207417,0.3872983346207417,0.3872983346207417] |
|4 |5 |6 |[1.161895003862225,1.161895003862225,1.161895003862225] |
+----+----+----+-------------------------------------------------------------+

43. How to get the positions where values of two columns match?

Difficulty Level: L1

Input

# Create sample DataFrame
data = [("John", "John"), ("Lily", "Lucy"), ("Sam", "Sam"), ("Lucy", "Lily")]
df = spark.createDataFrame(data, ["Name1", "Name2"])

df.show()
+-----+-----+
|Name1|Name2|
+-----+-----+
| John| John|
| Lily| Lucy|
| Sam| Sam|
| Lucy| Lily|
+-----+-----+
Show Solution
from pyspark.sql.functions import when
from pyspark.sql.functions import col

# Add new column Match to indicate if Name1 and Name2 match
df = df.withColumn("Match", when(col("Name1") == col("Name2"), True).otherwise(False))

# Display DataFrame
df.show()

+-----+-----+-----+
|Name1|Name2|Match|
+-----+-----+-----+
| John| John| true|
| Lily| Lucy|false|
| Sam| Sam| true|
| Lucy| Lily|false|
+-----+-----+-----+

44. How to create lags and leads of a column by group in a dataframe?

Difficulty Level: L2

Input

# Create a sample DataFrame
data = [("2023-01-01", "Store1", 100),
("2023-01-02", "Store1", 150),
("2023-01-03", "Store1", 200),
("2023-01-04", "Store1", 250),
("2023-01-05", "Store1", 300),
("2023-01-01", "Store2", 50),
("2023-01-02", "Store2", 60),
("2023-01-03", "Store2", 80),
("2023-01-04", "Store2", 90),
("2023-01-05", "Store2", 120)]

df = spark.createDataFrame(data, ["Date", "Store", "Sales"])

df.show()
+----------+------+-----+
| Date| Store|Sales|
+----------+------+-----+
|2023-01-01|Store1| 100|
|2023-01-02|Store1| 150|
|2023-01-03|Store1| 200|
|2023-01-04|Store1| 250|
|2023-01-05|Store1| 300|
|2023-01-01|Store2| 50|
|2023-01-02|Store2| 60|
|2023-01-03|Store2| 80|
|2023-01-04|Store2| 90|
|2023-01-05|Store2| 120|
+----------+------+-----+
Show Solution
from pyspark.sql.functions import lag, lead, to_date
from pyspark.sql.window import Window

# Convert the date from string to date type
df = df.withColumn("Date", to_date(df.Date, 'yyyy-MM-dd'))

# Create a Window partitioned by Store, ordered by Date
windowSpec = Window.partitionBy("Store").orderBy("Date")

# Create lag and lead variables
df = df.withColumn("Lag_Sales", lag(df["Sales"]).over(windowSpec))
df = df.withColumn("Lead_Sales", lead(df["Sales"]).over(windowSpec))

df.show()

+----------+------+-----+---------+----------+
| Date| Store|Sales|Lag_Sales|Lead_Sales|
+----------+------+-----+---------+----------+
|2023-01-01|Store1| 100| null| 150|
|2023-01-02|Store1| 150| 100| 200|
|2023-01-03|Store1| 200| 150| 250|
|2023-01-04|Store1| 250| 200| 300|
|2023-01-05|Store1| 300| 250| null|
|2023-01-01|Store2| 50| null| 60|
|2023-01-02|Store2| 60| 50| 80|
|2023-01-03|Store2| 80| 60| 90|
|2023-01-04|Store2| 90| 80| 120|
|2023-01-05|Store2| 120| 90| null|
+----------+------+-----+---------+----------+

45. How to get the frequency of unique values in the entire dataframe?

Difficulty Level: L3

Get the frequency of unique values in the entire dataframe df.

Input

# Create a numeric DataFrame
data = [(1, 2, 3),
(2, 3, 4),
(1, 2, 3),
(4, 5, 6),
(2, 3, 4)]
df = spark.createDataFrame(data, ["Column1", "Column2", "Column3"])

# Print DataFrame
df.show()
+-------+-------+-------+
|Column1|Column2|Column3|
+-------+-------+-------+
| 1| 2| 3|
| 2| 3| 4|
| 1| 2| 3|
| 4| 5| 6|
| 2| 3| 4|
+-------+-------+-------+
Show Solution
from pyspark.sql.functions import col

# get column names
columns = df.columns

# stack all columns into a single column
df_single = None

for c in columns:
if df_single is None:
df_single = df.select(col(c).alias("single_column"))
else:
df_single = df_single.union(df.select(col(c).alias("single_column")))

# generate frequency table
frequency_table = df_single.groupBy("single_column").count().orderBy('count', ascending=False)

# show frequency table
frequency_table.show()

+-------------+-----+
|single_column|count|
+-------------+-----+
| 3| 4|
| 2| 4|
| 4| 3|
| 1| 2|
| 5| 1|
| 6| 1|
+-------------+-----+

46. How to replace both the diagonals of dataframe with 0?

Difficulty Level: L3

Replace both values in both diagonals of df with 0.

Input

# Create a numeric DataFrame
data = [(1, 2, 3, 4),
(2, 3, 4, 5),
(1, 2, 3, 4),
(4, 5, 6, 7)]

df = spark.createDataFrame(data, ["col_1", "col_2", "col_3", "col_4"])

# Print DataFrame
df.show()
+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
| 1| 2| 3| 4|
| 2| 3| 4| 5|
| 1| 2| 3| 4|
| 4| 5| 6| 7|
+-----+-----+-----+-----+
Show Solution
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql.functions import when, col

# Define window specification
w = Window.orderBy(monotonically_increasing_id())

# Add index
df = df.withColumn("id", row_number().over(w) - 1)

df = df.select([when(col("id") == i, 0).otherwise(col("col_"+str(i+1))).alias("col_"+str(i+1)) for i in range(4)])

# Create a reverse id column
df = df.withColumn("id", row_number().over(w) - 1)
df = df.withColumn("id_2", df.count() - 1 - df["id"])

df_with_diag_zero = df.select([when(col("id_2") == i, 0).otherwise(col("col_"+str(i+1))).alias("col_"+str(i+1)) for i in range(4)])

df_with_diag_zero.show()

+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
| 0| 2| 3| 0|
| 2| 0| 0| 5|
| 1| 0| 0| 4|
| 0| 5| 6| 0|
+-----+-----+-----+-----+

47. How to reverse the rows of a dataframe?

Difficulty Level: L2

Reverse all the rows of dataframe df.

Input

# Create a numeric DataFrame
data = [(1, 2, 3, 4),
(2, 3, 4, 5),
(3, 4, 5, 6),
(4, 5, 6, 7)]

df = spark.createDataFrame(data, ["col_1", "col_2", "col_3", "col_4"])

# Print DataFrame
df.show()
+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
| 1| 2| 3| 4|
| 2| 3| 4| 5|
| 3| 4| 5| 6|
| 4| 5| 6| 7|
+-----+-----+-----+-----+
Show Solution
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id

# Define window specification
w = Window.orderBy(monotonically_increasing_id())

# Add index
df = df.withColumn("id", row_number().over(w) - 1)

df_2 = df.orderBy("id", ascending=False).drop("id")

df_2.show()

+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
| 4| 5| 6| 7|
| 3| 4| 5| 6|
| 2| 3| 4| 5|
| 1| 2| 3| 4|
+-----+-----+-----+-----+

48. How to create one-hot encodings of a categorical variable (dummy variables)?

Difficulty Level: L2

Get one-hot encodings for column Categories in the dataframe df and append it as columns.

Input

data = [("A", 10),("A", 20),("B", 30),("B", 20),("B", 30),("C", 40),("C", 10),("D", 10)]
columns = ["Categories", "Value"]

df = spark.createDataFrame(data, columns)
df.show()
+----------+-----+
|Categories|Value|
+----------+-----+
| A| 10|
| A| 20|
| B| 30|
| B| 20|
| B| 30|
| C| 40|
| C| 10|
| D| 10|
+----------+-----+
Show Solution
from pyspark.ml.feature import StringIndexer, OneHotEncoder
#from pyspark.sql.types import StringType, StructType, StructField

# StringIndexer Initialization
indexer = StringIndexer(inputCol="Categories", outputCol="Categories_Indexed")
indexerModel = indexer.fit(df)

# Transform the DataFrame using the fitted StringIndexer model
indexed_df = indexerModel.transform(df)
#indexed_df.show()

encoder = OneHotEncoder(inputCol="Categories_Indexed", outputCol="Categories_onehot")
encoded_df = encoder.fit(indexed_df).transform(indexed_df)
encoded_df = encoded_df.drop("Categories_Indexed")
encoded_df.show(truncate=False)

+----------+-----+-----------------+
|Categories|Value|Categories_onehot|
+----------+-----+-----------------+
|A |10 |(3,[1],[1.0]) |
|A |20 |(3,[1],[1.0]) |
|B |30 |(3,[0],[1.0]) |
|B |20 |(3,[0],[1.0]) |
|B |30 |(3,[0],[1.0]) |
|C |40 |(3,[2],[1.0]) |
|C |10 |(3,[2],[1.0]) |
|D |10 |(3,[],[]) |
+----------+-----+-----------------+

49. How to Pivot the dataframe (converting rows into columns) ?

Difficulty Level: L2

convert region column categories to Column

Input

# Sample data
data = [
(2021, 1, "US", 5000),
(2021, 1, "EU", 4000),
(2021, 2, "US", 5500),
(2021, 2, "EU", 4500),
(2021, 3, "US", 6000),
(2021, 3, "EU", 5000),
(2021, 4, "US", 7000),
(2021, 4, "EU", 6000),
]

# Create DataFrame
columns = ["year", "quarter", "region", "revenue"]
df = spark.createDataFrame(data, columns)
Show Solution
# Execute the pivot operation
pivot_df = df.groupBy("year", "quarter").pivot("region").sum("revenue")

pivot_df.show()

+----+-------+----+----+
|year|quarter| EU| US|
+----+-------+----+----+
|2021| 2|4500|5500|
|2021| 1|4000|5000|
|2021| 3|5000|6000|
|2021| 4|6000|7000|
+----+-------+----+----+

50. How to UnPivot the dataframe (converting columns into rows) ?

Difficulty Level: L2

UnPivot EU, US columns and create region, revenue Columns

Input

# Sample data
data = [(2021, 2, 4500, 5500),
(2021, 1, 4000, 5000),
(2021, 3, 5000, 6000),
(2021, 4, 6000, 7000)]

# Create DataFrame
columns = ["year", "quarter", "EU", "US"]
df = spark.createDataFrame(data, columns)

df.show()
+----+-------+----+----+
|year|quarter| EU| US|
+----+-------+----+----+
|2021| 2|4500|5500|
|2021| 1|4000|5000|
|2021| 3|5000|6000|
|2021| 4|6000|7000|
+----+-------+----+----+

Expected Output

+----+-------+------+-------+
|year|quarter|region|revenue|
+----+-------+------+-------+
|2021| 2| EU| 4500|
|2021| 2| US| 5500|
|2021| 1| EU| 4000|
|2021| 1| US| 5000|
|2021| 3| EU| 5000|
|2021| 3| US| 6000|
|2021| 4| EU| 6000|
|2021| 4| US| 7000|
+----+-------+------+-------+
Show Solution
from pyspark.sql.functions import expr

unpivotExpr = "stack(2, 'EU',EU, 'US', US) as (region,revenue)"

unPivotDF = pivot_df.select("year","quarter", expr(unpivotExpr)).where("revenue is not null")

unPivotDF.show()

+----+-------+------+-------+
|year|quarter|region|revenue|
+----+-------+------+-------+
|2021| 2| EU| 4500|
|2021| 2| US| 5500|
|2021| 1| EU| 4000|
|2021| 1| US| 5000|
|2021| 3| EU| 5000|
|2021| 3| US| 6000|
|2021| 4| EU| 6000|
|2021| 4| US| 7000|
+----+-------+------+-------+

51. How to impute missing values with Zero?

Difficulty Level: L1

Input

# Suppose df is your DataFrame
df = spark.createDataFrame([(1, None), (None, 2), (3, 4), (5, None)], ["a", "b"])

df.show()
+----+----+
| a| b|
+----+----+
| 1|null|
|null| 2|
| 3| 4|
| 5|null|
+----+----+
Show Solution
df_imputed = df.fillna(0)

df_imputed.show()

+---+---+
| a| b|
+---+---+
| 1| 0|
| 0| 2|
| 3| 4|
| 5| 0|
+---+---+

52. How to identify continuous variables in a dataframe and create a list of those column names?

Difficulty Level: L3

Input

url = "https://raw.githubusercontent.com/selva86/datasets/master/Churn_Modelling_m.csv"
spark.sparkContext.addFile(url)

df = spark.read.csv(SparkFiles.get("Churn_Modelling_m.csv"), header=True, inferSchema=True)

#df = spark.read.csv("C:/Users/RajeshVaddi/Documents/MLPlus/DataSets/Churn_Modelling_m.csv", header=True, inferSchema=True)

df.show(2, truncate=False)
+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId|Surname |CreditScore|Geography|Gender|Age|Tenure|Balance |NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
|1 |15634602 |Hargrave|619 |France |Female|42 |2 |0.0 |1 |1 |1 |101348.88 |1 |
|2 |15647311 |Hill |608 |Spain |Female|41 |1 |83807.86|1 |0 |1 |112542.58 |0 |
+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
only showing top 2 rows
Show Solution
from pyspark.sql.types import IntegerType, StringType, NumericType
from pyspark.sql.functions import approxCountDistinct

def detect_continuous_variables(df, distinct_threshold):
"""
Identify continuous variables in a PySpark DataFrame.
:param df: The input PySpark DataFrame
:param distinct_threshold: Threshold to qualify as continuous variables - Count of distinct values > distinct_threshold
:return: A List containing names of continuous variables
"""
continuous_columns = []
for column in df.columns:
dtype = df.schema[column].dataType
if isinstance(dtype, (IntegerType, NumericType)):
distinct_count = df.select(approxCountDistinct(column)).collect()[0][0]
if distinct_count > distinct_threshold:
continuous_columns.append(column)
return continuous_columns

continuous_variables = detect_continuous_variables(df, 10)
print(continuous_variables)

['RowNumber', 'CustomerId', 'CreditScore', 'Age', 'Tenure', 'Balance', 'EstimatedSalary']

53. How to calculate Mode of a PySpark DataFrame column?

Difficulty Level: L1

Input

# Create a sample DataFrame
data = [(1, 2, 3), (2, 2, 3), (2, 2, 4), (1, 2, 3), (1, 1, 3)]
columns = ["col1", "col2", "col3"]

df = spark.createDataFrame(data, columns)

df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| 2| 3|
| 2| 2| 3|
| 2| 2| 4|
| 1| 2| 3|
| 1| 1| 3|
+----+----+----+
Show Solution
from pyspark.sql.functions import col

df_grouped = df.groupBy('col2').count()
mode_df = df_grouped.orderBy(col('count').desc()).limit(1)

mode_df.show()

+----+-----+
|col2|count|
+----+-----+
| 2| 4|
+----+-----+

54. How to find installed location of Apache Spark and PySpark?

Difficulty Level: L1

Show Solution
import findspark
findspark.init()

print(findspark.find())

import os
import pyspark

print(os.path.dirname(pyspark.__file__))

C:\spark\spark-3.3.2-bin-hadoop2
C:\spark\spark-3.3.2-bin-hadoop2\python\pyspark

55. How to convert a column to lower case using UDF?

Difficulty Level: L2

Input

# Create a DataFrame to test
data = [('John Doe', 'NEW YORK'),
('Jane Doe', 'LOS ANGELES'),
('Mike Johnson', 'CHICAGO'),
('Sara Smith', 'SAN FRANCISCO')]

df = spark.createDataFrame(data, ['Name', 'City'])

df.show()
+------------+-------------+
| Name| City|
+------------+-------------+
| John Doe| NEW YORK|
| Jane Doe| LOS ANGELES|
|Mike Johnson| CHICAGO|
| Sara Smith|SAN FRANCISCO|
+------------+-------------+
Show Solution
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define your UDF function
def to_lower(s):
if s is not None:
return s.lower()

# Convert your Python function to a Spark UDF
udf_to_lower = udf(to_lower, StringType())

# Apply your UDF to the DataFrame
df = df.withColumn('City_lower', udf_to_lower(df['City']))

# Show the DataFrame
df.show()

+------------+-------------+-------------+
| Name| City| City_lower|
+------------+-------------+-------------+
| John Doe| NEW YORK| new york|
| Jane Doe| LOS ANGELES| los angeles|
|Mike Johnson| CHICAGO| chicago|
| Sara Smith|SAN FRANCISCO|san francisco|
+------------+-------------+-------------+

56. How to convert PySpark data frame to pandas dataframe?

Difficulty Level: L1

Input

# Create a DataFrame to test
data = [('John Doe', 'NEW YORK'),
('Jane Doe', 'LOS ANGELES'),
('Mike Johnson', 'CHICAGO'),
('Sara Smith', 'SAN FRANCISCO')]

pysparkDF = spark.createDataFrame(data, ['Name', 'City'])

pysparkDF.show()
+------------+-------------+
| Name| City|
+------------+-------------+
| John Doe| NEW YORK|
| Jane Doe| LOS ANGELES|
|Mike Johnson| CHICAGO|
| Sara Smith|SAN FRANCISCO|
+------------+-------------+
Show Solution
# convert PySpark data frame to pandas
pandasDF = pysparkDF.toPandas()

print(pandasDF)

Name City
0 John Doe NEW YORK
1 Jane Doe LOS ANGELES
2 Mike Johnson CHICAGO
3 Sara Smith SAN FRANCISCO

57. How to View PySpark Cluster Details?

Difficulty Level: L1

Show Solution
print(spark.sparkContext.uiWebUrl)

http://DESKTOP-UL3QT3E.mshome.net:4040

58. How to View PySpark Cluster Configuration Details?

Difficulty Level: L1

Show Solution
# Print all configurations
for k,v in spark.sparkContext.getConf().getAll():
print(f"{k} : {v}")

spark.app.name : PySpark 101 Exercises
spark.driver.extraJavaOptions : -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED
spark.app.startTime : 1684510291553
spark.app.id : local-1684510293468
spark.driver.host : DESKTOP-UL3QT3E.mshome.net
spark.executor.id : driver
spark.sql.warehouse.dir : file:/C:/Users/RajeshVaddi/Documents/MLPlus/6_PySpark%20101%20Exercises/spark-warehouse
spark.driver.port : 50321
spark.app.submitTime : 1684510291319
spark.rdd.compress : True
spark.executor.extraJavaOptions : -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED
spark.serializer.objectStreamReset : 100
spark.master : local[*]
spark.submit.pyFiles :
spark.submit.deployMode : client
spark.ui.showConsoleProgress : true

59. How to restrict the PySpark to use the number of cores in the system?

Difficulty Level: L1

Show Solution
from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.set("spark.executor.cores", "2") # set the number of cores you want here
sc = SparkContext(conf=conf)

60. How to cache PySpark DataFrame or objects and delete cache?

Difficulty Level: L2

In PySpark, caching or persisting data is done to speed up data retrieval during iterative and interactive computations.

Show Solution
# Caching the DataFrame
df.cache()

# un-cache or unpersist data using the unpersist() method.
df.unpersist()

DataFrame[Name: string, City: string, City_lower: string]

61. How to Divide a PySpark DataFrame randomly in a given ratio (0.8, 0.2)?

Difficulty Level: L1

Show Solution
# Randomly split data (0.8, 0.2)

train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

62. How to build logistic regression in PySpark?

Difficulty Level: L2

Input

# Create a sample dataframe
data = spark.createDataFrame([
(0, 1.0, -1.0),
(1, 2.0, 1.0),
(1, 3.0, -2.0),
(0, 4.0, 1.0),
(1, 5.0, -3.0),
(0, 6.0, 2.0),
(1, 7.0, -1.0),
(0, 8.0, 3.0),
(1, 9.0, -2.0),
(0, 10.0, 2.0),
(1, 11.0, -3.0),
(0, 12.0, 1.0),
(1, 13.0, -1.0),
(0, 14.0, 2.0),
(1, 15.0, -2.0),
(0, 16.0, 3.0),
(1, 17.0, -3.0),
(0, 18.0, 1.0),
(1, 19.0, -1.0),
(0, 20.0, 2.0)
], ["label", "feat1", "feat2"])

Show Solution
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# convert the feature columns into a single vector column using VectorAssembler
vecAssembler = VectorAssembler(inputCols=['feat1', 'feat2'], outputCol="features")
data = vecAssembler.transform(data)

# fit the logistic regression model
lr = LogisticRegression(featuresCol='features', labelCol='label')
lr_model = lr.fit(data)

# look at the coefficients and intercept of the logistic regression model
print(f"Coefficients: {str(lr_model.coefficients)}")
print(f"Intercept: {str(lr_model.intercept)}")

Coefficients: [0.020277740475786673,-1.612960940022365]
Intercept: -0.2209292751829534

63. How to convert the categorical string data into numerical data or index?

Difficulty Level: L2

Input

# Create a sample DataFrame
data = [('cat',), ('dog',), ('mouse',), ('fish',), ('dog',), ('cat',), ('mouse',)]
df = spark.createDataFrame(data, ["animal"])

df.show()
+------+
|animal|
+------+
| cat|
| dog|
| mouse|
| fish|
| dog|
| cat|
| mouse|
+------+
Show Solution
from pyspark.ml.feature import StringIndexer

# Initialize a StringIndexer
indexer = StringIndexer(inputCol='animal', outputCol='animalIndex')

# Fit the indexer to the DataFrame and transform the data
indexed = indexer.fit(df).transform(df)
indexed.show()

+------+-----------+
|animal|animalIndex|
+------+-----------+
| cat| 0.0|
| dog| 1.0|
| mouse| 2.0|
| fish| 3.0|
| dog| 1.0|
| cat| 0.0|
| mouse| 2.0|
+------+-----------+

64. How to calculate Correlation of two variables in a DataFrame?

Difficulty Level: L1

Input

# Create a sample dataframe
data = [Row(feature1=5, feature2=10, feature3=25),
Row(feature1=6, feature2=15, feature3=35),
Row(feature1=7, feature2=25, feature3=30),
Row(feature1=8, feature2=20, feature3=60),
Row(feature1=9, feature2=30, feature3=70)]
df = spark.createDataFrame(data)

df.show()
+--------+--------+--------+
|feature1|feature2|feature3|
+--------+--------+--------+
| 5| 10| 25|
| 6| 15| 35|
| 7| 25| 30|
| 8| 20| 60|
| 9| 30| 70|
+--------+--------+--------+
Show Solution
# Calculate correlation
correlation = df.corr("feature1", "feature2")

print("Correlation between feature1 and feature2 :", correlation)

Correlation between feature1 and feature2 : 0.9

65. How to calculate Correlation Matrix?

Difficulty Level: L2

Input

# Create a sample dataframe
data = [Row(feature1=5, feature2=10, feature3=25),
Row(feature1=6, feature2=15, feature3=35),
Row(feature1=7, feature2=25, feature3=30),
Row(feature1=8, feature2=20, feature3=60),
Row(feature1=9, feature2=30, feature3=70)]
df = spark.createDataFrame(data)

df.show()
+--------+--------+--------+
|feature1|feature2|feature3|
+--------+--------+--------+
| 5| 10| 25|
| 6| 15| 35|
| 7| 25| 30|
| 8| 20| 60|
| 9| 30| 70|
+--------+--------+--------+
Show Solution
# Calculate Correlation Using Using MLlib
from pyspark.ml.stat import Correlation

# Assemble feature vector
# Define the feature and label columns & Assemble the feature vector
vector_assembler = VectorAssembler(inputCols=["feature1", "feature2", "feature3"], outputCol="features")
data_vector = vector_assembler.transform(df).select("features")

# Calculate correlation
correlation_matrix = Correlation.corr(data_vector, "features").head()[0]

print(correlation_matrix)

DenseMatrix([[1. , 0.9 , 0.91779992],
[0.9 , 1. , 0.67837385],
[0.91779992, 0.67837385, 1. ]])

66. How to calculate VIF (Variance Inflation Factor ) for set of variables in a DataFrame?

Difficulty Level: L3

Input

# Create a sample dataframe
data = [Row(feature1=5, feature2=10, feature3=25),
Row(feature1=6, feature2=15, feature3=35),
Row(feature1=7, feature2=25, feature3=30),
Row(feature1=8, feature2=20, feature3=60),
Row(feature1=9, feature2=30, feature3=70)]
df = spark.createDataFrame(data)

df.show()
+--------+--------+--------+
|feature1|feature2|feature3|
+--------+--------+--------+
| 5| 10| 25|
| 6| 15| 35|
| 7| 25| 30|
| 8| 20| 60|
| 9| 30| 70|
+--------+--------+--------+
Show Solution
from pyspark.sql import SparkSession, Row
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

def calculate_vif(data, features):
vif_dict = {}

for feature in features:
non_feature_cols = [col for col in features if col != feature]
assembler = VectorAssembler(inputCols=non_feature_cols, outputCol="features")
lr = LinearRegression(featuresCol='features', labelCol=feature)

model = lr.fit(assembler.transform(data))
vif = 1 / (1 - model.summary.r2)

vif_dict[feature] = vif

return vif_dict

features = ['feature1', 'feature2', 'feature3']
vif_values = calculate_vif(df, features)

for feature, vif in vif_values.items():
print(f'VIF for {feature}: {vif}')

VIF for feature1: 66.2109375000003
VIF for feature2: 19.33593749999992
VIF for feature3: 23.30468749999992

67. How to perform Chi-Square test?

Difficulty Level: L2

Input

# Create a sample dataframe
data = [(1, 0, 0, 1, 1),
(2, 0, 1, 0, 0),
(3, 1, 0, 0, 0),
(4, 0, 0, 1, 1),
(5, 0, 1, 1, 0)]

df = spark.createDataFrame(data, ["id", "feature1", "feature2", "feature3", "label"])

df.show()
+---+--------+--------+--------+-----+
| id|feature1|feature2|feature3|label|
+---+--------+--------+--------+-----+
| 1| 0| 0| 1| 1|
| 2| 0| 1| 0| 0|
| 3| 1| 0| 0| 0|
| 4| 0| 0| 1| 1|
| 5| 0| 1| 1| 0|
+---+--------+--------+--------+-----+
Show Solution
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["feature1", "feature2", "feature3"], outputCol="features")
df = assembler.transform(df)

from pyspark.ml.stat import ChiSquareTest

r = ChiSquareTest.test(df, "features", "label").head()
print("pValues: " + str(r.pValues))
print("degreesOfFreedom: " + str(r.degreesOfFreedom))
print("statistics: " + str(r.statistics))

pValues: [0.36131042852617856,0.13603712811414348,0.1360371281141436]
degreesOfFreedom: [1, 1, 1]
statistics: [0.8333333333333335,2.2222222222222228,2.2222222222222223]

68. How to calculate the Standard Deviation?

Difficulty Level: L1

Input

# Sample data
data = [("James", "Sales", 3000),
("Michael", "Sales", 4600),
("Robert", "Sales", 4100),
("Maria", "Finance", 3000),
("James", "Sales", 3000),
("Scott", "Finance", 3300),
("Jen", "Finance", 3900),
("Jeff", "Marketing", 3000),
("Kumar", "Marketing", 2000),
("Saif", "Sales", 4100)]

# Create DataFrame
df = spark.createDataFrame(data, ["Employee", "Department", "Salary"])

df.show()
+--------+----------+------+
|Employee|Department|Salary|
+--------+----------+------+
| James| Sales| 3000|
| Michael| Sales| 4600|
| Robert| Sales| 4100|
| Maria| Finance| 3000|
| James| Sales| 3000|
| Scott| Finance| 3300|
| Jen| Finance| 3900|
| Jeff| Marketing| 3000|
| Kumar| Marketing| 2000|
| Saif| Sales| 4100|
+--------+----------+------+
Show Solution
from pyspark.sql.functions import stddev

salary_stddev = df.select(stddev("Salary").alias("stddev"))

salary_stddev.show()

+-----------------+
| stddev|
+-----------------+
|765.9416862050705|
+-----------------+

69. How to calculate missing value percentage in each column?

Difficulty Level: L3

Input

# Create a sample dataframe
data = [("John", "Doe", None),
(None, "Smith", "New York"),
("Mike", "Smith", None),
("Anna", "Smith", "Boston"),
(None, None, None)]

df = spark.createDataFrame(data, ["FirstName", "LastName", "City"])

df.show()
+---------+--------+--------+
|FirstName|LastName| City|
+---------+--------+--------+
| John| Doe| null|
| null| Smith|New York|
| Mike| Smith| null|
| Anna| Smith| Boston|
| null| null| null|
+---------+--------+--------+
Show Solution
# Calculate the total number of rows in the dataframe
total_rows = df.count()

# For each column calculate the number of null values and then calculate the percentage
for column in df.columns:
null_values = df.filter(df[column].isNull()).count()
missing_percentage = (null_values / total_rows) * 100
print(f"Missing values in {column}: {missing_percentage}%")

Missing values in FirstName: 40.0%
Missing values in LastName: 20.0%
Missing values in City: 60.0%

70. How to get the names of DataFrame objects that have been created in an environment?

Difficulty Level: L2

Show Solution
dataframe_names = [name for name, obj in globals().items() if isinstance(obj, pyspark.sql.DataFrame)]

for name in dataframe_names:
print(name)

Course Preview

Machine Learning A-Z™: Hands-On Python & R In Data Science

Free Sample Videos:

Machine Learning A-Z™: Hands-On Python & R In Data Science

Machine Learning A-Z™: Hands-On Python & R In Data Science

Machine Learning A-Z™: Hands-On Python & R In Data Science

Machine Learning A-Z™: Hands-On Python & R In Data Science

Machine Learning A-Z™: Hands-On Python & R In Data Science