These 101 PySpark exercises are designed to challenge your logical muscle and help you internalize data manipulation with Python's favorite package for distributed data analysis. The questions come in three levels of difficulty, with L1 being the easiest and L3 the hardest.
1. How to import PySpark and check the version?
Difficulty Level: L1
Show Solution
import findspark
findspark.init()
# Creating a SparkSession: A SparkSession is the entry point for using the PySpark DataFrame and SQL API.
# To create a SparkSession, use the following code
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PySpark 101 Exercises").getOrCreate()
# Get version details
print(spark.version)
#> 3.3.2
2. How to convert the index of a PySpark DataFrame into a column?
Difficulty Level: L1
Hint: A PySpark DataFrame doesn’t have an explicit index like a Pandas DataFrame. However, you can add a new column that is effectively a row number.
Input:
# Assuming df is your DataFrame
df = spark.createDataFrame([
("Alice", 1),
("Bob", 2),
("Charlie", 3),
], ["Name", "Value"])
df.show()
+-------+-----+
| Name|Value|
+-------+-----+
| Alice| 1|
| Bob| 2|
|Charlie| 3|
+-------+-----+
Expected Output:
+-------+-----+-----+
| Name|Value|index|
+-------+-----+-----+
| Alice| 1| 0|
| Bob| 2| 1|
|Charlie| 3| 2|
+-------+-----+-----+
Show Solution
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id
# Define window specification
w = Window.orderBy(monotonically_increasing_id())
# Add index
df = df.withColumn("index", row_number().over(w) - 1)
df.show()
+-------+-----+-----+
| Name|Value|index|
+-------+-----+-----+
| Alice| 1| 0|
| Bob| 2| 1|
|Charlie| 3| 2|
+-------+-----+-----+
3. How to combine many lists to form a PySpark DataFrame?
Difficulty Level: L1
Create a PySpark DataFrame from list1 and list2.
Hint: To create a DataFrame from multiple lists, first create an RDD (Resilient Distributed Dataset) from those lists and then convert the RDD to a DataFrame.
Input:
# Define your lists
list1 = ["a", "b", "c", "d"]
list2 = [1, 2, 3, 4]
Show Solution
# Create an RDD from the lists and convert it to a DataFrame
rdd = spark.sparkContext.parallelize(list(zip(list1, list2)))
df = rdd.toDF(["Column1", "Column2"])
# Show the DataFrame
df.show()
+-------+-------+
|Column1|Column2|
+-------+-------+
| a| 1|
| b| 2|
| c| 3|
| d| 4|
+-------+-------+
4. How to get the items of list A not present in list B?
Difficulty Level: L2
Get the items of list_A not present in list_B. In PySpark, you can use the subtract operation on RDDs (Resilient Distributed Datasets).
Input:
list_A = [1, 2, 3, 4, 5]
list_B = [4, 5, 6, 7, 8]
Expected Output:
#> [1, 2, 3]
Show Solution
sc = spark.sparkContext
# Convert lists to RDD
rdd_A = sc.parallelize(list_A)
rdd_B = sc.parallelize(list_B)
# Perform subtract operation
result_rdd = rdd_A.subtract(rdd_B)
# Collect result
result_list = result_rdd.collect()
print(result_list)
#> [1, 2, 3]
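Since the inputs are plain Python lists, the expected result can be cross-checked locally without a Spark session (a minimal sketch; note that RDD subtract also removes duplicates and does not guarantee output order):

```python
list_A = [1, 2, 3, 4, 5]
list_B = [4, 5, 6, 7, 8]

# Items of list_A not present in list_B, preserving list_A's order
result = [x for x in list_A if x not in set(list_B)]
print(result)  # [1, 2, 3]
```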
5. How to get the items not common to both list A and list B?
Difficulty Level: L2
Get all items of list_A and list_B not common to both.
Input:
list_A = [1, 2, 3, 4, 5]
list_B = [4, 5, 6, 7, 8]
Show Solution
sc = spark.sparkContext
# Convert lists to RDD
rdd_A = sc.parallelize(list_A)
rdd_B = sc.parallelize(list_B)
# Perform subtract operation
result_rdd_A = rdd_A.subtract(rdd_B)
result_rdd_B = rdd_B.subtract(rdd_A)
# Union the two RDDs
result_rdd = result_rdd_A.union(result_rdd_B)
# Collect result
result_list = result_rdd.collect()
print(result_list)
#> [1, 2, 3, 8, 6, 7]
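The same symmetric-difference logic can be sanity-checked in plain Python (the RDD version's output order differs because union concatenates partitions):

```python
list_A = [1, 2, 3, 4, 5]
list_B = [4, 5, 6, 7, 8]

# Items in exactly one of the two lists (symmetric difference)
not_common = [x for x in list_A if x not in set(list_B)] + \
             [x for x in list_B if x not in set(list_A)]
print(not_common)  # [1, 2, 3, 6, 7, 8]
```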
6. How to get the minimum, 25th percentile, median, 75th, and max of a numeric column?
Difficulty Level: L2
Compute the minimum, 25th percentile, median, 75th, and maximum of column Age
Input
# Create a sample DataFrame
data = [("A", 10), ("B", 20), ("C", 30), ("D", 40), ("E", 50), ("F", 15), ("G", 28), ("H", 54), ("I", 41), ("J", 86)]
df = spark.createDataFrame(data, ["Name", "Age"])
df.show()
+----+---+
|Name|Age|
+----+---+
| A| 10|
| B| 20|
| C| 30|
| D| 40|
| E| 50|
| F| 15|
| G| 28|
| H| 54|
| I| 41|
| J| 86|
+----+---+
Show Solution
# Calculate percentiles
quantiles = df.approxQuantile("Age", [0.0, 0.25, 0.5, 0.75, 1.0], 0.01)
print("Min: ", quantiles[0])
print("25th percentile: ", quantiles[1])
print("Median: ", quantiles[2])
print("75th percentile: ", quantiles[3])
print("Max: ", quantiles[4])
Min: 10.0
25th percentile: 20.0
Median: 30.0
75th percentile: 50.0
Max: 86.0
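Note that approxQuantile returns approximate order statistics drawn from the actual data values. As a local cross-check, the standard library's statistics.quantiles interpolates between sorted values, so its quartiles differ slightly from Spark's answers on a small sample like this:

```python
import statistics

ages = [10, 20, 30, 40, 50, 15, 28, 54, 41, 86]

# Interpolated quartiles over the sorted data (method="inclusive")
q1, median, q3 = statistics.quantiles(ages, n=4, method="inclusive")
print(min(ages), q1, median, q3, max(ages))
```

The min and max agree with Spark exactly; the interpolated quartiles land between the data points that approxQuantile reports.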
7. How to get frequency counts of unique items of a column?
Difficulty Level: L1
Calculate the frequency counts of each unique value.
Input
from pyspark.sql import Row
# Sample data
data = [
Row(name='John', job='Engineer'),
Row(name='John', job='Engineer'),
Row(name='Mary', job='Scientist'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Scientist'),
Row(name='Sam', job='Doctor'),
]
# create DataFrame
df = spark.createDataFrame(data)
# show DataFrame
df.show()
+----+---------+
|name| job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam| Doctor|
+----+---------+
Show Solution
df.groupBy("job").count().show()
+---------+-----+
| job|count|
+---------+-----+
| Engineer| 4|
|Scientist| 2|
| Doctor| 1|
+---------+-----+
8. How to keep only top 2 most frequent values as it is and replace everything else as ‘Other’?
Difficulty Level: L3
Input
from pyspark.sql import Row
# Sample data
data = [
Row(name='John', job='Engineer'),
Row(name='John', job='Engineer'),
Row(name='Mary', job='Scientist'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Scientist'),
Row(name='Sam', job='Doctor'),
]
# create DataFrame
df = spark.createDataFrame(data)
# show DataFrame
df.show()
+----+---------+
|name| job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam| Doctor|
+----+---------+
Show Solution
from pyspark.sql.functions import col, when
# Get the top 2 most frequent jobs
top_2_jobs = df.groupBy('job').count().orderBy('count', ascending=False).limit(2).select('job').rdd.flatMap(lambda x: x).collect()
# Replace all but the top 2 most frequent jobs with 'Other'
df = df.withColumn('job', when(col('job').isin(top_2_jobs), col('job')).otherwise('Other'))
# show DataFrame
df.show()
+----+---------+
|name| job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam| Other|
+----+---------+
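The top-2 lookup itself is just a frequency count; collections.Counter reproduces the same recoding locally (a sketch on the same job values, assuming ties are broken by first occurrence):

```python
from collections import Counter

jobs = ['Engineer', 'Engineer', 'Scientist', 'Engineer',
        'Engineer', 'Scientist', 'Doctor']

# Keep the 2 most frequent values, replace everything else with 'Other'
top_2 = [job for job, _ in Counter(jobs).most_common(2)]
recoded = [j if j in top_2 else 'Other' for j in jobs]
print(recoded)  # the single 'Doctor' becomes 'Other'
```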
9. How to drop rows with NA values in a particular column?
Difficulty Level: L1
Input
# Assuming df is your DataFrame
df = spark.createDataFrame([
("A", 1, None),
("B", None, "123" ),
("B", 3, "456"),
("D", None, None),
], ["Name", "Value", "id"])
df.show()
+----+-----+----+
|Name|Value| id|
+----+-----+----+
| A| 1|null|
| B| null| 123|
| B| 3| 456|
| D| null|null|
+----+-----+----+
Show Solution
df_2 = df.dropna(subset=['Value'])
df_2.show()
+----+-----+----+
|Name|Value| id|
+----+-----+----+
| A| 1|null|
| B| 3| 456|
+----+-----+----+
10. How to rename columns of a PySpark DataFrame using two lists – one containing the old column names and the other containing the new column names?
Difficulty Level: L2
Input
# suppose you have the following DataFrame
df = spark.createDataFrame([(1, 2, 3), (4, 5, 6)], ["col1", "col2", "col3"])
# old column names
old_names = ["col1", "col2", "col3"]
# new column names
new_names = ["new_col1", "new_col2", "new_col3"]
df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| 2| 3|
| 4| 5| 6|
+----+----+----+
Show Solution
# renaming
for old_name, new_name in zip(old_names, new_names):
    df = df.withColumnRenamed(old_name, new_name)
df.show()
+--------+--------+--------+
|new_col1|new_col2|new_col3|
+--------+--------+--------+
| 1| 2| 3|
| 4| 5| 6|
+--------+--------+--------+
11. How to bin a numeric list to 10 groups of equal size?
Difficulty Level: L2
Input
from pyspark.sql.functions import rand
from pyspark.ml.feature import Bucketizer
# Create a DataFrame with a single column "values" filled with random numbers
num_items = 100
df = spark.range(num_items).select(rand(seed=42).alias("values"))
df.show(5)
+-------------------+
| values|
+-------------------+
| 0.619189370225301|
| 0.5096018842446481|
| 0.8325259388871524|
|0.26322809041172357|
| 0.6702867696264135|
+-------------------+
only showing top 5 rows
Show Solution
# Define the bucket boundaries
num_buckets = 10
quantiles = df.stat.approxQuantile("values", [i/num_buckets for i in range(num_buckets+1)], 0.01)
# Create the Bucketizer
bucketizer = Bucketizer(splits=quantiles, inputCol="values", outputCol="buckets")
# Apply the Bucketizer
df_buck = bucketizer.transform(df)
#Frequency table
df_buck.groupBy("buckets").count().show()
# Show the original and bucketed values
df_buck.show(5)
+-------+-----+
|buckets|count|
+-------+-----+
| 8.0| 10|
| 0.0| 8|
| 7.0| 10|
| 1.0| 10|
| 4.0| 10|
| 3.0| 10|
| 2.0| 10|
| 6.0| 10|
| 5.0| 10|
| 9.0| 12|
+-------+-----+
+-------------------+-------+
| values|buckets|
+-------------------+-------+
| 0.619189370225301| 5.0|
| 0.5096018842446481| 3.0|
| 0.8325259388871524| 8.0|
|0.26322809041172357| 2.0|
| 0.6702867696264135| 6.0|
+-------------------+-------+
only showing top 5 rows
12. How to create a contingency table?
Difficulty Level: L1
Input
# Example DataFrame
data = [("A", "X"), ("A", "Y"), ("A", "X"), ("B", "Y"), ("B", "X"), ("C", "X"), ("C", "X"), ("C", "Y")]
df = spark.createDataFrame(data, ["category1", "category2"])
df.show()
+---------+---------+
|category1|category2|
+---------+---------+
| A| X|
| A| Y|
| A| X|
| B| Y|
| B| X|
| C| X|
| C| X|
| C| Y|
+---------+---------+
Show Solution
# Frequency
df.cube("category1").count().show()
# Contingency table
df.crosstab('category1', 'category2').show()
+---------+-----+
|category1|count|
+---------+-----+
| null| 8|
| A| 3|
| B| 2|
| C| 3|
+---------+-----+
+-------------------+---+---+
|category1_category2| X| Y|
+-------------------+---+---+
| A| 2| 1|
| B| 1| 1|
| C| 2| 1|
+-------------------+---+---+
13. How to find the numbers that are multiples of 3 from a column?
Difficulty Level: L2
Input
from pyspark.sql.functions import rand
# Generate a DataFrame with a single column "id" with 10 rows
df = spark.range(10)
# Generate a random float between 0 and 1, scale and shift it to get a random integer between 1 and 10
df = df.withColumn("random", ((rand(seed=42) * 10) + 1).cast("int"))
# Show the DataFrame
df.show()
+---+------+
| id|random|
+---+------+
| 0| 7|
| 1| 6|
| 2| 9|
| 3| 7|
| 4| 3|
| 5| 8|
| 6| 9|
| 7| 8|
| 8| 3|
| 9| 8|
+---+------+
Show Solution
from pyspark.sql.functions import col, when
# Assuming df is your DataFrame and "your_column" is the column with the numbers
df = df.withColumn("is_multiple_of_3", when(col("random") % 3 == 0, 1).otherwise(0))
df.show()
+---+------+----------------+
| id|random|is_multiple_of_3|
+---+------+----------------+
| 0| 7| 0|
| 1| 6| 1|
| 2| 9| 1|
| 3| 7| 0|
| 4| 3| 1|
| 5| 8| 0|
| 6| 9| 1|
| 7| 8| 0|
| 8| 3| 1|
| 9| 8| 0|
+---+------+----------------+
14. How to extract items at given positions from a column?
Difficulty Level: L2
Input
from pyspark.sql.functions import rand
# Generate a DataFrame with a single column "id" with 10 rows
df = spark.range(10)
# Generate a random float between 0 and 1, scale and shift it to get a random integer between 1 and 10
df = df.withColumn("random", ((rand(seed=42) * 10) + 1).cast("int"))
# Show the DataFrame
df.show()
pos = [0, 4, 8, 5]
+---+------+
| id|random|
+---+------+
| 0| 7|
| 1| 6|
| 2| 9|
| 3| 7|
| 4| 3|
| 5| 8|
| 6| 9|
| 7| 8|
| 8| 3|
| 9| 8|
+---+------+
Show Solution
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id
pos = [0, 4, 8, 5]
# Define window specification
w = Window.orderBy(monotonically_increasing_id())
# Add index
df = df.withColumn("index", row_number().over(w) - 1)
df.show()
# Filter the DataFrame based on the specified positions
df_filtered = df.filter(df.index.isin(pos))
df_filtered.show()
+---+------+-----+
| id|random|index|
+---+------+-----+
| 0| 7| 0|
| 1| 6| 1|
| 2| 9| 2|
| 3| 7| 3|
| 4| 3| 4|
| 5| 8| 5|
| 6| 9| 6|
| 7| 8| 7|
| 8| 3| 8|
| 9| 8| 9|
+---+------+-----+
+---+------+-----+
| id|random|index|
+---+------+-----+
| 0| 7| 0|
| 4| 3| 4|
| 5| 8| 5|
| 8| 3| 8|
+---+------+-----+
15. How to stack two DataFrames vertically?
Difficulty Level: L1
Input
# Create DataFrame for region A
df_A = spark.createDataFrame([("apple", 3, 5), ("banana", 1, 10), ("orange", 2, 8)], ["Name", "Col_1", "Col_2"])
df_A.show()
# Create DataFrame for region B
df_B = spark.createDataFrame([("apple", 3, 5), ("banana", 1, 15), ("grape", 4, 6)], ["Name", "Col_1", "Col_3"])
df_B.show()
+------+-----+-----+
| Name|Col_1|Col_2|
+------+-----+-----+
| apple| 3| 5|
|banana| 1| 10|
|orange| 2| 8|
+------+-----+-----+
+------+-----+-----+
| Name|Col_1|Col_3|
+------+-----+-----+
| apple| 3| 5|
|banana| 1| 15|
| grape| 4| 6|
+------+-----+-----+
Show Solution
# Note: union() matches columns by position, not by name,
# so df_B's Col_3 values land under df_A's Col_2 header.
df_A.union(df_B).show()
+------+-----+-----+
| Name|Col_1|Col_2|
+------+-----+-----+
| apple| 3| 5|
|banana| 1| 10|
|orange| 2| 8|
| apple| 3| 5|
|banana| 1| 15|
| grape| 4| 6|
+------+-----+-----+
16. How to compute the mean squared error on a truth and predicted columns?
Difficulty Level: L2
Input
# Assume you have a DataFrame df with two columns "actual" and "predicted"
# For the sake of example, we'll create a sample DataFrame
data = [(1, 1), (2, 4), (3, 9), (4, 16), (5, 25)]
df = spark.createDataFrame(data, ["actual", "predicted"])
df.show()
+------+---------+
|actual|predicted|
+------+---------+
| 1| 1|
| 2| 4|
| 3| 9|
| 4| 16|
| 5| 25|
+------+---------+
Show Solution
from pyspark.sql.functions import col, pow
# Calculate the squared differences
df = df.withColumn("squared_error", pow((col("actual") - col("predicted")), 2))
# Calculate the mean squared error
mse = df.agg({"squared_error": "avg"}).collect()[0][0]
print(f"Mean Squared Error (MSE) = {mse}")
Mean Squared Error (MSE) = 116.8
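The arithmetic can be verified without Spark; MSE is just the average of the squared residuals:

```python
actual = [1, 2, 3, 4, 5]
predicted = [1, 4, 9, 16, 25]

# Mean of squared differences: (0 + 4 + 36 + 144 + 400) / 5
mse = sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual)
print(mse)  # 116.8
```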
17. How to convert the first character of each element in a series to uppercase?
Difficulty Level: L1
# Suppose you have the following DataFrame
data = [("john",), ("alice",), ("bob",)]
df = spark.createDataFrame(data, ["name"])
df.show()
+-----+
| name|
+-----+
| john|
|alice|
| bob|
+-----+
Show Solution
from pyspark.sql.functions import initcap
# Convert the first character to uppercase
df = df.withColumn("name", initcap(df["name"]))
df.show()
+-----+
| name|
+-----+
| John|
|Alice|
| Bob|
+-----+
18. How to compute summary statistics for all columns in a dataframe?
Difficulty Level: L1
# For the sake of example, we'll create a sample DataFrame
data = [('James', 34, 55000),
('Michael', 30, 70000),
('Robert', 37, 60000),
('Maria', 29, 80000),
('Jen', 32, 65000)]
df = spark.createDataFrame(data, ["name", "age" , "salary"])
df.show()
+-------+---+------+
| name|age|salary|
+-------+---+------+
| James| 34| 55000|
|Michael| 30| 70000|
| Robert| 37| 60000|
| Maria| 29| 80000|
| Jen| 32| 65000|
+-------+---+------+
Show Solution
# Summary statistics
summary = df.summary()
# Show the summary statistics
summary.show()
+-------+------+------------------+-----------------+
|summary| name| age| salary|
+-------+------+------------------+-----------------+
| count| 5| 5| 5|
| mean| null| 32.4| 66000.0|
| stddev| null|3.2093613071762417|9617.692030835673|
| min| James| 29| 55000|
| 25%| null| 30| 60000|
| 50%| null| 32| 65000|
| 75%| null| 34| 70000|
| max|Robert| 37| 80000|
+-------+------+------------------+-----------------+
19. How to calculate the number of characters in each word in a column?
Difficulty Level: L1
# Suppose you have the following DataFrame
data = [("john",), ("alice",), ("bob",)]
df = spark.createDataFrame(data, ["name"])
df.show()
+-----+
| name|
+-----+
| john|
|alice|
| bob|
+-----+
Show Solution
from pyspark.sql import functions as F
df = df.withColumn('word_length', F.length(df.name))
df.show()
+-----+-----------+
| name|word_length|
+-----+-----------+
| john| 4|
|alice| 5|
| bob| 3|
+-----+-----------+
20. How to compute the difference between consecutive values of a column?
Difficulty Level: L2
# For the sake of example, we'll create a sample DataFrame
data = [('James', 34, 55000),
('Michael', 30, 70000),
('Robert', 37, 60000),
('Maria', 29, 80000),
('Jen', 32, 65000)]
df = spark.createDataFrame(data, ["name", "age" , "salary"])
df.show()
+-------+---+------+
| name|age|salary|
+-------+---+------+
| James| 34| 55000|
|Michael| 30| 70000|
| Robert| 37| 60000|
| Maria| 29| 80000|
| Jen| 32| 65000|
+-------+---+------+
Show Solution
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Define window specification
df = df.withColumn("id", F.monotonically_increasing_id())
window = Window.orderBy("id")
# Generate the lag of the variable
df = df.withColumn("prev_value", F.lag(df.salary).over(window))
# Compute the difference with lag
df = df.withColumn("diff", F.when(F.isnull(df.salary - df.prev_value), 0)
.otherwise(df.salary - df.prev_value)).drop("id")
df.show()
+-------+---+------+----------+------+
| name|age|salary|prev_value| diff|
+-------+---+------+----------+------+
| James| 34| 55000| null| 0|
|Michael| 30| 70000| 55000| 15000|
| Robert| 37| 60000| 70000|-10000|
| Maria| 29| 80000| 60000| 20000|
| Jen| 32| 65000| 80000|-15000|
+-------+---+------+----------+------+
21. How to get the day of month, week number, day of year, and day of week from date strings?
Difficulty Level: L2
# example data
data = [("2023-05-18","01 Jan 2010",), ("2023-12-31", "01 Jan 2010",)]
df = spark.createDataFrame(data, ["date_str_1", "date_str_2"])
df.show()
+----------+-----------+
|date_str_1| date_str_2|
+----------+-----------+
|2023-05-18|01 Jan 2010|
|2023-12-31|01 Jan 2010|
+----------+-----------+
Show Solution
from pyspark.sql.functions import to_date, dayofmonth, weekofyear, dayofyear, dayofweek
# Convert date string to date format
df = df.withColumn("date_1", to_date(df.date_str_1, 'yyyy-MM-dd'))
df = df.withColumn("date_2", to_date(df.date_str_2, 'dd MMM yyyy'))
df = df.withColumn("day_of_month", dayofmonth(df.date_1))\
.withColumn("week_number", weekofyear(df.date_1))\
.withColumn("day_of_year", dayofyear(df.date_1))\
.withColumn("day_of_week", dayofweek(df.date_1))
df.show()
+----------+-----------+----------+----------+------------+-----------+-----------+-----------+
|date_str_1| date_str_2| date_1| date_2|day_of_month|week_number|day_of_year|day_of_week|
+----------+-----------+----------+----------+------------+-----------+-----------+-----------+
|2023-05-18|01 Jan 2010|2023-05-18|2010-01-01| 18| 20| 138| 5|
|2023-12-31|01 Jan 2010|2023-12-31|2010-01-01| 31| 52| 365| 1|
+----------+-----------+----------+----------+------------+-----------+-----------+-----------+
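The same date parts can be cross-checked with the standard library. One caveat: Python's weekday() counts Monday as 0, while Spark's dayofweek counts Sunday as 1, so a small remapping is needed:

```python
from datetime import date

d = date(2023, 5, 18)

day_of_month = d.day
week_number = d.isocalendar()[1]         # ISO week, matching weekofyear
day_of_year = d.timetuple().tm_yday
day_of_week = (d.weekday() + 1) % 7 + 1  # remap to Spark's 1=Sunday ... 7=Saturday

print(day_of_month, week_number, day_of_year, day_of_week)  # 18 20 138 5
```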
22. How to convert year-month string to dates corresponding to the 4th day of the month?
Difficulty Level: L2
# example dataframe
df = spark.createDataFrame([('Jan 2010',), ('Feb 2011',), ('Mar 2012',)], ['MonthYear'])
df.show()
+---------+
|MonthYear|
+---------+
| Jan 2010|
| Feb 2011|
| Mar 2012|
+---------+
Show Solution
from pyspark.sql.functions import expr, col
# convert YearMonth to date (default to first day of the month)
df = df.withColumn('Date', expr("to_date(MonthYear, 'MMM yyyy')"))
df.show()
# replace day with 4
df = df.withColumn('Date', expr("date_add(date_sub(Date, day(Date) - 1), 3)"))
df.show()
+---------+----------+
|MonthYear| Date|
+---------+----------+
| Jan 2010|2010-01-01|
| Feb 2011|2011-02-01|
| Mar 2012|2012-03-01|
+---------+----------+
+---------+----------+
|MonthYear| Date|
+---------+----------+
| Jan 2010|2010-01-04|
| Feb 2011|2011-02-04|
| Mar 2012|2012-03-04|
+---------+----------+
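A plain-Python equivalent: strptime parses these month-year strings (format code %b %Y) with the day defaulting to 1, so replace(day=4) finishes the job:

```python
from datetime import datetime

month_years = ['Jan 2010', 'Feb 2011', 'Mar 2012']

# Parse month-year (day defaults to 1), then pin the day to 4
dates = [datetime.strptime(my, '%b %Y').replace(day=4).date()
         for my in month_years]
print([d.isoformat() for d in dates])  # ['2010-01-04', '2011-02-04', '2012-03-04']
```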
23. How to filter words that contain at least 2 vowels from a series?
Difficulty Level: L3
# example dataframe
df = spark.createDataFrame([('Apple',), ('Orange',), ('Plan',) , ('Python',) , ('Money',)], ['Word'])
df.show()
+------+
| Word|
+------+
| Apple|
|Orange|
| Plan|
|Python|
| Money|
+------+
Show Solution
from pyspark.sql.functions import col, length, translate
# Filter words that contain at least 2 vowels
df_filtered = df.where((length(col('Word')) - length(translate(col('Word'), 'AEIOUaeiou', ''))) >= 2)
df_filtered.show()
+------+
| Word|
+------+
| Apple|
|Orange|
| Money|
+------+
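The length-minus-translate trick is equivalent to counting vowels directly, which is easy to verify locally:

```python
words = ['Apple', 'Orange', 'Plan', 'Python', 'Money']

def vowel_count(word):
    """Count characters that are vowels (either case)."""
    return sum(ch in 'AEIOUaeiou' for ch in word)

filtered = [w for w in words if vowel_count(w) >= 2]
print(filtered)  # ['Apple', 'Orange', 'Money']
```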
24. How to filter valid emails from a list?
Difficulty Level: L3
# Create a list
data = ['buying books at amazom.com', '[email protected]', '[email protected]', '[email protected]']
# Convert the list to DataFrame
df = spark.createDataFrame(data, "string")
df.show(truncate=False)
+--------------------------+
|value |
+--------------------------+
|buying books at amazom.com|
|[email protected] |
|[email protected] |
|[email protected] |
+--------------------------+
Show Solution
from pyspark.sql import functions as F
# Define a regular expression pattern for emails
pattern = r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
# Apply filter operation to keep only valid emails
df_filtered = df.filter(F.col("value").rlike(pattern))
# Show the DataFrame
df_filtered.show()
+-----------------+
| value|
+-----------------+
|[email protected]|
| [email protected]|
|[email protected]|
+-----------------+
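The same pattern works with Python's re module, since rlike in Spark and re.match here apply an identical regex. A quick local check on hypothetical strings (the exercise's actual addresses are redacted):

```python
import re

pattern = re.compile(r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$')

# Hypothetical sample strings
candidates = ['buying books at example.com', 'alice@example.com',
              'not an email', 'bob@test.org']
valid = [s for s in candidates if pattern.match(s)]
print(valid)  # ['alice@example.com', 'bob@test.org']
```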
25. How to pivot a PySpark DataFrame?
Difficulty Level: L3
Convert the region categories to columns and sum the revenue.
# Sample data
data = [
(2021, 1, "US", 5000),
(2021, 1, "EU", 4000),
(2021, 2, "US", 5500),
(2021, 2, "EU", 4500),
(2021, 3, "US", 6000),
(2021, 3, "EU", 5000),
(2021, 4, "US", 7000),
(2021, 4, "EU", 6000),
]
# Create DataFrame
columns = ["year", "quarter", "region", "revenue"]
df = spark.createDataFrame(data, columns)
df.show()
+----+-------+------+-------+
|year|quarter|region|revenue|
+----+-------+------+-------+
|2021| 1| US| 5000|
|2021| 1| EU| 4000|
|2021| 2| US| 5500|
|2021| 2| EU| 4500|
|2021| 3| US| 6000|
|2021| 3| EU| 5000|
|2021| 4| US| 7000|
|2021| 4| EU| 6000|
+----+-------+------+-------+
Show Solution
# Execute the pivot operation
pivot_df = df.groupBy("year", "quarter").pivot("region").sum("revenue")
pivot_df.show()
+----+-------+----+----+
|year|quarter| EU| US|
+----+-------+----+----+
|2021| 2|4500|5500|
|2021| 1|4000|5000|
|2021| 3|5000|6000|
|2021| 4|6000|7000|
+----+-------+----+----+
26. How to get the mean of a variable grouped by another variable?
Difficulty Level: L3
# Sample data
data = [("1001", "Laptop", 1000),
("1002", "Mouse", 50),
("1003", "Laptop", 1200),
("1004", "Mouse", 30),
("1005", "Smartphone", 700)]
# Create DataFrame
columns = ["OrderID", "Product", "Price"]
df = spark.createDataFrame(data, columns)
df.show()
+-------+----------+-----+
|OrderID| Product|Price|
+-------+----------+-----+
| 1001| Laptop| 1000|
| 1002| Mouse| 50|
| 1003| Laptop| 1200|
| 1004| Mouse| 30|
| 1005|Smartphone| 700|
+-------+----------+-----+
Show Solution
from pyspark.sql.functions import mean
# GroupBy and aggregate
result = df.groupBy("Product").agg(mean("Price").alias("Total_Sales"))
# Show results
result.show()
+----------+-----------+
| Product|Total_Sales|
+----------+-----------+
| Laptop| 1100.0|
| Mouse| 40.0|
|Smartphone| 700.0|
+----------+-----------+
27. How to compute the Euclidean distance between two columns?
Difficulty Level: L3
Compute the Euclidean distance between the series (points) p and q, without using a packaged formula.
# Define your series
data = [(1, 10), (2, 9), (3, 8), (4, 7), (5, 6), (6, 5), (7, 4), (8, 3), (9, 2), (10, 1)]
# Convert list to DataFrame
df = spark.createDataFrame(data, ["series1", "series2"])
df.show()
+-------+-------+
|series1|series2|
+-------+-------+
| 1| 10|
| 2| 9|
| 3| 8|
| 4| 7|
| 5| 6|
| 6| 5|
| 7| 4|
| 8| 3|
| 9| 2|
| 10| 1|
+-------+-------+
Show Solution
from pyspark.sql.functions import expr
# Calculate squared differences
df = df.withColumn("squared_diff", expr("POW(series1 - series2, 2)"))
# Sum squared differences and take square root
df.agg(expr("SQRT(SUM(squared_diff))").alias("euclidean_distance")).show()
+------------------+
|euclidean_distance|
+------------------+
| 18.16590212458495|
+------------------+
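The answer is the square root of the sum of squared differences, verifiable with the math module:

```python
import math

p = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
q = [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]

# Euclidean distance = sqrt(sum of squared component differences) = sqrt(330)
dist = math.sqrt(sum((a - b) ** 2 for a, b in zip(p, q)))
print(dist)  # 18.16590212458495
```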
28. How to replace spaces in a string with the least frequent character?
Difficulty Level: L3
Replace the spaces in the string with its least frequent character.
#Sample DataFrame
df = spark.createDataFrame([('dbc deb abed gade',),], ["string"])
df.show()
+-----------------+
| string|
+-----------------+
|dbc deb abed gade|
+-----------------+
Desired output
+-----------------+-----------------+
| string| modified_string|
+-----------------+-----------------+
|dbc deb abed gade|dbccdebcabedcgade|
+-----------------+-----------------+
Show Solution
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from collections import Counter
def least_freq_char_replace_spaces(s):
    counter = Counter(s.replace(" ", ""))
    least_freq_char = min(counter, key=counter.get)
    return s.replace(' ', least_freq_char)
udf_least_freq_char_replace_spaces = udf(least_freq_char_replace_spaces, StringType())
df.withColumn('modified_string', udf_least_freq_char_replace_spaces(df['string'])).show()
+-----------------+-----------------+
| string| modified_string|
+-----------------+-----------------+
|dbc deb abed gade|dbccdebcabedcgade|
+-----------------+-----------------+
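The UDF's core logic runs fine outside Spark, which makes it easy to test before registering it (the tie between 'c' and 'g', both appearing once, is broken by first occurrence, since min keeps the first key with the smallest count):

```python
from collections import Counter

s = 'dbc deb abed gade'

# Count characters ignoring spaces, then pick the least frequent one
counts = Counter(s.replace(' ', ''))
least = min(counts, key=counts.get)
replaced = s.replace(' ', least)
print(least, replaced)  # c dbccdebcabedcgade
```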
29. How to create a time series of 10 weekends (Saturdays) starting '2000-01-01', with random numbers as values?
Difficulty Level: L3
Desired output (values can be random):
+----------+--------------+
| date|random_numbers|
+----------+--------------+
|2000-01-01| 8|
|2000-01-08| 3|
|2000-01-15| 8|
|2000-01-22| 5|
|2000-01-29| 4|
|2000-02-05| 6|
|2000-02-12| 8|
|2000-02-19| 1|
|2000-02-26| 9|
|2000-03-04| 3|
+----------+--------------+
Show Solution
from pyspark.sql.functions import expr, explode, sequence, rand
# Start date and end date (start + 10 weekends)
start_date = '2000-01-01'
end_date = '2000-03-04' # Calculated manually: 10 weekends (Saturdays) from start date
# Create a DataFrame with one row containing a sequence from start_date to end_date with a 1 day step
df = spark.range(1).select(
explode(
sequence(
expr(f"date '{start_date}'"),
expr(f"date '{end_date}'"),
expr("interval 1 day")
)
).alias("date")
)
# Filter out the weekdays (retain weekends)
df = df.filter(expr("dayofweek(date) = 7")) # 7 corresponds to Saturday in Spark
# Add the random numbers column
#df = df.withColumn("random_numbers", rand()*10)
df = df.withColumn("random_numbers", ((rand(seed=42) * 10) + 1).cast("int"))
# Show the DataFrame
df.show()
+----------+--------------+
| date|random_numbers|
+----------+--------------+
|2000-01-01| 8|
|2000-01-08| 3|
|2000-01-15| 8|
|2000-01-22| 5|
|2000-01-29| 4|
|2000-02-05| 6|
|2000-02-12| 8|
|2000-02-19| 1|
|2000-02-26| 9|
|2000-03-04| 3|
+----------+--------------+
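Because 2000-01-01 happens to be a Saturday, the ten dates can also be generated directly with timedelta steps of one week, which avoids building and filtering a daily sequence:

```python
from datetime import date, timedelta

start = date(2000, 1, 1)  # a Saturday
saturdays = [start + timedelta(weeks=i) for i in range(10)]
print([d.isoformat() for d in saturdays])  # 2000-01-01 through 2000-03-04
```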
30. How to get the nrows, ncolumns, datatype of a dataframe?
Difficulty Level: L1
Get the number of rows, columns, and the datatype of each column of the Churn_Modelling dataset.
from pyspark import SparkFiles
url = "https://raw.githubusercontent.com/selva86/datasets/master/Churn_Modelling.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("Churn_Modelling.csv"), header=True, inferSchema=True)
df.show(5, truncate=False)
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId|Surname |CreditScore|Geography|Gender|Age|Tenure|Balance |NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|1 |15634602 |Hargrave|619 |France |Female|42 |2 |0.0 |1 |1 |1 |101348.88 |1 |
|2 |15647311 |Hill |608 |Spain |Female|41 |1 |83807.86 |1 |0 |1 |112542.58 |0 |
|3 |15619304 |Onio |502 |France |Female|42 |8 |159660.8 |3 |1 |0 |113931.57 |1 |
|4 |15701354 |Boni |699 |France |Female|39 |1 |0.0 |2 |0 |0 |93826.63 |0 |
|5 |15737888 |Mitchell|850 |Spain |Female|43 |2 |125510.82|1 |1 |1 |79084.1 |0 |
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
only showing top 5 rows
Show Solution
# For number of rows
nrows = df.count()
print("Number of Rows: ", nrows)
# For number of columns
ncols = len(df.columns)
print("Number of Columns: ", ncols)
# For data types of each column
datatypes = df.dtypes
print("Data types: ", datatypes)
Number of Rows: 10000
Number of Columns: 14
Data types: [('RowNumber', 'int'), ('CustomerId', 'int'), ('Surname', 'string'), ('CreditScore', 'int'), ('Geography', 'string'), ('Gender', 'string'), ('Age', 'int'), ('Tenure', 'int'), ('Balance', 'double'), ('NumOfProducts', 'int'), ('HasCrCard', 'int'), ('IsActiveMember', 'int'), ('EstimatedSalary', 'double'), ('Exited', 'int')]
31. How to rename a specific columns in a dataframe?
Difficulty Level: L2
Input
# Suppose you have the following DataFrame
df = spark.createDataFrame([('Alice', 1, 30),('Bob', 2, 35)], ["name", "age", "qty"])
df.show()
# Rename lists for specific columns
old_names = ["qty", "age"]
new_names = ["user_qty", "user_age"]
+-----+---+---+
| name|age|qty|
+-----+---+---+
|Alice| 1| 30|
| Bob| 2| 35|
+-----+---+---+
Show Solution
old_names = ["qty", "age"]
new_names = ["user_qty", "user_age"]
# You can then rename the columns like this:
for old_name, new_name in zip(old_names, new_names):
    df = df.withColumnRenamed(old_name, new_name)
df.show()
+-----+--------+--------+
| name|user_age|user_qty|
+-----+--------+--------+
|Alice| 1| 30|
| Bob| 2| 35|
+-----+--------+--------+
32. How to check if a dataframe has any missing values and count of missing values in each column?
Difficulty Level: L2
Input
# Assuming df is your DataFrame
df = spark.createDataFrame([
("A", 1, None),
("B", None, "123" ),
("B", 3, "456"),
("D", None, None),
], ["Name", "Value", "id"])
df.show()
+----+-----+----+
|Name|Value| id|
+----+-----+----+
| A| 1|null|
| B| null| 123|
| B| 3| 456|
| D| null|null|
+----+-----+----+
Show Solution
from pyspark.sql.functions import col, sum
missing = df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns))
missing_count = missing.collect()[0].asDict()
has_missing = any(v > 0 for v in missing_count.values())
print(has_missing)
print(missing_count)
True
{'Name': 0, 'Value': 2, 'id': 2}
33. How to replace missing values of multiple numeric columns with the mean?
Difficulty Level: L2
Input
df = spark.createDataFrame([
("A", 1, None),
("B", None, 123 ),
("B", 3, 456),
("D", 6, None),
], ["Name", "var1", "var2"])
df.show()
+----+----+----+
|Name|var1|var2|
+----+----+----+
| A| 1|null|
| B|null| 123|
| B| 3| 456|
| D| 6|null|
+----+----+----+
Show Solution
from pyspark.ml.feature import Imputer
column_names = ["var1", "var2"]
# Initialize the Imputer
imputer = Imputer(inputCols= column_names, outputCols= column_names, strategy="mean")
# Fit the Imputer
model = imputer.fit(df)
#Transform the dataset
imputed_df = model.transform(df)
imputed_df.show(5)
+----+----+----+
|Name|var1|var2|
+----+----+----+
| A| 1| 289|
| B| 3| 123|
| B| 3| 456|
| D| 6| 289|
+----+----+----+
34. How to change the order of columns of a dataframe?
Difficulty Level: L1
Input
# Sample data
data = [("John", "Doe", 30), ("Jane", "Doe", 25), ("Alice", "Smith", 22)]
# Create DataFrame from the data
df = spark.createDataFrame(data, ["First_Name", "Last_Name", "Age"])
# Show the DataFrame
df.show()
+----------+---------+---+
|First_Name|Last_Name|Age|
+----------+---------+---+
| John| Doe| 30|
| Jane| Doe| 25|
| Alice| Smith| 22|
+----------+---------+---+
Show Solution
new_order = ["Age", "First_Name", "Last_Name"]
# Reorder the columns
df = df.select(*new_order)
# Show the DataFrame with reordered columns
df.show()
+---+----------+---------+
|Age|First_Name|Last_Name|
+---+----------+---------+
| 30| John| Doe|
| 25| Jane| Doe|
| 22| Alice| Smith|
+---+----------+---------+
35. How to format or suppress scientific notation in a PySpark DataFrame?
# Assuming you have a DataFrame df and the column you want to format is 'your_column'
df = spark.createDataFrame([(1, 0.000000123), (2, 0.000023456), (3, 0.000345678)], ["id", "your_column"])
df.show()
+---+-----------+
| id|your_column|
+---+-----------+
| 1| 1.23E-7|
| 2| 2.3456E-5|
| 3| 3.45678E-4|
+---+-----------+
Show Solution
from pyspark.sql.functions import format_number
# Determine the number of decimal places you want
decimal_places = 10
df = df.withColumn("your_column", format_number("your_column", decimal_places))
df.show()
+---+------------+
| id| your_column|
+---+------------+
| 1|0.0000001230|
| 2|0.0000234560|
| 3|0.0003456780|
+---+------------+
36. How to format all the values in a dataframe as percentages?
Difficulty Level: L2
Input
# Sample data
data = [(0.1, .08), (0.2, .06), (0.33, .02)]
df = spark.createDataFrame(data, ["numbers_1", "numbers_2"])
df.show()
+---------+---------+
|numbers_1|numbers_2|
+---------+---------+
| 0.1| 0.08|
| 0.2| 0.06|
| 0.33| 0.02|
+---------+---------+
Show Solution
from pyspark.sql.functions import concat, col, lit
columns = ["numbers_1", "numbers_2"]
for col_name in columns:
    df = df.withColumn(col_name, concat((col(col_name) * 100).cast('decimal(10, 2)'), lit("%")))
df.show()
+---------+---------+
|numbers_1|numbers_2|
+---------+---------+
| 10.00%| 8.00%|
| 20.00%| 6.00%|
| 33.00%| 2.00%|
+---------+---------+
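The same formatting can be sketched in plain Python; the f-string below plays the role of the decimal cast plus concat (data mirrors the input above):

```python
data = [(0.1, 0.08), (0.2, 0.06), (0.33, 0.02)]

# Mirror concat((col * 100).cast('decimal(10, 2)'), lit('%'))
formatted = [tuple(f"{v * 100:.2f}%" for v in row) for row in data]
print(formatted)  # [('10.00%', '8.00%'), ('20.00%', '6.00%'), ('33.00%', '2.00%')]
```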
37. How to filter every nth row in a dataframe?
Difficulty Level: L2
Input
# Sample data
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Dave", 4), ("Eve", 5),
("Frank", 6), ("Grace", 7), ("Hannah", 8), ("Igor", 9), ("Jack", 10)]
# Create DataFrame
df = spark.createDataFrame(data, ["Name", "Number"])
df.show()
+-------+------+
| Name|Number|
+-------+------+
| Alice| 1|
| Bob| 2|
|Charlie| 3|
| Dave| 4|
| Eve| 5|
| Frank| 6|
| Grace| 7|
| Hannah| 8|
| Igor| 9|
| Jack| 10|
+-------+------+
Show Solution
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id
# Define window
window = Window.orderBy(monotonically_increasing_id())
# Add row_number to DataFrame
df = df.withColumn("rn", row_number().over(window))
n = 5 # filter every 5th row
# Filter every nth row
df = df.filter((df.rn % n) == 0)
df.show()
+----+------+---+
|Name|Number| rn|
+----+------+---+
| Eve| 5| 5|
|Jack| 10| 10|
+----+------+---+
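The row-number filter reduces to a simple modulus test, sketched here in plain Python with the same names as the input:

```python
names = ["Alice", "Bob", "Charlie", "Dave", "Eve",
         "Frank", "Grace", "Hannah", "Igor", "Jack"]
n = 5

# Keep rows whose 1-based row number is a multiple of n
every_nth = [name for rn, name in enumerate(names, start=1) if rn % n == 0]
print(every_nth)  # ['Eve', 'Jack']
```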
38. How to get the row number of the nth largest value in a column?
Difficulty Level: L2
Input
from pyspark.sql import Row
# Sample Data
data = [
Row(id=1, column1=5),
Row(id=2, column1=8),
Row(id=3, column1=12),
Row(id=4, column1=1),
Row(id=5, column1=15),
Row(id=6, column1=7),
]
df = spark.createDataFrame(data)
df.show()
+---+-------+
| id|column1|
+---+-------+
| 1| 5|
| 2| 8|
| 3| 12|
| 4| 1|
| 5| 15|
| 6| 7|
+---+-------+
Show Solution
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, row_number
window = Window.orderBy(desc("column1"))
df = df.withColumn("row_number", row_number().over(window))
n = 3 # We're interested in the 3rd largest value.
row = df.filter(df.row_number == n).first()
if row:
    print("Row number:", row.row_number)
    print("Column value:", row.column1)
Row number: 3
Column value: 8
39. How to get the last n rows of a dataframe with row sum > 100?
Difficulty Level: L2
Input
# Sample data
data = [(10, 25, 70),
(40, 5, 20),
(70, 80, 100),
(10, 2, 60),
(40, 50, 20)]
# Create DataFrame
df = spark.createDataFrame(data, ["col1", "col2", "col3"])
# Display original DataFrame
df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 10| 25| 70|
| 40| 5| 20|
| 70| 80| 100|
| 10| 2| 60|
| 40| 50| 20|
+----+----+----+
Show Solution
from pyspark.sql import functions as F
from functools import reduce
# Add 'row_sum' column
df = df.withColumn('row_sum', reduce(lambda a, b: a+b, [F.col(x) for x in df.columns]))
# Display DataFrame with 'row_sum'
df.show()
# Filter rows where 'row_sum' > 100
df = df.filter(F.col('row_sum') > 100)
# Display filtered DataFrame
df.show()
# Add 'id' column
df = df.withColumn('id', F.monotonically_increasing_id())
# Get the last 2 rows
df_last_2 = df.sort(F.desc('id')).limit(2)
# Display the last 2 rows
df_last_2.show()
+----+----+----+-------+
|col1|col2|col3|row_sum|
+----+----+----+-------+
| 10| 25| 70| 105|
| 40| 5| 20| 65|
| 70| 80| 100| 250|
| 10| 2| 60| 72|
| 40| 50| 20| 110|
+----+----+----+-------+
+----+----+----+-------+
|col1|col2|col3|row_sum|
+----+----+----+-------+
| 10| 25| 70| 105|
| 70| 80| 100| 250|
| 40| 50| 20| 110|
+----+----+----+-------+
+----+----+----+-------+-----------+
|col1|col2|col3|row_sum| id|
+----+----+----+-------+-----------+
| 40| 50| 20| 110|25769803776|
| 70| 80| 100| 250|17179869184|
+----+----+----+-------+-----------+
40. How to create a column containing the minimum by maximum of each row?
Difficulty Level: L2
Input
# Sample Data
data = [(1, 2, 3), (4, 5, 6), (7, 8, 9), (10, 11, 12)]
# Create DataFrame
df = spark.createDataFrame(data, ["col1", "col2", "col3"])
df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| 2| 3|
| 4| 5| 6|
| 7| 8| 9|
| 10| 11| 12|
+----+----+----+
Show Solution
from pyspark.sql.functions import udf, array
from pyspark.sql.types import FloatType
# Define UDF
def min_max_ratio(row):
    return float(min(row)) / max(row)
min_max_ratio_udf = udf(min_max_ratio, FloatType())
# Apply UDF to create new column
df = df.withColumn('min_by_max', min_max_ratio_udf(array(df.columns)))
df.show()
+----+----+----+----------+
|col1|col2|col3|min_by_max|
+----+----+----+----------+
| 1| 2| 3|0.33333334|
| 4| 5| 6| 0.6666667|
| 7| 8| 9| 0.7777778|
| 10| 11| 12| 0.8333333|
+----+----+----+----------+
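The per-row computation inside the UDF is just min over max; a stdlib sketch with the same rows:

```python
rows = [(1, 2, 3), (4, 5, 6), (7, 8, 9), (10, 11, 12)]

# min(row) / max(row), the computation the UDF performs for each row
ratios = [min(r) / max(r) for r in rows]
print(ratios)
```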
41. How to create a column that contains the penultimate value in each row?
Difficulty Level: L2
Create a new column ‘penultimate’ which has the second largest value of each row of df
Input
data = [(10, 20, 30),
(40, 60, 50),
(80, 70, 90)]
df = spark.createDataFrame(data, ["Column1", "Column2", "Column3"])
df.show()
+-------+-------+-------+
|Column1|Column2|Column3|
+-------+-------+-------+
| 10| 20| 30|
| 40| 60| 50|
| 80| 70| 90|
+-------+-------+-------+
Show Solution
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, IntegerType
# Define UDF to sort an array in descending order
sort_array_desc = F.udf(lambda arr: sorted(arr, reverse=True), ArrayType(IntegerType()))
# Create an array from the columns, sort it in descending order and take the second element
df = df.withColumn("row_as_array", sort_array_desc(F.array(df.columns)))
df = df.withColumn("Penultimate", df['row_as_array'].getItem(1))
df = df.drop('row_as_array')
df.show()
+-------+-------+-------+-----------+
|Column1|Column2|Column3|Penultimate|
+-------+-------+-------+-----------+
| 10| 20| 30| 20|
| 40| 60| 50| 50|
| 80| 70| 90| 80|
+-------+-------+-------+-----------+
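The logic inside the UDF can be checked in plain Python: sort each row descending and take index 1 (data mirrors the input above):

```python
rows = [(10, 20, 30), (40, 60, 50), (80, 70, 90)]

# Second largest value per row: sort descending, take index 1
penultimate = [sorted(r, reverse=True)[1] for r in rows]
print(penultimate)  # [20, 50, 80]
```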
42. How to normalize all columns in a dataframe?
Difficulty Level: L2
Normalize all columns of df by subtracting the column mean and dividing by the standard deviation.
(To rescale each column to a 0 to 1 range instead, use MinMaxScaler from pyspark.ml.feature in place of StandardScaler.)
Input
# create a sample dataframe
data = [(1, 2, 3),
(2, 3, 4),
(3, 4, 5),
(4, 5, 6)]
df = spark.createDataFrame(data, ["Col1", "Col2", "Col3"])
df.show()
+----+----+----+
|Col1|Col2|Col3|
+----+----+----+
| 1| 2| 3|
| 2| 3| 4|
| 3| 4| 5|
| 4| 5| 6|
+----+----+----+
Show Solution
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.functions import col
# define the list of columns to be normalized
input_cols = ["Col1", "Col2", "Col3"]
# initialize VectorAssembler with input and output column names
assembler = VectorAssembler(inputCols=input_cols, outputCol="features")
# transform the data
df_assembled = assembler.transform(df)
# initialize StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
# fit and transform the data
scalerModel = scaler.fit(df_assembled)
df_normalized = scalerModel.transform(df_assembled)
# if you want to drop the original 'features' column
df_normalized = df_normalized.drop('features')
df_normalized.show(truncate=False)
+----+----+----+-------------------------------------------------------------+
|Col1|Col2|Col3|scaled_features |
+----+----+----+-------------------------------------------------------------+
|1 |2 |3 |[-1.161895003862225,-1.161895003862225,-1.161895003862225] |
|2 |3 |4 |[-0.3872983346207417,-0.3872983346207417,-0.3872983346207417]|
|3 |4 |5 |[0.3872983346207417,0.3872983346207417,0.3872983346207417] |
|4 |5 |6 |[1.161895003862225,1.161895003862225,1.161895003862225] |
+----+----+----+-------------------------------------------------------------+
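The scaled values above can be reproduced with stdlib Python; StandardScaler with withMean=True and withStd=True uses the sample standard deviation, which is what statistics.stdev computes:

```python
import statistics

cols = {"Col1": [1, 2, 3, 4], "Col2": [2, 3, 4, 5], "Col3": [3, 4, 5, 6]}

# z-score: subtract the column mean, divide by the sample standard deviation
scaled = {}
for name, values in cols.items():
    mu = statistics.mean(values)
    sd = statistics.stdev(values)  # (n - 1) in the denominator
    scaled[name] = [(v - mu) / sd for v in values]
print(round(scaled["Col1"][0], 6))  # -1.161895
```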
43. How to get the positions where values of two columns match?
Difficulty Level: L1
Input
# Create sample DataFrame
data = [("John", "John"), ("Lily", "Lucy"), ("Sam", "Sam"), ("Lucy", "Lily")]
df = spark.createDataFrame(data, ["Name1", "Name2"])
df.show()
+-----+-----+
|Name1|Name2|
+-----+-----+
| John| John|
| Lily| Lucy|
| Sam| Sam|
| Lucy| Lily|
+-----+-----+
Show Solution
from pyspark.sql.functions import when, col
# Add new column Match to indicate if Name1 and Name2 match
df = df.withColumn("Match", when(col("Name1") == col("Name2"), True).otherwise(False))
# Display DataFrame
df.show()
+-----+-----+-----+
|Name1|Name2|Match|
+-----+-----+-----+
| John| John| true|
| Lily| Lucy|false|
| Sam| Sam| true|
| Lucy| Lily|false|
+-----+-----+-----+
44. How to create lags and leads of a column by group in a dataframe?
Difficulty Level: L2
Input
# Create a sample DataFrame
data = [("2023-01-01", "Store1", 100),
("2023-01-02", "Store1", 150),
("2023-01-03", "Store1", 200),
("2023-01-04", "Store1", 250),
("2023-01-05", "Store1", 300),
("2023-01-01", "Store2", 50),
("2023-01-02", "Store2", 60),
("2023-01-03", "Store2", 80),
("2023-01-04", "Store2", 90),
("2023-01-05", "Store2", 120)]
df = spark.createDataFrame(data, ["Date", "Store", "Sales"])
df.show()
+----------+------+-----+
| Date| Store|Sales|
+----------+------+-----+
|2023-01-01|Store1| 100|
|2023-01-02|Store1| 150|
|2023-01-03|Store1| 200|
|2023-01-04|Store1| 250|
|2023-01-05|Store1| 300|
|2023-01-01|Store2| 50|
|2023-01-02|Store2| 60|
|2023-01-03|Store2| 80|
|2023-01-04|Store2| 90|
|2023-01-05|Store2| 120|
+----------+------+-----+
Show Solution
from pyspark.sql.functions import lag, lead, to_date
from pyspark.sql.window import Window
# Convert the date from string to date type
df = df.withColumn("Date", to_date(df.Date, 'yyyy-MM-dd'))
# Create a Window partitioned by Store, ordered by Date
windowSpec = Window.partitionBy("Store").orderBy("Date")
# Create lag and lead variables
df = df.withColumn("Lag_Sales", lag(df["Sales"]).over(windowSpec))
df = df.withColumn("Lead_Sales", lead(df["Sales"]).over(windowSpec))
df.show()
+----------+------+-----+---------+----------+
| Date| Store|Sales|Lag_Sales|Lead_Sales|
+----------+------+-----+---------+----------+
|2023-01-01|Store1| 100| null| 150|
|2023-01-02|Store1| 150| 100| 200|
|2023-01-03|Store1| 200| 150| 250|
|2023-01-04|Store1| 250| 200| 300|
|2023-01-05|Store1| 300| 250| null|
|2023-01-01|Store2| 50| null| 60|
|2023-01-02|Store2| 60| 50| 80|
|2023-01-03|Store2| 80| 60| 90|
|2023-01-04|Store2| 90| 80| 120|
|2023-01-05|Store2| 120| 90| null|
+----------+------+-----+---------+----------+
45. How to get the frequency of unique values in the entire dataframe?
Difficulty Level: L3
Get the frequency of unique values in the entire dataframe df.
Input
# Create a numeric DataFrame
data = [(1, 2, 3),
(2, 3, 4),
(1, 2, 3),
(4, 5, 6),
(2, 3, 4)]
df = spark.createDataFrame(data, ["Column1", "Column2", "Column3"])
# Print DataFrame
df.show()
+-------+-------+-------+
|Column1|Column2|Column3|
+-------+-------+-------+
| 1| 2| 3|
| 2| 3| 4|
| 1| 2| 3|
| 4| 5| 6|
| 2| 3| 4|
+-------+-------+-------+
Show Solution
from pyspark.sql.functions import col
# get column names
columns = df.columns
# stack all columns into a single column
df_single = None
for c in columns:
    if df_single is None:
        df_single = df.select(col(c).alias("single_column"))
    else:
        df_single = df_single.union(df.select(col(c).alias("single_column")))
# generate frequency table
frequency_table = df_single.groupBy("single_column").count().orderBy('count', ascending=False)
# show frequency table
frequency_table.show()
+-------------+-----+
|single_column|count|
+-------------+-----+
| 3| 4|
| 2| 4|
| 4| 3|
| 1| 2|
| 5| 1|
| 6| 1|
+-------------+-----+
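The same frequency table drops out of collections.Counter once every column is flattened into one stream of values (data mirrors the input above):

```python
from collections import Counter

data = [(1, 2, 3), (2, 3, 4), (1, 2, 3), (4, 5, 6), (2, 3, 4)]

# Flatten every column into one sequence of values and count occurrences
freq = Counter(v for row in data for v in row)
print(dict(freq))  # {1: 2, 2: 4, 3: 4, 4: 3, 5: 1, 6: 1}
```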
46. How to replace both the diagonals of dataframe with 0?
Difficulty Level: L3
Replace the values on both diagonals of df with 0.
Input
# Create a numeric DataFrame
data = [(1, 2, 3, 4),
(2, 3, 4, 5),
(1, 2, 3, 4),
(4, 5, 6, 7)]
df = spark.createDataFrame(data, ["col_1", "col_2", "col_3", "col_4"])
# Print DataFrame
df.show()
+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
| 1| 2| 3| 4|
| 2| 3| 4| 5|
| 1| 2| 3| 4|
| 4| 5| 6| 7|
+-----+-----+-----+-----+
Show Solution
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql.functions import when, col
# Define window specification
w = Window.orderBy(monotonically_increasing_id())
# Add index
df = df.withColumn("id", row_number().over(w) - 1)
df = df.select([when(col("id") == i, 0).otherwise(col("col_"+str(i+1))).alias("col_"+str(i+1)) for i in range(4)])
# Create a reverse id column
df = df.withColumn("id", row_number().over(w) - 1)
df = df.withColumn("id_2", df.count() - 1 - df["id"])
df_with_diag_zero = df.select([when(col("id_2") == i, 0).otherwise(col("col_"+str(i+1))).alias("col_"+str(i+1)) for i in range(4)])
df_with_diag_zero.show()
+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
| 0| 2| 3| 0|
| 2| 0| 0| 5|
| 1| 0| 0| 4|
| 0| 5| 6| 0|
+-----+-----+-----+-----+
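The underlying index arithmetic is easier to see on a plain list of lists: the main diagonal is where the row index equals the column index, and the anti-diagonal is where they sum to n - 1:

```python
data = [[1, 2, 3, 4],
        [2, 3, 4, 5],
        [1, 2, 3, 4],
        [4, 5, 6, 7]]
n = len(data)

# Zero out the main diagonal (j == i) and the anti-diagonal (j == n - 1 - i)
result = [[0 if j == i or j == n - 1 - i else v
           for j, v in enumerate(row)]
          for i, row in enumerate(data)]
print(result)
```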
47. How to reverse the rows of a dataframe?
Difficulty Level: L2
Reverse all the rows of dataframe df.
Input
# Create a numeric DataFrame
data = [(1, 2, 3, 4),
(2, 3, 4, 5),
(3, 4, 5, 6),
(4, 5, 6, 7)]
df = spark.createDataFrame(data, ["col_1", "col_2", "col_3", "col_4"])
# Print DataFrame
df.show()
+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
| 1| 2| 3| 4|
| 2| 3| 4| 5|
| 3| 4| 5| 6|
| 4| 5| 6| 7|
+-----+-----+-----+-----+
Show Solution
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id
# Define window specification
w = Window.orderBy(monotonically_increasing_id())
# Add index
df = df.withColumn("id", row_number().over(w) - 1)
df_2 = df.orderBy("id", ascending=False).drop("id")
df_2.show()
+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
| 4| 5| 6| 7|
| 3| 4| 5| 6|
| 2| 3| 4| 5|
| 1| 2| 3| 4|
+-----+-----+-----+-----+
48. How to create one-hot encodings of a categorical variable (dummy variables)?
Difficulty Level: L2
Get one-hot encodings for the Categories column in the dataframe df and append the result as a column.
Input
data = [("A", 10),("A", 20),("B", 30),("B", 20),("B", 30),("C", 40),("C", 10),("D", 10)]
columns = ["Categories", "Value"]
df = spark.createDataFrame(data, columns)
df.show()
+----------+-----+
|Categories|Value|
+----------+-----+
| A| 10|
| A| 20|
| B| 30|
| B| 20|
| B| 30|
| C| 40|
| C| 10|
| D| 10|
+----------+-----+
Show Solution
from pyspark.ml.feature import StringIndexer, OneHotEncoder
# StringIndexer Initialization
indexer = StringIndexer(inputCol="Categories", outputCol="Categories_Indexed")
indexerModel = indexer.fit(df)
# Transform the DataFrame using the fitted StringIndexer model
indexed_df = indexerModel.transform(df)
#indexed_df.show()
encoder = OneHotEncoder(inputCol="Categories_Indexed", outputCol="Categories_onehot")
encoded_df = encoder.fit(indexed_df).transform(indexed_df)
encoded_df = encoded_df.drop("Categories_Indexed")
encoded_df.show(truncate=False)
+----------+-----+-----------------+
|Categories|Value|Categories_onehot|
+----------+-----+-----------------+
|A |10 |(3,[1],[1.0]) |
|A |20 |(3,[1],[1.0]) |
|B |30 |(3,[0],[1.0]) |
|B |20 |(3,[0],[1.0]) |
|B |30 |(3,[0],[1.0]) |
|C |40 |(3,[2],[1.0]) |
|C |10 |(3,[2],[1.0]) |
|D |10 |(3,[],[]) |
+----------+-----+-----------------+
49. How to Pivot the dataframe (converting rows into columns)?
Difficulty Level: L2
Convert the region column categories into columns.
Input
# Sample data
data = [
(2021, 1, "US", 5000),
(2021, 1, "EU", 4000),
(2021, 2, "US", 5500),
(2021, 2, "EU", 4500),
(2021, 3, "US", 6000),
(2021, 3, "EU", 5000),
(2021, 4, "US", 7000),
(2021, 4, "EU", 6000),
]
# Create DataFrame
columns = ["year", "quarter", "region", "revenue"]
df = spark.createDataFrame(data, columns)
Show Solution
# Execute the pivot operation
pivot_df = df.groupBy("year", "quarter").pivot("region").sum("revenue")
pivot_df.show()
+----+-------+----+----+
|year|quarter| EU| US|
+----+-------+----+----+
|2021| 2|4500|5500|
|2021| 1|4000|5000|
|2021| 3|5000|6000|
|2021| 4|6000|7000|
+----+-------+----+----+
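The groupBy-pivot-sum combination can be sketched with a plain dictionary, one entry per (year, quarter) group; the data below is a subset of the input:

```python
rows = [(2021, 1, "US", 5000), (2021, 1, "EU", 4000),
        (2021, 2, "US", 5500), (2021, 2, "EU", 4500)]

# groupBy("year", "quarter").pivot("region").sum("revenue") in plain Python
pivoted = {}
for year, quarter, region, revenue in rows:
    cell = pivoted.setdefault((year, quarter), {})
    cell[region] = cell.get(region, 0) + revenue
print(pivoted)
```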
50. How to UnPivot the dataframe (converting columns into rows)?
Difficulty Level: L2
Unpivot the EU and US columns into region and revenue columns.
Input
# Sample data
data = [(2021, 2, 4500, 5500),
(2021, 1, 4000, 5000),
(2021, 3, 5000, 6000),
(2021, 4, 6000, 7000)]
# Create DataFrame
columns = ["year", "quarter", "EU", "US"]
df = spark.createDataFrame(data, columns)
df.show()
+----+-------+----+----+
|year|quarter| EU| US|
+----+-------+----+----+
|2021| 2|4500|5500|
|2021| 1|4000|5000|
|2021| 3|5000|6000|
|2021| 4|6000|7000|
+----+-------+----+----+
Expected Output
+----+-------+------+-------+
|year|quarter|region|revenue|
+----+-------+------+-------+
|2021| 2| EU| 4500|
|2021| 2| US| 5500|
|2021| 1| EU| 4000|
|2021| 1| US| 5000|
|2021| 3| EU| 5000|
|2021| 3| US| 6000|
|2021| 4| EU| 6000|
|2021| 4| US| 7000|
+----+-------+------+-------+
Show Solution
from pyspark.sql.functions import expr
unpivotExpr = "stack(2, 'EU', EU, 'US', US) as (region, revenue)"
unPivotDF = df.select("year", "quarter", expr(unpivotExpr)).where("revenue is not null")
unPivotDF.show()
+----+-------+------+-------+
|year|quarter|region|revenue|
+----+-------+------+-------+
|2021| 2| EU| 4500|
|2021| 2| US| 5500|
|2021| 1| EU| 4000|
|2021| 1| US| 5000|
|2021| 3| EU| 5000|
|2021| 3| US| 6000|
|2021| 4| EU| 6000|
|2021| 4| US| 7000|
+----+-------+------+-------+
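What stack does is emit one narrow row per listed column for each wide row; a plain-Python sketch on a subset of the input:

```python
rows = [(2021, 2, 4500, 5500), (2021, 1, 4000, 5000)]

# stack(2, 'EU', EU, 'US', US): each wide row emits one narrow row per region
unpivoted = [(year, quarter, region, revenue)
             for year, quarter, eu, us in rows
             for region, revenue in (("EU", eu), ("US", us))]
print(unpivoted)
```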
51. How to impute missing values with Zero?
Difficulty Level: L1
Input
# Suppose df is your DataFrame
df = spark.createDataFrame([(1, None), (None, 2), (3, 4), (5, None)], ["a", "b"])
df.show()
+----+----+
| a| b|
+----+----+
| 1|null|
|null| 2|
| 3| 4|
| 5|null|
+----+----+
Show Solution
df_imputed = df.fillna(0)
df_imputed.show()
+---+---+
| a| b|
+---+---+
| 1| 0|
| 0| 2|
| 3| 4|
| 5| 0|
+---+---+
52. How to identify continuous variables in a dataframe and create a list of those column names?
Difficulty Level: L3
Input
url = "https://raw.githubusercontent.com/selva86/datasets/master/Churn_Modelling_m.csv"
from pyspark import SparkFiles
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("Churn_Modelling_m.csv"), header=True, inferSchema=True)
df.show(2, truncate=False)
+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId|Surname |CreditScore|Geography|Gender|Age|Tenure|Balance |NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
|1 |15634602 |Hargrave|619 |France |Female|42 |2 |0.0 |1 |1 |1 |101348.88 |1 |
|2 |15647311 |Hill |608 |Spain |Female|41 |1 |83807.86|1 |0 |1 |112542.58 |0 |
+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
only showing top 2 rows
Show Solution
from pyspark.sql.types import NumericType
from pyspark.sql.functions import approx_count_distinct
def detect_continuous_variables(df, distinct_threshold):
    """
    Identify continuous variables in a PySpark DataFrame.
    :param df: The input PySpark DataFrame
    :param distinct_threshold: A column qualifies as continuous when its distinct value count exceeds this threshold
    :return: A list containing the names of the continuous variables
    """
    continuous_columns = []
    for column in df.columns:
        dtype = df.schema[column].dataType
        # NumericType covers IntegerType, DoubleType, etc.
        if isinstance(dtype, NumericType):
            distinct_count = df.select(approx_count_distinct(column)).collect()[0][0]
            if distinct_count > distinct_threshold:
                continuous_columns.append(column)
    return continuous_columns
continuous_variables = detect_continuous_variables(df, 10)
print(continuous_variables)
['RowNumber', 'CustomerId', 'CreditScore', 'Age', 'Tenure', 'Balance', 'EstimatedSalary']
53. How to calculate Mode of a PySpark DataFrame column?
Difficulty Level: L1
Input
# Create a sample DataFrame
data = [(1, 2, 3), (2, 2, 3), (2, 2, 4), (1, 2, 3), (1, 1, 3)]
columns = ["col1", "col2", "col3"]
df = spark.createDataFrame(data, columns)
df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| 2| 3|
| 2| 2| 3|
| 2| 2| 4|
| 1| 2| 3|
| 1| 1| 3|
+----+----+----+
Show Solution
from pyspark.sql.functions import col
df_grouped = df.groupBy('col2').count()
mode_df = df_grouped.orderBy(col('count').desc()).limit(1)
mode_df.show()
+----+-----+
|col2|count|
+----+-----+
| 2| 4|
+----+-----+
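The group-count-order pattern is equivalent to a Counter's most common entry; a stdlib sketch of the mode of col2:

```python
from collections import Counter

col2 = [2, 2, 2, 2, 1]

# The mode is the most frequent value; most_common(1) returns [(value, count)]
mode_value, mode_count = Counter(col2).most_common(1)[0]
print(mode_value, mode_count)  # 2 4
```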
54. How to find installed location of Apache Spark and PySpark?
Difficulty Level: L1
Show Solution
import findspark
findspark.init()
print(findspark.find())
import os
import pyspark
print(os.path.dirname(pyspark.__file__))
C:\spark\spark-3.3.2-bin-hadoop2
C:\spark\spark-3.3.2-bin-hadoop2\python\pyspark
55. How to convert a column to lower case using UDF?
Difficulty Level: L2
Input
# Create a DataFrame to test
data = [('John Doe', 'NEW YORK'),
('Jane Doe', 'LOS ANGELES'),
('Mike Johnson', 'CHICAGO'),
('Sara Smith', 'SAN FRANCISCO')]
df = spark.createDataFrame(data, ['Name', 'City'])
df.show()
+------------+-------------+
| Name| City|
+------------+-------------+
| John Doe| NEW YORK|
| Jane Doe| LOS ANGELES|
|Mike Johnson| CHICAGO|
| Sara Smith|SAN FRANCISCO|
+------------+-------------+
Show Solution
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Define your UDF function
def to_lower(s):
    if s is not None:
        return s.lower()
# Convert your Python function to a Spark UDF
udf_to_lower = udf(to_lower, StringType())
# Apply your UDF to the DataFrame
df = df.withColumn('City_lower', udf_to_lower(df['City']))
# Show the DataFrame
df.show()
+------------+-------------+-------------+
| Name| City| City_lower|
+------------+-------------+-------------+
| John Doe| NEW YORK| new york|
| Jane Doe| LOS ANGELES| los angeles|
|Mike Johnson| CHICAGO| chicago|
| Sara Smith|SAN FRANCISCO|san francisco|
+------------+-------------+-------------+
56. How to convert PySpark data frame to pandas dataframe?
Difficulty Level: L1
Input
# Create a DataFrame to test
data = [('John Doe', 'NEW YORK'),
('Jane Doe', 'LOS ANGELES'),
('Mike Johnson', 'CHICAGO'),
('Sara Smith', 'SAN FRANCISCO')]
pysparkDF = spark.createDataFrame(data, ['Name', 'City'])
pysparkDF.show()
+------------+-------------+
| Name| City|
+------------+-------------+
| John Doe| NEW YORK|
| Jane Doe| LOS ANGELES|
|Mike Johnson| CHICAGO|
| Sara Smith|SAN FRANCISCO|
+------------+-------------+
Show Solution
# convert PySpark data frame to pandas
pandasDF = pysparkDF.toPandas()
print(pandasDF)
Name City
0 John Doe NEW YORK
1 Jane Doe LOS ANGELES
2 Mike Johnson CHICAGO
3 Sara Smith SAN FRANCISCO
57. How to View PySpark Cluster Details?
Difficulty Level: L1
Show Solution
print(spark.sparkContext.uiWebUrl)
http://DESKTOP-UL3QT3E.mshome.net:4040
58. How to View PySpark Cluster Configuration Details?
Difficulty Level: L1
Show Solution
# Print all configurations
for k, v in spark.sparkContext.getConf().getAll():
    print(f"{k} : {v}")
spark.app.name : PySpark 101 Exercises
spark.driver.extraJavaOptions : -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED
spark.app.startTime : 1684510291553
spark.app.id : local-1684510293468
spark.driver.host : DESKTOP-UL3QT3E.mshome.net
spark.executor.id : driver
spark.sql.warehouse.dir : file:/C:/Users/RajeshVaddi/Documents/MLPlus/6_PySpark%20101%20Exercises/spark-warehouse
spark.driver.port : 50321
spark.app.submitTime : 1684510291319
spark.rdd.compress : True
spark.executor.extraJavaOptions : -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED
spark.serializer.objectStreamReset : 100
spark.master : local[*]
spark.submit.pyFiles :
spark.submit.deployMode : client
spark.ui.showConsoleProgress : true
59. How to restrict the number of cores PySpark uses?
Difficulty Level: L1
Show Solution
from pyspark import SparkConf, SparkContext
conf = SparkConf()
conf.set("spark.executor.cores", "2") # set the number of cores you want here
sc = SparkContext(conf=conf)
60. How to cache PySpark DataFrame or objects and delete cache?
Difficulty Level: L2
In PySpark, caching or persisting data is done to speed up data retrieval during iterative and interactive computations.
Show Solution
# Caching the DataFrame
df.cache()
# un-cache or unpersist data using the unpersist() method.
df.unpersist()
DataFrame[Name: string, City: string, City_lower: string]
61. How to randomly split a PySpark DataFrame in a given ratio (0.8, 0.2)?
Difficulty Level: L1
Show Solution
# Assuming df is your DataFrame, randomly split it (0.8, 0.2)
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)
62. How to build logistic regression in PySpark?
Difficulty Level: L2
Input
# Create a sample dataframe
data = spark.createDataFrame([
(0, 1.0, -1.0),
(1, 2.0, 1.0),
(1, 3.0, -2.0),
(0, 4.0, 1.0),
(1, 5.0, -3.0),
(0, 6.0, 2.0),
(1, 7.0, -1.0),
(0, 8.0, 3.0),
(1, 9.0, -2.0),
(0, 10.0, 2.0),
(1, 11.0, -3.0),
(0, 12.0, 1.0),
(1, 13.0, -1.0),
(0, 14.0, 2.0),
(1, 15.0, -2.0),
(0, 16.0, 3.0),
(1, 17.0, -3.0),
(0, 18.0, 1.0),
(1, 19.0, -1.0),
(0, 20.0, 2.0)
], ["label", "feat1", "feat2"])
Show Solution
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
# convert the feature columns into a single vector column using VectorAssembler
vecAssembler = VectorAssembler(inputCols=['feat1', 'feat2'], outputCol="features")
data = vecAssembler.transform(data)
# fit the logistic regression model
lr = LogisticRegression(featuresCol='features', labelCol='label')
lr_model = lr.fit(data)
# look at the coefficients and intercept of the logistic regression model
print(f"Coefficients: {str(lr_model.coefficients)}")
print(f"Intercept: {str(lr_model.intercept)}")
Coefficients: [0.020277740475786673,-1.612960940022365]
Intercept: -0.2209292751829534
63. How to convert the categorical string data into numerical data or index?
Difficulty Level: L2
Input
# Create a sample DataFrame
data = [('cat',), ('dog',), ('mouse',), ('fish',), ('dog',), ('cat',), ('mouse',)]
df = spark.createDataFrame(data, ["animal"])
df.show()
+------+
|animal|
+------+
| cat|
| dog|
| mouse|
| fish|
| dog|
| cat|
| mouse|
+------+
Show Solution
from pyspark.ml.feature import StringIndexer
# Initialize a StringIndexer
indexer = StringIndexer(inputCol='animal', outputCol='animalIndex')
# Fit the indexer to the DataFrame and transform the data
indexed = indexer.fit(df).transform(df)
indexed.show()
+------+-----------+
|animal|animalIndex|
+------+-----------+
| cat| 0.0|
| dog| 1.0|
| mouse| 2.0|
| fish| 3.0|
| dog| 1.0|
| cat| 0.0|
| mouse| 2.0|
+------+-----------+
64. How to calculate Correlation of two variables in a DataFrame?
Difficulty Level: L1
Input
# Create a sample dataframe
data = [Row(feature1=5, feature2=10, feature3=25),
Row(feature1=6, feature2=15, feature3=35),
Row(feature1=7, feature2=25, feature3=30),
Row(feature1=8, feature2=20, feature3=60),
Row(feature1=9, feature2=30, feature3=70)]
df = spark.createDataFrame(data)
df.show()
+--------+--------+--------+
|feature1|feature2|feature3|
+--------+--------+--------+
| 5| 10| 25|
| 6| 15| 35|
| 7| 25| 30|
| 8| 20| 60|
| 9| 30| 70|
+--------+--------+--------+
Show Solution
# Calculate correlation
correlation = df.corr("feature1", "feature2")
print("Correlation between feature1 and feature2 :", correlation)
Correlation between feature1 and feature2 : 0.9
65. How to calculate Correlation Matrix?
Difficulty Level: L2
Input
# Create a sample dataframe
data = [Row(feature1=5, feature2=10, feature3=25),
Row(feature1=6, feature2=15, feature3=35),
Row(feature1=7, feature2=25, feature3=30),
Row(feature1=8, feature2=20, feature3=60),
Row(feature1=9, feature2=30, feature3=70)]
df = spark.createDataFrame(data)
df.show()
+--------+--------+--------+
|feature1|feature2|feature3|
+--------+--------+--------+
| 5| 10| 25|
| 6| 15| 35|
| 7| 25| 30|
| 8| 20| 60|
| 9| 30| 70|
+--------+--------+--------+
Show Solution
# Calculate the correlation matrix using MLlib
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
# Define the feature columns & assemble the feature vector
vector_assembler = VectorAssembler(inputCols=["feature1", "feature2", "feature3"], outputCol="features")
data_vector = vector_assembler.transform(df).select("features")
# Calculate correlation
correlation_matrix = Correlation.corr(data_vector, "features").head()[0]
print(correlation_matrix)
DenseMatrix([[1. , 0.9 , 0.91779992],
[0.9 , 1. , 0.67837385],
[0.91779992, 0.67837385, 1. ]])
66. How to calculate VIF (Variance Inflation Factor ) for set of variables in a DataFrame?
Difficulty Level: L3
Input
# Create a sample dataframe
data = [Row(feature1=5, feature2=10, feature3=25),
Row(feature1=6, feature2=15, feature3=35),
Row(feature1=7, feature2=25, feature3=30),
Row(feature1=8, feature2=20, feature3=60),
Row(feature1=9, feature2=30, feature3=70)]
df = spark.createDataFrame(data)
df.show()
+--------+--------+--------+
|feature1|feature2|feature3|
+--------+--------+--------+
| 5| 10| 25|
| 6| 15| 35|
| 7| 25| 30|
| 8| 20| 60|
| 9| 30| 70|
+--------+--------+--------+
Show Solution
from pyspark.sql import SparkSession, Row
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
def calculate_vif(data, features):
    # For each feature, regress it on the remaining features,
    # then convert the resulting R^2 into VIF = 1 / (1 - R^2)
    vif_dict = {}
    for feature in features:
        non_feature_cols = [col for col in features if col != feature]
        assembler = VectorAssembler(inputCols=non_feature_cols, outputCol="features")
        lr = LinearRegression(featuresCol='features', labelCol=feature)
        model = lr.fit(assembler.transform(data))
        vif = 1 / (1 - model.summary.r2)
        vif_dict[feature] = vif
    return vif_dict
features = ['feature1', 'feature2', 'feature3']
vif_values = calculate_vif(df, features)
for feature, vif in vif_values.items():
    print(f'VIF for {feature}: {vif}')
VIF for feature1: 66.2109375000003
VIF for feature2: 19.33593749999992
VIF for feature3: 23.30468749999992
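Because VIF is just 1 / (1 - R²) from an ordinary least-squares fit, the same values can be reproduced without Spark; a NumPy cross-check (a sketch assuming NumPy is available; the `vif` helper is illustrative, not part of any library):

```python
import numpy as np

cols = {
    "feature1": np.array([5, 6, 7, 8, 9], dtype=float),
    "feature2": np.array([10, 15, 25, 20, 30], dtype=float),
    "feature3": np.array([25, 35, 30, 60, 70], dtype=float),
}

def vif(target, others):
    # OLS regression of target on the other features (with intercept)
    X = np.column_stack([np.ones_like(target)] + others)
    beta, *_ = np.linalg.lstsq(X, target, rcond=None)
    resid = target - X @ beta
    centered = target - target.mean()
    r2 = 1 - (resid @ resid) / (centered @ centered)
    return 1 / (1 - r2)

for name in cols:
    others = [v for k, v in cols.items() if k != name]
    print(name, vif(cols[name], others))
```

The printed values agree with Spark's output above (about 66.21, 19.34, and 23.30) up to floating-point noise.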
67. How to perform a Chi-Square test?
Difficulty Level: L2
Input
# Create a sample dataframe
data = [(1, 0, 0, 1, 1),
(2, 0, 1, 0, 0),
(3, 1, 0, 0, 0),
(4, 0, 0, 1, 1),
(5, 0, 1, 1, 0)]
df = spark.createDataFrame(data, ["id", "feature1", "feature2", "feature3", "label"])
df.show()
+---+--------+--------+--------+-----+
| id|feature1|feature2|feature3|label|
+---+--------+--------+--------+-----+
| 1| 0| 0| 1| 1|
| 2| 0| 1| 0| 0|
| 3| 1| 0| 0| 0|
| 4| 0| 0| 1| 1|
| 5| 0| 1| 1| 0|
+---+--------+--------+--------+-----+
Show Solution
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["feature1", "feature2", "feature3"], outputCol="features")
df = assembler.transform(df)
from pyspark.ml.stat import ChiSquareTest
r = ChiSquareTest.test(df, "features", "label").head()
print("pValues: " + str(r.pValues))
print("degreesOfFreedom: " + str(r.degreesOfFreedom))
print("statistics: " + str(r.statistics))
pValues: [0.36131042852617856,0.13603712811414348,0.1360371281141436]
degreesOfFreedom: [1, 1, 1]
statistics: [0.8333333333333335,2.2222222222222228,2.2222222222222223]
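Each entry in `statistics` is an ordinary chi-square test of independence between one feature and the label. A plain-Python sketch reproducing the first statistic from a 2x2 contingency table (no Spark required):

```python
from collections import Counter

# feature1 and label columns from the sample data above
feature1 = [0, 0, 1, 0, 0]
label = [1, 0, 0, 1, 0]

def chi_square(xs, ys):
    # Observed cell counts and marginal totals of the contingency table
    n = len(xs)
    obs = Counter(zip(xs, ys))
    x_tot = Counter(xs)
    y_tot = Counter(ys)
    # Sum (observed - expected)^2 / expected over every cell
    stat = 0.0
    for x in x_tot:
        for y in y_tot:
            expected = x_tot[x] * y_tot[y] / n
            stat += (obs[(x, y)] - expected) ** 2 / expected
    return stat

print(chi_square(feature1, label))  # ~0.8333, matching statistics[0]
```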
68. How to calculate the Standard Deviation?
Difficulty Level: L1
Input
# Sample data
data = [("James", "Sales", 3000),
("Michael", "Sales", 4600),
("Robert", "Sales", 4100),
("Maria", "Finance", 3000),
("James", "Sales", 3000),
("Scott", "Finance", 3300),
("Jen", "Finance", 3900),
("Jeff", "Marketing", 3000),
("Kumar", "Marketing", 2000),
("Saif", "Sales", 4100)]
# Create DataFrame
df = spark.createDataFrame(data, ["Employee", "Department", "Salary"])
df.show()
+--------+----------+------+
|Employee|Department|Salary|
+--------+----------+------+
| James| Sales| 3000|
| Michael| Sales| 4600|
| Robert| Sales| 4100|
| Maria| Finance| 3000|
| James| Sales| 3000|
| Scott| Finance| 3300|
| Jen| Finance| 3900|
| Jeff| Marketing| 3000|
| Kumar| Marketing| 2000|
| Saif| Sales| 4100|
+--------+----------+------+
Show Solution
from pyspark.sql.functions import stddev
salary_stddev = df.select(stddev("Salary").alias("stddev"))
salary_stddev.show()
+-----------------+
| stddev|
+-----------------+
|765.9416862050705|
+-----------------+
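Spark's `stddev` is the sample standard deviation (n - 1 denominator), so the value above can be cross-checked with Python's standard library:

```python
import statistics

# The Salary column from the sample data above
salaries = [3000, 4600, 4100, 3000, 3000, 3300, 3900, 3000, 2000, 4100]

# statistics.stdev also uses the sample (n - 1) formula
print(statistics.stdev(salaries))  # ~765.94, matching Spark's stddev
```

For per-department deviations, Spark's `df.groupBy("Department").agg(stddev("Salary"))` applies the same formula within each group.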
69. How to calculate missing value percentage in each column?
Difficulty Level: L3
Input
# Create a sample dataframe
data = [("John", "Doe", None),
(None, "Smith", "New York"),
("Mike", "Smith", None),
("Anna", "Smith", "Boston"),
(None, None, None)]
df = spark.createDataFrame(data, ["FirstName", "LastName", "City"])
df.show()
+---------+--------+--------+
|FirstName|LastName| City|
+---------+--------+--------+
| John| Doe| null|
| null| Smith|New York|
| Mike| Smith| null|
| Anna| Smith| Boston|
| null| null| null|
+---------+--------+--------+
Show Solution
# Calculate the total number of rows in the dataframe
total_rows = df.count()
# For each column calculate the number of null values and then calculate the percentage
for column in df.columns:
    null_values = df.filter(df[column].isNull()).count()
    missing_percentage = (null_values / total_rows) * 100
    print(f"Missing values in {column}: {missing_percentage}%")
Missing values in FirstName: 40.0%
Missing values in LastName: 20.0%
Missing values in City: 60.0%
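The per-column arithmetic is easy to verify with plain Python over the same five rows (a minimal Spark-free sketch):

```python
# The same five rows as plain tuples; None marks a missing value
rows = [
    ("John", "Doe", None),
    (None, "Smith", "New York"),
    ("Mike", "Smith", None),
    ("Anna", "Smith", "Boston"),
    (None, None, None),
]
columns = ["FirstName", "LastName", "City"]

missing = {}
for i, name in enumerate(columns):
    nulls = sum(1 for row in rows if row[i] is None)
    missing[name] = nulls / len(rows) * 100

print(missing)  # -> {'FirstName': 40.0, 'LastName': 20.0, 'City': 60.0}
```

In Spark itself, the loop above launches one job per column; the counts can be collapsed into a single job with `df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])` using `count`, `when`, and `col` from `pyspark.sql.functions`.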
70. How to get the names of DataFrame objects that have been created in an environment?
Difficulty Level: L2
Show Solution
import pyspark
dataframe_names = [name for name, obj in globals().items() if isinstance(obj, pyspark.sql.DataFrame)]
for name in dataframe_names:
    print(name)
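The same scan-by-type pattern works for any class. A minimal, Spark-free sketch (the `Frame` class and the namespace dict are stand-ins for `pyspark.sql.DataFrame` and `globals()`):

```python
class Frame:
    """Stand-in for pyspark.sql.DataFrame in this sketch."""

# A dict standing in for globals(): two Frame objects plus an unrelated value
namespace = {"df_a": Frame(), "df_b": Frame(), "n": 42}

# Keep only the names bound to Frame instances
frame_names = sorted(name for name, obj in namespace.items()
                     if isinstance(obj, Frame))
print(frame_names)  # -> ['df_a', 'df_b']
```

In a live session you would pass `globals().items()` instead of `namespace.items()`, exactly as in the solution above.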