Example: case class, registerTempTable() & SQL Query

An example that retrieves only the Math marks using a DataFrame, a case class, registerTempTable(), and a SQL query.
sparkexamples > c20 > a20 > a20
cat c15_plain_text.input
English|70
Math|80
Math|75
Science|80
[raj@Rajkumars-MacBook-Pro ~/gitws/sparkexamples/src/main/scala/c20_SparkSQL/a20_case_class]$spark-shell --master local[*] 
2016-05-29 14:17:43.005 java[35420:7318795] Unable to load realm info from SCDynamicStore
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.
scala> // Need 'case class' to register our data as Table

scala> case class Subject(name:String, mark:String)
defined class Subject

scala> val input = sc.textFile("file:///Users/raj/gitws/sparkexamples/src/main/scala/c20_SparkSQL/a20_case_class/c15_plain_text.input")
input: org.apache.spark.rdd.RDD[String] = file:///Users/raj/gitws/sparkexamples/src/main/scala/c20_SparkSQL/a20_case_class/c15_plain_text.input MapPartitionsRDD[63] at textFile at <console>:27

scala> input.take(10)
res52: Array[String] = Array(English|70, Math|80, Math|75, Science|80)

scala> val splittedRdd = input.map(_.split("\\|"))
splittedRdd: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[64] at map at <console>:29

scala> splittedRdd.take(10)
res53: Array[Array[String]] = Array(Array(English, 70), Array(Math, 80), Array(Math, 75), Array(Science, 80))

scala> val subjectRdd = splittedRdd.map(x => Subject(x(0), x(1)))
subjectRdd: org.apache.spark.rdd.RDD[Subject] = MapPartitionsRDD[65] at map at <console>:33

scala> subjectRdd.take(10)
res54: Array[Subject] = Array(Subject(English,70), Subject(Math,80), Subject(Math,75), Subject(Science,80))

scala> val subjectDf = subjectRdd.toDF
subjectDf: org.apache.spark.sql.DataFrame = [name: string, mark: string]

scala> subjectDf.take(10)
res55: Array[org.apache.spark.sql.Row] = Array([English,70], [Math,80], [Math,75], [Science,80])

scala> subjectDf.printSchema()
root
 |-- name: string (nullable = true)
 |-- mark: string (nullable = true)


scala> val selected = subjectDf.select(subjectDf("name"))
selected: org.apache.spark.sql.DataFrame = [name: string]

scala> selected.take(10)
res57: Array[org.apache.spark.sql.Row] = Array([English], [Math], [Math], [Science])

scala> subjectDf.registerTempTable("subjecttable")

scala> val mathDf = sqlContext.sql("select * from subjecttable where name='Math'")
mathDf: org.apache.spark.sql.DataFrame = [name: string, mark: string]

scala> mathDf.take(10)
res59: Array[org.apache.spark.sql.Row] = Array([Math,80], [Math,75])

No comments:

Post a Comment