moved protoutils function to dhp-schemas

Sandro La Bruzzo 2019-10-31 11:31:37 +01:00
parent 997e57d45b
commit 18ec8e8147
7 changed files with 28 additions and 61 deletions

ProtoConverter.java
View File

@@ -1,4 +1,4 @@
-package eu.dnetlib.dhp.graph;
+package eu.dnetlib.dhp.schema.util;
 import eu.dnetlib.data.proto.*;
 import eu.dnetlib.dhp.schema.oaf.*;
@@ -6,7 +6,7 @@ import eu.dnetlib.dhp.schema.oaf.*;
 import java.io.Serializable;
 import java.util.stream.Collectors;
-import static eu.dnetlib.dhp.graph.ProtoUtils.*;
+import static eu.dnetlib.dhp.schema.util.ProtoUtils.*;
 public class ProtoConverter implements Serializable {
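
For downstream code the move only changes the import, as the SparkGraphImporterJob diff below shows. A hypothetical caller, sketched for illustration; the method name is an assumption, since the diff shows only the package change, not the class's methods:

    // Before this commit: import eu.dnetlib.dhp.graph.ProtoConverter;
    import eu.dnetlib.dhp.schema.util.ProtoConverter;  // after the move to dhp-schemas
    import eu.dnetlib.dhp.schema.oaf.Oaf;

    public class ConverterCaller {
        // convert(...) is assumed here purely for illustration.
        static Oaf parse(String protoJson) {
            return ProtoConverter.convert(protoJson);
        }
    }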

ProtoUtils.java
View File

@@ -1,4 +1,4 @@
-package eu.dnetlib.dhp.graph;
+package eu.dnetlib.dhp.schema.util;
 import com.googlecode.protobuf.format.JsonFormat;
 import eu.dnetlib.data.proto.FieldTypeProtos;

View File

@@ -44,6 +44,11 @@
             <version>1.0.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>de.javakaffee</groupId>
+            <artifactId>kryo-serializers</artifactId>
+        </dependency>
     </dependencies>
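
The new kryo-serializers dependency (its version is managed in the parent pom further down) is the kind of library that usually backs Spark's Kryo serialization. A minimal sketch of how such serializers are typically registered; the registrator class and its wiring are assumptions, not something this commit adds:

    import com.esotericsoftware.kryo.Kryo;
    import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
    import org.apache.spark.serializer.KryoRegistrator;

    // Hypothetical registrator: de.javakaffee's serializers cover JDK types
    // (e.g. Collections.unmodifiableList results) that stock Kryo cannot handle.
    public class GraphKryoRegistrator implements KryoRegistrator {
        @Override
        public void registerClasses(Kryo kryo) {
            UnmodifiableCollectionsSerializer.registerSerializers(kryo);
        }
    }

Such a registrator would then be enabled through the standard Spark settings spark.serializer=org.apache.spark.serializer.KryoSerializer and spark.kryo.registrator.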

SparkGraphImporterJob.java
View File

@@ -2,6 +2,7 @@ package eu.dnetlib.dhp.graph;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.util.ProtoConverter;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.io.Text;
@@ -49,22 +50,22 @@ public class SparkGraphImporterJob {
         final Encoder<Relation> relationEncoder = Encoders.bean(Relation.class);
-        if (filter == null|| filter.toLowerCase().contains("organization"))
+        if (StringUtils.isBlank(filter)|| filter.toLowerCase().contains("organization"))
             spark.createDataset(oafRdd.filter(s -> s instanceof Organization).map(s -> (Organization) s).rdd(), organizationEncoder).write().save(outputPath + "/organizations");
-        if (filter == null|| filter.toLowerCase().contains("project"))
+        if (StringUtils.isBlank(filter)|| filter.toLowerCase().contains("project"))
             spark.createDataset(oafRdd.filter(s -> s instanceof Project).map(s -> (Project) s).rdd(), projectEncoder).write().save(outputPath + "/projects");
-        if (filter == null|| filter.toLowerCase().contains("datasource"))
+        if (StringUtils.isBlank(filter)|| filter.toLowerCase().contains("datasource"))
             spark.createDataset(oafRdd.filter(s -> s instanceof Datasource).map(s -> (Datasource) s).rdd(), datasourceEncoder).write().save(outputPath + "/datasources");
-        if (filter == null|| filter.toLowerCase().contains("dataset"))
+        if (StringUtils.isBlank(filter)|| filter.toLowerCase().contains("dataset"))
             spark.createDataset(oafRdd.filter(s -> s instanceof eu.dnetlib.dhp.schema.oaf.Dataset).map(s -> (eu.dnetlib.dhp.schema.oaf.Dataset) s).rdd(), datasetEncoder).write().save(outputPath + "/datasets");
-        if (filter == null|| filter.toLowerCase().contains("publication"))
+        if (StringUtils.isBlank(filter)|| filter.toLowerCase().contains("publication"))
             spark.createDataset(oafRdd.filter(s -> s instanceof Publication).map(s -> (Publication) s).rdd(), publicationEncoder).write().save(outputPath + "/publications");
-        if (filter == null|| filter.toLowerCase().contains("software"))
+        if (StringUtils.isBlank(filter)|| filter.toLowerCase().contains("software"))
             spark.createDataset(oafRdd.filter(s -> s instanceof Software).map(s -> (Software) s).rdd(), softwareEncoder).write().save(outputPath + "/software");
-        if (filter == null|| filter.toLowerCase().contains("otherresearchproduct"))
+        if (StringUtils.isBlank(filter)|| filter.toLowerCase().contains("otherresearchproduct"))
             spark.createDataset(oafRdd.filter(s -> s instanceof OtherResearchProducts).map(s -> (OtherResearchProducts) s).rdd(), otherResearchProductsEncoder).write().save(outputPath + "/otherResearchProducts");
-        if (filter == null|| filter.toLowerCase().contains("relation"))
+        if (StringUtils.isBlank(filter)|| filter.toLowerCase().contains("relation"))
             spark.createDataset(oafRdd.filter(s -> s instanceof Relation).map(s -> (Relation) s).rdd(), relationEncoder).write().save(outputPath + "/relations");
     }
 }
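
The switch from `filter == null` to `StringUtils.isBlank(filter)` also covers empty and whitespace-only filter strings, so a blank -f argument now means "import everything" instead of matching no branch at all. A minimal sketch of the difference; the demo class is illustrative, not part of the commit:

    import org.apache.commons.lang.StringUtils;

    public class FilterGuardDemo {
        public static void main(String[] args) {
            String filter = "";  // e.g. a workflow passes -f "" meaning "no filter"
            // Old guard: false for "", so every import branch was skipped.
            System.out.println(filter == null);               // false
            // New guard: true for null, "" and whitespace alike.
            System.out.println(StringUtils.isBlank(filter));  // true
            // Non-blank filters still use plain substring matching:
            String f = "software,relation";
            System.out.println(f.toLowerCase().contains("software"));    // true
            System.out.println(f.toLowerCase().contains("publication")); // false
        }
    }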

View File

@@ -2,6 +2,7 @@ package eu.dnetlib.dhp.graph;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.util.ProtoConverter;
 import org.apache.commons.io.IOUtils;
 import org.junit.Test;

SparkGraphImporterJobTest.java
View File

@@ -8,7 +8,8 @@ public class SparkGraphImporterJobTest {
     @Test
     @Ignore
     public void testImport() throws Exception {
-        SparkGraphImporterJob.main(new String[]{"-mt", "local[*]","-i", "/home/sandro/part-m-02236", "-o", "/tmp/dataframes", "-f", "software,relation"});
+        SparkGraphImporterJob.main(new String[]{"-mt", "local[*]","-i", "/home/sandro/part-m-02236", "-o", "/tmp/dataframes", "-f", "publication"});
     }
 }

pom.xml
View File

@@ -47,7 +47,6 @@
     </pluginRepositories>
     <repositories>
         <repository>
             <id>dnet45-releases</id>
             <name>D-Net 45 releases</name>
@@ -60,22 +59,6 @@
             <enabled>true</enabled>
         </releases>
     </repository>
-    <!--
-    <repository>
-        <id>dnet45-bootstrap-release</id>
-        <name>dnet45 bootstrap release</name>
-        <url>http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-bootstrap-release</url>
-        <layout>default</layout>
-        <snapshots>
-            <enabled>false</enabled>
-        </snapshots>
-        <releases>
-            <enabled>true</enabled>
-        </releases>
-    </repository>
-    -->
     <repository>
         <id>cloudera</id>
         <name>Cloudera Repository</name>
@@ -183,6 +166,12 @@
             <version>1.1.6</version>
         </dependency>
+        <dependency>
+            <groupId>de.javakaffee</groupId>
+            <artifactId>kryo-serializers</artifactId>
+            <version>0.45</version>
+        </dependency>
         <dependency>
             <groupId>net.schmizz</groupId>
             <artifactId>sshj</artifactId>
@@ -252,7 +241,7 @@
         <dependency>
             <groupId>eu.dnetlib</groupId>
             <artifactId>dnet-openaire-data-protos</artifactId>
-            <version>3.9.5-proto250</version>
+            <version>3.9.5</version>
         </dependency>
     </dependencies>
@@ -326,30 +315,6 @@
                 <redirectTestOutputToFile>true</redirectTestOutputToFile>
             </configuration>
         </plugin>
-        <!--
-        <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-failsafe-plugin</artifactId>
-            <version>2.19.1</version>
-            <executions>
-                <execution>
-                    <id>default-integration-test</id>
-                    <goals>
-                        <goal>integration-test</goal>
-                        <goal>verify</goal>
-                    </goals>
-                    <configuration>
-                        <groups>eu.dnetlib.dhp.common.IntegrationTest</groups>
-                        <includes>
-                            <include>**/*Test.java</include>
-                        </includes>
-                    </configuration>
-                </execution>
-            </executions>
-        </plugin>
-        -->
         <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-javadoc-plugin</artifactId>
@@ -369,10 +334,8 @@
             <artifactId>build-helper-maven-plugin</artifactId>
             <version>1.12</version>
         </plugin>
     </plugins>
 </pluginManagement>
 <plugins>
     <plugin>
         <groupId>org.apache.maven.plugins</groupId>
@@ -415,10 +378,7 @@
         <version>2.10</version>
     </extension>
 </extensions>
 </build>
 <distributionManagement>
     <snapshotRepository>
         <id>dnet45-snapshots</id>
@@ -431,7 +391,6 @@
     <url>http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases</url>
 </repository>
 </distributionManagement>
 <reporting>
     <plugins>
         <plugin>
@@ -456,7 +415,7 @@
     <dhp.jackson.version>2.9.6</dhp.jackson.version>
     <dhp.commons.lang.version>3.5</dhp.commons.lang.version>
     <scala.version>2.11.8</scala.version>
-    <google.protobuf.version>2.5.0</google.protobuf.version>
+    <google.protobuf.version>2.4.1</google.protobuf.version>
 </properties>
 </project>