[Affiliation Propagation] moved the selection of graph relations to a preparation step

Miriam Baglioni 2021-11-16 15:24:19 +01:00
parent 7c96e3fd46
commit 28ea532ece
8 changed files with 71 additions and 28 deletions
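At a glance: the selection of affiliation relations, previously re-computed inside StepActions.execStep on every propagation iteration, is now performed once in PrepareInfo and written to a dedicated relationPath; the propagation job then reads that pre-filtered subset. A minimal sketch of the relocated selection, assuming the Relation and ModelConstants types from the project's schema module:

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;

import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;

public class AffiliationSelectionSketch {

	/**
	 * One-off selection, now part of the preparation step: keep only the
	 * live (not deleted by inference) HAS_AUTHOR_INSTITUTION relations and
	 * persist them, so each propagation iteration reads this small subset
	 * instead of re-filtering the whole /relation input.
	 */
	public static void selectAffiliations(Dataset<Relation> relation, String relationPath) {
		relation
			.filter(
				(FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference()
					&& r.getRelClass().equals(ModelConstants.HAS_AUTHOR_INSTITUTION))
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(relationPath);
	}
}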


@@ -73,6 +73,9 @@ public class PrepareInfo implements Serializable {
 		final String resultOrganizationPath = parser.get("resultOrgPath");
 		log.info("resultOrganizationPath: {}", resultOrganizationPath);
+		final String relationPath = parser.get("relationPath");
+		log.info("relationPath: {}", relationPath);
 		SparkConf conf = new SparkConf();
 		conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
@@ -84,11 +87,12 @@ public class PrepareInfo implements Serializable {
 					graphPath,
 					childParentPath,
 					leavesPath,
-					resultOrganizationPath));
+					resultOrganizationPath,
+					relationPath));
 	}

 	private static void prepareInfo(SparkSession spark, String inputPath, String childParentOrganizationPath,
-		String currentIterationPath, String resultOrganizationPath) {
+		String currentIterationPath, String resultOrganizationPath, String relationPath) {

 		Dataset<Relation> relation = readPath(spark, inputPath + "/relation", Relation.class);
 		relation.createOrReplaceTempView("relation");
@@ -108,6 +112,15 @@ public class PrepareInfo implements Serializable {
 			.option("compression", "gzip")
 			.json(resultOrganizationPath);

+		relation
+			.filter(
+				(FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
+					r.getRelClass().equals(ModelConstants.HAS_AUTHOR_INSTITUTION))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(relationPath);
+
 		Dataset<String> children = spark
 			.sql(
 				"Select distinct target as child from relation where " +


@@ -49,8 +49,8 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
 		Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
 		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

-		String graphPath = parser.get("graphPath");
-		log.info("graphPath: {}", graphPath);
+		String relationPath = parser.get("relationPath");
+		log.info("relationPath: {}", relationPath);

 		final String outputPath = parser.get("outputPath");
 		log.info("outputPath: {}", outputPath);
@@ -78,7 +78,7 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
 					leavesPath,
 					childParentPath,
 					resultOrganizationPath,
-					graphPath,
+					relationPath,
 					workingPath,
 					outputPath));
 	}


@@ -26,16 +26,11 @@ import scala.Tuple2;
 public class StepActions implements Serializable {

-	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

 	public static void execStep(SparkSession spark,
 		String graphPath, String newRelationPath,
 		String leavesPath, String chldParentOrgPath, String resultOrgPath) {

-		Dataset<Relation> relationGraph = readPath(spark, graphPath + "/relation", Relation.class)
-			.filter(
-				(FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
-					r.getRelClass().equals(ModelConstants.HAS_AUTHOR_INSTITUTION));
+		Dataset<Relation> relationGraph = readPath(spark, graphPath, Relation.class);

 		// select only the relation source target among those proposed by propagation that are not already existent
 		getNewRels(
 			newRelationPath, relationGraph,
@@ -80,8 +75,8 @@ public class StepActions implements Serializable {
 				ret.setValueSet(orgs);
 				return ret;
 			}, Encoders.bean(KeyValueSet.class))
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
 			.json(outputPath);
 	}
@@ -116,7 +111,7 @@ public class StepActions implements Serializable {
 		// union of new propagation relations to the relation set
 		// grouping from sourcetarget (we are sure the only relations are those from result to organization by
 		// construction of the set)
-		// if at least one relation in the set was harvested no new relation will be returned
+		// if at least one relation in the set was not produced by propagation no new relation will be returned
 		relationDataset
 			.union(newRels)
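The updated comment above is the crux of this step: existing and newly proposed relations are grouped by (source, target), and a group yields a new relation only if none of its members pre-existed in the graph. A compact sketch of that rule; isPropagated is a hypothetical predicate standing in for the project's actual check on the relation's provenance:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.spark.api.java.function.FlatMapGroupsFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

import eu.dnetlib.dhp.schema.oaf.Relation;

public class NewRelsSketch {

	public static Dataset<Relation> newRelsOnly(Dataset<Relation> existing, Dataset<Relation> proposed) {
		return existing
			.union(proposed)
			.groupByKey(
				(MapFunction<Relation, String>) r -> r.getSource() + "::" + r.getTarget(),
				Encoders.STRING())
			.flatMapGroups(
				(FlatMapGroupsFunction<String, Relation, Relation>) (key, values) -> {
					List<Relation> group = new ArrayList<>();
					values.forEachRemaining(group::add);
					// if at least one member was not produced by propagation,
					// the pair already exists in the graph: emit nothing
					if (group.stream().allMatch(NewRelsSketch::isPropagated)) {
						return group.subList(0, 1).iterator();
					}
					return Collections.emptyIterator();
				},
				Encoders.bean(Relation.class));
	}

	// hypothetical: the real code inspects the relation's dataInfo
	private static boolean isPropagated(Relation r) {
		return r.getDataInfo() != null && Boolean.TRUE.equals(r.getDataInfo().getInferred());
	}
}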


@@ -34,5 +34,11 @@
 		"paramLongName": "isSparkSessionManaged",
 		"paramDescription": "the path where prepared info have been stored",
 		"paramRequired": false
+	},
+	{
+		"paramName": "rep",
+		"paramLongName": "relationPath",
+		"paramDescription": "the path where to store the selected subset of relations",
+		"paramRequired": false
 	}
 ]


@@ -1,7 +1,7 @@
 [
 	{
-		"paramName":"gp",
-		"paramLongName":"graphPath",
+		"paramName":"rep",
+		"paramLongName":"relationPath",
 		"paramDescription": "the path of the sequencial file to read",
 		"paramRequired": true
 	},
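These JSON descriptors drive each job's command-line parsing: paramName is the short flag and paramLongName the key later used in parser.get(...). A sketch of the wiring, assuming the project's ArgumentApplicationParser and a hypothetical resource name for the descriptor file:

import java.nio.charset.StandardCharsets;

import org.apache.commons.io.IOUtils;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;

public class ParamWiringSketch {

	public static void main(String[] args) throws Exception {
		ArgumentApplicationParser parser = new ArgumentApplicationParser(
			IOUtils
				.toString(
					ParamWiringSketch.class
						.getResourceAsStream(
							// hypothetical resource name, for illustration
							"/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_relation_parameter.json"),
					StandardCharsets.UTF_8));
		parser.parseArgument(args);

		// "paramLongName" from the descriptor is the lookup key;
		// on the command line the argument is passed as "-rep <path>"
		String relationPath = parser.get("relationPath");
		System.out.println("relationPath: " + relationPath);
	}
}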


@@ -150,6 +150,7 @@
 			<arg>--leavesPath</arg><arg>${workingDir}/preparedInfo/leavesPath</arg>
 			<arg>--childParentPath</arg><arg>${workingDir}/preparedInfo/childParentPath</arg>
 			<arg>--resultOrgPath</arg><arg>${workingDir}/preparedInfo/resultOrgPath</arg>
+			<arg>--relationPath</arg><arg>${workingDir}/preparedInfo/relation</arg>
 		</spark>
 		<ok to="apply_resulttoorganization_propagation"/>
 		<error to="Kill"/>
@@ -173,7 +174,7 @@
 			--conf spark.dynamicAllocation.enabled=true
 			--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
 			</spark-opts>
-			<arg>--graphPath</arg><arg>${sourcePath}</arg>
+			<arg>--relationPath</arg><arg>${workingDir}/preparedInfo/relation</arg>
 			<arg>--outputPath</arg><arg>${outputPath}/relation</arg>
 			<arg>--leavesPath</arg><arg>${workingDir}/preparedInfo/leavesPath</arg>
 			<arg>--childParentPath</arg><arg>${workingDir}/preparedInfo/childParentPath</arg>


@@ -84,6 +84,7 @@ public class PrepareInfoJobTest {
 				"-leavesPath", workingDir.toString() + "/currentIteration/",
 				"-resultOrgPath", workingDir.toString() + "/resultOrganization/",
 				"-childParentPath", workingDir.toString() + "/childParentOrg/",
+				"-relationPath", workingDir.toString() + "/relation"
 			});
@@ -228,6 +229,7 @@ public class PrepareInfoJobTest {
 				"-leavesPath", workingDir.toString() + "/currentIteration/",
 				"-resultOrgPath", workingDir.toString() + "/resultOrganization/",
 				"-childParentPath", workingDir.toString() + "/childParentOrg/",
+				"-relationPath", workingDir.toString() + "/relation"
 			});

 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@@ -332,6 +334,35 @@ public class PrepareInfoJobTest {
 	}

+	@Test
+	public void relationTest() throws Exception {
+
+		PrepareInfo
+			.main(
+				new String[] {
+					"-isSparkSessionManaged", Boolean.FALSE.toString(),
+					"-graphPath", getClass()
+						.getResource(
+							"/eu/dnetlib/dhp/resulttoorganizationfromsemrel/resultorganizationtest")
+						.getPath(),
+					"-hive_metastore_uris", "",
+					"-leavesPath", workingDir.toString() + "/currentIteration/",
+					"-resultOrgPath", workingDir.toString() + "/resultOrganization/",
+					"-childParentPath", workingDir.toString() + "/childParentOrg/",
+					"-relationPath", workingDir.toString() + "/relation"
+				});
+
+		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+		JavaRDD<Relation> tmp = sc
+			.textFile(workingDir.toString() + "/relation")
+			.map(item -> OBJECT_MAPPER.readValue(item, Relation.class));
+
+		Dataset<Relation> verificationDs = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
+
+		Assertions.assertEquals(7, verificationDs.count());
+	}

 	@Test
 	public void resultOrganizationTest1() throws Exception {
@@ -347,6 +378,7 @@ public class PrepareInfoJobTest {
 				"-leavesPath", workingDir.toString() + "/currentIteration/",
 				"-resultOrgPath", workingDir.toString() + "/resultOrganization/",
 				"-childParentPath", workingDir.toString() + "/childParentOrg/",
+				"-relationPath", workingDir.toString() + "/relation"
 			});

 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@@ -467,10 +499,6 @@ public class PrepareInfoJobTest {
 	@Test
 	public void foundLeavesTest1() throws Exception {
-		// PrepareInfo.prepareInfo(spark, getClass()
-		// .getResource(
-		// "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/resultorganizationtest")
-		// .getPath(), workingDir.toString() + "/childParentOrg/", workingDir.toString() + "/currentIteration/",workingDir.toString() + "/resultOrganization/");

 		PrepareInfo
 			.main(
@@ -484,6 +512,7 @@ public class PrepareInfoJobTest {
 				"-leavesPath", workingDir.toString() + "/currentIteration/",
 				"-resultOrgPath", workingDir.toString() + "/resultOrganization/",
 				"-childParentPath", workingDir.toString() + "/childParentOrg/",
+				"-relationPath", workingDir.toString() + "/relation"
 			});

 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@@ -510,12 +539,9 @@ public class PrepareInfoJobTest {
 				"-leavesPath", workingDir.toString() + "/currentIteration/",
 				"-resultOrgPath", workingDir.toString() + "/resultOrganization/",
 				"-childParentPath", workingDir.toString() + "/childParentOrg/",
+				"-relationPath", workingDir.toString() + "/relation"
 			});
-		// PrepareInfo.prepareInfo(spark, getClass()
-		// .getResource(
-		// "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/childparenttest1")
-		// .getPath(), workingDir.toString() + "/childParentOrg/", workingDir.toString() + "/currentIteration/",workingDir.toString() + "/resultOrganization/");

 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());


@@ -101,9 +101,9 @@ public class SparkJobTest {
 			.main(
 				new String[] {
 					"-isSparkSessionManaged", Boolean.FALSE.toString(),
-					"-graphPath", graphPath,
+					"-relationPath", graphPath,
 					"-hive_metastore_uris", "",
-					"-outputPath", workingDir.toString() + "/relation",
+					"-outputPath", workingDir.toString() + "/finalrelation",
 					"-leavesPath", workingDir.toString() + "/leavesInput",
 					"-resultOrgPath", workingDir.toString() + "/orgsInput",
 					"-childParentPath", childParentPath,
@@ -113,9 +113,11 @@ public class SparkJobTest {
 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

 		JavaRDD<Relation> tmp = sc
-			.textFile(workingDir.toString() + "/relation")
+			.textFile(workingDir.toString() + "/finalrelation")
 			.map(item -> OBJECT_MAPPER.readValue(item, Relation.class));

+		tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
+
 		Assertions.assertEquals(18, tmp.count());
 		tmp.foreach(r -> Assertions.assertEquals(ModelConstants.AFFILIATION, r.getSubRelType()));
 		tmp.foreach(r -> Assertions.assertEquals(ModelConstants.RESULT_ORGANIZATION, r.getRelType()));