Spark can compute histograms, but the functionality is tucked away in low-level and somewhat obscure classes. In this post I'll give you a function that takes a DataFrame and returns the desired values.

In the official documentation, the only mention of histograms is in the `DoubleRDDFunctions` class. It contains some handy functions that you can call if and only if you have an `RDD[Double]`.
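The buckets passed to `histogram(buckets, evenBuckets)` follow a rule spelled out in the `DoubleRDDFunctions` scaladoc: every bucket is open on the right except the last, which is closed, so the edges `[1, 10, 20, 50]` give `[1, 10)`, `[10, 20)` and `[20, 50]`. The plain-Scala sketch below models that rule on a local collection, so you can check your expectations without a cluster. `SortedBuckets` and its methods are hypothetical names, and this is a model of the documented behaviour, not Spark's source.

```scala
// Hypothetical helper: a local model of the documented bucket rule of
// DoubleRDDFunctions.histogram(buckets). It is not Spark's implementation.
object SortedBuckets {
  // Index of the bucket containing x, or None when x is NaN or outside the edges.
  // Buckets are [e(i), e(i + 1)) except the last one, which is [e(n - 1), e(n)].
  def bucketIndex(edges: Array[Double], x: Double): Option[Int] = {
    require(edges.length >= 2, "need at least two edges")
    if (x.isNaN || x < edges.head || x > edges.last) None
    else if (x == edges.last) Some(edges.length - 2) // the last bucket is closed
    else {
      // Binary search keeping edges(lo) <= x < edges(hi): O(log n) per value
      var lo = 0
      var hi = edges.length - 1
      while (hi - lo > 1) {
        val mid = (lo + hi) >>> 1
        if (edges(mid) <= x) lo = mid else hi = mid
      }
      Some(lo)
    }
  }

  // Count the values falling into each bucket; out-of-range values are ignored.
  def counts(edges: Array[Double], values: Seq[Double]): Array[Long] = {
    val result = new Array[Long](edges.length - 1)
    values.flatMap(v => bucketIndex(edges, v)).foreach(i => result(i) += 1)
    result
  }
}
```

For example, `SortedBuckets.counts(Array(1.0, 10.0, 20.0, 50.0), Seq(1.0, 5.0, 10.0, 20.0, 50.0, 60.0))` yields counts of 2, 1 and 2: `50.0` lands in the closed last bucket and `60.0` is dropped.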

To obtain the histogram from a `Dataset`, we need some boilerplate code:

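```scala
import org.apache.spark.sql.functions.col

// Assumes `df` is a DataFrame whose "column" is of DoubleType with no nulls,
// and `numberOfBins` is an Int. The result holds the numberOfBins + 1 bucket
// edges and the number of rows in each bucket.
val (bucketEdges, counts) = df.
  select(col("column")).
  rdd.
  map(r => r.getDouble(0)).
  histogram(numberOfBins)
```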
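The call above returns a pair: the `numberOfBins + 1` edges, evenly spaced between the column's minimum and maximum, and the count in each bin. According to its scaladoc, `histogram(bucketCount)` returns a single bucket when all values are equal and throws an exception if the data contains NaN or infinity. The sketch below reproduces that contract on a local `Seq[Double]`, which can be handy for unit tests. `EvenHistogram` is a hypothetical name, rejecting empty input is my own choice, and the code is a model rather than Spark's implementation.

```scala
// Hypothetical local model of DoubleRDDFunctions.histogram(bucketCount),
// following its documented contract; not Spark's implementation.
object EvenHistogram {
  def histogram(values: Seq[Double], bucketCount: Int): (Array[Double], Array[Long]) = {
    require(bucketCount > 0, "bucketCount must be at least 1")
    require(values.nonEmpty, "no values to bin")
    require(values.forall(v => java.lang.Double.isFinite(v)), "NaN or infinite value")
    val lo = values.reduce(_ min _)
    val hi = values.reduce(_ max _)
    if (lo == hi) {
      // All values equal: a single closed bucket [lo, lo] holding everything
      (Array(lo, hi), Array(values.size.toLong))
    } else {
      // bucketCount + 1 evenly spaced edges, the last one pinned to the maximum
      val edges = Array.tabulate(bucketCount + 1) { i =>
        if (i == bucketCount) hi else lo + i * (hi - lo) / bucketCount
      }
      val counts = new Array[Long](bucketCount)
      values.foreach { v =>
        // Evenly spaced edges allow an O(1) index computation; the maximum is
        // clamped into the last bucket because that bucket is closed
        val idx = math.min(((v - lo) / (hi - lo) * bucketCount).toInt, bucketCount - 1)
        counts(idx) += 1
      }
      (edges, counts)
    }
  }
}
```

With `Seq(0.0, 1.0, 2.0, 3.0, 4.0)` and two bins, the edges are `0.0, 2.0, 4.0` and the counts `2, 3`: `2.0` opens the second bin, and `4.0` falls into it because the last bin is closed.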

The code was written to fit my needs: I was fine with a histogram whose edges are derived automatically from the data and whose bins are equally spaced. Instead of returning the bin edges, it returns the middle value of each bin, which I needed for later processing. Feel free to modify it to fit your needs. If you provide the bin edges beforehand, it will be less computationally expensive, because the minimum and maximum no longer have to be computed in a separate pass over the data.
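Once the edges are known, turning them into midpoints only takes a line. The sketch below, using a hypothetical `BinMidpoints.fromEdges` helper, averages each pair of neighbouring edges. For equally spaced bins this gives the same values as `min + i * step + step / 2`, and it also works if you pass in your own, unevenly spaced edges.

```scala
// Hypothetical helper: midpoint of every bin, given its n + 1 sorted edges.
object BinMidpoints {
  def fromEdges(edges: Array[Double]): Array[Double] = {
    require(edges.length >= 2, "need at least two edges")
    // Pair each edge with the next one and average them
    edges.zip(edges.tail).map { case (left, right) => (left + right) / 2 }
  }
}
```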

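```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, max, min}

// Histogram of a DoubleType column with numBuckets equally spaced bins.
// Returns one row per bin: its midpoint (x), the fraction of rows it holds (y)
// and the raw count.
def histogram(df: DataFrame, column: String, numBuckets: Int): DataFrame = {
  // Nulls are dropped because Row.getDouble throws on them
  val desiredDf = df.select(col(column)).na.drop()
  desiredDf.cache()
  val vector = desiredDf.rdd.map(r => r.getDouble(0))

  // Computing min and max is optional: skip it if you already know the range
  val maxMin: Row = desiredDf.agg(max(col(column)), min(col(column))).head()
  val minimum = maxMin.getDouble(1)
  val maximum = maxMin.getDouble(0)

  // numBuckets + 1 equally spaced edges; the last one is pinned to max so that
  // floating-point rounding cannot leave the maximum outside the last bin
  def computeBins(min: Double, max: Double, numBuckets: Int): Array[Double] = {
    val step = (max - min) / numBuckets
    Array.tabulate(numBuckets + 1)(i => if (i == numBuckets) max else min + i * step)
  }
  // Midpoint of each of the numBuckets bins
  def computeMidValueBins(min: Double, max: Double, numBuckets: Int): Array[Double] = {
    val step = (max - min) / numBuckets
    Array.tabulate(numBuckets)(i => min + i * step + step / 2)
  }

  val bins = computeBins(minimum, maximum, numBuckets)
  val midBins = computeMidValueBins(minimum, maximum, numBuckets)
  // evenBuckets = true tells Spark the edges are equally spaced, so each value's
  // bin is computed in O(1) instead of by binary search
  val results = vector.histogram(bins, evenBuckets = true)
  desiredDf.unpersist()

  val spark = df.sparkSession
  import spark.implicits._
  // Normalize the counts so they sum to 1 (relative frequency of each bin)
  val total = results.sum
  val frequencies = results.map(_.toDouble / total)

  midBins.zip(frequencies).zip(results).toList.
    map { case ((x, y), count) => (x, y, count) }.
    toDF("x", "y", "count")
}
```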
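The `y` column holds the fraction of rows in each bin, so its values sum to 1. If you want a probability density instead (bars whose total area is 1, as in a normalized histogram plot), each fraction also has to be divided by its bin width. A minimal sketch on plain arrays, using a hypothetical `HistogramNormalization` object:

```scala
// Hypothetical helpers for normalizing histogram counts.
object HistogramNormalization {
  // Fraction of observations in each bin; the result sums to 1.
  def relativeFrequencies(counts: Array[Long]): Array[Double] = {
    val total = counts.sum
    require(total > 0, "the histogram is empty")
    counts.map(_.toDouble / total)
  }

  // Probability density per bin: the fraction divided by the bin width, so
  // that the sum over bins of density * width equals 1.
  def densities(edges: Array[Double], counts: Array[Long]): Array[Double] = {
    require(edges.length == counts.length + 1, "expected one more edge than counts")
    val widths = edges.zip(edges.tail).map { case (left, right) => right - left }
    relativeFrequencies(counts).zip(widths).map { case (fraction, width) => fraction / width }
  }
}
```

For the equally spaced bins built in `histogram`, every width is `(maximum - minimum) / numBuckets`, so the density is simply `y` divided by that step.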