Using Avro in Kafka Streams: the hard way
In this fourth part we will see how to use Avro in Kafka, and we will try to do it without any extra help, the old way. This way we will experience the pain of not having a good integration ecosystem (e.g. a native library and the Schema Registry).
NOTE
I am using MapR, so not all configurations are the same as in its open source counterparts (e.g. I don't need to add the ZooKeeper nodes).
The input data
Since we will write this part of the application using Kafka Streams, the input must be a topic, and since we already have a way to produce data to a topic, we'll reuse it.
The input topic we will use is /steampipe:json-data, created and populated by the JSONProducer app built in the last post. It holds the following keys and values:
R3 {"event":"9bf57c73-3169-4114-867d-41740e28e949","trainId":5,"line":"R3","stopTime":"2019-08-23T18:57:34.118Z","passengerOn":80,"passengersOff":73}
RL2 {"event":"4723c00a-6549-41cf-9722-634526188d47","trainId":5,"line":"RL2","stopTime":"2019-08-23T18:57:35.286Z","passengerOn":14,"passengersOff":34}
L1 {"event":"93797a8a-aa00-452c-89fb-5a9c9659b7c7","trainId":4,"line":"L1","stopTime":"2019-08-23T18:57:35.847Z","passengerOn":24,"passengersOff":62}
L7 {"event":"a9dd38a1-aecb-4a8f-a472-63c751d62ceb","trainId":1,"line":"L7","stopTime":"2019-08-23T18:57:36.310Z","passengerOn":81,"passengersOff":49}
From JSON to serialized Avro
Before defining or reading the stream, we should tackle the hard part: how to convert a JSON string into Avro-serialized data.
In order to deserialize the JSON, we will use the same json4s library introduced in the previous post, which gives us back the case class created there.
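For reference, here is a rough sketch of what that case class could look like; the field names come from the JSON sample and the Avro schema below, but the stopTime type in particular is an assumption (the real definition is in the previous post):

import java.util.Date

// Hypothetical reconstruction of the case class from the previous post
case class PassengersDiff(
  event: String,
  trainId: Int,
  line: String,
  stopTime: Date,
  passengerOn: Int,
  passengersOff: Int
)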
import org.json4s.NoTypeHints
import org.json4s.native.Serialization
import org.json4s.native.Serialization.read

// The default formats are enough to map the JSON fields onto the case class
implicit val formats = Serialization.formats(NoTypeHints)
val data = read[PassengersDiff](json)
I've manually defined the schema to be used by Avro:
import org.apache.avro.Schema.Parser

val schema = """
  |{ "type": "record",
  |  "name": "passengersDiff",
  |  "fields": [
  |    {"name": "event", "type": "string"},
  |    {"name": "trainId", "type": "int"},
  |    {"name": "line", "type": "string"},
  |    {"name": "stopTime", "type": {"type": "long", "logicalType": "timestamp-millis"}},
  |    {"name": "passengerOn", "type": "int"},
  |    {"name": "passengersOff", "type": "int"}
  |  ]
  |}
""".stripMargin

val avroSchema = new Parser().parse(schema)
Now, to serialize a case class into Avro's GenericRecord, we can use the avro4s library. I've found it pretty useful, as it takes away the hassle of mapping the case class to the schema manually. Once we have the record, we convert it into a byte array so we can write it to Kafka. You could also use the schema provided by this library if you wish.
import java.io.ByteArrayOutputStream

import com.sksamuel.avro4s.RecordFormat
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory

def caseClassToAvro(data: PassengersDiff): Array[Byte] = {
  val format = RecordFormat[PassengersDiff]
  // record is a type that implements both GenericRecord and SpecificRecord
  val record = format.to(data)
  val out = new ByteArrayOutputStream()
  val encoder = EncoderFactory.get.binaryEncoder(out, null)
  // the schema parsed above, here assumed to live in the PassengersDiff companion object
  val writer = new GenericDatumWriter[GenericRecord](PassengersDiff.avroSchema)
  writer.write(record, encoder)
  encoder.flush()
  out.close()
  out.toByteArray
}
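As a side note on the schema provided by the library: avro4s can also derive it directly from the case class, making the hand-written JSON schema above optional. A minimal sketch, assuming the derived schema matches the one written by hand:

import com.sksamuel.avro4s.AvroSchema
import org.apache.avro.Schema

// Schema derived by avro4s from the PassengersDiff fields
val derivedSchema: Schema = AvroSchema[PassengersDiff]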
A similar process would be needed if you were trying to use Spark Structured Streaming to transform a DataFrame's Row or case class into an Avro record.
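Purely as an illustration of that idea: passengersStream below is a hypothetical Dataset[PassengersDiff] obtained elsewhere, and the same helper is reused to produce Avro bytes.

import org.apache.spark.sql.{Dataset, Encoders}

// Each PassengersDiff row becomes an Avro-encoded byte array
val avroBytes: Dataset[Array[Byte]] =
  passengersStream.map(d => caseClassToAvro(d))(Encoders.BINARY)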
Putting everything together, we have a method that takes a JSON string and transforms it into an Avro-encoded byte array. Notice that the output will not include the schema, since the binary encoding only carries the data; any consumer will need to know the schema beforehand to deserialize it.
def jsonToAvro(json: String): Array[Byte] = {
val data = read[PassengersDiff](json)
caseClassToAvro(data)
}
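As a quick sanity check (outside the streaming app), the bytes can be decoded back using the writer's schema, which also shows why a consumer needs to know the schema in advance; the avroToRecord name below is just for illustration:

import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Decodes bytes produced by jsonToAvro; the binary encoding carries no schema,
// so the reader must be given it explicitly
def avroToRecord(bytes: Array[Byte]): GenericRecord = {
  val reader = new GenericDatumReader[GenericRecord](PassengersDiff.avroSchema)
  val decoder = DecoderFactory.get.binaryDecoder(bytes, null)
  reader.read(null, decoder)
}

val record = avroToRecord(jsonToAvro(json))
println(record.get("line"))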
Transforming the stream content
Now that we have a way to obtain an Avro record, let's see how we actually transform the stream's data.
My first approach, coming from Spark, was to use the mapValues method. However, I kept running into odd serialization and compilation errors, and looking through the Kafka Streams documentation, I found the following:

"mapValues is preferable to map because it will not cause data re-partitioning. However, it does not allow you to modify the key or key type like map does."
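For completeness, the mapValues version would have looked roughly like the sketch below (applied to the lines stream defined further down); it only touches the value, so it would not trigger any re-partitioning:

import org.apache.kafka.streams.kstream.ValueMapper

// Sketch of the mapValues attempt; not the code that ended up in the app
val parsedValuesOnly: KStream[Array[Byte], Array[Byte]] =
  lines.mapValues(new ValueMapper[String, Array[Byte]] {
    override def apply(v: String): Array[Byte] = jsonToAvro(v)
  })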
That advice did not fully solve my problems, and in resignation I had to switch to map despite it being less efficient. To change any type, we need to use the KeyValue class. In our case, we are changing the value type from String to binary (Array[Byte] in Scala or byte[] in Java).
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.KStream

// builder is the StreamsBuilder for the topology; TOPIC_INPUT is /steampipe:json-data
val lines: KStream[Array[Byte], String] = builder.stream(TOPIC_INPUT)
val parsed: KStream[Array[Byte], Array[Byte]] = lines.map(
  (k, v) => new KeyValue[Array[Byte], Array[Byte]](k, jsonToAvro(v))
)
If we try to write directly to the output topic using parsed.to(TOPIC_OUTPUT), we could face the following error:
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: [B / value type: [B). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:92)
...
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String
...
In our case, this is because we set the default Serdes of the stream to binary for the key and String for the value, and we are now passing two binary values! To fix it, as the stack trace points out, we can provide the correct Serdes through the parameters of the Produced.`with` method:
parsed.to(TOPIC_OUTPUT, Produced.`with`(byteArraySerde,byteArraySerde))
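The byteArraySerde used above is not shown in the snippet; my assumption is that it comes from Kafka's built-in Serdes factory, along the lines of:

import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.streams.kstream.Produced

// Serde for raw byte arrays, passed for both the key and the value of the output topic
val byteArraySerde: Serde[Array[Byte]] = Serdes.ByteArray()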
This way, we will have successfully converted the JSON values into Avro-serialized ones, as the console consumer on the output topic shows:
[mapr@maprdemo ~]$ /opt/mapr/kafka/kafka-1.1.1/bin/kafka-console-consumer.sh --bootstrap-server fake.9092 --topic /steampipe:avro-data --property print.key=true
L5 Hb3ae5b6b-aab5-4241-a833-bacb8716c41dL5??Ì?[XT
RL2 Hc2a6ac74-d4f1-4375-b36f-df39af98463cRL2??Ì?[??
L7 H9a934420-abed-4a6d-b8ce-be518aa11dcL7??Ì?[(
S1 H609de372-c97a-4bc6-a7d8-ed4be91153bS1??Ì?[?
L5 H300c0da5-1222-4ce8-90cb-5115b5d6e72L5??Ì?[r$
S2 Hb88138fa-8212-4445-834a-88ff752ea026S2??Ì?[4?
R1 Ha3f88835-45bd-489d-9770-1d1766f36afe
R1??Ì?[??
RT1 H509284e3-c2d1-4b24-aacd-8f67aed0f9c9
RT1??Ì?[N
L3 H9334e8d3-9916-4141-ba5e-4c9c8690f399L3??Ì?[p:
The Kafka series
You can check out the previous posts in the series:
- Simple Kafka streams app in MapR environment
- Kafka streams and HBase as de-duplicator
- Producing JSON data to a Kafka topic
Or the next post
--> Using Avro in Kafka Streams: the easy way
As always, you can find the source code in my GitHub repository.