Hadoop Tuning Notes

This is a quick dump of my notes on Hadoop tuning.

General

  • Hadoop is designed to use multiple cores and disks, so it will be able to take full advantage of more powerful hardware.
  • Don’t use RAID for HDFS datanodes, as redundancy is handled by HDFS. RAID should be used for namenodes, as it protects against metadata corruption.
  • The namenode should run on a 64-bit system to avoid the 3GB limit on JVM heap size.
  • In case the cluster consists of more than one rack, it is recommended to tell Hadoop about the network topology. Rack awareness will help Hadoop in calculating data locality while assigning MapReduce tasks and it will also help HDFS to choose replica location for files more intelligently.
  • The datanode and tasktracker each keep roughly one processor busy, so on a machine with n processors the remaining n-2 can each run one to two tasks.
  • On the master node, the namenode, jobtracker, and secondary namenode each take 1000M of memory by default. If you have a large number of files, then increase the JVM heap size for the namenode and secondary namenode.
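
For the rack-awareness point above, Hadoop can call an external script (configured through the topology.script.file.name property in Hadoop 1.x) with one or more node addresses as arguments, and it reads one rack path per address from the script's standard output. A minimal sketch of such a script follows. The RACKS table and the addresses in it are illustrative assumptions, not values from any real cluster.

```python
#!/usr/bin/env python3
"""Toy rack-topology script: prints one rack path per address given as an argument."""
import sys

# Hypothetical address-to-rack table; replace with your own inventory.
RACKS = {
    "10.0.1.11": "/dc1/rack1",
    "10.0.1.12": "/dc1/rack1",
    "10.0.2.21": "/dc1/rack2",
}
# Rack used for addresses that are not in the table.
DEFAULT_RACK = "/default-rack"


def resolve(addresses):
    """Return the rack path for each address, falling back to DEFAULT_RACK."""
    return [RACKS.get(address, DEFAULT_RACK) for address in addresses]


if __name__ == "__main__":
    print(" ".join(resolve(sys.argv[1:])))
```

Keeping the lookup in a plain function makes it easy to test the table before pointing the cluster at the script.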

Configuration Parameters

HDFS

dfs.block.size
The block size used by HDFS, which defaults to 64MB. On large clusters this can be increased to 128MB or 256MB to reduce memory requirements on the namenode and to increase the amount of data given to each map task.
dfs.name.dir
A list of directories where the namenode persists copies of its metadata. It should be one or two local disks plus a remote disk, such as an NFS-mounted directory, so that in case of node failure the metadata can be recovered from the remote disk.
dfs.data.dir
The list of directories used by datanodes to store data. These should always be local disks, and if there are multiple disks, each directory should be on a different disk so as to maximize parallel reads and writes.
fs.checkpoint.dir
The list of directories where the secondary namenode keeps checkpoints. It should use redundant disks for the same reason as dfs.name.dir.
dfs.replication
The number of copies of data to be maintained. It should be at least 2 more than the number of machines that are expected to fail every day in the cluster.
dfs.access.time.precision
The precision in milliseconds to which access times are maintained. If this value is 0, no access times are maintained, which improves performance. Storage disks should also be mounted with noatime, which disables last-access-time updates during file reads and yields considerable performance gains.
dfs.datanode.handler.count
Number of threads handling block requests. With multiple physical disks, throughput can be improved by increasing this number from the default value of 3.
dfs.namenode.handler.count
Number of handler threads on the namenode. This number should be increased from the default value of 10 for large clusters.
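
To see why a larger dfs.block.size eases the load on the namenode, it helps to count blocks. The sketch below assumes a hypothetical 1 TB file and simply divides by the block size, rounding up. Every block is tracked by the namenode, and a block is typically the unit of input handed to a map task, so fewer blocks mean less metadata and larger map inputs.

```python
MB = 1024 * 1024
TB = 1024 * 1024 * MB


def block_count(file_size_bytes, block_size_bytes):
    """Number of HDFS blocks needed to hold a file of the given size."""
    # Ceiling division using integers only.
    return -(-file_size_bytes // block_size_bytes)


if __name__ == "__main__":
    file_size = 1 * TB  # hypothetical file size
    for block_mb in (64, 128, 256):
        print(block_mb, "MB blocks:", block_count(file_size, block_mb * MB))
```

For the 1 TB example, going from 64MB to 256MB blocks cuts the block count from 16384 to 4096.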

MapReduce

mapred.local.dir
The list of directories where tasks store intermediate data and working files. These should be spread over a number of local disks to facilitate parallel IO. These partitions should also be the same ones used by datanodes to store data (dfs.data.dir).
mapred.tasktracker.map.tasks.maximum, mapred.tasktracker.reduce.tasks.maximum
Specify the maximum number of map and reduce tasks that can run at the same time on a tasktracker. Each should be a multiple of the number of cores.
mapred.job.tracker.handler.count
Number of server threads for handling tasktracker requests. The default value is 10, and the recommended value is 4% of the number of tasktracker nodes.
mapred.child.java.opts
JVM options passed to task child processes. Increase the heap size here (for example with -Xmx) for tasks that require more memory.
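
The sizing rules in these notes (two cores reserved for the datanode and tasktracker, one to two tasks per remaining core, and jobtracker handler threads at about 4% of the tasktracker nodes) can be turned into a quick calculator. This is only a sketch of that arithmetic. The function names and example sizes are made up for illustration, and never going below the default of 10 handler threads is my own assumption.

```python
def task_slot_range(cores):
    """Return (min, max) concurrent tasks per node.

    Two cores are reserved for the datanode and tasktracker daemons;
    each remaining core runs between 1 and 2 tasks.
    """
    free_cores = max(cores - 2, 1)
    return free_cores, 2 * free_cores


def jobtracker_handlers(tasktracker_nodes, default=10):
    """About 4% of the tasktracker nodes, never below the default (an assumption)."""
    return max(default, (4 * tasktracker_nodes) // 100)


if __name__ == "__main__":
    print(task_slot_range(8))         # hypothetical 8-core worker
    print(jobtracker_handlers(1000))  # hypothetical 1000-node cluster
```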

Others

core-site.xml::io.file.buffer.size
This is the buffer size used by Hadoop during IO, which defaults to 4KB. On modern systems it can be increased to 64KB or 128KB for performance gains.

Building a simple [Yahoo] S4 application

S4 is a distributed stream processing platform from Yahoo. It is often seen as the real-time counterpart of Hadoop. Being fault tolerant and horizontally scalable, S4 helps you build very large stream processing applications that can do anything from detecting earthquakes to finding the perfect bit of advertising that a visitor to your website is most likely to click.

At its core, an S4 application consists of a number of Processing Elements (PEs) that are wired together with a Spring configuration file that defines the PEs and the flow of events in the system. Events are produced by event producers, which send them to the client adapter for S4; from there, the S4 platform takes over and dispatches them to the appropriate processing elements. After processing these events, PEs can dispatch them to other PEs for further processing, or they can produce output events. Thus, arbitrarily complex behavior can be built by wiring together a simple set of PEs.

S4 comes with a few example applications, but here is a much simpler S4WordCount application that shows how to:

  1. Keep state in a PE.
  2. Dispatch events from a PE.
  3. Process multiple events from a single PE.
  4. Write a simple Java client for sending events to S4.

In S4WordCount, we will build a simple WordReceiverPE that receives events in the form of words and simply prints them to stdout. It will also identify sentences in the word stream and forward them to SentenceReceiverPE for further processing. WordReceiverPE will also receive sentence events and print them to stdout.

First, let's have a look at Word and Sentence, the event objects used in our example:

package test.s4;

public class Word {
    private String string;

    public String getString() {
        return string;
    }

    public void setString(String message) {
        this.string = message;
    }

    @Override
    public String toString() {
        return "Word [string=" + string + "]";
    }
}


package test.s4;

public class Sentence {
    private String string;

    public Sentence() {
        // default constructor
    }

    public Sentence(String string) {
        this.string = string;
    }

    public String getString() {
        return string;
    }

    public void setString(String message) {
        this.string = message;
    }

    public String getSentenceId() {
        // all sentences have the same key
        return "1";
    }

    public void setSentenceId(String id) {
        // do nothing
    }

    @Override
    public String toString() {
        return "Sentence [string=" + string + "]";
    }
}


S4 routes and dispatches events using keys, where a key is a set of properties of the event object. Word is the key-less entry point into the system, so it has no key; Sentence, which will be processed further, uses Sentence.sentenceId as its key. (For simplicity, all Sentence objects have the same sentenceId, 1.)

Now let’s have a look at our first PE, i.e. WordReceiverPE:

package test.s4;

import io.s4.dispatcher.Dispatcher;
import io.s4.processor.AbstractPE;

/**
 * This class receives word and sentence events and prints them to stdout.
 */
public class WordReceiverPE extends AbstractPE {
    private StringBuilder builder = new StringBuilder();

    /**
     * Dispatcher that will dispatch events on the <code>Sentence *</code> stream.
     */
    private Dispatcher dispatcher;

    public Dispatcher getDispatcher() {
        return dispatcher;
    }

    public void setDispatcher(Dispatcher dispatcher) {
        this.dispatcher = dispatcher;
    }

    /**
     * Process word events. This prints out the received word and also builds
     * the sentence that will be dispatched once we find the end of the
     * sentence, identified by <code>.</code>
     *
     * @param word
     *            Word received on the <code>RawWords *</code> stream.
     */
    public void processEvent(Word word) {
        System.out.println("Received: " + word);

        // keep building the sentence
        builder.append(' ').append(word.getString().trim());

        if (builder.toString().endsWith(".")) {
            System.err.print("End of sentence found");

            // dispatch a Sentence event
            dispatcher.dispatchEvent("Sentence", new Sentence(builder.toString()));

            // reset buffer.
            builder.setLength(0);
        }
    }

    /**
     * Process sentence events. This method demonstrates that one class can
     * receive multiple types of events on different streams.
     *
     * @param sentence
     *            Sentence received on the <code>Sentence *</code> stream.
     */
    public void processEvent(Sentence sentence) {
        System.out.println("Received Sentence(WordReceiverPE) : " + sentence);
    }

    @Override
    public void output() {
        // TODO Auto-generated method stub
    }

    @Override
    public String getId() {
        return this.getClass().getName();
    }
}

We define a StringBuilder that is used to accumulate words to form a Sentence. The processEvent(Word) method simply prints the received word on stdout and appends the word to the builder. It then checks whether the sentence is complete by looking for a . at the end of the builder, and if so, it creates a Sentence event object and dispatches it on the Sentence stream. Once it is dispatched, processEvent(Sentence) receives that event and prints the sentence to stdout again.

Now let’s have a look at SentenceReceiverPE, our second PE, which does nothing but print the received Sentence on stdout.

package test.s4;

import io.s4.processor.AbstractPE;

/**
 * This class receives Sentence events and prints them on stdout.
 */
public class SentenceReceiverPE extends AbstractPE {

    /**
     * Print received sentence event.
     *
     * @param sentence
     */
    public void processEvent(Sentence sentence) {
        System.out.println("Received Sentence: " + sentence);
    }

    @Override
    public void output() {
        // TODO Auto-generated method stub
    }

    @Override
    public String getId() {
        return this.getClass().getName();
    }
}

Finally, let's see the content of the application config file that wires all this together and forms a valid S4 application. The name of the config file should follow the naming convention <AppName>-conf.xml. In this case we will call the config file S4WordCount-conf.xml.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">

    <bean id="wordCatcher" class="test.s4.WordReceiverPE">
        <property name="dispatcher" ref="dispatcher"/>

        <property name="keys">
            <!-- Listen for both words and sentences -->
            <list>
                <value>RawWords *</value>
                <value>Sentence *</value>
            </list>
        </property>
    </bean>

    <bean id="sentenceCatcher" class="test.s4.SentenceReceiverPE">
        <property name="keys">
            <list>
                <value>Sentence *</value>
            </list>
        </property>
    </bean>

    <bean id="dispatcher" class="io.s4.dispatcher.Dispatcher"
        init-method="init">
        <property name="partitioners">
            <list>
                <!-- Partition on sentenceId which is "1" for all sentences. -->
                <ref bean="sentenceIdPartitioner" />
            </list>
        </property>
        <property name="eventEmitter" ref="commLayerEmitter" />
        <property name="loggerName" value="s4" />
    </bean>

    <bean id="sentenceIdPartitioner" class="io.s4.dispatcher.partitioner.DefaultPartitioner">
        <property name="streamNames">
            <list>
                <value>Sentence</value>
            </list>
        </property>
        <property name="hashKey">
            <list>
                <value>sentenceId</value>
            </list>
        </property>
        <property name="hasher" ref="hasher" />
        <property name="debug" value="true" />
    </bean>

</beans>

This is a Spring bean definition file. The first bean, wordCatcher, is an object of class test.s4.WordReceiverPE, and a bean called dispatcher is injected into it. We will look at the properties of dispatcher later. keys defines the streams on which our PE listens for events. RawWords * means that it will receive all events on the stream named RawWords irrespective of their keys. The same holds for Sentence *.

Our second bean is sentenceCatcher, which is an object of class test.s4.SentenceReceiverPE and accepts all events on the stream called Sentence.

Third is the definition of the dispatcher that we injected into wordCatcher. The dispatcher needs a partitioner that partitions the events based on some key and then dispatches them to the appropriate PEs. In this case we are using the DefaultPartitioner, whose properties are defined by the sentenceIdPartitioner bean: it partitions the event objects on the Sentence stream by their sentenceId property. The dispatcher uses the S4-provided commLayerEmitter to emit the events.
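
The idea behind key-based partitioning can be sketched in a few lines of Python. This is a toy model, not the S4 API: the partition function and the CRC32 hash are stand-ins chosen for illustration, and S4's own hasher may well compute different values. The point is only that every event with the same key value lands on the same partition, which is why all our Sentence events (sentenceId = 1) end up in one place.

```python
import zlib


def partition(key_value, partition_count):
    """Map a key value to a partition id in the range [0, partition_count)."""
    digest = zlib.crc32(key_value.encode("utf-8"))
    return digest % partition_count


if __name__ == "__main__":
    # Every Sentence carries sentenceId "1", so they all share one partition.
    ids = {partition("1", 4) for _ in range(3)}
    print(ids)
```

With a single partition, as in a one-node setup, every key necessarily maps to partition 0.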

Running the application

To run this application on S4:

  1. Set up S4 as documented here.
  2. Create S4WordCount.jar from the above classes.
  3. Deploy the application on S4, by creating the following directory structure:
    	/$S4_IMAGE
    		/s4-apps
    			/S4WordCount
    				S4WordCount-conf.xml
    				/lib
    					S4WordCount.jar
  4. Start S4: $S4_IMAGE/scripts/start-s4.sh -r client-adapter &
  5. Start client adapter: $S4_IMAGE/scripts/run-client-adapter.sh -s client-adapter -g s4 -d $S4_IMAGE/s4-core/conf/default/client-stub-conf.xml &

Now S4 is ready to receive events. The following event sender uses the Java client library to send events to S4. It reads one line at a time from stdin (one word per line) and sends it to S4.

package test.s4.generator;

import io.s4.client.Driver;
import io.s4.client.Message;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;

public class TestMessageSender {
    public static void main(String[] args) {
        if (args.length < 1) {
            System.err.println("No host name specified");
            System.exit(1);
        }
        String hostName = args[0];

        if (args.length < 2) {
            System.err.println("No port specified");
            System.exit(1);
        }

        int port = -1;
        try {
            port = Integer.parseInt(args[1]);
        } catch (NumberFormatException nfe) {
            System.err.println("Bad port number specified: " + args[1]);
            System.exit(1);
        }

        if (args.length < 3) {
            System.err.println("No stream name specified");
            System.exit(1);
        }
        String streamName = args[2];

        if (args.length < 4) {
            System.err.println("No class name specified");
            System.exit(1);
        }
        String clazz = args[3];

        Driver d = new Driver(hostName, port);
        Reader inputReader = null;
        BufferedReader br = null;
        try {
            if (!d.init()) {
                System.err.println("Driver initialization failed");
                System.exit(1);
            }

            if (!d.connect()) {
                System.err.println("Driver connection failed");
                System.exit(1);
            }

            inputReader = new InputStreamReader(System.in);
            br = new BufferedReader(inputReader);

            // wrap each input line in a JSON object and send it as one event
            for (String inputLine = null; (inputLine = br.readLine()) != null;) {
                String string = "{\"string\":\"" + inputLine + "\"}";
                System.out.println("sending " + string);
                Message m = new Message(streamName, clazz, string);
                d.send(m);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try { d.disconnect(); } catch (Exception e) {}
            try { br.close(); } catch (Exception e) {}
            try { inputReader.close(); } catch (Exception e) {}
        }
    }
}

Run client (with the compiled classes and the S4 client library on the classpath): java test.s4.generator.TestMessageSender localhost 2334 RawWords test.s4.Word

Here is the sample output on the S4 console:


Received: Word [string=this]
Received: Word [string=is]
Received: Word [string=a]
Received: Word [string=sentence.]

Using fast path!
Value 1, partition id 0
wrapper is stream:Sentence keys:[{sentenceId = 1}:0] keyNames:null event:Sentence [string= this is a sentence.] => 0
Received Sentence(WordReceiverPE) : Sentence [string= this is a sentence.]
Received Sentence: Sentence [string= this is a sentence.]
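
One caveat about the sender above: it builds the JSON payload by string concatenation, so an input line containing a double quote or a backslash would yield invalid JSON. As a sketch of the safer approach, shown in Python only to illustrate the idea (a Java sender would use a JSON library for the same purpose), an encoder escapes such characters for you. The helper name word_payload is hypothetical.

```python
import json


def word_payload(line):
    """Build the JSON body for a Word event, escaping special characters."""
    return json.dumps({"string": line})


if __name__ == "__main__":
    print(word_payload("sentence."))
    print(word_payload('say "hi"'))
```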


Curious case of Lewis Carroll, tournament runners-up and sorting

No less than four grand slams are played every year to find the best tennis player in the world. These are knockout tournaments in which, at each stage, half of the players lose and are eliminated. The two [supposedly] best players clash in the final, and the winner of that game is declared the champion (the best player) and the loser the runner-up (the second best player). All is fine with our champion, but there is just a small problem with our runner-up: he might not be the second best player of the tournament.

How?

Let us assume that there were 16 players in the beginning with seedings 1 to 16, 16 being the best. Consider the following results for the matches in the tournament:

15
	15
1
		15
5
	9
9
			16
12
	16
16
		16
4
	4
3
				16
11
	11
6
		11
7
	10
10
			14
2
	8
8
		14
13
	14
14

As expected, 16 is the winner, but the runner-up is 14, not 15. 15 crashed out in the semi-finals, playing against the champion.

So what can we conclude from this?

  1. The runner-up may not be the second best player of the tournament.
  2. n - 1 matches are required to decide the best player in the tournament, and the champion plays only log n of them.
  3. We can find the second best player in the tournament by looking at the match results of the winner, which give a list of log n players. At some point the winner must have defeated the second best player.
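
Conclusion 3 can be checked against the bracket above. The sketch below hard-codes the first-round order from that draw, plays each round by letting the higher seed win, and records whom each winner beat. The function name play_tournament is mine, and the code assumes the number of players is a power of two.

```python
def play_tournament(players):
    """Play a knockout in which the higher number always wins.

    Returns the champion and a dict mapping each player to the list of
    opponents that player defeated, in round order.
    Assumes len(players) is a power of two.
    """
    defeated = {p: [] for p in players}
    current = list(players)
    while len(current) > 1:
        winners = []
        # Neighbours in the current round play each other.
        for a, b in zip(current[0::2], current[1::2]):
            winner, loser = (a, b) if a > b else (b, a)
            defeated[winner].append(loser)
            winners.append(winner)
        current = winners
    return current[0], defeated


if __name__ == "__main__":
    # First-round order taken from the bracket above.
    draw = [15, 1, 5, 9, 12, 16, 4, 3, 11, 6, 7, 10, 2, 8, 13, 14]
    champion, defeated = play_tournament(draw)
    print(champion, defeated[champion], max(defeated[champion]))
```

The champion 16 beat 12, 4, 15 and 14 on the way to the title, and the best of those four is 15, the true second best player.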

Here is a program that gives you the second best player in (n - 1) + ((log n) - 1) = n + (log n) - 2 comparisons:

# Finding the second best player in a knockout tournament (Python 3)
import random

# maps each player to the set of players he defeated
defeatedListMap = {}


def fillMap(x, y):
    # record that x defeated y
    defeatedListMap.setdefault(x, set()).add(y)


def largest(array):
    # return the winner of a knockout among the players in array
    if len(array) == 0:
        return None
    elif len(array) == 1:
        return array[0]
    else:
        mid = len(array) // 2
        x = largest(array[:mid])
        y = largest(array[mid:])
        if x > y:
            fillMap(x, y)
            return x
        else:
            fillMap(y, x)
            return y


# generate a randomized draw for the matches
inputList = list(range(1, 17))
random.shuffle(inputList)

# get the champion
champion = largest(inputList)

# look for the runner-up in the list of players defeated by the champion
runnerUp = largest(list(defeatedListMap.get(champion)))

print(champion, runnerUp)
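
To check the comparison count claimed above, here is a separate sketch (Python 3, with names of my own choosing) that counts comparisons explicitly. It assumes the number of players is a power of two and that all values are distinct.

```python
import random


def second_best(players):
    """Return (champion, second_best, comparisons) for a power-of-two field."""
    comparisons = 0
    defeated = {p: [] for p in players}

    # Knockout rounds: n - 1 comparisons in total.
    current = list(players)
    while len(current) > 1:
        winners = []
        for a, b in zip(current[0::2], current[1::2]):
            comparisons += 1
            winner, loser = (a, b) if a > b else (b, a)
            defeated[winner].append(loser)
            winners.append(winner)
        current = winners
    champion = current[0]

    # Scan the log2(n) players beaten by the champion: log2(n) - 1 comparisons.
    victims = defeated[champion]
    runner_up = victims[0]
    for candidate in victims[1:]:
        comparisons += 1
        if candidate > runner_up:
            runner_up = candidate
    return champion, runner_up, comparisons


if __name__ == "__main__":
    field = list(range(1, 17))
    random.shuffle(field)
    print(second_best(field))
```

For 16 players this reports 18 comparisons regardless of the draw, which is 16 + 4 - 2.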
