diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Datasource.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Datasource.java index 354045ba7..21408a5ec 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Datasource.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Datasource.java @@ -65,7 +65,7 @@ public class Datasource extends OafEntity implements Serializable { private Field databaseaccessrestriction; // {feeRequired, registration, other} - private Field datauploadrestriction9; + private Field datauploadrestriction; private Field versioning; @@ -298,12 +298,12 @@ public class Datasource extends OafEntity implements Serializable { this.databaseaccessrestriction = databaseaccessrestriction; } - public Field getDatauploadrestriction9() { - return datauploadrestriction9; + public Field getDatauploadrestriction() { + return datauploadrestriction; } - public void setDatauploadrestriction9(Field datauploadrestriction9) { - this.datauploadrestriction9 = datauploadrestriction9; + public void setDatauploadrestriction(Field datauploadrestriction) { + this.datauploadrestriction = datauploadrestriction; } public Field getVersioning() { diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Journal.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Journal.java index 06e5ca7dc..b0a53e5c4 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Journal.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Journal.java @@ -32,95 +32,107 @@ public class Journal implements Serializable { return name; } - public void setName(String name) { + public Journal setName(String name) { this.name = name; + return this; } public String getIssnPrinted() { return issnPrinted; } - public void setIssnPrinted(String issnPrinted) { + public Journal setIssnPrinted(String issnPrinted) { this.issnPrinted = issnPrinted; + return this; } public String getIssnOnline() { return issnOnline; } - public void setIssnOnline(String issnOnline) { + public Journal setIssnOnline(String issnOnline) { this.issnOnline = issnOnline; + return this; } public String getIssnLinking() { return issnLinking; } - public void setIssnLinking(String issnLinking) { + public Journal setIssnLinking(String issnLinking) { this.issnLinking = issnLinking; + return this; } public String getEp() { return ep; } - public void setEp(String ep) { + public Journal setEp(String ep) { this.ep = ep; + return this; } public String getIss() { return iss; } - public void setIss(String iss) { + public Journal setIss(String iss) { this.iss = iss; + return this; } public String getSp() { return sp; } - public void setSp(String sp) { + public Journal setSp(String sp) { this.sp = sp; + return this; } public String getVol() { return vol; } - public void setVol(String vol) { + public Journal setVol(String vol) { this.vol = vol; + return this; } public String getEdition() { return edition; } - public void setEdition(String edition) { + public Journal setEdition(String edition) { this.edition = edition; + return this; } public String getConferenceplace() { return conferenceplace; } - public void setConferenceplace(String conferenceplace) { + public Journal setConferenceplace(String conferenceplace) { this.conferenceplace = conferenceplace; + return this; } public String getConferencedate() { return conferencedate; } - public void setConferencedate(String conferencedate) { + public Journal setConferencedate(String conferencedate) { this.conferencedate = conferencedate; + return this; } public DataInfo getDataInfo() { return dataInfo; } - public void setDataInfo(DataInfo dataInfo) { + public Journal setDataInfo(DataInfo dataInfo) { this.dataInfo = dataInfo; + return this; } } diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java index 98f435fab..e5d07539f 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java @@ -18,7 +18,7 @@ public abstract class OafEntity extends Oaf implements Serializable { private String dateoftransformation; //TODO remove this field - private List children; +// private List children; private List extraInfo; @@ -73,13 +73,13 @@ public abstract class OafEntity extends Oaf implements Serializable { this.dateoftransformation = dateoftransformation; } - public List getChildren() { - return children; - } - - public void setChildren(List children) { - this.children = children; - } +// public List getChildren() { +// return children; +// } +// +// public void setChildren(List children) { +// this.children = children; +// } public List getExtraInfo() { return extraInfo; diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OriginDescription.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OriginDescription.java index e593330e4..cba401174 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OriginDescription.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OriginDescription.java @@ -16,7 +16,7 @@ public class OriginDescription implements Serializable { private String metadataNamespace; - private OriginDescription originDescription; + //private OriginDescription originDescription; public String getHarvestDate() { return harvestDate; @@ -72,12 +72,12 @@ public class OriginDescription implements Serializable { return this; } - public OriginDescription getOriginDescription() { - return originDescription; - } - - public OriginDescription setOriginDescription(OriginDescription originDescription) { - this.originDescription = originDescription; - return this; - } +// public OriginDescription getOriginDescription() { +// return originDescription; +// } +// +// public OriginDescription setOriginDescription(OriginDescription originDescription) { +// this.originDescription = originDescription; +// return this; +// } } diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Qualifier.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Qualifier.java index b586855a1..0320798f0 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Qualifier.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Qualifier.java @@ -9,7 +9,7 @@ public class Qualifier implements Serializable { private String schemeid; private String schemename; - private DataInfo dataInfo; +// private DataInfo dataInfo; public String getClassid() { return classid; @@ -47,12 +47,12 @@ public class Qualifier implements Serializable { return this; } - public DataInfo getDataInfo() { - return dataInfo; - } - - public Qualifier setDataInfo(DataInfo dataInfo) { - this.dataInfo = dataInfo; - return this; - } +// public DataInfo getDataInfo() { +// return dataInfo; +// } +// +// public Qualifier setDataInfo(DataInfo dataInfo) { +// this.dataInfo = dataInfo; +// return this; +// } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoConverter.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoConverter.java index 10473f6d9..f0fde32f3 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoConverter.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoConverter.java @@ -20,7 +20,7 @@ public class ProtoConverter implements Serializable { if (oaf.getKind() == KindProtos.Kind.entity) return convertEntity(oaf); else { - return convertRelation(oaf); + return convertRelation(oaf); } } catch (Throwable e) { throw new RuntimeException(e); @@ -40,8 +40,8 @@ public class ProtoConverter implements Serializable { .setRelClass(r.getRelClass()) .setCollectedFrom(r.getCollectedfromCount() > 0 ? r.getCollectedfromList().stream() - .map(kv -> mapKV(kv)) - .collect(Collectors.toList()) : null); + .map(kv -> mapKV(kv)) + .collect(Collectors.toList()) : null); } private static OafEntity convertEntity(OafProtos.Oaf oaf) { @@ -151,6 +151,28 @@ public class ProtoConverter implements Serializable { //TODO r3data fields + result.setReleasestartdate(mapStringField(datasource.getReleasestartdate())); + result.setReleaseenddate(mapStringField(datasource.getReleaseenddate())); + result.setMissionstatementurl(mapStringField(datasource.getMissionstatementurl())); + result.setDataprovider(mapBoolField(datasource.getDataprovider())); + result.setServiceprovider(mapBoolField(datasource.getServiceprovider())); + result.setDatabaseaccesstype(mapStringField(datasource.getDatabaseaccesstype())); + result.setDatauploadtype(mapStringField(datasource.getDatauploadtype())); + result.setDatabaseaccessrestriction(mapStringField(datasource.getDatabaseaccessrestriction())); + result.setDatauploadrestriction(mapStringField(datasource.getDatauploadrestriction())); + result.setVersioning(mapBoolField(datasource.getVersioning())); + result.setCitationguidelineurl(mapStringField(datasource.getCitationguidelineurl())); + result.setQualitymanagementkind(mapStringField(datasource.getQualitymanagementkind())); + result.setPidsystems(mapStringField(datasource.getPidsystems())); + result.setCertificates(mapStringField(datasource.getCertificates())); + result.setPolicies(datasource.getPoliciesList() + .stream() + .map(ProtoUtils::mapKV) + .collect(Collectors.toList()) + ); + + result.setJournal(mapJournal(datasource.getJournal())); + return result; } @@ -180,14 +202,14 @@ public class ProtoConverter implements Serializable { .setKeywords(mapStringField(m.getKeywords())) .setSubjects(m.getSubjectsCount() > 0 ? m.getSubjectsList().stream() - .map(sp -> mapStructuredProperty(sp)) - .collect(Collectors.toList()) : null) + .map(sp -> mapStructuredProperty(sp)) + .collect(Collectors.toList()) : null) .setTitle(mapStringField(m.getTitle())) .setWebsiteurl(mapStringField(m.getWebsiteurl())) .setFundingtree(m.getFundingtreeCount() > 0 ? m.getFundingtreeList().stream() - .map(f -> mapStringField(f)) - .collect(Collectors.toList()) : null) + .map(f -> mapStringField(f)) + .collect(Collectors.toList()) : null) .setJsonextrainfo(mapStringField(m.getJsonextrainfo())) .setSummary(mapStringField(m.getSummary())) .setOptional1(mapStringField(m.getOptional1())) @@ -206,7 +228,7 @@ public class ProtoConverter implements Serializable { case "orp": return createORP(oaf); default: - throw new RuntimeException("received unknown type :"+oaf.getEntity().getResult().getMetadata().getResulttype().getClassid()); + throw new RuntimeException("received unknown type :" + oaf.getEntity().getResult().getMetadata().getResulttype().getClassid()); } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoUtils.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoUtils.java index 97b9c3297..90c590b66 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoUtils.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoUtils.java @@ -34,8 +34,8 @@ public class ProtoUtils { .setClassid(q.getClassid()) .setClassname(q.getClassname()) .setSchemeid(q.getSchemeid()) - .setSchemename(q.getSchemename()) - .setDataInfo(q.hasDataInfo() ? mapDataInfo(q.getDataInfo()) : null); + .setSchemename(q.getSchemename()); + //.setDataInfo(q.hasDataInfo() ? mapDataInfo(q.getDataInfo()) : null); } public static StructuredProperty mapStructuredProperty(FieldTypeProtos.StructuredProperty sp) { @@ -68,10 +68,30 @@ public class ProtoUtils { .setIdentifier(originDescription.getIdentifier()) .setDatestamp(originDescription.getDatestamp()) .setMetadataNamespace(originDescription.getMetadataNamespace()); - if (originDescription.hasOriginDescription()) - originDescriptionResult.setOriginDescription(mapOriginalDescription(originDescription.getOriginDescription())); +// if (originDescription.hasOriginDescription()) +// originDescriptionResult.setOriginDescription(mapOriginalDescription(originDescription.getOriginDescription())); return originDescriptionResult; + } + + public static Journal mapJournal(FieldTypeProtos.Journal journal) { + return new Journal() + .setName(journal.getName()) + .setIssnPrinted(journal.getIssnPrinted()) + .setIssnOnline(journal.getIssnOnline()) + .setIssnLinking(journal.getIssnLinking()) + .setEp(journal.getEp()) + .setIss(journal.getIss()) + .setSp(journal.getSp()) + .setVol(journal.getVol()) + .setEdition(journal.getEdition()) + .setConferenceplace(journal.getConferenceplace()) + .setConferencedate(journal.getConferencedate()) + .setDataInfo(mapDataInfo(journal.getDataInfo())) + ; + + + } public static Field mapStringField(FieldTypeProtos.StringField s) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java index a73ed8d75..804ecbd7b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java @@ -1,15 +1,21 @@ package eu.dnetlib.dhp.graph; +import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.Publication; import org.apache.hadoop.io.Text; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import scala.Tuple2; +import javax.xml.crypto.Data; + public class SparkGraphImporterJob { @@ -32,18 +38,19 @@ public class SparkGraphImporterJob { final JavaRDD> inputRDD = sc.sequenceFile("file:///home/sandro/part-m-00000", Text.class, Text.class).map(item -> new Tuple2<>(item._1.toString(), item._2.toString())); - Tuple2 item = inputRDD + final JavaRDD datasources = inputRDD .filter(s -> s._1().split("@")[2].equalsIgnoreCase("body")) - .first(); - System.out.println(item._1()); - System.out.println(item._2()); + .map(Tuple2::_2) + .map(ProtoConverter::convert) + .filter(s-> s instanceof Datasource) + .map(s->(Datasource)s); + final Encoder encoder = Encoders.bean(Datasource.class); + final Dataset mdstore = spark.createDataset(datasources.rdd(), encoder); -// .map(Tuple2::_2) -// .map(ProtoConverter::convert) -// .mapToPair((PairFunction) s-> new Tuple2(s.getClass().getName(),1)) -// .reduceByKey(Integer::sum).collect().forEach(System.out::println); + System.out.println(mdstore.count()); + // // // .filter(s -> s instanceof Publication) diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/ProtoConverterTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/ProtoConverterTest.java index 3640cb996..fd0edf573 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/ProtoConverterTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/ProtoConverterTest.java @@ -1,5 +1,6 @@ package eu.dnetlib.dhp.graph; +import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.schema.oaf.Oaf; import org.apache.commons.io.IOUtils; @@ -23,6 +24,9 @@ public class ProtoConverterTest { System.out.println(ds.getId()); + ObjectMapper mapper = new ObjectMapper(); + System.out.println(mapper.writeValueAsString(result)); + }