Debugging a Spark issue

While working on GraphFrames in pyspark, we encountered an ExecutorLostFailure exception. The following is the pyspark script:

from __future__ import print_function
import os
import graphframes
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T
import re
from pyspark.sql.window import Window
def graph_coalesce(g, numPartitions):
    """Return a new GraphFrame built from ``g`` with fewer partitions.

    Both the vertex and the edge DataFrame of ``g`` are coalesced to
    ``numPartitions`` partitions before being wrapped in a fresh GraphFrame.
    """
    coalesced_vertices = g.vertices.coalesce(numPartitions)
    coalesced_edges = g.edges.coalesce(numPartitions)
    return graphframes.GraphFrame(coalesced_vertices, coalesced_edges)
def load_graphframe(sqlContext, dir_name, numPartitions=None):
    """Load a GraphFrame from the parquet files stored under ``dir_name``.

    The directory is expected to contain ``vertices.parquet`` and
    ``edges.parquet``. When ``numPartitions`` is given, the resulting graph
    is passed through ``graph_coalesce`` before being returned; otherwise it
    is returned as loaded.
    """
    vertex_df = sqlContext.read.parquet(
        os.path.join(dir_name, 'vertices.parquet'))
    edge_df = sqlContext.read.parquet(
        os.path.join(dir_name, 'edges.parquet'))
    graph = graphframes.GraphFrame(vertex_df, edge_df)
    if numPartitions is None:
        return graph
    return graph_coalesce(graph, numPartitions)
def getOrCreateSparkContext(conf=None):
    """Return the active SparkContext, creating one first if none exists.

    ``conf`` is only consulted when a new context has to be created; if it
    is not supplied, a default ``pyspark.SparkConf()`` is used. When a
    context is already active it is returned unchanged and ``conf`` is
    ignored.
    """
    # Based on
    # http://www.eecs.berkeley.edu/~jegonzal/pyspark/_modules/pyspark/context.html
    # The pyspark version that we currently use (1.5) doesn't provide this
    # method, so it is implemented here.
    # Note: wrapping this in `with pyspark.SparkContext._lock:`, as the linked
    # code does, makes the program freeze indefinitely. The main script does
    # not create threads right now, so going without the lock seems to be
    # pretty safe. The locking issue will have to be dealt with in the future.
    active = pyspark.SparkContext._active_spark_context
    if active is not None:
        return active
    # Constructing a SparkContext registers it as the active one; the return
    # value is read back from the class attribute rather than kept here.
    pyspark.SparkContext(conf=conf if conf else pyspark.SparkConf())
    return pyspark.SparkContext._active_spark_context
# Driver script: load the graph, attach a `cost` column to vertices and edges,
# and search it for the path pattern used to produce recommendations.

# Reuse the active SparkContext (or create one) and wrap it in a HiveContext.
sc = getOrCreateSparkContext()
sqlContext = pyspark.HiveContext(sc)
# Load the vertices/edges parquet files and coalesce each to 128 partitions.
path_input_graph = '/user/anandnalya/network'
grph = load_graphframe(sqlContext, path_input_graph, 128)
# Derive a `cost` column from `pagerank` (vertices) and `count` (edges).
# NOTE(review): the exponent is 1.0, which leaves the value numerically
# unchanged -- presumably a tuning knob; confirm before relying on it.
vertices = grph.vertices.withColumn('cost', F.pow(F.col('pagerank'), 1.0))
edges = grph.edges.withColumn('cost', F.pow(F.col('count'), 1.0))
# Rebuild the graph so that the pattern search sees the `cost` columns.
grph = graphframes.GraphFrame(vertices, edges)
# Pattern passed to grph.find(): u0 and u1 both have an edge to p0, a0 has an
# edge to p0, u1 also has an edge to p1, and a1 has an edge to p1. The edge
# and node types are pinned down by the filter statement below.
path_search_query = '''(u0)-[likes_post00]->(p0);
(a0)-[is_author0]->(p0);
(u1)-[likes_post10]->(p0);
(u1)-[likes_post11]->(p1);
(a1)-[is_author1]->(p1)
'''
# SQL filter applied to the matches: restricts u0 to the given ids, fixes the
# edge types (IS_AUTHOR / LIKES_POST) and node types (USER / POST), requires
# the authors, users and posts to be distinct, and excludes the id 'USER__'.
# NOTE(review): the IN list is a single quoted string ('1,2,3,…') --
# presumably a redacted placeholder for the real list of ids; confirm.
path_search_filter_statement = """u0.id IN ('1,2,3,…') AND
is_author0.edge_type = 'IS_AUTHOR' AND
is_author1.edge_type = 'IS_AUTHOR' AND
likes_post00.edge_type = 'LIKES_POST' AND
likes_post10.edge_type = 'LIKES_POST' AND
likes_post11.edge_type = 'LIKES_POST' AND
a0.node_type = 'USER' AND
a1.node_type = 'USER' AND
u0.node_type = 'USER' AND
u1.node_type = 'USER' AND
p0.node_type = 'POST' AND
p1.node_type = 'POST' AND
a0.id != u0.id AND
a0.id != u1.id AND
a1.id != u0.id AND
a1.id != u1.id AND
a0.id != a1.id AND
p0.id != p1.id AND
a0.id != 'USER__' AND
a1.id != 'USER__'"""
# Find every match of the pattern, then keep only those passing the filter.
path_search = grph.find(
path_search_query
).filter(
path_search_filter_statement
)

view raw
network.py
hosted with ❤ by GitHub

My first instinct was that this was a resource problem and that disabling dynamic allocation by passing --num-executors=<n> would solve the issue, but the job was already being submitted with these parameters:

--driver-memory 2g --executor-memory 6g --executor-cores 6 --num-executors 40

Then I tried looking into the application logs. Since some of the executors were lost and there was no way to get their logs from the Spark history server, I had to use yarn logs -applicationId <application_id> > myapp.log to get the logs. One benefit of this is that we get the logs for all the containers of the application as a single file, making it easier to grep through. Apart from the fact that some of the executors were being shut down for some reason, there were no helpful error messages in the logs.

My next theory was that the containers were being killed because of OOM errors. Since the input data was only around 250MB, which should not exhaust memory by itself, I tried narrowing down the transformations in the job to identify what might be causing this. In the end, it was the UDF that was the culprit: removing the call to path_cost_function allowed the job to complete successfully.

Next, I tried optimizing the path_cost_function implementation by passing the float values directly instead of the structs:

def _path_cost_user(likes_post_00_cost, p0_cost, likes_post_10_cost, u1_cost, likes_post11_cost, p1_cost):
    """Return the total cost of one matched path.

    The total is the plain sum of the six edge/vertex costs, added in
    argument order. If any cost is ``None`` (a SQL NULL reaching the UDF),
    ``None`` is returned so the NULL propagates instead of the addition
    raising ``TypeError`` inside the Python worker.
    """
    costs = (
        likes_post_00_cost,
        p0_cost,
        likes_post_10_cost,
        u1_cost,
        likes_post11_cost,
        p1_cost,
    )
    if any(cost is None for cost in costs):
        return None
    ret = likes_post_00_cost + p0_cost + likes_post_10_cost + u1_cost + likes_post11_cost + p1_cost
    return ret
# Register the Python cost function as a Spark SQL UDF that returns a float.
# NOTE(review): the `cost` inputs are presumably doubles (they come from
# F.pow); FloatType narrows the result -- confirm this is intended.
path_cost_function = F.udf(_path_cost_user, T.FloatType())

# Project every matched path onto the recommendation columns. The six cost
# columns are passed to the UDF as plain values rather than as whole structs.
rec_paths = path_search.select(
    F.col('u0.id').alias('user_id'),
    F.col('p1.id').alias('post_recommended'),
    # Element 1 of the post id split on '_' or '-' is used as the blog id.
    F.split(F.col('p1.id'), '[_-]')[1].alias('blog_recommended'),
    F.col('a1.id').alias('author_recommended'),
    path_cost_function(
        'likes_post00.cost', 'p0.cost', 'likes_post10.cost',
        'u1.cost', 'likes_post11.cost', 'p1.cost'
    ).alias('path_cost')
)

view raw
udf.py
hosted with ❤ by GitHub

This resulted in a slightly better error message, with the job now failing with:

Job aborted due to stage failure: Task 57 in stage 19.0 failed 4 times, most recent failure: Lost task 57.3 in stage 19.0 (TID 1545, node-A16-R14-33.hadoop.dfw.wordpress.com): java.io.IOException: Unable to acquire 16777216 bytes of memory
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:138)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:106)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:68)
at org.apache.spark.sql.execution.TungstenSort.org$apache$spark$sql$execution$TungstenSort$$preparePartition$1(sort.scala:146)
at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.prepare(MapPartitionsWithPreparationRDD.scala:50)
at org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:83)
at org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:82)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
at scala.collection.TraversableLike$$anonfun$collect$1.apply(TraversableLike.scala:278)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.collect(TraversableLike.scala:278)
at scala.collection.AbstractTraversable.collect(Traversable.scala:105)
at org.apache.spark.rdd.ZippedPartitionsBaseRDD.tryPrepareParents(ZippedPartitionsRDD.scala:82)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:97)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:249)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:208)

view raw
stacktrace
hosted with ❤ by GitHub

Since this seems to be originating from tungsten, my next step was to disable tungsten for the job with

# Disable Tungsten for Spark SQL. The setting has to be part of the conf used
# to create the context: getOrCreateSparkContext only applies `conf` when no
# SparkContext is active yet.
conf = pyspark.SparkConf().set("spark.sql.tungsten.enabled", "false")
sc = getOrCreateSparkContext(conf)

view raw
conf.py
hosted with ❤ by GitHub

This resulted in the job completing successfully, but a bit more slowly than it would have with tungsten enabled. SPARK-10474, which was fixed in Spark 1.5.1, seems relevant here, and tuning spark.shuffle.memoryFraction and spark.shuffle.safetyFraction might help.

Interestingly, the same job implemented in Scala runs fine with tungsten. One of the reasons for this could be the absence of the memory overhead that arises from passing data between the JVM and the Python process. The following is the Scala version.

package com.automattic.network
import org.apache.spark
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext
import org.graphframes._
/** Spark job that searches the network graph for recommendation paths and
  * writes the resulting rows, with their path cost, to parquet.
  *
  * The job is started through an explicit `main` method instead of
  * `extends App`: the Spark documentation advises against `scala.App`,
  * whose delayed initialization may not work correctly.
  */
object Main {

  /** Returns a GraphFrame whose vertex and edge DataFrames are each
    * coalesced to `numPartitions` partitions.
    */
  def graph_coalesce(g: GraphFrame, numPartitions: Int) =
    GraphFrame(g.vertices.coalesce(numPartitions), g.edges.coalesce(numPartitions))

  /** Loads `vertices.parquet` and `edges.parquet` from `dir_name` into a
    * GraphFrame, coalescing it when `numPartitions` is defined.
    */
  def load_graphframe(sqlContext: SQLContext, dir_name: String, numPartitions: Option[Int]) = {
    val vertices = sqlContext.read.parquet(dir_name + "/vertices.parquet")
    val edges = sqlContext.read.parquet(dir_name + "/edges.parquet")
    val ret = GraphFrame(vertices, edges)
    numPartitions.map(graph_coalesce(ret, _)).getOrElse(ret)
  }

  /** Total cost of one matched path: the sum of its six edge/vertex costs. */
  def _path_cost_user(likes_post_00: Double, p0: Double, likes_post_10: Double, u1: Double, likes_post11: Double, p1: Double) =
    likes_post_00 + p0 + likes_post_10 + u1 + likes_post11 + p1

  def main(args: Array[String]): Unit = {
    // Setup Spark
    val conf = new SparkConf
    conf.setAppName("anand network test")
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)

    // Load the graph and attach a `cost` column to vertices and edges.
    val path_input_graph = "/user/anandnalya/network"
    val loaded = load_graphframe(sqlContext, path_input_graph, Some(128))
    val vertices = loaded.vertices.withColumn("cost", pow("pagerank", 1.0))
    val edges = loaded.edges.withColumn("cost", pow("count", 1.0))
    val grph = GraphFrame(vertices, edges)

    // Pattern to search for; the string continuation lines are kept at
    // column 0 so that the literal stays exactly as it was.
    val path_search_query = """(u0)-[likes_post00]->(p0);
(a0)-[is_author0]->(p0);
(u1)-[likes_post10]->(p0);
(u1)-[likes_post11]->(p1);
(a1)-[is_author1]->(p1)
"""
    // SQL filter fixing edge/node types and requiring distinct participants.
    val path_search_filter_statement = """u0.id IN ('1,2,3,…') AND
is_author0.edge_type = 'IS_AUTHOR' AND
is_author1.edge_type = 'IS_AUTHOR' AND
likes_post00.edge_type = 'LIKES_POST' AND
likes_post10.edge_type = 'LIKES_POST' AND
likes_post11.edge_type = 'LIKES_POST' AND
a0.node_type = 'USER' AND
a1.node_type = 'USER' AND
u0.node_type = 'USER' AND
u1.node_type = 'USER' AND
p0.node_type = 'POST' AND
p1.node_type = 'POST' AND
a0.id != u0.id AND
a0.id != u1.id AND
a1.id != u0.id AND
a1.id != u1.id AND
a0.id != a1.id AND
p0.id != p1.id AND
a0.id != 'USER__' AND
a1.id != 'USER__'"""
    val path_search = grph.find(
      path_search_query
    ).filter(
      path_search_filter_statement
    )

    val path_cost_function = spark.sql.functions.udf(_path_cost_user(_:Double,_:Double,_:Double,_:Double,_:Double,_:Double))

    // Project every matched path onto the recommendation columns; cached
    // because the result is counted, shown and written below.
    val rec_paths = path_search.select(
      path_search("u0.id").alias("user_id"),
      path_search("p1.id").alias("post_recommended"),
      split(path_search("p1.id"), "[_-]")(1).alias("blog_recommended"),
      path_search("a1.id").alias("author_recommended"),
      path_cost_function(
        path_search("likes_post00.cost"),
        path_search("p0.cost"),
        path_search("likes_post10.cost"),
        path_search("u1.cost"),
        path_search("likes_post11.cost"),
        path_search("p1.cost")
      ).alias("path_cost")
    ).cache()

    println(rec_paths.count())
    rec_paths.show()
    rec_paths.write.parquet("/user/anandnalya/network_out")

    // Release the cluster resources once the output has been written.
    sc.stop()
  }
}

view raw
network.scala
hosted with ❤ by GitHub

Leave a Reply

Your email address will not be published. Required fields are marked *