⭐ PySpark and 🤗 Hugging Face Parquet Files

Community Article Published August 13, 2024

Introduction

Welcome to this guide on using PySpark to load and process Parquet files from Hugging Face datasets! We'll walk you through setting up a Spark session, loading Parquet files, and performing basic data operations, all while using a wine reviews dataset as our example. Let's dive in!


Table of Contents

  1. Setup
  2. Displaying the Dataset
  3. Loading Hugging Face Parquet Files
  4. Exploring the Data
  5. Data Transformation
  6. Conclusion

1. Setup

Before we begin, let's set up our environment. First, we'll install the libraries used in this guide: PySpark, plus requests for calling the Hugging Face API.

pip install pyspark requests

Next, we will import the required dependencies and initialize a Spark session. The Spark session is your entry point to using Spark for working with DataFrames.

from pyspark import SparkFiles
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WineReviews").getOrCreate()

2. Displaying the Dataset

We can display an interactive viewer directly from Hugging Face to better understand our dataset. This step is optional but highly recommended!

from IPython.display import HTML, display

dataset = "james-burton/wine_reviews"

iframe_html = """
<iframe src="https://huggingface.co/datasets/{dataset}/embed/viewer" width="80%" height="560px"></iframe>
""".format(dataset=dataset)
display(HTML(iframe_html))

The Wine Reviews dataset contains reviews from both professional wine critics and enthusiasts, detailing various wine characteristics like variety, ratings, price, and origin.


3. Loading Hugging Face Parquet Files

Now, let's load the dataset by fetching Parquet file URLs from the Hugging Face API and adding them to the Spark context.

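import requests

HUGGING_FACE_PARQUET_API = "https://huggingface.co/api/datasets/{dataset}/parquet"
r = requests.get(HUGGING_FACE_PARQUET_API.format(dataset=dataset))
r.raise_for_status()  # fail early if the API request did not succeed

# The response maps config name -> split name -> list of Parquet file URLs
train_parquet_files = r.json()['default']['train']

# Download each file into Spark's local files directory
for url in train_parquet_files:
  spark.sparkContext.addFile(url)

# Read every downloaded Parquet file into a single DataFrame
# (the path is local to the driver, so this pattern suits local mode)
df = spark.read.parquet(SparkFiles.getRootDirectory() + "/*.parquet")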
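The lookup above assumes the API returns a mapping from config name to split name to a list of URLs, which is the shape implied by r.json()['default']['train']. Below is a minimal sketch of a more defensive lookup that reports the available configs and splits when a key is missing. The helper name select_parquet_urls and the sample payload (including its example.com URLs) are hypothetical and only illustrate the idea.

```python
from typing import Any


def select_parquet_urls(
    payload: dict[str, Any], config: str = "default", split: str = "train"
) -> list[str]:
    """Return the Parquet URLs for one config/split, with a readable error if either is missing."""
    if config not in payload:
        raise KeyError(f"config {config!r} not found; available: {sorted(payload)}")
    splits = payload[config]
    if split not in splits:
        raise KeyError(f"split {split!r} not found; available: {sorted(splits)}")
    return list(splits[split])


# Hypothetical payload mirroring the {config: {split: [urls]}} shape used above.
sample_payload = {
    "default": {
        "train": [
            "https://example.com/train/0.parquet",
            "https://example.com/train/1.parquet",
        ],
        "test": ["https://example.com/test/0.parquet"],
    }
}

train_urls = select_parquet_urls(sample_payload)
print(train_urls)
```

In the tutorial's flow, the same helper could be called on r.json() in place of the direct dictionary indexing.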

4. Exploring the Data

With the data loaded into a DataFrame, we can explore its structure and contents.

# Shape of the dataset
print(f"Shape of the dataset: {df.count()}, {len(df.columns)}")

# Displaying first 10 rows
df.show(n=10)

# Getting a statistical summary of the data
df.describe().show()

# Print the schema of the DataFrame
df.printSchema()

5. Data Transformation

Let's perform some basic transformations on the dataset.

# Display the values of a single column (show() prints only the first 20 rows by default)
df.select('country').show()

# Select multiple columns
df.select(['country','province']).show()

# Display data types of columns
df.dtypes

# Create a subset of the dataset
df1 = df.limit(5)
df1.show()

Adding a New Column

We'll add a new column that concatenates the values of country and province with a hyphen.

from pyspark.sql.functions import concat, lit

df1 = df.withColumn("location", concat(df['country'], lit('-'), df['province']))
df1.show()

Grouping Data

Group the data by country and count the number of records for each country, then sort the results in descending order.

df.groupBy('country').count().orderBy('count', ascending=False).show()

Executing SQL Queries

You can also achieve the same result using SQL queries.

# Register the DataFrame as a temporary view so it can be queried with SQL
df.createOrReplaceTempView("wine_reviews_table")
spark.sql("SHOW TABLES").show()

result_df = spark.sql("SELECT country, COUNT(*) AS count FROM wine_reviews_table GROUP BY country ORDER BY count DESC")
result_df.show()

Handling Missing Values

Let's check for and handle missing values in the dataset.

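from pyspark.sql.functions import col, isnan, when, count

# For each column, count values that are null, NaN, empty, or that contain
# the text 'None' or 'NULL' (a substring match, so it can also catch ordinary text)
null_df_counts = df.select([count(when(col(c).contains('None') |
                            col(c).contains('NULL') |
                            (col(c) == '') |
                            col(c).isNull() |
                            isnan(c), c
                           )).alias(c)
                    for c in df.columns])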

null_df_counts.show()

# Removing rows with any null values
df_clean = df.dropna()
df_clean.show()

You can see the notebook with the code here.

6. Conclusion

In this tutorial, we covered:

  • Initializing a Spark session.
  • Loading Parquet files from Hugging Face into a PySpark DataFrame.
  • Exploring the data structure and content.
  • Performing data transformations.
  • Executing SQL queries.
  • Handling missing values.

These steps lay the foundation for more advanced data analysis and transformations using PySpark.

🤗 Happy data processing!