Question-5: What are RDD Operations in Spark?
Answer:
Spark RDDs support two types of operations:
Transformations - lazy operations on an RDD that create one or more new
RDDs. A transformation takes an RDD as input and produces one or more RDDs
as output; because RDDs are immutable, every transformation creates a new
RDD instead of modifying the existing one.
The resulting RDD is therefore always different from its parent RDD.
It can be smaller (e.g. filter(), distinct(), sample()), bigger (e.g.
flatMap(), union(), cartesian()) or the same size (e.g. map()).
Some frequently used RDD transformations are map(), flatMap(),
filter(), union(), distinct(), reduceByKey(),
groupByKey(), coalesce(), repartition(), join() etc.
Actions - functions that perform a computation over the transformed RDD and
either send the computed result from the executors to the driver or write it
to external storage. An action triggers execution: Spark uses the lineage
graph to load the data into the original RDD, carry out all intermediate
transformations, and then return the final result to the driver program or
write it out to a file system.
Some frequently used RDD Actions are - reduce(func), collect(),
count(), first(), take(), takeSample(), saveAsTextFile(),
foreach(func) etc.
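A minimal sketch of the laziness described above. It assumes Spark 3.x with `spark-sql` on the classpath and a local master; `RddOperationsExample` and `run` are illustrative names. The `filter` and `map` calls only record lineage; the job runs when `count()` and `take()` are called.

```scala
import org.apache.spark.sql.SparkSession

object RddOperationsExample {
  // Returns the results of two actions run over the same lazy pipeline.
  def run(spark: SparkSession): (Long, Array[Int]) = {
    val sc = spark.sparkContext
    val numbers = sc.parallelize(1 to 10, numSlices = 2)

    // Transformations: nothing executes yet, Spark only records the lineage.
    val evens   = numbers.filter(_ % 2 == 0) // smaller RDD
    val squared = evens.map(n => n * n)      // same size as its parent

    // Actions: each one triggers a job that runs the whole lineage.
    val total    = squared.count()
    val firstTwo = squared.take(2)
    (total, firstTwo)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("rdd-operations").master("local[*]").getOrCreate()
    try {
      val (total, firstTwo) = run(spark)
      println(s"count = $total, first two = ${firstTwo.mkString(", ")}")
    } finally spark.stop()
  }
}
```

Because nothing is cached here, `take(2)` recomputes the filter and map from the source; Question-8 covers how caching avoids that.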
Question-6: What are Narrow and Wide transformations?
Answer:
Transformations can further be divided into 2 types:
Narrow transformation: each partition of the parent RDD is used by at most
one partition of the child RDD, so no data has to be shuffled across
partitions and a chain of narrow transformations can be pipelined into one
stage.
For example: map, flatMap, filter, sample, union, etc.
Wide transformation: data from one parent partition may be needed by many
child partitions, so the data must be shuffled, and a new stage is created
at each wide transformation.
For example: intersection, distinct, reduceByKey, groupByKey, join (unless
both inputs are already co-partitioned), repartition, etc. Note that
coalesce() is narrow by default; it only shuffles when called with
shuffle = true.
Note: Compared to narrow transformations, wide transformations are
expensive operations because of the shuffle.
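To check which kind of transformation produced an RDD, its `dependencies` can be inspected. This is a sketch assuming Spark 3.x on the classpath; `ShuffleDependency` and `NarrowDependency` are Spark's developer-API dependency classes, while `NarrowVsWideExample`, `kind` and `classify` are hypothetical helpers.

```scala
import org.apache.spark.{NarrowDependency, ShuffleDependency}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object NarrowVsWideExample {
  // Classifies the first dependency of an RDD on its parent(s).
  def kind(rdd: RDD[_]): String = rdd.dependencies.head match {
    case _: ShuffleDependency[_, _, _] => "wide"
    case _: NarrowDependency[_]        => "narrow"
    case other                         => other.getClass.getSimpleName
  }

  def classify(spark: SparkSession): Map[String, String] = {
    val sc = spark.sparkContext
    val words  = sc.parallelize(Seq("a", "b", "a", "c"), 4)
    val pairs  = words.map(w => (w, 1))   // one-to-one (narrow) dependency
    val counts = pairs.reduceByKey(_ + _) // shuffle dependency: new stage
    val fewer  = counts.coalesce(2)       // narrow, because shuffle = false by default
    Map("map" -> kind(pairs), "reduceByKey" -> kind(counts), "coalesce" -> kind(fewer))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("narrow-vs-wide").master("local[*]").getOrCreate()
    try classify(spark).foreach { case (op, k) => println(s"$op -> $k") }
    finally spark.stop()
  }
}
```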
Question-7: What are the Shared Variables in Spark?
Answer:
There are two types of shared variables in Spark: broadcast variables and
accumulators.
Broadcast Variable - Broadcast variables are read-only variables that are
cached in memory on every worker node, instead of shipping a copy of the
data with each task. Data broadcast this way is cached in serialized form
and deserialized before running each task. Because every executor keeps a
copy, generally only small datasets (for example lookup tables) should be
broadcast.
Accumulator - Accumulators are shared variables that tasks can only add to,
in parallel, during execution; the driver program reads the merged result.
They are similar to counters in MapReduce and are used for associative and
commutative operations such as counts or sums.
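A sketch of both shared variables together, assuming Spark 3.x run locally; the lookup table, the `unknownCodes` counter and the object name are made up for illustration. The broadcast map is read by every task, and the accumulator counts codes missing from it.

```scala
import org.apache.spark.sql.SparkSession

object SharedVariablesExample {
  // Returns the translated names and the number of codes missing from the lookup table.
  def run(spark: SparkSession): (Seq[String], Long) = {
    val sc = spark.sparkContext
    // Broadcast: a small read-only lookup table sent once to each executor.
    val countryNames = sc.broadcast(Map("IN" -> "India", "US" -> "United States"))
    // Accumulator: tasks can only add to it; the driver reads the merged value.
    val unknownCodes = sc.longAccumulator("unknownCodes")

    val codes = sc.parallelize(Seq("IN", "US", "XX", "IN"), 2)
    // Updated inside an action (foreach), so each task's update is applied once.
    codes.foreach(code => if (!countryNames.value.contains(code)) unknownCodes.add(1L))

    val names = codes.map(code => countryNames.value.getOrElse(code, "unknown")).collect().toSeq
    (names, unknownCodes.sum)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("shared-variables").master("local[*]").getOrCreate()
    try println(run(spark))
    finally spark.stop()
  }
}
```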
Question-8: What is the difference between Persist and Cache?
Answer:
Cache and persist are optimization techniques for iterative and interactive
Spark applications that improve the performance of jobs.
Iterative applications (for example machine-learning algorithms) reuse the
same intermediate results in every iteration, while interactive
applications (for example a spark-shell session) query the same data
repeatedly.
Spark provides a mechanism to store the intermediate results of an RDD,
DataFrame, or Dataset so that they can be reused by subsequent actions
instead of being recomputed.
Both caching and persisting are used to save intermediate result sets. The
difference is that cache() always uses the default storage level
(MEMORY_ONLY for an RDD, MEMORY_AND_DISK for a DataFrame or Dataset),
whereas persist(level) can store the data in memory, on disk, or off-heap
according to the storage level passed in. Common storage levels are
MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, and
DISK_ONLY.
Note:
- persist() without an argument is equivalent to cache().
- unpersist() can be used to free up space in storage memory.
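A sketch comparing `cache()` and `persist(level)` on RDDs, assuming Spark 3.x run locally; the object name and the chosen `MEMORY_AND_DISK_SER` level are only examples. `getStorageLevel` shows what each call selected, and `unpersist()` resets it.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object PersistVsCacheExample {
  // Returns the levels of a cached RDD, a persisted RDD, and the cached RDD after unpersist().
  def run(spark: SparkSession): (StorageLevel, StorageLevel, StorageLevel) = {
    val sc = spark.sparkContext
    val base = sc.parallelize(1 to 1000, 4)

    val cached    = base.map(_ * 2).cache() // RDD default level: MEMORY_ONLY
    val persisted = base.map(_ + 1).persist(StorageLevel.MEMORY_AND_DISK_SER)

    cached.count() // first action computes the partitions and stores them
    cached.sum()   // later actions read the stored blocks instead of re-running map
    persisted.count()

    val cachedLevel    = cached.getStorageLevel
    val persistedLevel = persisted.getStorageLevel
    cached.unpersist() // frees the blocks; the RDD's level goes back to NONE
    persisted.unpersist()
    (cachedLevel, persistedLevel, cached.getStorageLevel)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("persist-vs-cache").master("local[*]").getOrCreate()
    try println(run(spark))
    finally spark.stop()
  }
}
```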
Question-9: What is Spark checkpointing and how is it different
from persisting to disk?
Answer:
“When should I cache or checkpoint?”
To decide, determine whether the results of a set of transformations need
to be reused for a very long time.
If the answer is yes, use checkpointing. If the answer is no, use caching.
Caching is extremely effective, and typically much faster than
checkpointing when used properly and when you have enough available
memory, which matters because RDDs and DataFrames can be gigabytes or even
terabytes in size. Essentially, caching keeps the result of applying a set
of transformations to an RDD or DataFrame so that those transformations do
not have to be recomputed when an additional transformation or action is
applied. Caching also keeps the lineage, so if a cached partition is lost
(for example when an executor fails) Spark recomputes it from the lineage.
Checkpointing saves the transformed RDD or DataFrame to reliable storage,
where the files remain after the application ends unless they are cleaned
up, and it truncates the lineage: the checkpointed data becomes the new
starting point for any later recomputation. Now you’re thinking “this is
great, I can throw away old history for free and not have Spark redo
transformations from time to time like caching.” Well, not exactly for
free. The main problem with checkpointing is that Spark must write every
checkpointed RDD or DataFrame to a reliable file system such as HDFS, which
is slower and less flexible than caching. You also need to set up a
checkpoint directory (for example on HDFS) where an RDD's or DataFrame’s
data can be written, whereas caching is part of Spark’s default setup and
needs no extra configuration.
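A sketch of RDD checkpointing, assuming Spark 3.x run locally; the temporary directory stands in for the HDFS path you would use on a real cluster, and the object name is illustrative. `checkpoint()` only marks the RDD; the files are written by the next job.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object CheckpointExample {
  // Returns whether the RDD was checkpointed before and after the first action.
  def run(spark: SparkSession): (Boolean, Boolean) = {
    val sc = spark.sparkContext
    // Assumption: a local temp directory stands in for an HDFS checkpoint directory.
    sc.setCheckpointDir(Files.createTempDirectory("spark-checkpoints").toString)

    val derived = sc.parallelize(1 to 100, 2).map(_ * 3).filter(_ % 2 == 0)
    derived.cache()      // recommended, otherwise the RDD is computed twice (job + checkpoint write)
    derived.checkpoint() // must be called before any job has run on this RDD
    val before = derived.isCheckpointed

    derived.count()
    println(derived.toDebugString) // lineage now starts from the checkpointed data
    (before, derived.isCheckpointed)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("checkpoint").master("local[*]").getOrCreate()
    try println(run(spark))
    finally spark.stop()
  }
}
```

For DataFrames, `Dataset.checkpoint()` is eager by default and returns a new Dataset whose plan starts from the checkpointed data.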
Question-10: What is SparkSession and how is it different from
SparkContext?
Answer:
Spark Context -
SparkContext is the entry point to Apache Spark functionality.
It allows your Spark application to access the Spark cluster with the help
of a cluster manager (resource manager).
To create a SparkContext, a SparkConf should be created first; it holds the
cluster configuration and parameters used to create the SparkContext
object.
Only one SparkContext may be active per JVM, and creating one is an
expensive operation.
Spark Session -
SparkSession is the unified entry point of a Spark application since
Spark 2.0.
SparkSession combines SQLContext and HiveContext and wraps a SparkContext,
which is still available as spark.sparkContext.
SparkSession provides spark.implicits._, one of the most useful imports in
Spark, which offers implicit methods for converting Scala objects (such as
Seqs and RDDs) into Datasets, along with other handy utilities.
You can think of Spark Session as a data structure where the driver
maintains all the information including the executor location and
their status.
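A sketch of the Spark 2.0+ entry point, assuming Spark 3.x run locally; the app name, the local master and the object name are illustrative. It shows that the SparkContext is still reachable through the session, that `getOrCreate()` reuses the active session, and that `toDS()` comes from `spark.implicits._`.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

object EntryPointsExample {
  // Returns whether getOrCreate() reused the session, plus a Dataset count.
  def run(): (Boolean, Long) = {
    // One builder call replaces SparkConf + SparkContext + SQLContext/HiveContext.
    val spark = SparkSession.builder()
      .appName("entry-points")
      .master("local[*]")     // assumption: local run; normally supplied by spark-submit
      // .enableHiveSupport() // only needed for Hive tables (requires Hive classes on the classpath)
      .getOrCreate()
    try {
      import spark.implicits._
      val sc: SparkContext = spark.sparkContext // the underlying SparkContext is still available
      println(s"Running on ${sc.master} as ${sc.appName}")

      val reused = SparkSession.builder().getOrCreate() eq spark // returns the active session
      val ds = Seq(1, 2, 3).toDS()                                // toDS() comes from spark.implicits._
      (reused, ds.count())
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```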
Question-11: What parameters can be specified in a spark-submit command?
Answer:
To submit a Spark job, run a command like the one below. Parameters can be
adjusted to your project configuration:
spark-submit --master yarn --deploy-mode cluster \
  --num-executors a --executor-cores b \
  --driver-memory c --executor-memory d \
  --class com.spark.examples.TestSparkJob \
  /path/to/examples.jar
Values of a, b, c, d can be substituted as below:
a = Number of executors required. E.g.: 15
b = Number of cores per executor. E.g.: 5 (a common choice; on YARN the default is 1)
c = Driver memory. E.g.: 1G
d = Executor memory. E.g.: 2G
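The same settings can also be expressed as Spark configuration properties (for example with `--conf key=value` or in `spark-defaults.conf`). This sketch only builds a `SparkConf` with the example values above to show which property each flag corresponds to; `SubmitSettingsExample` is a hypothetical name. Driver memory generally has to be set before the driver JVM starts, so in client mode it belongs on the command line or in `spark-defaults.conf`, not in application code.

```scala
import org.apache.spark.SparkConf

object SubmitSettingsExample {
  // Property equivalents of the spark-submit flags, using the example values.
  def conf(): SparkConf = new SparkConf(loadDefaults = false)
    .setAppName("TestSparkJob")
    .set("spark.executor.instances", "15")     // --num-executors
    .set("spark.executor.cores", "5")          // --executor-cores
    .set("spark.driver.memory", "1g")          // --driver-memory (only effective before the driver JVM starts)
    .set("spark.executor.memory", "2g")        // --executor-memory
    .set("spark.submit.deployMode", "cluster") // --deploy-mode

  def main(args: Array[String]): Unit =
    conf().getAll.sortBy(_._1).foreach { case (k, v) => println(s"$k=$v") }
}
```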
Question-12: What are the different types of Cluster manager
available in Spark?
Answer:
Apache Spark supports four different cluster managers:
Hadoop YARN - YARN is the resource manager of Hadoop. At the time of
writing, YARN is the most widely used cluster manager for Apache Spark.
Apache Mesos - Apache Mesos is another general-purpose cluster manager.
If you are not using Hadoop, you might be using Mesos for your Spark
cluster. (Mesos support has been deprecated since Spark 3.2.)
Kubernetes - a general-purpose, open-source container orchestration
platform originally developed by Google.
Standalone - the standalone cluster manager is a simple, basic cluster
manager that ships with Apache Spark and makes it easy to set up a Spark
cluster quickly. It is mostly used during development.
Whichever cluster manager we use, they all serve the same basic purpose:
allocating cluster resources to Spark applications.
Question-13: What are deploy modes in Spark?
Answer:
When you start an application, you can choose the execution mode. There
are three options: Local, Client and Cluster.
1. Client Mode -
In client mode, the driver starts on the local machine, and as soon as the
driver creates a SparkSession, a request goes to the YARN resource manager
to create a YARN application. The YARN resource manager starts an
Application Master (AM). In client mode, the AM acts as an Executor
Launcher: it reaches out to the YARN resource manager and requests further
containers. The resource manager allocates new containers, and the
Application Master starts an executor in each container. After the initial
setup, these executors communicate directly with the driver.
[Figure: Client Deploy Mode]
2. Cluster Mode -
In cluster mode, the spark-submit utility sends a YARN application request
to the YARN resource manager. The YARN resource manager starts an
Application Master, and the driver then starts inside the AM container.
That's where client mode and cluster mode differ.
In client mode, the YARN AM acts as an executor launcher and the driver
resides on your local machine, but in cluster mode the YARN AM starts the
driver, so there is no dependency on your local computer. Once started,
the driver reaches out to the resource manager with a request for more
containers. The resource manager allocates new containers, and the driver
starts an executor in each container.
[Figure: Cluster Deploy Mode]
3. Local Mode -
Local mode is meant for development and debugging. It does not use a
cluster at all; everything runs in a single JVM on your local machine.
Question-14: What is the difference between executor & executor
core?
Answer:
An executor is a JVM process (running in a YARN container when Spark runs
on YARN), and an executor core is a task slot: each task is processed by a
single thread inside the executor JVM, and each task works on one
partition of the data. An executor is dedicated to a specific Spark
application and is terminated when the application completes. A Spark
program normally consists of many executors, often working in parallel.
Executor cores define the number of concurrent tasks each executor can
run. The more cores we have, the more work we can do in parallel. However,
if we give each executor a very high number of cores, Spark runs many
tasks concurrently inside the same executor; these tasks compete with each
other for resources, which can reduce I/O throughput.
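A small worked example of the arithmetic, using the sample values from Question-11 (15 executors with 5 cores each) and a stage with 200 partitions. It assumes one core per task (the default `spark.task.cpus = 1`); `TaskSlotsExample`, `taskSlots` and `waves` are hypothetical names, not Spark APIs.

```scala
object TaskSlotsExample {
  // Each executor core runs one task at a time (assuming spark.task.cpus = 1),
  // so the cluster can run executors * cores tasks concurrently.
  def taskSlots(numExecutors: Int, coresPerExecutor: Int): Int =
    numExecutors * coresPerExecutor

  // A stage has one task per partition; tasks beyond the available slots wait
  // for a free slot, so the stage runs in ceil(partitions / slots) "waves".
  def waves(numPartitions: Int, numExecutors: Int, coresPerExecutor: Int): Int = {
    val slots = taskSlots(numExecutors, coresPerExecutor)
    (numPartitions + slots - 1) / slots
  }

  def main(args: Array[String]): Unit = {
    println(taskSlots(15, 5))  // 75 concurrent tasks
    println(waves(200, 15, 5)) // 3 waves: 75 + 75 + 50 tasks
  }
}
```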
Question-15: What is Shuffling concept in Spark?
Answer:
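Shuffling is the process of redistributing data across partitions. It is
the data transfer that happens between stages, and it is a costly and
complex operation. The number of partitions after a shuffle is not
necessarily the same as before: for RDD operations such as reduceByKey it
defaults to the largest number of partitions among the parent RDDs (or
spark.default.parallelism, if set), while DataFrame/SQL shuffles use
spark.sql.shuffle.partitions (200 by default; with adaptive query
execution enabled, Spark may coalesce them at runtime).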
When an action is called, Spark sends out tasks to the worker nodes. If
the job contains a wide transformation, such as a by-key reduction like
reduceByKey, data shuffling takes place.
When the DAGScheduler generates the physical execution plan from the
logical plan, it pipelines all RDDs that are connected by narrow
dependencies into one stage. Consequently, it cuts the logical plan at
every wide dependency.
In general, avoiding shuffle will make your program run faster. All
shuffle data must be written to disk and then transferred over the
network.
repartition, join, cogroup, distinct, and any of the *By or *ByKey
transformations can result in shuffles.
map, filter and union run within a single stage (no shuffling).
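A sketch of how many partitions come out of a shuffle, assuming Spark 3.x run locally with `spark.default.parallelism` not set; the object name and the choice of 8 shuffle partitions are illustrative. Adaptive query execution is switched off so it does not coalesce the DataFrame shuffle partitions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object ShufflePartitionsExample {
  // Returns partition counts: RDD input, two RDD shuffles, and one DataFrame shuffle.
  def run(spark: SparkSession): (Int, Int, Int, Int) = {
    val sc = spark.sparkContext
    val pairs   = sc.parallelize(1 to 100, 4).map(n => (n % 10, n))
    val summed  = pairs.reduceByKey(_ + _)    // keeps the parent's 4 partitions by default
    val resized = pairs.reduceByKey(_ + _, 3) // the partition count can be passed explicitly

    // Keep AQE from coalescing the shuffle partitions, then set the SQL shuffle width.
    spark.conf.set("spark.sql.adaptive.enabled", "false")
    spark.conf.set("spark.sql.shuffle.partitions", "8")
    val grouped = spark.range(100).groupBy((col("id") % 10).as("key")).count()

    (pairs.getNumPartitions, summed.getNumPartitions,
      resized.getNumPartitions, grouped.rdd.getNumPartitions)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("shuffle-partitions").master("local[*]").getOrCreate()
    try println(run(spark))
    finally spark.stop()
  }
}
```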
Question-16: How to decide number of stages created in Spark
job?
Answer:
A job's DAG in Spark is divided into stages, and each stage contains a set
of tasks.
A stage is a set of tasks that don't require a shuffle in between; in
other words, a stage is a group of transformations that does not include a
shuffle.
When a wide transformation is executed, data is shuffled between
partitions, so a new stage is created in the DAG.
In spark, stages are of two types:
-ShuffleMapStage
-ResultStage
Basically, the number of stages depends entirely on shuffling: whenever
you perform a transformation where Spark needs to shuffle data between
partitions, it creates a new stage, while transformations that do not
require a shuffle are pipelined into the current stage. So a job has one
ShuffleMapStage per shuffle dependency plus one final ResultStage.
The number of tasks depends on the number of partitions. Let's say we
have only two partitions; then each stage is divided into two tasks, and
a single task runs on a single partition.
The number of tasks for a job is the sum, over its stages, of the number
of partitions in each stage; if every stage has the same number of
partitions, this equals (number of stages * number of partitions).
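The counting rule can be written down as a tiny model. This is an illustrative sketch, not a Spark API: `Stage`, `numStages` and `numTasks` are hypothetical names, and stages skipped because an earlier job's shuffle output is reused are ignored.

```scala
object StageTaskCount {
  // Hypothetical model: a stage is described only by the number of partitions it processes.
  final case class Stage(kind: String, numPartitions: Int)

  // One ShuffleMapStage per shuffle dependency, plus one final ResultStage.
  def numStages(shuffleDependencies: Int): Int = shuffleDependencies + 1

  // One task per partition in every stage.
  def numTasks(stages: Seq[Stage]): Int = stages.map(_.numPartitions).sum

  def main(args: Array[String]): Unit = {
    // e.g. an RDD with 4 partitions -> flatMap -> map -> reduceByKey(_ + _, 2) -> collect()
    val job = Seq(Stage("ShuffleMapStage", 4), Stage("ResultStage", 2))
    println(s"stages = ${numStages(1)}, tasks = ${numTasks(job)}") // stages = 2, tasks = 6
  }
}
```

In a real job the same numbers are visible in the Spark UI's Jobs and Stages tabs.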
Question-17: What happens when a Spark job is submitted? OR How does Spark
internally execute a program?
Answer:
When we apply a transformation to an RDD, we get a new RDD, but the
transformation is not executed yet. Execution happens only when an action
is called on the new RDD, which gives us a final result.
So once you perform an action on an RDD, the SparkContext (which lives in
the driver) submits a job for your program to the scheduler.
The driver creates the DAG (directed acyclic graph), or execution plan, for
the job. Once the DAG is created, the driver divides it into stages at
shuffle boundaries. Each stage is divided into smaller tasks (one per
partition), and the tasks are sent to the executors on the worker nodes
for execution.
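Question-18: Can output be stored directly without sending it back to the
driver?
Answer:
Yes. Code should be written so that explicit result collection at the
driver (for example collect()) is avoided, and the writing is delegated to
the executors instead: with an action such as saveAsTextFile() or a
DataFrame write, each executor writes its own partitions directly to the
target storage.
E.g., if we want to save the results to files, we can either collect them
at the driver and write them from there, or let the executors write them
for us, which avoids moving all of the data through the driver.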
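A sketch of writing results straight from the executors, assuming Spark 3.x run locally; the temporary output directory stands in for an HDFS or object-store path, and the object name is illustrative. Each task writes its own partition as a part file, so no rows pass through the driver.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object WriteFromExecutorsExample {
  // Writes a DataFrame from the executors and reads the row count back.
  def run(spark: SparkSession): Long = {
    // Assumption: a local temp directory stands in for an HDFS/S3 output path.
    val out = Files.createTempDirectory("spark-output").resolve("squares").toString
    val df = spark.range(1, 1001).selectExpr("id", "id * id AS square")

    // Each task writes its own partition as a separate Parquet part file;
    // unlike df.collect(), no rows are sent back to the driver.
    df.write.mode("overwrite").parquet(out)

    spark.read.parquet(out).count()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("write-from-executors").master("local[*]").getOrCreate()
    try println(run(spark))
    finally spark.stop()
  }
}
```

For RDDs, `rdd.saveAsTextFile(path)` behaves the same way: one output file per partition, written by the executors.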
Question-19: What are coalesce and repartition used for?
Answer:
These functions are used to increase or decrease the number of partitions.
Coalesce is used to decrease the number of partitions without shuffling;
use it when the number of output partitions is less than the input.
Without a shuffle it cannot increase the number of partitions. On RDDs,
coalesce(n, shuffle = true) does trigger a shuffle (the shuffle flag is
false by default).
Repartition, on the other hand, can increase or decrease the number of
partitions, and it always shuffles the data.
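A sketch of the partition counts each call produces on an RDD, assuming Spark 3.x run locally; the object name is illustrative.

```scala
import org.apache.spark.sql.SparkSession

object CoalesceRepartitionExample {
  // Returns the partition count produced by each call on a 4-partition RDD.
  def run(spark: SparkSession): Map[String, Int] = {
    val rdd = spark.sparkContext.parallelize(1 to 100, 4)
    Map(
      "original"           -> rdd.getNumPartitions,
      "coalesce(2)"        -> rdd.coalesce(2).getNumPartitions,                  // merges partitions, no shuffle
      "coalesce(10)"       -> rdd.coalesce(10).getNumPartitions,                 // cannot grow without a shuffle
      "coalesce(10, true)" -> rdd.coalesce(10, shuffle = true).getNumPartitions, // shuffles, so it can grow
      "repartition(10)"    -> rdd.repartition(10).getNumPartitions               // same as coalesce(10, shuffle = true)
    )
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("coalesce-repartition").master("local[*]").getOrCreate()
    try run(spark).foreach { case (call, n) => println(s"$call -> $n") }
    finally spark.stop()
  }
}
```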
Question-20: What is physical and logical plan optimization in
Spark?
Answer:
Logical Plan:
In this phase, Spark takes the transformations (or SQL query) you wrote and
records them in the driver as a tree of operators that describes what has
to be computed, without yet saying how. (For plain RDDs the equivalent is
the lineage graph: the chain of transformations that produces the final
RDD.)
Logical Plan is divided into three parts:
a. Unresolved Logical Plan OR Parsed Logical Plan
- Produced once the code is valid and the syntax is correct; table and
column names have not been checked yet.
b. Resolved Logical Plan OR Analyzed Logical Plan OR Logical Plan
- The analyzer resolves table names, column names and data types against
the catalog. This plan is then passed on to the “Catalyst Optimizer”,
which applies its own rules to try to optimize the plan.
c. Optimized Logical Plan
- The output of the Catalyst Optimizer. Once the Optimized Logical Plan
is created, the Physical Plan is generated from it.
[Figure: Catalyst Optimizer]
Physical Plan:
It specifies how the Logical Plan is going to be executed on the cluster.
Spark generates different execution strategies (physical plans) and
compares them using a cost model. For example, Spark decides which
algorithm to use for each join (such as a broadcast hash join or a
sort-merge join) for better performance.
The Physical Plan is specific to Spark's execution engine: Spark checks
multiple physical plans, chooses the best one, and finally runs the
selected physical plan on the cluster.
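Every plan described above can be printed for a DataFrame. This sketch assumes Spark 3.x run locally; `explain(true)` prints the parsed, analyzed and optimized logical plans plus the physical plan, and `queryExecution` exposes the same stages as objects (a developer-facing API whose output format may change between versions). The query and object name are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object QueryPlansExample {
  // Returns whether the analyzed plan is resolved, plus the optimized and physical plans as text.
  def run(spark: SparkSession): (Boolean, String, String) = {
    val df = spark.range(100)
      .select((col("id") * 2).as("x"))
      .filter(col("x") > 10)

    // Parsed, analyzed and optimized logical plans, then the physical plan.
    df.explain(true)

    val qe = df.queryExecution
    (qe.analyzed.resolved, qe.optimizedPlan.toString, qe.executedPlan.toString)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("query-plans").master("local[*]").getOrCreate()
    try println(run(spark)._2)
    finally spark.stop()
  }
}
```

In the optimized plan the filter is pushed below the projection, one of Catalyst's rule-based rewrites.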
Question-21: How does the Catalyst optimizer work internally?
Answer:
The Catalyst optimizer handles analysis, logical optimization, physical
planning, and code generation to compile parts of queries to Java
bytecode. Catalyst supports both rule-based and cost-based optimization:
- Analyzing the logical plan: rule-based
- Logical plan optimization: rule-based
- Physical planning: cost-based
- Code generation to produce Java bytecode: rule-based
*Refer to Question-20 for a more detailed answer on this.
Question-22: What new features has Tungsten brought?
Answer:
Project Tungsten started in the Spark 1.x line (Spark 1.4/1.5), and Spark
2.0 extended it with whole-stage code generation. The goal of Project
Tungsten is to improve Spark execution by optimizing Spark jobs for CPU and
memory efficiency. It does so through the following optimizations:
a. Memory management and binary processing: Spark manages memory
explicitly (including off-heap memory) and works on data in a compact
binary format, eliminating the overhead of the JVM object model and
garbage collection.
b. Cache-aware computation: algorithms and data structures designed to
exploit the CPU cache hierarchy.
c. Code generation: Spark generates bytecode for Spark SQL operations at
runtime. For performance reasons, it also groups multiple operators
together into a single generated function (whole-stage code generation,
"Whole-Stage CodeGen").
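A sketch for seeing whole-stage code generation in a physical plan, assuming Spark 3.x run locally; the query and object name are illustrative. `debugCodegen()` comes from the implicit conversions in `org.apache.spark.sql.execution.debug._` and prints the generated Java source. AQE is turned off here only to keep the printed plan simple.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.debug._
import org.apache.spark.sql.functions.col

object WholeStageCodegenExample {
  // Returns true if the physical plan contains a whole-stage code-generated subtree.
  def run(spark: SparkSession): Boolean = {
    spark.conf.set("spark.sql.adaptive.enabled", "false") // keep the plan as a plain physical plan
    val df = spark.range(1000)
      .filter(col("id") % 3 === 0)
      .select((col("id") + 1).as("y"))

    df.explain()      // fused operators are printed with a "*(n)" prefix
    df.debugCodegen() // prints the Java source generated for each fused subtree

    df.queryExecution.executedPlan.collect { case w: WholeStageCodegenExec => w }.nonEmpty
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("whole-stage-codegen").master("local[*]").getOrCreate()
    try println(run(spark))
    finally spark.stop()
  }
}
```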
Question-23: What is Predicate Pushdown in Spark?
Answer:
Predicate pushdown is a technique for processing only the required data:
filters are pushed down to the data source so that it can skip data that
does not match. Predicates are applied in Spark SQL by defining filters in
where conditions. By running explain on the query we can check the query
processing stages. If the query plan contains PushedFilters, the query is
optimized to read only the required data. If no PushedFilters are found in
the query plan, it is better to cast the where condition (for example the
literal) to the column's data type.
Predicate pushdown limits the number of files and partitions that Spark
SQL reads while querying, thus reducing disk I/O. Querying bucketed data
with predicate pushdown produces results faster with less shuffle.
*Note: Predicate pushdown does not work with all data types in Parquet;
it does work with common types such as Int, whose values are pushed down.
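A sketch for checking pushdown with `explain()`, assuming Spark 3.x run locally; the temporary Parquet dataset and its columns are made up for illustration. Because the literal `9000` matches the Int column's type, no cast is needed and the filter shows up in the `PushedFilters` entry of the FileScan node.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object PredicatePushdownExample {
  // Returns the physical plan of a filtered Parquet read.
  def run(spark: SparkSession): String = {
    // Assumption: a temporary local Parquet dataset stands in for real data.
    val path = Files.createTempDirectory("pushdown").resolve("events").toString
    spark.range(0, 10000)
      .selectExpr("CAST(id AS INT) AS id", "id % 7 AS bucket")
      .write.mode("overwrite").parquet(path)

    val filtered = spark.read.parquet(path).where(col("id") > 9000)
    filtered.explain() // the FileScan line lists PushedFilters for the id predicate
    filtered.queryExecution.executedPlan.toString
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("predicate-pushdown").master("local[*]").getOrCreate()
    try println(run(spark))
    finally spark.stop()
  }
}
```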
Question-28: Why do we prefer treeReduce and treeAggregate over
reduceByKey and aggregateByKey?
Answer:
In the regular reduce or aggregate functions in Spark (and in the original
MapReduce), all partitions have to send their reduced values to the driver
machine, and the driver spends time linear in the number of partitions.
This becomes a bottleneck when there are many partitions and the data from
each partition is big.
So Spark has introduced a new aggregation communication pattern based
on multi-level aggregation trees.
In this setup, data are combined partially on a small set of
executors before they are sent to the driver, which dramatically
reduces the load the driver has to deal with.
So, in treeReduce and in treeAggregate, the partitions talk to each
other in a logarithmic number of rounds.
It is also worth mentioning the differences between reduceByKey and
treeReduce:
- reduceByKey is only available on key-value pair RDDs, while treeReduce
is a generalization of the reduce operation to any RDD.
- reduceByKey performs the reduction for each key, resulting in an RDD; it
is not an action but a transformation that returns a ShuffledRDD.
- treeReduce, on the other hand, is an action that returns a single value
to the driver; internally it combines partial results in parallel over
several rounds using by-key shuffles (reduceByKey or foldByKey, depending
on the Spark version).
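A sketch comparing `reduce` with `treeReduce` and `treeAggregate` on the same data, assuming Spark 3.x run locally; the 100 partitions and the depths are arbitrary illustrative values. All three return the same result; only the communication pattern differs.

```scala
import org.apache.spark.sql.SparkSession

object TreeAggregationExample {
  // Returns (reduce, treeReduce, (sum, count) via treeAggregate) over the same data.
  def run(spark: SparkSession): (Int, Int, (Long, Long)) = {
    val rdd = spark.sparkContext.parallelize(1 to 1000, 100) // many partitions

    val flat = rdd.reduce(_ + _)                // every partition's result goes straight to the driver
    val tree = rdd.treeReduce(_ + _, depth = 3) // partial results are combined in levels first

    // treeAggregate: zero value, per-partition fold (seqOp), merge of partial results (combOp)
    val sumAndCount = rdd.treeAggregate((0L, 0L))(
      (acc, n) => (acc._1 + n, acc._2 + 1),
      (a, b) => (a._1 + b._1, a._2 + b._2),
      depth = 2)

    (flat, tree, sumAndCount)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("tree-aggregation").master("local[*]").getOrCreate()
    try println(run(spark))
    finally spark.stop()
  }
}
```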
Question-29: In what cases are accumulators reliable? OR If executors
crash while we are using accumulators, can we rely on the accumulators'
output? If yes, why, and if no, why not?
Answer:
Accumulators are variables that are used for aggregating information
across the executors. For example, this information can be used to
find out how many records are corrupted or how many times a particular
library API was called.
Spark automatically deals with failed or slow machines by re-executing
failed or slow tasks.
Example: if the node running a partition of a map() operation crashes,
Spark will rerun it on another node; and even if the node does not crash
but is simply much slower than other nodes, Spark can preemptively launch
a “speculative” copy of the task on another node and take its result if
that copy finishes first.
Even if no nodes fail, Spark may have to rerun a task to rebuild a cached
value that falls out of memory. The net result is that the same function
may run multiple times on the same data, depending on what happens on the
cluster.
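So accumulators are truly reliable only when they are updated inside an
action (for example foreach()): for updates performed inside actions,
Spark guarantees that each task's update is applied only once, even if the
task is restarted. Updates made inside transformations (such as map()) may
be applied more than once if tasks or stages are re-executed.
Accumulator updates are sent back to the driver when a task completes
successfully. So your accumulator results are guaranteed to be correct
when you are certain that each task will have been executed exactly once
and each task did as you expected.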
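A sketch of the double counting that transformation-side updates allow, assuming Spark 3.x run locally; the counter names are illustrative. Running two actions on an uncached RDD recomputes its `map`, so that accumulator is incremented twice per element, while the `foreach` counter sees each element once.

```scala
import org.apache.spark.sql.SparkSession

object AccumulatorReliabilityExample {
  // Returns (counter updated in a transformation, counter updated in an action).
  def run(spark: SparkSession): (Long, Long) = {
    val sc = spark.sparkContext
    val inMap     = sc.longAccumulator("updatedInMap")
    val inForeach = sc.longAccumulator("updatedInForeach")

    val data = sc.parallelize(1 to 100, 4)
    // Transformation: the update runs every time this RDD is (re)computed.
    val tagged = data.map { n => inMap.add(1L); n }
    tagged.count() // first job computes tagged: +100
    tagged.count() // not cached, so map runs again: +100 more

    // Action: Spark applies each successful task's update exactly once.
    data.foreach(_ => inForeach.add(1L))

    (inMap.sum, inForeach.sum)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("accumulator-reliability").master("local[*]").getOrCreate()
    try println(run(spark)) // (200,100)
    finally spark.stop()
  }
}
```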
Question-30: What are stragglers? How do you detect and handle them?
Answer:
“Stragglers” are tasks within a stage that take much longer to execute
than their peers. This can happen for many reasons, such as load
imbalance, I/O blocks, or garbage collection.
Once a Spark job is submitted, it is broken down into multiple stages, and
each stage spawns many tasks that are executed across the nodes. A stage
may depend on previous stages, so when a task of an earlier stage takes a
long time to complete, the execution of the dependent stages in the
pipeline is delayed as well.
As the size of the cluster and the size of the stages/tasks grow, the
impact of stragglers increases dramatically, affecting the job completion
time.
Thus, addressing stragglers is prudent in order to speed up job
completion and improve cluster efficiency.
The straggler mitigation strategy in Spark is based on speculation, which
is disabled by default and enabled with spark.speculation=true. A task is
identified as a straggler (or "speculatable") and a duplicate copy of it
is launched; whichever copy finishes first is used.
Straggler identification starts once a spark.speculation.quantile
fraction of the tasks in a stage have completed (0.75 by default).
Thereafter, if a currently running task has been running for longer than
spark.speculation.multiplier (1.5 by default) times the median duration of
the successfully completed tasks of that stage, it is identified as a
straggler.
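A sketch that enables speculation with the thresholds above and models the detection rule. `isStraggler` is a simplified, hypothetical model written for illustration, not Spark's scheduler code; Spark additionally applies a minimum runtime threshold, which is omitted here.

```scala
import org.apache.spark.SparkConf

object SpeculationExample {
  // Turns speculation on and spells out the (default) thresholds.
  def conf(): SparkConf = new SparkConf(loadDefaults = false)
    .set("spark.speculation", "true")
    .set("spark.speculation.quantile", "0.75")
    .set("spark.speculation.multiplier", "1.5")

  // Simplified model of the rule; the minimum-runtime check Spark also applies is omitted.
  def isStraggler(runningMs: Long, completedMs: Seq[Long], totalTasks: Int,
                  quantile: Double = 0.75, multiplier: Double = 1.5): Boolean = {
    // Detection only starts after floor(quantile * totalTasks) tasks (at least 1) have finished.
    val minFinished = math.max(math.floor(quantile * totalTasks).toInt, 1)
    if (completedMs.size < minFinished) false
    else {
      val sorted = completedMs.sorted
      val median = sorted(sorted.size / 2).toDouble
      runningMs > multiplier * median
    }
  }

  def main(args: Array[String]): Unit = {
    conf().getAll.foreach { case (k, v) => println(s"$k=$v") }
    val finished = Seq(100L, 110L, 120L, 100L, 90L, 130L) // 6 of 8 tasks done, median = 110 ms
    println(isStraggler(200L, finished, totalTasks = 8))  // true: 200 > 1.5 * 110
  }
}
```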
Question-31: Difference between groupByKey and reduceByKey
Answer:
Both groupByKey and reduceByKey can produce the same answer (for example,
a per-key sum), but the way they produce it is different.
reduceByKey works much better on a large dataset because Spark can combine
the values that share a key on each partition before shuffling the data.
With groupByKey, on the other hand, all the key-value pairs are shuffled
around. This is a lot of unnecessary data being transferred over the
network.
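A sketch of the same word count computed both ways, assuming Spark 3.x run locally; the sample words and object name are illustrative. The results are equal; the difference is how much data crosses the shuffle.

```scala
import org.apache.spark.sql.SparkSession

object GroupVsReduceByKeyExample {
  // Returns the per-word counts computed with reduceByKey and with groupByKey.
  def run(spark: SparkSession): (Map[String, Int], Map[String, Int]) = {
    val words = spark.sparkContext.parallelize(Seq("a", "b", "a", "c", "a", "b"), 3)
    val pairs = words.map(w => (w, 1))

    // Combines values per key inside each partition first, then shuffles only the partial sums.
    val viaReduce = pairs.reduceByKey(_ + _).collect().toMap
    // Shuffles every (word, 1) pair, then sums the grouped values.
    val viaGroup = pairs.groupByKey().mapValues(_.sum).collect().toMap

    (viaReduce, viaGroup)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("group-vs-reduce").master("local[*]").getOrCreate()
    try println(run(spark))
    finally spark.stop()
  }
}
```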
Question-32: How will you handle OOM errors in Spark?
Answer:
Out of memory issues can be observed for the driver nodes or for
executor nodes:
- Out-of-memory errors in the driver, typically caused by actions.
Common causes of driver OOM are:
  - rdd.collect()
  - sparkContext.broadcast of a large variable
  - Driver memory configured too low for the application's requirements
  - Misconfiguration of spark.sql.autoBroadcastJoinThreshold
- Out-of-memory errors on the executor nodes, typically caused by shuffles
associated with wide transformations.
Common causes of executor OOM are:
  - High concurrency
  - Inefficient queries
  - Incorrect configuration
**OutOfMemoryError due to operations: Increase parallelism so that each
task's input is smaller. Since Spark reuses executor JVMs for many tasks,
increase parallelism to more than the number of cores.
Question-33: What is the command to see RDD lineage?
Answer:
The toDebugString method returns “a description of this RDD and its
recursive dependencies for debugging.”
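A sketch of `toDebugString` on a small word-count lineage, assuming Spark 3.x run locally; the object name is illustrative. Each line names an RDD in the chain, and a change of indentation marks a shuffle (stage) boundary.

```scala
import org.apache.spark.sql.SparkSession

object LineageExample {
  // Returns the lineage description of a small word-count RDD.
  def run(spark: SparkSession): String = {
    val counts = spark.sparkContext
      .parallelize(Seq("spark", "rdd", "spark"), 2) // ParallelCollectionRDD
      .map(w => (w, 1))                             // MapPartitionsRDD
      .reduceByKey(_ + _)                           // ShuffledRDD (new stage)
    counts.toDebugString
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("lineage").master("local[*]").getOrCreate()
    try println(run(spark))
    finally spark.stop()
  }
}
```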
Question-34: What is the explode function and how can it be used?
Answer:
explode() takes an array (or a map) column as input and outputs each
element of the array (or each entry of the map) as a separate row.
In Spark, we can use the explode function to convert the values of a
single column into multiple rows.
+---+----+---------+
| id|col1|     col2|
+---+----+---------+
|  1| abc|[p, q, r]|
|  2| def|[x, y, z]|
+---+----+---------+
import org.apache.spark.sql.functions.{col, explode}
df.withColumn("col2", explode(col("col2"))).show()
+---+----+----+
| id|col1|col2|
+---+----+----+
|  1| abc|   p|
|  1| abc|   q|
|  1| abc|   r|
|  2| def|   x|
|  2| def|   y|
|  2| def|   z|
+---+----+----+
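A self-contained version of the explode example, assuming Spark 3.x run locally; it builds the input DataFrame shown above from a Scala `Seq` (the object name is illustrative) and uses the `$"col2"` column syntax from `spark.implicits._`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.explode

object ExplodeExample {
  // Builds the input DataFrame and replaces the array column with one row per element.
  def run(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val df = Seq(
      (1, "abc", Seq("p", "q", "r")),
      (2, "def", Seq("x", "y", "z"))
    ).toDF("id", "col1", "col2")
    df.withColumn("col2", explode($"col2"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("explode").master("local[*]").getOrCreate()
    try run(spark).show()
    finally spark.stop()
  }
}
```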
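Question-35: Does a Parquet file contain the header of the data?
Answer: Yes. Parquet files are self-describing: the schema (column names
and types) is stored in the file's footer metadata, so no separate header
row is needed.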