Introduction
Apache PySpark is an open-source, distributed computing system designed for big data processing and analytics. It provides an interface for programming Apache Spark with the Python programming language.
One of the most important tasks in data processing is reading and writing data to various file formats. In this blog post, we will explore multiple ways to read and write data using PySpark with code examples.
1. Prerequisites
To follow this tutorial, you’ll need to have PySpark installed on your system. You can install PySpark using pip:
pip install pyspark
pip install findspark
2. Initializing Spark Session
Before we dive into reading and writing data, let’s initialize a SparkSession. The SparkSession is the entry point to PySpark and allows you to interact with the data
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Read and Write Data Using PySpark") \
.getOrCreate()
3. Create a PySpark DataFrame
Creating a DataFrame from a Python list of dictionaries:
data = [("Alice", 34), ("Bob", 45), ("Cathy", 29)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)
df.show()
+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
| Bob| 45|
|Cathy| 29|
+-----+---+
4. Reading and Writing CSV Files
To read a CSV file using PySpark, you can use the read.csv() method:
csv_file = "path/to/your/csv/file.csv"
df_csv = spark.read.csv(csv_file, header=True, inferSchema=True)
Now that you have your data in a DataFrame, you can write it back to a CSV file using the write.csv() method:
output_path = "path/to/output/csv/file.csv"
df_csv.write.csv(output_path, header=True, mode="overwrite")
5. Reading and Writing JSON Files
To read a JSON file using PySpark, you can use the read.json() method:
json_file = "path/to/your/json/file.json"
df_json = spark.read.json(json_file)
You can write the data back to a JSON file using the write.json() method:
output_path = "path/to/output/json/file.json"
df_json.write.json(output_path, mode="overwrite")
6. Reading and Writing Parquet Files
To read a Parquet file using PySpark, you can use the read.parquet() method:
parquet_file = "path/to/your/parquet/file.parquet"
df_parquet = spark.read.parquet(parquet_file)
To write the data back to a Parquet file, use the write.parquet() method:
output_path = "path/to/output/parquet/file.parquet"
df_parquet.write.parquet(output_path, mode="overwrite")
7. Creating a SQL Table in PySpark
We’ll create a sample DataFrame using a list of dictionaries and register the DataFrame as a temporary SQL table to perform SQL operations
data = [
{"name": "Alice", "age": 30, "city": "New York"},
{"name": "Bob", "age": 25, "city": "San Francisco"},
{"name": "Charlie", "age": 35, "city": "Los Angeles"}
]
df = spark.createDataFrame(data)
df.createOrReplaceTempView("people")
query = "SELECT * FROM people WHERE age >= 30"
result_df = spark.sql(query)
result_df.show()
+---+-----------+-------+
|age| city| name|
+---+-----------+-------+
| 30| New York| Alice|
| 35|Los Angeles|Charlie|
+---+-----------+-------+
8. Convert Pandas Dataframe to PySpark Dataframe
import pandas as pd
data = [
{"name": "Alice", "age": 30, "city": "New York"},
{"name": "Bob", "age": 25, "city": "San Francisco"},
{"name": "Charlie", "age": 35, "city": "Los Angeles"}
]
pandasDF = pd.DataFrame(data, columns = ['name', 'age', 'city'])
#Create PySpark DataFrame from Pandas
sparkDF=spark.createDataFrame(pandasDF)
sparkDF.printSchema()
sparkDF.show()
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- city: string (nullable = true)
+-------+---+-------------+
| name|age| city|
+-------+---+-------------+
| Alice| 30| New York|
| Bob| 25|San Francisco|
|Charlie| 35| Los Angeles|
+-------+---+-------------+
Conclusion
In this blog post, we explored multiple ways to read and write data using PySpark, including CSV, JSON, Parquet, SQL, Pandas Data Frame