some wf fixing

This commit is contained in:
Michele Artini 2020-06-18 13:15:13 +02:00
parent 9a847b4557
commit e659b02e6b
1 changed files with 31 additions and 15 deletions

View File

@ -50,7 +50,8 @@ public class GenerateEventsApplication {
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString(GenerateEventsApplication.class .toString(
GenerateEventsApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json"))); .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json")));
parser.parseArgument(args); parser.parseArgument(args);
@ -119,11 +120,14 @@ public class GenerateEventsApplication {
return results return results
.joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner") .joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner")
.groupByKey((MapFunction<Tuple2<OpenaireBrokerResult, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING()) .groupByKey(
(MapFunction<Tuple2<OpenaireBrokerResult, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
.agg(aggr) .agg(aggr)
.map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class)) .map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class))
.filter(ResultGroup::isValid) .filter(ResultGroup::isValid)
.map((MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig), Encoders.kryo(EventGroup.class)) .map(
(MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig),
Encoders.kryo(EventGroup.class))
.flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class)); .flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class));
} }
@ -133,7 +137,8 @@ public class GenerateEventsApplication {
final Class<SRC> sourceClass) { final Class<SRC> sourceClass) {
final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class); final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class);
final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath(spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath(
spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class); final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class);
final Dataset<Publication> publications = readPath(spark, graphPath + "/publication", Publication.class); final Dataset<Publication> publications = readPath(spark, graphPath + "/publication", Publication.class);
@ -141,14 +146,17 @@ public class GenerateEventsApplication {
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
.cache(); .cache();
final Dataset<OpenaireBrokerResult> r0 = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class) final Dataset<OpenaireBrokerResult> r0 = readPath(
spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class)
.filter(r -> r.getDataInfo().getDeletedbyinference()) .filter(r -> r.getDataInfo().getDeletedbyinference())
.map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class)); .map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class));
final Dataset<OpenaireBrokerResult> r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class)); final Dataset<OpenaireBrokerResult> r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class));
final Dataset<OpenaireBrokerResult> r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class)); final Dataset<OpenaireBrokerResult> r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class));
final Dataset<OpenaireBrokerResult> r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class)); final Dataset<OpenaireBrokerResult> r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class));
final Dataset<OpenaireBrokerResult> r4 = join(r3, rels, relatedEntities(publications, rels, RelatedProject.class));; final Dataset<OpenaireBrokerResult> r4 = join(
r3, rels, relatedEntities(publications, rels, RelatedProject.class));
;
return r4; return r4;
} }
@ -158,7 +166,9 @@ public class GenerateEventsApplication {
final Class<RT> clazz) { final Class<RT> clazz) {
return rels return rels
.joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner") .joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner")
.map(t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz), Encoders.kryo(clazz)); .map(
t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz),
Encoders.kryo(clazz));
} }
private static <T> Dataset<OpenaireBrokerResult> join(final Dataset<OpenaireBrokerResult> sources, private static <T> Dataset<OpenaireBrokerResult> join(final Dataset<OpenaireBrokerResult> sources,
@ -166,10 +176,13 @@ public class GenerateEventsApplication {
final Dataset<T> typedRels) { final Dataset<T> typedRels) {
final TypedColumn<Tuple2<OpenaireBrokerResult, T>, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator<T>() final TypedColumn<Tuple2<OpenaireBrokerResult, T>, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator<T>()
.toColumn();; .toColumn();
;
return sources.joinWith(typedRels, sources.col("openaireId").equalTo(rels.col("source")), "left_outer") return sources
.groupByKey((MapFunction<Tuple2<OpenaireBrokerResult, T>, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) .joinWith(typedRels, sources.col("openaireId").equalTo(rels.col("source")), "left_outer")
.groupByKey(
(MapFunction<Tuple2<OpenaireBrokerResult, T>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
.agg(aggr) .agg(aggr)
.map(t -> t._2, Encoders.kryo(OpenaireBrokerResult.class)); .map(t -> t._2, Encoders.kryo(OpenaireBrokerResult.class));
@ -190,8 +203,11 @@ public class GenerateEventsApplication {
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl); final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);
final String conf = isLookUpService final String conf = isLookUpService
.getResourceProfileByQuery(String .getResourceProfileByQuery(
.format("for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", profId)); String
.format(
"for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
profId));
final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class); final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class);
dedupConfig.getPace().initModel(); dedupConfig.getPace().initModel();