diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java
index b01564d251..a3d3eddc29 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java
@@ -232,9 +232,9 @@ public class PropagationConstant {
 		if (HdfsSupport.exists(inputPath, spark.sparkContext().hadoopConfiguration())) {
 			return spark
-                .read()
-                .textFile(inputPath)
-                .map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
+				.read()
+				.textFile(inputPath)
+				.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
 		} else {
 			return spark.emptyDataset(Encoders.bean(clazz));
 		}
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java
index 478c33a120..132006f5c2 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java
@@ -95,7 +95,6 @@ public class ResultTagger implements Serializable {
 
 		}
 
-
 		tmp
 			.forEach(
 				dsId -> datasources
diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java
index 676f377926..8a9dbb028b 100644
--- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java
+++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java
@@ -774,47 +774,47 @@ public class BulkTagJobTest {
 	void bulktagPublicationwithConstraintsTest() throws Exception {
 		final String sourcePath = getClass()
-            .getResource(
-                "/eu/dnetlib/dhp/bulktag/sample/publication/orcidbulktagfordatasource")
-            .getPath();
+			.getResource(
+				"/eu/dnetlib/dhp/bulktag/sample/publication/orcidbulktagfordatasource")
+			.getPath();
 		SparkBulkTagJob
-            .main(
-                new String[] {
-                    "-isTest", Boolean.TRUE.toString(),
-                    "-isSparkSessionManaged", Boolean.FALSE.toString(),
-                    "-sourcePath", sourcePath,
-                    "-taggingConf", IOUtils
-                        .toString(
-                            BulkTagJobTest.class
-                                .getResourceAsStream(
-                                    "/eu/dnetlib/dhp/bulktag/communityconfiguration/tagging_conf_neanias.xml")),
-                    "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
-                    "-outputPath", workingDir.toString() + "/publication",
-                    "-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
-                    "-pathMap", pathMap
-                });
+			.main(
+				new String[] {
+					"-isTest", Boolean.TRUE.toString(),
+					"-isSparkSessionManaged", Boolean.FALSE.toString(),
+					"-sourcePath", sourcePath,
+					"-taggingConf", IOUtils
+						.toString(
+							BulkTagJobTest.class
+								.getResourceAsStream(
+									"/eu/dnetlib/dhp/bulktag/communityconfiguration/tagging_conf_neanias.xml")),
+					"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
+					"-outputPath", workingDir.toString() + "/publication",
+					"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
+					"-pathMap", pathMap
+				});
 
 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 		JavaRDD<Publication> tmp = sc
-            .textFile(workingDir.toString() + "/publication")
-            .map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
+			.textFile(workingDir.toString() + "/publication")
+			.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
 
 		Assertions.assertEquals(2,
 			tmp.count());
 		org.apache.spark.sql.Dataset<Publication> verificationDataset = spark
-            .createDataset(tmp.rdd(), Encoders.bean(Publication.class));
+			.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
 		verificationDataset.createOrReplaceTempView("dataset");
 
 		String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name "
-            + "from dataset "
-            + "lateral view explode(context) c as MyT "
-            + "lateral view explode(MyT.datainfo) d as MyD "
-            + "where MyD.inferenceprovenance = 'bulktagging'";
+			+ "from dataset "
+			+ "lateral view explode(context) c as MyT "
+			+ "lateral view explode(MyT.datainfo) d as MyD "
+			+ "where MyD.inferenceprovenance = 'bulktagging'";
 
 		org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
 		idExplodeCommunity.show(false);
 		Assertions.assertEquals(0, idExplodeCommunity.count());
 
-    }
+	}
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java
index 5bbfeba8df..37e693de9a 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java
@@ -39,173 +39,173 @@ import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
 import eu.dnetlib.dhp.schema.oaf.utils.PidType;
 
 public class CleanCountrySparkJob implements Serializable {
-    private static final Logger log = LoggerFactory.getLogger(CleanCountrySparkJob.class);
+	private static final Logger log = LoggerFactory.getLogger(CleanCountrySparkJob.class);
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-    public static void main(String[] args) throws Exception {
+	public static void main(String[] args) throws Exception {
 
-        String jsonConfiguration = IOUtils
-            .toString(
-                CleanCountrySparkJob.class
-                    .getResourceAsStream(
-                        "/eu/dnetlib/dhp/oa/graph/input_clean_country_parameters.json"));
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
-        parser.parseArgument(args);
+		String jsonConfiguration = IOUtils
+			.toString(
+				CleanCountrySparkJob.class
+					.getResourceAsStream(
+						"/eu/dnetlib/dhp/oa/graph/input_clean_country_parameters.json"));
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+		parser.parseArgument(args);
 
-        Boolean isSparkSessionManaged = Optional
-            .ofNullable(parser.get("isSparkSessionManaged"))
-            .map(Boolean::valueOf)
-            .orElse(Boolean.TRUE);
-        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+		Boolean isSparkSessionManaged = Optional
+			.ofNullable(parser.get("isSparkSessionManaged"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.TRUE);
+		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
 
-        String inputPath = parser.get("inputPath");
-        log.info("inputPath: {}", inputPath);
+		String inputPath = parser.get("inputPath");
+		log.info("inputPath: {}", inputPath);
 
-        String workingDir = parser.get("workingDir");
-        log.info("workingDir: {}", workingDir);
+		String workingDir = parser.get("workingDir");
+		log.info("workingDir: {}", workingDir);
 
-        String datasourcePath = parser.get("hostedBy");
-        log.info("datasourcePath: {}", datasourcePath);
+		String datasourcePath = parser.get("hostedBy");
+		log.info("datasourcePath: {}",
+			datasourcePath);
 
-        String country = parser.get("country");
-        log.info("country: {}", country);
+		String country = parser.get("country");
+		log.info("country: {}", country);
 
-        String[] verifyParam = parser.get("verifyParam").split(";");
-        log.info("verifyParam: {}", verifyParam);
+		String[] verifyParam = parser.get("verifyParam").split(";");
+		log.info("verifyParam: {}", verifyParam);
 
-        String collectedfrom = parser.get("collectedfrom");
-        log.info("collectedfrom: {}", collectedfrom);
+		String collectedfrom = parser.get("collectedfrom");
+		log.info("collectedfrom: {}", collectedfrom);
 
-        String graphTableClassName = parser.get("graphTableClassName");
-        log.info("graphTableClassName: {}", graphTableClassName);
+		String graphTableClassName = parser.get("graphTableClassName");
+		log.info("graphTableClassName: {}", graphTableClassName);
 
-        Class<? extends Result> entityClazz = (Class<? extends Result>) Class.forName(graphTableClassName);
+		Class<? extends Result> entityClazz = (Class<? extends Result>) Class.forName(graphTableClassName);
 
-        SparkConf conf = new SparkConf();
-        runWithSparkSession(
-            conf,
-            isSparkSessionManaged,
-            spark -> {
+		SparkConf conf = new SparkConf();
+		runWithSparkSession(
+			conf,
+			isSparkSessionManaged,
+			spark -> {
 
-                cleanCountry(
-                    spark, country, verifyParam, inputPath, entityClazz, workingDir, collectedfrom, datasourcePath);
-            });
-    }
+				cleanCountry(
+					spark, country, verifyParam, inputPath, entityClazz, workingDir, collectedfrom, datasourcePath);
+			});
+	}
 
-    private static <T extends Result> void cleanCountry(SparkSession spark, String country, String[] verifyParam,
-        String inputPath, Class<T> entityClazz, String workingDir, String collectedfrom, String datasourcePath) {
+	private static <T extends Result> void cleanCountry(SparkSession spark, String country, String[] verifyParam,
+		String inputPath, Class<T> entityClazz, String workingDir, String collectedfrom, String datasourcePath) {
 
-        List<String> hostedBy = spark
-            .read()
-            .textFile(datasourcePath)
-            .collectAsList();
+		List<String> hostedBy = spark
+			.read()
+			.textFile(datasourcePath)
+			.collectAsList();
 
-        Dataset<T> res = spark
-            .read()
-            .textFile(inputPath)
-            .map(
-                (MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
-                Encoders.bean(entityClazz));
+		Dataset<T> res = spark
+			.read()
+			.textFile(inputPath)
+			.map(
+				(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
+				Encoders.bean(entityClazz));
 
-        res.map((MapFunction<T, T>) r -> {
-            if (r.getInstance().stream().anyMatch(i -> hostedBy.contains(i.getHostedby().getKey())) ||
-                !r.getCollectedfrom().stream().anyMatch(cf -> cf.getValue().equals(collectedfrom))) {
-                return r;
-            }
+		res.map((MapFunction<T, T>) r -> {
+			if (r.getInstance().stream().anyMatch(i -> hostedBy.contains(i.getHostedby().getKey())) ||
+				!r.getCollectedfrom().stream().anyMatch(cf -> cf.getValue().equals(collectedfrom))) {
+				return r;
+			}
 
-            List<StructuredProperty> ids = getPidsAndAltIds(r).collect(Collectors.toList());
-            if (ids
-                .stream()
-                .anyMatch(
-                    p -> p
-                        .getQualifier()
-                        .getClassid()
-                        .equals(PidType.doi.toString()) && pidInParam(p.getValue(), verifyParam))) {
-                r
-                    .setCountry(
-                        r
-                            .getCountry()
-                            .stream()
-                            .filter(
-                                c -> toTakeCountry(c, country))
-                            .collect(Collectors.toList()));
+			List<StructuredProperty> ids = getPidsAndAltIds(r).collect(Collectors.toList());
+			if (ids
+				.stream()
+				.anyMatch(
+					p -> p
+						.getQualifier()
+						.getClassid()
+						.equals(PidType.doi.toString()) && pidInParam(p.getValue(), verifyParam))) {
+				r
+					.setCountry(
+						r
+							.getCountry()
+							.stream()
+							.filter(
+								c -> toTakeCountry(c, country))
+							.collect(Collectors.toList()));
 
-            }
+			}
 
-            return r;
-        }, Encoders.bean(entityClazz))
-            .write()
-            .mode(SaveMode.Overwrite)
-            .option("compression", "gzip")
-            .json(workingDir);
+			return r;
+		}, Encoders.bean(entityClazz))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(workingDir);
 
-        spark
-            .read()
-            .textFile(workingDir)
-            .map(
-                (MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
-                Encoders.bean(entityClazz))
-            .write()
-            .mode(SaveMode.Overwrite)
-            .option("compression", "gzip")
-            .json(inputPath);
-    }
+		spark
+			.read()
+			.textFile(workingDir)
+			.map(
+				(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
+				Encoders.bean(entityClazz))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(inputPath);
+	}
 
-    private static <T extends Result> Stream<StructuredProperty> getPidsAndAltIds(T r) {
-        final Stream<StructuredProperty> resultPids = Optional
-            .ofNullable(r.getPid())
-            .map(Collection::stream)
-            .orElse(Stream.empty());
+	private static <T extends Result> Stream<StructuredProperty> getPidsAndAltIds(T r) {
+		final Stream<StructuredProperty> resultPids = Optional
+			.ofNullable(r.getPid())
+			.map(Collection::stream)
+			.orElse(Stream.empty());
 
-        final Stream<StructuredProperty> instancePids = Optional
-            .ofNullable(r.getInstance())
-            .map(
-                instance -> instance
-                    .stream()
-                    .flatMap(
-                        i -> Optional
-                            .ofNullable(i.getPid())
-                            .map(Collection::stream)
-                            .orElse(Stream.empty())))
-            .orElse(Stream.empty());
+		final Stream<StructuredProperty> instancePids = Optional
+			.ofNullable(r.getInstance())
+			.map(
+				instance -> instance
+					.stream()
+					.flatMap(
+						i -> Optional
+							.ofNullable(i.getPid())
+							.map(Collection::stream)
+							.orElse(Stream.empty())))
+			.orElse(Stream.empty());
 
-        final Stream<StructuredProperty> instanceAltIds = Optional
-            .ofNullable(r.getInstance())
-            .map(
-                instance -> instance
-                    .stream()
-                    .flatMap(
-                        i -> Optional
-                            .ofNullable(i.getAlternateIdentifier())
-                            .map(Collection::stream)
-                            .orElse(Stream.empty())))
-            .orElse(Stream.empty());
+		final Stream<StructuredProperty> instanceAltIds = Optional
+			.ofNullable(r.getInstance())
+			.map(
+				instance -> instance
+					.stream()
+					.flatMap(
+						i -> Optional
+							.ofNullable(i.getAlternateIdentifier())
+							.map(Collection::stream)
+							.orElse(Stream.empty())))
+			.orElse(Stream.empty());
 
-        return Stream
-            .concat(
-                Stream.concat(resultPids, instancePids),
-                instanceAltIds);
-    }
+		return Stream
+			.concat(
+				Stream.concat(resultPids, instancePids),
+				instanceAltIds);
+	}
 
-    private static boolean pidInParam(String value, String[] verifyParam) {
-        for (String s : verifyParam)
-            if (value.startsWith(s))
-                return true;
-        return false;
-    }
+	private static boolean pidInParam(String value, String[] verifyParam) {
+		for (String s : verifyParam)
+			if (value.startsWith(s))
+				return true;
+		return false;
+	}
 
-    private static boolean toTakeCountry(Country c, String country) {
-        // If dataInfo is not set, or dataInfo.inferenceprovenance is not set or not present then it cannot be
-        // inserted via propagation
-        if (!Optional.ofNullable(c.getDataInfo()).isPresent())
-            return true;
-        if (!Optional.ofNullable(c.getDataInfo().getInferenceprovenance()).isPresent())
-            return true;
-        return !(c
-            .getClassid()
-            .equalsIgnoreCase(country) &&
-            c.getDataInfo().getInferenceprovenance().equals("propagation"));
-    }
+	private static boolean toTakeCountry(Country c, String country) {
+		// If dataInfo is not set, or dataInfo.inferenceprovenance is not set or not present then it cannot be
+		// inserted via propagation
+		if (!Optional.ofNullable(c.getDataInfo()).isPresent())
+			return true;
+		if (!Optional.ofNullable(c.getDataInfo().getInferenceprovenance()).isPresent())
+			return true;
+		return !(c
+			.getClassid()
+			.equalsIgnoreCase(country) &&
+			c.getDataInfo().getInferenceprovenance().equals("propagation"));
+	}
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/GetDatasourceFromCountry.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/GetDatasourceFromCountry.java
index b89b459dff..598fccdd75 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/GetDatasourceFromCountry.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/GetDatasourceFromCountry.java
@@ -31,76 +31,76 @@ import scala.Tuple2;
  * @Date 22/07/22
  */
 public class GetDatasourceFromCountry implements Serializable {
-    private static final Logger log = LoggerFactory.getLogger(GetDatasourceFromCountry.class);
+	private static final Logger log = LoggerFactory.getLogger(GetDatasourceFromCountry.class);
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-    public static void main(String[] args) throws Exception {
+	public static void main(String[] args) throws Exception {
 
-        String jsonConfiguration = IOUtils
-            .toString(
-                GetDatasourceFromCountry.class
-                    .getResourceAsStream(
-                        "/eu/dnetlib/dhp/oa/graph/input_datasource_country_parameters.json"));
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
-        parser.parseArgument(args);
+		String jsonConfiguration = IOUtils
+			.toString(
+				GetDatasourceFromCountry.class
+					.getResourceAsStream(
+						"/eu/dnetlib/dhp/oa/graph/input_datasource_country_parameters.json"));
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+		parser.parseArgument(args);
 
-        Boolean isSparkSessionManaged = Optional
-            .ofNullable(parser.get("isSparkSessionManaged"))
-            .map(Boolean::valueOf)
-            .orElse(Boolean.TRUE);
-        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+		Boolean isSparkSessionManaged = Optional
+			.ofNullable(parser.get("isSparkSessionManaged"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.TRUE);
+		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
 
-        String inputPath = parser.get("inputPath");
-        log.info("inputPath: {}", inputPath);
+		String inputPath = parser.get("inputPath");
+		log.info("inputPath: {}", inputPath);
 
-        String workingPath = parser.get("workingDir");
-        log.info("workingDir: {}", workingPath);
+		String workingPath = parser.get("workingDir");
+		log.info("workingDir: {}", workingPath);
 
-        String country = parser.get("country");
-        log.info("country: {}", country);
+		String country = parser.get("country");
+		log.info("country: {}", country);
 
-        SparkConf conf = new SparkConf();
-        runWithSparkSession(
-            conf,
-            isSparkSessionManaged,
-            spark -> {
-                getDatasourceFromCountry(spark, country, inputPath, workingPath);
-            });
-    }
+		SparkConf conf = new SparkConf();
+		runWithSparkSession(
+			conf,
+			isSparkSessionManaged,
+			spark -> {
+				getDatasourceFromCountry(spark, country, inputPath, workingPath);
+			});
+	}
 
-    private static void getDatasourceFromCountry(SparkSession spark, String country, String inputPath,
-        String workingDir) {
+	private static void getDatasourceFromCountry(SparkSession spark, String country, String inputPath,
+		String workingDir) {
 
-        Dataset<Organization> organization = spark
-            .read()
-            .textFile(inputPath + "/organization")
-            .map(
-                (MapFunction<String, Organization>) value -> OBJECT_MAPPER.readValue(value, Organization.class),
-                Encoders.bean(Organization.class))
-            .filter(
-                (FilterFunction<Organization>) o ->
-                    !o.getDataInfo().getDeletedbyinference() &&
-                    o.getCountry().getClassid().length() > 0 &&
-                    o.getCountry().getClassid().equals(country));
+		Dataset<Organization> organization = spark
+			.read()
+			.textFile(inputPath + "/organization")
+			.map(
+				(MapFunction<String, Organization>) value -> OBJECT_MAPPER.readValue(value, Organization.class),
+				Encoders.bean(Organization.class))
+			.filter(
+				(FilterFunction<Organization>) o -> !o.getDataInfo().getDeletedbyinference() &&
+					o.getCountry().getClassid().length() > 0 &&
+					o.getCountry().getClassid().equals(country));
 
-        // filtering of the relations taking the non deleted by inference and those with IsProvidedBy as relclass
-        Dataset<Relation> relation = spark
-            .read()
-            .textFile(inputPath + "/relation")
-            .map(
-                (MapFunction<String, Relation>) value -> OBJECT_MAPPER.readValue(value, Relation.class),
-                Encoders.bean(Relation.class))
-            .filter(
-                (FilterFunction<Relation>) rel -> rel.getRelClass().equalsIgnoreCase(ModelConstants.IS_PROVIDED_BY) &&
-                    !rel.getDataInfo().getDeletedbyinference());
+		// filtering of the relations taking the non deleted by inference and those with IsProvidedBy as relclass
+		Dataset<Relation> relation = spark
+			.read()
+			.textFile(inputPath + "/relation")
+			.map(
+				(MapFunction<String, Relation>) value -> OBJECT_MAPPER.readValue(value, Relation.class),
+				Encoders.bean(Relation.class))
+			.filter(
+				(FilterFunction<Relation>) rel -> rel.getRelClass().equalsIgnoreCase(ModelConstants.IS_PROVIDED_BY) &&
+					!rel.getDataInfo().getDeletedbyinference());
 
-        organization
-            .joinWith(relation, organization.col("id").equalTo(relation.col("target")))
-            .map((MapFunction<Tuple2<Organization, Relation>, String>) t2 -> t2._2().getSource(), Encoders.STRING())
-            .write()
-            .mode(SaveMode.Overwrite)
-            .option("compression", "gzip")
-            .json(workingDir);
+		organization
+			.joinWith(relation, organization.col("id").equalTo(relation.col("target")))
+			.map((MapFunction<Tuple2<Organization, Relation>, String>) t2 -> t2._2().getSource(), Encoders.STRING())
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(workingDir);
 
-    }
+	}
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java
index d1c186308c..3bc69cfd1f 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java
@@ -5,7 +5,6 @@
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import eu.dnetlib.dhp.schema.oaf.Dataset;
 import org.apache.commons.io.FileUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
@@ -27,165 +26,165 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob;
+import eu.dnetlib.dhp.schema.oaf.Dataset;
 import eu.dnetlib.dhp.schema.oaf.Publication;
 
 public class CleanCountryTest {
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-    private static SparkSession spark;
+	private static SparkSession spark;
 
-    private static Path workingDir;
+	private static Path workingDir;
 
-    private static final Logger log = LoggerFactory.getLogger(CleanContextTest.class);
+	private static final Logger log = LoggerFactory.getLogger(CleanContextTest.class);
 
-    @BeforeAll
-    public static void beforeAll() throws IOException {
-        workingDir =
-            Files.createTempDirectory(CleanCountryTest.class.getSimpleName());
-        log.info("using work dir {}", workingDir);
+	@BeforeAll
+	public static void beforeAll() throws IOException {
+		workingDir = Files.createTempDirectory(CleanCountryTest.class.getSimpleName());
+		log.info("using work dir {}", workingDir);
 
-        SparkConf conf = new SparkConf();
-        conf.setAppName(CleanCountryTest.class.getSimpleName());
+		SparkConf conf = new SparkConf();
+		conf.setAppName(CleanCountryTest.class.getSimpleName());
 
-        conf.setMaster("local[*]");
-        conf.set("spark.driver.host", "localhost");
-        conf.set("hive.metastore.local", "true");
-        conf.set("spark.ui.enabled", "false");
-        conf.set("spark.sql.warehouse.dir", workingDir.toString());
-        conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
+		conf.setMaster("local[*]");
+		conf.set("spark.driver.host", "localhost");
+		conf.set("hive.metastore.local", "true");
+		conf.set("spark.ui.enabled", "false");
+		conf.set("spark.sql.warehouse.dir", workingDir.toString());
+		conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
 
-        spark = SparkSession
-            .builder()
-            .appName(CleanCountryTest.class.getSimpleName())
-            .config(conf)
-            .getOrCreate();
-    }
+		spark = SparkSession
+			.builder()
+			.appName(CleanCountryTest.class.getSimpleName())
+			.config(conf)
+			.getOrCreate();
+	}
 
-    @AfterAll
-    public static void afterAll() throws IOException {
-        FileUtils.deleteDirectory(workingDir.toFile());
-        spark.stop();
-    }
+	@AfterAll
+	public static void afterAll() throws IOException {
+		FileUtils.deleteDirectory(workingDir.toFile());
+		spark.stop();
+	}
 
-    @Test
-    public void testResultClean() throws Exception {
-        final String sourcePath = getClass()
-            .getResource("/eu/dnetlib/dhp/oa/graph/clean/publication_clean_country.json")
-            .getPath();
+	@Test
+	public void testResultClean() throws Exception {
+		final String sourcePath = getClass()
+			.getResource("/eu/dnetlib/dhp/oa/graph/clean/publication_clean_country.json")
+			.getPath();
 
-        spark
-            .read()
-            .textFile(sourcePath)
-            .map(
-                (MapFunction<String, Publication>) r -> OBJECT_MAPPER.readValue(r, Publication.class),
-                Encoders.bean(Publication.class))
-            .write()
-            .json(workingDir.toString() + "/publication");
+		spark
+			.read()
+			.textFile(sourcePath)
+			.map(
+				(MapFunction<String, Publication>) r -> OBJECT_MAPPER.readValue(r, Publication.class),
+				Encoders.bean(Publication.class))
+			.write()
+			.json(workingDir.toString() + "/publication");
 
-        CleanCountrySparkJob.main(new String[] {
-            "--isSparkSessionManaged", Boolean.FALSE.toString(),
-            "--inputPath", workingDir.toString() + "/publication",
-            "--graphTableClassName", Publication.class.getCanonicalName(),
-            "--workingDir", workingDir.toString() + "/working",
-            "--country", "NL",
-            "--verifyParam", "10.17632",
-            "--collectedfrom", "NARCIS",
-            "--hostedBy", getClass()
-                .getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy")
-                .getPath()
-        });
+		CleanCountrySparkJob.main(new String[] {
+			"--isSparkSessionManaged", Boolean.FALSE.toString(),
+			"--inputPath", workingDir.toString() + "/publication",
+			"--graphTableClassName", Publication.class.getCanonicalName(),
+			"--workingDir", workingDir.toString() + "/working",
+			"--country", "NL",
+			"--verifyParam", "10.17632",
+			"--collectedfrom", "NARCIS",
+			"--hostedBy", getClass()
+				.getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy")
+				.getPath()
+		});
 
-        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-        JavaRDD<Publication> tmp = sc
-            .textFile(workingDir.toString() + "/publication")
-            .map(item ->
-                OBJECT_MAPPER.readValue(item, Publication.class));
+		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+		JavaRDD<Publication> tmp = sc
+			.textFile(workingDir.toString() + "/publication")
+			.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
 
-        Assertions.assertEquals(8, tmp.count());
+		Assertions.assertEquals(8, tmp.count());
 
-        // original result with NL country and doi starting with Mendeley prefix, but not collectedfrom NARCIS
-        Assertions
-            .assertEquals(
-                1,
-                tmp
-                    .filter(p -> p.getId().equals("50|DansKnawCris::0224aae28af558f21768dbc6439c7a95"))
-                    .collect()
-                    .get(0)
-                    .getCountry()
-                    .size());
+		// original result with NL country and doi starting with Mendeley prefix, but not collectedfrom NARCIS
+		Assertions
+			.assertEquals(
+				1,
+				tmp
+					.filter(p -> p.getId().equals("50|DansKnawCris::0224aae28af558f21768dbc6439c7a95"))
+					.collect()
+					.get(0)
+					.getCountry()
+					.size());
 
-        // original result with NL country and pid not starting with Mendeley prefix
-        Assertions
-            .assertEquals(
-                1,
-                tmp
-                    .filter(p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9"))
-                    .collect()
-                    .get(0)
-                    .getCountry()
-                    .size());
+		// original result with NL country and pid not starting with Mendeley prefix
+		Assertions
+			.assertEquals(
+				1,
+				tmp
+					.filter(p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9"))
+					.collect()
+					.get(0)
+					.getCountry()
+					.size());
 
-        // original result with NL country and doi starting with Mendeley prefix and collectedfrom NARCIS but not
-        // inserted with propagation
-        Assertions
-            .assertEquals(
-                1,
-                tmp
-                    .filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af"))
-                    .collect()
-                    .get(0)
-                    .getCountry()
-                    .size());
+		// original result with NL country and doi starting with Mendeley prefix and collectedfrom NARCIS but not
+		// inserted with propagation
+		Assertions
+			.assertEquals(
+				1,
+				tmp
+					.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af"))
+					.collect()
+					.get(0)
+					.getCountry()
+					.size());
 
-        // original result with NL country and doi starting with Mendeley prefix and collectedfrom NARCIS inserted with
-        // propagation
-        Assertions
-            .assertEquals(
-                0,
-                tmp
-                    .filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6ag"))
-                    .collect()
-                    .get(0)
-                    .getCountry()
-                    .size());
-    }
+		// original result with NL country and doi starting with Mendeley prefix and collectedfrom NARCIS inserted with
+		// propagation
+		Assertions
+			.assertEquals(
+				0,
+				tmp
+					.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6ag"))
+					.collect()
+					.get(0)
+					.getCountry()
+					.size());
+	}
 
-    @Test
-    public void testDatasetClean() throws Exception {
-        final String sourcePath = getClass()
-            .getResource("/eu/dnetlib/dhp/oa/graph/clean/dataset_clean_country.json")
-            .getPath();
+	@Test
+	public void testDatasetClean() throws Exception {
+		final String sourcePath = getClass()
+			.getResource("/eu/dnetlib/dhp/oa/graph/clean/dataset_clean_country.json")
+			.getPath();
 
-        spark
-            .read()
-            .textFile(sourcePath)
-            .map(
-                (MapFunction<String, Dataset>) r -> OBJECT_MAPPER.readValue(r, Dataset.class),
-                Encoders.bean(Dataset.class))
-            .write()
-            .json(workingDir.toString() + "/dataset");
+		spark
+			.read()
+			.textFile(sourcePath)
+			.map(
+				(MapFunction<String, Dataset>) r -> OBJECT_MAPPER.readValue(r, Dataset.class),
+				Encoders.bean(Dataset.class))
+			.write()
+			.json(workingDir.toString() + "/dataset");
 
-        CleanCountrySparkJob.main(new String[] {
-            "--isSparkSessionManaged",
-                Boolean.FALSE.toString(),
-            "--inputPath", workingDir.toString() + "/dataset",
-            "-graphTableClassName", Dataset.class.getCanonicalName(),
-            "-workingDir", workingDir.toString() + "/working",
-            "-country", "NL",
-            "-verifyParam", "10.17632",
-            "-collectedfrom", "NARCIS",
-            "-hostedBy", getClass()
-                .getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy")
-                .getPath()
-        });
+		CleanCountrySparkJob.main(new String[] {
+			"--isSparkSessionManaged", Boolean.FALSE.toString(),
+			"--inputPath", workingDir.toString() + "/dataset",
+			"-graphTableClassName", Dataset.class.getCanonicalName(),
+			"-workingDir", workingDir.toString() + "/working",
+			"-country", "NL",
+			"-verifyParam", "10.17632",
+			"-collectedfrom", "NARCIS",
+			"-hostedBy", getClass()
+				.getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy")
+				.getPath()
+		});
 
-        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-        JavaRDD<Dataset> tmp = sc
-            .textFile(workingDir.toString() + "/dataset")
-            .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
+		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+		JavaRDD<Dataset> tmp = sc
+			.textFile(workingDir.toString() + "/dataset")
+			.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
 
-        Assertions.assertEquals(1, tmp.count());
+		Assertions.assertEquals(1, tmp.count());
 
-        Assertions.assertEquals(0, tmp.first().getCountry().size());
+		Assertions.assertEquals(0, tmp.first().getCountry().size());
 
-
-    }
+	}
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java
index ed6179cb55..4c488a8947 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java
@@ -12,7 +12,6 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
-import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.dom4j.DocumentException;
@@ -22,6 +21,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
@@ -995,7 +995,7 @@ class MappersTest {
 
 	@Test
 	void testEoscFutureHackZenodo() throws IOException {
 		final String xml = IOUtils
-            .toString(Objects.requireNonNull(getClass().getResourceAsStream("zenodo7351221.xml")));
+			.toString(Objects.requireNonNull(getClass().getResourceAsStream("zenodo7351221.xml")));
 		final List<Oaf> actual = new OdfToOafMapper(vocs, false, true).processMdRecord(xml);
 		actual.forEach(a -> {