Thursday, October 8, 2015

MR Unit

You can test mappers and reducers with MRUnit. It is a framework built on JUnit, so it is straightforward to use if you have worked with JUnit.

MRUnit helps bridge the gap between MapReduce programs and JUnit by providing a set of interfaces and test harnesses, which allow MapReduce programs to be more easily tested using standard tools and practices.

To write a mapper test you would:
  1. Instantiate the MapDriver class, parameterized exactly as the mapper under test.
  2. Pass an instance of the Mapper you are testing to the withMapper call.
  3. In the withInput call, pass in your input key and value, in this case a LongWritable with an arbitrary value and a Text object that contains a line from the NCDC weather dataset, held in a String array called ‘temps’ that was set up earlier in the test (not displayed here as it would take away from the presentation).
  4. Specify the expected output in the withOutput call. Here we expect a Text object with the value “190101” and a TemperatureAveragingPair object containing the values -61 (temperature) and 1 (count).
  5. Call runTest last. It feeds the specified input values into the mapper and compares the actual output against the expected output set in the ‘withOutput’ method.

A map/reduce pair can be tested using MRUnit’s MapReduceDriver.
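The withInput / withOutput / runTest flow described above can be sketched without any Hadoop dependency. The following is only a JDK-only illustration of the pattern: MiniMapDriver and wordMapper are hypothetical names invented for this sketch and are not part of MRUnit.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// JDK-only illustration of the driver pattern; none of these types are MRUnit classes.
class MiniMapDriverSketch {

    /** Hypothetical test driver that mimics the withInput / withOutput / runTest flow. */
    static final class MiniMapDriver {
        private final BiFunction<Long, String, List<Map.Entry<String, Integer>>> mapper;
        private long inputKey;
        private String inputValue;
        private final List<Map.Entry<String, Integer>> expected = new ArrayList<>();

        MiniMapDriver(BiFunction<Long, String, List<Map.Entry<String, Integer>>> mapper) {
            this.mapper = mapper;
        }

        MiniMapDriver withInput(long key, String value) {
            this.inputKey = key;
            this.inputValue = value;
            return this;
        }

        MiniMapDriver withOutput(String key, int value) {
            expected.add(new SimpleEntry<String, Integer>(key, value));
            return this;
        }

        /** Runs the mapper on the input and compares actual and expected output, in order. */
        void runTest() {
            List<Map.Entry<String, Integer>> actual = mapper.apply(inputKey, inputValue);
            if (!expected.equals(actual)) {
                throw new AssertionError("expected " + expected + " but got " + actual);
            }
        }
    }

    /** Example mapper under test: emits (word, 1) for every whitespace-separated token. */
    static List<Map.Entry<String, Integer>> wordMapper(Long offset, String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new SimpleEntry<String, Integer>(word, 1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        new MiniMapDriver(MiniMapDriverSketch::wordMapper)
                .withInput(0L, "a b a")
                .withOutput("a", 1)
                .withOutput("b", 1)
                .withOutput("a", 1)
                .runTest();
        System.out.println("mapper test passed");
    }
}
```

Note that every emitted pair has to be listed with its own withOutput call in this sketch; listing only one expected pair for an input that produces several makes runTest fail.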

 
Code Example:

package com.valassis.mrunit;


import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

public class MRTester {

    // Driver for testing the mapper on its own
    MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
    // Driver for testing the reducer on its own
    ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
    // Driver for testing the mapper and reducer together
    MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;

    @Before
    public void setUp() {
        MaxTemperatureMapper mapper = new MaxTemperatureMapper();
        MaxTemperatureReducer reducer = new MaxTemperatureReducer();
        // Set up the mapper driver
        mapDriver = MapDriver.newMapDriver(mapper);
        // Set up the reducer driver
        reduceDriver = ReduceDriver.newReduceDriver(reducer);
        // Set up the driver for the whole MapReduce job
        mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
    }

    // The tests declare "throws Exception" instead of catching and printing it,
    // so an unexpected exception fails the test rather than being hidden.
    @Test
    public void testMapper() throws Exception {
        // Test the mapper with this input
        mapDriver.withInput(new LongWritable(), new Text("a a a a a a a"));
        // Expect this output
        mapDriver.withOutput(new Text("a"), new IntWritable(1));
        // Run the map test with the above input and output
        mapDriver.runTest();
    }

    @Test
    public void testReducer() throws Exception {
        List<IntWritable> values = new ArrayList<IntWritable>();
        values.add(new IntWritable(1));
        // Run the reducer with this input
        reduceDriver.withInput(new Text("a"), values);
        // Expect this output
        reduceDriver.withOutput(new Text("a"), new IntWritable(1));
        // Run the reduce test with the above input and output
        reduceDriver.runTest();
    }
}

Thursday, September 24, 2015

File compression Using Codec in MapReduce



 File compression brings two major benefits: 

  • it reduces the space needed to store files, and 
  • it speeds up data transfer across the network or to or from disk.

If the input file is compressed, fewer bytes are read from HDFS, which means less time spent reading data and better job performance.

If the input files are compressed, MapReduce decompresses them automatically as it reads them, using the filename extension to determine which codec to use. For example, a file ending in .gz is identified as a gzip-compressed file and read with GzipCodec.


Compression has some tradeoffs too. Not every compressed format is splittable. A gzip file cannot be split, so a single map task reads the whole file. Of the common codecs, bzip2 supports splitting.
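To make the space saving concrete, here is a minimal round trip using the JDK's own gzip streams. This is an illustration only: it uses java.util.zip rather than Hadoop's codec classes, and the class and method names are made up for the sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// JDK-only sketch; Hadoop's GzipCodec is not used here.
class GzipRoundTripSketch {

    /** Compresses the given bytes with gzip. */
    static byte[] compress(byte[] raw) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(buffer)) {
            gzip.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Decompresses gzip data back to the original bytes. */
    static byte[] decompress(byte[] compressed) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] chunk = new byte[4096];
            int n;
            while ((n = gzip.read(chunk)) != -1) {
                buffer.write(chunk, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        // Repetitive text compresses well, much like many log files do.
        StringBuilder text = new StringBuilder();
        for (int i = 0; i < 1000; i++) {
            text.append("2015-09-24 INFO request served\n");
        }
        byte[] raw = text.toString().getBytes(StandardCharsets.UTF_8);
        byte[] packed = compress(raw);
        System.out.println("raw bytes: " + raw.length + ", gzip bytes: " + packed.length);
        System.out.println("round trip ok: " + java.util.Arrays.equals(raw, decompress(packed)));
    }
}
```

A gzip stream like this has to be read from the beginning, which is the reason a single map task ends up reading a whole .gz file.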

Here is the code to compress a file or input stream with a codec.


  • Load the codec class
Class<?> codecClass = Class.forName(codecClassName);
  • Create an instance of the codec
CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
  • Create a compression output stream to write to
//CompressionOutputStream out = codec.createOutputStream(System.out);
CompressionOutputStream outFile = codec.createOutputStream(fs.create(new Path(args[1])));


Code to compress the file

package com.valassis.codec;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public class SteamCompression {

    /**
     * @param args
     * @throws ClassNotFoundException
     * @throws IOException
     */
    public static void main(String[] args) throws ClassNotFoundException, IOException {
        // args[0]: fully qualified codec class name, args[1]: output path
        String codecClassName = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Class<?> codecClass = Class.forName(codecClassName);
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        CompressionOutputStream outFile = codec.createOutputStream(fs.create(new Path(args[1])));
        // Compress everything read from standard input into the output file
        IOUtils.copyBytes(System.in, outFile, 4096, false);
        // Flush the remaining compressed data, then close the underlying file
        outFile.finish();
        outFile.close();
    }

}

Reading and Writing the files in HDFS




Reading and writing files in HDFS works much like regular Java I/O. You only need to know the HDFS-specific APIs. The main difference is that HDFS uses an org.apache.hadoop.fs.Path object rather than a java.io.File object.

The FSDataInputStream and FSDataOutputStream classes are used for reading and writing files.

FileSystem open() returns an FSDataInputStream to read data from HDFS.

FileSystem create() returns an FSDataOutputStream to write data to HDFS.

The FileStatus class describes the status of a file: its ownership, modification and access times, replication, and so on.

The example below reads a file (for example a configuration file) from HDFS and prints it to standard output. The program needs the path of the file as input.


package com.valassis.io;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class FileRead {
    public static void main(String[] args) throws IOException {
        String url = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        FSDataInputStream in = null;
        try {
            in = fs.open(new Path(url));
            // Copy the file to standard output; "false" leaves System.out open
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }

}


The example below writes a file into HDFS. The program needs the name of the file to create as input.



package com.valassis.io;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileWrite {

    public static void main(String[] args) throws IOException {
        String url = args[0];
        Configuration conf = new Configuration();

        FileSystem fs = FileSystem.get(conf);
        FSDataOutputStream out = fs.create(new Path(url));
        // Note: writeChars writes every character as two bytes (UTF-16)
        out.writeChars("this is the file written from fs.create");
        out.close();
    }
}
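FSDataOutputStream extends java.io.DataOutputStream, so the choice of write method decides how many bytes end up in the file. The sketch below uses a plain DataOutputStream over an in-memory buffer (no HDFS involved; the class and method names are invented for illustration) to compare writeChars with writing UTF-8 bytes.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// JDK-only sketch: measures how many bytes each write style produces.
class WriteCharsSketch {

    /** Bytes produced by DataOutputStream.writeChars (two bytes per char). */
    static int sizeWithWriteChars(String text) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeChars(text);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    /** Bytes produced by writing the UTF-8 encoding of the text. */
    static int sizeAsUtf8(String text) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        String text = "this is the file written from fs.create";
        System.out.println("writeChars: " + sizeWithWriteChars(text) + " bytes");
        System.out.println("UTF-8:      " + sizeAsUtf8(text) + " bytes");
    }
}
```

If the file should be readable as ordinary text, writing UTF-8 bytes is usually the better fit; writeChars output shows up with a zero byte before every ASCII character.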


The example below gets the status of an HDFS file or directory. The program needs a file name or glob pattern as input.


package com.valassis.io;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class FileStatus_Filter {

    /**
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        Path path = new Path(args[0]);

        FileSystem fs = FileSystem.get(new Configuration());
        // listStatus takes a relative or absolute path of a file or directory, e.g.
        // hadoop jar FileStatus_Filter.jar hdfs://localhost.localdomain:8020/user/hdfs/output*
        // fails with listStatus because "output*" does not exist as a literal path,
        // but globStatus expands the pattern and returns output, output1 and output2.
        // FileStatus[] fstatus = fs.listStatus(path, new PathFilter() {
        FileStatus[] fstatus = fs.globStatus(path, new PathFilter() {
            @Override
            public boolean accept(Path candidate) {
                // Skip job staging directories
                return !candidate.toString().contains(".staging");
            }
        });
        // Guard against a null result when nothing matches the given path
        if (fstatus == null) {
            System.out.println("No match for " + path);
            return;
        }
        for (FileStatus status : fstatus) {
            System.out.println(status.getPath().toString());
            System.out.println(status.getAccessTime());
            System.out.println(status.getBlockSize());
            System.out.println(status.getGroup());
            System.out.println(status.getLen());
            System.out.println(status.getModificationTime());
            System.out.println(status.getOwner());
            System.out.println(status.getPermission());
            System.out.println(status.getReplication());
        }
    }

}

Friday, September 18, 2015



CCHD -  Cloudera Certified Hadoop Developer - CDH5


I would like to share some tips on preparing for the Cloudera certification. Most importantly, the Cloudera exam is now on CDH version 5. You should have hands-on experience with CDH5 and good exposure to MapReduce programming and design patterns; otherwise consider yourself half prepared. About 30-40% of the questions in the exam were scenario based, covering MapReduce programming and Sqoop, Hive, or file system command options.

I suggest downloading and installing the VM on your machine and practicing:
http://www.cloudera.com/content/cloudera/en/downloads/quickstart_vms/cdh-5-4-x.html

If you are very serious about the Hadoop certification, I highly recommend the Cloudera Developer Training for Apache Hadoop.

Material referred during the preparation

  • Hadoop: The Definitive Guide (4th Edition) by Tom White
  • MapReduce Design Patterns by Donald Miner
  • A good understanding of Sqoop 1.4, Sqoop2 and Hive is required for passing the exam. An overview of Pig, Oozie, Flume, Avro and Crunch is also needed, as you may see a few questions on them in the exam.
  • An overview of the ecosystem can be found at http://hadoopecosystemtable.github.io/
  • https://developer.yahoo.com/hadoop/tutorial/
  • Understand the HDFS commands with their various options.
  • http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html
  • A deep understanding of classic MapReduce and the YARN architecture, replicated and repartition joins, sorting etc.
  • You should be good at core Java, IO or NIO, and regular expressions.
  • Knowledge of the available input/output formats; you should be able to create your own input/output formats.
  • Custom Writable, WritableComparable, RawComparator
  • You should be comfortable writing a MapReduce job for a given complex SQL (HiveQL) query.
Here is my certificate:



Sunday, September 13, 2015

Spark Scala basics


Scala  - 


Scala is a high-level language for the JVM. It combines object-oriented and functional programming. Java is catching up by introducing lambda expressions in Java 8.

It has type inference, and Scala code can call, or be called from, Java code.

Scala is simple to use. Most of the time you declare variables and define functions that operate on those variables. Scala keeps both simple, and most of the time you don't have to spell out the type of a variable.

Declaring Variable in Scala

// declaring a variable as Int
var x: Int = 7
// declaring a variable; Scala infers the type (Int) at compile time
var y = 7
// declaring a read-only (final) variable
val z = 7


Function definition in Scala looks more like C++ or a scripting language.


Syntax -

def functionName(param: Type): ReturnType = body of the function

So a square function for Int is defined as below:

def square(x: Int): Int = x*x

The other way, if you have a multi-line function:

def square(x: Int): Int = {
  x*x
}

You can also write it as an anonymous function:
(x: Int) => x*x

All three forms give the same result.
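For readers coming from Java, the same square function can be written as a regular method or as a Java 8 lambda. This is just a side-by-side comparison; the class name is made up for the sketch.

```java
import java.util.function.IntUnaryOperator;

// Java counterpart of the Scala square examples.
class SquareSketch {

    /** Method form, comparable to: def square(x: Int): Int = x*x */
    static int square(int x) {
        return x * x;
    }

    /** Lambda form, comparable to: (x: Int) => x*x */
    static final IntUnaryOperator SQUARE = x -> x * x;

    public static void main(String[] args) {
        System.out.println(square(7));             // prints 49
        System.out.println(SQUARE.applyAsInt(7));  // prints 49
    }
}
```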



For a detailed overview, I suggest following this link: http://www.scala-lang.org/



Saturday, September 12, 2015

Spark RDD




Spark RDD - 
Spark is built on the concept of the RDD. An RDD (Resilient Distributed Dataset) is Spark's primary data abstraction. It is a fault-tolerant collection of elements that can be operated on in parallel. RDDs are immutable: once an RDD is created, it cannot be changed. They are the building blocks of Spark. RDDs are distributed across the worker nodes and operated on in parallel.
There are three ways in which RDDs are created:
  • Parallelizing an existing collection
    • val data = 1 to 1000
    • val dataRDD = sc.parallelize(data) // sc is the Spark context
  • Referencing an existing dataset from HDFS, HBase etc.
    • val testFile = sc.textFile("hdfs://testFile")
  • Applying a transformation to an existing RDD
    • val filteredRDD = dataRDD.filter(_ % 2 == 0) // keep the even numbers
Types of files supported by Spark RDDs
  • Text Files
  • Sequence Files
  • Hadoop Input Formats
sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass)
There are two types of RDD operations


  • transformations 
    • lazily evaluated.
    • A transformed RDD is executed only when an action runs on it.
    • can also persist, or cache RDDs in memory or on disk.
  • actions
    • cause Spark to execute the recipe to transform the source data. 
    • they kick off the job on the cluster

Transformations do not compute anything by themselves. Starting from a source such as sc.parallelize() or sc.textFile(), each transformation only extends a directed acyclic graph (DAG) that describes how the RDD is to be computed.

When we apply a transformation to an RDD, only the DAG is adjusted in memory, and we get back a reference to the new RDD (or, to be precise, to its DAG). An action then executes the DAG and returns a value to the driver program or writes the result to storage.
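Java streams behave in a comparable way, which makes the lazy evaluation easy to observe locally. The sketch below is an analogy in plain Java, not Spark code: the filter plays the role of a transformation and count() the role of an action.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

// Analogy only: Java streams stand in for RDDs to show lazy evaluation.
class LazyPipelineSketch {

    /** Records when the filter really runs relative to building the pipeline. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        // "Transformation": declared here, but not executed yet.
        IntStream evens = IntStream.rangeClosed(1, 4).filter(n -> {
            events.add("filter(" + n + ")");
            return n % 2 == 0;
        });
        events.add("pipeline built");
        // "Action": this call is what triggers the filter.
        long count = evens.count();
        events.add("count=" + count);
        return events;
    }

    public static void main(String[] args) {
        for (String event : run()) {
            System.out.println(event);
        }
    }
}
```

The first recorded event is "pipeline built": the filter calls only appear after count() has been invoked.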


Transformations

The following table lists some of the common transformations supported by Spark. 
Transformation: Meaning
  • map(func): Return a new distributed dataset formed by passing each element of the source through a function func.
  • filter(func): Return a new dataset formed by selecting those elements of the source on which func returns true.
  • flatMap(func): Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
  • mapPartitions(func): Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
  • mapPartitionsWithIndex(func): Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
  • sample(withReplacement, fraction, seed): Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
  • union(otherDataset): Return a new dataset that contains the union of the elements in the source dataset and the argument.
  • intersection(otherDataset): Return a new RDD that contains the intersection of elements in the source dataset and the argument.
  • distinct([numTasks]): Return a new dataset that contains the distinct elements of the source dataset.
  • groupByKey([numTasks]): When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
    • Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.
    • Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
  • reduceByKey(func, [numTasks]): When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
  • aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]): When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
  • sortByKey([ascending], [numTasks]): When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
  • join(otherDataset, [numTasks]): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
  • cogroup(otherDataset, [numTasks]): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
  • cartesian(otherDataset): When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
  • pipe(command, [envVars]): Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.
  • coalesce(numPartitions): Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
  • repartition(numPartitions): Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
  • repartitionAndSortWithinPartitions(partitioner): Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
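The difference between groupByKey and reduceByKey is easiest to see on a tiny in-memory example. The following is a single-machine sketch in plain Java of what the two operations return; it does not use Spark, and the helper names are invented for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

// Local, non-distributed illustration of the (K, V) semantics described above.
class ByKeySketch {

    static Map.Entry<String, Integer> pair(String key, int value) {
        return new SimpleEntry<String, Integer>(key, value);
    }

    /** (K, V) pairs -> (K, all values for K), like groupByKey. */
    static Map<String, List<Integer>> groupByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return grouped;
    }

    /** (K, V) pairs -> (K, values folded with func), like reduceByKey. */
    static Map<String, Integer> reduceByKey(List<Map.Entry<String, Integer>> pairs,
                                            BinaryOperator<Integer> func) {
        Map<String, Integer> reduced = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            // Only one running value per key is kept, never the full list.
            reduced.merge(p.getKey(), p.getValue(), func);
        }
        return reduced;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs =
                Arrays.asList(pair("a", 1), pair("b", 2), pair("a", 3));
        System.out.println(groupByKey(pairs));                // {a=[1, 3], b=[2]}
        System.out.println(reduceByKey(pairs, Integer::sum)); // {a=4, b=2}
    }
}
```

Keeping a single running value per key instead of a list of all values is the intuition behind the note that reduceByKey tends to perform better for aggregations.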

Actions

The following table lists some of the common actions supported by Spark.

Action: Meaning
  • reduce(func): Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
  • collect(): Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
  • count(): Return the number of elements in the dataset.
  • first(): Return the first element of the dataset (similar to take(1)).
  • take(n): Return an array with the first n elements of the dataset.
  • takeSample(withReplacement, num, [seed]): Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
  • takeOrdered(n, [ordering]): Return the first n elements of the RDD using either their natural order or a custom comparator.
  • saveAsTextFile(path): Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
  • saveAsSequenceFile(path) (Java and Scala): Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
  • saveAsObjectFile(path) (Java and Scala): Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
  • countByKey(): Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
  • foreach(func): Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.
    • Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for m
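The requirement that the function passed to reduce be commutative and associative can be checked with a small experiment. The sketch below is plain Java, not Spark: it compares a sequential reduce with one that first reduces two halves ("partitions") and then combines the partial results. All names are hypothetical.

```java
import java.util.function.IntBinaryOperator;

// Local illustration of why reduce needs an associative function.
class ReduceOrderSketch {

    /** Left-to-right reduce over xs[from, to); the range must not be empty. */
    static int reduceRange(int[] xs, int from, int to, IntBinaryOperator f) {
        int acc = xs[from];
        for (int i = from + 1; i < to; i++) {
            acc = f.applyAsInt(acc, xs[i]);
        }
        return acc;
    }

    /** Reduce the whole array in one pass. */
    static int reduceSequential(int[] xs, IntBinaryOperator f) {
        return reduceRange(xs, 0, xs.length, f);
    }

    /** Reduce each half separately, then combine the two partial results. */
    static int reduceTwoPartitions(int[] xs, IntBinaryOperator f) {
        int mid = xs.length / 2;
        int left = reduceRange(xs, 0, mid, f);
        int right = reduceRange(xs, mid, xs.length, f);
        return f.applyAsInt(left, right);
    }

    public static void main(String[] args) {
        int[] xs = {1, 2, 3, 4};
        // Addition is associative: both strategies agree.
        System.out.println(reduceSequential(xs, (a, b) -> a + b));    // 10
        System.out.println(reduceTwoPartitions(xs, (a, b) -> a + b)); // 10
        // Subtraction is not: the result depends on how the data is split.
        System.out.println(reduceSequential(xs, (a, b) -> a - b));    // -8
        System.out.println(reduceTwoPartitions(xs, (a, b) -> a - b)); // 0
    }
}
```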




Spark Program Structure and Execution

Program Execution in Spark

Spark Context - A Spark context object (sc) is the main entry point for Spark functionality. A Spark context can be used to create Resilient Distributed Datasets (RDDs) on a cluster. When running Spark, you start a new Spark application by creating a Spark Context. When the SparkContext is created, it asks the master for some cores to use to do work. The master sets these cores aside. 
Spark cluster components


A Spark application consists of two kinds of programs: a driver program and worker programs. The driver program runs on the driver machine, and worker programs run on cluster nodes (managed by a cluster manager such as YARN, Mesos, or Spark's standalone cluster manager) or in local threads. Driver programs access Spark through a SparkContext object, which represents a connection to a computing cluster.
There are several useful things to note about this architecture:

  1. Each application gets its own executor processes, which stay up for the duration of the whole application and run tasks in multiple threads. 
  2. This has the benefit of isolating applications from each other, on both the scheduling side (each driver schedules its own tasks) and executor side (tasks from different applications run in different JVMs). However, it also means that data cannot be shared across different Spark applications (instances of SparkContext) without writing it to an external storage system.
  3. Spark is agnostic to the underlying cluster manager. As long as it can acquire executor processes, and these communicate with each other, it is relatively easy to run it even on a cluster manager that also supports other applications (e.g. Mesos/YARN).
  4. The driver program must listen for and accept incoming connections from its executors throughout its lifetime. The driver program must be network addressable from the worker nodes.
  5. Because the driver schedules tasks on the cluster, it should be run close to the worker nodes, preferably on the same local area network. If you’d like to send requests to the cluster remotely, it’s better to open an RPC to the driver and have it submit operations from nearby than to run a driver far away from the worker nodes.

Spark Program Structure
  1. create RDDs from some external data source or parallelize a collection in your driver program.
  2. lazily transform these RDDs into new RDDs.
  3. cache some of those RDDs for future reuse.
  4. perform actions to execute parallel computation and to produce results.
val lines = sc.textFile("logfile.log") // point 1

val errors = lines.filter(_.startsWith("ERROR")) // point 2

errors.cache() // point 3

val sqlError = errors.filter(_.contains("sql")) // perform more transformations as needed

sqlError.count() // point 4

val mqError = errors.filter(_.contains("MQ")) // perform more transformations as needed

mqError.count() // point 4


The goal of the above program is to identify the errors in the log file and count the SQL errors and the MQ errors separately. When you load the data, it is partitioned into blocks across the cluster. The driver sends the code to the worker nodes to be executed on each block. In this example the various transformations and actions are sent to the worker nodes for execution.


The executors on each worker perform the tasks. Executors read the data from HDFS and perform the transformations in parallel.

The call to cache() marks the errors RDD to be kept in memory. Nothing is computed until the first action runs; when it does, the errors RDD is computed and cached, and the result of the action is sent back to the driver. In this case the SQL error count is returned to the driver.

For the second action, Spark reuses the cached data, performs the operation on it and returns the result.
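The effect of caching can be imitated locally by counting how often the source lines are scanned. The following is a plain-Java analogy under invented names and sample log lines, not Spark code: without caching every "action" rescans the source, with caching the filtered errors are materialized once and reused.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: a materialized List stands in for a cached RDD.
class CacheReuseSketch {

    // Hypothetical sample log lines.
    static final List<String> LINES = Arrays.asList(
            "ERROR sql timeout",
            "INFO started",
            "ERROR MQ unreachable",
            "ERROR sql deadlock");

    /** Returns {sqlCount, mqCount, number of scans over the source lines}. */
    static long[] countErrors(boolean cache) {
        AtomicInteger scans = new AtomicInteger();
        // Each call to get() rescans the source, like recomputing an uncached RDD.
        Supplier<Stream<String>> errors = () -> {
            scans.incrementAndGet();
            return LINES.stream().filter(line -> line.startsWith("ERROR"));
        };
        Supplier<Stream<String>> source;
        if (cache) {
            // Materialize the filtered lines once and reuse them afterwards.
            List<String> cached = errors.get().collect(Collectors.toList());
            source = cached::stream;
        } else {
            source = errors;
        }
        long sql = source.get().filter(line -> line.contains("sql")).count();
        long mq = source.get().filter(line -> line.contains("MQ")).count();
        return new long[] {sql, mq, scans.get()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(countErrors(false))); // [2, 1, 2]
        System.out.println(Arrays.toString(countErrors(true)));  // [2, 1, 1]
    }
}
```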


Tuesday, September 8, 2015

Spark Introduction


Apache Spark is a tool for analyzing big data quickly and efficiently. It is a computing framework for distributed processing. Big data computation is all about memory and computation (speed and time).

MapReduce started a decade ago. It is ideal for batch processing, but it did not fit many use cases, which spawned many different systems (interactive tools, streaming tools etc.) to handle specialized use cases. Managing multiple systems is not only difficult, it also restricts tool support.

Spark combines batch processing, interactive and streaming APIs, and also supports machine learning algorithms.

Spark's in-memory primitives provide performance up to 100 times faster for certain applications. By allowing user programs to load data into a cluster's memory and query it repeatedly, Spark is well suited to machine learning algorithms.


The Spark project consists of multiple components.
Spark Core and Resilient Distributed Datasets
Spark Core is the foundation of the overall project. It provides distributed task dispatching, scheduling, and basic I/O functionalities. The fundamental programming abstraction is called Resilient Distributed Datasets (RDDs), a logical collection of data partitioned across machines. RDDs can be created by referencing datasets in external storage systems, or by applying transformations on existing RDDs.
The RDD abstraction is exposed through a language-integrated API in Java, Python, Scala, and R similar to local, in-process collections. This simplifies programming complexity because the way applications manipulate RDDs is similar to manipulating local collections of data.
Spark SQL
Spark SQL is a component on top of Spark Core that introduces a new data abstraction called DataFrames, which provides support for structured and semi-structured data. Spark SQL provides a domain-specific language to manipulate DataFrames in Scala, Java, or Python. It also provides SQL language support, with command-line interfaces and an ODBC/JDBC server. Prior to version 1.3 of Spark, DataFrames were referred to as SchemaRDDs.

Spark Streaming
Spark Streaming leverages Spark Core's fast scheduling capability to perform streaming analytics. It ingests data in mini-batches and performs RDD transformations on those mini-batches of data. This design enables the same set of application code written for batch analytics to be used in streaming analytics, on a single engine.

MLlib Machine Learning Library
Spark MLlib is a distributed machine learning framework on top of Spark Core that, due in large part to the distributed memory-based Spark architecture, is as much as nine times as fast as the disk-based implementation used by Apache Mahout (according to benchmarks done by the MLlib developers against the Alternating Least Squares (ALS) implementations, and before Mahout itself gained a Spark interface). Many common machine learning and statistical algorithms have been implemented and are shipped with MLlib, which simplifies large-scale machine learning pipelines, including:
  1. summary statistics, correlations, stratified sampling, hypothesis testing, random data generation
  2. classification and regression: support vector machines, logistic regression, linear regression, decision trees, naive Bayes classification
  3. collaborative filtering techniques including alternating least squares (ALS)
  4. cluster analysis methods including k-means, and Latent Dirichlet Allocation (LDA)
GraphX
GraphX is a distributed graph processing framework on top of Spark. It provides an API for expressing graph computation that can model the Pregel abstraction. 

Like Spark, GraphX initially started as a research project at UC Berkeley's AMPLab and Databricks, and was later donated to the Apache Software Foundation and the Spark project.


Word Count problem with MapReduce


What is MapReduce?

It is a way to process large data sets by distributing the work across a large number of nodes.
Prior to executing the Mapper function, the master node partitions the input into smaller subproblems
which are then distributed to worker nodes.

The JobTracker runs on the master node, and a TaskTracker works on each data (or worker)
node. Worker nodes may themselves act as master nodes in that they in turn may partition the
sub-problem into even smaller sub-problems.

In the Reduce step the master node takes the answers from all of the Mapper sub-problems and
combines them in such a way as to get the output that solves the problem.

The term MapReduce actually refers to the following two different tasks that Hadoop programs perform:
  • The Map Task: This is the first task, which takes input data and converts it into a set of data, where individual elements are broken down into tuples (key/value pairs).
  • The Reduce Task: This task takes the output from a map task as input and combines those data tuples into a smaller set of tuples. The reduce task is always performed after the map task.


  1. During a MapReduce job, Hadoop sends the Map and Reduce tasks to the appropriate servers in the cluster.
  2. The framework manages all the details of data-passing such as issuing tasks, verifying task completion, and copying data around the cluster between the nodes.
  3. Most of the computing takes place on nodes with data on local disks that reduces the network traffic.
  4. After completion of the given tasks, the cluster collects and reduces the data to form an appropriate result, and sends it back to the Hadoop server.
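The map, shuffle and reduce phases described above can be imitated on a single machine. The sketch below is a plain-Java simulation with hypothetical names and sample input; it only mirrors the data flow and does not use the Hadoop API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Single-machine simulation of the MapReduce data flow for word counting.
class MapShuffleReduceSketch {

    /** Map task: turns one input split into (word, 1) tuples. */
    static List<Map.Entry<String, Integer>> map(String split) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : split.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new SimpleEntry<String, Integer>(word, 1));
            }
        }
        return out;
    }

    /** Shuffle: groups the values of all map outputs by key, sorted by key. */
    static Map<String, List<Integer>> shuffle(List<List<Map.Entry<String, Integer>>> mapOutputs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (List<Map.Entry<String, Integer>> output : mapOutputs) {
            for (Map.Entry<String, Integer> tuple : output) {
                grouped.computeIfAbsent(tuple.getKey(), k -> new ArrayList<>()).add(tuple.getValue());
            }
        }
        return grouped;
    }

    /** Reduce task: combines the values of each key into a single count. */
    static Map<String, Integer> reduce(Map<String, List<Integer>> grouped) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int sum = 0;
            for (int value : entry.getValue()) {
                sum += value;
            }
            counts.put(entry.getKey(), sum);
        }
        return counts;
    }

    /** Runs one map task per split, then shuffles and reduces. */
    static Map<String, Integer> wordCount(List<String> splits) {
        List<List<Map.Entry<String, Integer>>> mapOutputs = new ArrayList<>();
        for (String split : splits) {
            mapOutputs.add(map(split));
        }
        return reduce(shuffle(mapOutputs));
    }

    public static void main(String[] args) {
        System.out.println(wordCount(Arrays.asList("a b a", "b c"))); // {a=2, b=2, c=1}
    }
}
```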


WordCount.java


3.  import java.io.IOException;
4.  import java.util.*;
5.
6.  import org.apache.hadoop.fs.Path;
7.  import org.apache.hadoop.conf.*;
8.  import org.apache.hadoop.io.*;
9.  import org.apache.hadoop.mapred.*;
10. import org.apache.hadoop.util.*;
11.
12. public class WordCount {
13.
14.   public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
15.     private final static IntWritable one = new IntWritable(1);
16.     private Text word = new Text();
17.
18.     public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
19.       String line = value.toString();
20.       StringTokenizer tokenizer = new StringTokenizer(line);
21.       while (tokenizer.hasMoreTokens()) {
22.         word.set(tokenizer.nextToken());
23.         output.collect(word, one);
24.       }
25.     }
26.   }
27.
28.   public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
29.     public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
30.       int sum = 0;
31.       while (values.hasNext()) {
32.         sum += values.next().get();
33.       }
34.       output.collect(key, new IntWritable(sum));
35.     }
36.   }
37.
38.   public static void main(String[] args) throws Exception {
39.     JobConf conf = new JobConf(WordCount.class);
40.     conf.setJobName("wordcount");
41.
42.     conf.setOutputKeyClass(Text.class);
43.     conf.setOutputValueClass(IntWritable.class);
44.
45.     conf.setMapperClass(Map.class);
46.     conf.setCombinerClass(Reduce.class);
47.     conf.setReducerClass(Reduce.class);
48.
49.     conf.setInputFormat(TextInputFormat.class);
50.     conf.setOutputFormat(TextOutputFormat.class);
51.
52.     FileInputFormat.setInputPaths(conf, new Path(args[0]));
53.     FileOutputFormat.setOutputPath(conf, new Path(args[1]));
54.
55.     JobClient.runJob(conf);
57.   }
58. }

Word Count Code Walk-through

The WordCount application is quite straightforward.
The Mapper implementation (lines 14-26), via the map method (lines 18-25), processes
one line at a time, as provided by the specified TextInputFormat (line 49). It then splits
the line into tokens separated by whitespace, via the StringTokenizer, and emits a key-value
pair of
< <word>, 1>.
For the given sample input the first map emits:
< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>
The second map emits:
< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>
We'll learn more about the number of maps spawned for a given job, and how to control
them in a fine-grained manner, a bit later in the tutorial.
WordCount also specifies a combiner (line 46). Hence, the output of each map is passed
through the local combiner (which is same as the Reducer as per the job configuration)
for local aggregation, after being sorted on the keys.
The output of the first map:
< Bye, 1>
< Hello, 1>
< World, 2>
The output of the second map:
< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>
The Reducer implementation (lines 28-36), via the reduce method (lines 29-35), just sums
up the values, which are the occurrence counts for each key (i.e. words in this example).
Thus the output of the job is:
< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>
The main method (lines 38-57) specifies various facets of the job, such as the input/output paths (passed
via the command line), key/value types, input/output formats etc., in the JobConf. It then
calls JobClient.runJob (line 55) to submit the job and monitor its progress.
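
The map, combine and reduce stages of the walk-through can be traced without a cluster. The following is a minimal sketch in plain Java, not the Hadoop API: the class and method names are made up for illustration, and the two input lines are an assumption reconstructed from the pairs the maps emit above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class WordCountWalkthrough {

    // Map step: one token per whitespace-separated word; each token stands for <word, 1>.
    static List<String> mapLine(String line) {
        List<String> words = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            words.add(tokenizer.nextToken());
        }
        return words;
    }

    // Combiner step: sum the 1s per word locally. TreeMap keeps the keys sorted.
    static Map<String, Integer> combine(List<String> words) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String word : words) {
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    // Reduce step: sum the partial counts of each word across all map outputs.
    static Map<String, Integer> reduce(List<Map<String, Integer>> partials) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map<String, Integer> partial : partials) {
            for (Map.Entry<String, Integer> e : partial.entrySet()) {
                totals.merge(e.getKey(), e.getValue(), Integer::sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        // Assumed sample input, one line per map.
        Map<String, Integer> first = combine(mapLine("Hello World Bye World"));
        Map<String, Integer> second = combine(mapLine("Hello Hadoop Goodbye Hadoop"));
        System.out.println(first);  // {Bye=1, Hello=1, World=2}
        System.out.println(second); // {Goodbye=1, Hadoop=2, Hello=1}
        System.out.println(reduce(Arrays.asList(first, second)));
        // {Bye=1, Goodbye=1, Hadoop=2, Hello=2, World=2}
    }
}
```

The combine and reduce methods do the same summing, which mirrors the job configuration where the Reduce class is registered as both combiner and reducer.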

Usage: hadoop job [GENERIC_OPTIONS]
The following are the Generic Options available in a Hadoop job.

GENERIC_OPTIONS and their descriptions:
  • -submit <job-file>: Submits the job.
  • -status <job-id>: Prints the map and reduce completion percentage and all job counters.
  • -counter <job-id> <group-name> <countername>: Prints the counter value.
  • -kill <job-id>: Kills the job.
  • -events <job-id> <fromevent-#> <#-of-events>: Prints the events' details received by jobtracker for the given range.
  • -history [all] <jobOutputDir>: Prints job details, failed and killed tip details. More details about the job such as successful tasks and task attempts made for each task can be viewed by specifying the [all] option.
  • -list [all]: Displays all jobs. -list displays only jobs which are yet to complete.
  • -kill-task <task-id>: Kills the task. Killed tasks are NOT counted against failed attempts.
  • -fail-task <task-id>: Fails the task. Failed tasks are counted against failed attempts.
  • -set-priority <job-id> <priority>: Changes the priority of the job. Allowed priority values are VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW.

Monday, September 7, 2015

HDFS Configurations and Commands notes

Misc

key               value                        example
fs.default.name   protocol://servername:port   hdfs://alpha.milkman.org:9000
dfs.data.dir      pathname                     /home/username/hdfs/data
dfs.name.dir      pathname                     /home/username/hdfs/name

hadoop-site.xml file for a single-node configuration:
<configuration>
  <!-- NameNode URL: protocol, hostname and port -->
  <property>
    <name>fs.default.name</name>
    <value>hdfs://your.server.name.com:9000</value>
  </property>
  <!-- Where the DataNode stores the data -->
  <property>
    <name>dfs.data.dir</name>
    <value>/home/username/hdfs/data</value>
  </property>
  <!-- Where the NameNode stores the metadata -->
  <property>
    <name>dfs.name.dir</name>
    <value>/home/username/hdfs/name</value>
  </property>
</configuration>
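
A file in this name/value layout can be sanity-checked with the XML parser that ships with the JDK. The sketch below is only an illustration: SiteConfigReader is a made-up helper, not part of Hadoop (Hadoop loads these files through its own Configuration class), and the XML is passed in as a string to keep the example self-contained.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

public class SiteConfigReader {

    // Collects every <property><name>..</name><value>..</value></property> into a map,
    // preserving the order in which the properties appear.
    static Map<String, String> read(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            Map<String, String> props = new LinkedHashMap<>();
            NodeList properties = doc.getElementsByTagName("property");
            for (int i = 0; i < properties.getLength(); i++) {
                Element property = (Element) properties.item(i);
                String name = property.getElementsByTagName("name").item(0).getTextContent().trim();
                String value = property.getElementsByTagName("value").item(0).getTextContent().trim();
                props.put(name, value);
            }
            return props;
        } catch (Exception e) {
            // Malformed XML or I/O problems end up here.
            throw new IllegalArgumentException("Could not parse configuration", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<configuration>"
                + "<property><name>fs.default.name</name>"
                + "<value>hdfs://your.server.name.com:9000</value></property>"
                + "<property><name>dfs.data.dir</name>"
                + "<value>/home/username/hdfs/data</value></property>"
                + "</configuration>";
        read(xml).forEach((name, value) -> System.out.println(name + " = " + value));
    }
}
```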

We must format the file system that we just configured:
  user@namenode:hadoop$ bin/hadoop namenode -format
This process should only be performed once. When it is complete, we are free to start the distributed file system:
  user@namenode:hadoop$ bin/start-dfs.sh
This command will start the NameNode server on the master machine (which is where the start-dfs.sh script was invoked). It will also start the DataNode instances on each of the slave machines. In a single-machine "cluster," this is the same machine as the NameNode instance. On a real cluster of two or more machines, this script will ssh into each slave machine and start a DataNode instance.

Command: bin/hadoop dfs -put foo bar
Assuming: No file/directory named /user/$USER/bar exists in HDFS
Outcome: Uploads local file foo to a file named /user/$USER/bar

Command: bin/hadoop dfs -put foo bar
Assuming: /user/$USER/bar is a directory
Outcome: Uploads local file foo to a file named /user/$USER/bar/foo

Command: bin/hadoop dfs -put foo somedir/somefile
Assuming: /user/$USER/somedir does not exist in HDFS
Outcome: Uploads local file foo to a file named /user/$USER/somedir/somefile, creating the missing directory

Command: bin/hadoop dfs -put foo bar
Assuming: /user/$USER/bar is already a file in HDFS
Outcome: No change in HDFS, and an error is returned to the user.
When the put command operates on a file, it is all-or-nothing.
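
The four cases above boil down to a small decision procedure. Here is a sketch of it in plain Java over a toy in-memory model; it does not call HDFS, and the class name, the put method and the user "alice" are all made up for illustration.

```java
import java.util.HashSet;
import java.util.Set;

public class PutDestinationSketch {

    // Toy model of the HDFS namespace: sets of directory paths and file paths.
    final Set<String> dirs = new HashSet<>();
    final Set<String> files = new HashSet<>();

    // Returns the path that "-put localName dest" would write to under the
    // rules listed above, or throws if that path is already a file.
    String put(String user, String localName, String dest) {
        // Relative destinations are resolved against /user/<user>.
        String target = dest.startsWith("/") ? dest : "/user/" + user + "/" + dest;
        if (dirs.contains(target)) {
            // Destination is a directory: the file keeps its local name inside it.
            target = target + "/" + localName;
        }
        if (files.contains(target)) {
            // Destination is already a file: nothing changes and an error is reported.
            throw new IllegalStateException(target + " already exists as a file");
        }
        // Create any missing parent directories, then record the file.
        for (int i = target.indexOf('/', 1); i > 0; i = target.indexOf('/', i + 1)) {
            dirs.add(target.substring(0, i));
        }
        files.add(target);
        return target;
    }

    public static void main(String[] args) {
        PutDestinationSketch fs = new PutDestinationSketch();
        System.out.println(fs.put("alice", "foo", "bar"));              // /user/alice/bar
        System.out.println(fs.put("alice", "foo", "somedir/somefile")); // /user/alice/somedir/somefile
        try {
            fs.put("alice", "foo", "bar");
        } catch (IllegalStateException e) {
            System.out.println("error: " + e.getMessage());
        }
    }
}
```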


Command line reference: I found this PDF on the net, and it looks pretty well organized:

https://images.linoxide.com/hadoop-hdfs-commands-cheatsheet.pdf


The -put command can also be used for multiple files.

Another synonym for -put is -copyFromLocal. The syntax and functionality are identical.
Other commands: -get, -cat, -ls, -du (disk usage), -mv, -cp, -rm, -rmr
-moveFromLocal localSrc dest
Copies the file or directory from the local file system identified by localSrc to dest within HDFS, then deletes the local copy on success.
-getmerge src localDest [addnl]
Retrieves all files that match the path src in HDFS, and copies them to a single, merged file in the local file system identified by localDest.

-setrep [-R] [-w] rep path
Sets the target replication factor for files identified by path to rep. (The actual replication factor will move toward the target over time)

-touchz path
Creates a file at path containing the current time as a timestamp. Fails if a file already exists at path, unless the file is already size 0.
-help cmd
Returns usage information for one of the commands listed above. You must omit the leading '-' character in cmd.


Shutting down HDFS
someone@namenode:hadoop$ bin/stop-dfs.sh

DFSADMIN COMMAND
State of the namenode metadata
dfsadmin -metasave filename
Safemode
dfsadmin -safemode what
The action taken depends on the value of what, described below:
  • enter - Enters safemode
  • leave - Forces the NameNode to exit safemode
  • get - Returns a string indicating whether safemode is ON or OFF
  • wait - Waits until safemode has exited and returns


Changing HDFS membership
dfsadmin -refreshNodes
Upgrading HDFS versions
bin/start-dfs.sh -upgrade
Get the status of the upgrade
dfsadmin -upgradeProgress status
dfsadmin -upgradeProgress details
Rollback to previous version
When HDFS is upgraded, Hadoop retains backup information allowing you to downgrade to the original HDFS version in case you need to revert Hadoop versions. To back out the changes, stop the cluster, re-install the older version of Hadoop, and then use the command: bin/start-dfs.sh -rollback. It will restore the previous HDFS state.
Only one such archival copy can be kept at a time. Thus, after a few days of operation with the new version (when it is deemed stable), the archival copy can be removed with the command bin/hadoop dfsadmin -finalizeUpgrade. The rollback command cannot be issued after this point. This must be performed before a second Hadoop upgrade is allowed.

Help    
hadoop dfsadmin -help cmd

Hive: uses ^A (\001) as the default field separator.
Pig: uses \t as the default field separator; use PigStorage(',') to load data with a different separator such as a comma.
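
When you read such files outside Hive or Pig, the separator has to be given explicitly. A minimal sketch in plain Java follows; the class name and the sample row are made up for illustration.

```java
import java.util.Arrays;

public class FieldSeparatorSketch {

    static final String HIVE_SEPARATOR = "\001"; // ^A, i.e. Ctrl-A
    static final String PIG_SEPARATOR = "\t";    // tab

    // The limit -1 keeps trailing empty fields instead of dropping them.
    static String[] splitHiveRow(String line) {
        return line.split(HIVE_SEPARATOR, -1);
    }

    static String[] splitPigRow(String line) {
        return line.split(PIG_SEPARATOR, -1);
    }

    public static void main(String[] args) {
        String hiveRow = String.join(HIVE_SEPARATOR, "1", "alice", "30");
        String pigRow = String.join(PIG_SEPARATOR, "1", "alice", "30");
        System.out.println(Arrays.toString(splitHiveRow(hiveRow))); // [1, alice, 30]
        System.out.println(Arrays.toString(splitPigRow(pigRow)));   // [1, alice, 30]
    }
}
```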
Directory: HADOOP_LOG_DIR
  Description: Output location for log files from daemons
  Default location: ${HADOOP_HOME}/logs
  Suggested location: /var/log/hadoop

Directory: hadoop.tmp.dir
  Description: A base for other temporary directories
  Default location: /tmp/hadoop-${user.name}
  Suggested location: /tmp/hadoop

Directory: dfs.name.dir
  Description: Where the NameNode metadata should be stored
  Default location: ${hadoop.tmp.dir}/dfs/name
  Suggested location: /home/hadoop/dfs/name

Directory: dfs.data.dir
  Description: Where DataNodes store their blocks
  Default location: ${hadoop.tmp.dir}/dfs/data
  Suggested location: /home/hadoop/dfs/data

Directory: mapred.system.dir
  Description: The in-HDFS path to shared MapReduce system files
  Default location: ${hadoop.tmp.dir}/mapred/system
  Suggested location: /hadoop/mapred/sy


