Run the latest version of Spark
Flexible choice of Spark version in an enterprise data platform
Distributors of enterprise data platforms such as Cloudera often bundle solid but mature (i.e. old) software. In the case of Cloudera in particular, all their energy is currently focused on CDP, which means the current HDP sometimes does not receive the latest updates.
In the past I already experimented with running a custom version of Spark regardless of the Hadoop or Hive version: Headless spark without Hadoop & Hive jars. However, that approach lacked Hive support, i.e. the tables registered in the Hive metastore were not available in Spark.
Today, I will outline how to get Hive support to work on HDP 3.1 for Spark 3.x.
steps
First, download the (latest) version of Apache Spark. At the time of writing, this is 3.0.0. Then extract it to the desired directory.
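If you work from a notebook, this can be done directly in a cell; the mirror URL and target directory below are only an illustration and may need to be adapted to your environment and Spark version:
!wget https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar -xzf spark-3.0.0-bin-hadoop3.2.tgz -C /path/to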
I will demonstrate how to use PySpark. A few environment variables need to be set up first:
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-1.8.0-openjdk'
os.environ['HADOOP_CONF_DIR'] = '/etc/hadoop/conf/'
# download your desired version of spark and unzip it
os.environ['SPARK_HOME'] = '/path/to/spark-3.0.0-bin-hadoop3.2'
# reuse the jars and configuration of the existing HDP installation on the classpath
hdp_classpath = !hadoop --config /usr/hdp/current/spark2-client/conf classpath
os.environ['SPARK_DIST_CLASSPATH'] = hdp_classpath[0]
We must specify the exact HDP version when starting Spark:
# list the installed HDP releases and pick the entry holding the version string
# (adjust the index if your /usr/hdp listing differs)
hdp_current_version = !ls /usr/hdp
hdp_current_version = hdp_current_version[1]
The next part of the code assumes that kinit has already been executed successfully, i.e. Kerberos authentication is already in place.
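A minimal sketch of what that looks like from the notebook (the keytab path and principal are placeholders, not values from this setup):
!kinit -kt /path/to/my.keytab myuser@MY.REALM
# verify that a valid ticket exists
!klist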
Now, let's start Spark. I tested this with Python 3, which is not supported by Cloudera (HDP); they still use Python 2 for spark-submit. As a consequence, periodic warnings from the Python 2 prelaunch script will show up in the logs due to the Python 2/3 incompatibility, but they can be ignored.
When starting Spark we allow it to read the Hive 3 catalog, but this still does not mean that ACID tables are supported.
# Required Classes
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, expr, rand, when, count, col
# HDP's configuration files reference ${hdp.version}, so the version is passed to
# both the driver and the YARN application master. The hive-site.xml of the existing
# spark2 client is shipped so that the Hive 3 metastore can be reached.
spark = SparkSession.builder.appName("custom spark") \
    .master("yarn") \
    .config("spark.submit.deployMode", "client") \
    .config("spark.yarn.queue", "myqueue") \
    .config("spark.driver.extraJavaOptions", f"'-Dhdp.version={hdp_current_version}'") \
    .config("spark.yarn.am.extraJavaOptions", f"'-Dhdp.version={hdp_current_version}'") \
    .config("spark.hadoop.metastore.catalog.default", "hive") \
    .config("spark.yarn.dist.files", "/usr/hdp/current/spark2-client/conf/hive-site.xml") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .enableHiveSupport() \
    .getOrCreate()
spark.sql("show databases").show()
+--------------------+
| namespace|
+--------------------+
| foo|
| bar|
+--------------------+
only showing top 20 rows
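Since the metastore is now wired up, regular Hive tables can be queried directly. A small sketch, where foo is the database from above and some_table is a hypothetical, non-ACID table:
spark.sql("show tables in foo").show()
# reading a plain (non-ACID) table; ACID/managed tables would additionally require the Hive Warehouse Connector
spark.sql("select count(*) from foo.some_table").show()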
Let’s introduce Koalas (the pandas API on Spark). I had been looking forward to using Koalas for a while, but the outdated Spark version bundled with HDP was not supported. Koalas offers a scalable, pandas-like API that executes the transformations on top of Spark, as well as a nicer (pandas-style) representation of data frames. The following is based on https://koalas.readthedocs.io/en/latest/getting_started/10min.html:
import databricks.koalas as ks
ks.DataFrame(spark.sql("show databases"))
|   | namespace |
|---|---|
| 0 | foo |
| 1 | bar |
Koalas
The following is a brief example of how to go further with Koalas:
import pandas as pd
import numpy as np
dates = pd.date_range('20130101', periods=6)
pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
pdf
|   | A | B | C | D |
|---|---|---|---|---|
| 2013-01-01 | -0.198131 | -0.670062 | 0.571255 | -0.986475 |
| 2013-01-02 | 0.330174 | 1.699464 | -0.848992 | 1.025219 |
| 2013-01-03 | -1.400803 | 0.484182 | 0.861485 | 1.106958 |
| 2013-01-04 | 0.765681 | -1.187109 | -0.604827 | 0.676196 |
| 2013-01-05 | 0.008392 | 1.262556 | 1.869428 | 0.149154 |
| 2013-01-06 | 1.261549 | -1.675407 | -0.134585 | -1.498401 |
kdf = ks.from_pandas(pdf)
kdf
|   | A | B | C | D |
|---|---|---|---|---|
| 2013-01-01 | -0.198131 | -0.670062 | 0.571255 | -0.986475 |
| 2013-01-03 | -1.400803 | 0.484182 | 0.861485 | 1.106958 |
| 2013-01-05 | 0.008392 | 1.262556 | 1.869428 | 0.149154 |
| 2013-01-02 | 0.330174 | 1.699464 | -0.848992 | 1.025219 |
| 2013-01-04 | 0.765681 | -1.187109 | -0.604827 | 0.676196 |
| 2013-01-06 | 1.261549 | -1.675407 | -0.134585 | -1.498401 |
kdf.describe()
|   | A | B | C | D |
|---|---|---|---|---|
| count | 6.000000 | 6.000000 | 6.000000 | 6.000000 |
| mean | 0.127810 | -0.014396 | 0.285628 | 0.078775 |
| std | 0.915462 | 1.369716 | 1.017690 | 1.089681 |
| min | -1.400803 | -1.675407 | -0.848992 | -1.498401 |
| 25% | -0.198131 | -1.187109 | -0.604827 | -0.986475 |
| 50% | 0.008392 | -0.670062 | -0.134585 | 0.149154 |
| 75% | 0.765681 | 1.262556 | 0.861485 | 1.025219 |
| max | 1.261549 | 1.699464 | 1.869428 | 1.106958 |
%matplotlib inline
from matplotlib import pyplot as plt
pser = pd.Series(np.random.randn(1000),
                 index=pd.date_range('1/1/2000', periods=1000))
kser = ks.Series(pser)
kser = kser.cummax()
kser.plot()
<AxesSubplot:>
pdf = pd.DataFrame(np.random.randn(1000, 4), index=pser.index,
                   columns=['A', 'B', 'C', 'D'])
kdf = ks.from_pandas(pdf)
kdf = kdf.cummax()
kdf.plot()
<AxesSubplot:>
summary
We have looked at how to run the latest version of Apache Spark (3.x) on the Hortonworks Data Platform HDP 3.1. Using YARN as a general system for allocating compute resources (basically like in a cloud), we can easily run arbitrary versions of Spark, even though they are not supported out of the box by a specific distributor.