Saturday, July 29, 2017

HackerRank Equals

Christy is interning at HackerRank. One day she has to distribute some chocolates to her colleagues. She is biased towards her friends and may have distributed the chocolates unequally. One of the program managers gets to know this and orders Christy to make sure everyone gets an equal number of chocolates.
But to make things difficult for the intern, she is ordered to equalize the number of chocolates for every colleague in the following manner.
In every operation, she can choose one of her colleagues and do one of three things:
  1. She can give one chocolate to every colleague other than the chosen one.
  2. She can give two chocolates to every colleague other than the chosen one.
  3. She can give five chocolates to every colleague other than the chosen one.
Calculate the minimum number of such operations needed to ensure that every colleague has the same number of chocolates.
Input Format
The first line contains an integer T denoting the number of test cases. T test cases follow.
Each test case has 2 lines. The first line of each test case contains an integer N denoting the number of colleagues. The second line contains N space-separated integers denoting the current number of chocolates each colleague has.
Constraints


Number of initial chocolates each colleague has < 
Output Format
T lines, each containing the minimum number of operations needed to make sure all colleagues have the same number of chocolates.
Sample Input
1
4
2 2 3 7
Sample Output
2
Explanation
1st operation: Christy increases all elements by 1 except the 3rd one
2 2 3 7 -> 3 3 3 8
2nd operation: Christy increases all elements by 5 except the last one
3 3 3 8 -> 8 8 8 8

Key observation: giving c chocolates to everyone except one colleague is equivalent to taking c chocolates away from that colleague. The problem therefore reduces to lowering every colleague to a common baseline using steps of 1, 2 and 5. The code below tries the baselines min, min-1 and min-2, greedily counts the steps needed for each, and keeps the cheapest.


       

import java.util.Scanner;

/**
 * Created by vdokku on 7/29/2017.
 */
public class Equal {

    static boolean DBG = false;

    // Min number in the Array.
    static int min;

    // Find the minimum number of operations for one test case.
    public static int MinRound(int[] counts) {

        int[][] results = new int[counts.length][3];
        for (int i = 0; i < counts.length; i++) {
            for (int j = 0; j < 3; j++) {
                int delta = counts[i] - min + j; // calculating the DELTA is important.
                results[i][j] = 0;
                while (true) {
                    // Greedy approach
                    if (delta >= 5) {
                        delta -= 5;
                        results[i][j]++;
                    } else if (delta >= 2) {
                        delta -= 2;
                        results[i][j]++;
                    } else if (delta >= 1) {
                        delta -= 1;
                        results[i][j]++;
                    } else {
                        break;
                    }
                }
            }
        }
        int finalResult = -1;
        // Compare totals for the three baseline targets (min, min-1, min-2).
        for (int i = 0; i < 3; i++) {
            int subFinal = 0;
            for (int j = 0; j < counts.length; j++) {
                subFinal += results[j][i];
                if (DBG) System.out.format("results[%d][%d] = %d \n", j, i, results[j][i]);
            }
            if (DBG) System.out.println(subFinal);
            if (finalResult < 0 || finalResult > subFinal) {
                finalResult = subFinal;
            }
        }

        return finalResult;
    }

    public static void main(String[] args) {

        int casesCount = 0;

        Scanner s = new Scanner(System.in);

        if (s.hasNextInt()) {
            casesCount = s.nextInt();
        }
        s.nextLine(); // throw away the newline.

        int[] outputArray = new int[casesCount];

        for (int i = 0; i < casesCount; i++) {
            int count = 0;
            if (s.hasNextInt()) {
                count = s.nextInt();
            }
            s.nextLine();

            int[] numbers = new int[count];
            min = -1;
            for (int j = 0; j < count; j++) {
                if (s.hasNextInt()) {
                    numbers[j] = s.nextInt();
                    // get min value from input array
                    if (numbers[j] < min || min < 0) {
                        min = numbers[j];
                    }
                } else {
                    System.out.println("You didn't provide enough numbers");
                    break;
                }
            }
            //SortCounts(numbers);
            outputArray[i] = MinRound(numbers);

        }

        for (int i = 0; i < casesCount; i++) {
            System.out.println(outputArray[i]);
        }
    }
}

       
 

Thursday, July 27, 2017

LEARN complex HIVE Queries

When working with complex Hive queries, you will come across different analytical functions.
Here is a sample query that combines joins, aggregation and the RANK() window function.

       

SELECT
mag.co_magasin,
dem.id_produit                                  as id_produit_orig,
pnvente.dt_debut_commercial                     as dt_debut_commercial,
COALESCE(pnvente.id_produit,dem.id_produit)     as id_produit,
min(
CASE WHEN dem.co_validation IS NULL THEN 0 ELSE 1 END
)                                               as flg_demarque_valide,
sum(CASE WHEN dem.co_validation IS NULL THEN 0 ELSE cast(dem.mt_revient_ope AS INT) END)
           as me_dem_con_prx_cs,
0                                               as me_dem_inc_prx_cs,
0                                               as me_dem_prov_stk_cs,
sum(CASE WHEN dem.co_validation IS NULL THEN 0 ELSE cast(dem.qt_demarque AS INT) END)
           as qt_dem_con,
0                                               as qt_dem_inc,
0                                               as qt_dem_prov_stk,
RANK() OVER (PARTITION BY mag.co_magasin, dem.id_produit ORDER BY pnvente.dt_debut_commercial DESC, COALESCE(pnvente.id_produit,dem.id_produit) DESC) as rang
from default.calendrier cal
INNER JOIN default.demarque_mag_jour dem
ON  CASE WHEN dem.co_societe = 1 THEN 1 ELSE 2 END = '${hiveconf:in_co_societe}'
AND dem.dt_jour    = cal.dt_jour
LEFT OUTER JOIN default.produit_norm pn
ON  pn.co_societe = dem.co_societe
AND pn.id_produit = dem.id_produit
LEFT OUTER JOIN default.produit_norm pnvente
ON  pnvente.co_societe = pn.co_societe
AND pnvente.co_produit_rfu = pn.co_produit_lip
AND pnvente.co_type_motif='05'
INNER JOIN default.kpi_magasin mag
ON  mag.co_societe = '${hiveconf:in_co_societe}'
AND mag.id_magasin = dem.id_magasin
WHERE cal.dt_jour = '${hiveconf:in_dt_jour}'
AND NOT (dem.co_validation IS NULL AND cal.dt_jour > from_unixtime(unix_timestamp()-3*60*60*24, 'ddmmyyyy'))
-- JYP 4.4
AND dem.co_operation_magasin IN ('13','14','32')
GROUP BY
mag.co_magasin,
dem.id_produit,
pnvente.dt_debut_commercial,
COALESCE(pnvente.id_produit,dem.id_produit)

       
 










Tuesday, July 18, 2017

com.google.common.collect.Lists provides various utility methods. Here are a few of them, taken from the Guava source (package com.google.common.collect):


public static <T> List<T> reverse(List<T> list) {
    if (list instanceof ImmutableList) {
      return ((ImmutableList<T>) list).reverse();
    } else if (list instanceof ReverseList) {
      return ((ReverseList<T>) list).getForwardList();
    } else if (list instanceof RandomAccess) {
      return new RandomAccessReverseList<T>(list);
    } else {
      return new ReverseList<T>(list);
    }
  }


 @Override public List<T> subList(int fromIndex, int toIndex) {
      checkPositionIndexes(fromIndex, toIndex, size());
      return reverse(forwardList.subList(
          reversePosition(toIndex), reversePosition(fromIndex)));
    }

public static <T> List<List<T>> partition(List<T> list, int size) {
    checkNotNull(list);
    checkArgument(size > 0);
    return (list instanceof RandomAccess)
        ? new RandomAccessPartition<T>(list, size)
        : new Partition<T>(list, size);
  }
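For illustration, here is a small usage sketch of reverse and partition (the class name ListsDemo is just a placeholder):

import com.google.common.collect.Lists;

import java.util.Arrays;
import java.util.List;

public class ListsDemo {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

        // reverse returns a view of the list in reverse order (no copy is made).
        List<Integer> reversed = Lists.reverse(numbers);
        System.out.println(reversed); // [5, 4, 3, 2, 1]

        // partition returns consecutive sublists of the given size;
        // the final sublist may be smaller.
        List<List<Integer>> chunks = Lists.partition(numbers, 2);
        System.out.println(chunks);   // [[1, 2], [3, 4], [5]]
    }
}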




Why should variables used inside a Java 8 forEach lambda be final?

The Java Memory Model has a very important property: it guarantees that local variables and method parameters are never writable by another thread. This adds a lot of safety to multi-threaded programming. However, when you create a lambda (or an anonymous class), nobody knows how it will be used. It can be passed to another thread for execution (for example, if you use parallelStream().forEach(...)). Were it possible to modify the local variable, that important property would be violated. Not something the Java language developers would sacrifice.

Usually when you are using lambdas, you are trying to program in a functional way. In functional programming, mutable variables are considered bad practice: it's better to assign every variable only once. So trying to modify a local variable from inside a lambda is a code smell. Use the various stream reduction methods instead of forEach to produce good functional code.
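A minimal sketch of this (variable names are illustrative): the compiler rejects mutation of a captured local inside a lambda, while a stream reduction expresses the same computation functionally.

import java.util.Arrays;
import java.util.List;

public class EffectivelyFinalDemo {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4);

        int sum = 0;
        // numbers.forEach(n -> sum += n); // does not compile: sum must be (effectively) final
        System.out.println(sum);           // still 0 - the lambda could not have modified it

        // Prefer a reduction over mutating a captured local variable.
        int total = numbers.stream().mapToInt(Integer::intValue).sum();
        System.out.println(total);         // 10
    }
}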

https://stackoverflow.com/questions/16635398/java-8-iterable-foreach-vs-foreach-loop
https://stackoverflow.com/questions/31801313/why-variables-inside-foreach-loop-of-java8-should-be-final



Difference between == and === in Scala/Spark


The "==" is using the equals methods which checks if the two references point to the same object. The definition of "===" depends on the context/object. For Spark , "===" is using the equalTo method.

== returns a boolean
=== returns a column (which contains the result of the comparisons of the elements of two columns)

Quick sample: a SchemaRDD is normally an RDD[org.apache.spark.sql.Row], so information is extracted from it using rec.getString(columnNumber).


.filter(rec => (rec.getString(5) == "" || rec.getString(5) == null)).map(rec => (rec.getString(1))).

Merging the part files into a single file: you can use .repartition(1) or coalesce(1) to write only one part file, or merge them afterwards with:

hadoop fs -getmerge "HDFS_LOCATION_PART_FILES_LIST"  "NFS_LOCATION_SINGLE_PART_FILE"

Tuesday, July 11, 2017

Hadoop Certification Plan

Hive AVRO
Hive Parquet

Hive Window Operation.
Hive GroupBy

Removing the header of a text/CSV file can be done in two ways (verify the result with stocksData.take(12).foreach(println)).

Stick with the first approach: mapPartitionsWithIndex drops only the first record of partition 0, where the header lives, whereas mapPartitions drops the first record of every partition, which is only correct when the whole file sits in a single partition.

1)  mapPartitionsWithIndex
 Ex: stocksData.mapPartitionsWithIndex{(index, itr) => if (index == 0) itr.drop(1) else itr}.take(12).foreach(println)

2)  mapPartitions
 Ex: stocksData.mapPartitions(itr => itr.drop(1)).take(12).foreach(println)



Simple commands that need to be remembered.
  1. import sqlContext.implicits._
  2. spark-shell --packages com.databricks:spark-avro_2.10:2.0.1
  3. import com.databricks.spark.avro._
  4. import org.apache.spark.sql.functions._

Different ways of reading a text file from HDFS, assuming the input source files are comma-separated.

       

val ordersDF = sc.textFile("/PATH").map(_.split(',')).map(p => orders(p(0).toInt, p(1), p(2).toInt, p(3))).toDF()
val rdd1 = sc.textFile("/Path").map(rec => { val rec1 = rec.split(','); orders(rec1(0).toInt, rec1(1), rec1(2).toInt, rec1(3)) }).toDF()

       
 

Things you need to know when reading Parquet & Avro files.

Once you get the DataFrame out of the Parquet or Avro file, you can access the values using rec(INDEX).

Sample example:

val ordersDF = sqlContext.read.parquet("PATH")
ordersDF.map(rec => rec.getInt(0)+rec.getLong(1)+rec.getInt(2)+rec.getString(3)).take(12).foreach(println)

ordersDF.map(rec => rec(0)+"\t"+rec(1)+"\t"+rec(2)+"\t"+rec(3)).take(12).foreach(println)

import com.databricks.spark.avro._
val ordersDF = sqlContext.read.avro("CCA_Prep/avro/orders")
ordersDF.map(rec => rec(0)+"\t"+rec(1))

       
 


Things to remember when importing data using Sqoop.

1) Sqoop import of MySQL data to a text file.

sqoop import --connect "" --username "" --password "" --table order_items --as-textfile --target-dir "" --fields-terminated-by '' --lines-terminated-by ''

2) Sqoop import of MySQL data to an Avro data file.

sqoop import --connect "" --username "" --password "" --table order_items --as-avrodatafile --target-dir "" --fields-terminated-by '' --lines-terminated-by ''

3) Sqoop import of MySQL data to a Parquet data file.

sqoop import --connect "" --username "" --password "" --table order_items --as-parquetfile --target-dir "" --fields-terminated-by '' --lines-terminated-by ''




Wednesday, July 5, 2017

Java 8 Functional interfaces

Definition:

The @FunctionalInterface annotation is useful for compile-time checking of your code.
You cannot have more than one abstract method, not counting static methods, default methods, and abstract methods that override public methods of Object, in your @FunctionalInterface or any other interface used as a functional interface.

Insight 1:

Conceptually,
a functional interface has exactly one abstract method.
Since default methods have an implementation, they are not abstract.
Since default methods are not abstract, you're free to add as many default methods to your functional interface as you like.

Insight 2:

If an interface declares an abstract method overriding one of the public methods of java.lang.Object, that also does not count toward the interface’s abstract method count since any implementation of the interface will have an implementation from java.lang.Object or elsewhere.

Insight 3:

e.g. Below is a valid functional interface even though it declares three abstract methods. Why? Because toString() and equals(Object) have signatures matching public methods of the Object class, so they do not count toward the abstract method count.
@FunctionalInterface
interface MathOperation {
    int operation(int a, int b);

    @Override
    String toString();            // overridden from Object class

    @Override
    boolean equals(Object obj);   // overridden from Object class
}
While the intended use of functional interfaces is for lambda expressions, method references, and constructor references, they can still be used, like any interface, with anonymous classes, implemented by classes, or created by factory methods.
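For instance (a minimal sketch, assuming the MathOperation interface above is on the classpath), the single abstract method operation can be supplied with a lambda or a method reference:

public class MathOperationDemo {
    public static void main(String[] args) {
        MathOperation add = (a, b) -> a + b;      // lambda expression
        MathOperation max = Math::max;            // method reference
        System.out.println(add.operation(2, 3));  // 5
        System.out.println(max.operation(2, 3));  // 3
    }
}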

Java 8 & Lambda Expressions


In JDK 8, a lambda expression is a way of defining an anonymous function. A simple example is as follows.
Consumer<String> consumer = (x) -> System.out.print(x);
Calling consumer.accept(" Hey There !"); applies the string parameter to the body of the anonymous function.
Result:  Hey There !
We can think of it as a function defined in two parts:
  1. The right part of the expression is the body of the function.
  2. The signature of the function is defined in a functional interface, in this case the Consumer<T> interface (more about functional interfaces above).
One important detail is that the lambda expression has the same type as the functional interface associated with it.
Lambda expressions vs anonymous classes.
Need to update ....
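In the meantime, here is a minimal sketch contrasting the two forms for the same Consumer<String> (class name is illustrative):

import java.util.function.Consumer;

public class LambdaVsAnonymous {
    public static void main(String[] args) {
        // Anonymous class: the compiler generates a separate class,
        // and 'this' inside accept refers to the anonymous instance.
        Consumer<String> anonymous = new Consumer<String>() {
            @Override
            public void accept(String s) {
                System.out.print(s);
            }
        };

        // Lambda: no separate class is generated up front (invokedynamic is used),
        // and the lambda body does not introduce its own 'this'.
        Consumer<String> lambda = s -> System.out.print(s);

        anonymous.accept(" Hey There !");
        lambda.accept(" Hey There !");
    }
}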

Java 8 Streams


  • A Stream is a pipeline of functions that can be evaluated
  • Streams can transform data
  • A Stream is not a data structure
  • Streams cannot mutate data
Most importantly, a stream isn’t a data structure. You can often create a stream from collections to apply a number of functions on a data structure, but a stream itself is not a data structure. That’s so important, I mentioned it twice! A stream can be composed of multiple functions that create a pipeline that data flows through. This data cannot be mutated. That is to say, the original data structure doesn’t change. However, the data can be transformed and later stored in another data structure, or perhaps consumed by another operation.
We stated that a stream is a pipeline of functions, or operations. These operations can be classed either as intermediate operations or as terminal operations. The difference between the two is in the output which the operation creates. If an operation outputs another stream, to which you could apply a further operation, we call it an intermediate operation. However, if the operation outputs a concrete type or produces a side effect, it is a terminal operation. A subsequent stream operation cannot follow a terminal operation, obviously, as a stream is not returned by the terminal operation!

Intermediate operations

An intermediate operation is always lazily executed. That is to say they are not run until the point a terminal operation is reached. We’ll look in more depth at a few of the most popular intermediate operations used in a stream.
  • filter – the filter operation returns a stream of elements that satisfy the predicate passed in as a parameter to the operation. The elements themselves before and after the filter will have the same type, however the number of elements will likely change.
  • map – the map operation returns a stream of elements after they have been processed by the function passed in as a parameter. The elements before and after the mapping may have a different type, but there will be the same total number of elements.
  • distinct – the distinct operation is a special case of the filter operation. Distinct returns a stream of elements such that each element is unique in the stream, based on the equals method of the elements.
Here’s a table that summarises this, including a couple of other common intermediate operations.

Function  | Preserves count | Preserves type | Preserves order
map       | Yes             | No             | Yes
filter    | No              | Yes            | Yes
distinct  | No              | Yes            | Yes
sorted    | Yes             | Yes            | No
peek      | Yes             | Yes            | Yes

Terminal operations

A terminal operation is always eagerly executed. This operation will kick off the execution of all the previous lazy operations present in the stream. Terminal operations either return concrete types or produce a side effect. For instance, a reduce operation which calls Integer::sum would produce an Optional, which is a concrete type. Alternatively, the forEach operation does not return a concrete type, but you are able to add a side effect such as printing out each element. The collect terminal operation is a special type of reduce which takes all the elements from the stream and can produce a Set, Map or List. Here’s a tabulated summary.

Function | Output            | When to use
reduce   | concrete type     | to cumulate elements
collect  | list, map or set  | to group elements
forEach  | side effect       | to perform a side effect on elements
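A small sketch combining the intermediate and terminal operations described above (the data is illustrative):

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class StreamDemo {
    public static void main(String[] args) {
        List<String> names = Arrays.asList("Daniel", "Jana", "Nadia", "Nadia", "William");

        // Intermediate operations (lazy): distinct, filter, map, sorted.
        // Terminal operation (eager): collect.
        List<Integer> lengths = names.stream()
                .distinct()
                .filter(n -> n.length() > 4)
                .map(String::length)
                .sorted()
                .collect(Collectors.toList());
        System.out.println(lengths); // [5, 6, 7]

        // reduce is a terminal operation producing a concrete type (an Optional here).
        Optional<Integer> total = lengths.stream().reduce(Integer::sum);
        total.ifPresent(System.out::println); // 18
    }
}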



Difference between collect_set & collect_list

hive (venkat_db)> describe purchases;
OK
salesrepid string
purchaseorderid int
amount int
hive (venkat_db)> select * from purchases;
OK
Daniel 8 160
Daniel 8 160
Jana 9 100
Nadia 10 660
Nadia 10 600
Jana 1 100
Nadia 2 200
Nadia 3 600
Daniel 4 80
Jana 5 120
William 6 170
Daniel 7 140
select salesrepid, collect_set(amount) from purchases group by salesrepid;
Daniel [160,80,140]
Jana [100,120]
Nadia [660,600,200]
William [170]
hive (venkat_db)> select salesrepid, collect_list(amount) from purchases group by salesrepid;
Daniel [160,160,80,140]
Jana [100,100,120]
Nadia [660,600,200,600]
William [170]

Tuesday, July 4, 2017

Java Transient Modifier


A transient modifier applied to a field tells Java that this attribute should be excluded when the object is being serialized. When the object is deserialized, the field is initialized with its default value (typically null for a reference type, or zero/false for a primitive type).


Where should we apply a transient modifier? When will it be useful?

Think of a User object. This User contains, among all its properties, login, email, and password. When the data is being serialized and transmitted through the network, we can think of a few security reasons why we would not like to send the field password together with the entire object. In this case, marking the field as transient will solve this security problem.




In summary: Use transient when an object contains sensitive data that you do not want to transmit, or when it contains data that you can derive from other elements. Static fields are implicitly transient.
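A minimal sketch of the User example above (the field names are illustrative):

import java.io.Serializable;

public class User implements Serializable {
    private static final long serialVersionUID = 1L;

    private String login;
    private String email;

    // Excluded from serialization; after deserialization this field is null.
    private transient String password;

    public User(String login, String email, String password) {
        this.login = login;
        this.email = email;
        this.password = password;
    }
}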


Synchronized & Volatile in Java.

In the computing world, a resource can be accessed from different threads concurrently. This can lead to inconsistency and data corruption: a thread ThreadA accesses a resource and modifies it, and in the meantime a thread ThreadB starts accessing the same resource. The data may get corrupted since it is being modified concurrently.

Both modifiers deal with multi-threading and with protecting sections of code from concurrent access by threads.

Synchronized Modifier
Let's start with a sample example without the synchronized block: a method printCount that updates and prints a shared counter and is called from two threads. Such a program generates different outputs on different runs, because the two threads interleave their access to the shared state.
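A minimal sketch of such a program (class and thread names are illustrative):

public class SyncDemo {

    private int count = 0;

    // Without 'synchronized', the two threads interleave inside this method, so the
    // printed sequence differs from run to run and increments can be lost.
    // Declaring it as 'synchronized void printCount()' makes the output consistent.
    void printCount() {
        for (int i = 0; i < 5; i++) {
            count++;
            System.out.println(Thread.currentThread().getName() + " count: " + count);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SyncDemo demo = new SyncDemo();
        Thread threadA = new Thread(demo::printCount, "ThreadA");
        Thread threadB = new Thread(demo::printCount, "ThreadB");
        threadA.start();
        threadB.start();
        threadA.join();
        threadB.join();
    }
}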


The synchronized modifier can be applied to a statement block or to a method. Synchronized provides protection by ensuring that a critical section of the code is never executed concurrently by two different threads, ensuring data consistency. Applying synchronized to the printCount method in the previous example protects it: after adding the synchronized keyword, the program produces a consistent output on every run.

Volatile Modifier

Essentially, volatile is used to indicate that a variable's value will be modified by different threads.
Declaring a volatile Java variable means:
  • The value of this variable will never be cached thread-locally: all reads and writes will go straight to "main memory";
  • Access to the variable acts as though it is enclosed in a synchronized block, synchronized on itself.
We say "acts as though" in the second point, because to the programmer at least (and probably in most JVM implementations) there is no actual lock object involved. Here is how synchronized and volatile compare:
Difference between synchronized and volatile

Characteristic                                                       | Synchronized                                         | Volatile
Type of variable                                                     | Object                                               | Object or primitive
Null allowed?                                                        | No                                                   | Yes
Can block?                                                           | Yes                                                  | No
All cached variables synchronized on access?                         | Yes                                                  | From Java 5 onwards
When synchronization happens                                         | When you explicitly enter/exit a synchronized block  | Whenever a volatile variable is accessed
Can be used to combine several operations into an atomic operation?  | Yes                                                  | Pre-Java 5, no. Atomic get-set of volatiles possible in Java 5.

In other words, the main differences between synchronized and volatile are:
  • a primitive variable may be declared volatile (whereas you can't synchronize on a primitive with synchronized);
  • an access to a volatile variable never has the potential to block: we're only ever doing a simple read or write, so unlike a synchronized block we will never hold on to any lock;
  • because accessing a volatile variable never holds a lock, it is not suitable for cases where we want to read-update-write as an atomic operation (unless we're prepared to "miss an update");
  • a volatile variable that is an object reference may be null (because you're effectively synchronizing on the reference, not the actual object).
Attempting to synchronize on a null object will throw a NullPointerException.


int firstVariable;
int getFirstVariable() {return firstVariable;}
volatile int secondVariable;
int getSecondVariable() {return secondVariable;}
int thirdVariable;
synchronized int getThirdVariable() {return thirdVariable;}



The first method is not protected. A thread T1 can access the method, create its own local copy of firstVariable, and work with it. In the meantime, T2 and T3 can also access firstVariable and modify its value. T1, T2 and T3 may each end up with their own value of firstVariable, which might not be the same and which might never have been written back to main memory, where the authoritative value is held.

getSecondVariable(), on the other hand, accesses a variable that has been declared as volatile. That means, each thread is still able to access the method or block since it has not been protected with synchronized, but they will all access the same variable from the main memory, and will not create their own local copy. Each thread will be accessing the same value.

And as we can imagine from the previous examples, getThirdVariable() will only be accessed by one thread at a time. That ensures the variable stays consistent across all of the threads' executions.
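As another illustration (a minimal sketch, not from the original post), a volatile flag is the classic way to make a write in one thread visible to another:

public class VolatileFlagDemo {

    // Without 'volatile' the worker thread might keep reading a stale cached value of
    // 'running'; 'volatile' forces reads and writes to go straight to main memory.
    private static volatile boolean running = true;

    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            while (running) {
                // busy wait
            }
            System.out.println("Worker stopped");
        });
        worker.start();

        Thread.sleep(100);
        running = false; // visible to the worker because the flag is volatile
        worker.join();
    }
}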





Ref:
http://www.javamex.com/tutorials/java_final_performance_vs_non_final.shtml
http://javarevisited.blogspot.in/2011/06/volatile-keyword-java-example-tutorial.html
https://stackoverflow.com/questions/3519664/difference-between-volatile-and-synchronized-in-java






Use of Immutability in Java


An immutable class is a class whose state cannot be changed after construction. This can be achieved in three ways (a small sketch follows the list).

  • Make fields private and final: a private field cannot be changed externally, and a final field cannot be accidentally reassigned.
  • Do not provide setters: in support of the earlier point, if the fields are private and no setters are provided, the class's fields always remain the same.
  • Declare the class as final: subclasses should not be able to override its methods.
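A minimal sketch of an immutable class following the three points above (the Point class is illustrative):

// final class: no subclass can override its behaviour
public final class Point {

    private final int x;   // private and final: cannot be changed externally or reassigned
    private final int y;

    public Point(int x, int y) {
        this.x = x;
        this.y = y;
    }

    public int getX() { return x; }   // getters only, no setters
    public int getY() { return y; }

    // "Mutation" returns a new instance instead of modifying this one.
    public Point translate(int dx, int dy) {
        return new Point(x + dx, y + dy);
    }
}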


Pro's:


  • Immutable objects can make life easier. An immutable object is automatically thread-safe, and it has no synchronization issues. They make concurrent programming safer and cleaner.
  • Immutable objects do not need a copy constructor or an implementation of clone, which keeps things simple.
  • When an immutable object throws an exception, it is never left in an indeterminate state (failure atomicity, as described by Joshua Bloch in Effective Java).


Con's:

  • Making copies of objects in order to mutate them can drain our resources, especially when dealing with complex data structures. Also, changing objects with a distinct identity can be also more intuitive.

In Effective Java, Joshua Bloch writes:
“Classes should be immutable unless there’s a very good reason to make them mutable. If a class cannot be made immutable, limit its mutability as much as possible.”


Immutability can solve many problems and make your life easier, especially when it comes to working in a concurrent environment.

Monday, July 3, 2017

Hive RANK function

The RANK() function in SQL is used to allocate a sequentially incremented number (rank) to each row in question, based on a particular column. It acts like the ROW_NUMBER function, with the only difference that if two rows have the same value, they are given the same rank.

ROW_NUMBER(): attributes a unique value to each row
RANK(): attributes the same row number to the same value, leaving "holes"
DENSE_RANK(): attributes the same row number to the same value, leaving no "holes"

Let's do this by creating a table in HIVE.

CREATE TABLE test_test AS
SELECT 'a' v FROM dual UNION ALL
SELECT 'a' FROM dual UNION ALL
SELECT 'a' FROM dual UNION ALL
SELECT 'b' FROM dual UNION ALL
SELECT 'c' FROM dual UNION ALL
SELECT 'c' FROM dual UNION ALL
SELECT 'd' FROM dual UNION ALL
SELECT 'e' FROM dual;

select v, row_number() over (order by v) row_number, rank() over (order by v) rank, dense_rank() over (order by v) dense_rank from test_test order by v;

Running this query took 2 MapReduce jobs; here is the output.

a 3 1 1
a 2 1 1
a 1 1 1
b 4 4 2
c 6 5 3
c 5 5 3
d 7 7 4
e 8 8 5

rank() : It is used to rank a record within a group of rows.
dense_rank() : The DENSE_RANK function acts like the RANK function except that it assigns consecutive ranks.

Query -
select
ENAME,SAL,
RANK() over (order by SAL) RANK
from EMP;

Output -
+--------+------+------+
| ENAME | SAL | RANK |
+--------+------+------+
| SMITH | 800 | 1 |
| JAMES | 950 | 2 |
| ADAMS | 1100 | 3 |
| MARTIN | 1250 | 4 |
| WARD | 1250 | 4 |
| TURNER | 1500 | 6 |
+--------+------+------+

Query -
select
ENAME,SAL,dense_rank() over (order by SAL) DEN_RANK
from
EMP;

Output -
+--------+------+-----------+
| ENAME | SAL | DEN_RANK |
+--------+------+-----------+
| SMITH | 800 | 1 |
| JAMES | 950 | 2 |
| ADAMS | 1100 | 3 |
| MARTIN | 1250 | 4 |
| WARD | 1250 | 4 |
| TURNER | 1500 | 5 |
+--------+------+-----------+

Let's understand with this simple query on the sample EMP table in SCOTT schema:

SELECT empno, deptno, sal, 
rank() over(partition BY deptno order by sal) rn
FROM emp;

     EMPNO     DEPTNO        SAL         RN
---------- ---------- ---------- ----------
      7934         10       1300          1
      7782         10       2450          2
      7839         10       5000          3
      7369         20        800          1
      7876         20       1100          2
      7566         20       2975          3
      7788         20       3000          4
      7902         20       3000          4
      7900         30        950          1
      7654         30       1250          2
      7521         30       1250          2
      7844         30       1500          4
      7499         30       1600          5
      7698         30       2850          6

14 rows selected.

RANK is a built-in analytic function which is used to rank a record within a group of rows.
The PARTITION BY clause in the window is what groups the rows, and the ORDER BY clause tells how to rank, i.e. which row in each group holds the 1st rank, with subsequent ranks assigned to the following rows in that order.
  • GROUP
  • SORT
  • Assign rank
So, in the above example, the rows are grouped by department and ordered by salary. In each group, the rank is assigned starting from the lowest salary (ascending order). When there is a tie, the rank is not incremented; however, the next row where the value changes will not get a consecutive rank. And that's what happened here:

7654         30       1250          2
7521         30       1250          2
7844         30       1500          4

The rank is not consecutive because there is a tie between two rows with salary 1250. To keep the sequence consecutive, you need to use DENSE_RANK.
Another set of examples.

Windowing and Analytics functions.

1. Sales Rep data
Here is a CSV file with Sales Rep data:
$ more reps.csv 
1,William,2
2,Nadia,1
3,Daniel,2
4,Jana,1
Create a Hive table for the Sales Rep data:
create table SalesRep (
  RepID INT,
  RepName STRING,
  Territory INT
  )
  ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    LINES TERMINATED BY '\n';

... and load the CSV into the Hive Sales Rep table:
LOAD DATA 
 LOCAL INPATH '/home/hadoop/MyDemo/reps.csv'
 INTO TABLE SalesRep;

2. Purchase Order data
Here is a CSV file with PO data:
$ more purchases.csv 
4,1,100
2,2,200
2,3,600
3,4,80
4,5,120
1,6,170
3,7,140
Create a Hive table for the PO's:
create table purchases (
SalesRepId INT,
PurchaseOrderId INT,
Amount INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';
... and load CSV into the Hive PO table:
LOAD DATA
LOCAL INPATH '/home/hadoop/MyDemo/purchases.csv'
INTO TABLE purchases;

3. Hive JOIN
So this is the underlying data that is being worked with:
SELECT p.PurchaseOrderId, s.RepName, p.amount, s.Territory
FROM purchases p JOIN SalesRep s
WHERE p.SalesRepId = s.RepID;
+-------+---------+--------+-----------+
| PO Id | Rep     | Amount | Territory |
+-------+---------+--------+-----------+
| 1     | Jana    | 100    | 1         |
| 2     | Nadia   | 200    | 1         |
| 3     | Nadia   | 600    | 1         |
| 4     | Daniel  | 80     | 2         |
| 5     | Jana    | 120    | 1         |
| 6     | William | 170    | 2         |
| 7     | Daniel  | 140    | 2         |
+-------+---------+--------+-----------+

4. Hive Rank by Volume only
SELECT 
  s.RepName, s.Territory, V.volume, 
rank() over (ORDER BY V.volume DESC) as rank
FROM 
  SalesRep s
  JOIN 
    ( SELECT
      SalesRepId, SUM(amount) as Volume
      FROM purchases
      GROUP BY SalesRepId) V
  WHERE V.SalesRepId = s.RepID
  ORDER BY V.volume DESC;
+---------+-----------+--------+------+
| Rep     | Territory | Amount | Rank |
+---------+-----------+--------+------+
| Nadia   | 1         | 800    | 1    |
| Daniel  | 2         | 220    | 2    |
| Jana    | 1         | 220    | 2    |
| William | 2         | 170    | 4    |
+---------+-----------+--------+------+
The ranking over the entire data set - Daniel is tied for second among all Reps.

5. Hive Rank within Territory, by Volume
SELECT 
  s.RepName, s.Territory, V.volume, 
  rank() over (PARTITION BY s.Territory ORDER BY V.volume DESC) as rank
FROM 
  SalesRep s
  JOIN 
    ( SELECT
      SalesRepId, SUM(amount) as Volume
      FROM purchases
      GROUP BY SalesRepId) V
  WHERE V.SalesRepId = s.RepID
  ORDER BY V.volume DESC;
+---------+-----------+--------+------+
| Rep     | Territory | Amount | Rank |
+---------+-----------+--------+------+
| Nadia   | 1         | 800    | 1    |
| Jana    | 1         | 220    | 2    |
| Daniel  | 2         | 220    | 1    |
| William | 2         | 170    | 2    |
+---------+-----------+--------+------+
The ranking is within the territory - Daniel is the best in his territory.

Difference between Hive AND Impala
Impala is an excellent choice