How to access a Hive table using Pyspark?
Contents
- 1 Pyspark
- 1.1 Hive Table
- 1.2 Write Pyspark program to read the Hive Table
- 1.2.1 Step 1 : Set the Spark environment variables
- 1.2.2 Step 2 : spark-submit command
- 1.2.3 Step 3: Write a Pyspark program to read hive table
- 1.2.4 Pyspark program to read Hive table => read_hive_table.py
- 1.2.5 Shell script to call the Pyspark program => test_script.sh
- 1.2.6 Execute shell script to run the Pyspark program
Pyspark
Apache Spark is an in-memory data processing framework written in Scala. It can process data up to 100 times faster than Hadoop MapReduce jobs, and it provides APIs for programming languages such as Scala, Java and Python.
Pyspark is the Python API for Apache Spark. It allows us to write Spark applications as Python scripts, and it also provides the PySpark shell for interactively analyzing our data in a distributed environment. Using the spark-submit command, we can submit a Spark or Pyspark application to the cluster.
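For example, a minimal standalone Pyspark script looks like the following. This is only an illustrative sketch (the application name and sample values are made up for this tutorial): it builds a Spark session, creates a tiny in-memory data frame and prints it.
from pyspark.sql import SparkSession

# Entry point of any Pyspark application: build (or reuse) a Spark session
spark = SparkSession.builder.appName("HelloPyspark").getOrCreate()

# Create a small in-memory data frame with illustrative values and print it
df = spark.createDataFrame([("Tesla Model 3", 39990.0)], ["car_model", "price_in_usd"])
df.show()

spark.stop()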
Hive Table
In this tutorial, we are going to read a Hive table using a Pyspark program. In Hive, we have a table called electric_cars in the car_master database. It contains two columns: car_model and price_in_usd.
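If you want to follow along without an existing table, the statements below sketch how such a table could be created. The column types and the sample row are assumptions (the article only names the columns), and the calls assume a Spark session with Hive support, which we create in Step 3.
# Hypothetical setup for the tutorial table; the column types are assumed
spark.sql("CREATE DATABASE IF NOT EXISTS car_master")
spark.sql("""
    CREATE TABLE IF NOT EXISTS car_master.electric_cars (
        car_model STRING,
        price_in_usd DOUBLE
    )
""")

# Illustrative sample row, not real data from the article
spark.sql("INSERT INTO car_master.electric_cars VALUES ('Tesla Model 3', 39990.0)")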
Write Pyspark program to read the Hive Table
Step 1 : Set the Spark environment variables
Before running the program, we need to set the location where the Spark files are installed, and add that location to the PATH variable. If multiple Spark versions are installed on the system, we also need to select the specific version to use.
# Set spark version to 2
export SPARK_MAJOR_VERSION=2

# Set the location where the spark files are installed
export SPARK_HOME=/usr/hdp/2.6.5.0-292/spark2

# Spark location added to PATH variable
export PATH=$SPARK_HOME/bin:$PATH
Step 2 : spark-submit command
We write a Spark program in a Python script with the file extension .py. To submit that Spark application to the cluster, we use the spark-submit command as below.
# read_hive_table.py is the pyspark file name
spark-submit read_hive_table.py
Step 3: Write a Pyspark program to read hive table
In the Pyspark program, we need to create a Spark session. For that, we import SparkSession from the pyspark library as a dependency.
from pyspark.sql import SparkSession
Next, we create the Spark session in the main block, which is used to read the Hive table. enableHiveSupport() enables Hive support, including connectivity to a persistent Hive metastore, support for Hive SerDes, and Hive user-defined functions.
# Setting our application name
appname = "ExtractCars"

# Create spark session
spark = SparkSession.builder.appName(appname).enableHiveSupport().getOrCreate()
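If the cluster defaults are not picked up automatically, the builder also accepts extra settings through config(). A minimal sketch, assuming a hypothetical warehouse directory that is not from this tutorial's cluster:
# The warehouse path below is a hypothetical example
spark = (SparkSession.builder
         .appName(appname)
         .config("spark.sql.warehouse.dir", "/apps/hive/warehouse")
         .enableHiveSupport()
         .getOrCreate())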
To read the Hive table, we write a custom function called FetchHiveTable. This function runs a select query on the electric_cars table using the spark.sql method and stores the result in a data frame.
Next, we use the collect() function to retrieve the elements from the data frame. It returns the rows as an array. Please note that collect() is recommended only for small datasets, since it brings all the data to the driver node; on a large table it can cause an out-of-memory error (a more scalable alternative is sketched after the function below).
def FetchHiveTable():
    fetch_sql = "select * from car_master.electric_cars"

    # Run the sql and retrieve the data frame elements as an array of Rows
    table_res = spark.sql(fetch_sql).collect()

    # Loop through the result set and print each column value
    for row in table_res:
        car_model_name = row["car_model"]
        car_price = row["price_in_usd"]
        print("car model name : " + car_model_name)
        # str() guards against price_in_usd being a numeric column
        print("car price : " + str(car_price))

    print("for loop completed")
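For larger tables, a common alternative is toLocalIterator(), which streams rows to the driver one partition at a time instead of materializing the whole result like collect() does. This variant is only a sketch and is not part of the original program:
# Sketch: stream rows instead of collecting everything to the driver at once
def FetchHiveTableStreaming():
    df = spark.sql("select * from car_master.electric_cars")
    for row in df.toLocalIterator():
        print("car model name : " + row["car_model"])
        print("car price : " + str(row["price_in_usd"]))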
Next, we call FetchHiveTable from the main block and stop the Spark application.
FetchHiveTable()
spark.stop()
Pyspark program to read Hive table => read_hive_table.py
Let's put the complete Pyspark program into a single file and name it read_hive_table.py.
from pyspark.sql import SparkSession

# Custom function to access the Hive table
def FetchHiveTable():
    fetch_sql = "select * from car_master.electric_cars"
    table_res = spark.sql(fetch_sql).collect()

    # Print the raw list of Row objects
    print(table_res)

    for row in table_res:
        car_model_name = row["car_model"]
        car_price = row["price_in_usd"]
        print("car model name : " + car_model_name)
        # str() guards against price_in_usd being a numeric column
        print("car price : " + str(car_price))

    print("for loop completed")

# Main program starts here
if __name__ == "__main__":
    appname = "ExtractCars"

    # Creating the Spark session; FetchHiveTable uses it as a global variable
    spark = SparkSession.builder.appName(appname).enableHiveSupport().getOrCreate()
    print("Spark application name: " + appname)

    FetchHiveTable()
    spark.stop()
    exit(0)
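As a side note, the same read can be expressed with the DataFrame API instead of a SQL string. A minimal sketch, assuming the same car_master.electric_cars table exists:
# Equivalent read using the DataFrame API
df = spark.table("car_master.electric_cars")
df.select("car_model", "price_in_usd").show(truncate=False)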
Shell script to call the Pyspark program => test_script.sh
To set the Spark environment variables and execute our Pyspark program, we create a shell script file named test_script.sh. Inside this script, we execute the Pyspark program using the spark-submit command.
#!/bin/bash

echo "Info: Setting global variables"

export SPARK_MAJOR_VERSION=2
export SPARK_HOME=/usr/hdp/2.6.5.0-292/spark2
export PATH=$SPARK_HOME/bin:$PATH

spark-submit /x/home/revisit_user/test/read_hive_table.py
Execute shell script to run the Pyspark program
Finally, we run the shell script file test_script.sh as below. It will execute our Pyspark program read_hive_table.py.
sh test_script.sh
Output
The program prints the Spark application name, followed by the car model and price of each row in the electric_cars table.