Compute a 1-D histogram in Spark
Spark can compute histograms out of the box, but the functionality is tucked away in low-level and sometimes obscure classes. In this post I'll give you a function that returns the desired values from a DataFrame.
In the official documentation, the only mention of histograms is in the `DoubleRDDFunctions` class. It contains some handy functions that you can call if and only if you have an `RDD[Double]`.
To obtain the histogram from a `Dataset` we need some boilerplate code:
```scala
import org.apache.spark.sql.functions.col

val histogram = df
  .select(col("column"))
  .rdd
  .map(r => r.getDouble(0))
  .histogram(numberOfBins)
```
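This overload of `histogram` returns a pair: an array of `numberOfBins + 1` evenly spaced bucket edges and an array of `numberOfBins` counts. A minimal sketch of unpacking the result, assuming `df` and `numberOfBins` are already defined:

```scala
// (edges, counts): n + 1 bucket edges and n counts for n buckets.
val (edges, counts) = df
  .select(col("column"))
  .rdd
  .map(r => r.getDouble(0))
  .histogram(numberOfBins)
```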
The code below was written to fit my needs: I was fine with automatically computed, equally spaced bin edges. Instead of returning the bin edges, I return the middle value of each bin for later processing. Feel free to modify this to fit your needs. If you provide the bins beforehand, it will be less computationally expensive, since the min/max aggregation can be skipped.
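To make the bin layout concrete, here is what the edge and mid-value formulas used below produce for a hypothetical range of 0 to 10 with 5 buckets:

```scala
val step  = (10.0 - 0.0) / 5                                  // 2.0
val edges = List.tabulate(5 + 1)(i => 0.0 + i * step)         // 0, 2, 4, 6, 8, 10
val mids  = List.tabulate(5)(i => 0.0 + i * step + step / 2)  // 1, 3, 5, 7, 9
```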
```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, max, min}

def histogram(df: DataFrame, column: String, numBuckets: Int): DataFrame = {
  val desiredDf = df.select(col(column))
  desiredDf.cache()
  val vector = desiredDf.rdd.map(r => r.getDouble(0))

  // Min and max are only needed because the edges are computed
  // automatically; skip this aggregation if you supply the bins yourself.
  val maxMin: Row = desiredDf.agg(max(col(column)), min(col(column))).head
  val minimum = maxMin.getDouble(1)
  val maximum = maxMin.getDouble(0)

  // numBuckets + 1 equally spaced edges from min to max.
  def computeBins(min: Double, max: Double, numBuckets: Int): Array[Double] = {
    val step = (max - min) / numBuckets
    List.tabulate(numBuckets + 1)(i => min + i * step).toArray
  }

  // The middle value of each bin, used instead of the edges.
  def computeMidValueBins(min: Double, max: Double, numBuckets: Int): Array[Double] = {
    val step = (max - min) / numBuckets
    List.tabulate(numBuckets)(i => min + i * step + step / 2).toArray
  }

  val bins = computeBins(minimum, maximum, numBuckets)
  val midBins = computeMidValueBins(minimum, maximum, numBuckets)
  // The second argument tells Spark the buckets are evenly spaced,
  // which enables a faster constant-time bucket lookup.
  val results = vector.histogram(bins, true)
  desiredDf.unpersist()

  // Also compute the density of each bin, so the histogram is normalized.
  val densityValues = {
    val sum = results.foldLeft(0L)(_ + _)
    results.map(_ * 1.0 / sum)
  }

  // Take the session from the incoming DataFrame so the function
  // does not depend on an external `spark` variable.
  val spark = df.sparkSession
  import spark.implicits._
  midBins.zip(densityValues).zip(results).toList
    .map { case ((mid, density), count) => (mid, density, count) }
    .toDF("x", "y", "count")
}
```
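A quick usage sketch, assuming a local `SparkSession`; the column name and sample values are made up for illustration:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("histogram-example")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Hypothetical sample data.
val df = Seq(1.0, 2.5, 3.0, 4.2, 4.8, 7.1, 9.9).toDF("value")

// One row per bin: mid value (x), density (y) and count.
histogram(df, "value", numBuckets = 3).show()
```

If you already know the bins you want, you can also skip the min/max aggregation entirely and call the RDD API with explicit edges, e.g. `vector.histogram(Array(0.0, 5.0, 10.0))`.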