enrichment steps #38

Merged
claudio.atzori merged 334 commits from miriam.baglioni/dnet-hadoop:master into enrichment_wfs 2020-08-11 16:40:26 +02:00
17 changed files with 138 additions and 66 deletions
Showing only changes of commit 5869cb76b3 - Show all commits

View File

@ -91,35 +91,29 @@ public class GenerateEventsApplication {
private static final UpdateMatcher<Pair<Result, List<Software>>, ?> enrichMoreSoftware = new EnrichMoreSoftware(); private static final UpdateMatcher<Pair<Result, List<Software>>, ?> enrichMoreSoftware = new EnrichMoreSoftware();
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMisissingPublicationIsRelatedTo = new EnrichMissingPublicationIsRelatedTo(); private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMisissingPublicationIsRelatedTo = new EnrichMissingPublicationIsRelatedTo();
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsReferencedBy = private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsReferencedBy = new EnrichMissingPublicationIsReferencedBy();
new EnrichMissingPublicationIsReferencedBy();
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationReferences = new EnrichMissingPublicationReferences(); private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationReferences = new EnrichMissingPublicationReferences();
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsSupplementedTo = private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsSupplementedTo = new EnrichMissingPublicationIsSupplementedTo();
new EnrichMissingPublicationIsSupplementedTo(); private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsSupplementedBy = new EnrichMissingPublicationIsSupplementedBy();
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsSupplementedBy =
new EnrichMissingPublicationIsSupplementedBy();
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMisissingDatasetIsRelatedTo = private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMisissingDatasetIsRelatedTo = new EnrichMissingDatasetIsRelatedTo();
new EnrichMissingDatasetIsRelatedTo(); private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsReferencedBy = new EnrichMissingDatasetIsReferencedBy();
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsReferencedBy = private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetReferences = new EnrichMissingDatasetReferences();
new EnrichMissingDatasetIsReferencedBy(); private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsSupplementedTo = new EnrichMissingDatasetIsSupplementedTo();
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetReferences = private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsSupplementedBy = new EnrichMissingDatasetIsSupplementedBy();
new EnrichMissingDatasetReferences();
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsSupplementedTo =
new EnrichMissingDatasetIsSupplementedTo();
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsSupplementedBy =
new EnrichMissingDatasetIsSupplementedBy();
// Aggregators // Aggregators
private static final TypedColumn<Tuple2<Result, Relation>, ResultGroup> resultAggrTypedColumn = new ResultAggregator().toColumn(); private static final TypedColumn<Tuple2<Result, Relation>, ResultGroup> resultAggrTypedColumn = new ResultAggregator()
.toColumn();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString(GenerateEventsApplication.class .toString(
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json"))); GenerateEventsApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json")));
parser.parseArgument(args); parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional final Boolean isSparkSessionManaged = Optional
@ -172,18 +166,23 @@ public class GenerateEventsApplication {
final Class<R> resultClazz, final Class<R> resultClazz,
final DedupConfig dedupConfig) { final DedupConfig dedupConfig) {
final Dataset<Result> results = readPath(spark, graphPath + "/" + resultClazz.getSimpleName().toLowerCase(), Result.class) final Dataset<Result> results = readPath(
.filter(r -> r.getDataInfo().getDeletedbyinference()); spark, graphPath + "/" + resultClazz.getSimpleName().toLowerCase(), Result.class)
.filter(r -> r.getDataInfo().getDeletedbyinference());
final Dataset<Relation> mergedRels = readPath(spark, graphPath + "/relation", Relation.class) final Dataset<Relation> mergedRels = readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); .filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
return results.joinWith(mergedRels, results.col("id").equalTo(mergedRels.col("source")), "inner") return results
.joinWith(mergedRels, results.col("id").equalTo(mergedRels.col("source")), "inner")
.groupByKey((MapFunction<Tuple2<Result, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING()) .groupByKey((MapFunction<Tuple2<Result, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
.agg(resultAggrTypedColumn) .agg(resultAggrTypedColumn)
.map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class)) .map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class))
.filter(ResultGroup::isValid) .filter(ResultGroup::isValid)
.map((MapFunction<ResultGroup, EventGroup>) g -> GenerateEventsApplication.generateSimpleEvents(g, dedupConfig), Encoders.kryo(EventGroup.class)) .map(
(MapFunction<ResultGroup, EventGroup>) g -> GenerateEventsApplication
.generateSimpleEvents(g, dedupConfig),
Encoders.kryo(EventGroup.class))
.flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class)); .flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class));
} }
@ -207,16 +206,19 @@ public class GenerateEventsApplication {
return events; return events;
} }
private static <SRC extends Result, TRG extends OafEntity> Dataset<Event> generateRelationEvents(final SparkSession spark, private static <SRC extends Result, TRG extends OafEntity> Dataset<Event> generateRelationEvents(
final SparkSession spark,
final String graphPath, final String graphPath,
final Class<SRC> sourceClass, final Class<SRC> sourceClass,
final Class<TRG> targetClass, final Class<TRG> targetClass,
final DedupConfig dedupConfig) { final DedupConfig dedupConfig) {
final Dataset<Result> sources = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class) final Dataset<Result> sources = readPath(
.filter(r -> r.getDataInfo().getDeletedbyinference()); spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class)
.filter(r -> r.getDataInfo().getDeletedbyinference());
final Dataset<TRG> targets = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), targetClass); final Dataset<TRG> targets = readPath(
spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), targetClass);
final Dataset<Relation> mergedRels = readPath(spark, graphPath + "/relation", Relation.class) final Dataset<Relation> mergedRels = readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); .filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
@ -224,7 +226,8 @@ public class GenerateEventsApplication {
final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class) final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
final Dataset<ResultGroup> duplicates = sources.joinWith(mergedRels, sources.col("id").equalTo(rels.col("source")), "inner") final Dataset<ResultGroup> duplicates = sources
.joinWith(mergedRels, sources.col("id").equalTo(rels.col("source")), "inner")
.groupByKey((MapFunction<Tuple2<Result, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING()) .groupByKey((MapFunction<Tuple2<Result, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
.agg(resultAggrTypedColumn) .agg(resultAggrTypedColumn)
.map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class)) .map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class))
@ -243,7 +246,8 @@ public class GenerateEventsApplication {
return null; return null;
} }
private List<Event> generateProjectsEvents(final Collection<Pair<Result, List<Project>>> childrenWithProjects, final DedupConfig dedupConfig) { private List<Event> generateProjectsEvents(final Collection<Pair<Result, List<Project>>> childrenWithProjects,
final DedupConfig dedupConfig) {
final List<UpdateInfo<?>> list = new ArrayList<>(); final List<UpdateInfo<?>> list = new ArrayList<>();
for (final Pair<Result, List<Project>> target : childrenWithProjects) { for (final Pair<Result, List<Project>> target : childrenWithProjects) {
@ -254,7 +258,8 @@ public class GenerateEventsApplication {
return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList()); return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
} }
private List<Event> generateSoftwareEvents(final Collection<Pair<Result, List<Software>>> childrenWithSoftwares, final DedupConfig dedupConfig) { private List<Event> generateSoftwareEvents(final Collection<Pair<Result, List<Software>>> childrenWithSoftwares,
final DedupConfig dedupConfig) {
final List<UpdateInfo<?>> list = new ArrayList<>(); final List<UpdateInfo<?>> list = new ArrayList<>();
for (final Pair<Result, List<Software>> target : childrenWithSoftwares) { for (final Pair<Result, List<Software>> target : childrenWithSoftwares) {
@ -279,15 +284,30 @@ public class GenerateEventsApplication {
for (final Pair<Result, List<Publication>> target : cleanedChildrens) { for (final Pair<Result, List<Publication>> target : cleanedChildrens) {
if (relType.equals("isRelatedTo")) { if (relType.equals("isRelatedTo")) {
list.addAll(enrichMisissingPublicationIsRelatedTo.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig)); list
.addAll(
enrichMisissingPublicationIsRelatedTo
.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig));
} else if (relType.equals("references")) { } else if (relType.equals("references")) {
list.addAll(enrichMissingPublicationReferences.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig)); list
.addAll(
enrichMissingPublicationReferences
.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig));
} else if (relType.equals("isReferencedBy")) { } else if (relType.equals("isReferencedBy")) {
list.addAll(enrichMissingPublicationIsReferencedBy.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig)); list
.addAll(
enrichMissingPublicationIsReferencedBy
.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig));
} else if (relType.equals("isSupplementedTo")) { } else if (relType.equals("isSupplementedTo")) {
list.addAll(enrichMissingPublicationIsSupplementedTo.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig)); list
.addAll(
enrichMissingPublicationIsSupplementedTo
.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig));
} else if (relType.equals("isSupplementedBy")) { } else if (relType.equals("isSupplementedBy")) {
list.addAll(enrichMissingPublicationIsSupplementedBy.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig)); list
.addAll(
enrichMissingPublicationIsSupplementedBy
.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig));
} }
} }
@ -310,15 +330,29 @@ public class GenerateEventsApplication {
for (final Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>> target : cleanedChildrens) { for (final Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>> target : cleanedChildrens) {
if (relType.equals("isRelatedTo")) { if (relType.equals("isRelatedTo")) {
list.addAll(enrichMisissingDatasetIsRelatedTo.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig)); list
.addAll(
enrichMisissingDatasetIsRelatedTo
.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig));
} else if (relType.equals("references")) { } else if (relType.equals("references")) {
list.addAll(enrichMissingDatasetReferences.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig)); list
.addAll(
enrichMissingDatasetReferences.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig));
} else if (relType.equals("isReferencedBy")) { } else if (relType.equals("isReferencedBy")) {
list.addAll(enrichMissingDatasetIsReferencedBy.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig)); list
.addAll(
enrichMissingDatasetIsReferencedBy
.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig));
} else if (relType.equals("isSupplementedTo")) { } else if (relType.equals("isSupplementedTo")) {
list.addAll(enrichMissingDatasetIsSupplementedTo.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig)); list
.addAll(
enrichMissingDatasetIsSupplementedTo
.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig));
} else if (relType.equals("isSupplementedBy")) { } else if (relType.equals("isSupplementedBy")) {
list.addAll(enrichMissingDatasetIsSupplementedBy.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig)); list
.addAll(
enrichMissingDatasetIsSupplementedBy
.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig));
} }
} }
@ -339,8 +373,12 @@ public class GenerateEventsApplication {
private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception { private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception {
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl); final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);
final String conf = isLookUpService.getResourceProfileByQuery(String final String conf = isLookUpService
.format("for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", profId)); .getResourceProfileByQuery(
String
.format(
"for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
profId));
final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class); final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class);
dedupConfig.getPace().initModel(); dedupConfig.getPace().initModel();

View File

@ -22,7 +22,8 @@ public abstract class UpdateMatcher<K, T> {
this.multipleUpdate = multipleUpdate; this.multipleUpdate = multipleUpdate;
} }
public Collection<UpdateInfo<T>> searchUpdatesForRecord(final K res, final Collection<K> others, final DedupConfig dedupConfig) { public Collection<UpdateInfo<T>> searchUpdatesForRecord(final K res, final Collection<K> others,
final DedupConfig dedupConfig) {
final Map<String, UpdateInfo<T>> infoMap = new HashMap<>(); final Map<String, UpdateInfo<T>> infoMap = new HashMap<>();
@ -30,7 +31,8 @@ public abstract class UpdateMatcher<K, T> {
if (source != res) { if (source != res) {
for (final UpdateInfo<T> info : findUpdates(source, res, dedupConfig)) { for (final UpdateInfo<T> info : findUpdates(source, res, dedupConfig)) {
final String s = DigestUtils.md5Hex(info.getHighlightValueAsString()); final String s = DigestUtils.md5Hex(info.getHighlightValueAsString());
if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) {} else { if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) {
} else {
infoMap.put(s, info); infoMap.put(s, info);
} }
} }

View File

@ -18,9 +18,11 @@ public class EnrichMissingAbstract extends UpdateMatcher<Result, String> {
} }
@Override @Override
protected List<UpdateInfo<String>> findUpdates(final Result source, final Result target, final DedupConfig dedupConfig) { protected List<UpdateInfo<String>> findUpdates(final Result source, final Result target,
final DedupConfig dedupConfig) {
if (isMissing(target.getDescription()) && !isMissing(source.getDescription())) { if (isMissing(target.getDescription()) && !isMissing(source.getDescription())) {
return Arrays.asList(generateUpdateInfo(source.getDescription().get(0).getValue(), source, target, dedupConfig)); return Arrays
.asList(generateUpdateInfo(source.getDescription().get(0).getValue(), source, target, dedupConfig));
} }
return new ArrayList<>(); return new ArrayList<>();
} }

View File

@ -19,7 +19,8 @@ public class EnrichMissingAuthorOrcid extends UpdateMatcher<Result, Pair<String,
} }
@Override @Override
protected List<UpdateInfo<Pair<String, String>>> findUpdates(final Result source, final Result target, final DedupConfig dedupConfig) { protected List<UpdateInfo<Pair<String, String>>> findUpdates(final Result source, final Result target,
final DedupConfig dedupConfig) {
// TODO // TODO
// return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f)); // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
return Arrays.asList(); return Arrays.asList();

View File

@ -21,7 +21,8 @@ public class EnrichMissingOpenAccess extends UpdateMatcher<Result, Instance> {
} }
@Override @Override
protected List<UpdateInfo<Instance>> findUpdates(final Result source, final Result target, final DedupConfig dedupConfig) { protected List<UpdateInfo<Instance>> findUpdates(final Result source, final Result target,
final DedupConfig dedupConfig) {
final long count = target final long count = target
.getInstance() .getInstance()
.stream() .stream()
@ -29,7 +30,9 @@ public class EnrichMissingOpenAccess extends UpdateMatcher<Result, Instance> {
.filter(right -> right.equals(BrokerConstants.OPEN_ACCESS)) .filter(right -> right.equals(BrokerConstants.OPEN_ACCESS))
.count(); .count();
if (count > 0) { return Arrays.asList(); } if (count > 0) {
return Arrays.asList();
}
return source return source
.getInstance() .getInstance()

View File

@ -20,10 +20,13 @@ public class EnrichMissingPid extends UpdateMatcher<Result, Pid> {
} }
@Override @Override
protected List<UpdateInfo<Pid>> findUpdates(final Result source, final Result target, final DedupConfig dedupConfig) { protected List<UpdateInfo<Pid>> findUpdates(final Result source, final Result target,
final DedupConfig dedupConfig) {
final long count = target.getPid().size(); final long count = target.getPid().size();
if (count > 0) { return Arrays.asList(); } if (count > 0) {
return Arrays.asList();
}
return source return source
.getPid() .getPid()
@ -33,7 +36,8 @@ public class EnrichMissingPid extends UpdateMatcher<Result, Pid> {
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
public UpdateInfo<Pid> generateUpdateInfo(final Pid highlightValue, final Result source, final Result target, final DedupConfig dedupConfig) { public UpdateInfo<Pid> generateUpdateInfo(final Pid highlightValue, final Result source, final Result target,
final DedupConfig dedupConfig) {
return new UpdateInfo<>( return new UpdateInfo<>(
Topic.ENRICH_MISSING_PID, Topic.ENRICH_MISSING_PID,
highlightValue, source, target, highlightValue, source, target,

View File

@ -18,9 +18,11 @@ public class EnrichMissingPublicationDate extends UpdateMatcher<Result, String>
} }
@Override @Override
protected List<UpdateInfo<String>> findUpdates(final Result source, final Result target, final DedupConfig dedupConfig) { protected List<UpdateInfo<String>> findUpdates(final Result source, final Result target,
final DedupConfig dedupConfig) {
if (isMissing(target.getDateofacceptance()) && !isMissing(source.getDateofacceptance())) { if (isMissing(target.getDateofacceptance()) && !isMissing(source.getDateofacceptance())) {
return Arrays.asList(generateUpdateInfo(source.getDateofacceptance().getValue(), source, target, dedupConfig)); return Arrays
.asList(generateUpdateInfo(source.getDateofacceptance().getValue(), source, target, dedupConfig));
} }
return new ArrayList<>(); return new ArrayList<>();
} }

View File

@ -23,7 +23,8 @@ public class EnrichMissingSubject extends UpdateMatcher<Result, Pair<String, Str
} }
@Override @Override
protected List<UpdateInfo<Pair<String, String>>> findUpdates(final Result source, final Result target, final DedupConfig dedupConfig) { protected List<UpdateInfo<Pair<String, String>>> findUpdates(final Result source, final Result target,
final DedupConfig dedupConfig) {
final Set<String> existingTypes = target final Set<String> existingTypes = target
.getSubject() .getSubject()
.stream() .stream()

View File

@ -21,7 +21,8 @@ public class EnrichMoreOpenAccess extends UpdateMatcher<Result, Instance> {
} }
@Override @Override
protected List<UpdateInfo<Instance>> findUpdates(final Result source, final Result target, final DedupConfig dedupConfig) { protected List<UpdateInfo<Instance>> findUpdates(final Result source, final Result target,
final DedupConfig dedupConfig) {
final Set<String> urls = target final Set<String> urls = target
.getInstance() .getInstance()
.stream() .stream()

View File

@ -20,7 +20,8 @@ public class EnrichMorePid extends UpdateMatcher<Result, Pid> {
} }
@Override @Override
protected List<UpdateInfo<Pid>> findUpdates(final Result source, final Result target, final DedupConfig dedupConfig) { protected List<UpdateInfo<Pid>> findUpdates(final Result source, final Result target,
final DedupConfig dedupConfig) {
final Set<String> existingPids = target final Set<String> existingPids = target
.getPid() .getPid()
.stream() .stream()
@ -36,7 +37,8 @@ public class EnrichMorePid extends UpdateMatcher<Result, Pid> {
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
public UpdateInfo<Pid> generateUpdateInfo(final Pid highlightValue, final Result source, final Result target, final DedupConfig dedupConfig) { public UpdateInfo<Pid> generateUpdateInfo(final Pid highlightValue, final Result source, final Result target,
final DedupConfig dedupConfig) {
return new UpdateInfo<>( return new UpdateInfo<>(
Topic.ENRICH_MORE_PID, Topic.ENRICH_MORE_PID,
highlightValue, source, target, highlightValue, source, target,

View File

@ -21,7 +21,8 @@ public class EnrichMoreSubject extends UpdateMatcher<Result, Pair<String, String
} }
@Override @Override
protected List<UpdateInfo<Pair<String, String>>> findUpdates(final Result source, final Result target, final DedupConfig dedupConfig) { protected List<UpdateInfo<Pair<String, String>>> findUpdates(final Result source, final Result target,
final DedupConfig dedupConfig) {
final Set<String> existingSubjects = target final Set<String> existingSubjects = target
.getSubject() .getSubject()
.stream() .stream()

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.broker.oa.util; package eu.dnetlib.dhp.broker.oa.util;
import java.io.Serializable; import java.io.Serializable;

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.broker.oa.util; package eu.dnetlib.dhp.broker.oa.util;
import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoder;

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.broker.oa.util; package eu.dnetlib.dhp.broker.oa.util;
import java.io.Serializable; import java.io.Serializable;

View File

@ -1,14 +1,22 @@
package eu.dnetlib.dhp.broker.oa.util; package eu.dnetlib.dhp.broker.oa.util;
public class TrustUtils { public class TrustUtils {
public static float rescale(final double score, final double threshold) { public static float rescale(final double score, final double threshold) {
if (score >= BrokerConstants.MAX_TRUST) { return BrokerConstants.MAX_TRUST; } if (score >= BrokerConstants.MAX_TRUST) {
return BrokerConstants.MAX_TRUST;
}
final double val = (score - threshold) * (BrokerConstants.MAX_TRUST - BrokerConstants.MIN_TRUST) / (BrokerConstants.MAX_TRUST - threshold); final double val = (score - threshold) * (BrokerConstants.MAX_TRUST - BrokerConstants.MIN_TRUST)
/ (BrokerConstants.MAX_TRUST - threshold);
if (val < BrokerConstants.MIN_TRUST) { return BrokerConstants.MIN_TRUST; } if (val < BrokerConstants.MIN_TRUST) {
if (val > BrokerConstants.MAX_TRUST) { return BrokerConstants.MAX_TRUST; } return BrokerConstants.MIN_TRUST;
}
if (val > BrokerConstants.MAX_TRUST) {
return BrokerConstants.MAX_TRUST;
}
return (float) val; return (float) val;
} }

View File

@ -68,8 +68,10 @@ public final class UpdateInfo<T> {
private float calculateTrust(final DedupConfig dedupConfig, final Result r1, final Result r2) { private float calculateTrust(final DedupConfig dedupConfig, final Result r1, final Result r2) {
try { try {
final ObjectMapper objectMapper = new ObjectMapper(); final ObjectMapper objectMapper = new ObjectMapper();
final MapDocument doc1 = MapDocumentUtil.asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r1)); final MapDocument doc1 = MapDocumentUtil
final MapDocument doc2 = MapDocumentUtil.asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r2)); .asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r1));
final MapDocument doc2 = MapDocumentUtil
.asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r2));
final double score = new TreeProcessor(dedupConfig).computeScore(doc1, doc2); final double score = new TreeProcessor(dedupConfig).computeScore(doc1, doc2);
final double threshold = dedupConfig.getWf().getThreshold(); final double threshold = dedupConfig.getWf().getThreshold();
@ -118,7 +120,8 @@ public final class UpdateInfo<T> {
.map(Instance::getUrl) .map(Instance::getUrl)
.flatMap(List::stream) .flatMap(List::stream)
.findFirst() .findFirst()
.orElse(null);; .orElse(null);
;
final Provenance provenance = new Provenance().setId(provId).setRepositoryName(provRepo).setUrl(provUrl); final Provenance provenance = new Provenance().setId(provId).setRepositoryName(provRepo).setUrl(provUrl);

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.broker.oa.util; package eu.dnetlib.dhp.broker.oa.util;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;