# Simple kafka streams app in MapR environment

While Spark continues to thrive as the main big data processing framework for batch and streaming, alternatives emerging from the 1970s actor model and the reactive manifesto are gaining notoriety. Akka is widely known in the Scala community and on March 2016 Confluent released its library Kafka Streams.

One of the main problems of Spark for me is Scaling. Much of the time we are just tweaking executor characteristics: the number of them, its memory... And if the data volume is changing from day to day, either we are using way more resources than needed or the application just fails (depending on the app workload). If this is a nightmare just with batch jobs, in streaming I can't start imagine the situation.

Thats a very big issue and was one of the main reasons I've started processing libraries that scale-up and down automatically without causing me a headache. Keep in mind that since in my work have MapR, we are tied to their supported ecosystem pack.

This is, at the time of writing and regarding to streaming:

• Spark 2.3.1
• Flume 1.8
• Kafka-streamsish (Since it is a library and they "support" kafka 1.1.1, we can use it)

I've chosen to explore kafka-streams, and it has a nice feature out-of-the-box: Exactly-once processing (YAAAAY!). Since we don't need fancy analytics on our data, it can be a nice candidate.

The bad thing: it does not scale-up and down automatically, rather you have to implement some logic to do so (via metrics). I'll will have to tackle that in a later post.

First of all: what is kafka-streams? It's just a JVM application: one consumer that reads from topics and writes to other topics. If you want to parallelize: start up to N instances of the app. (N is the number of topic partition and determines the max number of consumers)

## Set-up MapR environment (VM)

We'll use the sandbox provided by MapR that already contains all the components needed. In our case the version is 6.1.0 (release date yyMM: 1808) so all the packages and dependencies will reference those numbers. It is important to use >6.1.0 since it included several key improvements to streaming applications (for example, integration with kafka-streams apps).

It has all what we need almost installed, and any other thing we need:

• MapR Event Store (kafka)
• MapR DB (HBase)
• MapR FS (HDFS)
• Spark
• ...

Now, follow the MapR documentation to install and set-up the VM. Make sure you download the latest version and kafka is > 1.1.1

Once done, you should be able to access the VM via its IP address, either local or a WLAN one (run ifconfig). In my case, I have set it up using a bridged adapter so access is available via:

Additionally, we'll need to create the following directory to write temporal files

# Tem
mkdir /tmp/kafka-streams


### Install a mapr gateway locally

MapR Streams needs to have at least one gateway service up in the cluster in order to manage replication and compaction of topics. However in the sandbox this service is missing so we must install and configure it. As root user (password: mapr), execute

yum install mapr-gateway
maprcli cluster gateway set -dstcluster demo.mapr.com -gateways 127.0.0.1

# Check that we canm find the gateway
maprcli cluster gateway resolve -dstcluster demo.mapr.com


NOTE
Cluster name can be found in /opt/mapr/conf/mapr-clusters.conf should be demo.mapr.com in our case

### Create the Stream and topics

MapR ES has some unique characteristics:

• Stream: an abstraction that allows you to group topics
• Streams are stored in HDFS so they usually are a route
• No need for zookeeper (Remember the gateway node?)

You can create and configure the streams via MapR MCS GUI, in Data -> Streams -> Create Stream We'll use as Stream name /steampipe. We'll leave all the default values and then topics need to be created via the Topics tab: duplicated-records and no-duplicates

Another option is to use the following shell commands

maprcli stream create -path /sample-stream \
-produceperm p -consumeperm p -topicperm p
maprcli stream topic create -path /steampipe -topic duplicated-records
maprcli stream topic create -path /steampipe -topic no-duplicates


#### Test it

Using the console producers and consumers we can test our process

# Read messages (and show keys)
/opt/mapr/kafka/kafka-1.1.1/bin/kafka-console-consumer.sh \
--bootstrap-server fake.9092 \
--topic /steampipe:duplicated-records \
--property print.key=true


And in another terminal do:

# Produce messages
#
#   To set the message's key, add the following and write the messages as key:value
#    --property "parse.key=true" --property "key.separator=:"
#
/opt/mapr/kafka/kafka-1.1.1/bin/kafka-console-producer.sh \
--broker-list fake.server:9999 \
--topic /steampipe:duplicated-records


## Application set-up

### Dependencies and subtleties

Instead of what new kafka documentation states you can't use yet the package kafka-streams-scala since it is for Kafka > 2.0.0. Instead, we'll use the artifact with ID kafka_2.11 since we are going to use scala 2.11.

Since MapR is a bit special (in the good and the bad way), most of the apache packages have some changes and we must use their versions. As an example, kafka-streams runs in the same cluster and you don't need to set any zookeeper quorum property when executing kafka applications. Adding its repository in our pom.xml will allow maven to download them.

<properties>
<mapr-version>6.1.0-mapr</mapr-version>
<mapr-streams-version>1.1.1-mapr-streams-6.1.0</mapr-streams-version>
<avro.version>1.8.2</avro.version>
<scala.version>2.11.12</scala.version>
<scala.compat.version>2.11</scala.compat.version>
</properties>

<repositories>
<repository>
<id>mapr-maven</id>
<url>http://repository.mapr.com/maven</url>
<releases><enabled>true</enabled></releases>
<snapshots><enabled>false</enabled></snapshots>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<!-- This version must be compatible with the Scala version of the Kafka dependency. -->
<version>${scala.version}</version> </dependency> <dependency> <groupId>com.mapr.streams</groupId> <artifactId>mapr-streams</artifactId> <version>${mapr-version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${mapr-streams-version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>${mapr-streams-version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.compat.version}</artifactId> <version>${mapr-streams-version}</version>
</dependency>
...
</dependencies>


HINT
To find all the mapr-specific package versions, go to their nexus repository

Furthermore, some additional arguments must be set in the scala plugin to be able to enable Java8 lambdas (needed for map, foreach ...)

<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.1</version>
<configuration>
<args>
<!--
In combination with Scala 2.11, -Xexperimental enables SAM
for Java 8 lambda support.  Make sure kafka.scala.version
is set to 2.11, not 2.10.
SUPER IMPORTANT PART!
-->
<arg>-Xexperimental</arg>
...
</args>
</configuration>
...
</plugin>


## Simple hash application

Let's say that for now we need to hash all the data from one topic and store into another one. In another post we will reuse part of the code, so let's keep it simple.

The configuration includes topic names (in MapR topics are mapped to an HDFS directory), the default serializers for the messages and its application_id. This property is important since it has to be unique and it is used to keep track of read offsets. Those offsets are synced across instances via MapR FS, stored inside /apps/kafka-streams/application_id

val TOPIC_INPUT = "/steampipe:duplicated-records"
val TOPIC_OUTPUT = "/steampipe:no-duplicates"

// Set up builder and config
val builder = new StreamsBuilder()
val streamingConfig = {
val settings = new Properties()
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "hasher-app")
settings
}


And now, we have here the application logic, where we filter-out the null messages, since we can't deal with them, and apply a hash function over the message. Output is finally written to our output topic.

val hasher = Hashing.sha256()
def hash(string: String): String = hasher.hashString(string, StandardCharsets.UTF_8).toString

val lines: KStream[Array[Byte],String] = builder.stream(TOPIC_INPUT)
val hashed: KStream[Array[Byte],String] = lines.filterNot((_, v) => v == null).mapValues(hash(_))
hashed.to(TOPIC_OUTPUT)


We are almost set, we just have to build and order the start of the stream.

val topology: Topology = builder.build()
val streams: KafkaStreams = new KafkaStreams(topology, streamingConfig)

streams.start()


Now, package via mvn package and copy it to your VM.

### Run it!

First of all, to see the results run a console consumer reading from output topic /steampipe:no-duplicates

/opt/mapr/kafka/kafka-1.1.1/bin/kafka-console-consumer.sh --bootstrap-server fake.9092 --topic /steampipe:no-duplicates --property print.key=true


And then start the kafka-streams application.

java -cp "/opt/mapr/lib/*:steam-pipe-1.0-SNAPSHOT-jar-with-dependencies.jar" cat.martsec.kafka.steampipe.Hasher


So now you can start inputing messages in the console producer and its hash will appear in the no-duplicates topic. We are done!

>key1:aaaaaa
>key2:111111
>k:1
>k:2


Result

key1	ed02457b5c41d964dbd2f2a609d63fe1bb7528dbe55e1abf5b52c249cd735797
k	d4735e3a265e16eee03f59718b9b5d03019c07d8b6c51f90da3a666eec13ab35


## Recap

We have set-up the MapR sandbox to run our applications. Then, we've created a stream and a couple of topics and build a kafka-streams application that hashes the content of a message and pits into another topic.

As always, you can see the source code in my github repository

In the next post we sill deal with a common problem: duplicated messages (but not duplicates caused by kafka-streams, we have exactly-once).

Kafka streams and HBase as de-duplicator

References: