Tuesday, December 31, 2019

BigData MapReduce Concept

MapReduce

A programming model for processing large datasets in parallel.
MapReduce divides a task into subtasks and handles them in parallel.
The input and output are always in key-value format.

  • Map
  • Reducer

Map - You write a program that produces local (key, value) pairs.
Eg:- Suppose your data (the values x, y, z) is spread across 3 data nodes.
After the Map phase, each node produces key-value pairs, i.e., the data value is the key and its count is the value.
DataNode1 o/p => (x,3),(y,3),(z,4)
DataNode2 o/p => (x,3),(y,3),(z,4)
DataNode3 o/p => (x,4),(y,4),(z,3)

Shuffle - All the values of a single key are brought together. This happens automatically.
Eg:- After the Shuffle phase, for each key, the values from all the data nodes are accumulated.
(x,(3,3,4))
(y,(3,3,4))
(z,(4,4,3))

Reducer - You write a program that reads each key from the Shuffle output and sums that key's values. The output is again a key-value pair.
Eg:- After the Reducer phase, you get the following output.
(x,10)
(y,10)
(z,10)
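
The same word-count flow can be sketched in Pig Latin (covered in the PIG Project work post below), which compiles down to exactly these phases: the FOREACH/TOKENIZE step runs in the mappers, the GROUP BY triggers the shuffle, and the COUNT runs in the reducers. The input file name here is only a hypothetical example.
grunt> lines = load 'wordcount_input.txt' as (line:chararray);
grunt> words = foreach lines generate FLATTEN(TOKENIZE(line)) as word;                -- Map: emit one record per word
grunt> word_group = group words by word;                                              -- Shuffle: bring all values of a key together
grunt> word_count = foreach word_group generate group as word, COUNT(words) as cnt;   -- Reduce: count per key
grunt> dump word_count;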

BigData Concepts


- Big Data is a massive volume of both structured and unstructured data.
- Big Data is characterized by the 3 V's, i.e., Volume, Velocity and Variety
                 (Volume - how much data is being generated)
                 (Velocity - the pace/speed at which the data is being generated)
                 (Variety - the different forms of the data, much of it unstructured. Ex. picture data, video data, log data...)
- The problems of Big Data are:
                 Storage of large volumes of data.
                 Processing of large volumes of data.
                 Resource management.

Hadoop

- It is an infrastructure: software with which we can solve the Big Data problems.
  • HDFS - Distributed File System - solves the problem of Storage.
                                         Horizontal scaling
         Distributed storage = Partitioning (e.g., 3 GB = 3 * 1 GB machines) + Replication (making multiple copies of the same data in different places - the default replication factor is 3 - fault tolerance; see the worked example after this list)
  • MapReduce - Distributed Computing Engine - solves the problem of Processing.
                                         Shared-Nothing Architecture
  • YARN - Cluster/Resource Manager - solves the problem of Resource Management.
                                                     Scheduling & Coordination
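
A quick worked example for the 3 GB case above: partitioning splits the file into 3 pieces of 1 GB, one per machine, and with a replication factor of 3 each 1 GB piece is kept on 3 different machines, so the cluster uses 3 GB * 3 = 9 GB of raw disk to store the 3 GB file fault-tolerantly.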

Data Ecosystems in Enterprise:
Hadoop - is an additional layer that allows you to process Big Data in the ecosystem, which was not possible before because of many limitations. Hadoop co-exists with existing technologies/systems and works with them.



Hadoop:
- Inexpensive commodity hardware
- Free, open-source software
- Scalable
- Reliable
- Enables data archival and reporting
- Enables cutting-edge analytics

Hadoop Features:
- Designed to store large files (huge data).
- Processes data sequentially, so it is good for analyzing entire datasets quickly.
- Runs large batch processes that may take several hours, and keeps running despite partial failure of the cluster.
- Handles scheduling & coordination very well.
- Can store and process unstructured data (log files, text, images, audio, video...).

Hadoop is not designed for:
- Processing small files.
- Random-access data retrieval; it should not be used to run transactional applications.
- Interactive querying. Most jobs take at least several minutes to process.
- Modifying data in place. It is a write-once, read-many-times system; new data can only be appended to a file.
- Meeting ACID standards, 3NF, or data quality in the way that relational databases do.
- Hadoop is not a replacement for your existing database systems!

Enterprise = Hadoop (bulk storage for analytics) + RDBMS (for business operations) + NoSQL DB (for running a website)

Hadoop EcoSystem:



Hadoop = HDFS + YARN + MapReduce


MapReduce - A programming framework for parallel processing of data. Hadoop coordinates execution throughout the cluster.

Pig and Hive - These tools allow the user to write data-processing programs. Internally, those commands are translated into MapReduce jobs.
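
You can see this translation yourself with Pig's EXPLAIN operator (listed among the PIG operators below), which prints the logical, physical, and MapReduce plans for a relation. A sketch, assuming a relation such as mov from Case Study 1 has already been loaded:
grunt> explain mov;     -- shows the MapReduce jobs Pig will generate for mov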

Cluster - A set of host machines; it is the hardware infrastructure.

YARN - Resource Manager + Node Manager

Resource Manager (like a project manager) - Manages all the resources - one per cluster (installed on the master machine) - monitors the resources at regular intervals. If any resource stops working, it arranges a replacement for that resource.
- The Resource Manager does not store any kind of data.
Node Manager (like a project developer) - Each machine has one Node Manager (installed on all slave machines). Via a heartbeat it tells the Resource Manager that its machine is working fine, and it provides the resource status (memory, storage space...) of that machine to the Resource Manager. A container is an allocation of a machine's resources (memory, CPU) in which a task runs.








HDFS - Hadoop Distributed File System, built on top of the UNIX file system.

HDFS gets the resource information from YARN.
- When a user wants to store a file (e.g., 1 TB) in HDFS, then based on the resource information, HDFS splits the file into small pieces/blocks and stores them on different machines.
- When the user wants the file (1 TB) back, HDFS collects all the pieces, combines them, and gives the file back to the user.

HDFS replicates the received file (e.g., 1 TB) and stores the copies on different machines for backup and fault tolerance. The default replication factor is 3.

HDFS = Like YARN, it has a master/slave architecture.
NameNode = Master = Installed only on the master machine = Does not store any actual data = Its job is to manage where the different blocks of data are stored on the slave machines. It has all the metadata: the information about every block, its storage location, and the folder-structure information.
- Stores the metadata in memory for faster access.
- The NameNode has a backup, called the Secondary NameNode, in case the primary NameNode fails.
- The replication factor info is stored by the NameNode, but the actual replication is done by the DataNodes. The NameNode re-replicates a DataNode's blocks in the event of a failure.
DataNode = Slave = Installed on all slave machines = Stores all the data in blocks.

HDFS data blocks = a large file is split into small chunks/blocks = each chunk/block is 128 MB in size by default.

- Hadoop can easily calculate how many blocks fit on a node. These blocks are large enough to be read quickly from disk.
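
A quick worked example: with the default 128 MB block size, the 1 TB file above is split into 1 TB / 128 MB = 8,192 blocks spread across the DataNodes, and each of those blocks is then replicated according to the replication factor.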



Saturday, October 19, 2019

PIG Project work

BigData PIG Project work
  • LOAD
  • FILTER
  • FOREACH ... GENERATE
  • SPLIT
  • GROUP
  • JOIN
  • DESCRIBE
  • EXPLAIN
  • ILLUSTRATE
  • DUMP
> pig -x local
> pig -x local [script]
> pig -x hadoop [script]

Case Study 1:
Movies dataset with 50,000 observations. This dataset has 5 columns (Id, Name, Year, Rating, Duration).

1) grunt> mov = load 'Desktop/Basha/Basha2019/PIG_Practicals/movies_data.xls' using PigStorage(',') as (id:int, name:chararray, year:int, rating:float, duration:int);
grunt> describe mov;
mov: {id: int,name: chararray,year: int,rating: float,duration: int}
grunt> dump mov;

2) List of movies that have rating > 4.
grunt> mov_ratingfour = filter mov by (float)rating>4.0;

3) Count of the movies in the file.
grunt> mov_group = group mov all;
grunt> mov_count = foreach mov_group generate COUNT(mov.id);
grunt> dump mov_count;

4) List name and duration from the file and display the list ordered by duration in descending order.
grunt> mov_duration = foreach mov generate name,(double)duration/60;
grunt> mov_notnull = filter mov_duration by $1 is not null;
grunt> mov_duration_order = order mov_notnull by $1 DESC;
grunt> mov_long = LIMIT mov_duration_order 50;
grunt> dump mov_long;

5) Group the movies by year and get the highest rating for each year.
grunt> mov_group_year = group mov by year;
grunt> mov_group_rating = foreach mov_group_year generate group as year, MAX(mov.rating) as highest_rating;
grunt> dump mov_group_rating;

6) JOIN in the movies dataset: find the best-rated movie for each year.
grunt> mov_join = JOIN mov_group_rating by (year,highest_rating),mov by (year,rating);
grunt> describe mov_join;
mov_join: {mov_group_rating::year: int,mov_group_rating::highest_rating: float,mov::id: int,mov::name: chararray,mov::year: int,mov::rating: float,mov::duration: int}
grunt> mov_best = foreach mov_join generate $0 as year,$3 as title,$1 as rating;
grunt> describe mov_best;
mov_best: {year: int,title: chararray,rating: float}
grunt> dump mov_best;

Case Study 2:
We have a demonetization dataset. We will take the Twitter #Demonitisation tweets and do some sentiment analysis, i.e., whether people have a +ve or -ve sentiment about demonetization.

1) Load the demonetization dataset.
grunt> tweet_load = load 'Desktop/Basha/Basha2019/PIG_Practicals/demonitization_tweets.csv' using PigStorage(','); 
2) Extract id, text columns from the tweets.
grunt> tweet_extract = foreach tweet_load generate $0 as id, $1 as text;
grunt> describe tweet_extract;
tweet_extract: {id: bytearray,text: bytearray}

3) Tokenize the text column value.
grunt> tweet_tokens = foreach tweet_extract generate id, text, FLATTEN(TOKENIZE(text)) as word;
grunt> describe tweet_tokens;
tweet_tokens: {id: bytearray,text: bytearray,word: chararray}

4) Load the AFINN dictionary, which defines the +ve/-ve word ratings.
grunt> tweet_dictionary = load '/home/cloudera/Desktop/Basha/Basha2019/PIG_Practicals/AFINN.txt' USING PigStorage('\t') as (word:chararray, rating:int);

5) Join the tweet_tokens and tweet_dictionary.
grunt> tweet_join = join tweet_tokens by word left outer, tweet_dictionary by word using 'replicated';
grunt> describe tweet_join;
tweet_join: {tweet_tokens::id: bytearray,tweet_tokens::text: bytearray,tweet_tokens::word: chararray,tweet_dictionary::word: chararray,tweet_dictionary::rating: int}

6) Tweet Rating.
grunt> tweet_rating = foreach tweet_join generate tweet_tokens::id as id,tweet_tokens::text as text, tweet_dictionary::rating as rate;

7) Group the ratings by tweet (id, text).
grunt> tweet_word_group = group tweet_rating by (id,text);

8) Average the rate values for each tweet.
grunt> tweet_avg_rate = foreach tweet_word_group generate group,AVG(tweet_rating.rate) as tweet_finalrating;

9) +ve & -ve tweets.
grunt> tweet_positive = filter tweet_avg_rate by tweet_finalrating>=0;
grunt> tweet_negative = filter tweet_avg_rate by tweet_finalrating<0;
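
To see how the tweets split between the two sides, a sketch reusing the group-all + COUNT pattern from Case Study 1:
grunt> tweet_pos_group = group tweet_positive all;
grunt> tweet_pos_count = foreach tweet_pos_group generate COUNT(tweet_positive);
grunt> dump tweet_pos_count;
grunt> tweet_neg_group = group tweet_negative all;
grunt> tweet_neg_count = foreach tweet_neg_group generate COUNT(tweet_negative);
grunt> dump tweet_neg_count;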