Producing JSON data to a Kafka topic
The first big step when working with Kafka is to put data into a topic, and that is the purpose of this post. Since JSON is the most common format for exchanging data between systems, and it keeps the field names together with the values, we will explore how to build a producer in Scala to start populating our system.
NOTE
I am using MapR, so not all configurations are the same as in its open-source counterpart (e.g. I don't need to add the ZooKeeper nodes).
Generate random input data
The first step to simulate a data pipeline will be to generate somewhat more complex data, for example in JSON. For this we'll use the json4s library, which makes it easy to convert a case class to a JSON string. Other options would be Jackson or Play JSON; however, since I want to explore other JSON libraries, json4s has been the chosen one.
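If you want to follow along, these are roughly the dependencies involved. The versions are my assumption (kafka-clients matching the 1.1.1 install that shows up later in the post), so adjust them to your environment:

// build.sbt — assumed versions, adjust to your setup
libraryDependencies ++= Seq(
  "org.json4s"       %% "json4s-native" % "3.6.7",
  "org.apache.kafka" %  "kafka-clients" % "1.1.1"
)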
import java.sql.Timestamp
import java.util.UUID
import scala.util.Random

// One event per train stop: how many passengers got on and off
case class PassengersDiff(event: String,
                          trainId: Int,
                          line: String,
                          stopTime: Timestamp,
                          passengerOn: Int,
                          passengersOff: Int)

val trainLines = List("R1", "R2", "R3", "R4", "RT1", "L3", "S2", "L7", "RL2", "L5", "L1", "S1")

def randomLine: String = trainLines(Random.nextInt(trainLines.size))
def randomPassengerCount: Int = Random.nextInt(100)
def randomTrainNumber: Int = Random.nextInt(6)

def randomDiff: PassengersDiff = PassengersDiff(
  UUID.randomUUID().toString,   // unique event id
  randomTrainNumber,
  randomLine,
  new Timestamp(System.currentTimeMillis()),
  randomPassengerCount,         // each call returns a fresh value,
  randomPassengerCount          // so passengers on and off differ
)
Once we have the code to generate a case class with random values, it can be converted to a JSON string with the following piece of code.
import org.json4s.NoTypeHints
import org.json4s.native.Serialization
import org.json4s.native.Serialization.write

implicit val formats = Serialization.formats(NoTypeHints)
val value = write(randomDiff)
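The resulting value is a one-line JSON string with all the fields of the case class, like the records shown in the consumer output at the end of the post.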
Kafka producer: put put put
Once we have our data in string format, we need to initialize a Producer so we can send it. The configuration is similar to the one for Kafka Streams, but instead of having a default serializer, we only set one for the key and another for the value. In both our cases we'll use strings.
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

// MapR streams address topics as /<stream-path>:<topic-name>
val TOPIC_OUTPUT = "/steampipe:json-data"

val kafkaCfg = {
  val settings = new Properties()
  settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  settings
}

val producer = new KafkaProducer[String, String](kafkaCfg)
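As per the note at the top, MapR discovers the cluster on its own; on plain Apache Kafka you would also have to point the producer at the brokers, and topic names drop the stream path. A minimal sketch, where localhost:9092 is a placeholder for your broker list:

// Only needed outside MapR: where to find the brokers (placeholder address)
settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// Plain Kafka topics have no stream path prefix
val TOPIC_OUTPUT = "json-data"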
And once we have the producer, we will continuously put data into the topic. Easy peasy.
// Define the serialization formats once, outside the loop
implicit val formats = Serialization.formats(NoTypeHints)

while (true) {
  // Keep the generated event in a val so the key matches the serialized value
  val diff = randomDiff
  val value = write(diff)
  // Send the record, keyed by the train line
  producer.send(new ProducerRecord(TOPIC_OUTPUT, diff.line, value))
  Thread.sleep(Random.nextInt(1000))
}
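Since the loop never ends and records are sent asynchronously, it is worth flushing the producer when the process is killed so buffered records are not lost. A small sketch (not part of the original code) using a JVM shutdown hook:

// Flush pending records and release resources when the JVM shuts down
sys.addShutdownHook {
  producer.flush()
  producer.close()
}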
And we can see what this looks like in the topic:
[mapr@maprdemo ~]$ /opt/mapr/kafka/kafka-1.1.1/bin/kafka-console-consumer.sh --bootstrap-server fake.9092 --topic /steampipe:json-data --property print.key=true
R3 {"event":"9bf57c73-3169-4114-867d-41740e28e949","trainId":5,"line":"R3","stopTime":"2019-08-23T18:57:34.118Z","passengerOn":80,"passengersOff":73}
RL2 {"event":"4723c00a-6549-41cf-9722-634526188d47","trainId":5,"line":"RL2","stopTime":"2019-08-23T18:57:35.286Z","passengerOn":14,"passengersOff":34}
L1 {"event":"93797a8a-aa00-452c-89fb-5a9c9659b7c7","trainId":4,"line":"L1","stopTime":"2019-08-23T18:57:35.847Z","passengerOn":24,"passengersOff":62}
L7 {"event":"a9dd38a1-aecb-4a8f-a472-63c751d62ceb","trainId":1,"line":"L7","stopTime":"2019-08-23T18:57:36.310Z","passengerOn":81,"passengersOff":49}
Now, other applications can consume from it, and we are all set to move on and explore Avro, finding a couple of nuances along the way.
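For completeness, this is roughly what one of those consuming applications could look like. A sketch with an assumed group id, using the pre-2.0 poll(long) signature that matches the 1.1.1 client:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import scala.collection.JavaConverters._

val consumerCfg = new Properties()
consumerCfg.put(ConsumerConfig.GROUP_ID_CONFIG, "json-data-reader") // assumed group id
consumerCfg.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
consumerCfg.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
consumerCfg.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

val consumer = new KafkaConsumer[String, String](consumerCfg)
consumer.subscribe(Collections.singletonList(TOPIC_OUTPUT))

while (true) {
  val records = consumer.poll(1000)
  for (record <- records.asScala)
    println(s"${record.key} ${record.value}")
}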
The Kafka series
You can check out the previous post in the series, or the next one:
--> Using Avro in Kafka streams: the hard way
As always, you can find the source code in my GitHub repository (see the code for JSONProducer).