Simple kafka streams app in MapR environment
While Spark continues to thrive as the main big data processing framework for batch and streaming, alternatives emerging from the 1970s actor model and the reactive manifesto are gaining notoriety. Akka is widely known in the Scala community and on March 2016 Confluent released its library Kafka Streams.
One of the main problems of Spark for me is Scaling. Much of the time we are just tweaking executor characteristics: the number of them, its memory... And if the data volume is changing from day to day, either we are using way more resources than needed or the application just fails (depending on the app workload). If this is a nightmare just with batch jobs, in streaming I can't start imagine the situation.
Thats a very big issue and was one of the main reasons I've started processing libraries that scale-up and down automatically without causing me a headache. Keep in mind that since in my work have MapR, we are tied to their supported ecosystem pack.
This is, at the time of writing and regarding to streaming:
- Spark 2.3.1
- Flume 1.8
- Kafka-streamsish (Since it is a library and they "support" kafka 1.1.1, we can use it)
I've chosen to explore kafka-streams, and it has a nice feature out-of-the-box: Exactly-once processing (YAAAAY!). Since we don't need fancy analytics on our data, it can be a nice candidate.
The bad thing: it does not scale-up and down automatically, rather you have to implement some logic to do so (via metrics). I'll will have to tackle that in a later post.
First of all: what is kafka-streams? It's just a JVM application: one consumer that reads from topics and writes to other topics. If you want to parallelize: start up to N instances of the app. (N is the number of topic partition and determines the max number of consumers)
Set-up MapR environment (VM)
We'll use the sandbox provided by MapR that already contains all the components needed. In our case the version is 6.1.0 (release date yyMM: 1808) so all the packages and dependencies will reference those numbers. It is important to use >6.1.0 since it included several key improvements to streaming applications (for example, integration with kafka-streams apps).
It has all what we need almost installed, and any other thing we need:
- MapR Event Store (kafka)
- MapR DB (HBase)
- MapR FS (HDFS)
- Spark
- Flink
- ...
Now, follow the MapR documentation to install and set-up the VM. Make sure you download the latest version and kafka is > 1.1.1
Once done, you should be able to access the VM via its IP address, either local or a WLAN one (run ifconfig
). In my case, I have set it up using a bridged adapter so access is available via:
ssh mapr@192.168.1.60
- HUE: http://192.168.1.23:8888/
- MapR Control System (admin page): https://192.168.1.23:8443/
Additionally, we'll need to create the following directory to write temporal files
# Tem
mkdir /tmp/kafka-streams
Install a mapr gateway locally
MapR Streams needs to have at least one gateway service up in the cluster in order to manage replication and compaction of topics. However in the sandbox this service is missing so we must install and configure it. As root
user (password: mapr
), execute
yum install mapr-gateway
maprcli cluster gateway set -dstcluster demo.mapr.com -gateways 127.0.0.1
# Check that we canm find the gateway
maprcli cluster gateway resolve -dstcluster demo.mapr.com
NOTE
Cluster name can be found in/opt/mapr/conf/mapr-clusters.conf
should be demo.mapr.com in our case
Create the Stream and topics
MapR ES has some unique characteristics:
- Stream: an abstraction that allows you to group topics
- Streams are stored in HDFS so they usually are a route
- No need for zookeeper (Remember the gateway node?)
You can create and configure the streams via MapR MCS GUI, in Data -> Streams -> Create Stream We'll use as Stream name /steampipe
. We'll leave all the default values and then topics need to be created via the Topics tab: duplicated-records
and no-duplicates
Another option is to use the following shell commands
maprcli stream create -path /sample-stream \
-produceperm p -consumeperm p -topicperm p
maprcli stream topic create -path /steampipe -topic duplicated-records
maprcli stream topic create -path /steampipe -topic no-duplicates
Test it
Using the console producers and consumers we can test our process
# Read messages (and show keys)
/opt/mapr/kafka/kafka-1.1.1/bin/kafka-console-consumer.sh \
--bootstrap-server fake.9092 \
--topic /steampipe:duplicated-records \
--property print.key=true
And in another terminal do:
# Produce messages
#
# To set the message's key, add the following and write the messages as key:value
# --property "parse.key=true" --property "key.separator=:"
#
/opt/mapr/kafka/kafka-1.1.1/bin/kafka-console-producer.sh \
--broker-list fake.server:9999 \
--topic /steampipe:duplicated-records
Application set-up
Dependencies and subtleties
Instead of what new kafka documentation states you can't use yet the package kafka-streams-scala
since it is for Kafka > 2.0.0. Instead, we'll use the artifact with ID kafka_2.11
since we are going to use scala 2.11.
Since MapR is a bit special (in the good and the bad way), most of the apache packages have some changes and we must use their versions. As an example, kafka-streams runs in the same cluster and you don't need to set any zookeeper quorum property when executing kafka applications. Adding its repository in our pom.xml
will allow maven to download them.
<properties>
<mapr-version>6.1.0-mapr</mapr-version>
<mapr-streams-version>1.1.1-mapr-streams-6.1.0</mapr-streams-version>
<avro.version>1.8.2</avro.version>
<scala.version>2.11.12</scala.version>
<scala.compat.version>2.11</scala.compat.version>
</properties>
<repositories>
<repository>
<id>mapr-maven</id>
<url>http://repository.mapr.com/maven</url>
<releases><enabled>true</enabled></releases>
<snapshots><enabled>false</enabled></snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<!-- This version must be compatible with the Scala version of the Kafka dependency. -->
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>com.mapr.streams</groupId>
<artifactId>mapr-streams</artifactId>
<version>${mapr-version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${mapr-streams-version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${mapr-streams-version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.compat.version}</artifactId>
<version>${mapr-streams-version}</version>
</dependency>
...
</dependencies>
HINT
To find all the mapr-specific package versions, go to their nexus repository
Furthermore, some additional arguments must be set in the scala plugin to be able to enable Java8 lambdas (needed for map
, foreach
...)
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.1</version>
<configuration>
<args>
<!--
In combination with Scala 2.11, `-Xexperimental` enables SAM
for Java 8 lambda support. Make sure `kafka.scala.version`
is set to `2.11`, not `2.10`.
SUPER IMPORTANT PART!
-->
<arg>-Xexperimental</arg>
...
</args>
</configuration>
...
</plugin>
Simple hash application
Let's say that for now we need to hash all the data from one topic and store into another one. In another post we will reuse part of the code, so let's keep it simple.
The configuration includes topic names (in MapR topics are mapped to an HDFS directory), the default serializers for the messages and its application_id
. This property is important since it has to be unique and it is used to keep track of read offsets. Those offsets are synced across instances via MapR FS, stored inside /apps/kafka-streams/application_id
val TOPIC_INPUT = "/steampipe:duplicated-records"
val TOPIC_OUTPUT = "/steampipe:no-duplicates"
// Set up builder and config
val builder = new StreamsBuilder()
val streamingConfig = {
val settings = new Properties()
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "hasher-app")
settings
}
And now, we have here the application logic, where we filter-out the null messages, since we can't deal with them, and apply a hash function over the message. Output is finally written to our output topic.
val hasher = Hashing.sha256()
def hash(string: String): String = hasher.hashString(string, StandardCharsets.UTF_8).toString
val lines: KStream[Array[Byte],String] = builder.stream(TOPIC_INPUT)
val hashed: KStream[Array[Byte],String] = lines.filterNot((_, v) => v == null).mapValues(hash(_))
hashed.to(TOPIC_OUTPUT)
We are almost set, we just have to build and order the start of the stream.
val topology: Topology = builder.build()
val streams: KafkaStreams = new KafkaStreams(topology, streamingConfig)
streams.start()
Now, package via mvn package
and copy it to your VM.
Run it!
First of all, to see the results run a console consumer reading from output topic /steampipe:no-duplicates
/opt/mapr/kafka/kafka-1.1.1/bin/kafka-console-consumer.sh --bootstrap-server fake.9092 --topic /steampipe:no-duplicates --property print.key=true
And then start the kafka-streams application.
java -cp "/opt/mapr/lib/*:steam-pipe-1.0-SNAPSHOT-jar-with-dependencies.jar" cat.martsec.kafka.steampipe.Hasher
So now you can start inputing messages in the console producer and its hash will appear in the no-duplicates
topic. We are done!
>key1:aaaaaa
>key2:111111
>k:1
>k:2
Result
key1 ed02457b5c41d964dbd2f2a609d63fe1bb7528dbe55e1abf5b52c249cd735797
key2 bcb15f821479b4d5772bd0ca866c00ad5f926e3580720659cc80d39c9d09802a
k 6b86b273ff34fce19d6b804eff5a3f5747ada4eaa22f1d49c01e52ddb7875b4b
k d4735e3a265e16eee03f59718b9b5d03019c07d8b6c51f90da3a666eec13ab35
k 6b86b273ff34fce19d6b804eff5a3f5747ada4eaa22f1d49c01e52ddb7875b4b
Recap
We have set-up the MapR sandbox to run our applications. Then, we've created a stream and a couple of topics and build a kafka-streams application that hashes the content of a message and pits into another topic.
As always, you can see the source code in my github repository
In the next post we sill deal with a common problem: duplicated messages (but not duplicates caused by kafka-streams, we have exactly-once).
Kafka streams and HBase as de-duplicator
References:
- https://github.com/confluentinc/kafka-streams-examples/blob/4.0.2-post/src/main/scala/io/confluent/examples/streams/MapFunctionScalaExample.scala
- https://mapr.com/docs/61/Kafka/KStreams/Kstreams-demo-example.html
- https://mapr.com/docs/61/Gateways/gateways_for_replicating_streams.html
- https://mapr.com/docs/61/Gateways/ConfiguringMapRGatewaysForTRAndI.html
- https://mapr.com/docs/61/Gateways/Gateways-managing.html