Spark集群使用

什么是Spark?Spark和Hadoop区别?

Spark一般指Apache Spark是专为大规模数据处理而设计的快速通用内存并行计算框架。
Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。
根据Hadoop MapReduce的工程流程,可以分析出Hadoop MapReduce的一些缺点。
  1. Hadoop MapReduce的表达能力有限
    1. 所有计算都需要转化成Map和Reduce两个操作,不能适用于所有场景,对于复杂的数据处理过程难以描述。
  1. 磁盘I/O开销大
    1. Hadoop MapReduce要求每个步骤间的数据序列化到磁盘,所有I/O成本很高,导致交互分析和迭代算法开销很大,而几乎所有的最优化和机器学习都是迭代的。所以,Hadoop MapReduce不适合于交互分析和机器学习。
  1. 计算 延迟高
    1. 如果想要完成比较复杂的工作,就必须将一系列的MapReduce作业串联起来然后顺序执行这些作业。每一个作业都是高时延的,而且只有在前一个作业完成之后下一个作业才能开始起动。因此,Hadoop MapReduce不能胜任比较复杂的、多阶段的计算服务。
Spark是借鉴了Hadoop MapReduce技术发展而来的,继承了其分布式并行计算的优点并改进了MapReduce明显的缺陷。
Spark使用Scala语言进行实现,它是一种面向对象的函数式编程语言,能够像操作本地集合对象一样操作分布式数据集。它具有运行速度快、易用性好、通用性强和随处运行等特点,具体优势如下。
  1. Spark提供了内存计算,把中间结果放到内存中,带来了更高的迭代运算效率。通过支持有向无环图(DAG)的分布式并行计算的编程框架,Spark减少了迭代过程中数据需要谢如磁盘的要求,提高了处理效率。
  1. Spark为我们提供了一个全面、统一的框架,用于管理各种有着不同性质(文本数据、图表数据等)的数据集和数据源(批量数据或实时的数据流)的大小数据处理的需求。
    1. Spark使用函数式编程范式扩展了MapReduce模型以支持更多的计算类型,可以涵盖广泛的工作流,这些工作流之前被实现为Hadoop之上的特殊系统。
      Spark使用内存缓存来提升性能,因此进行交互式分析也足够快速,缓存同时提升了迭代算法的性能,这使得Spark非常适合数据理论任务,特别是机器学习。
  1. Spark比Hadoop更加通用。Hadoop只提供了Map和Reduce两种处理操作,而Spark提供的数据操作类型更加丰富,从而可以支持更多类型的应用。
    1. Spark的计算模式也属于MapReduce类型,但提供的操作不仅包括Map和Reduce,还提供了包括Map、Filter、FlatMap、Sample、GroupByKey、ReduceByKey、Union、Join、Cogroup、MapValues、Sort、PartionBy等多种转换操作,以及Count、Collect、Reduce、Lookup、Save等行为操作。
  1. Spark基于DAG的任务调度执行机制比Hadoop MapReduce的迭代执行机制更优越。
    1. Spark 各个处理结点之间的通信模型不再像Hadoop一样只有shuffle一种模式,程序开发者可以使用DAG开发复杂的多步数据管道,控制中间结果的存储、分区等。
      图 1 - Hadoop与Spark执行流程对比
      图 1 - Hadoop与Spark执行流程对比
      从中可以看出,Hadoop不适合做迭代计算,因为每次迭代都需要从磁盘中读入数据,向磁盘写入中间结果,而且每个任务都需要从磁盘中读入数据,处理的结果也要写入磁盘,磁盘I/O开销很大。而Spark将数据载入内存后,后面的迭代都可以直接使用内存中的中间结果做计算,从而避免从磁盘中频繁的读取数据。
      对于多维度随机查询也是一样。在对HDFS同一批数据做成百上千维度查询的时候,Hadoop每做一个独立的查询,都要从磁盘中读取这个数据,而Spark只需要从磁盘读取一次后,就可以针对保留在内存中的中间结果进行反复查询。
      Spark在2014年打破了Hadoop保持的基础排序(SortBenchmark)记录,使用206个结点在23分钟完成了100TB数据的排序,而Hadoop则是使用了2000个结点在72分钟才完成相同的数据的排序。也就是说,Spark只使用了百分之十的计算资源,就获得了Hadoop 3 倍的速度。
      尽管与Hadoop相比,Spark有较大的优势,但是并不能取代Hadoop。
      因为Spark是基于内存的进行数据处理的 ,所有不适合于数据量特别大、对实时性要求不高的场合。另外,Hadoop可以使用廉价的通用服务器来搭建集群,而Spark对硬件要求比较高,特别是对内存和CPU有更高的要求。

      Spark的适用场景

      总而言之,大数据处理场景有以下几个类型。
    2. 复杂的批量处理
      1. 偏重点是处理海量数据的能力,对处理速度可忍受,通常的时间可以能是在数十分钟到数小时。
    3. 基于历史数据的交互式查询
      1. 通常的时间在数十秒到数十分钟之间
    4. 基于实时数据流的数据处理
      1. 通常在数百毫秒到数秒之间。
      目前对以上三种场景需求的都有比较成熟的处理框架。
      • 用Hadoop的MapReduce技术来进行批量海量数据处理。
      • 用Impala进行交互式查询。
      • 用Storm分布式处理框架处理实时流式数据。
      以上三者都是比较独立的,所以维护成本比较高,而Spark能够一站式满足以上需求。
      通过以上分析,可以总结Spark适应的场景有以下几种。
    5. Spark是基于内存的迭代计算框架,适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多,所需读取的数据量越大,受益越大;数据量小但是计算密集度较大的场合,受益就相对较小。
    6. Spark适用于数据量不是特别大,但是要求实时统计分析的场景。
    7. 由于RDD的特性,Spark不适用于那种异步细粒度更新状态的应用,例如,web服务的存储,或增量的web爬虫和索引,也就是不适合增量修改的应用模型。
    8. SPark RDD是什么?

      Spark的核心是建立在统一的抽象弹性分布式数据集(Resilient Distribution Datasets,RDD)之上的,这使得Spark的各个组件可以无缝的进行集成,能够在同一个应用程序中完成大数据处理。

      RDD的基本概念

      RDD是Spark提供的最重要的抽象概念,它是一种有容错机制的特殊数据集合,可以分布在集群的结点上,以函数式操作集合的方式进行各种并行操作。
      通俗点说,可以将RDD理解为一个分布式对象集合,本质上是一个只读的分区记录集合。每个RDD可以分成多个分区,每个分区就是一个数据集片段。一个RDD的不同分区可以保存到集群中的不同结点上,从而可以在集群中的不同结点上进行并行计算。
      下图展示了RDD的分区及分区与工作结点(worker Node)的分布关系
      notion image
      RDD具有容错机制,并且只能读取不能修改,可以执行确定的转换操作创建新的RDD。具体来讲,RDD具有一下几个属性:
      • 只读: 不能修改,只能通过转换操作生成新的RDD。
      • 分布式: 可以分布在多台机器上进行并行处理。
      • 弹性: 计算过程中内存不够时它会和磁盘进行数据交换。
      • 基于内存: 可以全部或部分缓存在内存中,在多次计算间重用。
      RDD实质上是一种更为通用的迭代并行计算框架,用户可以显示控制计算的中间结果,然后将其自由运用于之后的计算。
      在大数据实际应用开发中存在许多迭代算法,如机器学习、图算法等,和交互式数据挖掘工具。这些应用场景的共同之处是在不同计算阶段之间会重用中间结果,即一个阶段的输出结果会作用下一个阶段的输入。
      RDD正是为了满足这种需求而设计的。虽然MapReduce具有自动容错、负载平衡和可扩展性的优点,但是其最大的缺点是采用非循环式的数据流模型,使得在迭代计算时要进行大量的磁盘I/O 和数据序列化的开销。

      RDD的基本操作

      RDD的操作分为转化(transformation)操作和行动(Action)操作。转化操作就是从一个RDD产生一个新的RDD,而行动操作就是进行实际的计算。
      RDD的操作是惰性的,当RDD执行转化操作的时候,实际计算并没有被执行,只有当RDD执行行动操作时才会触发计算任务提交,从而执行相应的计算操作。
    9. 构建操作
      1. Spark里的计算都是通过操作RDD来完成的,学习RDD的第一个问题就是如何构建RDD,构建RDD的方式从数据来源角度分为以下两类。
        • 从内存里直接读取数据
        • 从文件系统里读取数据,文件系统的种类很多,常见的就是HDFS及本地文件系统。
        第一类方式是从内存里构造RDD,需要使用makeRDD的方法,代码如下所示。
        val rdd01 = sc.makeRDD(List(1,2,3,4,5,6))
        这个语句创建了一个由“1,2,3,4,5,6”元素组成的RDD。
        第二类方式就是通过文件系统构造RDD,代码如下:
        val rdd:RDD[String] == sc.textFile("file://D:/sparkdata.txt",1)
        这里的例子使用的是本地文件系统,所以文件路径协议前缀是 file://
    10. 转换操作
      1. RDD的转换操作是返回新的RDD的操作。转换出来的RDD是惰性求值的,只有在行动操作中用到
        许多转换操作都是只针对各个元素的,也就是说,这些转换操作每次只会操作RDD中的一个元素,不过并不是所有的转换操作都是这样的。表1 描述了常用的RDD转换操作。
        表 1 RDD转换操作(rdd1={1, 2, 3, 3},rdd2={3,4,5})
        函数名
        作用
        示例
        结果
        map()
        将函数应用于 RDD 的每个元素,返回值是新的 RDD
        rdd1.map(x=>x+l)
        {2,3,4,4}
        flatMap()
        将函数应用于 RDD 的每个元素,将元素数据进行拆分,变成迭代器,返回值是新的 RDD
        rdd1.flatMap(x=>x.to(3))
        {1,2,3,2,3,3,3}
        filter()
        函数会过滤掉不符合条件的元素,返回值是新的 RDD
        rdd1.filter(x=>x!=1)
        {2,3,3}
        distinct()
        将 RDD 里的元素进行去重操作
        rdd1.distinct()
        (1,2,3)
        union()
        生成包含两个 RDD 所有元素的新的 RDD
        rdd1.union(rdd2)
        {1,2,3,3,3,4,5}
        intersection()
        求出两个 RDD 的共同元素
        rdd1.intersection(rdd2)
        {3}
        subtract()
        将原 RDD 里和参数 RDD 里相同的元素去掉
        rdd1.subtract(rdd2)
        {1,2}
        cartesian()
        求两个 RDD 的笛卡儿积
        rdd1.cartesian(rdd2)
        {(1,3),(1,4)......(3,5)}
    11. 行动操作
      1. 行动操作用于执行计算并按指定的方式输出结果。行动操作接受RDD, 但是返回非RDD,即输出一个值或结果。在RDD执行过程中,真正的计算发生在行动操作。下表描述了常用的RDD行动操作。
        函数名
        作用
        示例
        结果
        collect()
        返回 RDD 的所有元素
        rdd.collect()
        {1,2,3,3}
        count()
        RDD 里元素的个数
        rdd.count()
        4
        countByValue()
        各元素在 RDD 中的出现次数
        rdd.countByValue()
        {(1,1),(2,1),(3,2})}
        take(num)
        从 RDD 中返回 num 个元素
        rdd.take(2)
        {1,2}
        top(num)
        从 RDD 中,按照默认(降序)或者指定的排序返回最前面的 num 个元素
        rdd.top(2)
        {3,3}
        reduce()
        并行整合所有 RDD 数据,如求和操作
        rdd.reduce((x,y)=>x+y)
        9
        fold(zero)(func)
        和 reduce() 功能一样,但需要提供初始值
        rdd.fold(0)((x,y)=>x+y)
        9
        foreach(func)
        对 RDD 的每个元素都使用特定函数
        rdd1.foreach(x=>printIn(x))
        打印每一个元素
        saveAsTextFile(path)
        将数据集的元素,以文本的形式保存到文件系统中
        rdd1.saveAsTextFile(file://home/test)
        saveAsSequenceFile(path)
        将数据集的元素,以顺序文件格式保存到指 定的目录下
        saveAsSequenceFile(hdfs://home/test)
        aggregate() 函数的返回类型不需要和 RDD 中的元素类型一致,所以在使用时,需要提供所期
         
       

一些个人在Spark Scala使用上的问题

变量广播:在老旧的spark scala版本上,对于字典变量,如果使用toSet转为字典dict之后,需要对变量进行重新广播,使用广播之后的dict_broadcasst.value 作为新的字典变量进行遍历或查找。
需要广播的字典长度不要太大,否则对于特定版本的spark在added broadcast阶段将耗费大量时间。
如需要使用模糊查找,可以在自定义函数中使用字典contains方法进行定义返回。如:
def search_func(input_value: String):String = { for((k, v) <- dict_broadcasst.value){ if(input_value.contains(k){ return k + "\t" + v; } } return "NANA"; } // 上述dict_broadcasst.value也可以直接用字典dict。

Scala常用语句

读取文本:sc.textFile
条件过滤:filter(ture)
map操作:map
转为RDD 数组:Array
转为RDD 字符串:mkString
保存文本:saveAsTextfile
val txt_value = sc.textFile("hdfs://ip/dir/*"). //从hdfs路径下读入文件 map(_.split("\t")). //使用"\t" (tab)分隔符对文本进行分割(等同于map(x => x.split("\t"))) filter(_.length >= 17). //filter针对Boolean值为ture的条件进行过滤,并不对数据进行修改,map会修改 map(x => Array(x(6), x(14).toLowerCase, search_func(x(14).toLowerCase))). //将RDD分割之后的指定字段进行处理或查找之后转为新的Array类型的RDD filter(x => x(2) != "NANA"). map(x => x.mkString("\t"). //使用"\t" 对上述处理的Array进行分隔,并转为String类型的RDD。 saveAsTextFile("hdfs://ip/dir/save_dir") //对结果进行保存(注:save的结果为文件夹,从hdfs保存到本地时可以使用getmerge对结果进行合并)
 
定义时间戳转换函数:
def tranTimeToString(tm:String) :String={ val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") val tim = fm.format(new Date(tm.toLong)) tim }