Building a simple [Yahoo] S4 application

S4 is a distributed stream processing platform from Yahoo. It is often seen as the real-time counterpart of Hadoop. Because S4 is fault tolerant and horizontally scalable, it lets you build very large stream processing applications that can do anything from detecting earthquakes to finding the perfect bit of advertising that a visitor to your website is most likely to click.

At its core, an S4 application consists of a number of Processing Elements (PEs) that are wired together by a Spring configuration file, which defines the PEs and the flow of events through the system. Events are produced by event producers, which send them to the S4 client adapter; from there, the S4 platform takes over and dispatches them to the appropriate PEs. After processing an event, a PE can dispatch events to other PEs for further processing, or it can produce output events. Thus, arbitrarily complex behavior can be built by wiring together a simple set of PEs.

S4 comes with a few example applications, but here is a much simpler S4WordCount application that shows how to:

  1. Keep state in a PE.
  2. Dispatch events from a PE.
  3. Process multiple events from a single PE.
  4. Write a simple Java client for sending events to S4.

In S4WordCount, we will build a simple WordReceiverPE that receives events in the form of words and simply prints these words on stdout. It also identifies sentences in the word stream and forwards these sentences to SentenceReceiverPE for further processing. WordReceiverPE also receives the resulting Sentence events itself and prints them to stdout.

First, let's look at Word and Sentence, the event objects used in our example:

package test.s4;

public class Word {
	private String string;

	public String getString() {
		return string;
	}

	public void setString(String message) {
		this.string = message;
	}

	@Override
	public String toString() {
		return "Word [string=" + string + "]";
	}
}
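The Sentence class is not reproduced in this post. A minimal sketch, assuming it mirrors Word with an extra int sentenceId field used as the routing key (the field type and the constructors are assumptions), could look like the following; its toString() produces the same format as the Sentence lines in the sample output.

```java
// Hypothetical reconstruction of the Sentence event class (the original
// source is not shown). In the real project it would be declared public in
// package test.s4, next to Word; the int type of sentenceId is an assumption.
class Sentence {
    private String string;   // accumulated sentence text
    private int sentenceId;  // routing key; always 1 in this example

    // No-arg constructor so the object can be created and populated reflectively.
    public Sentence() {
    }

    public Sentence(String string, int sentenceId) {
        this.string = string;
        this.sentenceId = sentenceId;
    }

    public String getString() {
        return string;
    }

    public void setString(String string) {
        this.string = string;
    }

    public int getSentenceId() {
        return sentenceId;
    }

    public void setSentenceId(int sentenceId) {
        this.sentenceId = sentenceId;
    }

    @Override
    public String toString() {
        // Same format as the "Sentence [string= ...]" lines in the sample output.
        return "Sentence [string=" + string + "]";
    }
}
```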

S4 uses keys, a set of properties of the event object, to route and dispatch events. Word is the key-less entry point into the system, so it has no key; Sentence, which is processed further, uses Sentence.sentenceId as its key. (For simplicity, every Sentence has the same sentenceId, 1.)
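To make the idea of a key concrete: a key name such as sentenceId refers to a property of the event object. The following sketch is an illustration, not S4's internal code; the KeyExtractor class is hypothetical, and it assumes the property is exposed through a JavaBean getter (getSentenceId) that can be resolved via reflection.

```java
import java.lang.reflect.Method;

// Illustrative only, not S4's internal code: resolves a key name such as
// "sentenceId" to a value through the event's JavaBean getter (getSentenceId).
final class KeyExtractor {
    private KeyExtractor() {
    }

    static Object keyValue(Object event, String keyName) {
        // "sentenceId" -> "getSentenceId"
        String getterName = "get" + Character.toUpperCase(keyName.charAt(0)) + keyName.substring(1);
        try {
            Method getter = event.getClass().getMethod(getterName);
            return getter.invoke(event);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(
                    "no readable property '" + keyName + "' on " + event.getClass().getName(), e);
        }
    }
}
```

Applied to a Sentence, keyValue(sentence, "sentenceId") would return 1 for every event in this application, which is why all sentences share one key.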

Now let's look at our first PE, WordReceiverPE.

We define a StringBuilder that is used to accumulate words into a Sentence. The processEvent(Word) method prints the received word on stdout and appends it to the builder. It then checks whether the sentence is complete by looking for a "." at the end of the builder; if so, it creates a Sentence event (object) and dispatches it on the Sentence stream. Once the event is dispatched, processEvent(Sentence) receives it and prints the sentence to stdout again.
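The WordReceiverPE source is not shown here, so the following is a plain-Java sketch of the logic just described rather than the actual PE. It does not extend S4's AbstractPE; the EventSink interface and the WordReceiverLogic class are hypothetical stand-ins for the injected dispatcher and the PE, and a Sentence event is represented only by its text (the real PE would wrap it in a Sentence object with sentenceId 1). Resetting the builder after each sentence is also an assumption.

```java
// Hypothetical stand-in for the dispatcher that Spring injects into the PE;
// only a dispatchEvent(streamName, event) shape is assumed.
interface EventSink {
    void dispatchEvent(String streamName, Object event);
}

// Plain-Java sketch of WordReceiverPE's logic, not the actual PE: the real
// class extends S4's AbstractPE and is driven by the platform.
class WordReceiverLogic {
    private final StringBuilder builder = new StringBuilder();
    private final EventSink dispatcher;

    WordReceiverLogic(EventSink dispatcher) {
        this.dispatcher = dispatcher;
    }

    // Counterpart of processEvent(Word): print, accumulate, detect a sentence end.
    void onWord(String word) {
        System.out.println("Received: Word [string=" + word + "]");
        builder.append(' ').append(word); // leading space, as in the sample output
        if (builder.charAt(builder.length() - 1) == '.') {
            String sentence = builder.toString();
            builder.setLength(0); // assumption: start each sentence fresh
            dispatcher.dispatchEvent("Sentence", sentence);
        }
    }

    // Counterpart of processEvent(Sentence): the same PE also consumes that stream.
    void onSentence(String sentence) {
        System.out.println("Received Sentence(WordReceiverPE) : Sentence [string=" + sentence + "]");
    }
}
```

In the real application, onSentence corresponds to processEvent(Sentence): S4 calls it when the dispatched Sentence event arrives back on the Sentence stream, which is how a single PE handles two event types.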

Now let's look at SentenceReceiverPE, our second PE, which does nothing but print each received Sentence on stdout.

Finally, let's look at the application config file that wires all this together into a valid S4 application. The config file name should follow the naming convention <AppName>-conf.xml, so in this case we call it S4WordCount-conf.xml.

This is a Spring bean definition file. The first bean, wordCatcher, is an instance of test.s4.WordReceiverPE, and a dispatcher called dispatcher is injected into it. We will look at the properties of dispatcher later. The keys property defines the streams on which the PE listens for events: RawWords * means that it receives all events on the stream named RawWords, irrespective of their keys. Sentence * does the same for the Sentence stream.

Our second bean is sentenceCatcher, an instance of test.s4.SentenceReceiverPE, which accepts all events on the stream called Sentence.
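Taken together, the keys entries of the two beans determine where each event goes. The sketch below models only that routing, as a hypothetical StreamRouting class; it is not how S4 implements subscriptions, and it handles only the "*" (any key) form used in this application.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model (not S4 code) of the event flow the two beans define:
// each PE lists "<stream> *" entries, and an event on a stream goes to every
// PE subscribed to that stream. Only the "*" (any key) form is modeled.
final class StreamRouting {
    private final Map<String, List<String>> subscribers = new LinkedHashMap<>();

    void subscribe(String peName, String... keys) {
        for (String key : keys) {
            // "RawWords *" -> stream "RawWords"; the "*" means any key value
            String stream = key.trim().split("\\s+")[0];
            subscribers.computeIfAbsent(stream, s -> new ArrayList<>()).add(peName);
        }
    }

    List<String> targets(String stream) {
        List<String> peNames = subscribers.get(stream);
        return peNames == null ? Collections.<String>emptyList() : Collections.unmodifiableList(peNames);
    }

    // The wiring described for S4WordCount-conf.xml.
    static StreamRouting wordCountWiring() {
        StreamRouting routing = new StreamRouting();
        routing.subscribe("wordCatcher", "RawWords *", "Sentence *");
        routing.subscribe("sentenceCatcher", "Sentence *");
        return routing;
    }
}
```

This is why both "Received Sentence" lines appear in the sample output: a Sentence event reaches wordCatcher and sentenceCatcher, while RawWords events reach only wordCatcher.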

Third is the definition of the dispatcher that we injected into wordCatcher. The dispatcher needs a partitioner that partitions events based on some key and then dispatches them to the appropriate PEs. Here we use the DefaultPartitioner, configured by the sentenceIdPartitioner bean to partition the event objects on the Sentence stream by their sentenceId property. The dispatcher uses the S4-provided commLayerEmitter to emit the events.
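Partitioning by a key can be illustrated with a simple hash-and-modulo scheme. The following is a hypothetical sketch in the spirit of the DefaultPartitioner, not its actual implementation; S4 may use a different hash function. Events with equal key values always land on the same partition, so every Sentence here (all with sentenceId 1) is routed to the same place.

```java
// Illustrative hash partitioning (not S4's DefaultPartitioner source):
// equal key values always map to the same partition index.
final class HashPartitionSketch {
    private HashPartitionSketch() {
    }

    static int partitionFor(Object keyValue, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        // floorMod keeps the result in [0, partitionCount) even for negative hashes
        return Math.floorMod(keyValue.hashCode(), partitionCount);
    }
}
```

With a single partition every key maps to 0, which is consistent with the "partition id 0" line in the sample output below.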

Running the application

To run this application on S4:

  1. Set up S4 as described in the S4 documentation.
  2. Package the classes above into S4WordCount.jar.
  3. Deploy the application on S4 by creating the following directory structure:
    	/$S4_IMAGE
    		/s4-apps
    			/S4WordCount
    				S4WordCount-conf.xml
    				/lib
    					S4WordCount.jar
  4. Start S4: $S4_IMAGE/scripts/start-s4.sh -r client-adapter &
  5. Start client adapter: $S4_IMAGE/scripts/run-client-adapter.sh -s client-adapter -g s4 -d $S4_IMAGE/s4-core/conf/default/client-stub-conf.xml &

Now S4 is ready to receive events. The event sender, TestMessageSender, uses the Java client library to send events to S4: it reads one word at a time from stdin and sends each word to S4.

Run client: java TestMessageSender localhost 2334 RawWords test.s4.Word
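TestMessageSender is not reproduced here. The following sketch covers only its core loop under stated assumptions: words are read from a Reader, each one is encoded as the JSON form of a Word event, and the actual send is abstracted as a BiConsumer. In the real client, that is where an S4 Message would be built with the three-argument constructor that appears in the comments (stream name, class name, JSON string) and sent through the S4 Java client library; WordSenderSketch and its methods are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.function.BiConsumer;

// Sketch of the core loop of a word sender like TestMessageSender. The
// transport is abstracted as a BiConsumer<streamName, json>; the S4 client
// calls that would actually send a Message are not reproduced here.
final class WordSenderSketch {
    private WordSenderSketch() {
    }

    // JSON for a Word event, e.g. {"string":"this"}. Only quotes and
    // backslashes are escaped, which is enough for plain words.
    static String wordJson(String word) {
        StringBuilder sb = new StringBuilder("{\"string\":\"");
        for (char c : word.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\');
            }
            sb.append(c);
        }
        return sb.append("\"}").toString();
    }

    // Reads whitespace-separated words and passes each one to the sender;
    // returns the number of words sent.
    static int sendWords(Reader in, String streamName, BiConsumer<String, String> sender) {
        BufferedReader reader = new BufferedReader(in);
        int sent = 0;
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                for (String word : line.trim().split("\\s+")) {
                    if (word.isEmpty()) {
                        continue; // blank line
                    }
                    sender.accept(streamName, wordJson(word));
                    sent++;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sent;
    }
}
```

Wired to stdin, a call such as sendWords(new InputStreamReader(System.in), "RawWords", sender) mirrors the stream name passed on the command line above.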

Here is the sample output on the S4 console:


Received: Word [string=this]
Received: Word [string=is]
Received: Word [string=a]
Received: Word [string=sentence.]

Using fast path!
Value 1, partition id 0
wrapper is stream:Sentence keys:[{sentenceId = 1}:0] keyNames:null event:Sentence [string= this is a sentence.] => 0
Received Sentence(WordReceiverPE) : Sentence [string= this is a sentence.]
Received Sentence: Sentence [string= this is a sentence.]

26 thoughts on “Building a simple [Yahoo] S4 application”

  1. Very useful. Can you please give some more complex examples and explain how S4 works, particularly how it can be used to find the perfect advertisement for a user? Thanks!

    1. @Sundar, see Section 5, Online Parameter Optimization, in the S4 paper. It details how you can use S4 to automatically tune the parameters of an advertising system using live traffic.

  2. 1) If I have a Test.java like

    class Test{
    String carNo;
    int yearOfMacnufature;
    float noOfKm;
    }

    and I want to send objects of this type as events, what should the string representation in my Message object look like?
    I mean:
    Message m = new Message("RawCarRecord", "bean.Test", "What should be the string representation for above class");

    2) What is the use of overriding the toString() method in each bean class (Word, Sentence)?

    1. 1. The string representation for the message should be the JSON representation, for example {"carNo":"1234", "yearOfMacnufature":1984, "noOfKm":5230.21}
      2. The toString methods are overridden just for debugging purposes. They are not necessary.

  3. Hey Anand, great post.
    I am currently trying to run it, and had a hard time figuring out a small change in the code.

    import io.s4.*

    should now be:

    org.apache.s4.*

    Moreover, I am not able to run the Java client library due to this exception:

    org.apache.s4.client.util.ObjectBuilder$Exception: bad class name for json-encoded object: test.s4.Word
    at org.apache.s4.client.GenericJsonClientStub.eventWrapperFromBytes(GenericJsonClientStub.java:56)
    at org.apache.s4.client.ClientConnection$1.run(ClientConnection.java:132)
    at java.lang.Thread.run(Thread.java:619)
    Caused by: java.lang.ClassNotFoundException: test.s4.Word

    Any hints on what might be wrong? I've included the S4WordCount.jar in the classpath, but it's not able to find the test.s4.Word class
    Thanks again
    M.

    1. The S4 project has moved to the Apache Incubator, hence the name change. You can either rename the imports or use S4 0.3.0, available on GitHub. This should also solve your second problem.

        1. It says classes not found. So do we have to create a class folder containing the class files, and then it might run? Other S4 examples seem to have a class folder with the classes used in the example.

          1. I think the jar should work perfectly fine; see the directory structure described in this post.

        2. Also, when I try to compile WordReceiverPE.java or SentenceReceiverPE.java using javac, I get the error: package org.apache.s4.client does not exist. So I am not able to get WordReceiverPE.class, hence the class-not-found error.

          1. Are you keeping the S4 jars on the classpath while compiling? If the problem persists, try using Eclipse to compile the files after adding those jars manually to your build path.

          2. The core classes of S4. If stuck, try running gradle eclipse to convert your project to an Eclipse project.

      1. Sir, we have 4 Java files, 2 of which are PEs. When I try to compile all 4 of them together using javac, I get this error: SentenceReceiverPE.java:3: error: package io.s4.processor does not exist
        import io.s4.processor.AbstractPE;
        Is there any way to compile this? Is it because the import path is not specified properly, i.e. should it be import org.apache.processor.abstractPE?

      2. Sir, actually the jar file contains only the META-INF folder; the class files are not there. Hence I tried to compile them separately using javac and got the errors I mentioned before.

      3. I was able to run S4, but I am getting one last error:

        ~/s4$ java TestMessageSender localhost 2334 RawWords test.s4.Word
        Exception in thread "main" java.lang.NoClassDefFoundError: TestMessageSender
        Caused by: java.lang.ClassNotFoundException: TestMessageSender
        at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
        Could not find the main class: TestMessageSender. Program will exit.

  4. Hello,
    I tried using S4 on localhost and it works fine. Then I copied the s4 folder from the git repository to a server. There, when I tried building S4 using ./gradlew build,
    I got the following error:
    ###@#### # ./gradlew
    Downloading http://repo.gradle.org/gradle/distributions/gradle-1.0-milestone-3-bin.zip

    Exception in thread "main" java.net.UnknownHostException: repo.gradle.org
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:177)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:366)
    at java.net.Socket.connect(Socket.java:525)
    at java.net.Socket.connect(Socket.java:475)
    at sun.net.NetworkClient.doConnect(NetworkClient.java:163)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:394)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:529)
    at sun.net.www.http.HttpClient.<init>(HttpClient.java:233)
    at sun.net.www.http.HttpClient.New(HttpClient.java:306)
    at sun.net.www.http.HttpClient.New(HttpClient.java:323)
    at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:860)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:801)
    at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:726)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1049)
    at org.gradle.wrapper.Download.downloadInternal(Download.java:49)
    at org.gradle.wrapper.Download.download(Download.java:37)
    at org.gradle.wrapper.Install.createDist(Install.java:54)
    at org.gradle.wrapper.Wrapper.execute(Wrapper.java:80)
    at org.gradle.wrapper.GradleWrapperMain.main(GradleWrapperMain.java:37)

    1. Gradle is trying to access the internet to build S4 on the server, but the server is not connected to the network. Try enabling network access for the server.
