diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml
index eddc042c6b..cd3257991f 100644
--- a/dhp-workflows/dhp-broker-events/pom.xml
+++ b/dhp-workflows/dhp-broker-events/pom.xml
@@ -24,11 +24,7 @@
 		<dependency>
 			<groupId>org.apache.spark</groupId>
 			<artifactId>spark-sql_2.11</artifactId>
 		</dependency>
-		<dependency>
-			<groupId>org.apache.spark</groupId>
-			<artifactId>spark-hive_2.11</artifactId>
-			<scope>test</scope>
-		</dependency>
+
 		<dependency>
 			<groupId>eu.dnetlib.dhp</groupId>
@@ -45,10 +41,6 @@
 			<artifactId>dnet-pace-core</artifactId>
 		</dependency>
-		<dependency>
-			<groupId>com.jayway.jsonpath</groupId>
-			<artifactId>json-path</artifactId>
-		</dependency>
 		<dependency>
 			<groupId>dom4j</groupId>
 			<artifactId>dom4j</artifactId>
 		</dependency>
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
index c1f12f43c5..980b01fe18 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
@@ -31,7 +31,6 @@ import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAg
 import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory;
 import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
 import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
 import eu.dnetlib.dhp.schema.oaf.Project;
 import eu.dnetlib.dhp.schema.oaf.Publication;
 import eu.dnetlib.dhp.schema.oaf.Relation;
@@ -51,9 +50,8 @@ public class GenerateEventsApplication {
 	public static void main(final String[] args) throws Exception {
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
 			IOUtils
-				.toString(
-					GenerateEventsApplication.class
-						.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json")));
+				.toString(GenerateEventsApplication.class
+					.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json")));
 		parser.parseArgument(args);
 
 		final Boolean isSparkSessionManaged = Optional
@@ -78,18 +76,21 @@ public class GenerateEventsApplication {
 		conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
 		conf.registerKryoClasses(BrokerConstants.getModelClasses());
 
-		final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
+		// TODO UNCOMMENT
+		// final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
+		final DedupConfig dedupConfig = null;
 
 		runWithSparkSession(conf, isSparkSessionManaged, spark -> {
 
 			removeOutputDir(spark, eventsPath);
 
+			// TODO UNCOMMENT
 			spark
 				.emptyDataset(Encoders.kryo(Event.class))
 				.union(generateEvents(spark, graphPath, Publication.class, dedupConfig))
-				.union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig))
-				.union(generateEvents(spark, graphPath, Software.class, dedupConfig))
-				.union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig))
+				// .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig))
+				// .union(generateEvents(spark, graphPath, Software.class, dedupConfig))
+				// .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig))
 				.write()
 				.mode(SaveMode.Overwrite)
 				.option("compression", "gzip")
@@ -117,15 +118,12 @@ public class GenerateEventsApplication {
 			.toColumn();
 
 		return results
-			.joinWith(mergedRels, results.col("result.id").equalTo(mergedRels.col("source")), "inner")
-			.groupByKey(
-				(MapFunction<Tuple2<OpenaireBrokerResult, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
+			.joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner")
+			.groupByKey((MapFunction<Tuple2<OpenaireBrokerResult, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
 			.agg(aggr)
 			.map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class))
 			.filter(ResultGroup::isValid)
-			.map(
-				(MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig),
-				Encoders.kryo(EventGroup.class))
+			.map((MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig), Encoders.kryo(EventGroup.class))
 			.flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class));
 	}
 
@@ -133,9 +131,9 @@ public class GenerateEventsApplication {
 		final SparkSession spark,
 		final String graphPath,
 		final Class<SRC> sourceClass) {
+
 		final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class);
-		final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath(
-			spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
+		final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath(spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
 		final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class);
 		final Dataset<Publication> publications = readPath(spark, graphPath + "/publication", Publication.class);
 
@@ -143,17 +141,14 @@ public class GenerateEventsApplication {
 			.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
 			.cache();
 
-		final Dataset<OpenaireBrokerResult> r0 = readPath(
-			spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class)
-				.filter(r -> r.getDataInfo().getDeletedbyinference())
-				.map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class));
+		final Dataset<OpenaireBrokerResult> r0 = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class)
+			.filter(r -> r.getDataInfo().getDeletedbyinference())
+			.map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class));
 
 		final Dataset<OpenaireBrokerResult> r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class));
 		final Dataset<OpenaireBrokerResult> r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class));
 		final Dataset<OpenaireBrokerResult> r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class));
-		final Dataset<OpenaireBrokerResult> r4 = join(
-			r3, rels, relatedEntities(publications, rels, RelatedProject.class));
-		;
+		final Dataset<OpenaireBrokerResult> r4 = join(r3, rels, relatedEntities(publications, rels, RelatedProject.class));
 
 		return r4;
 	}
@@ -163,9 +158,7 @@ public class GenerateEventsApplication {
 		final Class<RT> clazz) {
 		return rels
 			.joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner")
-			.map(
-				t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz),
-				Encoders.kryo(clazz));
+			.map(t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz), Encoders.kryo(clazz));
 	}
 
 	private static <T> Dataset<OpenaireBrokerResult> join(final Dataset<OpenaireBrokerResult> sources,
@@ -173,15 +166,13 @@ public class GenerateEventsApplication {
 		final Dataset<T> typedRels) {
 
 		final TypedColumn<Tuple2<OpenaireBrokerResult, T>, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator<T>()
-			.toColumn();
-		;
+			.toColumn();
 
-		return sources
-			.joinWith(typedRels, sources.col("result.id").equalTo(rels.col("source")), "left_outer")
-			.groupByKey(
-				(MapFunction<Tuple2<OpenaireBrokerResult, T>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
+		return sources.joinWith(typedRels, sources.col("openaireId").equalTo(rels.col("source")), "left_outer")
+			.groupByKey((MapFunction<Tuple2<OpenaireBrokerResult, T>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
 			.agg(aggr)
 			.map(t -> t._2, Encoders.kryo(OpenaireBrokerResult.class));
+
 	}
 
 	public static <R> Dataset<R> readPath(
@@ -195,14 +186,12 @@ public class GenerateEventsApplication {
 	}
 
 	private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception {
+
 		final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);
 
 		final String conf = isLookUpService
-			.getResourceProfileByQuery(
-				String
-					.format(
-						"for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
-						profId));
+			.getResourceProfileByQuery(String
+				.format("for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", profId));
 
 		final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class);
 		dedupConfig.getPace().initModel();
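Note: the groupByKey(...).agg(aggr) chains above rely on Spark's typed Aggregator API, where an Aggregator is lifted into a TypedColumn and applied per key. A minimal self-contained sketch of that pattern, with toy element and buffer types standing in for the project's OpenaireBrokerResultAggregator:

	import org.apache.spark.api.java.function.MapFunction;
	import org.apache.spark.sql.Dataset;
	import org.apache.spark.sql.Encoder;
	import org.apache.spark.sql.Encoders;
	import org.apache.spark.sql.SparkSession;
	import org.apache.spark.sql.TypedColumn;
	import org.apache.spark.sql.expressions.Aggregator;

	public class AggregatorSketch {

		// Toy aggregator: counts the elements of each group.
		public static class CountAggregator extends Aggregator<String, Long, Long> {
			@Override public Long zero() { return 0L; }
			@Override public Long reduce(final Long buffer, final String value) { return buffer + 1; }
			@Override public Long merge(final Long b1, final Long b2) { return b1 + b2; }
			@Override public Long finish(final Long reduction) { return reduction; }
			@Override public Encoder<Long> bufferEncoder() { return Encoders.LONG(); }
			@Override public Encoder<Long> outputEncoder() { return Encoders.LONG(); }
		}

		public static void main(final String[] args) {
			final SparkSession spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate();
			final Dataset<String> words = spark.createDataset(java.util.Arrays.asList("a", "b", "a"), Encoders.STRING());

			// Lift the aggregator into a TypedColumn, as with OpenaireBrokerResultAggregator above.
			final TypedColumn<String, Long> aggr = new CountAggregator().toColumn();

			words
				.groupByKey((MapFunction<String, String>) w -> w, Encoders.STRING())
				.agg(aggr) // yields Dataset<Tuple2<String, Long>>, mirroring the .agg(aggr) calls in the diff
				.show();

			spark.stop();
		}
	}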
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
index 82d0178648..fca9cf89e8 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
@@ -63,8 +63,14 @@ public final class UpdateInfo {
 		return target;
 	}
 
-	private float calculateTrust(final DedupConfig dedupConfig, final OpenaireBrokerResult r1,
+	private float calculateTrust(final DedupConfig dedupConfig,
+		final OpenaireBrokerResult r1,
 		final OpenaireBrokerResult r2) {
+
+		if (dedupConfig == null) {
+			return BrokerConstants.MIN_TRUST;
+		}
+
 		try {
 			final ObjectMapper objectMapper = new ObjectMapper();
 			final MapDocument doc1 = MapDocumentUtil
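Note: the dedupConfig == null guard above lets event generation run end-to-end while the dedup profile loading is commented out, by pinning every update to the minimum trust. The fallback pattern in isolation (the MIN_TRUST/MAX_TRUST values and the similarity placeholder are illustrative stand-ins, not the project's scoring):

	// Stand-alone illustration of the null-guarded trust computation.
	public final class TrustSketch {

		private static final float MIN_TRUST = 0.0f; // stand-in for BrokerConstants.MIN_TRUST
		private static final float MAX_TRUST = 1.0f; // stand-in for BrokerConstants.MAX_TRUST

		// Returns a score in [MIN_TRUST, MAX_TRUST]; falls back to the floor
		// whenever the configuration needed for scoring is unavailable.
		public static float calculateTrust(final Object config, final String a, final String b) {
			if (config == null) {
				return MIN_TRUST; // same early exit as the dedupConfig == null branch above
			}
			final float score = a.equals(b) ? MAX_TRUST : MIN_TRUST; // placeholder similarity
			return Math.min(Math.max(score, MIN_TRUST), MAX_TRUST); // clamp into the trust range
		}

		public static void main(final String[] args) {
			System.out.println(calculateTrust(null, "x", "y")); // 0.0
			System.out.println(calculateTrust(new Object(), "x", "x")); // 1.0
		}
	}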
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
index da573ae9cd..ea9aabcfce 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
@@ -78,21 +78,33 @@
 		<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
 	</kill>
 
 	<action name="generate_events">
-		<java>
-			<prepare>
-				<delete path="${eventsOutputPath}"/>
-			</prepare>
-			<main-class>eu.dnetlib.dhp.broker.oa.GenerateEventsApplication</main-class>
+		<spark xmlns="uri:oozie:spark-action:0.2">
+			<master>yarn</master>
+			<mode>cluster</mode>
+			<name>GenerateEvents</name>
+			<class>eu.dnetlib.dhp.broker.oa.GenerateEventsApplication</class>
+			<jar>dhp-broker-events-${projectVersion}.jar</jar>
+			<spark-opts>
+				--executor-cores=${sparkExecutorCores}
+				--executor-memory=${sparkExecutorMemory}
+				--driver-memory=${sparkDriverMemory}
+				--conf spark.extraListeners=${spark2ExtraListeners}
+				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+				--conf spark.sql.shuffle.partitions=3840
+			</spark-opts>
+
 			<arg>--graphPath</arg><arg>${graphInputPath}</arg>
 			<arg>--eventsPath</arg><arg>${eventsOutputPath}</arg>
 			<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
 			<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
-		</java>
+		</spark>
 		<ok to="End"/>
 		<error to="Kill"/>
 	</action>
 
 	<end name="End"/>
 
 </workflow-app>
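Note: each <arg> pair in the spark action above arrives in the application's String[] args, which GenerateEventsApplication hands to ArgumentApplicationParser. A rough stand-in for how such --name value pairs are consumed (a hypothetical parser, not the dhp-common implementation):

	import java.util.HashMap;
	import java.util.Map;

	public class ArgsSketch {
		public static void main(final String[] args) {
			// Collect --name value pairs, mirroring the <arg> convention of the workflow.
			final Map<String, String> params = new HashMap<>();
			for (int i = 0; i + 1 < args.length; i += 2) {
				if (args[i].startsWith("--")) {
					params.put(args[i].substring(2), args[i + 1]); // e.g. --graphPath <value>
				}
			}
			System.out.println("graphPath = " + params.get("graphPath"));
			System.out.println("eventsPath = " + params.get("eventsPath"));
			System.out.println("isLookupUrl = " + params.get("isLookupUrl"));
			System.out.println("dedupConfProfile = " + params.get("dedupConfProfile"));
		}
	}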