这篇文章主要讲解了“Spark生产作业容错能力的负面影响有哪些”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Spark生产作业容错能力的负面影响有哪些”吧!
创新互联公司是一家专注于网站建设、做网站和绵阳主机托管的网络公司,有着丰富的建站经验和案例。
在 Spark 中数据本地性通过 TaskLocality 来表示,有如下几个级别,
PROCESS_LOCAL
NODE_LOCAL
NO_PREF
RACK_LOCAL
ANY
从上到下数据本地性依次递减。
Spark 在执行前通过数据的分区信息进行计算 Task 的 Locality,Task 总是会被优先分配到它要计算的数据所在节点以尽可能地减少网络 IO。这个计算的过程通过 spark.locality.wait
默认为3s,控制这个计算的过程。
原理这里不细讲,简而言之就是重试。Spark 规定了同一个 Job 中同一个 Stage 连续失败重试的上限(spark.stage.maxConsecutiveAttempts
),默认为4,也规定了一个 Stage 中 同一个 Task 可以失败重试的次数(spark.task.maxFailures
),默认为4。当其中任何一个阈值达到上限,Spark 都会使整个 Job 失败,停止可能的“无意义”的重试。
我们首先来看一个例子,如图所示,图为 Spark Stage 页面下 Task Page 的详细视图。
第一列表示该 Task 进行了4次重试,所以这个 Task 对应的 Job 也因此失败了。
第三列表示该 Task 的数据本地性,都是 NODE_LOCAL 级别,对于一个从HDFS读取数据的任务,显然获得了最优的数据本地性
第四列表示的是 Executor ID,我们可以看到我们任务的重试被分配到ID 为5和6两个 Executor 上
第五列表示我们运行这些重试的 Task 所在的 Executor 所在的物理机地址,我们可以看到他们都被调度到了同一个
最后列表示每次重试失败的错误栈
结合硬件层面的排查,发现是 NodeManager 物理节点上挂在的 /mnt/dfs/4,出现硬件故障导致盘只读,ShuffleMapTask 在即将完成时,将index文件和data文件commit时,获取index的临时文件时候发生FileNotFoundException
。
java.io.FileNotFoundException: /mnt/dfs/4/yarn/local/usercache/da_haitao/appcache/application_1568691584183_1953115/blockmgr-1b6553f2-a564-4b31-a4a6-031f21c9c30f/0a/shuffle_96_2685_0.index.82594412-1f46-465e-a067-2c5e386a978e (No such file or directory) at java.io.FileOutputStream.open0(Native Method) at java.io.FileOutputStream.open(FileOutputStream.java:270) at java.io.FileOutputStream.(FileOutputStream.java:213) at java.io.FileOutputStream. (FileOutputStream.java:162) at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:144) at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:245) at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:190) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.scheduler.Task.run(Task.scala:109)
3.2 问题二:为什么该 Task 的4次重试都在同一个物理节点?
这是由于 Driver 在调度该 Task 的时候进行了数据本地性的运算,而且在
spark.locality.wait
默认为3s的时间约束内成功获得了NODE_LOCAL级别的数据本地性,故而都调度到了同一个
NodeManger
物理节点。
1. /mnt/dfs/4/yarn/local/2. usercache/da_haitao/appcache/application_1568691584183_1953115/ blockmgr-1b6553f2-a564-4b31-a4a6-031f21c9c30f/3. 0a/4. shuffle_96_2685_0.index5. .82594412-1f46-465e-a067-2c5e386a978e
spark.diskStore.subDirectories
默认为64控制.data
结尾,另一个就是这个与之对应的 .index
文件。96是 ShuffleID 表标识是哪个Shuffle 过程,2685是 MapID 对应的是 一个RDD 所以有分区中其中一个的顺序号, 而0是一个固定值,原本表示是ReduceID,Spark Sort Based Shuffle 的实现不需要依赖这个值,所以被固定为了0。通过Shuffle ID和 MapId,Shufle Write 阶段就可以生成类似shuffle_96_2685_0.index这样的文件,而Shuffle Read 阶段也可以通过两个ID 定位到这个文件。scala> math.abs("shuffle_96_2685_0.index".hashCode) % 12res0: Int = 6
scala> def randomizeInPlace[T](arr: Array[Int], rand: java.util.Random = new java.util.Random): Array[Int] = {
| for (i <- (arr.length - 1) to 1 by -1) {
| val j = rand.nextInt(i + 1)
| val tmp = arr(j)
| arr(j) = arr(i)
| arr(i) = tmp
| }
| arr
| }
randomizeInPlace: [T](arr: Array[Int], rand: java.util.Random)Array[Int]
scala> randomizeInPlace(res11)
res23: Array[Int] = Array(3, 2, 4, 1)
scala> randomizeInPlace(res11)
res24: Array[Int] = Array(2, 3, 4, 1)
scala> randomizeInPlace(res11)
res25: Array[Int] = Array(2, 1, 3, 4)
scala> randomizeInPlace(res11)
res26: Array[Int] = Array(4, 2, 1, 3)
scala> randomizeInPlace(res11)
res27: Array[Int] = Array(2, 3, 4, 1)
感谢各位的阅读,以上就是“Spark生产作业容错能力的负面影响有哪些”的内容了,经过本文的学习后,相信大家对Spark生产作业容错能力的负面影响有哪些这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!