From 5edcc6832a567cb24b9c022f462a6a1193c513a1 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Wed, 23 Jun 2021 09:53:29 +0200
Subject: [PATCH 1/5] applying sonarLint suggestions

---
 .../project/PrepareProgramme.java             | 48 +++++++++----------
 1 file changed, 22 insertions(+), 26 deletions(-)

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java
index 760e5131d..9e852eb77 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java
@@ -144,22 +144,7 @@ public class PrepareProgramme {
 		JavaRDD<CSVProgramme> h2020Programmes = programme
 			.toJavaRDD()
 			.mapToPair(csvProgramme -> new Tuple2<>(csvProgramme.getCode(), csvProgramme))
-			.reduceByKey((a, b) -> {
-				if (!a.getLanguage().equals("en")) {
-					if (b.getLanguage().equalsIgnoreCase("en")) {
-						a.setTitle(b.getTitle());
-						a.setLanguage(b.getLanguage());
-					}
-				}
-				if (StringUtils.isEmpty(a.getShortTitle())) {
-					if (!StringUtils.isEmpty(b.getShortTitle())) {
-						a.setShortTitle(b.getShortTitle());
-					}
-				}
-
-				return a;
-
-			})
+			.reduceByKey(PrepareProgramme::groupProgrammeByCode)
 			.map(p -> {
 				CSVProgramme csvProgramme = p._2();
 				String programmeTitle = csvProgramme.getTitle().trim();
@@ -176,20 +161,31 @@ public class PrepareProgramme {
 				return csvProgramme;
 			});
 
-		// prepareClassification(h2020Programmes);
-
-		JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
+		final JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 		JavaRDD<CSVProgramme> rdd = jsc.parallelize(prepareClassification(h2020Programmes), 1);
 		rdd
-			.map(csvProgramme -> {
-				String tmp = OBJECT_MAPPER.writeValueAsString(csvProgramme);
-				return tmp;
-			})
+			.map(OBJECT_MAPPER::writeValueAsString)
 			.saveAsTextFile(outputPath);
 
 	}
 
+	private static CSVProgramme groupProgrammeByCode(CSVProgramme a, CSVProgramme b) {
+		if (!a.getLanguage().equals("en")) {
+			if (b.getLanguage().equalsIgnoreCase("en")) {
+				a.setTitle(b.getTitle());
+				a.setLanguage(b.getLanguage());
+			}
+		}
+		if (StringUtils.isEmpty(a.getShortTitle())) {
+			if (!StringUtils.isEmpty(b.getShortTitle())) {
+				a.setShortTitle(b.getShortTitle());
+			}
+		}
+
+		return a;
+	}
+
 	private static List<CSVProgramme> prepareClassification(JavaRDD<CSVProgramme> h2020Programmes) {
 		Object[] codedescription = h2020Programmes
 			.map(
@@ -240,10 +236,10 @@ public class PrepareProgramme {
 			if (!ent.contains("Euratom")) {
 				String parent;
-				String tmp_key = tmp[0] + ".";
+				String tmpKey = tmp[0] + ".";
 				for (int i = 1; i < tmp.length - 1; i++) {
-					tmp_key += tmp[i] + ".";
-					parent = map.get(tmp_key)._1().toLowerCase().trim();
+					tmpKey += tmp[i] + ".";
+					parent = map.get(tmpKey)._1().toLowerCase().trim();
 					if (parent.contains("|")) {
 						parent = parent.substring(parent.lastIndexOf("|") + 1).trim();
 					}
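The extracted groupProgrammeByCode resolves duplicate programme records sharing the same code: the English title and language win over other languages, and an empty short title is back-filled from the other record. A minimal sketch of those semantics (hypothetical values; the method is private, so this is illustrative rather than an actual call site):

    // hypothetical duplicate records for the same programme code
    CSVProgramme it = new CSVProgramme();
    it.setLanguage("it");
    it.setTitle("Infrastrutture di ricerca");

    CSVProgramme en = new CSVProgramme();
    en.setLanguage("en");
    en.setTitle("Research infrastructures");
    en.setShortTitle("RI");

    CSVProgramme merged = groupProgrammeByCode(it, en);
    // the English title/language win; the missing shortTitle is filled from `en`
    assert "en".equals(merged.getLanguage());
    assert "Research infrastructures".equals(merged.getTitle());
    assert "RI".equals(merged.getShortTitle());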
From 50fc5a64a00e704e7c03062060d4254197fa9fba Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Wed, 23 Jun 2021 11:49:42 +0200
Subject: [PATCH 2/5] [raw_all] Aggregator graph creation merges claims
 (updates) with the corresponding entity

---
 .../raw/GenerateEntitiesApplication.java      | 46 ++++++++-
 .../raw/MigrateDbEntitiesApplication.java     | 98 +++++++++++--------
 .../graph/generate_entities_parameters.json   |  6 ++
 .../oa/graph/raw_all/oozie_app/workflow.xml   |  1 +
 4 files changed, 106 insertions(+), 45 deletions(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java
index 48c0df334..fcd6f459a 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java
@@ -27,6 +27,7 @@ import eu.dnetlib.dhp.common.HdfsSupport;
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import scala.Tuple2;
@@ -37,6 +38,22 @@ public class GenerateEntitiesApplication {
 
 	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
+	/**
+	 * Operation mode
+	 */
+	enum Mode {
+
+		/**
+		 * Groups all the objects by id to merge them
+		 */
+		claim,
+
+		/**
+		 * Default mode
+		 */
+		graph
+	}
+
 	public static void main(final String[] args) throws Exception {
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
 			IOUtils
@@ -67,13 +84,19 @@ public class GenerateEntitiesApplication {
 			.orElse(true);
 		log.info("shouldHashId: {}", shouldHashId);
 
+		final Mode mode = Optional
+			.ofNullable(parser.get("mode"))
+			.map(Mode::valueOf)
+			.orElse(Mode.graph);
+		log.info("mode: {}", mode);
+
 		final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
 		final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookupService);
 
 		final SparkConf conf = new SparkConf();
 		runWithSparkSession(conf, isSparkSessionManaged, spark -> {
 			HdfsSupport.remove(targetPath, spark.sparkContext().hadoopConfiguration());
-			generateEntities(spark, vocs, sourcePaths, targetPath, shouldHashId);
+			generateEntities(spark, vocs, sourcePaths, targetPath, shouldHashId, mode);
 		});
 	}
 
@@ -82,7 +105,8 @@ public class GenerateEntitiesApplication {
 		final VocabularyGroup vocs,
 		final String sourcePaths,
 		final String targetPath,
-		final boolean shouldHashId) {
+		final boolean shouldHashId,
+		final Mode mode) {
 
 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 		final List<String> existingSourcePaths = Arrays
@@ -106,7 +130,23 @@ public class GenerateEntitiesApplication {
 					.flatMap(list -> list.iterator()));
 		}
 
-		inputRdd
+		switch (mode) {
+			case claim:
+				save(
+					inputRdd
+						.mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf))
+						.reduceByKey((o1, o2) -> OafMapperUtils.merge(o1, o2))
+						.map(Tuple2::_2),
+					targetPath);
+				break;
+			case graph:
+				save(inputRdd, targetPath);
+				break;
+		}
+	}
+
+	private static void save(final JavaRDD<Oaf> rdd, final String targetPath) {
+		rdd
 			.map(
 				oaf -> oaf.getClass().getSimpleName().toLowerCase()
 					+ "|"
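In the new claim mode, GenerateEntitiesApplication groups every parsed Oaf object by its OpenAIRE identifier and folds each group through OafMapperUtils.merge, so a user claim lands on the same record as the aggregated entity it refers to. A sketch of that grouping in isolation (hypothetical local driver; claimedUpdate and aggregatedEntity stand for two payloads mapped to the same id):

    // sketch: two records with the same id reduce to a single merged record
    final SparkConf conf = new SparkConf().setAppName("claim-merge-sketch").setMaster("local[*]");
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
        final JavaRDD<Oaf> input = sc.parallelize(Arrays.asList(claimedUpdate, aggregatedEntity));
        final List<Oaf> merged = input
            .mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf))
            .reduceByKey(OafMapperUtils::merge)
            .map(Tuple2::_2)
            .collect();
        // merged.size() == 1: one record per id survives, carrying fields from both inputs
    }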
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java
index 9b6400a32..1a55499c1 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java
@@ -13,7 +13,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
-import java.util.Objects;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -36,7 +35,6 @@ import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Dataset;
 import eu.dnetlib.dhp.schema.oaf.Datasource;
 import eu.dnetlib.dhp.schema.oaf.Field;
-import eu.dnetlib.dhp.schema.oaf.Journal;
 import eu.dnetlib.dhp.schema.oaf.KeyValue;
 import eu.dnetlib.dhp.schema.oaf.Oaf;
 import eu.dnetlib.dhp.schema.oaf.Organization;
@@ -54,6 +52,13 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
 
 	private static final Logger log = LoggerFactory.getLogger(MigrateDbEntitiesApplication.class);
 
+	private static final DataInfo DATA_INFO_CLAIM = dataInfo(
+		false, null, false, false,
+		qualifier(USER_CLAIM, USER_CLAIM, DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS), "0.9");
+
+	private static final List<KeyValue> COLLECTED_FROM_CLAIM = listKeyValues(
+		createOpenaireId(10, "infrastruct_::openaire", true), "OpenAIRE");
+
 	public static final String SOURCE_TYPE = "source_type";
 	public static final String TARGET_TYPE = "target_type";
 
@@ -443,25 +448,19 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
 	}
 
 	public List<Oaf> processClaims(final ResultSet rs) {
-
-		final DataInfo info = dataInfo(
-			false, null, false, false,
-			qualifier(USER_CLAIM, USER_CLAIM, DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS), "0.9");
-
-		final List<KeyValue> collectedFrom = listKeyValues(
-			createOpenaireId(10, "infrastruct_::openaire", true), "OpenAIRE");
-
 		try {
-			if (rs.getString(SOURCE_TYPE).equals("context")) {
+			final String sourceType = rs.getString(SOURCE_TYPE);
+			final String targetType = rs.getString(TARGET_TYPE);
+			if (sourceType.equals("context")) {
 				final Result r;
 
-				if (rs.getString(TARGET_TYPE).equals("dataset")) {
+				if (targetType.equals("dataset")) {
 					r = new Dataset();
 					r.setResulttype(DATASET_DEFAULT_RESULTTYPE);
-				} else if (rs.getString(TARGET_TYPE).equals("software")) {
+				} else if (targetType.equals("software")) {
 					r = new Software();
 					r.setResulttype(SOFTWARE_DEFAULT_RESULTTYPE);
-				} else if (rs.getString(TARGET_TYPE).equals("other")) {
+				} else if (targetType.equals("other")) {
 					r = new OtherResearchProduct();
 					r.setResulttype(ORP_DEFAULT_RESULTTYPE);
 				} else {
@@ -470,57 +469,72 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
 				}
 				r.setId(createOpenaireId(50, rs.getString("target_id"), false));
 				r.setLastupdatetimestamp(lastUpdateTimestamp);
-				r.setContext(prepareContext(rs.getString("source_id"), info));
-				r.setDataInfo(info);
-				r.setCollectedfrom(collectedFrom);
+				r.setContext(prepareContext(rs.getString("source_id"), DATA_INFO_CLAIM));
+				r.setDataInfo(DATA_INFO_CLAIM);
+				r.setCollectedfrom(COLLECTED_FROM_CLAIM);
 
 				return Arrays.asList(r);
 			} else {
 				final String validationDate = rs.getString("curation_date");
 
-				final String sourceId = createOpenaireId(rs.getString(SOURCE_TYPE), rs.getString("source_id"), false);
-				final String targetId = createOpenaireId(rs.getString(TARGET_TYPE), rs.getString("target_id"), false);
+				final String sourceId = createOpenaireId(sourceType, rs.getString("source_id"), false);
+				final String targetId = createOpenaireId(targetType, rs.getString("target_id"), false);
 
 				final Relation r1 = new Relation();
 				final Relation r2 = new Relation();
 
-				r1.setValidated(true);
-				r1.setValidationDate(validationDate);
-				r1.setCollectedfrom(collectedFrom);
+				if (StringUtils.isNotBlank(validationDate)) {
+					r1.setValidated(true);
+					r1.setValidationDate(validationDate);
+					r2.setValidated(true);
+					r2.setValidationDate(validationDate);
+				}
+				r1.setCollectedfrom(COLLECTED_FROM_CLAIM);
 				r1.setSource(sourceId);
 				r1.setTarget(targetId);
-				r1.setDataInfo(info);
+				r1.setDataInfo(DATA_INFO_CLAIM);
 				r1.setLastupdatetimestamp(lastUpdateTimestamp);
 
-				r2.setValidationDate(validationDate);
-				r2.setValidated(true);
-				r2.setCollectedfrom(collectedFrom);
+				r2.setCollectedfrom(COLLECTED_FROM_CLAIM);
 				r2.setSource(targetId);
 				r2.setTarget(sourceId);
-				r2.setDataInfo(info);
+				r2.setDataInfo(DATA_INFO_CLAIM);
 				r2.setLastupdatetimestamp(lastUpdateTimestamp);
 
-				if (rs.getString(SOURCE_TYPE).equals("project")) {
-					r1.setRelType(RESULT_PROJECT);
-					r1.setSubRelType(OUTCOME);
-					r1.setRelClass(PRODUCES);
+				final String semantics = rs.getString("semantics");
 
-					r2.setRelType(RESULT_PROJECT);
-					r2.setSubRelType(OUTCOME);
-					r2.setRelClass(IS_PRODUCED_BY);
-				} else {
-					r1.setRelType(RESULT_RESULT);
-					r1.setSubRelType(RELATIONSHIP);
-					r1.setRelClass(IS_RELATED_TO);
+				switch (semantics) {
+					case "resultResult_relationship_isRelatedTo":
+						r1.setRelType(RESULT_RESULT);
+						r1.setSubRelType(RELATIONSHIP);
+						r1.setRelClass(IS_RELATED_TO);
 
-					r2.setRelType(RESULT_RESULT);
-					r2.setSubRelType(RELATIONSHIP);
-					r2.setRelClass(IS_RELATED_TO);
+						r2.setRelType(RESULT_RESULT);
+						r2.setSubRelType(RELATIONSHIP);
+						r2.setRelClass(IS_RELATED_TO);
+						break;
+					case "resultProject_outcome_produces":
+						if (!"project".equals(sourceType)) {
+							throw new IllegalStateException(
+								String
+									.format(
+										"invalid claim, sourceId: %s, targetId: %s, semantics: %s",
+										sourceId, targetId, semantics));
+						}
+						r1.setRelType(RESULT_PROJECT);
+						r1.setSubRelType(OUTCOME);
+						r1.setRelClass(PRODUCES);
+
+						r2.setRelType(RESULT_PROJECT);
+						r2.setSubRelType(OUTCOME);
+						r2.setRelClass(IS_PRODUCED_BY);
+						break;
+					default:
+						throw new IllegalArgumentException("claim semantics not managed: " + semantics);
 				}
 
 				return Arrays.asList(r1, r2);
 			}
-
 		} catch (final Exception e) {
 			throw new RuntimeException(e);
 		}

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json
index 4b3ebba38..52cbbf45f 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json
@@ -28,5 +28,11 @@
     "paramLongName": "shouldHashId",
     "paramDescription": "should ids be hashed?",
     "paramRequired": false
+  },
+  {
+    "paramName": "m",
+    "paramLongName": "mode",
+    "paramDescription": "operation mode",
+    "paramRequired": false
   }
 ]
\ No newline at end of file

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
index 0821f04ea..7f1ecb39f 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
@@ -460,6 +460,7 @@
             <arg>--targetPath</arg><arg>${workingDir}/entities_claim</arg>
             <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
             <arg>--shouldHashId</arg><arg>${shouldHashId}</arg>
+            <arg>--mode</arg><arg>claim</arg>
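With the semantics column now driving an explicit switch, a project->result claim yields a mirrored forward/inverse relation pair, and the validation flags are set only when a curation date is present. A hypothetical sanity check (app and rs stand in for the migration application and a stubbed ResultSet holding a resultProject_outcome_produces claim; they are not part of the patch):

    final List<Oaf> claims = app.processClaims(rs);
    final Relation r1 = (Relation) claims.get(0);
    final Relation r2 = (Relation) claims.get(1);
    assert PRODUCES.equals(r1.getRelClass());
    assert IS_PRODUCED_BY.equals(r2.getRelClass());
    // the pair is mirrored: r1 goes project -> result, r2 goes result -> project
    assert r1.getSource().equals(r2.getTarget());
    assert r1.getTarget().equals(r2.getSource());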
From 4dc9ebf2171231d2ff9e5fc6a03acde32b67de5f Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Wed, 23 Jun 2021 14:38:07 +0200
Subject: [PATCH 3/5] [raw_all] fixed unit test

---
 .../eu/dnetlib/dhp/oa/graph/raw/claimsrel_resultset_entry.json | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/claimsrel_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/claimsrel_resultset_entry.json
index 28fa70035..c6e57ac0a 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/claimsrel_resultset_entry.json
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/claimsrel_resultset_entry.json
@@ -21,7 +21,7 @@
   },
   {
     "field": "semantics",
-    "type": "not_used",
+    "type": "string",
     "value": "resultProject_outcome_produces"
   }
 ]
\ No newline at end of file

From 2e8fd2c531ba28ab2006693f8b0373691a7b8627 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Wed, 23 Jun 2021 14:38:24 +0200
Subject: [PATCH 4/5] cleanup

---
 .../dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java
index 7e911f2b7..d529d2eb2 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java
@@ -26,7 +26,6 @@ import org.mockito.junit.jupiter.MockitoExtension;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
 
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.schema.oaf.*;
From 67afd06cd182b46ff4a0ae3036209dadccf533db Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Thu, 24 Jun 2021 12:10:17 +0200
Subject: [PATCH 5/5] [cleaning] cleaning instance.pid and
 instance.alternateidentifier using the same procedure used to clean
 result.pid

---
 .../oaf/utils/GraphCleaningFunctions.java     | 20 ++++++++-----------
 1 file changed, 8 insertions(+), 12 deletions(-)

diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java
index 7088e56e1..a75cc52e6 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java
@@ -224,24 +224,20 @@ public class GraphCleaningFunctions extends CleaningFunctions {
 
 		if (Objects.nonNull(r.getInstance())) {
 			for (Instance i : r.getInstance()) {
+				if (Objects.nonNull(i.getPid())) {
+					i.setPid(processPidCleaning(i.getPid()));
+				}
+				if (Objects.nonNull(i.getAlternateIdentifier())) {
+					i.setAlternateIdentifier(processPidCleaning(i.getAlternateIdentifier()));
+				}
 				Optional
 					.ofNullable(i.getPid())
 					.ifPresent(pid -> {
-						final Set<StructuredProperty> pids = pid
-							.stream()
-							.filter(Objects::nonNull)
-							.filter(p -> StringUtils.isNotBlank(p.getValue()))
-							.collect(Collectors.toCollection(HashSet::new));
-
+						final Set<StructuredProperty> pids = Sets.newHashSet(pid);
 						Optional
 							.ofNullable(i.getAlternateIdentifier())
 							.ifPresent(altId -> {
-								final Set<StructuredProperty> altIds = altId
-									.stream()
-									.filter(Objects::nonNull)
-									.filter(p -> StringUtils.isNotBlank(p.getValue()))
-									.collect(Collectors.toCollection(HashSet::new));
-
+								final Set<StructuredProperty> altIds = Sets.newHashSet(altId);
 								i.setAlternateIdentifier(Lists.newArrayList(Sets.difference(altIds, pids)));
 							});
 					});
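The patch above routes instance.pid and instance.alternateIdentifier through the same processPidCleaning already used for result.pid, so the inline null/blank filtering can be dropped before the alternate identifiers are deduplicated against the pids via Guava set difference. The body of processPidCleaning is not part of this diff; based on the inline filtering it replaces, a plausible sketch (signature and internals are assumptions, including equality-based dedup on StructuredProperty):

    private static List<StructuredProperty> processPidCleaning(final List<StructuredProperty> pids) {
        return pids
            .stream()
            .filter(Objects::nonNull)                            // drop null entries
            .filter(sp -> StringUtils.isNotBlank(sp.getValue())) // drop blank pid values
            .distinct()                                          // the replaced inline code deduplicated via HashSet
            .collect(Collectors.toList());
    }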