added propagation for project to organization

Miriam Baglioni 2023-05-31 11:06:58 +02:00
parent a235d2a24a
commit 0389b57ca7
21 changed files with 153 additions and 70 deletions

View File

@@ -57,7 +57,10 @@ public class PropagationConstant {
     public static final String PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME = "Propagation of affiliation to result collected from datasources of type institutional repository";
     public static final String PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_ID = "result:organization:semrel";
-    public static final String PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_NAME = "Propagation of affiliation to result through sematic relations";
+    public static final String PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_NAME = "Propagation of affiliation to result through semantic relations";
+    public static final String PROPAGATION_RELATION_PROJECT_ORGANIZATION_SEM_REL_CLASS_ID = "project:organization:semrel";
+    public static final String PROPAGATION_RELATION_PROJECT_ORGANIZATION_SEM_REL_CLASS_NAME = "Propagation of participation to project through semantic relations";
     public static final String PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID = "result:project:semrel";
     public static final String PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME = "Propagation of result to project through semantic relation";
@@ -171,6 +174,32 @@ public class PropagationConstant {
         return newRelations;
     }
 
+    public static Relation getParticipantRelation(
+        String source,
+        String target,
+        String rel_class) {
+        return getRelation(source, target,
+            rel_class,
+            ModelConstants.PROJECT_ORGANIZATION,
+            ModelConstants.PARTICIPATION,
+            PROPAGATION_DATA_INFO_TYPE,
+            PROPAGATION_RELATION_PROJECT_ORGANIZATION_SEM_REL_CLASS_ID,
+            PROPAGATION_RELATION_PROJECT_ORGANIZATION_SEM_REL_CLASS_NAME);
+    }
+
+    public static Relation getAffiliationRelation(
+        String source,
+        String target,
+        String rel_class) {
+        return getRelation(source, target,
+            rel_class,
+            ModelConstants.RESULT_ORGANIZATION,
+            ModelConstants.AFFILIATION,
+            PROPAGATION_DATA_INFO_TYPE,
+            PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_ID,
+            PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_NAME);
+    }
+
     public static Relation getRelation(
         String source,
         String target,
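
For orientation, the two new helpers wrap the pre-existing getRelation factory with fixed vocabulary and provenance constants. A minimal usage sketch (the ids and the printed output are illustrative, assuming dhp-schemas' ModelConstants is on the classpath):

    import static eu.dnetlib.dhp.PropagationConstant.getParticipantRelation;

    import eu.dnetlib.dhp.schema.common.ModelConstants;
    import eu.dnetlib.dhp.schema.oaf.Relation;

    public class ParticipantRelationSketch {
        public static void main(String[] args) {
            // illustrative OpenAIRE-style ids: "40|" marks a project, "20|" an organization
            String projectId = "40|example_____::0123456789abcdef";
            String orgId = "20|example_____::fedcba9876543210";

            // project --hasParticipant--> organization, stamped with the new
            // project:organization:semrel provenance constants
            Relation r = getParticipantRelation(projectId, orgId, ModelConstants.HAS_PARTICIPANT);
            System.out.println(r.getSource() + " -" + r.getRelClass() + "-> " + r.getTarget());
        }
    }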

View File

@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.resulttoorganizationfromsemrel;
+package eu.dnetlib.dhp.entitytoorganizationfromsemrel;
 
 import java.io.Serializable;

View File

@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.resulttoorganizationfromsemrel;
+package eu.dnetlib.dhp.entitytoorganizationfromsemrel;
 
 import static eu.dnetlib.dhp.PropagationConstant.*;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
@@ -7,6 +7,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
 import java.io.Serializable;
 import java.util.*;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.*;
@@ -14,8 +15,6 @@ import org.apache.spark.sql.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import eu.dnetlib.dhp.KeyValueSet;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob;
@@ -47,13 +46,20 @@ public class PrepareInfo implements Serializable {
         "' and datainfo.deletedbyinference = false " +
         "GROUP BY source";
 
+    // associate projects to all the participant orgs
+    private static final String PROJECT_ORGANIZATION_QUERY = "SELECT source key, collect_set(target) as valueSet " +
+        "FROM relation " +
+        "WHERE lower(relclass) = '" + ModelConstants.IS_PARTICIPANT.toLowerCase() +
+        "' and datainfo.deletedbyinference = false " +
+        "GROUP BY source";
+
     public static void main(String[] args) throws Exception {
         String jsonConfiguration = IOUtils
             .toString(
                 SparkResultToOrganizationFromIstRepoJob.class
                     .getResourceAsStream(
-                        "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_preparation_parameter.json"));
+                        "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/input_preparation_parameter.json"));
 
         final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
@@ -74,6 +80,9 @@ public class PrepareInfo implements Serializable {
         final String resultOrganizationPath = parser.get("resultOrgPath");
         log.info("resultOrganizationPath: {}", resultOrganizationPath);
 
+        final String projectOrgPath = parser.get("projectOrganizationPath");
+        log.info("projectOrgPath: {}", projectOrgPath);
+
         final String relationPath = parser.get("relationPath");
         log.info("relationPath: {}", relationPath);
@@ -89,11 +98,12 @@ public class PrepareInfo implements Serializable {
                 childParentPath,
                 leavesPath,
                 resultOrganizationPath,
+                projectOrgPath,
                 relationPath));
     }
 
     private static void prepareInfo(SparkSession spark, String inputPath, String childParentOrganizationPath,
-        String currentIterationPath, String resultOrganizationPath, String relationPath) {
+        String currentIterationPath, String resultOrganizationPath, String resultProjectPath, String relationPath) {
         Dataset<Relation> relation = readPath(spark, inputPath + "/relation", Relation.class);
         relation.createOrReplaceTempView("relation");
@@ -113,6 +123,14 @@ public class PrepareInfo implements Serializable {
             .option("compression", "gzip")
             .json(resultOrganizationPath);
 
+        spark
+            .sql(PROJECT_ORGANIZATION_QUERY)
+            .as(Encoders.bean(KeyValueSet.class))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(resultProjectPath);
+
         relation
             .filter(
                 (FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
@@ -120,7 +138,16 @@ public class PrepareInfo implements Serializable {
             .write()
             .mode(SaveMode.Overwrite)
             .option("compression", "gzip")
-            .json(relationPath);
+            .json(relationPath + "/result");
+
+        relation
+            .filter(
+                (FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
+                    r.getRelClass().equals(ModelConstants.IS_PARTICIPANT))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(relationPath + "/project");
 
         Dataset<String> children = spark
             .sql(
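
The new PROJECT_ORGANIZATION_QUERY mirrors the existing result-organization preparation: it groups non-deleted isParticipant relations by source, so each record written to the project-organization path pairs a project id (key) with the set of its participant organization ids (valueSet). A sketch of reading the prepared map back (session setup and paths are illustrative):

    import eu.dnetlib.dhp.KeyValueSet;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;

    public class ReadProjectOrganizationSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
            // the gzipped JSON written by prepareInfo above; path is a placeholder
            Dataset<KeyValueSet> projectOrg = spark
                .read()
                .json("/tmp/preparedInfo/projectOrganizationPath")
                .as(Encoders.bean(KeyValueSet.class));
            // key = project id, valueSet = ids of its participant organizations
            projectOrg.show(false);
            spark.stop();
        }
    }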

View File

@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.resulttoorganizationfromsemrel;
+package eu.dnetlib.dhp.entitytoorganizationfromsemrel;
 
 import java.io.Serializable;

View File

@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.resulttoorganizationfromsemrel;
+package eu.dnetlib.dhp.entitytoorganizationfromsemrel;
 
 import static eu.dnetlib.dhp.PropagationConstant.*;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
@@ -30,7 +30,8 @@ import eu.dnetlib.dhp.schema.oaf.Relation;
 public class SparkResultToOrganizationFromSemRel implements Serializable {
     private static final Logger log = LoggerFactory.getLogger(SparkResultToOrganizationFromSemRel.class);
     private static final int MAX_ITERATION = 5;
-    public static final String NEW_RELATION_PATH = "/newRelation";
+    public static final String NEW_RESULT_RELATION_PATH = "/newResultRelation";
+    public static final String NEW_PROJECT_RELATION_PATH = "/newProjectRelation";
 
     public static void main(String[] args) throws Exception {
@@ -62,6 +63,9 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
         final String resultOrganizationPath = parser.get("resultOrgPath");
         log.info("resultOrganizationPath: {}", resultOrganizationPath);
 
+        final String projectOrganizationPath = parser.get("projectOrganizationPath");
+        log.info("projectOrganizationPath: {}", projectOrganizationPath);
+
         final String workingPath = parser.get("workingDir");
         log.info("workingPath: {}", workingPath);
@@ -88,6 +92,7 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
                     leavesPath,
                     childParentPath,
                     resultOrganizationPath,
+                    projectOrganizationPath,
                     relationPath,
                     workingPath,
                     outputPath,
@@ -98,13 +103,14 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
         String leavesPath,
         String childParentPath,
         String resultOrganizationPath,
+        String projectOrganizationPath,
         String graphPath,
         String workingPath,
         String outputPath,
         int iterations) {
         if (iterations == 1) {
             doPropagateOnce(
-                spark, leavesPath, childParentPath, resultOrganizationPath, graphPath,
+                spark, leavesPath, childParentPath, resultOrganizationPath, projectOrganizationPath, graphPath,
                 workingPath, outputPath);
         } else {
@@ -130,15 +136,22 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
     }
 
     private static void doPropagateOnce(SparkSession spark, String leavesPath, String childParentPath,
-        String resultOrganizationPath, String graphPath, String workingPath,
+        String resultOrganizationPath, String projectOrganizationPath, String graphPath, String workingPath,
         String outputPath) {
 
         StepActions
             .execStep(
-                spark, graphPath, workingPath + NEW_RELATION_PATH,
+                spark, graphPath + "/result", workingPath + NEW_RESULT_RELATION_PATH,
                 leavesPath, childParentPath, resultOrganizationPath);
 
-        addNewRelations(spark, workingPath + NEW_RELATION_PATH, outputPath);
+        addNewRelations(spark, workingPath + NEW_RESULT_RELATION_PATH, outputPath);
+
+        StepActions
+            .execStep(
+                spark, graphPath + "/project", workingPath + NEW_PROJECT_RELATION_PATH,
+                leavesPath, childParentPath, projectOrganizationPath);
+
+        addNewRelations(spark, workingPath + NEW_PROJECT_RELATION_PATH, outputPath);
     }
 
     private static void doPropagate(SparkSession spark, String leavesPath, String childParentPath,
@@ -151,11 +164,11 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
             iteration++;
             StepActions
                 .execStep(
-                    spark, graphPath, workingPath + NEW_RELATION_PATH,
+                    spark, graphPath, workingPath + NEW_RESULT_RELATION_PATH,
                     leavesPath, childParentPath, resultOrganizationPath);
             StepActions
                 .prepareForNextStep(
-                    spark, workingPath + NEW_RELATION_PATH, resultOrganizationPath, leavesPath,
+                    spark, workingPath + NEW_RESULT_RELATION_PATH, resultOrganizationPath, leavesPath,
                     childParentPath, workingPath + "/leaves", workingPath + "/resOrg");
             moveOutput(spark, workingPath, leavesPath, resultOrganizationPath);
             leavesCount = readPath(spark, leavesPath, Leaves.class).count();
@@ -185,7 +198,7 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
             propagationCounter.getNotReachedFirstParent().add(1);
         }
 
-        addNewRelations(spark, workingPath + NEW_RELATION_PATH, outputPath);
+        addNewRelations(spark, workingPath + NEW_RESULT_RELATION_PATH, outputPath);
     }
 
     private static void moveOutput(SparkSession spark, String workingPath, String leavesPath,
@@ -212,16 +225,24 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
             .mapGroups(
                 (MapGroupsFunction<String, Relation, Relation>) (k, it) -> it.next(), Encoders.bean(Relation.class))
             .flatMap(
-                (FlatMapFunction<Relation, Relation>) r -> Arrays
-                    .asList(
-                        r, getRelation(
-                            r.getTarget(), r.getSource(), ModelConstants.IS_AUTHOR_INSTITUTION_OF,
-                            ModelConstants.RESULT_ORGANIZATION,
-                            ModelConstants.AFFILIATION,
-                            PROPAGATION_DATA_INFO_TYPE,
-                            PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_ID,
-                            PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_NAME))
-                    .iterator()
+                (FlatMapFunction<Relation, Relation>) r -> {
+                    if (r.getSource().startsWith("50|")) {
+                        return Arrays
+                            .asList(
+                                r, getAffiliationRelation(
+                                    r.getTarget(), r.getSource(), ModelConstants.IS_AUTHOR_INSTITUTION_OF))
+                            .iterator();
+                    } else {
+                        return Arrays
+                            .asList(
+                                r, getParticipantRelation(
+                                    r.getTarget(), r.getSource(), ModelConstants.HAS_PARTICIPANT))
+                            .iterator();
+                    }
+                }
                 , Encoders.bean(Relation.class))
             .write()
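
The startsWith("50|") test above dispatches on the OpenAIRE id prefix: result identifiers begin with "50|", so a result-organization pair gets an affiliation inverse, while anything else reaching this step is a project, whose inverse is a participation. A condensed restatement of that branch (the helper name and sample ids are hypothetical; the constants are the ones used in this diff):

    import eu.dnetlib.dhp.schema.common.ModelConstants;

    public class InverseRelClassSketch {
        // hypothetical helper: pick the inverse relation class from the source id prefix
        static String inverseRelClass(String sourceId) {
            return sourceId.startsWith("50|")
                ? ModelConstants.IS_AUTHOR_INSTITUTION_OF // result -> organization (affiliation)
                : ModelConstants.HAS_PARTICIPANT; // project -> organization (participation)
        }

        public static void main(String[] args) {
            System.out.println(inverseRelClass("50|doi_________::abc")); // affiliation side
            System.out.println(inverseRelClass("40|corda_______::def")); // participation side
        }
    }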

View File

@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.resulttoorganizationfromsemrel;
+package eu.dnetlib.dhp.entitytoorganizationfromsemrel;
 
 import static eu.dnetlib.dhp.PropagationConstant.*;
 import static eu.dnetlib.dhp.PropagationConstant.readPath;
@@ -14,8 +14,6 @@ import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 import org.jetbrains.annotations.NotNull;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -28,13 +26,13 @@ public class StepActions implements Serializable {
     public static void execStep(SparkSession spark,
         String graphPath, String newRelationPath,
-        String leavesPath, String chldParentOrgPath, String resultOrgPath) {
+        String leavesPath, String chldParentOrgPath, String entityOrgPath) {
 
         Dataset<Relation> relationGraph = readPath(spark, graphPath, Relation.class);
         // select only the relation source target among those proposed by propagation that are not already existent
         getNewRels(
             newRelationPath, relationGraph,
-            getPropagationRelation(spark, leavesPath, chldParentOrgPath, resultOrgPath));
+            getPropagationRelation(spark, leavesPath, chldParentOrgPath, entityOrgPath, ModelConstants.HAS_AUTHOR_INSTITUTION));
     }
@@ -152,19 +150,20 @@ public class StepActions implements Serializable {
     private static Dataset<Relation> getPropagationRelation(SparkSession spark,
         String leavesPath,
         String chldParentOrgPath,
-        String resultOrgPath) {
+        String entityOrgPath,
+        String semantics) {
 
         Dataset<KeyValueSet> childParent = readPath(spark, chldParentOrgPath, KeyValueSet.class);
-        Dataset<KeyValueSet> resultOrg = readPath(spark, resultOrgPath, KeyValueSet.class);
+        Dataset<KeyValueSet> entityOrg = readPath(spark, entityOrgPath, KeyValueSet.class);
         Dataset<Leaves> leaves = readPath(spark, leavesPath, Leaves.class);
 
         childParent.createOrReplaceTempView("childParent");
-        resultOrg.createOrReplaceTempView("resultOrg");
+        entityOrg.createOrReplaceTempView("entityOrg");
         leaves.createOrReplaceTempView("leaves");
 
         Dataset<KeyValueSet> resultParent = spark
             .sql(
-                "SELECT resId as key, " +
+                "SELECT entityId as key, " +
                 "collect_set(parent) valueSet " +
                 "FROM (SELECT key as child, parent " +
                 "    FROM childParent " +
@@ -172,7 +171,7 @@ public class StepActions implements Serializable {
                 "JOIN leaves " +
                 "ON leaves.value = cp.child " +
                 "JOIN (" +
-                "SELECT key as resId, org " +
+                "SELECT key as entityId, org " +
                 "FROM resultOrg " +
                 "LATERAL VIEW explode (valueSet) ks as org ) as ro " +
                 "ON leaves.value = ro.org " +
@@ -186,19 +185,16 @@ public class StepActions implements Serializable {
                     .getValueSet()
                     .stream()
                     .map(
-                        orgId -> getRelation(
+                        orgId -> getAffiliationRelation(
                             v.getKey(),
                             orgId,
-                            ModelConstants.HAS_AUTHOR_INSTITUTION,
-                            ModelConstants.RESULT_ORGANIZATION,
-                            ModelConstants.AFFILIATION,
-                            PROPAGATION_DATA_INFO_TYPE,
-                            PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_ID,
-                            PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_NAME))
+                            semantics))
                     .collect(Collectors.toList())
                     .iterator(),
                 Encoders.bean(Relation.class));
     }
 }

View File

@@ -40,5 +40,11 @@
         "paramLongName": "relationPath",
         "paramDescription": "the path where to store the selected subset of relations",
         "paramRequired": false
+    },
+    {
+        "paramName": "pop",
+        "paramLongName": "projectOrganizationPath",
+        "paramDescription": "the path where to store the project to organization associations",
+        "paramRequired": true
     }
 ]
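
With the new required entry, every invocation of the prepare step must supply the extra path. An illustrative argument list matching this parameter file and mirroring the test invocations later in this commit (all paths are placeholders):

    public class PrepareInfoArgsSketch {
        public static void main(String[] args) throws Exception {
            // illustrative PrepareInfo argument list; paths are placeholders
            String[] prepareArgs = {
                "-isSparkSessionManaged", "false",
                "-graphPath", "/tmp/graph",
                "-hive_metastore_uris", "",
                "-leavesPath", "/tmp/working/currentIteration/",
                "-resultOrgPath", "/tmp/working/resultOrganization/",
                "-projectOrganizationPath", "/tmp/working/projectOrganization/",
                "-childParentPath", "/tmp/working/childParentOrg/",
                "-relationPath", "/tmp/working/relation"
            };
            eu.dnetlib.dhp.entitytoorganizationfromsemrel.PrepareInfo.main(prepareArgs);
        }
    }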

View File

@@ -52,5 +52,10 @@
         "paramLongName": "iterations",
         "paramDescription": "the number of iterations to be computed",
         "paramRequired": false
+    }, {
+        "paramName": "pop",
+        "paramLongName": "projectOrganizationPath",
+        "paramDescription": "the path where to store the project to organization associations",
+        "paramRequired": true
     }
 ]

View File

@@ -134,7 +134,7 @@
     <master>yarn</master>
     <mode>cluster</mode>
     <name>PrepareResultOrganizationAssociation</name>
-    <class>eu.dnetlib.dhp.resulttoorganizationfromsemrel.PrepareInfo</class>
+    <class>eu.dnetlib.dhp.entitytoorganizationfromsemrel.PrepareInfo</class>
     <jar>dhp-enrichment-${projectVersion}.jar</jar>
     <spark-opts>
         --executor-cores=${sparkExecutorCores}
@@ -150,6 +150,7 @@
     <arg>--leavesPath</arg><arg>${workingDir}/preparedInfo/leavesPath</arg>
     <arg>--childParentPath</arg><arg>${workingDir}/preparedInfo/childParentPath</arg>
     <arg>--resultOrgPath</arg><arg>${workingDir}/preparedInfo/resultOrgPath</arg>
+    <arg>--projectOrganizationPath</arg><arg>${workingDir}/preparedInfo/projectOrganizationPath</arg>
     <arg>--relationPath</arg><arg>${workingDir}/preparedInfo/relation</arg>
 </spark>
 <ok to="apply_resulttoorganization_propagation"/>
@@ -161,7 +162,7 @@
     <master>yarn</master>
     <mode>cluster</mode>
     <name>resultToOrganizationFromSemRel</name>
-    <class>eu.dnetlib.dhp.resulttoorganizationfromsemrel.SparkResultToOrganizationFromSemRel</class>
+    <class>eu.dnetlib.dhp.entitytoorganizationfromsemrel.SparkResultToOrganizationFromSemRel</class>
     <jar>dhp-enrichment-${projectVersion}.jar</jar>
     <spark-opts>
         --executor-cores=${sparkExecutorCores}

View File

@@ -1,22 +1,17 @@
-package eu.dnetlib.dhp.resulttoorganizationfromsemrel;
+package eu.dnetlib.dhp.entitytoorganizationfromsemrel;
 
-import static eu.dnetlib.dhp.PropagationConstant.readPath;
-
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.List;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.ForeachFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -28,7 +23,6 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.KeyValueSet;
-import eu.dnetlib.dhp.projecttoresult.SparkResultToProjectThroughSemRelJob;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 
 public class PrepareInfoJobTest {
@@ -78,11 +72,12 @@ public class PrepareInfoJobTest {
                 "-isSparkSessionManaged", Boolean.FALSE.toString(),
                 "-graphPath", getClass()
                     .getResource(
-                        "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/childparenttest1")
+                        "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/childparenttest1")
                     .getPath(),
                 "-hive_metastore_uris", "",
                 "-leavesPath", workingDir.toString() + "/currentIteration/",
                 "-resultOrgPath", workingDir.toString() + "/resultOrganization/",
+                "-projectOrganizationPath", workingDir.toString() + "/projectOrganization/",
                 "-childParentPath", workingDir.toString() + "/childParentOrg/",
                 "-relationPath", workingDir.toString() + "/relation"
@@ -223,11 +218,12 @@ public class PrepareInfoJobTest {
                 "-isSparkSessionManaged", Boolean.FALSE.toString(),
                 "-graphPath", getClass()
                     .getResource(
-                        "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/childparenttest2")
+                        "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/childparenttest2")
                     .getPath(),
                 "-hive_metastore_uris", "",
                 "-leavesPath", workingDir.toString() + "/currentIteration/",
                 "-resultOrgPath", workingDir.toString() + "/resultOrganization/",
+                "-projectOrganizationPath", workingDir.toString() + "/projectOrganization/",
                 "-childParentPath", workingDir.toString() + "/childParentOrg/",
                 "-relationPath", workingDir.toString() + "/relation"
@@ -343,11 +339,12 @@ public class PrepareInfoJobTest {
                 "-isSparkSessionManaged", Boolean.FALSE.toString(),
                 "-graphPath", getClass()
                     .getResource(
-                        "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/resultorganizationtest")
+                        "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/resultorganizationtest")
                     .getPath(),
                 "-hive_metastore_uris", "",
                 "-leavesPath", workingDir.toString() + "/currentIteration/",
                 "-resultOrgPath", workingDir.toString() + "/resultOrganization/",
+                "-projectOrganizationPath", workingDir.toString() + "/projectOrganization/",
                 "-childParentPath", workingDir.toString() + "/childParentOrg/",
                 "-relationPath", workingDir.toString() + "/relation"
@@ -355,7 +352,7 @@ public class PrepareInfoJobTest {
         final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
         JavaRDD<Relation> tmp = sc
-            .textFile(workingDir.toString() + "/relation")
+            .textFile(workingDir.toString() + "/relation/result")
             .map(item -> OBJECT_MAPPER.readValue(item, Relation.class));
 
         Dataset<Relation> verificationDs = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
@@ -373,11 +370,12 @@ public class PrepareInfoJobTest {
                 "-isSparkSessionManaged", Boolean.FALSE.toString(),
                 "-graphPath", getClass()
                     .getResource(
-                        "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/resultorganizationtest")
+                        "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/resultorganizationtest")
                     .getPath(),
                 "-hive_metastore_uris", "",
                 "-leavesPath", workingDir.toString() + "/currentIteration/",
                 "-resultOrgPath", workingDir.toString() + "/resultOrganization/",
+                "-projectOrganizationPath", workingDir.toString() + "/projectOrganization/",
                 "-childParentPath", workingDir.toString() + "/childParentOrg/",
                 "-relationPath", workingDir.toString() + "/relation"
@@ -507,11 +505,12 @@ public class PrepareInfoJobTest {
                 "-isSparkSessionManaged", Boolean.FALSE.toString(),
                 "-graphPath", getClass()
                     .getResource(
-                        "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/resultorganizationtest")
+                        "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/resultorganizationtest")
                     .getPath(),
                 "-hive_metastore_uris", "",
                 "-leavesPath", workingDir.toString() + "/currentIteration/",
                 "-resultOrgPath", workingDir.toString() + "/resultOrganization/",
+                "-projectOrganizationPath", workingDir.toString() + "/projectOrganization/",
                 "-childParentPath", workingDir.toString() + "/childParentOrg/",
                 "-relationPath", workingDir.toString() + "/relation"
@@ -534,11 +533,12 @@ public class PrepareInfoJobTest {
                 "-isSparkSessionManaged", Boolean.FALSE.toString(),
                 "-graphPath", getClass()
                     .getResource(
-                        "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/childparenttest1")
+                        "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/childparenttest1")
                     .getPath(),
                 "-hive_metastore_uris", "",
                 "-leavesPath", workingDir.toString() + "/currentIteration/",
                 "-resultOrgPath", workingDir.toString() + "/resultOrganization/",
+                "-projectOrganizationPath", workingDir.toString() + "/projectOrganization/",
                 "-childParentPath", workingDir.toString() + "/childParentOrg/",
                 "-relationPath", workingDir.toString() + "/relation"

View File

@@ -1,7 +1,6 @@
-package eu.dnetlib.dhp.resulttoorganizationfromsemrel;
+package eu.dnetlib.dhp.entitytoorganizationfromsemrel;
 
-import static eu.dnetlib.dhp.PropagationConstant.isSparkSessionManaged;
 import static eu.dnetlib.dhp.PropagationConstant.readPath;
 
 import java.io.IOException;
@@ -12,7 +11,6 @@ import org.apache.commons.io.FileUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.ForeachFunction;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;

View File

@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.resulttoorganizationfromsemrel;
+package eu.dnetlib.dhp.entitytoorganizationfromsemrel;
 
 import java.io.IOException;
 import java.nio.file.Files;