first implementation of the BETA & PROD graphs merge procedure

This commit is contained in:
Claudio Atzori 2020-07-08 16:54:26 +02:00
parent e2ea30f89d
commit 610d377d57
6 changed files with 1242 additions and 1 deletions

View File

@ -0,0 +1,130 @@
package eu.dnetlib.dhp.oa.graph.merge;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.util.Objects;
import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
/**
* Combines the content from two aggregator graph tables of the same type, entities (or relationships) with the same ids
* are picked preferring those from the BETA aggregator rather then from PROD. The identity of a relationship is defined
* by eu.dnetlib.dhp.schema.common.ModelSupport#idFn()
*/
public class MergeGraphSparkJob {
private static final Logger log = LoggerFactory.getLogger(CleanGraphSparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
CleanGraphSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String betaInputPath = parser.get("betaInputPath");
log.info("betaInputPath: {}", betaInputPath);
String prodInputPath = parser.get("prodInputPath");
log.info("prodInputPath: {}", prodInputPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
mergeGraphTable(spark, betaInputPath, prodInputPath, entityClazz, entityClazz, outputPath);
});
}
private static <P extends Oaf, B extends Oaf> void mergeGraphTable(
SparkSession spark,
String betaInputPath,
String prodInputPath,
Class<P> p_clazz,
Class<B> b_clazz,
String outputPath) {
Dataset<Tuple2<String, B>> beta = readTableFromPath(spark, betaInputPath, b_clazz);
Dataset<Tuple2<String, P>> prod = readTableFromPath(spark, prodInputPath, p_clazz);
prod.joinWith(beta, prod.col("_1").equalTo(beta.col("_1")), "full_outer")
.map((MapFunction<Tuple2<Tuple2<String, P>, Tuple2<String, B>>, P>) value -> {
Optional<P> p = Optional.ofNullable(value._1()).map(Tuple2::_2);
Optional<B> b = Optional.ofNullable(value._2()).map(Tuple2::_2);
if (p.isPresent() & !b.isPresent()) {
return p.get();
}
if (b.isPresent()) {
return (P) b.get();
}
return null;
}, Encoders.bean(p_clazz))
.filter((FilterFunction<P>) Objects::nonNull)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
private static <T extends Oaf> Dataset<Tuple2<String, T>> readTableFromPath(
SparkSession spark, String inputEntityPath, Class<T> clazz) {
log.info("Reading Graph table from: {}", inputEntityPath);
return spark
.read()
.textFile(inputEntityPath)
.map(
(MapFunction<String, Tuple2<String, T>>) value -> {
final T t = OBJECT_MAPPER.readValue(value, clazz);
final String id = ModelSupport.idFn().apply(t);
return new Tuple2<>(id, t);
},
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}

View File

@ -0,0 +1,18 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -0,0 +1,282 @@
<workflow-app name="merge graphs" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>betaInputGgraphPath</name>
<description>the beta graph root path</description>
</property>
<property>
<name>prodInputGgraphPath</name>
<description>the production graph root path</description>
</property>
<property>
<name>graphOutputPath</name>
<description>the output merged graph root path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<start to="fork_merge_graph"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<fork name="fork_merge_graph">
<path start="merge_publication"/>
<path start="merge_dataset"/>
<path start="merge_otherresearchproduct"/>
<path start="merge_software"/>
<path start="merge_datasource"/>
<path start="merge_organization"/>
<path start="merge_project"/>
<path start="merge_relation"/>
</fork>
<action name="merge_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge publications</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/publication</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/publication</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge datasets</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/dataset</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/dataset</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge otherresearchproducts</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/otherresearchproduct</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge softwares</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/software</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/software</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_datasource">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge datasources</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/datasource</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/datasource</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_organization">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge organizations</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/organization</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/organization</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_project">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge projects</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/project</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/project</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_relation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge relations</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/relation</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/relation</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<join name="wait_merge" to="End"/>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,32 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "bin",
"paramLongName": "betaInputPath",
"paramDescription": "the beta graph root path",
"paramRequired": true
},
{
"paramName": "pin",
"paramLongName": "prodInputPath",
"paramDescription": "the production graph root path",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the output merged graph root path",
"paramRequired": true
},
{
"paramName": "class",
"paramLongName": "graphTableClassName",
"paramDescription": "class name moelling the graph table",
"paramRequired": true
}
]

View File

@ -0,0 +1,779 @@
<RESOURCE_PROFILE>
<HEADER>
<RESOURCE_IDENTIFIER value="b05c97e6-69b5-497d-87fd-2137d3ff2c2e_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl"/>
<RESOURCE_TYPE value="WorkflowDSResourceType"/>
<RESOURCE_KIND value="WorkflowDSResources"/>
<RESOURCE_URI value=""/>
<DATE_OF_CREATION value="2020-06-15T16:35:09+00:00"/>
</HEADER>
<BODY>
<WORKFLOW_NAME>Graph Construction [HYBRID]</WORKFLOW_NAME>
<WORKFLOW_TYPE>Data Provision</WORKFLOW_TYPE>
<WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY>
<CONFIGURATION start="manual">
<NODE isStart="true" name="reuseProdContent" type="SetEnvParameter">
<DESCRIPTION>reuse cached content from the PROD aggregation system</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">reuseProdContent</PARAM>
<PARAM function="validValues(['true', 'false'])" managedBy="user" name="parameterValue" required="true" type="string">true</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="contentPathProd"/>
</ARCS>
</NODE>
<NODE name="contentPathProd" type="SetEnvParameter">
<DESCRIPTION>set the PROD aggregator content path</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">prodContentPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_aggregator</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="prodAggregatorGraphPath"/>
</ARCS>
</NODE>
<NODE name="prodAggregatorGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the path containing the PROD AGGREGATOR graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">prodAggregatorGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/00_prod_graph_aggregator</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="reuseBetaContent" type="SetEnvParameter">
<DESCRIPTION>reuse cached content from the BETA aggregation system</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">reuseBetaContent</PARAM>
<PARAM function="validValues(['true', 'false'])" managedBy="user" name="parameterValue" required="true" type="string">true</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="contentPathBeta"/>
</ARCS>
</NODE>
<NODE name="contentPathBeta" type="SetEnvParameter">
<DESCRIPTION>set the BETA aggregator content path</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">betaContentPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_aggregator</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="betaAggregatorGraphPath"/>
</ARCS>
</NODE>
<NODE name="betaAggregatorGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the path containing the BETA AGGREGATOR graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">betaAggregatorGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/00_beta_graph_aggregator</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="setIsLookUpUrl"/>
</ARCS>
</NODE>
<NODE name="setIsLookUpUrl" type="SetEnvParameter">
<DESCRIPTION>Set the IS lookup service address</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">isLookUpUrl</PARAM>
<PARAM managedBy="system" name="parameterValue" required="true" type="string">http://services.openaire.eu:8280/is/services/isLookUp?wsdl</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setMergedGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the MERGED graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">mergedGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/01_graph_merged</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setRawGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the RAW graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">rawGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/02_graph_raw</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setDedupGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the DEDUPED graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">dedupGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/03_graph_dedup</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setInferredGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the INFERRED graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">inferredGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/04_graph_inferred</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setConsistentGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the CONSISTENCY graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">consistentGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/05_graph_consistent</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setOrcidGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the ORCID enriched graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">orcidGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/06_graph_orcid</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setBulkTaggingGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the BULK TAGGED graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">bulkTaggingGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/07_graph_bulktagging</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setAffiliationGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the AFFILIATION from INSTITUTIONAL REPOS graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">affiliationGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/08_graph_affiliation</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setCommunityOrganizationGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the COMMUNITY from SELECTED SOURCES graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">communityOrganizationGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/09_graph_comunity_organization</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setFundingGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the FUNDING from SEMANTIC RELATION graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">fundingGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/10_graph_funding</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setCommunitySemRelGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the COMMUNITY from SEMANTIC RELATION graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">communitySemRelGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/11_graph_comunity_sem_rel</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setCountryGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the COUNTRY enriched graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">countryGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/12_graph_country</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setCleanedGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the CLEANED graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">cleanedGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/13_graph_cleaned</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setBlacklistedGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the blacklisted graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">blacklistedGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/14_graph_blacklisted</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setBulkTaggingPathMap" type="SetEnvParameter">
<DESCRIPTION>Set the map of paths for the Bulk Tagging</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">bulkTaggingPathMap</PARAM>
<PARAM managedBy="system" name="parameterValue" required="true" type="string">{"author" : "$['author'][*]['fullname']", "title" : "$['title'][*]['value']", "orcid" : "$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']", "contributor" : "$['contributor'][*]['value']", "description" : "$['description'][*]['value']"}</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setPropagationOrganizationCommunityMap" type="SetEnvParameter">
<DESCRIPTION>Set the map of associations organization, community list for the propagation of community to result through organization</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">propagationOrganizationCommunityMap</PARAM>
<PARAM managedBy="system" name="parameterValue" required="true" type="string">{"20|corda__h2020::3fb05a9524c3f790391261347852f638":["mes","euromarine"], "20|corda__h2020::e8dbe14cca9bf6fce09d468872f813f8":["mes","euromarine"], "20|snsf________::9b253f265e3bef5cae6d881fdf61aceb":["mes","euromarine"],"20|rcuk________::e054eea0a47665af8c3656b5785ccf76":["mes","euromarine"],"20|corda__h2020::edc18d67c9b11fb616ca9f6e1db1b151":["mes","euromarine"],"20|rcuk________::d5736d9da90521ddcdc7828a05a85e9a":["mes","euromarine"],"20|corda__h2020::f5d418d3aa1cf817ddefcc3fdc039f27":["mes","euromarine"],"20|snsf________::8fa091f8f25a846779acb4ea97b50aef":["mes","euromarine"],"20|corda__h2020::81e020977211c2c40fae2e1a50bffd71":["mes","euromarine"],"20|corda_______::81e020977211c2c40fae2e1a50bffd71":["mes","euromarine"],"20|snsf________::31d0a100e54e3cdb3c6f52d91e638c78":["mes","euromarine"],"20|corda__h2020::ea379ef91b8cc86f9ac5edc4169292db":["mes","euromarine"],"20|corda__h2020::f75ee2ee48e5cb0ec8c8d30aaa8fef70":["mes","euromarine"],"20|rcuk________::e16010089551a1a9182a94604fc0ea59":["mes","euromarine"],"20|corda__h2020::38531a2cce7c5c347ffc439b07c1f43b":["mes","euromarine"],"20|corda_______::38531a2cce7c5c347ffc439b07c1f43b":["mes","euromarine"],"20|grid________::b2cbbf5eadbbf87d534b022bad3191d7":["mes","euromarine"],"20|snsf________::74730ef1439d7f7636a8be58a6b471b8":["mes","euromarine"],"20|nsf_________::ad72e19043a5a467e35f9b444d11563e":["mes","euromarine"],"20|rcuk________::0fc3e92500290902a2d38ec2445e74c3":["mes","euromarine"],"20|grid________::ad2c29905da0eb3c06b3fa80cacd89ea":["mes","euromarine"],"20|corda__h2020::30b53e4d63d3724f00acb9cbaca40860":["mes","euromarine"],"20|corda__h2020::f60f84bee14ad93f0db0e49af1d5c317":["mes","euromarine"], "20|corda__h2020::7bf251ac3765b5e89d82270a1763d09f":["mes","euromarine"], "20|corda__h2020::65531bd11be9935948c7f2f4db1c1832":["mes","euromarine"], "20|corda__h2020::e0e98f86bbc76638bbb72a8fe2302946":["mes","euromarine"], "20|snsf________::3eb43582ac27601459a8d8b3e195724b":["mes","euromarine"], "20|corda__h2020::af2481dab65d06c8ea0ae02b5517b9b6":["mes","euromarine"], "20|corda__h2020::c19d05cfde69a50d3ebc89bd0ee49929":["mes","euromarine"], "20|corda__h2020::af0bfd9fc09f80d9488f56d71a9832f0":["mes","euromarine"], "20|rcuk________::f33c02afb0dc66c49d0ed97ca5dd5cb0":["beopen"],
"20|grid________::a867f78acdc5041b34acfe4f9a349157":["beopen"], "20|grid________::7bb116a1a9f95ab812bf9d2dea2be1ff":["beopen"], "20|corda__h2020::6ab0e0739dbe625b99a2ae45842164ad":["beopen"], "20|corda__h2020::8ba50792bc5f4d51d79fca47d860c602":["beopen"], "20|corda_______::8ba50792bc5f4d51d79fca47d860c602":["beopen"], "20|corda__h2020::e70e9114979e963eef24666657b807c3":["beopen"], "20|corda_______::e70e9114979e963eef24666657b807c3":["beopen"], "20|corda_______::15911e01e9744d57205825d77c218737":["beopen"], "20|opendoar____::056a41e24e2a9a67215e87bbee6a80ab":["beopen"], "20|opendoar____::7f67f2e6c6fbb0628f8160fcd3d92ae3":["beopen"], "20|grid________::a8ecfd7c084e561168bcbe6bf0daf3e3":["beopen"], "20|corda_______::7bbe6cc5d8ec1864739a04b0d020c9e9":["beopen"], "20|corda_______::3ff558e30c2e434d688539548300b050":["beopen"], "20|corda__h2020::5ffee5b3b83b33a8cf0e046877bd3a39":["beopen"], "20|corda__h2020::5187217e2e806a6df3579c46f82401bc":["beopen"], "20|grid________::5fa7e2709bcd945e26bfa18689adeec1":["beopen"], "20|corda_______::d8696683c53027438031a96ad27c3c07":["beopen"], "20|corda__h2020::d8696683c53027438031a96ad27c3c07":["beopen"], "20|rcuk________::23a79ebdfa59790864e4a485881568c1":["beopen"], "20|corda__h2020::b76cf8fe49590a966953c37e18608af9":["beopen"], "20|grid________::d2f0204126ee709244a488a4cd3b91c2":["beopen"], "20|corda__h2020::05aba9d2ed17533d15221e5655ac11e6":["beopen"], "20|grid________::802401579481dc32062bdee69f5e6a34":["beopen"], "20|corda__h2020::3f6d9d54cac975a517ba6b252c81582d":["beopen"]}
</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setDedupConfig" type="SetEnvParameter">
<DESCRIPTION>Set the dedup orchestrator name</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">dedupConfig</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">decisiontree-dedup-test</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="actionSetsRaw" type="SetEnvParameter">
<DESCRIPTION>declares the ActionSet ids to promote in the RAW graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">actionSetIdsRawGraph</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">scholexplorer-dump,gridac-dump,doiboost-organizations,doiboost,orcidworks-no-doi,iis-wos-entities,iis-entities-software,iis-entities-patent</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="actionSetsIIS" type="SetEnvParameter">
<DESCRIPTION>declares the ActionSet ids to promote in the INFERRED graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">actionSetIdsIISGraph</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">iis-researchinitiative,iis-document-citations,iis-document-affiliation,iis-document-classes,iis-document-similarities,iis-referenced-datasets-main,iis-referenced-datasets-preprocessing,iis-referenced-projects-main,iis-referenced-projects-preprocessing,iis-referenceextraction-pdb,document_software_url,iis-extracted-metadata,iis-communities,iis-referenced-patents,iis-covid-19</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isJoin="true" name="waitConfig">
<DESCRIPTION>wait configurations</DESCRIPTION>
<PARAMETERS/>
<ARCS>
<ARC to="betaAggregatorGraph"/>
<ARC to="prodAggregatorGraph"/>
</ARCS>
</NODE>
<NODE name="betaAggregatorGraph" type="SubmitHadoopJob">
<DESCRIPTION>create the AGGREGATOR graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'graphOutputPath' : 'betaAggregatorGraphPath',
'isLookupUrl' : 'isLookUpUrl',
'reuseContent' : 'reuseBetaContent',
'contentPath' : 'betaContentPath'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/graph/raw_all/oozie_app',
'mongoURL' : 'mongodb://beta.services.openaire.eu',
'mongoDb' : 'mdstore',
'postgresURL' : 'jdbc:postgresql://beta.services.openaire.eu:5432/dnet_openaireplus',
'postgresUser' : 'dnet',
'postgresPassword' : '',
'workingDir' : '/tmp/core_provision/working_dir/beta_aggregator'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitAggregatorGraph"/>
</ARCS>
</NODE>
<NODE name="prodAggregatorGraph" type="SubmitHadoopJob">
<DESCRIPTION>create the AGGREGATOR graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'graphOutputPath' : 'prodAggregatorGraphPath',
'isLookupUrl' : 'isLookUpUrl',
'reuseContent' : 'reuseProdContent',
'contentPath' : 'prodContentPath'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/graph/raw_all/oozie_app',
'mongoURL' : 'mongodb://services.openaire.eu',
'mongoDb' : 'mdstore',
'postgresURL' : 'jdbc:postgresql://postgresql.services.openaire.eu:5432/dnet_openaireplus',
'postgresUser' : 'dnet',
'postgresPassword' : '',
'workingDir' : '/tmp/core_provision/working_dir/prod_aggregator'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitAggregatorGraph"/>
</ARCS>
</NODE>
<NODE isJoin="true" name="waitAggregatorGraph">
<DESCRIPTION>wait configurations</DESCRIPTION>
<PARAMETERS/>
<ARCS>
<ARC to="mergeAggregatorGraphs"/>
</ARCS>
</NODE>
<NODE name="mergeAggregatorGraphs" type="SubmitHadoopJob">
<DESCRIPTION>create the AGGREGATOR graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'betaInputGgraphPath' : 'betaAggregatorGraphPath',
'prodInputGgraphPath' : 'prodAggregatorGraphPath',
'graphOutputPath' : 'mergedGraphPath'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/graph/merge/oozie_app',
'workingDir' : '/tmp/core_provision/working_dir/merge_graph'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="promoteActionsRaw"/>
</ARCS>
</NODE>
<NODE name="promoteActionsRaw" type="SubmitHadoopJob">
<DESCRIPTION>create the RAW graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'inputActionSetIds' : 'actionSetIdsRawGraph',
'inputGraphRootPath' : 'mergedGraphPath',
'outputGraphRootPath' : 'rawGraphPath',
'isLookupUrl' : 'isLookUpUrl'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/actionmanager/wf/main/oozie_app',
'sparkExecutorCores' : '3',
'sparkExecutorMemory' : '10G',
'activePromoteDatasetActionPayload' : 'true',
'activePromoteDatasourceActionPayload' : 'true',
'activePromoteOrganizationActionPayload' : 'true',
'activePromoteOtherResearchProductActionPayload' : 'true',
'activePromoteProjectActionPayload' : 'true',
'activePromotePublicationActionPayload' : 'true',
'activePromoteRelationActionPayload' : 'true',
'activePromoteResultActionPayload' : 'true',
'activePromoteSoftwareActionPayload' : 'true',
'mergeAndGetStrategy' : 'MERGE_FROM_AND_GET',
'workingDir' : '/tmp/core_provision/working_dir/promoteActionsRaw'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="duplicateScan"/>
</ARCS>
</NODE>
<NODE name="duplicateScan" type="SubmitHadoopJob">
<DESCRIPTION>search for duplicates in the raw graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'actionSetId' : 'dedupConfig',
'graphBasePath' : 'rawGraphPath',
'dedupGraphPath': 'dedupGraphPath',
'isLookUpUrl' : 'isLookUpUrl'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/dedup/scan/oozie_app',
'workingPath' : '/tmp/core_provision/working_dir/dedup'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="promoteActionsIIS"/>
</ARCS>
</NODE>
<NODE name="promoteActionsIIS" type="SubmitHadoopJob">
<DESCRIPTION>create the INFERRED graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'inputActionSetIds' : 'actionSetIdsIISGraph',
'inputGraphRootPath' : 'dedupGraphPath',
'outputGraphRootPath' : 'inferredGraphPath',
'isLookupUrl' : 'isLookUpUrl'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/actionmanager/wf/main/oozie_app',
'sparkExecutorCores' : '3',
'sparkExecutorMemory' : '10G',
'activePromoteDatasetActionPayload' : 'true',
'activePromoteDatasourceActionPayload' : 'true',
'activePromoteOrganizationActionPayload' : 'true',
'activePromoteOtherResearchProductActionPayload' : 'true',
'activePromoteProjectActionPayload' : 'true',
'activePromotePublicationActionPayload' : 'true',
'activePromoteRelationActionPayload' : 'true',
'activePromoteResultActionPayload' : 'true',
'activePromoteSoftwareActionPayload' : 'true',
'mergeAndGetStrategy' : 'MERGE_FROM_AND_GET',
'workingDir' : '/tmp/core_provision/working_dir/promoteActionsIIS'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="dedupConsistency"/>
</ARCS>
</NODE>
<NODE name="dedupConsistency" type="SubmitHadoopJob">
<DESCRIPTION>mark duplicates as deleted and redistribute the relationships</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'graphBasePath' : 'inferredGraphPath',
'dedupGraphPath': 'consistentGraphPath'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/dedup/consistency/oozie_app',
'workingPath' : '/tmp/core_provision/working_dir/dedup'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="orcidPropagation"/>
</ARCS>
</NODE>
<NODE name="orcidPropagation" type="SubmitHadoopJob">
<DESCRIPTION>propagates ORCID among results linked by allowedsemrels semantic relationships</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'sourcePath' : 'consistentGraphPath',
'outputPath': 'orcidGraphPath'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/enrichment/orcidtoresultfromsemrel/oozie_app',
'workingDir' : '/tmp/core_provision/working_dir/orcid',
'allowedsemrels' : 'isSupplementedBy;isSupplementTo',
'saveGraph' : 'true'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="bulkTagging"/>
</ARCS>
</NODE>
<NODE name="bulkTagging" type="SubmitHadoopJob">
<DESCRIPTION>mark results respecting some rules as belonging to communities</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'sourcePath' : 'orcidGraphPath',
'outputPath': 'bulkTaggingGraphPath',
'isLookUpUrl' : 'isLookUpUrl',
'pathMap' : 'bulkTaggingPathMap'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/enrichment/bulktag/oozie_app',
'workingDir' : '/tmp/core_provision/working_dir/bulktag'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="affiliationPropagation"/>
</ARCS>
</NODE>
<NODE name="affiliationPropagation" type="SubmitHadoopJob">
<DESCRIPTION>creates relashionships between results and organizations when the organizations are associated to institutional repositories</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'sourcePath' : 'bulkTaggingGraphPath',
'outputPath': 'affiliationGraphPath'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/enrichment/affiliation/oozie_app',
'workingDir' : '/tmp/core_provision/working_dir/affiliation',
'saveGraph' : 'true'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="communityOrganizationPropagation"/>
</ARCS>
</NODE>
<NODE name="communityOrganizationPropagation" type="SubmitHadoopJob">
<DESCRIPTION>marks as belonging to communities the result collected from datasources related to the organizations specified in the organizationCommunityMap</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'sourcePath' : 'affiliationGraphPath',
'outputPath': 'communityOrganizationGraphPath',
'organizationtoresultcommunitymap': 'propagationOrganizationCommunityMap'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/enrichment/community_organization/oozie_app',
'workingDir' : '/tmp/core_provision/working_dir/community_organization',
'saveGraph' : 'true'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="resultProjectPropagation"/>
</ARCS>
</NODE>
<NODE name="resultProjectPropagation" type="SubmitHadoopJob">
<DESCRIPTION>created relation between projects and results linked to other results trough allowedsemrel semantic relations linked to projects</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'sourcePath' : 'communityOrganizationGraphPath',
'outputPath': 'fundingGraphPath'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/enrichment/funding/oozie_app',
'workingDir' : '/tmp/core_provision/working_dir/funding',
'allowedsemrels' : 'isSupplementedBy;isSupplementTo',
'saveGraph' : 'true'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="communitySemrelPropagation"/>
</ARCS>
</NODE>
<NODE name="communitySemrelPropagation" type="SubmitHadoopJob">
<DESCRIPTION>tag as belonging to communitites result in in allowedsemrels relation with other result already linked to communities </DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'sourcePath' : 'fundingGraphPath',
'outputPath': 'communitySemRelGraphPath',
'isLookUpUrl' : 'isLookUpUrl'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/enrichment/community_semrel/oozie_app',
'workingDir' : '/tmp/core_provision/working_dir/community_semrel',
'allowedsemrels' : 'isSupplementedBy;isSupplementTo',
'saveGraph' : 'true'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="countryPropagation"/>
</ARCS>
</NODE>
<NODE name="countryPropagation" type="SubmitHadoopJob">
<DESCRIPTION>associated to results colleced from allowedtypes and those in the whithelist the country of the organization(s) handling the datasource it is collected from </DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'sourcePath' : 'communitySemRelGraphPath',
'outputPath': 'countryGraphPath'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/enrichment/country/oozie_app',
'sparkExecutorCores' : '3',
'sparkExecutorMemory' : '10G',
'workingDir' : '/tmp/core_provision/working_dir/country',
'allowedtypes' : 'pubsrepository::institutional',
'whitelist' : '10|opendoar____::300891a62162b960cf02ce3827bb363c',
'saveGraph' : 'true'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="graphCleaning"/>
</ARCS>
</NODE>
<NODE name="graphCleaning" type="SubmitHadoopJob">
<DESCRIPTION>clean the properties in the graph typed as Qualifier according to the vocabulary indicated in schemeid</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'graphInputPath' : 'countryGraphPath',
'graphOutputPath': 'cleanedGraphPath',
'isLookupUrl': 'isLookUpUrl'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/graph/clean/oozie_app',
'workingPath' : '/tmp/core_provision/working_dir/clean'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="blacklistRelations"/>
</ARCS>
</NODE>
<NODE name="blacklistRelations" type="SubmitHadoopJob">
<DESCRIPTION>removes blacklisted relations </DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'sourcePath' : 'cleanedGraphPath',
'outputPath': 'blacklistedGraphPath'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/enrichment/blacklist/oozie_app',
'workingDir' : '/tmp/core_provision/working_dir/blacklist',
'postgresURL' : 'jdbc:postgresql://beta.services.openaire.eu:5432/dnet_openaireplus',
'postgresUser' : 'dnet',
'postgresPassword' : ''
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="success"/>
</ARCS>
</NODE>
</CONFIGURATION>
<STATUS>
<LAST_EXECUTION_ID>wf_20200615_163630_609</LAST_EXECUTION_ID>
<LAST_EXECUTION_DATE>2020-06-15T17:08:00+00:00</LAST_EXECUTION_DATE>
<LAST_EXECUTION_STATUS>SUCCESS</LAST_EXECUTION_STATUS>
<LAST_EXECUTION_ERROR/>
</STATUS>
</BODY>
</RESOURCE_PROFILE>

View File

@ -413,7 +413,7 @@
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM> <PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS> </PARAMETERS>
<ARCS> <ARCS>
<ARC to="graphCleaning"/> <ARC to="orcidPropagation"/>
</ARCS> </ARCS>
</NODE> </NODE>