Kafka Object Serialization and Deserialization

Serialization is the process of converting an object into a stream of bytes.

Deserialization is the process of converting a stream of bytes back into an object.

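Remember that Kafka stores each message as an array of bytes, so any object we send has to be serialized by the producer and deserialized again by the consumer.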

To create a serializer class, we need to implement the interface below, org.apache.kafka.common.serialization.Serializer:

/**
 * Converts objects of type {@code T} into the byte arrays that Kafka stores.
 *
 * @param <T> type of the keys or values this serializer handles
 */
public interface Serializer<T> extends Closeable {

  /**
   * Called once at startup with the client configuration.
   *
   * @param configs the producer configuration
   * @param isKey   {@code true} if this instance serializes record keys, {@code false} for values
   */
  void configure(Map<String, ?> configs, boolean isKey);

  /**
   * Converts {@code data} into bytes.
   *
   * @param topic the topic the record is being sent to
   * @param data  the object to serialize
   * @return the serialized bytes
   */
  byte[] serialize(String topic, T data);

  /** Called when the Kafka client is being closed; redeclared so it throws no checked exception. */
  @Override
  void close();
}

To create a deserializer class, we need to implement the interface below, org.apache.kafka.common.serialization.Deserializer:

/**
 * Converts the byte arrays stored by Kafka back into objects of type {@code T}.
 *
 * @param <T> type of the keys or values this deserializer produces
 */
public interface Deserializer<T> extends Closeable {

  /**
   * Called once at startup with the client configuration.
   *
   * @param configs the consumer configuration
   * @param isKey   {@code true} if this instance deserializes record keys, {@code false} for values
   */
  void configure(Map<String, ?> configs, boolean isKey);

  /**
   * Converts {@code data} back into an object.
   *
   * @param topic the topic the record was read from
   * @param data  the serialized bytes
   * @return the deserialized object
   */
  T deserialize(String topic, byte[] data);

  /** Called when the Kafka client is being closed; redeclared so it throws no checked exception. */
  @Override
  void close();
}


It's pretty simple to implement, so let's go straight to an example. Here I am:
1. creating the UserMessage class,
2. writing its serializer,
3. writing its deserializer,
4. using the serializer in the producer to convert the object to a byte stream and send it to Kafka,
5. using the deserializer in the consumer to get the object back from the byte stream.
package com.valassis.serialization;

/**
 * Simple key/value payload that is sent through Kafka as JSON.
 *
 * <p>The public no-argument constructor and the getter/setter pairs let Jackson's
 * {@code ObjectMapper} build instances when the message is deserialized.
 */
public class UserMessage {

  private String key;
  private String value;

  /** Creates an empty message; used by Jackson during deserialization. */
  public UserMessage() {
  }

  /**
   * Creates a message with the given contents.
   *
   * @param key   the message key
   * @param value the message value
   */
  public UserMessage(String key, String value) {
    this.key = key;
    this.value = value;
  }

  public String getKey() {
    return key;
  }

  public void setKey(String key) {
    this.key = key;
  }

  public String getValue() {
    return value;
  }

  public void setValue(String value) {
    this.value = value;
  }

  /** Two messages are equal when both their keys and their values are equal (nulls allowed). */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    UserMessage other = (UserMessage) o;
    return java.util.Objects.equals(key, other.key)
        && java.util.Objects.equals(value, other.value);
  }

  @Override
  public int hashCode() {
    return java.util.Objects.hash(key, value);
  }

  @Override
  public String toString() {
    return "UserMessage(" + key + "=>" + value + ")";
  }
}
package com.valassis.serialization;

import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

public class UserMessageSerializer implements Serializer<UserMessage> {

public void close() {

public void configure(Map<String, ?> configs, boolean isKey) {
// TODO Auto-generated method stub


public byte[] serialize(String topic, UserMessage data) {
byte[] retVal = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
retVal = objectMapper.writeValueAsString(data).getBytes();
} catch (Exception e) {
return retVal;

package com.valassis.serialization;

import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

public class UserMessageDeserializer implements Deserializer<UserMessage> {

public void configure(Map<String, ?> configs, boolean isKey) {
// TODO Auto-generated method stub

public UserMessage deserialize(String topic, byte[] data) {
   ObjectMapper mapper = new ObjectMapper();
   UserMessage userMessage = null;
   try {
    userMessage = mapper.readValue(data, UserMessage.class);
   } catch (Exception e) {
   return userMessage;


public void close() {
// TODO Auto-generated method stub

package com.valassis.msg.producer;

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.valassis.msg.generation.ZoneDataMessageCreationLocalFS;
import com.valassis.msg.generation.ZoneDataMessageCreationSerializerLocalFS;
import com.valassis.serialization.UserMessage;

public class ZoneDataSerializerProducer {
 public static void main(String[] args) {

  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");

  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("request.required.acks", "1");
  props.put("", "test");
  Producer<String, UserMessage> producer = new KafkaProducer<String, UserMessage>(
  ZoneDataMessageCreationSerializerLocalFS messages = new ZoneDataMessageCreationSerializerLocalFS();
  Map<String, UserMessage> messageMap = messages.getZoneData();
  messageMap.forEach((key, value) -> {
   producer.send(new ProducerRecord<String, UserMessage>("testtopic", key , value));



--- SparkStreamingDeserializerConsumer.java

package com.valassis.msg.consumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import com.valassis.serialization.UserMessage;
import com.valassis.serialization.UserMessageDeserializer;

public class SparkStreamingDeserializerConsumer {

static Connection connection;

public static void main(String[] args) throws InterruptedException,
IOException {

SparkConf sparkConf = new SparkConf()

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", UserMessageDeserializer.class);
kafkaParams.put("", "1");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("", false);

Collection<String> topics = Arrays.asList("testtopic");

// Create the context with 20 second interval batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
new Duration(20000));

Configuration conf = HBaseConfiguration.create();
* /user/mapr/zonedatastream 17/07/31 16:36:41 ERROR AsyncProcess:
* Failed to get region location
* org.apache.hadoop.hbase.TableNotFoundException:
* /user/mapr/zonedatastream at org.apache.hadoop.hbase.client.
* ConnectionManager$HConnectionImplementation
* .locateRegionInMeta(
conf.set("hbase.zookeeper.quorum", "localhost:5181");
conf.set("hbase.master", "localhost:60000");
conf.set("", "5181");
conf.set("hbase.rootdir", "maprfs:///hbase");

conf.set("username", "mapr");
conf.set("password", "mapr");
System.out.println("getting connection");
connection = ConnectionFactory.createConnection(conf);
System.out.println("connection populated");
table = connection.getTable(TABLE_NAME);

JavaInputDStream<ConsumerRecord<String, UserMessage>> stream = KafkaUtils
ConsumerStrategies.<String, UserMessage> Subscribe(topics,

// stream.mapToPair(record -> new Tuple2<>(record.key(),
// record.value())).foreachRDD(System.out::println);

// stream.mapToPair(record -> new Tuple2<>(record.key(),
// record.value())).print(); -> print(record.key(), record.value())).print();

// jssc.close();


// private static final TableName TABLE_NAME =
// TableName.valueOf("/user/mapr/zonedatastream");
// private static final TableName TABLE_NAME =
// TableName.valueOf("/user/mapr/zonedatastream");
// private static final byte[] CF = Bytes.toBytes("cf1");

private static final TableName TABLE_NAME = TableName.valueOf("test_log");
private static final byte[] CF = Bytes.toBytes("name");
private static Table table;

// Storing it to hbase
private static Object print(String key, UserMessage value) throws IOException,
InterruptedException {

// System.out.println("inside print");
// System.out.println("key ->" + key);
// System.out.println("value ->" + value);
// Table table = connection.getTable(TABLE_NAME);
// describe the data we want to write.
Put p = new Put(Bytes.toBytes(key));
p.addColumn(CF, Bytes.toBytes("messagekey"), Bytes.toBytes(value.getKey()));
p.addColumn(CF, Bytes.toBytes("value"), Bytes.toBytes(value.getValue()));
return key;


