added module dhp-graph-mapper

Sandro La Bruzzo 2019-10-24 16:00:28 +02:00
parent ed14a40890
commit 5744a64478
7 changed files with 202 additions and 2 deletions

dhp-workflows/dhp-aggregation/pom.xml

@@ -8,7 +8,7 @@
         <artifactId>dhp-workflows</artifactId>
         <version>1.0.0-SNAPSHOT</version>
     </parent>
-    <artifactId>dhp-aggregations</artifactId>
+    <artifactId>dhp-aggregation</artifactId>
     <dependencies>
         <dependency>
             <groupId>org.apache.spark</groupId>

dhp-workflows/dhp-graph-mapper/pom.xml

@@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>dhp-workflows</artifactId>
        <groupId>eu.dnetlib.dhp</groupId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>dhp-graph-mapper</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>eu.dnetlib</groupId>
            <artifactId>dnet-openaire-data-protos</artifactId>
        </dependency>
        <dependency>
            <groupId>com.googlecode.protobuf-java-format</groupId>
            <artifactId>protobuf-java-format</artifactId>
        </dependency>
        <dependency>
            <groupId>eu.dnetlib.dhp</groupId>
            <artifactId>dhp-common</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>eu.dnetlib.dhp</groupId>
            <artifactId>dhp-schemas</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
</project>
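
The Spark and protobuf dependencies above omit <version> elements, so their versions must be pinned by a dependencyManagement section somewhere up the parent chain (dhp-workflows or above). A hedged sketch of that mechanism; the property name and version values below are illustrative placeholders, not taken from this repository:

    <!-- Sketch only: in a parent POM, dependencyManagement pins versions so that
         child modules such as dhp-graph-mapper can declare dependencies without them.
         Property name and versions here are placeholders. -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>${dhp.spark.version}</version>
            </dependency>
            <dependency>
                <groupId>com.googlecode.protobuf-java-format</groupId>
                <artifactId>protobuf-java-format</artifactId>
                <version>1.4</version>
            </dependency>
        </dependencies>
    </dependencyManagement>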

dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoConverter.java

@@ -0,0 +1,92 @@
package eu.dnetlib.dhp.graph;

import com.googlecode.protobuf.format.JsonFormat;
import eu.dnetlib.data.proto.KindProtos;
import eu.dnetlib.data.proto.OafProtos;
import eu.dnetlib.dhp.schema.oaf.*;

import java.io.Serializable;

public class ProtoConverter implements Serializable {

    public static Oaf convert(String s) {
        try {
            final OafProtos.Oaf.Builder builder = OafProtos.Oaf.newBuilder();
            JsonFormat.merge(s, builder);
            if (builder.getKind() == KindProtos.Kind.entity)
                return convertEntity(builder);
            else {
                return convertRelation(builder);
            }
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
    }

    private static Relation convertRelation(OafProtos.Oaf.Builder oaf) {
        return new Relation();
    }

    private static OafEntity convertEntity(OafProtos.Oaf.Builder oaf) {
        switch (oaf.getEntity().getType()) {
            case result:
                return convertResult(oaf);
            case project:
                return convertProject(oaf);
            case datasource:
                return convertDataSource(oaf);
            case organization:
                return convertOrganization(oaf);
            default:
                throw new RuntimeException("received unknown type");
        }
    }

    private static Organization convertOrganization(OafProtos.Oaf.Builder oaf) {
        return new Organization();
    }

    private static Datasource convertDataSource(OafProtos.Oaf.Builder oaf) {
        return new Datasource();
    }

    private static Project convertProject(OafProtos.Oaf.Builder oaf) {
        return new Project();
    }

    private static Result convertResult(OafProtos.Oaf.Builder oaf) {
        switch (oaf.getEntity().getResult().getMetadata().getResulttype().getClassid()) {
            case "dataset":
                return createDataset(oaf);
            case "publication":
                return createPublication(oaf);
            case "software":
                return createSoftware(oaf);
            case "orp":
                return createORP(oaf);
            default:
                throw new RuntimeException("received unknown type :" + oaf.getEntity().getResult().getMetadata().getResulttype().getClassid());
        }
    }

    private static Software createSoftware(OafProtos.Oaf.Builder oaf) {
        return new Software();
    }

    private static OtherResearchProducts createORP(OafProtos.Oaf.Builder oaf) {
        return new OtherResearchProducts();
    }

    private static Publication createPublication(OafProtos.Oaf.Builder oaf) {
        return new Publication();
    }

    private static Dataset createDataset(OafProtos.Oaf.Builder oaf) {
        return new Dataset();
    }
}
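
Every per-type converter above is still a stub returning an empty bean, but convert() already dispatches to the right concrete class, so downstream code can branch on the runtime type. A minimal usage sketch, assuming the incoming String is one OAF record serialized as protobuf JSON (the same payload SparkGraphImporterJob reads from the sequence file):

    // Sketch only: branching on the concrete type produced by the converter.
    static void handle(String json) {
        final Oaf record = ProtoConverter.convert(json);
        if (record instanceof Relation) {
            // relation payload, once convertRelation is fleshed out
        } else if (record instanceof Publication) {
            // one of the four result subtypes, selected via resulttype.classid
        }
    }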

dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java

@@ -0,0 +1,44 @@
package eu.dnetlib.dhp.graph;

import eu.dnetlib.dhp.schema.oaf.Publication;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public class SparkGraphImporterJob {

    public static void main(String[] args) throws Exception {
        //TODO add argument parser
        // final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGraphImporterJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/graph_importer_parameters.json")));
        // parser.parseArgument(args);

        final SparkSession spark = SparkSession
                .builder()
                .appName("ImportGraph")
                //TODO replace with: master(parser.get("master"))
                .master("local[16]")
                .getOrCreate();

        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

        final JavaRDD<Tuple2<String, String>> inputRDD = sc
                .sequenceFile("file:///home/sandro/part-m-02236", Text.class, Text.class)
                .map(item -> new Tuple2<>(item._1.toString(), item._2.toString()));

        final long totalPublication = inputRDD
                .filter(s -> s._1().split("@")[2].equalsIgnoreCase("body"))
                .map(Tuple2::_2)
                .map(ProtoConverter::convert)
                .filter(s -> s instanceof Publication)
                .count();

        System.out.println(totalPublication);
    }
}
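
The TODOs above already name the missing pieces: ArgumentApplicationParser, a graph_importer_parameters.json resource, and parser.get("master"). A hedged sketch of how the body of main could look once that is wired in, assuming ArgumentApplicationParser comes from the dhp-common dependency and IOUtils from commons-io; the "sourcePath" parameter name is an assumption, while "master" is taken from the commented-out code itself:

    // Sketch only: replaces the hard-coded master and input path above.
    final ArgumentApplicationParser parser = new ArgumentApplicationParser(
            IOUtils.toString(SparkGraphImporterJob.class.getResourceAsStream(
                    "/eu/dnetlib/dhp/graph/graph_importer_parameters.json")));
    parser.parseArgument(args);

    final SparkSession spark = SparkSession
            .builder()
            .appName("ImportGraph")
            .master(parser.get("master"))  // instead of the hard-coded local[16]
            .getOrCreate();

    final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
    final JavaRDD<Tuple2<String, String>> inputRDD = sc
            .sequenceFile(parser.get("sourcePath"), Text.class, Text.class)  // "sourcePath" is hypothetical
            .map(item -> new Tuple2<>(item._1.toString(), item._2.toString()));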

dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImporterJobTest.java

@@ -0,0 +1,12 @@
package eu.dnetlib.dhp.graph;

import org.junit.Test;

public class SparkGraphImporterJobTest {

    @Test
    public void testImport() throws Exception {
        SparkGraphImporterJob.main(null);
    }
}
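
As committed, this test boots a local Spark session and reads file:///home/sandro/part-m-02236, so it can only pass on the author's machine. A minimal sketch of how it could be parked until the argument parser lands, using JUnit 4's @Ignore (consistent with the org.junit.Test import already present):

    import org.junit.Ignore;
    import org.junit.Test;

    public class SparkGraphImporterJobTest {

        @Test
        @Ignore // the input path is machine-specific until parameters are externalized
        public void testImport() throws Exception {
            SparkGraphImporterJob.main(null);
        }
    }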

dhp-workflows/pom.xml

@@ -17,6 +17,7 @@
     <modules>
         <module>dhp-aggregation</module>
         <module>dhp-distcp</module>
+        <module>dhp-graph-mapper</module>
     </modules>
     <pluginRepositories>
<pluginRepositories> <pluginRepositories>

pom.xml

@@ -24,7 +24,6 @@
        <module>dhp-schemas</module>
        <module>dhp-common</module>
        <module>dhp-workflows</module>
-       <module>dhp-applications</module>
    </modules>
    <issueManagement>
<issueManagement> <issueManagement>