enrichment steps #38
|
@ -57,7 +57,7 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>eu.dnetlib</groupId>
|
<groupId>eu.dnetlib</groupId>
|
||||||
<artifactId>dnet-openaire-broker-common</artifactId>
|
<artifactId>dnet-openaire-broker-common</artifactId>
|
||||||
<version>[2.0.0,3.0.0)</version>
|
<version>[2.0.1,3.0.0)</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
|
@ -12,7 +12,6 @@ import org.apache.commons.codec.digest.DigestUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.lang3.time.DateUtils;
|
import org.apache.commons.lang3.time.DateUtils;
|
||||||
|
|
||||||
import eu.dnetlib.broker.objects.OpenAireEventPayload;
|
|
||||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Author;
|
import eu.dnetlib.dhp.schema.oaf.Author;
|
||||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||||
|
@ -37,15 +36,12 @@ public class EventFactory {
|
||||||
|
|
||||||
final Map<String, Object> map = createMapFromResult(updateInfo);
|
final Map<String, Object> map = createMapFromResult(updateInfo);
|
||||||
|
|
||||||
final String payload = createPayload(updateInfo);
|
final String eventId =
|
||||||
|
calculateEventId(updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId().get(0), updateInfo.getHighlightValueAsString());
|
||||||
final String eventId = calculateEventId(
|
|
||||||
updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId().get(0),
|
|
||||||
updateInfo.getHighlightValueAsString());
|
|
||||||
|
|
||||||
res.setEventId(eventId);
|
res.setEventId(eventId);
|
||||||
res.setProducerId(PRODUCER_ID);
|
res.setProducerId(PRODUCER_ID);
|
||||||
res.setPayload(payload);
|
res.setPayload(updateInfo.asBrokerPayload().toJSON());
|
||||||
res.setMap(map);
|
res.setMap(map);
|
||||||
res.setTopic(updateInfo.getTopicPath());
|
res.setTopic(updateInfo.getTopicPath());
|
||||||
res.setCreationDate(now);
|
res.setCreationDate(now);
|
||||||
|
@ -54,15 +50,6 @@ public class EventFactory {
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String createPayload(final UpdateInfo<?> updateInfo) {
|
|
||||||
final OpenAireEventPayload payload = new OpenAireEventPayload();
|
|
||||||
// TODO
|
|
||||||
|
|
||||||
updateInfo.compileHighlight(payload);
|
|
||||||
|
|
||||||
return payload.toJSON();
|
|
||||||
}
|
|
||||||
|
|
||||||
private static Map<String, Object> createMapFromResult(final UpdateInfo<?> updateInfo) {
|
private static Map<String, Object> createMapFromResult(final UpdateInfo<?> updateInfo) {
|
||||||
final Map<String, Object> map = new HashMap<>();
|
final Map<String, Object> map = new HashMap<>();
|
||||||
|
|
||||||
|
@ -93,17 +80,13 @@ public class EventFactory {
|
||||||
final List<StructuredProperty> subjects = target.getSubject();
|
final List<StructuredProperty> subjects = target.getSubject();
|
||||||
if (subjects.size() > 0) {
|
if (subjects.size() > 0) {
|
||||||
map
|
map
|
||||||
.put(
|
.put("target_publication_subject_list", subjects.stream().map(StructuredProperty::getValue).collect(Collectors.toList()));
|
||||||
"target_publication_subject_list",
|
|
||||||
subjects.stream().map(StructuredProperty::getValue).collect(Collectors.toList()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
final List<Author> authors = target.getAuthor();
|
final List<Author> authors = target.getAuthor();
|
||||||
if (authors.size() > 0) {
|
if (authors.size() > 0) {
|
||||||
map
|
map
|
||||||
.put(
|
.put("target_publication_author_list", authors.stream().map(Author::getFullname).collect(Collectors.toList()));
|
||||||
"target_publication_author_list",
|
|
||||||
authors.stream().map(Author::getFullname).collect(Collectors.toList()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// PROVENANCE INFO
|
// PROVENANCE INFO
|
||||||
|
@ -130,9 +113,7 @@ public class EventFactory {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static long parseDateTolong(final String date) {
|
private static long parseDateTolong(final String date) {
|
||||||
if (StringUtils.isBlank(date)) {
|
if (StringUtils.isBlank(date)) { return -1; }
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
return DateUtils.parseDate(date, DATE_PATTERNS).getTime();
|
return DateUtils.parseDate(date, DATE_PATTERNS).getTime();
|
||||||
} catch (final ParseException e) {
|
} catch (final ParseException e) {
|
||||||
|
|
|
@ -43,20 +43,21 @@ import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublic
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedBy;
|
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedBy;
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedTo;
|
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedTo;
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationReferences;
|
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationReferences;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMissingSoftware;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMoreSoftware;
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAbstract;
|
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAbstract;
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAuthorOrcid;
|
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAuthorOrcid;
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingOpenAccess;
|
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingOpenAccess;
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPid;
|
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPid;
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate;
|
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate;
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingSoftware;
|
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingSubject;
|
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingSubject;
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess;
|
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess;
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid;
|
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid;
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSoftware;
|
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject;
|
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
|
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
@ -177,14 +178,6 @@ public class GenerateEventsApplication {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <SRC extends Result, TRG extends Result> JavaRDD<Event> generateRelationEvents(final SparkSession spark,
|
|
||||||
final String graphPath,
|
|
||||||
final Class<SRC> sourceClass,
|
|
||||||
final Class<TRG> targetClass) {
|
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<Event> generateSimpleEvents(final Collection<Result> children) {
|
private List<Event> generateSimpleEvents(final Collection<Result> children) {
|
||||||
final List<UpdateInfo<?>> list = new ArrayList<>();
|
final List<UpdateInfo<?>> list = new ArrayList<>();
|
||||||
|
|
||||||
|
@ -203,6 +196,35 @@ public class GenerateEventsApplication {
|
||||||
return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
|
return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static <SRC extends Result, TRG extends OafEntity> JavaRDD<Event> generateRelationEvents(final SparkSession spark,
|
||||||
|
final String graphPath,
|
||||||
|
final Class<SRC> sourceClass,
|
||||||
|
final Class<TRG> targetClass) {
|
||||||
|
|
||||||
|
final Dataset<SRC> sources = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
|
||||||
|
.filter(r -> r.getDataInfo().getDeletedbyinference());
|
||||||
|
|
||||||
|
final Dataset<TRG> targets = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), targetClass);
|
||||||
|
|
||||||
|
final Dataset<Relation> mergedRels = readPath(spark, graphPath + "/relation", Relation.class)
|
||||||
|
.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
|
||||||
|
|
||||||
|
final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
|
||||||
|
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
|
||||||
|
|
||||||
|
if (targetClass == Project.class) {
|
||||||
|
// TODO join using: generateProjectsEvents
|
||||||
|
} else if (targetClass == Software.class) {
|
||||||
|
// TODO join using: generateSoftwareEvents
|
||||||
|
} else if (targetClass == Publication.class) {
|
||||||
|
// TODO join using: generatePublicationRelatedEvents
|
||||||
|
} else if (targetClass == eu.dnetlib.dhp.schema.oaf.Dataset.class) {
|
||||||
|
// TODO join using: generateDatasetRelatedEvents
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
private List<Event> generateProjectsEvents(final Collection<Pair<Result, List<Project>>> childrenWithProjects) {
|
private List<Event> generateProjectsEvents(final Collection<Pair<Result, List<Project>>> childrenWithProjects) {
|
||||||
final List<UpdateInfo<?>> list = new ArrayList<>();
|
final List<UpdateInfo<?>> list = new ArrayList<>();
|
||||||
|
|
||||||
|
|
|
@ -3,11 +3,13 @@ package eu.dnetlib.dhp.broker.oa.matchers.relatedProjects;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.commons.lang3.tuple.Pair;
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.broker.model.Topic;
|
import eu.dnetlib.dhp.broker.model.Topic;
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
|
@ -22,8 +24,16 @@ public class EnrichMissingProject
|
||||||
@Override
|
@Override
|
||||||
protected List<UpdateInfo<eu.dnetlib.broker.objects.Project>> findUpdates(final Pair<Result, List<Project>> source,
|
protected List<UpdateInfo<eu.dnetlib.broker.objects.Project>> findUpdates(final Pair<Result, List<Project>> source,
|
||||||
final Pair<Result, List<Project>> target) {
|
final Pair<Result, List<Project>> target) {
|
||||||
// TODO
|
|
||||||
return Arrays.asList();
|
if (source.getRight().isEmpty()) {
|
||||||
|
return Arrays.asList();
|
||||||
|
} else {
|
||||||
|
return target.getRight()
|
||||||
|
.stream()
|
||||||
|
.map(ConversionUtils::oafProjectToBrokerProject)
|
||||||
|
.map(p -> generateUpdateInfo(p, source, target))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -1,13 +1,15 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.broker.oa.matchers.relatedProjects;
|
package eu.dnetlib.dhp.broker.oa.matchers.relatedProjects;
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.commons.lang3.tuple.Pair;
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.broker.model.Topic;
|
import eu.dnetlib.dhp.broker.model.Topic;
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
|
@ -21,8 +23,18 @@ public class EnrichMoreProject extends UpdateMatcher<Pair<Result, List<Project>>
|
||||||
@Override
|
@Override
|
||||||
protected List<UpdateInfo<eu.dnetlib.broker.objects.Project>> findUpdates(final Pair<Result, List<Project>> source,
|
protected List<UpdateInfo<eu.dnetlib.broker.objects.Project>> findUpdates(final Pair<Result, List<Project>> source,
|
||||||
final Pair<Result, List<Project>> target) {
|
final Pair<Result, List<Project>> target) {
|
||||||
// TODO
|
|
||||||
return Arrays.asList();
|
final Set<String> existingProjects = source.getRight()
|
||||||
|
.stream()
|
||||||
|
.map(Project::getId)
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
|
return target.getRight()
|
||||||
|
.stream()
|
||||||
|
.filter(p -> !existingProjects.contains(p.getId()))
|
||||||
|
.map(ConversionUtils::oafProjectToBrokerProject)
|
||||||
|
.map(p -> generateUpdateInfo(p, source, target))
|
||||||
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -39,7 +39,7 @@ public abstract class AbstractEnrichMissingPublication
|
||||||
.getRight()
|
.getRight()
|
||||||
.stream()
|
.stream()
|
||||||
.filter(d -> !existingPublications.contains(d.getId()))
|
.filter(d -> !existingPublications.contains(d.getId()))
|
||||||
.map(ConversionUtils::oafPublicationToBrokerPublication)
|
.map(ConversionUtils::oafResultToBrokerPublication)
|
||||||
.map(i -> generateUpdateInfo(i, source, target))
|
.map(i -> generateUpdateInfo(i, source, target))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
|
|
@ -1,13 +1,15 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.broker.oa.matchers.simple;
|
package eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.commons.lang3.tuple.Pair;
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.broker.model.Topic;
|
import eu.dnetlib.dhp.broker.model.Topic;
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||||
|
@ -23,8 +25,16 @@ public class EnrichMissingSoftware
|
||||||
protected List<UpdateInfo<eu.dnetlib.broker.objects.Software>> findUpdates(
|
protected List<UpdateInfo<eu.dnetlib.broker.objects.Software>> findUpdates(
|
||||||
final Pair<Result, List<Software>> source,
|
final Pair<Result, List<Software>> source,
|
||||||
final Pair<Result, List<Software>> target) {
|
final Pair<Result, List<Software>> target) {
|
||||||
// TODO
|
|
||||||
return Arrays.asList();
|
if (source.getRight().isEmpty()) {
|
||||||
|
return Arrays.asList();
|
||||||
|
} else {
|
||||||
|
return target.getRight()
|
||||||
|
.stream()
|
||||||
|
.map(ConversionUtils::oafSoftwareToBrokerSoftware)
|
||||||
|
.map(p -> generateUpdateInfo(p, source, target))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
|
@ -1,13 +1,15 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.broker.oa.matchers.simple;
|
package eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware;
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.commons.lang3.tuple.Pair;
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.broker.model.Topic;
|
import eu.dnetlib.dhp.broker.model.Topic;
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||||
|
@ -23,8 +25,18 @@ public class EnrichMoreSoftware
|
||||||
protected List<UpdateInfo<eu.dnetlib.broker.objects.Software>> findUpdates(
|
protected List<UpdateInfo<eu.dnetlib.broker.objects.Software>> findUpdates(
|
||||||
final Pair<Result, List<Software>> source,
|
final Pair<Result, List<Software>> source,
|
||||||
final Pair<Result, List<Software>> target) {
|
final Pair<Result, List<Software>> target) {
|
||||||
// TODO
|
|
||||||
return Arrays.asList();
|
final Set<String> existingSoftwares = source.getRight()
|
||||||
|
.stream()
|
||||||
|
.map(Software::getId)
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
|
return target.getRight()
|
||||||
|
.stream()
|
||||||
|
.filter(p -> !existingSoftwares.contains(p.getId()))
|
||||||
|
.map(ConversionUtils::oafSoftwareToBrokerSoftware)
|
||||||
|
.map(p -> generateUpdateInfo(p, source, target))
|
||||||
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
|
@ -28,16 +28,14 @@ public class EnrichMissingOpenAccess extends UpdateMatcher<Result, Instance> {
|
||||||
.filter(right -> right.equals(BrokerConstants.OPEN_ACCESS))
|
.filter(right -> right.equals(BrokerConstants.OPEN_ACCESS))
|
||||||
.count();
|
.count();
|
||||||
|
|
||||||
if (count > 0) {
|
if (count > 0) { return Arrays.asList(); }
|
||||||
return Arrays.asList();
|
|
||||||
}
|
|
||||||
|
|
||||||
return source
|
return source
|
||||||
.getInstance()
|
.getInstance()
|
||||||
.stream()
|
.stream()
|
||||||
.filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS))
|
.filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS))
|
||||||
.map(ConversionUtils::oafInstanceToBrokerInstances)
|
.map(ConversionUtils::oafInstanceToBrokerInstances)
|
||||||
.flatMap(s -> s)
|
.flatMap(List::stream)
|
||||||
.map(i -> generateUpdateInfo(i, source, target))
|
.map(i -> generateUpdateInfo(i, source, target))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,7 +34,7 @@ public class EnrichMoreOpenAccess extends UpdateMatcher<Result, Instance> {
|
||||||
.stream()
|
.stream()
|
||||||
.filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS))
|
.filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS))
|
||||||
.map(ConversionUtils::oafInstanceToBrokerInstances)
|
.map(ConversionUtils::oafInstanceToBrokerInstances)
|
||||||
.flatMap(s -> s)
|
.flatMap(List::stream)
|
||||||
.filter(i -> !urls.contains(i.getUrl()))
|
.filter(i -> !urls.contains(i.getUrl()))
|
||||||
.map(i -> generateUpdateInfo(i, source, target))
|
.map(i -> generateUpdateInfo(i, source, target))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
|
|
|
@ -1,49 +1,155 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.broker.oa.util;
|
package eu.dnetlib.dhp.broker.oa.util;
|
||||||
|
|
||||||
import java.util.stream.Stream;
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.lang3.tuple.Pair;
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
|
import org.dom4j.Document;
|
||||||
|
import org.dom4j.DocumentException;
|
||||||
|
import org.dom4j.DocumentHelper;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import eu.dnetlib.broker.objects.Instance;
|
|
||||||
import eu.dnetlib.broker.objects.Pid;
|
import eu.dnetlib.broker.objects.Pid;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Author;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.ExternalReference;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Field;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Instance;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Journal;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||||
|
|
||||||
public class ConversionUtils {
|
public class ConversionUtils {
|
||||||
|
|
||||||
public static Stream<Instance> oafInstanceToBrokerInstances(final eu.dnetlib.dhp.schema.oaf.Instance i) {
|
private static final Logger log = LoggerFactory.getLogger(ConversionUtils.class);
|
||||||
|
|
||||||
|
public static List<eu.dnetlib.broker.objects.Instance> oafInstanceToBrokerInstances(final Instance i) {
|
||||||
return i.getUrl().stream().map(url -> {
|
return i.getUrl().stream().map(url -> {
|
||||||
final Instance r = new Instance();
|
return new eu.dnetlib.broker.objects.Instance()
|
||||||
r.setUrl(url);
|
.setUrl(url)
|
||||||
r.setInstancetype(i.getInstancetype().getClassid());
|
.setInstancetype(i.getInstancetype().getClassid())
|
||||||
r.setLicense(BrokerConstants.OPEN_ACCESS);
|
.setLicense(BrokerConstants.OPEN_ACCESS)
|
||||||
r.setHostedby(i.getHostedby().getValue());
|
.setHostedby(i.getHostedby().getValue());
|
||||||
return r;
|
}).collect(Collectors.toList());
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Pid oafPidToBrokerPid(final StructuredProperty sp) {
|
public static Pid oafPidToBrokerPid(final StructuredProperty sp) {
|
||||||
final Pid pid = new Pid();
|
return sp != null ? new Pid()
|
||||||
pid.setValue(sp.getValue());
|
.setValue(sp.getValue())
|
||||||
pid.setType(sp.getQualifier().getClassid());
|
.setType(sp.getQualifier().getClassid()) : null;
|
||||||
return pid;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final Pair<String, String> oafSubjectToPair(final StructuredProperty sp) {
|
public static final Pair<String, String> oafSubjectToPair(final StructuredProperty sp) {
|
||||||
return Pair.of(sp.getQualifier().getClassid(), sp.getValue());
|
return sp != null ? Pair.of(sp.getQualifier().getClassid(), sp.getValue()) : null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final eu.dnetlib.broker.objects.Dataset oafDatasetToBrokerDataset(final Dataset d) {
|
public static final eu.dnetlib.broker.objects.Dataset oafDatasetToBrokerDataset(final Dataset d) {
|
||||||
final eu.dnetlib.broker.objects.Dataset res = new eu.dnetlib.broker.objects.Dataset();
|
return d != null ? new eu.dnetlib.broker.objects.Dataset()
|
||||||
// TODO
|
.setOriginalId(d.getOriginalId().get(0))
|
||||||
|
.setTitles(structPropList(d.getTitle()))
|
||||||
|
.setPids(d.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList()))
|
||||||
|
.setInstances(d.getInstance().stream().map(ConversionUtils::oafInstanceToBrokerInstances).flatMap(List::stream).collect(Collectors.toList()))
|
||||||
|
.setCollectedFrom(d.getCollectedfrom().stream().map(KeyValue::getValue).collect(Collectors.toList()))
|
||||||
|
: null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final eu.dnetlib.broker.objects.Publication oafResultToBrokerPublication(final Result result) {
|
||||||
|
|
||||||
|
return result != null ? new eu.dnetlib.broker.objects.Publication()
|
||||||
|
.setOriginalId(result.getOriginalId().get(0))
|
||||||
|
.setTitles(structPropList(result.getTitle()))
|
||||||
|
.setAbstracts(fieldList(result.getDescription()))
|
||||||
|
.setLanguage(result.getLanguage().getClassid())
|
||||||
|
.setSubjects(structPropList(result.getSubject()))
|
||||||
|
.setCreators(result.getAuthor().stream().map(Author::getFullname).collect(Collectors.toList()))
|
||||||
|
.setPublicationdate(result.getDateofcollection())
|
||||||
|
.setPublisher(fieldValue(result.getPublisher()))
|
||||||
|
.setEmbargoenddate(fieldValue(result.getEmbargoenddate()))
|
||||||
|
.setContributor(fieldList(result.getContributor()))
|
||||||
|
.setJournal(result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null)
|
||||||
|
.setCollectedFrom(result.getCollectedfrom().stream().map(KeyValue::getValue).collect(Collectors.toList()))
|
||||||
|
.setPids(result.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList()))
|
||||||
|
.setInstances(result.getInstance().stream().map(ConversionUtils::oafInstanceToBrokerInstances).flatMap(List::stream).collect(Collectors.toList()))
|
||||||
|
.setExternalReferences(result.getExternalReference().stream().map(ConversionUtils::oafExtRefToBrokerExtRef).collect(Collectors.toList()))
|
||||||
|
: null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static eu.dnetlib.broker.objects.Journal oafJournalToBrokerJournal(final Journal journal) {
|
||||||
|
return journal != null ? new eu.dnetlib.broker.objects.Journal()
|
||||||
|
.setName(journal.getName())
|
||||||
|
.setIssn(journal.getIssnPrinted())
|
||||||
|
.setEissn(journal.getIssnOnline())
|
||||||
|
.setLissn(journal.getIssnLinking()) : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static eu.dnetlib.broker.objects.ExternalReference oafExtRefToBrokerExtRef(final ExternalReference ref) {
|
||||||
|
return ref != null ? new eu.dnetlib.broker.objects.ExternalReference()
|
||||||
|
.setRefidentifier(ref.getRefidentifier())
|
||||||
|
.setSitename(ref.getSitename())
|
||||||
|
.setType(ref.getQualifier().getClassid())
|
||||||
|
.setUrl(ref.getUrl())
|
||||||
|
: null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final eu.dnetlib.broker.objects.Project oafProjectToBrokerProject(final Project p) {
|
||||||
|
if (p == null) { return null; }
|
||||||
|
|
||||||
|
final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project()
|
||||||
|
.setTitle(fieldValue(p.getTitle()))
|
||||||
|
.setAcronym(fieldValue(p.getAcronym()))
|
||||||
|
.setCode(fieldValue(p.getCode()));
|
||||||
|
|
||||||
|
final String ftree = fieldValue(p.getFundingtree());
|
||||||
|
if (StringUtils.isNotBlank(ftree)) {
|
||||||
|
try {
|
||||||
|
final Document fdoc = DocumentHelper.parseText(ftree);
|
||||||
|
res.setFunder(fdoc.valueOf("/fundingtree/funder/shortname"));
|
||||||
|
res.setJurisdiction(fdoc.valueOf("/fundingtree/funder/jurisdiction"));
|
||||||
|
res.setFundingProgram(fdoc.valueOf("//funding_level_0/name"));
|
||||||
|
} catch (final DocumentException e) {
|
||||||
|
log.error("Error in record " + p.getId() + ": invalid fundingtree: " + ftree);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final eu.dnetlib.broker.objects.Publication oafPublicationToBrokerPublication(final Publication d) {
|
public static final eu.dnetlib.broker.objects.Software oafSoftwareToBrokerSoftware(final Software sw) {
|
||||||
final eu.dnetlib.broker.objects.Publication res = new eu.dnetlib.broker.objects.Publication();
|
return sw != null ? new eu.dnetlib.broker.objects.Software()
|
||||||
// TODO
|
.setName(structPropValue(sw.getTitle()))
|
||||||
return res;
|
.setDescription(fieldValue(sw.getDescription()))
|
||||||
|
.setRepository(fieldValue(sw.getCodeRepositoryUrl()))
|
||||||
|
.setLandingPage(fieldValue(sw.getDocumentationUrl()))
|
||||||
|
: null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String fieldValue(final Field<String> f) {
|
||||||
|
return f != null ? f.getValue() : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String fieldValue(final List<Field<String>> fl) {
|
||||||
|
return fl != null ? fl.stream().map(Field::getValue).filter(StringUtils::isNotBlank).findFirst().orElse(null) : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String structPropValue(final List<StructuredProperty> props) {
|
||||||
|
return props != null ? props.stream().map(StructuredProperty::getValue).filter(StringUtils::isNotBlank).findFirst().orElse(null) : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<String> fieldList(final List<Field<String>> fl) {
|
||||||
|
return fl != null ? fl.stream().map(Field::getValue).filter(StringUtils::isNotBlank).collect(Collectors.toList())
|
||||||
|
: new ArrayList<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<String> structPropList(final List<StructuredProperty> props) {
|
||||||
|
return props != null ? props.stream().map(StructuredProperty::getValue).filter(StringUtils::isNotBlank).collect(Collectors.toList())
|
||||||
|
: new ArrayList<>();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,12 +1,16 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.broker.oa.util;
|
package eu.dnetlib.dhp.broker.oa.util;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
import java.util.function.BiConsumer;
|
import java.util.function.BiConsumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
import eu.dnetlib.broker.objects.OpenAireEventPayload;
|
import eu.dnetlib.broker.objects.OpenAireEventPayload;
|
||||||
|
import eu.dnetlib.broker.objects.Provenance;
|
||||||
import eu.dnetlib.broker.objects.Publication;
|
import eu.dnetlib.broker.objects.Publication;
|
||||||
import eu.dnetlib.dhp.broker.model.Topic;
|
import eu.dnetlib.dhp.broker.model.Topic;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Instance;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
|
|
||||||
public final class UpdateInfo<T> {
|
public final class UpdateInfo<T> {
|
||||||
|
@ -66,12 +70,29 @@ public final class UpdateInfo<T> {
|
||||||
return trust;
|
return trust;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void compileHighlight(final OpenAireEventPayload payload) {
|
|
||||||
compileHighlight.accept(payload.getHighlight(), getHighlightValue());
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getHighlightValueAsString() {
|
public String getHighlightValueAsString() {
|
||||||
return highlightToString.apply(getHighlightValue());
|
return highlightToString.apply(getHighlightValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public OpenAireEventPayload asBrokerPayload() {
|
||||||
|
|
||||||
|
final Publication p = ConversionUtils.oafResultToBrokerPublication(getSource());
|
||||||
|
compileHighlight.accept(p, getHighlightValue());
|
||||||
|
|
||||||
|
final Publication hl = new Publication();
|
||||||
|
compileHighlight.accept(hl, getHighlightValue());
|
||||||
|
|
||||||
|
final String provId = getSource().getOriginalId().stream().findFirst().orElse(null);
|
||||||
|
final String provRepo = getSource().getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null);
|
||||||
|
final String provUrl = getSource().getInstance().stream().map(Instance::getUrl).flatMap(List::stream).findFirst().orElse(null);;
|
||||||
|
|
||||||
|
final Provenance provenance = new Provenance().setId(provId).setRepositoryName(provRepo).setUrl(provUrl);
|
||||||
|
|
||||||
|
return new OpenAireEventPayload()
|
||||||
|
.setPublication(p)
|
||||||
|
.setHighlight(hl)
|
||||||
|
.setTrust(trust)
|
||||||
|
.setProvenance(provenance);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,10 @@
|
||||||
<name>hiveJdbcUrl</name>
|
<name>hiveJdbcUrl</name>
|
||||||
<description>hive server jdbc url</description>
|
<description>hive server jdbc url</description>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hive_timeout</name>
|
||||||
|
<description>the time period, in seconds, after which Hive fails a transaction if a Hive client has not sent a hearbeat. The default value is 300 seconds.</description>
|
||||||
|
</property>
|
||||||
</parameters>
|
</parameters>
|
||||||
|
|
||||||
<global>
|
<global>
|
||||||
|
@ -31,6 +35,10 @@
|
||||||
<name>hive.metastore.uris</name>
|
<name>hive.metastore.uris</name>
|
||||||
<value>${hiveMetastoreUris}</value>
|
<value>${hiveMetastoreUris}</value>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hive.txn.timeout</name>
|
||||||
|
<value>${hive_timeout}</value>
|
||||||
|
</property>
|
||||||
</configuration>
|
</configuration>
|
||||||
</global>
|
</global>
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue