How to Transform Rows and Columns Using Apache Spark

Pivot and Un-Pivot Using PySpark:



We often encounter the need to transpose or transform rows and columns of a given input while dealing with big data in data analytics. It is also a common Spark interview question: how do you pivot a DataFrame? In this blog, we will learn how to convert the values of a column into separate columns (and back into rows) in a Spark DataFrame, and get some hands-on experience of how this is done using the Spark SQL DataFrame API and RDDs, with one simple example.

Problem Statement:


To understand the problem statement, let us take an example of student data comprising three columns, as below:

ROLL_NO, SUBJECT, MARKS

Our problem statement is to transform the values in the SUBJECT column into separate columns, as shown in the diagram below, and then to un-pivot the data back to its original structure.

[Figure: Problem Statement]


a) We have a column named SUBJECT, and its values, spread across multiple rows, have to be transformed into separate columns, with the values populated from the MARKS column, as shown in figure II.

b) We then need to un-pivot the transposed data and bring it back to its original form.



I guess you understood the problem statement. Let's move on and look into the solution to these problems.

Solution:


Pivoting is nothing but the concept of reshaping data from one column into multiple columns. It is an aggregation in which the values of one of the grouping columns are transformed into separate columns, each holding its corresponding data. We can apply pivot to both RDDs and DataFrames in Spark. Let us look into the solution to the above problem one step at a time. We start with a fresh Jupyter notebook, establish an entry point (a SparkSession), and read the input data from the file we have. Follow the below code snippet to do so.
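This setup snippet is a minimal sketch; the application name and the input file name (student_marks.csv here) are assumptions, so replace them with your own values.

from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession that acts as the entry point
spark = SparkSession.builder \
    .appName("PivotAndUnpivot") \
    .getOrCreate()

# Read the input CSV with a header row; every column is read as a string
input_df = spark.read.csv("student_marks.csv", header=True)
input_df.show()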

a) Transform or Pivot Multiple Rows into Columns:


Method 1: Using the DataFrame API to transpose:


As we read the header directly from the input CSV file, all the columns are of type String. In order to transpose, the MARKS column should be of type Integer. So, before we apply pivot to the rows, we cast the MARKS column to IntegerType. The code snippet to do this casting is shown below.

from pyspark.sql.types import IntegerType

print("Before casting")
input_df.printSchema()

# Cast the MARKS column from string to integer so it can be aggregated
input_df_cast = input_df.withColumn("MARKS",
                                    input_df['MARKS'].cast(IntegerType()))

print("After casting")
input_df_cast.printSchema()


The output of the above lines of code is:

Out[]:


Now that we can observe the MARKS column has been cast to integer type, we are good to proceed with applying the transpose. Multiple rows can be transformed into columns using the pivot() function available in the Spark DataFrame API. We will implement it by first applying groupby() on the ROLL_NO column, pivoting the SUBJECT column, and applying an aggregation on the MARKS column. Follow the below code snippet to get the expected result.

pivot_df = input_df_cast.groupby('ROLL_NO') \
                        .pivot('SUBJECT') \
                        .max('MARKS') \
                        .fillna(0)
pivot_df.show()

fillna(0) is used in the above block of code to handle the situation where the MARKS for a particular SUBJECT of a given ROLL_NO is not available; the missing value is then set to 0 by default. The output will be as follows,

Out[]:
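As a side note, the same pivot can also be written with an explicit aggregate expression via agg() from pyspark.sql.functions. This is just an equivalent formulation of the snippet above, shown here for reference.

from pyspark.sql import functions as F

# Equivalent to .max('MARKS'): an explicit aggregate on the pivoted groups
pivot_df_alt = input_df_cast.groupby('ROLL_NO') \
                            .pivot('SUBJECT') \
                            .agg(F.max('MARKS')) \
                            .fillna(0)
pivot_df_alt.show()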


Performance of Pivot in Spark:


Hence we can see that the data is transposed. Thanks to Spark 2.0, the performance of the pivot operation has improved a lot compared to earlier releases. But if you are still using a lower version of Spark, keep in mind that pivot on a DataFrame is a really expensive operation, so it is good to provide the column values as an argument to the function, as shown in the code snippet below.

subject_list=["English","History","Maths","Physics","Science"]
pivot_df = input_df_cast.groupby('ROLL_NO') \
                        .pivot('SUBJECT',subject_list) \
                        .max('MARKS') \
                        .fillna(0)
pivot_df.show()

The output will be the same as the one above.

Method 2: Using two-phase aggregation:


Another way to achieve the transpose of rows into columns is the optimized approach called two-phase aggregation. Spark 2.0 uses this sort of method to improve performance. For more details, have a look at the JIRA ticket raised regarding this, SPARK-13749. The piece of code to obtain the output with this method is shown below.



# Phase 1: reduce MARKS to one value per (ROLL_NO, SUBJECT) pair
# Phase 2: pivot the already-aggregated (much smaller) data
pivot_perf_DF = input_df_cast.groupBy("ROLL_NO","SUBJECT") \
      .max("MARKS") \
      .groupBy("ROLL_NO") \
      .pivot("SUBJECT") \
      .max("max(MARKS)")
pivot_perf_DF.show()


Again the output will be the same, and you may also notice some improvement in the code run time.
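If you want to verify the improvement yourself, one rough way, sketched below, is to time an action around each variant. The exact numbers will depend on your data, caching and cluster, so treat this only as an indicative comparison.

import time

# Crude timing sketch: trigger an action on each variant and compare wall-clock time
start = time.time()
input_df_cast.groupby('ROLL_NO').pivot('SUBJECT').max('MARKS').count()
print("Plain pivot took %.2f seconds" % (time.time() - start))

start = time.time()
input_df_cast.groupBy("ROLL_NO", "SUBJECT").max("MARKS") \
             .groupBy("ROLL_NO").pivot("SUBJECT").max("max(MARKS)").count()
print("Two-phase pivot took %.2f seconds" % (time.time() - start))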

Method 3: Using groupByKey over an RDD:


The same pivot can be achieved with an RDD as well, and the code snippet to get the result is given below.

from pyspark.sql import Row

# Pair each record as (ROLL_NO, (SUBJECT, MARKS)) and group the pairs by key
grouped = input_df.rdd \
          .map(lambda row: (row.ROLL_NO, (row.SUBJECT, row.MARKS))) \
          .groupByKey()

# Build one Row per student: ROLL_NO plus one field per (SUBJECT, MARKS) pair
def make_row(kv):
    key, val = kv
    out = dict([("ROLL_NO", key)] + list(val))
    return Row(**out)

grouped.map(make_row).collect()


Here, we convert the input DataFrame to an RDD and apply groupByKey on top of it. Note that we don't need to cast the MARKS column when dealing with an RDD. If we run the above code snippet, it results in an RDD of Row objects, as shown in the below diagram.

Out[]:


Now, the RDD of Rows can be converted into a DataFrame. To get more details on how to convert an RDD to a DataFrame, I would recommend you go through the link Convert RDD to dataframe in spark. The code snippet to do this is as follows.

pivot_rdd = spark.createDataFrame(grouped.map(make_row))
pivot_rdd.show()

We can observe that the columns are shuffled. This is because Row sorts its fields alphabetically by name, which was done to work around the unordered keyword arguments in older versions of Python. A simple way to restore a fixed column order is shown after the output below.

Out[]:
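As mentioned above, one simple way to restore a fixed column order is to select the columns explicitly. The sketch below reuses the subject_list defined earlier and assumes every subject in that list actually appears in the data.

# Select the columns in a deterministic order: ROLL_NO first, then each subject
ordered_cols = ["ROLL_NO"] + subject_list
pivot_rdd.select(*ordered_cols).show()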


b) Un-Pivot the above data:


Unpivot is the reverse of the transpose or pivot operation we performed previously. We can achieve it by turning the column values back into row values. Spark SQL has no dedicated DataFrame function to do the unpivot easily, so we will use the selectExpr() function along with the stack() function.

Syntax of stack() function:


input_df.selectExpr("stack(n, <col0_label>, <col0_value>, <col1_label>, <col1_value>, ...)")

Here, n is the number of rows that each input row is split into, one per (label, value) pair.

Below is a code snippet that converts the column values back into rows.

# Stack the five subject columns back into (SUBJECT, MARKS) rows
unPivot_dF = pivot_perf_DF.selectExpr(
        "ROLL_NO",
        "stack(5, 'English', English, 'History', History, "
        "'Maths', Maths, 'Physics', Physics, 'Science', Science)") \
    .withColumnRenamed("col0", "SUBJECT") \
    .withColumnRenamed("col1", "MARKS") \
    .where("MARKS is not null")

unPivot_dF.show()


Output of the un-pivoted data is shown below.

Out[]:
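Hard-coding the subject names in the stack() expression gets tedious when the list of subjects is long or changes. As a small convenience, the expression can also be built from the pivoted DataFrame's own columns; the sketch below assumes the column names are plain identifiers (no spaces or special characters).

# Build the stack() expression dynamically from the pivoted column names
subjects = [c for c in pivot_perf_DF.columns if c != "ROLL_NO"]
stack_expr = "stack({0}, {1})".format(
    len(subjects),
    ", ".join("'{0}', {0}".format(s) for s in subjects))

unPivot_dynamic_dF = pivot_perf_DF.selectExpr("ROLL_NO", stack_expr) \
    .withColumnRenamed("col0", "SUBJECT") \
    .withColumnRenamed("col1", "MARKS") \
    .where("MARKS is not null")
unPivot_dynamic_dF.show()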


Summary:


In this blog, we have seen different ways to achieve the pivot operation with an example, did some performance tuning on the pivot operation, and finally un-pivoted the data back to its original form. Let us know in the comments which technique you feel most comfortable with. Hope you had a good learning session today.


Happy Learning !!!
