
ActionManager promote: allow ingesting person records into a graph that did not contain them; bumped dhp-schemas version

Claudio Atzori 2024-07-31 11:02:22 +02:00
parent 9486e21a44
commit 6bdb8643e6
4 changed files with 16 additions and 10 deletions
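
In short, the promote job no longer assumes that every graph table exists on HDFS: when a table path is missing (as the person table is in older graph dumps), it falls back to an empty, correctly typed Dataset so the person action payload can still be promoted. A minimal, self-contained sketch of that fallback pattern, using the Hadoop FileSystem API in place of the project's HdfsSupport helper; the class and method names below are illustrative, not the project's own:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class ReadTableOrEmptySketch {

        private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

        // Hypothetical helper: read a JSON-lines graph table into a typed Dataset,
        // or return an empty Dataset when the path does not exist on HDFS.
        public static <G> Dataset<G> readOrEmpty(SparkSession spark, String path, Class<G> rowClazz)
            throws IOException {
            Configuration conf = spark.sparkContext().hadoopConfiguration();
            FileSystem fs = new Path(path).getFileSystem(conf);
            if (!fs.exists(new Path(path))) {
                // Missing table: return an empty Dataset of the requested bean type
                // instead of failing the job.
                return spark.emptyDataset(Encoders.bean(rowClazz));
            }
            // Existing table: parse each JSON line into the requested bean type.
            return spark
                .read()
                .textFile(path)
                .map(
                    (MapFunction<String, G>) value -> OBJECT_MAPPER.readValue(value, rowClazz),
                    Encoders.bean(rowClazz));
        }
    }

With an empty table, the subsequent join against the action payload effectively ingests the person records into a graph that did not previously contain them, which is the point of this change; the actual job does the same via HdfsSupport.exists, as shown in the first hunk below.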

@@ -151,12 +151,17 @@ public class PromoteActionPayloadForGraphTableJob {
             SparkSession spark, String path, Class<G> rowClazz) {
         logger.info("Reading graph table from path: {}", path);
-        return spark
-            .read()
-            .textFile(path)
-            .map(
-                (MapFunction<String, G>) value -> OBJECT_MAPPER.readValue(value, rowClazz),
-                Encoders.bean(rowClazz));
+        if (HdfsSupport.exists(path, spark.sparkContext().hadoopConfiguration())) {
+            return spark
+                .read()
+                .textFile(path)
+                .map(
+                    (MapFunction<String, G>) value -> OBJECT_MAPPER.readValue(value, rowClazz),
+                    Encoders.bean(rowClazz));
+        } else {
+            logger.info("Found empty graph table from path: {}", path);
+            return spark.emptyDataset(Encoders.bean(rowClazz));
+        }
     }
     private static <A extends Oaf> Dataset<A> readActionPayload(

@@ -223,7 +228,7 @@ public class PromoteActionPayloadForGraphTableJob {
                 rowClazz,
                 actionPayloadClazz);
-        if (shouldGroupById) {
+        if (Boolean.TRUE.equals(shouldGroupById)) {
             return PromoteActionPayloadFunctions
                 .groupGraphTableByIdAndMerge(
                     joinedAndMerged, rowIdFn, mergeRowsAndGetFn, zeroFn, isNotZeroFn, rowClazz);

@@ -250,6 +255,8 @@ public class PromoteActionPayloadForGraphTableJob {
                 return () -> clazz.cast(new eu.dnetlib.dhp.schema.oaf.Relation());
             case "eu.dnetlib.dhp.schema.oaf.Software":
                 return () -> clazz.cast(new eu.dnetlib.dhp.schema.oaf.Software());
+            case "eu.dnetlib.dhp.schema.oaf.Person":
+                return () -> clazz.cast(new eu.dnetlib.dhp.schema.oaf.Person());
             default:
                 throw new RuntimeException("unknown class: " + clazz.getCanonicalName());
         }

@@ -50,7 +50,7 @@ public class PromoteActionPayloadFunctions {
             PromoteAction.Strategy promoteActionStrategy,
             Class<G> rowClazz,
             Class<A> actionPayloadClazz) {
-        if (!isSubClass(rowClazz, actionPayloadClazz)) {
+        if (Boolean.FALSE.equals(isSubClass(rowClazz, actionPayloadClazz))) {
             throw new RuntimeException(
                 "action payload type must be the same or be a super type of table row type");
         }

@@ -77,7 +77,6 @@
                 <switch>
                     <case to="PromotePersonActionPayloadForPersonTable">
                         ${(activePromotePersonActionPayload eq "true") and
-                        (fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputGraphRootPath')),'/'),'person')) eq "true") and
                         (fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputActionPayloadRootPath')),'/'),'clazz=eu.dnetlib.dhp.schema.oaf.Person')) eq "true")}
                     </case>
                     <default to="SkipPromotePersonActionPayloadForPersonTable"/>

@@ -937,7 +937,7 @@
         <commons.logging.version>1.1.3</commons.logging.version>
         <commons-validator.version>1.7</commons-validator.version>
         <dateparser.version>1.0.7</dateparser.version>
-        <dhp-schemas.version>[7.0.1]</dhp-schemas.version>
+        <dhp-schemas.version>[7.0.2]</dhp-schemas.version>
         <dhp.cdh.version>cdh5.9.2</dhp.cdh.version>
         <dhp.commons.lang.version>3.5</dhp.commons.lang.version>
         <dhp.guava.version>11.0.2</dhp.guava.version>