From 5a3f0d949cf49af360f8276c5109d5a22e023c4f Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Thu, 26 Oct 2023 08:48:52 +0200
Subject: [PATCH] -

---
 ...ExtendEoscResultWithOrganizationStep2.java |  76 +++++++-----
 .../dump/eosc/ExtendAffiliationTest.java      | 116 ++++++++++++++++++
 2 files changed, 164 insertions(+), 28 deletions(-)
 create mode 100644 dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendAffiliationTest.java

diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java
index 3958bdd..48de341 100644
--- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java
@@ -74,9 +74,56 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
 			spark -> {
 				Utils.removeOutputDir(spark, workingPath + "publicationextendedaffiliation");
 				addOrganizations(spark, inputPath, workingPath, outputPath, resultType);
+				dumpOrganizationAndRelations(spark, inputPath, workingPath, outputPath, resultType);
+
 			});
 	}
 
+	private static void dumpOrganizationAndRelations(SparkSession spark, String inputPath, String workingPath,
+		String outputPath, String resultType) {
+		Dataset<Relation> relation = Utils
+			.readPath(spark, inputPath + "/relation", Relation.class)
+			.filter(
+				(FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
+					r.getSubRelType().equalsIgnoreCase(ModelConstants.AFFILIATION));
+
+		Dataset<Organization> organization = Utils
+			.readPath(spark, inputPath + "/organization", Organization.class)
+			.filter((FilterFunction<Organization>) o -> !o.getDataInfo().getDeletedbyinference());
+
+		Dataset<Result> result = Utils.readPath(spark, workingPath + resultType, Result.class);
+
+		// result -> organization: take the relations of type affiliation having their source among the results
+		// related to EOSC
+		Dataset<Relation> eoscRelation = result
+			.joinWith(relation, result.col("id").equalTo(relation.col("source")))
+			.map((MapFunction<Tuple2<Result, Relation>, Relation>) t2 -> t2._2(), Encoders.bean(Relation.class));
+
+		System.out.println(eoscRelation.count());
+		// from eoscRelation select the organization
+		eoscRelation
+			.joinWith(organization, eoscRelation.col("target").equalTo(organization.col("id")))
+			.map(
+				(MapFunction<Tuple2<Relation, Organization>, eu.dnetlib.dhp.eosc.model.Organization>) t2 -> mapOrganization(
+					t2._2()),
+				Encoders.bean(eu.dnetlib.dhp.eosc.model.Organization.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(outputPath + "organization");
+
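+		// the same join again, this time dumped as (result -> organization) relations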
relations.col("target").equalTo(results.col("id"))) - .map((MapFunction, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class)); - - organizationWithAffiliation - .joinWith(organizations, organizationWithAffiliation.col("source").equalTo(organizations.col("id"))) - .map( - (MapFunction, eu.dnetlib.dhp.eosc.model.Organization>) t2 -> mapOrganization( - t2._2()), - Encoders.bean(eu.dnetlib.dhp.eosc.model.Organization.class)) - .filter(Objects::nonNull) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath + "organization"); - - organizationWithAffiliation - .joinWith(organizations, organizationWithAffiliation.col("source").equalTo(organizations.col("id"))) - .map( - (MapFunction, eu.dnetlib.dhp.eosc.model.Relation>) t2 -> eu.dnetlib.dhp.eosc.model.Relation - .newInstance(t2._1().getSource(), t2._1().getTarget()), - Encoders.bean(eu.dnetlib.dhp.eosc.model.Relation.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath + "resultOrganization"); + .json(workingPath + resultType + "extendedaffiliation"); } diff --git a/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendAffiliationTest.java b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendAffiliationTest.java new file mode 100644 index 0000000..3c9034d --- /dev/null +++ b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendAffiliationTest.java @@ -0,0 +1,116 @@ + +package eu.dnetlib.dhp.oa.graph.dump.eosc; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Optional; + +import org.apache.commons.io.FileUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.eosc.model.Organization; +import eu.dnetlib.dhp.eosc.model.Result; + +/** + * @author miriam.baglioni + * @Date 25/10/23 + */ +public class ExtendAffiliationTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SparkSession spark; + + private static Path workingDir; + + private static final Logger log = LoggerFactory + .getLogger(ExtendAffiliationTest.class); + + private static HashMap map = new HashMap<>(); + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files + .createTempDirectory(ExtendAffiliationTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(ExtendAffiliationTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(ExtendAffiliationTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + public void selectEoscResults() throws Exception { + + 
+		final String sourcePath = getClass()
+			.getResource("/eu/dnetlib/dhp/oa/graph/dump/eosc/input")
+			.getPath();
+
+		final String workingPath = getClass()
+			.getResource("/eu/dnetlib/dhp/oa/graph/dump/eosc/working/")
+			.getPath();
+		final String mdp = getClass()
+			.getResource("/eu/dnetlib/dhp/oa/graph/dump/eosc/working/masterduplicate")
+			.getPath();
+
+		ExtendEoscResultWithOrganizationStep2.main(new String[] {
+			"-isSparkSessionManaged", Boolean.FALSE.toString(),
+			"-outputPath", workingDir.toString() + "/",
+			"-sourcePath", sourcePath,
+			"-resultType", "publication",
+			"-workingPath", workingPath
+		});
+
+		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+		JavaRDD<Organization> tmp = sc
+			.textFile(workingDir.toString() + "/organization")
+			.map(item -> OBJECT_MAPPER.readValue(item, Organization.class));
+
+		System.out.println(tmp.count());
+
+//		Assertions.assertEquals(3, tmp.count());
+//
+//		Assertions
+//			.assertEquals(
+//				0,
+//				tmp
+//					.filter(r -> Optional.ofNullable(r.getAffiliation()).isPresent() && r.getAffiliation().size() > 0)
+//					.count());
+//
+//		tmp.foreach(r -> System.out.println(new ObjectMapper().writeValueAsString(r)));
+
+	}
+
+}