From 997e57d45b448aa718c0a81d321e8cee44df6c7e Mon Sep 17 00:00:00 2001 From: "sandro.labruzzo" Date: Wed, 30 Oct 2019 12:19:03 +0100 Subject: [PATCH] Added entity filter to spark class --- .../dhp/graph/SparkGraphImporterJob.java | 28 +++++++++++++------ .../dhp/graph/input_graph_parameters.json | 1 + .../dhp/graph/oozie_app/config-default.xml | 4 +++ .../dnetlib/dhp/graph/oozie_app/workflow.xml | 6 ++++ .../dhp/graph/SparkGraphImporterJobTest.java | 2 +- 5 files changed, 32 insertions(+), 9 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java index 9d8d384b6..625b34223 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java @@ -27,6 +27,11 @@ public class SparkGraphImporterJob { final String inputPath = parser.get("input"); final String outputPath = parser.get("outputDir"); + + final String filter = parser.get("filter"); + + + // Read the input file and convert it into RDD of serializable object final JavaRDD> inputRDD = sc.sequenceFile(inputPath, Text.class, Text.class) .map(item -> new Tuple2<>(item._1.toString(), item._2.toString())); @@ -44,15 +49,22 @@ public class SparkGraphImporterJob { final Encoder relationEncoder = Encoders.bean(Relation.class); - spark.createDataset(oafRdd.filter(s -> s instanceof Organization).map(s -> (Organization) s).rdd(), organizationEncoder).write().save(outputPath + "/organizations"); + if (filter == null|| filter.toLowerCase().contains("organization")) + spark.createDataset(oafRdd.filter(s -> s instanceof Organization).map(s -> (Organization) s).rdd(), organizationEncoder).write().save(outputPath + "/organizations"); + if (filter == null|| filter.toLowerCase().contains("project")) spark.createDataset(oafRdd.filter(s -> s instanceof Project).map(s -> (Project) s).rdd(), projectEncoder).write().save(outputPath + "/projects"); - spark.createDataset(oafRdd.filter(s -> s instanceof Datasource).map(s -> (Datasource) s).rdd(), datasourceEncoder).write().save(outputPath + "/datasources"); - spark.createDataset(oafRdd.filter(s -> s instanceof eu.dnetlib.dhp.schema.oaf.Dataset).map(s -> (eu.dnetlib.dhp.schema.oaf.Dataset) s).rdd(), datasetEncoder).write().save(outputPath + "/datasets"); + if (filter == null|| filter.toLowerCase().contains("datasource")) + spark.createDataset(oafRdd.filter(s -> s instanceof Datasource).map(s -> (Datasource) s).rdd(), datasourceEncoder).write().save(outputPath + "/datasources"); + if (filter == null|| filter.toLowerCase().contains("dataset")) + spark.createDataset(oafRdd.filter(s -> s instanceof eu.dnetlib.dhp.schema.oaf.Dataset).map(s -> (eu.dnetlib.dhp.schema.oaf.Dataset) s).rdd(), datasetEncoder).write().save(outputPath + "/datasets"); - spark.createDataset(oafRdd.filter(s -> s instanceof Publication).map(s -> (Publication) s).rdd(), publicationEncoder).write().save(outputPath + "/publications"); - spark.createDataset(oafRdd.filter(s -> s instanceof Software).map(s -> (Software) s).rdd(), softwareEncoder).write().save(outputPath + "/software"); - spark.createDataset(oafRdd.filter(s -> s instanceof OtherResearchProducts).map(s -> (OtherResearchProducts) s).rdd(), otherResearchProductsEncoder).write().save(outputPath + "/otherResearchProducts"); - - spark.createDataset(oafRdd.filter(s -> s instanceof Relation).map(s -> (Relation) s).rdd(), relationEncoder).write().save(outputPath + "/relations"); + if (filter == null|| filter.toLowerCase().contains("publication")) + spark.createDataset(oafRdd.filter(s -> s instanceof Publication).map(s -> (Publication) s).rdd(), publicationEncoder).write().save(outputPath + "/publications"); + if (filter == null|| filter.toLowerCase().contains("software")) + spark.createDataset(oafRdd.filter(s -> s instanceof Software).map(s -> (Software) s).rdd(), softwareEncoder).write().save(outputPath + "/software"); + if (filter == null|| filter.toLowerCase().contains("otherresearchproduct")) + spark.createDataset(oafRdd.filter(s -> s instanceof OtherResearchProducts).map(s -> (OtherResearchProducts) s).rdd(), otherResearchProductsEncoder).write().save(outputPath + "/otherResearchProducts"); + if (filter == null|| filter.toLowerCase().contains("relation")) + spark.createDataset(oafRdd.filter(s -> s instanceof Relation).map(s -> (Relation) s).rdd(), relationEncoder).write().save(outputPath + "/relations"); } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json index 50378b465..3e0a45dbf 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json @@ -1,5 +1,6 @@ [ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, {"paramName":"i", "paramLongName":"input", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}, + {"paramName":"f", "paramLongName":"filter", "paramDescription": "csv of typology of dataframe to be generated", "paramRequired": false}, {"paramName":"o", "paramLongName":"outputDir", "paramDescription": "the path where store DataFrames on HDFS", "paramRequired": true} ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml index 2e0ed9aee..b2b577712 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml @@ -15,4 +15,8 @@ oozie.action.sharelib.for.spark spark2 + + filter + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml index 08e52f62c..6375e27a6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml @@ -20,6 +20,10 @@ sparkExecutorCores number of cores used by single executor + + filter + csv string to filter the entities to generate + @@ -41,6 +45,8 @@ -mt yarn-cluster --input${sourcePath} --outputDir${targetPath} + --filter${filter} + diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImporterJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImporterJobTest.java index 5854aa519..ce5185b61 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImporterJobTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImporterJobTest.java @@ -8,7 +8,7 @@ public class SparkGraphImporterJobTest { @Test @Ignore public void testImport() throws Exception { - SparkGraphImporterJob.main(new String[]{"-mt", "local[*]","-i", "/home/sandro/part-m-02236", "-o", "/tmp/dataframes"}); + SparkGraphImporterJob.main(new String[]{"-mt", "local[*]","-i", "/home/sandro/part-m-02236", "-o", "/tmp/dataframes", "-f", "software,relation"}); } }