From c07857fa37d18c991fc7ea27216d008912e66d38 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 23 Mar 2023 15:57:47 +0100 Subject: [PATCH] [graph cleaning] unit tests & cleanup --- .../oa/graph/clean/CleanContextSparkJob.java | 122 -- .../oa/graph/clean/CleanGraphSparkJob.java | 85 +- .../GetDatasourceFromCountry.java | 4 +- .../graph/clean/{cfhb => }/IdCfHbMapping.java | 2 +- .../graph/clean/cfhb/CleanCfHbSparkJob.java | 227 --- .../clean/country/CleanCountrySparkJob.java | 211 -- .../dhp/oa/graph/clean/oozie_app/workflow.xml | 2 +- .../dhp/oa/graph/clean/CleanContextTest.java | 289 --- .../dhp/oa/graph/clean/CleanCountryTest.java | 190 -- .../graph/clean/CleanGraphSparkJobTest.java | 1707 +++++++++-------- .../clean/cfhb/CleanCfHbSparkJobTest.java | 213 -- 11 files changed, 944 insertions(+), 2108 deletions(-) delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextSparkJob.java rename dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/{country => }/GetDatasourceFromCountry.java (96%) rename dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/{cfhb => }/IdCfHbMapping.java (94%) delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java delete mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextTest.java delete mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java delete mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJobTest.java diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextSparkJob.java deleted file mode 100644 index 10a3d4465..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextSparkJob.java +++ /dev/null @@ -1,122 +0,0 @@ - -package eu.dnetlib.dhp.oa.graph.clean; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.io.Serializable; -import java.util.Optional; -import java.util.stream.Collectors; - -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.Result; - -public class CleanContextSparkJob implements Serializable { - private static final Logger log = LoggerFactory.getLogger(CleanContextSparkJob.class); - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - public static void main(String[] args) throws Exception { - - String jsonConfiguration = IOUtils - .toString( - CleanContextSparkJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/input_clean_context_parameters.json")); - final ArgumentApplicationParser parser = new 
ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); - - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - String inputPath = parser.get("inputPath"); - log.info("inputPath: {}", inputPath); - - String workingDir = parser.get("workingDir"); - log.info("workingDir: {}", workingDir); - - String contextId = parser.get("contextId"); - log.info("contextId: {}", contextId); - - String verifyParam = parser.get("verifyParam"); - log.info("verifyParam: {}", verifyParam); - - String graphTableClassName = parser.get("graphTableClassName"); - log.info("graphTableClassName: {}", graphTableClassName); - - Class entityClazz = (Class) Class.forName(graphTableClassName); - - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - - cleanContext(spark, contextId, verifyParam, inputPath, entityClazz, workingDir); - }); - } - - private static void cleanContext(SparkSession spark, String contextId, String verifyParam, - String inputPath, Class entityClazz, String workingDir) { - Dataset res = spark - .read() - .textFile(inputPath) - .map( - (MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), - Encoders.bean(entityClazz)); - - res.map((MapFunction) r -> { - if (!r - .getTitle() - .stream() - .filter( - t -> t - .getQualifier() - .getClassid() - .equalsIgnoreCase(ModelConstants.MAIN_TITLE_QUALIFIER.getClassid())) - .anyMatch(t -> t.getValue().toLowerCase().startsWith(verifyParam.toLowerCase()))) { - return r; - } - r - .setContext( - r - .getContext() - .stream() - .filter( - c -> !c.getId().split("::")[0] - .equalsIgnoreCase(contextId)) - .collect(Collectors.toList())); - return r; - }, Encoders.bean(entityClazz)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(workingDir); - - spark - .read() - .textFile(workingDir) - .map( - (MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), - Encoders.bean(entityClazz)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(inputPath); - } -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java index 23a56a445..e97ff3cb2 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java @@ -4,10 +4,8 @@ package eu.dnetlib.dhp.oa.graph.clean; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.*; -import java.util.stream.Collectors; import java.util.stream.Stream; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; @@ -29,8 +27,6 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.action.model.MasterDuplicate; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; -import eu.dnetlib.dhp.oa.graph.clean.cfhb.IdCfHbMapping; -import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.Oaf; @@ 
-38,6 +34,7 @@ import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.utils.GraphCleaningFunctions; import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import scala.Tuple2; @@ -55,17 +52,17 @@ public class CleanGraphSparkJob { public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils - .toString( - CleanGraphSparkJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json")); + .toString( + CleanGraphSparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json")); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); String isLookupUrl = parser.get("isLookupUrl"); @@ -76,7 +73,8 @@ public class CleanGraphSparkJob { new CleanGraphSparkJob(parser).run(isSparkSessionManaged, isLookup); } - public void run(Boolean isSparkSessionManaged, ISLookUpService isLookUpService) throws ISLookUpException, ClassNotFoundException { + public void run(Boolean isSparkSessionManaged, ISLookUpService isLookUpService) + throws ISLookUpException, ClassNotFoundException { String inputPath = parser.get("inputPath"); log.info("inputPath: {}", inputPath); @@ -99,9 +97,10 @@ public class CleanGraphSparkJob { String country = parser.get("country"); log.info("country: {}", country); - String[] verifyCountryParam = Optional.ofNullable(parser.get("verifyCountryParam")) - .map(s -> s.split(";")) - .orElse(new String[]{}); + String[] verifyCountryParam = Optional + .ofNullable(parser.get("verifyCountryParam")) + .map(s -> s.split(";")) + .orElse(new String[] {}); log.info("verifyCountryParam: {}", verifyCountryParam); String collectedfrom = parser.get("collectedfrom"); @@ -111,9 +110,9 @@ public class CleanGraphSparkJob { log.info("masterDuplicatePath: {}", dsMasterDuplicatePath); Boolean deepClean = Optional - .ofNullable(parser.get("deepClean")) - .map(Boolean::valueOf) - .orElse(Boolean.FALSE); + .ofNullable(parser.get("deepClean")) + .map(Boolean::valueOf) + .orElse(Boolean.FALSE); log.info("deepClean: {}", deepClean); Class entityClazz = (Class) Class.forName(graphTableClassName); @@ -123,14 +122,14 @@ public class CleanGraphSparkJob { SparkConf conf = new SparkConf(); conf.setAppName(CleanGraphSparkJob.class.getSimpleName() + "#" + entityClazz.getSimpleName()); runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); - cleanGraphTable( - spark, vocs, inputPath, entityClazz, outputPath, contextId, verifyParam, datasourcePath, country, - verifyCountryParam, collectedfrom, dsMasterDuplicatePath, deepClean); - }); + conf, + isSparkSessionManaged, + spark -> { + HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); + cleanGraphTable( + spark, vocs, inputPath, entityClazz, outputPath, contextId, verifyParam, datasourcePath, country, + verifyCountryParam, collectedfrom, dsMasterDuplicatePath, deepClean); + }); } private static void cleanGraphTable( @@ -172,33 +171,33 
@@ public class CleanGraphSparkJob { .map(as(clazz), Encoders.bean(clazz)) .flatMap(flattenCfHbFn(), Encoders.bean(IdCfHbMapping.class)); - // set the EMPTY master ID/NAME and save it - resolved + // set the EMPTY master ID/NAME + Dataset resolvedDs = resolved .joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicateId"))) .map(asIdCfHbMapping(), Encoders.bean(IdCfHbMapping.class)) .filter((FilterFunction) m -> Objects.nonNull(m.getMasterId())); // load the hostedby mapping Set hostedBy = Sets - .newHashSet( - spark - .read() - .textFile(datasourcePath) - .collectAsList()); + .newHashSet( + spark + .read() + .textFile(datasourcePath) + .collectAsList()); // perform the deep cleaning steps final Dataset cleaned_deep = cleaned_basic - .map( - (MapFunction) value -> GraphCleaningFunctions.cleanContext(value, contextId, verifyParam), - Encoders.bean(clazz)) - .map( - (MapFunction) value -> GraphCleaningFunctions - .cleanCountry(value, verifyCountryParam, hostedBy, collectedfrom, country), - Encoders.bean(clazz)); + .map( + (MapFunction) value -> GraphCleaningFunctions.cleanContext(value, contextId, verifyParam), + Encoders.bean(clazz)) + .map( + (MapFunction) value -> GraphCleaningFunctions + .cleanCountry(value, verifyCountryParam, hostedBy, collectedfrom, country), + Encoders.bean(clazz)); // Join the results with the resolved CF|HB mapping, apply the mapping and save it cleaned_deep - .joinWith(resolved, cleaned_deep.col("id").equalTo(resolved.col("resultId")), "left") + .joinWith(resolvedDs, cleaned_deep.col("id").equalTo(resolvedDs.col("resultId")), "left") .groupByKey( (MapFunction, String>) t -> ((Result) t._1()).getId(), Encoders.STRING()) .mapGroups(getMapGroupsFunction(), Encoders.bean(clazz)) @@ -302,8 +301,8 @@ public class CleanGraphSparkJob { private Stream filter(List kvs) { return kvs - .stream() - .filter(kv -> StringUtils.isNotBlank(kv.getKey()) && StringUtils.isNotBlank(kv.getValue())); + .stream() + .filter(kv -> StringUtils.isNotBlank(kv.getKey()) && StringUtils.isNotBlank(kv.getValue())); } private void updateKeyValue(final KeyValue kv, final IdCfHbMapping a) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/GetDatasourceFromCountry.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/GetDatasourceFromCountry.java similarity index 96% rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/GetDatasourceFromCountry.java rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/GetDatasourceFromCountry.java index 598fccdd7..a69b1a8bf 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/GetDatasourceFromCountry.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/GetDatasourceFromCountry.java @@ -1,10 +1,9 @@ -package eu.dnetlib.dhp.oa.graph.clean.country; +package eu.dnetlib.dhp.oa.graph.clean; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; -import java.util.List; import java.util.Optional; import org.apache.commons.io.IOUtils; @@ -21,7 +20,6 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.*; import scala.Tuple2; diff --git 
a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/IdCfHbMapping.java similarity index 94% rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/IdCfHbMapping.java index fad1129c5..a560360ba 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/IdCfHbMapping.java @@ -1,5 +1,5 @@ -package eu.dnetlib.dhp.oa.graph.clean.cfhb; +package eu.dnetlib.dhp.oa.graph.clean; import java.io.Serializable; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java deleted file mode 100644 index 531b415ed..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java +++ /dev/null @@ -1,227 +0,0 @@ - -package eu.dnetlib.dhp.oa.graph.clean.cfhb; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.util.Iterator; -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FilterFunction; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.api.java.function.MapGroupsFunction; -import org.apache.spark.sql.*; -import org.apache.spark.sql.expressions.Aggregator; -import org.jetbrains.annotations.NotNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.common.action.model.MasterDuplicate; -import eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob; -import eu.dnetlib.dhp.schema.oaf.Instance; -import eu.dnetlib.dhp.schema.oaf.KeyValue; -import eu.dnetlib.dhp.schema.oaf.Result; -import eu.dnetlib.dhp.utils.DHPUtils; -import scala.Tuple2; - -public class CleanCfHbSparkJob { - - private static final Logger log = LoggerFactory.getLogger(CleanCfHbSparkJob.class); - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - public static void main(String[] args) throws Exception { - - String jsonConfiguration = IOUtils - .toString( - CleanCountrySparkJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/input_clean_cfhb_parameters.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); - - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - String inputPath = parser.get("inputPath"); - log.info("inputPath: {}", inputPath); - - String resolvedPath = parser.get("resolvedPath"); - log.info("resolvedPath: {}", resolvedPath); - - String outputPath = parser.get("outputPath"); - 
log.info("outputPath: {}", outputPath); - - String dsMasterDuplicatePath = parser.get("masterDuplicatePath"); - log.info("masterDuplicatePath: {}", dsMasterDuplicatePath); - - String graphTableClassName = parser.get("graphTableClassName"); - log.info("graphTableClassName: {}", graphTableClassName); - - Class entityClazz = (Class) Class.forName(graphTableClassName); - - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); - HdfsSupport.remove(resolvedPath, spark.sparkContext().hadoopConfiguration()); - cleanCfHb( - spark, inputPath, entityClazz, resolvedPath, dsMasterDuplicatePath, outputPath); - }); - } - - private static void cleanCfHb(SparkSession spark, String inputPath, Class entityClazz, - String resolvedPath, String masterDuplicatePath, String outputPath) { - - // read the master-duplicate tuples - Dataset md = spark - .read() - .textFile(masterDuplicatePath) - .map(as(MasterDuplicate.class), Encoders.bean(MasterDuplicate.class)); - - // prepare the resolved CF|HB references with the corresponding EMPTY master ID - Dataset resolved = spark - .read() - .textFile(inputPath) - .map(as(entityClazz), Encoders.bean(entityClazz)) - .flatMap(flattenCfHbFn(), Encoders.bean(IdCfHbMapping.class)); - - // set the EMPTY master ID/NAME and save it - resolved - .joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicateId"))) - .map(asIdCfHbMapping(), Encoders.bean(IdCfHbMapping.class)) - .filter((FilterFunction) m -> Objects.nonNull(m.getMasterId())) - .write() - .mode(SaveMode.Overwrite) - .json(resolvedPath); - - // read again the resolved CF|HB mapping - Dataset resolvedDS = spark - .read() - .textFile(resolvedPath) - .map(as(IdCfHbMapping.class), Encoders.bean(IdCfHbMapping.class)); - - // read the result table - Dataset res = spark - .read() - .textFile(inputPath) - .map(as(entityClazz), Encoders.bean(entityClazz)); - - // Join the results with the resolved CF|HB mapping, apply the mapping and save it - res - .joinWith(resolvedDS, res.col("id").equalTo(resolvedDS.col("resultId")), "left") - .groupByKey((MapFunction, String>) t -> t._1().getId(), Encoders.STRING()) - .mapGroups(getMapGroupsFunction(), Encoders.bean(entityClazz)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath); - } - - private static MapFunction, IdCfHbMapping> asIdCfHbMapping() { - return t -> { - final IdCfHbMapping mapping = t._1(); - Optional - .ofNullable(t._2()) - .ifPresent(t2 -> { - mapping.setMasterId(t2.getMasterId()); - mapping.setMasterName(t2.getMasterName()); - - }); - return mapping; - }; - } - - private static FlatMapFunction flattenCfHbFn() { - return r -> Stream - .concat( - Optional - .ofNullable(r.getCollectedfrom()) - .map(cf -> cf.stream().map(KeyValue::getKey)) - .orElse(Stream.empty()), - Stream - .concat( - Optional - .ofNullable(r.getInstance()) - .map( - instances -> instances - .stream() - .map(i -> Optional.ofNullable(i.getHostedby()).map(KeyValue::getKey).orElse(""))) - .orElse(Stream.empty()) - .filter(StringUtils::isNotBlank), - Optional - .ofNullable(r.getInstance()) - .map( - instances -> instances - .stream() - .map( - i -> Optional - .ofNullable(i.getCollectedfrom()) - .map(KeyValue::getKey) - .orElse(""))) - .orElse(Stream.empty()) - .filter(StringUtils::isNotBlank))) - .distinct() - .filter(StringUtils::isNotBlank) - .map(cfHb -> asIdCfHbMapping(r.getId(), cfHb)) - .iterator(); - } - - private static MapGroupsFunction, 
T> getMapGroupsFunction() { - return new MapGroupsFunction, T>() { - @Override - public T call(String key, Iterator> values) { - final Tuple2 first = values.next(); - final T res = first._1(); - - updateResult(res, first._2()); - values.forEachRemaining(t -> updateResult(res, t._2())); - return res; - } - - private void updateResult(T res, IdCfHbMapping m) { - if (Objects.nonNull(m)) { - res.getCollectedfrom().forEach(kv -> updateKeyValue(kv, m)); - res.getInstance().forEach(i -> { - updateKeyValue(i.getHostedby(), m); - updateKeyValue(i.getCollectedfrom(), m); - }); - } - } - - private void updateKeyValue(final KeyValue kv, final IdCfHbMapping a) { - if (kv.getKey().equals(a.getCfhb())) { - kv.setKey(a.getMasterId()); - kv.setValue(a.getMasterName()); - } - } - - }; - } - - private static IdCfHbMapping asIdCfHbMapping(String resultId, String cfHb) { - IdCfHbMapping m = new IdCfHbMapping(resultId); - m.setCfhb(cfHb); - return m; - } - - private static MapFunction as(Class clazz) { - return s -> OBJECT_MAPPER.readValue(s, clazz); - } -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java deleted file mode 100644 index 37e693de9..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java +++ /dev/null @@ -1,211 +0,0 @@ - -package eu.dnetlib.dhp.oa.graph.clean.country; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.io.Serializable; -import java.util.Collection; -import java.util.List; -import java.util.Optional; -import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import javax.swing.text.html.Option; - -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FilterFunction; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author miriam.baglioni - * @Date 20/07/22 - */ -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob; -import eu.dnetlib.dhp.schema.oaf.Country; -import eu.dnetlib.dhp.schema.oaf.Instance; -import eu.dnetlib.dhp.schema.oaf.Result; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; -import eu.dnetlib.dhp.schema.oaf.utils.PidType; - -public class CleanCountrySparkJob implements Serializable { - private static final Logger log = LoggerFactory.getLogger(CleanCountrySparkJob.class); - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - public static void main(String[] args) throws Exception { - - String jsonConfiguration = IOUtils - .toString( - CleanCountrySparkJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/input_clean_country_parameters.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); - - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - 
String inputPath = parser.get("inputPath"); - log.info("inputPath: {}", inputPath); - - String workingDir = parser.get("workingDir"); - log.info("workingDir: {}", workingDir); - - String datasourcePath = parser.get("hostedBy"); - log.info("datasourcePath: {}", datasourcePath); - - String country = parser.get("country"); - log.info("country: {}", country); - - String[] verifyParam = parser.get("verifyParam").split(";"); - log.info("verifyParam: {}", verifyParam); - - String collectedfrom = parser.get("collectedfrom"); - log.info("collectedfrom: {}", collectedfrom); - - String graphTableClassName = parser.get("graphTableClassName"); - log.info("graphTableClassName: {}", graphTableClassName); - - Class entityClazz = (Class) Class.forName(graphTableClassName); - - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - - cleanCountry( - spark, country, verifyParam, inputPath, entityClazz, workingDir, collectedfrom, datasourcePath); - }); - } - - private static void cleanCountry(SparkSession spark, String country, String[] verifyParam, - String inputPath, Class entityClazz, String workingDir, String collectedfrom, String datasourcePath) { - - List hostedBy = spark - .read() - .textFile(datasourcePath) - .collectAsList(); - - Dataset res = spark - .read() - .textFile(inputPath) - .map( - (MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), - Encoders.bean(entityClazz)); - - res.map((MapFunction) r -> { - if (r.getInstance().stream().anyMatch(i -> hostedBy.contains(i.getHostedby().getKey())) || - !r.getCollectedfrom().stream().anyMatch(cf -> cf.getValue().equals(collectedfrom))) { - return r; - } - - List ids = getPidsAndAltIds(r).collect(Collectors.toList()); - if (ids - .stream() - .anyMatch( - p -> p - .getQualifier() - .getClassid() - .equals(PidType.doi.toString()) && pidInParam(p.getValue(), verifyParam))) { - r - .setCountry( - r - .getCountry() - .stream() - .filter( - c -> toTakeCountry(c, country)) - .collect(Collectors.toList())); - - } - - return r; - }, Encoders.bean(entityClazz)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(workingDir); - - spark - .read() - .textFile(workingDir) - .map( - (MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), - Encoders.bean(entityClazz)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(inputPath); - } - - private static Stream getPidsAndAltIds(T r) { - final Stream resultPids = Optional - .ofNullable(r.getPid()) - .map(Collection::stream) - .orElse(Stream.empty()); - - final Stream instancePids = Optional - .ofNullable(r.getInstance()) - .map( - instance -> instance - .stream() - .flatMap( - i -> Optional - .ofNullable(i.getPid()) - .map(Collection::stream) - .orElse(Stream.empty()))) - .orElse(Stream.empty()); - - final Stream instanceAltIds = Optional - .ofNullable(r.getInstance()) - .map( - instance -> instance - .stream() - .flatMap( - i -> Optional - .ofNullable(i.getAlternateIdentifier()) - .map(Collection::stream) - .orElse(Stream.empty()))) - .orElse(Stream.empty()); - - return Stream - .concat( - Stream.concat(resultPids, instancePids), - instanceAltIds); - } - - private static boolean pidInParam(String value, String[] verifyParam) { - for (String s : verifyParam) - if (value.startsWith(s)) - return true; - return false; - } - - private static boolean toTakeCountry(Country c, String country) { - // If dataInfo is not set, or dataInfo.inferenceprovenance is not set or not present then it cannot be - 
// inserted via propagation - if (!Optional.ofNullable(c.getDataInfo()).isPresent()) - return true; - if (!Optional.ofNullable(c.getDataInfo().getInferenceprovenance()).isPresent()) - return true; - return !(c - .getClassid() - .equalsIgnoreCase(country) && - c.getDataInfo().getInferenceprovenance().equals("propagation")); - } - -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml index b5179b1fc..505c78c34 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml @@ -99,7 +99,7 @@ yarn cluster Select datasource ID from country - eu.dnetlib.dhp.oa.graph.clean.country.GetDatasourceFromCountry + eu.dnetlib.dhp.oa.graph.clean.GetDatasourceFromCountry dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextTest.java deleted file mode 100644 index 91094f534..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextTest.java +++ /dev/null @@ -1,289 +0,0 @@ - -package eu.dnetlib.dhp.oa.graph.clean; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.List; - -import org.apache.commons.io.FileUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SparkSession; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.dhp.schema.oaf.Publication; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; - -public class CleanContextTest { - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - private static SparkSession spark; - - private static Path workingDir; - - private static final Logger log = LoggerFactory.getLogger(CleanContextTest.class); - - @BeforeAll - public static void beforeAll() throws IOException { - workingDir = Files.createTempDirectory(CleanContextTest.class.getSimpleName()); - log.info("using work dir {}", workingDir); - - SparkConf conf = new SparkConf(); - conf.setAppName(CleanContextTest.class.getSimpleName()); - - conf.setMaster("local[*]"); - conf.set("spark.driver.host", "localhost"); - conf.set("hive.metastore.local", "true"); - conf.set("spark.ui.enabled", "false"); - conf.set("spark.sql.warehouse.dir", workingDir.toString()); - conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); - - spark = SparkSession - .builder() - .appName(CleanContextTest.class.getSimpleName()) - .config(conf) - .getOrCreate(); - } - - @AfterAll - public static void afterAll() throws IOException { - FileUtils.deleteDirectory(workingDir.toFile()); - spark.stop(); - } - - @Test - public void testResultClean() throws Exception { - final String sourcePath = getClass() - 
.getResource("/eu/dnetlib/dhp/oa/graph/clean/publication_clean_context.json") - .getPath(); - final String prefix = "gcube "; - - spark - .read() - .textFile(sourcePath) - .map( - (MapFunction) r -> OBJECT_MAPPER.readValue(r, Publication.class), - Encoders.bean(Publication.class)) - .write() - .json(workingDir.toString() + "/publication"); - - CleanContextSparkJob.main(new String[] { - "--isSparkSessionManaged", Boolean.FALSE.toString(), - "--inputPath", workingDir.toString() + "/publication", - "--graphTableClassName", Publication.class.getCanonicalName(), - "--workingDir", workingDir.toString() + "/working", - "--contextId", "sobigdata", - "--verifyParam", "gCube " - }); - - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - JavaRDD tmp = sc - .textFile(workingDir.toString() + "/publication") - .map(item -> OBJECT_MAPPER.readValue(item, Publication.class)); - - Assertions.assertEquals(7, tmp.count()); - - // original result with sobigdata context and gcube as starting string in the main title for the publication - Assertions - .assertEquals( - 0, - tmp - .filter(p -> p.getId().equals("50|DansKnawCris::0224aae28af558f21768dbc6439c7a95")) - .collect() - .get(0) - .getContext() - .size()); - - // original result with sobigdata context without gcube as starting string in the main title for the publication - Assertions - .assertEquals( - 1, - tmp - .filter(p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9")) - .collect() - .get(0) - .getContext() - .size()); - Assertions - .assertEquals( - "sobigdata::projects::2", - tmp - .filter(p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9")) - .collect() - .get(0) - .getContext() - .get(0) - .getId()); - - // original result with sobigdata context with gcube as starting string in the subtitle - Assertions - .assertEquals( - 1, - tmp - .filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af")) - .collect() - .get(0) - .getContext() - .size()); - Assertions - .assertEquals( - "sobigdata::projects::2", - tmp - .filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af")) - .collect() - .get(0) - .getContext() - .get(0) - .getId()); - List titles = tmp - .filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af")) - .collect() - .get(0) - .getTitle(); - Assertions.assertEquals(1, titles.size()); - Assertions.assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix)); - Assertions.assertEquals("subtitle", titles.get(0).getQualifier().getClassid()); - - // original result with sobigdata context with gcube not as starting string in the main title - Assertions - .assertEquals( - 1, - tmp - .filter(p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f")) - .collect() - .get(0) - .getContext() - .size()); - Assertions - .assertEquals( - "sobigdata::projects::1", - tmp - .filter(p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f")) - .collect() - .get(0) - .getContext() - .get(0) - .getId()); - titles = tmp - .filter(p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f")) - .collect() - .get(0) - .getTitle(); - Assertions.assertEquals(1, titles.size()); - Assertions.assertFalse(titles.get(0).getValue().toLowerCase().startsWith(prefix)); - Assertions.assertTrue(titles.get(0).getValue().toLowerCase().contains(prefix.trim())); - Assertions.assertEquals("main title", titles.get(0).getQualifier().getClassid()); - - // original result with 
sobigdata in context and also other contexts with gcube as starting string for the main - // title - Assertions - .assertEquals( - 1, - tmp - .filter(p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53")) - .collect() - .get(0) - .getContext() - .size()); - Assertions - .assertEquals( - "dh-ch", - tmp - .filter(p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53")) - .collect() - .get(0) - .getContext() - .get(0) - .getId()); - titles = tmp - .filter(p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53")) - .collect() - .get(0) - .getTitle(); - Assertions.assertEquals(1, titles.size()); - Assertions.assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix)); - Assertions.assertEquals("main title", titles.get(0).getQualifier().getClassid()); - - // original result with multiple main title one of which whith gcube as starting string and with 2 contextes - Assertions - .assertEquals( - 1, - tmp - .filter(p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff")) - .collect() - .get(0) - .getContext() - .size()); - Assertions - .assertEquals( - "dh-ch", - tmp - .filter(p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff")) - .collect() - .get(0) - .getContext() - .get(0) - .getId()); - titles = tmp - .filter(p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff")) - .collect() - .get(0) - .getTitle(); - Assertions.assertEquals(2, titles.size()); - Assertions - .assertTrue( - titles - .stream() - .anyMatch( - t -> t.getQualifier().getClassid().equals("main title") - && t.getValue().toLowerCase().startsWith(prefix))); - - // original result without sobigdata in context with gcube as starting string for the main title - Assertions - .assertEquals( - 1, - tmp - .filter(p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8")) - .collect() - .get(0) - .getContext() - .size()); - Assertions - .assertEquals( - "dh-ch", - tmp - .filter(p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8")) - .collect() - .get(0) - .getContext() - .get(0) - .getId()); - titles = tmp - .filter(p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8")) - .collect() - .get(0) - .getTitle(); - Assertions.assertEquals(2, titles.size()); - - Assertions - .assertTrue( - titles - .stream() - .anyMatch( - t -> t.getQualifier().getClassid().equals("main title") - && t.getValue().toLowerCase().startsWith(prefix))); - - } -} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java deleted file mode 100644 index 3bc69cfd1..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java +++ /dev/null @@ -1,190 +0,0 @@ - -package eu.dnetlib.dhp.oa.graph.clean; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; - -import org.apache.commons.io.FileUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SparkSession; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import 
org.slf4j.LoggerFactory; - -/** - * @author miriam.baglioni - * @Date 20/07/22 - */ -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob; -import eu.dnetlib.dhp.schema.oaf.Dataset; -import eu.dnetlib.dhp.schema.oaf.Publication; - -public class CleanCountryTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - private static SparkSession spark; - - private static Path workingDir; - - private static final Logger log = LoggerFactory.getLogger(CleanContextTest.class); - - @BeforeAll - public static void beforeAll() throws IOException { - workingDir = Files.createTempDirectory(CleanCountryTest.class.getSimpleName()); - log.info("using work dir {}", workingDir); - - SparkConf conf = new SparkConf(); - conf.setAppName(CleanCountryTest.class.getSimpleName()); - - conf.setMaster("local[*]"); - conf.set("spark.driver.host", "localhost"); - conf.set("hive.metastore.local", "true"); - conf.set("spark.ui.enabled", "false"); - conf.set("spark.sql.warehouse.dir", workingDir.toString()); - conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); - - spark = SparkSession - .builder() - .appName(CleanCountryTest.class.getSimpleName()) - .config(conf) - .getOrCreate(); - } - - @AfterAll - public static void afterAll() throws IOException { - FileUtils.deleteDirectory(workingDir.toFile()); - spark.stop(); - } - - @Test - public void testResultClean() throws Exception { - final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/clean/publication_clean_country.json") - .getPath(); - - spark - .read() - .textFile(sourcePath) - .map( - (MapFunction) r -> OBJECT_MAPPER.readValue(r, Publication.class), - Encoders.bean(Publication.class)) - .write() - .json(workingDir.toString() + "/publication"); - - CleanCountrySparkJob.main(new String[] { - "--isSparkSessionManaged", Boolean.FALSE.toString(), - "--inputPath", workingDir.toString() + "/publication", - "--graphTableClassName", Publication.class.getCanonicalName(), - "--workingDir", workingDir.toString() + "/working", - "--country", "NL", - "--verifyParam", "10.17632", - "--collectedfrom", "NARCIS", - "--hostedBy", getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy") - .getPath() - }); - - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - JavaRDD tmp = sc - .textFile(workingDir.toString() + "/publication") - .map(item -> OBJECT_MAPPER.readValue(item, Publication.class)); - - Assertions.assertEquals(8, tmp.count()); - - // original result with NL country and doi starting with Mendely prefix, but not collectedfrom NARCIS - Assertions - .assertEquals( - 1, - tmp - .filter(p -> p.getId().equals("50|DansKnawCris::0224aae28af558f21768dbc6439c7a95")) - .collect() - .get(0) - .getCountry() - .size()); - - // original result with NL country and pid not starting with Mendely prefix - Assertions - .assertEquals( - 1, - tmp - .filter(p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9")) - .collect() - .get(0) - .getCountry() - .size()); - - // original result with NL country and doi starting with Mendely prefix and collectedfrom NARCIS but not - // inserted with propagation - Assertions - .assertEquals( - 1, - tmp - .filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af")) - .collect() - .get(0) - .getCountry() - .size()); - - // original result with NL country and doi starting with Mendely prefix and collectedfrom NARCIS inserted with 
- // propagation - Assertions - .assertEquals( - 0, - tmp - .filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6ag")) - .collect() - .get(0) - .getCountry() - .size()); - } - - @Test - public void testDatasetClean() throws Exception { - final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/clean/dataset_clean_country.json") - .getPath(); - - spark - .read() - .textFile(sourcePath) - .map( - (MapFunction) r -> OBJECT_MAPPER.readValue(r, Dataset.class), - Encoders.bean(Dataset.class)) - .write() - .json(workingDir.toString() + "/dataset"); - - CleanCountrySparkJob.main(new String[] { - "--isSparkSessionManaged", Boolean.FALSE.toString(), - "--inputPath", workingDir.toString() + "/dataset", - "-graphTableClassName", Dataset.class.getCanonicalName(), - "-workingDir", workingDir.toString() + "/working", - "-country", "NL", - "-verifyParam", "10.17632", - "-collectedfrom", "NARCIS", - "-hostedBy", getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy") - .getPath() - }); - - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - JavaRDD tmp = sc - .textFile(workingDir.toString() + "/dataset") - .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); - - Assertions.assertEquals(1, tmp.count()); - - Assertions.assertEquals(0, tmp.first().getCountry().size()); - - } - -} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJobTest.java index f27169082..5b021af01 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJobTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJobTest.java @@ -1,36 +1,8 @@ + package eu.dnetlib.dhp.oa.graph.clean; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; -import eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob; -import eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.*; -import eu.dnetlib.dhp.schema.oaf.utils.GraphCleaningFunctions; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import org.apache.commons.cli.ParseException; -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.IOUtils; -import org.apache.commons.io.filefilter.*; -import org.apache.commons.lang3.StringUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.FilterFunction; -import org.apache.spark.api.java.function.ForeachFunction; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SparkSession; -import org.junit.jupiter.api.*; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.lenient; import java.io.File; import 
java.io.IOException; @@ -38,793 +10,912 @@ import java.net.URISyntaxException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.Collection; -import java.util.List; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; -import java.util.stream.Stream; -import static org.junit.jupiter.api.Assertions.*; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.lenient; +import org.apache.commons.cli.ParseException; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.filefilter.FalseFileFilter; +import org.apache.commons.io.filefilter.TrueFileFilter; +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.utils.GraphCleaningFunctions; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @ExtendWith(MockitoExtension.class) public class CleanGraphSparkJobTest { - private static final Logger log = LoggerFactory.getLogger(CleanContextTest.class); + private static final Logger log = LoggerFactory.getLogger(CleanGraphSparkJobTest.class); + + public static final ObjectMapper MAPPER = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + @Mock + private ISLookUpService isLookUpService; + + private VocabularyGroup vocabularies; + + private CleaningRuleMap mapping; + + private static SparkSession spark; + + private static Path testBaseTmpPath; + + private static String graphInputPath; + + private static String graphOutputPath; + + private static String dsMasterDuplicatePath; + + @BeforeAll + public static void beforeAll() throws IOException, URISyntaxException { + testBaseTmpPath = Files.createTempDirectory(CleanGraphSparkJobTest.class.getSimpleName()); + log.info("using test base path {}", testBaseTmpPath); + + File basePath = Paths + .get( + Objects + .requireNonNull( + CleanGraphSparkJobTest.class.getResource("/eu/dnetlib/dhp/oa/graph/clean/graph")) + .toURI()) + .toFile(); + + List paths = FileUtils + .listFilesAndDirs(basePath, FalseFileFilter.FALSE, TrueFileFilter.TRUE) + .stream() + .filter(f -> !f.getAbsolutePath().endsWith("/graph")) + .collect(Collectors.toList()); + + for (File path : paths) { + String type = StringUtils.substringAfterLast(path.getAbsolutePath(), "/"); + FileUtils + .copyDirectory( + path, + testBaseTmpPath.resolve("input").resolve("graph").resolve(type).toFile()); + } + + FileUtils + .copyFileToDirectory( + Paths + 
.get( + CleanGraphSparkJobTest.class + .getResource("/eu/dnetlib/dhp/oa/graph/clean/cfhb/masterduplicate.json") + .toURI()) + .toFile(), + testBaseTmpPath.resolve("workingDir").resolve("masterduplicate").toFile()); + + graphInputPath = testBaseTmpPath.resolve("input").resolve("graph").toString(); + graphOutputPath = testBaseTmpPath.resolve("output").resolve("graph").toString(); + dsMasterDuplicatePath = testBaseTmpPath.resolve("workingDir").resolve("masterduplicate").toString(); + + SparkConf conf = new SparkConf(); + conf.setAppName(CleanGraphSparkJobTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", testBaseTmpPath.toString()); + conf.set("hive.metastore.warehouse.dir", testBaseTmpPath.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .config(conf) + .getOrCreate(); + } + + @BeforeEach + public void setUp() throws ISLookUpException, IOException { + lenient().when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARIES_XQUERY)).thenReturn(vocs()); + lenient() + .when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARY_SYNONYMS_XQUERY)) + .thenReturn(synonyms()); + + vocabularies = VocabularyGroup.loadVocsFromIS(isLookUpService); + mapping = CleaningRuleMap.create(vocabularies); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(testBaseTmpPath.toFile()); + spark.stop(); + } + + @Test + void testCleanRelations() throws Exception { + + spark + .read() + .textFile(graphInputPath.toString() + "/relation") + .map(as(Relation.class), Encoders.bean(Relation.class)) + .collectAsList() + .forEach( + r -> assertFalse( + vocabularies.getTerms(ModelConstants.DNET_RELATION_RELCLASS).contains(r.getRelClass()))); + + new CleanGraphSparkJob( + args( + "/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json", + new String[] { + "--inputPath", graphInputPath + "/relation", + "--outputPath", graphOutputPath + "/relation", + "--isLookupUrl", "lookupurl", + "--graphTableClassName", Relation.class.getCanonicalName(), + "--deepClean", "false" + })).run(false, isLookUpService); + + spark + .read() + .textFile(graphOutputPath.toString() + "/relation") + .map(as(Relation.class), Encoders.bean(Relation.class)) + .collectAsList() + .forEach(r -> { + + assertTrue(vocabularies.getTerms(ModelConstants.DNET_RELATION_RELCLASS).contains(r.getRelClass())); + assertTrue(vocabularies.getTerms(ModelConstants.DNET_RELATION_SUBRELTYPE).contains(r.getSubRelType())); + + assertEquals("iis", r.getDataInfo().getProvenanceaction().getClassid()); + assertEquals("Inferred by OpenAIRE", r.getDataInfo().getProvenanceaction().getClassname()); + }); + } + + @Test + void testFilter_invisible_true() throws Exception { + + assertNotNull(vocabularies); + assertNotNull(mapping); + + String json = IOUtils + .toString( + Objects + .requireNonNull( + getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/result_invisible.json"))); + Publication p_in = MAPPER.readValue(json, Publication.class); + + assertTrue(p_in instanceof Result); + assertTrue(p_in instanceof Publication); + + assertEquals(true, GraphCleaningFunctions.filter(p_in)); + } + + @Test + void testFilter_true_nothing_to_filter() throws Exception { + + assertNotNull(vocabularies); + assertNotNull(mapping); + + String json = IOUtils + .toString( + Objects + .requireNonNull( + 
getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/result.json"))); + Publication p_in = MAPPER.readValue(json, Publication.class); + + assertTrue(p_in instanceof Result); + assertTrue(p_in instanceof Publication); + + assertEquals(true, GraphCleaningFunctions.filter(p_in)); + } + + @Test + void testFilter_missing_invisible() throws Exception { + + assertNotNull(vocabularies); + assertNotNull(mapping); + + String json = IOUtils + .toString( + Objects + .requireNonNull( + getClass() + .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/result_missing_invisible.json"))); + Publication p_in = MAPPER.readValue(json, Publication.class); + + assertTrue(p_in instanceof Result); + assertTrue(p_in instanceof Publication); + + assertEquals(true, GraphCleaningFunctions.filter(p_in)); + } + + @Test + void testCleaning_publication() throws Exception { + + final String id = "50|CSC_________::2250a70c903c6ac6e4c01438259e9375"; + + Publication p_in = read(spark, graphInputPath + "/publication", Publication.class) + .filter(String.format("id = '%s'", id)) + .first(); + + assertNull(p_in.getBestaccessright()); + assertTrue(p_in instanceof Result); + assertTrue(p_in instanceof Publication); + + new CleanGraphSparkJob( + args( + "/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json", + new String[] { + "--inputPath", graphInputPath + "/publication", + "--outputPath", graphOutputPath + "/publication", + "--isLookupUrl", "lookupurl", + "--graphTableClassName", Publication.class.getCanonicalName(), + "--deepClean", "false" + })).run(false, isLookUpService); - public static final ObjectMapper MAPPER = new ObjectMapper() - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + Publication p = read(spark, graphOutputPath + "/publication", Publication.class) + .filter(String.format("id = '%s'", id)) + .first(); + + assertNull(p.getPublisher()); - @Mock - private ISLookUpService isLookUpService; + assertEquals("und", p.getLanguage().getClassid()); + assertEquals("Undetermined", p.getLanguage().getClassname()); - private VocabularyGroup vocabularies; + assertEquals("DE", p.getCountry().get(0).getClassid()); + assertEquals("Germany", p.getCountry().get(0).getClassname()); - private CleaningRuleMap mapping; + assertEquals("0018", p.getInstance().get(0).getInstancetype().getClassid()); + assertEquals("Annotation", p.getInstance().get(0).getInstancetype().getClassname()); + + assertEquals("0027", p.getInstance().get(1).getInstancetype().getClassid()); + assertEquals("Model", p.getInstance().get(1).getInstancetype().getClassname()); - private static SparkSession spark; - - private static Path workingDir; - - private static Path testBaseTmpPath; - - private static String graphInputPath; - - private static String graphOutputPath; - - private static String dsMasterDuplicatePath; - - @BeforeAll - public static void beforeAll() throws IOException, URISyntaxException { - testBaseTmpPath = Files.createTempDirectory(CleanGraphSparkJobTest.class.getSimpleName()); - log.info("using test base path {}", testBaseTmpPath); - - File basePath = Paths - .get(CleanGraphSparkJobTest.class.getResource("/eu/dnetlib/dhp/oa/graph/clean/graph").toURI()) - .toFile(); - - - List paths = FileUtils - .listFilesAndDirs(basePath, FalseFileFilter.FALSE, TrueFileFilter.TRUE) - .stream() - .filter(f -> !f.getAbsolutePath().endsWith("/graph")) - .collect(Collectors.toList()); - - for(File path : paths) { - String type = StringUtils.substringAfterLast(path.getAbsolutePath(), "/"); - FileUtils - .copyDirectory( - path, - 
testBaseTmpPath.resolve("input").resolve("graph").resolve(type).toFile()); - } - - FileUtils - .copyFileToDirectory( - Paths - .get( - CleanGraphSparkJobTest.class - .getResource("/eu/dnetlib/dhp/oa/graph/clean/cfhb/masterduplicate.json") - .toURI()) - .toFile(), - testBaseTmpPath.resolve("workingDir").resolve("masterduplicate").toFile()); - - graphInputPath = testBaseTmpPath.resolve("input").resolve("graph").toString(); - graphOutputPath = testBaseTmpPath.resolve("output").resolve("graph").toString(); - dsMasterDuplicatePath = testBaseTmpPath.resolve("workingDir").resolve("masterduplicate").toString(); - - - - workingDir = Files.createTempDirectory(CleanGraphSparkJobTest.class.getSimpleName()); - log.info("using work dir {}", workingDir); - - SparkConf conf = new SparkConf(); - conf.setAppName(CleanGraphSparkJobTest.class.getSimpleName()); - - conf.setMaster("local[*]"); - conf.set("spark.driver.host", "localhost"); - conf.set("hive.metastore.local", "true"); - conf.set("spark.ui.enabled", "false"); - conf.set("spark.sql.warehouse.dir", workingDir.toString()); - conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); - - spark = SparkSession - .builder() - .config(conf) - .getOrCreate(); - } - - @BeforeEach - public void setUp() throws ISLookUpException, IOException { - lenient().when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARIES_XQUERY)).thenReturn(vocs()); - lenient() - .when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARY_SYNONYMS_XQUERY)) - .thenReturn(synonyms()); - - vocabularies = VocabularyGroup.loadVocsFromIS(isLookUpService); - mapping = CleaningRuleMap.create(vocabularies); - } - - @AfterAll - public static void afterAll() throws IOException { - FileUtils.deleteDirectory(workingDir.toFile()); - spark.stop(); - } - - @Test - void testCleanRelations() throws Exception { - - spark.read() - .textFile(graphInputPath.toString() + "/relation") - .map(as(Relation.class), Encoders.bean(Relation.class)) - .collectAsList() - .forEach(r -> assertFalse(vocabularies.getTerms(ModelConstants.DNET_RELATION_RELCLASS).contains(r.getRelClass()))); - - new CleanGraphSparkJob( - args("/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json", - new String[] { - "--inputPath", graphInputPath.toString() + "/relation", - "--outputPath", graphOutputPath.toString() + "/relation", - "--isLookupUrl", "lookupurl", - "--graphTableClassName", Relation.class.getCanonicalName(), - "--deepClean", "false" - })).run(false, isLookUpService); - - spark.read() - .textFile(graphOutputPath.toString() + "/relation") - .map(as(Relation.class), Encoders.bean(Relation.class)) - .collectAsList() - .forEach(r -> { - - assertTrue(vocabularies.getTerms(ModelConstants.DNET_RELATION_RELCLASS).contains(r.getRelClass())); - assertTrue(vocabularies.getTerms(ModelConstants.DNET_RELATION_SUBRELTYPE).contains(r.getSubRelType())); - - assertEquals("iis", r.getDataInfo().getProvenanceaction().getClassid()); - assertEquals("Inferred by OpenAIRE", r.getDataInfo().getProvenanceaction().getClassname()); - }); - } - - @Test - void testFilter_invisible_true() throws Exception { - - assertNotNull(vocabularies); - assertNotNull(mapping); - - String json = IOUtils - .toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/result_invisible.json")); - Publication p_in = MAPPER.readValue(json, Publication.class); - - assertTrue(p_in instanceof Result); - assertTrue(p_in instanceof Publication); - - assertEquals(true, GraphCleaningFunctions.filter(p_in)); - } - - @Test - 
void testFilter_true_nothing_to_filter() throws Exception { - - assertNotNull(vocabularies); - assertNotNull(mapping); - - String json = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/result.json")); - Publication p_in = MAPPER.readValue(json, Publication.class); - - assertTrue(p_in instanceof Result); - assertTrue(p_in instanceof Publication); - - assertEquals(true, GraphCleaningFunctions.filter(p_in)); - } - - @Test - void testFilter_missing_invisible() throws Exception { - - assertNotNull(vocabularies); - assertNotNull(mapping); - - String json = IOUtils - .toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/result_missing_invisible.json")); - Publication p_in = MAPPER.readValue(json, Publication.class); - - assertTrue(p_in instanceof Result); - assertTrue(p_in instanceof Publication); - - assertEquals(true, GraphCleaningFunctions.filter(p_in)); - } - - @Test - void testCleaning_publication() throws Exception { - - final String id = "50|CSC_________::2250a70c903c6ac6e4c01438259e9375"; - - Publication p_in = read(spark, graphInputPath.toString() + "/publication", Publication.class) - .filter(String.format("id = '%s'", id)) - .first(); - - assertNull(p_in.getBestaccessright()); - assertTrue(p_in instanceof Result); - assertTrue(p_in instanceof Publication); - - new CleanGraphSparkJob( - args("/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json", - new String[] { - "--inputPath", graphInputPath.toString() + "/publication", - "--outputPath", graphOutputPath.toString() + "/publication", - "--isLookupUrl", "lookupurl", - "--graphTableClassName", Publication.class.getCanonicalName(), - "--deepClean", "false" - })).run(false, isLookUpService); - - Publication p = read(spark, graphOutputPath.toString() + "/publication", Publication.class) - .filter(String.format("id = '%s'", id)) - .first(); - - assertNull(p.getPublisher()); - - assertEquals("und", p.getLanguage().getClassid()); - assertEquals("Undetermined", p.getLanguage().getClassname()); - - assertEquals("DE", p.getCountry().get(0).getClassid()); - assertEquals("Germany", p.getCountry().get(0).getClassname()); - - assertEquals("0018", p.getInstance().get(0).getInstancetype().getClassid()); - assertEquals("Annotation", p.getInstance().get(0).getInstancetype().getClassname()); - - assertEquals("0027", p.getInstance().get(1).getInstancetype().getClassid()); - assertEquals("Model", p.getInstance().get(1).getInstancetype().getClassname()); - - assertEquals("0038", p.getInstance().get(2).getInstancetype().getClassid()); - assertEquals("Other literature type", p.getInstance().get(2).getInstancetype().getClassname()); - - assertEquals("CLOSED", p.getInstance().get(0).getAccessright().getClassid()); - assertEquals("Closed Access", p.getInstance().get(0).getAccessright().getClassname()); - - Set pidTerms = vocabularies.getTerms(ModelConstants.DNET_PID_TYPES); - assertTrue( - p - .getPid() - .stream() - .map(StructuredProperty::getQualifier) - .allMatch(q -> pidTerms.contains(q.getClassid()))); - - List poi = p.getInstance(); - assertNotNull(poi); - assertEquals(3, poi.size()); - - final Instance poii = poi.get(0); - assertNotNull(poii); - assertNotNull(poii.getPid()); - - assertEquals(2, poii.getPid().size()); - - assertTrue( - poii.getPid().stream().anyMatch(s -> s.getValue().equals("10.1007/s109090161569x"))); - assertTrue(poii.getPid().stream().anyMatch(s -> s.getValue().equals("10.1008/abcd"))); - - assertNotNull(poii.getAlternateIdentifier()); - assertEquals(1, 
poii.getAlternateIdentifier().size()); - - assertTrue( - poii - .getAlternateIdentifier() - .stream() - .anyMatch(s -> s.getValue().equals("10.1009/qwerty"))); - - assertEquals(3, p.getTitle().size()); - - - List titles = p - .getTitle() - .stream() - .map(StructuredProperty::getValue) - .collect(Collectors.toList()); - assertTrue(titles.contains("omic")); - assertTrue( - titles.contains("Optical response of strained- and unstrained-silicon cold-electron bolometers test")); - assertTrue(titles.contains("「マキャベリ的知性と心の理論の進化論」 リチャード・バーン, アンドリュー・ホワイトゥン 編/藤田和生, 山下博志, 友永雅巳 監訳")); - - assertEquals("CLOSED", p.getBestaccessright().getClassid()); - assertNull(p.getPublisher()); - - assertEquals("1970-10-07", p.getDateofacceptance().getValue()); - - assertEquals("0038", p.getInstance().get(2).getInstancetype().getClassid()); - assertEquals("Other literature type", p.getInstance().get(2).getInstancetype().getClassname()); - - final List pci = p.getInstance(); - assertNotNull(pci); - assertEquals(3, pci.size()); - - final Instance pcii = pci.get(0); - assertNotNull(pcii); - assertNotNull(pcii.getPid()); - - assertEquals(2, pcii.getPid().size()); - - assertTrue( - pcii.getPid().stream().anyMatch(s -> s.getValue().equals("10.1007/s109090161569x"))); - assertTrue(pcii.getPid().stream().anyMatch(s -> s.getValue().equals("10.1008/abcd"))); - - assertNotNull(pcii.getAlternateIdentifier()); - assertEquals(1, pcii.getAlternateIdentifier().size()); - assertTrue( - pcii - .getAlternateIdentifier() - .stream() - .anyMatch(s -> s.getValue().equals("10.1009/qwerty"))); - - assertNotNull(p.getSubject()); - - List fos_subjects = p - .getSubject() - .stream() - .filter(s -> ModelConstants.DNET_SUBJECT_FOS_CLASSID.equals(s.getQualifier().getClassid())) - .collect(Collectors.toList()); - - assertNotNull(fos_subjects); - assertEquals(2, fos_subjects.size()); - - assertTrue( - fos_subjects - .stream() - .anyMatch( - s -> "0101 mathematics".equals(s.getValue()) & - ModelConstants.DNET_SUBJECT_FOS_CLASSID.equals(s.getQualifier().getClassid()) & - "sysimport:crosswalk:datasetarchive" - .equals(s.getDataInfo().getProvenanceaction().getClassid()))); - - assertTrue( - fos_subjects - .stream() - .anyMatch( - s -> "0102 computer and information sciences".equals(s.getValue()) & - ModelConstants.DNET_SUBJECT_FOS_CLASSID.equals(s.getQualifier().getClassid()))); - - verify_keyword(p, "In Situ Hybridization"); - verify_keyword(p, "Avicennia"); - } - - @Test - public void testCleanDoiBoost() throws IOException, ParseException, ISLookUpException, ClassNotFoundException { - verifyFiltering(1, "50|doi_________::b0baa0eb88a5788f0b8815560d2a32f2"); - } - - @Test - public void testCleanDoiBoost2() throws IOException, ParseException, ISLookUpException, ClassNotFoundException { - verifyFiltering(1, "50|doi_________::4972b0ca81b96b225aed8038bb965656"); - } - - private void verifyFiltering(int expectedCount, String id) throws ISLookUpException, ClassNotFoundException, IOException, ParseException { - new CleanGraphSparkJob( - args("/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json", - new String[] { - "--inputPath", graphInputPath.toString() + "/publication", - "--outputPath", graphOutputPath.toString() + "/publication", - "--isLookupUrl", "lookupurl", - "--graphTableClassName", Publication.class.getCanonicalName(), - "--deepClean", "false" - })).run(false, isLookUpService); - - Dataset p = read(spark, graphOutputPath.toString() + "/publication", Publication.class) - .filter(String.format("id = '%s'", id)); - - 
assertEquals(expectedCount, p.count()); - } - - @Test - public void testCleanContext() throws Exception { - final String prefix = "gcube "; - - new CleanGraphSparkJob( - args("/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json", - new String[] { - "--inputPath", graphInputPath.toString() + "/publication", - "--outputPath", graphOutputPath.toString() + "/publication", - "--isLookupUrl", "lookupurl", - "--graphTableClassName", Publication.class.getCanonicalName(), - "--deepClean", "true", - "--contextId", "sobigdata", - "--verifyParam", "gCube ", - "--masterDuplicatePath", dsMasterDuplicatePath, - "--country", "NL", - "--verifyCountryParam", "10.17632", - "--collectedfrom", "NARCIS", - "--hostedBy", getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy") - .getPath() - })).run(false, isLookUpService); - - Dataset pubs = read(spark, graphOutputPath.toString() + "/publication", Publication.class) - .filter((FilterFunction) p1 -> StringUtils.endsWith(p1.getId(), "_ctx")); - - assertEquals(7, pubs.count()); - - // original result with sobigdata context and gcube as starting string in the main title for the publication - assertEquals( - 0, - pubs - .filter((FilterFunction) p -> p.getId().equals("50|DansKnawCris::0224aae28af558f21768dbc6439a_ctx")) - .first() - .getContext() - .size()); - - // original result with sobigdata context without gcube as starting string in the main title for the publication - assertEquals( - 1, - pubs - .filter((FilterFunction) p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67d_ctx")) - .first() - .getContext() - .size()); - assertEquals( - "sobigdata::projects::2", - pubs - .filter((FilterFunction) p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67d_ctx")) - .first() - .getContext() - .get(0) - .getId()); - - // original result with sobigdata context with gcube as starting string in the subtitle - assertEquals( - 1, - pubs - .filter((FilterFunction) p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6f_ctx")) - .first() - .getContext() - .size()); - assertEquals( - "sobigdata::projects::2", - pubs - .filter((FilterFunction) p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6f_ctx")) - .first() - .getContext() - .get(0) - .getId()); - - List titles = pubs - .filter((FilterFunction) p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6f_ctx")) - .first() - .getTitle(); - - assertEquals(1, titles.size()); - assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix)); - assertEquals("subtitle", titles.get(0).getQualifier().getClassid()); - - // original result with sobigdata context with gcube not as starting string in the main title - assertEquals( - 1, - pubs - .filter((FilterFunction) p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9_ctx")) - .first() - .getContext() - .size()); - assertEquals( - "sobigdata::projects::1", - pubs - .filter((FilterFunction) p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9_ctx")) - .first() - .getContext() - .get(0) - .getId()); - titles = pubs - .filter((FilterFunction) p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9_ctx")) - .first() - .getTitle(); - - assertEquals(1, titles.size()); - assertFalse(titles.get(0).getValue().toLowerCase().startsWith(prefix)); - assertTrue(titles.get(0).getValue().toLowerCase().contains(prefix.trim())); - assertEquals("main title", titles.get(0).getQualifier().getClassid()); - - // original result with sobigdata in context and also other 
contexts with gcube as starting string for the main - // title - assertEquals( - 1, - pubs - .filter((FilterFunction) p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fd_ctx")) - .first() - .getContext() - .size()); - assertEquals( - "dh-ch", - pubs - .filter((FilterFunction) p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fd_ctx")) - .first() - .getContext() - .get(0) - .getId()); - titles = pubs - .filter((FilterFunction) p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fd_ctx")) - .first() - .getTitle(); - - assertEquals(1, titles.size()); - assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix)); - assertEquals("main title", titles.get(0).getQualifier().getClassid()); - - // original result with multiple main title one of which whith gcube as starting string and with 2 contextes - assertEquals( - 1, - pubs - .filter((FilterFunction) p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74_ctx")) - .first() - .getContext() - .size()); - assertEquals( - "dh-ch", - pubs - .filter((FilterFunction) p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74_ctx")) - .first() - .getContext() - .get(0) - .getId()); - titles = pubs - .filter((FilterFunction) p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74_ctx")) - .first() - .getTitle(); - - assertEquals(2, titles.size()); - assertTrue( - titles - .stream() - .anyMatch( - t -> t.getQualifier().getClassid().equals("main title") - && t.getValue().toLowerCase().startsWith(prefix))); - - // original result without sobigdata in context with gcube as starting string for the main title - assertEquals( - 1, - pubs - .filter((FilterFunction) p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6_ctx")) - .first() - .getContext() - .size()); - assertEquals( - "dh-ch", - pubs - .filter((FilterFunction) p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6_ctx")) - .first() - .getContext() - .get(0) - .getId()); - titles = pubs - .filter((FilterFunction) p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6_ctx")) - .first() - .getTitle(); - - assertEquals(2, titles.size()); - - assertTrue( - titles - .stream() - .anyMatch( - t -> t.getQualifier().getClassid().equals("main title") - && t.getValue().toLowerCase().startsWith(prefix))); - - } - - @Test - void testCleanCfHbSparkJob() throws Exception { - - final Dataset pubs_in = read(spark, graphInputPath.toString() + "/publication", Publication.class); - final Publication p1_in = pubs_in - .filter("id = '50|doi_________::09821844208a5cd6300b2bfb13b_cfhb'") - .first(); - assertEquals("10|re3data_____::4c4416659cb74c2e0e891a883a047cbc", p1_in.getCollectedfrom().get(0).getKey()); - assertEquals("Bacterial Protein Interaction Database - DUP", p1_in.getCollectedfrom().get(0).getValue()); - assertEquals( - "10|re3data_____::4c4416659cb74c2e0e891a883a047cbc", p1_in.getInstance().get(0).getCollectedfrom().getKey()); - assertEquals( - "Bacterial Protein Interaction Database - DUP", p1_in.getInstance().get(0).getCollectedfrom().getValue()); - - final Publication p2_in = pubs_in - .filter("id = '50|DansKnawCris::0dd644304b7116e8e58da3a5e3a_cfhb'") - .first(); - assertEquals("10|opendoar____::788b4ac1e172d8e520c2b9461c0a3d35", p2_in.getCollectedfrom().get(0).getKey()); - assertEquals("FILUR DATA - DUP", p2_in.getCollectedfrom().get(0).getValue()); - assertEquals( - "10|opendoar____::788b4ac1e172d8e520c2b9461c0a3d35", 
p2_in.getInstance().get(0).getCollectedfrom().getKey()); - assertEquals("FILUR DATA - DUP", p2_in.getInstance().get(0).getCollectedfrom().getValue()); - assertEquals( - "10|re3data_____::6ffd7bc058f762912dc494cd9c175341", p2_in.getInstance().get(0).getHostedby().getKey()); - assertEquals("depositar - DUP", p2_in.getInstance().get(0).getHostedby().getValue()); - - final Publication p3_in = pubs_in - .filter("id = '50|DansKnawCris::203a27996ddc0fd1948258e5b7d_cfhb'") - .first(); - assertEquals("10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", p3_in.getCollectedfrom().get(0).getKey()); - assertEquals("DANS (Data Archiving and Networked Services)", p3_in.getCollectedfrom().get(0).getValue()); - assertEquals( - "10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", p3_in.getInstance().get(0).getCollectedfrom().getKey()); - assertEquals( - "DANS (Data Archiving and Networked Services)", p3_in.getInstance().get(0).getCollectedfrom().getValue()); - assertEquals( - "10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", p3_in.getInstance().get(0).getHostedby().getKey()); - assertEquals("DANS (Data Archiving and Networked Services)", p3_in.getInstance().get(0).getHostedby().getValue()); - - new CleanGraphSparkJob( - args("/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json", - new String[] { - "--inputPath", graphInputPath.toString() + "/publication", - "--outputPath", graphOutputPath.toString() + "/publication", - "--isLookupUrl", "lookupurl", - "--graphTableClassName", Publication.class.getCanonicalName(), - "--deepClean", "true", - "--contextId", "sobigdata", - "--verifyParam", "gCube ", - "--masterDuplicatePath", dsMasterDuplicatePath, - "--country", "NL", - "--verifyCountryParam", "10.17632", - "--collectedfrom", "NARCIS", - "--hostedBy", getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy") - .getPath() - })).run(false, isLookUpService); - - assertTrue(Files.exists(Paths.get(graphOutputPath, "publication"))); - - final Dataset pubs_out = read(spark, graphOutputPath.toString() + "/publication", Publication.class) - .filter((FilterFunction) p -> StringUtils.endsWith(p.getId(), "_cfhb")); - - assertEquals(3, pubs_out.count()); - - final Publication p1_out = pubs_out - .filter("id = '50|doi_________::09821844208a5cd6300b2bfb13b_cfhb'") - .first(); - assertEquals("10|fairsharing_::a29d1598024f9e87beab4b98411d48ce", p1_out.getCollectedfrom().get(0).getKey()); - assertEquals("Bacterial Protein Interaction Database", p1_out.getCollectedfrom().get(0).getValue()); - assertEquals( - "10|fairsharing_::a29d1598024f9e87beab4b98411d48ce", p1_out.getInstance().get(0).getCollectedfrom().getKey()); - assertEquals("Bacterial Protein Interaction Database", p1_out.getInstance().get(0).getCollectedfrom().getValue()); - - final Publication p2_out = pubs_out - .filter("id = '50|DansKnawCris::0dd644304b7116e8e58da3a5e3a_cfhb'") - .first(); - assertEquals("10|re3data_____::fc1db64b3964826913b1e9eafe830490", p2_out.getCollectedfrom().get(0).getKey()); - assertEquals("FULIR Data", p2_out.getCollectedfrom().get(0).getValue()); - assertEquals( - "10|re3data_____::fc1db64b3964826913b1e9eafe830490", p2_out.getInstance().get(0).getCollectedfrom().getKey()); - assertEquals("FULIR Data", p2_out.getInstance().get(0).getCollectedfrom().getValue()); - assertEquals( - "10|fairsharing_::3f647cadf56541fb9513cb63ec370187", p2_out.getInstance().get(0).getHostedby().getKey()); - assertEquals("depositar", p2_out.getInstance().get(0).getHostedby().getValue()); - - final Publication p3_out = pubs_out - .filter("id = 
'50|DansKnawCris::203a27996ddc0fd1948258e5b7d_cfhb'") - .first(); - assertEquals("10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", p3_out.getCollectedfrom().get(0).getKey()); - assertEquals("DANS (Data Archiving and Networked Services)", p3_out.getCollectedfrom().get(0).getValue()); - assertEquals( - "10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", p3_out.getInstance().get(0).getCollectedfrom().getKey()); - assertEquals( - "DANS (Data Archiving and Networked Services)", p3_out.getInstance().get(0).getCollectedfrom().getValue()); - assertEquals( - "10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", p3_out.getInstance().get(0).getHostedby().getKey()); - assertEquals("DANS (Data Archiving and Networked Services)", p3_out.getInstance().get(0).getHostedby().getValue()); - } - - @Test - public void testCleanCountry() throws Exception { - - new CleanGraphSparkJob( - args("/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json", - new String[] { - "--inputPath", graphInputPath.toString() + "/publication", - "--outputPath", graphOutputPath.toString() + "/publication", - "--isLookupUrl", "lookupurl", - "--graphTableClassName", Publication.class.getCanonicalName(), - "--deepClean", "true", - "--contextId", "sobigdata", - "--verifyParam", "gCube ", - "--masterDuplicatePath", dsMasterDuplicatePath, - "--country", "NL", - "--verifyCountryParam", "10.17632", - "--collectedfrom", "NARCIS", - "--hostedBy", getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy") - .getPath() - })).run(false, isLookUpService); - - - final Dataset pubs_out = read(spark, graphOutputPath.toString() + "/publication", Publication.class) - .filter((FilterFunction) p -> StringUtils.endsWith(p.getId(), "_country")); - - assertEquals(8, pubs_out.count()); - - // original result with NL country and doi starting with Mendely prefix, but not collectedfrom NARCIS - assertEquals( - 1, - pubs_out - .filter((FilterFunction) p -> p.getId().equals("50|DansKnawCris::0224aae28af558f21768dbc6_country")) - .first() - .getCountry() - .size()); - - // original result with NL country and pid not starting with Mendely prefix - assertEquals( - 1, - pubs_out - .filter((FilterFunction) p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1_country")) - .first() - .getCountry() - .size()); - - // original result with NL country and doi starting with Mendely prefix and collectedfrom NARCIS but not - // inserted with propagation - assertEquals( - 1, - pubs_out - .filter((FilterFunction) p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817e_country")) - .first() - .getCountry() - .size()); - - // original result with NL country and doi starting with Mendely prefix and collectedfrom NARCIS inserted with - // propagation - assertEquals( - 0, - pubs_out - .filter((FilterFunction) p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817d_country")) - .first() - .getCountry() - .size()); - } - - private List vocs() throws IOException { - return IOUtils - .readLines( - GraphCleaningFunctionsTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt")); - } - - private List synonyms() throws IOException { - return IOUtils - .readLines( - GraphCleaningFunctionsTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt")); - } - - private org.apache.spark.sql.Dataset read(SparkSession spark, String path, Class clazz) { - return spark - .read() - .textFile(path) - .map(as(clazz), Encoders.bean(clazz)); - } - - private static MapFunction as(Class clazz) { - return s -> 
MAPPER.readValue(s, clazz); - } - - private static String classPathResourceAsString(String path) throws IOException { - return IOUtils - .toString( - CleanGraphSparkJobTest.class - .getResourceAsStream(path)); - } - - private ArgumentApplicationParser args(String paramSpecs, String[] args) throws IOException, ParseException { - ArgumentApplicationParser parser = new ArgumentApplicationParser(classPathResourceAsString(paramSpecs)); - parser.parseArgument(args); - return parser; - } - - private static void verify_keyword(Publication p_cleaned, String subject) { - Optional s1 = p_cleaned - .getSubject() - .stream() - .filter(s -> s.getValue().equals(subject)) - .findFirst(); - - assertTrue(s1.isPresent()); - assertEquals(ModelConstants.DNET_SUBJECT_KEYWORD, s1.get().getQualifier().getClassid()); - assertEquals(ModelConstants.DNET_SUBJECT_KEYWORD, s1.get().getQualifier().getClassname()); - } - - private Stream getAuthorPids(Result pub) { - return pub - .getAuthor() - .stream() - .map(Author::getPid) - .flatMap(Collection::stream); - } + assertEquals("0038", p.getInstance().get(2).getInstancetype().getClassid()); + assertEquals("Other literature type", p.getInstance().get(2).getInstancetype().getClassname()); + + assertEquals("CLOSED", p.getInstance().get(0).getAccessright().getClassid()); + assertEquals("Closed Access", p.getInstance().get(0).getAccessright().getClassname()); + + Set pidTerms = vocabularies.getTerms(ModelConstants.DNET_PID_TYPES); + assertTrue( + p + .getPid() + .stream() + .map(StructuredProperty::getQualifier) + .allMatch(q -> pidTerms.contains(q.getClassid()))); + + List poi = p.getInstance(); + assertNotNull(poi); + assertEquals(3, poi.size()); + + final Instance poii = poi.get(0); + assertNotNull(poii); + assertNotNull(poii.getPid()); + + assertEquals(2, poii.getPid().size()); + + assertTrue( + poii.getPid().stream().anyMatch(s -> s.getValue().equals("10.1007/s109090161569x"))); + assertTrue(poii.getPid().stream().anyMatch(s -> s.getValue().equals("10.1008/abcd"))); + + assertNotNull(poii.getAlternateIdentifier()); + assertEquals(1, poii.getAlternateIdentifier().size()); + + assertTrue( + poii + .getAlternateIdentifier() + .stream() + .anyMatch(s -> s.getValue().equals("10.1009/qwerty"))); + + assertEquals(3, p.getTitle().size()); + + List titles = p + .getTitle() + .stream() + .map(StructuredProperty::getValue) + .collect(Collectors.toList()); + assertTrue(titles.contains("omic")); + assertTrue( + titles.contains("Optical response of strained- and unstrained-silicon cold-electron bolometers test")); + assertTrue(titles.contains("「マキャベリ的知性と心の理論の進化論」 リチャード・バーン, アンドリュー・ホワイトゥン 編/藤田和生, 山下博志, 友永雅巳 監訳")); + + assertEquals("CLOSED", p.getBestaccessright().getClassid()); + assertNull(p.getPublisher()); + + assertEquals("1970-10-07", p.getDateofacceptance().getValue()); + + assertEquals("0038", p.getInstance().get(2).getInstancetype().getClassid()); + assertEquals("Other literature type", p.getInstance().get(2).getInstancetype().getClassname()); + + final List pci = p.getInstance(); + assertNotNull(pci); + assertEquals(3, pci.size()); + + final Instance pcii = pci.get(0); + assertNotNull(pcii); + assertNotNull(pcii.getPid()); + + assertEquals(2, pcii.getPid().size()); + + assertTrue( + pcii.getPid().stream().anyMatch(s -> s.getValue().equals("10.1007/s109090161569x"))); + assertTrue(pcii.getPid().stream().anyMatch(s -> s.getValue().equals("10.1008/abcd"))); + + assertNotNull(pcii.getAlternateIdentifier()); + assertEquals(1, pcii.getAlternateIdentifier().size()); + assertTrue( + 
pcii + .getAlternateIdentifier() + .stream() + .anyMatch(s -> s.getValue().equals("10.1009/qwerty"))); + + assertNotNull(p.getSubject()); + + List fos_subjects = p + .getSubject() + .stream() + .filter(s -> ModelConstants.DNET_SUBJECT_FOS_CLASSID.equals(s.getQualifier().getClassid())) + .collect(Collectors.toList()); + + assertNotNull(fos_subjects); + assertEquals(2, fos_subjects.size()); + + assertTrue( + fos_subjects + .stream() + .anyMatch( + s -> "0101 mathematics".equals(s.getValue()) & + ModelConstants.DNET_SUBJECT_FOS_CLASSID.equals(s.getQualifier().getClassid()) & + "sysimport:crosswalk:datasetarchive" + .equals(s.getDataInfo().getProvenanceaction().getClassid()))); + + assertTrue( + fos_subjects + .stream() + .anyMatch( + s -> "0102 computer and information sciences".equals(s.getValue()) & + ModelConstants.DNET_SUBJECT_FOS_CLASSID.equals(s.getQualifier().getClassid()))); + + verify_keyword(p, "In Situ Hybridization"); + verify_keyword(p, "Avicennia"); + } + + @Test + void testCleanDoiBoost() throws IOException, ParseException, ISLookUpException, ClassNotFoundException { + verifyFiltering(1, "50|doi_________::b0baa0eb88a5788f0b8815560d2a32f2"); + } + + @Test + void testCleanDoiBoost2() throws IOException, ParseException, ISLookUpException, ClassNotFoundException { + verifyFiltering(1, "50|doi_________::4972b0ca81b96b225aed8038bb965656"); + } + + private void verifyFiltering(int expectedCount, String id) + throws ISLookUpException, ClassNotFoundException, IOException, ParseException { + new CleanGraphSparkJob( + args( + "/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json", + new String[] { + "--inputPath", graphInputPath + "/publication", + "--outputPath", graphOutputPath + "/publication", + "--isLookupUrl", "lookupurl", + "--graphTableClassName", Publication.class.getCanonicalName(), + "--deepClean", "false" + })).run(false, isLookUpService); + + Dataset p = read(spark, graphOutputPath + "/publication", Publication.class) + .filter(String.format("id = '%s'", id)); + + assertEquals(expectedCount, p.count()); + } + + @Test + void testCleanContext() throws Exception { + final String prefix = "gcube "; + + new CleanGraphSparkJob( + args( + "/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json", + new String[] { + "--inputPath", graphInputPath + "/publication", + "--outputPath", graphOutputPath + "/publication", + "--isLookupUrl", "lookupurl", + "--graphTableClassName", Publication.class.getCanonicalName(), + "--deepClean", "true", + "--contextId", "sobigdata", + "--verifyParam", "gCube ", + "--masterDuplicatePath", dsMasterDuplicatePath, + "--country", "NL", + "--verifyCountryParam", "10.17632", + "--collectedfrom", "NARCIS", + "--hostedBy", Objects + .requireNonNull( + getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy")) + .getPath() + })).run(false, isLookUpService); + + Dataset pubs = read(spark, graphOutputPath + "/publication", Publication.class) + .filter((FilterFunction) p1 -> StringUtils.endsWith(p1.getId(), "_ctx")); + + assertEquals(7, pubs.count()); + + // original result with sobigdata context and gcube as starting string in the main title for the publication + assertEquals( + 0, + pubs + .filter( + (FilterFunction) p -> p + .getId() + .equals("50|DansKnawCris::0224aae28af558f21768dbc6439a_ctx")) + .first() + .getContext() + .size()); + + // original result with sobigdata context without gcube as starting string in the main title for the publication + assertEquals( + 1, + pubs + .filter( + (FilterFunction) p -> p + .getId() + 
.equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67d_ctx")) + .first() + .getContext() + .size()); + assertEquals( + "sobigdata::projects::2", + pubs + .filter( + (FilterFunction) p -> p + .getId() + .equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67d_ctx")) + .first() + .getContext() + .get(0) + .getId()); + + // original result with sobigdata context with gcube as starting string in the subtitle + assertEquals( + 1, + pubs + .filter( + (FilterFunction) p -> p + .getId() + .equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6f_ctx")) + .first() + .getContext() + .size()); + assertEquals( + "sobigdata::projects::2", + pubs + .filter( + (FilterFunction) p -> p + .getId() + .equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6f_ctx")) + .first() + .getContext() + .get(0) + .getId()); + + List titles = pubs + .filter( + (FilterFunction) p -> p + .getId() + .equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6f_ctx")) + .first() + .getTitle(); + + assertEquals(1, titles.size()); + assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix)); + assertEquals("subtitle", titles.get(0).getQualifier().getClassid()); + + // original result with sobigdata context with gcube not as starting string in the main title + assertEquals( + 1, + pubs + .filter( + (FilterFunction) p -> p + .getId() + .equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9_ctx")) + .first() + .getContext() + .size()); + assertEquals( + "sobigdata::projects::1", + pubs + .filter( + (FilterFunction) p -> p + .getId() + .equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9_ctx")) + .first() + .getContext() + .get(0) + .getId()); + titles = pubs + .filter( + (FilterFunction) p -> p + .getId() + .equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9_ctx")) + .first() + .getTitle(); + + assertEquals(1, titles.size()); + assertFalse(titles.get(0).getValue().toLowerCase().startsWith(prefix)); + assertTrue(titles.get(0).getValue().toLowerCase().contains(prefix.trim())); + assertEquals("main title", titles.get(0).getQualifier().getClassid()); + + // original result with sobigdata in context and also other contexts with gcube as starting string for the main + // title + assertEquals( + 1, + pubs + .filter( + (FilterFunction) p -> p + .getId() + .equals("50|DansKnawCris::4669a378a73661417182c208e6fd_ctx")) + .first() + .getContext() + .size()); + assertEquals( + "dh-ch", + pubs + .filter( + (FilterFunction) p -> p + .getId() + .equals("50|DansKnawCris::4669a378a73661417182c208e6fd_ctx")) + .first() + .getContext() + .get(0) + .getId()); + titles = pubs + .filter( + (FilterFunction) p -> p + .getId() + .equals("50|DansKnawCris::4669a378a73661417182c208e6fd_ctx")) + .first() + .getTitle(); + + assertEquals(1, titles.size()); + assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix)); + assertEquals("main title", titles.get(0).getQualifier().getClassid()); + + // original result with multiple main title one of which whith gcube as starting string and with 2 contextes + assertEquals( + 1, + pubs + .filter( + (FilterFunction) p -> p + .getId() + .equals("50|DansKnawCris::4a9152e80f860eab99072e921d74_ctx")) + .first() + .getContext() + .size()); + assertEquals( + "dh-ch", + pubs + .filter( + (FilterFunction) p -> p + .getId() + .equals("50|DansKnawCris::4a9152e80f860eab99072e921d74_ctx")) + .first() + .getContext() + .get(0) + .getId()); + titles = pubs + .filter( + (FilterFunction) p -> p + .getId() + .equals("50|DansKnawCris::4a9152e80f860eab99072e921d74_ctx")) + .first() + .getTitle(); + + assertEquals(2, 
titles.size()); + assertTrue( + titles + .stream() + .anyMatch( + t -> t.getQualifier().getClassid().equals("main title") + && t.getValue().toLowerCase().startsWith(prefix))); + + // original result without sobigdata in context with gcube as starting string for the main title + assertEquals( + 1, + pubs + .filter( + (FilterFunction) p -> p + .getId() + .equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6_ctx")) + .first() + .getContext() + .size()); + assertEquals( + "dh-ch", + pubs + .filter( + (FilterFunction) p -> p + .getId() + .equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6_ctx")) + .first() + .getContext() + .get(0) + .getId()); + titles = pubs + .filter( + (FilterFunction) p -> p + .getId() + .equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6_ctx")) + .first() + .getTitle(); + + assertEquals(2, titles.size()); + + assertTrue( + titles + .stream() + .anyMatch( + t -> t.getQualifier().getClassid().equals("main title") + && t.getValue().toLowerCase().startsWith(prefix))); + + } + + @Test + void testCleanCfHbSparkJob() throws Exception { + + final Dataset pubs_in = read(spark, graphInputPath + "/publication", Publication.class); + final Publication p1_in = pubs_in + .filter("id = '50|doi_________::09821844208a5cd6300b2bfb13b_cfhb'") + .first(); + assertEquals("10|re3data_____::4c4416659cb74c2e0e891a883a047cbc", p1_in.getCollectedfrom().get(0).getKey()); + assertEquals("Bacterial Protein Interaction Database - DUP", p1_in.getCollectedfrom().get(0).getValue()); + assertEquals( + "10|re3data_____::4c4416659cb74c2e0e891a883a047cbc", + p1_in.getInstance().get(0).getCollectedfrom().getKey()); + assertEquals( + "Bacterial Protein Interaction Database - DUP", p1_in.getInstance().get(0).getCollectedfrom().getValue()); + + final Publication p2_in = pubs_in + .filter("id = '50|DansKnawCris::0dd644304b7116e8e58da3a5e3a_cfhb'") + .first(); + assertEquals("10|opendoar____::788b4ac1e172d8e520c2b9461c0a3d35", p2_in.getCollectedfrom().get(0).getKey()); + assertEquals("FILUR DATA - DUP", p2_in.getCollectedfrom().get(0).getValue()); + assertEquals( + "10|opendoar____::788b4ac1e172d8e520c2b9461c0a3d35", + p2_in.getInstance().get(0).getCollectedfrom().getKey()); + assertEquals("FILUR DATA - DUP", p2_in.getInstance().get(0).getCollectedfrom().getValue()); + assertEquals( + "10|re3data_____::6ffd7bc058f762912dc494cd9c175341", p2_in.getInstance().get(0).getHostedby().getKey()); + assertEquals("depositar - DUP", p2_in.getInstance().get(0).getHostedby().getValue()); + + final Publication p3_in = pubs_in + .filter("id = '50|DansKnawCris::203a27996ddc0fd1948258e5b7d_cfhb'") + .first(); + assertEquals("10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", p3_in.getCollectedfrom().get(0).getKey()); + assertEquals("DANS (Data Archiving and Networked Services)", p3_in.getCollectedfrom().get(0).getValue()); + assertEquals( + "10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", + p3_in.getInstance().get(0).getCollectedfrom().getKey()); + assertEquals( + "DANS (Data Archiving and Networked Services)", p3_in.getInstance().get(0).getCollectedfrom().getValue()); + assertEquals( + "10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", p3_in.getInstance().get(0).getHostedby().getKey()); + assertEquals( + "DANS (Data Archiving and Networked Services)", p3_in.getInstance().get(0).getHostedby().getValue()); + + new CleanGraphSparkJob( + args( + "/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json", + new String[] { + "--inputPath", graphInputPath + "/publication", + "--outputPath", graphOutputPath + 
"/publication", + "--isLookupUrl", "lookupurl", + "--graphTableClassName", Publication.class.getCanonicalName(), + "--deepClean", "true", + "--contextId", "sobigdata", + "--verifyParam", "gCube ", + "--masterDuplicatePath", dsMasterDuplicatePath, + "--country", "NL", + "--verifyCountryParam", "10.17632", + "--collectedfrom", "NARCIS", + "--hostedBy", Objects + .requireNonNull( + getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy")) + .getPath() + })).run(false, isLookUpService); + + assertTrue(Files.exists(Paths.get(graphOutputPath, "publication"))); + + final Dataset pubs_out = read(spark, graphOutputPath + "/publication", Publication.class) + .filter((FilterFunction) p -> StringUtils.endsWith(p.getId(), "_cfhb")); + + assertEquals(3, pubs_out.count()); + + final Publication p1_out = pubs_out + .filter("id = '50|doi_________::09821844208a5cd6300b2bfb13b_cfhb'") + .first(); + assertEquals("10|fairsharing_::a29d1598024f9e87beab4b98411d48ce", p1_out.getCollectedfrom().get(0).getKey()); + assertEquals("Bacterial Protein Interaction Database", p1_out.getCollectedfrom().get(0).getValue()); + assertEquals( + "10|fairsharing_::a29d1598024f9e87beab4b98411d48ce", + p1_out.getInstance().get(0).getCollectedfrom().getKey()); + assertEquals( + "Bacterial Protein Interaction Database", p1_out.getInstance().get(0).getCollectedfrom().getValue()); + + final Publication p2_out = pubs_out + .filter("id = '50|DansKnawCris::0dd644304b7116e8e58da3a5e3a_cfhb'") + .first(); + assertEquals("10|re3data_____::fc1db64b3964826913b1e9eafe830490", p2_out.getCollectedfrom().get(0).getKey()); + assertEquals("FULIR Data", p2_out.getCollectedfrom().get(0).getValue()); + assertEquals( + "10|re3data_____::fc1db64b3964826913b1e9eafe830490", + p2_out.getInstance().get(0).getCollectedfrom().getKey()); + assertEquals("FULIR Data", p2_out.getInstance().get(0).getCollectedfrom().getValue()); + assertEquals( + "10|fairsharing_::3f647cadf56541fb9513cb63ec370187", p2_out.getInstance().get(0).getHostedby().getKey()); + assertEquals("depositar", p2_out.getInstance().get(0).getHostedby().getValue()); + + final Publication p3_out = pubs_out + .filter("id = '50|DansKnawCris::203a27996ddc0fd1948258e5b7d_cfhb'") + .first(); + assertEquals("10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", p3_out.getCollectedfrom().get(0).getKey()); + assertEquals("DANS (Data Archiving and Networked Services)", p3_out.getCollectedfrom().get(0).getValue()); + assertEquals( + "10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", + p3_out.getInstance().get(0).getCollectedfrom().getKey()); + assertEquals( + "DANS (Data Archiving and Networked Services)", p3_out.getInstance().get(0).getCollectedfrom().getValue()); + assertEquals( + "10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", p3_out.getInstance().get(0).getHostedby().getKey()); + assertEquals( + "DANS (Data Archiving and Networked Services)", p3_out.getInstance().get(0).getHostedby().getValue()); + } + + @Test + void testCleanCountry() throws Exception { + + new CleanGraphSparkJob( + args( + "/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json", + new String[] { + "--inputPath", graphInputPath + "/publication", + "--outputPath", graphOutputPath + "/publication", + "--isLookupUrl", "lookupurl", + "--graphTableClassName", Publication.class.getCanonicalName(), + "--deepClean", "true", + "--contextId", "sobigdata", + "--verifyParam", "gCube ", + "--masterDuplicatePath", dsMasterDuplicatePath, + "--country", "NL", + "--verifyCountryParam", "10.17632", + "--collectedfrom", "NARCIS", + 
"--hostedBy", Objects + .requireNonNull( + getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy")) + .getPath() + })).run(false, isLookUpService); + + final Dataset pubs_out = read(spark, graphOutputPath + "/publication", Publication.class) + .filter((FilterFunction) p -> StringUtils.endsWith(p.getId(), "_country")); + + assertEquals(8, pubs_out.count()); + + // original result with NL country and doi starting with Mendely prefix, but not collectedfrom NARCIS + assertEquals( + 1, + pubs_out + .filter( + (FilterFunction) p -> p + .getId() + .equals("50|DansKnawCris::0224aae28af558f21768dbc6_country")) + .first() + .getCountry() + .size()); + + // original result with NL country and pid not starting with Mendely prefix + assertEquals( + 1, + pubs_out + .filter( + (FilterFunction) p -> p + .getId() + .equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1_country")) + .first() + .getCountry() + .size()); + + // original result with NL country and doi starting with Mendely prefix and collectedfrom NARCIS but not + // inserted with propagation + assertEquals( + 1, + pubs_out + .filter( + (FilterFunction) p -> p + .getId() + .equals("50|DansKnawCris::3c81248c335f0aa07e06817e_country")) + .first() + .getCountry() + .size()); + + // original result with NL country and doi starting with Mendely prefix and collectedfrom NARCIS inserted with + // propagation + assertEquals( + 0, + pubs_out + .filter( + (FilterFunction) p -> p + .getId() + .equals("50|DansKnawCris::3c81248c335f0aa07e06817d_country")) + .first() + .getCountry() + .size()); + } + + private List vocs() throws IOException { + return IOUtils + .readLines( + Objects + .requireNonNull( + getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt"))); + } + + private List synonyms() throws IOException { + return IOUtils + .readLines( + Objects + .requireNonNull( + getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt"))); + } + + private org.apache.spark.sql.Dataset read(SparkSession spark, String path, Class clazz) { + return spark + .read() + .textFile(path) + .map(as(clazz), Encoders.bean(clazz)); + } + + private static MapFunction as(Class clazz) { + return s -> MAPPER.readValue(s, clazz); + } + + private static String classPathResourceAsString(String path) throws IOException { + return IOUtils + .toString( + Objects + .requireNonNull( + CleanGraphSparkJobTest.class.getResourceAsStream(path))); + } + + private ArgumentApplicationParser args(String paramSpecs, String[] args) throws IOException, ParseException { + ArgumentApplicationParser parser = new ArgumentApplicationParser(classPathResourceAsString(paramSpecs)); + parser.parseArgument(args); + return parser; + } + + private static void verify_keyword(Publication p_cleaned, String subject) { + Optional s1 = p_cleaned + .getSubject() + .stream() + .filter(s -> s.getValue().equals(subject)) + .findFirst(); + + assertTrue(s1.isPresent()); + assertEquals(ModelConstants.DNET_SUBJECT_KEYWORD, s1.get().getQualifier().getClassid()); + assertEquals(ModelConstants.DNET_SUBJECT_KEYWORD, s1.get().getQualifier().getClassname()); + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJobTest.java deleted file mode 100644 index 9096180ef..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJobTest.java +++ /dev/null @@ -1,213 +0,0 @@ - 
-package eu.dnetlib.dhp.oa.graph.clean.cfhb; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.io.File; -import java.io.IOException; -import java.net.URISyntaxException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; - -import org.apache.commons.io.FileUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SparkSession; -import org.junit.jupiter.api.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.dhp.schema.oaf.Dataset; -import eu.dnetlib.dhp.schema.oaf.Publication; - -public class CleanCfHbSparkJobTest { - - private static final Logger log = LoggerFactory.getLogger(CleanCfHbSparkJobTest.class); - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - private static SparkSession spark; - - private static Path testBaseTmpPath; - - private static String resolvedPath; - - private static String graphInputPath; - - private static String graphOutputPath; - - private static String dsMasterDuplicatePath; - - @BeforeAll - public static void beforeAll() throws IOException, URISyntaxException { - - testBaseTmpPath = Files.createTempDirectory(CleanCfHbSparkJobTest.class.getSimpleName()); - log.info("using test base path {}", testBaseTmpPath); - - final File entitiesSources = Paths - .get(CleanCfHbSparkJobTest.class.getResource("/eu/dnetlib/dhp/oa/graph/clean/cfhb/entities").toURI()) - .toFile(); - - FileUtils - .copyDirectory( - entitiesSources, - testBaseTmpPath.resolve("input").resolve("entities").toFile()); - - FileUtils - .copyFileToDirectory( - Paths - .get( - CleanCfHbSparkJobTest.class - .getResource("/eu/dnetlib/dhp/oa/graph/clean/cfhb/masterduplicate.json") - .toURI()) - .toFile(), - testBaseTmpPath.resolve("workingDir").resolve("masterduplicate").toFile()); - - graphInputPath = testBaseTmpPath.resolve("input").resolve("entities").toString(); - resolvedPath = testBaseTmpPath.resolve("workingDir").resolve("cfHbResolved").toString(); - graphOutputPath = testBaseTmpPath.resolve("workingDir").resolve("cfHbPatched").toString(); - dsMasterDuplicatePath = testBaseTmpPath.resolve("workingDir").resolve("masterduplicate").toString(); - - SparkConf conf = new SparkConf(); - conf.setAppName(CleanCfHbSparkJobTest.class.getSimpleName()); - - conf.setMaster("local[*]"); - conf.set("spark.driver.host", "localhost"); - conf.set("spark.ui.enabled", "false"); - - spark = SparkSession - .builder() - .appName(CleanCfHbSparkJobTest.class.getSimpleName()) - .config(conf) - .getOrCreate(); - } - - @AfterAll - public static void afterAll() throws IOException { - FileUtils.deleteDirectory(testBaseTmpPath.toFile()); - spark.stop(); - } - - @Test - void testCleanCfHbSparkJob() throws Exception { - final String outputPath = graphOutputPath + "/dataset"; - final String inputPath = graphInputPath + "/dataset"; - - org.apache.spark.sql.Dataset records = read(spark, inputPath, Dataset.class); - Dataset d = records - .filter("id = '50|doi_________::09821844208a5cd6300b2bfb13bca1b9'") - .first(); - assertEquals("10|re3data_____::4c4416659cb74c2e0e891a883a047cbc", d.getCollectedfrom().get(0).getKey()); - assertEquals("Bacterial Protein Interaction Database - DUP", d.getCollectedfrom().get(0).getValue()); - assertEquals( - 
"10|re3data_____::4c4416659cb74c2e0e891a883a047cbc", d.getInstance().get(0).getCollectedfrom().getKey()); - assertEquals( - "Bacterial Protein Interaction Database - DUP", d.getInstance().get(0).getCollectedfrom().getValue()); - - d = records - .filter("id = '50|DansKnawCris::0dd644304b7116e8e58da3a5e3adc37a'") - .first(); - assertEquals("10|opendoar____::788b4ac1e172d8e520c2b9461c0a3d35", d.getCollectedfrom().get(0).getKey()); - assertEquals("FILUR DATA - DUP", d.getCollectedfrom().get(0).getValue()); - assertEquals( - "10|opendoar____::788b4ac1e172d8e520c2b9461c0a3d35", d.getInstance().get(0).getCollectedfrom().getKey()); - assertEquals("FILUR DATA - DUP", d.getInstance().get(0).getCollectedfrom().getValue()); - assertEquals( - "10|re3data_____::6ffd7bc058f762912dc494cd9c175341", d.getInstance().get(0).getHostedby().getKey()); - assertEquals("depositar - DUP", d.getInstance().get(0).getHostedby().getValue()); - - d = records - .filter("id = '50|DansKnawCris::203a27996ddc0fd1948258e5b7dec61c'") - .first(); - assertEquals("10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getCollectedfrom().get(0).getKey()); - assertEquals("DANS (Data Archiving and Networked Services)", d.getCollectedfrom().get(0).getValue()); - assertEquals( - "10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getCollectedfrom().getKey()); - assertEquals( - "DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getCollectedfrom().getValue()); - assertEquals( - "10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getHostedby().getKey()); - assertEquals("DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getHostedby().getValue()); - - CleanCfHbSparkJob - .main( - new String[] { - "--isSparkSessionManaged", Boolean.FALSE.toString(), - "--inputPath", inputPath, - "--outputPath", outputPath, - "--resolvedPath", resolvedPath + "/dataset", - "--graphTableClassName", Dataset.class.getCanonicalName(), - "--masterDuplicatePath", dsMasterDuplicatePath - }); - - assertTrue(Files.exists(Paths.get(graphOutputPath, "dataset"))); - - records = read(spark, outputPath, Dataset.class); - - assertEquals(3, records.count()); - - d = records - .filter("id = '50|doi_________::09821844208a5cd6300b2bfb13bca1b9'") - .first(); - assertEquals("10|fairsharing_::a29d1598024f9e87beab4b98411d48ce", d.getCollectedfrom().get(0).getKey()); - assertEquals("Bacterial Protein Interaction Database", d.getCollectedfrom().get(0).getValue()); - assertEquals( - "10|fairsharing_::a29d1598024f9e87beab4b98411d48ce", d.getInstance().get(0).getCollectedfrom().getKey()); - assertEquals("Bacterial Protein Interaction Database", d.getInstance().get(0).getCollectedfrom().getValue()); - - d = records - .filter("id = '50|DansKnawCris::0dd644304b7116e8e58da3a5e3adc37a'") - .first(); - assertEquals("10|re3data_____::fc1db64b3964826913b1e9eafe830490", d.getCollectedfrom().get(0).getKey()); - assertEquals("FULIR Data", d.getCollectedfrom().get(0).getValue()); - assertEquals( - "10|re3data_____::fc1db64b3964826913b1e9eafe830490", d.getInstance().get(0).getCollectedfrom().getKey()); - assertEquals("FULIR Data", d.getInstance().get(0).getCollectedfrom().getValue()); - assertEquals( - "10|fairsharing_::3f647cadf56541fb9513cb63ec370187", d.getInstance().get(0).getHostedby().getKey()); - assertEquals("depositar", d.getInstance().get(0).getHostedby().getValue()); - - d = records - .filter("id = '50|DansKnawCris::203a27996ddc0fd1948258e5b7dec61c'") - .first(); - 
assertEquals("10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getCollectedfrom().get(0).getKey()); - assertEquals("DANS (Data Archiving and Networked Services)", d.getCollectedfrom().get(0).getValue()); - assertEquals( - "10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getCollectedfrom().getKey()); - assertEquals( - "DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getCollectedfrom().getValue()); - assertEquals( - "10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getHostedby().getKey()); - assertEquals("DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getHostedby().getValue()); - - d = records - .filter("id = '50|DansKnawCris::203a27996ddc0fd1948258e5b7dec61c'") - .first(); - assertEquals("10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getCollectedfrom().get(0).getKey()); - assertEquals("DANS (Data Archiving and Networked Services)", d.getCollectedfrom().get(0).getValue()); - assertEquals( - "10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getCollectedfrom().getKey()); - assertEquals( - "DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getCollectedfrom().getValue()); - assertEquals( - "10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getHostedby().getKey()); - assertEquals("DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getHostedby().getValue()); - } - - private org.apache.spark.sql.Dataset read(SparkSession spark, String path, Class clazz) { - return spark - .read() - .textFile(path) - .map(as(clazz), Encoders.bean(clazz)); - } - - private static MapFunction as(Class clazz) { - return s -> OBJECT_MAPPER.readValue(s, clazz); - } -}