From d2965636e06fbedbf5f7481e481d5dd66d7e3798 Mon Sep 17 00:00:00 2001 From: "sandro.labruzzo" Date: Thu, 24 Oct 2019 17:02:35 +0200 Subject: [PATCH] created test for convert json into new OAF data model --- .../eu/dnetlib/dhp/graph/ProtoConverter.java | 22 +++++++++++++- .../dhp/graph/SparkGraphImporterJob.java | 26 +++++++++++----- .../dnetlib/dhp/graph/ProtoConverterTest.java | 30 +++++++++++++++++++ .../eu/dnetlib/dhp/graph/organization.json | 1 + 4 files changed, 71 insertions(+), 8 deletions(-) create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/ProtoConverterTest.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/graph/organization.json diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoConverter.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoConverter.java index 9f22d7875..9b519beba 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoConverter.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoConverter.java @@ -7,6 +7,7 @@ import eu.dnetlib.data.proto.OafProtos; import eu.dnetlib.dhp.schema.oaf.*; import java.io.Serializable; +import java.util.stream.Collectors; public class ProtoConverter implements Serializable { @@ -52,7 +53,26 @@ public class ProtoConverter implements Serializable { } private static Datasource convertDataSource(OafProtos.Oaf.Builder oaf) { - return new Datasource(); + final Datasource result = new Datasource(); + + //setting oaf field + //TODO waiting claudio for this method + //result.setDataInfo(DataInfo.fromOaf(oaf.getDataInfo())); + result.setLastupdatetimestamp(oaf.getLastupdatetimestamp()); + + //setting Entity fields + result.setId(oaf.getEntity().getId()); + result.setOriginalId(oaf.getEntity().getOriginalIdList()); + + //TODO waiting claudio for this method + result.setCollectedfrom(oaf.getEntity().getCollectedfromList() + .stream() + .map(s->new KeyValue()) + .collect(Collectors.toList())); + + + + return result; } private static Project convertProject(OafProtos.Oaf.Builder oaf) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java index 4ef046631..a73ed8d75 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java @@ -1,10 +1,12 @@ package eu.dnetlib.dhp.graph; +import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.Publication; import org.apache.hadoop.io.Text; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.SparkSession; import scala.Tuple2; @@ -28,16 +30,26 @@ public class SparkGraphImporterJob { final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - final JavaRDD> inputRDD = sc.sequenceFile("file:///home/sandro/part-m-02236", Text.class, Text.class).map(item -> new Tuple2<>(item._1.toString(), item._2.toString())); + final JavaRDD> inputRDD = sc.sequenceFile("file:///home/sandro/part-m-00000", Text.class, Text.class).map(item -> new Tuple2<>(item._1.toString(), item._2.toString())); - final long totalPublication = inputRDD + Tuple2 item = inputRDD .filter(s -> s._1().split("@")[2].equalsIgnoreCase("body")) - .map(Tuple2::_2) - .map(ProtoConverter::convert) - .filter(s -> s instanceof Publication) - .count(); + .first(); + + System.out.println(item._1()); + System.out.println(item._2()); + + +// .map(Tuple2::_2) +// .map(ProtoConverter::convert) +// .mapToPair((PairFunction) s-> new Tuple2(s.getClass().getName(),1)) +// .reduceByKey(Integer::sum).collect().forEach(System.out::println); +// +// +// .filter(s -> s instanceof Publication) +// .count(); + - System.out.println(totalPublication); } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/ProtoConverterTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/ProtoConverterTest.java new file mode 100644 index 000000000..3640cb996 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/ProtoConverterTest.java @@ -0,0 +1,30 @@ +package eu.dnetlib.dhp.graph; + +import eu.dnetlib.dhp.schema.oaf.Datasource; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import org.apache.commons.io.IOUtils; +import static org.junit.Assert.*; +import org.junit.Test; + +public class ProtoConverterTest { + + + @Test + public void convertDatasourceTest() throws Exception { + final String json = IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/graph/organization.json")); + + Oaf result = ProtoConverter.convert(json); + + assertNotNull(result); + assertTrue(result instanceof Datasource); + Datasource ds = (Datasource) result; + assertNotNull(ds.getId()); + + System.out.println(ds.getId()); + + + + + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/graph/organization.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/graph/organization.json new file mode 100644 index 000000000..e20e39b98 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/graph/organization.json @@ -0,0 +1 @@ +{"kind": "entity","entity": {"type": "datasource","datasource": {"metadata": {"officialname": {"value": "CRIS UNS (Current Research Information System University of Novi Sad)"},"englishname": {"value": "CRIS UNS (Current Research Information System University of Novi Sad)"},"websiteurl": {"value": "https://cris.uns.ac.rs/"},"accessinfopackage": [{"value": "https://cris.uns.ac.rs/OAIHandlerOpenAIRECRIS"}],"namespaceprefix": {"value": "CrisUnsNoviS"},"datasourcetype": {"classid": "crissystem","classname": "CRIS System","schemeid": "dnet:datasource_typologies","schemename": "dnet:datasource_typologies"},"openairecompatibility": {"classid": "openaire-cris_1.1","classname": "OpenAIRE CRIS v1.1","schemeid": "dnet:datasourceCompatibilityLevel","schemename": "dnet:datasourceCompatibilityLevel"},"latitude": {"value": "0.0"},"longitude": {"value": "0.0"},"journal": {"issnPrinted": "","issnOnline": "","issnLinking": ""}}},"originalId": ["CRIS_UNS____::openaire"],"collectedfrom": [{"key": "","value": ""}],"dateofcollection": "2019-04-04","id": "10|CRIS_UNS____::f66f1bd369679b5b077dcdf006089556","dateoftransformation": ""},"dataInfo": {"inferred": false,"deletedbyinference": false,"trust": "0.9","provenanceaction": {"classid": "sysimport:crosswalk:entityregistry","classname": "sysimport:crosswalk:entityregistry","schemeid": "dnet:provenance_actions","schemename": "dnet:provenance_actions"}}} \ No newline at end of file