
PySpark withColumn – A Comprehensive Guide with Examples

The “withColumn” function in PySpark allows you to add, replace, or update columns in a DataFrame. It is a DataFrame transformation operation, meaning it returns a new DataFrame with the specified changes without altering the original DataFrame.

The “withColumn” function is particularly useful when you need to perform column-based operations like renaming, changing the data type, or applying a function to the values in a column.

Syntax

The syntax for the “withColumn” function is:

DataFrame.withColumn(colName, col)

where:

DataFrame: The original PySpark DataFrame you want to manipulate.

colName: The name of the new or existing column you want to add, replace, or update.

col: The new expression or value for the specified column.

Let’s start by importing the necessary libraries and creating a simple DataFrame.

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize a Spark session
spark = SparkSession.builder.appName("withColumnExample").getOrCreate()

data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 35)]
columns = ["id", "name", "age"]

# Create a DataFrame
df = spark.createDataFrame(data, columns)

# Display the original DataFrame
df.show()
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 25|
|  2|    Bob| 30|
|  3|Charlie| 35|
+---+-------+---+
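Because “withColumn” is a transformation, it returns a new DataFrame and leaves the original untouched. A quick sketch to confirm this (the “is_adult” column here is purely for illustration):

# 'withColumn' returns a new DataFrame; 'df' itself is not modified
df_with_flag = df.withColumn("is_adult", col("age") >= 18)
df_with_flag.show()  # includes the new 'is_adult' column
df.show()            # still shows only 'id', 'name', and 'age'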

1. Renaming a column

Let’s use “withColumn” to rename the “age” column to “years”. Because “withColumn” cannot rename a column directly, we first copy the values into a new “years” column and then drop the original “age” column:

# Rename the 'age' column to 'years'
df = df.withColumn("years", col("age"))

# Drop the original 'age' column
df = df.drop("age")

# Display the updated DataFrame
df.show()
+---+-------+-----+
| id|   name|years|
+---+-------+-----+
|  1|  Alice|   25|
|  2|    Bob|   30|
|  3|Charlie|   35|
+---+-------+-----+
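Note that PySpark also provides “withColumnRenamed”, which renames a column in a single step without the extra drop:

# One-step alternative to the rename-and-drop pattern above
# df = df.withColumnRenamed("age", "years")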

2. Applying a function to a column

Next, we will apply an expression to convert the age from years to months.

from pyspark.sql.functions import expr

# Convert 'years' to 'months'
df = df.withColumn("months", expr("years * 12"))

# Display the updated DataFrame
df.show()
+---+-------+-----+------+
| id|   name|years|months|
+---+-------+-----+------+
|  1|  Alice|   25|   300|
|  2|    Bob|   30|   360|
|  3|Charlie|   35|   420|
+---+-------+-----+------+
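The same result can be computed with Column arithmetic instead of a SQL expression string:

# Equivalent to expr("years * 12")
# df = df.withColumn("months", col("years") * 12)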

3. Changing a column’s data type

Suppose we want to change the data type of the ‘id’ column from integer to string. We can use “withColumn” along with the “cast” method to achieve this.

from pyspark.sql.types import StringType

# Change the data type of the 'id' column to string
df = df.withColumn("id", col("id").cast(StringType()))

# Display the updated DataFrame
df.show()
+---+-------+-----+------+
| id|   name|years|months|
+---+-------+-----+------+
|  1|  Alice|   25|   300|
|  2|    Bob|   30|   360|
|  3|Charlie|   35|   420|
+---+-------+-----+------+
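The printed rows look identical because “show” does not display data types; “printSchema” confirms the change. Assuming the DataFrame built in the previous steps, the output should be:

# Verify the new data type
df.printSchema()
root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- years: long (nullable = true)
 |-- months: long (nullable = true)

The “cast” method also accepts a type name as a string, so col("id").cast("string") works without importing StringType.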

4. Conditional column update with “withColumn”

Let’s use “withColumn” to add a new column “tax” based on the salary. We will apply a 10% tax if the salary is greater than or equal to 50,000, and a 5% tax otherwise.

from pyspark.sql.functions import when

data = [(1, "Alice", 25, 45000), (2, "Bob", 30, 55000), (3, "Charlie", 35, 60000)]
columns = ["id", "name", "age", "salary"]

# Create a new DataFrame
df = spark.createDataFrame(data, columns)
df.show()

# Add a new 'tax' column based on the 'salary' column
df = df.withColumn("tax", when(col("salary") >= 50000, col("salary") * 0.1).otherwise(col("salary") * 0.05))

# Display the updated DataFrame
df.show()
+---+-------+---+------+
| id|   name|age|salary|
+---+-------+---+------+
|  1|  Alice| 25| 45000|
|  2|    Bob| 30| 55000|
|  3|Charlie| 35| 60000|
+---+-------+---+------+

+---+-------+---+------+------+
| id|   name|age|salary|   tax|
+---+-------+---+------+------+
|  1|  Alice| 25| 45000|2250.0|
|  2|    Bob| 30| 55000|5500.0|
|  3|Charlie| 35| 60000|6000.0|
+---+-------+---+------+------+

In this example, we used the “when” and “otherwise” functions to create a new “tax” column based on the “salary” column’s values.
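If you need more than two brackets, “when” calls can be chained before the final “otherwise”. A sketch with a hypothetical 20% bracket at 100,000:

# Chained conditions: each 'when' is checked in order
# df = df.withColumn("tax",
#     when(col("salary") >= 100000, col("salary") * 0.2)
#     .when(col("salary") >= 50000, col("salary") * 0.1)
#     .otherwise(col("salary") * 0.05))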

5. Using a User-Defined Function (UDF) with “withColumn”

We will create a User-Defined Function (UDF) to categorize employees into different groups based on their age, and apply it using “withColumn”.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def age_group(age):
    if age < 30:
        return "Young"
    elif age < 45:
        return "Middle-aged"
    else:
        return "Old"

# Register the UDF
age_group_udf = udf(age_group, StringType())

Now, we will use “withColumn” to apply the UDF to the “age” column, creating a new “age_group” column.

# Add a new 'age_group' column based on the 'age' column
df = df.withColumn("age_group", age_group_udf(col("age")))

# Display the updated DataFrame
df.show()
+---+-------+---+------+------+-----------+
| id|   name|age|salary|   tax|  age_group|
+---+-------+---+------+------+-----------+
|  1|  Alice| 25| 45000|2250.0|      Young|
|  2|    Bob| 30| 55000|5500.0|Middle-aged|
|  3|Charlie| 35| 60000|6000.0|Middle-aged|
+---+-------+---+------+------+-----------+
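Keep in mind that Python UDFs run row by row outside the JVM and are generally slower than built-in functions. The same categorization can be expressed with the “when” function used earlier:

# Equivalent logic with built-ins (usually faster than a Python UDF)
# df = df.withColumn("age_group",
#     when(col("age") < 30, "Young")
#     .when(col("age") < 45, "Middle-aged")
#     .otherwise("Old"))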

6. Column operations using multiple columns

We will now perform an operation on multiple columns to create a new column. Let’s use the previous DataFrame and create a new column “net_salary” by subtracting the “tax” column from the “salary” column.

from pyspark.sql.functions import round

# Add a new 'net_salary' column by subtracting 'tax' from 'salary'
df = df.withColumn("net_salary", round(col("salary") - col("tax"), 2))

# Display the updated DataFrame
df.show()
+---+-------+---+------+------+-----------+----------+
| id|   name|age|salary|   tax|  age_group|net_salary|
+---+-------+---+------+------+-----------+----------+
|  1|  Alice| 25| 45000|2250.0|      Young|   42750.0|
|  2|    Bob| 30| 55000|5500.0|Middle-aged|   49500.0|
|  3|Charlie| 35| 60000|6000.0|Middle-aged|   54000.0|
+---+-------+---+------+------+-----------+----------+
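One caveat: importing “round” from pyspark.sql.functions shadows Python’s built-in round. A common convention that avoids this is to import the functions module under an alias:

# Alias import keeps Python's built-in round available
# import pyspark.sql.functions as F
# df = df.withColumn("net_salary", F.round(F.col("salary") - F.col("tax"), 2))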

7. Combining multiple columns into one

We will combine two columns, “name” and “age_group”, into a single column, “name_age_group”, using the “concat_ws” function, which concatenates multiple columns with a specified delimiter.

from pyspark.sql.functions import concat_ws

# Combine 'name' and 'age_group' columns into a new 'name_age_group' column
df = df.withColumn("name_age_group", concat_ws(" - ", col("name"), col("age_group")))

# Display the updated DataFrame
df.show()
+---+-------+---+------+------+-----------+----------+--------------------+
| id|   name|age|salary|   tax|  age_group|net_salary|      name_age_group|
+---+-------+---+------+------+-----------+----------+--------------------+
|  1|  Alice| 25| 45000|2250.0|      Young|   42750.0|       Alice - Young|
|  2|    Bob| 30| 55000|5500.0|Middle-aged|   49500.0|   Bob - Middle-aged|
|  3|Charlie| 35| 60000|6000.0|Middle-aged|   54000.0|Charlie - Middle-...|
+---+-------+---+------+------+-----------+----------+--------------------+
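The last value is cut off because “show” truncates strings longer than 20 characters by default. Pass truncate=False to display the full values:

# Show untruncated values, e.g. 'Charlie - Middle-aged'
df.show(truncate=False)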

Conclusion

In this guide, we covered the basics of the PySpark “withColumn” function and explored its usage through examples. We demonstrated how to rename columns, apply functions to columns, change a column’s data type, update columns conditionally, apply UDFs, and combine multiple columns. The “withColumn” function is a versatile and powerful tool for DataFrame manipulation in PySpark, making it an essential skill to master for any data engineer or data scientist working with big data.
