Wednesday, August 16, 2017

Kafka Object Serialization and Deserialization

Serialization is the process of converting an object into a stream of bytes.

Deserialization is the reverse process of converting a stream of bytes back into an object.

Remember that Kafka stores every message as an array of bytes, so producers and consumers need to agree on how objects are converted to and from bytes.
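
As a quick illustration of the round trip (a minimal sketch using Kafka's built-in String serializer/deserializer, not part of the example that follows):

-- StringRoundTrip.java (illustration only) -----
package com.valassis.serialization;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class StringRoundTrip {

  public static void main(String[] args) {
    StringSerializer serializer = new StringSerializer();
    StringDeserializer deserializer = new StringDeserializer();

    // serialize: String object -> byte[] (what Kafka actually stores)
    byte[] bytes = serializer.serialize("testtopic", "hello kafka");
    System.out.println(bytes.length + " bytes on the wire");

    // deserialize: byte[] -> String object again
    String back = deserializer.deserialize("testtopic", bytes);
    System.out.println(back);

    serializer.close();
    deserializer.close();
  }
}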

To create a serializer class, we need to implement the interface org.apache.kafka.common.serialization.Serializer


public interface Serializer<T> extends Closeable {

  // called once at startup with the producer configuration
  void configure(Map<String, ?> configs, boolean isKey);

  // converts the object of type T into the byte[] that Kafka stores
  byte[] serialize(String topic, T data);

  // called when the Kafka session is to be closed
  void close();
}

To create a deserializer class, we need to implement the interface org.apache.kafka.common.serialization.Deserializer


public interface Deserializer<T> extends Closeable {

  // called once at startup with the consumer configuration
  void configure(Map<String, ?> configs, boolean isKey);

  // converts the byte[] read from Kafka back into an object of type T
  T deserialize(String topic, byte[] data);

  // called when the Kafka session is to be closed
  void close();
}


It's pretty simple to implement, so let's go straight to an example. Here I am:
1. creating a UserMessage class,
2. writing its serializer,
3. writing its deserializer,
4. using the serializer in a producer to convert the object to a byte stream and send it to Kafka,
5. using the deserializer in a Spark Streaming consumer to get the object back from the byte stream.
 
-- UserMessage.java -----
package com.valassis.serialization;

public class UserMessage {

  private String key;
  private String value;

  public UserMessage() {
  }

  public UserMessage(String key, String value) {
    this.key = key;
    this.value = value;
  }

  public String getKey() {
    return key;
  }

  public void setKey(String key) {
    this.key = key;
  }

  public String getValue() {
    return value;
  }

  public void setValue(String value) {
    this.value = value;
  }

  @Override
  public String toString() {
    return "UserMessage(" + key + "=>" + value + ")";
  }
}

---UserMessageSerializer.java
package com.valassis.serialization;

import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;
import org.codehaus.jackson.map.ObjectMapper;

public class UserMessageSerializer implements Serializer<UserMessage> {

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // nothing to configure
  }

  @Override
  public byte[] serialize(String topic, UserMessage data) {
    // convert the UserMessage to JSON with Jackson, then to raw bytes
    byte[] retVal = null;
    ObjectMapper objectMapper = new ObjectMapper();
    try {
      retVal = objectMapper.writeValueAsString(data).getBytes();
    } catch (Exception e) {
      e.printStackTrace();
    }
    return retVal;
  }

  @Override
  public void close() {
    // nothing to clean up
  }
}
---UserMessageDeserializer.java
package com.valassis.serialization;

import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.codehaus.jackson.map.ObjectMapper;

public class UserMessageDeserializer implements Deserializer<UserMessage> {

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // nothing to configure
  }

  @Override
  public UserMessage deserialize(String topic, byte[] data) {
    // convert the JSON bytes back into a UserMessage with Jackson
    ObjectMapper mapper = new ObjectMapper();
    UserMessage userMessage = null;
    try {
      userMessage = mapper.readValue(data, UserMessage.class);
    } catch (Exception e) {
      e.printStackTrace();
    }
    return userMessage;
  }

  @Override
  public void close() {
    // nothing to clean up
  }
}
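
Before wiring these into a producer and consumer, you can sanity-check that the serializer and deserializer round-trip correctly with a small main method (my own quick check, not part of the original classes):

-- UserMessageRoundTripCheck.java (quick check) -----
package com.valassis.serialization;

public class UserMessageRoundTripCheck {

  public static void main(String[] args) {
    UserMessageSerializer serializer = new UserMessageSerializer();
    UserMessageDeserializer deserializer = new UserMessageDeserializer();

    UserMessage original = new UserMessage("zone-1", "sample payload");

    // object -> JSON bytes -> object
    byte[] bytes = serializer.serialize("testtopic", original);
    UserMessage copy = deserializer.deserialize("testtopic", bytes);

    // prints: UserMessage(zone-1=>sample payload)
    System.out.println(copy);

    serializer.close();
    deserializer.close();
  }
}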


----ZoneDataSerializerProducer.java 

package com.valassis.msg.producer;

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.valassis.msg.generation.ZoneDataMessageCreationSerializerLocalFS;
import com.valassis.serialization.UserMessage;

public class ZoneDataSerializerProducer {
 public static void main(String[] args) {

  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");

  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer",
    "com.valassis.serialization.UserMessageSerializer");
  props.put("request.required.acks", "1");
  props.put("client.id", "test");
  Producer<String, UserMessage> producer = new KafkaProducer<String, UserMessage>(
    props);
  ZoneDataMessageCreationSerializerLocalFS messages = new ZoneDataMessageCreationSerializerLocalFS();
  Map<String, UserMessage> messageMap = messages.getZoneData();
  System.out.println(messageMap.size());
  messageMap.forEach((key, value) -> {
     
   producer.send(new ProducerRecord<String, UserMessage>("testtopic", key , value));

  });
  producer.close();
 }

}
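
The ZoneDataMessageCreationSerializerLocalFS class is not shown in this post; all the producer needs from it is a Map<String, UserMessage>. A simplified stand-in (my own sketch; the real class builds the map from zone data on the local file system) could look like this:

-- ZoneDataMessageCreationSerializerLocalFS.java (simplified stand-in) -----
package com.valassis.msg.generation;

import java.util.HashMap;
import java.util.Map;

import com.valassis.serialization.UserMessage;

public class ZoneDataMessageCreationSerializerLocalFS {

  // returns hard-coded sample records instead of reading zone data
  // from the local file system like the real class does
  public Map<String, UserMessage> getZoneData() {
    Map<String, UserMessage> messages = new HashMap<>();
    messages.put("zone-1", new UserMessage("zone-1", "sample zone payload 1"));
    messages.put("zone-2", new UserMessage("zone-2", "sample zone payload 2"));
    return messages;
  }
}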

--- SparkStreamingDeserializerConsumer.java

package com.valassis.msg.consumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import com.valassis.serialization.UserMessage;
import com.valassis.serialization.UserMessageDeserializer;

public class SparkStreamingDeserializerConsumer {

  // HBase table and column family the consumed messages are written to.
  // Using the MapR path /user/mapr/zonedatastream as the table name earlier
  // failed with org.apache.hadoop.hbase.TableNotFoundException, so the
  // table test_log is used instead.
  private static final TableName TABLE_NAME = TableName.valueOf("test_log");
  private static final byte[] CF = Bytes.toBytes("name");

  static Connection connection;
  private static Table table;

  public static void main(String[] args) throws InterruptedException, IOException {

    SparkConf sparkConf = new SparkConf()
        .setAppName("Pawan-JavaKafkaMessageConsumer");

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", UserMessageDeserializer.class);
    kafkaParams.put("group.id", "1");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);

    Collection<String> topics = Arrays.asList("testtopic");

    // Create the context with a 20 second batch interval
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(20000));

    // HBase connection setup
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "localhost:5181");
    conf.set("hbase.master", "localhost:60000");
    conf.set("hbase.zookeeper.property.clientPort", "5181");
    conf.set("hbase.rootdir", "maprfs:///hbase");
    conf.set("username", "mapr");
    conf.set("password", "mapr");

    System.out.println("getting connection");
    connection = ConnectionFactory.createConnection(conf);
    System.out.println("connection populated");
    table = connection.getTable(TABLE_NAME);

    JavaInputDStream<ConsumerRecord<String, UserMessage>> stream = KafkaUtils
        .createDirectStream(jssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, UserMessage> Subscribe(topics, kafkaParams));

    // for every record, write the deserialized UserMessage to HBase
    stream.map(record -> print(record.key(), record.value())).print();

    jssc.start();
    jssc.awaitTermination();

    table.close();
    connection.close();
  }

  // Storing it to HBase
  private static Object print(String key, UserMessage value)
      throws IOException, InterruptedException {

    // describe the data we want to write
    Put p = new Put(Bytes.toBytes(key));
    p.addColumn(CF, Bytes.toBytes("messagekey"), Bytes.toBytes(value.getKey()));
    p.addColumn(CF, Bytes.toBytes("value"), Bytes.toBytes(value.getValue()));
    table.put(p);
    return key;
  }
}
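
If you only want to verify that the deserializer works, without Spark Streaming and HBase, a plain Kafka consumer is enough (a minimal sketch; the class name and group id are my own):

-- SimpleDeserializerConsumer.java (plain consumer sketch) -----
package com.valassis.msg.consumer;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.valassis.serialization.UserMessage;

public class SimpleDeserializerConsumer {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "simple-test");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "com.valassis.serialization.UserMessageDeserializer");
    props.put("auto.offset.reset", "earliest");

    KafkaConsumer<String, UserMessage> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("testtopic"));

    try {
      while (true) {
        // poll for new messages; values arrive as UserMessage objects
        ConsumerRecords<String, UserMessage> records = consumer.poll(1000);
        for (ConsumerRecord<String, UserMessage> record : records) {
          System.out.println(record.key() + " -> " + record.value());
        }
      }
    } finally {
      consumer.close();
    }
  }
}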
