In this second part of the Kafka in MapR series we are going to de-duplicate identical messages using HBase.

NOTE
I am using MapR so not all configurations are the same in its Open Source counterparts (e.g. I don't need to add the ZooKeeper nodes)

This is the diagram of our desired application. The two first boxes are filters and the last one is a foreach method.

steam-pipe-deduplication

Set-up HBase

We'll need to create a table and a family. Since logic is not complicated, a single column will be good enough for this exercise.

Run hbase shell and execute the sentences below.

create '/user/mapr/deduplication-table', {NAME=>'dedup'}

Description of the table, where we can see the Bloom filter being enabled on rows. That's great!

hbase(main):004:0> describe '/user/mapr/deduplication-table'
Table /user/mapr/deduplication-table is ENABLED                                           
/user/mapr/deduplication-table, {TABLE_ATTRIBUTES => {MAX_FILESIZE => '4294967296', METADA
TA => {'AUTOSPLIT' => 'true', 'MAPR_UUID' => 'dedeca7f-5df7-6a66-aae4-043bf8305c00', 'MAX_
VALUE_SIZE_IN_MEM' => '100'}}                                                             
COLUMN FAMILIES DESCRIPTION                                                               
{NAME => 'dedup', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETE
D_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE'
, MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '8192', REPLICATION_SCOPE => '0'
, METADATA => {'compression_raw' => '31'}}                                                
1 row(s) in 0.0510 seconds

Bloom filters are data structures optimized to efficiently (computation and space-wise) check if an element is a member of a set. They are probabilistic and their returns are: possibly in set or definitely not. They are really cool, check them out! Probably it'll cost you Constant time

Application

Our process has two filtering steps:

  1. Filter-out empty messages
  2. Only insert messages we have not seen

The second part is accomplished comparing the hash of the message using HBase. The hash will be the lookup table key and we'll check for existence (for now). The table is prepared to be able to reprocess a message checking for a column value (e.g.: if that value is 1, do nothing, if it is 0, do not filter-out the message).

val hasher: HashFunction = Hashing.sha256()
def hash(string: String): Array[Byte] = hasher.hashString(string, StandardCharsets.UTF_8).asBytes()

def messageEmpty[T](msg: T): Boolean = msg == null

def messageToProcess[T](msg: T, hbaseTable: Table ): Boolean = {
  val msgHash = hash(msg.toString)
  // See if the table contains its hash
  val get = new Get(msgHash)
  // Done this way so in a future we could act depending on the result value
  hbaseTable.get(get).isEmpty
}

Now that the filters are clear, there is one extra step to the logic: writing the processed messages in HBase.

def getHBaseTable: Table = {
  val conf = HBaseConfiguration.create()
  val connection = ConnectionFactory.createConnection(conf)
  connection.getTable(TableName.valueOf(HBASE_TABLE))
}

def insertToHBase[T](msg: T, table: Table): Unit = {
  val msgHash = hash(msg.toString)
  val put = new Put(msgHash)
  put.addColumn(HBASE_COL_FAMILY,HBASE_COL_NAME,
    Calendar.getInstance().getTimeInMillis,HBASE_PROCESSED_VALUE)
  table.put(put)
}

WARNING: This simple approach has couple of limitations:

  • Possible race condition if two kafka-streams app instances process the same message (duplicated) at the same time
  • Inconsistency could happen if an agent fails after inserting the message in HBase but before writing to the stream

And then, the stream logic is the following:

// configure HBase
val table = getHBaseTable
    
// Starts stream logic
val lines: KStream[Array[Byte],String] = builder.stream(TOPIC_INPUT)

val nonSeenMessages: KStream[Array[Byte],String] = lines
  // If message is null, we consider it not valid
  .filterNot((_, v) => messageEmpty(v))
  // We let pass only the messages that do not exist in our HBase table
  .filter((_,v) => messageToProcess(v, table))

// We insert the HASHES as key of HBase table
// ASSUMPTION: connection does not fail
nonSeenMessages.foreach((_,v) => insertToHBase(v, table))

// Finally, we write to the output topic
nonSeenMessages.to(TOPIC_OUTPUT)

// Starting stream
val topology: Topology = builder.build()
val streams: KafkaStreams = new KafkaStreams(topology, streamingConfig)
streams.start()

IMPORTANT
It you find errors with some calls to a google library, try downgrading guava dependency to 17.0 or something like that. There is a conflict at least between hbase-client and guava 19.0

Run it

java -cp "/opt/mapr/hbase/hbase-1.1.8/conf/:/opt/mapr/lib/*:steam-pipe-1.0-SNAPSHOT-jar-with-dependencies.jar" cat.martsec.kafka.steampipe.Deduplicator

IMPORTANT
Add /opt/mapr/hbase/hbase-1.1.8/conf/ and /opt/mapr/lib/* to the classpath. If you don't add the first one, the app won't be able to connect to HBase

Then we can use the console producer to write some messages

>This is a unique record
>It works
>It works
>NyanCat
>So 2011
>NyanCat

We can see that the duplicate messages do not appear in the output topic

This is a unique record
It works
NyanCat
So 2011

The kafka series

You can check out the previous post in the series

Or the next post
--> Producing JSON data to a Kafka topic

As always, you can find the source code in my github repository.


Resources