Merge branch 'master' of https://code-repo.d4science.org/D-Net/dnet-hadoop into islookup_timeout

This commit is contained in:
Claudio Atzori 2020-06-26 14:30:07 +02:00
commit 74da8a08cf
20 changed files with 181 additions and 79 deletions

View File

@ -2,7 +2,6 @@
package eu.dnetlib.dhp.broker.model;
import java.io.Serializable;
import java.util.Map;
public class Event implements Serializable {
@ -25,7 +24,7 @@ public class Event implements Serializable {
private boolean instantMessage;
private Map<String, Object> map;
private MappedFields map;
public Event() {
}
@ -33,7 +32,7 @@ public class Event implements Serializable {
public Event(final String producerId, final String eventId, final String topic, final String payload,
final Long creationDate, final Long expiryDate,
final boolean instantMessage,
final Map<String, Object> map) {
final MappedFields map) {
this.producerId = producerId;
this.eventId = eventId;
this.topic = topic;
@ -100,11 +99,11 @@ public class Event implements Serializable {
this.instantMessage = instantMessage;
}
public Map<String, Object> getMap() {
public MappedFields getMap() {
return this.map;
}
public void setMap(final Map<String, Object> map) {
public void setMap(final MappedFields map) {
this.map = map;
}
}

View File

@ -3,9 +3,8 @@ package eu.dnetlib.dhp.broker.model;
import java.text.ParseException;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
@ -30,7 +29,7 @@ public class EventFactory {
final Event res = new Event();
final Map<String, Object> map = createMapFromResult(updateInfo);
final MappedFields map = createMapFromResult(updateInfo);
final String eventId = calculateEventId(
updateInfo.getTopicPath(), updateInfo.getTarget().getOpenaireId(), updateInfo.getHighlightValueAsString());
@ -46,35 +45,35 @@ public class EventFactory {
return res;
}
private static Map<String, Object> createMapFromResult(final UpdateInfo<?> updateInfo) {
final Map<String, Object> map = new HashMap<>();
private static MappedFields createMapFromResult(final UpdateInfo<?> updateInfo) {
final MappedFields map = new MappedFields();
final OaBrokerMainEntity source = updateInfo.getSource();
final OaBrokerMainEntity target = updateInfo.getTarget();
map.put("target_datasource_id", target.getCollectedFromId());
map.put("target_datasource_name", target.getCollectedFromName());
map.setTargetDatasourceId(target.getCollectedFromId());
map.setTargetDatasourceName(target.getCollectedFromName());
map.put("target_publication_id", target.getOpenaireId());
map.setTargetResultId(target.getOpenaireId());
final List<String> titles = target.getTitles();
if (titles.size() > 0) {
map.put("target_publication_title", titles.get(0));
map.setTargetResultTitle(titles.get(0));
}
final long date = parseDateTolong(target.getPublicationdate());
if (date > 0) {
map.put("target_dateofacceptance", date);
map.setTargetDateofacceptance(date);
}
map.put("target_publication_subject_list", target.getSubjects());
map.put("target_publication_author_list", target.getCreators());
map.setTargetSubjects(target.getSubjects().stream().map(s -> s.getValue()).collect(Collectors.toList()));
map.setTargetAuthors(target.getCreators().stream().map(a -> a.getFullname()).collect(Collectors.toList()));
// PROVENANCE INFO
map.put("trust", updateInfo.getTrust());
map.put("provenance_datasource_id", source.getCollectedFromId());
map.put("provenance_datasource_name", source.getCollectedFromName());
map.put("provenance_publication_id_list", source.getOpenaireId());
map.setTrust(updateInfo.getTrust());
map.setProvenanceDatasourceId(source.getCollectedFromId());
map.setProvenanceDatasourceName(source.getCollectedFromName());
map.setProvenanceResultId(source.getOpenaireId());
return map;
}

View File

@ -0,0 +1,114 @@
package eu.dnetlib.dhp.broker.model;
import java.io.Serializable;
import java.util.List;
public class MappedFields implements Serializable {
/**
*
*/
private static final long serialVersionUID = -7999704113195802008L;
private String targetDatasourceId;
private String targetDatasourceName;
private String targetResultId;
private String targetResultTitle;
private long targetDateofacceptance;
private List<String> targetSubjects;
private List<String> targetAuthors;
private float trust;
private String provenanceDatasourceId;
private String provenanceDatasourceName;
private String provenanceResultId;
public String getTargetDatasourceId() {
return targetDatasourceId;
}
public void setTargetDatasourceId(final String targetDatasourceId) {
this.targetDatasourceId = targetDatasourceId;
}
public String getTargetDatasourceName() {
return targetDatasourceName;
}
public void setTargetDatasourceName(final String targetDatasourceName) {
this.targetDatasourceName = targetDatasourceName;
}
public String getTargetResultId() {
return targetResultId;
}
public void setTargetResultId(final String targetResultId) {
this.targetResultId = targetResultId;
}
public String getTargetResultTitle() {
return targetResultTitle;
}
public void setTargetResultTitle(final String targetResultTitle) {
this.targetResultTitle = targetResultTitle;
}
public long getTargetDateofacceptance() {
return targetDateofacceptance;
}
public void setTargetDateofacceptance(final long targetDateofacceptance) {
this.targetDateofacceptance = targetDateofacceptance;
}
public List<String> getTargetSubjects() {
return targetSubjects;
}
public void setTargetSubjects(final List<String> targetSubjects) {
this.targetSubjects = targetSubjects;
}
public List<String> getTargetAuthors() {
return targetAuthors;
}
public void setTargetAuthors(final List<String> targetAuthors) {
this.targetAuthors = targetAuthors;
}
public float getTrust() {
return trust;
}
public void setTrust(final float trust) {
this.trust = trust;
}
public String getProvenanceDatasourceId() {
return provenanceDatasourceId;
}
public void setProvenanceDatasourceId(final String provenanceDatasourceId) {
this.provenanceDatasourceId = provenanceDatasourceId;
}
public String getProvenanceDatasourceName() {
return provenanceDatasourceName;
}
public void setProvenanceDatasourceName(final String provenanceDatasourceName) {
this.provenanceDatasourceName = provenanceDatasourceName;
}
public String getProvenanceResultId() {
return provenanceResultId;
}
public void setProvenanceResultId(final String provenanceResultId) {
this.provenanceResultId = provenanceResultId;
}
}

View File

@ -1,13 +1,14 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
@ -19,15 +20,15 @@ import eu.dnetlib.pace.config.DedupConfig;
public abstract class UpdateMatcher<T> {
private final boolean multipleUpdate;
private final int maxNumber;
private final Function<T, Topic> topicFunction;
private final BiConsumer<OaBrokerMainEntity, T> compileHighlightFunction;
private final Function<T, String> highlightToStringFunction;
public UpdateMatcher(final boolean multipleUpdate, final Function<T, Topic> topicFunction,
public UpdateMatcher(final int maxNumber, final Function<T, Topic> topicFunction,
final BiConsumer<OaBrokerMainEntity, T> compileHighlightFunction,
final Function<T, String> highlightToStringFunction) {
this.multipleUpdate = multipleUpdate;
this.maxNumber = maxNumber;
this.topicFunction = topicFunction;
this.compileHighlightFunction = compileHighlightFunction;
this.highlightToStringFunction = highlightToStringFunction;
@ -57,17 +58,19 @@ public abstract class UpdateMatcher<T> {
}
}
final Collection<UpdateInfo<T>> values = infoMap.values();
if (values.isEmpty() || multipleUpdate) {
return values;
} else {
final UpdateInfo<T> v = values
final List<UpdateInfo<T>> values = infoMap
.values()
.stream()
.sorted((o1, o2) -> Float.compare(o1.getTrust(), o2.getTrust()))
.findFirst()
.get();
return Arrays.asList(v);
.sorted((o1, o2) -> Float.compare(o2.getTrust(), o1.getTrust())) // DESCENDING
.collect(Collectors.toList());
if (values.isEmpty()) {
return new ArrayList<>();
} else if (values.size() > maxNumber) {
System.err.println("Too many events (" + values.size() + ") matched by " + getClass().getSimpleName());
return values.subList(0, maxNumber);
} else {
return values;
}
}
@ -81,8 +84,8 @@ public abstract class UpdateMatcher<T> {
return StringUtils.isBlank(field);
}
public boolean isMultipleUpdate() {
return multipleUpdate;
public int getMaxNumber() {
return maxNumber;
}
public Function<T, Topic> getTopicFunction() {

View File

@ -13,7 +13,7 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
public abstract class AbstractEnrichMissingDataset extends UpdateMatcher<OaBrokerRelatedDataset> {
public AbstractEnrichMissingDataset(final Topic topic) {
super(true,
super(10,
rel -> topic,
(p, rel) -> p.getDatasets().add(rel),
rel -> rel.getOpenaireId());

View File

@ -12,7 +12,7 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
public class EnrichMissingProject extends UpdateMatcher<OaBrokerProject> {
public EnrichMissingProject() {
super(true,
super(20,
prj -> Topic.ENRICH_MISSING_PROJECT,
(p, prj) -> p.getProjects().add(prj),
prj -> prj.getFunder() + "::" + prj.getFundingProgram() + prj.getCode());

View File

@ -13,7 +13,7 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
public class EnrichMoreProject extends UpdateMatcher<OaBrokerProject> {
public EnrichMoreProject() {
super(true,
super(20,
prj -> Topic.ENRICH_MORE_PROJECT,
(p, prj) -> p.getProjects().add(prj),
prj -> projectAsString(prj));

View File

@ -13,7 +13,7 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
public abstract class AbstractEnrichMissingPublication extends UpdateMatcher<OaBrokerRelatedPublication> {
public AbstractEnrichMissingPublication(final Topic topic) {
super(true,
super(10,
rel -> topic,
(p, rel) -> p.getPublications().add(rel),
rel -> rel.getOpenaireId());

View File

@ -13,7 +13,7 @@ public class EnrichMissingSoftware
extends UpdateMatcher<OaBrokerRelatedSoftware> {
public EnrichMissingSoftware() {
super(true,
super(10,
s -> Topic.ENRICH_MISSING_SOFTWARE,
(p, s) -> p.getSoftwares().add(s),
s -> s.getOpenaireId());

View File

@ -13,7 +13,7 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
public class EnrichMoreSoftware extends UpdateMatcher<OaBrokerRelatedSoftware> {
public EnrichMoreSoftware() {
super(true,
super(10,
s -> Topic.ENRICH_MORE_SOFTWARE,
(p, s) -> p.getSoftwares().add(s),
s -> s.getOpenaireId());

View File

@ -12,7 +12,7 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
public class EnrichMissingAbstract extends UpdateMatcher<String> {
public EnrichMissingAbstract() {
super(false,
super(1,
s -> Topic.ENRICH_MISSING_ABSTRACT,
(p, s) -> p.getAbstracts().add(s),
s -> s);

View File

@ -15,7 +15,7 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
public class EnrichMissingAuthorOrcid extends UpdateMatcher<OaBrokerAuthor> {
public EnrichMissingAuthorOrcid() {
super(true,
super(40,
aut -> Topic.ENRICH_MISSING_AUTHOR_ORCID,
(p, aut) -> p.getCreators().add(aut),
aut -> aut.getOrcid());

View File

@ -14,7 +14,7 @@ import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
public class EnrichMissingOpenAccess extends UpdateMatcher<OaBrokerInstance> {
public EnrichMissingOpenAccess() {
super(true,
super(20,
i -> Topic.ENRICH_MISSING_OA_VERSION,
(p, i) -> p.getInstances().add(i),
OaBrokerInstance::getUrl);

View File

@ -13,7 +13,7 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
public class EnrichMissingPid extends UpdateMatcher<OaBrokerTypedValue> {
public EnrichMissingPid() {
super(true,
super(10,
pid -> Topic.ENRICH_MISSING_PID,
(p, pid) -> p.getPids().add(pid),
pid -> pid.getType() + "::" + pid.getValue());

View File

@ -12,7 +12,7 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
public class EnrichMissingPublicationDate extends UpdateMatcher<String> {
public EnrichMissingPublicationDate() {
super(false,
super(1,
date -> Topic.ENRICH_MISSING_PUBLICATION_DATE,
(p, date) -> p.setPublicationdate(date),
s -> s);

View File

@ -13,7 +13,7 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
public class EnrichMissingSubject extends UpdateMatcher<OaBrokerTypedValue> {
public EnrichMissingSubject() {
super(true,
super(20,
s -> Topic.fromPath("ENRICH/MISSING/SUBJECT/" + s.getType()),
(p, s) -> p.getSubjects().add(s),
s -> subjectAsString(s));

View File

@ -14,7 +14,7 @@ import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
public class EnrichMoreOpenAccess extends UpdateMatcher<OaBrokerInstance> {
public EnrichMoreOpenAccess() {
super(true,
super(20,
i -> Topic.ENRICH_MORE_OA_VERSION,
(p, i) -> p.getInstances().add(i),
OaBrokerInstance::getUrl);

View File

@ -13,7 +13,7 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
public class EnrichMorePid extends UpdateMatcher<OaBrokerTypedValue> {
public EnrichMorePid() {
super(true,
super(20,
pid -> Topic.ENRICH_MORE_PID,
(p, pid) -> p.getPids().add(pid),
pid -> pidAsString(pid));

View File

@ -13,7 +13,7 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
public class EnrichMoreSubject extends UpdateMatcher<OaBrokerTypedValue> {
public EnrichMoreSubject() {
super(true,
super(20,
s -> Topic.fromPath("ENRICH/MORE/SUBJECT/" + s.getType()),
(p, s) -> p.getSubjects().add(s),
s -> subjectAsString(s));

View File

@ -7,20 +7,7 @@ import java.util.List;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.model.EventFactory;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy;
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsRelatedTo;
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsSupplementedBy;
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsSupplementedTo;
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetReferences;
import eu.dnetlib.dhp.broker.oa.matchers.relatedProjects.EnrichMissingProject;
import eu.dnetlib.dhp.broker.oa.matchers.relatedProjects.EnrichMoreProject;
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsReferencedBy;
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsRelatedTo;
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedBy;
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedTo;
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationReferences;
import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMissingSoftware;
import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMoreSoftware;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAbstract;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAuthorOrcid;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingOpenAccess;
@ -47,22 +34,22 @@ public class EventFinder {
matchers.add(new EnrichMorePid());
matchers.add(new EnrichMoreSubject());
// Advanced matchers
// // Advanced matchers
matchers.add(new EnrichMissingProject());
matchers.add(new EnrichMoreProject());
matchers.add(new EnrichMissingSoftware());
matchers.add(new EnrichMoreSoftware());
matchers.add(new EnrichMissingPublicationIsRelatedTo());
matchers.add(new EnrichMissingPublicationIsReferencedBy());
matchers.add(new EnrichMissingPublicationReferences());
matchers.add(new EnrichMissingPublicationIsSupplementedTo());
matchers.add(new EnrichMissingPublicationIsSupplementedBy());
matchers.add(new EnrichMissingDatasetIsRelatedTo());
matchers.add(new EnrichMissingDatasetIsReferencedBy());
matchers.add(new EnrichMissingDatasetReferences());
matchers.add(new EnrichMissingDatasetIsSupplementedTo());
matchers.add(new EnrichMissingDatasetIsSupplementedBy());
matchers.add(new EnrichMissingAbstract());
// matchers.add(new EnrichMoreProject());
// matchers.add(new EnrichMissingSoftware());
// matchers.add(new EnrichMoreSoftware());
// matchers.add(new EnrichMissingPublicationIsRelatedTo());
// matchers.add(new EnrichMissingPublicationIsReferencedBy());
// matchers.add(new EnrichMissingPublicationReferences());
// matchers.add(new EnrichMissingPublicationIsSupplementedTo());
// matchers.add(new EnrichMissingPublicationIsSupplementedBy());
// matchers.add(new EnrichMissingDatasetIsRelatedTo());
// matchers.add(new EnrichMissingDatasetIsReferencedBy());
// matchers.add(new EnrichMissingDatasetReferences());
// matchers.add(new EnrichMissingDatasetIsSupplementedTo());
// matchers.add(new EnrichMissingDatasetIsSupplementedBy());
// matchers.add(new EnrichMissingAbstract());
}
public static EventGroup generateEvents(final ResultGroup results, final DedupConfig dedupConfig) {