From c8d6d6bbd162ce5db7f6f48354b401f03b4b68aa Mon Sep 17 00:00:00 2001 From: "sandro.labruzzo" Date: Fri, 25 Oct 2019 10:23:51 +0200 Subject: [PATCH] implemented organization mapping --- .../eu/dnetlib/dhp/graph/ProtoConverter.java | 70 +++++++++++-------- .../dhp/graph/SparkGraphImporterJob.java | 11 +-- 2 files changed, 48 insertions(+), 33 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoConverter.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoConverter.java index c27d819cc..8ecb5c999 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoConverter.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoConverter.java @@ -1,9 +1,6 @@ package eu.dnetlib.dhp.graph; -import eu.dnetlib.data.proto.DatasourceProtos; -import eu.dnetlib.data.proto.KindProtos; -import eu.dnetlib.data.proto.OafProtos; -import eu.dnetlib.data.proto.ProjectProtos; +import eu.dnetlib.data.proto.*; import eu.dnetlib.dhp.schema.oaf.*; import java.io.Serializable; @@ -61,21 +58,38 @@ public class ProtoConverter implements Serializable { } private static Organization convertOrganization(OafProtos.Oaf oaf) { - final DatasourceProtos.Datasource.Metadata m = oaf.getEntity().getDatasource().getMetadata(); + final OrganizationProtos.Organization.Metadata m = oaf.getEntity().getOrganization().getMetadata(); final Organization org = setOaf(new Organization(), oaf); - return setEntity(org, oaf); - - //TODO set org fields - } + return setEntity(org, oaf) + .setLegalshortname(mapStringField(m.getLegalshortname())) + .setLegalname(mapStringField(m.getLegalname())) + .setAlternativeNames(m.getAlternativeNamesList(). + stream() + .map(ProtoUtils::mapStringField) + .collect(Collectors.toList())) + .setWebsiteurl(mapStringField(m.getWebsiteurl())) + .setLogourl(mapStringField(m.getLogourl())) + .setEclegalbody(mapStringField(m.getEclegalbody())) + .setEclegalperson(mapStringField(m.getEclegalperson())) + .setEcnonprofit(mapStringField(m.getEcnonprofit())) + .setEcresearchorganization(mapStringField(m.getEcresearchorganization())) + .setEchighereducation(mapStringField(m.getEchighereducation())) + .setEcinternationalorganizationeurinterests(mapStringField(m.getEcinternationalorganizationeurinterests())) + .setEcinternationalorganization(mapStringField(m.getEcinternationalorganization())) + .setEcenterprise(mapStringField(m.getEcenterprise())) + .setEcsmevalidated(mapStringField(m.getEcsmevalidated())) + .setEcnutscode(mapStringField(m.getEcnutscode())) + .setCountry(mapQualifier(m.getCountry())); + } private static Datasource convertDataSource(OafProtos.Oaf oaf) { final DatasourceProtos.Datasource.Metadata m = oaf.getEntity().getDatasource().getMetadata(); final Datasource datasource = setOaf(new Datasource(), oaf); return setEntity(datasource, oaf) .setAccessinfopackage(m.getAccessinfopackageList() - .stream() - .map(ProtoUtils::mapStringField) - .collect(Collectors.toList())) + .stream() + .map(ProtoUtils::mapStringField) + .collect(Collectors.toList())) .setCertificates(mapStringField(m.getCertificates())) .setCitationguidelineurl(mapStringField(m.getCitationguidelineurl())) .setContactemail(mapStringField(m.getContactemail())) @@ -95,13 +109,13 @@ public class ProtoConverter implements Serializable { .setMissionstatementurl(mapStringField(m.getMissionstatementurl())) .setNamespaceprefix(mapStringField(m.getNamespaceprefix())) .setOdcontenttypes(m.getOdcontenttypesList() - .stream() - .map(ProtoUtils::mapStringField) - .collect(Collectors.toList())) + .stream() + .map(ProtoUtils::mapStringField) + .collect(Collectors.toList())) .setOdlanguages(m.getOdlanguagesList() - .stream() - .map(ProtoUtils::mapStringField) - .collect(Collectors.toList())) + .stream() + .map(ProtoUtils::mapStringField) + .collect(Collectors.toList())) .setOdnumberofitems(mapStringField(m.getOdnumberofitems())) .setOdnumberofitemsdate(mapStringField(m.getOdnumberofitemsdate())) .setOdpolicies(mapStringField(m.getOdpolicies())) @@ -109,17 +123,17 @@ public class ProtoConverter implements Serializable { .setOpenairecompatibility(mapQualifier(m.getOpenairecompatibility())) .setPidsystems(mapStringField(m.getPidsystems())) .setPolicies(m.getPoliciesList() - .stream() - .map(ProtoUtils::mapKV) - .collect(Collectors.toList())) + .stream() + .map(ProtoUtils::mapKV) + .collect(Collectors.toList())) .setQualitymanagementkind(mapStringField(m.getQualitymanagementkind())) .setReleaseenddate(mapStringField(m.getReleaseenddate())) .setServiceprovider(mapBoolField(m.getServiceprovider())) .setReleasestartdate(mapStringField(m.getReleasestartdate())) .setSubjects(m.getSubjectsList() - .stream() - .map(ProtoUtils::mapStructuredProperty) - .collect(Collectors.toList())) + .stream() + .map(ProtoUtils::mapStructuredProperty) + .collect(Collectors.toList())) .setVersioning(mapBoolField(m.getVersioning())) .setWebsiteurl(mapStringField(m.getWebsiteurl())) .setJournal(mapJournal(m.getJournal())); @@ -148,13 +162,13 @@ public class ProtoConverter implements Serializable { .setTotalcost(m.getTotalcost()) .setKeywords(mapStringField(m.getKeywords())) .setSubjects(m.getSubjectsList().stream() - .map(sp -> mapStructuredProperty(sp)) - .collect(Collectors.toList())) + .map(sp -> mapStructuredProperty(sp)) + .collect(Collectors.toList())) .setTitle(mapStringField(m.getTitle())) .setWebsiteurl(mapStringField(m.getWebsiteurl())) .setFundingtree(m.getFundingtreeList().stream() - .map(f -> mapStringField(f)) - .collect(Collectors.toList())) + .map(f -> mapStringField(f)) + .collect(Collectors.toList())) .setJsonextrainfo(mapStringField(m.getJsonextrainfo())) .setSummary(mapStringField(m.getSummary())) .setOptional1(mapStringField(m.getOptional1())) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java index f0edbc032..76bcc0cf9 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java @@ -3,6 +3,7 @@ package eu.dnetlib.dhp.graph; import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.Organization; import eu.dnetlib.dhp.schema.oaf.Publication; import org.apache.hadoop.io.Text; import org.apache.spark.api.java.JavaRDD; @@ -42,14 +43,14 @@ public class SparkGraphImporterJob { - final JavaRDD datasources = inputRDD + final JavaRDD datasources = inputRDD .filter(s -> s._1().split("@")[2].equalsIgnoreCase("body")) .map(Tuple2::_2) .map(ProtoConverter::convert) - .filter(s-> s instanceof Datasource) - .map(s->(Datasource)s); - final Encoder encoder = Encoders.bean(Datasource.class); - final Dataset mdstore = spark.createDataset(datasources.rdd(), encoder); + .filter(s-> s instanceof Organization) + .map(s->(Organization)s); + final Encoder encoder = Encoders.bean(Organization.class); + final Dataset mdstore = spark.createDataset(datasources.rdd(), encoder); System.out.println(mdstore.count());