diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java index 56cbda4d67..f72fd4269c 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java @@ -151,12 +151,17 @@ public class PromoteActionPayloadForGraphTableJob { SparkSession spark, String path, Class rowClazz) { logger.info("Reading graph table from path: {}", path); - return spark - .read() - .textFile(path) - .map( - (MapFunction) value -> OBJECT_MAPPER.readValue(value, rowClazz), - Encoders.bean(rowClazz)); + if (HdfsSupport.exists(path, spark.sparkContext().hadoopConfiguration())) { + return spark + .read() + .textFile(path) + .map( + (MapFunction) value -> OBJECT_MAPPER.readValue(value, rowClazz), + Encoders.bean(rowClazz)); + } else { + logger.info("Found empty graph table from path: {}", path); + return spark.emptyDataset(Encoders.bean(rowClazz)); + } } private static Dataset readActionPayload( @@ -223,7 +228,7 @@ public class PromoteActionPayloadForGraphTableJob { rowClazz, actionPayloadClazz); - if (shouldGroupById) { + if (Boolean.TRUE.equals(shouldGroupById)) { return PromoteActionPayloadFunctions .groupGraphTableByIdAndMerge( joinedAndMerged, rowIdFn, mergeRowsAndGetFn, zeroFn, isNotZeroFn, rowClazz); @@ -250,6 +255,8 @@ public class PromoteActionPayloadForGraphTableJob { return () -> clazz.cast(new eu.dnetlib.dhp.schema.oaf.Relation()); case "eu.dnetlib.dhp.schema.oaf.Software": return () -> clazz.cast(new eu.dnetlib.dhp.schema.oaf.Software()); + case "eu.dnetlib.dhp.schema.oaf.Person": + return () -> clazz.cast(new eu.dnetlib.dhp.schema.oaf.Person()); default: throw new RuntimeException("unknown class: " + clazz.getCanonicalName()); } diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java index f0b094240e..a3b975d0a9 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java @@ -50,7 +50,7 @@ public class PromoteActionPayloadFunctions { PromoteAction.Strategy promoteActionStrategy, Class rowClazz, Class actionPayloadClazz) { - if (!isSubClass(rowClazz, actionPayloadClazz)) { + if (Boolean.FALSE.equals(isSubClass(rowClazz, actionPayloadClazz))) { throw new RuntimeException( "action payload type must be the same or be a super type of table row type"); } diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/person/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/person/oozie_app/workflow.xml index 7c119b3054..1bacd09f1f 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/person/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/person/oozie_app/workflow.xml @@ -77,7 +77,6 @@ ${(activePromotePersonActionPayload eq "true") and - (fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputGraphRootPath')),'/'),'person')) eq "true") and (fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputActionPayloadRootPath')),'/'),'clazz=eu.dnetlib.dhp.schema.oaf.Person')) eq "true")} diff --git a/pom.xml b/pom.xml index 666ba2350c..175cb9e7ca 100644 --- a/pom.xml +++ b/pom.xml @@ -937,7 +937,7 @@ 1.1.3 1.7 1.0.7 - [7.0.1] + [7.0.2] cdh5.9.2 3.5 11.0.2