Added entity filter to Spark class

Sandro La Bruzzo 2019-10-30 12:19:03 +01:00
parent a336956708
commit 997e57d45b
5 changed files with 32 additions and 9 deletions

View File

@@ -27,6 +27,11 @@ public class SparkGraphImporterJob {
final String inputPath = parser.get("input");
final String outputPath = parser.get("outputDir");
final String filter = parser.get("filter");
// Read the input file and convert it into an RDD of serializable objects
final JavaRDD<Tuple2<String, String>> inputRDD = sc.sequenceFile(inputPath, Text.class, Text.class)
.map(item -> new Tuple2<>(item._1.toString(), item._2.toString()));
@@ -44,15 +49,22 @@ public class SparkGraphImporterJob {
final Encoder<Relation> relationEncoder = Encoders.bean(Relation.class);
spark.createDataset(oafRdd.filter(s -> s instanceof Organization).map(s -> (Organization) s).rdd(), organizationEncoder).write().save(outputPath + "/organizations");
if (filter == null || filter.toLowerCase().contains("organization"))
spark.createDataset(oafRdd.filter(s -> s instanceof Organization).map(s -> (Organization) s).rdd(), organizationEncoder).write().save(outputPath + "/organizations");
if (filter == null || filter.toLowerCase().contains("project"))
spark.createDataset(oafRdd.filter(s -> s instanceof Project).map(s -> (Project) s).rdd(), projectEncoder).write().save(outputPath + "/projects");
spark.createDataset(oafRdd.filter(s -> s instanceof Datasource).map(s -> (Datasource) s).rdd(), datasourceEncoder).write().save(outputPath + "/datasources");
spark.createDataset(oafRdd.filter(s -> s instanceof eu.dnetlib.dhp.schema.oaf.Dataset).map(s -> (eu.dnetlib.dhp.schema.oaf.Dataset) s).rdd(), datasetEncoder).write().save(outputPath + "/datasets");
if (filter == null || filter.toLowerCase().contains("datasource"))
spark.createDataset(oafRdd.filter(s -> s instanceof Datasource).map(s -> (Datasource) s).rdd(), datasourceEncoder).write().save(outputPath + "/datasources");
if (filter == null || filter.toLowerCase().contains("dataset"))
spark.createDataset(oafRdd.filter(s -> s instanceof eu.dnetlib.dhp.schema.oaf.Dataset).map(s -> (eu.dnetlib.dhp.schema.oaf.Dataset) s).rdd(), datasetEncoder).write().save(outputPath + "/datasets");
spark.createDataset(oafRdd.filter(s -> s instanceof Publication).map(s -> (Publication) s).rdd(), publicationEncoder).write().save(outputPath + "/publications");
spark.createDataset(oafRdd.filter(s -> s instanceof Software).map(s -> (Software) s).rdd(), softwareEncoder).write().save(outputPath + "/software");
spark.createDataset(oafRdd.filter(s -> s instanceof OtherResearchProducts).map(s -> (OtherResearchProducts) s).rdd(), otherResearchProductsEncoder).write().save(outputPath + "/otherResearchProducts");
spark.createDataset(oafRdd.filter(s -> s instanceof Relation).map(s -> (Relation) s).rdd(), relationEncoder).write().save(outputPath + "/relations");
if (filter == null || filter.toLowerCase().contains("publication"))
spark.createDataset(oafRdd.filter(s -> s instanceof Publication).map(s -> (Publication) s).rdd(), publicationEncoder).write().save(outputPath + "/publications");
if (filter == null || filter.toLowerCase().contains("software"))
spark.createDataset(oafRdd.filter(s -> s instanceof Software).map(s -> (Software) s).rdd(), softwareEncoder).write().save(outputPath + "/software");
if (filter == null || filter.toLowerCase().contains("otherresearchproduct"))
spark.createDataset(oafRdd.filter(s -> s instanceof OtherResearchProducts).map(s -> (OtherResearchProducts) s).rdd(), otherResearchProductsEncoder).write().save(outputPath + "/otherResearchProducts");
if (filter == null || filter.toLowerCase().contains("relation"))
spark.createDataset(oafRdd.filter(s -> s instanceof Relation).map(s -> (Relation) s).rdd(), relationEncoder).write().save(outputPath + "/relations");
}
}
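
The eight guarded writes above repeat one pattern: check the csv filter, then write the matching subset of oafRdd. A minimal sketch of how that pattern could be factored into a generic helper, assuming the Oaf base class from eu.dnetlib.dhp.schema.oaf and the same Spark APIs used above; the class and method names (EntityWriter, saveEntity) are illustrative, not part of this commit:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import eu.dnetlib.dhp.schema.oaf.Oaf;

public class EntityWriter {

    // Write the subset of rdd that is an instance of clazz to outputPath/dir,
    // unless a filter csv is present and does not mention token.
    // Unlike the committed guards, a blank filter is treated here like a
    // missing one (see the note after the Oozie configuration below).
    static <T extends Oaf> void saveEntity(final SparkSession spark, final JavaRDD<Oaf> rdd,
                                           final Class<T> clazz, final String token,
                                           final String dir, final String filter,
                                           final String outputPath) {
        if (filter != null && !filter.isEmpty() && !filter.toLowerCase().contains(token))
            return;
        spark.createDataset(rdd.filter(clazz::isInstance).map(clazz::cast).rdd(),
                Encoders.bean(clazz))
             .write()
             .save(outputPath + "/" + dir);
    }
}

Each guarded block would then collapse to a single call, e.g. saveEntity(spark, oafRdd, Organization.class, "organization", "organizations", filter, outputPath);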

View File

@@ -1,5 +1,6 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"i", "paramLongName":"input", "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
{"paramName":"f", "paramLongName":"filter", "paramDescription": "csv of typology of dataframe to be generated", "paramRequired": false},
{"paramName":"o", "paramLongName":"outputDir", "paramDescription": "the path where store DataFrames on HDFS", "paramRequired": true}
]
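
For reference, the filter value declared above is matched case-insensitively against the tokens hard-coded in the contains() checks of SparkGraphImporterJob, so any csv combination of the following works:

// Tokens recognized by the guards in SparkGraphImporterJob:
// organization, project, datasource, dataset, publication,
// software, otherresearchproduct, relation
final String filter = "software,relation"; // generate only the software and relation dataframes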

View File

@@ -15,4 +15,8 @@
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>filter</name>
<value></value>
</property>
</configuration>
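
One caveat about this empty default, assuming the empty <value></value> reaches the job as an empty string rather than null once Oozie substitutes ${filter}: an empty string fails every guard, so the job would write nothing. A small demonstration of the edge case:

final String filter = ""; // what an empty <value></value> would yield
// The guard used in SparkGraphImporterJob for each entity type:
final boolean writeOrganizations =
        filter == null || filter.toLowerCase().contains("organization");
// writeOrganizations is false: a blank (non-null) filter matches no token,
// so with this default every write would be skipped.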

View File

@@ -20,6 +20,10 @@
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>filter</name>
<description>comma-separated list of the entity types to generate</description>
</property>
</parameters>
<start to="MapGraphIntoDataFrame"/>
@@ -41,6 +45,8 @@
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--input</arg><arg>${sourcePath}</arg>
<arg>--outputDir</arg><arg>${targetPath}</arg>
<arg>--filter</arg><arg>${filter}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>

View File

@@ -8,7 +8,7 @@ public class SparkGraphImporterJobTest {
@Test
@Ignore
public void testImport() throws Exception {
SparkGraphImporterJob.main(new String[]{"-mt", "local[*]", "-i", "/home/sandro/part-m-02236", "-o", "/tmp/dataframes"});
SparkGraphImporterJob.main(new String[]{"-mt", "local[*]", "-i", "/home/sandro/part-m-02236", "-o", "/tmp/dataframes", "-f", "software,relation"});
}
}