forked from D-Net/dnet-hadoop
Created a test for converting JSON into the new OAF data model
This commit is contained in:
parent 5744a64478
commit d2965636e0
ProtoConverter.java
@@ -7,6 +7,7 @@ import eu.dnetlib.data.proto.OafProtos;
 import eu.dnetlib.dhp.schema.oaf.*;
 
 import java.io.Serializable;
+import java.util.stream.Collectors;
 
 
 public class ProtoConverter implements Serializable {
@@ -52,7 +53,26 @@ public class ProtoConverter implements Serializable {
     }
 
     private static Datasource convertDataSource(OafProtos.Oaf.Builder oaf) {
-        return new Datasource();
+        final Datasource result = new Datasource();
+
+        //setting oaf field
+        //TODO waiting claudio for this method
+        //result.setDataInfo(DataInfo.fromOaf(oaf.getDataInfo()));
+        result.setLastupdatetimestamp(oaf.getLastupdatetimestamp());
+
+        //setting Entity fields
+        result.setId(oaf.getEntity().getId());
+        result.setOriginalId(oaf.getEntity().getOriginalIdList());
+
+        //TODO waiting claudio for this method
+        result.setCollectedfrom(oaf.getEntity().getCollectedfromList()
+                .stream()
+                .map(s -> new KeyValue())
+                .collect(Collectors.toList()));
+
+
+
+        return result;
     }
 
     private static Project convertProject(OafProtos.Oaf.Builder oaf) {
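
Note (not part of the commit): the collectedfrom mapping above produces empty KeyValue objects while the TODO is open. A minimal sketch of what that mapping might look like once the helper is available, assuming the protobuf collectedfrom entries expose getKey()/getValue() and the new schema's KeyValue offers setKey()/setValue() (both assumptions, not confirmed by this diff):

        // sketch only: carry key/value over instead of creating empty KeyValue objects
        result.setCollectedfrom(oaf.getEntity().getCollectedfromList()
                .stream()
                .map(s -> {
                    final KeyValue kv = new KeyValue();   // eu.dnetlib.dhp.schema.oaf.KeyValue
                    kv.setKey(s.getKey());                // assumed protobuf accessor
                    kv.setValue(s.getValue());            // assumed protobuf accessor
                    return kv;
                })
                .collect(Collectors.toList()));
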
SparkGraphImporterJob.java
@@ -1,10 +1,12 @@
 package eu.dnetlib.dhp.graph;
 
 
+import eu.dnetlib.dhp.schema.oaf.Oaf;
 import eu.dnetlib.dhp.schema.oaf.Publication;
 import org.apache.hadoop.io.Text;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.SparkSession;
 import scala.Tuple2;
 
@@ -28,16 +30,26 @@ public class SparkGraphImporterJob {
         final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
 
 
-        final JavaRDD<Tuple2<String, String>> inputRDD = sc.sequenceFile("file:///home/sandro/part-m-02236", Text.class, Text.class).map(item -> new Tuple2<>(item._1.toString(), item._2.toString()));
+        final JavaRDD<Tuple2<String, String>> inputRDD = sc.sequenceFile("file:///home/sandro/part-m-00000", Text.class, Text.class).map(item -> new Tuple2<>(item._1.toString(), item._2.toString()));
 
-        final long totalPublication = inputRDD
+        Tuple2<String, String> item = inputRDD
                 .filter(s -> s._1().split("@")[2].equalsIgnoreCase("body"))
-                .map(Tuple2::_2)
-                .map(ProtoConverter::convert)
-                .filter(s -> s instanceof Publication)
-                .count();
+                .first();
+
+        System.out.println(item._1());
+        System.out.println(item._2());
 
 
+
+//                .map(Tuple2::_2)
+//                .map(ProtoConverter::convert)
+//                .mapToPair((PairFunction<Oaf, String,Integer>) s-> new Tuple2<String, Integer>(s.getClass().getName(),1))
+//                .reduceByKey(Integer::sum).collect().forEach(System.out::println);
+//
+//
+//                .filter(s -> s instanceof Publication)
+//                .count();
+
-        System.out.println(totalPublication);
+
 
     }
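
Note (not part of the commit): the block commented out above is the entity-count pipeline that previously ran only against Publication. Reassembled from those commented lines, re-enabling it would look roughly like this; it counts the converted OAF entities grouped by concrete class name:

        // sketch reassembled from the commented-out lines above
        inputRDD
                .filter(s -> s._1().split("@")[2].equalsIgnoreCase("body"))
                .map(Tuple2::_2)
                .map(ProtoConverter::convert)
                .mapToPair((PairFunction<Oaf, String, Integer>) s -> new Tuple2<>(s.getClass().getName(), 1))
                .reduceByKey(Integer::sum)
                .collect()
                .forEach(System.out::println);
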
ProtoConverterTest.java (new file)
@@ -0,0 +1,30 @@
+package eu.dnetlib.dhp.graph;
+
+import eu.dnetlib.dhp.schema.oaf.Datasource;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import org.apache.commons.io.IOUtils;
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+public class ProtoConverterTest {
+
+
+    @Test
+    public void convertDatasourceTest() throws Exception {
+        final String json = IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/graph/organization.json"));
+
+        Oaf result = ProtoConverter.convert(json);
+
+        assertNotNull(result);
+        assertTrue(result instanceof Datasource);
+        Datasource ds = (Datasource) result;
+        assertNotNull(ds.getId());
+
+        System.out.println(ds.getId());
+
+
+
+
+    }
+
+}
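
Note (not part of the commit): the test only asserts that an id is present. A possible tightening, assuming ProtoConverter.convert carries the entity id from the organization.json fixture through unchanged (not verified by this commit):

        // sketch only: pin the id to the value in the fixture
        assertEquals("10|CRIS_UNS____::f66f1bd369679b5b077dcdf006089556", ds.getId());
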
organization.json (new test resource, loaded from /eu/dnetlib/dhp/graph/organization.json)
@@ -0,0 +1 @@
+{"kind": "entity","entity": {"type": "datasource","datasource": {"metadata": {"officialname": {"value": "CRIS UNS (Current Research Information System University of Novi Sad)"},"englishname": {"value": "CRIS UNS (Current Research Information System University of Novi Sad)"},"websiteurl": {"value": "https://cris.uns.ac.rs/"},"accessinfopackage": [{"value": "https://cris.uns.ac.rs/OAIHandlerOpenAIRECRIS"}],"namespaceprefix": {"value": "CrisUnsNoviS"},"datasourcetype": {"classid": "crissystem","classname": "CRIS System","schemeid": "dnet:datasource_typologies","schemename": "dnet:datasource_typologies"},"openairecompatibility": {"classid": "openaire-cris_1.1","classname": "OpenAIRE CRIS v1.1","schemeid": "dnet:datasourceCompatibilityLevel","schemename": "dnet:datasourceCompatibilityLevel"},"latitude": {"value": "0.0"},"longitude": {"value": "0.0"},"journal": {"issnPrinted": "","issnOnline": "","issnLinking": ""}}},"originalId": ["CRIS_UNS____::openaire"],"collectedfrom": [{"key": "","value": ""}],"dateofcollection": "2019-04-04","id": "10|CRIS_UNS____::f66f1bd369679b5b077dcdf006089556","dateoftransformation": ""},"dataInfo": {"inferred": false,"deletedbyinference": false,"trust": "0.9","provenanceaction": {"classid": "sysimport:crosswalk:entityregistry","classname": "sysimport:crosswalk:entityregistry","schemeid": "dnet:provenance_actions","schemename": "dnet:provenance_actions"}}}