From 7423577a082318d6d7648945243e237c9f847029 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Tue, 21 Jun 2022 14:51:38 +0200 Subject: [PATCH 1/4] [Graph DUMP] add code to produce the delta of new projects with respect to the previous delta/dump --- .../ProjectsSubsetSparkJob.java | 80 ++++++++ .../graph/dump/project_subset_parameters.json | 27 +++ .../oozie_app/config-default.xml | 30 +++ .../dump/projectsubset/oozie_app/workflow.xml | 171 ++++++++++++++++++ 4 files changed, 308 insertions(+) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectsSubsetSparkJob.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/project_subset_parameters.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectsSubsetSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectsSubsetSparkJob.java new file mode 100644 index 000000000..67da24185 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectsSubsetSparkJob.java @@ -0,0 +1,80 @@ +package eu.dnetlib.dhp.oa.graph.dump.projectssubset; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import java.io.Serializable; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.dump.oaf.community.Funder; +import eu.dnetlib.dhp.schema.dump.oaf.graph.Project; +import scala.Tuple2; +public class ProjectsSubsetSparkJob implements Serializable { + private static final Logger log = LoggerFactory.getLogger(ProjectsSubsetSparkJob.class); + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + ProjectsSubsetSparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump/project_subset_parameters.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + final String projectListPath = parser.get("projectListPath"); + log.info("projectListPath: {}", projectListPath); + SparkConf conf = new SparkConf(); + runWithSparkSession( + 
conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + getNewProjectList(spark, inputPath, outputPath, projectListPath); + }); + } + private static void getNewProjectList(SparkSession spark, String inputPath, String outputPath, + String projectListPath) { + Dataset projectList = spark.read().textFile(projectListPath); + // projectList.show(false); + Dataset projects; + projects = Utils.readPath(spark, inputPath, Project.class); + projects + .joinWith(projectList, projects.col("id").equalTo(projectList.col("value")), "left") + .map((MapFunction, Project>) t2 -> { + if (Optional.ofNullable(t2._2()).isPresent()) + return null; + return t2._1(); + }, Encoders.bean(Project.class)) + .filter(Objects::nonNull) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); + Utils + .readPath(spark, outputPath, Project.class) + .map((MapFunction) p -> p.getId(), Encoders.STRING()) + .write() + .mode(SaveMode.Append) + .option("compression", "gzip") + .text(projectListPath); + } +} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/project_subset_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/project_subset_parameters.json new file mode 100644 index 000000000..ed2313680 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/project_subset_parameters.json @@ -0,0 +1,27 @@ +[ + { + "paramName":"s", + "paramLongName":"sourcePath", + "paramDescription": "the path of the sequencial file to read", + "paramRequired": true + }, + { + "paramName": "out", + "paramLongName": "outputPath", + "paramDescription": "the path used to store temporary output files", + "paramRequired": true + }, + { + "paramName": "ssm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "true if the spark session is managed, false otherwise", + "paramRequired": false + }, + { + "paramName": "pl", + "paramLongName": "projectListPath", + "paramDescription": "the path of the association result projectlist", + "paramRequired": true + } +] + diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/oozie_app/config-default.xml new file mode 100644 index 000000000..d262cb6e0 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/oozie_app/config-default.xml @@ -0,0 +1,30 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + hiveJdbcUrl + jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000 + + + hiveDbName + openaire + + + oozie.launcher.mapreduce.user.classpath.first + true + + diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/oozie_app/workflow.xml new file mode 100644 index 000000000..619e3aa20 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/oozie_app/workflow.xml @@ -0,0 +1,171 @@ + + + + sourcePath + the source path + + + projectListPath + the path to the project list + + + outputPath + the output path + + + 
accessToken + the access token used for the deposition in Zenodo + + + connectionUrl + the connection url for Zenodo + + + metadata + the metadata associated to the deposition + + + depositionType + the type of deposition we want to perform. "new" for brand new deposition, "version" for a new version of a published deposition (in this case the concept record id must be provided), "upload" to upload content to an open deposition for which we already have the deposition id (in this case the deposition id should be provided) + + + conceptRecordId + for new version, the id of the record for the old deposition + + + depositionId + the depositionId of a deposition open that has to be added content + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + yarn + cluster + Dump table project + eu.dnetlib.dhp.oa.graph.dump.complete.SparkDumpEntitiesJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --sourcePath${sourcePath}/project + --resultTableNameeu.dnetlib.dhp.schema.oaf.Project + --outputPath${workingDir}/project + --communityMapPathnoneed + + + + + + + yarn + cluster + Dump table project + eu.dnetlib.dhp.oa.graph.dump.projectssubset.ProjectsSubsetSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --sourcePath${workingDir}/project + --outputPath${workingDir}/tar/project + --projectListPath${projectListPath} + + + + + + + eu.dnetlib.dhp.oa.graph.dump.MakeTar + --hdfsPath${outputPath} + --nameNode${nameNode} + --sourcePath${workingDir}/tar + + + + + + + eu.dnetlib.dhp.oa.graph.dump.SendToZenodoHDFS + --hdfsPath${outputPath} + --nameNode${nameNode} + --accessToken${accessToken} + --connectionUrl${connectionUrl} + --metadata${metadata} + --conceptRecordId${conceptRecordId} + --depositionType${depositionType} + 
--depositionId${depositionId} + + + + + + \ No newline at end of file From edddfc6c637d601823681c190305aa1105ee5834 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Tue, 21 Jun 2022 18:28:53 +0200 Subject: [PATCH 2/4] [DUMP DELTA PROJECTS] adding test and resource --- .../projectssubset/ProjectSubsetTest.java | 118 ++++++++++++++++++ .../graph/dump/projectsubset/allnew/projects | 12 ++ .../dump/projectsubset/matchOne/projects | 12 ++ .../dhp/oa/graph/dump/projectsubset/projectId | 28 +++++ 4 files changed, 170 insertions(+) create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectSubsetTest.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/allnew/projects create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/matchOne/projects create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/projectId diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectSubsetTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectSubsetTest.java new file mode 100644 index 000000000..d5b1ce776 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectSubsetTest.java @@ -0,0 +1,118 @@ +package eu.dnetlib.dhp.oa.graph.dump.projectssubset; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import org.apache.commons.io.FileUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkResultLinkedToProject; +import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult; +import eu.dnetlib.dhp.schema.dump.oaf.graph.Project; +public class ProjectSubsetTest { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static SparkSession spark; + private static Path workingDir; + private static final Logger log = LoggerFactory + .getLogger(eu.dnetlib.dhp.oa.graph.dump.projectssubset.ProjectSubsetTest.class); + private static final HashMap map = new HashMap<>(); + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files + .createTempDirectory( + eu.dnetlib.dhp.oa.graph.dump.projectssubset.ProjectSubsetTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + SparkConf conf = new SparkConf(); + conf.setAppName(eu.dnetlib.dhp.oa.graph.dump.projectssubset.ProjectSubsetTest.class.getSimpleName()); + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + spark = SparkSession + .builder() + 
.appName(eu.dnetlib.dhp.oa.graph.dump.projectssubset.ProjectSubsetTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + @Test + void testAllNew() throws Exception { + final String projectListPath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/projectsubset/projectId") + .getPath(); + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/projectsubset/allnew/projects") + .getPath(); + spark + .read() + .textFile(projectListPath) + .write() + .mode(SaveMode.Overwrite) + .text(workingDir.toString() + "/projectIds"); + ProjectsSubsetSparkJob.main(new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/projects", + "-sourcePath", sourcePath, + "-projectListPath", workingDir.toString() + "/projectIds" + }); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/projects") + .map(item -> OBJECT_MAPPER.readValue(item, Project.class)); + Assertions.assertEquals(12, tmp.count()); + Assertions.assertEquals(2, tmp.filter(p -> p.getId().substring(3, 15).equals("aka_________")).count()); + Assertions.assertEquals(2, tmp.filter(p -> p.getId().substring(3, 15).equals("anr_________")).count()); + Assertions.assertEquals(4, tmp.filter(p -> p.getId().substring(3, 15).equals("arc_________")).count()); + Assertions.assertEquals(3, tmp.filter(p -> p.getId().substring(3, 15).equals("conicytf____")).count()); + Assertions.assertEquals(1, tmp.filter(p -> p.getId().substring(3, 15).equals("corda_______")).count()); + Assertions.assertEquals(40, sc.textFile(workingDir.toString() + "/projectIds").count()); + } + @Test + void testMatchOne() throws Exception { + final String projectListPath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/projectsubset/projectId") + .getPath(); + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/projectsubset/matchOne/projects") + .getPath(); + spark + .read() + .textFile(projectListPath) + .write() + .mode(SaveMode.Overwrite) + .text(workingDir.toString() + "/projectIds"); + ProjectsSubsetSparkJob.main(new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/projects", + "-sourcePath", sourcePath, + "-projectListPath", workingDir.toString() + "/projectIds" + }); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/projects") + .map(item -> OBJECT_MAPPER.readValue(item, Project.class)); + Assertions.assertEquals(11, tmp.count()); + Assertions.assertEquals(2, tmp.filter(p -> p.getId().substring(3, 15).equals("aka_________")).count()); + Assertions.assertEquals(2, tmp.filter(p -> p.getId().substring(3, 15).equals("anr_________")).count()); + Assertions.assertEquals(4, tmp.filter(p -> p.getId().substring(3, 15).equals("arc_________")).count()); + Assertions.assertEquals(3, tmp.filter(p -> p.getId().substring(3, 15).equals("conicytf____")).count()); + Assertions.assertEquals(0, tmp.filter(p -> p.getId().substring(3, 15).equals("corda__h2020")).count()); + Assertions.assertEquals(39, sc.textFile(workingDir.toString() + "/projectIds").count()); + } +} \ No newline at end of file diff --git 
a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/allnew/projects b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/allnew/projects new file mode 100644 index 000000000..82d9bef76 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/allnew/projects @@ -0,0 +1,12 @@ +{"id":"40|aka_________::01bb7b48e29d732a1c7bc5150b9195c4","websiteurl":null,"code":"135027","acronym":null,"title":"Dynamic 3D resolution-enhanced low-coherence interferometric imaging / Consortium: Hi-Lo","startdate":null,"enddate":null,"callidentifier":"Fotoniikka ja modernit kuvantamismenetelmät LT","keywords":null,"openaccessmandateforpublications":false,"openaccessmandatefordataset":false,"subject":[],"funding":[{"shortName":"AKA","name":"Academy of Finland","jurisdiction":"FI","funding_stream":null}],"summary":null,"granted":null,"h2020programme":[]} +{"id":"40|aka_________::9d1af21dbd0f5bc719f71553d19a6b3a","websiteurl":null,"code":"316061","acronym":null,"title":"Finnish Imaging of Degenerative Shoulder Study (FIMAGE): A study on the prevalence of degenerative imaging changes of the shoulder and their relevance to clinical symptoms in the general population.","startdate":null,"enddate":null,"callidentifier":"Academy Project Funding TT","keywords":null,"openaccessmandateforpublications":false,"openaccessmandatefordataset":false,"subject":[],"funding":[{"shortName":"AKA","name":"Academy of Finland","jurisdiction":"FI","funding_stream":null}],"summary":null,"granted":null,"h2020programme":[]} +{"id":"40|anr_________::1f21edc5c902be305ee47148955c6e50","websiteurl":null,"code":"ANR-17-CE05-0033","acronym":"MOISE","title":"METAL OXIDES AS LOW LOADED NANO-IRIDIUM SUPPORT FOR COMPETITIVE WATER ELECTROLYSIS","startdate":null,"enddate":null,"callidentifier":null,"keywords":null,"openaccessmandateforpublications":false,"openaccessmandatefordataset":false,"subject":[],"funding":[{"shortName":"ANR","name":"French National Research Agency (ANR)","jurisdiction":"FR","funding_stream":null}],"summary":null,"granted":null,"h2020programme":[]} +{"id":"40|anr_________::547e78ffdcb7d72a1ef31058dede3a33","websiteurl":null,"code":"ANR-09-SEGI-0005","acronym":"GALAXY","title":"DEVELOPPEMENT COLLABORATIF DE SYSTEMES COMPLEXES SELON UNE APPROCHE GUIDEE PAR LES MODELES","startdate":null,"enddate":null,"callidentifier":null,"keywords":null,"openaccessmandateforpublications":false,"openaccessmandatefordataset":false,"subject":[],"funding":[{"shortName":"ANR","name":"French National Research Agency (ANR)","jurisdiction":"FR","funding_stream":null}],"summary":null,"granted":null,"h2020programme":[]} +{"id":"40|arc_________::838e781a8d479e27a11101421fd8b296","websiteurl":"http://purl.org/au-research/grants/arc/LE0347462","code":"LE0347462","acronym":null,"title":"Femtosecond laser micromachining facility","startdate":"2003-01-01","enddate":"2003-12-31","callidentifier":null,"keywords":"biomedical nanostructures,femtosecond laser machining,laser manufacturing,laser micromachining,microphotonics,photonic bandgap structures","openaccessmandateforpublications":false,"openaccessmandatefordataset":false,"subject":[],"funding":[{"shortName":"ARC","name":"Australian Research Council (ARC)","jurisdiction":"AU","funding_stream":{"id":"ARC::Linkage Infrastructure, Equipment and Facilities","description":"Linkage Infrastructure, Equipment and Facilities"}}],"summary":null,"granted":null,"h2020programme":[]} 
+{"id":"40|arc_________::a461f180f7b6700c0499d4d3d53e58c7","websiteurl":"http://purl.org/au-research/grants/arc/LP140100567","code":"LP140100567","acronym":null,"title":"Linkage Projects - Grant ID: LP140100567","startdate":"2014-01-01","enddate":"2017-12-31","callidentifier":null,"keywords":"EDUCATIONAL MEASUREMENT; EDUCATIONAL MEASUREMENT; HIGH-STAKES TESTING; HIGH-STAKES TESTING; PERFORMANCE ASSESSMENT; PERFORMANCE ASSESSMENT; PERFORMANCE ASSESSMENT","openaccessmandateforpublications":false,"openaccessmandatefordataset":false,"subject":[],"funding":[{"shortName":"ARC","name":"Australian Research Council (ARC)","jurisdiction":"AU","funding_stream":{"id":"ARC::Linkage Projects","description":"Linkage Projects"}}],"summary":null,"granted":null,"h2020programme":[]} +{"id":"40|arc_________::b46b9e07d4cea67ccf497520a75ad0c8","websiteurl":"http://purl.org/au-research/grants/arc/DP180101235","code":"DP180101235","acronym":null,"title":"Discovery Projects - Grant ID: DP180101235","startdate":"2018-01-01","enddate":"2023-12-31","callidentifier":null,"keywords":null,"openaccessmandateforpublications":false,"openaccessmandatefordataset":false,"subject":[],"funding":[{"shortName":"ARC","name":"Australian Research Council (ARC)","jurisdiction":"AU","funding_stream":{"id":"ARC::Discovery Projects","description":"Discovery Projects"}}],"summary":null,"granted":null,"h2020programme":[]} +{"id":"40|arc_________::c5f86314ce288f91a7f31c219b128fab","websiteurl":"http://purl.org/au-research/grants/arc/LE0989831","code":"LE0989831","acronym":null,"title":"The Australian Music Navigator: research infrastructure for discovering, accessing and analysing Australia's musical landscape","startdate":"2009-01-01","enddate":"2009-12-31","callidentifier":null,"keywords":"database metadata,digital sound,electroacoustic music,film music,music,music information retrieval","openaccessmandateforpublications":false,"openaccessmandatefordataset":false,"subject":[],"funding":[{"shortName":"ARC","name":"Australian Research Council (ARC)","jurisdiction":"AU","funding_stream":{"id":"ARC::Linkage Infrastructure, Equipment and Facilities","description":"Linkage Infrastructure, Equipment and Facilities"}}],"summary":null,"granted":null,"h2020programme":[]} +{"id":"40|conicytf____::05539f3427ad605d7c1de0168f3e337f","websiteurl":"http://repositorio.conicyt.cl/handle/10533/183109","code":"3120023","acronym":null,"title":"SYNTHESIS AND STRUCTURE-ACTIVITY RELATIONSHIPS OF HETEROARYLISOQUINOLINE- AND PHENANTHRIDINEQUINONES AS ANTITUMOR AGENTS","startdate":"2011-01-01","enddate":"2014-01-28","callidentifier":null,"keywords":null,"openaccessmandateforpublications":false,"openaccessmandatefordataset":false,"subject":[],"funding":[{"shortName":"CONICYT","name":"Comisión Nacional de Investigación Científica y Tecnológica","jurisdiction":"CL","funding_stream":{"id":"CONICYT::FONDECYT::POSTDOCTORADO","description":"Fondecyt fundings - Fondecyt stream, POSTDOCTORADO"}}],"summary":null,"granted":null,"h2020programme":[]} +{"id":"40|conicytf____::96b47b91a6c061e31f626612b1650c03","websiteurl":"http://repositorio.conicyt.cl/handle/10533/163340","code":"1040240","acronym":null,"title":"ESTUDIO TEORICO-EXPERIMENTAL DE LA PERMEACION DE FLUIDOS SUPERCRITICOS Y LA SEPARACION DE MEZCLAS A ALTA PRESION A TRAVES DE MEMBRANAS 
MICROPOROSAS.","startdate":"2004-01-15","enddate":"2007-01-15","callidentifier":null,"keywords":null,"openaccessmandateforpublications":false,"openaccessmandatefordataset":false,"subject":[],"funding":[{"shortName":"CONICYT","name":"Comisión Nacional de Investigación Científica y Tecnológica","jurisdiction":"CL","funding_stream":{"id":"CONICYT::FONDECYT::REGULAR","description":"Fondecyt fundings - Fondecyt stream, REGULAR"}}],"summary":null,"granted":null,"h2020programme":[]} +{"id":"40|conicytf____::b122147e0a13f34cdb6311a9d714f9a5","websiteurl":"http://repositorio.conicyt.cl/handle/10533/162452","code":"1020683","acronym":null,"title":"SINTESIS Y CARACTERIZACION DE SALES CUATERNARIAS CON EL ANION CALCOFOSFATO [P2Qy]4- (Q=S,Se;y=6,7) PROPIEDADES FISICAS Y REACCIONES DE INCLUSION.","startdate":"2002-01-15","enddate":"2006-01-15","callidentifier":null,"keywords":null,"openaccessmandateforpublications":false,"openaccessmandatefordataset":false,"subject":[],"funding":[{"shortName":"CONICYT","name":"Comisión Nacional de Investigación Científica y Tecnológica","jurisdiction":"CL","funding_stream":{"id":"CONICYT::FONDECYT::REGULAR","description":"Fondecyt fundings - Fondecyt stream, REGULAR"}}],"summary":null,"granted":null,"h2020programme":[]} +{"id":"40|corda_______::132bac68f17bb81c451d9071be6e4d6d","websiteurl":null,"code":"628405","acronym":"ANIM","title":"Precisely Defined, Surface-Engineered Nanostructures via Crystallization-Driven Self-Assembly of Linear-Dendritic Block Copolymers","startdate":"2014-05-01","enddate":"2016-04-30","callidentifier":"FP7-PEOPLE-2013-IIF","keywords":null,"openaccessmandateforpublications":false,"openaccessmandatefordataset":false,"subject":[],"funding":[{"shortName":"EC","name":"European Commission","jurisdiction":"EU","funding_stream":{"id":"EC::FP7::SP3::PEOPLE","description":"SEVENTH FRAMEWORK PROGRAMME - SP3-People - Marie-Curie Actions"}}],"summary":null,"granted":null,"h2020programme":[]} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/matchOne/projects b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/matchOne/projects new file mode 100644 index 000000000..93f6c0266 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/matchOne/projects @@ -0,0 +1,12 @@ +{"id":"40|aka_________::01bb7b48e29d732a1c7bc5150b9195c4","websiteurl":null,"code":"135027","acronym":null,"title":"Dynamic 3D resolution-enhanced low-coherence interferometric imaging / Consortium: Hi-Lo","startdate":null,"enddate":null,"callidentifier":"Fotoniikka ja modernit kuvantamismenetelmät LT","keywords":null,"openaccessmandateforpublications":false,"openaccessmandatefordataset":false,"subject":[],"funding":[{"shortName":"AKA","name":"Academy of Finland","jurisdiction":"FI","funding_stream":null}],"summary":null,"granted":null,"h2020programme":[]} +{"id":"40|aka_________::9d1af21dbd0f5bc719f71553d19a6b3a","websiteurl":null,"code":"316061","acronym":null,"title":"Finnish Imaging of Degenerative Shoulder Study (FIMAGE): A study on the prevalence of degenerative imaging changes of the shoulder and their relevance to clinical symptoms in the general population.","startdate":null,"enddate":null,"callidentifier":"Academy Project Funding TT","keywords":null,"openaccessmandateforpublications":false,"openaccessmandatefordataset":false,"subject":[],"funding":[{"shortName":"AKA","name":"Academy of 
Finland","jurisdiction":"FI","funding_stream":null}],"summary":null,"granted":null,"h2020programme":[]} +{"id":"40|anr_________::1f21edc5c902be305ee47148955c6e50","websiteurl":null,"code":"ANR-17-CE05-0033","acronym":"MOISE","title":"METAL OXIDES AS LOW LOADED NANO-IRIDIUM SUPPORT FOR COMPETITIVE WATER ELECTROLYSIS","startdate":null,"enddate":null,"callidentifier":null,"keywords":null,"openaccessmandateforpublications":false,"openaccessmandatefordataset":false,"subject":[],"funding":[{"shortName":"ANR","name":"French National Research Agency (ANR)","jurisdiction":"FR","funding_stream":null}],"summary":null,"granted":null,"h2020programme":[]} +{"id":"40|anr_________::547e78ffdcb7d72a1ef31058dede3a33","websiteurl":null,"code":"ANR-09-SEGI-0005","acronym":"GALAXY","title":"DEVELOPPEMENT COLLABORATIF DE SYSTEMES COMPLEXES SELON UNE APPROCHE GUIDEE PAR LES MODELES","startdate":null,"enddate":null,"callidentifier":null,"keywords":null,"openaccessmandateforpublications":false,"openaccessmandatefordataset":false,"subject":[],"funding":[{"shortName":"ANR","name":"French National Research Agency (ANR)","jurisdiction":"FR","funding_stream":null}],"summary":null,"granted":null,"h2020programme":[]} +{"id":"40|arc_________::838e781a8d479e27a11101421fd8b296","websiteurl":"http://purl.org/au-research/grants/arc/LE0347462","code":"LE0347462","acronym":null,"title":"Femtosecond laser micromachining facility","startdate":"2003-01-01","enddate":"2003-12-31","callidentifier":null,"keywords":"biomedical nanostructures,femtosecond laser machining,laser manufacturing,laser micromachining,microphotonics,photonic bandgap structures","openaccessmandateforpublications":false,"openaccessmandatefordataset":false,"subject":[],"funding":[{"shortName":"ARC","name":"Australian Research Council (ARC)","jurisdiction":"AU","funding_stream":{"id":"ARC::Linkage Infrastructure, Equipment and Facilities","description":"Linkage Infrastructure, Equipment and Facilities"}}],"summary":null,"granted":null,"h2020programme":[]} +{"id":"40|arc_________::a461f180f7b6700c0499d4d3d53e58c7","websiteurl":"http://purl.org/au-research/grants/arc/LP140100567","code":"LP140100567","acronym":null,"title":"Linkage Projects - Grant ID: LP140100567","startdate":"2014-01-01","enddate":"2017-12-31","callidentifier":null,"keywords":"EDUCATIONAL MEASUREMENT; EDUCATIONAL MEASUREMENT; HIGH-STAKES TESTING; HIGH-STAKES TESTING; PERFORMANCE ASSESSMENT; PERFORMANCE ASSESSMENT; PERFORMANCE ASSESSMENT","openaccessmandateforpublications":false,"openaccessmandatefordataset":false,"subject":[],"funding":[{"shortName":"ARC","name":"Australian Research Council (ARC)","jurisdiction":"AU","funding_stream":{"id":"ARC::Linkage Projects","description":"Linkage Projects"}}],"summary":null,"granted":null,"h2020programme":[]} +{"id":"40|arc_________::b46b9e07d4cea67ccf497520a75ad0c8","websiteurl":"http://purl.org/au-research/grants/arc/DP180101235","code":"DP180101235","acronym":null,"title":"Discovery Projects - Grant ID: DP180101235","startdate":"2018-01-01","enddate":"2023-12-31","callidentifier":null,"keywords":null,"openaccessmandateforpublications":false,"openaccessmandatefordataset":false,"subject":[],"funding":[{"shortName":"ARC","name":"Australian Research Council (ARC)","jurisdiction":"AU","funding_stream":{"id":"ARC::Discovery Projects","description":"Discovery Projects"}}],"summary":null,"granted":null,"h2020programme":[]} 
+{"id":"40|arc_________::c5f86314ce288f91a7f31c219b128fab","websiteurl":"http://purl.org/au-research/grants/arc/LE0989831","code":"LE0989831","acronym":null,"title":"The Australian Music Navigator: research infrastructure for discovering, accessing and analysing Australia's musical landscape","startdate":"2009-01-01","enddate":"2009-12-31","callidentifier":null,"keywords":"database metadata,digital sound,electroacoustic music,film music,music,music information retrieval","openaccessmandateforpublications":false,"openaccessmandatefordataset":false,"subject":[],"funding":[{"shortName":"ARC","name":"Australian Research Council (ARC)","jurisdiction":"AU","funding_stream":{"id":"ARC::Linkage Infrastructure, Equipment and Facilities","description":"Linkage Infrastructure, Equipment and Facilities"}}],"summary":null,"granted":null,"h2020programme":[]} +{"id":"40|conicytf____::05539f3427ad605d7c1de0168f3e337f","websiteurl":"http://repositorio.conicyt.cl/handle/10533/183109","code":"3120023","acronym":null,"title":"SYNTHESIS AND STRUCTURE-ACTIVITY RELATIONSHIPS OF HETEROARYLISOQUINOLINE- AND PHENANTHRIDINEQUINONES AS ANTITUMOR AGENTS","startdate":"2011-01-01","enddate":"2014-01-28","callidentifier":null,"keywords":null,"openaccessmandateforpublications":false,"openaccessmandatefordataset":false,"subject":[],"funding":[{"shortName":"CONICYT","name":"Comisión Nacional de Investigación Científica y Tecnológica","jurisdiction":"CL","funding_stream":{"id":"CONICYT::FONDECYT::POSTDOCTORADO","description":"Fondecyt fundings - Fondecyt stream, POSTDOCTORADO"}}],"summary":null,"granted":null,"h2020programme":[]} +{"id":"40|conicytf____::96b47b91a6c061e31f626612b1650c03","websiteurl":"http://repositorio.conicyt.cl/handle/10533/163340","code":"1040240","acronym":null,"title":"ESTUDIO TEORICO-EXPERIMENTAL DE LA PERMEACION DE FLUIDOS SUPERCRITICOS Y LA SEPARACION DE MEZCLAS A ALTA PRESION A TRAVES DE MEMBRANAS MICROPOROSAS.","startdate":"2004-01-15","enddate":"2007-01-15","callidentifier":null,"keywords":null,"openaccessmandateforpublications":false,"openaccessmandatefordataset":false,"subject":[],"funding":[{"shortName":"CONICYT","name":"Comisión Nacional de Investigación Científica y Tecnológica","jurisdiction":"CL","funding_stream":{"id":"CONICYT::FONDECYT::REGULAR","description":"Fondecyt fundings - Fondecyt stream, REGULAR"}}],"summary":null,"granted":null,"h2020programme":[]} +{"id":"40|conicytf____::b122147e0a13f34cdb6311a9d714f9a5","websiteurl":"http://repositorio.conicyt.cl/handle/10533/162452","code":"1020683","acronym":null,"title":"SINTESIS Y CARACTERIZACION DE SALES CUATERNARIAS CON EL ANION CALCOFOSFATO [P2Qy]4- (Q=S,Se;y=6,7) PROPIEDADES FISICAS Y REACCIONES DE INCLUSION.","startdate":"2002-01-15","enddate":"2006-01-15","callidentifier":null,"keywords":null,"openaccessmandateforpublications":false,"openaccessmandatefordataset":false,"subject":[],"funding":[{"shortName":"CONICYT","name":"Comisión Nacional de Investigación Científica y Tecnológica","jurisdiction":"CL","funding_stream":{"id":"CONICYT::FONDECYT::REGULAR","description":"Fondecyt fundings - Fondecyt stream, REGULAR"}}],"summary":null,"granted":null,"h2020programme":[]} +{"id":"40|corda__h2020::bf5d35ec8d24ae4abfb4a1c6a0af3856","websiteurl":null,"code":"628405","acronym":"ANIM","title":"Precisely Defined, Surface-Engineered Nanostructures via Crystallization-Driven Self-Assembly of Linear-Dendritic Block 
Copolymers","startdate":"2014-05-01","enddate":"2016-04-30","callidentifier":"FP7-PEOPLE-2013-IIF","keywords":null,"openaccessmandateforpublications":false,"openaccessmandatefordataset":false,"subject":[],"funding":[{"shortName":"EC","name":"European Commission","jurisdiction":"EU","funding_stream":{"id":"EC::FP7::SP3::PEOPLE","description":"SEVENTH FRAMEWORK PROGRAMME - SP3-People - Marie-Curie Actions"}}],"summary":null,"granted":null,"h2020programme":[]} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/projectId b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/projectId new file mode 100644 index 000000000..9a7ea9085 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/projectId @@ -0,0 +1,28 @@ +40|nih_________::4c32cdbc4c9949853f02219fc4780a30 +40|nih_________::b485512ef116af73bee79d50c8f9ca01 +40|nih_________::b44d9bc8e99d9a0477ac06897e3e9c19 +40|nih_________::7d2d2b7d1644a722a6bbcb031d82fec6 +40|nsf_________::6b2674b0341e07b818a56c6f0daa2633 +40|nih_________::96bb39aecc8f7b9f3b02ed36ef09538b +40|nsf_________::88d92bdf20ec2fac3ed9740f962b4fad +40|nih_________::4bb8c14729a0082378bb04db8321ce14 +40|nih_________::08a8eed6c17c6d8e427afcfd29f87c7b +40|nsf_________::c314f3d35af1990121bf5b803937e112 +40|nih_________::3ad6a2e6ebd561206f0da69468337f50 +40|nih_________::d02c60c65a59629e69a30abcf2ceaed1 +40|nih_________::d5a241cc94253feb72181cde15f51e96 +40|nih_________::b5df718bbca69af50d4b7213e26af3f0 +40|nih_________::bc90893c1be80503578e48f6ef6b7061 +40|rcuk________::2c39b38c26c260b14a9816b88c91c132 +40|nih_________::ab103ad117cd0579df66f7592a7d4adf +40|nih_________::147aa6ad8bd201e2a02c7b6cc3f68348 +40|corda__h2020::bf5d35ec8d24ae4abfb4a1c6a0af3856 +40|nih_________::b8083208156f2764d07c736ba9b49dd2 +40|nih_________::f4d1e0aece0e6a9eff8d054c28e082db +40|nsf_________::56297da8b472a4be8ac3f09af813c9f6 +40|nsf_________::6b6dc3398eeebb3de1ab66e6eb8c5cb3 +40|nih_________::93289a36ebffb0bee3d6b01c6fc0a3d6 +40|nih_________::6c3b00dd4ae9d43d6630ff18f189ebae +40|nih_________::1d983a87768f13bc8377b1b7d17290a2 +40|nih_________::c3b56e91859b114644c1403e892eb80f +40|rcuk________::c1e15330fc7956063652f9c06e584548 \ No newline at end of file From 1d1fe3b151646e52a60d4c5f9fd70d313c506b2a Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 27 Jun 2022 18:04:59 +0200 Subject: [PATCH 3/4] [DUMP DELTA PROJECTS] refactoring --- .../projectssubset/ProjectSubsetTest.java | 204 +++++++++--------- 1 file changed, 106 insertions(+), 98 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectSubsetTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectSubsetTest.java index d5b1ce776..4ff1e1b96 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectSubsetTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectSubsetTest.java @@ -1,8 +1,11 @@ + package eu.dnetlib.dhp.oa.graph.dump.projectssubset; + import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.HashMap; + import org.apache.commons.io.FileUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; @@ -15,104 +18,109 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; + import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkResultLinkedToProject; -import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult; + import eu.dnetlib.dhp.schema.dump.oaf.graph.Project; + public class ProjectSubsetTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static SparkSession spark; - private static Path workingDir; - private static final Logger log = LoggerFactory - .getLogger(eu.dnetlib.dhp.oa.graph.dump.projectssubset.ProjectSubsetTest.class); - private static final HashMap map = new HashMap<>(); - @BeforeAll - public static void beforeAll() throws IOException { - workingDir = Files - .createTempDirectory( - eu.dnetlib.dhp.oa.graph.dump.projectssubset.ProjectSubsetTest.class.getSimpleName()); - log.info("using work dir {}", workingDir); - SparkConf conf = new SparkConf(); - conf.setAppName(eu.dnetlib.dhp.oa.graph.dump.projectssubset.ProjectSubsetTest.class.getSimpleName()); - conf.setMaster("local[*]"); - conf.set("spark.driver.host", "localhost"); - conf.set("hive.metastore.local", "true"); - conf.set("spark.ui.enabled", "false"); - conf.set("spark.sql.warehouse.dir", workingDir.toString()); - conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); - spark = SparkSession - .builder() - .appName(eu.dnetlib.dhp.oa.graph.dump.projectssubset.ProjectSubsetTest.class.getSimpleName()) - .config(conf) - .getOrCreate(); - } - @AfterAll - public static void afterAll() throws IOException { - FileUtils.deleteDirectory(workingDir.toFile()); - spark.stop(); - } - @Test - void testAllNew() throws Exception { - final String projectListPath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/projectsubset/projectId") - .getPath(); - final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/projectsubset/allnew/projects") - .getPath(); - spark - .read() - .textFile(projectListPath) - .write() - .mode(SaveMode.Overwrite) - .text(workingDir.toString() + "/projectIds"); - ProjectsSubsetSparkJob.main(new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-outputPath", workingDir.toString() + "/projects", - "-sourcePath", sourcePath, - "-projectListPath", workingDir.toString() + "/projectIds" - }); - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - JavaRDD tmp = sc - .textFile(workingDir.toString() + "/projects") - .map(item -> OBJECT_MAPPER.readValue(item, Project.class)); - Assertions.assertEquals(12, tmp.count()); - Assertions.assertEquals(2, tmp.filter(p -> p.getId().substring(3, 15).equals("aka_________")).count()); - Assertions.assertEquals(2, tmp.filter(p -> p.getId().substring(3, 15).equals("anr_________")).count()); - Assertions.assertEquals(4, tmp.filter(p -> p.getId().substring(3, 15).equals("arc_________")).count()); - Assertions.assertEquals(3, tmp.filter(p -> p.getId().substring(3, 15).equals("conicytf____")).count()); - Assertions.assertEquals(1, tmp.filter(p -> p.getId().substring(3, 15).equals("corda_______")).count()); - Assertions.assertEquals(40, sc.textFile(workingDir.toString() + "/projectIds").count()); - } - @Test - void testMatchOne() throws Exception { - final String projectListPath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/projectsubset/projectId") - .getPath(); - final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/projectsubset/matchOne/projects") - 
.getPath(); - spark - .read() - .textFile(projectListPath) - .write() - .mode(SaveMode.Overwrite) - .text(workingDir.toString() + "/projectIds"); - ProjectsSubsetSparkJob.main(new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-outputPath", workingDir.toString() + "/projects", - "-sourcePath", sourcePath, - "-projectListPath", workingDir.toString() + "/projectIds" - }); - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - JavaRDD tmp = sc - .textFile(workingDir.toString() + "/projects") - .map(item -> OBJECT_MAPPER.readValue(item, Project.class)); - Assertions.assertEquals(11, tmp.count()); - Assertions.assertEquals(2, tmp.filter(p -> p.getId().substring(3, 15).equals("aka_________")).count()); - Assertions.assertEquals(2, tmp.filter(p -> p.getId().substring(3, 15).equals("anr_________")).count()); - Assertions.assertEquals(4, tmp.filter(p -> p.getId().substring(3, 15).equals("arc_________")).count()); - Assertions.assertEquals(3, tmp.filter(p -> p.getId().substring(3, 15).equals("conicytf____")).count()); - Assertions.assertEquals(0, tmp.filter(p -> p.getId().substring(3, 15).equals("corda__h2020")).count()); - Assertions.assertEquals(39, sc.textFile(workingDir.toString() + "/projectIds").count()); - } -} \ No newline at end of file + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static SparkSession spark; + private static Path workingDir; + private static final Logger log = LoggerFactory + .getLogger(eu.dnetlib.dhp.oa.graph.dump.projectssubset.ProjectSubsetTest.class); + + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files + .createTempDirectory( + eu.dnetlib.dhp.oa.graph.dump.projectssubset.ProjectSubsetTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + SparkConf conf = new SparkConf(); + conf.setAppName(eu.dnetlib.dhp.oa.graph.dump.projectssubset.ProjectSubsetTest.class.getSimpleName()); + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + spark = SparkSession + .builder() + .appName(eu.dnetlib.dhp.oa.graph.dump.projectssubset.ProjectSubsetTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + void testAllNew() throws Exception { + final String projectListPath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/projectsubset/projectId") + .getPath(); + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/projectsubset/allnew/projects") + .getPath(); + spark + .read() + .textFile(projectListPath) + .write() + .mode(SaveMode.Overwrite) + .text(workingDir.toString() + "/projectIds"); + ProjectsSubsetSparkJob.main(new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/projects", + "-sourcePath", sourcePath, + "-projectListPath", workingDir.toString() + "/projectIds" + }); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/projects") + .map(item -> OBJECT_MAPPER.readValue(item, Project.class)); + Assertions.assertEquals(12, tmp.count()); + 
Assertions.assertEquals(2, tmp.filter(p -> p.getId().substring(3, 15).equals("aka_________")).count()); + Assertions.assertEquals(2, tmp.filter(p -> p.getId().substring(3, 15).equals("anr_________")).count()); + Assertions.assertEquals(4, tmp.filter(p -> p.getId().substring(3, 15).equals("arc_________")).count()); + Assertions.assertEquals(3, tmp.filter(p -> p.getId().substring(3, 15).equals("conicytf____")).count()); + Assertions.assertEquals(1, tmp.filter(p -> p.getId().substring(3, 15).equals("corda_______")).count()); + Assertions.assertEquals(40, sc.textFile(workingDir.toString() + "/projectIds").count()); + } + + @Test + void testMatchOne() throws Exception { + final String projectListPath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/projectsubset/projectId") + .getPath(); + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/projectsubset/matchOne/projects") + .getPath(); + spark + .read() + .textFile(projectListPath) + .write() + .mode(SaveMode.Overwrite) + .text(workingDir.toString() + "/projectIds"); + ProjectsSubsetSparkJob.main(new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/projects", + "-sourcePath", sourcePath, + "-projectListPath", workingDir.toString() + "/projectIds" + }); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/projects") + .map(item -> OBJECT_MAPPER.readValue(item, Project.class)); + Assertions.assertEquals(11, tmp.count()); + Assertions.assertEquals(2, tmp.filter(p -> p.getId().substring(3, 15).equals("aka_________")).count()); + Assertions.assertEquals(2, tmp.filter(p -> p.getId().substring(3, 15).equals("anr_________")).count()); + Assertions.assertEquals(4, tmp.filter(p -> p.getId().substring(3, 15).equals("arc_________")).count()); + Assertions.assertEquals(3, tmp.filter(p -> p.getId().substring(3, 15).equals("conicytf____")).count()); + Assertions.assertEquals(0, tmp.filter(p -> p.getId().substring(3, 15).equals("corda__h2020")).count()); + Assertions.assertEquals(39, sc.textFile(workingDir.toString() + "/projectIds").count()); + } +} From 71744a1f5274957a48e761a95c14370ec8f488a1 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 27 Jun 2022 18:07:58 +0200 Subject: [PATCH 4/4] [DUMP DELTA PROJECTS] refactoring --- .../ProjectsSubsetSparkJob.java | 121 +++++++++--------- 1 file changed, 60 insertions(+), 61 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectsSubsetSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectsSubsetSparkJob.java index 67da24185..83a9f3464 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectsSubsetSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectsSubsetSparkJob.java @@ -1,80 +1,79 @@ + package eu.dnetlib.dhp.oa.graph.dump.projectssubset; + import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; -import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; 
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; -import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.oa.graph.dump.Utils; -import eu.dnetlib.dhp.schema.dump.oaf.community.Funder; import eu.dnetlib.dhp.schema.dump.oaf.graph.Project; import scala.Tuple2; + public class ProjectsSubsetSparkJob implements Serializable { - private static final Logger log = LoggerFactory.getLogger(ProjectsSubsetSparkJob.class); - public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - ProjectsSubsetSparkJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/dump/project_subset_parameters.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String inputPath = parser.get("sourcePath"); - log.info("inputPath: {}", inputPath); - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); - final String projectListPath = parser.get("projectListPath"); - log.info("projectListPath: {}", projectListPath); - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - getNewProjectList(spark, inputPath, outputPath, projectListPath); - }); - } - private static void getNewProjectList(SparkSession spark, String inputPath, String outputPath, - String projectListPath) { - Dataset projectList = spark.read().textFile(projectListPath); - // projectList.show(false); - Dataset projects; - projects = Utils.readPath(spark, inputPath, Project.class); - projects - .joinWith(projectList, projects.col("id").equalTo(projectList.col("value")), "left") - .map((MapFunction, Project>) t2 -> { - if (Optional.ofNullable(t2._2()).isPresent()) - return null; - return t2._1(); - }, Encoders.bean(Project.class)) - .filter(Objects::nonNull) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath); - Utils - .readPath(spark, outputPath, Project.class) - .map((MapFunction) p -> p.getId(), Encoders.STRING()) - .write() - .mode(SaveMode.Append) - .option("compression", "gzip") - .text(projectListPath); - } -} \ No newline at end of file + private static final Logger log = LoggerFactory.getLogger(ProjectsSubsetSparkJob.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + ProjectsSubsetSparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump/project_subset_parameters.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + final String projectListPath = parser.get("projectListPath"); + 
log.info("projectListPath: {}", projectListPath); + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + getNewProjectList(spark, inputPath, outputPath, projectListPath); + }); + } + + private static void getNewProjectList(SparkSession spark, String inputPath, String outputPath, + String projectListPath) { + Dataset projectList = spark.read().textFile(projectListPath); + Dataset projects; + projects = Utils.readPath(spark, inputPath, Project.class); + projects + .joinWith(projectList, projects.col("id").equalTo(projectList.col("value")), "left") + .map((MapFunction, Project>) t2 -> { + if (Optional.ofNullable(t2._2()).isPresent()) + return null; + return t2._1(); + }, Encoders.bean(Project.class)) + .filter(Objects::nonNull) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); + Utils + .readPath(spark, outputPath, Project.class) + .map((MapFunction) p -> p.getId(), Encoders.STRING()) + .write() + .mode(SaveMode.Append) + .option("compression", "gzip") + .text(projectListPath); + } +}