How to Add Column to StructType in Spark DF | Add, Drop, Cast Column in Spark Struct Field | Apache Spark

In this blog, we will learn how to apply business logic or a transformation to a column in a Spark DataFrame of type StructType. We will see how we can add, remove, or cast a column inside struct-type fields of a DataFrame. To understand this, let us consider the real-time use case described in the picture below.

We will use PySpark as our programming language and Databricks Community Edition as the development platform.

Question:


Consider a JSON file with nested columns in it; the input schema of the file is shown above. We need to:
  • Add a new column called discount to fruits, which is of struct type.
  • Remove the column called rate under vegetable.
  • Cast the existing rate column found under the Stationary field.


The input dataset can be downloaded from the Git link given below. Download it and practice on your own for a better understanding.


Solution:


Step 1: Read the given input JSON file


#Read Input JSON File
in_path="dbfs:/FileStore/shared_uploads/mart_in_json.json"
file_format="json"

df1 = (
       spark.read.format(file_format)
                 .option('multiLine',True)
                 .load(in_path)
)

df1.select("Area","fruits","vegetable","Stationary").show()
df1.printSchema()
Out[]:




Step 2: Add a Column to the StructType()

Problem with withColumn():

We can use the withColumn() function to add a column to a Spark DataFrame. The limitation of this function is that we cannot use it to add a new column inside nested columns; in other words, we cannot add a new column inside a StructType field with withColumn() alone. If we try to add the column this way, we get the result shown below, where the column is not added inside the struct but appended at the end, outside it.
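
For reference, here is a minimal sketch of that behaviour (assuming the df1 read in Step 1): the new column lands at the root of the schema, not inside fruits.

#A minimal sketch of the limitation (assuming df1 from Step 1)
#withColumn() adds discount as a new top-level column,
#it does NOT place it inside the fruits struct
from pyspark.sql.functions import lit

df_flat = df1.withColumn("discount", lit(10))
df_flat.printSchema()   #discount appears at the root, outside fruits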


Method to Add a Column Using the struct() Function:



#Solution using struct()

#import the required functions
from pyspark.sql.functions import struct, col, lit

#Get the existing struct field names of fruits from the dataframe schema
s_fields = df1.schema["fruits"].dataType.names

#Rebuild the fruits struct with the new discount column appended
df2 = (
  df1.withColumn("fruits",
       struct(*([col('fruits')[c].alias(c) for c in s_fields]
              + [lit(10).alias('discount')])))
)

#out schema
df2.printSchema()
Out[]:


df2.select("Area","fruits","vegetable","Stationary").display()
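
Note: if your cluster runs Spark 3.1 or later, the same result can likely be achieved with the Column.withField() method, which avoids rebuilding the struct by hand. A minimal sketch, assuming the same df1:

#Alternative for Spark 3.1+ (a sketch, assuming the same df1)
#Column.withField() adds or replaces a field inside a struct column
from pyspark.sql.functions import col, lit

df2_alt = df1.withColumn("fruits", col("fruits").withField("discount", lit(10)))
df2_alt.printSchema()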


Step 3: Remove a Column from StructType()


We cannot use the drop() function to remove a column inside a StructType. As a workaround, we can remove the field from the list of struct field names and use withColumn() with struct() to build a new dataframe in which that column inside the StructType is dropped.

#get the existing struct field names of Vegetable
s_fields = df1.schema["Vegetable"].dataType.names

#Remove the rate column from the list of field names
s_fields.remove("rate")

#Rebuild the Vegetable struct without the removed column
df3 = df2.withColumn("Vegetable", struct(*[col('Vegetable')[c].alias(c) for c in s_fields]))

#New dataframe schema
df3.printSchema()
 Out[]:



df3.select("Area","fruits","vegetable","Stationary").display()

Out[]:
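
Note: on Spark 3.1 or later, Column.dropFields() can likely achieve the same in a single call. A minimal sketch, assuming the same df2:

#Alternative for Spark 3.1+ (a sketch, assuming the same df2)
#Column.dropFields() removes one or more fields from a struct column
from pyspark.sql.functions import col

df3_alt = df2.withColumn("Vegetable", col("Vegetable").dropFields("rate"))
df3_alt.printSchema()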



Step 4: Cast a Column inside StructType()

We follow the same approach as above: keep every other field of the struct as it is, and re-add the rate column with its value cast to string.


#Get the existing struct field names of Stationary and set aside rate
s_fields = df1.schema["Stationary"].dataType.names
s_fields.remove("rate")


#Rebuild the Stationary struct with rate cast to string
df4 = (
  df3.withColumn("Stationary",
                 struct(*([col('Stationary')[c].alias(c) for c in s_fields] +
                          [col('Stationary')['rate'].cast('string').alias('rate')]))
                )
)

#print Schema
df4.printSchema()
Out[]:

df4.select("Area","fruits","vegetable","Stationary").display()


Out[]:
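
Note: on Spark 3.1 or later, withField() can likely replace the existing rate field with its cast value directly, since it overwrites a field when the name already exists. A minimal sketch, assuming the same df3:

#Alternative for Spark 3.1+ (a sketch, assuming the same df3)
#withField() replaces an existing struct field when the name already exists
from pyspark.sql.functions import col

df4_alt = df3.withColumn("Stationary",
                         col("Stationary").withField("rate",
                              col("Stationary")['rate'].cast('string')))
df4_alt.printSchema()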


Happy Learning !!!
