[Enrichment single step] modification of workflow and some changes in the classes

Miriam Baglioni 2022-11-23 09:54:50 +01:00
parent b0969461f8
commit de9d0ace38
22 changed files with 167 additions and 167 deletions

View File

@@ -8,6 +8,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
+import eu.dnetlib.dhp.countrypropagation.pojo.CountrySbs;
+import eu.dnetlib.dhp.countrypropagation.pojo.DatasourceCountry;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FilterFunction;

View File

@@ -7,12 +7,12 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.stream.Collectors;
+import eu.dnetlib.dhp.countrypropagation.pojo.CountrySbs;
+import eu.dnetlib.dhp.countrypropagation.pojo.DatasourceCountry;
+import eu.dnetlib.dhp.countrypropagation.pojo.ResultCountrySet;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FilterFunction;
-import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.sql.*;

View File

@@ -9,6 +9,8 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
+import eu.dnetlib.dhp.countrypropagation.pojo.CountrySbs;
+import eu.dnetlib.dhp.countrypropagation.pojo.ResultCountrySet;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;

View File

@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.countrypropagation;
+package eu.dnetlib.dhp.countrypropagation.pojo;
 import java.io.Serializable;

View File

@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.countrypropagation;
+package eu.dnetlib.dhp.countrypropagation.pojo;
 import java.io.Serializable;

View File

@@ -1,5 +1,7 @@
-package eu.dnetlib.dhp.countrypropagation;
+package eu.dnetlib.dhp.countrypropagation.pojo;
+import eu.dnetlib.dhp.countrypropagation.pojo.CountrySbs;
 import java.io.Serializable;
 import java.util.ArrayList;

View File

@@ -51,8 +51,7 @@ public class SparkResultToProjectThroughSemRelJob {
 final String alreadyLinkedPath = parser.get("alreadyLinkedPath");
 log.info("alreadyLinkedPath {}: ", alreadyLinkedPath);
-final Boolean saveGraph = Boolean.valueOf(parser.get("saveGraph"));
-log.info("saveGraph: {}", saveGraph);
 SparkConf conf = new SparkConf();
@@ -60,11 +59,9 @@ public class SparkResultToProjectThroughSemRelJob {
 conf,
 isSparkSessionManaged,
 spark -> {
-if (isTest(parser)) {
-removeOutputDir(spark, outputPath);
-}
 execPropagation(
-spark, outputPath, alreadyLinkedPath, potentialUpdatePath, saveGraph);
+spark, outputPath, alreadyLinkedPath, potentialUpdatePath);
 });
 }
@@ -72,13 +69,12 @@ public class SparkResultToProjectThroughSemRelJob {
 SparkSession spark,
 String outputPath,
 String alreadyLinkedPath,
-String potentialUpdatePath,
-Boolean saveGraph) {
+String potentialUpdatePath) {
 Dataset<ResultProjectSet> toaddrelations = readPath(spark, potentialUpdatePath, ResultProjectSet.class);
 Dataset<ResultProjectSet> alreadyLinked = readPath(spark, alreadyLinkedPath, ResultProjectSet.class);
-if (saveGraph) {
 toaddrelations
 .joinWith(
 alreadyLinked,
@@ -89,7 +85,7 @@ public class SparkResultToProjectThroughSemRelJob {
 .mode(SaveMode.Append)
 .option("compression", "gzip")
 .json(outputPath);
-}
 }
 private static FlatMapFunction<Tuple2<ResultProjectSet, ResultProjectSet>, Relation> mapRelationRn() {

View File

@@ -56,11 +56,7 @@ public class SparkResultToCommunityFromOrganizationJob {
 final String resultClassName = parser.get("resultTableName");
 log.info("resultTableName: {}", resultClassName);
-final Boolean saveGraph = Optional
-.ofNullable(parser.get("saveGraph"))
-.map(Boolean::valueOf)
-.orElse(Boolean.TRUE);
-log.info("saveGraph: {}", saveGraph);
 @SuppressWarnings("unchecked")
 Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
@@ -72,10 +68,9 @@ public class SparkResultToCommunityFromOrganizationJob {
 conf,
 isSparkSessionManaged,
 spark -> {
-removeOutputDir(spark, outputPath);
-if (saveGraph) {
 execPropagation(spark, inputPath, outputPath, resultClazz, possibleupdatespath);
-}
 });
 }

View File

@@ -70,13 +70,10 @@ public class SparkResultToCommunityThroughSemRelJob {
 conf,
 isSparkSessionManaged,
 spark -> {
-if (isTest(parser)) {
-removeOutputDir(spark, outputPath);
-}
-if (saveGraph) {
 execPropagation(
 spark, inputPath, outputPath, preparedInfoPath, resultClazz);
-}
 });
 }

View File

@@ -98,13 +98,13 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
 String leavesPath,
 String childParentPath,
 String resultOrganizationPath,
-String graphPath,
+String relationPath,
 String workingPath,
 String outputPath,
 int iterations) {
 if (iterations == 1) {
 doPropagateOnce(
-spark, leavesPath, childParentPath, resultOrganizationPath, graphPath,
+spark, leavesPath, childParentPath, resultOrganizationPath, relationPath,
 workingPath, outputPath);
 } else {
@@ -123,26 +123,26 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
 notReachedFirstParent);
 doPropagate(
-spark, leavesPath, childParentPath, resultOrganizationPath, graphPath,
+spark, leavesPath, childParentPath, resultOrganizationPath, relationPath,
 workingPath, outputPath, propagationCounter);
 }
 }
 private static void doPropagateOnce(SparkSession spark, String leavesPath, String childParentPath,
-String resultOrganizationPath, String graphPath, String workingPath,
+String resultOrganizationPath, String relationPath, String workingPath,
 String outputPath) {
 StepActions
 .execStep(
-spark, graphPath, workingPath + NEW_RELATION_PATH,
+spark, relationPath, workingPath + NEW_RELATION_PATH,
 leavesPath, childParentPath, resultOrganizationPath);
 addNewRelations(spark, workingPath + NEW_RELATION_PATH, outputPath);
 }
 private static void doPropagate(SparkSession spark, String leavesPath, String childParentPath,
-String resultOrganizationPath, String graphPath, String workingPath, String outputPath,
+String resultOrganizationPath, String relationPath, String workingPath, String outputPath,
 PropagationCounter propagationCounter) {
 int iteration = 0;
 long leavesCount;
@@ -151,7 +151,7 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
 iteration++;
 StepActions
 .execStep(
-spark, graphPath, workingPath + NEW_RELATION_PATH,
+spark, relationPath, workingPath + NEW_RELATION_PATH,
 leavesPath, childParentPath, resultOrganizationPath);
 StepActions
 .prepareForNextStep(

View File

@@ -27,10 +27,10 @@ import scala.Tuple2;
 public class StepActions implements Serializable {
 public static void execStep(SparkSession spark,
-String graphPath, String newRelationPath,
+String relationPath, String newRelationPath,
 String leavesPath, String chldParentOrgPath, String resultOrgPath) {
-Dataset<Relation> relationGraph = readPath(spark, graphPath, Relation.class);
+Dataset<Relation> relationGraph = readPath(spark, relationPath, Relation.class);
 // select only the relation source target among those proposed by propagation that are not already existent
 getNewRels(
 newRelationPath, relationGraph,

View File

@@ -176,10 +176,6 @@
 <name>pathMap</name>
 <value>${pathMap}</value>
 </property>
-<property>
-<name>outputPath</name>
-<value>${workingDir}/results</value>
-</property>
 </configuration>
 </sub-workflow>
 <ok to="affiliation_inst_repo" />
@@ -220,10 +216,6 @@
 <name>sourcePath</name>
 <value>${outputPath}</value>
 </property>
-<!-- <property>-->
-<!-- <name>outputPath</name>-->
-<!-- <value>${outputPath}</value>-->
-<!-- </property>-->
 </configuration>
 </sub-workflow>
 <ok to="community_organization" />
@@ -240,10 +232,6 @@
 <name>sourcePath</name>
 <value>${outputPath}</value>
 </property>
-<property>
-<name>outputPath</name>
-<value>${outputPath}</value>
-</property>
 <property>
 <name>organizationtoresultcommunitymap</name>
 <value>${organizationtoresultcommunitymap}</value>
@@ -264,10 +252,6 @@
 <name>sourcePath</name>
 <value>${outputPath}</value>
 </property>
-<property>
-<name>outputPath</name>
-<value>${outputPath}</value>
-</property>
 <property>
 <name>allowedsemrels</name>
 <value>${allowedsemrelsresultproject}</value>
@@ -280,7 +264,7 @@
 <action name="community_sem_rel">
 <sub-workflow>
-<app-path>${wf:appPath()}/result_project
+<app-path>${wf:appPath()}/community_sem_rel
 </app-path>
 <propagate-configuration/>
 <configuration>
@@ -288,10 +272,7 @@
 <name>sourcePath</name>
 <value>${outputPath}</value>
 </property>
-<property>
-<name>outputPath</name>
-<value>${workingDir}/communitysemrel</value>
-</property>
 <property>
 <name>allowedsemrels</name>
 <value>${allowedsemrelscommunitysemrel}</value>

View File

@@ -186,7 +186,7 @@
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 </spark-opts>
 <arg>--sourcePath</arg><arg>${outputPath}</arg>
-<arg>--workingPath</arg><arg>${workingDir}/eoscTag</arg>
+<arg>--workingPath</arg><arg>${workingDir}/bulktag</arg>
 </spark>
 <ok to="eosc_get_datasource_master"/>
 <error to="Kill"/>
@@ -230,7 +230,7 @@
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 </spark-opts>
 <arg>--sourcePath</arg><arg>${outputPath}/publication</arg>
-<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/publication</arg>
+<arg>--workingPath</arg><arg>${workingDir}/bulktag/publication</arg>
 <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
 <arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
 </spark>
@@ -256,7 +256,7 @@
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 </spark-opts>
 <arg>--sourcePath</arg><arg>${outputPath}/dataset</arg>
-<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/dataset</arg>
+<arg>--workingPath</arg><arg>${workingDir}/bulktag/dataset</arg>
 <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
 <arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
 </spark>
@@ -281,7 +281,7 @@
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 </spark-opts>
 <arg>--sourcePath</arg><arg>${outputPath}/software</arg>
-<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/software</arg>
+<arg>--workingPath</arg><arg>${workingDir}/bulktag/software</arg>
 <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
 <arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
 </spark>
@@ -306,14 +306,24 @@
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 </spark-opts>
 <arg>--sourcePath</arg><arg>${outputPath}/otherresearchproduct</arg>
-<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/otherresearchproduct</arg>
+<arg>--workingPath</arg><arg>${workingDir}/bulktag/otherresearchproduct</arg>
 <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
 <arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
 </spark>
 <ok to="wait_eosc_context_tag"/>
 <error to="Kill"/>
 </action>
-<join name="wait_eosc_context_tag" to="End"/>
+<join name="wait_eosc_context_tag" to="reset_workingDir"/>
+<action name="reset_workingDir">
+<fs>
+<delete path="${workingDir}"/>
+<mkdir path="${workingDir}"/>
+</fs>
+<ok to="End"/>
+<error to="Kill"/>
+</action>
 <end name="End"/>
 </workflow-app>

View File

@@ -30,20 +30,13 @@
 </configuration>
 </global>
-<start to="reset_outputpath"/>
+<start to="prepare_datasource_country_association"/>
 <kill name="Kill">
 <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
 </kill>
-<action name="reset_outputpath">
-<fs>
-<delete path="${outputPath}"/>
-<mkdir path="${outputPath}"/>
-</fs>
-<ok to="prepare_datasource_country_association"/>
-<error to="Kill"/>
-</action>
 <action name="prepare_datasource_country_association">
 <spark xmlns="uri:oozie:spark-action:0.2">
@@ -222,7 +215,7 @@
 <arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
 <arg>--workingPath</arg><arg>${workingDir}/country/publication</arg>
 <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
-<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
+<arg>--outputPath</arg><arg>${workingDir}/country/result/publication</arg>
 </spark>
 <ok to="wait"/>
 <error to="Kill"/>
@@ -251,7 +244,7 @@
 <arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
 <arg>--workingPath</arg><arg>${workingDir}/country/dataset</arg>
 <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
-<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
+<arg>--outputPath</arg><arg>${workingDir}/country/result/dataset</arg>
 </spark>
 <ok to="wait"/>
 <error to="Kill"/>
@@ -280,7 +273,7 @@
 <arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
 <arg>--workingPath</arg><arg>${workingDir}/country/otherresearchproduct</arg>
 <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
-<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
+<arg>--outputPath</arg><arg>${workingDir}/country/result/otherresearchproduct</arg>
 </spark>
 <ok to="wait"/>
 <error to="Kill"/>
@@ -309,14 +302,21 @@
 <arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
 <arg>--workingPath</arg><arg>${workingDir}/country/software</arg>
 <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
-<arg>--outputPath</arg><arg>${outputPath}/software</arg>
+<arg>--outputPath</arg><arg>${workingDir}/country/result/software</arg>
 </spark>
 <ok to="wait"/>
 <error to="Kill"/>
 </action>
-<join name="wait" to="End"/>
+<join name="wait" to="reset_workingDir"/>
+<action name="reset_workingDir">
+<fs>
+<delete path="${workingDir}"/>
+<mkdir path="${workingDir}"/>
+</fs>
+<ok to="End"/>
+<error to="Kill"/>
+</action>
 <end name="End"/>
 </workflow-app>

View File

@@ -390,7 +390,16 @@
 <error to="Kill"/>
 </action>
-<join name="wait2" to="End"/>
+<join name="wait2" to="reset_workingDir"/>
+<action name="reset_workingDir">
+<fs>
+<delete path="${workingDir}"/>
+<mkdir path="${workingDir}"/>
+</fs>
+<ok to="End"/>
+<error to="Kill"/>
+</action>
 <end name="End"/>

View File

@@ -8,10 +8,7 @@
 <name>allowedsemrels</name>
 <description>the allowed semantics </description>
 </property>
-<property>
-<name>outputPath</name>
-<description>the output path</description>
-</property>
 </parameters>
 <global>
@@ -76,16 +73,22 @@
 --conf spark.dynamicAllocation.enabled=true
 --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
 </spark-opts>
-<arg>--saveGraph</arg><arg>${saveGraph}</arg>
 <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
-<arg>--outputPath</arg><arg>${outputPath}/relation</arg>
+<arg>--outputPath</arg><arg>${sourcePath}/relation</arg>
 <arg>--potentialUpdatePath</arg><arg>${workingDir}/resultproject/preparedInfo/potentialUpdates</arg>
 <arg>--alreadyLinkedPath</arg><arg>${workingDir}/resultproject/preparedInfo/alreadyLinked</arg>
 </spark>
+<ok to="reset_workingDir"/>
+<error to="Kill"/>
+</action>
+<action name="reset_workingDir">
+<fs>
+<delete path="${workingDir}"/>
+<mkdir path="${workingDir}"/>
+</fs>
 <ok to="End"/>
 <error to="Kill"/>
 </action>
 <end name="End"/>
 </workflow-app>

View File

@@ -8,10 +8,7 @@
 <name>organizationtoresultcommunitymap</name>
 <description>organization community map</description>
 </property>
-<property>
-<name>outputPath</name>
-<description>the output path</description>
-</property>
 </parameters>
 <global>
@@ -25,21 +22,12 @@
 </configuration>
 </global>
-<start to="reset_outputpath"/>
+<start to="prepare_result_communitylist"/>
 <kill name="Kill">
 <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
 </kill>
-<action name="reset_outputpath">
-<fs>
-<delete path="${outputPath}"/>
-<mkdir path="${outputPath}"/>
-</fs>
-<ok to="prepare_result_communitylist"/>
-<error to="Kill"/>
-</action>
 <action name="prepare_result_communitylist">
 <spark xmlns="uri:oozie:spark-action:0.2">
 <master>yarn</master>
@@ -97,7 +85,7 @@
 <arg>--outputPath</arg><arg>${workingDir}/communityorganization/publication</arg>
 <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
 <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
-<arg>--saveGraph</arg><arg>${saveGraph}</arg>
 </spark>
 <ok to="wait2"/>
 <error to="Kill"/>
@@ -126,7 +114,7 @@
 <arg>--outputPath</arg><arg>${workingDir}/communityorganization/dataset</arg>
 <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
 <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
-<arg>--saveGraph</arg><arg>${saveGraph}</arg>
 </spark>
 <ok to="wait2"/>
 <error to="Kill"/>
@@ -155,7 +143,7 @@
 <arg>--outputPath</arg><arg>${workingDir}/communityorganization/otherresearchproduct</arg>
 <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
 <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
-<arg>--saveGraph</arg><arg>${saveGraph}</arg>
 </spark>
 <ok to="wait2"/>
 <error to="Kill"/>
@@ -184,14 +172,22 @@
 <arg>--outputPath</arg><arg>${workingDir}/communityorganization/software</arg>
 <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
 <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
-<arg>--saveGraph</arg><arg>${saveGraph}</arg>
 </spark>
 <ok to="wait2"/>
 <error to="Kill"/>
 </action>
-<join name="wait2" to="End"/>
+<join name="wait2" to="reset_workingDir"/>
+<action name="reset_workingDir">
+<fs>
+<delete path="${workingDir}"/>
+<mkdir path="${workingDir}"/>
+</fs>
+<ok to="End"/>
+<error to="Kill"/>
+</action>
 <end name="End"/>
 </workflow-app>

View File

@@ -18,21 +18,12 @@
 </property>
 </parameters>
-<start to="reset_outputpath"/>
+<start to="fork_prepare_assoc_step1"/>
 <kill name="Kill">
 <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
 </kill>
-<action name="reset_outputpath">
-<fs>
-<delete path="${outputPath}"/>
-<mkdir path="${outputPath}"/>
-</fs>
-<ok to="fork_prepare_assoc_step1"/>
-<error to="Kill"/>
-</action>
 <fork name="fork_prepare_assoc_step1">
@@ -214,8 +205,8 @@
 <arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
 <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
 <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
-<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
+<arg>--outputPath</arg><arg>${workingDir}/communitysemrel/publication</arg>
-<arg>--saveGraph</arg><arg>${saveGraph}</arg>
 </spark>
 <ok to="wait2"/>
 <error to="Kill"/>
@@ -243,8 +234,8 @@
 <arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
 <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
 <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
-<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
+<arg>--outputPath</arg><arg>${workingDir}/communitysemrel/dataset</arg>
-<arg>--saveGraph</arg><arg>${saveGraph}</arg>
 </spark>
 <ok to="wait2"/>
 <error to="Kill"/>
@@ -272,8 +263,8 @@
 <arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
 <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
 <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
-<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
+<arg>--outputPath</arg><arg>${workingDir}/communitysemrel/otherresearchproduct</arg>
-<arg>--saveGraph</arg><arg>${saveGraph}</arg>
 </spark>
 <ok to="wait2"/>
 <error to="Kill"/>
@@ -301,15 +292,22 @@
 <arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
 <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
 <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
-<arg>--outputPath</arg><arg>${outputPath}/software</arg>
+<arg>--outputPath</arg><arg>${workingDir}/communitysemrel/software</arg>
-<arg>--saveGraph</arg><arg>${saveGraph}</arg>
 </spark>
 <ok to="wait2"/>
 <error to="Kill"/>
 </action>
-<join name="wait2" to="End"/>
+<join name="wait2" to="reset_workingDir"/>
+<action name="reset_workingDir">
+<fs>
+<delete path="${workingDir}"/>
+<mkdir path="${workingDir}"/>
+</fs>
+<ok to="End"/>
+<error to="Kill"/>
+</action>
 <end name="End"/>
 </workflow-app>

View File

@@ -4,10 +4,7 @@
 <name>sourcePath</name>
 <description>the source path</description>
 </property>
-<!-- <property>-->
-<!-- <name>outputPath</name>-->
-<!-- <description>sets the outputPath</description>-->
-<!-- </property>-->
 </parameters>
 <global>
@@ -27,23 +24,6 @@
 <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
 </kill>
-<!-- <decision name="resume_from">-->
-<!-- <switch>-->
-<!-- <case to="prepare_info">${wf:conf('resumeFrom') eq 'PrepareInfo'}</case>-->
-<!-- <default to="reset_outputpath"/> &lt;!&ndash; first action to be done when downloadDump is to be performed &ndash;&gt;-->
-<!-- </switch>-->
-<!-- </decision>-->
-<!-- <action name="reset_outputpath">-->
-<!-- <fs>-->
-<!-- <delete path="${outputPath}"/>-->
-<!-- <mkdir path="${outputPath}"/>-->
-<!-- </fs>-->
-<!-- <ok to="prepare_info"/>-->
-<!-- <error to="Kill"/>-->
-<!-- </action>-->
 <action name="prepare_info">
 <spark xmlns="uri:oozie:spark-action:0.2">
 <master>yarn</master>
@@ -99,12 +79,19 @@
 <arg>--workingDir</arg><arg>${workingDir}/affiliationSemanticRelation/working</arg>
 <arg>--iterations</arg><arg>${iterations}</arg>
 </spark>
+<ok to="reset_workingDir"/>
+<error to="Kill"/>
+</action>
+<action name="reset_workingDir">
+<fs>
+<delete path="${workingDir}"/>
+<mkdir path="${workingDir}"/>
+</fs>
 <ok to="End"/>
 <error to="Kill"/>
 </action>
 <end name="End"/>
 </workflow-app>

View File

@@ -5,6 +5,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import eu.dnetlib.dhp.countrypropagation.pojo.DatasourceCountry;
 import org.apache.commons.io.FileUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;

View File

@@ -1,12 +1,11 @@
 package eu.dnetlib.dhp.countrypropagation;
-import static eu.dnetlib.dhp.PropagationConstant.isSparkSessionManaged;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import eu.dnetlib.dhp.countrypropagation.pojo.ResultCountrySet;
 import org.apache.commons.io.FileUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;

View File

@@ -33,21 +33,21 @@ public class ProjectPropagationJobTest {
 private static SparkSession spark;
 private static Path workingDir;
+private static final SparkConf conf = new SparkConf();
 @BeforeAll
 public static void beforeAll() throws IOException {
-workingDir = Files.createTempDirectory(ProjectPropagationJobTest.class.getSimpleName());
 log.info("using work dir {}", workingDir);
-SparkConf conf = new SparkConf();
 conf.setAppName(ProjectPropagationJobTest.class.getSimpleName());
 conf.setMaster("local[*]");
 conf.set("spark.driver.host", "localhost");
 conf.set("hive.metastore.local", "true");
 conf.set("spark.ui.enabled", "false");
-conf.set("spark.sql.warehouse.dir", workingDir.toString());
-conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
 spark = SparkSession
 .builder()
@@ -58,7 +58,7 @@ public class ProjectPropagationJobTest {
 @AfterAll
 public static void afterAll() throws IOException {
-FileUtils.deleteDirectory(workingDir.toFile());
 spark.stop();
 }
@@ -71,6 +71,7 @@ public class ProjectPropagationJobTest {
 @Test
 void NoUpdateTest() throws Exception {
+workingDir = Files.createTempDirectory(ProjectPropagationJobTest.class.getSimpleName());
 final String potentialUpdateDate = getClass()
 .getResource(
 "/eu/dnetlib/dhp/projecttoresult/preparedInfo/noupdates/potentialUpdates")
@@ -82,10 +83,10 @@ public class ProjectPropagationJobTest {
 SparkResultToProjectThroughSemRelJob
 .main(
 new String[] {
-"-isTest", Boolean.TRUE.toString(),
 "-isSparkSessionManaged", Boolean.FALSE.toString(),
 "-hive_metastore_uris", "",
-"-saveGraph", "true",
 "-outputPath", workingDir.toString() + "/relation",
 "-potentialUpdatePath", potentialUpdateDate,
 "-alreadyLinkedPath", alreadyLinkedPath,
@@ -98,6 +99,10 @@ public class ProjectPropagationJobTest {
 .map(item -> OBJECT_MAPPER.readValue(item, Relation.class));
 Assertions.assertEquals(0, tmp.count());
+FileUtils.deleteDirectory(workingDir.toFile());
 }
 /**
@@ -107,6 +112,12 @@ public class ProjectPropagationJobTest {
 */
 @Test
 void UpdateTenTest() throws Exception {
+workingDir = Files.createTempDirectory(ProjectPropagationJobTest.class.getSimpleName());
+spark = SparkSession
+.builder()
+.appName(ProjectPropagationJobTest.class.getSimpleName())
+.config(conf)
+.getOrCreate();
 final String potentialUpdatePath = getClass()
 .getResource(
 "/eu/dnetlib/dhp/projecttoresult/preparedInfo/tenupdates/potentialUpdates")
@@ -118,10 +129,10 @@ public class ProjectPropagationJobTest {
 SparkResultToProjectThroughSemRelJob
 .main(
 new String[] {
-"-isTest", Boolean.TRUE.toString(),
 "-isSparkSessionManaged", Boolean.FALSE.toString(),
 "-hive_metastore_uris", "",
-"-saveGraph", "true",
 "-outputPath", workingDir.toString() + "/relation",
 "-potentialUpdatePath", potentialUpdatePath,
 "-alreadyLinkedPath", alreadyLinkedPath,
@@ -169,6 +180,9 @@ public class ProjectPropagationJobTest {
 .sql(
 "Select * from temporary where datainfo.inferenceprovenance = 'propagation'")
 .count());
+FileUtils.deleteDirectory(workingDir.toFile());
 }
 /**
@@ -179,6 +193,12 @@ public class ProjectPropagationJobTest {
 */
 @Test
 void UpdateMixTest() throws Exception {
+workingDir = Files.createTempDirectory(ProjectPropagationJobTest.class.getSimpleName());
+spark = SparkSession
+.builder()
+.appName(ProjectPropagationJobTest.class.getSimpleName())
+.config(conf)
+.getOrCreate();
 final String potentialUpdatepath = getClass()
 .getResource(
 "/eu/dnetlib/dhp/projecttoresult/preparedInfo/updatesmixed/potentialUpdates")
@@ -190,10 +210,10 @@ public class ProjectPropagationJobTest {
 SparkResultToProjectThroughSemRelJob
 .main(
 new String[] {
-"-isTest", Boolean.TRUE.toString(),
 "-isSparkSessionManaged", Boolean.FALSE.toString(),
 "-hive_metastore_uris", "",
-"-saveGraph", "true",
 "-outputPath", workingDir.toString() + "/relation",
 "-potentialUpdatePath", potentialUpdatepath,
 "-alreadyLinkedPath", alreadyLinkedPath,
@@ -244,5 +264,7 @@ public class ProjectPropagationJobTest {
 .sql(
 "Select * from temporary where datainfo.inferenceprovenance = 'propagation'")
 .count());
+FileUtils.deleteDirectory(workingDir.toFile());
 }
 }