diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java index 24de1a787..98ec09277 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java @@ -2,8 +2,7 @@ package eu.dnetlib.dhp.oa.merge; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import static org.apache.spark.sql.functions.col; -import static org.apache.spark.sql.functions.when; +import static org.apache.spark.sql.functions.*; import java.util.Map; import java.util.Optional; @@ -135,7 +134,9 @@ public class GroupEntitiesSparkJob { .applyCoarVocabularies(entity, vocs), OAFENTITY_KRYO_ENC) .groupByKey((MapFunction) OafEntity::getId, Encoders.STRING()) - .mapGroups((MapGroupsFunction) MergeUtils::mergeById, OAFENTITY_KRYO_ENC) + .mapGroups( + (MapGroupsFunction) (key, group) -> MergeUtils.mergeById(group, vocs), + OAFENTITY_KRYO_ENC) .map( (MapFunction>) t -> new Tuple2<>( t.getClass().getName(), t), diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java index 4c411a155..d7e08fca7 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java @@ -23,24 +23,30 @@ import org.apache.commons.lang3.tuple.Pair; import com.github.sisyphsu.dateparser.DateParserUtils; import com.google.common.base.Joiner; +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.oa.merge.AuthorMerger; import eu.dnetlib.dhp.schema.common.AccessRightComparator; +import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; public class MergeUtils { - public static T mergeById(String s, Iterator oafEntityIterator) { - return mergeGroup(s, oafEntityIterator, true); + public static T mergeById(Iterator oafEntityIterator, VocabularyGroup vocs) { + return mergeGroup(oafEntityIterator, true, vocs); } - public static T mergeGroup(String s, Iterator oafEntityIterator) { - return mergeGroup(s, oafEntityIterator, false); + public static T mergeGroup(Iterator oafEntityIterator) { + return mergeGroup(oafEntityIterator, false); } - public static T mergeGroup(String s, Iterator oafEntityIterator, - boolean checkDelegateAuthority) { + public static T mergeGroup(Iterator oafEntityIterator, boolean checkDelegateAuthority) { + return mergeGroup(oafEntityIterator, checkDelegateAuthority, null); + } + + public static T mergeGroup(Iterator oafEntityIterator, + boolean checkDelegateAuthority, VocabularyGroup vocs) { ArrayList sortedEntities = new ArrayList<>(); oafEntityIterator.forEachRemaining(sortedEntities::add); @@ -49,13 +55,49 @@ public class MergeUtils { Iterator it = sortedEntities.iterator(); T merged = it.next(); - while (it.hasNext()) { - merged = checkedMerge(merged, it.next(), checkDelegateAuthority); + if (!it.hasNext() && merged instanceof Result && vocs != null) { + return enforceResultType(vocs, (Result) merged); + } else { + while (it.hasNext()) { + merged = checkedMerge(merged, it.next(), checkDelegateAuthority); + } } - return merged; } + private static T enforceResultType(VocabularyGroup vocs, Result mergedResult) { + if (Optional.ofNullable(mergedResult.getInstance()).map(List::isEmpty).orElse(true)) { + return (T) mergedResult; + } else { + final Instance i = mergedResult.getInstance().get(0); + + if (!vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) { + return (T) mergedResult; + } else { + final Qualifier expectedResultType = vocs + .getSynonymAsQualifier( + ModelConstants.DNET_RESULT_TYPOLOGIES, + i.getInstancetype().getClassid()); + + // there is a clash among the result types + if (!expectedResultType.getClassid().equals(mergedResult.getResulttype().getClassid())) { + try { + String resulttype = expectedResultType.getClassid(); + if (EntityType.otherresearchproduct.toString().equals(resulttype)) { + resulttype = "other"; + } + Result result = (Result) ModelSupport.oafTypes.get(resulttype).newInstance(); + return (T) mergeResultFields(result, mergedResult); + } catch (InstantiationException | IllegalAccessException e) { + throw new IllegalStateException(e); + } + } else { + return (T) mergedResult; + } + } + } + } + public static T checkedMerge(final T left, final T right, boolean checkDelegateAuthority) { return (T) merge(left, right, checkDelegateAuthority); } @@ -106,7 +148,7 @@ public class MergeUtils { return mergeSoftware((Software) left, (Software) right); } - return mergeResultFields((Result) left, (Result) right); + return left; } else if (sameClass(left, right, Datasource.class)) { // TODO final int trust = compareTrust(left, right); diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java index 44482cfdb..f6a436543 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java @@ -135,7 +135,7 @@ public class DedupRecordFactory { return Collections.emptyIterator(); } - OafEntity mergedEntity = MergeUtils.mergeGroup(dedupId, cliques.iterator()); + OafEntity mergedEntity = MergeUtils.mergeGroup(cliques.iterator()); // dedup records do not have date of transformation attribute mergedEntity.setDateoftransformation(null); mergedEntity diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/DatasetMergerTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/DatasetMergerTest.java index 726814c43..a79047590 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/DatasetMergerTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/DatasetMergerTest.java @@ -46,8 +46,8 @@ class DatasetMergerTest implements Serializable { } @Test - void datasetMergerTest() throws InstantiationException, IllegalAccessException, InvocationTargetException { - Dataset pub_merged = MergeUtils.mergeGroup(dedupId, datasets.stream().map(Tuple2::_2).iterator()); + void datasetMergerTest() { + Dataset pub_merged = MergeUtils.mergeGroup(datasets.stream().map(Tuple2::_2).iterator()); // verify id assertEquals(dedupId, pub_merged.getId()); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java index 2436a272c..ba6887a2e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java @@ -155,7 +155,7 @@ public abstract class AbstractMdRecordToOafMapper { final List instances = prepareInstances(doc, entityInfo, collectedFrom, hostedBy); - final String type = getResultType(doc, instances); + final String type = getResultType(instances); return createOafs(doc, type, instances, collectedFrom, entityInfo, lastUpdateTimestamp); } catch (final DocumentException e) { @@ -164,10 +164,9 @@ public abstract class AbstractMdRecordToOafMapper { } } - protected String getResultType(final Document doc, final List instances) { - final String type = doc.valueOf("//dr:CobjCategory/@type"); + protected String getResultType(final List instances) { - if (StringUtils.isBlank(type) && this.vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) { + if (this.vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) { final String instanceType = instances .stream() .map(i -> i.getInstancetype().getClassid()) @@ -178,9 +177,9 @@ public abstract class AbstractMdRecordToOafMapper { .ofNullable(this.vocs.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, instanceType)) .map(Qualifier::getClassid) .orElse("0000"); + } else { + throw new IllegalStateException("Missing vocabulary: " + ModelConstants.DNET_RESULT_TYPOLOGIES); } - - return type; } private KeyValue getProvenanceDatasource(final Document doc, final String xpathId, final String xpathName) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java index c3806c211..357fae470 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java @@ -133,7 +133,7 @@ public class GenerateEntitiesApplication extends AbstractMigrationApplication { inputRdd .keyBy(oaf -> ModelSupport.idFn().apply(oaf)) .groupByKey() - .map(t -> MergeUtils.mergeGroup(t._1, t._2.iterator())), + .map(t -> MergeUtils.mergeGroup(t._2.iterator())), // .mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf)) // .reduceByKey(MergeUtils::merge) // .map(Tuple2::_2), diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala index d94a23947..42299cd34 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala @@ -133,7 +133,7 @@ object SparkCreateInputGraph { val ds: Dataset[T] = spark.read.load(sourcePath).as[T] ds.groupByKey(_.getId) - .mapGroups { (id, it) => MergeUtils.mergeGroup(id, it.asJava).asInstanceOf[T] } + .mapGroups { (id, it) => MergeUtils.mergeGroup(it.asJava).asInstanceOf[T] } // .reduceGroups { (x: T, y: T) => MergeUtils.merge(x, y).asInstanceOf[T] } // .map(_) .write