diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
index 01f71a19f..d517cca00 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
@@ -27,8 +27,8 @@
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
-
 public class TransformationJobTest {
+
     @Mock
     LongAccumulator accumulator;
 
@@ -42,9 +42,8 @@ public class TransformationJobTest {
         testDir = Files.createTempDirectory("dhp-collection");
     }
 
-    @After
-    public void teadDown() throws IOException {
+    public void tearDown() throws IOException {
         FileUtils.deleteDirectory(testDir.toFile());
     }
 
@@ -90,11 +89,8 @@ public class TransformationJobTest {
                 "-rh", "",
                 "-ro", "",
                 "-rr", ""});
-
-
     }
-
     @Test
     public void tryLoadFolderOnCP() throws Exception {
         final String path = this.getClass().getResource("/eu/dnetlib/dhp/transform/mdstorenative").getFile();
@@ -102,7 +98,6 @@ public class TransformationJobTest {
 
         Path tempDirWithPrefix = Files.createTempDirectory("mdsotre_output");
 
-        System.out.println(tempDirWithPrefix.toFile().getAbsolutePath());
 
         Files.deleteIfExists(tempDirWithPrefix);
 
@@ -140,10 +135,6 @@ public class TransformationJobTest {
         Node node = document.selectSingleNode("//CODE/*[local-name()='stylesheet']");
 
         System.out.println(node.asXML());
-
-
-
     }
-
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java
new file mode 100644
index 000000000..ab19ff2b5
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java
@@ -0,0 +1,23 @@
+package eu.dnetlib.dhp.graph;
+
+import com.google.common.collect.Maps;
+import eu.dnetlib.dhp.schema.oaf.*;
+
+import java.util.Map;
+
+public class GraphMappingUtils {
+
+    public final static Map<String, Class> types = Maps.newHashMap();
+
+    static {
+        types.put("datasource", Datasource.class);
+        types.put("organization", Organization.class);
+        types.put("project", Project.class);
+        types.put("dataset", Dataset.class);
+        types.put("otherresearchproduct", OtherResearchProduct.class);
+        types.put("software", Software.class);
+        types.put("publication", Publication.class);
+        types.put("relation", Relation.class);
+    }
+
+}
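A possible variant, not part of the patch: the registry above could expose an immutable, generically typed map instead of a mutable raw one. This is a sketch only; the class name GraphMappingUtilsTyped is hypothetical, and it assumes every mapped entity class extends eu.dnetlib.dhp.schema.oaf.Oaf.

package eu.dnetlib.dhp.graph;

import com.google.common.collect.ImmutableMap;
import eu.dnetlib.dhp.schema.oaf.*;

import java.util.Map;

// Sketch only: a typed, immutable variant of the registry introduced above.
// Assumes all mapped entity classes extend the Oaf base class.
public class GraphMappingUtilsTyped {

    public static final Map<String, Class<? extends Oaf>> TYPES =
            ImmutableMap.<String, Class<? extends Oaf>>builder()
                    .put("datasource", Datasource.class)
                    .put("organization", Organization.class)
                    .put("project", Project.class)
                    .put("dataset", Dataset.class)
                    .put("otherresearchproduct", OtherResearchProduct.class)
                    .put("software", Software.class)
                    .put("publication", Publication.class)
                    .put("relation", Relation.class)
                    .build();
}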
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java
index f081b4ca9..ac1b0a860 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java
@@ -1,9 +1,7 @@
 package eu.dnetlib.dhp.graph;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Maps;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.oaf.*;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.io.Text;
@@ -13,8 +11,6 @@
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
 import scala.Tuple2;
-import java.util.Map;
-
 public class SparkGraphImporterJob {
 
     public static void main(String[] args) throws Exception {
@@ -27,8 +23,8 @@ public class SparkGraphImporterJob {
                 .master(parser.get("master"))
                 .getOrCreate();
         final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
-        final String inputPath = parser.get("input");
-        final String outputPath = parser.get("outputDir");
+        final String inputPath = parser.get("sourcePath");
+        final String outputPath = parser.get("targetPath");
 
         final String filter = parser.get("filter");
 
@@ -36,17 +32,7 @@ public class SparkGraphImporterJob {
         final JavaRDD<Tuple2<String, String>> inputRDD = sc.sequenceFile(inputPath, Text.class, Text.class)
                 .map(item -> new Tuple2<>(item._1.toString(), item._2.toString()));
 
-        final Map<String, Class> types = Maps.newHashMap();
-        types.put("datasource", Datasource.class);
-        types.put("organization", Organization.class);
-        types.put("project", Project.class);
-        types.put("dataset", Dataset.class);
-        types.put("otherresearchproduct", OtherResearchProduct.class);
-        types.put("software", Software.class);
-        types.put("publication", Publication.class);
-        types.put("relation", Relation.class);
-
-        types.forEach((name, clazz) -> {
+        GraphMappingUtils.types.forEach((name, clazz) -> {
             if (StringUtils.isNotBlank(filter) || filter.toLowerCase().contains(name)) {
                 spark.createDataset(inputRDD
                         .filter(s -> s._1().equals(clazz.getName()))
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json
index 3e0a45dbf..ca7283b75 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json
@@ -1,6 +1,6 @@
 [
   {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
-  {"paramName":"i", "paramLongName":"input", "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
-  {"paramName":"f", "paramLongName":"filter", "paramDescription": "csv of typology of dataframe to be generated", "paramRequired": false},
-  {"paramName":"o", "paramLongName":"outputDir", "paramDescription": "the path where store DataFrames on HDFS", "paramRequired": true}
+  {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
+  {"paramName":"f", "paramLongName":"filter", "paramDescription": "csv of typology of dataframe to be generated", "paramRequired": false},
+  {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path where store DataFrames on HDFS", "paramRequired": true}
 ]
\ No newline at end of file
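For reference, a minimal local invocation matching the renamed parameter spec above (-mt/--master, -s/--sourcePath, -t/--targetPath, -f/--filter). Sketch only: the wrapper class name and the two paths are placeholders, not part of the patch.

package eu.dnetlib.dhp.graph;

// Sketch only: drives SparkGraphImporterJob with the renamed parameters.
// The source and target paths below are placeholders.
public class SparkGraphImporterJobLocalRun {

    public static void main(String[] args) throws Exception {
        SparkGraphImporterJob.main(new String[] {
                "-mt", "local[*]",
                "-s", "/tmp/graph-input/sequence-file",   // placeholder source path
                "-t", "/tmp/graph-output/dataframes",     // placeholder target path
                "-f", "publication"});
    }
}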
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
index 6375e27a6..8d90af185 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
@@ -43,8 +43,8 @@
             <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
             <spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"</spark-opts>
             <arg>-mt</arg> <arg>yarn-cluster</arg>
-            <arg>--input</arg><arg>${sourcePath}</arg>
-            <arg>--outputDir</arg><arg>${targetPath}</arg>
+            <arg>--sourcePath</arg><arg>${sourcePath}</arg>
+            <arg>--targetPath</arg><arg>${targetPath}</arg>
             <arg>--filter</arg><arg>${filter}</arg>
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImportCounterTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImportCounterTest.java
new file mode 100644
index 000000000..a8e810d4f
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImportCounterTest.java
@@ -0,0 +1,31 @@
+package eu.dnetlib.dhp.graph;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import scala.Tuple2;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class SparkGraphImportCounterTest {
+
+    public static List<Tuple2<String, Long>> countEntities(final String inputPath) throws Exception {
+
+        final SparkSession spark = SparkSession
+                .builder()
+                .appName(SparkGraphImportCounterTest.class.getSimpleName())
+                .master("local[*]")
+                .getOrCreate();
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+        return GraphMappingUtils.types.entrySet()
+                .stream()
+                .map(entry -> {
+                    final Long count = spark.read().load(inputPath + "/" + entry.getKey()).as(Encoders.bean(entry.getValue())).count();
+                    return new Tuple2<>(entry.getKey(), count);
+                })
+                .collect(Collectors.toList());
+    }
+
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImporterJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImporterJobTest.java
index dd5468de4..c713e235e 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImporterJobTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImporterJobTest.java
@@ -1,15 +1,39 @@
 package eu.dnetlib.dhp.graph;
 
-import org.junit.Ignore;
-import org.junit.Test;
+import org.apache.commons.io.FileUtils;
+import org.junit.*;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 
 public class SparkGraphImporterJobTest {
 
+    private static final long MAX = 1000L;
+    private Path testDir;
+
+    @Before
+    public void setup() throws IOException {
+        testDir = Files.createTempDirectory(getClass().getSimpleName());
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        FileUtils.deleteDirectory(testDir.toFile());
+    }
+
     @Test
     @Ignore
     public void testImport() throws Exception {
-        SparkGraphImporterJob.main(new String[]{"-mt", "local[*]","-i", "/home/sandro/part-m-02236", "-o", "/tmp/dataframes", "-f", "publication"});
+        SparkGraphImporterJob.main(new String[] {
+                "-mt", "local[*]",
+                "-i", getClass().getResource("/eu/dnetlib/dhp/dhp-sample/part-m-00010").getPath(),
+                "-o", testDir.toString()});
+
+        SparkGraphImportCounterTest.countEntities(testDir.toString()).forEach(t -> {
+            System.out.println(t);
+            //Assert.assertEquals(String.format("mapped %s must be %s", t._1(), MAX), MAX, t._2().longValue());
+        });
     }
 }
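A sketch of how the commented-out count assertion in testImport could be enabled once a sample input with a known size per entity type is available. The class name, the target path, and the assumption that every type maps to exactly MAX records are hypothetical; MAX and the assertion message are taken from the test above.

package eu.dnetlib.dhp.graph;

import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

// Sketch only: enables the count check that is commented out in the ignored test above.
// Assumes each entity type in the sample dataset maps to exactly MAX records.
public class SparkGraphImportCountAssertionSketch {

    private static final long MAX = 1000L;                    // expected records per type, from the test above
    private static final String TARGET = "/tmp/dataframes";   // placeholder target path

    @Test
    @Ignore // enable once a sample dataset with MAX records per type is available
    public void testMappedEntityCounts() throws Exception {
        SparkGraphImportCounterTest.countEntities(TARGET).forEach(t ->
                Assert.assertEquals(
                        String.format("mapped %s must be %s", t._1(), MAX),
                        MAX, t._2().longValue()));
    }
}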