Serialization is the process of converting an object into a stream of bytes.
Deserialization is the process of converting a stream of bytes back into an object.
Remember that Kafka stores messages as arrays of bytes.
To create a serializer class, we need to implement the following interface:
org.apache.kafka.common.serialization.Serializer
To create a deserializer class, we need to implement the following interface: org.apache.kafka.common.serialization.Deserializer
  
  
  
Deserialization is the process of converting a stream of bytes back into an object.
Remember that Kafka stores messages as arrays of bytes.
To create a serializer class, we need to implement the following interface:
org.apache.kafka.common.serialization.Serializer
/**
 * Shape of Kafka's {@code org.apache.kafka.common.serialization.Serializer}: converts a value of
 * type {@code T} into the byte array that Kafka stores. Newer Kafka clients turn
 * {@code configure}/{@code close} into default methods and add a headers-aware overload.
 *
 * @param <T> type of the values being serialized
 */
public interface Serializer<T> extends Closeable {
  /**
   * Configures this serializer at startup.
   *
   * @param configs the client configuration
   * @param isKey {@code true} if this instance serializes record keys, {@code false} for values
   */
  void configure(Map<String, ?> configs, boolean isKey);

  /**
   * Converts {@code data} into bytes.
   *
   * @param topic topic the record is being sent to
   * @param data the value to serialize
   * @return the serialized bytes
   */
  byte[] serialize(String topic, T data);

  /** Called when the Kafka client is closed; release any resources held by the serializer. */
  @Override
  void close();
}
/**
 * Shape of Kafka's {@code org.apache.kafka.common.serialization.Deserializer}: converts the bytes
 * stored by Kafka back into a value of type {@code T}. Newer Kafka clients turn
 * {@code configure}/{@code close} into default methods and add a headers-aware overload.
 *
 * @param <T> type of the values being deserialized
 */
public interface Deserializer<T> extends Closeable {
  /**
   * Configures this deserializer at startup.
   *
   * @param configs the client configuration
   * @param isKey {@code true} if this instance deserializes record keys, {@code false} for values
   */
  void configure(Map<String, ?> configs, boolean isKey);

  /**
   * Converts {@code data} back into a value.
   *
   * @param topic topic the record was read from
   * @param data the serialized bytes (Kafka passes {@code null} for null record values)
   * @return the deserialized value
   */
  T deserialize(String topic, byte[] data);

  /** Called when the Kafka client is closed; release any resources held by the deserializer. */
  @Override
  void close();
}
It's pretty simple to implement, so let's start directly with an example. Here I am:
1. creating the UserMessage class,
2. writing its serializer,
3. writing its deserializer,
4. using the serializer in a producer to convert the object to a byte stream and send it to Kafka,
5. using the deserializer in a consumer to get the object back from the byte stream.
 
-- UserMessage.java-----
package com.valassis.serialization; public class UserMessage { private String key; private String value; public UserMessage() { } public String getKey() { return key; } public void setKey(String key) { this.key = key; } public String getValue() { return value; } public void setValue(String value) { this.value = value; } public UserMessage(String key, String value) { this.key = key; this.value = value; } @Override public String toString() { return "UserMessage(" + key + "=>" + value + ")"; } }
---UserMessageSerializer.java
package com.valassis.serialization;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import org.codehaus.jackson.map.ObjectMapper;
public class UserMessageSerializer implements Serializer<UserMessage> {
 @Override
 public void close() { 
 }
 @Override
 public void configure(Map<String, ?> configs, boolean isKey) {
  // TODO Auto-generated method stub
 }
 @Override
 public byte[] serialize(String topic, UserMessage data) {
  byte[] retVal = null;
  ObjectMapper objectMapper = new ObjectMapper();
  try {
   retVal = objectMapper.writeValueAsString(data).getBytes();
  } catch (Exception e) {
   e.printStackTrace();
  }
  return retVal;
 }
}
---UserMessageDeserializer.java
package com.valassis.serialization;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.codehaus.jackson.map.ObjectMapper;
public class UserMessageDeserializer implements Deserializer<UserMessage> {
 @Override
 public void configure(Map<String, ?> configs, boolean isKey) {
  // TODO Auto-generated method stub
 }
 @Override
 public UserMessage deserialize(String topic, byte[] data) {
     ObjectMapper mapper = new ObjectMapper();
     UserMessage userMessage = null;
     try {
      userMessage = mapper.readValue(data, UserMessage.class);
     } catch (Exception e) {
       e.printStackTrace();
     }
     return userMessage;
 }
 @Override
 public void close() {
  // TODO Auto-generated method stub
 }
 }
----ZoneDataSerializerProducer.java
package com.valassis.msg.producer;

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.valassis.msg.generation.ZoneDataMessageCreationLocalFS;
import com.valassis.msg.generation.ZoneDataMessageCreationSerializerLocalFS;
import com.valassis.serialization.UserMessage;

/**
 * Publishes the zone data returned by {@code ZoneDataMessageCreationSerializerLocalFS} to the
 * Kafka topic "testtopic". String keys use StringSerializer, and UserMessage values use
 * UserMessageSerializer.
 */
public class ZoneDataSerializerProducer {
  private static final String TOPIC = "testtopic";

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "com.valassis.serialization.UserMessageSerializer");
    // KafkaProducer reads "acks"; the old "request.required.acks" key belongs to the retired
    // Scala producer and is ignored by this client.
    props.put("acks", "1");
    props.put("client.id", "test");

    // try-with-resources flushes and closes the producer even if reading or sending throws.
    try (Producer<String, UserMessage> producer = new KafkaProducer<String, UserMessage>(props)) {
      ZoneDataMessageCreationSerializerLocalFS messages =
          new ZoneDataMessageCreationSerializerLocalFS();
      Map<String, UserMessage> messageMap = messages.getZoneData();
      System.out.println(messageMap.size());
      messageMap.forEach((key, value) -> producer.send(
          new ProducerRecord<String, UserMessage>(TOPIC, key, value),
          (metadata, exception) -> {
            // send() is asynchronous; without a callback, delivery failures go unnoticed.
            if (exception != null) {
              System.err.println("Failed to send record with key " + key + ": " + exception);
            }
          }));
    }
  }
}
--- SparkStreamingDeserializerConsumer.java
package com.valassis.msg.consumer;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import com.valassis.serialization.UserMessage;
import com.valassis.serialization.UserMessageDeserializer;
public class SparkStreamingDeserializerConsumer {
 static Connection connection;
 public static void main(String[] args) throws InterruptedException,
   IOException {
  SparkConf sparkConf = new SparkConf()
    .setAppName("Pawan-JavaKafkaMessageConsumer");
  Map<String, Object> kafkaParams = new HashMap<>();
  kafkaParams.put("bootstrap.servers", "localhost:9092");
  kafkaParams.put("key.deserializer", StringDeserializer.class);
  kafkaParams.put("value.deserializer", UserMessageDeserializer.class);
  kafkaParams.put("group.id", "1");
  kafkaParams.put("auto.offset.reset", "latest");
  kafkaParams.put("enable.auto.commit", false);
  Collection<String> topics = Arrays.asList("testtopic");
  // Create the context with 20 second interval batch size
  JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
    new Duration(20000));
  Configuration conf = HBaseConfiguration.create();
  /*
   * /user/mapr/zonedatastream 17/07/31 16:36:41 ERROR AsyncProcess:
   * Failed to get region location
   * org.apache.hadoop.hbase.TableNotFoundException:
   * /user/mapr/zonedatastream at org.apache.hadoop.hbase.client.
   * ConnectionManager$HConnectionImplementation
   * .locateRegionInMeta(ConnectionManager.java:1324)
   */
  conf.set("hbase.zookeeper.quorum", "localhost:5181");
  conf.set("hbase.master", "localhost:60000");
  conf.set("hbase.zookeeper.property.clientPort", "5181");
  conf.set("hbase.rootdir", "maprfs:///hbase");
  conf.set("username", "mapr");
  conf.set("password", "mapr");
  System.out.println("getting connection");
  connection = ConnectionFactory.createConnection(conf);
  System.out.println("connection populated");
  table = connection.getTable(TABLE_NAME);
  JavaInputDStream<ConsumerRecord<String, UserMessage>> stream = KafkaUtils
    .createDirectStream(jssc,
      LocationStrategies.PreferConsistent(),
      ConsumerStrategies.<String, UserMessage> Subscribe(topics,
        kafkaParams));
  // stream.mapToPair(record -> new Tuple2<>(record.key(),
  // record.value())).foreachRDD(System.out::println);
  // stream.mapToPair(record -> new Tuple2<>(record.key(),
  // record.value())).print();
  stream.map(record -> print(record.key(), record.value())).print();
  ;
  jssc.start();
  table.close();
  // jssc.close();
  jssc.awaitTermination();
  connection.close();
 }
 // private static final TableName TABLE_NAME =
 // TableName.valueOf("/user/mapr/zonedatastream");
 // private static final TableName TABLE_NAME =
 // TableName.valueOf("/user/mapr/zonedatastream");
 // private static final byte[] CF = Bytes.toBytes("cf1");
 private static final TableName TABLE_NAME = TableName.valueOf("test_log");
 private static final byte[] CF = Bytes.toBytes("name");
 private static Table table;
 // Storing it to hbase
 private static Object print(String key, UserMessage value) throws IOException,
   InterruptedException {
  // System.out.println("inside print");
  // System.out.println("key ->" + key);
  // System.out.println("value ->" + value);
  // Table table = connection.getTable(TABLE_NAME);
  // describe the data we want to write.
  Put p = new Put(Bytes.toBytes(key));
  p.addColumn(CF, Bytes.toBytes("messagekey"), Bytes.toBytes(value.getKey()));
  p.addColumn(CF, Bytes.toBytes("value"), Bytes.toBytes(value.getValue()));
  table.put(p);
  return key;
 }
}
 
 
 
 
No comments:
Post a Comment