diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Author.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Author.java index 892c8d4f5..ab66d6824 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Author.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Author.java @@ -15,7 +15,6 @@ public class Author implements Serializable { // json containing a Citation or Statistics private String value; - public String getName() { return name; } diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java index ef9d330cd..f00d38583 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java @@ -17,9 +17,6 @@ public abstract class OafEntity extends Oaf implements Serializable { private String dateoftransformation; - //TODO remove this field - private List children; - private List extraInfo; private OAIProvenance oaiprovenance; @@ -79,15 +76,6 @@ public abstract class OafEntity extends Oaf implements Serializable { return this; } - public List getChildren() { - return children; - } - - public OafEntity setChildren(List children) { - this.children = children; - return this; - } - public List getExtraInfo() { return extraInfo; } diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OriginDescription.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OriginDescription.java index e593330e4..cba401174 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OriginDescription.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OriginDescription.java @@ -16,7 +16,7 @@ public class OriginDescription implements Serializable { private String metadataNamespace; - private OriginDescription originDescription; + //private OriginDescription originDescription; public String getHarvestDate() { return harvestDate; @@ -72,12 +72,12 @@ public class OriginDescription implements Serializable { return this; } - public OriginDescription getOriginDescription() { - return originDescription; - } - - public OriginDescription setOriginDescription(OriginDescription originDescription) { - this.originDescription = originDescription; - return this; - } +// public OriginDescription getOriginDescription() { +// return originDescription; +// } +// +// public OriginDescription setOriginDescription(OriginDescription originDescription) { +// this.originDescription = originDescription; +// return this; +// } } diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Qualifier.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Qualifier.java index b586855a1..0320798f0 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Qualifier.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Qualifier.java @@ -9,7 +9,7 @@ public class Qualifier implements Serializable { private String schemeid; private String schemename; - private DataInfo dataInfo; +// private DataInfo dataInfo; public String getClassid() { return classid; @@ -47,12 +47,12 @@ public class Qualifier implements Serializable { return this; } - public DataInfo getDataInfo() { - return dataInfo; - } - - public Qualifier setDataInfo(DataInfo dataInfo) { - this.dataInfo = dataInfo; - return this; - } +// public DataInfo getDataInfo() { +// return dataInfo; +// } +// +// public Qualifier setDataInfo(DataInfo dataInfo) { +// this.dataInfo = dataInfo; +// return this; +// } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoConverter.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoConverter.java index 0e09ac172..c27d819cc 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoConverter.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoConverter.java @@ -20,7 +20,7 @@ public class ProtoConverter implements Serializable { if (oaf.getKind() == KindProtos.Kind.entity) return convertEntity(oaf); else { - return convertRelation(oaf); + return convertRelation(oaf); } } catch (Throwable e) { throw new RuntimeException(e); @@ -40,8 +40,8 @@ public class ProtoConverter implements Serializable { .setRelClass(r.getRelClass()) .setCollectedFrom(r.getCollectedfromCount() > 0 ? r.getCollectedfromList().stream() - .map(kv -> mapKV(kv)) - .collect(Collectors.toList()) : null); + .map(kv -> mapKV(kv)) + .collect(Collectors.toList()) : null); } private static OafEntity convertEntity(OafProtos.Oaf oaf) { @@ -64,6 +64,7 @@ public class ProtoConverter implements Serializable { final DatasourceProtos.Datasource.Metadata m = oaf.getEntity().getDatasource().getMetadata(); final Organization org = setOaf(new Organization(), oaf); return setEntity(org, oaf); + //TODO set org fields } @@ -71,11 +72,10 @@ public class ProtoConverter implements Serializable { final DatasourceProtos.Datasource.Metadata m = oaf.getEntity().getDatasource().getMetadata(); final Datasource datasource = setOaf(new Datasource(), oaf); return setEntity(datasource, oaf) - .setAccessinfopackage(m.getAccessinfopackageCount() > 0 ? - m.getAccessinfopackageList() - .stream() - .map(ProtoUtils::mapStringField) - .collect(Collectors.toList()) : null) + .setAccessinfopackage(m.getAccessinfopackageList() + .stream() + .map(ProtoUtils::mapStringField) + .collect(Collectors.toList())) .setCertificates(mapStringField(m.getCertificates())) .setCitationguidelineurl(mapStringField(m.getCitationguidelineurl())) .setContactemail(mapStringField(m.getContactemail())) @@ -94,36 +94,32 @@ public class ProtoConverter implements Serializable { .setLogourl(mapStringField(m.getLogourl())) .setMissionstatementurl(mapStringField(m.getMissionstatementurl())) .setNamespaceprefix(mapStringField(m.getNamespaceprefix())) - .setOdcontenttypes(m.getOdcontenttypesCount() > 0 ? - m.getOdcontenttypesList() - .stream() - .map(ProtoUtils::mapStringField) - .collect(Collectors.toList()) : null) - .setOdlanguages(m.getOdlanguagesCount() > 0 ? - m.getOdlanguagesList() - .stream() - .map(ProtoUtils::mapStringField) - .collect(Collectors.toList()) : null) + .setOdcontenttypes(m.getOdcontenttypesList() + .stream() + .map(ProtoUtils::mapStringField) + .collect(Collectors.toList())) + .setOdlanguages(m.getOdlanguagesList() + .stream() + .map(ProtoUtils::mapStringField) + .collect(Collectors.toList())) .setOdnumberofitems(mapStringField(m.getOdnumberofitems())) .setOdnumberofitemsdate(mapStringField(m.getOdnumberofitemsdate())) .setOdpolicies(mapStringField(m.getOdpolicies())) .setOfficialname(mapStringField(m.getOfficialname())) .setOpenairecompatibility(mapQualifier(m.getOpenairecompatibility())) .setPidsystems(mapStringField(m.getPidsystems())) - .setPolicies(m.getPoliciesCount() > 0 ? - m.getPoliciesList() - .stream() - .map(ProtoUtils::mapKV) - .collect(Collectors.toList()) : null) + .setPolicies(m.getPoliciesList() + .stream() + .map(ProtoUtils::mapKV) + .collect(Collectors.toList())) .setQualitymanagementkind(mapStringField(m.getQualitymanagementkind())) .setReleaseenddate(mapStringField(m.getReleaseenddate())) .setServiceprovider(mapBoolField(m.getServiceprovider())) .setReleasestartdate(mapStringField(m.getReleasestartdate())) - .setSubjects(m.getSubjectsCount() > 0 ? - m.getSubjectsList() - .stream() - .map(ProtoUtils::mapStructuredProperty) - .collect(Collectors.toList()) : null) + .setSubjects(m.getSubjectsList() + .stream() + .map(ProtoUtils::mapStructuredProperty) + .collect(Collectors.toList())) .setVersioning(mapBoolField(m.getVersioning())) .setWebsiteurl(mapStringField(m.getWebsiteurl())) .setJournal(mapJournal(m.getJournal())); @@ -151,16 +147,14 @@ public class ProtoConverter implements Serializable { .setFundedamount(m.getFundedamount()) .setTotalcost(m.getTotalcost()) .setKeywords(mapStringField(m.getKeywords())) - .setSubjects(m.getSubjectsCount() > 0 ? - m.getSubjectsList().stream() - .map(sp -> mapStructuredProperty(sp)) - .collect(Collectors.toList()) : null) + .setSubjects(m.getSubjectsList().stream() + .map(sp -> mapStructuredProperty(sp)) + .collect(Collectors.toList())) .setTitle(mapStringField(m.getTitle())) .setWebsiteurl(mapStringField(m.getWebsiteurl())) - .setFundingtree(m.getFundingtreeCount() > 0 ? - m.getFundingtreeList().stream() - .map(f -> mapStringField(f)) - .collect(Collectors.toList()) : null) + .setFundingtree(m.getFundingtreeList().stream() + .map(f -> mapStringField(f)) + .collect(Collectors.toList())) .setJsonextrainfo(mapStringField(m.getJsonextrainfo())) .setSummary(mapStringField(m.getSummary())) .setOptional1(mapStringField(m.getOptional1())) @@ -179,7 +173,7 @@ public class ProtoConverter implements Serializable { case "orp": return createORP(oaf); default: - throw new RuntimeException("received unknown type :"+oaf.getEntity().getResult().getMetadata().getResulttype().getClassid()); + throw new RuntimeException("received unknown type :" + oaf.getEntity().getResult().getMetadata().getResulttype().getClassid()); } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoUtils.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoUtils.java index c0a4ae74e..7ea84316a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoUtils.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoUtils.java @@ -63,8 +63,8 @@ public class ProtoUtils { .setClassid(q.getClassid()) .setClassname(q.getClassname()) .setSchemeid(q.getSchemeid()) - .setSchemename(q.getSchemename()) - .setDataInfo(q.hasDataInfo() ? mapDataInfo(q.getDataInfo()) : null); + .setSchemename(q.getSchemename()); + //.setDataInfo(q.hasDataInfo() ? mapDataInfo(q.getDataInfo()) : null); } public static StructuredProperty mapStructuredProperty(FieldTypeProtos.StructuredProperty sp) { @@ -95,8 +95,8 @@ public class ProtoUtils { .setIdentifier(originDescription.getIdentifier()) .setDatestamp(originDescription.getDatestamp()) .setMetadataNamespace(originDescription.getMetadataNamespace()); - if (originDescription.hasOriginDescription()) - originDescriptionResult.setOriginDescription(mapOriginalDescription(originDescription.getOriginDescription())); +// if (originDescription.hasOriginDescription()) +// originDescriptionResult.setOriginDescription(mapOriginalDescription(originDescription.getOriginDescription())); return originDescriptionResult; } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java index a73ed8d75..f0edbc032 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java @@ -1,15 +1,21 @@ package eu.dnetlib.dhp.graph; +import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.Publication; import org.apache.hadoop.io.Text; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import scala.Tuple2; +import javax.xml.crypto.Data; + public class SparkGraphImporterJob { @@ -30,20 +36,24 @@ public class SparkGraphImporterJob { final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - final JavaRDD> inputRDD = sc.sequenceFile("file:///home/sandro/part-m-00000", Text.class, Text.class).map(item -> new Tuple2<>(item._1.toString(), item._2.toString())); + final String path = "file:///home/sandro/part-m-00000"; + final JavaRDD> inputRDD = sc.sequenceFile(path, Text.class, Text.class) + .map(item -> new Tuple2<>(item._1.toString(), item._2.toString())); - Tuple2 item = inputRDD + + + final JavaRDD datasources = inputRDD .filter(s -> s._1().split("@")[2].equalsIgnoreCase("body")) - .first(); - - System.out.println(item._1()); - System.out.println(item._2()); + .map(Tuple2::_2) + .map(ProtoConverter::convert) + .filter(s-> s instanceof Datasource) + .map(s->(Datasource)s); + final Encoder encoder = Encoders.bean(Datasource.class); + final Dataset mdstore = spark.createDataset(datasources.rdd(), encoder); -// .map(Tuple2::_2) -// .map(ProtoConverter::convert) -// .mapToPair((PairFunction) s-> new Tuple2(s.getClass().getName(),1)) -// .reduceByKey(Integer::sum).collect().forEach(System.out::println); + System.out.println(mdstore.count()); + // // // .filter(s -> s instanceof Publication) diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/ProtoConverterTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/ProtoConverterTest.java index 3640cb996..fd0edf573 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/ProtoConverterTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/ProtoConverterTest.java @@ -1,5 +1,6 @@ package eu.dnetlib.dhp.graph; +import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.schema.oaf.Oaf; import org.apache.commons.io.IOUtils; @@ -23,6 +24,9 @@ public class ProtoConverterTest { System.out.println(ds.getId()); + ObjectMapper mapper = new ObjectMapper(); + System.out.println(mapper.writeValueAsString(result)); + }