enforce resulttype #506
|
@ -2,8 +2,7 @@
|
||||||
package eu.dnetlib.dhp.oa.merge;
|
package eu.dnetlib.dhp.oa.merge;
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
import static org.apache.spark.sql.functions.col;
|
import static org.apache.spark.sql.functions.*;
|
||||||
import static org.apache.spark.sql.functions.when;
|
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
@ -135,7 +134,9 @@ public class GroupEntitiesSparkJob {
|
||||||
.applyCoarVocabularies(entity, vocs),
|
.applyCoarVocabularies(entity, vocs),
|
||||||
OAFENTITY_KRYO_ENC)
|
OAFENTITY_KRYO_ENC)
|
||||||
.groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING())
|
.groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING())
|
||||||
.mapGroups((MapGroupsFunction<String, OafEntity, OafEntity>) MergeUtils::mergeById, OAFENTITY_KRYO_ENC)
|
.mapGroups(
|
||||||
|
(MapGroupsFunction<String, OafEntity, OafEntity>) (key, group) -> MergeUtils.mergeById(group, vocs),
|
||||||
|
OAFENTITY_KRYO_ENC)
|
||||||
.map(
|
.map(
|
||||||
(MapFunction<OafEntity, Tuple2<String, OafEntity>>) t -> new Tuple2<>(
|
(MapFunction<OafEntity, Tuple2<String, OafEntity>>) t -> new Tuple2<>(
|
||||||
t.getClass().getName(), t),
|
t.getClass().getName(), t),
|
||||||
|
|
|
@ -23,24 +23,30 @@ import org.apache.commons.lang3.tuple.Pair;
|
||||||
import com.github.sisyphsu.dateparser.DateParserUtils;
|
import com.github.sisyphsu.dateparser.DateParserUtils;
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
||||||
import eu.dnetlib.dhp.oa.merge.AuthorMerger;
|
import eu.dnetlib.dhp.oa.merge.AuthorMerger;
|
||||||
import eu.dnetlib.dhp.schema.common.AccessRightComparator;
|
import eu.dnetlib.dhp.schema.common.AccessRightComparator;
|
||||||
|
import eu.dnetlib.dhp.schema.common.EntityType;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
|
|
||||||
public class MergeUtils {
|
public class MergeUtils {
|
||||||
|
|
||||||
public static <T extends Oaf> T mergeById(String s, Iterator<T> oafEntityIterator) {
|
public static <T extends Oaf> T mergeById(Iterator<T> oafEntityIterator, VocabularyGroup vocs) {
|
||||||
return mergeGroup(s, oafEntityIterator, true);
|
return mergeGroup(oafEntityIterator, true, vocs);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <T extends Oaf> T mergeGroup(String s, Iterator<T> oafEntityIterator) {
|
public static <T extends Oaf> T mergeGroup(Iterator<T> oafEntityIterator) {
|
||||||
return mergeGroup(s, oafEntityIterator, false);
|
return mergeGroup(oafEntityIterator, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <T extends Oaf> T mergeGroup(String s, Iterator<T> oafEntityIterator,
|
public static <T extends Oaf> T mergeGroup(Iterator<T> oafEntityIterator, boolean checkDelegateAuthority) {
|
||||||
boolean checkDelegateAuthority) {
|
return mergeGroup(oafEntityIterator, checkDelegateAuthority, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T extends Oaf> T mergeGroup(Iterator<T> oafEntityIterator,
|
||||||
|
boolean checkDelegateAuthority, VocabularyGroup vocs) {
|
||||||
|
|
||||||
ArrayList<T> sortedEntities = new ArrayList<>();
|
ArrayList<T> sortedEntities = new ArrayList<>();
|
||||||
oafEntityIterator.forEachRemaining(sortedEntities::add);
|
oafEntityIterator.forEachRemaining(sortedEntities::add);
|
||||||
|
@ -49,13 +55,49 @@ public class MergeUtils {
|
||||||
Iterator<T> it = sortedEntities.iterator();
|
Iterator<T> it = sortedEntities.iterator();
|
||||||
T merged = it.next();
|
T merged = it.next();
|
||||||
|
|
||||||
while (it.hasNext()) {
|
if (!it.hasNext() && merged instanceof Result && vocs != null) {
|
||||||
merged = checkedMerge(merged, it.next(), checkDelegateAuthority);
|
return enforceResultType(vocs, (Result) merged);
|
||||||
|
} else {
|
||||||
|
while (it.hasNext()) {
|
||||||
|
merged = checkedMerge(merged, it.next(), checkDelegateAuthority);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return merged;
|
return merged;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static <T extends Oaf> T enforceResultType(VocabularyGroup vocs, Result mergedResult) {
|
||||||
|
if (Optional.ofNullable(mergedResult.getInstance()).map(List::isEmpty).orElse(true)) {
|
||||||
|
return (T) mergedResult;
|
||||||
|
} else {
|
||||||
|
final Instance i = mergedResult.getInstance().get(0);
|
||||||
|
|
||||||
|
if (!vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) {
|
||||||
|
return (T) mergedResult;
|
||||||
|
} else {
|
||||||
|
final Qualifier expectedResultType = vocs
|
||||||
|
.getSynonymAsQualifier(
|
||||||
|
ModelConstants.DNET_RESULT_TYPOLOGIES,
|
||||||
|
i.getInstancetype().getClassid());
|
||||||
|
|
||||||
|
// there is a clash among the result types
|
||||||
|
if (!expectedResultType.getClassid().equals(mergedResult.getResulttype().getClassid())) {
|
||||||
|
try {
|
||||||
|
String resulttype = expectedResultType.getClassid();
|
||||||
|
if (EntityType.otherresearchproduct.toString().equals(resulttype)) {
|
||||||
|
resulttype = "other";
|
||||||
|
}
|
||||||
|
Result result = (Result) ModelSupport.oafTypes.get(resulttype).newInstance();
|
||||||
|
return (T) mergeResultFields(result, mergedResult);
|
||||||
|
} catch (InstantiationException | IllegalAccessException e) {
|
||||||
|
throw new IllegalStateException(e);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return (T) mergedResult;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static <T extends Oaf> T checkedMerge(final T left, final T right, boolean checkDelegateAuthority) {
|
public static <T extends Oaf> T checkedMerge(final T left, final T right, boolean checkDelegateAuthority) {
|
||||||
return (T) merge(left, right, checkDelegateAuthority);
|
return (T) merge(left, right, checkDelegateAuthority);
|
||||||
}
|
}
|
||||||
|
@ -106,7 +148,7 @@ public class MergeUtils {
|
||||||
return mergeSoftware((Software) left, (Software) right);
|
return mergeSoftware((Software) left, (Software) right);
|
||||||
}
|
}
|
||||||
|
|
||||||
return mergeResultFields((Result) left, (Result) right);
|
return left;
|
||||||
} else if (sameClass(left, right, Datasource.class)) {
|
} else if (sameClass(left, right, Datasource.class)) {
|
||||||
// TODO
|
// TODO
|
||||||
final int trust = compareTrust(left, right);
|
final int trust = compareTrust(left, right);
|
||||||
|
|
|
@ -135,7 +135,7 @@ public class DedupRecordFactory {
|
||||||
return Collections.emptyIterator();
|
return Collections.emptyIterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
OafEntity mergedEntity = MergeUtils.mergeGroup(dedupId, cliques.iterator());
|
OafEntity mergedEntity = MergeUtils.mergeGroup(cliques.iterator());
|
||||||
// dedup records do not have date of transformation attribute
|
// dedup records do not have date of transformation attribute
|
||||||
mergedEntity.setDateoftransformation(null);
|
mergedEntity.setDateoftransformation(null);
|
||||||
mergedEntity
|
mergedEntity
|
||||||
|
|
|
@ -46,8 +46,8 @@ class DatasetMergerTest implements Serializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void datasetMergerTest() throws InstantiationException, IllegalAccessException, InvocationTargetException {
|
void datasetMergerTest() {
|
||||||
Dataset pub_merged = MergeUtils.mergeGroup(dedupId, datasets.stream().map(Tuple2::_2).iterator());
|
Dataset pub_merged = MergeUtils.mergeGroup(datasets.stream().map(Tuple2::_2).iterator());
|
||||||
|
|
||||||
// verify id
|
// verify id
|
||||||
assertEquals(dedupId, pub_merged.getId());
|
assertEquals(dedupId, pub_merged.getId());
|
||||||
|
|
|
@ -155,7 +155,7 @@ public abstract class AbstractMdRecordToOafMapper {
|
||||||
|
|
||||||
final List<Instance> instances = prepareInstances(doc, entityInfo, collectedFrom, hostedBy);
|
final List<Instance> instances = prepareInstances(doc, entityInfo, collectedFrom, hostedBy);
|
||||||
|
|
||||||
final String type = getResultType(doc, instances);
|
final String type = getResultType(instances);
|
||||||
|
|
||||||
return createOafs(doc, type, instances, collectedFrom, entityInfo, lastUpdateTimestamp);
|
return createOafs(doc, type, instances, collectedFrom, entityInfo, lastUpdateTimestamp);
|
||||||
} catch (final DocumentException e) {
|
} catch (final DocumentException e) {
|
||||||
|
@ -164,10 +164,9 @@ public abstract class AbstractMdRecordToOafMapper {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String getResultType(final Document doc, final List<Instance> instances) {
|
protected String getResultType(final List<Instance> instances) {
|
||||||
final String type = doc.valueOf("//dr:CobjCategory/@type");
|
|
||||||
|
|
||||||
if (StringUtils.isBlank(type) && this.vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) {
|
if (this.vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) {
|
||||||
final String instanceType = instances
|
final String instanceType = instances
|
||||||
.stream()
|
.stream()
|
||||||
.map(i -> i.getInstancetype().getClassid())
|
.map(i -> i.getInstancetype().getClassid())
|
||||||
|
@ -178,9 +177,9 @@ public abstract class AbstractMdRecordToOafMapper {
|
||||||
.ofNullable(this.vocs.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, instanceType))
|
.ofNullable(this.vocs.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, instanceType))
|
||||||
.map(Qualifier::getClassid)
|
.map(Qualifier::getClassid)
|
||||||
.orElse("0000");
|
.orElse("0000");
|
||||||
|
} else {
|
||||||
|
throw new IllegalStateException("Missing vocabulary: " + ModelConstants.DNET_RESULT_TYPOLOGIES);
|
||||||
}
|
}
|
||||||
|
|
||||||
return type;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private KeyValue getProvenanceDatasource(final Document doc, final String xpathId, final String xpathName) {
|
private KeyValue getProvenanceDatasource(final Document doc, final String xpathId, final String xpathName) {
|
||||||
|
|
|
@ -133,7 +133,7 @@ public class GenerateEntitiesApplication extends AbstractMigrationApplication {
|
||||||
inputRdd
|
inputRdd
|
||||||
.keyBy(oaf -> ModelSupport.idFn().apply(oaf))
|
.keyBy(oaf -> ModelSupport.idFn().apply(oaf))
|
||||||
.groupByKey()
|
.groupByKey()
|
||||||
.map(t -> MergeUtils.mergeGroup(t._1, t._2.iterator())),
|
.map(t -> MergeUtils.mergeGroup(t._2.iterator())),
|
||||||
// .mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf))
|
// .mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf))
|
||||||
// .reduceByKey(MergeUtils::merge)
|
// .reduceByKey(MergeUtils::merge)
|
||||||
// .map(Tuple2::_2),
|
// .map(Tuple2::_2),
|
||||||
|
|
|
@ -133,7 +133,7 @@ object SparkCreateInputGraph {
|
||||||
val ds: Dataset[T] = spark.read.load(sourcePath).as[T]
|
val ds: Dataset[T] = spark.read.load(sourcePath).as[T]
|
||||||
|
|
||||||
ds.groupByKey(_.getId)
|
ds.groupByKey(_.getId)
|
||||||
.mapGroups { (id, it) => MergeUtils.mergeGroup(id, it.asJava).asInstanceOf[T] }
|
.mapGroups { (id, it) => MergeUtils.mergeGroup(it.asJava).asInstanceOf[T] }
|
||||||
// .reduceGroups { (x: T, y: T) => MergeUtils.merge(x, y).asInstanceOf[T] }
|
// .reduceGroups { (x: T, y: T) => MergeUtils.merge(x, y).asInstanceOf[T] }
|
||||||
// .map(_)
|
// .map(_)
|
||||||
.write
|
.write
|
||||||
|
|
Loading…
Reference in New Issue