diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java index 17bfc4af36..5fa9e67235 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java @@ -4,6 +4,7 @@ package eu.dnetlib.dhp.actionmanager.promote; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass; +import java.io.IOException; import java.util.Objects; import java.util.Optional; import java.util.function.BiFunction; @@ -20,6 +21,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier; @@ -134,24 +136,39 @@ public class PromoteActionPayloadForGraphTableJob { .map( (MapFunction) value -> OBJECT_MAPPER.readValue(value, rowClazz), Encoders.bean(rowClazz)); - - /* - * return spark .read() .parquet(path) .as(Encoders.bean(rowClazz)); - */ } private static Dataset readActionPayload( SparkSession spark, String path, Class actionPayloadClazz) { logger.info("Reading action payload from path: {}", path); + return spark .read() .parquet(path) + .map((MapFunction) value -> extractPayload(value), Encoders.STRING()) .map( - (MapFunction) value -> OBJECT_MAPPER - .readValue(value. getAs("payload"), actionPayloadClazz), + (MapFunction) value -> decodePayload(actionPayloadClazz, value), Encoders.bean(actionPayloadClazz)); } + private static String extractPayload(Row value) { + try { + return value. getAs("payload"); + } catch (IllegalArgumentException | ClassCastException e) { + logger.error("cannot extract payload from action: {}", value.toString()); + throw e; + } + } + + private static A decodePayload(Class actionPayloadClazz, String payload) throws IOException { + try { + return OBJECT_MAPPER.readValue(payload, actionPayloadClazz); + } catch (UnrecognizedPropertyException e) { + logger.error("error decoding payload: {}", payload); + throw e; + } + } + private static Dataset promoteActionPayloadForGraphTable( Dataset rowDS, Dataset actionPayloadDS,