While working on GraphFrames in PySpark, we encountered an ExecutorLostFailure
exception. Following is the PySpark script:
from __future__ import print_function
import os
import graphframes
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T
import re
from pyspark.sql.window import Window

def graph_coalesce(g, numPartitions):
    return graphframes.GraphFrame(
        g.vertices.coalesce(numPartitions),
        g.edges.coalesce(numPartitions)
    )

def load_graphframe(sqlContext, dir_name, numPartitions=None):
    fn_vertices = os.path.join(dir_name, 'vertices.parquet')
    fn_edges = os.path.join(dir_name, 'edges.parquet')
    vertices = sqlContext.read.parquet(fn_vertices)
    edges = sqlContext.read.parquet(fn_edges)
    ret = graphframes.GraphFrame(vertices, edges)
    if numPartitions is not None:
        ret = graph_coalesce(ret, numPartitions)
    return ret

def getOrCreateSparkContext(conf=None):
    # Based on
    # http://www.eecs.berkeley.edu/~jegonzal/pyspark/_modules/pyspark/context.html
    # The pyspark version that we currently use (1.5) doesn't provide this method.
    # Thus, implementing it here.
    # Note: If we use `with pyspark.SparkContext._lock:`, as in the linked code,
    # the program freezes infinitely. Right now, we don't create threads within the
    # main script. Thus, this code seems to be pretty safe. In the future, we will
    # have to deal with the locking issue.
    if pyspark.SparkContext._active_spark_context is None:
        pyspark.SparkContext(conf=conf or pyspark.SparkConf())
    return pyspark.SparkContext._active_spark_context

sc = getOrCreateSparkContext()
sqlContext = pyspark.HiveContext(sc)

path_input_graph = '/user/anandnalya/network'
grph = load_graphframe(sqlContext, path_input_graph, 128)

vertices = grph.vertices.withColumn('cost', F.pow(F.col('pagerank'), -1.0))
edges = grph.edges.withColumn('cost', F.pow(F.col('count'), -1.0))
grph = graphframes.GraphFrame(vertices, edges)

path_search_query = '''(u0)-[likes_post00]->(p0);
                       (a0)-[is_author0]->(p0);
                       (u1)-[likes_post10]->(p0);
                       (u1)-[likes_post11]->(p1);
                       (a1)-[is_author1]->(p1)
                    '''
path_search_filter_statement = """u0.id IN ('1,2,3,…') AND
    is_author0.edge_type = 'IS_AUTHOR' AND
    is_author1.edge_type = 'IS_AUTHOR' AND
    likes_post00.edge_type = 'LIKES_POST' AND
    likes_post10.edge_type = 'LIKES_POST' AND
    likes_post11.edge_type = 'LIKES_POST' AND
    a0.node_type = 'USER' AND
    a1.node_type = 'USER' AND
    u0.node_type = 'USER' AND
    u1.node_type = 'USER' AND
    p0.node_type = 'POST' AND
    p1.node_type = 'POST' AND
    a0.id != u0.id AND
    a0.id != u1.id AND
    a1.id != u0.id AND
    a1.id != u1.id AND
    a0.id != a1.id AND
    p0.id != p1.id AND
    a0.id != 'USER__' AND
    a1.id != 'USER__'"""

path_search = grph.find(
    path_search_query
).filter(
    path_search_filter_statement
)
My first instinct was that it was a resource problem and that disabling dynamic allocation by passing --num-executors
should solve the issue, but the job was already being submitted with these parameters:
--driver-memory 2g --executor-memory 6g --executor-cores 6 --num-executors 40
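For reference, roughly the same settings can also be expressed as configuration properties and passed to the getOrCreateSparkContext helper above. The following is only a sketch mirroring the flags above, not what the job actually used (and driver memory generally has to be set via spark-submit, before the driver JVM starts):

# Sketch: spark-submit flags expressed as SparkConf properties.
conf = pyspark.SparkConf()
conf.set("spark.dynamicAllocation.enabled", "false")
conf.set("spark.executor.instances", "40")  # --num-executors 40
conf.set("spark.executor.memory", "6g")     # --executor-memory 6g
conf.set("spark.executor.cores", "6")       # --executor-cores 6
sc = getOrCreateSparkContext(conf)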
Then I tried looking into the application logs. Since some of the executors were lost and there was no way to get their logs from the Spark history server, I had to use yarn logs -applicationId <application_id> > myapp.log
to get them. One benefit of this is that we get the logs for all of the application's containers as a single file, making it easier to grep through. Apart from the fact that some of the executors were being shut down for some reason, there were no helpful error messages in the logs.
My next theory was that the containers were being killed because of OOM errors. Since the input data was only around 250MB, I tried narrowing down the transformations in the job to identify what might be causing this. Finally, the UDF turned out to be the culprit: removing the call to path_cost_function
allowed the job to complete successfully.
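For context, the struct-based version of path_cost_function is not shown above. A hypothetical sketch of it, assuming each matched vertex and edge struct was passed whole into the UDF and its cost field summed, would be:

# Hypothetical reconstruction, not the exact code from the job: each argument
# arrives in Python as a full Row, so entire structs are serialized across the
# JVM/Python boundary for every matched path.
def _path_cost_structs(likes_post00, p0, likes_post10, u1, likes_post11, p1):
    return (likes_post00.cost + p0.cost + likes_post10.cost +
            u1.cost + likes_post11.cost + p1.cost)

path_cost_function = F.udf(_path_cost_structs, T.FloatType())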
Next, I tried optimizing the path_cost_function implementation by passing the float values directly instead of the structs:
def _path_cost_user(likes_post_00_cost, p0_cost, likes_post_10_cost, u1_cost, likes_post11_cost, p1_cost):
    ret = likes_post_00_cost + p0_cost + likes_post_10_cost + u1_cost + likes_post11_cost + p1_cost
    return ret

path_cost_function = F.udf(_path_cost_user, T.FloatType())

rec_paths = path_search.select(
    F.col('u0.id').alias('user_id'),
    F.col('p1.id').alias('post_recommended'),
    F.split(F.col('p1.id'), '[_-]')[1].alias('blog_recommended'),
    F.col('a1.id').alias('author_recommended'),
    path_cost_function('likes_post00.cost', 'p0.cost', 'likes_post10.cost', 'u1.cost', 'likes_post11.cost', 'p1.cost').alias('path_cost')
)
This resulted in somewhat better error messages, with the job now failing with:
Job aborted due to stage failure: Task 57 in stage 19.0 failed 4 times, most recent failure: Lost task 57.3 in stage 19.0 (TID 1545, node-A16-R14-33.hadoop.dfw.wordpress.com): java.io.IOException: Unable to acquire 16777216 bytes of memory
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:138)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:106)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:68)
    at org.apache.spark.sql.execution.TungstenSort.org$apache$spark$sql$execution$TungstenSort$$preparePartition$1(sort.scala:146)
    at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
    at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.prepare(MapPartitionsWithPreparationRDD.scala:50)
    at org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:83)
    at org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:82)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at scala.collection.TraversableLike$$anonfun$collect$1.apply(TraversableLike.scala:278)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.collect(TraversableLike.scala:278)
    at scala.collection.AbstractTraversable.collect(Traversable.scala:105)
    at org.apache.spark.rdd.ZippedPartitionsBaseRDD.tryPrepareParents(ZippedPartitionsRDD.scala:82)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:97)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:249)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
    at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:208)
Since this seems to originate from Tungsten, my next step was to disable Tungsten for the job with:
conf = pyspark.SparkConf()
conf.set("spark.sql.tungsten.enabled", "false")
sc = getOrCreateSparkContext(conf)
This resulted in the job completing successfully, though a bit more slowly than it would have with Tungsten enabled. SPARK-10474, which was fixed in Spark 1.5.1, seems relevant here, and tuning spark.shuffle.memoryFraction
and spark.shuffle.safetyFraction
might help.
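A sketch of what that tuning might look like is below; the values are illustrative only and were not verified for this job (both settings apply to the legacy memory manager used by Spark 1.5):

# Sketch of the shuffle-memory tuning mentioned above; values are illustrative.
conf = pyspark.SparkConf()
conf.set("spark.shuffle.memoryFraction", "0.4")  # default is 0.2
conf.set("spark.shuffle.safetyFraction", "0.9")  # internal setting, default is 0.8
sc = getOrCreateSparkContext(conf)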
Interestingly, the same job, when implemented in Scala, runs fine with Tungsten enabled. One reason for this could be the extra memory overhead arising from passing data between the JVM and the Python process. Following is the Scala version:
package com.automattic.network

import org.apache.spark
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext
import org.graphframes._

object Main extends App {
  def graph_coalesce(g: GraphFrame, numPartitions: Int) = GraphFrame(g.vertices.coalesce(numPartitions), g.edges.coalesce(numPartitions))

  def load_graphframe(sqlContext: SQLContext, dir_name: String, numPartitions: Option[Int]) = {
    val fn_vertices = dir_name + "/vertices.parquet"
    val fn_edges = dir_name + "/edges.parquet"
    val vertices = sqlContext.read.parquet(fn_vertices)
    val edges = sqlContext.read.parquet(fn_edges)
    val ret = GraphFrame(vertices, edges)
    if (numPartitions.isDefined)
      graph_coalesce(ret, numPartitions.get)
    else ret
  }

  // Setup Spark
  val conf = new SparkConf
  conf.setAppName("anand network test")
  val sc = new SparkContext(conf)
  val sqlContext = new HiveContext(sc)

  val path_input_graph = "/user/anandnalya/network"
  var grph = load_graphframe(sqlContext, path_input_graph, Option(128))

  val vertices = grph.vertices.withColumn("cost", pow("pagerank", -1.0))
  val edges = grph.edges.withColumn("cost", pow("count", -1.0))
  grph = GraphFrame(vertices, edges)

  val path_search_query = """(u0)-[likes_post00]->(p0);
                             (a0)-[is_author0]->(p0);
                             (u1)-[likes_post10]->(p0);
                             (u1)-[likes_post11]->(p1);
                             (a1)-[is_author1]->(p1)
                          """
  val path_search_filter_statement = """u0.id IN ('1,2,3,…') AND
      is_author0.edge_type = 'IS_AUTHOR' AND
      is_author1.edge_type = 'IS_AUTHOR' AND
      likes_post00.edge_type = 'LIKES_POST' AND
      likes_post10.edge_type = 'LIKES_POST' AND
      likes_post11.edge_type = 'LIKES_POST' AND
      a0.node_type = 'USER' AND
      a1.node_type = 'USER' AND
      u0.node_type = 'USER' AND
      u1.node_type = 'USER' AND
      p0.node_type = 'POST' AND
      p1.node_type = 'POST' AND
      a0.id != u0.id AND
      a0.id != u1.id AND
      a1.id != u0.id AND
      a1.id != u1.id AND
      a0.id != a1.id AND
      p0.id != p1.id AND
      a0.id != 'USER__' AND
      a1.id != 'USER__'"""

  val path_search = grph.find(
    path_search_query
  ).filter(
    path_search_filter_statement
  )

  def _path_cost_user(likes_post_00: Double, p0: Double, likes_post_10: Double, u1: Double, likes_post11: Double, p1: Double) =
    likes_post_00 + p0 + likes_post_10 + u1 + likes_post11 + p1

  val path_cost_function = spark.sql.functions.udf(_path_cost_user(_: Double, _: Double, _: Double, _: Double, _: Double, _: Double))

  val rec_paths = path_search.select(
    path_search("u0.id").alias("user_id"),
    path_search("p1.id").alias("post_recommended"),
    split(path_search("p1.id"), "[_-]")(1).alias("blog_recommended"),
    path_search("a1.id").alias("author_recommended"),
    path_cost_function(
      path_search("likes_post00.cost"),
      path_search("p0.cost"),
      path_search("likes_post10.cost"),
      path_search("u1.cost"),
      path_search("likes_post11.cost"),
      path_search("p1.cost")
    ).alias("path_cost")
  ).cache()

  println(rec_paths.count())
  rec_paths.show()
  rec_paths.write.parquet("/user/anandnalya/network_out")
}