Tuesday, December 9, 2014

Programmatically Disable all forms of logging to the console while running a Spark sbt application



First, import the log4j Logger and Level classes.

import org.apache.log4j.Logger
import org.apache.log4j.Level


Then add these two lines to disable logging.

Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)


Try :) 

Wednesday, November 26, 2014

Spark 1.1.1 Released

Spark Release 1.1.1

Spark 1.1.1 is a maintenance release with bug fixes. This release is based on the branch-1.1 maintenance branch of Spark. We recommend all 1.1.0 users upgrade to this stable release.
To download Spark 1.1.1 visit the downloads page.

Fixes

Spark 1.1.1 contains bug fixes in several components. Some of the more important fixes are highlighted below. You can visit the Spark issue tracker for the full list of fixes.

Spark Core

  • Avoid many small spills in external data structures (SPARK-4480)
  • Memory leak in connection manager timeout thread (SPARK-4393)
  • Incorrect handling of channel read return value may lead to data truncation (SPARK-4107)
  • Stream corruption exceptions observed in sort-based shuffle (SPARK-3948)
  • Integer overflow in sort-based shuffle key comparison (SPARK-3032)
  • Lack of thread safety in Hadoop configuration usage in Spark (SPARK-2546)

SQL

  • Wrong Parquet filters are created for all inequality predicates with literals on the left hand side (SPARK-4468)
  • Support backticks in aliases (SPARK-3708 and SPARK-3834)
  • ColumnValue types do not match in Spark rows vs Hive rows (SPARK-3704)

PySpark

  • Fix sortByKey on empty RDD (SPARK-4304)
  • Avoid using the same random seed for all partitions (SPARK-4148)
  • Avoid OOMs when take() is run on empty partitions (SPARK-3211)

MLlib

  • KryoException caused by ALS.trainImplicit in PySpark (SPARK-3990)

Streaming

  • Block replication continuously fails if target is down (SPARK-3495)
  • Block replication may choose driver as target (SPARK-3496)

GraphX

  • Ensure VertexRDD.apply uses mergeFunc (SPARK-2062)

Thursday, November 20, 2014

Spark SQL: automatic schema from CSV using the header

package SQL

import org.apache.spark.SparkContext
import org.apache.spark.sql._
/**
 * Created by devan on 21/11/14.
 * mail msdevanms@gmail.com
 */
object SparSQLCSV {
  def main(args: Array[String]) {

    val sc = new SparkContext("local[*]","home")
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val people = sc.textFile("/home/devan/Documents/dataset/peoplesTest.csv")
    val delimiter = ","
    val schemaString = "a,b".split(delimiter)//csv header
    //Automated Schema creation
    val schema =   StructType(schemaString.map(fieldName => StructField(fieldName, StringType, true)))
    val peopleLines = people.flatMap(x=> x.split("\n"))
    val rowRDD = peopleLines.map(p=>{
      Row.fromSeq(p.split(delimiter))
    })
    val peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema)
    peopleSchemaRDD.registerTempTable("people")
    sqlContext.sql("SELECT b FROM people").foreach(println)

  }
}
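The field names can also be read from the file itself rather than hardcoded. A sketch of that variant under the same Spark 1.1 API, reusing people, delimiter and sqlContext from the code above (the table name people_auto is hypothetical); these lines would go inside main in place of the hardcoded schema:

// derive the field names from the first line of the CSV
val header = people.first()
val fieldsFromHeader = header.split(delimiter).map(name => StructField(name, StringType, true))
val schemaFromHeader = StructType(fieldsFromHeader)

// drop the header row, then apply the derived schema
val dataRows = people.filter(_ != header).map(line => Row.fromSeq(line.split(delimiter)))
val tableRDD = sqlContext.applySchema(dataRows, schemaFromHeader)
tableRDD.registerTempTable("people_auto")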


Tuesday, November 18, 2014

Slowly changing dimensions and ACID with HIVE(Version 0.14)


Slowly changing dimensions ( http://en.wikipedia.org/wiki/Slowly_changing_dimension ): in a typical star schema data warehouse, dimension tables change slowly over time. For example, a retailer will open new stores, which need to be added to the stores table, or an existing store may change its square footage or some other tracked characteristic. These changes lead to inserts of individual records or updates of records (depending on the strategy chosen). Starting with version 0.14, Hive is able to support this.
ACID stands for four traits of database transactions: Atomicity (an operation either succeeds completely or fails, it does not leave partial data), Consistency (once an application performs an operation the results of that operation are visible to it in every subsequent operation), Isolation (operations by one user do not cause unexpected side effects for other users), and Durability (once an operation is complete it will be preserved even in the face of machine or system failure). These traits have long been expected of database systems as part of their transaction functionality.

Hive Update

UPDATE tablename SET column = value [, column = value ...] [WHERE expression]
  1. The referenced column must be a column of the table being updated.
  2. The value assigned must be an expression that Hive supports in the select clause. Thus arithmetic operators, UDFs, casts, literals, etc. are supported. Subqueries are not supported.
  3. Only rows that match the WHERE clause will be updated.
  4. Partitioning columns cannot be updated.
  5. Bucketing columns cannot be updated.
  6. In Hive 0.14, upon successful completion of this operation the changes will be auto-committed.
Hive Delete

DELETE FROM tablename [WHERE expression]
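A minimal sketch of how this looks in practice (the table, columns and values below are hypothetical, and the cluster must also be configured with a transaction manager); ACID updates and deletes in Hive 0.14 require a bucketed table stored as ORC with transactions enabled:

-- hypothetical dimension table; Hive 0.14 ACID requires ORC storage,
-- bucketing, and transactional=true
CREATE TABLE stores (
  store_id INT,
  city STRING,
  sqft INT
)
CLUSTERED BY (store_id) INTO 4 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');

-- an existing store changed its square footage
UPDATE stores SET sqft = 52000 WHERE store_id = 42;

-- a closed store is removed from the dimension
DELETE FROM stores WHERE store_id = 7;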




Jira issue status : https://issues.apache.org/jira/browse/HIVE-5317

Tuesday, September 9, 2014

What's the difference between RDD's map, flatMap and mapPartitions methods?


map converts each element of the source RDD into a single element of the result RDD by applying a function. mapPartitions converts each partition of the source RDD into multiple elements of the result (possibly none). flatMap works on a single element (as map does) and produces multiple elements of the result (as mapPartitions does).
RDD -> map -> one element per input element (one to one)
RDD -> mapPartitions -> multiple elements (possibly none) per partition
RDD -> flatMap -> multiple elements per input element (one to many)

These two are functionally equivalent:
a.map(r => r * 2)
a.mapPartitions(iter => iter.map(r => r * 2))
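A small sketch that makes the difference visible, assuming an existing SparkContext named sc (the values are only illustrative):

val nums = sc.parallelize(Seq(1, 2, 3))

nums.map(x => x * 2).collect()                            // Array(2, 4, 6): exactly one output per input
nums.flatMap(x => Seq(x, x * 10)).collect()               // Array(1, 10, 2, 20, 3, 30): several outputs per input
nums.mapPartitions(iter => Iterator(iter.sum)).collect()  // one partial sum per partition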

Wednesday, June 18, 2014

Hive JDBC Client Sample Code

import java.sql.SQLException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.DriverManager;
 
public class HiveJdbcClient {
  private static String driverName = "org.apache.hadoop.hive.jdbc.HiveDriver";
 
  /**
   * @param args
   * @throws SQLException
   */
  public static void main(String[] args) throws SQLException {
    try {
      Class.forName(driverName);
    } catch (ClassNotFoundException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
      System.exit(1);
    }
    Connection con = DriverManager.getConnection("jdbc:hive://localhost:10000/default", "", "");
    Statement stmt = con.createStatement();
    String tableName = "testHiveDriverTable";
    stmt.executeQuery("drop table " + tableName);
    ResultSet res = stmt.executeQuery("create table " + tableName + " (key int, value string)");
    // show tables
    String sql = "show tables '" + tableName + "'";
    System.out.println("Running: " + sql);
    res = stmt.executeQuery(sql);
    if (res.next()) {
      System.out.println(res.getString(1));
    }
    // describe table
    sql = "describe " + tableName;
    System.out.println("Running: " + sql);
    res = stmt.executeQuery(sql);
    while (res.next()) {
      System.out.println(res.getString(1) + "\t" + res.getString(2));
    }
 
    // load data into table
    // NOTE: filepath has to be local to the hive server
    // NOTE: /tmp/a.txt is a ctrl-A separated file with two fields per line
    String filepath = "/tmp/a.txt";
    sql = "load data local inpath '" + filepath + "' into table " + tableName;
    System.out.println("Running: " + sql);
    res = stmt.executeQuery(sql);
 
    // select * query
    sql = "select * from " + tableName;
    System.out.println("Running: " + sql);
    res = stmt.executeQuery(sql);
    while (res.next()) {
      System.out.println(String.valueOf(res.getInt(1)) + "\t" + res.getString(2));
    }
 
    // regular hive query
    sql = "select count(1) from " + tableName;
    System.out.println("Running: " + sql);
    res = stmt.executeQuery(sql);
    while (res.next()) {
      System.out.println(res.getString(1));
    }
  }
}