Miriam Baglioni 2023-10-26 08:48:52 +02:00
parent c946f5c5b8
commit 5a3f0d949c
2 changed files with 164 additions and 28 deletions

ExtendEoscResultWithOrganizationStep2.java

@@ -74,9 +74,56 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
 			spark -> {
 				Utils.removeOutputDir(spark, workingPath + "publicationextendedaffiliation");
 				addOrganizations(spark, inputPath, workingPath, outputPath, resultType);
+				dumpOrganizationAndRelations(spark, inputPath, workingPath, outputPath, resultType);
 			});
 	}
 
+	private static void dumpOrganizationAndRelations(SparkSession spark, String inputPath, String workingPath,
+		String outputPath, String resultType) {
+		Dataset<Relation> relation = Utils
+			.readPath(spark, inputPath + "/relation", Relation.class)
+			.filter(
+				(FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
+					r.getSubRelType().equalsIgnoreCase(ModelConstants.AFFILIATION));
+
+		Dataset<Organization> organization = Utils
+			.readPath(spark, inputPath + "/organization", Organization.class)
+			.filter((FilterFunction<Organization>) o -> !o.getDataInfo().getDeletedbyinference());
+
+		Dataset<Result> result = Utils.readPath(spark, workingPath + resultType, Result.class);
+
+		// result -> organization: keep the affiliation relations whose source is one of the EOSC results
+		Dataset<Relation> eoscRelation = result
+			.joinWith(relation, result.col("id").equalTo(relation.col("source")))
+			.map((MapFunction<Tuple2<Result, Relation>, Relation>) t2 -> t2._2(), Encoders.bean(Relation.class));
+
+		System.out.println(eoscRelation.count());
+
+		// from eoscRelation select the organizations and dump them
+		eoscRelation
+			.joinWith(organization, eoscRelation.col("target").equalTo(organization.col("id")))
+			.map(
+				(MapFunction<Tuple2<Relation, Organization>, eu.dnetlib.dhp.eosc.model.Organization>) t2 -> mapOrganization(
+					t2._2()),
+				Encoders.bean(eu.dnetlib.dhp.eosc.model.Organization.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(outputPath + "organization");
+
+		eoscRelation
+			.joinWith(organization, eoscRelation.col("target").equalTo(organization.col("id")))
+			.map(
+				(MapFunction<Tuple2<Relation, Organization>, eu.dnetlib.dhp.eosc.model.Relation>) t2 -> eu.dnetlib.dhp.eosc.model.Relation
+					.newInstance(t2._1().getSource(), t2._1().getTarget()),
+				Encoders.bean(eu.dnetlib.dhp.eosc.model.Relation.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(outputPath + "resultOrganization");
+	}
+
 	private static void addOrganizations(SparkSession spark, String inputPath, String workingPath, String outputPath,
 		String resultType) {
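
Note: both dump branches call a mapOrganization helper that lies outside this hunk. Below is a minimal, hypothetical sketch of the conversion it presumably performs; the EOSC-model setter names and the legalname mapping are assumptions for illustration, not the confirmed API, and the real implementation elsewhere in this class may map more fields.

	// Hypothetical sketch of the mapOrganization helper referenced above.
	// Setter names and field mapping on the EOSC model are assumed, not confirmed.
	private static eu.dnetlib.dhp.eosc.model.Organization mapOrganization(Organization input) {
		if (input == null) {
			return null; // the pre-existing code filtered nulls after mapping, so null results seem possible
		}
		eu.dnetlib.dhp.eosc.model.Organization mapped = new eu.dnetlib.dhp.eosc.model.Organization();
		mapped.setId(input.getId()); // assumed setter
		if (input.getLegalname() != null) {
			mapped.setName(input.getLegalname().getValue()); // assumed field mapping
		}
		return mapped;
	}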
@@ -157,34 +204,7 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
-			.json(workingPath + "publicationextendedaffiliation");
-
-		Dataset<Relation> organizationWithAffiliation = relations
-			.joinWith(results, relations.col("target").equalTo(results.col("id")))
-			.map((MapFunction<Tuple2<Relation, Result>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class));
-
-		organizationWithAffiliation
-			.joinWith(organizations, organizationWithAffiliation.col("source").equalTo(organizations.col("id")))
-			.map(
-				(MapFunction<Tuple2<Relation, Organization>, eu.dnetlib.dhp.eosc.model.Organization>) t2 -> mapOrganization(
-					t2._2()),
-				Encoders.bean(eu.dnetlib.dhp.eosc.model.Organization.class))
-			.filter(Objects::nonNull)
-			.write()
-			.mode(SaveMode.Overwrite)
-			.option("compression", "gzip")
-			.json(outputPath + "organization");
-
-		organizationWithAffiliation
-			.joinWith(organizations, organizationWithAffiliation.col("source").equalTo(organizations.col("id")))
-			.map(
-				(MapFunction<Tuple2<Relation, Organization>, eu.dnetlib.dhp.eosc.model.Relation>) t2 -> eu.dnetlib.dhp.eosc.model.Relation
-					.newInstance(t2._1().getSource(), t2._1().getTarget()),
-				Encoders.bean(eu.dnetlib.dhp.eosc.model.Relation.class))
-			.write()
-			.mode(SaveMode.Overwrite)
-			.option("compression", "gzip")
-			.json(outputPath + "resultOrganization");
+			.json(workingPath + resultType + "extendedaffiliation");
 	}
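
Design note on the new method: eoscRelation is joined with organization twice, once per output, so Spark recomputes the join (and the upstream reads) for each write, and the System.out.println(eoscRelation.count()) triggers a further pass. A sketch of how the join could be computed once and reused, offered as an optimization suggestion rather than as part of this commit:

	// Sketch only: persist the join result and derive both dumps from it.
	Dataset<Tuple2<Relation, Organization>> affiliated = eoscRelation
		.joinWith(organization, eoscRelation.col("target").equalTo(organization.col("id")))
		.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK());
	// ... write the organization dump and the resultOrganization dump from 'affiliated' ...
	affiliated.unpersist();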

ExtendAffiliationTest.java

@@ -0,0 +1,116 @@
+package eu.dnetlib.dhp.oa.graph.dump.eosc;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Optional;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.eosc.model.Organization;
+import eu.dnetlib.dhp.eosc.model.Result;
+
+/**
+ * @author miriam.baglioni
+ * @Date 25/10/23
+ */
+public class ExtendAffiliationTest {
+	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+	private static SparkSession spark;
+
+	private static Path workingDir;
+
+	private static final Logger log = LoggerFactory
+		.getLogger(ExtendAffiliationTest.class);
+
+	private static HashMap<String, String> map = new HashMap<>();
+
+	@BeforeAll
+	public static void beforeAll() throws IOException {
+		workingDir = Files
+			.createTempDirectory(ExtendAffiliationTest.class.getSimpleName());
+		log.info("using work dir {}", workingDir);
+
+		SparkConf conf = new SparkConf();
+		conf.setAppName(ExtendAffiliationTest.class.getSimpleName());
+
+		conf.setMaster("local[*]");
+		conf.set("spark.driver.host", "localhost");
+		conf.set("hive.metastore.local", "true");
+		conf.set("spark.ui.enabled", "false");
+		conf.set("spark.sql.warehouse.dir", workingDir.toString());
+		conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
+
+		spark = SparkSession
+			.builder()
+			.appName(ExtendAffiliationTest.class.getSimpleName())
+			.config(conf)
+			.getOrCreate();
+	}
+
+	@AfterAll
+	public static void afterAll() throws IOException {
+		FileUtils.deleteDirectory(workingDir.toFile());
+		spark.stop();
+	}
+
+	@Test
+	public void selectEoscResults() throws Exception {
+		final String sourcePath = getClass()
+			.getResource("/eu/dnetlib/dhp/oa/graph/dump/eosc/input")
+			.getPath();
+		final String workingPath = getClass()
+			.getResource("/eu/dnetlib/dhp/oa/graph/dump/eosc/working/")
+			.getPath();
+		final String mdp = getClass()
+			.getResource("/eu/dnetlib/dhp/oa/graph/dump/eosc/working/masterduplicate")
+			.getPath();
+
+		ExtendEoscResultWithOrganizationStep2.main(new String[] {
+			"-isSparkSessionManaged", Boolean.FALSE.toString(),
+			"-outputPath", workingDir.toString() + "/",
+			"-sourcePath", sourcePath,
+			"-resultType", "publication",
+			"-workingPath", workingPath
+		});
+
+		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+		JavaRDD<Organization> tmp = sc
+			.textFile(workingDir.toString() + "/organization")
+			.map(item -> OBJECT_MAPPER.readValue(item, Organization.class));
+
+		System.out.println(tmp.count());
+//		Assertions.assertEquals(3, tmp.count());
+//
+//		Assertions
+//			.assertEquals(
+//				0,
+//				tmp
+//					.filter(r -> Optional.ofNullable(r.getAffiliation()).isPresent() && r.getAffiliation().size() > 0)
+//					.count());
+//
+//		tmp.foreach(r -> System.out.println(new ObjectMapper().writeValueAsString(r)));
+	}
+}
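
The test currently only prints the size of the organization dump and keeps its assertions commented out. A possible follow-up check on the second output of the job, assuming resultOrganization is written in the same JSON-lines layout (a sketch, not part of the commit):

	// Sketch: read back the relation dump produced by the same run.
	JavaRDD<eu.dnetlib.dhp.eosc.model.Relation> rels = sc
		.textFile(workingDir.toString() + "/resultOrganization")
		.map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.eosc.model.Relation.class));
	// Once the fixture's expected counts are known, this can become a real assertion,
	// e.g. Assertions.assertEquals(expectedRelations, rels.count());
	System.out.println(rels.count());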