Kafka Streams and HBase as a de-duplicator
In this second part of the Kafka in MapR series, we are going to de-duplicate identical messages using HBase.
NOTE
I am using MapR, so not all configurations are the same as in its open-source counterparts (e.g. I don't need to add the ZooKeeper nodes).
This is the diagram of our desired application: the first two boxes are filters and the last one is a foreach method.
Set up HBase
We'll need to create a table and a column family. Since the logic is not complicated, a single column will be good enough for this exercise.
Run hbase shell and execute the statements below.
create '/user/mapr/deduplication-table', {NAME=>'dedup'}
Here is the description of the table, where we can see that the Bloom filter is enabled on rows. That's great!
hbase(main):004:0> describe '/user/mapr/deduplication-table'
Table /user/mapr/deduplication-table is ENABLED
/user/mapr/deduplication-table, {TABLE_ATTRIBUTES => {MAX_FILESIZE => '4294967296',
METADATA => {'AUTOSPLIT' => 'true', 'MAPR_UUID' => 'dedeca7f-5df7-6a66-aae4-043bf8305c00',
'MAX_VALUE_SIZE_IN_MEM' => '100'}}
COLUMN FAMILIES DESCRIPTION
{NAME => 'dedup', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false',
KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER',
COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '8192',
REPLICATION_SCOPE => '0', METADATA => {'compression_raw' => '31'}}
1 row(s) in 0.0510 seconds
Bloom filters are data structures optimized to check efficiently (both in computation and space) whether an element is a member of a set. They are probabilistic, and their answers are either possibly in the set or definitely not in the set. They are really cool, check them out! Lookups and insertions run in constant time.
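If you want to play with one, here is a minimal sketch using Guava's BloomFilter (the same library we already use below for hashing); the sizing numbers are arbitrary:
import java.nio.charset.StandardCharsets
import com.google.common.hash.{BloomFilter, Funnels}

// Expect up to 10,000 insertions with a ~1% false-positive rate (arbitrary numbers)
val bloom: BloomFilter[CharSequence] =
  BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 10000, 0.01)

bloom.put("NyanCat")
bloom.mightContain("NyanCat") // true: "possibly in the set"
bloom.mightContain("So 2011") // false: "definitely not in the set"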
Application
Our process has two filtering steps:
- Filter out empty messages
- Only let through messages we have not seen before
The second step is accomplished by comparing the hash of the message against HBase. The hash will be the lookup table's row key, and we'll only check for its existence (for now). The table is prepared so that a message could be reprocessed based on a column value (e.g. if that value is 1, do nothing; if it is 0, do not filter out the message).
import java.nio.charset.StandardCharsets

import com.google.common.hash.{HashFunction, Hashing}
import org.apache.hadoop.hbase.client.{Get, Table}

val hasher: HashFunction = Hashing.sha256()

// SHA-256 hash of the message, used as the HBase row key
def hash(string: String): Array[Byte] =
  hasher.hashString(string, StandardCharsets.UTF_8).asBytes()

def messageEmpty[T](msg: T): Boolean = msg == null

def messageToProcess[T](msg: T, hbaseTable: Table): Boolean = {
  val msgHash = hash(msg.toString)
  // See if the table contains its hash
  val get = new Get(msgHash)
  // Done this way so that in the future we could act depending on the result value
  hbaseTable.get(get).isEmpty
}
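As a hypothetical sketch of that future extension (this is not in the post's code, and REPROCESS_VALUE is an invented constant marking rows flagged for another pass), the check could look like this:
// Hypothetical: decide based on the stored column value instead of mere existence
def shouldProcess(msg: String, table: Table): Boolean = {
  val result = table.get(new Get(hash(msg)))
  if (result.isEmpty) true // never seen before: process it
  else {
    val flag = result.getValue(HBASE_COL_FAMILY, HBASE_COL_NAME)
    // REPROCESS_VALUE is an invented constant, not defined in this post
    java.util.Arrays.equals(flag, REPROCESS_VALUE)
  }
}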
Now that the filters are clear, there is one extra step in the logic: writing the processed messages to HBase.
import java.util.Calendar

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Table}

def getHBaseTable: Table = {
  val conf = HBaseConfiguration.create()
  val connection = ConnectionFactory.createConnection(conf)
  connection.getTable(TableName.valueOf(HBASE_TABLE))
}

def insertToHBase[T](msg: T, table: Table): Unit = {
  val msgHash = hash(msg.toString)
  val put = new Put(msgHash)
  put.addColumn(HBASE_COL_FAMILY, HBASE_COL_NAME,
    Calendar.getInstance().getTimeInMillis, HBASE_PROCESSED_VALUE)
  table.put(put)
}
WARNING: This simple approach has a couple of limitations:
- A possible race condition if two kafka-streams app instances process the same (duplicated) message at the same time (see the sketch below)
- Inconsistency if an instance fails after inserting the message into HBase but before writing it to the output stream
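One way to narrow the race window (I am not using it in this post, and it assumes your HBase/MapR-DB version supports the operation) is HBase's atomic checkAndPut, which writes only if the cell does not exist yet:
// Sketch: insert the hash only if no cell exists yet for that row.
// Returns true if this instance "won" and should forward the message downstream.
def insertIfAbsent[T](msg: T, table: Table): Boolean = {
  val msgHash = hash(msg.toString)
  val put = new Put(msgHash)
  put.addColumn(HBASE_COL_FAMILY, HBASE_COL_NAME,
    Calendar.getInstance().getTimeInMillis, HBASE_PROCESSED_VALUE)
  // A null expected value means "only put if the cell does not exist yet"
  table.checkAndPut(msgHash, HBASE_COL_FAMILY, HBASE_COL_NAME, null, put)
}
Note that this only narrows the window: a failure between the HBase write and the write to the output topic can still leave the two inconsistent.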
And then, the stream logic is the following:
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, Topology}
import org.apache.kafka.streams.kstream.KStream

// Configure HBase
val table = getHBaseTable

// Stream logic starts here
val builder = new StreamsBuilder()
val lines: KStream[Array[Byte], String] = builder.stream(TOPIC_INPUT)

val nonSeenMessages: KStream[Array[Byte], String] = lines
  // If a message is null, we consider it not valid
  .filterNot((_, v) => messageEmpty(v))
  // We let through only the messages that do not exist in our HBase table
  .filter((_, v) => messageToProcess(v, table))

// We insert the hashes as keys of the HBase table
// ASSUMPTION: the connection does not fail
nonSeenMessages.foreach((_, v) => insertToHBase(v, table))

// Finally, we write to the output topic
nonSeenMessages.to(TOPIC_OUTPUT)

// Start the stream (streamingConfig holds the usual Kafka Streams properties)
val topology: Topology = builder.build()
val streams: KafkaStreams = new KafkaStreams(topology, streamingConfig)
streams.start()
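One small addition worth making, although it is not in the snippet above: closing the stream and the HBase table when the application shuts down, for instance with a shutdown hook.
// Close the stream and the HBase table cleanly when the JVM shuts down
sys.addShutdownHook {
  streams.close()
  table.close()
}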
IMPORTANT
If you find errors with some calls to a Google library, try downgrading the guava dependency to 17.0 or something like that. There is a conflict at least between hbase-client and guava 19.0.
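For example, if you build with sbt (the jar-with-dependencies name suggests a Maven assembly, so adapt this to your build tool), the pin could look like this in build.sbt:
// build.sbt: force the older guava version to avoid the hbase-client conflict
dependencyOverrides += "com.google.guava" % "guava" % "17.0"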
Run it
java -cp "/opt/mapr/hbase/hbase-1.1.8/conf/:/opt/mapr/lib/*:steam-pipe-1.0-SNAPSHOT-jar-with-dependencies.jar" cat.martsec.kafka.steampipe.Deduplicator
IMPORTANT
Add /opt/mapr/hbase/hbase-1.1.8/conf/ and /opt/mapr/lib/* to the classpath. If you don't add the first one, the app won't be able to connect to HBase.
Then we can use the console producer to write some messages.
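As a sketch, the producer command could look like this (the Kafka path, broker list and topic name are placeholders; adjust them to your cluster):
# placeholders below; adjust to your installation
/opt/mapr/kafka/kafka-<version>/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic <input-topic>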
>This is a unique record
>It works
>It works
>NyanCat
>So 2011
>NyanCat
Reading the output topic, we can see that the duplicated messages do not appear:
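A sketch of the consumer command (again with placeholder path and topic name):
# placeholders below; adjust to your installation
/opt/mapr/kafka/kafka-<version>/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <output-topic> --from-beginning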
This is a unique record
It works
NyanCat
So 2011
The Kafka series
You can check out the previous post in the series, or the next one: Producing JSON data to a Kafka topic.
As always, you can find the source code in my GitHub repository.