enrichment steps #38

Merged
claudio.atzori merged 334 commits from miriam.baglioni/dnet-hadoop:master into enrichment_wfs 2020-08-11 16:40:26 +02:00
1 changed files with 23 additions and 6 deletions
Showing only changes of commit 961a0d0b49 - Show all commits

View File

@ -4,6 +4,7 @@ package eu.dnetlib.dhp.actionmanager.promote;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass; import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
import java.io.IOException;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.function.BiFunction; import java.util.function.BiFunction;
@ -20,6 +21,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier; import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier;
@ -134,24 +136,39 @@ public class PromoteActionPayloadForGraphTableJob {
.map( .map(
(MapFunction<String, G>) value -> OBJECT_MAPPER.readValue(value, rowClazz), (MapFunction<String, G>) value -> OBJECT_MAPPER.readValue(value, rowClazz),
Encoders.bean(rowClazz)); Encoders.bean(rowClazz));
/*
* return spark .read() .parquet(path) .as(Encoders.bean(rowClazz));
*/
} }
private static <A extends Oaf> Dataset<A> readActionPayload( private static <A extends Oaf> Dataset<A> readActionPayload(
SparkSession spark, String path, Class<A> actionPayloadClazz) { SparkSession spark, String path, Class<A> actionPayloadClazz) {
logger.info("Reading action payload from path: {}", path); logger.info("Reading action payload from path: {}", path);
return spark return spark
.read() .read()
.parquet(path) .parquet(path)
.map((MapFunction<Row, String>) value -> extractPayload(value), Encoders.STRING())
.map( .map(
(MapFunction<Row, A>) value -> OBJECT_MAPPER (MapFunction<String, A>) value -> decodePayload(actionPayloadClazz, value),
.readValue(value.<String> getAs("payload"), actionPayloadClazz),
Encoders.bean(actionPayloadClazz)); Encoders.bean(actionPayloadClazz));
} }
private static String extractPayload(Row value) {
try {
return value.<String> getAs("payload");
} catch (IllegalArgumentException | ClassCastException e) {
logger.error("cannot extract payload from action: {}", value.toString());
throw e;
}
}
private static <A extends Oaf> A decodePayload(Class<A> actionPayloadClazz, String payload) throws IOException {
try {
return OBJECT_MAPPER.readValue(payload, actionPayloadClazz);
} catch (UnrecognizedPropertyException e) {
logger.error("error decoding payload: {}", payload);
throw e;
}
}
private static <G extends Oaf, A extends Oaf> Dataset<G> promoteActionPayloadForGraphTable( private static <G extends Oaf, A extends Oaf> Dataset<G> promoteActionPayloadForGraphTable(
Dataset<G> rowDS, Dataset<G> rowDS,
Dataset<A> actionPayloadDS, Dataset<A> actionPayloadDS,