enforce resulttype #506

Merged
claudio.atzori merged 1 commits from merge_resulttypes into beta 2024-11-12 14:20:22 +01:00
7 changed files with 66 additions and 24 deletions

View File

@ -2,8 +2,7 @@
package eu.dnetlib.dhp.oa.merge; package eu.dnetlib.dhp.oa.merge;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static org.apache.spark.sql.functions.col; import static org.apache.spark.sql.functions.*;
import static org.apache.spark.sql.functions.when;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
@ -135,7 +134,9 @@ public class GroupEntitiesSparkJob {
.applyCoarVocabularies(entity, vocs), .applyCoarVocabularies(entity, vocs),
OAFENTITY_KRYO_ENC) OAFENTITY_KRYO_ENC)
.groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING()) .groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING())
.mapGroups((MapGroupsFunction<String, OafEntity, OafEntity>) MergeUtils::mergeById, OAFENTITY_KRYO_ENC) .mapGroups(
(MapGroupsFunction<String, OafEntity, OafEntity>) (key, group) -> MergeUtils.mergeById(group, vocs),
OAFENTITY_KRYO_ENC)
.map( .map(
(MapFunction<OafEntity, Tuple2<String, OafEntity>>) t -> new Tuple2<>( (MapFunction<OafEntity, Tuple2<String, OafEntity>>) t -> new Tuple2<>(
t.getClass().getName(), t), t.getClass().getName(), t),

View File

@ -23,24 +23,30 @@ import org.apache.commons.lang3.tuple.Pair;
import com.github.sisyphsu.dateparser.DateParserUtils; import com.github.sisyphsu.dateparser.DateParserUtils;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.merge.AuthorMerger; import eu.dnetlib.dhp.oa.merge.AuthorMerger;
import eu.dnetlib.dhp.schema.common.AccessRightComparator; import eu.dnetlib.dhp.schema.common.AccessRightComparator;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
public class MergeUtils { public class MergeUtils {
public static <T extends Oaf> T mergeById(String s, Iterator<T> oafEntityIterator) { public static <T extends Oaf> T mergeById(Iterator<T> oafEntityIterator, VocabularyGroup vocs) {
return mergeGroup(s, oafEntityIterator, true); return mergeGroup(oafEntityIterator, true, vocs);
} }
public static <T extends Oaf> T mergeGroup(String s, Iterator<T> oafEntityIterator) { public static <T extends Oaf> T mergeGroup(Iterator<T> oafEntityIterator) {
return mergeGroup(s, oafEntityIterator, false); return mergeGroup(oafEntityIterator, false);
} }
public static <T extends Oaf> T mergeGroup(String s, Iterator<T> oafEntityIterator, public static <T extends Oaf> T mergeGroup(Iterator<T> oafEntityIterator, boolean checkDelegateAuthority) {
boolean checkDelegateAuthority) { return mergeGroup(oafEntityIterator, checkDelegateAuthority, null);
}
public static <T extends Oaf> T mergeGroup(Iterator<T> oafEntityIterator,
boolean checkDelegateAuthority, VocabularyGroup vocs) {
ArrayList<T> sortedEntities = new ArrayList<>(); ArrayList<T> sortedEntities = new ArrayList<>();
oafEntityIterator.forEachRemaining(sortedEntities::add); oafEntityIterator.forEachRemaining(sortedEntities::add);
@ -49,13 +55,49 @@ public class MergeUtils {
Iterator<T> it = sortedEntities.iterator(); Iterator<T> it = sortedEntities.iterator();
T merged = it.next(); T merged = it.next();
while (it.hasNext()) { if (!it.hasNext() && merged instanceof Result && vocs != null) {
merged = checkedMerge(merged, it.next(), checkDelegateAuthority); return enforceResultType(vocs, (Result) merged);
} else {
while (it.hasNext()) {
merged = checkedMerge(merged, it.next(), checkDelegateAuthority);
}
} }
return merged; return merged;
} }
private static <T extends Oaf> T enforceResultType(VocabularyGroup vocs, Result mergedResult) {
if (Optional.ofNullable(mergedResult.getInstance()).map(List::isEmpty).orElse(true)) {
return (T) mergedResult;
} else {
final Instance i = mergedResult.getInstance().get(0);
if (!vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) {
return (T) mergedResult;
} else {
final Qualifier expectedResultType = vocs
.getSynonymAsQualifier(
ModelConstants.DNET_RESULT_TYPOLOGIES,
i.getInstancetype().getClassid());
// there is a clash among the result types
if (!expectedResultType.getClassid().equals(mergedResult.getResulttype().getClassid())) {
try {
String resulttype = expectedResultType.getClassid();
if (EntityType.otherresearchproduct.toString().equals(resulttype)) {
resulttype = "other";
}
Result result = (Result) ModelSupport.oafTypes.get(resulttype).newInstance();
return (T) mergeResultFields(result, mergedResult);
} catch (InstantiationException | IllegalAccessException e) {
throw new IllegalStateException(e);
}
} else {
return (T) mergedResult;
}
}
}
}
public static <T extends Oaf> T checkedMerge(final T left, final T right, boolean checkDelegateAuthority) { public static <T extends Oaf> T checkedMerge(final T left, final T right, boolean checkDelegateAuthority) {
return (T) merge(left, right, checkDelegateAuthority); return (T) merge(left, right, checkDelegateAuthority);
} }
@ -106,7 +148,7 @@ public class MergeUtils {
return mergeSoftware((Software) left, (Software) right); return mergeSoftware((Software) left, (Software) right);
} }
return mergeResultFields((Result) left, (Result) right); return left;
} else if (sameClass(left, right, Datasource.class)) { } else if (sameClass(left, right, Datasource.class)) {
// TODO // TODO
final int trust = compareTrust(left, right); final int trust = compareTrust(left, right);

View File

@ -135,7 +135,7 @@ public class DedupRecordFactory {
return Collections.emptyIterator(); return Collections.emptyIterator();
} }
OafEntity mergedEntity = MergeUtils.mergeGroup(dedupId, cliques.iterator()); OafEntity mergedEntity = MergeUtils.mergeGroup(cliques.iterator());
// dedup records do not have date of transformation attribute // dedup records do not have date of transformation attribute
mergedEntity.setDateoftransformation(null); mergedEntity.setDateoftransformation(null);
mergedEntity mergedEntity

View File

@ -46,8 +46,8 @@ class DatasetMergerTest implements Serializable {
} }
@Test @Test
void datasetMergerTest() throws InstantiationException, IllegalAccessException, InvocationTargetException { void datasetMergerTest() {
Dataset pub_merged = MergeUtils.mergeGroup(dedupId, datasets.stream().map(Tuple2::_2).iterator()); Dataset pub_merged = MergeUtils.mergeGroup(datasets.stream().map(Tuple2::_2).iterator());
// verify id // verify id
assertEquals(dedupId, pub_merged.getId()); assertEquals(dedupId, pub_merged.getId());

View File

@ -155,7 +155,7 @@ public abstract class AbstractMdRecordToOafMapper {
final List<Instance> instances = prepareInstances(doc, entityInfo, collectedFrom, hostedBy); final List<Instance> instances = prepareInstances(doc, entityInfo, collectedFrom, hostedBy);
final String type = getResultType(doc, instances); final String type = getResultType(instances);
return createOafs(doc, type, instances, collectedFrom, entityInfo, lastUpdateTimestamp); return createOafs(doc, type, instances, collectedFrom, entityInfo, lastUpdateTimestamp);
} catch (final DocumentException e) { } catch (final DocumentException e) {
@ -164,10 +164,9 @@ public abstract class AbstractMdRecordToOafMapper {
} }
} }
protected String getResultType(final Document doc, final List<Instance> instances) { protected String getResultType(final List<Instance> instances) {
final String type = doc.valueOf("//dr:CobjCategory/@type");
if (StringUtils.isBlank(type) && this.vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) { if (this.vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) {
final String instanceType = instances final String instanceType = instances
.stream() .stream()
.map(i -> i.getInstancetype().getClassid()) .map(i -> i.getInstancetype().getClassid())
@ -178,9 +177,9 @@ public abstract class AbstractMdRecordToOafMapper {
.ofNullable(this.vocs.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, instanceType)) .ofNullable(this.vocs.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, instanceType))
.map(Qualifier::getClassid) .map(Qualifier::getClassid)
.orElse("0000"); .orElse("0000");
} else {
throw new IllegalStateException("Missing vocabulary: " + ModelConstants.DNET_RESULT_TYPOLOGIES);
} }
return type;
} }
private KeyValue getProvenanceDatasource(final Document doc, final String xpathId, final String xpathName) { private KeyValue getProvenanceDatasource(final Document doc, final String xpathId, final String xpathName) {

View File

@ -133,7 +133,7 @@ public class GenerateEntitiesApplication extends AbstractMigrationApplication {
inputRdd inputRdd
.keyBy(oaf -> ModelSupport.idFn().apply(oaf)) .keyBy(oaf -> ModelSupport.idFn().apply(oaf))
.groupByKey() .groupByKey()
.map(t -> MergeUtils.mergeGroup(t._1, t._2.iterator())), .map(t -> MergeUtils.mergeGroup(t._2.iterator())),
// .mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf)) // .mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf))
// .reduceByKey(MergeUtils::merge) // .reduceByKey(MergeUtils::merge)
// .map(Tuple2::_2), // .map(Tuple2::_2),

View File

@ -133,7 +133,7 @@ object SparkCreateInputGraph {
val ds: Dataset[T] = spark.read.load(sourcePath).as[T] val ds: Dataset[T] = spark.read.load(sourcePath).as[T]
ds.groupByKey(_.getId) ds.groupByKey(_.getId)
.mapGroups { (id, it) => MergeUtils.mergeGroup(id, it.asJava).asInstanceOf[T] } .mapGroups { (id, it) => MergeUtils.mergeGroup(it.asJava).asInstanceOf[T] }
// .reduceGroups { (x: T, y: T) => MergeUtils.merge(x, y).asInstanceOf[T] } // .reduceGroups { (x: T, y: T) => MergeUtils.merge(x, y).asInstanceOf[T] }
// .map(_) // .map(_)
.write .write