New action manager implementation #4

Closed
opened 2020-04-02 12:27:11 +02:00 by przemyslaw.jacewicz · 1 comment
Contributor

Overview

Add a new action manager module for promoting action sets. The new module should take as input a materialized graph and a list of action sets, and merge the action payload with compatible graph tables, producing a new graph materialization. Both graph materializations should be consistent with the graph representation described [here](https://code-repo.d4science.org/D-Net/dnet-hadoop/wiki/Data-provision-workflow#user-content-graph-representation).

Input

  1. a materialized graph consistent with the graph representation
  2. a comma-separated list of action sets; each action set should contain a sequence file and have the following format:

```bash
action_set
├─ action_set_id
│  ├─ part-00000
│  ├─ part-00001
│  └─ ...
```

  3. a strategy for merging a graph table with the action payload; currently only two methods should be provided: merge using the `mergeFrom` method of the OAF model, and select the newer instance (see the sketch below this list)
  4. a set of boolean flags indicating activation of promotion for specific types of action payload
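
To make point 3 concrete, below is a minimal sketch of how the two merge strategies could be expressed as serializable functions. The `OafLike` stand-in, the `MergeAndGet` name and the timestamp-based comparison are illustrative assumptions, not the actual OAF model or module API:

```java
import java.io.Serializable;
import java.util.function.BiFunction;

/**
 * Illustrative sketch of the two promotion strategies described above.
 * OafLike is a stand-in for the relevant part of the OAF model used in this sketch.
 */
public class PromoteActionPayloadStrategies {

    /** Minimal stand-in for the OAF model types used here. */
    public interface OafLike<T extends OafLike<T>> {
        void mergeFrom(T other);
        Long getLastupdatetimestamp();
    }

    /** A BiFunction that is also Serializable, so Spark can ship it to executors. */
    public interface MergeAndGet<T> extends BiFunction<T, T, T>, Serializable {}

    /** Strategy 1: merge the action payload into the existing graph row via mergeFrom. */
    public static <T extends OafLike<T>> MergeAndGet<T> mergeFromAndGet() {
        return (graphRow, payload) -> {
            graphRow.mergeFrom(payload);
            return graphRow;
        };
    }

    /** Strategy 2: keep whichever of the two instances is newer (assumes non-null timestamps). */
    public static <T extends OafLike<T>> MergeAndGet<T> selectNewerAndGet() {
        return (graphRow, payload) ->
                payload.getLastupdatetimestamp() >= graphRow.getLastupdatetimestamp()
                        ? payload
                        : graphRow;
    }
}
```

Keeping both strategies behind a single functional type would let the promotion job pick one of them based on a workflow parameter.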

Output

  1. materialized graph consistent with graph representation

Oozie workflows implementation notes

If possible, use subworkflows and divide the processing into separate actions to avoid memory and performance issues for jobs running on the cluster.

Spark jobs implementation notes

Use the Spark Dataset/DataFrame API.

przemyslaw.jacewicz (Author, Contributor) commented:

Spark jobs development notes

Container killed by YARN

For large datasets, e.g. publications, the default number of partitions after shuffling (200) can cause memory problems and the job failing with the following message:

```bash
Container killed by YARN for exceeding memory limits.  11.1 GB of 11 GB physical memory used.
```

The easiest way to resolve this problem is to increase the number of partitions after shuffling using this Spark configuration parameter:

```bash
spark.sql.shuffle.partitions=2560
```

The number should be 3 or 4 times the number of cores in the cluster.
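
For reference, a minimal sketch of setting this parameter programmatically when building the Spark session; the application name is just a placeholder, and passing the value via `--conf` on `spark-submit` or through the Oozie workflow properties works equally well:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class ShufflePartitionsExample {

    public static void main(String[] args) {
        // Raise the number of post-shuffle partitions; 2560 is the example value from above,
        // in practice roughly 3-4 times the number of cores available on the cluster.
        SparkConf conf = new SparkConf()
                .set("spark.sql.shuffle.partitions", "2560");

        SparkSession spark = SparkSession.builder()
                .appName("promote-action-payload") // placeholder name
                .config(conf)
                .getOrCreate();

        // ... dataset processing ...

        spark.stop();
    }
}
```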

OutOfMemoryError

It seems that the hierarchical OAF model is not fully compatible with the Spark Java bean encoder accessible through `Encoders.bean`, and an OOM error can be produced when grouping and aggregating:

```java
// rowDS is a Dataset<Software> obtained with Encoders.bean(Software.class)
rowDS
	.groupByKey((MapFunction<Software, String>) x -> x.getId(), Encoders.STRING())
	.reduceGroups((ReduceFunction<Software>) (v1, v2) -> { v1.mergeFrom(v2); return v1; });
```

The solution is to use the kryo encoder accessible through `Encoders.kryo`. This has a drawback: when using kryo serialization the dataset schema (based on the schema of the objects held by the dataset) is lost, and the dataset has only one column, `value`, holding a byte representation of the objects. This means that the Spark SQL-based API cannot be used and only the typed Dataset API can be used.
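
A minimal sketch of the kryo-based variant, assuming as above that `rowDS` is a `Dataset<Software>`; the rows are simply re-encoded with kryo before grouping, and only the key encoder stays a regular string encoder:

```java
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import scala.Tuple2;

// Re-encode the rows with kryo instead of the Java bean encoder; the dataset schema collapses
// to a single binary 'value' column, so only the typed Dataset API can be used from here on.
Dataset<Software> kryoDS = rowDS
        .map((MapFunction<Software, Software>) s -> s, Encoders.kryo(Software.class));

Dataset<Software> merged = kryoDS
        .groupByKey((MapFunction<Software, String>) Software::getId, Encoders.STRING())
        .reduceGroups((ReduceFunction<Software>) (v1, v2) -> { v1.mergeFrom(v2); return v1; })
        .map((MapFunction<Tuple2<String, Software>, Software>) Tuple2::_2, Encoders.kryo(Software.class));
```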

NotSerializableException

When creating utility classes with methods that process Spark `Dataset`s and take lambdas as input, a `java.io.NotSerializableException` can be produced; this is caused by the fact that `java.util.function` interfaces are not serializable.

The solution is to wrap the lambda in a serializable wrapper.
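
A minimal sketch of one way to do this, using a small functional interface that extends both `Function` and `Serializable`; the interface and method names are illustrative, not the actual utility classes of the module:

```java
import java.io.Serializable;
import java.util.function.Function;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;

public class SerializableLambdas {

    /** Same contract as java.util.function.Function, but serializable so Spark can ship it to executors. */
    @FunctionalInterface
    public interface SerializableFunction<T, R> extends Function<T, R>, Serializable {}

    /**
     * Example utility method: the passed-in lambda ends up inside the task closure of map(),
     * so it must be serializable; a plain java.util.function.Function here would fail at runtime.
     */
    public static <T, R> Dataset<R> mapWith(Dataset<T> ds, SerializableFunction<T, R> fn, Encoder<R> encoder) {
        return ds.map((MapFunction<T, R>) fn::apply, encoder);
    }
}
```

Callers can then pass a plain lambda, e.g. `mapWith(ds, Software::getId, Encoders.STRING())`, and it is captured as the serializable type.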

Failed tasks

When running Spark jobs on a busy cluster, executors can be killed by the driver if an executor is idle for more than `spark.dynamicAllocation.executorIdleTimeout` seconds (default 60s); when this happens the Spark executor logs contain this message:

```log
2020-04-01 02:30:02,929 [SIGTERM handler] ERROR org.apache.spark.executor.CoarseGrainedExecutorBackend  - RECEIVED SIGNAL TERM
2020-04-01 02:30:02,936 [Thread-2] INFO  org.apache.spark.storage.DiskBlockManager  - Shutdown hook called
```

and the Spark driver logs contain this message:

```log
2020-03-31 18:56:31,991 [dispatcher-event-loop-47] INFO  org.apache.spark.deploy.yarn.ApplicationMaster$AMEndpoint  - Driver requested to kill executor(s) 74.
2020-03-31 18:56:32,148 [spark-dynamic-executor-allocation] INFO  org.apache.spark.ExecutorAllocationManager  - Removing executor 74 because it has been idle for 60 seconds (new desired total will be 79)
```

This is a normal situation, and the killed executors should be recreated by the Spark driver as needed.

claudio.atzori added the enhancement label 2020-07-27 18:02:25 +02:00