added needed parameter

This commit is contained in:
Miriam Baglioni 2023-12-19 12:15:01 +01:00
parent 3eca5d2e1c
commit d410ea8a41
5 changed files with 41 additions and 116 deletions

View File

@ -2,26 +2,19 @@
package eu.dnetlib.dhp.resulttoorganizationfrominstrepo; package eu.dnetlib.dhp.resulttoorganizationfrominstrepo;
import static eu.dnetlib.dhp.PropagationConstant.*; import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable; import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.bulktag.community.ResultTagger;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
/** /**
* @author miriam.baglioni * @author miriam.baglioni
@ -54,7 +47,7 @@ public class AppendNewRelations implements Serializable {
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
runWithSparkHiveSession( runWithSparkSession(
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> appendNewRelation(spark, inputPath, outputPath)); spark -> appendNewRelation(spark, inputPath, outputPath));

View File

@ -5,9 +5,10 @@
<description>the source path</description> <description>the source path</description>
</property> </property>
<property> <property>
<name>outputPath</name> <name>iterations</name>
<description>sets the outputPath</description> <description>the number of hops to be done up on the hierarchy</description>
</property> </property>
</parameters> </parameters>
<global> <global>
@ -21,119 +22,26 @@
</configuration> </configuration>
</global> </global>
<start to="resume_from"/> <start to="reset_outputpath"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<decision name="resume_from">
<switch>
<case to="prepare_info">${wf:conf('resumeFrom') eq 'PrepareInfo'}</case>
<default to="reset_outputpath"/> <!-- first action to be done when downloadDump is to be performed -->
</switch>
</decision>
<action name="reset_outputpath"> <action name="reset_outputpath">
<fs> <fs>
<delete path="${outputPath}"/> <delete path="${workingDir}"/>
<mkdir path="${outputPath}"/> <mkdir path="${workingDir}"/>
</fs> </fs>
<ok to="copy_entities"/> <ok to="prepare_info"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<fork name="copy_entities">
<path start="copy_relation"/>
<path start="copy_publication"/>
<path start="copy_dataset"/>
<path start="copy_orp"/>
<path start="copy_software"/>
<path start="copy_organization"/>
<path start="copy_projects"/>
<path start="copy_datasources"/>
</fork>
<action name="copy_relation">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${sourcePath}/relation</arg>
<arg>${nameNode}/${outputPath}/relation</arg>
</distcp>
<ok to="wait"/>
<error to="Kill"/>
</action>
<action name="copy_publication">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${sourcePath}/publication</arg>
<arg>${nameNode}/${outputPath}/publication</arg>
</distcp>
<ok to="wait"/>
<error to="Kill"/>
</action>
<action name="copy_dataset">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${sourcePath}/dataset</arg>
<arg>${nameNode}/${outputPath}/dataset</arg>
</distcp>
<ok to="wait"/>
<error to="Kill"/>
</action>
<action name="copy_orp">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${sourcePath}/otherresearchproduct</arg>
<arg>${nameNode}/${outputPath}/otherresearchproduct</arg>
</distcp>
<ok to="wait"/>
<error to="Kill"/>
</action>
<action name="copy_software">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${sourcePath}/software</arg>
<arg>${nameNode}/${outputPath}/software</arg>
</distcp>
<ok to="wait"/>
<error to="Kill"/>
</action>
<action name="copy_organization">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${sourcePath}/organization</arg>
<arg>${nameNode}/${outputPath}/organization</arg>
</distcp>
<ok to="wait"/>
<error to="Kill"/>
</action>
<action name="copy_projects">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${sourcePath}/project</arg>
<arg>${nameNode}/${outputPath}/project</arg>
</distcp>
<ok to="wait"/>
<error to="Kill"/>
</action>
<action name="copy_datasources">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${sourcePath}/datasource</arg>
<arg>${nameNode}/${outputPath}/datasource</arg>
</distcp>
<ok to="wait"/>
<error to="Kill"/>
</action>
<join name="wait" to="prepare_info"/>
<action name="prepare_info"> <action name="prepare_info">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>PrepareResultOrganizationAssociation</name> <name>PrepareResultProjectOrganizationAssociation</name>
<class>eu.dnetlib.dhp.entitytoorganizationfromsemrel.PrepareInfo</class> <class>eu.dnetlib.dhp.entitytoorganizationfromsemrel.PrepareInfo</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar> <jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
@ -161,7 +69,7 @@
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>resultToOrganizationFromSemRel</name> <name>resultProjectToOrganizationFromSemRel</name>
<class>eu.dnetlib.dhp.entitytoorganizationfromsemrel.SparkEntityToOrganizationFromSemRel</class> <class>eu.dnetlib.dhp.entitytoorganizationfromsemrel.SparkEntityToOrganizationFromSemRel</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar> <jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
@ -177,7 +85,7 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--relationPath</arg><arg>${workingDir}/preparedInfo/relation</arg> <arg>--relationPath</arg><arg>${workingDir}/preparedInfo/relation</arg>
<arg>--outputPath</arg><arg>${outputPath}/relation</arg> <arg>--outputPath</arg><arg>${sourcePath}/relation</arg>
<arg>--leavesPath</arg><arg>${workingDir}/preparedInfo/leavesPath</arg> <arg>--leavesPath</arg><arg>${workingDir}/preparedInfo/leavesPath</arg>
<arg>--childParentPath</arg><arg>${workingDir}/preparedInfo/childParentPath</arg> <arg>--childParentPath</arg><arg>${workingDir}/preparedInfo/childParentPath</arg>
<arg>--resultOrgPath</arg><arg>${workingDir}/preparedInfo/resultOrgPath</arg> <arg>--resultOrgPath</arg><arg>${workingDir}/preparedInfo/resultOrgPath</arg>

View File

@ -0,0 +1,20 @@
[
{
"paramName":"s",
"paramLongName":"sourcePath",
"paramDescription": "the path of the sequential file to read",
"paramRequired": true
},
{
"paramName": "ssm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
},{
"paramName": "o",
"paramLongName": "outputPath",
"paramDescription": "the path where the output is written",
"paramRequired": false
}
]

View File

@ -1,5 +1,5 @@
sourcePath=/tmp/beta_provision/graph/09_graph_dedup_enriched sourcePath=/tmp/beta_provision/graph/09_graph_dedup_enriched
resumeFrom=default resumeFrom=AffiliationSemanticRelation
allowedsemrelsorcidprop=isSupplementedBy;isSupplementTo allowedsemrelsorcidprop=isSupplementedBy;isSupplementTo
allowedsemrelsresultproject=isSupplementedBy;isSupplementTo allowedsemrelsresultproject=isSupplementedBy;isSupplementTo
allowedsemrelscommunitysemrel=isSupplementedBy;isSupplementTo allowedsemrelscommunitysemrel=isSupplementedBy;isSupplementTo
@ -24,5 +24,5 @@ pathMap ={"author":"$['author'][*]['fullname']", \
blacklist=empty blacklist=empty
allowedpids=orcid;orcid_pending allowedpids=orcid;orcid_pending
baseURL = https://services.openaire.eu/openaire/community/ baseURL = https://services.openaire.eu/openaire/community/
iterations=1

View File

@ -195,13 +195,13 @@
</property> </property>
</configuration> </configuration>
</sub-workflow> </sub-workflow>
<ok to="affiliation_semantic_relation" /> <ok to="entity_semantic_relation" />
<error to="Kill" /> <error to="Kill" />
</action> </action>
<action name="affiliation_semantic_relation"> <action name="entity_semantic_relation">
<sub-workflow> <sub-workflow>
<app-path>${wf:appPath()}/affiliation_semantic_relation <app-path>${wf:appPath()}/entity_semantic_relation
</app-path> </app-path>
<propagate-configuration/> <propagate-configuration/>
<configuration> <configuration>
@ -209,6 +209,10 @@
<name>sourcePath</name> <name>sourcePath</name>
<value>${outputPath}</value> <value>${outputPath}</value>
</property> </property>
<property>
<name>iterations</name>
<value>${iterations}</value>
</property>
</configuration> </configuration>
</sub-workflow> </sub-workflow>
<ok to="community_organization" /> <ok to="community_organization" />