From a12d28c6444616ac914d79b43e5360c4e92d714b Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Fri, 22 Jul 2022 16:23:12 +0200 Subject: [PATCH] [Clean Country] added logic not to remove country from result if it exist a hosting datasource with that country. Moreover the country will be removed only if added with propagation --- .../{ => country}/CleanCountrySparkJob.java | 40 ++++++-- .../country/GetDatasourceFromCountry.java | 99 +++++++++++++++++++ .../dhp/oa/graph/clean/CleanCountryTest.java | 3 +- 3 files changed, 130 insertions(+), 12 deletions(-) rename dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/{ => country}/CleanCountrySparkJob.java (75%) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/GetDatasourceFromCountry.java diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountrySparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java similarity index 75% rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountrySparkJob.java rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java index ec82d385c..1af1aafc0 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountrySparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.oa.graph.clean; +package eu.dnetlib.dhp.oa.graph.clean.country; /** * @author miriam.baglioni @@ -7,6 +7,7 @@ package eu.dnetlib.dhp.oa.graph.clean; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob; import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import org.apache.commons.io.IOUtils; @@ -21,6 +22,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -54,10 +56,13 @@ public class CleanCountrySparkJob implements Serializable { String workingPath = parser.get("workingPath"); log.info("workingPath: {}", workingPath); + String datasourcePath = parser.get("datasourcePath"); + log.info("datasourcePath: {}", datasourcePath); + String country = parser.get("country"); log.info("country: {}", country); - String verifyParam = parser.get("verifyParam"); + String[] verifyParam = parser.get("verifyParam").split(";"); log.info("verifyParam: {}", verifyParam); String collectedfrom = parser.get("collectedfrom"); @@ -74,12 +79,17 @@ public class CleanCountrySparkJob implements Serializable { isSparkSessionManaged, spark -> { - cleanCountry(spark, country, verifyParam, inputPath, entityClazz, workingPath,collectedfrom); + cleanCountry(spark, country, verifyParam, inputPath, entityClazz, workingPath,collectedfrom, datasourcePath); }); } - private static void cleanCountry(SparkSession spark, String country, String verifyParam, - String inputPath, Class entityClazz, String workingPath, String collectedfrom) { + private static void cleanCountry(SparkSession spark, String country, String[] verifyParam, + String inputPath, Class entityClazz, String workingPath, String collectedfrom, String datasourcePath) { + + List hostedBy = spark.read().textFile(datasourcePath) + // .filter((FilterFunction) ds -> !ds.equals(collectedfrom)) + .collectAsList(); + Dataset res = spark .read() .textFile(inputPath) @@ -89,11 +99,14 @@ public class CleanCountrySparkJob implements Serializable { ; res.map((MapFunction) r -> { - for(StructuredProperty p: r.getPid()){ - if (p.getQualifier().getClassid().equalsIgnoreCase("doi") && - p.getValue().startsWith(verifyParam) && r.getCollectedfrom().stream().anyMatch(cf -> cf.getValue().equalsIgnoreCase(collectedfrom))) { + if(r.getInstance().stream().anyMatch(i -> hostedBy.contains(i.getHostedby().getKey())) || + !r.getCollectedfrom().stream().anyMatch(cf -> cf.getValue().equals(collectedfrom))){ + return r; + } - r + if(r.getPid().stream().anyMatch(p -> p.getQualifier().getClassid().equals("doi") && pidInParam(p.getValue(), verifyParam)) + && r.getCountry().stream().anyMatch(c -> c.getClassid().equals(country) && c.getDataInfo().getInferenceprovenance().equals("propagation"))) + { r .setCountry( r .getCountry() @@ -102,7 +115,7 @@ public class CleanCountrySparkJob implements Serializable { c -> !c.getClassid() .equalsIgnoreCase(country)) .collect(Collectors.toList())); - } + } return r; @@ -124,5 +137,12 @@ public class CleanCountrySparkJob implements Serializable { .json(inputPath); } + private static boolean pidInParam(String value, String[] verifyParam) { + for (String s : verifyParam ) + if (value.startsWith(s)) + return true; + return false; + } + } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/GetDatasourceFromCountry.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/GetDatasourceFromCountry.java new file mode 100644 index 000000000..cdca96107 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/GetDatasourceFromCountry.java @@ -0,0 +1,99 @@ +package eu.dnetlib.dhp.oa.graph.clean.country; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.*; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.io.Serializable; +import java.util.List; +import java.util.Optional; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +/** + * @author miriam.baglioni + * @Date 22/07/22 + */ +public class GetDatasourceFromCountry implements Serializable { + private static final Logger log = LoggerFactory.getLogger(GetDatasourceFromCountry.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + GetDatasourceFromCountry.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/input_clean_country_parameters.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputPath = parser.get("inputPath"); + log.info("inputPath: {}", inputPath); + + String workingPath = parser.get("workingPath"); + log.info("workingPath: {}", workingPath); + + String country = parser.get("country"); + log.info("country: {}", country); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + + getDatasourceFromCountry(spark, country, inputPath, workingPath); + }); + } + + private static void getDatasourceFromCountry(SparkSession spark, String country, String inputPath, String workingPath) { + + Dataset organization = spark.read().textFile(inputPath + "/organization") + .map( + (MapFunction) value -> OBJECT_MAPPER.readValue(value, Organization.class), + Encoders.bean(Organization.class)) + .filter( + (FilterFunction) o -> !o.getDataInfo().getDeletedbyinference() && + o.getCountry().getClassid().length() > 0 && + o.getCountry().getClassid().equals(country));; + + + // filtering of the relations taking the non deleted by inference and those with IsProvidedBy as relclass + Dataset relation = spark.read().textFile( inputPath + "/relation") + .map( + (MapFunction) value -> OBJECT_MAPPER.readValue(value, Relation.class), + Encoders.bean(Relation.class)) + .filter( + (FilterFunction) rel -> rel.getRelClass().equalsIgnoreCase(ModelConstants.IS_PROVIDED_BY) && + !rel.getDataInfo().getDeletedbyinference()); + + organization.joinWith(relation, organization.col("id").equalTo(relation.col("target")), "left") + .map((MapFunction, String>) t2 -> t2._2().getSource(), Encoders.STRING() ) + .write() + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .json(workingPath); + + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java index f9dad9fcf..a72093156 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java @@ -5,9 +5,9 @@ package eu.dnetlib.dhp.oa.graph.clean; */ import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob; import eu.dnetlib.dhp.oa.graph.dump.DumpJobTest; import eu.dnetlib.dhp.schema.oaf.Publication; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import org.apache.commons.io.FileUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; @@ -25,7 +25,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.util.List; public class CleanCountryTest {