Unit 3 - Notes

INT312 8 min read

Unit 3: Map Reduce and YARN

1. Introduction to MapReduce Core Components

MapReduce is a programming model and processing engine designed for distributed computing across massive datasets. A standard MapReduce job in Java is divided into three primary components:

  1. The Mapper Class: Processes input data and generates intermediate key-value pairs.
  2. The Reducer Class: Processes the intermediate key-value pairs and aggregates them to produce the final output.
  3. The Driver Class: Contains the main method, configures the job, sets input/output paths, and submits the job to the Hadoop cluster.

Hadoop uses specialized data types that implement the Writable interface (e.g., IntWritable, LongWritable, Text) rather than standard Java data types (int, long, String) to handle network serialization efficiently.


2. Review of Java Code for MapReduce

2.1 The Mapper Class

The Mapper is responsible for the "Map" phase. It takes input splits, processes them line by line (or record by record), and outputs intermediate key-value pairs.

Syntax and Structure:

JAVA
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;

// Mapper<InputKey, InputValue, OutputKey, OutputValue>
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Process the 'value' (usually a line of text)
        // 2. Apply business logic
        // 3. Write intermediate output to context
        // Example: context.write(new Text(word), new IntWritable(1));
    }
}

  • LongWritable (Input Key): The byte offset of the line in the file.
  • Text (Input Value): The actual content of the line.
  • Text (Output Key): The key passed to the Reducer.
  • IntWritable (Output Value): The value passed to the Reducer.
  • Context: The object used to communicate with the Hadoop framework and output data.

2.2 The Reducer Class

The Reducer receives the output of the Mapper. Before reaching the Reducer, Hadoop performs a "Shuffle and Sort" phase, grouping all values associated with the same key into an Iterable list.

Syntax and Structure:

JAVA
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;

// Reducer<InputKey, InputValue, OutputKey, OutputValue>
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // 1. Iterate through 'values' for the given 'key'
        // 2. Perform aggregation/summarization
        // 3. Write final output to context
        // Example: context.write(key, new IntWritable(sum));
    }
}

2.3 The Program Driver

The Driver class orchestrates the execution. It tells Hadoop which classes to use for the Mapper and Reducer, what data types to expect, and where the data is located.

Syntax and Structure:

JAVA
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Job Name");
        
        job.setJarByClass(MyDriver.class); // Specify Driver
        job.setMapperClass(MyMapper.class); // Specify Mapper
        job.setReducerClass(MyReducer.class); // Specify Reducer
        
        job.setOutputKeyClass(Text.class); // Final Output Key
        job.setOutputValueClass(IntWritable.class); // Final Output Value
        
        FileInputFormat.addInputPath(job, new Path(args[0])); // HDFS Input
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // HDFS Output
        
        System.exit(job.waitForCompletion(true) ? 0 : 1); // Execute Job
    }
}


3. The YARN Model

YARN (Yet Another Resource Negotiator) was introduced in Hadoop 2.x to separate the resource management layer from the data processing layer. In Hadoop 1.x, MapReduce handled both, which caused scalability bottlenecks. YARN acts as the operating system for Hadoop, allowing multiple processing engines (MapReduce, Spark, Storm, etc.) to share the same cluster resources.

3.1 Core Components of YARN

  1. ResourceManager (RM):
    • The master daemon running on the master node.
    • It is the ultimate authority that arbitrates resources among all competing applications in the cluster.
    • It has two main components:
      • Scheduler: Allocates resources to various running applications based on constraints (capacity, queues). It does no monitoring or tracking of status.
      • ApplicationsManager: Accepts job submissions, negotiates the first container for the ApplicationMaster, and restarts the ApplicationMaster container on failure.
  2. NodeManager (NM):
    • The worker daemon running on every slave node.
    • Responsible for launching containers, monitoring their resource usage (CPU, memory, disk, network), and reporting this back to the ResourceManager.
  3. ApplicationMaster (AM):
    • A framework-specific library that negotiates resources from the ResourceManager and works with the NodeManager(s) to execute and monitor tasks.
    • Created once per application/job.
  4. Container:
    • The basic unit of allocation in YARN.
    • Represents a collection of physical resources (RAM, CPU cores) on a single node. Tasks run inside these containers.

3.2 YARN Workflow

  1. Client submits an application/job.
  2. ResourceManager allocates a container to start the ApplicationMaster.
  3. The ApplicationMaster registers with the ResourceManager and requests resources (more containers) to execute the job tasks.
  4. The ApplicationMaster communicates with NodeManagers to launch the containers.
  5. NodeManagers execute the tasks (e.g., Mappers and Reducers) inside the containers.
  6. Once the job finishes, the ApplicationMaster unregisters with the ResourceManager and frees up the containers.

4. MapReduce Java Code Examples

Below are complete, logical representations of MapReduce code for specific algorithms. Note: In standard practice, these classes (Mapper, Reducer, Driver) are often nested inside a single file or separated into distinct .java files.

4.1 Word Count

Counts the occurrence of each word in a dataset.

JAVA
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
    // MAPPER
    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    // REDUCER
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    // DRIVER
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class); // Optional optimization
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

4.2 Sum of Even Numbers

Reads a file containing integers (one per line or space-separated), filters out the odd numbers, and sums the even numbers.

JAVA
public class EvenNumberSum {
    
    // MAPPER: Checks if number is even
    public static class EvenMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static Text evenKey = new Text("Sum of Even Numbers:");
        
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] numbers = value.toString().split("\\s+");
            for (String numStr : numbers) {
                try {
                    int num = Integer.parseInt(numStr.trim());
                    if (num % 2 == 0) {
                        context.write(evenKey, new IntWritable(num));
                    }
                } catch (NumberFormatException e) {
                    // Ignore non-numeric data
                }
            }
        }
    }

    // REDUCER: Sums all even numbers passed to it
    public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
    
    // Driver code omitted for brevity (Identical structure to WordCount)
}

4.3 Palindrome Checker

Reads a text file, checks if each word is a palindrome, and outputs the palindromes.

JAVA
public class PalindromeChecker {

    // MAPPER: Checks if a word is a palindrome
    public static class PalindromeMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        
        private boolean isPalindrome(String str) {
            String clean = str.replaceAll("[^a-zA-Z]", "").toLowerCase();
            if (clean.length() < 2) return false; // Ignore single letters
            String reverse = new StringBuilder(clean).reverse().toString();
            return clean.equals(reverse);
        }

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while (tokenizer.hasMoreTokens()) {
                String word = tokenizer.nextToken();
                if (isPalindrome(word)) {
                    context.write(new Text(word), NullWritable.get());
                }
            }
        }
    }

    // REDUCER: Identity Reducer (just passes the keys through, grouping duplicates)
    public static class PalindromeReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
}

4.4 Factorial

Reads a dataset of integers and computes the factorial of each. (Note: Using LongWritable to accommodate large factorial results. Realistically, factorials grow too fast for standard longs, but this demonstrates the logic).

JAVA
public class FactorialCalculator {

    // MAPPER: Calculates factorial for each number
    public static class FactorialMapper extends Mapper<LongWritable, Text, IntWritable, LongWritable> {
        
        private long calculateFactorial(int n) {
            if (n == 0 || n == 1) return 1;
            long fact = 1;
            for (int i = 2; i <= n; i++) {
                fact *= i;
            }
            return fact;
        }

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] numbers = value.toString().split("\\s+");
            for (String numStr : numbers) {
                try {
                    int num = Integer.parseInt(numStr.trim());
                    if (num >= 0 && num <= 20) { // Limit to 20 to fit in 64-bit Long
                        long fact = calculateFactorial(num);
                        context.write(new IntWritable(num), new LongWritable(fact));
                    }
                } catch (NumberFormatException e) {
                    // Ignore non-numeric
                }
            }
        }
    }

    // REDUCER: Identity Reducer (Outputs Number -> Factorial)
    public static class IdentityReducer extends Reducer<IntWritable, LongWritable, IntWritable, LongWritable> {
        public void reduce(IntWritable key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            // Because Map outputs exact (num, factorial), we just take the first one
            for (LongWritable val : values) {
                context.write(key, val);
                break; // Only write once per number
            }
        }
    }
}

4.5 Armstrong Number

An Armstrong number is a number that is equal to the sum of its own digits each raised to the power of the number of digits. Example: 153 = 1^3 + 5^3 + 3^3.

JAVA
public class ArmstrongNumberChecker {

    // MAPPER: Filters out non-Armstrong numbers
    public static class ArmstrongMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
        private final static Text ARMSTRONG_TAG = new Text("is Armstrong");

        private boolean isArmstrong(int num) {
            int originalNum, remainder, result = 0, n = 0;
            originalNum = num;

            // Find number of digits
            for (;originalNum != 0; originalNum /= 10, ++n);

            originalNum = num;
            // Calculate sum of nth power of digits
            for (;originalNum != 0; originalNum /= 10) {
                remainder = originalNum % 10;
                result += Math.pow(remainder, n);
            }

            return result == num;
        }

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] numbers = value.toString().split("\\s+");
            for (String numStr : numbers) {
                try {
                    int num = Integer.parseInt(numStr.trim());
                    if (isArmstrong(num)) {
                        context.write(new IntWritable(num), ARMSTRONG_TAG);
                    }
                } catch (NumberFormatException e) {
                    // Ignore error
                }
            }
        }
    }

    // REDUCER: Writes output
    public static class ArmstrongReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
        public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            context.write(key, values.iterator().next()); // Output: 153    is Armstrong
        }
    }
}