Data transformation in Apache Spark

In the early days we used RDD methods such as map and reduce to format and transform data in Apache Spark, as in the following statements.

 scala> rdd.collect
res39: Array[String] = Array(apple, orange, apple, cherry, orange, apple)

scala> rdd.map((_,1)).reduceByKey(_+_).collect.sortBy(-_._2)
res40: Array[(String, Int)] = Array((apple,3), (orange,2), (cherry,1))

But in practice today we mostly use DataFrame methods in Apache Spark. DataFrame methods are flexible enough to transform data as well.
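
For comparison, the same word count can be expressed with the DataFrame API. This is a minimal sketch, assuming the RDD shown above is still in scope; the name "wordsDF" is just illustrative.

 scala> val wordsDF = rdd.toDF("word")
scala> wordsDF.groupBy("word").count().orderBy(desc("count")).show()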

Consider the following dataset, which contains voting statistics for urban words.

Urban words dataset

After unzipping it, we can load it directly into Spark as a DataFrame.

 scala> val df = spark.read.option("inferSchema",true).option("header",true).csv("urbanwords.csv")
df: org.apache.spark.sql.DataFrame = [word_id: int, word: string ... 4 more fields]
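
If we preferred not to rely on schema inference, we could also supply the schema explicitly. This is only a sketch; it keeps "up_votes" and "down_votes" as strings so the cleaning steps below still apply, and the variable name "dfTyped" is just illustrative.

 scala> import org.apache.spark.sql.types._
scala> val schema = StructType(Seq(
     |   StructField("word_id", IntegerType),
     |   StructField("word", StringType),
     |   StructField("up_votes", StringType),
     |   StructField("down_votes", StringType),
     |   StructField("author", StringType),
     |   StructField("definition", StringType)))
scala> val dfTyped = spark.read.option("header", true).schema(schema).csv("urbanwords.csv")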

Check the schema and count.

 scala> df.printSchema
root
|-- word_id: integer (nullable = true)
|-- word: string (nullable = true)
|-- up_votes: string (nullable = true)
|-- down_votes: string (nullable = true)
|-- author: string (nullable = true)
|-- definition: string (nullable = true)

scala> df.count
res41: Long = 2580925

As you can see, the columns "up_votes" and "down_votes" were inferred as strings even though we expect them to be numeric. This is because the dataset contains dirty rows, which we should filter out.
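
Before filtering, we could take a quick look at the offending values, for example the entries in "up_votes" that are not purely numeric. A sketch using the same pattern as the filter below:

 scala> df.filter(!$"up_votes".rlike("^[0-9]+$")).select("up_votes").show(5, false)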

Let's write a statement to filter out the dirty data, so that those two columns contain only numeric values.

 scala> val df2 = df.filter($"up_votes".rlike("^[0-9]+$")).filter($"down_votes".rlike("^[0-9]+$"))

Next, we cast those two columns to a numeric type.

 scala> val df3 = df2.withColumn("up_votes",$"up_votes".cast("int")).withColumn("down_votes",$"down_votes".cast("int"))
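
The filter and the casts could also be chained into a single statement. This is just an equivalent sketch of the two steps above; the name "dfClean" is illustrative.

 scala> val dfClean = df.filter($"up_votes".rlike("^[0-9]+$") && $"down_votes".rlike("^[0-9]+$")).withColumn("up_votes",$"up_votes".cast("int")).withColumn("down_votes",$"down_votes".cast("int"))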

Now the DataFrame "df3" holds clean data with the correct types. Let's check it.

 scala> df3.printSchema
root
|-- word_id: integer (nullable = true)
|-- word: string (nullable = true)
|-- up_votes: integer (nullable = true)
|-- down_votes: integer (nullable = true)
|-- author: string (nullable = true)
|-- definition: string (nullable = true)

You can see the types of "up_votes" and "down_votes" are now "integer", so we can aggregate them in Spark.

 scala> df3.groupBy("word").agg(max("up_votes").alias("max_up"),max("down_votes").alias("max_down") ).orderBy(desc("max_up")).show()
+---------+------+--------+
| word|max_up|max_down|
+---------+------+--------+
| sex|289002| 79134|
|cartossin|270206| 100377|
| hipster|182625| 42696|
| nigger|110245| 25614|
| slut| 94894| 19969|
| love| 94664| 23548|
| woody| 91820| 1024|
|Gate Rape| 79211| 9214|
| Pussy| 73146| 27388|
| Emo| 70992| 27893|
| cunt| 67451| 21808|
| fuck| 66250| 20306|
| Penis| 60686| 31492|
| nigga| 59844| 16572|
| SWAG| 59732| 30924|
| Blowjob| 56248| 24977|
| muslim| 54053| 21358|
| feminist| 53773| 14376|
| vagina| 53368| 10885|
| emo| 49492| 25112|
+---------+------+--------+
only showing top 20 rows

The statement above groups the dataset by "word", computes the maximum "up_votes" and "down_votes" for each word, and outputs the result ordered by "max_up" in descending order.
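
The same aggregation could also be expressed in Spark SQL after registering the DataFrame as a temporary view. A sketch, assuming the view name "words" is not already taken:

 scala> df3.createOrReplaceTempView("words")
scala> spark.sql("SELECT word, max(up_votes) AS max_up, max(down_votes) AS max_down FROM words GROUP BY word ORDER BY max_up DESC").show()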

We have now cleaned and transformed the data and aggregated it to produce the statistics report in Apache Spark.
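
If we wanted to keep the report, we could also write the aggregated result back to disk. A sketch, where "word_report" is just a placeholder output path:

 scala> val report = df3.groupBy("word").agg(max("up_votes").alias("max_up"),max("down_votes").alias("max_down")).orderBy(desc("max_up"))
scala> report.write.mode("overwrite").parquet("word_report")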