diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/ZenodoAPIClient.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/ZenodoAPIClient.java index 56366bc725..63f890a102 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/ZenodoAPIClient.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/ZenodoAPIClient.java @@ -255,7 +255,8 @@ public class ZenodoAPIClient implements Serializable { private void setDepositionId(String concept_rec_id, Integer page) throws IOException, MissingConceptDoiException { - ZenodoModelList zenodoModelList = new Gson().fromJson(getPrevDepositions(String.valueOf(page)), ZenodoModelList.class); + ZenodoModelList zenodoModelList = new Gson() + .fromJson(getPrevDepositions(String.valueOf(page)), ZenodoModelList.class); for (ZenodoModel zm : zenodoModelList) { if (zm.getConceptrecid().equals(concept_rec_id)) { @@ -263,8 +264,9 @@ public class ZenodoAPIClient implements Serializable { return; } } - if(zenodoModelList.size() == 0) - throw new MissingConceptDoiException("The concept record id specified was missing in the list of depositions"); + if (zenodoModelList.size() == 0) + throw new MissingConceptDoiException( + "The concept record id specified was missing in the list of depositions"); setDepositionId(concept_rec_id, page + 1); } @@ -278,11 +280,11 @@ public class ZenodoAPIClient implements Serializable { String url = urlBuilder.build().toString(); Request request = new Request.Builder() - .url(url) - .addHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString()) // add request headers - .addHeader(HttpHeaders.AUTHORIZATION, "Bearer " + access_token) - .get() - .build(); + .url(url) + .addHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString()) // add request headers + .addHeader(HttpHeaders.AUTHORIZATION, "Bearer " + access_token) + .get() + .build(); try (Response response = httpClient.newCall(request).execute()) { @@ -295,7 +297,6 @@ public class ZenodoAPIClient implements Serializable { } - private String getBucket(String url) throws IOException { OkHttpClient httpClient = new OkHttpClient.Builder() .connectTimeout(600, TimeUnit.SECONDS) diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java index fea7b53ac5..7fc80d1681 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java @@ -142,7 +142,8 @@ class TransformationJobTest extends AbstractVocabularyTest { @Test @DisplayName("Test TransformSparkJobNode.main with oaiOpenaire_datacite (v4)") - void transformTestITGv4OAIdatacite(@TempDir final Path testDir) throws Exception { + void transformTestITGv4OAIdatacite(@TempDir + final Path testDir) throws Exception { try (SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate()) { @@ -152,7 +153,9 @@ class TransformationJobTest extends AbstractVocabularyTest { .getFile(); final String mdstore_output = testDir.toString() + "/version"; - mockupTrasformationRule("simpleTRule", "/eu/dnetlib/dhp/transform/scripts/xslt_cleaning_oaiOpenaire_datacite_ExchangeLandingpagePid.xsl"); + mockupTrasformationRule( + "simpleTRule", + "/eu/dnetlib/dhp/transform/scripts/xslt_cleaning_oaiOpenaire_datacite_ExchangeLandingpagePid.xsl"); final Map parameters = Stream.of(new String[][] { { @@ -203,7 +206,8 @@ class TransformationJobTest extends AbstractVocabularyTest { @Test @DisplayName("Test TransformSparkJobNode.main") - void transformTest(@TempDir final Path testDir) throws Exception { + void transformTest(@TempDir + final Path testDir) throws Exception { try (SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate()) { diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java index ad1e8721ac..982e8ea750 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java @@ -103,7 +103,7 @@ public class SparkBulkTagJob { ResultTagger resultTagger = new ResultTagger(); readPath(spark, inputPath, resultClazz) .map(patchResult(), Encoders.bean(resultClazz)) - .filter(Objects::nonNull) + .filter(Objects::nonNull) .map( (MapFunction) value -> resultTagger .enrichContextCriteria( diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java index cfe47f29f9..34ca6697c4 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java @@ -1,10 +1,14 @@ + package eu.dnetlib.dhp.oa.graph.dump.funderresults; + import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + import java.io.Serializable; import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; + import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; @@ -14,29 +18,32 @@ import org.apache.spark.sql.*; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.oa.graph.dump.Utils; import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult; import eu.dnetlib.dhp.schema.dump.oaf.community.Funder; import eu.dnetlib.dhp.schema.dump.oaf.community.Project; + /** * Splits the dumped results by funder and stores them in a folder named as the funder nsp (for all the funders, but the EC * for the EC it specifies also the fundingStream (FP7 or H2020) */ public class SparkDumpFunderResults implements Serializable { private static final Logger log = LoggerFactory.getLogger(SparkDumpFunderResults.class); + public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils - .toString( - SparkDumpFunderResults.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/dump/funder_result_parameters.json")); + .toString( + SparkDumpFunderResults.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump/funder_result_parameters.json")); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); final String inputPath = parser.get("sourcePath"); log.info("inputPath: {}", inputPath); @@ -44,30 +51,32 @@ public class SparkDumpFunderResults implements Serializable { log.info("outputPath: {}", outputPath); SparkConf conf = new SparkConf(); runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - writeResultProjectList(spark, inputPath, outputPath); - }); + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + writeResultProjectList(spark, inputPath, outputPath); + }); } + private static void writeResultProjectList(SparkSession spark, String inputPath, String outputPath) { Dataset result = Utils - .readPath(spark, inputPath + "/publication", CommunityResult.class) - .union(Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class)) - .union(Utils.readPath(spark, inputPath + "/otherresearchproduct", CommunityResult.class)) - .union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class)); + .readPath(spark, inputPath + "/publication", CommunityResult.class) + .union(Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class)) + .union(Utils.readPath(spark, inputPath + "/otherresearchproduct", CommunityResult.class)) + .union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class)); log.info("Number of result {}", result.count()); Dataset tmp = result - .flatMap((FlatMapFunction) cr -> cr.getProjects().stream().map(p -> { - return getFunderName(p); - }).collect(Collectors.toList()).iterator(), Encoders.STRING()) - .distinct(); + .flatMap((FlatMapFunction) cr -> cr.getProjects().stream().map(p -> { + return getFunderName(p); + }).collect(Collectors.toList()).iterator(), Encoders.STRING()) + .distinct(); List funderList = tmp.collectAsList(); funderList.forEach(funder -> { dumpResults(funder, result, outputPath); }); } + @NotNull private static String getFunderName(Project p) { Optional ofunder = Optional.ofNullable(p.getFunder()); @@ -97,23 +106,24 @@ public class SparkDumpFunderResults implements Serializable { return fName; } } + private static void dumpResults(String funder, Dataset results, String outputPath) { results.map((MapFunction) r -> { - if (!Optional.ofNullable(r.getProjects()).isPresent()) { - return null; - } - for (Project p : r.getProjects()) { - String fName = getFunderName(p); - if (fName.equalsIgnoreCase(funder)) { - return r; - } - } - return null; - }, Encoders.bean(CommunityResult.class)) - .filter(Objects::nonNull) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath + "/" + funder); + if (!Optional.ofNullable(r.getProjects()).isPresent()) { + return null; + } + for (Project p : r.getProjects()) { + String fName = getFunderName(p); + if (fName.equalsIgnoreCase(funder)) { + return r; + } + } + return null; + }, Encoders.bean(CommunityResult.class)) + .filter(Objects::nonNull) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "/" + funder); } -} \ No newline at end of file +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java index 5eed64e892..4b00729448 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java @@ -45,18 +45,18 @@ public class SparkResultLinkedToProject implements Serializable { public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils - .toString( - SparkResultLinkedToProject.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/dump/input_parameters_link_prj.json")); + .toString( + SparkResultLinkedToProject.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump/input_parameters_link_prj.json")); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); final String inputPath = parser.get("sourcePath"); @@ -78,41 +78,41 @@ public class SparkResultLinkedToProject implements Serializable { SparkConf conf = new SparkConf(); runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - writeResultsLinkedToProjects( - communityMapPath, spark, inputClazz, inputPath, outputPath, resultProjectsPath); - }); + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + writeResultsLinkedToProjects( + communityMapPath, spark, inputClazz, inputPath, outputPath, resultProjectsPath); + }); } private static void writeResultsLinkedToProjects(String communityMapPath, SparkSession spark, - Class inputClazz, - String inputPath, String outputPath, String resultProjectsPath) { + Class inputClazz, + String inputPath, String outputPath, String resultProjectsPath) { Dataset results = Utils - .readPath(spark, inputPath, inputClazz) - .filter( - (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() && - !r.getDataInfo().getInvisible()); + .readPath(spark, inputPath, inputClazz) + .filter( + (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() && + !r.getDataInfo().getInvisible()); Dataset resultProjectDataset = Utils - .readPath(spark, resultProjectsPath, ResultProject.class); + .readPath(spark, resultProjectsPath, ResultProject.class); CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath); results - .joinWith(resultProjectDataset, results.col("id").equalTo(resultProjectDataset.col("resultId"))) - .map((MapFunction, CommunityResult>) t2 -> { - CommunityResult cr = (CommunityResult) ResultMapper - .map( - t2._1(), - communityMap, Constants.DUMPTYPE.FUNDER.getType()); - cr.setProjects(t2._2().getProjectsList()); - return cr; - }, Encoders.bean(CommunityResult.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath); + .joinWith(resultProjectDataset, results.col("id").equalTo(resultProjectDataset.col("resultId"))) + .map((MapFunction, CommunityResult>) t2 -> { + CommunityResult cr = (CommunityResult) ResultMapper + .map( + t2._1(), + communityMap, Constants.DUMPTYPE.FUNDER.getType()); + cr.setProjects(t2._2().getProjectsList()); + return cr; + }, Encoders.bean(CommunityResult.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectsSubsetSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectsSubsetSparkJob.java index 83a9f34645..38eb3cb370 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectsSubsetSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectsSubsetSparkJob.java @@ -2,9 +2,11 @@ package eu.dnetlib.dhp.oa.graph.dump.projectssubset; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + import java.io.Serializable; import java.util.Objects; import java.util.Optional; + import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; @@ -14,6 +16,7 @@ import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.oa.graph.dump.Utils; import eu.dnetlib.dhp.schema.dump.oaf.graph.Project; diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/PrepareResultProjectJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/PrepareResultProjectJobTest.java index c6798ac581..f3b7246b1a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/PrepareResultProjectJobTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/PrepareResultProjectJobTest.java @@ -35,12 +35,12 @@ public class PrepareResultProjectJobTest { private static Path workingDir; private static final Logger log = LoggerFactory - .getLogger(eu.dnetlib.dhp.oa.graph.dump.PrepareResultProjectJobTest.class); + .getLogger(eu.dnetlib.dhp.oa.graph.dump.PrepareResultProjectJobTest.class); @BeforeAll public static void beforeAll() throws IOException { workingDir = Files - .createTempDirectory(eu.dnetlib.dhp.oa.graph.dump.PrepareResultProjectJobTest.class.getSimpleName()); + .createTempDirectory(eu.dnetlib.dhp.oa.graph.dump.PrepareResultProjectJobTest.class.getSimpleName()); log.info("using work dir {}", workingDir); SparkConf conf = new SparkConf(); @@ -54,10 +54,10 @@ public class PrepareResultProjectJobTest { conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); spark = SparkSession - .builder() - .appName(eu.dnetlib.dhp.oa.graph.dump.PrepareResultProjectJobTest.class.getSimpleName()) - .config(conf) - .getOrCreate(); + .builder() + .appName(eu.dnetlib.dhp.oa.graph.dump.PrepareResultProjectJobTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); } @AfterAll @@ -70,23 +70,23 @@ public class PrepareResultProjectJobTest { void testNoMatch() throws Exception { final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultProject/no_match") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultProject/no_match") + .getPath(); SparkPrepareResultProject.main(new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-outputPath", workingDir.toString() + "/preparedInfo", - "-sourcePath", sourcePath + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/preparedInfo", + "-sourcePath", sourcePath }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/preparedInfo") - .map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class)); + .textFile(workingDir.toString() + "/preparedInfo") + .map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class)); org.apache.spark.sql.Dataset verificationDataset = spark - .createDataset(tmp.rdd(), Encoders.bean(ResultProject.class)); + .createDataset(tmp.rdd(), Encoders.bean(ResultProject.class)); assertEquals(0, verificationDataset.count()); @@ -96,37 +96,37 @@ public class PrepareResultProjectJobTest { void testMatchOne() throws Exception { final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultProject/match_one") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultProject/match_one") + .getPath(); SparkPrepareResultProject.main(new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-outputPath", workingDir.toString() + "/preparedInfo", - "-sourcePath", sourcePath + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/preparedInfo", + "-sourcePath", sourcePath }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/preparedInfo") - .map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class)); + .textFile(workingDir.toString() + "/preparedInfo") + .map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class)); org.apache.spark.sql.Dataset verificationDataset = spark - .createDataset(tmp.rdd(), Encoders.bean(ResultProject.class)); + .createDataset(tmp.rdd(), Encoders.bean(ResultProject.class)); assertEquals(1, verificationDataset.count()); assertEquals( - 1, - verificationDataset.filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'").count()); + 1, + verificationDataset.filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'").count()); verificationDataset.createOrReplaceTempView("table"); Dataset check = spark - .sql( - "Select projList.provenance.provenance " + - "from table " + - "lateral view explode (projectsList) pl as projList"); + .sql( + "Select projList.provenance.provenance " + + "from table " + + "lateral view explode (projectsList) pl as projList"); assertEquals(1, check.filter("provenance = 'sysimport:crosswalk:entityregistry'").count()); @@ -138,88 +138,88 @@ public class PrepareResultProjectJobTest { void testMatch() throws Exception { final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultProject/match") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultProject/match") + .getPath(); SparkPrepareResultProject.main(new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-outputPath", workingDir.toString() + "/preparedInfo", - "-sourcePath", sourcePath + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/preparedInfo", + "-sourcePath", sourcePath }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/preparedInfo") - .map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class)); + .textFile(workingDir.toString() + "/preparedInfo") + .map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class)); org.apache.spark.sql.Dataset verificationDataset = spark - .createDataset(tmp.rdd(), Encoders.bean(ResultProject.class)); + .createDataset(tmp.rdd(), Encoders.bean(ResultProject.class)); assertEquals(2, verificationDataset.count()); assertEquals( - 1, - verificationDataset.filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'").count()); + 1, + verificationDataset.filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'").count()); assertEquals( - 1, - verificationDataset.filter("resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'").count()); + 1, + verificationDataset.filter("resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'").count()); verificationDataset.createOrReplaceTempView("dataset"); String query = "select resultId, MyT.id project , MyT.title title, MyT.acronym acronym , MyT.provenance.provenance provenance " - + "from dataset " - + "lateral view explode(projectsList) p as MyT "; + + "from dataset " + + "lateral view explode(projectsList) p as MyT "; org.apache.spark.sql.Dataset resultExplodedProvenance = spark.sql(query); assertEquals(3, resultExplodedProvenance.count()); assertEquals( - 2, - resultExplodedProvenance - .filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'") - .count()); + 2, + resultExplodedProvenance + .filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'") + .count()); assertEquals( - 1, - resultExplodedProvenance - .filter("resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'") - .count()); + 1, + resultExplodedProvenance + .filter("resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'") + .count()); assertEquals( - 2, - resultExplodedProvenance - .filter("project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6'") - .count()); + 2, + resultExplodedProvenance + .filter("project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6'") + .count()); assertEquals( - 1, - resultExplodedProvenance - .filter( - "project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6' and resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'") - .count()); + 1, + resultExplodedProvenance + .filter( + "project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6' and resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'") + .count()); assertEquals( - 1, - resultExplodedProvenance - .filter( - "project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6' and resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'") - .count()); + 1, + resultExplodedProvenance + .filter( + "project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6' and resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'") + .count()); assertEquals( - 1, - resultExplodedProvenance - .filter("project = '40|aka_________::03376222b28a3aebf2730ac514818d04'") - .count()); + 1, + resultExplodedProvenance + .filter("project = '40|aka_________::03376222b28a3aebf2730ac514818d04'") + .count()); assertEquals( - 1, - resultExplodedProvenance - .filter( - "project = '40|aka_________::03376222b28a3aebf2730ac514818d04' and resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'") - .count()); + 1, + resultExplodedProvenance + .filter( + "project = '40|aka_________::03376222b28a3aebf2730ac514818d04' and resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'") + .count()); assertEquals( - 3, resultExplodedProvenance.filter("provenance = 'sysimport:crosswalk:entityregistry'").count()); + 3, resultExplodedProvenance.filter("provenance = 'sysimport:crosswalk:entityregistry'").count()); } @@ -227,98 +227,98 @@ public class PrepareResultProjectJobTest { public void testMatchValidated() throws Exception { final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultProject/match_validatedRels") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultProject/match_validatedRels") + .getPath(); SparkPrepareResultProject.main(new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-outputPath", workingDir.toString() + "/preparedInfo", - "-sourcePath", sourcePath + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/preparedInfo", + "-sourcePath", sourcePath }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/preparedInfo") - .map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class)); + .textFile(workingDir.toString() + "/preparedInfo") + .map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class)); org.apache.spark.sql.Dataset verificationDataset = spark - .createDataset(tmp.rdd(), Encoders.bean(ResultProject.class)); + .createDataset(tmp.rdd(), Encoders.bean(ResultProject.class)); assertEquals(2, verificationDataset.count()); assertEquals( - 1, - verificationDataset.filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'").count()); + 1, + verificationDataset.filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'").count()); assertEquals( - 1, - verificationDataset.filter("resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'").count()); + 1, + verificationDataset.filter("resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'").count()); verificationDataset.createOrReplaceTempView("dataset"); String query = "select resultId, MyT.id project , MyT.title title, MyT.acronym acronym , MyT.provenance.provenance provenance, " - + - "MyT.validated.validatedByFunder, MyT.validated.validationDate " - + "from dataset " - + "lateral view explode(projectsList) p as MyT "; + + + "MyT.validated.validatedByFunder, MyT.validated.validationDate " + + "from dataset " + + "lateral view explode(projectsList) p as MyT "; org.apache.spark.sql.Dataset resultExplodedProvenance = spark.sql(query); assertEquals(3, resultExplodedProvenance.count()); assertEquals(3, resultExplodedProvenance.filter("validatedByFunder = true").count()); assertEquals( - 2, - resultExplodedProvenance - .filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'") - .count()); + 2, + resultExplodedProvenance + .filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'") + .count()); assertEquals( - 1, - resultExplodedProvenance - .filter("resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'") - .count()); + 1, + resultExplodedProvenance + .filter("resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'") + .count()); assertEquals( - 2, - resultExplodedProvenance - .filter("project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6'") - .count()); + 2, + resultExplodedProvenance + .filter("project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6'") + .count()); assertEquals( - 1, - resultExplodedProvenance - .filter( - "project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6' " + - "and resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb' " + - "and validatedByFunder = true " + - "and validationDate = '2021-08-06'") - .count()); + 1, + resultExplodedProvenance + .filter( + "project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6' " + + "and resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb' " + + "and validatedByFunder = true " + + "and validationDate = '2021-08-06'") + .count()); assertEquals( - 1, - resultExplodedProvenance - .filter( - "project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6' " + - "and resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80' " + - "and validatedByFunder = true and validationDate = '2021-08-04'") - .count()); + 1, + resultExplodedProvenance + .filter( + "project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6' " + + "and resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80' " + + "and validatedByFunder = true and validationDate = '2021-08-04'") + .count()); assertEquals( - 1, - resultExplodedProvenance - .filter("project = '40|aka_________::03376222b28a3aebf2730ac514818d04'") - .count()); + 1, + resultExplodedProvenance + .filter("project = '40|aka_________::03376222b28a3aebf2730ac514818d04'") + .count()); assertEquals( - 1, - resultExplodedProvenance - .filter( - "project = '40|aka_________::03376222b28a3aebf2730ac514818d04' " + - "and resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb' " + - "and validatedByFunder = true and validationDate = '2021-08-05'") - .count()); + 1, + resultExplodedProvenance + .filter( + "project = '40|aka_________::03376222b28a3aebf2730ac514818d04' " + + "and resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb' " + + "and validatedByFunder = true and validationDate = '2021-08-05'") + .count()); assertEquals( - 3, resultExplodedProvenance.filter("provenance = 'sysimport:crosswalk:entityregistry'").count()); + 3, resultExplodedProvenance.filter("provenance = 'sysimport:crosswalk:entityregistry'").count()); } @@ -326,20 +326,20 @@ public class PrepareResultProjectJobTest { void testMatchx() throws Exception { final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/match") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/match") + .getPath(); SparkPrepareResultProject.main(new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-outputPath", workingDir.toString() + "/preparedInfo", - "-sourcePath", sourcePath + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/preparedInfo", + "-sourcePath", sourcePath }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/preparedInfo") - .map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class)); + .textFile(workingDir.toString() + "/preparedInfo") + .map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class)); tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r))); } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/funderresult/ResultLinkedToProjectTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/funderresult/ResultLinkedToProjectTest.java index 579958a624..7046b02c22 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/funderresult/ResultLinkedToProjectTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/funderresult/ResultLinkedToProjectTest.java @@ -35,15 +35,15 @@ public class ResultLinkedToProjectTest { private static Path workingDir; private static final Logger log = LoggerFactory - .getLogger(eu.dnetlib.dhp.oa.graph.dump.funderresult.ResultLinkedToProjectTest.class); + .getLogger(eu.dnetlib.dhp.oa.graph.dump.funderresult.ResultLinkedToProjectTest.class); private static final HashMap map = new HashMap<>(); @BeforeAll public static void beforeAll() throws IOException { workingDir = Files - .createTempDirectory( - eu.dnetlib.dhp.oa.graph.dump.funderresult.ResultLinkedToProjectTest.class.getSimpleName()); + .createTempDirectory( + eu.dnetlib.dhp.oa.graph.dump.funderresult.ResultLinkedToProjectTest.class.getSimpleName()); log.info("using work dir {}", workingDir); SparkConf conf = new SparkConf(); @@ -57,10 +57,10 @@ public class ResultLinkedToProjectTest { conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); spark = SparkSession - .builder() - .appName(eu.dnetlib.dhp.oa.graph.dump.funderresult.ResultLinkedToProjectTest.class.getSimpleName()) - .config(conf) - .getOrCreate(); + .builder() + .appName(eu.dnetlib.dhp.oa.graph.dump.funderresult.ResultLinkedToProjectTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); } @AfterAll @@ -73,32 +73,32 @@ public class ResultLinkedToProjectTest { void testNoMatch() throws Exception { final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/nomatch/papers.json") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/nomatch/papers.json") + .getPath(); final String graphPath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/preparedInfo") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/preparedInfo") + .getPath(); final String communityMapPath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/communityMapPath") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/communityMapPath") + .getPath(); SparkResultLinkedToProject.main(new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-outputPath", workingDir.toString() + "/preparedInfo", - "-sourcePath", sourcePath, - "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication", - "-graphPath", graphPath, - "-communityMapPath", communityMapPath + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/preparedInfo", + "-sourcePath", sourcePath, + "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication", + "-graphPath", graphPath, + "-communityMapPath", communityMapPath }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/preparedInfo") - .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); + .textFile(workingDir.toString() + "/preparedInfo") + .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); Assertions.assertEquals(0, tmp.count()); @@ -108,32 +108,32 @@ public class ResultLinkedToProjectTest { void testMatchOne() throws Exception { final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/match/papers.json") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/match/papers.json") + .getPath(); final String graphPath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/preparedInfo") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/preparedInfo") + .getPath(); final String communityMapPath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/communityMapPath") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/communityMapPath") + .getPath(); SparkResultLinkedToProject.main(new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-outputPath", workingDir.toString() + "/preparedInfo", - "-sourcePath", sourcePath, - "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication", - "-graphPath", graphPath, - "-communityMapPath", communityMapPath + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/preparedInfo", + "-sourcePath", sourcePath, + "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication", + "-graphPath", graphPath, + "-communityMapPath", communityMapPath }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/preparedInfo") - .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); + .textFile(workingDir.toString() + "/preparedInfo") + .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); Assertions.assertEquals(1, tmp.count()); diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/funderresult/SplitPerFunderTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/funderresult/SplitPerFunderTest.java index 5bb5a11789..e9d739db28 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/funderresult/SplitPerFunderTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/funderresult/SplitPerFunderTest.java @@ -56,10 +56,10 @@ public class SplitPerFunderTest { conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); spark = SparkSession - .builder() - .appName(SplitPerFunderTest.class.getSimpleName()) - .config(conf) - .getOrCreate(); + .builder() + .appName(SplitPerFunderTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); } @AfterAll @@ -72,13 +72,13 @@ public class SplitPerFunderTest { void test1() throws Exception { final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/ext") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/ext") + .getPath(); SparkDumpFunderResults.main(new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-outputPath", workingDir.toString() + "/split", - "-sourcePath", sourcePath + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/split", + "-sourcePath", sourcePath }); @@ -86,64 +86,64 @@ public class SplitPerFunderTest { // FP7 3 and H2020 3 JavaRDD tmp = sc - .textFile(workingDir.toString() + "/split/EC_FP7") - .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); + .textFile(workingDir.toString() + "/split/EC_FP7") + .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); org.apache.spark.sql.Dataset verificationDataset = spark - .createDataset(tmp.rdd(), Encoders.bean(CommunityResult.class)); + .createDataset(tmp.rdd(), Encoders.bean(CommunityResult.class)); Assertions.assertEquals(3, verificationDataset.count()); Assertions - .assertEquals( - 1, verificationDataset.filter("id = '50|dedup_wf_001::0d16b1714ab3077df73893a8ea57d776'").count()); + .assertEquals( + 1, verificationDataset.filter("id = '50|dedup_wf_001::0d16b1714ab3077df73893a8ea57d776'").count()); // CIHR 2 tmp = sc - .textFile(workingDir.toString() + "/split/CIHR") - .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); + .textFile(workingDir.toString() + "/split/CIHR") + .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); Assertions.assertEquals(2, tmp.count()); // NWO 1 tmp = sc - .textFile(workingDir.toString() + "/split/NWO") - .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); + .textFile(workingDir.toString() + "/split/NWO") + .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); Assertions.assertEquals(1, tmp.count()); // NIH 3 tmp = sc - .textFile(workingDir.toString() + "/split/NIH") - .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); + .textFile(workingDir.toString() + "/split/NIH") + .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); Assertions.assertEquals(2, tmp.count()); // NSF 1 tmp = sc - .textFile(workingDir.toString() + "/split/NSF") - .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); + .textFile(workingDir.toString() + "/split/NSF") + .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); Assertions.assertEquals(1, tmp.count()); // SNSF 1 tmp = sc - .textFile(workingDir.toString() + "/split/SNSF") - .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); + .textFile(workingDir.toString() + "/split/SNSF") + .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); Assertions.assertEquals(1, tmp.count()); // NHMRC 1 tmp = sc - .textFile(workingDir.toString() + "/split/NHMRC") - .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); + .textFile(workingDir.toString() + "/split/NHMRC") + .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); Assertions.assertEquals(1, tmp.count()); // H2020 3 tmp = sc - .textFile(workingDir.toString() + "/split/EC_H2020") - .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); + .textFile(workingDir.toString() + "/split/EC_H2020") + .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); Assertions.assertEquals(3, tmp.count()); // MZOS 1 tmp = sc - .textFile(workingDir.toString() + "/split/MZOS") - .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); + .textFile(workingDir.toString() + "/split/MZOS") + .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); Assertions.assertEquals(1, tmp.count()); } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectSubsetTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectSubsetTest.java index 4ff1e1b968..8cdb285e4b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectSubsetTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectSubsetTest.java @@ -30,7 +30,6 @@ public class ProjectSubsetTest { private static final Logger log = LoggerFactory .getLogger(eu.dnetlib.dhp.oa.graph.dump.projectssubset.ProjectSubsetTest.class); - @BeforeAll public static void beforeAll() throws IOException { workingDir = Files