Producing JSON data to a Kafka topic

Producing JSON data to a Kafka topic

The first big step to work with Kafka is to put data in a topic, and so is the purpose of this post. Being JSON the most common way to intercomunicate, and having the schema with the data, we will explore how we build a producer in scala to start populating our system.

NOTE
I am using MapR so not all configurations are the same in its Open Source counterparts (e.g. I don't need to add the ZooKeeper nodes)

Generate random input data

The first step to simulate a data pipeline will be to generate a more complex data, for example in JSON. For this, we'll use the json4s library that allows to easily convert a case class to a JSON string. Other options would be to use jackson or play JSON. However, since I want to explore other JSON libraries, this has been the choosen one.

case class PassengersDiff(event: String,
                          trainId: Int,
                          line: String,
                          stopTime: Timestamp,
                          passengerOn: Int,
                          passengersOff: Int)

val trainLines = List("R1", "R2", "R3", "R4", "RT1", "L3", "S2", "L7", "RL2", "L5", "L1", "S1")
def randomLine: String = trainLines(Random.nextInt(trainLines.size))
def randomPassengerCount: Int = Random.nextInt(100)
def randomTrainNumber: Int = Random.nextInt(6)

def randomDiff: PassengersDiff = PassengersDiff(
    UUID.randomUUID().toString,
    randomTrainNumber,
    randomLine,
    new Timestamp(System.currentTimeMillis()),
    randomPassengerCount,
    randomPassengerCount
)

Once we have the code to generate a case class with random values, it can be converted to a JSON string with the following piece of code.

import org.json4s.native.Serialization
import org.json4s.native.Serialization.write


implicit val formats = Serialization.formats(NoTypeHints)
val value = write(randomDiff)

Kafka producer: put put put

Once we have our data in a string format, we need to initializate a Producer so we can send the data. The configurations are similar to the ones from kafka stremas, but instend of having a default serializer, we'll only use one for the key and another for the value. In both our cases we'll use strings.

val TOPIC_OUTPUT = "/steampipe:json-data"

val kafkaCfg = {
  val settings = new Properties()
  settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  settings
}

val producer = new KafkaProducer[String, String](kafkaCfg)

And once we have the producer, we will continuouslly put data to the topic. Easy peasy.

while (true) {
  implicit val formats = Serialization.formats(NoTypeHints)
  val value = write(randomDiff)
  // Send the key and data
  producer.send(new ProducerRecord(TOPIC_OUTPUT, passengerDiff.line, value))
  Thread.sleep(Random.nextInt(1000))
}

And we can see what this will look like in the topic

[mapr@maprdemo ~]$ /opt/mapr/kafka/kafka-1.1.1/bin/kafka-console-consumer.sh --bootstrap-server fake.9092 --topic /steampipe:json-data --property print.key=true
R3	{"event":"9bf57c73-3169-4114-867d-41740e28e949","trainId":5,"line":"R3","stopTime":"2019-08-23T18:57:34.118Z","passengerOn":80,"passengersOff":73}
RL2	{"event":"4723c00a-6549-41cf-9722-634526188d47","trainId":5,"line":"RL2","stopTime":"2019-08-23T18:57:35.286Z","passengerOn":14,"passengersOff":34}
L1	{"event":"93797a8a-aa00-452c-89fb-5a9c9659b7c7","trainId":4,"line":"L1","stopTime":"2019-08-23T18:57:35.847Z","passengerOn":24,"passengersOff":62}
L7	{"event":"a9dd38a1-aecb-4a8f-a472-63c751d62ceb","trainId":1,"line":"L7","stopTime":"2019-08-23T18:57:36.310Z","passengerOn":81,"passengersOff":49}

Now, other applications can consume from it, and we are all set to move to explore avro, finding a couple nuences along the way.


The kafka series

You can check out the previous post in the series

Or the next post
--> Using Avro in Kafka streams: the hard way

As always, you can find the source code in my github repository (see the code for JSONProducer).