From 6901ac91b1269fb5e7f19fbfc94820cb90d34a90 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Fri, 16 Dec 2022 15:42:49 +0100 Subject: [PATCH] [Clean Country] moving source and resources to master --- .../clean/country/CleanCountrySparkJob.java | 211 ++++++++++ .../country/GetDatasourceFromCountry.java | 106 +++++ .../dhp/oa/graph/clean/oozie_app/workflow.xml | 379 +++++++++++++++++- .../graph/input_clean_country_parameters.json | 49 +++ .../dhp/oa/graph/clean/CleanCountryTest.java | 191 +++++++++ 5 files changed, 927 insertions(+), 9 deletions(-) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/GetDatasourceFromCountry.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_country_parameters.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java new file mode 100644 index 000000000..5bbfeba8d --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java @@ -0,0 +1,211 @@ + +package eu.dnetlib.dhp.oa.graph.clean.country; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import javax.swing.text.html.Option; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author miriam.baglioni + * @Date 20/07/22 + */ +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob; +import eu.dnetlib.dhp.schema.oaf.Country; +import eu.dnetlib.dhp.schema.oaf.Instance; +import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import eu.dnetlib.dhp.schema.oaf.utils.PidType; + +public class CleanCountrySparkJob implements Serializable { + private static final Logger log = LoggerFactory.getLogger(CleanCountrySparkJob.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + CleanCountrySparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/input_clean_country_parameters.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputPath = parser.get("inputPath"); + log.info("inputPath: {}", inputPath); + + String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); + + String datasourcePath = parser.get("hostedBy"); + log.info("datasourcePath: {}", datasourcePath); + + String country = parser.get("country"); + log.info("country: {}", country); + + String[] verifyParam = parser.get("verifyParam").split(";"); + log.info("verifyParam: {}", verifyParam); + + String collectedfrom = parser.get("collectedfrom"); + log.info("collectedfrom: {}", collectedfrom); + + String graphTableClassName = parser.get("graphTableClassName"); + log.info("graphTableClassName: {}", graphTableClassName); + + Class entityClazz = (Class) Class.forName(graphTableClassName); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + + cleanCountry( + spark, country, verifyParam, inputPath, entityClazz, workingDir, collectedfrom, datasourcePath); + }); + } + + private static void cleanCountry(SparkSession spark, String country, String[] verifyParam, + String inputPath, Class entityClazz, String workingDir, String collectedfrom, String datasourcePath) { + + List hostedBy = spark + .read() + .textFile(datasourcePath) + .collectAsList(); + + Dataset res = spark + .read() + .textFile(inputPath) + .map( + (MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), + Encoders.bean(entityClazz)); + + res.map((MapFunction) r -> { + if (r.getInstance().stream().anyMatch(i -> hostedBy.contains(i.getHostedby().getKey())) || + !r.getCollectedfrom().stream().anyMatch(cf -> cf.getValue().equals(collectedfrom))) { + return r; + } + + List ids = getPidsAndAltIds(r).collect(Collectors.toList()); + if (ids + .stream() + .anyMatch( + p -> p + .getQualifier() + .getClassid() + .equals(PidType.doi.toString()) && pidInParam(p.getValue(), verifyParam))) { + r + .setCountry( + r + .getCountry() + .stream() + .filter( + c -> toTakeCountry(c, country)) + .collect(Collectors.toList())); + + } + + return r; + }, Encoders.bean(entityClazz)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingDir); + + spark + .read() + .textFile(workingDir) + .map( + (MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), + Encoders.bean(entityClazz)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(inputPath); + } + + private static Stream getPidsAndAltIds(T r) { + final Stream resultPids = Optional + .ofNullable(r.getPid()) + .map(Collection::stream) + .orElse(Stream.empty()); + + final Stream instancePids = Optional + .ofNullable(r.getInstance()) + .map( + instance -> instance + .stream() + .flatMap( + i -> Optional + .ofNullable(i.getPid()) + .map(Collection::stream) + .orElse(Stream.empty()))) + .orElse(Stream.empty()); + + final Stream instanceAltIds = Optional + .ofNullable(r.getInstance()) + .map( + instance -> instance + .stream() + .flatMap( + i -> Optional + .ofNullable(i.getAlternateIdentifier()) + .map(Collection::stream) + .orElse(Stream.empty()))) + .orElse(Stream.empty()); + + return Stream + .concat( + Stream.concat(resultPids, instancePids), + instanceAltIds); + } + + private static boolean pidInParam(String value, String[] verifyParam) { + for (String s : verifyParam) + if (value.startsWith(s)) + return true; + return false; + } + + private static boolean toTakeCountry(Country c, String country) { + // If dataInfo is not set, or dataInfo.inferenceprovenance is not set or not present then it cannot be + // inserted via propagation + if (!Optional.ofNullable(c.getDataInfo()).isPresent()) + return true; + if (!Optional.ofNullable(c.getDataInfo().getInferenceprovenance()).isPresent()) + return true; + return !(c + .getClassid() + .equalsIgnoreCase(country) && + c.getDataInfo().getInferenceprovenance().equals("propagation")); + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/GetDatasourceFromCountry.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/GetDatasourceFromCountry.java new file mode 100644 index 000000000..b89b459df --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/GetDatasourceFromCountry.java @@ -0,0 +1,106 @@ + +package eu.dnetlib.dhp.oa.graph.clean.country; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.List; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.*; +import scala.Tuple2; + +/** + * @author miriam.baglioni + * @Date 22/07/22 + */ +public class GetDatasourceFromCountry implements Serializable { + private static final Logger log = LoggerFactory.getLogger(GetDatasourceFromCountry.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + GetDatasourceFromCountry.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/input_datasource_country_parameters.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputPath = parser.get("inputPath"); + log.info("inputPath: {}", inputPath); + + String workingPath = parser.get("workingDir"); + log.info("workingDir: {}", workingPath); + + String country = parser.get("country"); + log.info("country: {}", country); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + getDatasourceFromCountry(spark, country, inputPath, workingPath); + }); + } + + private static void getDatasourceFromCountry(SparkSession spark, String country, String inputPath, + String workingDir) { + + Dataset organization = spark + .read() + .textFile(inputPath + "/organization") + .map( + (MapFunction) value -> OBJECT_MAPPER.readValue(value, Organization.class), + Encoders.bean(Organization.class)) + .filter( + (FilterFunction) o -> !o.getDataInfo().getDeletedbyinference() && + o.getCountry().getClassid().length() > 0 && + o.getCountry().getClassid().equals(country)); + + // filtering of the relations taking the non deleted by inference and those with IsProvidedBy as relclass + Dataset relation = spark + .read() + .textFile(inputPath + "/relation") + .map( + (MapFunction) value -> OBJECT_MAPPER.readValue(value, Relation.class), + Encoders.bean(Relation.class)) + .filter( + (FilterFunction) rel -> rel.getRelClass().equalsIgnoreCase(ModelConstants.IS_PROVIDED_BY) && + !rel.getDataInfo().getDeletedbyinference()); + + organization + .joinWith(relation, organization.col("id").equalTo(relation.col("target"))) + .map((MapFunction, String>) t2 -> t2._2().getSource(), Encoders.STRING()) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingDir); + + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml index 0cf6cdd05..683c2417b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml @@ -14,8 +14,8 @@ the address of the lookUp service - shouldCleanContext - true if the context have to be cleaned + shouldClean + true if the operation of deletion of not needed values from the results have to be performed contextId @@ -30,6 +30,22 @@ It is the constrint to be verified. This time is hardcoded as gcube and it is searched for in the title. If title starts with gcube than the context sobigdata will be removed by the result if present + + verifyCountryParam + 10.17632;10.5061 + It is the constraints to be verified. This time is hardcoded as the starting doi from mendeley and dryad and it is searched for in + the pid value. If the pid value starts with one of the two prefixes, then the country may be removed + + + country + NL + It is the country to be removed from the set of countries if it is present with provenance propagation. The country will not be removed if in one of the isntances there is a datasource with country `country` + + + collectedfrom + NARCIS + the only datasource for which the country NL will be removed from the country list + sparkDriverMemory @@ -296,18 +312,18 @@ - ${wf:conf('shouldCleanContext') eq true} + ${wf:conf('shouldClean') eq true} - + yarn @@ -327,7 +343,7 @@ --inputPath${graphOutputPath}/publication --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication - --workingPath${workingDir}/working/publication + --workingDir${workingDir}/working/publication --contextId${contextId} --verifyParam${verifyParam} @@ -354,7 +370,7 @@ --inputPath${graphOutputPath}/dataset --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset - --workingPath${workingDir}/working/dataset + --workingDir${workingDir}/working/dataset --contextId${contextId} --verifyParam${verifyParam} @@ -381,7 +397,7 @@ --inputPath${graphOutputPath}/otherresearchproduct --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct - --workingPath${workingDir}/working/otherresearchproduct + --workingDir${workingDir}/working/otherresearchproduct --contextId${contextId} --verifyParam${verifyParam} @@ -408,7 +424,7 @@ --inputPath${graphOutputPath}/software --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software - --workingPath${workingDir}/working/software + --workingDir${workingDir}/working/software --contextId${contextId} --verifyParam${verifyParam} @@ -416,7 +432,352 @@ - + + + + + yarn + cluster + Select datasource ID from country + eu.dnetlib.dhp.oa.graph.clean.country.GetDatasourceFromCountry + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --inputPath${graphOutputPath} + --workingDir${workingDir}/working/hostedby + --country${country} + + + + + + + + + + + + + + + yarn + cluster + Clean publication country + eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --inputPath${graphOutputPath}/publication + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication + --workingDir${workingDir}/working/publication + --country${country} + --verifyParam${verifyCountryParam} + --hostedBy${workingDir}/working/hostedby + --collectedfrom${collectedfrom} + + + + + + + + yarn + cluster + Clean dataset country + eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --inputPath${graphOutputPath}/dataset + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset + --workingDir${workingDir}/working/dataset + --country${country} + --verifyParam${verifyCountryParam} + --hostedBy${workingDir}/working/hostedby + --collectedfrom${collectedfrom} + + + + + + + + yarn + cluster + Clean otherresearchproduct country + eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --inputPath${graphOutputPath}/otherresearchproduct + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct + --workingDir${workingDir}/working/otherresearchproduct + --country${country} + --verifyParam${verifyCountryParam} + --hostedBy${workingDir}/working/hostedby + --collectedfrom${collectedfrom} + + + + + + + + yarn + cluster + Clean software country + eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --inputPath${graphOutputPath}/software + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software + --workingDir${workingDir}/working/software + --country${country} + --verifyParam${verifyCountryParam} + --hostedBy${workingDir}/working/hostedby + --collectedfrom${collectedfrom} + + + + + + + + + + ${wf:conf('shouldClean') eq true} + + + + + + + eu.dnetlib.dhp.oa.graph.clean.MasterDuplicateAction + --postgresUrl${postgresURL} + --postgresUser${postgresUser} + --postgresPassword${postgresPassword} + --hdfsPath${workingDir}/masterduplicate + --hdfsNameNode${nameNode} + + + + + + + + + + + + + + + yarn + cluster + patch publication cfhb + eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --inputPath${graphOutputPath}/publication + --resolvedPath${workingDir}/cfHbResolved/publication + --outputPath${workingDir}/cfHbPatched/publication + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication + --masterDuplicatePath${workingDir}/masterduplicate + + + + + + + + yarn + cluster + patch dataset cfhb + eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --inputPath${graphOutputPath}/dataset + --resolvedPath${workingDir}/cfHbResolved/dataset + --outputPath${workingDir}/cfHbPatched/dataset + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset + --masterDuplicatePath${workingDir}/masterduplicate + + + + + + + + yarn + cluster + patch otherresearchproduct cfhb + eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --inputPath${graphOutputPath}/otherresearchproduct + --resolvedPath${workingDir}/cfHbResolved/otherresearchproduct + --outputPath${workingDir}/cfHbPatched/otherresearchproduct + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct + --masterDuplicatePath${workingDir}/masterduplicate + + + + + + + + yarn + cluster + patch software cfhb + eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --inputPath${graphOutputPath}/software + --resolvedPath${workingDir}/cfHbResolved/software + --outputPath${workingDir}/cfHbPatched/software + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software + --masterDuplicatePath${workingDir}/masterduplicate + + + + + + + + + + + + + + + + + + + + ${workingDir}/cfHbPatched/publication + ${graphOutputPath}/publication + + + + + + + + + + + ${workingDir}/cfHbPatched/dataset + ${graphOutputPath}/dataset + + + + + + + + + + + ${workingDir}/cfHbPatched/otherresearchproduct + ${graphOutputPath}/otherresearchproduct + + + + + + + + + + + ${workingDir}/cfHbPatched/software + ${graphOutputPath}/software + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_country_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_country_parameters.json new file mode 100644 index 000000000..b38b5ac9f --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_country_parameters.json @@ -0,0 +1,49 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "in", + "paramLongName": "inputPath", + "paramDescription": "the path to the graph data dump to read", + "paramRequired": true + }, + { + "paramName": "wd", + "paramLongName": "workingDir", + "paramDescription": "the path to store the output graph", + "paramRequired": true + }, + { + "paramName": "c", + "paramLongName": "country", + "paramDescription": "the id of the context to be removed", + "paramRequired": true + }, + { + "paramName": "class", + "paramLongName": "graphTableClassName", + "paramDescription": "class name moelling the graph table", + "paramRequired": true + },{ + "paramName": "vf", + "paramLongName": "verifyParam", + "paramDescription": "the parameter to be verified to remove the country", + "paramRequired": true +}, + { + "paramName": "cf", + "paramLongName": "collectedfrom", + "paramDescription": "the collectedfrom value for which we should apply the cleaning", + "paramRequired": true + }, + { + "paramName": "hb", + "paramLongName": "hostedBy", + "paramDescription": "the set of datasources having the specified country in the graph searched for in the hostedby of the results", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java new file mode 100644 index 000000000..d1c186308 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java @@ -0,0 +1,191 @@ + +package eu.dnetlib.dhp.oa.graph.clean; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import eu.dnetlib.dhp.schema.oaf.Dataset; +import org.apache.commons.io.FileUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author miriam.baglioni + * @Date 20/07/22 + */ +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob; +import eu.dnetlib.dhp.schema.oaf.Publication; + +public class CleanCountryTest { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SparkSession spark; + + private static Path workingDir; + + private static final Logger log = LoggerFactory.getLogger(CleanContextTest.class); + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(CleanCountryTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(CleanCountryTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(CleanCountryTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + public void testResultClean() throws Exception { + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/clean/publication_clean_country.json") + .getPath(); + + spark + .read() + .textFile(sourcePath) + .map( + (MapFunction) r -> OBJECT_MAPPER.readValue(r, Publication.class), + Encoders.bean(Publication.class)) + .write() + .json(workingDir.toString() + "/publication"); + + CleanCountrySparkJob.main(new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--inputPath", workingDir.toString() + "/publication", + "--graphTableClassName", Publication.class.getCanonicalName(), + "--workingDir", workingDir.toString() + "/working", + "--country", "NL", + "--verifyParam", "10.17632", + "--collectedfrom", "NARCIS", + "--hostedBy", getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy") + .getPath() + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/publication") + .map(item -> OBJECT_MAPPER.readValue(item, Publication.class)); + + Assertions.assertEquals(8, tmp.count()); + + // original result with NL country and doi starting with Mendely prefix, but not collectedfrom NARCIS + Assertions + .assertEquals( + 1, + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::0224aae28af558f21768dbc6439c7a95")) + .collect() + .get(0) + .getCountry() + .size()); + + // original result with NL country and pid not starting with Mendely prefix + Assertions + .assertEquals( + 1, + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9")) + .collect() + .get(0) + .getCountry() + .size()); + + // original result with NL country and doi starting with Mendely prefix and collectedfrom NARCIS but not + // inserted with propagation + Assertions + .assertEquals( + 1, + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af")) + .collect() + .get(0) + .getCountry() + .size()); + + // original result with NL country and doi starting with Mendely prefix and collectedfrom NARCIS inserted with + // propagation + Assertions + .assertEquals( + 0, + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6ag")) + .collect() + .get(0) + .getCountry() + .size()); + } + + @Test + public void testDatasetClean() throws Exception { + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/clean/dataset_clean_country.json") + .getPath(); + + spark + .read() + .textFile(sourcePath) + .map( + (MapFunction) r -> OBJECT_MAPPER.readValue(r, Dataset.class), + Encoders.bean(Dataset.class)) + .write() + .json(workingDir.toString() + "/dataset"); + + CleanCountrySparkJob.main(new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--inputPath", workingDir.toString() + "/dataset", + "-graphTableClassName", Dataset.class.getCanonicalName(), + "-workingDir", workingDir.toString() + "/working", + "-country", "NL", + "-verifyParam", "10.17632", + "-collectedfrom", "NARCIS", + "-hostedBy", getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy") + .getPath() + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/dataset") + .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); + + Assertions.assertEquals(1, tmp.count()); + + Assertions.assertEquals(0, tmp.first().getCountry().size()); + + + } + +}