package com.valassis.serialization;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import org.codehaus.jackson.map.ObjectMapper;
public class UserMessageSerializer implements Serializer<UserMessage> {
    // ObjectMapper is thread-safe once configured, so one shared instance is reused
    // instead of creating a new mapper for every record.
    private final ObjectMapper objectMapper = new ObjectMapper();
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration required
    }
    @Override
    public byte[] serialize(String topic, UserMessage data) {
        try {
            // write the UserMessage POJO as UTF-8 encoded JSON bytes
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
    @Override
    public void close() {
        // nothing to release
    }
}
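---UserMessageProducerExample.java
// A minimal producer-side sketch showing how UserMessageSerializer could be wired
// into a plain Kafka producer against the same broker and topic the Spark consumer
// uses below. The UserMessage(key, value) constructor is assumed here for
// illustration, as the POJO's source is not shown in this listing.
package com.valassis.serialization;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class UserMessageProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "com.valassis.serialization.UserMessageSerializer");
        // Send one JSON-encoded UserMessage to the topic consumed by the streaming job.
        try (Producer<String, UserMessage> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("testtopic", "key1",
                    new UserMessage("key1", "value1")));
        }
    }
}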
---UserMessageDeserializer.java
package com.valassis.serialization;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.codehaus.jackson.map.ObjectMapper;
public class UserMessageDeserializer implements Deserializer<UserMessage> {
    // shared, thread-safe ObjectMapper instance
    private final ObjectMapper mapper = new ObjectMapper();
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration required
    }
    @Override
    public UserMessage deserialize(String topic, byte[] data) {
        try {
            // parse the JSON bytes back into a UserMessage POJO
            return mapper.readValue(data, UserMessage.class);
        } catch (Exception e) {
            e.printStackTrace();
            // keep the original behaviour: unreadable records come back as null
            return null;
        }
    }
    @Override
    public void close() {
        // nothing to release
    }
}
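---UserMessage.java
// The UserMessage POJO is referenced throughout this listing but its source is not
// shown; this is a minimal sketch assuming a simple key/value bean with String
// fields. Jackson relies on the no-argument constructor and the getters/setters
// when converting to and from JSON.
package com.valassis.serialization;
public class UserMessage {
    private String key;
    private String value;
    public UserMessage() {
        // needed by Jackson for deserialization
    }
    public UserMessage(String key, String value) {
        this.key = key;
        this.value = value;
    }
    public String getKey() {
        return key;
    }
    public void setKey(String key) {
        this.key = key;
    }
    public String getValue() {
        return value;
    }
    public void setValue(String value) {
        this.value = value;
    }
}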
---SparkStreamingDeserializerConsumer.java
package com.valassis.msg.consumer;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import com.valassis.serialization.UserMessage;
import com.valassis.serialization.UserMessageDeserializer;
public class SparkStreamingDeserializerConsumer {
    static Connection connection;
    public static void main(String[] args) throws InterruptedException,
            IOException {
        SparkConf sparkConf = new SparkConf()
                .setAppName("Pawan-JavaKafkaMessageConsumer");
        // Kafka consumer settings: the custom UserMessageDeserializer turns the JSON
        // payload back into a UserMessage POJO.
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", UserMessageDeserializer.class);
        kafkaParams.put("group.id", "1");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);
        Collection<String> topics = Arrays.asList("testtopic");
        // Create the streaming context with a 20-second batch interval.
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
                new Duration(20000));
        Configuration conf = HBaseConfiguration.create();
        /*
         * Note from an earlier run against the MapR table path /user/mapr/zonedatastream:
         *   17/07/31 16:36:41 ERROR AsyncProcess: Failed to get region location
         *   org.apache.hadoop.hbase.TableNotFoundException: /user/mapr/zonedatastream
         *     at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation
         *       .locateRegionInMeta(ConnectionManager.java:1324)
         */
        conf.set("hbase.zookeeper.quorum", "localhost:5181");
        conf.set("hbase.master", "localhost:60000");
        conf.set("hbase.zookeeper.property.clientPort", "5181");
        conf.set("hbase.rootdir", "maprfs:///hbase");
        conf.set("username", "mapr");
        conf.set("password", "mapr");
        System.out.println("getting connection");
        connection = ConnectionFactory.createConnection(conf);
        System.out.println("connection populated");
        table = connection.getTable(TABLE_NAME);
        JavaInputDStream<ConsumerRecord<String, UserMessage>> stream = KafkaUtils
                .createDirectStream(jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, UserMessage>Subscribe(topics,
                                kafkaParams));
        // stream.mapToPair(record -> new Tuple2<>(record.key(),
        // record.value())).foreachRDD(System.out::println);
        // stream.mapToPair(record -> new Tuple2<>(record.key(),
        // record.value())).print();
        // print() is the output action that triggers the map and therefore the HBase
        // writes in print(key, value) below; note that it only materializes the first
        // few records of each batch.
        stream.map(record -> print(record.key(), record.value())).print();
        jssc.start();
        jssc.awaitTermination();
        // jssc.close();
        // Release the HBase resources only after the streaming context has terminated.
        table.close();
        connection.close();
    }
    // private static final TableName TABLE_NAME =
    // TableName.valueOf("/user/mapr/zonedatastream");
    // private static final byte[] CF = Bytes.toBytes("cf1");
    private static final TableName TABLE_NAME = TableName.valueOf("test_log");
    private static final byte[] CF = Bytes.toBytes("name");
    private static Table table;
    // Store each record in HBase: the Kafka key becomes the row key and the
    // UserMessage fields go into the "name" column family.
    private static Object print(String key, UserMessage value) throws IOException,
            InterruptedException {
        // System.out.println("inside print");
        // System.out.println("key ->" + key);
        // System.out.println("value ->" + value);
        // Table table = connection.getTable(TABLE_NAME);
        // Describe the data we want to write.
        Put p = new Put(Bytes.toBytes(key));
        p.addColumn(CF, Bytes.toBytes("messagekey"), Bytes.toBytes(value.getKey()));
        p.addColumn(CF, Bytes.toBytes("value"), Bytes.toBytes(value.getValue()));
        table.put(p);
        return key;
    }
}
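// Setup note (illustrative): the HBase table written to above must already exist.
// With the constants used here that is a table named "test_log" with a single column
// family "name", which could be created from the HBase shell with:
//   create 'test_log', 'name'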