use spark to calculate moving average for time series data

Spark Window Functions for DataFrames and SQL

from: http://xinhstechblog.blogspot.de/2016/04/spark-window-functions-for-dataframes.html

Introduced in Spark 1.4, Spark window functions improved the expressiveness of Spark DataFrames and Spark SQL. With window functions, you can easily calculate a moving average or cumulative sum, or reference a value in a previous row of a table. Window functions allow you to do many common calculations with DataFrames, without having to resort to RDD manipulation.

Aggregates, UDFs vs. Window functions

Window functions are complementary to existing DataFrame operations: aggregates, such as sumand avg, and UDFs. To review, aggregates calculate one result, a sum or average, for each group of rows, whereas UDFs calculate one result for each row based on only data in that row. In contrast, window functions calculate one result for each row based on a window of rows. For example, in a moving average, you calculate for each row the average of the rows surrounding the current row; this can be done with window functions.

Moving Average Example

Let us dive right into the moving average example. In this example dataset, there are two customers who have spent different amounts of money each day.

// Building the customer DataFrame. All examples are written in Scala with Spark 1.6.1, but the same can be done in Python or SQL.

val customers = sc.parallelize(List((“Alice”, “2016-05-01”, 50.00),

                                    (“Alice”, “2016-05-03”, 45.00),

                                    (“Alice”, “2016-05-04”, 55.00),

                                    (“Bob”, “2016-05-01”, 25.00),

                                    (“Bob”, “2016-05-04”, 29.00),

                                    (“Bob”, “2016-05-06”, 27.00))).

                               toDF(“name”, “date”, “amountSpent”)

// Import the window functions.

import org.apache.spark.sql.expressions.Window

import org.apache.spark.sql.functions._

// Create a window spec.

val wSpec1 = Window.partitionBy(“name”).orderBy(“date”).rowsBetween(-1, 1)

In this window spec, the data is partitioned by customer. Each customer’s data is ordered by date. And, the window frame is defined as starting from -1 (one row before the current row) and ending at 1 (one row after the current row), for a total of 3 rows in the sliding window.

// Calculate the moving average

customers.withColumn( “movingAvg”,

                                             avg(customers(“amountSpent”)).over(wSpec1)  ).show()

This code adds a new column, “movingAvg”, by applying the avg function on the sliding window defined in the window spec:

name

date

amountSpent

movingAvg

Alice

5/1/2016

50

47.5

Alice

5/3/2016

45

50

Alice

5/4/2016

55

50

Bob

5/1/2016

25

27

Bob

5/4/2016

29

27

Bob

5/6/2016

27

28

 

Window function and Window Spec definition

As shown in the above example, there are two parts to applying a window function: (1) specifying the window function, such as avg in the example, and (2) specifying the window spec, or wSpec1 in the example. For (1), you can find a full list of the window functions here:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$
 You can use functions listed under “Aggregate Functions” and “Window Functions”.

For (2) specifying a window spec, there are three components: partition by, order by, and frame.

  1.     “Partition by” defines how the data is grouped; in the above example, it was by customer. You have to specify a reasonable grouping because all data within a group will be collected to the same machine. Ideally, the DataFrame has already been partitioned by the desired grouping.
  2.       “Order by” defines how rows are ordered within a group; in the above example, it was by date.
  3.       “Frame” defines the boundaries of the window with respect to the current row; in the above example, the window ranged between the previous row and the next row.


Cumulative Sum

Next, let us calculate the cumulative sum of the amount spent per customer.

// Window spec: the frame ranges from the beginning (Long.MinValue) to the current row (0).

val wSpec2 = Window.partitionBy(“name”).orderBy(“date”).rowsBetween(Long.MinValue, 0)

// Create a new column which calculates the sum over the defined window frame.

customers.withColumn( “cumSum”,

  sum(customers(“amountSpent”)).over(wSpec2)  ).show()

name

date

amountSpent

cumSum

Alice

5/1/2016

50

50

Alice

5/3/2016

45

95

Alice

5/4/2016

55

150

Bob

5/1/2016

25

25

Bob

5/4/2016

29

54

Bob

5/6/2016

27

81

 

Data from previous row

In the next example, we want to see the amount spent by the customer in their previous visit.

// Window spec. No need to specify a frame in this case.

val wSpec3 = Window.partitionBy(“name”).orderBy(“date”)

// Use the lag function to look backwards by one row.

customers.withColumn(“prevAmountSpent”,

 lag(customers(“amountSpent”), 1).over(wSpec3) ).show()

name

date

amountSpent

prevAmountSpent

Alice

5/1/2016

50

null

Alice

5/3/2016

45

50

Alice

5/4/2016

55

45

Bob

5/1/2016

25

null

Bob

5/4/2016

29

25

Bob

5/6/2016

27

29

 

Rank

In this example, we want to know the order of a customer’s visit (whether this is their first, second, or third visit).

// The rank function returns what we want.

customers.withColumn( “rank”, rank().over(wSpec3) ).show()

name

date

amountSpent

rank

Alice

5/1/2016

50

1

Alice

5/3/2016

45

2

Alice

5/4/2016

55

3

Bob

5/1/2016

25

1

Bob

5/4/2016

29

2

Bob

5/6/2016

27

3

 

Conclusion

I hope these examples have helped you understand Spark’s window functions. There is more functionality that was not covered here. To learn more, please see the Databricks article on this topic: https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html