diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DispatchEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java similarity index 96% rename from dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DispatchEntitiesSparkJob.java rename to dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java index ea738836b5..caa9e8106a 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DispatchEntitiesSparkJob.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java @@ -1,11 +1,11 @@ -package eu.dnetlib.dhp.oa.dedup; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.util.Objects; -import java.util.Optional; +package eu.dnetlib.dhp.oa.merge; +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.OafEntity; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; @@ -17,12 +17,10 @@ import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Objects; +import java.util.Optional; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.schema.oaf.Oaf; -import eu.dnetlib.dhp.schema.oaf.OafEntity; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; public class DispatchEntitiesSparkJob { @@ -38,7 +36,7 @@ public class DispatchEntitiesSparkJob { .requireNonNull( DispatchEntitiesSparkJob.class .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/dispatch_entities_parameters.json"))); + 
"/eu/dnetlib/dhp/oa/graph/group/dispatch_entities_parameters.json"))); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java similarity index 98% rename from dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java rename to dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java index a19f863808..771eba8731 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java @@ -1,15 +1,17 @@ -package eu.dnetlib.dhp.oa.dedup; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import static eu.dnetlib.dhp.utils.DHPUtils.toSeq; - -import java.io.IOException; -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import java.util.stream.Collectors; +package eu.dnetlib.dhp.oa.merge; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.jayway.jsonpath.Configuration; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.Option; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; @@ -20,21 +22,17 @@ import org.apache.spark.sql.*; import org.apache.spark.sql.expressions.Aggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import 
com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.jayway.jsonpath.Configuration; -import com.jayway.jsonpath.DocumentContext; -import com.jayway.jsonpath.JsonPath; -import com.jayway.jsonpath.Option; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.*; -import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import scala.Tuple2; +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import static eu.dnetlib.dhp.utils.DHPUtils.toSeq; + /** * Groups the graph content by entity identifier to ensure ID uniqueness */ @@ -53,7 +51,7 @@ public class GroupEntitiesSparkJob { .toString( GroupEntitiesSparkJob.class .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/group_graph_entities_parameters.json")); + "/eu/dnetlib/dhp/oa/graph/group/group_graph_entities_parameters.json")); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtils.java index 720fe47fb2..bbdd59975b 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtils.java @@ -47,6 +47,17 @@ public class OafMapperUtils { } public static Result mergeResults(Result left, Result right) { + + final boolean leftFromDeletedAuthority = isFromDeletedAuthority(left); + final boolean rightFromDeletedAuthority = isFromDeletedAuthority(right); + + if (leftFromDeletedAuthority && !rightFromDeletedAuthority) { + return left; + } 
+ if (!leftFromDeletedAuthority && rightFromDeletedAuthority) { + return right; + } + if (new ResultTypeComparator().compare(left, right) < 0) { left.mergeFrom(right); return left; @@ -56,6 +67,24 @@ public class OafMapperUtils { } } + /** + * Checks whether at least one instance of the given result was collected from a datasource listed in {@code IdentifierFactory.delegatedAuthorityDatasourceIds()}. + * A null result, a result without instances, and instances without a collectedfrom are treated as not matching instead of raising a NullPointerException. + * + * @param r the result to inspect, may be null + * @return true when the collectedfrom key of at least one instance is among the delegated authority datasource ids + */ + private static boolean isFromDeletedAuthority(Result r) { + if (r == null || r.getInstance() == null) { + return false; + } + return r.getInstance() + .stream() + .filter(i -> i != null && i.getCollectedfrom() != null) + .map(i -> i.getCollectedfrom().getKey()) + .anyMatch(cfId -> IdentifierFactory.delegatedAuthorityDatasourceIds().contains(cfId)); + } + public static KeyValue keyValue(final String k, final String v) { final KeyValue kv = new KeyValue(); kv.setKey(k); diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml index 4ea0039263..7c500493f9 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml @@ -104,7 +104,7 @@ yarn cluster group graph entities - eu.dnetlib.dhp.oa.dedup.GroupEntitiesSparkJob + eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob dhp-dedup-openaire-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -138,7 +138,7 @@ yarn cluster Dispatch publications - eu.dnetlib.dhp.oa.dedup.DispatchEntitiesSparkJob + eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob dhp-dedup-openaire-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -163,7 +163,7 @@ yarn cluster Dispatch project - eu.dnetlib.dhp.oa.dedup.DispatchEntitiesSparkJob + eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob dhp-dedup-openaire-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -188,7 +188,7 @@ yarn cluster Dispatch organization - eu.dnetlib.dhp.oa.dedup.DispatchEntitiesSparkJob + eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob dhp-dedup-openaire-${projectVersion}.jar
--executor-cores=${sparkExecutorCores} @@ -213,7 +213,7 @@ yarn cluster Dispatch publication - eu.dnetlib.dhp.oa.dedup.DispatchEntitiesSparkJob + eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob dhp-dedup-openaire-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -238,7 +238,7 @@ yarn cluster Dispatch dataset - eu.dnetlib.dhp.oa.dedup.DispatchEntitiesSparkJob + eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob dhp-dedup-openaire-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -263,7 +263,7 @@ yarn cluster Dispatch software - eu.dnetlib.dhp.oa.dedup.DispatchEntitiesSparkJob + eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob dhp-dedup-openaire-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -288,7 +288,7 @@ yarn cluster Dispatch otherresearchproduct - eu.dnetlib.dhp.oa.dedup.DispatchEntitiesSparkJob + eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob dhp-dedup-openaire-${projectVersion}.jar --executor-cores=${sparkExecutorCores} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/dispatch_entities_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/dispatch_entities_parameters.json new file mode 100644 index 0000000000..aa8d2a7c22 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/dispatch_entities_parameters.json @@ -0,0 +1,26 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "i", + "paramLongName": "inputPath", + "paramDescription": "the source path", + "paramRequired": true + }, + { + "paramName": "o", + "paramLongName": "outputPath", + "paramDescription": "path of the output graph", + "paramRequired": true + }, + { + "paramName": "c", + "paramLongName": "graphTableClassName", + "paramDescription": "the graph entity class name", + 
"paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/group_graph_entities_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/group_graph_entities_parameters.json new file mode 100644 index 0000000000..e65acb3c43 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/group_graph_entities_parameters.json @@ -0,0 +1,20 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "gin", + "paramLongName": "graphInputPath", + "paramDescription": "the graph root path", + "paramRequired": true + }, + { + "paramName": "out", + "paramLongName": "outputPath", + "paramDescription": "the output merged graph root path", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/config-default.xml new file mode 100644 index 0000000000..2e0ed9aeea --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/config-default.xml @@ -0,0 +1,18 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/workflow.xml new file mode 100644 index 0000000000..883d0e1fb3 --- /dev/null +++ 
b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/workflow.xml @@ -0,0 +1,289 @@ + + + + graphBasePath + the input graph base path + + + workingPath + path of the working directory + + + graphOutputPath + path of the output graph + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + group graph entities + eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=15000 + + --graphInputPath${graphBasePath} + --outputPath${workingPath}/grouped_entities + + + + + + + + + + + + + + + + + + yarn + cluster + Dispatch publications + 
eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --inputPath${workingPath}/grouped_entities + --outputPath${graphOutputPath}/datasource + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource + + + + + + + + yarn + cluster + Dispatch project + eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --inputPath${workingPath}/grouped_entities + --outputPath${graphOutputPath}/project + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project + + + + + + + + yarn + cluster + Dispatch organization + eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + 
--inputPath${workingPath}/grouped_entities + --outputPath${graphOutputPath}/organization + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization + + + + + + + + yarn + cluster + Dispatch publication + eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --inputPath${workingPath}/grouped_entities + --outputPath${graphOutputPath}/publication + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication + + + + + + + + yarn + cluster + Dispatch dataset + eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --inputPath${workingPath}/grouped_entities + --outputPath${graphOutputPath}/dataset + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset + + + + + + + + yarn + cluster + Dispatch software + eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf 
spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --inputPath${workingPath}/grouped_entities + --outputPath${graphOutputPath}/software + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software + + + + + + + + yarn + cluster + Dispatch otherresearchproduct + eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --inputPath${workingPath}/grouped_entities + --outputPath${graphOutputPath}/otherresearchproduct + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct + + + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index 7a026e668a..86c3b4526f 100644 --- a/pom.xml +++ b/pom.xml @@ -797,7 +797,7 @@ 3.3.3 3.4.2 [2.12,3.0) - [2.10.24] + [2.10.26-SNAPSHOT] [4.0.3] [6.0.5] [3.1.6]