partial refactoring

This commit is contained in:
Michele Artini 2020-06-16 15:53:13 +02:00
parent 8a4f84f8c0
commit 76ea7607f7
3 changed files with 76 additions and 84 deletions

View File

@ -51,8 +51,7 @@ public class GenerateEventsApplication {
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString( .toString(GenerateEventsApplication.class
GenerateEventsApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json"))); .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json")));
parser.parseArgument(args); parser.parseArgument(args);
@ -118,14 +117,11 @@ public class GenerateEventsApplication {
return results return results
.joinWith(mergedRels, results.col("result.id").equalTo(mergedRels.col("source")), "inner") .joinWith(mergedRels, results.col("result.id").equalTo(mergedRels.col("source")), "inner")
.groupByKey( .groupByKey((MapFunction<Tuple2<OpenaireBrokerResult, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
(MapFunction<Tuple2<OpenaireBrokerResult, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
.agg(aggr) .agg(aggr)
.map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class)) .map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class))
.filter(ResultGroup::isValid) .filter(ResultGroup::isValid)
.map( .map((MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig), Encoders.kryo(EventGroup.class))
(MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig),
Encoders.kryo(EventGroup.class))
.flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class)); .flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class));
} }
@ -134,25 +130,22 @@ public class GenerateEventsApplication {
final String graphPath, final String graphPath,
final Class<SRC> sourceClass) { final Class<SRC> sourceClass) {
final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class); final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class);
final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath( final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath(spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class); final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class);
final Dataset<Publication> publications = readPath(spark, graphPath + "/publication", Publication.class); final Dataset<Publication> publications = readPath(spark, graphPath + "/publication", Publication.class);
final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class) final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
.cache();
final Dataset<OpenaireBrokerResult> r0 = readPath( final Dataset<OpenaireBrokerResult> r0 = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class)
spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class)
.filter(r -> r.getDataInfo().getDeletedbyinference()) .filter(r -> r.getDataInfo().getDeletedbyinference())
.map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class)); .map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class));
final Dataset<OpenaireBrokerResult> r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class)); final Dataset<OpenaireBrokerResult> r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class));
final Dataset<OpenaireBrokerResult> r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class)); final Dataset<OpenaireBrokerResult> r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class));
final Dataset<OpenaireBrokerResult> r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class)); final Dataset<OpenaireBrokerResult> r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class));
final Dataset<OpenaireBrokerResult> r4 = join( final Dataset<OpenaireBrokerResult> r4 = join(r3, rels, relatedEntities(publications, rels, RelatedProject.class));;
r3, rels, relatedEntities(publications, rels, RelatedProject.class));
;
return r4; return r4;
} }
@ -162,9 +155,7 @@ public class GenerateEventsApplication {
final Class<RT> clazz) { final Class<RT> clazz) {
return rels return rels
.joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner") .joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner")
.map( .map(t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz), Encoders.kryo(clazz));
t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz),
Encoders.kryo(clazz));
} }
private static <T> Dataset<OpenaireBrokerResult> join(final Dataset<OpenaireBrokerResult> sources, private static <T> Dataset<OpenaireBrokerResult> join(final Dataset<OpenaireBrokerResult> sources,
@ -172,13 +163,11 @@ public class GenerateEventsApplication {
final Dataset<T> typedRels) { final Dataset<T> typedRels) {
final TypedColumn<Tuple2<OpenaireBrokerResult, T>, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator<T>() final TypedColumn<Tuple2<OpenaireBrokerResult, T>, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator<T>()
.toColumn(); .toColumn();;
;
return sources return sources
.joinWith(typedRels, sources.col("result.id").equalTo(rels.col("source")), "left_outer") .joinWith(typedRels, sources.col("result.id").equalTo(rels.col("source")), "left_outer")
.groupByKey( .groupByKey((MapFunction<Tuple2<OpenaireBrokerResult, T>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
(MapFunction<Tuple2<OpenaireBrokerResult, T>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
.agg(aggr) .agg(aggr)
.map(t -> t._2, Encoders.kryo(OpenaireBrokerResult.class)); .map(t -> t._2, Encoders.kryo(OpenaireBrokerResult.class));
} }
@ -197,11 +186,8 @@ public class GenerateEventsApplication {
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl); final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);
final String conf = isLookUpService final String conf = isLookUpService
.getResourceProfileByQuery( .getResourceProfileByQuery(String
String .format("for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", profId));
.format(
"for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
profId));
final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class); final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class);
dedupConfig.getPace().initModel(); dedupConfig.getPace().initModel();

View File

@ -57,8 +57,7 @@ public class ConversionUtils {
.setOriginalId(d.getOriginalId().get(0)) .setOriginalId(d.getOriginalId().get(0))
.setTitle(structPropValue(d.getTitle())) .setTitle(structPropValue(d.getTitle()))
.setPids(d.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList())) .setPids(d.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList()))
.setInstances( .setInstances(d
d
.getInstance() .getInstance()
.stream() .stream()
.map(ConversionUtils::oafInstanceToBrokerInstances) .map(ConversionUtils::oafInstanceToBrokerInstances)
@ -68,6 +67,21 @@ public class ConversionUtils {
: null; : null;
} }
public static eu.dnetlib.broker.objects.Publication oafPublicationToBrokerPublication(final Publication p) {
return p != null ? new eu.dnetlib.broker.objects.Publication()
.setOriginalId(p.getOriginalId().get(0))
.setTitle(structPropValue(p.getTitle()))
.setPids(p.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList()))
.setInstances(p
.getInstance()
.stream()
.map(ConversionUtils::oafInstanceToBrokerInstances)
.flatMap(List::stream)
.collect(Collectors.toList()))
.setCollectedFrom(p.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null))
: null;
}
public static final OpenaireBrokerResult oafResultToBrokerResult(final Result result) { public static final OpenaireBrokerResult oafResultToBrokerResult(final Result result) {
return result != null ? new OpenaireBrokerResult() return result != null ? new OpenaireBrokerResult()
@ -78,26 +92,22 @@ public class ConversionUtils {
.setAbstracts(fieldList(result.getDescription())) .setAbstracts(fieldList(result.getDescription()))
.setLanguage(result.getLanguage().getClassid()) .setLanguage(result.getLanguage().getClassid())
.setSubjects(structPropTypedList(result.getSubject())) .setSubjects(structPropTypedList(result.getSubject()))
.setCreators( .setCreators(result.getAuthor().stream().map(ConversionUtils::oafAuthorToBrokerAuthor).collect(Collectors.toList()))
result.getAuthor().stream().map(ConversionUtils::oafAuthorToBrokerAuthor).collect(Collectors.toList()))
.setPublicationdate(result.getDateofacceptance().getValue()) .setPublicationdate(result.getDateofacceptance().getValue())
.setPublisher(fieldValue(result.getPublisher())) .setPublisher(fieldValue(result.getPublisher()))
.setEmbargoenddate(fieldValue(result.getEmbargoenddate())) .setEmbargoenddate(fieldValue(result.getEmbargoenddate()))
.setContributor(fieldList(result.getContributor())) .setContributor(fieldList(result.getContributor()))
.setJournal( .setJournal(result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null)
result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null)
.setCollectedFromId(result.getCollectedfrom().stream().map(KeyValue::getKey).findFirst().orElse(null)) .setCollectedFromId(result.getCollectedfrom().stream().map(KeyValue::getKey).findFirst().orElse(null))
.setCollectedFromName(result.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null)) .setCollectedFromName(result.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null))
.setPids(result.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList())) .setPids(result.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList()))
.setInstances( .setInstances(result
result
.getInstance() .getInstance()
.stream() .stream()
.map(ConversionUtils::oafInstanceToBrokerInstances) .map(ConversionUtils::oafInstanceToBrokerInstances)
.flatMap(List::stream) .flatMap(List::stream)
.collect(Collectors.toList())) .collect(Collectors.toList()))
.setExternalReferences( .setExternalReferences(result
result
.getExternalReference() .getExternalReference()
.stream() .stream()
.map(ConversionUtils::oafExtRefToBrokerExtRef) .map(ConversionUtils::oafExtRefToBrokerExtRef)
@ -108,8 +118,7 @@ public class ConversionUtils {
private static List<TypedValue> structPropTypedList(final List<StructuredProperty> list) { private static List<TypedValue> structPropTypedList(final List<StructuredProperty> list) {
return list return list
.stream() .stream()
.map( .map(p -> new TypedValue()
p -> new TypedValue()
.setValue(p.getValue()) .setValue(p.getValue())
.setType(p.getQualifier().getClassid())) .setType(p.getQualifier().getClassid()))
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -118,8 +127,7 @@ public class ConversionUtils {
private static eu.dnetlib.broker.objects.Author oafAuthorToBrokerAuthor(final Author author) { private static eu.dnetlib.broker.objects.Author oafAuthorToBrokerAuthor(final Author author) {
return author != null ? new eu.dnetlib.broker.objects.Author() return author != null ? new eu.dnetlib.broker.objects.Author()
.setFullname(author.getFullname()) .setFullname(author.getFullname())
.setOrcid( .setOrcid(author
author
.getPid() .getPid()
.stream() .stream()
.filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid")) .filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid"))
@ -147,9 +155,7 @@ public class ConversionUtils {
} }
public static final eu.dnetlib.broker.objects.Project oafProjectToBrokerProject(final Project p) { public static final eu.dnetlib.broker.objects.Project oafProjectToBrokerProject(final Project p) {
if (p == null) { if (p == null) { return null; }
return null;
}
final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project() final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project()
.setTitle(fieldValue(p.getTitle())) .setTitle(fieldValue(p.getTitle()))

View File

@ -1,10 +1,11 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import eu.dnetlib.broker.objects.Dataset; import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.broker.objects.Project; import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.broker.objects.Publication; import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.broker.objects.Software; import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Software;
public class RelatedEntityFactory { public class RelatedEntityFactory {
@ -13,18 +14,17 @@ public class RelatedEntityFactory {
final String relType, final String relType,
final T target, final T target,
final Class<RT> clazz) { final Class<RT> clazz) {
if (clazz == RelatedProject.class) { if (clazz == RelatedProject.class) {
return (RT) new RelatedProject(sourceId, relType, (Project) target); return (RT) new RelatedProject(sourceId, relType, ConversionUtils.oafProjectToBrokerProject((Project) target));
} } else if (clazz == RelatedSoftware.class) {
if (clazz == RelatedSoftware.class) { return (RT) new RelatedSoftware(sourceId, relType, ConversionUtils.oafSoftwareToBrokerSoftware((Software) target));
return (RT) new RelatedSoftware(sourceId, relType, (Software) target); } else if (clazz == RelatedDataset.class) {
} return (RT) new RelatedDataset(sourceId, relType, ConversionUtils.oafDatasetToBrokerDataset((Dataset) target));
if (clazz == RelatedDataset.class) { } else if (clazz == RelatedPublication.class) {
return (RT) new RelatedDataset(sourceId, relType, (Dataset) target); return (RT) new RelatedPublication(sourceId, relType, ConversionUtils.oafPublicationToBrokerPublication((Publication) target));
} } else {
if (clazz == RelatedPublication.class) {
return (RT) new RelatedPublication(sourceId, relType, (Publication) target);
}
return null; return null;
} }
} }
}