refactoring

This commit is contained in:
Miriam Baglioni 2023-05-31 18:56:58 +02:00
parent 97d72d41c3
commit daf4d7971b
7 changed files with 518 additions and 496 deletions

View File

@ -174,10 +174,10 @@ public class PropagationConstant {
return newRelations; return newRelations;
} }
public static Relation getRelation(String source, String target, String rel_class){ public static Relation getRelation(String source, String target, String rel_class) {
if (ModelConstants.HAS_PARTICIPANT.equals(rel_class)){ if (ModelConstants.HAS_PARTICIPANT.equals(rel_class)) {
return getParticipantRelation(source, target, rel_class); return getParticipantRelation(source, target, rel_class);
}else } else
return getAffiliationRelation(source, target, rel_class); return getAffiliationRelation(source, target, rel_class);
} }
@ -185,7 +185,8 @@ public class PropagationConstant {
String source, String source,
String target, String target,
String rel_class) { String rel_class) {
return getRelation(source, target , return getRelation(
source, target,
rel_class, rel_class,
ModelConstants.PROJECT_ORGANIZATION, ModelConstants.PROJECT_ORGANIZATION,
ModelConstants.PARTICIPATION, ModelConstants.PARTICIPATION,
@ -198,7 +199,8 @@ public class PropagationConstant {
String source, String source,
String target, String target,
String rel_class) { String rel_class) {
return getRelation(source, target , return getRelation(
source, target,
rel_class, rel_class,
ModelConstants.RESULT_ORGANIZATION, ModelConstants.RESULT_ORGANIZATION,
ModelConstants.AFFILIATION, ModelConstants.AFFILIATION,

View File

@ -7,7 +7,6 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.io.Serializable; import java.io.Serializable;
import java.util.*; import java.util.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*; import org.apache.spark.api.java.function.*;
@ -15,6 +14,8 @@ import org.apache.spark.sql.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.KeyValueSet; import eu.dnetlib.dhp.KeyValueSet;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob; import eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob;
@ -103,7 +104,8 @@ public class PrepareInfo implements Serializable {
} }
private static void prepareInfo(SparkSession spark, String inputPath, String childParentOrganizationPath, private static void prepareInfo(SparkSession spark, String inputPath, String childParentOrganizationPath,
String currentIterationPath, String resultOrganizationPath, String projectOrganizationPath, String relationPath) { String currentIterationPath, String resultOrganizationPath, String projectOrganizationPath,
String relationPath) {
Dataset<Relation> relation = readPath(spark, inputPath + "/relation", Relation.class); Dataset<Relation> relation = readPath(spark, inputPath + "/relation", Relation.class);
relation.createOrReplaceTempView("relation"); relation.createOrReplaceTempView("relation");

View File

@ -155,7 +155,8 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
} }
private static void doPropagate(SparkSession spark, String leavesPath, String childParentPath, private static void doPropagate(SparkSession spark, String leavesPath, String childParentPath,
String resultOrganizationPath, String projectOrganizationPath, String graphPath, String workingPath, String outputPath, String resultOrganizationPath, String projectOrganizationPath, String graphPath, String workingPath,
String outputPath,
PropagationCounter propagationCounter) { PropagationCounter propagationCounter) {
int iteration = 0; int iteration = 0;
long leavesCount; long leavesCount;
@ -173,7 +174,7 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
StepActions StepActions
.prepareForNextStep( .prepareForNextStep(
spark, workingPath , resultOrganizationPath, projectOrganizationPath, leavesPath, spark, workingPath, resultOrganizationPath, projectOrganizationPath, leavesPath,
childParentPath, workingPath + "/leaves", workingPath + "/resOrg", workingPath + "/projOrg"); childParentPath, workingPath + "/leaves", workingPath + "/resOrg", workingPath + "/projOrg");
moveOutput(spark, workingPath, leavesPath, resultOrganizationPath, projectOrganizationPath); moveOutput(spark, workingPath, leavesPath, resultOrganizationPath, projectOrganizationPath);
leavesCount = readPath(spark, leavesPath, Leaves.class).count(); leavesCount = readPath(spark, leavesPath, Leaves.class).count();
@ -253,15 +254,14 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
.mapGroups( .mapGroups(
(MapGroupsFunction<String, Relation, Relation>) (k, it) -> it.next(), Encoders.bean(Relation.class)) (MapGroupsFunction<String, Relation, Relation>) (k, it) -> it.next(), Encoders.bean(Relation.class))
.flatMap( .flatMap(
(FlatMapFunction<Relation, Relation>) r -> (FlatMapFunction<Relation, Relation>) r -> {
{ if (r.getSource().startsWith("50|")) {
if(r.getSource().startsWith("50|")){
return Arrays return Arrays
.asList( .asList(
r, getAffiliationRelation( r, getAffiliationRelation(
r.getTarget(), r.getSource(), ModelConstants.IS_AUTHOR_INSTITUTION_OF)) r.getTarget(), r.getSource(), ModelConstants.IS_AUTHOR_INSTITUTION_OF))
.iterator(); .iterator();
}else{ } else {
return Arrays return Arrays
.asList( .asList(
r, getParticipantRelation( r, getParticipantRelation(
@ -270,8 +270,6 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
} }
} }
, Encoders.bean(Relation.class)) , Encoders.bean(Relation.class))
.write() .write()

View File

@ -50,7 +50,8 @@ public class StepActions implements Serializable {
spark, resultOrgPath, readPath(spark, selectedRelsPath, Relation.class), orgOutputPath); spark, resultOrgPath, readPath(spark, selectedRelsPath, Relation.class), orgOutputPath);
} }
public static void prepareForNextStep(SparkSession spark, String selectedRelsPath, String resultOrgPath, String projectOrgPath, public static void prepareForNextStep(SparkSession spark, String selectedRelsPath, String resultOrgPath,
String projectOrgPath,
String leavesPath, String chldParentOrgPath, String leavesOutputPath, String leavesPath, String chldParentOrgPath, String leavesOutputPath,
String orgOutputPath, String outputProjectPath) { String orgOutputPath, String outputProjectPath) {
// use of the parents as new leaves set // use of the parents as new leaves set
@ -58,10 +59,12 @@ public class StepActions implements Serializable {
// add the new relations obtained from propagation to the keyvalueset result organization // add the new relations obtained from propagation to the keyvalueset result organization
updateEntityOrganization( updateEntityOrganization(
spark, resultOrgPath, readPath(spark, selectedRelsPath + NEW_RESULT_RELATION_PATH, Relation.class), orgOutputPath); spark, resultOrgPath, readPath(spark, selectedRelsPath + NEW_RESULT_RELATION_PATH, Relation.class),
orgOutputPath);
updateEntityOrganization( updateEntityOrganization(
spark, projectOrgPath, readPath(spark, selectedRelsPath + NEW_PROJECT_RELATION_PATH, Relation.class), outputProjectPath); spark, projectOrgPath, readPath(spark, selectedRelsPath + NEW_PROJECT_RELATION_PATH, Relation.class),
outputProjectPath);
} }
private static void updateEntityOrganization(SparkSession spark, String entityOrgPath, private static void updateEntityOrganization(SparkSession spark, String entityOrgPath,
@ -128,7 +131,6 @@ public class StepActions implements Serializable {
// construction of the set) // construction of the set)
// if at least one relation in the set was not produced by propagation no new relation will be returned // if at least one relation in the set was not produced by propagation no new relation will be returned
relationDataset relationDataset
.union(newRels) .union(newRels)
.groupByKey((MapFunction<Relation, String>) r -> r.getSource() + r.getTarget(), Encoders.STRING()) .groupByKey((MapFunction<Relation, String>) r -> r.getSource() + r.getTarget(), Encoders.STRING())
@ -145,7 +147,8 @@ public class StepActions implements Serializable {
.getDataInfo() .getDataInfo()
.getProvenanceaction() .getProvenanceaction()
.getClassid() .getClassid()
.equals(PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_ID) && !rel .equals(PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_ID)
&& !rel
.getDataInfo() .getDataInfo()
.getProvenanceaction() .getProvenanceaction()
.getClassid() .getClassid()
@ -166,8 +169,6 @@ public class StepActions implements Serializable {
.option("compression", "gzip") .option("compression", "gzip")
.json(newRelationPath); .json(newRelationPath);
} }
// get the possible relations from propagation // get the possible relations from propagation
@ -202,7 +203,6 @@ public class StepActions implements Serializable {
"GROUP BY entityId") "GROUP BY entityId")
.as(Encoders.bean(KeyValueSet.class)); .as(Encoders.bean(KeyValueSet.class));
// create new relations from entity to organization for each entity linked to a leaf // create new relations from entity to organization for each entity linked to a leaf
return resultParent return resultParent
.flatMap( .flatMap(
@ -220,6 +220,4 @@ public class StepActions implements Serializable {
} }
} }

View File

@ -161,19 +161,24 @@ public class SparkJobTest {
.foreach(r -> Assertions.assertEquals(ModelConstants.HAS_AUTHOR_INSTITUTION, r.getRelClass())); .foreach(r -> Assertions.assertEquals(ModelConstants.HAS_AUTHOR_INSTITUTION, r.getRelClass()));
Assertions Assertions
.assertEquals( .assertEquals(
2, result.filter(r -> r.getSource().equals("50|doajarticles::1cae0b82b56ccd97c2db1f698def7074")).count()); 2,
result.filter(r -> r.getSource().equals("50|doajarticles::1cae0b82b56ccd97c2db1f698def7074")).count());
Assertions Assertions
.assertEquals( .assertEquals(
3, result.filter(r -> r.getSource().equals("50|dedup_wf_001::2899e571609779168222fdeb59cb916d")).count()); 3,
result.filter(r -> r.getSource().equals("50|dedup_wf_001::2899e571609779168222fdeb59cb916d")).count());
Assertions Assertions
.assertEquals( .assertEquals(
2, result.filter(r -> r.getSource().equals("50|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")).count()); 2,
result.filter(r -> r.getSource().equals("50|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")).count());
Assertions Assertions
.assertEquals( .assertEquals(
1, result.filter(r -> r.getSource().equals("50|openaire____::ec653e804967133b9436fdd30d3ff51d")).count()); 1,
result.filter(r -> r.getSource().equals("50|openaire____::ec653e804967133b9436fdd30d3ff51d")).count());
Assertions Assertions
.assertEquals( .assertEquals(
1, result.filter(r -> r.getSource().equals("50|doajarticles::03748bcb5d754c951efec9700e18a56d")).count()); 1,
result.filter(r -> r.getSource().equals("50|doajarticles::03748bcb5d754c951efec9700e18a56d")).count());
Assertions.assertEquals(9, result.filter(r -> r.getSource().substring(0, 3).equals("20|")).count()); Assertions.assertEquals(9, result.filter(r -> r.getSource().substring(0, 3).equals("20|")).count());
result result
@ -181,19 +186,24 @@ public class SparkJobTest {
.foreach(r -> Assertions.assertEquals(ModelConstants.IS_AUTHOR_INSTITUTION_OF, r.getRelClass())); .foreach(r -> Assertions.assertEquals(ModelConstants.IS_AUTHOR_INSTITUTION_OF, r.getRelClass()));
Assertions Assertions
.assertEquals( .assertEquals(
1, result.filter(r -> r.getSource().equals("20|doajarticles::1cae0b82b56ccd97c2db1f698def7074")).count()); 1,
result.filter(r -> r.getSource().equals("20|doajarticles::1cae0b82b56ccd97c2db1f698def7074")).count());
Assertions Assertions
.assertEquals( .assertEquals(
1, result.filter(r -> r.getSource().equals("20|dedup_wf_001::2899e571609779168222fdeb59cb916d")).count()); 1,
result.filter(r -> r.getSource().equals("20|dedup_wf_001::2899e571609779168222fdeb59cb916d")).count());
Assertions Assertions
.assertEquals( .assertEquals(
2, result.filter(r -> r.getSource().equals("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")).count()); 2,
result.filter(r -> r.getSource().equals("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")).count());
Assertions Assertions
.assertEquals( .assertEquals(
2, result.filter(r -> r.getSource().equals("20|openaire____::ec653e804967133b9436fdd30d3ff51d")).count()); 2,
result.filter(r -> r.getSource().equals("20|openaire____::ec653e804967133b9436fdd30d3ff51d")).count());
Assertions Assertions
.assertEquals( .assertEquals(
3, result.filter(r -> r.getSource().equals("20|doajarticles::03748bcb5d754c951efec9700e18a56d")).count()); 3,
result.filter(r -> r.getSource().equals("20|doajarticles::03748bcb5d754c951efec9700e18a56d")).count());
Assertions Assertions
.assertTrue( .assertTrue(
@ -395,7 +405,8 @@ public class SparkJobTest {
Assertions.assertEquals(36, temp.count()); Assertions.assertEquals(36, temp.count());
JavaRDD<Relation> project = temp.filter(r -> r.getSource().startsWith("40|") || r.getTarget().startsWith("40|")); JavaRDD<Relation> project = temp
.filter(r -> r.getSource().startsWith("40|") || r.getTarget().startsWith("40|"));
Assertions.assertEquals(18, project.count()); Assertions.assertEquals(18, project.count());
project.foreach(r -> Assertions.assertEquals(ModelConstants.PARTICIPATION, r.getSubRelType())); project.foreach(r -> Assertions.assertEquals(ModelConstants.PARTICIPATION, r.getSubRelType()));
@ -430,19 +441,24 @@ public class SparkJobTest {
.foreach(r -> Assertions.assertEquals(ModelConstants.HAS_PARTICIPANT, r.getRelClass())); .foreach(r -> Assertions.assertEquals(ModelConstants.HAS_PARTICIPANT, r.getRelClass()));
Assertions Assertions
.assertEquals( .assertEquals(
2, project.filter(r -> r.getSource().equals("40|doajarticles::1cae0b82b56ccd97c2db1f698def7074")).count()); 2,
project.filter(r -> r.getSource().equals("40|doajarticles::1cae0b82b56ccd97c2db1f698def7074")).count());
Assertions Assertions
.assertEquals( .assertEquals(
3, project.filter(r -> r.getSource().equals("40|dedup_wf_001::2899e571609779168222fdeb59cb916d")).count()); 3,
project.filter(r -> r.getSource().equals("40|dedup_wf_001::2899e571609779168222fdeb59cb916d")).count());
Assertions Assertions
.assertEquals( .assertEquals(
2, project.filter(r -> r.getSource().equals("40|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")).count()); 2,
project.filter(r -> r.getSource().equals("40|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")).count());
Assertions Assertions
.assertEquals( .assertEquals(
1, project.filter(r -> r.getSource().equals("40|openaire____::ec653e804967133b9436fdd30d3ff51d")).count()); 1,
project.filter(r -> r.getSource().equals("40|openaire____::ec653e804967133b9436fdd30d3ff51d")).count());
Assertions Assertions
.assertEquals( .assertEquals(
1, project.filter(r -> r.getSource().equals("40|doajarticles::03748bcb5d754c951efec9700e18a56d")).count()); 1,
project.filter(r -> r.getSource().equals("40|doajarticles::03748bcb5d754c951efec9700e18a56d")).count());
Assertions.assertEquals(9, project.filter(r -> r.getSource().substring(0, 3).equals("20|")).count()); Assertions.assertEquals(9, project.filter(r -> r.getSource().substring(0, 3).equals("20|")).count());
project project
@ -450,19 +466,24 @@ public class SparkJobTest {
.foreach(r -> Assertions.assertEquals(ModelConstants.IS_PARTICIPANT, r.getRelClass())); .foreach(r -> Assertions.assertEquals(ModelConstants.IS_PARTICIPANT, r.getRelClass()));
Assertions Assertions
.assertEquals( .assertEquals(
1, project.filter(r -> r.getSource().equals("20|doajarticles::1cae0b82b56ccd97c2db1f698def7074")).count()); 1,
project.filter(r -> r.getSource().equals("20|doajarticles::1cae0b82b56ccd97c2db1f698def7074")).count());
Assertions Assertions
.assertEquals( .assertEquals(
1, project.filter(r -> r.getSource().equals("20|dedup_wf_001::2899e571609779168222fdeb59cb916d")).count()); 1,
project.filter(r -> r.getSource().equals("20|dedup_wf_001::2899e571609779168222fdeb59cb916d")).count());
Assertions Assertions
.assertEquals( .assertEquals(
2, project.filter(r -> r.getSource().equals("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")).count()); 2,
project.filter(r -> r.getSource().equals("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")).count());
Assertions Assertions
.assertEquals( .assertEquals(
2, project.filter(r -> r.getSource().equals("20|openaire____::ec653e804967133b9436fdd30d3ff51d")).count()); 2,
project.filter(r -> r.getSource().equals("20|openaire____::ec653e804967133b9436fdd30d3ff51d")).count());
Assertions Assertions
.assertEquals( .assertEquals(
3, project.filter(r -> r.getSource().equals("20|doajarticles::03748bcb5d754c951efec9700e18a56d")).count()); 3,
project.filter(r -> r.getSource().equals("20|doajarticles::03748bcb5d754c951efec9700e18a56d")).count());
Assertions Assertions
.assertTrue( .assertTrue(

View File

@ -87,7 +87,8 @@ public class StepActionsTest {
getClass() getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/entitytoorganizationfromsemrel/execstep/resultOrganization/") "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/execstep/resultOrganization/")
.getPath(), ModelConstants.HAS_AUTHOR_INSTITUTION); .getPath(),
ModelConstants.HAS_AUTHOR_INSTITUTION);
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());