We will use PySpark as our programming language and Databricks Community Edition as our development platform.
Question:
Consider a JSON file with nested columns, whose input schema is shown above. We need to:
- Add a column called discount to fruits, which is of struct type.
- Remove the column called rate under Vegetable.
- Cast the existing rate column under the Stationary field to string.
The input dataset can be downloaded from the Git link given below. Download it and practice on your own for a better understanding.
Solution:
Step 1: Read the given input JSON file
# Read the input JSON file
in_path = "dbfs:/FileStore/shared_uploads/mart_in_json.json"
file_format = "json"
df1 = (
    spark.read.format(file_format)
    .option("multiLine", True)
    .load(in_path)
)
df1.select("Area", "fruits", "vegetable", "Stationary").show()
df1.printSchema()
Out[]:
Step 2: Add a Column to a StructType()
Problem with withColumn():
We can use the withColumn() function to add a column to a Spark DataFrame. Its limitation is that it cannot add a new column inside a nested column; in other words, withColumn() cannot add a field inside a StructType column. If we try to add the column with withColumn(), the column is not added inside the struct; instead, it is appended as a new top-level column at the end of the DataFrame, as shown below.
Method to Add a Column Using the struct() Function:
# Solution using struct
# import the required functions (lit is needed for the constant discount value)
from pyspark.sql.functions import struct, col, lit
# get the existing field names of the fruits struct from the DataFrame schema
s_fields = df1.schema["fruits"].dataType.fieldNames()
# rebuild fruits from its existing fields plus the new discount field
df2 = (
    df1.withColumn(
        "fruits",
        struct(*(
            [col("fruits")[c].alias(c) for c in s_fields]
            + [lit(10).alias("discount")]
        ))
    )
)
# output schema
df2.printSchema()
Out[]:
df2.select("Area","fruits","vegetable","Stationary").display()
Step 3: Remove a Column from a StructType():
We cannot use the drop() function to remove a column nested inside a StructType. As a workaround, we take the struct's field names from the schema, remove the unwanted field from that list, and rebuild the struct with withColumn(). This gives a new DataFrame in which the nested column has been dropped.
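# get the existing field names of the Vegetable struct
# (fieldNames() returns a copy, so remove() below leaves df1's schema object untouched)
s_fields = df1.schema["Vegetable"].dataType.fieldNames()
# remove rate from the list of fields to keep
s_fields.remove("rate")
# rebuild Vegetable from the remaining fields only
df3 = df2.withColumn("Vegetable", struct(*[col("Vegetable")[c].alias(c) for c in s_fields]))
# print the new schema
df3.printSchema()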
Out[]:
Step 4: Cast a Column inside a StructType()
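# get the Stationary field names (fieldNames() returns a copy) and drop rate from the list
s_fields = df1.schema["Stationary"].dataType.fieldNames()
s_fields.remove("rate")
# rebuild Stationary: keep the other fields, then re-add rate cast as string
# (note: this places rate at the end of the struct)
df4 = (
    df3.withColumn(
        "Stationary",
        struct(*(
            [col("Stationary")[c].alias(c) for c in s_fields]
            + [col("Stationary")["rate"].cast("string").alias("rate")]
        ))
    )
)
# print schema
df4.printSchema()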
Out[]:
df4.select("Area", "fruits", "vegetable", "Stationary").display()
Out[]:
Happy Learning !!!
Reader comment: What if we need to add the discount only when the fruits price is greater than 10?