Thursday, November 20, 2014

Spark SQL: automatic schema from csv using Header

package SQL

import org.apache.spark.SparkContext
import org.apache.spark.sql._
/**
 * Created by devan on 21/11/14.
 * mail msdevanms@gmail.com
 */
object SparSQLCSV {
  def main(args: Array[String]) {

    val sc = new SparkContext("local[*]","home")
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val people = sc.textFile("/home/devan/Documents/dataset/peoplesTest.csv")
    val delimiter = ","
    val schemaString = "a,b".split(delimiter)//csv header
    //Automated Schema creation
    val schema =   StructType(schemaString.map(fieldName => StructField(fieldName, StringType, true)))
    val peopleLines = people.flatMap(x=> x.split("\n"))
    val rowRDD = peopleLines.map(p=>{
      Row.fromSeq(p.split(delimiter))
    })
    val peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema)
    peopleSchemaRDD.registerTempTable("people")
    sqlContext.sql("SELECT b FROM people").foreach(println)

  }
}


No comments:

Post a Comment