Stage-Level Job Pipelining in Apache Spark: A Deep Dive

How we improved TracksETL throughput by understanding Spark’s execution model

Introduction

Back in 2018, we migrated TracksETL (one of our largest ETL jobs) from Hive to Spark to handle our exploding event volumes. At the time, we processed around 2.28 billion events monthly. Fast forward to today, and we’re processing over 15 billion events, more than a 6x increase. Throughout this growth, our Spark-based ETL has continued to perform well, largely thanks to an optimization we implemented: stage-level job pipelining.

In this post, I’ll walk through the evolution of our job scheduling strategy, from naive sequential processing to a sophisticated stage-aware pipelining approach that maximizes cluster utilization.

Understanding the Problem

TracksETL processes event data by day partitions. Due to data volume, we can’t process all days simultaneously, so each day becomes a separate Spark job within a single application. A typical job has three distinct stages, sketched in code after this list:

  1. READ – Read from HDFS (etl_events, prod_useraliases, etc.). This is very fast as it’s all sequential reads and parallelism is limited to the number of files on HDFS.
  2. PROCESS – Process and deduplicate data. This is computationally expensive and entails shuffling large amounts of data to join and de-duplicate.
  3. WRITE – Write back to HDFS. Since we don’t want to write data into a large number of small files, output is coalesced and then written. This is the slowest stage and is I/O bound.
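
To make the shape concrete, here is a hypothetical sketch of one day’s job in DataFrame terms. The paths, column names (user_id, event_id), and the join/dedup logic are illustrative only; the real job is considerably more involved.

import org.apache.spark.sql.SparkSession

def processDay(spark: SparkSession, day: String): Unit = {
  // READ: sequential HDFS reads; parallelism is bounded by the file count
  val events  = spark.read.parquet(s"/data/etl_events/day=$day")
  val aliases = spark.read.parquet("/data/prod_useraliases")

  // PROCESS: shuffle-heavy join and de-duplication
  val deduped = events
    .join(aliases, Seq("user_id"))
    .dropDuplicates("event_id")

  // WRITE: coalesce to avoid many small files, then the slow I/O-bound write
  deduped
    .coalesce(120)
    .write
    .mode("overwrite")
    .parquet(s"/data/tracks_etl/day=$day")
}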

The WRITE stage is particularly problematic: it’s I/O bound and uses fewer tasks (~120) than our available cores (~300), leaving resources idle.

The Evolution of Our Scheduling Strategy

Iteration Zero: Sequential Processing

The simplest approach — process one day at a time:
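
In code, this is just a plain foreach over the per-day jobs (jobs and compute are the same names used in the next iteration; compute triggers the Spark action):

jobs.foreach(job => job.compute)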

Result: works with minimal resources, but painfully slow. The execution timeline was pretty boring, and this approach never made it to production.

Iteration One: Parallel Jobs with par

As each stage was already individually optimized, it was not possible to make this run faster without executing multiple jobs in parallel and throwing more hardware at it. This can be done by triggering Spark actions for independent jobs in parallel. In Scala, this looks like:

jobs.par.foreach(job => job.compute)

Here, jobs is the list of jobs to be computed, and compute triggers one of the Spark actions. par converts the list into a parallel collection, so the actions are submitted to Spark concurrently. The default parallelism typically matches the number of cores available to the driver JVM; in our case, that meant four jobs in parallel.

This is a reasonably straightforward way of running parallel jobs without introducing any code complexity. In fact, for most cases this should be enough.
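
If four-at-a-time is not the right batch size for your cluster, the parallelism of a Scala parallel collection can be tuned with a custom task support. A sketch for Scala 2.12 (the pool size of 6 is arbitrary):

import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

val parJobs = jobs.par
parJobs.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(6)) // e.g. six jobs at a time
parJobs.foreach(job => job.compute)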

Problem: jobs run in batches, and the entire batch must complete before the next one starts. In our runs, Jobs 2 and 3 often finished well before Job 1, yet no new job was submitted until every job in the first batch had finished. This leaves some executors sitting idle when only a few tasks from one of the WRITE stages remain.

Iteration Two: Job-Level Pipelining

The next thing to try was to start a new job as soon as any running job finished:
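
Had we built it, a minimal sketch might use a fixed pool of workers draining a shared queue (blockingQueue and processPartitions are the same names used in the final implementation below):

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(3))

// Three workers; each starts the next day's job the moment its current one finishes
val workers = (1 to 3).map { _ =>
  Future {
    var group = blockingQueue.poll()
    while (group != null) {
      processPartitions(group.partitions)
      group = blockingQueue.poll()
    }
  }
}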

In the end, I never implemented this approach, because there was an even better one.

Iteration Three: Stage-Level Pipelining (The Final Solution)

While describing the jobs earlier, I mentioned that the first two stages were fast and well optimized, but the third stage was I/O bound and slow. A WRITE stage is usually broken into around 120 tasks, while we have around 300 cores dedicated to the TracksETL application.

Also, since the volume of events varies considerably from day to day, the WRITE stages of the parallel jobs often start at different times. Toward the end of each batch, we end up in a state where only WRITE stages are running, which is painfully slow.

To alleviate this, we can start the next job as soon as one of the running jobs enters its WRITE stage. This slows the existing job down a bit, but not considerably, while we gain by starting the new job much earlier instead of waiting for the whole job to finish. The wall-clock time of the WRITE stages is thus amortized over the execution of READ and PROCESS stages.

The key insight: don’t wait for jobs to finish — start new jobs when running jobs enter their WRITE stage. Since WRITE is I/O bound and uses fewer cores, we can overlap it with the CPU-intensive READ and PROCESS stages of new jobs.

Implementation Deep Dive

The implementation uses Spark’s StatusTracker API to monitor job and stage progress. Here’s how it works:

Architecture Overview

Our stage-level pipelining implementation consists of three main components:

  1. BlockingQueue – Holds all partition groups (days) to be processed. Each day’s data becomes a work item in the queue.
  2. Scheduling Loop – The main control loop that polls the StatusTracker every 20 seconds to check the state of running jobs and decides whether to submit new work.
  3. Thread Pool – An ExecutionContext with a fixed number of worker threads that execute the actual Spark jobs. Each worker pulls from the queue and processes partitions independently.

The Scheduling Decision Algorithm

The decision logic runs every 20 seconds and follows these steps (distilled into code after the list):

  1. Check capacity – If the number of active jobs is less than our target concurrency (tasks), submit a new job immediately.
  2. Query job stages – If we’re at capacity, use StatusTracker to get the current stage of each running job.
  3. Identify final stages – For each running job, find its maximum stage ID (which corresponds to its WRITE stage).
  4. Check for overlap opportunity – If all running jobs are currently executing their final (WRITE) stage, it’s safe to submit a new job because WRITE is I/O bound and won’t compete for CPU resources.
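
Distilled into code, the decision looks roughly like this sketch; the full loop in the next section adds queue handling and bookkeeping:

import org.apache.spark.{JobExecutionStatus, SparkStatusTracker}

def shouldSubmitNewJob(tracker: SparkStatusTracker, tasks: Int): Boolean = {
  val activeJobIds = tracker.getActiveJobIds()
  if (activeJobIds.length < tasks) return true            // 1. spare capacity

  val finalStageIds = activeJobIds
    .flatMap(tracker.getJobInfo)                          // 2. query job stages
    .filter(_.status() == JobExecutionStatus.RUNNING)
    .map(_.stageIds().max)                                // 3. final stage per job

  finalStageIds.diff(tracker.getActiveStageIds()).isEmpty // 4. all in WRITE?
}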

This approach ensures we maintain high throughput without overloading the cluster during CPU-intensive stages.

Core Implementation

import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, Executors, TimeUnit}

import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

import org.apache.spark.{JobExecutionStatus, SparkJobInfo}

// Target number of concurrent jobs (the jobSubmissionThreads knob mentioned
// below); the worker pool must be at least this large.
val tasks: Int = jobSubmissionThreads

implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(tasks))

// Handles for submitted jobs, so we can await them all at the end
val futures = new ConcurrentLinkedQueue[Future[Option[Seq[Long]]]]()

val statusTracker = sparkSession.sparkContext.statusTracker

// PartitionGroup and processPartitions are TracksETL's own type and function
val blockingQueue = new ArrayBlockingQueue[PartitionGroup](
  partitionGroups.size,
  false, // non-fair ordering; initial contents come from partitionGroups
  partitionGroups.asJava
)

while (!blockingQueue.isEmpty) {
  val activeJobIds: Array[Int] = statusTracker.getActiveJobIds()

  val newJob: Option[Future[Option[Seq[Long]]]] =
    if (activeJobIds.length < tasks) {
      // Simple case: we have capacity
      Some(Future { processPartitions(blockingQueue.poll().partitions) })
    } else {
      // Check if running jobs are in their final (WRITE) stage
      val runningJobStatuses = Seq(JobExecutionStatus.RUNNING, JobExecutionStatus.UNKNOWN)

      // Keep jobs that are running, or whose status we cannot determine
      val runningJobInfos: Array[Option[SparkJobInfo]] =
        activeJobIds
          .map(statusTracker.getJobInfo)
          .filter {
            case None    => true
            case Some(x) => runningJobStatuses.contains(x.status())
          }

      // Sanity bound: never track more jobs than we intend to run
      if (runningJobInfos.length <= tasks + 1) {
        // Get the maximum (final) stage ID for each running job
        val maxStageIdsForRunningJobs: Array[Int] =
          runningJobInfos.flatten
            .filter(_.status() == JobExecutionStatus.RUNNING)
            .map(_.stageIds().max)

        val runningStageIds: Array[Int] = statusTracker.getActiveStageIds()

        // If all jobs are on their final stage, submit another
        if (maxStageIdsForRunningJobs.diff(runningStageIds).isEmpty) {
          Some(Future { processPartitions(blockingQueue.poll().partitions) })
        } else {
          None
        }
      } else {
        None
      }
    }

  newJob.foreach(j => futures.add(j))
  Thread.sleep(20000) // Poll every 20 seconds
}

// Wait for all jobs to complete
Await.result(Future.sequence(futures.asScala.toSeq), Duration(2, TimeUnit.HOURS))

The Key Insight Explained

The magic happens in this condition:

maxStageIdsForRunningJobs.diff(runningStageIds).isEmpty

Let’s break it down with an example:

Example: 3 jobs running, each with three stages, where the highest stage ID is the WRITE stage. Stage IDs are globally unique within a Spark application, so each job gets its own range:

  • Job A: stages [10, 11, 12] → max = 12
  • Job B: stages [20, 21, 22] → max = 22
  • Job C: stages [30, 31, 32] → max = 32

So maxStageIdsForRunningJobs = [12, 22, 32]

Case 1: Jobs still in PROCESS stage

runningStageIds = [11, 21, 31]  (middle stages)
  diff = [12, 22, 32] - [11, 21, 31] = [12, 22, 32]  (not empty)
  → DON'T submit new job (would overload cluster)

Case 2: All jobs in WRITE stage

runningStageIds = [12, 22, 32]  (final stages)
  diff = [12, 22, 32] - [12, 22, 32] = []  (empty!)
  → SUBMIT new job (WRITE is I/O bound, has spare CPU capacity)
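
diff here is Scala’s multiset difference: it removes from the left-hand array every element present in the right-hand one, so the check reads as “no running job’s final stage is absent from the active stages”. A quick check of both cases:

val maxStageIds = Array(12, 22, 32)
maxStageIds.diff(Array(11, 21, 31))  // Array(12, 22, 32): non-empty, hold off
maxStageIds.diff(Array(12, 22, 32))  // Array(): empty, submit a new job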

Trade-offs and Considerations

Advantages

  • Better utilization – I/O-bound WRITE stages overlap with CPU-bound READ/PROCESS.
  • Reduced wall-clock time – WRITE time is amortized across multiple job starts.
  • No job changes needed – Scheduling is external to the job logic.
  • Graceful fallback – Set jobSubmissionThreads <= 1 for sequential mode.

Disadvantages

  • Code complexity – Requires understanding Spark internals.
  • Polling overhead – 20-second intervals add some latency.
  • Memory pressure – Multiple concurrent jobs increase memory usage.
  • Debugging difficulty – Interleaved jobs can be harder to trace.

When to Use This Pattern

Good fit:

  • Many independent partitions to process
  • Jobs have distinct stage boundaries (READ → PROCESS → WRITE)
  • Final stages are I/O bound with lower parallelism
  • Cluster can handle 2-3 concurrent jobs

Not needed:

  • Simple, short-running jobs
  • Jobs with uniform resource usage across stages
  • Memory-constrained environments

Conclusion

By understanding Spark’s execution model — particularly how jobs decompose into stages with different resource profiles — we can implement intelligent scheduling that keeps our cluster busy. The key insight is that not all stages are equal: I/O-bound final stages can share resources with CPU-bound early stages of new jobs.

This optimization has allowed TracksETL to scale from around 2 billion to over 15 billion monthly events while maintaining acceptable processing times. The additional code complexity is worth it for workloads at this scale.

The StatusTracker API is underutilized in the Spark ecosystem. If you’re running large batch jobs with predictable stage patterns, consider whether stage-aware scheduling could improve your throughput.

Role-based views using CSS and JavaScript

In web applications, role-based views are normally implemented with server-side if..else tags and the like. When developing pure JavaScript clients, one way of doing role-based views is again JavaScript conditionals, but that approach scatters role-related code throughout the document. Another simple way to achieve the same is with CSS classes.

Let us assume that we have three roles: ADMIN, USER, and GUEST. Then we define three CSS classes as follows:

.role_admin, .role_user, .role_guest{
    display:none;
}

Now consider the following HTML document, which has CSS classes attached to elements according to the role each element is meant for.

<html>
<head>
<link href="./role.css" rel="stylesheet" type="text/css" />
</head>
<body>
<h1 class="role_admin">This should be visible to admin only.</h1>
<h2 class="role_user">This is for the normal logged in user.</h2>
<h3 class="role_guest">And this is for the guest.</h3>
</body>
</html>

At this stage, if you load the document in a browser, you will not see anything, as all the roles are invisible. So let’s spruce this up with a bit of JavaScript that sets the proper CSS class properties.

$(document).ready(function() {
    // We are assuming that jQuery and the jQuery Cookie plugin are included in the doc

    var role = $.cookie('user_role') || 'GUEST'; // 'ADMIN', 'USER' or 'GUEST'; default to GUEST if the cookie is missing

    // inject a style rule that makes the current role's css class visible
    var cssClass = '.role_' + role.toLowerCase();
    $("<style type='text/css'> " + cssClass + " { display:block } </style>").appendTo("head");
});

If a cookie is set that represents the current user’s role, this snippet makes every element tagged with the CSS class .role_{roleCookie} visible.

Even though this is a very simple implementation, we can easily extend it to more complex scenarios. For example, if role-based visibility is totally ordered by access level, i.e. given any two roles R1 and R2 we can always tell which one has more visibility, then we can extend it as follows:

$(document).ready(function() {
    // We are assuming that jQuery and the jQuery Cookie plugin are included in the doc

    var role = $.cookie('user_role') || 'GUEST'; // 'ADMIN', 'USER' or 'GUEST'; default to GUEST if the cookie is missing

    // higher access levels see everything the lower ones see
    if (role == "ADMIN") {
        // make everything visible
        $("<style type='text/css'>.role_admin, .role_user, .role_guest { display:block } </style>").appendTo("head");

    } else if (role == "USER") {
        // make the user and guest parts visible; the admin part stays hidden
        $("<style type='text/css'>.role_user, .role_guest { display:block } </style>").appendTo("head");

    } else if (role == "GUEST") {
        // only show the guest part
        $("<style type='text/css'>.role_guest { display:block } </style>").appendTo("head");
    }
});

Caution: all parts of the document are sent to every user; only what is visible in the browser is role based. Server-side authorisation checks must therefore stay in place, as nothing stops a curious user from looking at the source and firing that dreaded request.

Building a simple [Yahoo] S4 application

S4 is a distributed stream processing platform from Yahoo, often seen as the real-time counterpart of Hadoop. Being fault tolerant and horizontally scalable, S4 helps you build very large stream processing applications that can do anything from detecting earthquakes to finding the perfect bit of advertising that the visitor on your website is most likely to click.

At its core, an S4 application consists of a number of Processing Elements (PEs) that are wired together by a Spring configuration file, which defines the PEs and the flow of events through the system. Events are produced by event producers, which send them to the S4 client adapter; from there, the S4 platform takes over and dispatches them to the appropriate processing elements. After processing an event, a PE can dispatch it to other PEs for further processing, or it can produce output events. Thus, arbitrarily complex behavior can be built by wiring together a simple set of PEs.

S4 comes with a few example applications, but here is a much simpler S4WordCount application that shows how to:

  1. Keep state in a PE.
  2. Dispatch events from a PE.
  3. Process multiple events from a single PE.
  4. Write a simple Java client for sending events to S4.

In S4WordCount, we will build a simple WordReceiverPE that receives events in the form of words and simply prints them to stdout. It will also identify sentences in the word stream and forward them for further processing to SentenceReceiverPE. WordReceiverPE will also receive the sentence events and print them to stdout.

First, let’s have a look at Word, one of the two event objects used in our example. Sentence (not shown) is analogous: it wraps the sentence text and additionally carries the sentenceId property used for routing:

package test.s4;

public class Word {
	private String string;

	public String getString() {
		return string;
	}

	public void setString(String message) {
		this.string = message;
	}

	@Override
	public String toString() {
		return "Word [string=" + string + "]";
	}
}

S4 uses keys, i.e. sets of properties of the event object, for routing and dispatching events. Word is the key-less entry point into the system, so we don’t define a key for it; Sentence, which is processed further, uses Sentence.sentenceId as its key. (For simplicity, all Sentences have the same sentenceId, 1.)

Now let’s have a look at our first PE, i.e. WordReceiverPE:

package test.s4;

import io.s4.dispatcher.Dispatcher;
import io.s4.processor.AbstractPE;

/**
 * This class receives word and sentence events and prints them to stdout.
 */
public class WordReceiverPE extends AbstractPE {
	private StringBuilder builder = new StringBuilder();

	/**
	 * Dispatcher that will dispatch events on <code>Sentence *</code> stream.
	 */
	private Dispatcher dispatcher;

	public Dispatcher getDispatcher() {
		return dispatcher;
	}

	public void setDispatcher(Dispatcher dispatcher) {
		this.dispatcher = dispatcher;
	}

	/**
	 * Process word events. This prints the received word and also builds up
	 * the sentence, which is dispatched once we find the end of the
	 * sentence, identified by <code>.</code>
	 * 
	 * @param word
	 *            Word received on the <code>RawWords *</code> stream.
	 */
	public void processEvent(Word word) {
		System.out.println("Received: " + word);

		// keep building the sentence
		builder.append(' ').append(word.getString().trim());

		if (builder.toString().endsWith(".")) {
			System.err.print("End of sentence found");

			// dispatch a Sentence event
			dispatcher.dispatchEvent("Sentence", new Sentence(builder.toString()));

			// reset buffer.
			builder.setLength(0);
		}
	}

	/**
	 * Process sentence events. This method demonstrates that one class can
	 * receive multiple types of events on different streams.
	 * 
	 * @param sentence
	 *            Sentence received on the <code>Sentence *</code> stream.
	 */
	public void processEvent(Sentence sentence) {
		System.out.println("Received Sentence(WordReceiverPE) : " + sentence);
	}

	@Override
	public void output() {
		// no periodic output is needed for this example
	}

	@Override
	public String getId() {
		return this.getClass().getName();
	}
}

We define a StringBuilder that accumulates words to form a Sentence. The processEvent(Word) method prints the received word to stdout and appends it to the builder. It then checks whether the sentence is complete by looking for a trailing . in the builder; if so, it creates a Sentence event (object) and dispatches it on the Sentence stream. Once dispatched, processEvent(Sentence) receives that event and prints the sentence to stdout.

Now let’s have a look at SentenceReceiverPE, our second PE, which does nothing but print the received Sentence to stdout.

package test.s4;

import io.s4.processor.AbstractPE;

/**
 * This class receives Sentence events and prints them to stdout.
 */
public class SentenceReceiverPE extends AbstractPE {
	
	/**
	 * Print received sentence event.
	 * @param sentence
	 */
	public void processEvent(Sentence sentence){
		System.out.println("Received Sentence: " + sentence);
	}

	@Override
	public void output() {
		// no periodic output is needed for this example
	}
	
	@Override
	public String getId() {
		return this.getClass().getName();
	}

}

Finally, let’s see the content of the application config file that wires all this together into a valid S4 application. The config file’s name should follow the naming convention <AppName>-conf.xml; in this case we will call it S4WordCount-conf.xml.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">

	<bean id="wordCatcher" class="test.s4.WordReceiverPE">
	   <property name="dispatcher" ref="dispatcher"/>
		
		<property name="keys">
		  <!--  Listen for both words and sentences -->
			<list>
				<value>RawWords *</value>
				<value>Sentence *</value>
			</list>
		</property>
	</bean>
	
	   <bean id="sentenceCatcher" class="test.s4.SentenceReceiverPE">
        <property name="keys">
            <list>
                <value>Sentence *</value>
            </list>
        </property>
    </bean>

	<bean id="dispatcher" class="io.s4.dispatcher.Dispatcher"
		init-method="init">
		<property name="partitioners">
			<list>
			<!-- Partition on sentenceId, which is "1" for all sentences. -->
				<ref bean="sentenceIdPartitioner" />
			</list>
		</property>
		<property name="eventEmitter" ref="commLayerEmitter" />
		<property name="loggerName" value="s4" />
	</bean>

	<bean id="sentenceIdPartitioner" class="io.s4.dispatcher.partitioner.DefaultPartitioner">
		<property name="streamNames">
			<list>
				<value>Sentence</value>
			</list>
		</property>
		<property name="hashKey">
			<list>
				<value>sentenceId</value>
			</list>
		</property>
		<property name="hasher" ref="hasher" />
		<property name="debug" value="true" />
	</bean>

</beans>

This is a Spring bean definition file. The first bean, wordCatcher, is an instance of test.s4.WordReceiverPE, into which we inject the dispatcher bean defined below. keys defines the streams on which our PE listens for events: RawWords * means it receives all events on the stream named RawWords irrespective of their keys, and similarly for Sentence *.

Our second bean is sentenceCatcher, an instance of test.s4.SentenceReceiverPE, which accepts all events on the stream called Sentence.

Third is the definition of the dispatcher we injected into wordCatcher. The dispatcher needs a partitioner, which partitions events based on some key and dispatches them to the appropriate PEs. Here we use the DefaultPartitioner, configured by the sentenceIdPartitioner bean to partition event objects on the Sentence stream by their sentenceId property. The dispatcher uses the S4-provided commLayerEmitter to emit events.

Running the application

To run this application on the S4:

  1. Setup S4 as documented here.
  2. Create an S4WordCount.jar from the classes above.
  3. Deploy the application on S4, by creating the following directory structure:
    	/$S4_IMAGE
    		/s4-apps
    			/S4WordCount
    				S4WordCount-conf.xml
    				/lib
    					S4WordCount.jar
  4. Start S4: $S4_IMAGE/scripts/start-s4.sh -r client-adapter &
  5. Start client adapter: $S4_IMAGE/scripts/run-client-adapter.sh -s client-adapter -g s4 -d $S4_IMAGE/s4-core/conf/default/client-stub-conf.xml &

Now S4 is ready to receive events. The following event sender uses the Java client library to send events to S4: it reads one word at a time from stdin and sends each to S4.

package test.s4.generator;

import io.s4.client.Driver;
import io.s4.client.Message;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;

public class TestMessageSender {
	public static void main(String[] args) {
		if (args.length < 1) {
            System.err.println("No host name specified");
            System.exit(1);
        }
        String hostName = args[0];
        
        if (args.length < 2) {
            System.err.println("No port specified");
            System.exit(1);
        }
        
        int port = -1;
        try {
            port = Integer.parseInt(args[1]);
        } catch (NumberFormatException nfe) {
            System.err.println("Bad port number specified: " + args[1]);
            System.exit(1);
        }
        
        if (args.length < 3) {
            System.err.println("No stream name specified");
            System.exit(1);
        }
        String streamName = args[2];
        
        if (args.length < 4) {
            System.err.println("No class name specified");
            System.exit(1);
        }
        String clazz = args[3];       
        
        Driver d = new Driver(hostName, port);
        Reader inputReader = null;
        BufferedReader br = null;
        try {
            if (!d.init()) {
                System.err.println("Driver initialization failed");
                System.exit(1);
            }
            
            if (!d.connect()) {
                System.err.println("Driver connect failed");
                System.exit(1);
            }
            
            inputReader = new InputStreamReader(System.in);
            br = new BufferedReader(inputReader);

            for (String inputLine = null; (inputLine = br.readLine()) != null;) {
                String string = "{\"string\":\"" + inputLine + "\"}";
                System.out.println("sending " + string);
                Message m = new Message(streamName, clazz, string);
                d.send(m);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        finally {
            try { d.disconnect(); } catch (Exception e) {}
            try { br.close(); } catch (Exception e) {}
            try { inputReader.close(); } catch (Exception e) {}
        }
	}
}

Run the client: java test.s4.generator.TestMessageSender localhost 2334 RawWords test.s4.Word

Here is the sample output on the S4 console:


Received: Word [string=this]
Received: Word [string=is]
Received: Word [string=a]
Received: Word [string=sentence.]

Using fast path!
Value 1, partition id 0
wrapper is stream:Sentence keys:[{sentenceId = 1}:0] keyNames:null event:Sentence [string= this is a sentence.] => 0
Received Sentence(WordReceiverPE) : Sentence [string= this is a sentence.]
Received Sentence: Sentence [string= this is a sentence.]

Java Code Generators – A short rant

Java is known to be a verbose language, and the situation worsens when you step into the bloated enterprise Java world: you need to write tons of code and configure a lot of JXXX to make even a simple webapp work. The situation has improved in recent years with the introduction of convention-over-configuration and annotation-based configuration, but a functional webapp in Java still requires at least 2x to 4x as much code as a similar webapp written in a framework like Django or RoR.

A lot of Java frameworks and IDEs try to hide this complexity by generating code, from simple getters and setters to entire DAO layers and what not. That might give small productivity gains in the beginning, but every additional line of code in your project, whether hand-written or generated by a state-of-the-art code generator, is code someone will need to maintain and evolve, which according to some estimates accounts for almost 90% of total software costs.

That’s why a framework like Play feels like fresh air, and it seems to be making inroads into the bloated enterprise Java world.

My first Scheme program

I’ve just started experimenting with Scheme, specifically to follow Structure and Interpretation of Computer Programs. Here is a Scheme procedure that returns the sum of the squares of the two larger of the three given numbers.

(define (sumLargeSquare a b c)
  ; using <= (rather than <) handles ties, e.g. (sumLargeSquare 1 1 5)
  (cond ((and (<= a b) (<= a c)) (+ (* b b) (* c c)))
        ((and (<= b a) (<= b c)) (+ (* a a) (* c c)))
        (else (+ (* a a) (* b b)))))
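
For example, (sumLargeSquare 1 2 3) returns 13, the sum of the squares of 2 and 3.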

Minifying JavaScript/CSS without changing file references in your source

Rule 10 of Steve Souders’ High Performance Web Sites: Minify JavaScript

The most common problem faced while implementing this is how to handle the full and minified versions, and how to change their references in the documents that include them. One of the easier ways to do this is to make it part of the deployment process.

Here are the relevant steps involved. I’m using the YUI Compressor.

#!/bin/bash

#Execute this script after checking out the latest source from the repository.

#Minify all javascript files
cd /path/to/javascript
for x in *.js
do
        # skip files that are already minified, or re-runs would produce x-min-min.js
        case $x in *-min.js) continue ;; esac
        java -jar /path/to/compressor/yuicompressor-2.4.2.jar -o ${x%%.*}-min.js --preserve-semi $x
done

#Minify all css files
cd /path/to/css
for x in *.css
do
        # skip files that are already minified
        case $x in *-min.css) continue ;; esac
        java -jar /path/to/compressor/yuicompressor-2.4.2.jar -o ${x%%.*}-min.css $x
done

Now, you don’t want to replace all references to x.css or x.js in your development code with references to x-min.css and x-min.js. Instead, you can rewrite those filenames at the web-server level.

For Apache, the following rewrite rules work fine:

#enable rewriting
RewriteEngine on

#skip requests that already point at a minified file, otherwise the rules loop
RewriteCond %{REQUEST_URI} !-min\.js$
RewriteRule /(.*)\.js$ /$1-min.js [L]
RewriteCond %{REQUEST_URI} !-min\.css$
RewriteRule /(.*)\.css$ /$1-min.css [L]

Caution: the script above skips files that already end in -min, which prevents names like x-min-min-min.js from piling up across runs. It is still a good idea to clear the js/css folders before checking out files from your source repository, so that stale minified files don’t linger.