How to Convert Pandas DataFrame into Spark DataFrame | Pandas DF to Spark DF using PySpark

In this session, we will see how to convert a pandas dataframe into a Spark dataframe in an efficient, well-performing way. We will learn this concept with a problem statement.

Problem Statement:



Consider an input CSV file which has some transaction data in it. We read the CSV file using the pandas read_csv() method, and the input pandas dataframe will look as shown in the figure above. Our requirement is to convert the pandas df to a Spark df using PySpark and display the resultant dataframe as shown in the picture above.

Attachment:

Download the sample dataset from the link below.

Apache Arrow in Spark for Converting pandas df:

Apache Arrow is an open-source, cross-platform tool that works closely with Spark's in-memory columnar data format. It gives us the ability to transfer data between the Python process and the JVM in an efficient and fast way. By default, Apache Arrow is disabled in Spark; it can be enabled using the line below.

#Enable Apache Arrow
spark.conf.set("spark.sql.execution.arrow.enabled","true")
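Note: in Spark 3.x these Arrow properties were renamed with a "pyspark" segment (the old names still work but are deprecated). If you are on Spark 3.x, the equivalent settings are:

#Spark 3.x names for the Arrow settings
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled","true")
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")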
All Spark SQL datatypes are supported with Arrow version 0.10.0 (except a few complex types such as MapType and nested StructType), and if any casting issue occurs during the conversion, Spark can fall back to a non-Arrow implementation. You can check this fallback configuration with:

spark.conf.get("spark.sql.execution.arrow.fallback.enabled")

Note: Keep in mind that even if Arrow is enabled, converting a Spark dataframe to pandas collects all the data into the driver's memory on a single node, which can cause memory-overhead issues. So the best approach is to take a sample of the data before converting with the toPandas() API, as sketched below.
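As a minimal sketch (big_df here is a placeholder name for a large Spark dataframe; the fraction and seed are illustrative):

#Sample ~1% of rows before pulling data to the driver
small_pd = big_df.sample(fraction=0.01, seed=42).toPandas()

#Or cap the row count with limit() if an exact fraction is not needed
small_pd = big_df.limit(1000).toPandas()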

Cast Issues While Conversion:

We can convert the pandas df to a Spark df using the createDataFrame() API as shown below.

#Code Syntax to create dataframe from pandas df
in_df=spark.createDataFrame(pandas_dataframe)
But if there is any datatype mismatch, then createDataFrame() itself throws an error stating the type mismatch while creating the Spark dataframe; refer to the diagram below for the issue.
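For instance, here is a hypothetical reproduction of such a mismatch (the column name and values are made up): a pandas column that mixes numbers and strings cannot be merged into a single Spark type, so the conversion fails.

#A pandas column mixing int and str values (dtype becomes 'object')
import pandas as pd
mixed_pd = pd.DataFrame({"Amount": [100, "N/A", 250]})

#Uncommenting the next line raises a TypeError similar to:
#  field Amount: Can not merge type LongType and StringType
#spark.createDataFrame(mixed_pd)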


Explanation with Demo:



If you like this explanation and demo, please subscribe to the channel for more videos on scenario-based Spark questions.

Solution:

We can convert the pandas df to a Spark df in two ways:
  • By casting all the columns in the pandas df to string using astype()
  • By defining a StructType() schema and using it in the conversion

Step 1:

Read the input CSV file using read_csv() to create the pandas df. Use the code snippet below to create the input pandas dataframe.


#import the required python package
import pandas as pd
import datetime

#Read the input csv file
in_pd=pd.read_csv("trans.csv")

#Display sample result from pandas df
in_pd.head()

Out[]:
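Optionally, check which dtypes pandas inferred from the CSV, since these are what createDataFrame() will try to map to Spark types:

#Inspect the inferred dtypes; 'object' columns are the usual source
#of type-merge errors during conversion
in_pd.dtypes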

Step 2:

Method 1 - Casting Before Conversion.

As we discussed, we can cast all the columns in the pandas df to string type to overcome the datatype issue while converting the pandas df to a Spark df.

General Syntax:
  • pandas_dataframe.astype(dataType)
astype() casts a pandas object to the specified dtype; here we use str so that every column converts cleanly.

Code Snippet:

#Create spark df from pandas df using astype()
in_df=spark.createDataFrame(in_pd.astype(str))
in_df.show()

#printSchema() used to display schema
in_df.printSchema()
Out[]:

From the result, one can observe that we are able to convert the pandas df to a Spark df successfully. But also notice the schema of the Spark dataframe: it gives a clear picture that all the columns are of String datatype. If we need to perform any aggregation on a numerical column in the Spark dataframe, then we need to cast the column back to an integer or decimal type again. So, the better approach would be to define the schema and convert to a Spark dataframe as shown in the step below.
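To see the extra work the all-string schema creates, here is a small sketch (assuming the Amount column from the sample dataset) that casts the column back to a numeric type before aggregating:

#Amount is a string after astype(str), so cast it back before aggregating
from pyspark.sql.functions import col, sum as sum_
in_df.withColumn("Amount", col("Amount").cast("double")) \
     .agg(sum_("Amount").alias("total_amount")) \
     .show()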

Step 3: 

Method 2 - Define Schema and Convert.


#define the schema using StructType()
from pyspark.sql.types import *
pdsch=StructType([StructField("CustomerId",IntegerType(),True),
                  StructField("CustomerName",StringType(),True),
                  StructField("dateTime",StringType(),True),
                  StructField("Amount",FloatType(),True),
                  StructField("Discount",StringType(),True),
                  StructField("Member",BooleanType(),True),
                 ])

#Create dataframe using defined schema
method2_df=spark.createDataFrame(in_pd,schema=pdsch)
method2_df.show()

#printSchema used to check the schema of spark df
method2_df.printSchema()

Out[]:

With this approach, we can convert the pandas dataframe to a Spark dataframe while also retaining the schema of the data.
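Since the columns now keep their proper types, aggregations work without any extra casting; for example (a quick sketch using the Member and Amount columns from the schema defined above):

#No cast needed: Amount is already FloatType in the defined schema
method2_df.groupBy("Member").sum("Amount").show()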

Convert Spark DF to Pandas DF:

Now, let us see how to convert a Spark df back to a pandas df. To change a Spark DataFrame to a pandas DataFrame, we can use toPandas() as shown below.

General Syntax:

  • df.toPandas()
Returns the content of the Spark dataframe as a pandas.DataFrame.

Code Snippet:


#convert spark df into pandas df
pd_df2=method2_df.toPandas()

#Display the results
pd_df2.head()
pd_df2.dtypes
type(pd_df2)
Out[]:

Hope you learnt the ways offered by PySpark to convert a pandas df to a Spark df, and also how to convert a Spark df back to a pandas df. Let me know your thoughts and suggestions on this topic in the comment box given below.

Happy Learning !!!
