Using Avro in Kafka Streams: the hard way

In this fourth part we will see how to use Avro in Kafka, and we will try to do it the old way, without any extra help. This way we will experience the pain of not having a good integration ecosystem (e.g. a native library and the schema registry).

NOTE
I am using MapR, so not all configurations are the same as in its open source counterparts (e.g. I don't need to add the ZooKeeper nodes).

The input data

Since we will write this part of the application using Kafka Streams, the input must be a topic, and since we already have a way to produce data to a topic, we'll reuse it.

The input topic is /steampipe:json-data, created and populated by the JSONProducer app from the previous post. Its records have the following keys and values:

R3	{"event":"9bf57c73-3169-4114-867d-41740e28e949","trainId":5,"line":"R3","stopTime":"2019-08-23T18:57:34.118Z","passengerOn":80,"passengersOff":73}
RL2	{"event":"4723c00a-6549-41cf-9722-634526188d47","trainId":5,"line":"RL2","stopTime":"2019-08-23T18:57:35.286Z","passengerOn":14,"passengersOff":34}
L1	{"event":"93797a8a-aa00-452c-89fb-5a9c9659b7c7","trainId":4,"line":"L1","stopTime":"2019-08-23T18:57:35.847Z","passengerOn":24,"passengersOff":62}
L7	{"event":"a9dd38a1-aecb-4a8f-a472-63c751d62ceb","trainId":1,"line":"L7","stopTime":"2019-08-23T18:57:36.310Z","passengerOn":81,"passengersOff":49}

From JSON to serialized Avro

Before defining or reading the stream, we should tackle the hard part: how we will convert a JSON string to Avro serialized data.

To deserialize the JSON, we will use the same json4s library introduced in the previous post, which returns the case class that was also created there.

import org.json4s.NoTypeHints
import org.json4s.native.Serialization
import org.json4s.native.Serialization.read

implicit val formats = Serialization.formats(NoTypeHints)
val data = read[PassengersDiff](json)

I've manually defined the schema to be used by Avro:

import org.apache.avro.Schema.Parser

val schema = """
      |{ "type": "record",
      |"name":"passengersDiff",
      |"fields": [
      |  {"name":"event", "type":"string"},
      |  {"name":"trainId", "type":"int"},
      |  {"name":"line", "type":"string"},
      |  {"name":"stopTime", "type":{"type":"long", "logicalType":"timestamp-millis"}},
      |  {"name":"passengerOn", "type":"int"},
      |  {"name":"passengersOff", "type":"int"}
      |]
      |}
      """.stripMargin
val avroSchema = new Parser().parse(schema)

Now, to turn the case class into an Avro GenericRecord we can use the avro4s library. I've found it pretty useful, since it takes care of the nuances of converting the case class manually. Once we have the record, we convert it to a byte array so we can write it to Kafka. You could also use the schema provided by this library if you wish.

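import java.io.ByteArrayOutputStream

import com.sksamuel.avro4s.RecordFormat
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory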

def caseClassToAvro(data: PassengersDiff): Array[Byte] = {
  val format = RecordFormat[PassengersDiff]
  // record is a type that implements both GenericRecord and SpecificRecord
  val record = format.to(data)

  val out = new ByteArrayOutputStream()
  val encoder = EncoderFactory.get.binaryEncoder(out, null)
  val writer = new GenericDatumWriter[GenericRecord](PassengersDiff.avroSchema)

  writer.write(record, encoder)
  encoder.flush()
  out.close()
  out.toByteArray
}

A similar process is needed if you are trying to use Spark Structured Streaming to transform a DataFrame's Row or case class into an Avro record.

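Putting everything together, we have a method that takes a JSON string and returns it encoded as an Avro byte array. Notice that the output will not include the schema, since the binary encoder writes only the record's data; whoever reads these bytes needs to know the schema.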

def jsonToAvro(json: String): Array[Byte] = {
  val data = read[PassengersDiff](json)
  caseClassToAvro(data)
}
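Because the schema is not part of the payload, a consumer has to bring its own copy to read the bytes back. The following is a minimal round-trip sketch using Avro's GenericDatumReader, not code from the original application: the object name AvroRoundTrip and the encode/decode helpers are made up for illustration, the record is built by hand instead of through avro4s, and stopTime is assumed to be stored as epoch milliseconds.

```scala
import java.io.ByteArrayOutputStream
import java.time.Instant

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

// Hypothetical helper object, for illustration only.
object AvroRoundTrip {
  // Same fields as the schema in this post, with the logical type nested in the type.
  val schema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"passengersDiff","fields":[
      |  {"name":"event","type":"string"},
      |  {"name":"trainId","type":"int"},
      |  {"name":"line","type":"string"},
      |  {"name":"stopTime","type":{"type":"long","logicalType":"timestamp-millis"}},
      |  {"name":"passengerOn","type":"int"},
      |  {"name":"passengersOff","type":"int"}
      |]}""".stripMargin)

  // Writes only the field values; the schema itself is never written.
  def encode(record: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get.binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  // Reading requires the same (or a compatible) schema on the consumer side.
  def decode(bytes: Array[Byte]): GenericRecord = {
    val reader = new GenericDatumReader[GenericRecord](schema)
    val decoder = DecoderFactory.get.binaryDecoder(bytes, null)
    reader.read(null, decoder)
  }

  def main(args: Array[String]): Unit = {
    val record = new GenericData.Record(schema)
    record.put("event", "9bf57c73-3169-4114-867d-41740e28e949")
    record.put("trainId", Int.box(5))
    record.put("line", "R3")
    // Assumption: the timestamp is stored as epoch milliseconds.
    record.put("stopTime", Long.box(Instant.parse("2019-08-23T18:57:34.118Z").toEpochMilli))
    record.put("passengerOn", Int.box(80))
    record.put("passengersOff", Int.box(73))

    println(decode(encode(record)))
  }
}
```

Note that decoded string fields come back as Avro's Utf8 type, so call toString on them before comparing against a Scala String.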

Transforming the stream content

Now that we have a way to obtain a record in Avro, let's see how we actually transform the stream data.

My first approach, coming from Spark, was to use the mapValues method. However, I kept running into odd serialization and compilation errors, and while looking through the Kafka Streams documentation I found the following:

mapValues is preferable to map because it will not cause data re-partitioning. However, it does not allow you to modify the key or key type like map does.

This did not solve all my problems, so I resigned myself to using map even though it is less efficient. With map, the mapper returns a KeyValue holding the new key and value. In our case, we are changing the value type from String to binary (Array[Byte] in Scala, byte[] in Java).

val lines: KStream[Array[Byte], String] = builder.stream(TOPIC_INPUT)

val parsed: KStream[Array[Byte], Array[Byte]] = lines.map(
  (k, v) => new KeyValue[Array[Byte], Array[Byte]](k, jsonToAvro(v))
)

If we try to write directly to the output topic using parsed.to(TOPIC_OUTPUT), we may face the following error:

Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: [B / value type: [B). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
	at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:92)
...
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String
...

This happens because we set the default serdes to byte array for the key and String for the value, and we are now passing a byte array for both. To fix it, as the stack trace points out, we can provide the correct serdes as a parameter using the Produced.`with` method:

parsed.to(TOPIC_OUTPUT, Produced.`with`(byteArraySerde,byteArraySerde))
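For reference, here is a sketch of how the default serdes and the byteArraySerde value used above might be set up. This is an assumption about the configuration, inferred from the error message, and not the application's actual code; the object name SerdeSetup and the value avroProduced are hypothetical.

```scala
import java.util.Properties

import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.Produced

// Hypothetical holder object, for illustration only.
object SerdeSetup {
  // Assumed defaults, matching the error message: byte-array keys, String values.
  val props = new Properties()
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass.getName)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)

  // Explicit serde for the Avro-encoded output, which overrides the String default.
  val byteArraySerde: Serde[Array[Byte]] = Serdes.ByteArray()
  val avroProduced: Produced[Array[Byte], Array[Byte]] =
    Produced.`with`(byteArraySerde, byteArraySerde)
}
```

The defaults still suit the input topic, whose values are JSON strings; only the sink needs the override, which is why passing the serdes per call is enough here.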

This way, we will have successfully converted a JSON value to an Avro serialized one.

[mapr@maprdemo ~]$ /opt/mapr/kafka/kafka-1.1.1/bin/kafka-console-consumer.sh --bootstrap-server fake.9092 --topic /steampipe:avro-data --property print.key=true
 
L5	Hb3ae5b6b-aab5-4241-a833-bacb8716c41dL5??Ì?[XT
RL2	Hc2a6ac74-d4f1-4375-b36f-df39af98463cRL2??Ì?[??
L7	H9a934420-abed-4a6d-b8ce-be518aa11dcL7??Ì?[(
S1	H609de372-c97a-4bc6-a7d8-ed4be91153bS1??Ì?[?
L5	H300c0da5-1222-4ce8-90cb-5115b5d6e72L5??Ì?[r$
S2	Hb88138fa-8212-4445-834a-88ff752ea026S2??Ì?[4?
R1	Ha3f88835-45bd-489d-9770-1d1766f36afe
R1??Ì?[??
RT1	H509284e3-c2d1-4b24-aacd-8f67aed0f9c9
RT1??Ì?[N
L3	H9334e8d3-9916-4141-ba5e-4c9c8690f399L3??Ì?[p:
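The output looks garbled because the console consumer prints raw Avro binary as text. To make sense of it, here is a minimal standalone sketch of how Avro encodes int, long and string values (zig-zag plus variable-length coding, with strings prefixed by their byte length). The object AvroVarint and its methods are names made up for illustration; in real code Avro's own binary encoder does this for you.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical helper, for illustration only.
object AvroVarint {
  // Zig-zag maps signed numbers to unsigned ones so small magnitudes stay small.
  def zigZag(n: Long): Long = (n << 1) ^ (n >> 63)

  // Variable-length coding: 7 bits per byte, high bit set while more bytes follow.
  def encodeLong(n: Long): Array[Byte] = {
    var v = zigZag(n)
    val out = Array.newBuilder[Byte]
    while ((v & ~0x7FL) != 0) {
      out += ((v & 0x7F) | 0x80).toByte
      v >>>= 7
    }
    out += v.toByte
    out.result()
  }

  // A string is its UTF-8 byte length (encoded as a long) followed by the bytes.
  def encodeString(s: String): Array[Byte] = {
    val utf8 = s.getBytes(StandardCharsets.UTF_8)
    encodeLong(utf8.length.toLong) ++ utf8
  }
}
```

With this, AvroVarint.encodeLong(36) yields the single byte 0x48, which is the letter H in front of every 36-character event UUID. Likewise, AvroVarint.encodeLong(5) yields 0x0A, a newline, which would explain why some records (those with trainId 5) are split across two lines.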

The Kafka series

You can check out the previous post in the series

Or the next post
--> Using Avro in Kafka streams: the easy way

As always, you can find the source code in my GitHub repository.