2019-10-24 16:00:28 +02:00
|
|
|
package eu.dnetlib.dhp.graph;
|
|
|
|
|
2019-10-25 18:10:30 +02:00
|
|
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
|
|
|
import eu.dnetlib.dhp.schema.oaf.*;
|
|
|
|
import org.apache.commons.io.IOUtils;
|
|
|
|
import org.apache.commons.lang.StringUtils;
|
2019-10-24 16:00:28 +02:00
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
|
import org.apache.spark.api.java.JavaRDD;
|
|
|
|
import org.apache.spark.api.java.JavaSparkContext;
|
2019-10-25 09:24:18 +02:00
|
|
|
import org.apache.spark.sql.Encoder;
|
|
|
|
import org.apache.spark.sql.Encoders;
|
2019-10-24 16:00:28 +02:00
|
|
|
import org.apache.spark.sql.SparkSession;
|
|
|
|
import scala.Tuple2;
|
|
|
|
|
|
|
|
public class SparkGraphImporterJob {
|
|
|
|
|
2019-10-25 18:10:30 +02:00
|
|
|
public static void main(String[] args) throws Exception {
|
2019-10-24 16:00:28 +02:00
|
|
|
|
2019-10-25 18:10:30 +02:00
|
|
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGraphImporterJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_graph_parameters.json")));
|
|
|
|
parser.parseArgument(args);
|
2019-10-24 16:00:28 +02:00
|
|
|
final SparkSession spark = SparkSession
|
|
|
|
.builder()
|
|
|
|
.appName("ImportGraph")
|
2019-10-25 18:10:30 +02:00
|
|
|
.master(parser.get("master"))
|
2019-10-24 16:00:28 +02:00
|
|
|
.getOrCreate();
|
|
|
|
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
2019-10-25 18:10:30 +02:00
|
|
|
final String inputPath = parser.get("input");
|
|
|
|
final String outputPath = parser.get("outputDir");
|
2019-10-24 16:00:28 +02:00
|
|
|
|
2019-10-25 18:10:30 +02:00
|
|
|
// Read the input file and convert it into RDD of serializable object
|
|
|
|
final JavaRDD<Tuple2<String, String>> inputRDD = sc.sequenceFile(inputPath, Text.class, Text.class)
|
2019-10-25 09:45:12 +02:00
|
|
|
.map(item -> new Tuple2<>(item._1.toString(), item._2.toString()));
|
|
|
|
|
2019-10-25 18:10:30 +02:00
|
|
|
final JavaRDD<Oaf> oafRdd = inputRDD.filter(s -> !StringUtils.isBlank(s._2()) && !s._1().contains("@update")).map(Tuple2::_2).map(ProtoConverter::convert);
|
2019-10-25 11:58:20 +02:00
|
|
|
|
2019-10-25 18:10:30 +02:00
|
|
|
final Encoder<Organization> organizationEncoder = Encoders.bean(Organization.class);
|
|
|
|
final Encoder<Project> projectEncoder = Encoders.bean(Project.class);
|
|
|
|
final Encoder<Datasource> datasourceEncoder = Encoders.bean(Datasource.class);
|
2019-10-25 11:56:28 +02:00
|
|
|
|
2019-10-25 18:10:30 +02:00
|
|
|
final Encoder<eu.dnetlib.dhp.schema.oaf.Dataset> datasetEncoder = Encoders.bean(eu.dnetlib.dhp.schema.oaf.Dataset.class);
|
|
|
|
final Encoder<Publication> publicationEncoder = Encoders.bean(Publication.class);
|
|
|
|
final Encoder<Software> softwareEncoder = Encoders.bean(Software.class);
|
|
|
|
final Encoder<OtherResearchProducts> otherResearchProductsEncoder = Encoders.bean(OtherResearchProducts.class);
|
2019-10-25 11:56:28 +02:00
|
|
|
|
2019-10-25 18:10:30 +02:00
|
|
|
final Encoder<Relation> relationEncoder = Encoders.bean(Relation.class);
|
2019-10-24 16:00:28 +02:00
|
|
|
|
2019-10-25 18:10:30 +02:00
|
|
|
spark.createDataset(oafRdd.filter(s -> s instanceof Organization).map(s -> (Organization) s).rdd(), organizationEncoder).write().save(outputPath + "/organizations");
|
|
|
|
spark.createDataset(oafRdd.filter(s -> s instanceof Project).map(s -> (Project) s).rdd(), projectEncoder).write().save(outputPath + "/projects");
|
|
|
|
spark.createDataset(oafRdd.filter(s -> s instanceof Datasource).map(s -> (Datasource) s).rdd(), datasourceEncoder).write().save(outputPath + "/datasources");
|
|
|
|
spark.createDataset(oafRdd.filter(s -> s instanceof eu.dnetlib.dhp.schema.oaf.Dataset).map(s -> (eu.dnetlib.dhp.schema.oaf.Dataset) s).rdd(), datasetEncoder).write().save(outputPath + "/datasets");
|
2019-10-24 17:02:35 +02:00
|
|
|
|
2019-10-25 18:10:30 +02:00
|
|
|
spark.createDataset(oafRdd.filter(s -> s instanceof Publication).map(s -> (Publication) s).rdd(), publicationEncoder).write().save(outputPath + "/publications");
|
|
|
|
spark.createDataset(oafRdd.filter(s -> s instanceof Software).map(s -> (Software) s).rdd(), softwareEncoder).write().save(outputPath + "/software");
|
|
|
|
spark.createDataset(oafRdd.filter(s -> s instanceof OtherResearchProducts).map(s -> (OtherResearchProducts) s).rdd(), otherResearchProductsEncoder).write().save(outputPath + "/otherResearchProducts");
|
2019-10-24 17:02:35 +02:00
|
|
|
|
2019-10-25 18:10:30 +02:00
|
|
|
spark.createDataset(oafRdd.filter(s -> s instanceof Relation).map(s -> (Relation) s).rdd(), relationEncoder).write().save(outputPath + "/relations");
|
2019-10-24 16:00:28 +02:00
|
|
|
}
|
|
|
|
}
|