conversion utilities from protobuffer model to DHP model moved in dnet-mapreduce-jobs. Removed also the relative protobuf dependencies

This commit is contained in:
Claudio Atzori 2019-11-04 12:28:56 +01:00
parent 997e57d45b
commit 32ed4ae8d6
17 changed files with 26 additions and 2171 deletions

View File

@ -22,15 +22,6 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>com.googlecode.protobuf-java-format</groupId>
<artifactId>protobuf-java-format</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib</groupId>
<artifactId>dnet-openaire-data-protos</artifactId>
</dependency>
</dependencies>

View File

@ -3,7 +3,7 @@ package eu.dnetlib.dhp.schema.oaf;
import java.io.Serializable;
import java.util.List;
public class OtherResearchProducts extends Result implements Serializable {
public class OtherResearchProduct extends Result implements Serializable {
private List<Field<String>> contactperson;

View File

@ -1,23 +0,0 @@
package eu.dnetlib.dhp.schema.proto;
import com.googlecode.protobuf.format.JsonFormat;
import eu.dnetlib.data.proto.OafProtos;
import org.apache.commons.io.IOUtils;
import org.junit.Test;
public class TestParseProtoJson {
@Test
public void testParse() throws Exception {
final String json = IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/schema/proto/hugeRecord.json"));
final OafProtos.Oaf.Builder oafBuilder = OafProtos.Oaf.newBuilder();
JsonFormat jf = new JsonFormat();
jf.merge(IOUtils.toInputStream(json), oafBuilder);
OafProtos.Oaf oaf = oafBuilder.build();
System.out.println(jf.printToString(oaf));
}
}

View File

@ -21,16 +21,6 @@
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib</groupId>
<artifactId>dnet-openaire-data-protos</artifactId>
</dependency>
<dependency>
<groupId>com.googlecode.protobuf-java-format</groupId>
<artifactId>protobuf-java-format</artifactId>
</dependency>
<dependency>

View File

@ -1,269 +0,0 @@
package eu.dnetlib.dhp.graph;
import eu.dnetlib.data.proto.*;
import eu.dnetlib.dhp.schema.oaf.*;
import java.io.Serializable;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.graph.ProtoUtils.*;
public class ProtoConverter implements Serializable {
public static Oaf convert(String s) {
try {
OafProtos.Oaf oaf = ProtoUtils.parse(s);
if (oaf.getKind() == KindProtos.Kind.entity)
return convertEntity(oaf);
else {
return convertRelation(oaf);
}
} catch (Throwable e) {
throw new RuntimeException("error on getting " + s, e);
}
}
private static Relation convertRelation(OafProtos.Oaf oaf) {
final OafProtos.OafRel r = oaf.getRel();
final Relation rel = new Relation();
rel.setDataInfo(mapDataInfo(oaf.getDataInfo()));
rel.setLastupdatetimestamp(oaf.getLastupdatetimestamp());
rel.setSource(r.getSource());
rel.setTarget(r.getTarget());
rel.setRelType(r.getRelType().toString());
rel.setSubRelType(r.getSubRelType().toString());
rel.setRelClass(r.getRelClass());
rel.setCollectedFrom(r.getCollectedfromCount() > 0 ?
r.getCollectedfromList().stream()
.map(kv -> mapKV(kv))
.collect(Collectors.toList()) : null);
return rel;
}
private static OafEntity convertEntity(OafProtos.Oaf oaf) {
switch (oaf.getEntity().getType()) {
case result:
return convertResult(oaf);
case project:
return convertProject(oaf);
case datasource:
return convertDataSource(oaf);
case organization:
return convertOrganization(oaf);
default:
throw new RuntimeException("received unknown type");
}
}
private static Organization convertOrganization(OafProtos.Oaf oaf) {
final OrganizationProtos.Organization.Metadata m = oaf.getEntity().getOrganization().getMetadata();
final Organization org = setOaf(new Organization(), oaf);
setEntity(org, oaf);
org.setLegalshortname(mapStringField(m.getLegalshortname()));
org.setLegalname(mapStringField(m.getLegalname()));
org.setAlternativeNames(m.getAlternativeNamesList().
stream()
.map(ProtoUtils::mapStringField)
.collect(Collectors.toList()));
org.setWebsiteurl(mapStringField(m.getWebsiteurl()));
org.setLogourl(mapStringField(m.getLogourl()));
org.setEclegalbody(mapStringField(m.getEclegalbody()));
org.setEclegalperson(mapStringField(m.getEclegalperson()));
org.setEcnonprofit(mapStringField(m.getEcnonprofit()));
org.setEcresearchorganization(mapStringField(m.getEcresearchorganization()));
org.setEchighereducation(mapStringField(m.getEchighereducation()));
org.setEcinternationalorganizationeurinterests(mapStringField(m.getEcinternationalorganizationeurinterests()));
org.setEcinternationalorganization(mapStringField(m.getEcinternationalorganization()));
org.setEcenterprise(mapStringField(m.getEcenterprise()));
org.setEcsmevalidated(mapStringField(m.getEcsmevalidated()));
org.setEcnutscode(mapStringField(m.getEcnutscode()));
org.setCountry(mapQualifier(m.getCountry()));
return org;
}
private static Datasource convertDataSource(OafProtos.Oaf oaf) {
final DatasourceProtos.Datasource.Metadata m = oaf.getEntity().getDatasource().getMetadata();
final Datasource datasource = setOaf(new Datasource(), oaf);
setEntity(datasource, oaf);
datasource.setAccessinfopackage(m.getAccessinfopackageList()
.stream()
.map(ProtoUtils::mapStringField)
.collect(Collectors.toList()));
datasource.setCertificates(mapStringField(m.getCertificates()));
datasource.setCitationguidelineurl(mapStringField(m.getCitationguidelineurl()));
datasource.setContactemail(mapStringField(m.getContactemail()));
datasource.setDatabaseaccessrestriction(mapStringField(m.getDatabaseaccessrestriction()));
datasource.setDatabaseaccesstype(mapStringField(m.getDatabaseaccesstype()));
datasource.setDataprovider(mapBoolField(m.getDataprovider()));
datasource.setDatasourcetype(mapQualifier(m.getDatasourcetype()));
datasource.setDatauploadrestriction(mapStringField(m.getDatauploadrestriction()));
datasource.setCitationguidelineurl(mapStringField(m.getCitationguidelineurl()));
datasource.setDatauploadtype(mapStringField(m.getDatauploadtype()));
datasource.setDateofvalidation(mapStringField(m.getDateofvalidation()));
datasource.setDescription(mapStringField(m.getDescription()));
datasource.setEnglishname(mapStringField(m.getEnglishname()));
datasource.setLatitude(mapStringField(m.getLatitude()));
datasource.setLongitude(mapStringField(m.getLongitude()));
datasource.setLogourl(mapStringField(m.getLogourl()));
datasource.setMissionstatementurl(mapStringField(m.getMissionstatementurl()));
datasource.setNamespaceprefix(mapStringField(m.getNamespaceprefix()));
datasource.setOdcontenttypes(m.getOdcontenttypesList()
.stream()
.map(ProtoUtils::mapStringField)
.collect(Collectors.toList()));
datasource.setOdlanguages(m.getOdlanguagesList()
.stream()
.map(ProtoUtils::mapStringField)
.collect(Collectors.toList()));
datasource.setOdnumberofitems(mapStringField(m.getOdnumberofitems()));
datasource.setOdnumberofitemsdate(mapStringField(m.getOdnumberofitemsdate()));
datasource.setOdpolicies(mapStringField(m.getOdpolicies()));
datasource.setOfficialname(mapStringField(m.getOfficialname()));
datasource.setOpenairecompatibility(mapQualifier(m.getOpenairecompatibility()));
datasource.setPidsystems(mapStringField(m.getPidsystems()));
datasource.setPolicies(m.getPoliciesList()
.stream()
.map(ProtoUtils::mapKV)
.collect(Collectors.toList()));
datasource.setQualitymanagementkind(mapStringField(m.getQualitymanagementkind()));
datasource.setReleaseenddate(mapStringField(m.getReleaseenddate()));
datasource.setServiceprovider(mapBoolField(m.getServiceprovider()));
datasource.setReleasestartdate(mapStringField(m.getReleasestartdate()));
datasource.setSubjects(m.getSubjectsList()
.stream()
.map(ProtoUtils::mapStructuredProperty)
.collect(Collectors.toList()));
datasource.setVersioning(mapBoolField(m.getVersioning()));
datasource.setWebsiteurl(mapStringField(m.getWebsiteurl()));
datasource.setJournal(mapJournal(m.getJournal()));
return datasource;
}
private static Project convertProject(OafProtos.Oaf oaf) {
final ProjectProtos.Project.Metadata m = oaf.getEntity().getProject().getMetadata();
final Project project = setOaf(new Project(), oaf);
setEntity(project, oaf);
project.setAcronym(mapStringField(m.getAcronym()));
project.setCallidentifier(mapStringField(m.getCallidentifier()));
project.setCode(mapStringField(m.getCode()));
project.setContactemail(mapStringField(m.getContactemail()));
project.setContactfax(mapStringField(m.getContactfax()));
project.setContactfullname(mapStringField(m.getContactfullname()));
project.setContactphone(mapStringField(m.getContactphone()));
project.setContracttype(mapQualifier(m.getContracttype()));
project.setCurrency(mapStringField(m.getCurrency()));
project.setDuration(mapStringField(m.getDuration()));
project.setEcarticle29_3(mapStringField(m.getEcarticle293()));
project.setEcsc39(mapStringField(m.getEcsc39()));
project.setOamandatepublications(mapStringField(m.getOamandatepublications()));
project.setStartdate(mapStringField(m.getStartdate()));
project.setEnddate(mapStringField(m.getEnddate()));
project.setFundedamount(m.getFundedamount());
project.setTotalcost(m.getTotalcost());
project.setKeywords(mapStringField(m.getKeywords()));
project.setSubjects(m.getSubjectsList().stream()
.map(sp -> mapStructuredProperty(sp))
.collect(Collectors.toList()));
project.setTitle(mapStringField(m.getTitle()));
project.setWebsiteurl(mapStringField(m.getWebsiteurl()));
project.setFundingtree(m.getFundingtreeList().stream()
.map(f -> mapStringField(f))
.collect(Collectors.toList()));
project.setJsonextrainfo(mapStringField(m.getJsonextrainfo()));
project.setSummary(mapStringField(m.getSummary()));
project.setOptional1(mapStringField(m.getOptional1()));
project.setOptional2(mapStringField(m.getOptional2()));
return project;
}
private static Result convertResult(OafProtos.Oaf oaf) {
switch (oaf.getEntity().getResult().getMetadata().getResulttype().getClassid()) {
case "dataset":
return createDataset(oaf);
case "publication":
return createPublication(oaf);
case "software":
return createSoftware(oaf);
case "other":
return createORP(oaf);
default:
throw new RuntimeException("received unknown type: " + oaf.getEntity().getResult().getMetadata().getResulttype().getClassid());
}
}
private static Software createSoftware(OafProtos.Oaf oaf) {
ResultProtos.Result.Metadata m = oaf.getEntity().getResult().getMetadata();
Software software = setOaf(new Software(), oaf);
setEntity(software, oaf);
setResult(software, oaf);
software.setDocumentationUrl(m.getDocumentationUrlList()
.stream()
.map(ProtoUtils::mapStringField)
.collect(Collectors.toList()));
software.setLicense(m.getLicenseList()
.stream()
.map(ProtoUtils::mapStructuredProperty)
.collect(Collectors.toList()));
software.setCodeRepositoryUrl(ProtoUtils.mapStringField(m.getCodeRepositoryUrl()));
software.setProgrammingLanguage(ProtoUtils.mapQualifier(m.getProgrammingLanguage()));
return software;
}
private static OtherResearchProducts createORP(OafProtos.Oaf oaf) {
ResultProtos.Result.Metadata m = oaf.getEntity().getResult().getMetadata();
OtherResearchProducts otherResearchProducts = setOaf(new OtherResearchProducts(), oaf);
setEntity(otherResearchProducts, oaf);
setResult(otherResearchProducts, oaf);
otherResearchProducts.setContactperson(m.getContactpersonList()
.stream()
.map(ProtoUtils::mapStringField)
.collect(Collectors.toList()));
otherResearchProducts.setContactgroup(m.getContactgroupList()
.stream()
.map(ProtoUtils::mapStringField)
.collect(Collectors.toList()));
otherResearchProducts.setTool(m.getToolList()
.stream()
.map(ProtoUtils::mapStringField)
.collect(Collectors.toList()));
return otherResearchProducts;
}
private static Publication createPublication(OafProtos.Oaf oaf) {
ResultProtos.Result.Metadata m = oaf.getEntity().getResult().getMetadata();
Publication publication = setOaf(new Publication(), oaf);
setEntity(publication, oaf);
setResult(publication, oaf);
publication.setJournal(mapJournal(m.getJournal()));
return publication;
}
private static Dataset createDataset(OafProtos.Oaf oaf) {
ResultProtos.Result.Metadata m = oaf.getEntity().getResult().getMetadata();
Dataset dataset = setOaf(new Dataset(), oaf);
setEntity(dataset, oaf);
setResult(dataset, oaf);
dataset.setStoragedate(ProtoUtils.mapStringField(m.getStoragedate()));
dataset.setDevice(ProtoUtils.mapStringField(m.getDevice()));
dataset.setSize(ProtoUtils.mapStringField(m.getSize()));
dataset.setVersion(ProtoUtils.mapStringField(m.getVersion()));
dataset.setLastmetadataupdate(ProtoUtils.mapStringField(m.getLastmetadataupdate()));
dataset.setMetadataversionnumber(ProtoUtils.mapStringField(m.getMetadataversionnumber()));
dataset.setGeolocation(m.getGeolocationList()
.stream()
.map(ProtoUtils::mapGeolocation)
.collect(Collectors.toList()));
return dataset;
}
}

View File

@ -1,251 +0,0 @@
package eu.dnetlib.dhp.graph;
import com.googlecode.protobuf.format.JsonFormat;
import eu.dnetlib.data.proto.FieldTypeProtos;
import eu.dnetlib.data.proto.OafProtos;
import eu.dnetlib.data.proto.ResultProtos;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import java.io.IOException;
import java.util.stream.Collectors;
public class ProtoUtils {
public static OafProtos.Oaf parse(String json) throws IOException {
final OafProtos.Oaf.Builder builder = OafProtos.Oaf.newBuilder();
final JsonFormat jf = new JsonFormat();
jf.merge(IOUtils.toInputStream(json), builder);
return builder.build();
}
public static <T extends Oaf> T setOaf(T oaf, OafProtos.Oaf o) {
oaf.setDataInfo(mapDataInfo(o.getDataInfo()));
oaf.setLastupdatetimestamp(o.getLastupdatetimestamp());
return oaf;
}
public static <T extends OafEntity> T setEntity(T entity, OafProtos.Oaf oaf) {
//setting Entity fields
final OafProtos.OafEntity e = oaf.getEntity();
entity.setId(e.getId());
entity.setOriginalId(e.getOriginalIdList());
entity.setCollectedfrom(e.getCollectedfromList()
.stream()
.map(ProtoUtils::mapKV)
.collect(Collectors.toList()));
entity.setPid(e.getPidList().stream()
.map(ProtoUtils::mapStructuredProperty)
.collect(Collectors.toList()));
entity.setDateofcollection(entity.getDateofcollection());
entity.setDateoftransformation(entity.getDateoftransformation());
entity.setExtraInfo(e.getExtraInfoList()
.stream()
.map(ProtoUtils::mapExtraInfo)
.collect(Collectors.toList()));
return entity;
}
public static <T extends Result> T setResult(T entity, OafProtos.Oaf oaf) {
//setting Entity fields
final ResultProtos.Result.Metadata m = oaf.getEntity().getResult().getMetadata();
entity.setAuthor(m.getAuthorList()
.stream()
.map(ProtoUtils::mapAuthor)
.collect(Collectors.toList()));
entity.setResulttype(mapQualifier(m.getResulttype()));
entity.setLanguage(ProtoUtils.mapQualifier(m.getLanguage()));
entity.setCountry(m.getCountryList()
.stream()
.map(ProtoUtils::mapQualifier)
.collect(Collectors.toList()));
entity.setSubject(m.getSubjectList()
.stream()
.map(ProtoUtils::mapStructuredProperty)
.collect(Collectors.toList()));
entity.setTitle(m.getTitleList()
.stream()
.map(ProtoUtils::mapStructuredProperty)
.collect(Collectors.toList()));
entity.setRelevantdate(m.getRelevantdateList()
.stream()
.map(ProtoUtils::mapStructuredProperty)
.collect(Collectors.toList()));
entity.setDescription(m.getDescriptionList()
.stream()
.map(ProtoUtils::mapStringField)
.collect(Collectors.toList()));
entity.setDateofacceptance(ProtoUtils.mapStringField(m.getDateofacceptance()));
entity.setPublisher(ProtoUtils.mapStringField(m.getPublisher()));
entity.setEmbargoenddate(ProtoUtils.mapStringField(m.getEmbargoenddate()));
entity.setSource(m.getSourceList()
.stream()
.map(ProtoUtils::mapStringField)
.collect(Collectors.toList()));
entity.setFulltext(m.getFulltextList()
.stream()
.map(ProtoUtils::mapStringField)
.collect(Collectors.toList()));
entity.setFormat(m.getFormatList()
.stream()
.map(ProtoUtils::mapStringField)
.collect(Collectors.toList()));
entity.setContributor(m.getContributorList()
.stream()
.map(ProtoUtils::mapStringField)
.collect(Collectors.toList()));
entity.setResourcetype(ProtoUtils.mapQualifier(m.getResourcetype()));
entity.setCoverage(m.getCoverageList()
.stream()
.map(ProtoUtils::mapStringField)
.collect(Collectors.toList()));
entity.setRefereed(mapStringField(m.getRefereed()));
entity.setContext(m.getContextList()
.stream()
.map(ProtoUtils::mapContext)
.collect(Collectors.toList()));
return entity;
}
private static Context mapContext(ResultProtos.Result.Context context) {
final Context entity = new Context();
entity.setId(context.getId());
entity.setDataInfo(context.getDataInfoList()
.stream()
.map(ProtoUtils::mapDataInfo)
.collect(Collectors.toList()));
return entity;
}
public static KeyValue mapKV(FieldTypeProtos.KeyValue kv) {
final KeyValue keyValue = new KeyValue();
keyValue.setKey(kv.getKey());
keyValue.setValue(kv.getValue());
keyValue.setDataInfo(mapDataInfo(kv.getDataInfo()));
return keyValue;
}
public static DataInfo mapDataInfo(FieldTypeProtos.DataInfo d) {
final DataInfo dataInfo = new DataInfo();
dataInfo.setDeletedbyinference(d.getDeletedbyinference());
dataInfo.setInferenceprovenance(d.getInferenceprovenance());
dataInfo.setInferred(d.getInferred());
dataInfo.setInvisible(d.getInvisible());
dataInfo.setProvenanceaction(mapQualifier(d.getProvenanceaction()));
return dataInfo;
}
public static Qualifier mapQualifier(FieldTypeProtos.Qualifier q) {
final Qualifier qualifier = new Qualifier();
qualifier.setClassid(q.getClassid());
qualifier.setClassname(q.getClassname());
qualifier.setSchemeid(q.getSchemeid());
qualifier.setSchemename(q.getSchemename());
return qualifier;
}
public static StructuredProperty mapStructuredProperty(FieldTypeProtos.StructuredProperty sp) {
final StructuredProperty structuredProperty = new StructuredProperty();
structuredProperty.setValue(sp.getValue());
structuredProperty.setQualifier(mapQualifier(sp.getQualifier()));
structuredProperty.setDataInfo(mapDataInfo(sp.getDataInfo()));
return structuredProperty;
}
public static ExtraInfo mapExtraInfo(FieldTypeProtos.ExtraInfo extraInfo) {
final ExtraInfo entity = new ExtraInfo();
entity.setName(extraInfo.getName());
entity.setTypology(extraInfo.getTypology());
entity.setProvenance(extraInfo.getProvenance());
entity.setTrust(extraInfo.getTrust());
entity.setValue(extraInfo.getValue());
return entity;
}
public static OAIProvenance mapOAIProvenance(FieldTypeProtos.OAIProvenance oaiProvenance) {
final OAIProvenance entity = new OAIProvenance();
entity.setOriginDescription(mapOriginalDescription(oaiProvenance.getOriginDescription()));
return entity;
}
public static OriginDescription mapOriginalDescription(FieldTypeProtos.OAIProvenance.OriginDescription originDescription) {
final OriginDescription originDescriptionResult = new OriginDescription();
originDescriptionResult.setHarvestDate(originDescription.getHarvestDate());
originDescriptionResult.setAltered(originDescription.getAltered());
originDescriptionResult.setBaseURL(originDescription.getBaseURL());
originDescriptionResult.setIdentifier(originDescription.getIdentifier());
originDescriptionResult.setDatestamp(originDescription.getDatestamp());
originDescriptionResult.setMetadataNamespace(originDescription.getMetadataNamespace());
return originDescriptionResult;
}
public static Field<String> mapStringField(FieldTypeProtos.StringField s) {
final Field<String> stringField = new Field<>();
stringField.setValue(s.getValue());
stringField.setDataInfo(mapDataInfo(s.getDataInfo()));
return stringField;
}
public static Field<Boolean> mapBoolField(FieldTypeProtos.BoolField b) {
final Field<Boolean> booleanField = new Field<>();
booleanField.setValue(b.getValue());
booleanField.setDataInfo(mapDataInfo(b.getDataInfo()));
return booleanField;
}
public static Field<Integer> mapIntField(FieldTypeProtos.IntField b) {
final Field<Integer> entity = new Field<>();
entity.setValue(b.getValue());
entity.setDataInfo(mapDataInfo(b.getDataInfo()));
return entity;
}
public static Journal mapJournal(FieldTypeProtos.Journal j) {
final Journal journal = new Journal();
journal.setConferencedate(j.getConferencedate());
journal.setConferenceplace(j.getConferenceplace());
journal.setEdition(j.getEdition());
journal.setEp(j.getEp());
journal.setIss(j.getIss());
journal.setIssnLinking(j.getIssnLinking());
journal.setIssnOnline(j.getIssnOnline());
journal.setIssnPrinted(j.getIssnPrinted());
journal.setName(j.getName());
journal.setSp(j.getSp());
journal.setVol(j.getVol());
journal.setDataInfo(mapDataInfo(j.getDataInfo()));
return journal;
}
public static Author mapAuthor(FieldTypeProtos.Author author) {
final Author entity = new Author();
entity.setFullname(author.getFullname());
entity.setName(author.getName());
entity.setSurname(author.getSurname());
entity.setRank(author.getRank());
entity.setPid(author.getPidList()
.stream()
.map(ProtoUtils::mapKV)
.collect(Collectors.toList()));
entity.setAffiliation(author.getAffiliationList()
.stream()
.map(ProtoUtils::mapStringField)
.collect(Collectors.toList()));
return entity;
}
public static GeoLocation mapGeolocation(ResultProtos.Result.GeoLocation geoLocation) {
final GeoLocation entity = new GeoLocation();
entity.setPoint(geoLocation.getPoint());
entity.setBox(geoLocation.getBox());
entity.setPlace(geoLocation.getPlace());
return entity;
}
}

View File

@ -1,5 +1,7 @@
package eu.dnetlib.dhp.graph;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
@ -7,11 +9,12 @@ import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.util.Map;
public class SparkGraphImporterJob {
public static void main(String[] args) throws Exception {
@ -27,44 +30,33 @@ public class SparkGraphImporterJob {
final String inputPath = parser.get("input");
final String outputPath = parser.get("outputDir");
final String filter = parser.get("filter");
// Read the input file and convert it into RDD of serializable object
final JavaRDD<Tuple2<String, String>> inputRDD = sc.sequenceFile(inputPath, Text.class, Text.class)
.map(item -> new Tuple2<>(item._1.toString(), item._2.toString()));
final JavaRDD<Oaf> oafRdd = inputRDD.filter(s -> !StringUtils.isBlank(s._2()) && !s._1().contains("@update")).map(Tuple2::_2).map(ProtoConverter::convert);
final Map<String, Class> types = Maps.newHashMap();
types.put("datasource", Datasource.class);
types.put("organization", Organization.class);
types.put("project", Project.class);
types.put("dataset", Dataset.class);
types.put("otherresearchproduct", OtherResearchProduct.class);
types.put("software", Software.class);
types.put("publication", Publication.class);
types.put("relation", Relation.class);
final Encoder<Organization> organizationEncoder = Encoders.bean(Organization.class);
final Encoder<Project> projectEncoder = Encoders.bean(Project.class);
final Encoder<Datasource> datasourceEncoder = Encoders.bean(Datasource.class);
types.forEach((name, clazz) -> {
if (StringUtils.isNotBlank(filter) || filter.toLowerCase().contains(name)) {
spark.createDataset(inputRDD
.filter(s -> s._1().equals(clazz.getName()))
.map(Tuple2::_2)
.map(s -> new ObjectMapper().readValue(s, clazz))
.rdd(), Encoders.bean(clazz))
.write()
.save(outputPath + "/" + name);
}
});
final Encoder<eu.dnetlib.dhp.schema.oaf.Dataset> datasetEncoder = Encoders.bean(eu.dnetlib.dhp.schema.oaf.Dataset.class);
final Encoder<Publication> publicationEncoder = Encoders.bean(Publication.class);
final Encoder<Software> softwareEncoder = Encoders.bean(Software.class);
final Encoder<OtherResearchProducts> otherResearchProductsEncoder = Encoders.bean(OtherResearchProducts.class);
final Encoder<Relation> relationEncoder = Encoders.bean(Relation.class);
if (filter == null|| filter.toLowerCase().contains("organization"))
spark.createDataset(oafRdd.filter(s -> s instanceof Organization).map(s -> (Organization) s).rdd(), organizationEncoder).write().save(outputPath + "/organizations");
if (filter == null|| filter.toLowerCase().contains("project"))
spark.createDataset(oafRdd.filter(s -> s instanceof Project).map(s -> (Project) s).rdd(), projectEncoder).write().save(outputPath + "/projects");
if (filter == null|| filter.toLowerCase().contains("datasource"))
spark.createDataset(oafRdd.filter(s -> s instanceof Datasource).map(s -> (Datasource) s).rdd(), datasourceEncoder).write().save(outputPath + "/datasources");
if (filter == null|| filter.toLowerCase().contains("dataset"))
spark.createDataset(oafRdd.filter(s -> s instanceof eu.dnetlib.dhp.schema.oaf.Dataset).map(s -> (eu.dnetlib.dhp.schema.oaf.Dataset) s).rdd(), datasetEncoder).write().save(outputPath + "/datasets");
if (filter == null|| filter.toLowerCase().contains("publication"))
spark.createDataset(oafRdd.filter(s -> s instanceof Publication).map(s -> (Publication) s).rdd(), publicationEncoder).write().save(outputPath + "/publications");
if (filter == null|| filter.toLowerCase().contains("software"))
spark.createDataset(oafRdd.filter(s -> s instanceof Software).map(s -> (Software) s).rdd(), softwareEncoder).write().save(outputPath + "/software");
if (filter == null|| filter.toLowerCase().contains("otherresearchproduct"))
spark.createDataset(oafRdd.filter(s -> s instanceof OtherResearchProducts).map(s -> (OtherResearchProducts) s).rdd(), otherResearchProductsEncoder).write().save(outputPath + "/otherResearchProducts");
if (filter == null|| filter.toLowerCase().contains("relation"))
spark.createDataset(oafRdd.filter(s -> s instanceof Relation).map(s -> (Relation) s).rdd(), relationEncoder).write().save(outputPath + "/relations");
}
}

View File

@ -1,111 +0,0 @@
package eu.dnetlib.dhp.graph;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.junit.Test;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class ProtoConverterTest {
@Test
public void convertDatasourceTest() throws Exception {
final String json = IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/graph/datasource.json"));
Oaf result = ProtoConverter.convert(json);
assertNotNull(result);
assertTrue(result instanceof Datasource);
Datasource ds = (Datasource) result;
assertNotNull(ds.getId());
System.out.println(ds.getId());
ObjectMapper mapper = new ObjectMapper();
System.out.println(mapper.writeValueAsString(result));
}
@Test
public void convertOrganizationTest() throws Exception {
final String json = IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/graph/organization.json"));
Oaf result = ProtoConverter.convert(json);
assertNotNull(result);
assertTrue(result instanceof Organization);
Organization ds = (Organization) result;
assertNotNull(ds.getId());
System.out.println(ds.getId());
ObjectMapper mapper = new ObjectMapper();
System.out.println(mapper.writeValueAsString(result));
}
@Test
public void convertPublicationTest() throws Exception {
final String json = IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/graph/publication.json"));
Oaf result = ProtoConverter.convert(json);
assertNotNull(result);
assertTrue(result instanceof Publication);
Publication p = (Publication) result;
ObjectMapper mapper = new ObjectMapper();
System.out.println(mapper.writeValueAsString(p));
}
@Test
public void convertDatasetTest() throws Exception {
final String json = IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/graph/dataset.json"));
Oaf result = ProtoConverter.convert(json);
assertNotNull(result);
assertTrue(result instanceof Dataset);
Dataset d = (Dataset) result;
ObjectMapper mapper = new ObjectMapper();
System.out.println(mapper.writeValueAsString(d));
}
@Test
public void convertORPTest() throws Exception {
final String json = IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/graph/orp.json"));
Oaf result = ProtoConverter.convert(json);
assertNotNull(result);
assertTrue(result instanceof OtherResearchProducts);
OtherResearchProducts orp = (OtherResearchProducts) result;
ObjectMapper mapper = new ObjectMapper();
System.out.println(mapper.writeValueAsString(orp));
}
@Test
public void convertSoftware() throws Exception {
final String json = IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/graph/software.json"));
Oaf result = ProtoConverter.convert(json);
assertNotNull(result);
assertTrue(result instanceof Software);
Software s = (Software) result;
ObjectMapper mapper = new ObjectMapper();
System.out.println(mapper.writeValueAsString(s));
}
}

View File

@ -1 +0,0 @@
{"kind":1,"dataInfo":{"deletedbyinference":true,"provenanceaction":{"classid":"sysimport:actionset","classname":"sysimport:actionset","schemename":"dnet:provenanceActions","schemeid":"dnet:provenanceActions"},"inferred":true,"inferenceprovenance":"dedup-similarity-result-levenstein","invisible":false,"trust":"0.9"},"entity":{"pid":[{"qualifier":{"classid":"doi","classname":"doi","schemename":"dnet:pid_types","schemeid":"dnet:pid_types"},"value":"10.5517/cc11xr4v"}],"result":{"instance":[{"url":["http://dx.doi.org/10.5517/cc11xr4v"],"collectedfrom":{"value":"scholExplorer","key":"10|openaire____::e034d6a11054f5ade9221ebac484e864"},"hostedby":{"value":"Unknown Repository","key":"10|openaire____::55045bd2a65019fd8e6741a755395c8c"},"accessright":{"classid":"UNKNOWN","classname":"not available","schemename":"dnet:access_modes","schemeid":"dnet:access_modes"},"instancetype":{"classid":"0000","classname":"Unknown","schemename":"dnet:publication_resource","schemeid":"dnet:publication_resource"}}],"metadata":{"publisher":{"value":"Cambridge Crystallographic Data Centre"},"description":[{"value":"An entry from the Cambridge Structural Database, the worlds repository for small molecule crystal structures. The entry contains experimental data from a crystal diffraction study. The deposited dataset for this entry is freely available from the CCDC and typically includes 3D coordinates, cell parameters, space group, experimental conditions and quality measures."}],"language":{"classid":"und","classname":"Undetermined","schemename":"dent:languages","schemeid":"dent:languages"},"title":[{"qualifier":{"classid":"main title","classname":"main title","schemename":"dnet:dataCite_title","schemeid":"dnet:dataCite_title"},"value":"CCDC 980937: Experimental Crystal Structure Determination"}],"author":[{"fullname":"Yuan, Xian-You","rank":1},{"fullname":"Ou, Guang-Chuan","rank":2},{"fullname":"Yuan, Lin","rank":3},{"fullname":"Zhang, Xin-Yu","rank":4},{"fullname":"Zhang, Min","rank":5}],"resulttype":{"classid":"dataset","classname":"dataset","schemename":"dnet:result_typologies","schemeid":"dnet:result_typologies"},"relevantdate":[{"qualifier":{"classid":"dnet:date","classname":"dnet:date","schemename":"dnet:date","schemeid":"dnet:date"},"value":"2016-01-01"}],"subject":[{"qualifier":{"classid":"keyword","classname":"keyword","schemename":"dnet:subject","schemeid":"dnet:subject"},"value":"Experimental 3D Coordinates"},{"qualifier":{"classid":"keyword","classname":"keyword","schemename":"dnet:subject","schemeid":"dnet:subject"},"value":"Crystal Structure"},{"qualifier":{"classid":"keyword","classname":"keyword","schemename":"dnet:subject","schemeid":"dnet:subject"},"value":"(5-ethyl-2-methyl-2-phenyl-1,3-dioxan-5-yl)methanol"},{"qualifier":{"classid":"keyword","classname":"keyword","schemename":"dnet:subject","schemeid":"dnet:subject"},"value":"Crystal System"},{"qualifier":{"classid":"keyword","classname":"keyword","schemename":"dnet:subject","schemeid":"dnet:subject"},"value":"Space Group"},{"qualifier":{"classid":"keyword","classname":"keyword","schemename":"dnet:subject","schemeid":"dnet:subject"},"value":"Cell Parameters"},{"qualifier":{"classid":"keyword","classname":"keyword","schemename":"dnet:subject","schemeid":"dnet:subject"},"value":"Crystallography"}]}},"collectedfrom":[{"value":"scholExplorer","key":"10|openaire____::e034d6a11054f5ade9221ebac484e864"}],"dateofcollection":"2019-10-22T14:29:26+00:00","type":50,"id":"50|scholexplore::000023d184acb169596e3e6004abb421"}}

View File

@ -1,73 +0,0 @@
{
"kind": "entity",
"entity": {
"type": "datasource",
"datasource": {
"metadata": {
"officialname": {
"value": "CRIS UNS (Current Research Information System University of Novi Sad)"
},
"englishname": {
"value": "CRIS UNS (Current Research Information System University of Novi Sad)"
},
"websiteurl": {
"value": "https://cris.uns.ac.rs/"
},
"accessinfopackage": [
{
"value": "https://cris.uns.ac.rs/OAIHandlerOpenAIRECRIS"
}
],
"namespaceprefix": {
"value": "CrisUnsNoviS"
},
"datasourcetype": {
"classid": "crissystem",
"classname": "CRIS System",
"schemeid": "dnet:datasource_typologies",
"schemename": "dnet:datasource_typologies"
},
"openairecompatibility": {
"classid": "openaire-cris_1.1",
"classname": "OpenAIRE CRIS v1.1",
"schemeid": "dnet:datasourceCompatibilityLevel",
"schemename": "dnet:datasourceCompatibilityLevel"
},
"latitude": {
"value": "0.0"
},
"longitude": {
"value": "0.0"
},
"journal": {
"issnPrinted": "",
"issnOnline": "",
"issnLinking": ""
}
}
},
"originalId": [
"CRIS_UNS____::openaire"
],
"collectedfrom": [
{
"key": "",
"value": ""
}
],
"dateofcollection": "2019-04-04",
"id": "10|CRIS_UNS____::f66f1bd369679b5b077dcdf006089556",
"dateoftransformation": ""
},
"dataInfo": {
"inferred": false,
"deletedbyinference": false,
"trust": "0.9",
"provenanceaction": {
"classid": "sysimport:crosswalk:entityregistry",
"classname": "sysimport:crosswalk:entityregistry",
"schemeid": "dnet:provenance_actions",
"schemename": "dnet:provenance_actions"
}
}
}

View File

@ -1,73 +0,0 @@
{
"kind": "entity",
"entity": {
"type": "organization",
"organization": {
"metadata": {
"legalname": {
"value": "University of Utrecht"
},
"eclegalbody": {
"value": "false"
},
"eclegalperson": {
"value": "false"
},
"ecnonprofit": {
"value": "false"
},
"ecresearchorganization": {
"value": "false"
},
"echighereducation": {
"value": "false"
},
"ecinternationalorganizationeurinterests": {
"value": "false"
},
"ecinternationalorganization": {
"value": "false"
},
"ecenterprise": {
"value": "false"
},
"ecsmevalidated": {
"value": "false"
},
"ecnutscode": {
"value": "false"
},
"country": {
"classid": "FI",
"classname": "Finland",
"schemeid": "dnet:countries",
"schemename": "dnet:countries"
}
}
},
"originalId": [
"aka_________::f88cc5f874ff27f0fd6e7cb24842e9fb"
],
"collectedfrom": [
{
"key": "10|openaire____::6ac933301a3933c8a22ceebea7000326",
"value": "Academy of Finland"
}
],
"dateofcollection": "2018-09-28",
"id": "20|aka_________::0070a5080d7092f960fb33c8a9fca016",
"dateoftransformation": "2019-04-16"
},
"dataInfo": {
"inferred": true,
"deletedbyinference": true,
"trust": "0.9",
"inferenceprovenance": "dedup-similarity-organization-simple",
"provenanceaction": {
"classid": "sysimport:crosswalk:entityregistry",
"classname": "sysimport:crosswalk:entityregistry",
"schemeid": "dnet:provenance_actions",
"schemename": "dnet:provenance_actions"
}
}
}

View File

@ -1 +0,0 @@
{"kind":1,"dataInfo":{"trust":"0.9","invisible":false,"deletedbyinference":false,"inferred":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"sysimport:crosswalk:datasetarchive","schemename":"dnet:provenanceActions","schemeid":"dnet:provenanceActions"}},"entity":{"dateoftransformation":"","pid":[{"qualifier":{"classid":"doi","classname":"doi","schemename":"dnet:pid_types","schemeid":"dnet:pid_types"},"value":"10.3203/iwf/c-13106eng"},{"qualifier":{"classid":"doi","classname":"doi","schemename":"dnet:pid_types","schemeid":"dnet:pid_types"},"value":"https://doi.org/10.3203/iwf/c-13106eng"}],"originalId":["https://doi.org/10.3203/iwf/c-13106eng","http://dx.doi.org/10.3203/iwf/c-13106eng","10.3203/iwf/c-13106eng"],"oaiprovenance":{"originDescription":{"metadataNamespace":"","altered":true,"baseURL":"https%3A%2F%2Foai.datacite.org%2Foai","datestamp":"","harvestDate":"2019-04-03T17:58:12.853Z","identifier":"10.3203/iwf/c-13106eng"}},"result":{"instance":[{"hostedby":{"value":"Unknown Repository","key":"10|openaire____::55045bd2a65019fd8e6741a755395c8c"},"license":{"value":""},"url":["http://dx.doi.org/10.3203/iwf/c-13106eng"],"distributionlocation":"","dateofacceptance":{"value":"2007-01-01"},"collectedfrom":{"value":"Datacite","key":"10|openaire____::9e3be59865b2c1c335d32dae2fe7b254"},"accessright":{"classid":"UNKNOWN","classname":"UNKNOWN","schemename":"dnet:access_modes","schemeid":"dnet:access_modes"},"instancetype":{"classid":"0000","classname":"Unknown","schemename":"dnet:dataCite_resource","schemeid":"dnet:dataCite_resource"}}],"metadata":{"publisher":{"value":"IWF (Göttingen)"},"license":[{"value":""}],"description":[{"value":"A 2D animation explains the molecular structure of histone octamers. From the CD-ROM: BEREITER-HAHN, JÜRGEN / PETERS, WINFRIED S. (Frankfurt a. M.). The Cell IV - Nucleus of Life - From Gene to Proteins (C 7103)"}],"language":{"classid":"eng","classname":"English","schemename":"dnet:languages","schemeid":"dnet:languages"},"title":[{"qualifier":{"classid":"main title","classname":"main title","schemename":"dnet:dataCite_title","schemeid":"dnet:dataCite_title"},"value":"Histone Octamer"}],"author":[{"fullname":"IWF","rank":1}],"resulttype":{"classid":"other","classname":"other","schemename":"dnet:result_typologies","schemeid":"dnet:result_typologies"},"version":{"value":"None"},"storagedate":{"value":"2007"},"dateofacceptance":{"value":"2007-01-01"},"size":{"value":""},"subject":[{"qualifier":{"classid":"keyword","classname":"keyword","schemename":"dnet:subject_classification_typologies","schemeid":"dnet:subject_classification_typologies"},"value":"Life Sciences"},{"qualifier":{"classid":"keyword","classname":"keyword","schemename":"dnet:subject_classification_typologies","schemeid":"dnet:subject_classification_typologies"},"value":"histone"},{"qualifier":{"classid":"keyword","classname":"keyword","schemename":"dnet:subject_classification_typologies","schemeid":"dnet:subject_classification_typologies"},"value":"nucleosome"}]}},"collectedfrom":[{"value":"Datacite","key":"10|openaire____::9e3be59865b2c1c335d32dae2fe7b254"}],"dateofcollection":"2018-10-28T00:39:04.337Z","type":50,"id":"50|datacite____::0000228dcefe42612ec4bd83810fe348"}}

View File

@ -1 +0,0 @@
{"kind": "entity","entity": {"type": "result","result": {"metadata": {"title": [{"value": "SILK PRINTING WITH RECENT DEVELOPMENTS","qualifier": {"classid": "main title","classname": "main title","schemeid": "dnet:dataCite_title","schemename": "dnet:dataCite_title"}},{"value": "Son Gelişmelerle İpek Baskıcılığı","qualifier": {"classid": "main title","classname": "main title","schemeid": "dnet:dataCite_title","schemename": "dnet:dataCite_title"}}],"dateofacceptance": {"value": "1987-06-01"},"publisher": {"value": "Tekstil Mühendisleri Odası"},"resulttype": {"classid": "publication","classname": "publication","schemeid": "dnet:result_typologies","schemename": "dnet:result_typologies"},"language": {"classid": "tur","classname": "Turkish","schemeid": "dnet:languages","schemename": "dnet:languages"},"journal": {"name": "Tekstil ve Mühendis","issnPrinted": "1300-7599"},"format": [{"value": "application/pdf"},{"value": "application/pdf"}],"description": [{"value": " "},{"value": " "}],"source": [{"value": "Tekstil ve Mühendis; Yıl: 1987 Cilt: 1 Sayı: 4"},{"value": "2147-0510"},{"value": "1300-7599"}],"author": [{"fullname": "YAKARTEPE, Mehmet","name": "Mehmet","surname": "Yakartepe","rank": 1},{"fullname": "YAKARTEPE, Zerrin","name": "Zerrin","surname": "Yakartepe","rank": 2}]},"instance": [{"accessright": {"classid": "OPEN","classname": "Open Access","schemeid": "dnet:access_modes","schemename": "dnet:access_modes"},"instancetype": {"classid": "0001","classname": "Article","schemeid": "dnet:publication_resource","schemename": "dnet:publication_resource"},"hostedby": {"key": "10|tubitakulakb::34a91944da68f59ebc51994b4db64cda","value": "Tekstil ve Mühendis"},"url": ["http://dergi.tekstilvemuhendis.org.tr/article/view/5000000711"],"collectedfrom": {"key": "10|openaire____::85e51732975595215ae3c2514e272ce6","value": "TÜBİTAK ULAKBİM DergiPark"},"dateofacceptance": {"value": "1987-06-01"}}]},"originalId": ["oai:dergipark.ulakbim.gov.tr:record/124507"],"collectedfrom": [{"key": "10|openaire____::85e51732975595215ae3c2514e272ce6","value": "TÜBİTAK ULAKBİM DergiPark"}],"dateofcollection": "2019-07-29T15:35:19Z","id": "50|tubitakulakb::7fe767f5f1dfd5bbe0a3e5e9b2a10cc9","dateoftransformation": "","oaiprovenance": {"originDescription": {"harvestDate": "2018-10-13T09:48:19.806Z","altered": true,"baseURL": "http://dergipark.ulakbim.gov.tr/v2/harvester/index.php/oai","identifier": "oai:dergipark.ulakbim.gov.tr:record/124507","datestamp": "2018-10-13T09:48:19Z","metadataNamespace": "http://www.openarchives.org/OAI/2.0/oai_dc/"}}},"dataInfo": {"inferred": true,"deletedbyinference": true,"trust": "0.9","inferenceprovenance": "dedup-similarity-result-levenstein","provenanceaction": {"classid": "sysimport:crosswalk:repository","classname": "sysimport:crosswalk:repository","schemeid": "dnet:provenanceActions","schemename": "dnet:provenanceActions"},"invisible": false}}

File diff suppressed because one or more lines are too long

View File

@ -359,7 +359,7 @@
<!-- creating tar.gz package -->
<tar destfile="target/${oozie.package.file.name}.tar.gz" compression="gzip">
<tar destfile="target/${oozie.package.file.name}.tar.gz" compression="gzip" longfile="gnu">
<tarfileset dir="target/${oozie.package.file.name}" />
<tarfileset dir="target/${oozie.package.file.name}_shell_scripts" filemode="0755">
<include name="**/*.sh" />

20
pom.xml
View File

@ -236,25 +236,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${google.protobuf.version}</version>
</dependency>
<dependency>
<groupId>com.googlecode.protobuf-java-format</groupId>
<artifactId>protobuf-java-format</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>eu.dnetlib</groupId>
<artifactId>dnet-openaire-data-protos</artifactId>
<version>3.9.5-proto250</version>
</dependency>
</dependencies>
</dependencyManagement>
@ -456,7 +437,6 @@
<dhp.jackson.version>2.9.6</dhp.jackson.version>
<dhp.commons.lang.version>3.5</dhp.commons.lang.version>
<scala.version>2.11.8</scala.version>
<google.protobuf.version>2.5.0</google.protobuf.version>
</properties>
</project>