Partial implementation of broker events

This commit is contained in:
Michele Artini 2020-05-07 12:31:26 +02:00
parent 17860d3ab6
commit ac0da5a7ee
14 changed files with 687 additions and 0 deletions

View File

@ -0,0 +1,101 @@
package eu.dnetlib.dhp.broker.model;
import java.util.Map;
/**
 * Mutable bean describing a single broker event.
 *
 * <p>The class only stores the values it is given; no validation or copying is performed.
 * In particular the {@code map} is kept by reference.
 */
public class Event {

    private String eventId;

    private String producerId;

    private String topic;

    private String payload;

    private Long creationDate;

    private Long expiryDate;

    private boolean instantMessage;

    private Map<String, Object> map;

    /** Creates an event with every field left at its default value. */
    public Event() {}

    /**
     * Creates a fully populated event.
     *
     * <p>Note the parameter order: the producer id comes before the event id.
     *
     * @param producerId identifier of the event producer
     * @param eventId identifier of the event
     * @param topic topic of the event
     * @param payload payload of the event, as a string
     * @param creationDate creation timestamp
     * @param expiryDate expiry timestamp
     * @param instantMessage value of the instant-message flag
     * @param map additional key/value properties, stored by reference
     */
    public Event(final String producerId, final String eventId, final String topic, final String payload,
        final Long creationDate, final Long expiryDate, final boolean instantMessage,
        final Map<String, Object> map) {
        this.eventId = eventId;
        this.producerId = producerId;
        this.topic = topic;
        this.payload = payload;
        this.creationDate = creationDate;
        this.expiryDate = expiryDate;
        this.instantMessage = instantMessage;
        this.map = map;
    }

    /** @return the producer identifier */
    public String getProducerId() {
        return producerId;
    }

    /** @param producerId the producer identifier to store */
    public void setProducerId(final String producerId) {
        this.producerId = producerId;
    }

    /** @return the event identifier */
    public String getEventId() {
        return eventId;
    }

    /** @param eventId the event identifier to store */
    public void setEventId(final String eventId) {
        this.eventId = eventId;
    }

    /** @return the topic */
    public String getTopic() {
        return topic;
    }

    /** @param topic the topic to store */
    public void setTopic(final String topic) {
        this.topic = topic;
    }

    /** @return the payload string */
    public String getPayload() {
        return payload;
    }

    /** @param payload the payload string to store */
    public void setPayload(final String payload) {
        this.payload = payload;
    }

    /** @return the creation timestamp, possibly {@code null} */
    public Long getCreationDate() {
        return creationDate;
    }

    /** @param creationDate the creation timestamp to store */
    public void setCreationDate(final Long creationDate) {
        this.creationDate = creationDate;
    }

    /** @return the expiry timestamp, possibly {@code null} */
    public Long getExpiryDate() {
        return expiryDate;
    }

    /** @param expiryDate the expiry timestamp to store */
    public void setExpiryDate(final Long expiryDate) {
        this.expiryDate = expiryDate;
    }

    /** @return the instant-message flag */
    public boolean isInstantMessage() {
        return instantMessage;
    }

    /** @param instantMessage the instant-message flag to store */
    public void setInstantMessage(final boolean instantMessage) {
        this.instantMessage = instantMessage;
    }

    /** @return the additional properties map (the same instance that was stored) */
    public Map<String, Object> getMap() {
        return map;
    }

    /** @param map the additional properties map to store (kept by reference) */
    public void setMap(final Map<String, Object> map) {
        this.map = map;
    }
}

View File

@ -0,0 +1,130 @@
package eu.dnetlib.dhp.broker.model;
import java.text.ParseException;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import eu.dnetlib.broker.objects.OpenAireEventPayload;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
/**
 * Static factory that builds broker {@link Event}s from a pair of results and an {@link UpdateInfo}.
 */
public class EventFactory {

    private final static String PRODUCER_ID = "OpenAIRE";

    /** Number of days added to the creation time to obtain the expiry time of an event. */
    private static final int TTH_DAYS = 365;

    /** Patterns accepted when parsing the date of acceptance. */
    private final static String[] DATE_PATTERNS = {
        "yyyy-MM-dd"
    };

    /**
     * Builds the event describing an update of {@code target} discovered thanks to {@code source}.
     *
     * @param source the result the update was derived from (used for the provenance entries of the map)
     * @param target the result the update applies to; it must expose at least one original id,
     *        because the first one is used to compute the event id
     * @param updateInfo topic, trust and highlight of the update
     * @return a new event, created now, not flagged as instant message
     */
    public static Event newBrokerEvent(final Result source, final Result target, final UpdateInfo<?> updateInfo) {

        final long now = new Date().getTime();

        final Event res = new Event();

        final Map<String, Object> map = createMapFromResult(target, source, updateInfo);

        final String payload = createPayload(target, updateInfo);

        final String eventId =
            calculateEventId(updateInfo.getTopic(), target.getOriginalId().get(0), updateInfo.getHighlightValueAsString());

        res.setEventId(eventId);
        res.setProducerId(PRODUCER_ID);
        res.setPayload(payload);
        res.setMap(map);
        res.setTopic(updateInfo.getTopic());
        res.setCreationDate(now);
        res.setExpiryDate(calculateExpiryDate(now));
        res.setInstantMessage(false);
        return res;
    }

    /**
     * Serializes the payload of the event to JSON.
     * At the moment only the highlight section is compiled; {@code result} is not used yet.
     */
    private static String createPayload(final Result result, final UpdateInfo<?> updateInfo) {
        final OpenAireEventPayload payload = new OpenAireEventPayload();
        // TODO

        updateInfo.compileHighlight(payload);

        return payload.toJSON();
    }

    /**
     * Collects the properties of the event map.
     * Entries whose source value is missing (null or empty) are simply not added.
     *
     * @param oaf the target result, source of the {@code target_*} entries
     * @param source the result used for the {@code provenance_*} entries
     * @param updateInfo provides the {@code trust} entry
     */
    private static Map<String, Object> createMapFromResult(final Result oaf, final Result source, final UpdateInfo<?> updateInfo) {
        final Map<String, Object> map = new HashMap<>();

        // The datasource is reported only when it is unambiguous (exactly one collectedfrom)
        final List<KeyValue> collectedFrom = oaf.getCollectedfrom();
        if (collectedFrom != null && collectedFrom.size() == 1) {
            map.put("target_datasource_id", collectedFrom.get(0).getKey());
            map.put("target_datasource_name", collectedFrom.get(0).getValue());
        }

        final List<String> ids = oaf.getOriginalId();
        if (hasElements(ids)) {
            map.put("target_publication_id", ids.get(0));
        }

        final List<StructuredProperty> titles = oaf.getTitle();
        if (hasElements(titles)) {
            // Store the title text, consistently with the subject list below
            map.put("target_publication_title", titles.get(0).getValue());
        }

        // The date of acceptance may be absent: treat it like an unparsable date
        final long date = oaf.getDateofacceptance() != null
            ? parseDateTolong(oaf.getDateofacceptance().getValue())
            : -1;
        if (date > 0) {
            map.put("target_dateofacceptance", date);
        }

        final List<StructuredProperty> subjects = oaf.getSubject();
        if (hasElements(subjects)) {
            map.put("target_publication_subject_list", subjects.stream().map(StructuredProperty::getValue).collect(Collectors.toList()));
        }

        final List<Author> authors = oaf.getAuthor();
        if (hasElements(authors)) {
            map.put("target_publication_author_list", authors.stream().map(Author::getFullname).collect(Collectors.toList()));
        }

        // PROVENANCE INFO
        map.put("trust", updateInfo.getTrust());
        final List<KeyValue> sourceCollectedFrom = source.getCollectedfrom();
        if (sourceCollectedFrom != null && sourceCollectedFrom.size() == 1) {
            map.put("provenance_datasource_id", sourceCollectedFrom.get(0).getKey());
            map.put("provenance_datasource_name", sourceCollectedFrom.get(0).getValue());
        }
        map.put("provenance_publication_id_list", source.getOriginalId());

        return map;
    }

    /** Tells whether the list is non-null and contains at least one element. */
    private static boolean hasElements(final List<?> list) {
        return list != null && !list.isEmpty();
    }

    /**
     * Builds the event id as {@code event-<topic>-<publication>-<value>}, where each part is a
     * prefix (6, 8 and 8 characters) of the MD5 hex digest of the corresponding argument.
     */
    private static String calculateEventId(final String topic, final String publicationId, final String value) {
        return "event-"
            + DigestUtils.md5Hex(topic).substring(0, 6) + "-"
            + DigestUtils.md5Hex(publicationId).substring(0, 8) + "-"
            + DigestUtils.md5Hex(value).substring(0, 8);
    }

    /**
     * Returns {@code now} plus {@link #TTH_DAYS} days, in milliseconds.
     */
    private static long calculateExpiryDate(final long now) {
        // The product must be computed as long: 365 days in milliseconds (31,536,000,000)
        // does not fit in an int and would silently wrap around.
        return now + TTH_DAYS * 24L * 60 * 60 * 1000;
    }

    /**
     * Parses a date using {@link #DATE_PATTERNS}.
     *
     * @return the parsed time in milliseconds, or -1 when the string is blank or cannot be parsed
     */
    private static long parseDateTolong(final String date) {
        if (StringUtils.isBlank(date)) { return -1; }
        try {
            return DateUtils.parseDate(date, DATE_PATTERNS).getTime();
        } catch (final ParseException e) {
            // An unparsable date is reported as "no date" rather than failing the whole event
            return -1;
        }
    }
}

View File

@ -0,0 +1,106 @@
package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.model.EventFactory;
import eu.dnetlib.dhp.broker.oa.util.EnrichMissingAbstract;
import eu.dnetlib.dhp.broker.oa.util.EnrichMissingAuthorOrcid;
import eu.dnetlib.dhp.broker.oa.util.EnrichMissingOpenAccess;
import eu.dnetlib.dhp.broker.oa.util.EnrichMissingPid;
import eu.dnetlib.dhp.broker.oa.util.EnrichMissingProject;
import eu.dnetlib.dhp.broker.oa.util.EnrichMissingPublicationDate;
import eu.dnetlib.dhp.broker.oa.util.EnrichMissingSubject;
import eu.dnetlib.dhp.broker.oa.util.EnrichMoreOpenAccess;
import eu.dnetlib.dhp.broker.oa.util.EnrichMorePid;
import eu.dnetlib.dhp.broker.oa.util.EnrichMoreSubject;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.Result;
/**
 * Spark entry point that clears the events output path and then runs the event generation,
 * which is still a placeholder in this version.
 */
public class GenerateEventsApplication {

    private static final Logger log = LoggerFactory.getLogger(GenerateEventsApplication.class);

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Parses the command line, then removes the output directory and generates the events
     * inside a Spark session.
     *
     * @param args command line arguments; {@code isSparkSessionManaged} (defaults to true when absent),
     *        {@code graphPath} and {@code eventsPath} are read
     * @throws Exception whatever is raised while reading the parameters or running the job
     */
    public static void main(final String[] args) throws Exception {
        // NOTE(review): the parameter descriptor is named after "merge_claims" - confirm it is the intended one
        final String parametersJson = IOUtils
            .toString(
                GenerateEventsApplication.class
                    .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json"));
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(parametersJson);
        parser.parseArgument(args);

        final String managedFlag = parser.get("isSparkSessionManaged");
        final Boolean isSparkSessionManaged = managedFlag == null ? Boolean.TRUE : Boolean.valueOf(managedFlag);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String graphPath = parser.get("graphPath");
        log.info("graphPath: {}", graphPath);

        final String eventsPath = parser.get("eventsPath");
        log.info("eventsPath: {}", eventsPath);

        final SparkConf conf = new SparkConf();
        runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            removeOutputDir(spark, eventsPath);
            generateEvents(spark, graphPath, eventsPath);
        });
    }

    /** Deletes {@code path} using the Hadoop configuration of the given session. */
    private static void removeOutputDir(final SparkSession spark, final String path) {
        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
    }

    /** Placeholder: the Spark job producing the events is not implemented yet. */
    private static void generateEvents(final SparkSession spark, final String graphPath, final String eventsPath) {
        // TODO
    }

    /**
     * Builds the events for every ordered pair of distinct results: each result acts once as
     * source and once as target against every other one (compared by reference).
     */
    private List<Event> generateEvents(final Result... children) {
        final List<Event> events = new ArrayList<>();

        for (final Result source : children) {
            for (final Result target : children) {
                if (source == target) {
                    // a result is never compared with itself
                    continue;
                }
                for (final UpdateInfo<?> info : findUpdates(source, target)) {
                    events.add(EventFactory.newBrokerEvent(source, target, info));
                }
            }
        }

        return events;
    }

    /** Concatenates, in a fixed order, the updates reported by every enrichment class. */
    private List<UpdateInfo<?>> findUpdates(final Result source, final Result target) {
        final List<UpdateInfo<?>> updates = new ArrayList<>();

        // "missing" enrichments
        updates.addAll(EnrichMissingAbstract.findUpdates(source, target));
        updates.addAll(EnrichMissingAuthorOrcid.findUpdates(source, target));
        updates.addAll(EnrichMissingOpenAccess.findUpdates(source, target));
        updates.addAll(EnrichMissingPid.findUpdates(source, target));
        updates.addAll(EnrichMissingProject.findUpdates(source, target));
        updates.addAll(EnrichMissingPublicationDate.findUpdates(source, target));
        updates.addAll(EnrichMissingSubject.findUpdates(source, target));

        // "more" enrichments
        updates.addAll(EnrichMoreOpenAccess.findUpdates(source, target));
        updates.addAll(EnrichMorePid.findUpdates(source, target));
        updates.addAll(EnrichMoreSubject.findUpdates(source, target));

        return updates;
    }
}

View File

@ -0,0 +1,30 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.broker.objects.OpenAireEventPayload;
import eu.dnetlib.dhp.schema.oaf.Result;
/**
 * Update with a {@code String} highlight value, published under the
 * {@code ENRICH/MISSING/ABSTRACT} topic.
 */
public class EnrichMissingAbstract extends UpdateInfo<String> {

    /**
     * Looks for updates of this kind between two results.
     * The detection is not implemented yet: an empty list is always returned.
     */
    public static List<EnrichMissingAbstract> findUpdates(final Result source, final Result target) {
        // TODO: detection logic still to be written
        final List<EnrichMissingAbstract> updates = Arrays.asList();
        return updates;
    }

    private EnrichMissingAbstract(final String highlightValue, final float trust) {
        super("ENRICH/MISSING/ABSTRACT", highlightValue, trust);
    }

    /** Adds the highlight value to the abstracts of the payload highlight. */
    @Override
    public void compileHighlight(final OpenAireEventPayload payload) {
        final String abstractText = getHighlightValue();
        payload.getHighlight().getAbstracts().add(abstractText);
    }

    /** Returns the highlight value itself. */
    @Override
    public String getHighlightValueAsString() {
        return getHighlightValue();
    }
}

View File

@ -0,0 +1,30 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.broker.objects.OpenAireEventPayload;
import eu.dnetlib.dhp.schema.oaf.Result;
/**
 * Update with a {@code String} highlight value, published under the
 * {@code ENRICH/MISSING/AUTHOR/ORCID} topic.
 */
public class EnrichMissingAuthorOrcid extends UpdateInfo<String> {

    /**
     * Looks for updates of this kind between two results.
     * The detection is not implemented yet: an empty list is always returned.
     */
    public static List<EnrichMissingAuthorOrcid> findUpdates(final Result source, final Result target) {
        // TODO: detection logic still to be written
        final List<EnrichMissingAuthorOrcid> updates = Arrays.asList();
        return updates;
    }

    private EnrichMissingAuthorOrcid(final String highlightValue, final float trust) {
        super("ENRICH/MISSING/AUTHOR/ORCID", highlightValue, trust);
    }

    /** Currently does nothing: the payload is left untouched. */
    @Override
    public void compileHighlight(final OpenAireEventPayload payload) {
        // TODO
    }

    /** Returns the highlight value itself. */
    @Override
    public String getHighlightValueAsString() {
        return getHighlightValue();
    }
}

View File

@ -0,0 +1,31 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.broker.objects.Instance;
import eu.dnetlib.broker.objects.OpenAireEventPayload;
import eu.dnetlib.dhp.schema.oaf.Result;
/**
 * Update with an {@link Instance} highlight value, published under the
 * {@code ENRICH/MISSING/OPENACCESS_VERSION} topic.
 */
public class EnrichMissingOpenAccess extends UpdateInfo<Instance> {

    /**
     * Looks for updates of this kind between two results.
     * The detection is not implemented yet: an empty list is always returned.
     */
    public static List<EnrichMissingOpenAccess> findUpdates(final Result source, final Result target) {
        // TODO: detection logic still to be written
        final List<EnrichMissingOpenAccess> updates = Arrays.asList();
        return updates;
    }

    private EnrichMissingOpenAccess(final Instance highlightValue, final float trust) {
        super("ENRICH/MISSING/OPENACCESS_VERSION", highlightValue, trust);
    }

    /** Adds the highlighted instance to the instances of the payload highlight. */
    @Override
    public void compileHighlight(final OpenAireEventPayload payload) {
        final Instance instance = getHighlightValue();
        payload.getHighlight().getInstances().add(instance);
    }

    /** Returns the URL of the highlighted instance. */
    @Override
    public String getHighlightValueAsString() {
        final Instance instance = getHighlightValue();
        return instance.getUrl();
    }
}

View File

@ -0,0 +1,31 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.broker.objects.OpenAireEventPayload;
import eu.dnetlib.broker.objects.Pid;
import eu.dnetlib.dhp.schema.oaf.Result;
/**
 * Update with a {@link Pid} highlight value, published under the
 * {@code ENRICH/MISSING/PID} topic.
 */
public class EnrichMissingPid extends UpdateInfo<Pid> {

    /**
     * Looks for updates of this kind between two results.
     * The detection is not implemented yet: an empty list is always returned.
     */
    public static List<EnrichMissingPid> findUpdates(final Result source, final Result target) {
        // TODO: detection logic still to be written
        final List<EnrichMissingPid> updates = Arrays.asList();
        return updates;
    }

    private EnrichMissingPid(final Pid highlightValue, final float trust) {
        super("ENRICH/MISSING/PID", highlightValue, trust);
    }

    /** Adds the highlighted pid to the pids of the payload highlight. */
    @Override
    public void compileHighlight(final OpenAireEventPayload payload) {
        final Pid pid = getHighlightValue();
        payload.getHighlight().getPids().add(pid);
    }

    /** Returns the pid rendered as {@code type::value}. */
    @Override
    public String getHighlightValueAsString() {
        final Pid pid = getHighlightValue();
        return pid.getType() + "::" + pid.getValue();
    }
}

View File

@ -0,0 +1,31 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.broker.objects.OpenAireEventPayload;
import eu.dnetlib.broker.objects.Project;
import eu.dnetlib.dhp.schema.oaf.Result;
/**
 * Update with a {@link Project} highlight value, published under the
 * {@code ENRICH/MISSING/PROJECT} topic.
 */
public class EnrichMissingProject extends UpdateInfo<Project> {

    /**
     * Looks for updates of this kind between two results.
     * The detection is not implemented yet: an empty list is always returned.
     */
    public static List<EnrichMissingProject> findUpdates(final Result source, final Result target) {
        // TODO: detection logic still to be written
        return Arrays.asList();
    }

    private EnrichMissingProject(final Project highlightValue, final float trust) {
        super("ENRICH/MISSING/PROJECT", highlightValue, trust);
    }

    /** Adds the highlighted project to the projects of the payload highlight. */
    @Override
    public void compileHighlight(final OpenAireEventPayload payload) {
        payload.getHighlight().getProjects().add(getHighlightValue());
    }

    /**
     * Returns the project rendered as {@code funder::fundingProgram::code}.
     *
     * <p>Every component is separated by {@code ::}; without a separator between funding
     * program and code, different (program, code) pairs could yield the same string.
     */
    @Override
    public String getHighlightValueAsString() {
        final Project project = getHighlightValue();
        return project.getFunder() + "::" + project.getFundingProgram() + "::" + project.getCode();
    }
}

View File

@ -0,0 +1,30 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.broker.objects.OpenAireEventPayload;
import eu.dnetlib.dhp.schema.oaf.Result;
/**
 * Update with a {@code String} highlight value, published under the
 * {@code ENRICH/MISSING/PUBLICATION_DATE} topic.
 */
public class EnrichMissingPublicationDate extends UpdateInfo<String> {

    /**
     * Looks for updates of this kind between two results.
     * The detection is not implemented yet: an empty list is always returned.
     */
    public static List<EnrichMissingPublicationDate> findUpdates(final Result source, final Result target) {
        // TODO: detection logic still to be written
        final List<EnrichMissingPublicationDate> updates = Arrays.asList();
        return updates;
    }

    private EnrichMissingPublicationDate(final String highlightValue, final float trust) {
        super("ENRICH/MISSING/PUBLICATION_DATE", highlightValue, trust);
    }

    /** Sets the highlight value as publication date of the payload highlight. */
    @Override
    public void compileHighlight(final OpenAireEventPayload payload) {
        final String publicationDate = getHighlightValue();
        payload.getHighlight().setPublicationdate(publicationDate);
    }

    /** Returns the highlight value itself. */
    @Override
    public String getHighlightValueAsString() {
        return getHighlightValue();
    }
}

View File

@ -0,0 +1,35 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.broker.objects.OpenAireEventPayload;
import eu.dnetlib.dhp.schema.oaf.Result;
/**
 * Update with a {@code String} highlight value, published under a topic of the form
 * {@code ENRICH/MISSING/SUBJECT/<subjectClassification>}.
 */
public class EnrichMissingSubject extends UpdateInfo<String> {

    /**
     * Looks for updates of this kind between two results.
     * The detection is not implemented yet: an empty list is always returned.
     */
    public static List<EnrichMissingSubject> findUpdates(final Result source, final Result target) {
        // Names listed by the author for the future implementation
        // (presumably the subject classifications to handle - confirm):
        // MESHEUROPMC
        // ARXIV
        // JEL
        // DDC
        // ACM
        final List<EnrichMissingSubject> updates = Arrays.asList();
        return updates;
    }

    private EnrichMissingSubject(final String subjectClassification, final String highlightValue, final float trust) {
        super("ENRICH/MISSING/SUBJECT/" + subjectClassification, highlightValue, trust);
    }

    /** Adds the highlight value to the subjects of the payload highlight. */
    @Override
    public void compileHighlight(final OpenAireEventPayload payload) {
        final String subject = getHighlightValue();
        payload.getHighlight().getSubjects().add(subject);
    }

    /** Returns the highlight value itself. */
    @Override
    public String getHighlightValueAsString() {
        return getHighlightValue();
    }
}

View File

@ -0,0 +1,31 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.broker.objects.Instance;
import eu.dnetlib.broker.objects.OpenAireEventPayload;
import eu.dnetlib.dhp.schema.oaf.Result;
/**
 * Update with an {@link Instance} highlight value, published under the
 * {@code ENRICH/MORE/OPENACCESS_VERSION} topic.
 */
public class EnrichMoreOpenAccess extends UpdateInfo<Instance> {

    /**
     * Looks for updates of this kind between two results.
     * The detection is not implemented yet: an empty list is always returned.
     */
    public static List<EnrichMoreOpenAccess> findUpdates(final Result source, final Result target) {
        // TODO: detection logic still to be written
        final List<EnrichMoreOpenAccess> updates = Arrays.asList();
        return updates;
    }

    private EnrichMoreOpenAccess(final Instance highlightValue, final float trust) {
        super("ENRICH/MORE/OPENACCESS_VERSION", highlightValue, trust);
    }

    /** Adds the highlighted instance to the instances of the payload highlight. */
    @Override
    public void compileHighlight(final OpenAireEventPayload payload) {
        final Instance instance = getHighlightValue();
        payload.getHighlight().getInstances().add(instance);
    }

    /** Returns the URL of the highlighted instance. */
    @Override
    public String getHighlightValueAsString() {
        final Instance instance = getHighlightValue();
        return instance.getUrl();
    }
}

View File

@ -0,0 +1,31 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.broker.objects.OpenAireEventPayload;
import eu.dnetlib.broker.objects.Pid;
import eu.dnetlib.dhp.schema.oaf.Result;
/**
 * Update with a {@link Pid} highlight value, published under the
 * {@code ENRICH/MORE/PID} topic.
 */
public class EnrichMorePid extends UpdateInfo<Pid> {

    /**
     * Looks for updates of this kind between two results.
     * The detection is not implemented yet: an empty list is always returned.
     */
    public static List<EnrichMorePid> findUpdates(final Result source, final Result target) {
        // TODO: detection logic still to be written
        final List<EnrichMorePid> updates = Arrays.asList();
        return updates;
    }

    private EnrichMorePid(final Pid highlightValue, final float trust) {
        super("ENRICH/MORE/PID", highlightValue, trust);
    }

    /** Adds the highlighted pid to the pids of the payload highlight. */
    @Override
    public void compileHighlight(final OpenAireEventPayload payload) {
        final Pid pid = getHighlightValue();
        payload.getHighlight().getPids().add(pid);
    }

    /** Returns the pid rendered as {@code type::value}. */
    @Override
    public String getHighlightValueAsString() {
        final Pid pid = getHighlightValue();
        return pid.getType() + "::" + pid.getValue();
    }
}

View File

@ -0,0 +1,35 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.broker.objects.OpenAireEventPayload;
import eu.dnetlib.dhp.schema.oaf.Result;
/**
 * Update with a {@code String} highlight value, published under a topic of the form
 * {@code ENRICH/MORE/SUBJECT/<subjectClassification>}.
 */
public class EnrichMoreSubject extends UpdateInfo<String> {

    /**
     * Looks for updates of this kind between two results.
     * The detection is not implemented yet: an empty list is always returned.
     */
    public static List<EnrichMoreSubject> findUpdates(final Result source, final Result target) {
        // Names listed by the author for the future implementation
        // (presumably the subject classifications to handle - confirm):
        // MESHEUROPMC
        // ARXIV
        // JEL
        // DDC
        // ACM
        final List<EnrichMoreSubject> updates = Arrays.asList();
        return updates;
    }

    private EnrichMoreSubject(final String subjectClassification, final String highlightValue, final float trust) {
        super("ENRICH/MORE/SUBJECT/" + subjectClassification, highlightValue, trust);
    }

    /** Adds the highlight value to the subjects of the payload highlight. */
    @Override
    public void compileHighlight(final OpenAireEventPayload payload) {
        final String subject = getHighlightValue();
        payload.getHighlight().getSubjects().add(subject);
    }

    /** Returns the highlight value itself. */
    @Override
    public String getHighlightValueAsString() {
        return getHighlightValue();
    }
}

View File

@ -0,0 +1,35 @@
package eu.dnetlib.dhp.broker.oa.util;
import eu.dnetlib.broker.objects.OpenAireEventPayload;
/**
 * Immutable description of a single update: the topic it belongs to, the value to
 * highlight and a trust score.
 *
 * @param <T> type of the highlighted value
 */
public abstract class UpdateInfo<T> {

    private final String topic;

    private final T highlightValue;

    private final float trust;

    /**
     * Stores the three values as they are given.
     *
     * @param topic topic of the update
     * @param highlightValue value to highlight
     * @param trust trust score of the update
     */
    protected UpdateInfo(final String topic, final T highlightValue, final float trust) {
        this.topic = topic;
        this.highlightValue = highlightValue;
        this.trust = trust;
    }

    /** @return the topic of the update */
    public String getTopic() {
        return this.topic;
    }

    /** @return the value to highlight */
    public T getHighlightValue() {
        return this.highlightValue;
    }

    /** @return the trust score */
    public float getTrust() {
        return this.trust;
    }

    /**
     * Writes this update into the given payload.
     *
     * @param payload the payload to modify
     */
    public abstract void compileHighlight(OpenAireEventPayload payload);

    /** @return a string representation of the highlighted value */
    public abstract String getHighlightValueAsString();
}