From 50fc5a64a00e704e7c03062060d4254197fa9fba Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 23 Jun 2021 11:49:42 +0200 Subject: [PATCH] [raw_all] Aggregator graph creation merges claims (updates) with the corresponding entity --- .../raw/GenerateEntitiesApplication.java | 46 ++++++++- .../raw/MigrateDbEntitiesApplication.java | 98 +++++++++++-------- .../graph/generate_entities_parameters.json | 6 ++ .../oa/graph/raw_all/oozie_app/workflow.xml | 1 + 4 files changed, 106 insertions(+), 45 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java index 48c0df334..fcd6f459a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java @@ -27,6 +27,7 @@ import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import scala.Tuple2; @@ -37,6 +38,22 @@ public class GenerateEntitiesApplication { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + /** + * Operation mode + */ + enum Mode { + + /** + * Groups all the objects by id to merge them + */ + claim, + + /** + * Default mode + */ + graph + } + public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils @@ -67,13 +84,19 @@ public class GenerateEntitiesApplication { .orElse(true); log.info("shouldHashId: {}", shouldHashId); + final Mode mode = Optional + .ofNullable(parser.get("mode")) + .map(Mode::valueOf) + .orElse(Mode.graph); + log.info("mode: {}", mode); + final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl); final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookupService); final SparkConf conf = new SparkConf(); runWithSparkSession(conf, isSparkSessionManaged, spark -> { HdfsSupport.remove(targetPath, spark.sparkContext().hadoopConfiguration()); - generateEntities(spark, vocs, sourcePaths, targetPath, shouldHashId); + generateEntities(spark, vocs, sourcePaths, targetPath, shouldHashId, mode); }); } @@ -82,7 +105,8 @@ public class GenerateEntitiesApplication { final VocabularyGroup vocs, final String sourcePaths, final String targetPath, - final boolean shouldHashId) { + final boolean shouldHashId, + final Mode mode) { final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); final List existingSourcePaths = Arrays @@ -106,7 +130,23 @@ public class GenerateEntitiesApplication { .flatMap(list -> list.iterator())); } - inputRdd + switch (mode) { + case claim: + save( + inputRdd + .mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf)) + .reduceByKey((o1, o2) -> OafMapperUtils.merge(o1, o2)) + .map(Tuple2::_2), + targetPath); + break; + case graph: + save(inputRdd, targetPath); + break; + } + } + + private static void save(final JavaRDD rdd, final String targetPath) { + rdd .map( oaf -> oaf.getClass().getSimpleName().toLowerCase() + "|" diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java index 9b6400a32..1a55499c1 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java @@ -13,7 +13,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.List; -import java.util.Objects; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -36,7 +35,6 @@ import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.Dataset; import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.schema.oaf.Field; -import eu.dnetlib.dhp.schema.oaf.Journal; import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.Organization; @@ -54,6 +52,13 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i private static final Logger log = LoggerFactory.getLogger(MigrateDbEntitiesApplication.class); + private static final DataInfo DATA_INFO_CLAIM = dataInfo( + false, null, false, false, + qualifier(USER_CLAIM, USER_CLAIM, DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS), "0.9"); + + private static final List COLLECTED_FROM_CLAIM = listKeyValues( + createOpenaireId(10, "infrastruct_::openaire", true), "OpenAIRE"); + public static final String SOURCE_TYPE = "source_type"; public static final String TARGET_TYPE = "target_type"; @@ -443,25 +448,19 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i } public List processClaims(final ResultSet rs) { - - final DataInfo info = dataInfo( - false, null, false, false, - qualifier(USER_CLAIM, USER_CLAIM, DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS), "0.9"); - - final List collectedFrom = listKeyValues( - createOpenaireId(10, "infrastruct_::openaire", true), "OpenAIRE"); - try { - if (rs.getString(SOURCE_TYPE).equals("context")) { + final String sourceType = rs.getString(SOURCE_TYPE); + final String targetType = rs.getString(TARGET_TYPE); + if (sourceType.equals("context")) { final Result r; - if (rs.getString(TARGET_TYPE).equals("dataset")) { + if (targetType.equals("dataset")) { r = new Dataset(); r.setResulttype(DATASET_DEFAULT_RESULTTYPE); - } else if (rs.getString(TARGET_TYPE).equals("software")) { + } else if (targetType.equals("software")) { r = new Software(); r.setResulttype(SOFTWARE_DEFAULT_RESULTTYPE); - } else if (rs.getString(TARGET_TYPE).equals("other")) { + } else if (targetType.equals("other")) { r = new OtherResearchProduct(); r.setResulttype(ORP_DEFAULT_RESULTTYPE); } else { @@ -470,57 +469,72 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i } r.setId(createOpenaireId(50, rs.getString("target_id"), false)); r.setLastupdatetimestamp(lastUpdateTimestamp); - r.setContext(prepareContext(rs.getString("source_id"), info)); - r.setDataInfo(info); - r.setCollectedfrom(collectedFrom); + r.setContext(prepareContext(rs.getString("source_id"), DATA_INFO_CLAIM)); + r.setDataInfo(DATA_INFO_CLAIM); + r.setCollectedfrom(COLLECTED_FROM_CLAIM); return Arrays.asList(r); } else { final String validationDate = rs.getString("curation_date"); - final String sourceId = createOpenaireId(rs.getString(SOURCE_TYPE), rs.getString("source_id"), false); - final String targetId = createOpenaireId(rs.getString(TARGET_TYPE), rs.getString("target_id"), false); + final String sourceId = createOpenaireId(sourceType, rs.getString("source_id"), false); + final String targetId = createOpenaireId(targetType, rs.getString("target_id"), false); final Relation r1 = new Relation(); final Relation r2 = new Relation(); - r1.setValidated(true); - r1.setValidationDate(validationDate); - r1.setCollectedfrom(collectedFrom); + if (StringUtils.isNotBlank(validationDate)) { + r1.setValidated(true); + r1.setValidationDate(validationDate); + r2.setValidated(true); + r2.setValidationDate(validationDate); + } + r1.setCollectedfrom(COLLECTED_FROM_CLAIM); r1.setSource(sourceId); r1.setTarget(targetId); - r1.setDataInfo(info); + r1.setDataInfo(DATA_INFO_CLAIM); r1.setLastupdatetimestamp(lastUpdateTimestamp); - r2.setValidationDate(validationDate); - r2.setValidated(true); - r2.setCollectedfrom(collectedFrom); + r2.setCollectedfrom(COLLECTED_FROM_CLAIM); r2.setSource(targetId); r2.setTarget(sourceId); - r2.setDataInfo(info); + r2.setDataInfo(DATA_INFO_CLAIM); r2.setLastupdatetimestamp(lastUpdateTimestamp); - if (rs.getString(SOURCE_TYPE).equals("project")) { - r1.setRelType(RESULT_PROJECT); - r1.setSubRelType(OUTCOME); - r1.setRelClass(PRODUCES); + final String semantics = rs.getString("semantics"); - r2.setRelType(RESULT_PROJECT); - r2.setSubRelType(OUTCOME); - r2.setRelClass(IS_PRODUCED_BY); - } else { - r1.setRelType(RESULT_RESULT); - r1.setSubRelType(RELATIONSHIP); - r1.setRelClass(IS_RELATED_TO); + switch (semantics) { + case "resultResult_relationship_isRelatedTo": + r1.setRelType(RESULT_RESULT); + r1.setSubRelType(RELATIONSHIP); + r1.setRelClass(IS_RELATED_TO); - r2.setRelType(RESULT_RESULT); - r2.setSubRelType(RELATIONSHIP); - r2.setRelClass(IS_RELATED_TO); + r2.setRelType(RESULT_RESULT); + r2.setSubRelType(RELATIONSHIP); + r2.setRelClass(IS_RELATED_TO); + break; + case "resultProject_outcome_produces": + if (!"project".equals(sourceType)) { + throw new IllegalStateException( + String + .format( + "invalid claim, sourceId: %s, targetId: %s, semantics: %s", + sourceId, targetId, semantics)); + } + r1.setRelType(RESULT_PROJECT); + r1.setSubRelType(OUTCOME); + r1.setRelClass(PRODUCES); + + r2.setRelType(RESULT_PROJECT); + r2.setSubRelType(OUTCOME); + r2.setRelClass(IS_PRODUCED_BY); + break; + default: + throw new IllegalArgumentException("claim semantics not managed: " + semantics); } return Arrays.asList(r1, r2); } - } catch (final Exception e) { throw new RuntimeException(e); } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json index 4b3ebba38..52cbbf45f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json @@ -28,5 +28,11 @@ "paramLongName": "shouldHashId", "paramDescription": "should ids be hashed?", "paramRequired": false + }, + { + "paramName": "m", + "paramLongName": "mode", + "paramDescription": "operation mode", + "paramRequired": false } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml index 0821f04ea..7f1ecb39f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml @@ -460,6 +460,7 @@ --targetPath${workingDir}/entities_claim --isLookupUrl${isLookupUrl} --shouldHashId${shouldHashId} + --modeclaim