In this post, we are going to learn about a frequently asked practical question on Apache Spark dataframes. While processing data in a Spark dataframe, we may need to add a column holding derived data, rename an existing column, or delete a column from the dataframe (the table-like structured collection in Spark). If you need more information on dataframes, go through this link "Dataframe in Apache Spark". Come, let's get started.
Problem Statement:
Consider we have sample data of a students' mark list in a CSV file, as shown in the diagram below, with four columns in it, namely,
"ROLL_NO, SUBJECT, MARKS_OBTAINED, TOTAL_MARKS"
Our problem statement is to,
- Add a column named STATUS with values PASS/FAIL in it. [Note: If the obtained mark is 30 or more, then PASS, else FAIL]
- Rename the added STATUS column to a column named RESULT.
- Finally, drop the TOTAL_MARKS column from the created dataframe.
SOLUTION:
Let us open a fresh Jupyter notebook and create the entry point, a SparkSession. Also, let us read the input file as a dataframe. If you haven't set up Spark on your Windows machine, then I would recommend you follow the given link to set up "Spark in Windows".
Let us look into the solution to the problem statement one after another.
a) Add Column to Dataframe:
To add a column to a dataframe in Spark, we can use withColumn(), or we can use a SQL statement by registering the dataframe as a temporary view and writing a query against it. To register the dataframe as a temporary view, we call createTempView() on the dataframe. We will look into both methods one by one.
Using withColumn():
Code snippet to add status column to our dataframe is as follows,
from pyspark.sql.functions import expr
add_df = input_df.withColumn(
    "STATUS",
    expr('if(MARKS_OBTAINED >= 30, "PASS", "FAIL")')
)
Here we import expr from pyspark.sql.functions and use an if expression to derive the new column. We can observe that the STATUS column is added to our add_df dataframe.
Using Tempview:
We can arrive at the same result using a select query written on top of a temporary view built from the input dataframe. This method is helpful for SQL developers who have minimal coding knowledge but are strong in writing queries to get the end results. To register the dataframe as a temporary view, we use createTempView() as shown in the code snippet below.
input_df.createTempView("in_table")
add_tbl_df=spark.sql("""select *,
case
when MARKS_OBTAINED >= 30 then "PASS"
else "FAIL"
END as STATUS
from in_table """)
Here, we write a simple CASE statement in the select query to compute the STATUS column for our dataframe. There will be no change in the output; it will be the same as the result we got using the Spark dataframe API.
b) Rename Column in Dataframe:
Renaming a column in a dataframe is very simple; we just need to use the dataframe function withColumnRenamed("old_col_name", "new_col_name"). Let us see this with a code snippet,
rename_df=add_df.withColumnRenamed("STATUS","RESULT")
From the output it is clear that the STATUS column is renamed as RESULT column.
c) Drop Column from Dataframe:
Dropping a column is equally simple; we pass the column name to the drop() function,
drop_df=rename_df.drop("TOTAL_MARKS")
Output clearly shows that the TOTAL_MARKS column has been dropped from the dataframe successfully.
Try yourself:
Add a column named RES_VAL to the dataframe, with the value 0 if RESULT is PASS and 1 if RESULT is FAIL. Post your results in the comments section. Hope you learnt a simple trick on how to play with dataframes. Leave a comment below if you have any queries or doubts about the above steps.
Happy Learning!!!