Compute a 1-d histogram in Spark

Spark has a way to compute histograms; however, it is tucked away in low-level and sometimes obscure classes. In this post I'll give you a function that returns the desired values from a DataFrame.

In the official documentation the only mention of histograms is in the DoubleRDDFunctions class. It contains some handy functions that you can call if and only if you have an RDD[Double].
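
For reference, the two relevant overloads look like this (simplified from the Spark API docs):

def histogram(bucketCount: Int): (Array[Double], Array[Long])
def histogram(buckets: Array[Double], evenBuckets: Boolean = false): Array[Long]

The first computes bucketCount evenly spaced buckets between the min and max of the data and returns the edges together with the counts; the second counts values against edges you supply.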

To obtain the histogram from a DataFrame we need some boilerplate code:

val histogram = df.
  select(col("column")).      // keep only the column of interest
  rdd.                        // switch to the RDD API, where histogram lives
  map(r => r.getDouble(0)).   // Row => Double, giving an RDD[Double]
  histogram(numberOfBins)     // numberOfBins evenly spaced buckets
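
This overload returns a pair: the bucket edges Spark computed from the min and max of the data, and the count of values in each bucket. You can unpack it directly:

val (edges, counts) = histogram
// edges:  Array[Double] with numberOfBins + 1 entries
// counts: Array[Long] with numberOfBins entries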

The code was created to fit my needs: I was fine with automatically computed edges and equally spaced bins. Instead of returning the bin edges, I return the middle value of each bin for later processing. Feel free to modify it to fit your own needs. If you provide the bins beforehand, the computation will be cheaper, as shown in the sketch below.
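
For example, if you already know the range of your data, you can hand the edges to the second overload directly and skip the extra pass that computes min and max (the edges and column name here are made up for illustration):

// five evenly spaced edges define four bins
val edges = Array(0.0, 25.0, 50.0, 75.0, 100.0)
val counts = df.
  select(col("column")).
  rdd.
  map(r => r.getDouble(0)).
  histogram(edges, true) // true: tells Spark the edges are evenly spaced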

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, max, min}

def histogram(df: DataFrame, column: String, numBuckets: Int): DataFrame = {
  val desiredDf = df.select(col(column))
  desiredDf.cache() // the column is scanned twice: once for min/max, once for the counts
  val vector = desiredDf.rdd.map(r => r.getDouble(0))

  // Min and max are only needed to derive the edges;
  // skip this pass if you already know the range of your data
  val maxMin: Row = desiredDf.agg(max(col(column)), min(col(column))).head
  val minimum = maxMin.getDouble(1)
  val maximum = maxMin.getDouble(0)

  // numBuckets + 1 equally spaced edges from min to max
  def computeBins(min: Double, max: Double, numBuckets: Int): Array[Double] = {
    val step = (max - min) / numBuckets
    List.tabulate(numBuckets + 1)(min + _ * step).toArray
  }

  // the midpoint of each bin, used as the x coordinate of the result
  def computeMidValueBins(min: Double, max: Double, numBuckets: Int): Array[Double] = {
    val step = (max - min) / numBuckets
    List.tabulate(numBuckets)(min + _ * step + step / 2).toArray
  }

  val bins = computeBins(minimum, maximum, numBuckets)
  val midBins = computeMidValueBins(minimum, maximum, numBuckets)

  // `true` tells Spark the buckets are evenly spaced, enabling a faster lookup
  val results = vector.histogram(bins, true)
  desiredDf.unpersist()

  val spark = df.sparkSession
  import spark.implicits._

  // Also compute the density of each bin so the result is normalized
  val densityValues = {
    val sum = results.foldLeft(0L)(_ + _)
    results.map(_ * 1.0 / sum)
  }

  midBins.zip(densityValues).zip(results).toList.
    map { case ((a, b), c) => (a, b, c) }.toDF("x", "y", "count")
}
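
A quick usage sketch, assuming a SparkSession named spark is in scope (as in spark-shell); the column name and values are made up for illustration:

import spark.implicits._

val sample = Seq(1.2, 3.4, 2.2, 5.6, 3.3, 4.1).toDF("value")
val hist = histogram(sample, "value", 3)
hist.show()
// x:     the midpoint of each bin
// y:     the normalized density (the y values sum to 1)
// count: the raw count of values falling in each bin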