.
  • Adding Multiple Columns to Spark DataFrames

    Adding Multiple Columns to Spark DataFrames

    from: https://p058.github.io/spark/2017/01/08/spark-dataframes.html

    I have been using spark’s dataframe API for quite sometime and often I would want to add many columns to a dataframe(for ex : Creating more features from existing features for a machine learning model) and find it hard to write many withColumn statements. So I monkey patched spark dataframe to make it easy to add multiple columns to spark dataframe.

    First lets create a udf_wrapper decorator to keep the code concise

    Lets create a spark dataframe with columns, user_id, app_usage (app and number of sessions of each app),

    [Read More...]
  • use spark to calculate moving average for time series data

    Spark Window Functions for DataFrames and SQL

    from: http://xinhstechblog.blogspot.de/2016/04/spark-window-functions-for-dataframes.html

    Introduced in Spark 1.4, Spark window functions improved the expressiveness of Spark DataFrames and Spark SQL. With window functions, you can easily calculate a moving average or cumulative sum, or reference a value in a previous row of a table. Window functions allow you to do many common calculations with DataFrames, without having to resort to RDD manipulation.

    Aggregates, UDFs vs. Window functions

    Window functions are complementary to existing DataFrame operations: aggregates, such as sumand avg, and UDFs. To review,

    [Read More...]
  • memoryOverhead issue in Spark

    memoryOverhead issue in Spark

    When using Spark and Hadoop for Big Data applications you may find yourself asking: How to deal with this error, that usually ends-up killing your job: Container killed by YARN for exceeding memory limits. 16.9 GB of 16 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.?

    When I was trying to extract deep-learning features from 15T images, I was facing issues with the memory limitations, which resulted in executors getting killed by YARN, and despite the fact that the job would run for a day, it would eventually fail. The dataset had 200k partitions and our cluster was of version Spark 1.6.2.

    [Read More...]
  • spark 性能优化

    spark 配置:

    配置

    大部分为Spark on YARN模式提供的配置与其它部署模式提供的配置相同。下面这些是为Spark on YARN模式提供的配置。

    Spark属性

    Property Name
    Default
    Meaning

    spark.yarn.applicationMaster.waitTries
    10

    ApplicationMaster等待Spark master的次数

    以及SparkContext初始化尝试的次数

    spark.yarn.submit.file.replication
    HDFS默认的复制次数(3)

    上传到HDFS的文件的HDFS复制水平。这些文件

    包括Spark jar、app jar以及任何分布式缓存文件/档案

    spark.yarn.preserve.staging.files
    false

    设置为true,则在作业结束时保留阶段性文件

    (Spark jar、app jar以及任何分布式缓存文件)而不是删除它们

    spark.yarn.scheduler.heartbeat.interval-ms
    5000

    Spark application master给YARN

    ResourceManager发送心跳的时间间隔(ms)

    spark.yarn.max.executor.failures
    numExecutors * 2,最小为3
    失败应用程序之前最大的执行失败数

    spark.yarn.historyServer.address
    (none)

    Spark历史服务器(如host.com:18080)的地址。

    这个地址不应该包含一个模式(http://)。

    默认情况下没有设置值,这是因为该选项是一个可选选项

    。当Spark应用程序完成从ResourceManager

    UI到Spark历史服务器UI的连接时,这个地址从YARN
    ResourceManager得到

    spark.yarn.dist.archives
    (none)
    提取逗号分隔的档案列表到每个执行器的工作目录

    spark.yarn.dist.files
    (none)
    放置逗号分隔的文件列表到每个执行器的工作目录

    spark.yarn.executor.memoryOverhead
    executorMemory * 0.07,最小384
    分配给每个执行器的堆内存大小(以MB为单位)。
    它是VM开销、interned字符串或者其它本地开销占用的内存。
    这往往随着执行器大小而增长。(典型情况下是6%-10%)

    spark.yarn.driver.memoryOverhead
    driverMemory * 0.07,最小384
    分配给每个driver的堆内存大小(以MB为单位)。
    它是VM开销、interned字符串或者其它本地开销占用的内存。
    这往往随着执行器大小而增长。(典型情况下是6%-10%)

    spark.yarn.queue
    default
    应用程序被提交到的YARN队列的名称

    spark.yarn.jar
    (none)
    Spark jar文件的位置,覆盖默认的位置。默认情况下,
    Spark on YARN将会用到本地安装的Spark jar。
    但是Spark jar也可以HDFS中的一个公共位置。
    这允许YARN缓存它到节点上,而不用在每次运行
    应用程序时都需要分配。指向HDFS中的jar包,
    可以这个参数为”hdfs:///some/path”

    [Read More...]
  • spark之路第五课——配置spark

    from: http://uohzoaix.github.io/studies/2014/09/18/sparkConfiguration/
    spark之路第五课——配置spark

    HOMEABOUTGUESTBOOKCATEGORIESTAGSLINKSSUBSCRIBE

    spark提供了三种方式进行相关的一些属性配置:

    1.Spark properties

    这种方式就是在程序中进行属性的设置,将属性传递给SparkConf类即可:

    从代码中可以看出它既支持常见属性(masterURL,appName)的设置还支持key-value形式。

    动态加载

    很多时候你不愿意以这种硬编码的方式来设置属性,你可以通过无参的SparkConf的构造方法来构造SparkConf类,在运行时使用如下方式启动:

    上述命令也会读取conf/spark-defaults.conf文件加载属性。

    spark支持的属性

    可以通过http://:4040页面上的Environment标签查看所有已经正确设置的属性。

    属性名
    默认值
    注释

    spark.app.name
    (none)
    任务名称

    spark.master
    (none)
    集群地址

    spark.executor.memory
    512m
    每个执行器进程所用内存大小

    spark.serializer
    org.apache.spark.serializer.JavaSerializer
    对象序列化所用的类,默认的JavaSerializer性能太差,推荐使用org.apache.spark.serializer.KryoSerializer,你也可以通过集成org.apache.spark.Serializer来实现自己的序列化器

    spark.kryo.registrator
    (none)
    当使用了KryoSerializer,可以设置该值为KryoRegistrator将自定义类注册到Kryo

    spark.local.dir
    /tmp
    输出文件和RDD存储的目录,可以逗号分隔指定多个目录

    spark.logConf
    false
    指定日志级别为INFO

    spark.executor.extraJavaOptions
    (none)
    JVM选项,不能以这种方式设置spark属性和使用内存大小

    spark.files.userClassPathFirst
    false
    是否使用户添加的jar包优先于spark自身的jar包

    shuffle过程使用的属性

    属性名
    默认值
    注释

    spark.shuffle.consolidateFiles
    false
    如果设置为true,那么shuffle过程产生的中间文件会被整合到一起,这会提高reduce任务的效率。当使用ext4或xfs文件系统时建议设置为true。但如果文件系统是ext3形式的,该选项会恶化机器性能,特别是CPU核数大于8时

    spark.shuffle.spill
    true
    设置为true则会限制在reduce阶段的内存使用量,超出部分会写到硬盘中,超出的阀值通过spark.shuffle.memoryFraction指定

    spark.shuffle.spill.compress
    true
    是否压缩shuffle期间溢出的数据。通过spark.io.compression.codec设置压缩方式

    spark.shuffle.memoryFraction
    0.2
    如果spark.shuffle.spill设置为true,那么shuffle期间内存使用最大为总内存*该值,超出部分会写到硬盘,如果经常会溢出,则可适当增大该值。

    spark.shuffle.compress
    true
    是否压缩输出文件

    spark.shuffle.file.buffer.kb
    32
    每次shuffle过程驻留在内存的buffer大小(单位:字节),在shuffle中间数据的产生过程中可减少硬盘的IO操作

    spark.reducer.maxMbInFlight
    48
    设置reduce任务能同时从map任务的输出文件中取多大的数据(单位:M)。在内存较少的情况下需要降低该值

    spark.shuffle.manager
    HASH
    指定如何shuffle数据,默认为HASH,从1.1后新增一种基于排序的方式(SORT),可以更有效的使用内存

    spark.shuffle.sort.bypassMergeThreshold
    200
    在基于排序的方式时,在没有map端的聚合操作或者reduce分区小于该值时应该避免合并排序后数据

    spark UI相关参数

    属性名
    默认值
    注释

    spark.ui.port
    4040
    任务控制台使用端口

    spark.ui.retainedStages
    1000
    在垃圾回收器收集之前spark UI能保留的最大stage数量

    spark.ui.killEnabled
    true
    允许通过web ui界面停止stage和jobs

    spark.eventLog.enabled
    false
    是否记录spark事件的日志

    spark.eventLog.compress
    false
    是否压缩事件产生的日志

    spark.eventLog.dir
    file://tmp/spark-events
    spark事件产生日志的目录,在这个目录里,每个任务会创建一个子目录存放各个任务的日志文件

    压缩序列化相关参数

    属性名
    默认值
    注释

    spark.broadcast.compress
    true
    是否压缩需要广播的数据

    spark.rdd.compress
    false
    RDD数据在序列化之后是否进一步进行压缩后再存储到内存或磁盘上

    spark.io.compression.codec
    snappy
    RDD数据或shuffle输出数据使用的压缩算法,有lz4,lzf和snappy三种方式

    spark.io.compression.snappy.block.size
    32768
    在snappy压缩时指定的块大小(字节),降低该值也会降低shuffle过程使用的内存

    spark.io.compression.lz4.block.size
    32768
    和上述类似,只不过只在压缩方式为lz4时有效

    spark.closure.serializer
    org.apache.spark.serializer.JavaSerializer
    序列化类

    spark.serializer.objectStreamReset
    100
    当序列化方式使用JavaSerializer时,序列化器会缓存对象以免写入冗余的数据,但这会使垃圾回收器停止对这些对象进行垃圾收集。所以当使用reset序列化器后就会使垃圾回收器重新收集那些旧对象。该值设置为-1则表示禁止周期性的reset,默认情况下每100个对象就会被reset一次序列化器

    spark.kryo.referenceTracking
    true
    当使用kryo序列化器时,是否跟踪对同一个对象的引用情况,这对对象引用有循环引用或同一对象有多个副本的情况是很有用的。否则可以设置为false以提高性能

    spark.kryo.registrationRequired
    false
    是否需要使用kryo来注册对象。当为true时,如果序列化一个未使用kryo注册的对象则会抛出异常,当为false,kryo会将未注册的类的名字一起写到序列化对象中,所以这会带来性能开支,所以在用户还没有从注册队列中删除相应的类时应该设置为true

    spark.kryoserializer.buffer.mb
    0.064
    kryo的序列化缓冲区的初始值。每个worker的每个core都会有一个缓冲区

    spark.kryoserializer.buffer.max.md
    64
    kryo序列化缓冲区允许的最大值(单位:M),这个值必须大于任何你需要序列化的对象。当遇到buffer limit exceeded异常时可以适当增大该值

    执行时相关属性

    属性名
    默认值
    注释

    spark.default.parallelism

    • local mode:本地机器的CPU数量
    • mesos file grained mode:8
    • 其他模式:所有执行器节点的cpu数量之和与2的最大值

    当没有显式设置该值表示系统使用集群中运行shuffle操作(如groupByKey,reduceByKey)的默认的任务数

    spark.broadcast.factory
    org.apache.spark.broadcast.TorrentBroadcastFactory
    广播时使用的实现类

    spark.broadcast.blockSize
    4096
    TorrentBroadcastFactory的块大小。该值过大会降低广播时的并行度(速度变慢),过小的话BlockManager的性能不能发挥到最佳

    spark.files.overwrite
    false
    通过SparkContext.addFile()添加的文件是否可以覆盖之前已经存在并且内容不匹配的文件

    spark.files.fetchTimeout
    false
    获取由driver通过SparkContext.addFile()添加的文件时是否启用通信超时

    spark.storage.memoryFraction
    0.6
    java heap用于spark内存缓存的比例,该值不应该大于jvm中老生代对象的大小。当你自己设置了老生代的大小时可以适当加大该值

    spark.storage.unrollFraction
    0.2
    spark.storage.memoryFraction中用于展开块的内存比例,当没有足够内存来展开新的块的时候会通过丢弃已经存在的旧的块来腾出空间

    spark.tachyonStore.baseDir
    System.getProperty(“java.io.tmpdir”)
    Tachyon文件系统存放RDD的目录。tachyon文件系统的URL通过spark.tachyonStore.url进行设置。可通过逗号分隔设置多个目录

    spark.storage.memoryMapThreshold
    8192
    以字节为单位的快大小,用于磁盘读取一个块大小时进行内存映射。这可以防止spark在内存映射时使用很小的块,一般情况下,对块进行内存映射的开销接近或低于操作系统的页大小

    spark.tachyonStore.url
    tachyon://localhost:19998
    tachyon文件系统的url

    spark.cleaner.ttl
    (infinite)
    spark记录任何元数据(stages生成、task生成等)的持续时间。定期清理可以确保将超期的元数据删除,这在运行长时间任务时是非常有用的,如运行7*24的spark streaming任务。RDD持久化在内存中的超期数据也会被清理

    spark.hadoop.validateOutputSpecs
    true
    当为true时,在使用saveAsHadoopFile或者其他变体时会验证数据输出的合理性(如检查输出目录是否还存在)。

    spark.executor.heartbeatInterval
    10000
    每个executor向driver发送心跳的间隔时间(毫秒)。

    网络相关属性

    属性名
    默认值
    注释

    spark.driver.host
    (本地主机名)
    driver监听的IP或主机名,用于与执行器和standalone模式的master节点进行通信

    spark.driver.port
    (随机)
    driver监听的端口号

    spark.fileserver.port
    (随机)
    driver的HTTP文件服务器监听的端口

    spark.broadcast.port
    (随机)
    driver的广播服务器监听的端口,该参数对于torrent广播模式是没有作用的

    spark.replClassServer.port
    (随机)
    driver的HTTP类服务器监听的端口,只用于spark shell

    spark.blockManager.port
    (随机)
    所有块管理者监听的端口

    spark.executor.port
    (随机)
    executor监听的端口,用于与driver进行通信

    spark.port.maxRetries
    16
    绑定到某个端口的最大重试次数

    spark.akka.frameSieze
    10
    以MB为单位的driver和executor之间通信信息的大小,该值越大,driver可以接受更大的计算结果(如在一个很大的数据集上使用collect()方法)

    spark.akka.threads
    4
    用于通信的actor线程数,当在很大的集群中driver拥有更多的CPU内核数的driver可以适当增加该属性的值

    spark.akka.timeout
    100
    spark节点之间通信的超时时间(秒)

    spark.akka.heartbeat.pauses
    600
    下面三个参数通常一起使用。如果启用错误探测器,有助于对恶意的executor的定位,而对于由于GC暂停或网络滞后引起的情况下,不需要开启错误探测器,另外错误探测器的开启会导致由于心跳信息的频繁交换引起网络泛滥。设大该值可以禁用akka内置的错误探测器,表示akka可接受的心跳停止时间(秒)

    spark.akka.failure-detector.threshold
    300.0
    设大该值可以禁用akka内置的错误探测器,对应akka的akka.remote.transport-failure-detector.threshold

    spark.akka.heartbeat.interval
    1000
    设大该值可以禁用akka内置的错误探测器,该值越大会减少网络负载,越小就会向akka的错误探测器发送信息

    调度相关属性

    属性名
    默认值
    注释

    spark.task.cpus
    1
    为每个人物分配的cpu数

    spark.task.maxFailures
    4
    每个单独任务允许的失败次数,必须设置为大于1,重试次数=该值-1

    spark.scheduler.mode
    FIFO
    提交到SparkContext的任务的调度模式。设置为FAIR表示使用公平的方式进行调度而不是以队列的方式

    spark.cores.max
    (未设置)
    当应用程序运行在standalone集群活着粗粒度共享模式mesos集群时,应用程序向集群请求的最大cpu内核总数(不是指每台机器,而是整个集群)。如果不设置,对于standalone集群将使用spark.deploy.defaultCores的值,而mesos将使用集群中可用的内核

    spark.mesos.coarse
    false
    如果设置为true,在mesos集群上就会使用粗粒度共享模式,这种模式使得spark获得一个长时间运行的mesos任务而不是一个spark任务对应一个mesos任务。这对短查询会带来更低的等待时间,但资源会在整个spark任务的执行期间内被占用

    spark.speculation
    false
    当设置为true时,将会推断任务的执行情况,当一个或多个任务在stage里执行较慢时,这些任务会被重新发布

    spark.speculation.interval
    100
    spark推断任务执行情况的间隔时间(毫秒)

    spark.speculation.quantile
    0.75
    推断启动前,stage必须要完成总task的百分比

    spark.speculation.multiplier
    1.5
    比已完成task的运行速度中位数慢多少倍才启用推断

    spark.locality.wait
    3000
    启动一个本地数据任务的等待时间,当等待时间超过该值时,就会启动下一个本地优先级别的任务。该设置同样可以应用到各优先级别的本地性之间(本地进程,本地节点,本地机架,任意节点),当然可以通过spark.locality.wait.node等参数设置不同优先级别的本地性

    spark.locality.wait.process
    spark.locality.wait
    本地进程的本地等待时间,它会影响尝试访问缓存数据的任务

    spark.locality.wait.node
    spark.locality.wait
    本地节点的本地等待时间,当设置为0,就会忽略本地节点并立即在本地机架上寻找

    spark.locality.wait.rack
    spark.locality.wait
    本地机架的本地等待时间

    spark.scheduler.revive.interval
    1000
    复活重新获取资源的任务的最长时间间隔(毫秒)

    spark.scheduler.minRegisteredResourcesRatio
    0
    在调度开始之前已注册的资源需要达到的最小比例,如果不设置该属性的话,那么调度开始之前需要等待的最大时间由spark.scheduler.maxRegisteredResourcesWaitingTime设置

    spark.scheduler.maxRegisteredResourcesWaitingTime
    30000
    调度开始之前需要等待的最大时间(毫秒)

    spark.localExecution.enabled
    false
    在spark调用first()或take()等任务时是否将任务发送给集群,当设置为true时会使这些任务执行速度加快,但是可能需要将整个分区的数据装载到driver

    安全相关属性

    属性名
    默认值
    注释

    spark.authenticate
    false
    各个连接之间是否需要验证

    spark.authenticate.secret
    None
    各组件之间进行验证的秘钥,如果集群不是在YARN并且spark.authenticate设置为true的时候需要设置该属性

    spark.core.connection.auth.wait.timeout
    30
    连接时验证的超时时间(秒)

    spark.core.connection.ack.wait.timeout
    60
    连接时应答的超时时间,是为了避免由于GC带来的长时间等待

    spark.ui.filters
    None
    以逗号分隔的过滤器的名字,这些过滤器会在web UI中使用,它们必须是继承自javax servlet Filter类。过滤器的参数应为:spark..params=’param1=value1,param2=value2’形式

    spark.acls.enable
    false
    spark acls是否应该启用,如果为true那么会检查用户是否有访问和修改job的权限

    spark.ui.view.acls
    Empty
    逗号分隔的那些有权限访问web UI的用户,默认情况下只有启动当前job的用户才有访问权限

    spark.modify.acls
    Empty
    逗号分隔的那些有权限修改job的用户,默认情况下只有启动当前job的用户才有访问权限

    spark.admin.acls
    Empty
    逗号分隔的有权限查看和修改所有job的用户

    spark streaming相关参数

    属性名
    默认值
    注释

    spark.streaming.blockInterval
    200
    spark streaming接收器将接受到的数据合并成数据块并存储在spark里的时间间隔(毫秒)

    spark.streaming.receiver.maxRate
    infinite
    每个接收器将数据放入block的最大速率(每秒),每个stream每秒最多只能消费这么多的数据。如果该值《=0那么速率将没有限制

    spark.streaming.unpersist
    true
    如果为true,那么强迫将spark streaming持久化的RDD数据从spark内存中清理,同样的,spark streaming接收的原始输入数据也会自动被清理;如果为false,则允许原始输入数据和持久化的RDD数据可被外部的streaming应用程序访问,因为这些数据不会被自动清理,但会带来更大的内存使用

    spark.executor.log.rolling.strategy
    (none)
    执行器日志的滚动策略,可以设置为基于时间(time)和基于大小(size)。当设置为time,使用spark.executor.logs.rolling.time.interval属性的值作为日志滚动的间隔,当设置为size,那么当大小(字节)达到spark.executor.logs.rolling.size.maxBytes的值是就会出现滚动

    spark.executor.logs.rolling.size.maxBytes
    (none)
    当日志大小达到该值时日志会自动进行清理

    spark.executor.logs.rolling.time.interval
    daily
    日志每天会被清理,可设置的值有:daily,hourly,minutely,或任何以秒为单位的值

    spark.executor.logs.rolling.maxRetainedFiles
    (none)
    最近需要保留的日志数量,旧的日志会被删除

    [Read More...]
  • spark submit multiple jars

    It is straight to include only one dependency jar file when submit Spark jobs. See the following example:

    How about including multiple jars? See I want to include all the jars like this: ./lib/*.jar. 

    According to spark-submit‘s –help, the –jars option expects a comma-separated list of local jars to include on the driver and executor classpaths.

    However,  ./lib/*.jar is expanding into a space-separated list of jars. 

    According to this answer on StackOverflow, we have different ways to generate a list of jars that are separated by comma. 

    [Read More...]
  • Class com.hadoop.compression.lzo.LzoCodec not found for Spark

    Here is the solution to solve the Class com.hadoop.compression.lzo.LzoCodec not found for Spark:

    Put the following command at spark-env.sh, or simply run it before submit the spark jobs. 

    export SPARK_CLASSPATH=”$(ls ${HADOOP_PREFIX}/share/hadoop/common/hadoop-gpl-compression.jar)”

    [Read More...]
  • Count word frequency

    Count word frequency is a popular task for text analysis. In this post, I describe how to count word frequency using Java HashMap, python dictionary, and Spark. 

    Use Java HashMap to Count Word frequency

    {a=5, b=2, c=6, d=3}

    Use Python Dict to count word frequency

    The output:

    {‘a’: 5, ‘c’: 6, ‘b’: 2, ‘d’: 3}

    Use Spark to count word Frequency

    The above method works well for small dataset. However, if you have a huge dataset, the hashTable based method will not work. You will need to develop a distributed program to accomplish this task.

    [Read More...]
  • Run spark on oozie with command line arguments

    We have described how to use oozie to run a pyspark program.  This post will use a simple example to show how to use oozie to run a spark program in scala. 

    You might be interested in: 1. develop a spark program using SBT.  2. Parse arguments for a spark program using Scopt.  

    Here are the key points of this post:

    1. A workable example to show how to use oozie spark action to run a spark program
    2. How to specify third party libraries in oozie
    3. How to specify command line arguments to the spark program in oozie

    The following code shows the content of the workflow.xml file,

    [Read More...]
  • A Spark program using Scopt to Parse Arguments

    To develop a Spark program, we often need to read arguments from the command line. Scopt is a popular and easy-to-use argument parser. In this post, I provide a workable example to show how to use the scopt parser to read arguments for a spark program in scala. Then I describe how to run the spark job in yarn-cluster mode.

    The main contents of this post include:

    1. Use scopt option parser to parse arguments for a scala program.
    2. Use sbt to package the scala program
    3. Run spark on yarn-cluster mode with third party libraries

    Use Scopt to parse arguments in a scala program

    In the following program,

    [Read More...]
Page 1 of 212