From 5744a64478aae8e219b863db4ba08d6930bbfc00 Mon Sep 17 00:00:00 2001
From: "sandro.labruzzo"
Date: Thu, 24 Oct 2019 16:00:28 +0200
Subject: [PATCH] added module dhp-graph-mapper

---
 dhp-workflows/dhp-aggregation/pom.xml         |  2 +-
 dhp-workflows/dhp-graph-mapper/pom.xml        | 52 +++++++++++
 .../eu/dnetlib/dhp/graph/ProtoConverter.java  | 92 +++++++++++++++++++
 .../dhp/graph/SparkGraphImporterJob.java      | 44 +++++++++
 .../dhp/graph/SparkGraphImporterJobTest.java  | 12 +++
 dhp-workflows/pom.xml                         |  1 +
 pom.xml                                       |  1 -
 7 files changed, 202 insertions(+), 2 deletions(-)
 create mode 100644 dhp-workflows/dhp-graph-mapper/pom.xml
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoConverter.java
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImporterJobTest.java

diff --git a/dhp-workflows/dhp-aggregation/pom.xml b/dhp-workflows/dhp-aggregation/pom.xml
index 8cecaffbed..c327262ae5 100644
--- a/dhp-workflows/dhp-aggregation/pom.xml
+++ b/dhp-workflows/dhp-aggregation/pom.xml
@@ -8,7 +8,7 @@
         <artifactId>dhp-workflows</artifactId>
         <version>1.0.0-SNAPSHOT</version>
     </parent>
-    <artifactId>dhp-aggregations</artifactId>
+    <artifactId>dhp-aggregation</artifactId>
     <dependencies>
         <dependency>
             <groupId>org.apache.spark</groupId>
diff --git a/dhp-workflows/dhp-graph-mapper/pom.xml b/dhp-workflows/dhp-graph-mapper/pom.xml
new file mode 100644
index 0000000000..3af9b376ad
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>dhp-workflows</artifactId>
+        <groupId>eu.dnetlib.dhp</groupId>
+        <version>1.0.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>dhp-graph-mapper</artifactId>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.11</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.11</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>eu.dnetlib</groupId>
+            <artifactId>dnet-openaire-data-protos</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.googlecode.protobuf-java-format</groupId>
+            <artifactId>protobuf-java-format</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>eu.dnetlib.dhp</groupId>
+            <artifactId>dhp-common</artifactId>
+            <version>1.0.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>eu.dnetlib.dhp</groupId>
+            <artifactId>dhp-schemas</artifactId>
+            <version>1.0.0-SNAPSHOT</version>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoConverter.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoConverter.java
new file mode 100644
index 0000000000..9f22d7875e
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoConverter.java
@@ -0,0 +1,92 @@
+package eu.dnetlib.dhp.graph;
+
+import com.googlecode.protobuf.format.JsonFormat;
+import eu.dnetlib.data.proto.KindProtos;
+import eu.dnetlib.data.proto.OafProtos;
+import eu.dnetlib.dhp.schema.oaf.*;
+
+import java.io.Serializable;
+
+public class ProtoConverter implements Serializable {
+
+    public static Oaf convert(String s) {
+        try {
+            // Parse the JSON-serialized protobuf record into an Oaf builder,
+            // then dispatch on its kind: entity or relation.
+            final OafProtos.Oaf.Builder builder = OafProtos.Oaf.newBuilder();
+            JsonFormat.merge(s, builder);
+
+            if (builder.getKind() == KindProtos.Kind.entity) {
+                return convertEntity(builder);
+            } else {
+                return convertRelation(builder);
+            }
+        } catch (Throwable e) {
+            throw new RuntimeException(e);
+        }
+    }
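+
+    // NOTE: the convert/create methods below are stubs for now: each returns an
+    // empty schema object; the field-by-field mapping is still to be implemented.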
+
+    private static Relation convertRelation(OafProtos.Oaf.Builder oaf) {
+        return new Relation();
+    }
+
+    private static OafEntity convertEntity(OafProtos.Oaf.Builder oaf) {
+        // Entities come in four flavours; results are further specialized below.
+        switch (oaf.getEntity().getType()) {
+            case result:
+                return convertResult(oaf);
+            case project:
+                return convertProject(oaf);
+            case datasource:
+                return convertDataSource(oaf);
+            case organization:
+                return convertOrganization(oaf);
+            default:
+                throw new RuntimeException("received unknown type");
+        }
+    }
+
+    private static Organization convertOrganization(OafProtos.Oaf.Builder oaf) {
+        return new Organization();
+    }
+
+    private static Datasource convertDataSource(OafProtos.Oaf.Builder oaf) {
+        return new Datasource();
+    }
+
+    private static Project convertProject(OafProtos.Oaf.Builder oaf) {
+        return new Project();
+    }
+
+    private static Result convertResult(OafProtos.Oaf.Builder oaf) {
+        // Results are specialized further, according to the resulttype qualifier.
+        switch (oaf.getEntity().getResult().getMetadata().getResulttype().getClassid()) {
+            case "dataset":
+                return createDataset(oaf);
+            case "publication":
+                return createPublication(oaf);
+            case "software":
+                return createSoftware(oaf);
+            case "orp":
+                return createORP(oaf);
+            default:
+                throw new RuntimeException("received unknown type: " + oaf.getEntity().getResult().getMetadata().getResulttype().getClassid());
+        }
+    }
+
+    private static Software createSoftware(OafProtos.Oaf.Builder oaf) {
+        return new Software();
+    }
+
+    private static OtherResearchProducts createORP(OafProtos.Oaf.Builder oaf) {
+        return new OtherResearchProducts();
+    }
+
+    private static Publication createPublication(OafProtos.Oaf.Builder oaf) {
+        return new Publication();
+    }
+
+    private static Dataset createDataset(OafProtos.Oaf.Builder oaf) {
+        return new Dataset();
+    }
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java
new file mode 100644
index 0000000000..4ef046631b
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java
@@ -0,0 +1,44 @@
+package eu.dnetlib.dhp.graph;
+
+import eu.dnetlib.dhp.schema.oaf.Publication;
+import org.apache.hadoop.io.Text;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import scala.Tuple2;
+
+public class SparkGraphImporterJob {
+
+    public static void main(String[] args) throws Exception {
+
+        //TODO add argument parser
+//        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGraphImporterJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/graph_importer_parameters.json")));
+//        parser.parseArgument(args);
+
+        final SparkSession spark = SparkSession
+                .builder()
+                .appName("ImportGraph")
+                //TODO replace with: master(parser.get("master"))
+                .master("local[16]")
+                .getOrCreate();
+
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+        final JavaRDD<Tuple2<String, String>> inputRDD = sc
+                .sequenceFile("file:///home/sandro/part-m-02236", Text.class, Text.class)
+                .map(item -> new Tuple2<>(item._1.toString(), item._2.toString()));
+
+        // Keys are "@"-separated; only records whose third token is "body"
+        // carry the serialized payload worth converting.
+        final long totalPublication = inputRDD
+                .filter(s -> s._1().split("@")[2].equalsIgnoreCase("body"))
+                .map(Tuple2::_2)
+                .map(ProtoConverter::convert)
+                .filter(s -> s instanceof Publication)
+                .count();
+
+        System.out.println(totalPublication);
+    }
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImporterJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImporterJobTest.java
new file mode 100644
index 0000000000..1e561aa3eb
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImporterJobTest.java
@@ -0,0 +1,12 @@
+package eu.dnetlib.dhp.graph;
+
+import org.junit.Test;
+
+public class SparkGraphImporterJobTest {
+
+    @Test
+    public void testImport() throws Exception {
+        SparkGraphImporterJob.main(null);
+    }
+
+}
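A quick way to sanity-check the dispatch logic in ProtoConverter.convert without spinning up Spark is a plain unit test that feeds it a minimal JSON-serialized Oaf record. The sketch below is not part of the patch; the payload is a hypothetical minimal record, assuming the proto schema accepts this shape (the field path kind / entity.type / result.metadata.resulttype.classid mirrors the accessors used in ProtoConverter above):

    package eu.dnetlib.dhp.graph;

    import eu.dnetlib.dhp.schema.oaf.Oaf;
    import eu.dnetlib.dhp.schema.oaf.Publication;
    import org.junit.Test;

    import static org.junit.Assert.assertTrue;

    public class ProtoConverterTest {

        // Hypothetical minimal payload: a real record carries many more fields.
        private static final String MINIMAL_PUBLICATION_JSON =
                "{\"kind\": \"entity\", \"entity\": {\"type\": \"result\"," +
                " \"result\": {\"metadata\": {\"resulttype\": {\"classid\": \"publication\"}}}}}";

        @Test
        public void convertsPublicationBody() {
            final Oaf oaf = ProtoConverter.convert(MINIMAL_PUBLICATION_JSON);
            assertTrue(oaf instanceof Publication);
        }
    }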
diff --git a/dhp-workflows/pom.xml b/dhp-workflows/pom.xml
index bb6a53feb0..df6119d3c8 100644
--- a/dhp-workflows/pom.xml
+++ b/dhp-workflows/pom.xml
@@ -17,6 +17,7 @@
     <modules>
         <module>dhp-aggregation</module>
         <module>dhp-distcp</module>
+        <module>dhp-graph-mapper</module>
     </modules>
diff --git a/pom.xml b/pom.xml
index a91114b2ec..5e00ada157 100644
--- a/pom.xml
+++ b/pom.xml
@@ -24,7 +24,6 @@
     <modules>
         <module>dhp-schemas</module>
         <module>dhp-common</module>
         <module>dhp-workflows</module>
-        <module>dhp-applications</module>
    </modules>
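Since the job currently counts only publications, a natural smoke test of the converter is a breakdown over all entity types it produces. The sketch below is not part of the patch: the class name SparkGraphStatsJob is hypothetical, and it reuses the same hardcoded input path and key layout as SparkGraphImporterJob above; countByValue collects the tallies on the driver, so it is only suitable for small local checks:

    package eu.dnetlib.dhp.graph;

    import eu.dnetlib.dhp.schema.oaf.Oaf;
    import org.apache.hadoop.io.Text;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;
    import scala.Tuple2;

    public class SparkGraphStatsJob {

        public static void main(String[] args) throws Exception {
            final SparkSession spark = SparkSession
                    .builder()
                    .appName("GraphStats")
                    .master("local[16]")
                    .getOrCreate();

            final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

            // Same source and "body" filtering as SparkGraphImporterJob, but
            // count every concrete OAF class instead of publications only.
            final JavaRDD<Oaf> oafRdd = sc
                    .sequenceFile("file:///home/sandro/part-m-02236", Text.class, Text.class)
                    .map(item -> new Tuple2<>(item._1.toString(), item._2.toString()))
                    .filter(s -> s._1().split("@")[2].equalsIgnoreCase("body"))
                    .map(Tuple2::_2)
                    .map(ProtoConverter::convert);

            oafRdd.map(o -> o.getClass().getSimpleName())
                    .countByValue()
                    .forEach((clazz, count) -> System.out.println(clazz + ": " + count));
        }
    }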