diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/country/SparkFindResultsRelatedToCountry.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/country/SparkFindResultsRelatedToCountry.java new file mode 100644 index 0000000..05ac3b0 --- /dev/null +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/country/SparkFindResultsRelatedToCountry.java @@ -0,0 +1,157 @@ +package eu.dnetlib.dhp.oa.graph.dump.country; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.*; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.dom4j.Document; +import org.dom4j.DocumentException; +import org.dom4j.Node; +import org.dom4j.io.SAXReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xml.sax.SAXException; +import scala.Tuple2; + +import java.io.Serializable; +import java.io.StringReader; +import java.util.*; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +/** + * @author miriam.baglioni + * @Date 27/04/23 + * Finds the results id which are in relation with another entity having the given country + * or that have that country in the country list + */ +public class SparkFindResultsRelatedToCountry implements Serializable { + private static final Logger log = LoggerFactory.getLogger(SparkFindResultsRelatedToCountry.class); + + public static final String COMPRESSION = "compression"; + public static final String GZIP = "gzip"; + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkFindResultsRelatedToCountry.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump/input_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + + final String country = parser.get("country"); + + + run( + isSparkSessionManaged, inputPath, outputPath, country); + + } + + private static void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, + String country) { + SparkConf conf = new SparkConf(); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + findRelatedEntities( + spark, inputPath, outputPath, country); + }); + + } + + public static void findRelatedEntities( + SparkSession spark, + String inputPath, + String outputPath, + String country) { + + Dataset projectsInCountry = Utils.readPath(spark, inputPath + "/project", Project.class) + .filter((FilterFunction) p -> isCountryInFunderJurisdiction(p.getFundingtree(), country)); + + Dataset relsProjectResults = Utils.readPath(spark, inputPath + "/relation", Relation.class) + .filter((FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() && + r.getRelClass().equals(ModelConstants.PRODUCES)); + + projectsInCountry.joinWith(relsProjectResults, projectsInCountry.col("id").equalTo(relsProjectResults.col("source"))) + .map((MapFunction, String>) t2 -> t2._2().getTarget(), Encoders.STRING()) + .write() + .option(COMPRESSION, GZIP) + .mode(SaveMode.Overwrite) + .json(outputPath ); + + + Dataset organizationsInCountry = Utils.readPath(spark, inputPath + "/organization", Organization.class) + .filter((FilterFunction) o -> o.getCountry().getClassid().equals(country)); + + Dataset relsOrganizationResults = Utils.readPath(spark, inputPath + "/relation", Relation.class) + .filter((FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() && + r.getRelClass().equals(ModelConstants.IS_AUTHOR_INSTITUTION_OF)); + + organizationsInCountry.joinWith(relsOrganizationResults, organizationsInCountry.col("id").equalTo(relsOrganizationResults.col("source"))) + .map((MapFunction, String>) t2 -> t2._2().getTarget(), Encoders.STRING()) + .write() + .option(COMPRESSION, GZIP) + .mode(SaveMode.Append) + .json(outputPath ); + + selectResultWithCountry(spark, inputPath, outputPath, country, "publication", Publication.class); + selectResultWithCountry(spark, inputPath, outputPath, country, "dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); + selectResultWithCountry(spark, inputPath, outputPath, country, "software", Software.class); + selectResultWithCountry(spark, inputPath, outputPath, country, "otherresearchproduct", OtherResearchProduct.class); + + } + + private static void selectResultWithCountry(SparkSession spark, String inputPath, String outputPath, String country, String type, Class inputClazz) { + Utils.readPath(spark, inputPath + "/" + type, inputClazz) + .filter((FilterFunction) p -> !p.getDataInfo().getDeletedbyinference() && !p.getDataInfo().getInvisible() && + p.getCountry().stream().anyMatch(c -> c.getClassid().equals(country))) + .map((MapFunction) p -> p.getId(), Encoders.STRING() ) + .write() + .option(COMPRESSION, GZIP) + .mode(SaveMode.Append) + .json(outputPath); + } + + private static boolean isCountryInFunderJurisdiction(List> fundingtrees, String country) { + try { + final SAXReader reader = new SAXReader(); + reader.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true); + for (Field fundingtree: fundingtrees){ + final Document doc = reader.read(new StringReader(fundingtree.getValue())); + if(((Node) (doc.selectNodes("//funder/jurisdiction").get(0))).getText().equals(country)){ + return true; + } + } + return false; + } catch (DocumentException | SAXException e) { + throw new IllegalArgumentException(e); + } + } + + +}