Yes, I can. I'm not sure when I'll be able to start the work, though. We have some pending tasks in IIS and some planning within our team is needed.
Hmm, I didn't think about that; the runWithSparkSession methods were meant to be used by the action manager's Spark actions, where Hive support is not needed. But I think that the version of runWithSparkSession that accepts a SparkSession builder function could be used to supply a builder that builds a SparkSession with Hive support, using code like this:
SparkSession.builder().config(c).enableHiveSupport().getOrCreate()
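For context, a sketch of the builder function this would imply; the exact shape expected by runWithSparkSession is an assumption here, not a verified signature:

import java.util.function.Function;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

// assumed shape: a function from SparkConf to SparkSession that could be
// supplied to the runWithSparkSession variant accepting a builder function
Function<SparkConf, SparkSession> hiveSessionBuilder =
    c -> SparkSession.builder().config(c).enableHiveSupport().getOrCreate();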
PS. Sorry for the late reply, didn't see the comment earlier.
The most likely explanation is that the Oozie distcp actions in the workflows do not delete their target dirs using a prepare section, and when the target dir already exists, distcp copies the source dir as a nested dir inside the target dir. This behavior does not seem to be well documented.
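For reference, a minimal Oozie distcp action with a prepare section that deletes the target dir before copying; the action name, transitions, and path properties are illustrative:

<action name="copy">
    <distcp xmlns="uri:oozie:distcp-action:0.2">
        <prepare>
            <delete path="${nameNode}${targetDir}"/>
        </prepare>
        <arg>${nameNode}${sourceDir}</arg>
        <arg>${nameNode}${targetDir}</arg>
    </distcp>
    <ok to="end"/>
    <error to="fail"/>
</action>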
Spark jobs development notes
Container killed by YARN
For large datasets, e.g. publications, the default number of partitions after shuffling (equal to 200) can cause memory problems, failing the job with the following message:
Container killed by YARN for exceeding memory limits. 11.1 GB of 11 GB physical memory used.
The easiest way to resolve this problem is to increase the number of partitions after shuffling using this Spark conf param:
spark.sql.shuffle.partitions=2560
The number should be 3 or 4 times the number of cores in the cluster.
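When the setting cannot be passed on the command line, it can also be applied in code before the session is built; a minimal sketch in Java, reusing the 2560 value from above:

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

// raise the post-shuffle partition count before building the session
SparkConf conf = new SparkConf()
    .set("spark.sql.shuffle.partitions", "2560");

SparkSession spark = SparkSession.builder()
    .config(conf)
    .getOrCreate();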
OutOfMemoryError
It seems that the hierarchical OAF model is not fully compatible with the Spark Java bean encoder accessible through Encoders.bean, and an OOM error can be produced when grouping and aggregating:
// rowDS is assumed to be a Dataset<Software> created with Encoders.bean(Software.class)
rowDS
    .groupByKey((MapFunction<Software, String>) x -> x.getId(), Encoders.STRING())
    .reduceGroups((ReduceFunction<Software>) (v1, v2) -> v1.mergeFrom(v2));
The solution is to use the Kryo encoder accessible through Encoders.kryo. This has a drawback: with Kryo serialization, the dataset schema (based on the schema of the objects held by the dataset) is lost, and the dataset has only a single column named value holding the byte representation of the objects. This means that the Spark SQL-based API cannot be used; only the Spark Dataset API can be used.
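A minimal sketch of the same aggregation using a Kryo encoder; here spark (a SparkSession), inputPath, and OBJECT_MAPPER (a Jackson ObjectMapper used to deserialize JSON records) are assumptions made for illustration:

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

// deserialize JSON lines into Software objects, encoding them with Kryo
// instead of a bean encoder (spark, inputPath and OBJECT_MAPPER are assumed)
Dataset<Software> softwareDS = spark
    .read()
    .textFile(inputPath)
    .map(
        (MapFunction<String, Software>) s -> OBJECT_MAPPER.readValue(s, Software.class),
        Encoders.kryo(Software.class));

// grouping and aggregation work as before, but only through the Dataset API;
// mergeFrom is assumed to return the merged instance, as in the snippet above
softwareDS
    .groupByKey((MapFunction<Software, String>) x -> x.getId(), Encoders.STRING())
    .reduceGroups((ReduceFunction<Software>) (v1, v2) -> v1.mergeFrom(v2));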
NotSerializableException
When creating utility classes with methods that process Spark Datasets and take lambdas as input, a java.io.NotSerializableException can be produced; this is caused by the fact that the java.util.function interfaces are not serializable.
The solution is to wrap the lambda in a serializable wrapper.
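A minimal sketch of such a wrapper; the SerializableFunction name is illustrative, not an existing API:

import java.io.Serializable;
import java.util.function.Function;

// same contract as java.util.function.Function, but also Serializable,
// so Spark can ship lambda instances of it to the executors
public interface SerializableFunction<T, R> extends Function<T, R>, Serializable {
}

Utility methods then declare SerializableFunction<T, R> instead of Function<T, R> as the parameter type; a lambda passed at the call site is automatically captured as a serializable instance because the target interface extends Serializable.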
Failed tasks
When running Spark jobs on a busy cluster, executors can be killed by the driver if an executor stays idle for more than spark.dynamicAllocation.executorIdleTimeout seconds (default: 60s); when this happens, the Spark executor logs contain this message:
2020-04-01 02:30:02,929 [SIGTERM handler] ERROR org.apache.spark.executor.CoarseGrainedExecutorBackend - RECEIVED SIGNAL TERM
2020-04-01 02:30:02,936 [Thread-2] INFO org.apache.spark.storage.DiskBlockManager - Shutdown hook called
and the Spark driver logs contain this message:
2020-03-31 18:56:31,991 [dispatcher-event-loop-47] INFO org.apache.spark.deploy.yarn.ApplicationMaster$AMEndpoint - Driver requested to kill executor(s) 74.
2020-03-31 18:56:32,148 [spark-dynamic-executor-allocation] INFO org.apache.spark.ExecutorAllocationManager - Removing executor 74 because it has been idle for 60 seconds (new desired total will be 79)
This is a normal situation, and the killed executors should be recreated by the Spark driver as needed.
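If the churn is undesirable (e.g. to keep executors warm on a busy cluster), the timeout can be raised per job; a sketch of passing it through spark-submit, with the 300s value chosen only for illustration:

spark-submit --conf spark.dynamicAllocation.executorIdleTimeout=300s ...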