Miriam Baglioni 2021-12-13 15:01:40 +01:00
parent 4bb1d43afc
commit 8d755cca80
9 changed files with 94 additions and 76 deletions

View File

@@ -41,7 +41,7 @@ public class CreateContextEntities implements Serializable {
            .toString(
                CreateContextEntities.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/oa/graph/dump/input_entity_parameter.json"));
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

View File

@@ -48,7 +48,7 @@ public class CreateContextRelation implements Serializable {
            .requireNonNull(
                CreateContextRelation.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/oa/graph/dump/input_entity_parameter.json")));
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

View File

@@ -31,7 +31,7 @@ public class SparkCollectAndSave implements Serializable {
            .toString(
                SparkCollectAndSave.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/oa/graph/dump/input_collect_and_save.json"));
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

View File

@@ -22,7 +22,7 @@ public class SparkDumpEntitiesJob implements Serializable {
            .toString(
                SparkDumpEntitiesJob.class
                    .getResourceAsStream(
-                        "/eu/dnetlib/dhp/oa/graph/dump/wf/input_parameters.json"));
+                        "/eu/dnetlib/dhp/oa/graph/dump/input_parameters.json"));
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);
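
Note on the path fix above: Class.getResourceAsStream returns null when the resource is not on the classpath, so the old wf/input_parameters.json location would only surface as a NullPointerException inside IOUtils.toString. Below is a minimal sketch of a fail-fast variant in the spirit of the requireNonNull call CreateContextRelation already uses; the helper class and method name are illustrative, not part of this commit.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

import org.apache.commons.io.IOUtils;

public class ParameterJsonLoader {

    // Illustrative helper (not part of the patch): read a parameter definition from the
    // classpath and fail fast with a clear message when the resource path is wrong.
    public static String load(Class<?> clazz, String resourcePath) throws IOException {
        return IOUtils
            .toString(
                Objects
                    .requireNonNull(
                        clazz.getResourceAsStream(resourcePath),
                        "missing classpath resource: " + resourcePath),
                StandardCharsets.UTF_8);
    }

    // e.g. ParameterJsonLoader.load(SparkDumpEntitiesJob.class,
    //          "/eu/dnetlib/dhp/oa/graph/dump/input_parameters.json");
}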

View File

@@ -38,7 +38,7 @@ public class SparkDumpRelationJob implements Serializable {
            .toString(
                SparkDumpRelationJob.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/oa/graph/dump/input_relationdump_parameters.json"));
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);
@@ -55,13 +55,12 @@ public class SparkDumpRelationJob implements Serializable {
        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        Optional<String> rs = Optional.ofNullable(parser.get("removeSet"));
        final Set<String> removeSet = new HashSet<>();
-        if(rs.isPresent()){
+        if (rs.isPresent()) {
            Collections.addAll(removeSet, rs.get().split(";"));
        }

        SparkConf conf = new SparkConf();
        runWithSparkSession(
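
The removeSet argument introduced here is optional: when it is missing, the set stays empty and no relation type is filtered out. A self-contained sketch of the same parsing logic follows; the class and method names are illustrative and not present in the codebase.

import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

public class RemoveSetParser {

    // Illustrative restatement of the parsing done in SparkDumpRelationJob: a
    // semicolon-separated value such as "isParticipant;isAuthorInstitutionOf" becomes
    // the set of relation classes to drop; a null argument means "drop nothing".
    public static Set<String> parse(String removeSetArg) {
        final Set<String> removeSet = new HashSet<>();
        Optional<String> rs = Optional.ofNullable(removeSetArg);
        if (rs.isPresent()) {
            Collections.addAll(removeSet, rs.get().split(";"));
        }
        return removeSet;
    }
}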
@@ -78,7 +77,7 @@ public class SparkDumpRelationJob implements Serializable {
    private static void dumpRelation(SparkSession spark, String inputPath, String outputPath, Set<String> removeSet) {
        Dataset<Relation> relations = Utils.readPath(spark, inputPath, Relation.class);
        relations
-            .filter((FilterFunction<Relation>)r -> !removeSet.contains(r.getRelClass()))
+            .filter((FilterFunction<Relation>) r -> !removeSet.contains(r.getRelClass()))
            .map((MapFunction<Relation, eu.dnetlib.dhp.schema.dump.oaf.graph.Relation>) relation -> {
                eu.dnetlib.dhp.schema.dump.oaf.graph.Relation relNew = new eu.dnetlib.dhp.schema.dump.oaf.graph.Relation();
                relNew
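
Dataset.filter with a typed FilterFunction keeps only the rows for which the predicate returns true, so relations whose relClass appears in removeSet never reach the map step that converts them to the dump model. A minimal sketch of the same pattern on a toy bean; the bean and method names are illustrative, not from the codebase.

import java.io.Serializable;
import java.util.Set;

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;

public class FilterBySetExample {

    // Toy bean standing in for the Relation model; only relClass matters here.
    public static class Rel implements Serializable {
        private String relClass;

        public String getRelClass() { return relClass; }

        public void setRelClass(String relClass) { this.relClass = relClass; }
    }

    // Keep a relation only when its relClass is NOT in the remove set.
    public static Dataset<Rel> dropRelClasses(Dataset<Rel> relations, Set<String> removeSet) {
        return relations.filter((FilterFunction<Rel>) r -> !removeSet.contains(r.getRelClass()));
    }
}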

View File

@@ -39,7 +39,7 @@ public class SparkOrganizationRelation implements Serializable {
            .toString(
                SparkOrganizationRelation.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/oa/graph/dump/input_organization_parameters.json"));
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

View File

@@ -35,7 +35,7 @@ public class SparkSelectValidRelationsJob implements Serializable {
            .toString(
                SparkSelectValidRelationsJob.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/oa/graph/dump/input_relationdump_parameters.json"));
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

View File

@@ -917,7 +917,7 @@ public class DumpJobTest {
        DumpProducts dump = new DumpProducts();
        dump
            .run(
                false, sourcePath, workingDir.toString() + "/result", communityMapPath, Publication.class,
                GraphResult.class, Constants.DUMPTYPE.COMPLETE.getType());

        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@@ -949,18 +949,45 @@ public class DumpJobTest {
        Assertions.assertTrue(temp.filter("id = '50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8'").count() == 1);

        temp = spark
            .sql(
                "select id, inst.articleprocessingcharge.amount, inst.articleprocessingcharge.currency " +
                    "from check " +
                    "lateral view explode (instance) i as inst " +
                    "where inst.articleprocessingcharge is not null");

-        Assertions.assertEquals("3131.64", temp.filter("id = '50|datacite____::05c611fdfc93d7a2a703d1324e28104a'").collectAsList().get(0).getString(1));
-        Assertions.assertEquals("EUR", temp.filter("id = '50|datacite____::05c611fdfc93d7a2a703d1324e28104a'").collectAsList().get(0).getString(2));
-        Assertions.assertEquals("2578.35", temp.filter("id = '50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8'").collectAsList().get(0).getString(1));
-        Assertions.assertEquals("EUR", temp.filter("id = '50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8'").collectAsList().get(0).getString(2));
+        Assertions
+            .assertEquals(
+                "3131.64",
+                temp
+                    .filter("id = '50|datacite____::05c611fdfc93d7a2a703d1324e28104a'")
+                    .collectAsList()
+                    .get(0)
+                    .getString(1));
+        Assertions
+            .assertEquals(
+                "EUR",
+                temp
+                    .filter("id = '50|datacite____::05c611fdfc93d7a2a703d1324e28104a'")
+                    .collectAsList()
+                    .get(0)
+                    .getString(2));
+        Assertions
+            .assertEquals(
+                "2578.35",
+                temp
+                    .filter("id = '50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8'")
+                    .collectAsList()
+                    .get(0)
+                    .getString(1));
+        Assertions
+            .assertEquals(
+                "EUR",
+                temp
+                    .filter("id = '50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8'")
+                    .collectAsList()
+                    .get(0)
+                    .getString(2));
    }
}
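
The assertions above read getString(1) for the amount and getString(2) for the currency because the LATERAL VIEW EXPLODE query yields one row per instance with the columns id, amount, currency in that order. Here is a small, self-contained sketch of the same query shape on synthetic data; only the view name "check" and the instance/articleprocessingcharge layout are taken from the test, everything else is illustrative.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class LateralViewExplodeExample {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("explode-demo").master("local[*]").getOrCreate();

        // Build a tiny view with a nested array column, analogous to result.instance in the test data.
        spark
            .sql(
                "select 'id-1' as id, array(named_struct('articleprocessingcharge', "
                    + "named_struct('amount', '3131.64', 'currency', 'EUR'))) as instance")
            .createOrReplaceTempView("check");

        // One output row per exploded instance; column 1 is the amount, column 2 the currency,
        // which is why the test reads getString(1) and getString(2).
        Dataset<Row> apc = spark
            .sql(
                "select id, inst.articleprocessingcharge.amount, inst.articleprocessingcharge.currency "
                    + "from check "
                    + "lateral view explode (instance) i as inst "
                    + "where inst.articleprocessingcharge is not null");

        apc.show(false);
        spark.stop();
    }
}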

View File

@@ -4,10 +4,8 @@ package eu.dnetlib.dhp.oa.graph.dump.complete;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;

import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
@@ -83,7 +81,6 @@ public class DumpRelationTest {
            "-sourcePath", sourcePath
        });

        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        JavaRDD<Relation> tmp = sc
@@ -145,7 +142,6 @@ public class DumpRelationTest {
            "-sourcePath", sourcePath
        });

        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        JavaRDD<Relation> tmp = sc
@@ -207,105 +203,101 @@ public class DumpRelationTest {
    @Test
    public void test3() throws Exception {//
        final String sourcePath = getClass()
            .getResource("/eu/dnetlib/dhp/oa/graph/dump/relation/relation")
            .getPath();

        SparkDumpRelationJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-outputPath", workingDir.toString() + "/relation",
            "-sourcePath", sourcePath,
            "-removeSet", "isParticipant"
        });

        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        JavaRDD<Relation> tmp = sc
            .textFile(workingDir.toString() + "/relation")
            .map(item -> OBJECT_MAPPER.readValue(item, Relation.class));

        org.apache.spark.sql.Dataset<Relation> verificationDataset = spark
            .createDataset(tmp.rdd(), Encoders.bean(Relation.class));

        verificationDataset.createOrReplaceTempView("table");

        verificationDataset
            .foreach((ForeachFunction<Relation>) r -> System.out.println(new ObjectMapper().writeValueAsString(r)));

        Dataset<Row> check = spark
            .sql(
                "SELECT reltype.name, source.id source, source.type stype, target.id target,target.type ttype, provenance.provenance "
                    +
                    "from table ");

        Assertions.assertEquals(22, check.filter("name = 'isProvidedBy'").count());
        Assertions
            .assertEquals(
                22, check
                    .filter(
                        "name = 'isProvidedBy' and stype = 'datasource' and ttype = 'organization' and " +
                            "provenance = 'Harvested'")
                    .count());

        Assertions.assertEquals(0, check.filter("name = 'isParticipant'").count());

        Assertions.assertEquals(1, check.filter("name = 'isAuthorInstitutionOf'").count());
        Assertions
            .assertEquals(
                1, check
                    .filter(
                        "name = 'isAuthorInstitutionOf' and stype = 'organization' and ttype = 'result' " +
                            "and provenance = 'Inferred by OpenAIRE'")
                    .count());
    }

    @Test
    public void test4() throws Exception {//
        final String sourcePath = getClass()
            .getResource("/eu/dnetlib/dhp/oa/graph/dump/relation/relation")
            .getPath();

        SparkDumpRelationJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-outputPath", workingDir.toString() + "/relation",
            "-sourcePath", sourcePath,
            "-removeSet", "isParticipant;isAuthorInstitutionOf"
        });

        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        JavaRDD<Relation> tmp = sc
            .textFile(workingDir.toString() + "/relation")
            .map(item -> OBJECT_MAPPER.readValue(item, Relation.class));

        org.apache.spark.sql.Dataset<Relation> verificationDataset = spark
            .createDataset(tmp.rdd(), Encoders.bean(Relation.class));

        verificationDataset.createOrReplaceTempView("table");

        verificationDataset
            .foreach((ForeachFunction<Relation>) r -> System.out.println(new ObjectMapper().writeValueAsString(r)));

        Dataset<Row> check = spark
            .sql(
                "SELECT reltype.name, source.id source, source.type stype, target.id target,target.type ttype, provenance.provenance "
                    +
                    "from table ");

        Assertions.assertEquals(22, check.filter("name = 'isProvidedBy'").count());
        Assertions
            .assertEquals(
                22, check
                    .filter(
                        "name = 'isProvidedBy' and stype = 'datasource' and ttype = 'organization' and " +
                            "provenance = 'Harvested'")
                    .count());

        Assertions.assertEquals(0, check.filter("name = 'isParticipant'").count());

        Assertions.assertEquals(0, check.filter("name = 'isAuthorInstitutionOf'").count());
    }