[Dump Funders] -

This commit is contained in:
Miriam Baglioni 2022-03-23 10:10:38 +01:00
parent c763aded70
commit faf79db4d5
6 changed files with 142 additions and 76 deletions

View File

@ -10,6 +10,7 @@ import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
@ -79,12 +80,11 @@ public class SparkPrepareResultProject implements Serializable {
private static void prepareResultProjectList(SparkSession spark, String inputPath, String outputPath) {
Dataset<Relation> relation = Utils
.readPath(spark, inputPath + "/relation", Relation.class)
.filter(
"dataInfo.deletedbyinference = false and lower(relClass) = '"
+ ModelConstants.IS_PRODUCED_BY.toLowerCase() + "'");
Dataset<eu.dnetlib.dhp.schema.oaf.Project> projects = Utils
.readPath(spark, inputPath + "/project", eu.dnetlib.dhp.schema.oaf.Project.class);
.readPath(spark, inputPath + "/relation", Relation.class)
.filter((FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
r.getRelClass().equalsIgnoreCase(ModelConstants.IS_PRODUCED_BY));
Dataset<eu.dnetlib.dhp.schema.oaf.Project> projects = Utils.readPath(spark, inputPath + "/project", eu.dnetlib.dhp.schema.oaf.Project.class);
projects
.joinWith(relation, projects.col("id").equalTo(relation.col("target")), "inner")

View File

@ -76,8 +76,7 @@ public class SparkDumpFunderResults implements Serializable {
.union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class));
List<String> funderList = project
.select("id")
.map((MapFunction<Row, String>) value -> value.getString(0).substring(0, 15), Encoders.STRING())
.map((MapFunction<eu.dnetlib.dhp.schema.oaf.Project, String>) p -> p.getId(),Encoders.STRING() )
.distinct()
.collectAsList();

View File

@ -6,8 +6,15 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.Optional;
import eu.dnetlib.dhp.oa.graph.dump.Constants;
import eu.dnetlib.dhp.oa.graph.dump.DumpProducts;
import eu.dnetlib.dhp.oa.graph.dump.ResultMapper;
import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
import eu.dnetlib.dhp.oa.graph.dump.community.ResultProject;
import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
@ -23,6 +30,7 @@ import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import scala.Tuple2;
/**
* Selects the results linked to projects. Only for these results the dump will be performed.
@ -58,8 +66,10 @@ public class SparkResultLinkedToProject implements Serializable {
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
final String resultProjectsPath = parser.get("graphPath");
log.info("graphPath: {}", resultProjectsPath);
String communityMapPath = parser.get("communityMapPath");
@SuppressWarnings("unchecked")
Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);
@ -70,43 +80,32 @@ public class SparkResultLinkedToProject implements Serializable {
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
writeResultsLinkedToProjects(spark, inputClazz, inputPath, outputPath, graphPath);
writeResultsLinkedToProjects(communityMapPath, spark, inputClazz, inputPath, outputPath, resultProjectsPath);
});
}
private static <R extends Result> void writeResultsLinkedToProjects(SparkSession spark, Class<R> inputClazz,
String inputPath, String outputPath, String graphPath) {
private static <R extends Result> void writeResultsLinkedToProjects(String communityMapPath, SparkSession spark, Class<R> inputClazz,
String inputPath, String outputPath, String resultProjectsPath) {
Dataset<R> results = Utils
.readPath(spark, inputPath, inputClazz)
.filter("dataInfo.deletedbyinference = false and datainfo.invisible = false");
Dataset<Relation> relations = Utils
.readPath(spark, graphPath + "/relation", Relation.class)
.filter(
"dataInfo.deletedbyinference = false and lower(relClass) = '"
+ ModelConstants.IS_PRODUCED_BY.toLowerCase() + "'");
Dataset<Project> project = Utils.readPath(spark, graphPath + "/project", Project.class);
.filter((FilterFunction<R>) r -> !r.getDataInfo().getDeletedbyinference() &&
!r.getDataInfo().getInvisible())
;
Dataset<ResultProject> resultProjectDataset = Utils
.readPath(spark, resultProjectsPath , ResultProject.class)
;
CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath);
results.joinWith(resultProjectDataset, results.col("id").equalTo(resultProjectDataset.col("resultId")))
.map((MapFunction<Tuple2<R, ResultProject>, CommunityResult>) t2 ->
{
CommunityResult cr = (CommunityResult) ResultMapper.map(t2._1(),
communityMap, Constants.DUMPTYPE.FUNDER.getType());
cr.setProjects(t2._2().getProjectsList());
return cr;
}
results.createOrReplaceTempView("result");
relations.createOrReplaceTempView("relation");
project.createOrReplaceTempView("project");
Dataset<R> tmp = spark
.sql(
"Select res.* " +
"from relation rel " +
"join result res " +
"on rel.source = res.id " +
"join project p " +
"on rel.target = p.id " +
"")
.as(Encoders.bean(inputClazz));
tmp
.groupByKey(
(MapFunction<R, String>) value -> value
.getId(),
Encoders.STRING())
.mapGroups((MapGroupsFunction<String, R, R>) (k, it) -> it.next(), Encoders.bean(inputClazz))
, Encoders.bean(CommunityResult.class) )
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")

View File

@ -0,0 +1,23 @@
[
{
"paramName":"s",
"paramLongName":"sourcePath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "ssm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
}
]

View File

@ -92,8 +92,6 @@
</kill>
<fork name="fork_dump">
<path start="dump_publication"/>
<path start="dump_dataset"/>

View File

@ -77,12 +77,59 @@
</configuration>
</global>
<start to="fork_result_linked_to_projects"/>
<start to="prepareResultProject"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="prepareResultProject">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Prepare association result subset of project info</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkPrepareResultProject</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="fork_result_linked_to_projects"/>
<error to="Kill"/>
</action>
<!-- <action name="select_relations">-->
<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
<!-- <master>yarn</master>-->
<!-- <mode>cluster</mode>-->
<!-- <name>Dump funder results </name>-->
<!-- <class>eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkSelectRelations</class>-->
<!-- <jar>dhp-graph-mapper-${projectVersion}.jar</jar>-->
<!-- <spark-opts>-->
<!-- &#45;&#45;executor-memory=${sparkExecutorMemory}-->
<!-- &#45;&#45;executor-cores=${sparkExecutorCores}-->
<!-- &#45;&#45;driver-memory=${sparkDriverMemory}-->
<!-- &#45;&#45;conf spark.extraListeners=${spark2ExtraListeners}-->
<!-- &#45;&#45;conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}-->
<!-- &#45;&#45;conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}-->
<!-- &#45;&#45;conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}-->
<!-- &#45;&#45;conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}-->
<!-- </spark-opts>-->
<!-- <arg>&#45;&#45;sourcePath</arg><arg>${sourcePath}/relation</arg>-->
<!-- <arg>&#45;&#45;outputPath</arg><arg>${workingDir}/preparedInfo</arg>-->
<!-- </spark>-->
<!-- <ok to="fork_result_linked_to_projects"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<fork name="fork_result_linked_to_projects">
<path start="select_publication_linked_to_projects"/>
@ -111,7 +158,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/publication</arg>
<arg>--graphPath</arg><arg>${sourcePath}</arg>
<arg>--graphPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="join_link"/>
<error to="Kill"/>
@ -137,7 +184,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/dataset</arg>
<arg>--graphPath</arg><arg>${sourcePath}</arg>
<arg>--graphPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="join_link"/>
<error to="Kill"/>
@ -163,7 +210,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/otherresearchproduct</arg>
<arg>--graphPath</arg><arg>${sourcePath}</arg>
<arg>--graphPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="join_link"/>
<error to="Kill"/>
@ -189,41 +236,41 @@
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/software</arg>
<arg>--graphPath</arg><arg>${sourcePath}</arg>
<arg>--graphPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="join_link"/>
<error to="Kill"/>
</action>
<join name="join_link" to="common_action_community_funder"/>
<join name="join_link" to="dump_funder_results"/>
<action name="common_action_community_funder">
<sub-workflow>
<app-path>${wf:appPath()}/dump_common
</app-path>
<propagate-configuration/>
<configuration>
<property>
<name>sourcePath</name>
<value>${sourcePath}</value>
</property>
<property>
<name>selectedResults</name>
<value>${workingDir}/result</value>
</property>
<property>
<name>communityMapPath</name>
<value>${workingDir}/communityMap</value>
</property>
<property>
<name>outputPath</name>
<value>${workingDir}</value>
</property>
</configuration>
</sub-workflow>
<ok to="dump_funder_results" />
<error to="Kill" />
</action>
<!-- <action name="common_action_community_funder">-->
<!-- <sub-workflow>-->
<!-- <app-path>${wf:appPath()}/dump_common-->
<!-- </app-path>-->
<!-- <propagate-configuration/>-->
<!-- <configuration>-->
<!-- <property>-->
<!-- <name>sourcePath</name>-->
<!-- <value>${sourcePath}</value>-->
<!-- </property>-->
<!-- <property>-->
<!-- <name>selectedResults</name>-->
<!-- <value>${workingDir}/result</value>-->
<!-- </property>-->
<!-- <property>-->
<!-- <name>communityMapPath</name>-->
<!-- <value>${workingDir}/communityMap</value>-->
<!-- </property>-->
<!-- <property>-->
<!-- <name>outputPath</name>-->
<!-- <value>${workingDir}</value>-->
<!-- </property>-->
<!-- </configuration>-->
<!-- </sub-workflow>-->
<!-- <ok to="dump_funder_results" />-->
<!-- <error to="Kill" />-->
<!-- </action>-->
<action name="dump_funder_results">
<spark xmlns="uri:oozie:spark-action:0.2">
@ -242,7 +289,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/ext</arg>
<arg>--sourcePath</arg><arg>${workingDir}/result</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--graphPath</arg><arg>${sourcePath}</arg>
</spark>