How to Convert CSV to Parquet, JSON Format

This article explains how to read a CSV file into a Pandas or PySpark DataFrame in Python, and how to save that DataFrame in the JSON and Parquet file formats, both of which the two libraries support.

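Apache Parquet is a columnar file format supported by many data processing systems. Because it stores data by column, together with the schema and with compression, queries that read only some of the columns are typically much faster than on CSV or JSON, and the files are usually smaller.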

Below are the high-level steps we will follow to convert the CSV file into different file formats.

1. Read CSV file into Python Dataframe

To read a CSV file into a Pandas or Spark DataFrame, use pd.read_csv() or spark.read.csv() respectively. Both take the path of the file to read as an argument, but their defaults differ. Pandas' read_csv() treats the first row as the header and infers the data type of each column. Spark's read.csv() considers the header to be a data record, so the column names in the file are read as data, and it reads all columns as strings. To overcome this in Spark, explicitly set the header option to true, and either set inferSchema to true or supply a schema.
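The header behaviour on the Pandas side is easy to check without a file on disk. The sketch below is a minimal illustration that assumes a small in-memory sample (the rows are made up for the example and are not taken from employees.csv). With the defaults, the first row becomes the column names and numeric columns get numeric types; with header=None, the header row is read as data.

```python
import io

import pandas as pd

# Hypothetical sample data standing in for employees.csv
csv_text = "employee_id,Name,Salary\n1,Ann,5000.5\n2,Bob,4200.0\n"

# Default: the first row is used as the header and column types are inferred
df_default = pd.read_csv(io.StringIO(csv_text))

# header=None: the header row is treated as an ordinary data row
df_no_header = pd.read_csv(io.StringIO(csv_text), header=None)

print(list(df_default.columns))  # column names taken from the first row
print(df_default.dtypes)         # inferred types per column
print(df_no_header.head())       # header row shows up as the first data row
```

If a CSV file has no header row, passing header=None together with names=[...] lets you assign column names yourself.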

Pandas
import pandas as pd

#CSV file with full path
employee_file = "employees.csv"

#Read csv file into pandas dataframe
df = pd.read_csv(employee_file)

#Print the Dataframe
print(df)
PySpark
#Import libraries
import pyspark
from pyspark.sql.types import DoubleType, LongType, StringType, StructField, StructType

#CSV File Path
employees_csv_file = "employees.csv"

#Create Schema
new_data_types = {
    "employee_id": LongType(),
    "Name": StringType(),
    "DateofBirth": StringType(),
    "Salary": DoubleType(),
    "Department": StringType()
    }

# Define the schema based on the custom data types
schema = StructType([
    StructField(col_name, data_type, True) for col_name, data_type in new_data_types.items()
])

# Create a SparkSession
spark = pyspark.sql.SparkSession.builder.appName("Convert CSV to Parquet").getOrCreate()

#Read Options
csv_options = {
    "header": "true", #Set to "false" if your CSV file doesn't have a header
    "schema": schema, #Apply the schema defined above instead of inferring data types
    "quote": "\"",
    "escape": "\"",
    "sep": ",",
}


#Read CSV file into PySpark Dataframe
df = spark.read.csv(employees_csv_file, **csv_options)

# Show the PySpark Dataframe
df.show()

# Print the PySpark Schema
df.printSchema()

2. Convert the Dataframe into Parquet

Let’s convert the Spark DataFrame created from the CSV file to Parquet. To write a Spark DataFrame as Parquet, use df.write.parquet(), or df.coalesce(1).write.parquet() if you want a single output file. The parquet() method is provided by the DataFrameWriter class. Spark doesn’t need any additional packages or libraries to use Parquet, as support is built in, so we don’t have to worry about version and compatibility issues. Pandas, by contrast, needs a Parquet engine such as pyarrow or fastparquet installed for to_parquet() to work.

Why PySpark Create Multiple Files?

PySpark creates multiple files when writing a DataFrame because the data is partitioned. Partitioning divides the data into smaller chunks that can be processed independently and in parallel, which can improve the performance of operations such as joins and aggregations.

The default number of partitions depends on the size of the input and on the Spark configuration, including the number of cores available, so that the work is spread roughly evenly. You can also set the number of partitions yourself: coalesce() reduces it, and repartition() can increase or reduce it.

When writing a DataFrame to multiple files, PySpark creates a separate file for each partition. This allows PySpark to write the data in parallel, which can improve performance.
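Because each partition becomes its own file, the path passed to write.parquet() ends up as a directory rather than a single file. In Spark's usual output layout the data files have names starting with part-, next to a _SUCCESS marker and, on a local filesystem, hidden .crc checksum files. The sketch below is one way to pick out just the data files. find_part_files is a hypothetical helper written for this example, not part of PySpark, and it assumes the output is on the local filesystem.

```python
from pathlib import Path


def find_part_files(output_dir, extension=".parquet"):
    """Return the data files inside a Spark output directory, sorted by name.

    Hypothetical helper: it keeps regular files whose names start with
    "part-" and end with the given extension, which skips the _SUCCESS
    marker and hidden .crc checksum files.
    """
    return sorted(
        p
        for p in Path(output_dir).iterdir()
        if p.is_file()
        and p.name.startswith("part-")
        and p.name.endswith(extension)
    )


if __name__ == "__main__":
    out = Path("employees.parquet")  # directory created by df.write.parquet(...)
    if out.is_dir():
        for part in find_part_files(out):
            print(part.name)
```

After df.coalesce(1).write.parquet("employees.parquet"), this should list a single part file. The same helper works for JSON output if you pass extension=".json".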

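In the following examples, we write the DataFrame to Parquet under the name employees.parquet. Pandas writes a single file with that name, while Spark creates a directory with that name that contains the part files.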

Pandas DataFrame to Parquet file
# Write the DataFrame to a Parquet file
df.to_parquet("employees.parquet")
PySpark Dataframe to Parquet file
# Write the DataFrame to a Parquet file
df.coalesce(1).write.parquet("employees.parquet")

# Stop the SparkSession (PySpark uses stop(), not close())
#spark.stop()

3. Convert the Dataframe into JSON

Similar to Parquet, once we have a DataFrame created from the CSV file, we can save it as JSON using df.to_json(filename) in Pandas or df.write.json(path) in PySpark.

Pandas
# Write the DataFrame to a JSON file
df.to_json("employees.json")
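By default, to_json() on a Pandas DataFrame produces one JSON object keyed by column name (orient="columns"), while Spark's write.json() writes one JSON object per line. If both outputs should have the same shape, the sketch below shows one way to get line-delimited JSON from Pandas. The sample rows are made up for illustration.

```python
import json

import pandas as pd

# Hypothetical sample rows, not taken from employees.csv
df_sample = pd.DataFrame({"employee_id": [1, 2], "Name": ["Ann", "Bob"]})

# Default orient for a DataFrame is "columns": {column -> {index -> value}}
as_columns = df_sample.to_json()

# One JSON object per row, one row per line (JSON Lines)
as_lines = df_sample.to_json(orient="records", lines=True)

print(as_columns)
print(as_lines)

# Each non-empty line parses as its own JSON document
records = [json.loads(line) for line in as_lines.splitlines() if line]
print(records)
```

To write to a file instead of returning a string, pass the path as the first argument, for example df.to_json("employees.json", orient="records", lines=True).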
PySpark Dataframe to JSON
# Write the DataFrame to a JSON file
df.coalesce(1).write.json("employees.json")

4. Complete Code of Converting CSV to Parquet and JSON file

Convert CSV to Parquet and JSON using PySpark
#Import libraries
import pyspark
from pyspark.sql.types import DoubleType, LongType, StringType, StructField, StructType

#CSV File Path
employees_csv_file = "employees.csv"

#Create Schema
new_data_types = {
    "employee_id": LongType(),
    "Name": StringType(),
    "DateofBirth": StringType(),
    "Salary": DoubleType(),
    "Department": StringType()
    }

# Define the schema based on the custom data types
schema = StructType([
    StructField(col_name, data_type, True) for col_name, data_type in new_data_types.items()
])

# Create a SparkSession
spark = pyspark.sql.SparkSession.builder.appName("Convert CSV to Parquet").getOrCreate()

#Read Options
csv_options = {
    "header": "true", #Set to "false" if your CSV file doesn't have a header
    "schema": schema, #Apply the schema defined above instead of inferring data types
    "quote": "\"",
    "escape": "\"",
    "sep": ",",
}


#Read CSV file into PySpark Dataframe
df = spark.read.csv(employees_csv_file, **csv_options)

# Show the PySpark Dataframe
df.show()

# Print the PySpark Schema
df.printSchema()

# Write the DataFrame to a Parquet file
df.coalesce(1).write.parquet("employees.parquet")

# Write the DataFrame to a Json file
df.coalesce(1).write.json("employees.json")

# Stop the SparkSession (PySpark uses stop(), not close())
spark.stop()
Convert CSV to Parquet and JSON using Pandas
import pandas as pd

#CSV file with full path
employee_file = "employees.csv"

#Read csv file into pandas dataframe
df = pd.read_csv(employee_file)

#Write the DataFrame to a json file
df.to_json("employees.json")

#Write the DataFrame to a parquet file
df.to_parquet("employees.parquet")

#Print the Dataframe
print(df)