forked from antonis.lempesis/dnet-hadoop
partial implemantation of broker events generation
This commit is contained in:
parent
97177d7f7b
commit
7e82996e7c
|
@ -39,9 +39,8 @@ public class EventFactory {
|
|||
|
||||
final String payload = createPayload(updateInfo);
|
||||
|
||||
final String eventId = calculateEventId(
|
||||
updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId().get(0),
|
||||
updateInfo.getHighlightValueAsString());
|
||||
final String eventId =
|
||||
calculateEventId(updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId().get(0), updateInfo.getHighlightValueAsString());
|
||||
|
||||
res.setEventId(eventId);
|
||||
res.setProducerId(PRODUCER_ID);
|
||||
|
@ -56,7 +55,7 @@ public class EventFactory {
|
|||
|
||||
private static String createPayload(final UpdateInfo<?> updateInfo) {
|
||||
final OpenAireEventPayload payload = new OpenAireEventPayload();
|
||||
// TODO
|
||||
// TODO : use ConversionUtils
|
||||
|
||||
updateInfo.compileHighlight(payload);
|
||||
|
||||
|
@ -93,17 +92,13 @@ public class EventFactory {
|
|||
final List<StructuredProperty> subjects = target.getSubject();
|
||||
if (subjects.size() > 0) {
|
||||
map
|
||||
.put(
|
||||
"target_publication_subject_list",
|
||||
subjects.stream().map(StructuredProperty::getValue).collect(Collectors.toList()));
|
||||
.put("target_publication_subject_list", subjects.stream().map(StructuredProperty::getValue).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
final List<Author> authors = target.getAuthor();
|
||||
if (authors.size() > 0) {
|
||||
map
|
||||
.put(
|
||||
"target_publication_author_list",
|
||||
authors.stream().map(Author::getFullname).collect(Collectors.toList()));
|
||||
.put("target_publication_author_list", authors.stream().map(Author::getFullname).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
// PROVENANCE INFO
|
||||
|
@ -130,9 +125,7 @@ public class EventFactory {
|
|||
}
|
||||
|
||||
private static long parseDateTolong(final String date) {
|
||||
if (StringUtils.isBlank(date)) {
|
||||
return -1;
|
||||
}
|
||||
if (StringUtils.isBlank(date)) { return -1; }
|
||||
try {
|
||||
return DateUtils.parseDate(date, DATE_PATTERNS).getTime();
|
||||
} catch (final ParseException e) {
|
||||
|
|
|
@ -43,20 +43,21 @@ import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublic
|
|||
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedBy;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedTo;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationReferences;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMissingSoftware;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMoreSoftware;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAbstract;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAuthorOrcid;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingOpenAccess;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPid;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingSoftware;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingSubject;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSoftware;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject;
|
||||
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
|
@ -177,14 +178,6 @@ public class GenerateEventsApplication {
|
|||
|
||||
}
|
||||
|
||||
private static <SRC extends Result, TRG extends Result> JavaRDD<Event> generateRelationEvents(final SparkSession spark,
|
||||
final String graphPath,
|
||||
final Class<SRC> sourceClass,
|
||||
final Class<TRG> targetClass) {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
private List<Event> generateSimpleEvents(final Collection<Result> children) {
|
||||
final List<UpdateInfo<?>> list = new ArrayList<>();
|
||||
|
||||
|
@ -203,6 +196,35 @@ public class GenerateEventsApplication {
|
|||
return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private static <SRC extends Result, TRG extends OafEntity> JavaRDD<Event> generateRelationEvents(final SparkSession spark,
|
||||
final String graphPath,
|
||||
final Class<SRC> sourceClass,
|
||||
final Class<TRG> targetClass) {
|
||||
|
||||
final Dataset<SRC> sources = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
|
||||
.filter(r -> r.getDataInfo().getDeletedbyinference());
|
||||
|
||||
final Dataset<TRG> targets = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), targetClass);
|
||||
|
||||
final Dataset<Relation> mergedRels = readPath(spark, graphPath + "/relation", Relation.class)
|
||||
.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
|
||||
|
||||
final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
|
||||
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
|
||||
|
||||
if (targetClass == Project.class) {
|
||||
// TODO join using: generateProjectsEvents
|
||||
} else if (targetClass == Software.class) {
|
||||
// TODO join using: generateSoftwareEvents
|
||||
} else if (targetClass == Publication.class) {
|
||||
// TODO join using: generatePublicationRelatedEvents
|
||||
} else if (targetClass == eu.dnetlib.dhp.schema.oaf.Dataset.class) {
|
||||
// TODO join using: generateDatasetRelatedEvents
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private List<Event> generateProjectsEvents(final Collection<Pair<Result, List<Project>>> childrenWithProjects) {
|
||||
final List<UpdateInfo<?>> list = new ArrayList<>();
|
||||
|
||||
|
|
|
@ -3,11 +3,13 @@ package eu.dnetlib.dhp.broker.oa.matchers.relatedProjects;
|
|||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
|
@ -22,8 +24,16 @@ public class EnrichMissingProject
|
|||
@Override
|
||||
protected List<UpdateInfo<eu.dnetlib.broker.objects.Project>> findUpdates(final Pair<Result, List<Project>> source,
|
||||
final Pair<Result, List<Project>> target) {
|
||||
// TODO
|
||||
return Arrays.asList();
|
||||
|
||||
if (source.getRight().isEmpty()) {
|
||||
return Arrays.asList();
|
||||
} else {
|
||||
return target.getRight()
|
||||
.stream()
|
||||
.map(ConversionUtils::oafProjectToBrokerProject)
|
||||
.map(p -> generateUpdateInfo(p, source, target))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,13 +1,15 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa.matchers.relatedProjects;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
|
@ -21,8 +23,18 @@ public class EnrichMoreProject extends UpdateMatcher<Pair<Result, List<Project>>
|
|||
@Override
|
||||
protected List<UpdateInfo<eu.dnetlib.broker.objects.Project>> findUpdates(final Pair<Result, List<Project>> source,
|
||||
final Pair<Result, List<Project>> target) {
|
||||
// TODO
|
||||
return Arrays.asList();
|
||||
|
||||
final Set<String> existingProjects = source.getRight()
|
||||
.stream()
|
||||
.map(Project::getId)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
return target.getRight()
|
||||
.stream()
|
||||
.filter(p -> !existingProjects.contains(p.getId()))
|
||||
.map(ConversionUtils::oafProjectToBrokerProject)
|
||||
.map(p -> generateUpdateInfo(p, source, target))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,13 +1,15 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa.matchers.simple;
|
||||
package eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||
|
@ -23,8 +25,16 @@ public class EnrichMissingSoftware
|
|||
protected List<UpdateInfo<eu.dnetlib.broker.objects.Software>> findUpdates(
|
||||
final Pair<Result, List<Software>> source,
|
||||
final Pair<Result, List<Software>> target) {
|
||||
// TODO
|
||||
return Arrays.asList();
|
||||
|
||||
if (source.getRight().isEmpty()) {
|
||||
return Arrays.asList();
|
||||
} else {
|
||||
return target.getRight()
|
||||
.stream()
|
||||
.map(ConversionUtils::oafSoftwareToBrokerSoftware)
|
||||
.map(p -> generateUpdateInfo(p, source, target))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
|
@ -1,13 +1,15 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa.matchers.simple;
|
||||
package eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||
|
@ -23,8 +25,18 @@ public class EnrichMoreSoftware
|
|||
protected List<UpdateInfo<eu.dnetlib.broker.objects.Software>> findUpdates(
|
||||
final Pair<Result, List<Software>> source,
|
||||
final Pair<Result, List<Software>> target) {
|
||||
// TODO
|
||||
return Arrays.asList();
|
||||
|
||||
final Set<String> existingSoftwares = source.getRight()
|
||||
.stream()
|
||||
.map(Software::getId)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
return target.getRight()
|
||||
.stream()
|
||||
.filter(p -> !existingSoftwares.contains(p.getId()))
|
||||
.map(ConversionUtils::oafSoftwareToBrokerSoftware)
|
||||
.map(p -> generateUpdateInfo(p, source, target))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
|
@ -28,16 +28,14 @@ public class EnrichMissingOpenAccess extends UpdateMatcher<Result, Instance> {
|
|||
.filter(right -> right.equals(BrokerConstants.OPEN_ACCESS))
|
||||
.count();
|
||||
|
||||
if (count > 0) {
|
||||
return Arrays.asList();
|
||||
}
|
||||
if (count > 0) { return Arrays.asList(); }
|
||||
|
||||
return source
|
||||
.getInstance()
|
||||
.stream()
|
||||
.filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS))
|
||||
.map(ConversionUtils::oafInstanceToBrokerInstances)
|
||||
.flatMap(s -> s)
|
||||
.flatMap(List::stream)
|
||||
.map(i -> generateUpdateInfo(i, source, target))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ public class EnrichMoreOpenAccess extends UpdateMatcher<Result, Instance> {
|
|||
.stream()
|
||||
.filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS))
|
||||
.map(ConversionUtils::oafInstanceToBrokerInstances)
|
||||
.flatMap(s -> s)
|
||||
.flatMap(List::stream)
|
||||
.filter(i -> !urls.contains(i.getUrl()))
|
||||
.map(i -> generateUpdateInfo(i, source, target))
|
||||
.collect(Collectors.toList());
|
||||
|
|
|
@ -1,19 +1,25 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa.util;
|
||||
|
||||
import java.util.stream.Stream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import eu.dnetlib.broker.objects.Instance;
|
||||
import eu.dnetlib.broker.objects.Pid;
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
import eu.dnetlib.dhp.schema.oaf.Field;
|
||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
|
||||
public class ConversionUtils {
|
||||
|
||||
public static Stream<Instance> oafInstanceToBrokerInstances(final eu.dnetlib.dhp.schema.oaf.Instance i) {
|
||||
public static List<Instance> oafInstanceToBrokerInstances(final eu.dnetlib.dhp.schema.oaf.Instance i) {
|
||||
return i.getUrl().stream().map(url -> {
|
||||
final Instance r = new Instance();
|
||||
r.setUrl(url);
|
||||
|
@ -21,7 +27,7 @@ public class ConversionUtils {
|
|||
r.setLicense(BrokerConstants.OPEN_ACCESS);
|
||||
r.setHostedby(i.getHostedby().getValue());
|
||||
return r;
|
||||
});
|
||||
}).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public static Pid oafPidToBrokerPid(final StructuredProperty sp) {
|
||||
|
@ -37,13 +43,60 @@ public class ConversionUtils {
|
|||
|
||||
public static final eu.dnetlib.broker.objects.Dataset oafDatasetToBrokerDataset(final Dataset d) {
|
||||
final eu.dnetlib.broker.objects.Dataset res = new eu.dnetlib.broker.objects.Dataset();
|
||||
// TODO
|
||||
res.setOriginalId(d.getOriginalId().get(0));
|
||||
res.setTitles(structPropList(d.getTitle()));
|
||||
res.setPids(d.getPid().stream().map(ConversionUtils::structeredPropertyToPid).collect(Collectors.toList()));
|
||||
res.setInstances(d.getInstance().stream().map(ConversionUtils::oafInstanceToBrokerInstances).flatMap(List::stream).collect(Collectors.toList()));
|
||||
res.setCollectedFrom(d.getCollectedfrom().stream().map(KeyValue::getValue).collect(Collectors.toList()));
|
||||
return res;
|
||||
}
|
||||
|
||||
public static final eu.dnetlib.broker.objects.Publication oafPublicationToBrokerPublication(final Publication d) {
|
||||
final eu.dnetlib.broker.objects.Publication res = new eu.dnetlib.broker.objects.Publication();
|
||||
// TODO
|
||||
// TODO This should be reusable
|
||||
return res;
|
||||
}
|
||||
|
||||
public static final eu.dnetlib.broker.objects.Project oafProjectToBrokerProject(final Project p) {
|
||||
final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project();
|
||||
res.setTitle(fieldValue(p.getTitle()));
|
||||
res.setAcronym(fieldValue(p.getAcronym()));
|
||||
res.setCode(fieldValue(p.getCode()));
|
||||
res.setFunder(null); // TODO
|
||||
res.setFundingProgram(null); // TODO
|
||||
res.setJurisdiction(null); // TODO
|
||||
return res;
|
||||
}
|
||||
|
||||
public static final eu.dnetlib.broker.objects.Software oafSoftwareToBrokerSoftware(final Software sw) {
|
||||
final eu.dnetlib.broker.objects.Software res = new eu.dnetlib.broker.objects.Software();
|
||||
res.setName(structPropValue(sw.getTitle()));
|
||||
res.setDescription(fieldValue(sw.getDescription()));
|
||||
res.setRepository(fieldValue(sw.getCodeRepositoryUrl()));
|
||||
res.setLandingPage(fieldValue(sw.getDocumentationUrl()));
|
||||
return res;
|
||||
}
|
||||
|
||||
private static Pid structeredPropertyToPid(final StructuredProperty sp) {
|
||||
final Pid pid = new Pid();
|
||||
pid.setValue(sp.getValue());
|
||||
pid.setType(sp.getQualifier().getClassid());
|
||||
return pid;
|
||||
}
|
||||
|
||||
private static String fieldValue(final Field<String> f) {
|
||||
return f != null ? f.getValue() : null;
|
||||
}
|
||||
|
||||
private static String fieldValue(final List<Field<String>> fl) {
|
||||
return fl != null && !fl.isEmpty() && fl.get(0) != null ? fl.get(0).getValue() : null;
|
||||
}
|
||||
|
||||
private static String structPropValue(final List<StructuredProperty> props) {
|
||||
return props != null && !props.isEmpty() && props.get(0) != null ? props.get(0).getValue() : null;
|
||||
}
|
||||
|
||||
private static List<String> structPropList(final List<StructuredProperty> props) {
|
||||
return props != null ? props.stream().map(StructuredProperty::getValue).collect(Collectors.toList()) : new ArrayList<>();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue