forked from D-Net/dnet-hadoop
partial refactoring
This commit is contained in:
parent
113c9b1de0
commit
9e2c23e391
|
@ -51,8 +51,9 @@ public class GenerateEventsApplication {
|
||||||
public static void main(final String[] args) throws Exception {
|
public static void main(final String[] args) throws Exception {
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
IOUtils
|
IOUtils
|
||||||
.toString(GenerateEventsApplication.class
|
.toString(
|
||||||
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json")));
|
GenerateEventsApplication.class
|
||||||
|
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json")));
|
||||||
parser.parseArgument(args);
|
parser.parseArgument(args);
|
||||||
|
|
||||||
final Boolean isSparkSessionManaged = Optional
|
final Boolean isSparkSessionManaged = Optional
|
||||||
|
@ -117,11 +118,14 @@ public class GenerateEventsApplication {
|
||||||
|
|
||||||
return results
|
return results
|
||||||
.joinWith(mergedRels, results.col("result.id").equalTo(mergedRels.col("source")), "inner")
|
.joinWith(mergedRels, results.col("result.id").equalTo(mergedRels.col("source")), "inner")
|
||||||
.groupByKey((MapFunction<Tuple2<OpenaireBrokerResult, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
|
.groupByKey(
|
||||||
|
(MapFunction<Tuple2<OpenaireBrokerResult, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
|
||||||
.agg(aggr)
|
.agg(aggr)
|
||||||
.map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class))
|
.map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class))
|
||||||
.filter(ResultGroup::isValid)
|
.filter(ResultGroup::isValid)
|
||||||
.map((MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig), Encoders.kryo(EventGroup.class))
|
.map(
|
||||||
|
(MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig),
|
||||||
|
Encoders.kryo(EventGroup.class))
|
||||||
.flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class));
|
.flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -130,7 +134,8 @@ public class GenerateEventsApplication {
|
||||||
final String graphPath,
|
final String graphPath,
|
||||||
final Class<SRC> sourceClass) {
|
final Class<SRC> sourceClass) {
|
||||||
final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class);
|
final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class);
|
||||||
final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath(spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
|
final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath(
|
||||||
|
spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
|
||||||
final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class);
|
final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class);
|
||||||
final Dataset<Publication> publications = readPath(spark, graphPath + "/publication", Publication.class);
|
final Dataset<Publication> publications = readPath(spark, graphPath + "/publication", Publication.class);
|
||||||
|
|
||||||
|
@ -138,14 +143,17 @@ public class GenerateEventsApplication {
|
||||||
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
|
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
|
||||||
.cache();
|
.cache();
|
||||||
|
|
||||||
final Dataset<OpenaireBrokerResult> r0 = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class)
|
final Dataset<OpenaireBrokerResult> r0 = readPath(
|
||||||
.filter(r -> r.getDataInfo().getDeletedbyinference())
|
spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class)
|
||||||
.map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class));
|
.filter(r -> r.getDataInfo().getDeletedbyinference())
|
||||||
|
.map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class));
|
||||||
|
|
||||||
final Dataset<OpenaireBrokerResult> r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class));
|
final Dataset<OpenaireBrokerResult> r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class));
|
||||||
final Dataset<OpenaireBrokerResult> r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class));
|
final Dataset<OpenaireBrokerResult> r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class));
|
||||||
final Dataset<OpenaireBrokerResult> r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class));
|
final Dataset<OpenaireBrokerResult> r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class));
|
||||||
final Dataset<OpenaireBrokerResult> r4 = join(r3, rels, relatedEntities(publications, rels, RelatedProject.class));;
|
final Dataset<OpenaireBrokerResult> r4 = join(
|
||||||
|
r3, rels, relatedEntities(publications, rels, RelatedProject.class));
|
||||||
|
;
|
||||||
|
|
||||||
return r4;
|
return r4;
|
||||||
}
|
}
|
||||||
|
@ -155,7 +163,9 @@ public class GenerateEventsApplication {
|
||||||
final Class<RT> clazz) {
|
final Class<RT> clazz) {
|
||||||
return rels
|
return rels
|
||||||
.joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner")
|
.joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner")
|
||||||
.map(t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz), Encoders.kryo(clazz));
|
.map(
|
||||||
|
t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz),
|
||||||
|
Encoders.kryo(clazz));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <T> Dataset<OpenaireBrokerResult> join(final Dataset<OpenaireBrokerResult> sources,
|
private static <T> Dataset<OpenaireBrokerResult> join(final Dataset<OpenaireBrokerResult> sources,
|
||||||
|
@ -163,11 +173,13 @@ public class GenerateEventsApplication {
|
||||||
final Dataset<T> typedRels) {
|
final Dataset<T> typedRels) {
|
||||||
|
|
||||||
final TypedColumn<Tuple2<OpenaireBrokerResult, T>, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator<T>()
|
final TypedColumn<Tuple2<OpenaireBrokerResult, T>, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator<T>()
|
||||||
.toColumn();;
|
.toColumn();
|
||||||
|
;
|
||||||
|
|
||||||
return sources
|
return sources
|
||||||
.joinWith(typedRels, sources.col("result.id").equalTo(rels.col("source")), "left_outer")
|
.joinWith(typedRels, sources.col("result.id").equalTo(rels.col("source")), "left_outer")
|
||||||
.groupByKey((MapFunction<Tuple2<OpenaireBrokerResult, T>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
|
.groupByKey(
|
||||||
|
(MapFunction<Tuple2<OpenaireBrokerResult, T>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
|
||||||
.agg(aggr)
|
.agg(aggr)
|
||||||
.map(t -> t._2, Encoders.kryo(OpenaireBrokerResult.class));
|
.map(t -> t._2, Encoders.kryo(OpenaireBrokerResult.class));
|
||||||
}
|
}
|
||||||
|
@ -186,8 +198,11 @@ public class GenerateEventsApplication {
|
||||||
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
||||||
|
|
||||||
final String conf = isLookUpService
|
final String conf = isLookUpService
|
||||||
.getResourceProfileByQuery(String
|
.getResourceProfileByQuery(
|
||||||
.format("for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", profId));
|
String
|
||||||
|
.format(
|
||||||
|
"for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
|
||||||
|
profId));
|
||||||
|
|
||||||
final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class);
|
final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class);
|
||||||
dedupConfig.getPace().initModel();
|
dedupConfig.getPace().initModel();
|
||||||
|
|
|
@ -57,12 +57,13 @@ public class ConversionUtils {
|
||||||
.setOriginalId(d.getOriginalId().get(0))
|
.setOriginalId(d.getOriginalId().get(0))
|
||||||
.setTitle(structPropValue(d.getTitle()))
|
.setTitle(structPropValue(d.getTitle()))
|
||||||
.setPids(d.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList()))
|
.setPids(d.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList()))
|
||||||
.setInstances(d
|
.setInstances(
|
||||||
.getInstance()
|
d
|
||||||
.stream()
|
.getInstance()
|
||||||
.map(ConversionUtils::oafInstanceToBrokerInstances)
|
.stream()
|
||||||
.flatMap(List::stream)
|
.map(ConversionUtils::oafInstanceToBrokerInstances)
|
||||||
.collect(Collectors.toList()))
|
.flatMap(List::stream)
|
||||||
|
.collect(Collectors.toList()))
|
||||||
.setCollectedFrom(d.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null))
|
.setCollectedFrom(d.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null))
|
||||||
: null;
|
: null;
|
||||||
}
|
}
|
||||||
|
@ -72,12 +73,13 @@ public class ConversionUtils {
|
||||||
.setOriginalId(p.getOriginalId().get(0))
|
.setOriginalId(p.getOriginalId().get(0))
|
||||||
.setTitle(structPropValue(p.getTitle()))
|
.setTitle(structPropValue(p.getTitle()))
|
||||||
.setPids(p.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList()))
|
.setPids(p.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList()))
|
||||||
.setInstances(p
|
.setInstances(
|
||||||
.getInstance()
|
p
|
||||||
.stream()
|
.getInstance()
|
||||||
.map(ConversionUtils::oafInstanceToBrokerInstances)
|
.stream()
|
||||||
.flatMap(List::stream)
|
.map(ConversionUtils::oafInstanceToBrokerInstances)
|
||||||
.collect(Collectors.toList()))
|
.flatMap(List::stream)
|
||||||
|
.collect(Collectors.toList()))
|
||||||
.setCollectedFrom(p.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null))
|
.setCollectedFrom(p.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null))
|
||||||
: null;
|
: null;
|
||||||
}
|
}
|
||||||
|
@ -92,48 +94,54 @@ public class ConversionUtils {
|
||||||
.setAbstracts(fieldList(result.getDescription()))
|
.setAbstracts(fieldList(result.getDescription()))
|
||||||
.setLanguage(result.getLanguage().getClassid())
|
.setLanguage(result.getLanguage().getClassid())
|
||||||
.setSubjects(structPropTypedList(result.getSubject()))
|
.setSubjects(structPropTypedList(result.getSubject()))
|
||||||
.setCreators(result.getAuthor().stream().map(ConversionUtils::oafAuthorToBrokerAuthor).collect(Collectors.toList()))
|
.setCreators(
|
||||||
|
result.getAuthor().stream().map(ConversionUtils::oafAuthorToBrokerAuthor).collect(Collectors.toList()))
|
||||||
.setPublicationdate(result.getDateofacceptance().getValue())
|
.setPublicationdate(result.getDateofacceptance().getValue())
|
||||||
.setPublisher(fieldValue(result.getPublisher()))
|
.setPublisher(fieldValue(result.getPublisher()))
|
||||||
.setEmbargoenddate(fieldValue(result.getEmbargoenddate()))
|
.setEmbargoenddate(fieldValue(result.getEmbargoenddate()))
|
||||||
.setContributor(fieldList(result.getContributor()))
|
.setContributor(fieldList(result.getContributor()))
|
||||||
.setJournal(result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null)
|
.setJournal(
|
||||||
|
result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null)
|
||||||
.setCollectedFromId(result.getCollectedfrom().stream().map(KeyValue::getKey).findFirst().orElse(null))
|
.setCollectedFromId(result.getCollectedfrom().stream().map(KeyValue::getKey).findFirst().orElse(null))
|
||||||
.setCollectedFromName(result.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null))
|
.setCollectedFromName(result.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null))
|
||||||
.setPids(result.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList()))
|
.setPids(result.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList()))
|
||||||
.setInstances(result
|
.setInstances(
|
||||||
.getInstance()
|
result
|
||||||
.stream()
|
.getInstance()
|
||||||
.map(ConversionUtils::oafInstanceToBrokerInstances)
|
.stream()
|
||||||
.flatMap(List::stream)
|
.map(ConversionUtils::oafInstanceToBrokerInstances)
|
||||||
.collect(Collectors.toList()))
|
.flatMap(List::stream)
|
||||||
.setExternalReferences(result
|
.collect(Collectors.toList()))
|
||||||
.getExternalReference()
|
.setExternalReferences(
|
||||||
.stream()
|
result
|
||||||
.map(ConversionUtils::oafExtRefToBrokerExtRef)
|
.getExternalReference()
|
||||||
.collect(Collectors.toList()))
|
.stream()
|
||||||
|
.map(ConversionUtils::oafExtRefToBrokerExtRef)
|
||||||
|
.collect(Collectors.toList()))
|
||||||
: null;
|
: null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<TypedValue> structPropTypedList(final List<StructuredProperty> list) {
|
private static List<TypedValue> structPropTypedList(final List<StructuredProperty> list) {
|
||||||
return list
|
return list
|
||||||
.stream()
|
.stream()
|
||||||
.map(p -> new TypedValue()
|
.map(
|
||||||
.setValue(p.getValue())
|
p -> new TypedValue()
|
||||||
.setType(p.getQualifier().getClassid()))
|
.setValue(p.getValue())
|
||||||
|
.setType(p.getQualifier().getClassid()))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static eu.dnetlib.broker.objects.Author oafAuthorToBrokerAuthor(final Author author) {
|
private static eu.dnetlib.broker.objects.Author oafAuthorToBrokerAuthor(final Author author) {
|
||||||
return author != null ? new eu.dnetlib.broker.objects.Author()
|
return author != null ? new eu.dnetlib.broker.objects.Author()
|
||||||
.setFullname(author.getFullname())
|
.setFullname(author.getFullname())
|
||||||
.setOrcid(author
|
.setOrcid(
|
||||||
.getPid()
|
author
|
||||||
.stream()
|
.getPid()
|
||||||
.filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid"))
|
.stream()
|
||||||
.map(pid -> pid.getValue())
|
.filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid"))
|
||||||
.findFirst()
|
.map(pid -> pid.getValue())
|
||||||
.orElse(null))
|
.findFirst()
|
||||||
|
.orElse(null))
|
||||||
: null;
|
: null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -155,7 +163,9 @@ public class ConversionUtils {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final eu.dnetlib.broker.objects.Project oafProjectToBrokerProject(final Project p) {
|
public static final eu.dnetlib.broker.objects.Project oafProjectToBrokerProject(final Project p) {
|
||||||
if (p == null) { return null; }
|
if (p == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project()
|
final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project()
|
||||||
.setTitle(fieldValue(p.getTitle()))
|
.setTitle(fieldValue(p.getTitle()))
|
||||||
|
|
|
@ -16,13 +16,17 @@ public class RelatedEntityFactory {
|
||||||
final Class<RT> clazz) {
|
final Class<RT> clazz) {
|
||||||
|
|
||||||
if (clazz == RelatedProject.class) {
|
if (clazz == RelatedProject.class) {
|
||||||
return (RT) new RelatedProject(sourceId, relType, ConversionUtils.oafProjectToBrokerProject((Project) target));
|
return (RT) new RelatedProject(sourceId, relType,
|
||||||
|
ConversionUtils.oafProjectToBrokerProject((Project) target));
|
||||||
} else if (clazz == RelatedSoftware.class) {
|
} else if (clazz == RelatedSoftware.class) {
|
||||||
return (RT) new RelatedSoftware(sourceId, relType, ConversionUtils.oafSoftwareToBrokerSoftware((Software) target));
|
return (RT) new RelatedSoftware(sourceId, relType,
|
||||||
|
ConversionUtils.oafSoftwareToBrokerSoftware((Software) target));
|
||||||
} else if (clazz == RelatedDataset.class) {
|
} else if (clazz == RelatedDataset.class) {
|
||||||
return (RT) new RelatedDataset(sourceId, relType, ConversionUtils.oafDatasetToBrokerDataset((Dataset) target));
|
return (RT) new RelatedDataset(sourceId, relType,
|
||||||
|
ConversionUtils.oafDatasetToBrokerDataset((Dataset) target));
|
||||||
} else if (clazz == RelatedPublication.class) {
|
} else if (clazz == RelatedPublication.class) {
|
||||||
return (RT) new RelatedPublication(sourceId, relType, ConversionUtils.oafPublicationToBrokerPublication((Publication) target));
|
return (RT) new RelatedPublication(sourceId, relType,
|
||||||
|
ConversionUtils.oafPublicationToBrokerPublication((Publication) target));
|
||||||
} else {
|
} else {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue