forked from antonis.lempesis/dnet-hadoop
Merge branch 'master' of https://code-repo.d3science.org/D-Net/dnet-hadoop
This commit is contained in:
commit
331b853ad6
|
@ -1,9 +1,6 @@
|
||||||
package eu.dnetlib.dhp.graph;
|
package eu.dnetlib.dhp.graph;
|
||||||
|
|
||||||
import eu.dnetlib.data.proto.DatasourceProtos;
|
import eu.dnetlib.data.proto.*;
|
||||||
import eu.dnetlib.data.proto.KindProtos;
|
|
||||||
import eu.dnetlib.data.proto.OafProtos;
|
|
||||||
import eu.dnetlib.data.proto.ProjectProtos;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
@ -61,21 +58,38 @@ public class ProtoConverter implements Serializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Organization convertOrganization(OafProtos.Oaf oaf) {
|
private static Organization convertOrganization(OafProtos.Oaf oaf) {
|
||||||
final DatasourceProtos.Datasource.Metadata m = oaf.getEntity().getDatasource().getMetadata();
|
final OrganizationProtos.Organization.Metadata m = oaf.getEntity().getOrganization().getMetadata();
|
||||||
final Organization org = setOaf(new Organization(), oaf);
|
final Organization org = setOaf(new Organization(), oaf);
|
||||||
return setEntity(org, oaf);
|
return setEntity(org, oaf)
|
||||||
|
.setLegalshortname(mapStringField(m.getLegalshortname()))
|
||||||
//TODO set org fields
|
.setLegalname(mapStringField(m.getLegalname()))
|
||||||
}
|
.setAlternativeNames(m.getAlternativeNamesList().
|
||||||
|
stream()
|
||||||
|
.map(ProtoUtils::mapStringField)
|
||||||
|
.collect(Collectors.toList()))
|
||||||
|
.setWebsiteurl(mapStringField(m.getWebsiteurl()))
|
||||||
|
.setLogourl(mapStringField(m.getLogourl()))
|
||||||
|
.setEclegalbody(mapStringField(m.getEclegalbody()))
|
||||||
|
.setEclegalperson(mapStringField(m.getEclegalperson()))
|
||||||
|
.setEcnonprofit(mapStringField(m.getEcnonprofit()))
|
||||||
|
.setEcresearchorganization(mapStringField(m.getEcresearchorganization()))
|
||||||
|
.setEchighereducation(mapStringField(m.getEchighereducation()))
|
||||||
|
.setEcinternationalorganizationeurinterests(mapStringField(m.getEcinternationalorganizationeurinterests()))
|
||||||
|
.setEcinternationalorganization(mapStringField(m.getEcinternationalorganization()))
|
||||||
|
.setEcenterprise(mapStringField(m.getEcenterprise()))
|
||||||
|
.setEcsmevalidated(mapStringField(m.getEcsmevalidated()))
|
||||||
|
.setEcnutscode(mapStringField(m.getEcnutscode()))
|
||||||
|
.setCountry(mapQualifier(m.getCountry()));
|
||||||
|
}
|
||||||
|
|
||||||
private static Datasource convertDataSource(OafProtos.Oaf oaf) {
|
private static Datasource convertDataSource(OafProtos.Oaf oaf) {
|
||||||
final DatasourceProtos.Datasource.Metadata m = oaf.getEntity().getDatasource().getMetadata();
|
final DatasourceProtos.Datasource.Metadata m = oaf.getEntity().getDatasource().getMetadata();
|
||||||
final Datasource datasource = setOaf(new Datasource(), oaf);
|
final Datasource datasource = setOaf(new Datasource(), oaf);
|
||||||
return setEntity(datasource, oaf)
|
return setEntity(datasource, oaf)
|
||||||
.setAccessinfopackage(m.getAccessinfopackageList()
|
.setAccessinfopackage(m.getAccessinfopackageList()
|
||||||
.stream()
|
.stream()
|
||||||
.map(ProtoUtils::mapStringField)
|
.map(ProtoUtils::mapStringField)
|
||||||
.collect(Collectors.toList()))
|
.collect(Collectors.toList()))
|
||||||
.setCertificates(mapStringField(m.getCertificates()))
|
.setCertificates(mapStringField(m.getCertificates()))
|
||||||
.setCitationguidelineurl(mapStringField(m.getCitationguidelineurl()))
|
.setCitationguidelineurl(mapStringField(m.getCitationguidelineurl()))
|
||||||
.setContactemail(mapStringField(m.getContactemail()))
|
.setContactemail(mapStringField(m.getContactemail()))
|
||||||
|
@ -95,13 +109,13 @@ public class ProtoConverter implements Serializable {
|
||||||
.setMissionstatementurl(mapStringField(m.getMissionstatementurl()))
|
.setMissionstatementurl(mapStringField(m.getMissionstatementurl()))
|
||||||
.setNamespaceprefix(mapStringField(m.getNamespaceprefix()))
|
.setNamespaceprefix(mapStringField(m.getNamespaceprefix()))
|
||||||
.setOdcontenttypes(m.getOdcontenttypesList()
|
.setOdcontenttypes(m.getOdcontenttypesList()
|
||||||
.stream()
|
.stream()
|
||||||
.map(ProtoUtils::mapStringField)
|
.map(ProtoUtils::mapStringField)
|
||||||
.collect(Collectors.toList()))
|
.collect(Collectors.toList()))
|
||||||
.setOdlanguages(m.getOdlanguagesList()
|
.setOdlanguages(m.getOdlanguagesList()
|
||||||
.stream()
|
.stream()
|
||||||
.map(ProtoUtils::mapStringField)
|
.map(ProtoUtils::mapStringField)
|
||||||
.collect(Collectors.toList()))
|
.collect(Collectors.toList()))
|
||||||
.setOdnumberofitems(mapStringField(m.getOdnumberofitems()))
|
.setOdnumberofitems(mapStringField(m.getOdnumberofitems()))
|
||||||
.setOdnumberofitemsdate(mapStringField(m.getOdnumberofitemsdate()))
|
.setOdnumberofitemsdate(mapStringField(m.getOdnumberofitemsdate()))
|
||||||
.setOdpolicies(mapStringField(m.getOdpolicies()))
|
.setOdpolicies(mapStringField(m.getOdpolicies()))
|
||||||
|
@ -109,17 +123,17 @@ public class ProtoConverter implements Serializable {
|
||||||
.setOpenairecompatibility(mapQualifier(m.getOpenairecompatibility()))
|
.setOpenairecompatibility(mapQualifier(m.getOpenairecompatibility()))
|
||||||
.setPidsystems(mapStringField(m.getPidsystems()))
|
.setPidsystems(mapStringField(m.getPidsystems()))
|
||||||
.setPolicies(m.getPoliciesList()
|
.setPolicies(m.getPoliciesList()
|
||||||
.stream()
|
.stream()
|
||||||
.map(ProtoUtils::mapKV)
|
.map(ProtoUtils::mapKV)
|
||||||
.collect(Collectors.toList()))
|
.collect(Collectors.toList()))
|
||||||
.setQualitymanagementkind(mapStringField(m.getQualitymanagementkind()))
|
.setQualitymanagementkind(mapStringField(m.getQualitymanagementkind()))
|
||||||
.setReleaseenddate(mapStringField(m.getReleaseenddate()))
|
.setReleaseenddate(mapStringField(m.getReleaseenddate()))
|
||||||
.setServiceprovider(mapBoolField(m.getServiceprovider()))
|
.setServiceprovider(mapBoolField(m.getServiceprovider()))
|
||||||
.setReleasestartdate(mapStringField(m.getReleasestartdate()))
|
.setReleasestartdate(mapStringField(m.getReleasestartdate()))
|
||||||
.setSubjects(m.getSubjectsList()
|
.setSubjects(m.getSubjectsList()
|
||||||
.stream()
|
.stream()
|
||||||
.map(ProtoUtils::mapStructuredProperty)
|
.map(ProtoUtils::mapStructuredProperty)
|
||||||
.collect(Collectors.toList()))
|
.collect(Collectors.toList()))
|
||||||
.setVersioning(mapBoolField(m.getVersioning()))
|
.setVersioning(mapBoolField(m.getVersioning()))
|
||||||
.setWebsiteurl(mapStringField(m.getWebsiteurl()))
|
.setWebsiteurl(mapStringField(m.getWebsiteurl()))
|
||||||
.setJournal(mapJournal(m.getJournal()));
|
.setJournal(mapJournal(m.getJournal()));
|
||||||
|
@ -148,13 +162,13 @@ public class ProtoConverter implements Serializable {
|
||||||
.setTotalcost(m.getTotalcost())
|
.setTotalcost(m.getTotalcost())
|
||||||
.setKeywords(mapStringField(m.getKeywords()))
|
.setKeywords(mapStringField(m.getKeywords()))
|
||||||
.setSubjects(m.getSubjectsList().stream()
|
.setSubjects(m.getSubjectsList().stream()
|
||||||
.map(sp -> mapStructuredProperty(sp))
|
.map(sp -> mapStructuredProperty(sp))
|
||||||
.collect(Collectors.toList()))
|
.collect(Collectors.toList()))
|
||||||
.setTitle(mapStringField(m.getTitle()))
|
.setTitle(mapStringField(m.getTitle()))
|
||||||
.setWebsiteurl(mapStringField(m.getWebsiteurl()))
|
.setWebsiteurl(mapStringField(m.getWebsiteurl()))
|
||||||
.setFundingtree(m.getFundingtreeList().stream()
|
.setFundingtree(m.getFundingtreeList().stream()
|
||||||
.map(f -> mapStringField(f))
|
.map(f -> mapStringField(f))
|
||||||
.collect(Collectors.toList()))
|
.collect(Collectors.toList()))
|
||||||
.setJsonextrainfo(mapStringField(m.getJsonextrainfo()))
|
.setJsonextrainfo(mapStringField(m.getJsonextrainfo()))
|
||||||
.setSummary(mapStringField(m.getSummary()))
|
.setSummary(mapStringField(m.getSummary()))
|
||||||
.setOptional1(mapStringField(m.getOptional1()))
|
.setOptional1(mapStringField(m.getOptional1()))
|
||||||
|
|
|
@ -3,6 +3,7 @@ package eu.dnetlib.dhp.graph;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Organization;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
@ -42,14 +43,14 @@ public class SparkGraphImporterJob {
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
final JavaRDD<Datasource> datasources = inputRDD
|
final JavaRDD<Organization> datasources = inputRDD
|
||||||
.filter(s -> s._1().split("@")[2].equalsIgnoreCase("body"))
|
.filter(s -> s._1().split("@")[2].equalsIgnoreCase("body"))
|
||||||
.map(Tuple2::_2)
|
.map(Tuple2::_2)
|
||||||
.map(ProtoConverter::convert)
|
.map(ProtoConverter::convert)
|
||||||
.filter(s-> s instanceof Datasource)
|
.filter(s-> s instanceof Organization)
|
||||||
.map(s->(Datasource)s);
|
.map(s->(Organization)s);
|
||||||
final Encoder<Datasource> encoder = Encoders.bean(Datasource.class);
|
final Encoder<Organization> encoder = Encoders.bean(Organization.class);
|
||||||
final Dataset<Datasource> mdstore = spark.createDataset(datasources.rdd(), encoder);
|
final Dataset<Organization> mdstore = spark.createDataset(datasources.rdd(), encoder);
|
||||||
|
|
||||||
|
|
||||||
System.out.println(mdstore.count());
|
System.out.println(mdstore.count());
|
||||||
|
|
Loading…
Reference in New Issue