CS4225 Summary (Hadoop Related Tools)
Performance Guidelines for Big Data Tools
- Linear scalability: more nodes can do more work in the same time. Scale linearly on data size and linearly on compute resources.
- Minimise the amount of hard disk and network I/O; prefer sequential over random I/O.
- Keep the memory working set of each task/worker small: a larger memory working set means a higher probability of failures.
What is Hadoop?
It is a framework that manages big data storage in a distributed way and processes it in parallel.
Components of Hadoop:
- Storage Unit of Hadoop
- Processing Unit of Hadoop
- Resource Management Unit of Hadoop (YARN)
Distributed File System (HDFS) (Storage unit of Hadoop):
HDFS is specially designed for storing huge datasets on commodity machines. It has two core components: (1) the NameNode and (2) the DataNodes.
There is only one NameNode, but there can be multiple DataNodes. Together, these master/slave nodes form the HDFS cluster.
NameNode (master):
- Stores the metadata of the file system. Maintains and manages the DataNodes.
- Directs clients to DataNodes for writes and reads.
- No data moves through the NameNode.
DataNode (slave):
- Stores the actual data. Performs the reading, writing and processing work.
- HeartBeats are the signals that DataNodes continuously send to the NameNode.
- Scale out concept
- Write once, append only: files are written once and can only be appended to.
- Files are stored in chunks (blocks), where each chunk is replicated across 3+ chunk servers. The blocks of a file are replicated for fault tolerance. The block size and replication factor are configurable per file; an application can specify the number of replicas of a file.
- For the common case, when the replication factor is three, HDFS’s placement policy is to put one replica on one node in the local rack, another on a node in a different (remote) rack, and the last on a different node in that same remote rack (see the sketch after this list).
- This policy cuts the inter-rack write traffic which generally improves write performance. The chance of rack failure is far less than that of node failure; this policy does not impact data reliability and availability guarantees. However, it does reduce the aggregate network bandwidth used when reading data since a block is placed in only two unique racks rather than three. With this policy, the replicas of a file do not evenly distribute across the racks. One third of replicas are on one node, two thirds of replicas are on one rack, and the other third are evenly distributed across the remaining racks. This policy improves write performance without compromising data reliability or read performance.
- There is a single master node to coordinate access and keep meta data.
- No data caching is done, since the workloads are large streaming reads.
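To make the default placement policy concrete, below is a small illustrative Python sketch (not HDFS source code) of choosing three replica locations for replication factor three; the rack and node names are made up, and real HDFS also considers load and free space.

```python
# Illustrative sketch of HDFS's default rack-aware placement for
# replication factor 3 (simplified; not the actual HDFS implementation).
import random

def place_replicas(local_node, local_rack, racks):
    """racks maps rack name -> list of node names (each rack needs >= 2 nodes)."""
    first = local_node                                   # replica 1: the writer's node (local rack)
    remote_rack = random.choice([r for r in racks if r != local_rack])
    second = random.choice(racks[remote_rack])           # replica 2: a node on a remote rack
    third = random.choice(                               # replica 3: a different node, same remote rack
        [n for n in racks[remote_rack] if n != second])
    return [first, second, third]

racks = {"rack1": ["n1", "n2", "n3"], "rack2": ["n4", "n5", "n6"]}
print(place_replicas("n1", "rack1", racks))  # e.g. ['n1', 'n5', 'n6'] -- only two unique racks
```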
Hadoop MapReduce (Processing unit of Hadoop)
- MapReduce is a programming technique where huge data is processed in a parallel and distributed fashion.
- A brute-force alternative to traditional relational databases with schemas.
- No indexing involved.
MapReduce Runtime
- Handles scheduling: assigns workers to map and reduce tasks.
- Moves processes to data.
- Handles synchronisation: gathers, sorts and shuffles the intermediate data.
- Handles errors and faults: detects worker failures and restarts them.
- Map -> Executes computations in parallel via map functions
- Reduce -> All values with the same key are aggregated together in the same reduce function
- Partitioner -> The stage between map and reduce that controls which reducer processes each key. Partitioning of the keys of the intermediate map output is controlled by the Partitioner, which uses a hash function to derive the partition. Each map output record is partitioned on the basis of its key: records with the same key go into the same partition (within each mapper), and each partition is then sent to a reducer. The default Partitioner (HashPartitioner) computes a hash value for the key and assigns the partition based on this result.
- Combiner -> An optimisation that must not change the correctness of the algorithm; a combiner must have the same input and output key-value types. Think of it as a mini reducer that runs in memory after the map phase. A Combiner, also known as a semi-reducer, is an optional class that accepts the inputs from the Map class and passes its output key-value pairs onward towards the Reducer class. Its main function is to summarise the map output records that share the same key. The primary goal of Combiners is to save as much bandwidth as possible by minimising the number of key/value pairs that are shuffled across the network and provided as input to the Reducer.
Note that the results are still written to disk after the Map phase finishes and before the Reduce phase begins, to ensure stable storage.
Keys arrive at each reducer in sorted order. But there is no enforced ordering across reducers.
The sequence of execution of the mentioned components happens in the below order:
Mapper -> Combiner -> Partitioner -> Reducer
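The following self-contained Python sketch simulates this execution order for word count. The function and variable names are illustrative only (this is not the Hadoop API); it shows the combiner pre-aggregating within each map task, the hash partitioner routing keys, and keys arriving sorted at each reducer.

```python
# Simulated Mapper -> Combiner -> Partitioner -> Reducer pipeline for word count.
from collections import defaultdict

NUM_REDUCERS = 2  # assumed number of reduce tasks

def mapper(line):
    for word in line.split():
        yield word, 1                      # emit (key, value) pairs

def combiner(pairs):
    counts = defaultdict(int)              # mini reducer: pre-aggregate one map task's output
    for k, v in pairs:
        counts[k] += v
    return counts.items()

def partitioner(key):
    return hash(key) % NUM_REDUCERS        # same key -> same partition -> same reducer

def reducer(key, values):
    return key, sum(values)                # aggregate all values for one key

lines = ["big data big compute", "big data tools"]   # one "map task" per line
partitions = [defaultdict(list) for _ in range(NUM_REDUCERS)]
for line in lines:
    for k, v in combiner(mapper(line)):              # map, then combine in memory
        partitions[partitioner(k)][k].append(v)      # "shuffle" to the right partition

for p in partitions:                                 # reduce phase
    for key in sorted(p):                            # keys arrive sorted within each reducer
        print(reducer(key, p[key]))
```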
Apache Thrift
- A framework for implementing RPC in services, with cross-language support.
- RPC (Remote Procedure Call) is like calling a function, except that the function lives remotely on a different server, as a service. A service exposes many such functions/procedures to its clients, and a client needs some way to know which functions/procedures the service provides. This is where Apache Thrift comes in: it has its own Interface Definition Language (IDL) which allows you to define the functions and their parameters (like API calls). The Thrift compiler then generates the corresponding code for any language of your choice.
Hadoop + DB = HadoopDB
- We know that column stores (which store data of the same column together) have advantages: queries are often concerned with only certain columns, so column storage can save retrieval time.
- HadoopDB is a MapReduce hybrid: it uses parallel databases for performance, and Hadoop for scalability, flexibility and fault tolerance.
- The idea is to run an RDBMS on every slave node, following the usual Hadoop architecture of a Hadoop core plus many slave nodes.
But... some more problems with Hadoop
- Writing Java programs is verbose and slow. We need a high-level data processing language.
- Hadoop is slow when we only have a small task to perform, because of its data shuffling and its writes in and out of disk. The overhead becomes large when the actual computation is small.
- Hence, here come Hive and Pig. Pig and Hive are script languages which translate high-level commands to MapReduce execution, simplifying Hadoop parallel programming, which otherwise uses Java.
- Hive is an SQL-based application that utilises the Hadoop framework (its operations compile down to MapReduce jobs) to process data in HDFS (which is a way to store big data).
- We call Hive a data warehouse application in Hadoop; its query language is HiveQL (HQL), a variant of SQL.
- Hive is intended as a convenience or interface for querying data stored in HDFS.
- The metastore in Hive holds metadata such as the databases, tables, schemas and permission information.
- Hive data are stored in HDFS by placing tables in directories, partitions of tables in sub-directories, and the actual data in files (see the sketch below).
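As an illustration of that directory layout, here is a tiny Python sketch of how a partitioned Hive table typically maps to HDFS paths, assuming the default warehouse location; the database, table and partition names are made up.

```python
# Illustrative only: Hive's typical table/partition-to-path layout in HDFS.
WAREHOUSE = "/user/hive/warehouse"        # assumed default warehouse directory

def partition_path(db, table, **partition_cols):
    # table -> directory, each partition column -> key=value sub-directory
    parts = "/".join(f"{k}={v}" for k, v in partition_cols.items())
    return f"{WAREHOUSE}/{db}.db/{table}/{parts}"

print(partition_path("sales", "orders", year=2024, month=1))
# -> /user/hive/warehouse/sales.db/orders/year=2024/month=1  (data files live inside)
```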
- Hadoop natively runs only MapReduce operations. What if we want to make use of HDFS but use another framework? In this case, we need the help of YARN (Yet Another Resource Negotiator), which provides an API for developing generic distributed applications. MapReduce is just one application on YARN.
- YARN is a resource manager and a central platform for delivering consistent operations, security, and data governance tools across Hadoop clusters (HDFS).
NoSQL Databases
- Key-Value stores
- Column-oriented Databases
- Document Stores
- Graph Databases
Presto
- An open source distributed SQL query engine. It is, in effect, a SQL database engine with no storage of its own.
- Presto CLI: Command Line
- Presto Coordinator: Master
- Presto Worker: Slave nodes
- Clients connect to the coordinator and issue queries to it. The coordinator parses, analyses, plans and schedules the queries. Data is pipelined from the workers back to the client. The workers fetch data from connectors (interfaces to data sources), e.g. Hive, MongoDB, Elasticsearch.
- Designed for live (interactive) data analytics; a minimal client sketch follows.
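For example, a query might be issued from Python roughly as follows, assuming the presto-python-client package (prestodb); the host, user and table names here are hypothetical.

```python
# Hypothetical client example using presto-python-client (pip install presto-python-client).
import prestodb

conn = prestodb.dbapi.connect(
    host="coordinator.example.com",   # the Presto coordinator (master) -- made-up host
    port=8080,
    user="analyst",
    catalog="hive",                   # which connector to query through
    schema="default",
)
cur = conn.cursor()
cur.execute("SELECT COUNT(*) FROM orders")   # coordinator plans; workers fetch and process
print(cur.fetchone())                        # results are pipelined back to the client
```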
How is Presto different from Hive?
- Hive is optimised for query throughput, while Presto is optimised for latency (it places a limit on the maximum amount of memory that each task in a query can use).
- Hive has a pull model: each MapReduce stage pulls data from the preceding tasks. It uses the MapReduce architecture.
- Presto has a push model: as in a traditional DBMS implementation, a SQL query is processed using multiple stages running concurrently. An upstream stage receives data from its downstream stages, so the intermediate data can be passed along directly without using disks. It reads from HDFS directly, without using MapReduce.
Apache Spark
- An open source engine for large-scale batch processing. It supports generalised dataflows and is written in Scala, with bindings in Java and Python.
Problems with Map Reduce
-> Need to write in Java. While Hive provides a good interface using just the SQL language, MapReduce itself only offers the Mapper and Reducer APIs; in reality we need to do more work than that.
-> Designed for batch processing; not good for real-time processing.
-> Unsuitable even for trivial operations such as filters and joins, because everything must be forced into the key-value pattern.
-> Performance: it cannot do well on workloads with lots of data shuffling, since each MapReduce job reads its input from disk and writes its output back to disk.
-> Unfit for processing graphs.
-> Stateless execution: does not fit use cases like k-means that need iterative execution.
- Spark is a framework that does not adopt the MapReduce layer of Hadoop. Its primary motivation is to carry out processing "in memory". Hadoop with MapReduce is a typical batch operation, and is therefore slower than in-memory processing. Spark was developed to address this limitation of MapReduce, which does not keep data in memory after processing for immediate reuse and analysis.
- Hence, Spark aims to solve these problems. It is motivated by iterative computation, and its signature is in-memory computing.
- RDD -> DataFrame -> DataSet
- All transformations in Spark are lazy, in that they do not compute their results right away.
- Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently — for example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.
- Important operations: cache(), count()
- We can cache data to achieve faster results, but bear in mind that the amount of RAM limits how much we can cache (see the sketch below).
- For recovery, Spark uses lineage information; it does not replicate the data in DRAM, as that would be costly.
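A minimal PySpark sketch of lazy transformations plus cache() and count(), assuming a local pyspark installation (pip install pyspark); the data is generated on the spot.

```python
# Lazy transformations and caching in Spark (local mode).
from pyspark import SparkContext

sc = SparkContext("local[*]", "lazy-demo")
rdd = sc.parallelize(range(1_000_000))

squares = rdd.map(lambda x: x * x)          # transformation: nothing executes yet
squares.cache()                             # mark the RDD for in-memory caching

print(squares.count())                      # action: triggers computation, fills the cache
print(squares.reduce(lambda a, b: a + b))   # second action reuses the cached data
sc.stop()
```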
DataFrames
- A collection of Row objects. Expression-based operations with logical plans and an optimiser.
- Note that the RDD is still the core of programming in Spark; the DataFrame API generates RDDs underneath.
- DataFrames hold rows with a known schema and offer relational operations on them.
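A short PySpark DataFrame sketch of relational operations over rows with a known schema; the data is made up.

```python
# DataFrames: rows with a known schema, plus relational operations
# that the query optimiser can plan.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("df-demo").getOrCreate()
df = spark.createDataFrame(
    [("alice", 34), ("bob", 29), ("carol", 41)],
    schema=["name", "age"],
)
df.printSchema()                                # known schema: name (string), age (long)
df.filter(df.age > 30).select("name").show()    # expression-based relational operations
spark.stop()
```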
Datasets
- Internally rows, externally JVM objects. Combines the benefits of RDDs and DataFrames. Type-safe and fast.
- Like DataFrames, Datasets take advantage of Spark’s Catalyst optimizer by exposing expressions and data fields to a query planner. Datasets also leverage Tungsten’s fast in-memory encoding.
- Datasets extend these benefits with compile-time type safety — meaning production applications can be checked for errors before they are run. They also allow direct operations over user-defined classes.
- In the long run, we expect Datasets to become a powerful way to write more efficient Spark applications.
Check out another article which I have posted on Spark.
Characteristics of Graph Algorithms
- They often exhibit poor locality of memory access.
- Very little computation work is required per vertex
- Changing degree of parallelism over the course of execution
- Distribution over many machines is error prone.
Systems for Graph Data Processing
Graph Processing in Hadoop:
- Disadvantage: Iterative algorithms are slow due to lots of read and writes to disk.
- Advantage: No additional libraries needed
- Hence, introducing Pregel and Giraph
Pregel: A System for Large-Scale Graph Processing
- Pregel is the system at Google that powers PageRank. It is designed to provide scalability, fault tolerance, and the flexibility to express arbitrary algorithms.
- Pregel keeps vertices and edges on the machine that performs computation, and uses network transfers only for messages. MapReduce, however, is essentially functional, so expressing a graph algorithm as a chained MapReduce requires passing the entire state of the graph from one stage to the next — in general requiring much more communication and associated serialization overhead. In addition, the need to coordinate the steps of a chained MapReduce adds programming complexity that is avoided by Pregel’s iteration over supersteps.
- Pregel uses a message-passing architecture in which a computation consists of a sequence of iterations, called supersteps. In a superstep, each vertex, in parallel, executes a user-defined function to modify its state. It receives the messages directed at it from the previous superstep, and then emits messages to the vertices along its outgoing edges (or any vertex it needs to communicate with), to be delivered at the next superstep.
- Pregel terminates when every vertex is inactive. A vertex deactivates itself by voting to halt, after which the Pregel framework will not execute that vertex in subsequent supersteps unless it receives a message.
- Has a Master-Worker implementation.
Case Scenario application:
- Four supersteps to complete a maximum value calculation for a graph with four nodes.
- In each step, each vertex reads any incoming messages and sets its value to the maximum of its current value and those sent to it.
- It then sends this maximum value out along all of its edges. If the maximum value at a node does not change during a superstep, the node then votes to halt.
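A plain-Python simulation of this max-value example follows. This is not the Pregel API; the four vertices, the chain topology, and the initial values are assumptions chosen to mirror the scenario above.

```python
# BSP-style simulation of Pregel's max-value example: four vertices in a chain,
# a superstep loop, vote-to-halt, and reactivation on incoming messages.
edges = {1: [2], 2: [1, 3], 3: [2, 4], 4: [3]}   # assumed sample graph (undirected chain)
value = {1: 3, 2: 6, 3: 2, 4: 1}                 # assumed initial vertex values
inbox = {v: [] for v in edges}
active = set(edges)

superstep = 0
while active:
    outbox = {v: [] for v in edges}
    for v in list(active):
        new_val = max([value[v]] + inbox[v])     # max of current value and incoming messages
        changed = new_val != value[v] or superstep == 0
        value[v] = new_val
        if changed:
            for nbr in edges[v]:                 # send the new maximum along all edges
                outbox[nbr].append(new_val)
        else:
            active.discard(v)                    # value unchanged -> vote to halt
    inbox = outbox
    active |= {v for v, msgs in inbox.items() if msgs}  # messages reactivate a vertex
    superstep += 1

print(superstep, value)   # converges in four supersteps; every vertex holds the max, 6
```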
- Master/slave architecture, whereby vertices are hash-partitioned and assigned to workers. The Pregel library divides a graph into partitions, each consisting of a set of vertices and all of those vertices’ outgoing edges. Assignment of a vertex to a partition depends solely on the vertex ID, which implies it is possible to know which partition a given vertex belongs to even if the vertex is owned by a different machine, or even if the vertex does not yet exist. The default partitioning function is just hash(ID) mod N, where N is the number of partitions, but users can replace it. Everything is kept in memory.
- Processing cycle: the master node tells all workers to advance a single superstep. Each worker then delivers the messages from the previous superstep and executes the vertex computation. Workers then notify the master of how many vertices are still active.
- Fault tolerance is achieved through checkpointing. At the beginning of a superstep, the master instructs the workers to save the state of their partitions to persistent storage, including vertex values, edge values, and incoming messages; the master separately saves the aggregator values.
- When one or more workers fail, the current state of the partitions assigned to these workers is lost. The master reassigns graph partitions to the currently available set of workers, and they all reload their partition state from the most recent available checkpoint at the beginning of a superstep S.
More applications: PageRank, single-source shortest paths, bipartite matching, and semi-clustering algorithms have been implemented using Pregel’s programming model.
- Giraph originated as the open source counterpart to Pregel, with several additional features. Giraph runs in memory and can run as a task in MapReduce. Giraph uses ZooKeeper for synchronisation across the cluster’s nodes.
- It is an implementation of Graph algorithms in Hadoop clusters.
- We do not implement Giraph as multiple MapReduce jobs because there would be too much disk involved, no in-memory caching, and each superstep would become a whole job. Hadoop is purely a resource manager for Giraph; all communication is done through Netty-based IPC.
GraphX = Spark for Graphs
- Spark is more efficient in iterative computation than Hadoop.
Systems for Streaming
- Storm records and analyses streaming data in real time, such as device-generated logins, e-commerce events, sensor data from the Internet of Things, social network information, and geospatial services.
- Storm is a framework in the Hadoop ecosystem used for real-time data streaming.
- A Storm topology is the equivalent of a “job”. Once started, it runs continuously until it is killed.
- Storm Topology = a computation graph. Graph contains nodes and edges.
- Each node holds the processing logic, and streaming techniques such as hashing (Flajolet-Martin counters, Bloom filters, Count-Min sketch) and reservoir sampling can be implemented in its node logic.
- Directed edges indicate communication between nodes.
- Spouts: data generators (the sources of streams).
- Bolts: nodes that hold processing logic and can implement hashing or sampling (see the sketch below).
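As an illustration of the per-tuple logic a bolt might hold, here is a plain-Python reservoir sampling sketch (Algorithm R). It is not the Storm API; the incoming tuple stream is simulated by a simple loop.

```python
# Reservoir sampling: keep a uniform random sample of size k from an
# unbounded stream, using O(k) memory -- the kind of logic a bolt could run.
import random

class ReservoirSampler:
    def __init__(self, k):
        self.k, self.n, self.sample = k, 0, []

    def process(self, item):                 # called once per incoming tuple
        self.n += 1
        if len(self.sample) < self.k:
            self.sample.append(item)         # fill the reservoir first
        else:
            j = random.randrange(self.n)     # keep the new item with probability k/n
            if j < self.k:
                self.sample[j] = item

sampler = ReservoirSampler(k=5)
for event in range(10_000):                  # stand-in for an endless tuple stream
    sampler.process(event)
print(sampler.sample)                        # 5 items, each kept with equal probability
```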
To Explore More (TODO): https://www.michael-noll.com/blog/2012/10/16/understanding-the-parallelism-of-a-storm-topology/
Overall Picture of Hadoop
- Hadoop is the big data operating system, optimised for parallel processing of structured and unstructured data. Its ecosystem is composed of frameworks, open source software, libraries and methodologies for data analysis.
- The idea behind Hadoop is batch processing: operations over large sets of static data, based on reads and writes to disk, returning the result once the data processing completes.
- The Hadoop platform comprises an Ecosystem including its core components, which are HDFS (File Management System), YARN (Resource Manager and Service Scheduler), and MapReduce (Parallel Programming Model).
- And other applications, such as high-level languages (Pig for scripting and Hive for SQL queries), graph processing (Giraph), real-time and in-memory processing (Storm, Spark, Flink), NoSQL databases (HBase, Cassandra, MongoDB), and service management (ZooKeeper).
- MapReduce provides access to high-level applications using scripts in languages such as Hive and Pig, and programming languages such as Scala and Python.
- PIG and Hive are Script languages, which translate high-level commands to MapReduce execution, simplifying Hadoop parallel programming, which utilises the Java language.
- Giraph is a scalable and fault-tolerant implementation of graph algorithms in Hadoop clusters of up to thousands of computing nodes.
- The need to analyse data in large volumes, from different sources and in different formats, has given rise to NoSQL (Not Only SQL) technology. HBase, Cassandra and MongoDB are the NoSQL applications of the Hadoop ecosystem.
- To capture data or move it into Hadoop, we have two tools that are an integral part of the Hadoop ecosystem, called Flume and Sqoop.
- Spark, as described above, skips Hadoop’s MapReduce layer and processes data in memory. Storm records and analyses streaming data in real time. Sqoop is software designed to transfer data between relational database systems and Hadoop.
- Apache ZooKeeper is Hadoop’s centralised network coordination service, able to keep the Hadoop ecosystem’s configuration information up to date. It is used to name nodes, allowing one network node to find another in a set of thousands of machines.
Comparisons between Spark, Hadoop, Storm
Data Processing Model
- Hadoop is best for batch processing. Apache Spark can do more than just batch processing: it provides libraries for machine learning and graph processing, and can be used for both batch and real-time processing. Storm is only for streaming, i.e. real-time processing.
- Spark processes data in memory, while Hadoop writes data to disk after each map or reduce action; hence Hadoop is slower. Spark requires a lot of memory, which it also uses for caching, and it performs well only when allowed to run on dedicated clusters where the entire dataset can be stored in memory. If there is not enough memory, for example when Spark runs on top of YARN alongside other (possibly resource-demanding) services, Spark’s performance may suffer. In Hadoop, since each process is killed as soon as it completes, whether or not the data fits in memory has only a small impact on performance; Hadoop can run alongside other resource-demanding services with just a slight difference in performance.
- Spark streams events in small batches: it collects the events that arrive within a certain time window and then processes them all in one shot. Storm, on the other hand, processes data as they come, one at a time. Spark has a latency of a few seconds, while Storm processes an event with a latency of a few milliseconds. Storm is a good option when an application needs sub-second latency without data loss.
Ease of Development
- Hadoop is written in Java. Apache Pig makes it easier to develop in Hadoop. To add SQL compatibility to Hadoop, developers can use Hive.
- Spark is written in Scala but provides Python alternatives.
- Storm uses a DAG (directed acyclic graph) topology. Data transfer between the nodes of the DAG has a defined interface, and it happens through Storm tuples.
Fault Tolerance (Handling process/node level failures)
- Storm: Storm is designed with fault tolerance at its core. Storm daemons (Nimbus and the Supervisors) are made to be fail-fast (the process self-destructs whenever an unexpected scenario is encountered) and stateless (all state is kept in ZooKeeper or on disk). In Apache Storm, when a process fails, the Supervisor process restarts it automatically, since state management is handled by ZooKeeper.
- Spark Streaming: the driver node (an equivalent of the JobTracker) is a single point of failure (SPOF). If the driver node fails, all executors are lost, along with their received and replicated in-memory data. Hence, Spark Streaming uses data checkpointing to recover from driver failure.
Why is efficient large scale graph processing challenging?
- In Hadoop, iterative graph algorithms generate heavy network traffic and disk I/O between jobs, whereas Pregel’s direct messaging can be aware of graph structure (you can think of it this way: MapReduce is general-purpose; Pregel is specialised and can apply more optimisations for graphs).
Comparisons between Pregel and Giraph
- Both use the Bulk Synchronous Parallel (BSP) model, in which computation is divided into a series of supersteps.
- Pregel is not open source, but Giraph is.
- Method of computation: Giraph employs Hadoop’s map phase to run the computation, and employs ZooKeeper to provide distributed synchronisation.
- Pregel does not support edge-oriented input; edges are not first-class citizens in Pregel. Giraph, in contrast, supports edge-oriented input.
- Like Pregel, Giraph was originally designed to run the whole computation in-memory, and to touch disks only for input/output and checkpointing. Unfortunately, sometimes a graph is too large to fit into a cluster’s main memory (or the cluster is too small for a graph). Moreover, certain algorithms produce too large message sets (either producing many messages or very large ones), again not fitting into the cluster’s memory. For these scenarios, Giraph was extended with out-of-core capabilities, meaning that Giraph is able to spill to disk excessive messages or partitions, according to user-defined parameters.
- Pregel, on the other hand, strictly keeps the dataset in memory rather than writing to disk. It relies on message passing between the vertices of a graph, and handles the fact that graph algorithms have poor memory-access locality.