Notes on PySpark
Intro
"Apache Spark™ is a fast and general engine for large-scale data processing." - Spark website
Spark is made accessible to Python users via the Python API (PySpark), which is virtually as up to date as the Scala and Java APIs.
I was introduced to Spark by way of the Databricks cloud platform (from the creators of Apache Spark) through my company. Consider it a commercial version of Jupyter or Zeppelin notebooks: language-agnostic, integrated on top of Spark, with a bunch of fancy runtime features.
Going through the registration for a free trial of Databricks and deploying it on a free trial of Amazon AWS takes minutes, and I would highly recommend it as a starting point for getting introduced to Spark. Databricks also has a Community Edition for learning purposes that is worth looking into.
Browse the Databricks training material and public notebooks for more.
Regarding Python, the Python API docs are available from Apache and are the best source for up-to-date examples. Databricks also has some good notes, but they are not complete, as they are not trying to replace the official docs.
Common Techniques
There are two common ways to select on DataFrames through the Python API of Spark: either df.select("myCol") or df.selectExpr("myCol").
For me, .select() is the most intuitive coming from Pandas; I also always do from pyspark.sql import functions as F and use df.select(F.col("myCol").key) or df.select(F.explode("myCol")).
Others prefer .selectExpr(), which accepts SQL but still returns a DataFrame, and do df.selectExpr("myCol.key") or df.selectExpr("explode(myCol)").
Group by and aggregation look like this: df.groupby("myCol1", "myCol2").agg(F.countDistinct("myCol3")).
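As a sketch, assuming a DataFrame df with those dummy columns, the aggregate can also be given a readable name with .alias():
from pyspark.sql import functions as F
counts = df.groupby("myCol1", "myCol2").agg(F.countDistinct("myCol3").alias("distinct_myCol3"))
counts.show()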
The below table (stolen from a Databricks talk I attended) outlines the main SQL functions and their DataFrame API equivalents. The cool thing is these apply the same across Spark APIs (Scala, R, Java, etc.).
SQL | DataFrame API | DataFrame example (with String column names) |
---|---|---|
SELECT | select, selectExpr | myDataFrame.select("someColumn") |
WHERE | filter, where | myDataFrame.filter("someColumn > 10") |
GROUP BY | groupBy | myDataFrame.groupBy("someColumn") |
ORDER BY | orderBy | myDataFrame.orderBy("column") |
JOIN | join | myDataFrame.join(otherDataFrame, "innerEquiJoinColumn") |
UNION | union | myDataFrame.union(otherDataFrame) |
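For example, a minimal sketch assuming a SparkSession spark and a DataFrame df with a numeric column someColumn; the SQL and DataFrame forms give the same result once the DataFrame is registered as a temp view:
df.createOrReplaceTempView("myTable")
spark.sql("SELECT someColumn FROM myTable WHERE someColumn > 10")   # SQL form
df.filter("someColumn > 10").select("someColumn")                   # DataFrame API form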
Chaining functions together works really well, for example: df.filter(...).select(...).join(...).groupby(...).agg(...).
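A sketch of such a chain, assuming a DataFrame df of orders with made-up columns status, customer_id, and amount, plus a second assumed DataFrame customers sharing customer_id:
from pyspark.sql import functions as F
result = (
    df.filter(F.col("status") == "complete")
      .select("customer_id", "amount")
      .join(customers, "customer_id")
      .groupby("customer_id")
      .agg(F.sum("amount").alias("total_amount"))
)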
Also, when dealing with Spark Datasets (RDDs or Hive tables), nested dictionaries and arrays can be used quite powerfully and accessed in natural Pythonic ways. For example, as shown above, access dictionaries using myDictCol.key, and index arrays simply with myArrayCol[index].
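A sketch of both access patterns, assuming df has a struct (or map) column myDictCol with a field key and an array column myArrayCol:
from pyspark.sql import functions as F
df2 = df.select(
    F.col("myDictCol").key,           # field access, same as selecting "myDictCol.key"
    F.col("myArrayCol")[0],           # first element of the array
    F.col("myArrayCol").getItem(0),   # equivalent explicit form
)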
Common functions to remember are .withColumn() to add calculated fields to DataFrames or to chain more than one explode together, and .withColumnRenamed() to quickly rename the function-applied column.
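As a sketch of that pattern (the column names are made up, and myOuterArray is assumed to be an array of structs that each hold an array field myInnerArray), two explodes can be chained by materializing the first with withColumn, and the auto-generated name cleaned up with withColumnRenamed:
from pyspark.sql import functions as F
df2 = (
    df.withColumn("inner", F.explode("myOuterArray"))   # first explode, kept as a new column
      .select("id", F.explode("inner.myInnerArray"))    # second explode on the nested array
      .withColumnRenamed("col", "myValue")              # explode's default output column is named "col"
)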
After from pyspark.sql import functions as F, you have access to a lot of basic tools, which can be combined in all sorts of ways for analysis. Consider the example of making a SQL-accepted date column by combining string year and month columns: df.withColumn("date", F.to_date(F.concat_ws('-', df.year, df.month, F.lit('01')), format='yyyy-MM-dd')).
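Spelled out as a runnable sketch (the year and month values here are made up):
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("2017", "04")], ["year", "month"])
# build a "yyyy-MM-dd" string from year and month, pin the day to 01, then parse it into a date
df2 = df.withColumn("date", F.to_date(F.concat_ws("-", df.year, df.month, F.lit("01")), format="yyyy-MM-dd"))
df2.show()   # the new date column reads 2017-04-01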
Code Snippets
Below is a collection of code samples which I have created and needed to use in my work more than once. They may be helpful to others.
Explode with Index
Explode the elements of an array, or a key in an array of nested dictionaries, with an index value to capture the sequence.
The below code creates a PySpark user defined function which implements enumerate on a list and returns a dictionary with {index: value} as integer and string respectively. I apply this to a dummy column "myNestedDict" which contains a key "myNestedCol" to show that this can work on dictionaries as well as arrays.
from pyspark.sql.functions import udf, col, explode
from pyspark.sql.types import MapType, StringType, IntegerType
# UDF that turns a list into a {index: value} map, preserving element order
explodeWithIndex = udf(lambda x: {i: str(v) for i, v in enumerate(x)} if x is not None else None, MapType(IntegerType(), StringType()))
df2 = df.filter(col("myNestedDict").isNotNull()).select("id", explode(explodeWithIndex(col("myNestedDict").myNestedCol)))
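As a design note: on newer Spark releases the built-in posexplode should give an index and value directly without a UDF; a minimal sketch, reusing the same dummy column names:
from pyspark.sql import functions as F
df2 = df.filter(F.col("myNestedDict").isNotNull()).select("id", F.posexplode(F.col("myNestedDict").myNestedCol).alias("index", "value"))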
Reverse Explode
Convert a column of strings into a single comma-separated string within a group by statement.
The below sample code will group by a dummy id and roll up a column of string values into a single column of comma-separated values. This could also be considered a reverse explode.
from pyspark.sql import functions as F
# collect_list gathers each group's values into an array; concat_ws joins them into one comma-separated string
df2 = df.groupby("id").agg(F.concat_ws(",", F.collect_list("myValues"))).withColumnRenamed("concat_ws(,, collect_list(myValues))", "myCSV")
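The same result can also be had by aliasing the aggregate directly, instead of renaming the generated column afterwards, for example:
from pyspark.sql import functions as F
df2 = df.groupby("id").agg(F.concat_ws(",", F.collect_list("myValues")).alias("myCSV"))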
Machine Learning
I have a really basic example of how to fully train and evaluate a linear regression model using a Spark DataFrame; you can find it as a notebook at notebooks/pyspark_linear_regression.
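Not the notebook itself, but a minimal sketch of that flow, assuming a DataFrame df with numeric feature columns x1 and x2 and a label column y (made-up names):
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
# assemble the feature columns into the single vector column Spark ML expects
assembler = VectorAssembler(inputCols=["x1", "x2"], outputCol="features")
data = assembler.transform(df).select("features", "y")
train, test = data.randomSplit([0.8, 0.2], seed=42)
lr = LinearRegression(featuresCol="features", labelCol="y")
model = lr.fit(train)
# evaluate on the held-out split with RMSE
predictions = model.transform(test)
evaluator = RegressionEvaluator(labelCol="y", predictionCol="prediction", metricName="rmse")
print(evaluator.evaluate(predictions))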