From 973aa7dca6508e2c1e82fa7c775b3d40c18fe45b Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 6 Nov 2024 12:29:06 +0100 Subject: [PATCH 01/15] [dedup] force the Relation schema when reading the merge rels --- .../java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java | 1 + 1 file changed, 1 insertion(+) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index c7efce4d74..b0bc314e2b 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -69,6 +69,7 @@ public class SparkPropagateRelation extends AbstractSparkAction { Dataset mergeRels = spark .read() + .schema(REL_BEAN_ENC.schema()) .load(DedupUtility.createMergeRelPath(workingPath, "*", "*")) .as(REL_BEAN_ENC); From f7bb53fe7895105f4a7a73b7de14cd6b4121589e Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 7 Nov 2024 01:04:43 +0100 Subject: [PATCH 02/15] [orcid enrichment] added missing workflow parameter: workingDir --- .../resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml index 4031da15a0..1ece2c0be4 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml @@ -51,6 +51,7 @@ --orcidPath${orcidPath} --targetPath${targetPath} --graphPath${graphPath} + --workingDir${workingDir} --masteryarn From 8f5171557e20ed58f69f7abe3af1ad0a85b10ba3 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Thu, 7 Nov 2024 12:22:34 +0100 Subject: [PATCH 03/15] Remove ORCID information when the same ORCID ID is used multiple times in the same result for different authors --- .../oaf/utils/GraphCleaningFunctions.java | 36 ++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java index b6574da160..9153a6476b 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java @@ -2,7 +2,6 @@ package eu.dnetlib.dhp.schema.oaf.utils; import static eu.dnetlib.dhp.schema.common.ModelConstants.*; -import static eu.dnetlib.dhp.schema.common.ModelConstants.OPENAIRE_META_RESOURCE_TYPE; import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.getProvenance; import java.net.MalformedURLException; @@ -696,6 +695,7 @@ public class GraphCleaningFunctions extends CleaningFunctions { } } + // set ORCID_PENDING to all orcid values that are not coming from ORCID provenance for (Author a : r.getAuthor()) { if (Objects.isNull(a.getPid())) { a.setPid(Lists.newArrayList()); @@ -752,6 +752,40 @@ public class GraphCleaningFunctions extends CleaningFunctions { .collect(Collectors.toList())); } } + + // Identify clashing ORCIDS:that is same ORCID associated to multiple authors in this result + Map clashing_orcid = new HashMap<>(); + + for (Author a : r.getAuthor()) { + a + .getPid() + .stream() + .filter( + p -> StringUtils + .contains(StringUtils.lowerCase(p.getQualifier().getClassid()), ORCID_PENDING)) + .map(StructuredProperty::getValue) + .distinct() + .forEach(orcid -> clashing_orcid.compute(orcid, (k, v) -> (v == null) ? 1 : v + 1)); + } + + Set clashing = clashing_orcid + .entrySet() + .stream() + .filter(ee -> ee.getValue() > 1) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + + // filter out clashing orcids + for (Author a : r.getAuthor()) { + a + .setPid( + a + .getPid() + .stream() + .filter(p -> !clashing.contains(p.getValue())) + .collect(Collectors.toList())); + } + } if (value instanceof Publication) { From 6fd9ec856608c3ca9baeedfa3677a64287483d92 Mon Sep 17 00:00:00 2001 From: Miriam Baglioni Date: Thu, 7 Nov 2024 13:55:31 +0100 Subject: [PATCH 04/15] [danishfunders] added link for danish funders versus the unidentified project for IRFD (501100004836) CF (501100002808) and NNF(501100009708) --- .../doiboost/crossref/Crossref2Oaf.scala | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala index f284a063e9..bf11ed0a8c 100644 --- a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala @@ -566,7 +566,23 @@ case object Crossref2Oaf { queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY) queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES) case _ => logger.debug("no match for " + funder.DOI.get) - + //Add for Danish funders + //Independent Research Fund Denmark (IRFD) + case "10.13039/501100004836" => + val targetId = getProjectId("irfd________", "1e5e62235d094afd01cd56e65112fc63") + queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY) + queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES) + //Carlsberg Foundation (CF) + case "10.13039/501100002808" => + val targetId = getProjectId("cf__________", "1e5e62235d094afd01cd56e65112fc63") + queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY) + queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES) + //Novo Nordisk Foundation (NNF) + case "10.13039/501100009708" => + val targetId = getProjectId("nnf_________", "1e5e62235d094afd01cd56e65112fc63") + queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY) + queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES) + case _ => logger.debug("no match for " + funder.DOI.get) } } else { From b0283fe94c168b87176f283f414ef2c4dfd3cdab Mon Sep 17 00:00:00 2001 From: Miriam Baglioni Date: Mon, 11 Nov 2024 14:57:57 +0100 Subject: [PATCH 05/15] [person] fix provenance of pid in person when it is orcid (classid entityregistry to avoid the cleaning put orcid_pending) --- .../dhp/actionmanager/personentity/ExtractPerson.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java index bf2c19c3da..6830f2291b 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java @@ -345,7 +345,16 @@ public class ExtractPerson implements Serializable { OafMapperUtils .structuredProperty( op.getOrcid(), ModelConstants.ORCID, ModelConstants.ORCID_CLASSNAME, - ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES, null)); + ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES, + OafMapperUtils.dataInfo(false, + null, + false, + false, + OafMapperUtils.qualifier(ModelConstants.SYSIMPORT_CROSSWALK_ENTITYREGISTRY, + ModelConstants.SYSIMPORT_CROSSWALK_ENTITYREGISTRY, + ModelConstants.DNET_PID_TYPES, + ModelConstants.DNET_PID_TYPES), + "0.91"))); person.setDateofcollection(op.getLastModifiedDate()); person.setOriginalId(Arrays.asList(op.getOrcid())); person.setDataInfo(ORCIDDATAINFO); From f1ea9da5bcda277451416253982c24233e40d87b Mon Sep 17 00:00:00 2001 From: Miriam Baglioni Date: Mon, 11 Nov 2024 15:37:56 +0100 Subject: [PATCH 06/15] [person] checked type in inferenceprovenance --- .../bipaffiliations/PrepareAffiliationRelations.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java index 15c1cc3760..75e58e6654 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -104,22 +104,22 @@ public class PrepareAffiliationRelations implements Serializable { .listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME); JavaPairRDD crossrefRelations = prepareAffiliationRelationsNewModel( - spark, crossrefInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::crossref"); + spark, crossrefInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + ":crossref"); JavaPairRDD pubmedRelations = prepareAffiliationRelations( - spark, pubmedInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::pubmed"); + spark, pubmedInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + ":pubmed"); JavaPairRDD openAPCRelations = prepareAffiliationRelationsNewModel( - spark, openapcInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::openapc"); + spark, openapcInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + ":openapc"); JavaPairRDD dataciteRelations = prepareAffiliationRelationsNewModel( - spark, dataciteInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::datacite"); + spark, dataciteInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + ":datacite"); JavaPairRDD webCrawlRelations = prepareAffiliationRelationsNewModel( - spark, webcrawlInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::rawaff"); + spark, webcrawlInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + ":rawaff"); JavaPairRDD publisherRelations = prepareAffiliationRelationFromPublisherNewModel( - spark, publisherlInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::webcrawl"); + spark, publisherlInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + ":webcrawl"); crossrefRelations .union(pubmedRelations) From 250f101779a16ffbec1c16d3e0dc1050d6533c87 Mon Sep 17 00:00:00 2001 From: Miriam Baglioni Date: Mon, 11 Nov 2024 16:04:06 +0100 Subject: [PATCH 07/15] [person] fixed issue in creating project identifier for the graph for person->project relations --- .../dhp/actionmanager/personentity/ExtractPerson.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java index 6830f2291b..6976def4c8 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java @@ -15,6 +15,7 @@ import java.util.stream.Collectors; import org.apache.commons.cli.ParseException; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -193,8 +194,8 @@ public class ExtractPerson implements Serializable { private static Relation getProjectRelation(String project, String orcid, String role) { String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid); - String target = PROJECT_ID_PREFIX + project.substring(0, 14) - + IdentifierFactory.md5(project.substring(15)); + String target = PROJECT_ID_PREFIX + StringUtils.substringBefore(project, "::") + "::" + + IdentifierFactory.md5(StringUtils.substringAfter(project, "::")); List properties = new ArrayList<>(); Relation relation = OafMapperUtils From 6c5df761e21d5aea6c203cb2ca2374b33a9219e5 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 12 Nov 2024 14:18:04 +0100 Subject: [PATCH 08/15] enforce resulttype based on the dnet:result_typologies vocabulary and upon merge --- .../dhp/oa/merge/GroupEntitiesSparkJob.java | 7 ++- .../dhp/schema/oaf/utils/MergeUtils.java | 62 ++++++++++++++++--- .../dhp/oa/dedup/DedupRecordFactory.java | 2 +- .../dhp/oa/dedup/DatasetMergerTest.java | 4 +- .../raw/AbstractMdRecordToOafMapper.java | 11 ++-- .../raw/GenerateEntitiesApplication.java | 2 +- .../dhp/sx/graph/SparkCreateInputGraph.scala | 2 +- 7 files changed, 66 insertions(+), 24 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java index 24de1a787a..98ec09277d 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java @@ -2,8 +2,7 @@ package eu.dnetlib.dhp.oa.merge; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import static org.apache.spark.sql.functions.col; -import static org.apache.spark.sql.functions.when; +import static org.apache.spark.sql.functions.*; import java.util.Map; import java.util.Optional; @@ -135,7 +134,9 @@ public class GroupEntitiesSparkJob { .applyCoarVocabularies(entity, vocs), OAFENTITY_KRYO_ENC) .groupByKey((MapFunction) OafEntity::getId, Encoders.STRING()) - .mapGroups((MapGroupsFunction) MergeUtils::mergeById, OAFENTITY_KRYO_ENC) + .mapGroups( + (MapGroupsFunction) (key, group) -> MergeUtils.mergeById(group, vocs), + OAFENTITY_KRYO_ENC) .map( (MapFunction>) t -> new Tuple2<>( t.getClass().getName(), t), diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java index 4c411a1550..d7e08fca77 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java @@ -23,24 +23,30 @@ import org.apache.commons.lang3.tuple.Pair; import com.github.sisyphsu.dateparser.DateParserUtils; import com.google.common.base.Joiner; +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.oa.merge.AuthorMerger; import eu.dnetlib.dhp.schema.common.AccessRightComparator; +import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; public class MergeUtils { - public static T mergeById(String s, Iterator oafEntityIterator) { - return mergeGroup(s, oafEntityIterator, true); + public static T mergeById(Iterator oafEntityIterator, VocabularyGroup vocs) { + return mergeGroup(oafEntityIterator, true, vocs); } - public static T mergeGroup(String s, Iterator oafEntityIterator) { - return mergeGroup(s, oafEntityIterator, false); + public static T mergeGroup(Iterator oafEntityIterator) { + return mergeGroup(oafEntityIterator, false); } - public static T mergeGroup(String s, Iterator oafEntityIterator, - boolean checkDelegateAuthority) { + public static T mergeGroup(Iterator oafEntityIterator, boolean checkDelegateAuthority) { + return mergeGroup(oafEntityIterator, checkDelegateAuthority, null); + } + + public static T mergeGroup(Iterator oafEntityIterator, + boolean checkDelegateAuthority, VocabularyGroup vocs) { ArrayList sortedEntities = new ArrayList<>(); oafEntityIterator.forEachRemaining(sortedEntities::add); @@ -49,13 +55,49 @@ public class MergeUtils { Iterator it = sortedEntities.iterator(); T merged = it.next(); - while (it.hasNext()) { - merged = checkedMerge(merged, it.next(), checkDelegateAuthority); + if (!it.hasNext() && merged instanceof Result && vocs != null) { + return enforceResultType(vocs, (Result) merged); + } else { + while (it.hasNext()) { + merged = checkedMerge(merged, it.next(), checkDelegateAuthority); + } } - return merged; } + private static T enforceResultType(VocabularyGroup vocs, Result mergedResult) { + if (Optional.ofNullable(mergedResult.getInstance()).map(List::isEmpty).orElse(true)) { + return (T) mergedResult; + } else { + final Instance i = mergedResult.getInstance().get(0); + + if (!vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) { + return (T) mergedResult; + } else { + final Qualifier expectedResultType = vocs + .getSynonymAsQualifier( + ModelConstants.DNET_RESULT_TYPOLOGIES, + i.getInstancetype().getClassid()); + + // there is a clash among the result types + if (!expectedResultType.getClassid().equals(mergedResult.getResulttype().getClassid())) { + try { + String resulttype = expectedResultType.getClassid(); + if (EntityType.otherresearchproduct.toString().equals(resulttype)) { + resulttype = "other"; + } + Result result = (Result) ModelSupport.oafTypes.get(resulttype).newInstance(); + return (T) mergeResultFields(result, mergedResult); + } catch (InstantiationException | IllegalAccessException e) { + throw new IllegalStateException(e); + } + } else { + return (T) mergedResult; + } + } + } + } + public static T checkedMerge(final T left, final T right, boolean checkDelegateAuthority) { return (T) merge(left, right, checkDelegateAuthority); } @@ -106,7 +148,7 @@ public class MergeUtils { return mergeSoftware((Software) left, (Software) right); } - return mergeResultFields((Result) left, (Result) right); + return left; } else if (sameClass(left, right, Datasource.class)) { // TODO final int trust = compareTrust(left, right); diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java index 44482cfdb5..f6a4365434 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java @@ -135,7 +135,7 @@ public class DedupRecordFactory { return Collections.emptyIterator(); } - OafEntity mergedEntity = MergeUtils.mergeGroup(dedupId, cliques.iterator()); + OafEntity mergedEntity = MergeUtils.mergeGroup(cliques.iterator()); // dedup records do not have date of transformation attribute mergedEntity.setDateoftransformation(null); mergedEntity diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/DatasetMergerTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/DatasetMergerTest.java index 726814c43e..a79047590f 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/DatasetMergerTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/DatasetMergerTest.java @@ -46,8 +46,8 @@ class DatasetMergerTest implements Serializable { } @Test - void datasetMergerTest() throws InstantiationException, IllegalAccessException, InvocationTargetException { - Dataset pub_merged = MergeUtils.mergeGroup(dedupId, datasets.stream().map(Tuple2::_2).iterator()); + void datasetMergerTest() { + Dataset pub_merged = MergeUtils.mergeGroup(datasets.stream().map(Tuple2::_2).iterator()); // verify id assertEquals(dedupId, pub_merged.getId()); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java index 2436a272c9..ba6887a2e9 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java @@ -155,7 +155,7 @@ public abstract class AbstractMdRecordToOafMapper { final List instances = prepareInstances(doc, entityInfo, collectedFrom, hostedBy); - final String type = getResultType(doc, instances); + final String type = getResultType(instances); return createOafs(doc, type, instances, collectedFrom, entityInfo, lastUpdateTimestamp); } catch (final DocumentException e) { @@ -164,10 +164,9 @@ public abstract class AbstractMdRecordToOafMapper { } } - protected String getResultType(final Document doc, final List instances) { - final String type = doc.valueOf("//dr:CobjCategory/@type"); + protected String getResultType(final List instances) { - if (StringUtils.isBlank(type) && this.vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) { + if (this.vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) { final String instanceType = instances .stream() .map(i -> i.getInstancetype().getClassid()) @@ -178,9 +177,9 @@ public abstract class AbstractMdRecordToOafMapper { .ofNullable(this.vocs.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, instanceType)) .map(Qualifier::getClassid) .orElse("0000"); + } else { + throw new IllegalStateException("Missing vocabulary: " + ModelConstants.DNET_RESULT_TYPOLOGIES); } - - return type; } private KeyValue getProvenanceDatasource(final Document doc, final String xpathId, final String xpathName) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java index c3806c211f..357fae4704 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java @@ -133,7 +133,7 @@ public class GenerateEntitiesApplication extends AbstractMigrationApplication { inputRdd .keyBy(oaf -> ModelSupport.idFn().apply(oaf)) .groupByKey() - .map(t -> MergeUtils.mergeGroup(t._1, t._2.iterator())), + .map(t -> MergeUtils.mergeGroup(t._2.iterator())), // .mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf)) // .reduceByKey(MergeUtils::merge) // .map(Tuple2::_2), diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala index d94a23947a..42299cd345 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala @@ -133,7 +133,7 @@ object SparkCreateInputGraph { val ds: Dataset[T] = spark.read.load(sourcePath).as[T] ds.groupByKey(_.getId) - .mapGroups { (id, it) => MergeUtils.mergeGroup(id, it.asJava).asInstanceOf[T] } + .mapGroups { (id, it) => MergeUtils.mergeGroup(it.asJava).asInstanceOf[T] } // .reduceGroups { (x: T, y: T) => MergeUtils.merge(x, y).asInstanceOf[T] } // .map(_) .write From 07f267bb10911d62e30a2b299db3c50fcd1746a2 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 13 Nov 2024 08:14:26 +0100 Subject: [PATCH 09/15] fix vocabulary lookup in mergeutils --- .../dhp/schema/oaf/utils/MergeUtils.java | 425 +++++++++--------- 1 file changed, 215 insertions(+), 210 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java index d7e08fca77..dc76860f81 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java @@ -16,6 +16,8 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; +import eu.dnetlib.dhp.schema.common.EntityType; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -23,10 +25,8 @@ import org.apache.commons.lang3.tuple.Pair; import com.github.sisyphsu.dateparser.DateParserUtils; import com.google.common.base.Joiner; -import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.oa.merge.AuthorMerger; import eu.dnetlib.dhp.schema.common.AccessRightComparator; -import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; @@ -46,7 +46,7 @@ public class MergeUtils { } public static T mergeGroup(Iterator oafEntityIterator, - boolean checkDelegateAuthority, VocabularyGroup vocs) { + boolean checkDelegateAuthority, VocabularyGroup vocs) { ArrayList sortedEntities = new ArrayList<>(); oafEntityIterator.forEachRemaining(sortedEntities::add); @@ -74,11 +74,16 @@ public class MergeUtils { if (!vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) { return (T) mergedResult; } else { - final Qualifier expectedResultType = vocs - .getSynonymAsQualifier( + final Qualifier expectedResultType = vocs.lookupTermBySynonym( ModelConstants.DNET_RESULT_TYPOLOGIES, i.getInstancetype().getClassid()); + if (Objects.isNull(expectedResultType)) { + throw new IllegalArgumentException( + "instance type not bound to any result type in dnet:result_typologies: " + + i.getInstancetype().getClassid()); + } + // there is a clash among the result types if (!expectedResultType.getClassid().equals(mergedResult.getResulttype().getClassid())) { try { @@ -117,10 +122,10 @@ public class MergeUtils { return mergeRelation((Relation) left, (Relation) right); } else { throw new RuntimeException( - String - .format( - "MERGE_FROM_AND_GET incompatible types: %s, %s", - left.getClass().getCanonicalName(), right.getClass().getCanonicalName())); + String + .format( + "MERGE_FROM_AND_GET incompatible types: %s, %s", + left.getClass().getCanonicalName(), right.getClass().getCanonicalName())); } } @@ -159,10 +164,10 @@ public class MergeUtils { return mergeProject((Project) left, (Project) right); } else { throw new RuntimeException( - String - .format( - "MERGE_FROM_AND_GET incompatible types: %s, %s", - left.getClass().getCanonicalName(), right.getClass().getCanonicalName())); + String + .format( + "MERGE_FROM_AND_GET incompatible types: %s, %s", + left.getClass().getCanonicalName(), right.getClass().getCanonicalName())); } } @@ -253,7 +258,7 @@ public class MergeUtils { } private static List mergeLists(final List left, final List right, int trust, - Function keyExtractor, BinaryOperator merger) { + Function keyExtractor, BinaryOperator merger) { if (left == null || left.isEmpty()) { return right != null ? right : new ArrayList<>(); } else if (right == null || right.isEmpty()) { @@ -264,11 +269,11 @@ public class MergeUtils { List l = trust >= 0 ? right : left; return new ArrayList<>(Stream - .concat(h.stream(), l.stream()) - .filter(Objects::nonNull) - .distinct() - .collect(Collectors.toMap(keyExtractor, v -> v, merger, LinkedHashMap::new)) - .values()); + .concat(h.stream(), l.stream()) + .filter(Objects::nonNull) + .distinct() + .collect(Collectors.toMap(keyExtractor, v -> v, merger, LinkedHashMap::new)) + .values()); } private static List unionDistinctLists(final List left, final List right, int trust) { @@ -282,10 +287,10 @@ public class MergeUtils { List l = trust >= 0 ? right : left; return Stream - .concat(h.stream(), l.stream()) - .filter(Objects::nonNull) - .distinct() - .collect(Collectors.toList()); + .concat(h.stream(), l.stream()) + .filter(Objects::nonNull) + .distinct() + .collect(Collectors.toList()); } private static List unionDistinctListOfString(final List l, final List r) { @@ -296,10 +301,10 @@ public class MergeUtils { } return Stream - .concat(l.stream(), r.stream()) - .filter(StringUtils::isNotBlank) - .distinct() - .collect(Collectors.toList()); + .concat(l.stream(), r.stream()) + .filter(StringUtils::isNotBlank) + .distinct() + .collect(Collectors.toList()); } // TODO review @@ -325,7 +330,7 @@ public class MergeUtils { } private static List unionTitle(List left, List right, - int trust) { + int trust) { if (left == null) { return right; } else if (right == null) { @@ -336,10 +341,10 @@ public class MergeUtils { List l = trust >= 0 ? right : left; return Stream - .concat(h.stream(), l.stream()) - .filter(Objects::isNull) - .distinct() - .collect(Collectors.toList()); + .concat(h.stream(), l.stream()) + .filter(Objects::isNull) + .distinct() + .collect(Collectors.toList()); } /** @@ -374,8 +379,8 @@ public class MergeUtils { merged.setPid(mergeLists(merged.getPid(), enrich.getPid(), trust, MergeUtils::spKeyExtractor, (p1, p2) -> p1)); merged.setDateofcollection(LocalDateTime.now().toString()); merged - .setDateoftransformation( - chooseString(merged.getDateoftransformation(), enrich.getDateoftransformation(), trust)); + .setDateoftransformation( + chooseString(merged.getDateoftransformation(), enrich.getDateoftransformation(), trust)); merged.setExtraInfo(unionDistinctLists(merged.getExtraInfo(), enrich.getExtraInfo(), trust)); // When merging records OAI provenance becomes null merged.setOaiprovenance(null); @@ -392,7 +397,7 @@ public class MergeUtils { checkArgument(Objects.equals(merge.getTarget(), enrich.getTarget()), "target ids must be equal"); checkArgument(Objects.equals(merge.getRelType(), enrich.getRelType()), "relType(s) must be equal"); checkArgument( - Objects.equals(merge.getSubRelType(), enrich.getSubRelType()), "subRelType(s) must be equal"); + Objects.equals(merge.getSubRelType(), enrich.getSubRelType()), "subRelType(s) must be equal"); checkArgument(Objects.equals(merge.getRelClass(), enrich.getRelClass()), "relClass(es) must be equal"); // merge.setProvenance(mergeLists(merge.getProvenance(), enrich.getProvenance())); @@ -403,10 +408,10 @@ public class MergeUtils { merge.setValidationDate(ModelSupport.oldest(merge.getValidationDate(), enrich.getValidationDate())); } catch (ParseException e) { throw new IllegalArgumentException(String - .format( - "invalid validation date format in relation [s:%s, t:%s]: %s", merge.getSource(), - merge.getTarget(), - merge.getValidationDate())); + .format( + "invalid validation date format in relation [s:%s, t:%s]: %s", merge.getSource(), + merge.getTarget(), + merge.getValidationDate())); } // TODO keyvalue merge @@ -420,7 +425,7 @@ public class MergeUtils { T merge = mergeOafEntityFields(original, enrich, trust); if (merge.getProcessingchargeamount() == null - || StringUtils.isBlank(merge.getProcessingchargeamount().getValue())) { + || StringUtils.isBlank(merge.getProcessingchargeamount().getValue())) { merge.setProcessingchargeamount(enrich.getProcessingchargeamount()); merge.setProcessingchargecurrency(enrich.getProcessingchargecurrency()); } @@ -452,8 +457,8 @@ public class MergeUtils { } merge - .setDateofacceptance( - mergeDateOfAcceptance(merge.getDateofacceptance(), enrich.getDateofacceptance(), trust)); + .setDateofacceptance( + mergeDateOfAcceptance(merge.getDateofacceptance(), enrich.getDateofacceptance(), trust)); merge.setPublisher(coalesce(merge.getPublisher(), enrich.getPublisher())); merge.setEmbargoenddate(coalesce(merge.getEmbargoenddate(), enrich.getEmbargoenddate())); @@ -468,7 +473,7 @@ public class MergeUtils { merge.setCoverage(unionDistinctLists(merge.getCoverage(), enrich.getCoverage(), trust)); if (enrich.getBestaccessright() != null - && new AccessRightComparator<>() + && new AccessRightComparator<>() .compare(enrich.getBestaccessright(), merge.getBestaccessright()) < 0) { merge.setBestaccessright(enrich.getBestaccessright()); } @@ -481,8 +486,8 @@ public class MergeUtils { // ok merge - .setExternalReference( - mergeExternalReference(merge.getExternalReference(), enrich.getExternalReference(), trust)); + .setExternalReference( + mergeExternalReference(merge.getExternalReference(), enrich.getExternalReference(), trust)); // instance enrichment or union // review instance equals => add pid to comparision @@ -490,17 +495,17 @@ public class MergeUtils { merge.setInstance(mergeInstances(merge.getInstance(), enrich.getInstance(), trust)); } else { final List enrichmentInstances = isAnEnrichment(merge) ? merge.getInstance() - : enrich.getInstance(); + : enrich.getInstance(); final List enrichedInstances = isAnEnrichment(merge) ? enrich.getInstance() - : merge.getInstance(); + : merge.getInstance(); if (isAnEnrichment(merge)) merge.setDataInfo(enrich.getDataInfo()); merge.setInstance(enrichInstances(enrichedInstances, enrichmentInstances)); } merge - .setEoscifguidelines( - mergeEosciifguidelines(merge.getEoscifguidelines(), enrich.getEoscifguidelines(), trust)); + .setEoscifguidelines( + mergeEosciifguidelines(merge.getEoscifguidelines(), enrich.getEoscifguidelines(), trust)); merge.setIsGreen(booleanOR(merge.getIsGreen(), enrich.getIsGreen())); // OK but should be list of values merge.setOpenAccessColor(coalesce(merge.getOpenAccessColor(), enrich.getOpenAccessColor())); @@ -526,7 +531,7 @@ public class MergeUtils { LocalDate enrich_date = LocalDate.parse(enrich.getValue(), DateTimeFormatter.ISO_DATE); if (enrich_date.getYear() > 1300 - && (merge_date.getYear() < 1300 || merge_date.isAfter(enrich_date))) { + && (merge_date.getYear() < 1300 || merge_date.isAfter(enrich_date))) { return enrich; } } catch (NullPointerException | DateTimeParseException e) { @@ -544,56 +549,56 @@ public class MergeUtils { private static List mergeInstances(List v1, List v2, int trust) { return mergeLists( - v1, v2, trust, - MergeUtils::instanceKeyExtractor, - MergeUtils::instanceMerger); + v1, v2, trust, + MergeUtils::instanceKeyExtractor, + MergeUtils::instanceMerger); } private static List mergeEosciifguidelines(List v1, List v2, - int trust) { + int trust) { return mergeLists( - v1, v2, trust, er -> Joiner - .on("||") - .useForNull("") - .join(er.getCode(), er.getLabel(), er.getUrl(), er.getSemanticRelation()), - (r, l) -> r); + v1, v2, trust, er -> Joiner + .on("||") + .useForNull("") + .join(er.getCode(), er.getLabel(), er.getUrl(), er.getSemanticRelation()), + (r, l) -> r); } private static List mergeExternalReference(List v1, - List v2, int trust) { + List v2, int trust) { return mergeLists( - v1, v2, trust, er -> Joiner - .on(',') - .useForNull("") - .join( - er.getSitename(), er.getLabel(), - er.getUrl(), toString(er.getQualifier()), er.getRefidentifier(), - er.getQuery(), toString(er.getDataInfo())), - (r, l) -> r); + v1, v2, trust, er -> Joiner + .on(',') + .useForNull("") + .join( + er.getSitename(), er.getLabel(), + er.getUrl(), toString(er.getQualifier()), er.getRefidentifier(), + er.getQuery(), toString(er.getDataInfo())), + (r, l) -> r); } private static String toString(DataInfo di) { return Joiner - .on(',') - .useForNull("") - .join( - di.getInvisible(), di.getInferred(), di.getDeletedbyinference(), di.getTrust(), - di.getInferenceprovenance(), toString(di.getProvenanceaction())); + .on(',') + .useForNull("") + .join( + di.getInvisible(), di.getInferred(), di.getDeletedbyinference(), di.getTrust(), + di.getInferenceprovenance(), toString(di.getProvenanceaction())); } private static String toString(Qualifier q) { return Joiner - .on(',') - .useForNull("") - .join(q.getClassid(), q.getClassname(), q.getSchemeid(), q.getSchemename()); + .on(',') + .useForNull("") + .join(q.getClassid(), q.getClassname(), q.getSchemeid(), q.getSchemename()); } private static String toString(StructuredProperty sp) { return Joiner - .on(',') - .useForNull("") - .join(toString(sp.getQualifier()), sp.getValue()); + .on(',') + .useForNull("") + .join(toString(sp.getQualifier()), sp.getValue()); } private static List mergeStructuredProperties(List v1, List v2, int trust) { @@ -632,17 +637,17 @@ public class MergeUtils { // 2. @@ // 3. || return String - .join( - "::", - kvKeyExtractor(i.getHostedby()), - kvKeyExtractor(i.getCollectedfrom()), - qualifierKeyExtractor(i.getAccessright()), - qualifierKeyExtractor(i.getInstancetype()), - Optional.ofNullable(i.getUrl()).map(u -> String.join("@@", u)).orElse(null), - Optional - .ofNullable(i.getPid()) - .map(pp -> pp.stream().map(MergeUtils::spKeyExtractor).collect(Collectors.joining("@@"))) - .orElse(null)); + .join( + "::", + kvKeyExtractor(i.getHostedby()), + kvKeyExtractor(i.getCollectedfrom()), + qualifierKeyExtractor(i.getAccessright()), + qualifierKeyExtractor(i.getInstancetype()), + Optional.ofNullable(i.getUrl()).map(u -> String.join("@@", u)).orElse(null), + Optional + .ofNullable(i.getPid()) + .map(pp -> pp.stream().map(MergeUtils::spKeyExtractor).collect(Collectors.joining("@@"))) + .orElse(null)); } private static Instance instanceMerger(Instance i1, Instance i2) { @@ -653,30 +658,30 @@ public class MergeUtils { i.setInstancetype(i1.getInstancetype()); i.setPid(mergeLists(i1.getPid(), i2.getPid(), 0, MergeUtils::spKeyExtractor, (sp1, sp2) -> sp1)); i - .setAlternateIdentifier( - mergeLists( - i1.getAlternateIdentifier(), i2.getAlternateIdentifier(), 0, MergeUtils::spKeyExtractor, - (sp1, sp2) -> sp1)); + .setAlternateIdentifier( + mergeLists( + i1.getAlternateIdentifier(), i2.getAlternateIdentifier(), 0, MergeUtils::spKeyExtractor, + (sp1, sp2) -> sp1)); i - .setRefereed( - Collections - .min( - Stream.of(i1.getRefereed(), i2.getRefereed()).collect(Collectors.toList()), - new RefereedComparator())); + .setRefereed( + Collections + .min( + Stream.of(i1.getRefereed(), i2.getRefereed()).collect(Collectors.toList()), + new RefereedComparator())); i - .setInstanceTypeMapping( - mergeLists( - i1.getInstanceTypeMapping(), i2.getInstanceTypeMapping(), 0, - MergeUtils::instanceTypeMappingKeyExtractor, (itm1, itm2) -> itm1)); + .setInstanceTypeMapping( + mergeLists( + i1.getInstanceTypeMapping(), i2.getInstanceTypeMapping(), 0, + MergeUtils::instanceTypeMappingKeyExtractor, (itm1, itm2) -> itm1)); i.setFulltext(selectFulltext(i1.getFulltext(), i2.getFulltext())); i.setDateofacceptance(selectOldestDate(i1.getDateofacceptance(), i2.getDateofacceptance())); i.setLicense(coalesce(i1.getLicense(), i2.getLicense())); i.setProcessingchargeamount(coalesce(i1.getProcessingchargeamount(), i2.getProcessingchargeamount())); i.setProcessingchargecurrency(coalesce(i1.getProcessingchargecurrency(), i2.getProcessingchargecurrency())); i - .setMeasures( - mergeLists(i1.getMeasures(), i2.getMeasures(), 0, MergeUtils::measureKeyExtractor, (m1, m2) -> m1)); + .setMeasures( + mergeLists(i1.getMeasures(), i2.getMeasures(), 0, MergeUtils::measureKeyExtractor, (m1, m2) -> m1)); i.setUrl(unionDistinctListOfString(i1.getUrl(), i2.getUrl())); @@ -685,14 +690,14 @@ public class MergeUtils { private static String measureKeyExtractor(Measure m) { return String - .join( - "::", - m.getId(), - m - .getUnit() - .stream() - .map(KeyValue::getKey) - .collect(Collectors.joining("::"))); + .join( + "::", + m.getId(), + m + .getUnit() + .stream() + .map(KeyValue::getKey) + .collect(Collectors.joining("::"))); } private static Field selectOldestDate(Field d1, Field d2) { @@ -703,16 +708,16 @@ public class MergeUtils { } return Stream - .of(d1, d2) - .min( - Comparator - .comparing( - f -> DateParserUtils - .parseDate(f.getValue()) - .toInstant() - .atZone(ZoneId.systemDefault()) - .toLocalDate())) - .orElse(d1); + .of(d1, d2) + .min( + Comparator + .comparing( + f -> DateParserUtils + .parseDate(f.getValue()) + .toInstant() + .atZone(ZoneId.systemDefault()) + .toLocalDate())) + .orElse(d1); } private static String selectFulltext(String ft1, String ft2) { @@ -727,12 +732,12 @@ public class MergeUtils { private static String instanceTypeMappingKeyExtractor(InstanceTypeMapping itm) { return String - .join( - "::", - itm.getOriginalType(), - itm.getTypeCode(), - itm.getTypeLabel(), - itm.getVocabularyName()); + .join( + "::", + itm.getOriginalType(), + itm.getTypeCode(), + itm.getTypeLabel(), + itm.getVocabularyName()); } private static String kvKeyExtractor(KeyValue kv) { @@ -749,13 +754,13 @@ public class MergeUtils { private static String spKeyExtractor(StructuredProperty sp) { return Optional - .ofNullable(sp) - .map( - s -> Joiner - .on("||") - .useForNull("") - .join(qualifierKeyExtractor(s.getQualifier()), s.getValue())) - .orElse(null); + .ofNullable(sp) + .map( + s -> Joiner + .on("||") + .useForNull("") + .join(qualifierKeyExtractor(s.getQualifier()), s.getValue())) + .orElse(null); } private static T mergeORP(T original, T enrich) { @@ -777,8 +782,8 @@ public class MergeUtils { merge.setLicense(unionDistinctLists(merge.getLicense(), enrich.getLicense(), trust)); merge.setCodeRepositoryUrl(chooseReference(merge.getCodeRepositoryUrl(), enrich.getCodeRepositoryUrl(), trust)); merge - .setProgrammingLanguage( - chooseReference(merge.getProgrammingLanguage(), enrich.getProgrammingLanguage(), trust)); + .setProgrammingLanguage( + chooseReference(merge.getProgrammingLanguage(), enrich.getProgrammingLanguage(), trust)); return merge; } @@ -792,11 +797,11 @@ public class MergeUtils { merge.setSize(chooseReference(merge.getSize(), enrich.getSize(), trust)); merge.setVersion(chooseReference(merge.getVersion(), enrich.getVersion(), trust)); merge - .setLastmetadataupdate( - chooseReference(merge.getLastmetadataupdate(), enrich.getLastmetadataupdate(), trust)); + .setLastmetadataupdate( + chooseReference(merge.getLastmetadataupdate(), enrich.getLastmetadataupdate(), trust)); merge - .setMetadataversionnumber( - chooseReference(merge.getMetadataversionnumber(), enrich.getMetadataversionnumber(), trust)); + .setMetadataversionnumber( + chooseReference(merge.getMetadataversionnumber(), enrich.getMetadataversionnumber(), trust)); merge.setGeolocation(unionDistinctLists(merge.getGeolocation(), enrich.getGeolocation(), trust)); return merge; @@ -818,26 +823,26 @@ public class MergeUtils { merged.setLegalshortname(chooseReference(merged.getLegalshortname(), enrich.getLegalshortname(), trust)); merged.setLegalname(chooseReference(merged.getLegalname(), enrich.getLegalname(), trust)); merged - .setAlternativeNames(unionDistinctLists(enrich.getAlternativeNames(), merged.getAlternativeNames(), trust)); + .setAlternativeNames(unionDistinctLists(enrich.getAlternativeNames(), merged.getAlternativeNames(), trust)); merged.setWebsiteurl(chooseReference(merged.getWebsiteurl(), enrich.getWebsiteurl(), trust)); merged.setLogourl(chooseReference(merged.getLogourl(), enrich.getLogourl(), trust)); merged.setEclegalbody(chooseReference(merged.getEclegalbody(), enrich.getEclegalbody(), trust)); merged.setEclegalperson(chooseReference(merged.getEclegalperson(), enrich.getEclegalperson(), trust)); merged.setEcnonprofit(chooseReference(merged.getEcnonprofit(), enrich.getEcnonprofit(), trust)); merged - .setEcresearchorganization( - chooseReference(merged.getEcresearchorganization(), enrich.getEcresearchorganization(), trust)); + .setEcresearchorganization( + chooseReference(merged.getEcresearchorganization(), enrich.getEcresearchorganization(), trust)); merged - .setEchighereducation(chooseReference(merged.getEchighereducation(), enrich.getEchighereducation(), trust)); + .setEchighereducation(chooseReference(merged.getEchighereducation(), enrich.getEchighereducation(), trust)); merged - .setEcinternationalorganizationeurinterests( - chooseReference( - merged.getEcinternationalorganizationeurinterests(), - enrich.getEcinternationalorganizationeurinterests(), trust)); + .setEcinternationalorganizationeurinterests( + chooseReference( + merged.getEcinternationalorganizationeurinterests(), + enrich.getEcinternationalorganizationeurinterests(), trust)); merged - .setEcinternationalorganization( - chooseReference( - merged.getEcinternationalorganization(), enrich.getEcinternationalorganization(), trust)); + .setEcinternationalorganization( + chooseReference( + merged.getEcinternationalorganization(), enrich.getEcinternationalorganization(), trust)); merged.setEcenterprise(chooseReference(merged.getEcenterprise(), enrich.getEcenterprise(), trust)); merged.setEcsmevalidated(chooseReference(merged.getEcsmevalidated(), enrich.getEcsmevalidated(), trust)); merged.setEcnutscode(chooseReference(merged.getEcnutscode(), enrich.getEcnutscode(), trust)); @@ -861,8 +866,8 @@ public class MergeUtils { merged.setDuration(chooseReference(merged.getDuration(), enrich.getDuration(), trust)); merged.setEcsc39(chooseReference(merged.getEcsc39(), enrich.getEcsc39(), trust)); merged - .setOamandatepublications( - chooseReference(merged.getOamandatepublications(), enrich.getOamandatepublications(), trust)); + .setOamandatepublications( + chooseReference(merged.getOamandatepublications(), enrich.getOamandatepublications(), trust)); merged.setEcarticle29_3(chooseReference(merged.getEcarticle29_3(), enrich.getEcarticle29_3(), trust)); merged.setSubjects(unionDistinctLists(merged.getSubjects(), enrich.getSubjects(), trust)); merged.setFundingtree(unionDistinctLists(merged.getFundingtree(), enrich.getFundingtree(), trust)); @@ -888,8 +893,8 @@ public class MergeUtils { } merged - .setH2020classification( - unionDistinctLists(merged.getH2020classification(), enrich.getH2020classification(), trust)); + .setH2020classification( + unionDistinctLists(merged.getH2020classification(), enrich.getH2020classification(), trust)); return merged; } @@ -916,7 +921,7 @@ public class MergeUtils { * @return list of instances possibly enriched */ private static List enrichInstances(final List toEnrichInstances, - final List enrichmentInstances) { + final List enrichmentInstances) { final List enrichmentResult = new ArrayList<>(); if (toEnrichInstances == null) { @@ -954,42 +959,42 @@ public class MergeUtils { */ private static Map toInstanceMap(final List ri) { return ri - .stream() - .filter(i -> i.getPid() != null || i.getAlternateIdentifier() != null) - .flatMap(i -> { - final List> result = new ArrayList<>(); - if (i.getPid() != null) - i - .getPid() - .stream() - .filter(MergeUtils::validPid) - .forEach(p -> result.add(new ImmutablePair<>(extractKeyFromPid(p), i))); - if (i.getAlternateIdentifier() != null) - i - .getAlternateIdentifier() - .stream() - .filter(MergeUtils::validPid) - .forEach(p -> result.add(new ImmutablePair<>(extractKeyFromPid(p), i))); - return result.stream(); - }) - .collect( - Collectors - .toMap( - Pair::getLeft, - Pair::getRight, - (a, b) -> a)); + .stream() + .filter(i -> i.getPid() != null || i.getAlternateIdentifier() != null) + .flatMap(i -> { + final List> result = new ArrayList<>(); + if (i.getPid() != null) + i + .getPid() + .stream() + .filter(MergeUtils::validPid) + .forEach(p -> result.add(new ImmutablePair<>(extractKeyFromPid(p), i))); + if (i.getAlternateIdentifier() != null) + i + .getAlternateIdentifier() + .stream() + .filter(MergeUtils::validPid) + .forEach(p -> result.add(new ImmutablePair<>(extractKeyFromPid(p), i))); + return result.stream(); + }) + .collect( + Collectors + .toMap( + Pair::getLeft, + Pair::getRight, + (a, b) -> a)); } private static boolean isFromDelegatedAuthority(Result r) { return Optional - .ofNullable(r.getInstance()) - .map( - instance -> instance - .stream() - .filter(i -> Objects.nonNull(i.getCollectedfrom())) - .map(i -> i.getCollectedfrom().getKey()) - .anyMatch(cfId -> IdentifierFactory.delegatedAuthorityDatasourceIds().contains(cfId))) - .orElse(false); + .ofNullable(r.getInstance()) + .map( + instance -> instance + .stream() + .filter(i -> Objects.nonNull(i.getCollectedfrom())) + .map(i -> i.getCollectedfrom().getKey()) + .anyMatch(cfId -> IdentifierFactory.delegatedAuthorityDatasourceIds().contains(cfId))) + .orElse(false); } /** @@ -1025,15 +1030,15 @@ public class MergeUtils { * @return the list */ private static List findEnrichmentsByPID(final List pids, - final Map enrichments) { + final Map enrichments) { if (pids == null || enrichments == null) return null; return pids - .stream() - .map(MergeUtils::extractKeyFromPid) - .map(enrichments::get) - .filter(Objects::nonNull) - .collect(Collectors.toList()); + .stream() + .map(MergeUtils::extractKeyFromPid) + .map(enrichments::get) + .filter(Objects::nonNull) + .collect(Collectors.toList()); } /** @@ -1044,8 +1049,8 @@ public class MergeUtils { */ private static boolean isAnEnrichment(OafEntity e) { return e.getDataInfo() != null && - e.getDataInfo().getProvenanceaction() != null - && ModelConstants.PROVENANCE_ENRICH.equalsIgnoreCase(e.getDataInfo().getProvenanceaction().getClassid()); + e.getDataInfo().getProvenanceaction() != null + && ModelConstants.PROVENANCE_ENRICH.equalsIgnoreCase(e.getDataInfo().getProvenanceaction().getClassid()); } /** @@ -1068,17 +1073,17 @@ public class MergeUtils { merge.setHostedby(firstNonNull(merge.getHostedby(), enrichment.getHostedby())); merge.setUrl(unionDistinctLists(merge.getUrl(), enrichment.getUrl(), 0)); merge - .setDistributionlocation( - firstNonNull(merge.getDistributionlocation(), enrichment.getDistributionlocation())); + .setDistributionlocation( + firstNonNull(merge.getDistributionlocation(), enrichment.getDistributionlocation())); merge.setCollectedfrom(firstNonNull(merge.getCollectedfrom(), enrichment.getCollectedfrom())); // pid and alternateId are used for matching merge.setDateofacceptance(firstNonNull(merge.getDateofacceptance(), enrichment.getDateofacceptance())); merge - .setProcessingchargeamount( - firstNonNull(merge.getProcessingchargeamount(), enrichment.getProcessingchargeamount())); + .setProcessingchargeamount( + firstNonNull(merge.getProcessingchargeamount(), enrichment.getProcessingchargeamount())); merge - .setProcessingchargecurrency( - firstNonNull(merge.getProcessingchargecurrency(), enrichment.getProcessingchargecurrency())); + .setProcessingchargecurrency( + firstNonNull(merge.getProcessingchargecurrency(), enrichment.getProcessingchargecurrency())); merge.setRefereed(firstNonNull(merge.getRefereed(), enrichment.getRefereed())); merge.setMeasures(unionDistinctLists(merge.getMeasures(), enrichment.getMeasures(), 0)); merge.setFulltext(firstNonNull(merge.getFulltext(), enrichment.getFulltext())); @@ -1086,14 +1091,14 @@ public class MergeUtils { private static int compareTrust(Oaf a, Oaf b) { String left = Optional - .ofNullable(a.getDataInfo()) - .map(DataInfo::getTrust) - .orElse("0.0"); + .ofNullable(a.getDataInfo()) + .map(DataInfo::getTrust) + .orElse("0.0"); String right = Optional - .ofNullable(b.getDataInfo()) - .map(DataInfo::getTrust) - .orElse("0.0"); + .ofNullable(b.getDataInfo()) + .map(DataInfo::getTrust) + .orElse("0.0"); return left.compareTo(right); } From 03c262ccb981bed4d6351705026e699963c9e4fc Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Wed, 13 Nov 2024 10:56:17 +0100 Subject: [PATCH 10/15] Crossref: generate canonical openaire id for results in affiliation relationship --- .gitignore | 1 + .../eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 6fafc70555..ef9144ae33 100644 --- a/.gitignore +++ b/.gitignore @@ -28,3 +28,4 @@ spark-warehouse /**/.scalafmt.conf /.java-version /dhp-shade-package/dependency-reduced-pom.xml +/**/job.properties diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala index e7d68920b8..d3a68c92ee 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala @@ -673,11 +673,12 @@ case object Crossref2Oaf { val doi = input.getString(0) val rorId = input.getString(1) - val pubId = s"50|${PidType.doi.toString.padTo(12, "_")}::${DoiCleaningRule.clean(doi)}" + + val pubId = IdentifierFactory.idFromPid("50", "doi", DoiCleaningRule.clean(doi), true) val affId = GenerateRorActionSetJob.calculateOpenaireId(rorId) val r: Relation = new Relation - DoiCleaningRule.clean(doi) + r.setSource(pubId) r.setTarget(affId) r.setRelType(ModelConstants.RESULT_ORGANIZATION) From fb1f0f8850b867f758fffdf9751ec9e4d2543db5 Mon Sep 17 00:00:00 2001 From: Miriam Baglioni Date: Thu, 7 Nov 2024 14:05:02 +0100 Subject: [PATCH 11/15] [danishfunders] added the possibility to link also versus a specif award if present in the metadata --- .../collection/crossref/Crossref2Oaf.scala | 21 ++++++++++++++++++- .../doiboost/crossref/Crossref2Oaf.scala | 3 +++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala index e7d68920b8..59a12bc032 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala @@ -978,7 +978,26 @@ case object Crossref2Oaf { case "10.13039/501100010790" => generateSimpleRelationFromAward(funder, "erasmusplus_", a => a) case _ => logger.debug("no match for " + funder.DOI.get) - + //Add for Danish funders + //Independent Research Fund Denmark (IRFD) + case "10.13039/501100004836" => + generateSimpleRelationFromAward(funder, "irfd________", a => a) + val targetId = getProjectId("irfd________", "1e5e62235d094afd01cd56e65112fc63") + queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY) + queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES) + //Carlsberg Foundation (CF) + case "10.13039/501100002808" => + generateSimpleRelationFromAward(funder, "cf__________", a => a) + val targetId = getProjectId("cf__________", "1e5e62235d094afd01cd56e65112fc63") + queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY) + queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES) + //Novo Nordisk Foundation (NNF) + case "10.13039/501100009708" => + generateSimpleRelationFromAward(funder, "nnf___________", a => a) + val targetId = getProjectId("nnf_________", "1e5e62235d094afd01cd56e65112fc63") + queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY) + queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES) + case _ => logger.debug("no match for " + funder.DOI.get) } } else { diff --git a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala index bf11ed0a8c..031a04058d 100644 --- a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala @@ -569,16 +569,19 @@ case object Crossref2Oaf { //Add for Danish funders //Independent Research Fund Denmark (IRFD) case "10.13039/501100004836" => + generateSimpleRelationFromAward(funder, "irfd________", a => a) val targetId = getProjectId("irfd________", "1e5e62235d094afd01cd56e65112fc63") queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY) queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES) //Carlsberg Foundation (CF) case "10.13039/501100002808" => + generateSimpleRelationFromAward(funder, "cf__________", a => a) val targetId = getProjectId("cf__________", "1e5e62235d094afd01cd56e65112fc63") queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY) queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES) //Novo Nordisk Foundation (NNF) case "10.13039/501100009708" => + generateSimpleRelationFromAward(funder, "nnf___________", a => a) val targetId = getProjectId("nnf_________", "1e5e62235d094afd01cd56e65112fc63") queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY) queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES) From 4a3b173ca2d917c52de1671c352d1296ac211736 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 13 Nov 2024 16:27:00 +0100 Subject: [PATCH 12/15] defaults to 0000 - Unknown in case the instance type lookup in the dnet:result_typologies doesn't find a corresponding result type binding --- .../dhp/schema/oaf/utils/MergeUtils.java | 12 +--- .../raw/AbstractMdRecordToOafMapper.java | 55 ++++++++++--------- .../dhp/oa/graph/raw/OafToOafMapper.java | 4 +- .../dhp/oa/graph/raw/OdfToOafMapper.java | 4 +- 4 files changed, 36 insertions(+), 39 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java index dc76860f81..c9b235fd61 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java @@ -74,15 +74,9 @@ public class MergeUtils { if (!vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) { return (T) mergedResult; } else { - final Qualifier expectedResultType = vocs.lookupTermBySynonym( - ModelConstants.DNET_RESULT_TYPOLOGIES, - i.getInstancetype().getClassid()); - - if (Objects.isNull(expectedResultType)) { - throw new IllegalArgumentException( - "instance type not bound to any result type in dnet:result_typologies: " + - i.getInstancetype().getClassid()); - } + final Qualifier expectedResultType = Optional + .ofNullable(vocs.lookupTermBySynonym(ModelConstants.DNET_RESULT_TYPOLOGIES, i.getInstancetype().getClassid())) + .orElse(OafMapperUtils.unknown(ModelConstants.DNET_RESULT_TYPOLOGIES, ModelConstants.DNET_RESULT_TYPOLOGIES)); // there is a clash among the result types if (!expectedResultType.getClassid().equals(mergedResult.getResulttype().getClassid())) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java index ba6887a2e9..be84778f52 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java @@ -153,30 +153,33 @@ public abstract class AbstractMdRecordToOafMapper { final DataInfo entityInfo = prepareDataInfo(doc, this.invisible); final long lastUpdateTimestamp = new Date().getTime(); - final List instances = prepareInstances(doc, entityInfo, collectedFrom, hostedBy); + final Instance instance = prepareInstances(doc, entityInfo, collectedFrom, hostedBy); - final String type = getResultType(instances); + if (!Optional.ofNullable(instance.getInstancetype()) + .map(Qualifier::getClassid) + .filter(StringUtils::isNotBlank) + .isPresent()) { + return Lists.newArrayList(); + } - return createOafs(doc, type, instances, collectedFrom, entityInfo, lastUpdateTimestamp); + final String type = getResultType(instance); + + return createOafs(doc, type, instance, collectedFrom, entityInfo, lastUpdateTimestamp); } catch (final DocumentException e) { log.error("Error with record:\n" + xml); return Lists.newArrayList(); } } - protected String getResultType(final List instances) { - + protected String getResultType(final Instance instance) { if (this.vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) { - final String instanceType = instances - .stream() - .map(i -> i.getInstancetype().getClassid()) - .findFirst() - .filter(s -> !UNKNOWN.equalsIgnoreCase(s)) - .orElse("0000"); // Unknown - return Optional - .ofNullable(this.vocs.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, instanceType)) - .map(Qualifier::getClassid) - .orElse("0000"); + return Optional.ofNullable(instance.getInstancetype()) + .map(Qualifier::getClassid) + .map(instanceType -> Optional + .ofNullable(this.vocs.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, instanceType)) + .map(Qualifier::getClassid) + .orElse("0000")) + .orElse("0000"); } else { throw new IllegalStateException("Missing vocabulary: " + ModelConstants.DNET_RESULT_TYPOLOGIES); } @@ -196,12 +199,12 @@ public abstract class AbstractMdRecordToOafMapper { protected List createOafs( final Document doc, final String type, - final List instances, + final Instance instance, final KeyValue collectedFrom, final DataInfo info, final long lastUpdateTimestamp) { - final OafEntity entity = createEntity(doc, type, instances, collectedFrom, info, lastUpdateTimestamp); + final OafEntity entity = createEntity(doc, type, instance, collectedFrom, info, lastUpdateTimestamp); final Set originalId = Sets.newHashSet(entity.getOriginalId()); originalId.add(entity.getId()); @@ -234,19 +237,19 @@ public abstract class AbstractMdRecordToOafMapper { private OafEntity createEntity(final Document doc, final String type, - final List instances, + final Instance instance, final KeyValue collectedFrom, final DataInfo info, final long lastUpdateTimestamp) { switch (type.toLowerCase()) { case "publication": final Publication p = new Publication(); - populateResultFields(p, doc, instances, collectedFrom, info, lastUpdateTimestamp); + populateResultFields(p, doc, instance, collectedFrom, info, lastUpdateTimestamp); p.setJournal(prepareJournal(doc, info)); return p; case "dataset": final Dataset d = new Dataset(); - populateResultFields(d, doc, instances, collectedFrom, info, lastUpdateTimestamp); + populateResultFields(d, doc, instance, collectedFrom, info, lastUpdateTimestamp); d.setStoragedate(prepareDatasetStorageDate(doc, info)); d.setDevice(prepareDatasetDevice(doc, info)); d.setSize(prepareDatasetSize(doc, info)); @@ -257,7 +260,7 @@ public abstract class AbstractMdRecordToOafMapper { return d; case "software": final Software s = new Software(); - populateResultFields(s, doc, instances, collectedFrom, info, lastUpdateTimestamp); + populateResultFields(s, doc, instance, collectedFrom, info, lastUpdateTimestamp); s.setDocumentationUrl(prepareSoftwareDocumentationUrls(doc, info)); s.setLicense(prepareSoftwareLicenses(doc, info)); s.setCodeRepositoryUrl(prepareSoftwareCodeRepositoryUrl(doc, info)); @@ -267,7 +270,7 @@ public abstract class AbstractMdRecordToOafMapper { case "otherresearchproducts": default: final OtherResearchProduct o = new OtherResearchProduct(); - populateResultFields(o, doc, instances, collectedFrom, info, lastUpdateTimestamp); + populateResultFields(o, doc, instance, collectedFrom, info, lastUpdateTimestamp); o.setContactperson(prepareOtherResearchProductContactPersons(doc, info)); o.setContactgroup(prepareOtherResearchProductContactGroups(doc, info)); o.setTool(prepareOtherResearchProductTools(doc, info)); @@ -414,7 +417,7 @@ public abstract class AbstractMdRecordToOafMapper { private void populateResultFields( final Result r, final Document doc, - final List instances, + final Instance instance, final KeyValue collectedFrom, final DataInfo info, final long lastUpdateTimestamp) { @@ -448,8 +451,8 @@ public abstract class AbstractMdRecordToOafMapper { r.setExternalReference(new ArrayList<>()); // NOT PRESENT IN MDSTORES r.setProcessingchargeamount(field(doc.valueOf("//oaf:processingchargeamount"), info)); r.setProcessingchargecurrency(field(doc.valueOf("//oaf:processingchargeamount/@currency"), info)); - r.setInstance(instances); - r.setBestaccessright(OafMapperUtils.createBestAccessRights(instances)); + r.setInstance(Arrays.asList(instance)); + r.setBestaccessright(OafMapperUtils.createBestAccessRights(Arrays.asList(instance))); r.setEoscifguidelines(prepareEOSCIfGuidelines(doc, info)); } @@ -508,7 +511,7 @@ public abstract class AbstractMdRecordToOafMapper { protected abstract Qualifier prepareResourceType(Document doc, DataInfo info); - protected abstract List prepareInstances( + protected abstract Instance prepareInstances( Document doc, DataInfo info, KeyValue collectedfrom, diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java index 98da48f9e6..33351e91f1 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java @@ -135,7 +135,7 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper { } @Override - protected List prepareInstances( + protected Instance prepareInstances( final Document doc, final DataInfo info, final KeyValue collectedfrom, @@ -197,7 +197,7 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper { instance.getUrl().addAll(validUrl); } - return Lists.newArrayList(instance); + return instance; } /** diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java index ad61304a08..a811aad467 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java @@ -126,7 +126,7 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper { } @Override - protected List prepareInstances( + protected Instance prepareInstances( final Document doc, final DataInfo info, final KeyValue collectedfrom, @@ -210,7 +210,7 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper { instance.setUrl(new ArrayList<>()); instance.getUrl().addAll(validUrl); } - return Arrays.asList(instance); + return instance; } protected String trimAndDecodeUrl(String url) { From b95672b4204667f1b011a7b6ed281b7fcbb3525c Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 15 Nov 2024 09:16:18 +0100 Subject: [PATCH 13/15] mergeUtils set the result identifier when enforcing the result type --- .../dhp/schema/oaf/utils/MergeUtils.java | 451 +++++++++--------- 1 file changed, 229 insertions(+), 222 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java index c9b235fd61..c092f60355 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java @@ -16,8 +16,6 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; -import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; -import eu.dnetlib.dhp.schema.common.EntityType; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -25,8 +23,10 @@ import org.apache.commons.lang3.tuple.Pair; import com.github.sisyphsu.dateparser.DateParserUtils; import com.google.common.base.Joiner; +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.oa.merge.AuthorMerger; import eu.dnetlib.dhp.schema.common.AccessRightComparator; +import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; @@ -46,7 +46,7 @@ public class MergeUtils { } public static T mergeGroup(Iterator oafEntityIterator, - boolean checkDelegateAuthority, VocabularyGroup vocs) { + boolean checkDelegateAuthority, VocabularyGroup vocs) { ArrayList sortedEntities = new ArrayList<>(); oafEntityIterator.forEachRemaining(sortedEntities::add); @@ -74,22 +74,29 @@ public class MergeUtils { if (!vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) { return (T) mergedResult; } else { - final Qualifier expectedResultType = Optional - .ofNullable(vocs.lookupTermBySynonym(ModelConstants.DNET_RESULT_TYPOLOGIES, i.getInstancetype().getClassid())) - .orElse(OafMapperUtils.unknown(ModelConstants.DNET_RESULT_TYPOLOGIES, ModelConstants.DNET_RESULT_TYPOLOGIES)); + final String expectedResultType = Optional + .ofNullable( + vocs + .lookupTermBySynonym( + ModelConstants.DNET_RESULT_TYPOLOGIES, i.getInstancetype().getClassid())) + .orElse(ModelConstants.ORP_DEFAULT_RESULTTYPE) + .getClassid(); // there is a clash among the result types - if (!expectedResultType.getClassid().equals(mergedResult.getResulttype().getClassid())) { - try { - String resulttype = expectedResultType.getClassid(); - if (EntityType.otherresearchproduct.toString().equals(resulttype)) { - resulttype = "other"; - } - Result result = (Result) ModelSupport.oafTypes.get(resulttype).newInstance(); - return (T) mergeResultFields(result, mergedResult); - } catch (InstantiationException | IllegalAccessException e) { - throw new IllegalStateException(e); - } + if (!expectedResultType.equals(mergedResult.getResulttype().getClassid())) { + + Result result = (Result) Optional + .ofNullable(ModelSupport.oafTypes.get(expectedResultType)) + .map(r -> { + try { + return r.newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new IllegalStateException(e); + } + }) + .orElse(new OtherResearchProduct()); + result.setId(mergedResult.getId()); + return (T) mergeResultFields(result, mergedResult); } else { return (T) mergedResult; } @@ -116,10 +123,10 @@ public class MergeUtils { return mergeRelation((Relation) left, (Relation) right); } else { throw new RuntimeException( - String - .format( - "MERGE_FROM_AND_GET incompatible types: %s, %s", - left.getClass().getCanonicalName(), right.getClass().getCanonicalName())); + String + .format( + "MERGE_FROM_AND_GET incompatible types: %s, %s", + left.getClass().getCanonicalName(), right.getClass().getCanonicalName())); } } @@ -158,10 +165,10 @@ public class MergeUtils { return mergeProject((Project) left, (Project) right); } else { throw new RuntimeException( - String - .format( - "MERGE_FROM_AND_GET incompatible types: %s, %s", - left.getClass().getCanonicalName(), right.getClass().getCanonicalName())); + String + .format( + "MERGE_FROM_AND_GET incompatible types: %s, %s", + left.getClass().getCanonicalName(), right.getClass().getCanonicalName())); } } @@ -252,7 +259,7 @@ public class MergeUtils { } private static List mergeLists(final List left, final List right, int trust, - Function keyExtractor, BinaryOperator merger) { + Function keyExtractor, BinaryOperator merger) { if (left == null || left.isEmpty()) { return right != null ? right : new ArrayList<>(); } else if (right == null || right.isEmpty()) { @@ -263,11 +270,11 @@ public class MergeUtils { List l = trust >= 0 ? right : left; return new ArrayList<>(Stream - .concat(h.stream(), l.stream()) - .filter(Objects::nonNull) - .distinct() - .collect(Collectors.toMap(keyExtractor, v -> v, merger, LinkedHashMap::new)) - .values()); + .concat(h.stream(), l.stream()) + .filter(Objects::nonNull) + .distinct() + .collect(Collectors.toMap(keyExtractor, v -> v, merger, LinkedHashMap::new)) + .values()); } private static List unionDistinctLists(final List left, final List right, int trust) { @@ -281,10 +288,10 @@ public class MergeUtils { List l = trust >= 0 ? right : left; return Stream - .concat(h.stream(), l.stream()) - .filter(Objects::nonNull) - .distinct() - .collect(Collectors.toList()); + .concat(h.stream(), l.stream()) + .filter(Objects::nonNull) + .distinct() + .collect(Collectors.toList()); } private static List unionDistinctListOfString(final List l, final List r) { @@ -295,10 +302,10 @@ public class MergeUtils { } return Stream - .concat(l.stream(), r.stream()) - .filter(StringUtils::isNotBlank) - .distinct() - .collect(Collectors.toList()); + .concat(l.stream(), r.stream()) + .filter(StringUtils::isNotBlank) + .distinct() + .collect(Collectors.toList()); } // TODO review @@ -324,7 +331,7 @@ public class MergeUtils { } private static List unionTitle(List left, List right, - int trust) { + int trust) { if (left == null) { return right; } else if (right == null) { @@ -335,10 +342,10 @@ public class MergeUtils { List l = trust >= 0 ? right : left; return Stream - .concat(h.stream(), l.stream()) - .filter(Objects::isNull) - .distinct() - .collect(Collectors.toList()); + .concat(h.stream(), l.stream()) + .filter(Objects::isNull) + .distinct() + .collect(Collectors.toList()); } /** @@ -373,8 +380,8 @@ public class MergeUtils { merged.setPid(mergeLists(merged.getPid(), enrich.getPid(), trust, MergeUtils::spKeyExtractor, (p1, p2) -> p1)); merged.setDateofcollection(LocalDateTime.now().toString()); merged - .setDateoftransformation( - chooseString(merged.getDateoftransformation(), enrich.getDateoftransformation(), trust)); + .setDateoftransformation( + chooseString(merged.getDateoftransformation(), enrich.getDateoftransformation(), trust)); merged.setExtraInfo(unionDistinctLists(merged.getExtraInfo(), enrich.getExtraInfo(), trust)); // When merging records OAI provenance becomes null merged.setOaiprovenance(null); @@ -391,7 +398,7 @@ public class MergeUtils { checkArgument(Objects.equals(merge.getTarget(), enrich.getTarget()), "target ids must be equal"); checkArgument(Objects.equals(merge.getRelType(), enrich.getRelType()), "relType(s) must be equal"); checkArgument( - Objects.equals(merge.getSubRelType(), enrich.getSubRelType()), "subRelType(s) must be equal"); + Objects.equals(merge.getSubRelType(), enrich.getSubRelType()), "subRelType(s) must be equal"); checkArgument(Objects.equals(merge.getRelClass(), enrich.getRelClass()), "relClass(es) must be equal"); // merge.setProvenance(mergeLists(merge.getProvenance(), enrich.getProvenance())); @@ -402,10 +409,10 @@ public class MergeUtils { merge.setValidationDate(ModelSupport.oldest(merge.getValidationDate(), enrich.getValidationDate())); } catch (ParseException e) { throw new IllegalArgumentException(String - .format( - "invalid validation date format in relation [s:%s, t:%s]: %s", merge.getSource(), - merge.getTarget(), - merge.getValidationDate())); + .format( + "invalid validation date format in relation [s:%s, t:%s]: %s", merge.getSource(), + merge.getTarget(), + merge.getValidationDate())); } // TODO keyvalue merge @@ -419,7 +426,7 @@ public class MergeUtils { T merge = mergeOafEntityFields(original, enrich, trust); if (merge.getProcessingchargeamount() == null - || StringUtils.isBlank(merge.getProcessingchargeamount().getValue())) { + || StringUtils.isBlank(merge.getProcessingchargeamount().getValue())) { merge.setProcessingchargeamount(enrich.getProcessingchargeamount()); merge.setProcessingchargecurrency(enrich.getProcessingchargecurrency()); } @@ -451,8 +458,8 @@ public class MergeUtils { } merge - .setDateofacceptance( - mergeDateOfAcceptance(merge.getDateofacceptance(), enrich.getDateofacceptance(), trust)); + .setDateofacceptance( + mergeDateOfAcceptance(merge.getDateofacceptance(), enrich.getDateofacceptance(), trust)); merge.setPublisher(coalesce(merge.getPublisher(), enrich.getPublisher())); merge.setEmbargoenddate(coalesce(merge.getEmbargoenddate(), enrich.getEmbargoenddate())); @@ -467,7 +474,7 @@ public class MergeUtils { merge.setCoverage(unionDistinctLists(merge.getCoverage(), enrich.getCoverage(), trust)); if (enrich.getBestaccessright() != null - && new AccessRightComparator<>() + && new AccessRightComparator<>() .compare(enrich.getBestaccessright(), merge.getBestaccessright()) < 0) { merge.setBestaccessright(enrich.getBestaccessright()); } @@ -480,8 +487,8 @@ public class MergeUtils { // ok merge - .setExternalReference( - mergeExternalReference(merge.getExternalReference(), enrich.getExternalReference(), trust)); + .setExternalReference( + mergeExternalReference(merge.getExternalReference(), enrich.getExternalReference(), trust)); // instance enrichment or union // review instance equals => add pid to comparision @@ -489,17 +496,17 @@ public class MergeUtils { merge.setInstance(mergeInstances(merge.getInstance(), enrich.getInstance(), trust)); } else { final List enrichmentInstances = isAnEnrichment(merge) ? merge.getInstance() - : enrich.getInstance(); + : enrich.getInstance(); final List enrichedInstances = isAnEnrichment(merge) ? enrich.getInstance() - : merge.getInstance(); + : merge.getInstance(); if (isAnEnrichment(merge)) merge.setDataInfo(enrich.getDataInfo()); merge.setInstance(enrichInstances(enrichedInstances, enrichmentInstances)); } merge - .setEoscifguidelines( - mergeEosciifguidelines(merge.getEoscifguidelines(), enrich.getEoscifguidelines(), trust)); + .setEoscifguidelines( + mergeEosciifguidelines(merge.getEoscifguidelines(), enrich.getEoscifguidelines(), trust)); merge.setIsGreen(booleanOR(merge.getIsGreen(), enrich.getIsGreen())); // OK but should be list of values merge.setOpenAccessColor(coalesce(merge.getOpenAccessColor(), enrich.getOpenAccessColor())); @@ -525,7 +532,7 @@ public class MergeUtils { LocalDate enrich_date = LocalDate.parse(enrich.getValue(), DateTimeFormatter.ISO_DATE); if (enrich_date.getYear() > 1300 - && (merge_date.getYear() < 1300 || merge_date.isAfter(enrich_date))) { + && (merge_date.getYear() < 1300 || merge_date.isAfter(enrich_date))) { return enrich; } } catch (NullPointerException | DateTimeParseException e) { @@ -543,56 +550,56 @@ public class MergeUtils { private static List mergeInstances(List v1, List v2, int trust) { return mergeLists( - v1, v2, trust, - MergeUtils::instanceKeyExtractor, - MergeUtils::instanceMerger); + v1, v2, trust, + MergeUtils::instanceKeyExtractor, + MergeUtils::instanceMerger); } private static List mergeEosciifguidelines(List v1, List v2, - int trust) { + int trust) { return mergeLists( - v1, v2, trust, er -> Joiner - .on("||") - .useForNull("") - .join(er.getCode(), er.getLabel(), er.getUrl(), er.getSemanticRelation()), - (r, l) -> r); + v1, v2, trust, er -> Joiner + .on("||") + .useForNull("") + .join(er.getCode(), er.getLabel(), er.getUrl(), er.getSemanticRelation()), + (r, l) -> r); } private static List mergeExternalReference(List v1, - List v2, int trust) { + List v2, int trust) { return mergeLists( - v1, v2, trust, er -> Joiner - .on(',') - .useForNull("") - .join( - er.getSitename(), er.getLabel(), - er.getUrl(), toString(er.getQualifier()), er.getRefidentifier(), - er.getQuery(), toString(er.getDataInfo())), - (r, l) -> r); + v1, v2, trust, er -> Joiner + .on(',') + .useForNull("") + .join( + er.getSitename(), er.getLabel(), + er.getUrl(), toString(er.getQualifier()), er.getRefidentifier(), + er.getQuery(), toString(er.getDataInfo())), + (r, l) -> r); } private static String toString(DataInfo di) { return Joiner - .on(',') - .useForNull("") - .join( - di.getInvisible(), di.getInferred(), di.getDeletedbyinference(), di.getTrust(), - di.getInferenceprovenance(), toString(di.getProvenanceaction())); + .on(',') + .useForNull("") + .join( + di.getInvisible(), di.getInferred(), di.getDeletedbyinference(), di.getTrust(), + di.getInferenceprovenance(), toString(di.getProvenanceaction())); } private static String toString(Qualifier q) { return Joiner - .on(',') - .useForNull("") - .join(q.getClassid(), q.getClassname(), q.getSchemeid(), q.getSchemename()); + .on(',') + .useForNull("") + .join(q.getClassid(), q.getClassname(), q.getSchemeid(), q.getSchemename()); } private static String toString(StructuredProperty sp) { return Joiner - .on(',') - .useForNull("") - .join(toString(sp.getQualifier()), sp.getValue()); + .on(',') + .useForNull("") + .join(toString(sp.getQualifier()), sp.getValue()); } private static List mergeStructuredProperties(List v1, List v2, int trust) { @@ -631,17 +638,17 @@ public class MergeUtils { // 2. @@ // 3. || return String - .join( - "::", - kvKeyExtractor(i.getHostedby()), - kvKeyExtractor(i.getCollectedfrom()), - qualifierKeyExtractor(i.getAccessright()), - qualifierKeyExtractor(i.getInstancetype()), - Optional.ofNullable(i.getUrl()).map(u -> String.join("@@", u)).orElse(null), - Optional - .ofNullable(i.getPid()) - .map(pp -> pp.stream().map(MergeUtils::spKeyExtractor).collect(Collectors.joining("@@"))) - .orElse(null)); + .join( + "::", + kvKeyExtractor(i.getHostedby()), + kvKeyExtractor(i.getCollectedfrom()), + qualifierKeyExtractor(i.getAccessright()), + qualifierKeyExtractor(i.getInstancetype()), + Optional.ofNullable(i.getUrl()).map(u -> String.join("@@", u)).orElse(null), + Optional + .ofNullable(i.getPid()) + .map(pp -> pp.stream().map(MergeUtils::spKeyExtractor).collect(Collectors.joining("@@"))) + .orElse(null)); } private static Instance instanceMerger(Instance i1, Instance i2) { @@ -652,30 +659,30 @@ public class MergeUtils { i.setInstancetype(i1.getInstancetype()); i.setPid(mergeLists(i1.getPid(), i2.getPid(), 0, MergeUtils::spKeyExtractor, (sp1, sp2) -> sp1)); i - .setAlternateIdentifier( - mergeLists( - i1.getAlternateIdentifier(), i2.getAlternateIdentifier(), 0, MergeUtils::spKeyExtractor, - (sp1, sp2) -> sp1)); + .setAlternateIdentifier( + mergeLists( + i1.getAlternateIdentifier(), i2.getAlternateIdentifier(), 0, MergeUtils::spKeyExtractor, + (sp1, sp2) -> sp1)); i - .setRefereed( - Collections - .min( - Stream.of(i1.getRefereed(), i2.getRefereed()).collect(Collectors.toList()), - new RefereedComparator())); + .setRefereed( + Collections + .min( + Stream.of(i1.getRefereed(), i2.getRefereed()).collect(Collectors.toList()), + new RefereedComparator())); i - .setInstanceTypeMapping( - mergeLists( - i1.getInstanceTypeMapping(), i2.getInstanceTypeMapping(), 0, - MergeUtils::instanceTypeMappingKeyExtractor, (itm1, itm2) -> itm1)); + .setInstanceTypeMapping( + mergeLists( + i1.getInstanceTypeMapping(), i2.getInstanceTypeMapping(), 0, + MergeUtils::instanceTypeMappingKeyExtractor, (itm1, itm2) -> itm1)); i.setFulltext(selectFulltext(i1.getFulltext(), i2.getFulltext())); i.setDateofacceptance(selectOldestDate(i1.getDateofacceptance(), i2.getDateofacceptance())); i.setLicense(coalesce(i1.getLicense(), i2.getLicense())); i.setProcessingchargeamount(coalesce(i1.getProcessingchargeamount(), i2.getProcessingchargeamount())); i.setProcessingchargecurrency(coalesce(i1.getProcessingchargecurrency(), i2.getProcessingchargecurrency())); i - .setMeasures( - mergeLists(i1.getMeasures(), i2.getMeasures(), 0, MergeUtils::measureKeyExtractor, (m1, m2) -> m1)); + .setMeasures( + mergeLists(i1.getMeasures(), i2.getMeasures(), 0, MergeUtils::measureKeyExtractor, (m1, m2) -> m1)); i.setUrl(unionDistinctListOfString(i1.getUrl(), i2.getUrl())); @@ -684,14 +691,14 @@ public class MergeUtils { private static String measureKeyExtractor(Measure m) { return String - .join( - "::", - m.getId(), - m - .getUnit() - .stream() - .map(KeyValue::getKey) - .collect(Collectors.joining("::"))); + .join( + "::", + m.getId(), + m + .getUnit() + .stream() + .map(KeyValue::getKey) + .collect(Collectors.joining("::"))); } private static Field selectOldestDate(Field d1, Field d2) { @@ -702,16 +709,16 @@ public class MergeUtils { } return Stream - .of(d1, d2) - .min( - Comparator - .comparing( - f -> DateParserUtils - .parseDate(f.getValue()) - .toInstant() - .atZone(ZoneId.systemDefault()) - .toLocalDate())) - .orElse(d1); + .of(d1, d2) + .min( + Comparator + .comparing( + f -> DateParserUtils + .parseDate(f.getValue()) + .toInstant() + .atZone(ZoneId.systemDefault()) + .toLocalDate())) + .orElse(d1); } private static String selectFulltext(String ft1, String ft2) { @@ -726,12 +733,12 @@ public class MergeUtils { private static String instanceTypeMappingKeyExtractor(InstanceTypeMapping itm) { return String - .join( - "::", - itm.getOriginalType(), - itm.getTypeCode(), - itm.getTypeLabel(), - itm.getVocabularyName()); + .join( + "::", + itm.getOriginalType(), + itm.getTypeCode(), + itm.getTypeLabel(), + itm.getVocabularyName()); } private static String kvKeyExtractor(KeyValue kv) { @@ -748,13 +755,13 @@ public class MergeUtils { private static String spKeyExtractor(StructuredProperty sp) { return Optional - .ofNullable(sp) - .map( - s -> Joiner - .on("||") - .useForNull("") - .join(qualifierKeyExtractor(s.getQualifier()), s.getValue())) - .orElse(null); + .ofNullable(sp) + .map( + s -> Joiner + .on("||") + .useForNull("") + .join(qualifierKeyExtractor(s.getQualifier()), s.getValue())) + .orElse(null); } private static T mergeORP(T original, T enrich) { @@ -776,8 +783,8 @@ public class MergeUtils { merge.setLicense(unionDistinctLists(merge.getLicense(), enrich.getLicense(), trust)); merge.setCodeRepositoryUrl(chooseReference(merge.getCodeRepositoryUrl(), enrich.getCodeRepositoryUrl(), trust)); merge - .setProgrammingLanguage( - chooseReference(merge.getProgrammingLanguage(), enrich.getProgrammingLanguage(), trust)); + .setProgrammingLanguage( + chooseReference(merge.getProgrammingLanguage(), enrich.getProgrammingLanguage(), trust)); return merge; } @@ -791,11 +798,11 @@ public class MergeUtils { merge.setSize(chooseReference(merge.getSize(), enrich.getSize(), trust)); merge.setVersion(chooseReference(merge.getVersion(), enrich.getVersion(), trust)); merge - .setLastmetadataupdate( - chooseReference(merge.getLastmetadataupdate(), enrich.getLastmetadataupdate(), trust)); + .setLastmetadataupdate( + chooseReference(merge.getLastmetadataupdate(), enrich.getLastmetadataupdate(), trust)); merge - .setMetadataversionnumber( - chooseReference(merge.getMetadataversionnumber(), enrich.getMetadataversionnumber(), trust)); + .setMetadataversionnumber( + chooseReference(merge.getMetadataversionnumber(), enrich.getMetadataversionnumber(), trust)); merge.setGeolocation(unionDistinctLists(merge.getGeolocation(), enrich.getGeolocation(), trust)); return merge; @@ -817,26 +824,26 @@ public class MergeUtils { merged.setLegalshortname(chooseReference(merged.getLegalshortname(), enrich.getLegalshortname(), trust)); merged.setLegalname(chooseReference(merged.getLegalname(), enrich.getLegalname(), trust)); merged - .setAlternativeNames(unionDistinctLists(enrich.getAlternativeNames(), merged.getAlternativeNames(), trust)); + .setAlternativeNames(unionDistinctLists(enrich.getAlternativeNames(), merged.getAlternativeNames(), trust)); merged.setWebsiteurl(chooseReference(merged.getWebsiteurl(), enrich.getWebsiteurl(), trust)); merged.setLogourl(chooseReference(merged.getLogourl(), enrich.getLogourl(), trust)); merged.setEclegalbody(chooseReference(merged.getEclegalbody(), enrich.getEclegalbody(), trust)); merged.setEclegalperson(chooseReference(merged.getEclegalperson(), enrich.getEclegalperson(), trust)); merged.setEcnonprofit(chooseReference(merged.getEcnonprofit(), enrich.getEcnonprofit(), trust)); merged - .setEcresearchorganization( - chooseReference(merged.getEcresearchorganization(), enrich.getEcresearchorganization(), trust)); + .setEcresearchorganization( + chooseReference(merged.getEcresearchorganization(), enrich.getEcresearchorganization(), trust)); merged - .setEchighereducation(chooseReference(merged.getEchighereducation(), enrich.getEchighereducation(), trust)); + .setEchighereducation(chooseReference(merged.getEchighereducation(), enrich.getEchighereducation(), trust)); merged - .setEcinternationalorganizationeurinterests( - chooseReference( - merged.getEcinternationalorganizationeurinterests(), - enrich.getEcinternationalorganizationeurinterests(), trust)); + .setEcinternationalorganizationeurinterests( + chooseReference( + merged.getEcinternationalorganizationeurinterests(), + enrich.getEcinternationalorganizationeurinterests(), trust)); merged - .setEcinternationalorganization( - chooseReference( - merged.getEcinternationalorganization(), enrich.getEcinternationalorganization(), trust)); + .setEcinternationalorganization( + chooseReference( + merged.getEcinternationalorganization(), enrich.getEcinternationalorganization(), trust)); merged.setEcenterprise(chooseReference(merged.getEcenterprise(), enrich.getEcenterprise(), trust)); merged.setEcsmevalidated(chooseReference(merged.getEcsmevalidated(), enrich.getEcsmevalidated(), trust)); merged.setEcnutscode(chooseReference(merged.getEcnutscode(), enrich.getEcnutscode(), trust)); @@ -860,8 +867,8 @@ public class MergeUtils { merged.setDuration(chooseReference(merged.getDuration(), enrich.getDuration(), trust)); merged.setEcsc39(chooseReference(merged.getEcsc39(), enrich.getEcsc39(), trust)); merged - .setOamandatepublications( - chooseReference(merged.getOamandatepublications(), enrich.getOamandatepublications(), trust)); + .setOamandatepublications( + chooseReference(merged.getOamandatepublications(), enrich.getOamandatepublications(), trust)); merged.setEcarticle29_3(chooseReference(merged.getEcarticle29_3(), enrich.getEcarticle29_3(), trust)); merged.setSubjects(unionDistinctLists(merged.getSubjects(), enrich.getSubjects(), trust)); merged.setFundingtree(unionDistinctLists(merged.getFundingtree(), enrich.getFundingtree(), trust)); @@ -887,8 +894,8 @@ public class MergeUtils { } merged - .setH2020classification( - unionDistinctLists(merged.getH2020classification(), enrich.getH2020classification(), trust)); + .setH2020classification( + unionDistinctLists(merged.getH2020classification(), enrich.getH2020classification(), trust)); return merged; } @@ -915,7 +922,7 @@ public class MergeUtils { * @return list of instances possibly enriched */ private static List enrichInstances(final List toEnrichInstances, - final List enrichmentInstances) { + final List enrichmentInstances) { final List enrichmentResult = new ArrayList<>(); if (toEnrichInstances == null) { @@ -953,42 +960,42 @@ public class MergeUtils { */ private static Map toInstanceMap(final List ri) { return ri - .stream() - .filter(i -> i.getPid() != null || i.getAlternateIdentifier() != null) - .flatMap(i -> { - final List> result = new ArrayList<>(); - if (i.getPid() != null) - i - .getPid() - .stream() - .filter(MergeUtils::validPid) - .forEach(p -> result.add(new ImmutablePair<>(extractKeyFromPid(p), i))); - if (i.getAlternateIdentifier() != null) - i - .getAlternateIdentifier() - .stream() - .filter(MergeUtils::validPid) - .forEach(p -> result.add(new ImmutablePair<>(extractKeyFromPid(p), i))); - return result.stream(); - }) - .collect( - Collectors - .toMap( - Pair::getLeft, - Pair::getRight, - (a, b) -> a)); + .stream() + .filter(i -> i.getPid() != null || i.getAlternateIdentifier() != null) + .flatMap(i -> { + final List> result = new ArrayList<>(); + if (i.getPid() != null) + i + .getPid() + .stream() + .filter(MergeUtils::validPid) + .forEach(p -> result.add(new ImmutablePair<>(extractKeyFromPid(p), i))); + if (i.getAlternateIdentifier() != null) + i + .getAlternateIdentifier() + .stream() + .filter(MergeUtils::validPid) + .forEach(p -> result.add(new ImmutablePair<>(extractKeyFromPid(p), i))); + return result.stream(); + }) + .collect( + Collectors + .toMap( + Pair::getLeft, + Pair::getRight, + (a, b) -> a)); } private static boolean isFromDelegatedAuthority(Result r) { return Optional - .ofNullable(r.getInstance()) - .map( - instance -> instance - .stream() - .filter(i -> Objects.nonNull(i.getCollectedfrom())) - .map(i -> i.getCollectedfrom().getKey()) - .anyMatch(cfId -> IdentifierFactory.delegatedAuthorityDatasourceIds().contains(cfId))) - .orElse(false); + .ofNullable(r.getInstance()) + .map( + instance -> instance + .stream() + .filter(i -> Objects.nonNull(i.getCollectedfrom())) + .map(i -> i.getCollectedfrom().getKey()) + .anyMatch(cfId -> IdentifierFactory.delegatedAuthorityDatasourceIds().contains(cfId))) + .orElse(false); } /** @@ -1024,15 +1031,15 @@ public class MergeUtils { * @return the list */ private static List findEnrichmentsByPID(final List pids, - final Map enrichments) { + final Map enrichments) { if (pids == null || enrichments == null) return null; return pids - .stream() - .map(MergeUtils::extractKeyFromPid) - .map(enrichments::get) - .filter(Objects::nonNull) - .collect(Collectors.toList()); + .stream() + .map(MergeUtils::extractKeyFromPid) + .map(enrichments::get) + .filter(Objects::nonNull) + .collect(Collectors.toList()); } /** @@ -1043,8 +1050,8 @@ public class MergeUtils { */ private static boolean isAnEnrichment(OafEntity e) { return e.getDataInfo() != null && - e.getDataInfo().getProvenanceaction() != null - && ModelConstants.PROVENANCE_ENRICH.equalsIgnoreCase(e.getDataInfo().getProvenanceaction().getClassid()); + e.getDataInfo().getProvenanceaction() != null + && ModelConstants.PROVENANCE_ENRICH.equalsIgnoreCase(e.getDataInfo().getProvenanceaction().getClassid()); } /** @@ -1067,17 +1074,17 @@ public class MergeUtils { merge.setHostedby(firstNonNull(merge.getHostedby(), enrichment.getHostedby())); merge.setUrl(unionDistinctLists(merge.getUrl(), enrichment.getUrl(), 0)); merge - .setDistributionlocation( - firstNonNull(merge.getDistributionlocation(), enrichment.getDistributionlocation())); + .setDistributionlocation( + firstNonNull(merge.getDistributionlocation(), enrichment.getDistributionlocation())); merge.setCollectedfrom(firstNonNull(merge.getCollectedfrom(), enrichment.getCollectedfrom())); // pid and alternateId are used for matching merge.setDateofacceptance(firstNonNull(merge.getDateofacceptance(), enrichment.getDateofacceptance())); merge - .setProcessingchargeamount( - firstNonNull(merge.getProcessingchargeamount(), enrichment.getProcessingchargeamount())); + .setProcessingchargeamount( + firstNonNull(merge.getProcessingchargeamount(), enrichment.getProcessingchargeamount())); merge - .setProcessingchargecurrency( - firstNonNull(merge.getProcessingchargecurrency(), enrichment.getProcessingchargecurrency())); + .setProcessingchargecurrency( + firstNonNull(merge.getProcessingchargecurrency(), enrichment.getProcessingchargecurrency())); merge.setRefereed(firstNonNull(merge.getRefereed(), enrichment.getRefereed())); merge.setMeasures(unionDistinctLists(merge.getMeasures(), enrichment.getMeasures(), 0)); merge.setFulltext(firstNonNull(merge.getFulltext(), enrichment.getFulltext())); @@ -1085,14 +1092,14 @@ public class MergeUtils { private static int compareTrust(Oaf a, Oaf b) { String left = Optional - .ofNullable(a.getDataInfo()) - .map(DataInfo::getTrust) - .orElse("0.0"); + .ofNullable(a.getDataInfo()) + .map(DataInfo::getTrust) + .orElse("0.0"); String right = Optional - .ofNullable(b.getDataInfo()) - .map(DataInfo::getTrust) - .orElse("0.0"); + .ofNullable(b.getDataInfo()) + .map(DataInfo::getTrust) + .orElse("0.0"); return left.compareTo(right); } From 5f512f510e92a717f1e536b9be9db15399d42805 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 15 Nov 2024 09:16:51 +0100 Subject: [PATCH 14/15] code formatting --- .../raw/AbstractMdRecordToOafMapper.java | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java index be84778f52..881d3202c8 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java @@ -155,10 +155,11 @@ public abstract class AbstractMdRecordToOafMapper { final Instance instance = prepareInstances(doc, entityInfo, collectedFrom, hostedBy); - if (!Optional.ofNullable(instance.getInstancetype()) - .map(Qualifier::getClassid) - .filter(StringUtils::isNotBlank) - .isPresent()) { + if (!Optional + .ofNullable(instance.getInstancetype()) + .map(Qualifier::getClassid) + .filter(StringUtils::isNotBlank) + .isPresent()) { return Lists.newArrayList(); } @@ -173,13 +174,16 @@ public abstract class AbstractMdRecordToOafMapper { protected String getResultType(final Instance instance) { if (this.vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) { - return Optional.ofNullable(instance.getInstancetype()) - .map(Qualifier::getClassid) - .map(instanceType -> Optional - .ofNullable(this.vocs.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, instanceType)) - .map(Qualifier::getClassid) - .orElse("0000")) - .orElse("0000"); + return Optional + .ofNullable(instance.getInstancetype()) + .map(Qualifier::getClassid) + .map( + instanceType -> Optional + .ofNullable( + this.vocs.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, instanceType)) + .map(Qualifier::getClassid) + .orElse("0000")) + .orElse("0000"); } else { throw new IllegalStateException("Missing vocabulary: " + ModelConstants.DNET_RESULT_TYPOLOGIES); } From cf7d9a32ab847d50bcafe03bc66795d2719e7d03 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 15 Nov 2024 09:17:28 +0100 Subject: [PATCH 15/15] disable autoBroadcastJoin in the cleaning workflow --- .../eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml index 2512fc5bc7..01aaadae5b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml @@ -162,6 +162,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.autoBroadcastJoinThreshold=-1 --conf spark.sql.shuffle.partitions=15000 --inputPath${graphInputPath}/publication @@ -197,6 +198,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.autoBroadcastJoinThreshold=-1 --conf spark.sql.shuffle.partitions=8000 --inputPath${graphInputPath}/dataset @@ -232,6 +234,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.autoBroadcastJoinThreshold=-1 --conf spark.sql.shuffle.partitions=5000 --inputPath${graphInputPath}/otherresearchproduct @@ -267,6 +270,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.autoBroadcastJoinThreshold=-1 --conf spark.sql.shuffle.partitions=2000 --inputPath${graphInputPath}/software @@ -302,6 +306,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.autoBroadcastJoinThreshold=-1 --conf spark.sql.shuffle.partitions=1000 --inputPath${graphInputPath}/datasource @@ -337,6 +342,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.autoBroadcastJoinThreshold=-1 --conf spark.sql.shuffle.partitions=1000 --inputPath${graphInputPath}/organization @@ -372,6 +378,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.autoBroadcastJoinThreshold=-1 --conf spark.sql.shuffle.partitions=2000 --inputPath${graphInputPath}/project @@ -407,6 +414,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.autoBroadcastJoinThreshold=-1 --conf spark.sql.shuffle.partitions=2000 --inputPath${graphInputPath}/person @@ -442,6 +450,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.autoBroadcastJoinThreshold=-1 --conf spark.sql.shuffle.partitions=20000 --inputPath${graphInputPath}/relation