MapReduce Version 1

MapR has made a number of improvements to the MapReduce framework, designed to improve the performance and manageability of the cluster as well as the performance and reliability of MapReduce jobs.

MapR provides performance improvements in the shuffle phase of MapReduce and adds high availability for all Hadoop services. You can configure Hadoop services to run on multiple nodes for failover. If one service node fails, another continues to perform the tasks related to that service without delaying the MapReduce job.

Other Hadoop distributions keep map output on local disk, which creates competition for disk space between local and distributed storage. In MapR, any spilled data is stored in the distributed file system, making it directly accessible throughout the cluster.

NOTE: You may need to recompile existing MapReduce v1 jobs in order to run them successfully on a Hadoop 2.x cluster.

DirectShuffle

MapR has made performance optimizations to the shuffle phase, in which output from mappers is sent to reducers. This phase involves a great deal of copying and coordination between nodes in the cluster. Shuffling in MapR-FS is much faster than in other Hadoop distributions because MapR uses highly optimized, efficient remote procedure call (RPC) connections to transport data, while other Hadoop distributions use HTTP connections.

Instead of writing intermediate data to local disks controlled by the operating system, MapR writes to a MapR-FS volume limited by its topology to the local node. This improves performance and reduces demand on local disk space while making the output available cluster-wide.

The direct shuffle leverages the underlying storage layer and takes advantage of its unique capabilities:
  • High sequential and random I/O performance, including the ability to create millions of files at extremely high rates (using sequential I/O)
  • The ability to leverage multiple NICs via RPC-level bonding. By comparison, the shuffle in other distributions can only leverage a single NIC (in theory, one could use port trunking in any distribution, but the performance gains would be minimal compared to the MapR distribution’s RPC-level load balancing)
  • The ability to compress data at the block level
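
To make the topology idea concrete, the following sketch shows how a MapR-FS volume can be restricted to a single node. The volume name, mount path, and topology string are hypothetical, the option names may vary by MapR release, and this illustrates the concept rather than the commands MapR runs internally for the shuffle:

  maprcli volume create -name example.node01.shuffle \
      -path /var/mapr/local/node01/shuffle-example \
      -topology /data/default-rack/node01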

Protection from Runaway Jobs

MapR includes several mechanisms to protect against runaway jobs. Many Hadoop users experience situations in which the tasks of a poorly designed job consume too much memory and, as a result, the nodes start swapping and quickly become unavailable. In MapR, each task has an upper bound on memory usage, and a task that exceeds this limit is automatically killed with an out-of-memory exception. Quotas on disk usage can be set on a per-user as well as a per-volume basis.
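
As an illustration, a job can declare its own per-task memory ceiling through standard Hadoop 1.x settings; the class below is a sketch under that assumption (the values are arbitrary, and MapR's cluster-side enforcement may involve additional configuration not shown here):

  // A minimal sketch (standard Hadoop 1.x / MRv1 API): cap per-task memory
  // for a single job. The numbers are arbitrary examples, not recommendations.
  import org.apache.hadoop.mapred.JobConf;

  public class TaskMemoryLimitExample {
      public static JobConf configure() {
          JobConf conf = new JobConf();
          conf.setMemoryForMapTask(1024L);                 // mapred.job.map.memory.mb
          conf.setMemoryForReduceTask(2048L);              // mapred.job.reduce.memory.mb
          conf.set("mapred.child.java.opts", "-Xmx900m");  // heap size for child task JVMs
          return conf;
      }
  }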

JobTracker HA

In a MapR cluster, the JobTracker can be configured for High Availability (HA). If the node running the JobTracker fails, ZooKeeper instructs the Warden on another JobTracker node to start a new instance of the JobTracker, which takes over where the first one left off. The TaskTrackers maintain information about the state of each task, so when they connect to the new JobTracker they can continue without interruption.
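
A related configuration detail (verify against the documentation for your MapR release): rather than pointing mapred.job.tracker at a fixed host and port, MapR clusters typically set it to maprfs:/// in mapred-site.xml, so that TaskTrackers and clients look up whichever JobTracker instance is currently active instead of relying on a hard-coded address.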

Label-Based Scheduling

MapR lets you use labels to create subsets of nodes within a cluster so you can allocate jobs to those nodes depending on a given use case. The labels are in a simple node-labels mapping file that correlates node identifiers to lists of labels. Each identifier can be the name of a node, or a regular expression or glob that matches multiple nodes.
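
An illustrative mapping file might look like the following; the node names, patterns, and labels here are invented for the example, and the exact syntax for regular-expression and glob identifiers should be checked against the MapR documentation:

  perfnode001.example.com     production, high-memory
  perfnode0*                  production
  /devnode[0-9]+/             development

A job can then be directed to matching nodes by specifying a label at submission time (in MRv1 this is done with a job label property such as mapred.job.label; confirm the property name for your release).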

The JobTracker caches the mapping file, checking the file’s modification time every two minutes (by default) for updates. If the file has been modified, the JobTracker updates the labels for all active TaskTrackers. The change takes effect immediately, meaning that it affects running jobs; tasks that are currently in process are allowed to finish, but new tasks will not be started on nodes that no longer match the label under which the job has been run.

Centralized Logging

Centralized logging provides a job-centric view of all the log files generated by TaskTracker nodes throughout the cluster. This enables users to gain a complete picture of job execution by having all the logs available in a single directory, without having to navigate from node to node. Centralized logs are available cluster-wide because they are written to local volumes in MapR-FS, under the following path on each TaskTracker node:
/var/mapr/local/<TaskTracker node>/logs/mapred/userlogs
Since the log files are stored in a local volume directory that is associated with each TaskTracker node, you run the maprcli job linklogs command to create symbolic links for all the logs in a single directory. You can then use tools such as grep and awk to analyze them from an NFS mount point. You can also view the entire set of logs for a particular job using the TaskTracker UI.
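A typical invocation looks like the following, where the job ID and the target directory are placeholders (the parameter names shown here, -jobid and -todir, should be confirmed against the MapR documentation for your release):
  maprcli job linklogs -jobid job_201301011200_0001 -todir /myvolume/joblogviewdir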
The central directories for task attempts contain the log.index, stdout, stderr, and syslog files for all tasks, regardless of JVM reuse. MapReduce programs generate three types of output that are intercepted by the task runner:
  • Standard output stream: captured in the stdout file
  • Standard error stream: captured in the stderr file
  • Log4j logs: captured in the syslog file
Hadoop maintains another file named log.index in every task attempt’s log directory. This file is needed to handle the case where the same JVM is reused for multiple tasks; the number of tasks that can run in a single JVM is controlled by the mapred.job.reuse.jvm.num.tasks configuration variable. When a JVM is reused, the physical log files stdout, stderr, and syslog appear only in the log directory of the first task attempt run by that JVM, and those files are shared by all tasks that run in the same JVM.
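For reference, the JVM reuse setting is part of the standard Hadoop 1.x job configuration; a minimal sketch of enabling it for a job (the value 5 is an arbitrary example):

  // Standard Hadoop 1.x / MRv1 API: allow each task JVM to run several tasks.
  import org.apache.hadoop.mapred.JobConf;

  public class JvmReuseExample {
      public static JobConf configure() {
          JobConf conf = new JobConf();
          conf.setNumTasksToExecutePerJvm(5);    // sets mapred.job.reuse.jvm.num.tasks
          // A value of -1 would allow a JVM to be reused without limit.
          return conf;
      }
  }
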
The TaskTracker UI uses the log.index file to separate information relating to different tasks. The log.index file stores the following information in human-readable format:
  • The log directory where the log files are stored. This is the log directory for the first task attempt run by a given JVM.
  • The beginning offset and length of each subsequent task attempt’s output within those shared log files.

Job Metrics

MapR collects and stores job-related metrics in a MySQL database as well as in a local MapR-FS volume called metrics. There are two different types of metrics:
  • Node metrics and events (data about services on each node)
  • MapReduce metrics and events (job, task, and task attempt data)

Node metrics are inserted into the database at the point where they are produced (by the hoststats service and the warden). MapReduce job metrics are propagated to local hoststats from the JobTracker via remote procedure calls (RPC) along with task and task attempt data. The task attempt data is partitioned by day based on job submission time, and cleaned up if the corresponding job data is not viewed within 48 hours.

Job, task, and task attempt metrics are gathered by the Hadoop Metrics Framework every minute; task attempt counters on the JobTracker are updated from the TaskTrackers only once per minute. Hoststats collects metrics from each node and pulls metrics from MapR-FS every ten seconds via shared memory. The JobTracker and TaskTrackers also use the Hadoop Metrics Framework to write metrics and events every ten seconds into a job history file in MapR-FS; this new history file includes transactional and event data from the MapReduce job. The files created by hoststats are used to generate the charts viewable in the MapR Metrics user interface in the MapR Control System.