Dong Guo's Blog

Spark使用经验分享

| Comments

Spark是一个基于内存的分布式计算engine,最近1-2年在开源社区(github)和工业界非常火,国内的一些公司也搭建自己的spark集群。典型的应用场景是大数据上的机器学习模型的训练以及各种数据分析。下面是我理解的spark的优势:

  1. Spark使得分布式编程更简单

    Spark将实际分布在众多Nodes上的数据抽象成RDD(resilient distributed dataset),使得我们可以像本地数据一样进行处理。同时,Spark提供了相比MapReduce更丰富的API,相比MapReduce编程更加简单。

  2. Spark通过充分利用内存提高计算效率

    随着数据量越来越大,内存越来越便宜,使用较多的内存让(某些类型的)计算效率提升10至100倍,对很多公司来说是比较划算的。Spark和Facebook的Presto都基于这样的思想。在Spark中,你可以指定将那些在后续需要被多次使用的RDD缓存在内存中,减少了IO的开销,可以显著提高如机器学习模型训练这种需要迭代计算的应用的效率。

  3. Spark提供了一整套的数据分析和计算解决方案,降低了学习和维护成本

    • Spark本身支持做batch的计算,比如每天机器学习模型的训练,各种数据的处;
    • Spark Streaming可以用来做realtime计算和数据处理,Spark Streaming的API和Spark的比较类似,其实背后的实现也是把一段段的realtime数据用batch的方式去处理;
    • MLlib实现了常用的机器学习和推荐算法,可以直接用或者作为baseline;
    • Spark SQL使得可以通过SQL来对Hive表,Json文件等数据源进行查询,查询会被转变为一个Spark job;
    • 还有GraphX, 我没有用过,其用于一些图相关的计算;

  4. Spark可以和MapReduce通过YARN共享机器资源

    所有的存储(HDFS),计算,内存资源都可以共享

个人使用Spark的一些经验总结

  1. 理解spark application的运行原理, 可以避免犯很多错误 Driver中涉及到RDD操作的代码(比如RDD.map{}中的代码)需要Serialize后由Driver所在的Node传输给Executors所在的Nodes,并做Deserialize后在executors上执行,RDD操作中涉及到的数据结构,比如map中用到了一个user_id –> user_profile的hashtable,也需要由Driver所在的Node传输给Executors所在的Nodes。理解了这点就可以更好理解下面2点分享

  2. 保证Rdd操作中的代码都是可序列化的,否则会有NonSerializableException

    一种常见的错误是,在rdd1.map{objectOfClassA.fun}中,对象objectOfClassA所属的类ClassA需要是可序列化的,这也以为ClassA中用到的所有成员属性都是可序列化的。如果classA使用的某个成员属性无法序列化(或者标识为Serializable),scala中可以通过@transient关键字标明序列化ClassA时不序列化该成员变量。推荐stakoverflow的2个讨论:link1 link2

  3. 正确地使用广播变量(broadcast variables)

    如果我们有一份const数据,需要在executors上用到,一个典型的例子是Driver从数据库中load了一份数据dbData,在很多RDD操作中都引用了dbData,这样的话,每次RDD操作,driver node都需要将dbData分发到各个executors node一遍(分享1中已经介绍了背景),这非常的低效,特别是dbData比较大且RDD操作次数较多时。Spark的广播变量使得Driver可以提前只给各个executors node传一遍(spark内部具体的实现可能是driver传给某几个executors,这几个executors再传给其余executors)。使用广播变量有一个我犯过的错误如下:

     val brDbData = sparkContext.broadcast(dbData) //broadcast dbDataA, and name it as brDbData
     val dbDataB = brDbData.value //no longer broadcast variable
     oneRDD.map(x=>{dbDataB.getOrElse(key, -1); …})
    第一行将dbData已经广播出去且命名为brDbData,一定要在RDD操作中直接使用该广播变量,如果提前提取出值,第三行的RDD操作还需要将dbData传送一遍。正确的代码如下
     val brDbData = sparkContext.broadcast(dbData) //broadcast dbDataA, and name it as brDbData
     oneRDD.map(x=>{brDbData.value.getOrElse(key, -1); …})

  4. 使用yarn-client或者yarn-cluster模式运行spark应用之前,在IDE中配置spark local模式调试以及测试好代码

    spark的yanr-client或者yarn-cluster模式做一次测试比较耗时,因为涉及到代码打包以及上传。在IDE(推荐IntelliJ)中配置local模型用于debug和测试,将显著提升开发和测试效率;

    在VM option中配置:”-Dspark.master=local -Dspark.app.name=Test -Xmx2G” (also increase maximal memory for Heap)

  5. 充分利用spark的并行性

    理想的情况是整个代码的逻辑是对一个或几个RDD做处理,这时候spark的并行性往往是充分利用的。有时候代码逻辑会更复杂,比如你需要统计一年中每一天的一些数值,由于代码逻辑比较复杂,一种简单的“偷懒”方式是用一个for循环,在for循环内部做RDD的操作,这种情况是要努力避免的,务必思考将不同date的统计并行化。我写过的两个应用中都遇到了这种情况:优化之后速度提升非常明显。

  6. 使用cache()操作

    cache RDD需要考虑自己有多少内存,对于后续不需要多次使用的RDD不要cache,如果内存有限却又指定要cache,大量的时间将被花在memory和disk的in-out上

  7. 为spark-submit选择合适的参数

    spark-submit用于提交spark job,其可配置为job申请多少资源,包括Driver的内存和cpu,executor的个数,每个executor的内存,cpu和线程数。如果使用yarn做资源管理,只有内存是硬性占有的,一个job过多地申请内存,将会有资源浪费,可能会使别的job因为申请不到足够的内存无法跑。可以用JMX(Java Management Extension)来监控你的spark job到底消耗多少内存,可以指导你申请合适的内存大小。

  8. Spark可以访问众多的数据源:比如HDFS, HBase, Cassandra或者Hive表(Hive on Spark), 直接得到一个RDD用作后续处理

Comments