forked from antonis.lempesis/dnet-hadoop
resolved conflicts
This commit is contained in:
commit
4b331790e7
|
@ -15,7 +15,6 @@ public class Author implements Serializable {
|
||||||
// json containing a Citation or Statistics
|
// json containing a Citation or Statistics
|
||||||
private String value;
|
private String value;
|
||||||
|
|
||||||
|
|
||||||
public String getName() {
|
public String getName() {
|
||||||
return name;
|
return name;
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,9 +17,6 @@ public abstract class OafEntity extends Oaf implements Serializable {
|
||||||
|
|
||||||
private String dateoftransformation;
|
private String dateoftransformation;
|
||||||
|
|
||||||
//TODO remove this field
|
|
||||||
private List<OafEntity> children;
|
|
||||||
|
|
||||||
private List<ExtraInfo> extraInfo;
|
private List<ExtraInfo> extraInfo;
|
||||||
|
|
||||||
private OAIProvenance oaiprovenance;
|
private OAIProvenance oaiprovenance;
|
||||||
|
@ -79,15 +76,6 @@ public abstract class OafEntity extends Oaf implements Serializable {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<OafEntity> getChildren() {
|
|
||||||
return children;
|
|
||||||
}
|
|
||||||
|
|
||||||
public OafEntity setChildren(List<OafEntity> children) {
|
|
||||||
this.children = children;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<ExtraInfo> getExtraInfo() {
|
public List<ExtraInfo> getExtraInfo() {
|
||||||
return extraInfo;
|
return extraInfo;
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,7 @@ public class OriginDescription implements Serializable {
|
||||||
|
|
||||||
private String metadataNamespace;
|
private String metadataNamespace;
|
||||||
|
|
||||||
private OriginDescription originDescription;
|
//private OriginDescription originDescription;
|
||||||
|
|
||||||
public String getHarvestDate() {
|
public String getHarvestDate() {
|
||||||
return harvestDate;
|
return harvestDate;
|
||||||
|
@ -72,12 +72,12 @@ public class OriginDescription implements Serializable {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public OriginDescription getOriginDescription() {
|
// public OriginDescription getOriginDescription() {
|
||||||
return originDescription;
|
// return originDescription;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
public OriginDescription setOriginDescription(OriginDescription originDescription) {
|
// public OriginDescription setOriginDescription(OriginDescription originDescription) {
|
||||||
this.originDescription = originDescription;
|
// this.originDescription = originDescription;
|
||||||
return this;
|
// return this;
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,7 +9,7 @@ public class Qualifier implements Serializable {
|
||||||
private String schemeid;
|
private String schemeid;
|
||||||
private String schemename;
|
private String schemename;
|
||||||
|
|
||||||
private DataInfo dataInfo;
|
// private DataInfo dataInfo;
|
||||||
|
|
||||||
public String getClassid() {
|
public String getClassid() {
|
||||||
return classid;
|
return classid;
|
||||||
|
@ -47,12 +47,12 @@ public class Qualifier implements Serializable {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public DataInfo getDataInfo() {
|
// public DataInfo getDataInfo() {
|
||||||
return dataInfo;
|
// return dataInfo;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
public Qualifier setDataInfo(DataInfo dataInfo) {
|
// public Qualifier setDataInfo(DataInfo dataInfo) {
|
||||||
this.dataInfo = dataInfo;
|
// this.dataInfo = dataInfo;
|
||||||
return this;
|
// return this;
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,7 @@ public class ProtoConverter implements Serializable {
|
||||||
if (oaf.getKind() == KindProtos.Kind.entity)
|
if (oaf.getKind() == KindProtos.Kind.entity)
|
||||||
return convertEntity(oaf);
|
return convertEntity(oaf);
|
||||||
else {
|
else {
|
||||||
return convertRelation(oaf);
|
return convertRelation(oaf);
|
||||||
}
|
}
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
|
@ -40,8 +40,8 @@ public class ProtoConverter implements Serializable {
|
||||||
.setRelClass(r.getRelClass())
|
.setRelClass(r.getRelClass())
|
||||||
.setCollectedFrom(r.getCollectedfromCount() > 0 ?
|
.setCollectedFrom(r.getCollectedfromCount() > 0 ?
|
||||||
r.getCollectedfromList().stream()
|
r.getCollectedfromList().stream()
|
||||||
.map(kv -> mapKV(kv))
|
.map(kv -> mapKV(kv))
|
||||||
.collect(Collectors.toList()) : null);
|
.collect(Collectors.toList()) : null);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static OafEntity convertEntity(OafProtos.Oaf oaf) {
|
private static OafEntity convertEntity(OafProtos.Oaf oaf) {
|
||||||
|
@ -64,6 +64,7 @@ public class ProtoConverter implements Serializable {
|
||||||
final DatasourceProtos.Datasource.Metadata m = oaf.getEntity().getDatasource().getMetadata();
|
final DatasourceProtos.Datasource.Metadata m = oaf.getEntity().getDatasource().getMetadata();
|
||||||
final Organization org = setOaf(new Organization(), oaf);
|
final Organization org = setOaf(new Organization(), oaf);
|
||||||
return setEntity(org, oaf);
|
return setEntity(org, oaf);
|
||||||
|
|
||||||
//TODO set org fields
|
//TODO set org fields
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -71,11 +72,10 @@ public class ProtoConverter implements Serializable {
|
||||||
final DatasourceProtos.Datasource.Metadata m = oaf.getEntity().getDatasource().getMetadata();
|
final DatasourceProtos.Datasource.Metadata m = oaf.getEntity().getDatasource().getMetadata();
|
||||||
final Datasource datasource = setOaf(new Datasource(), oaf);
|
final Datasource datasource = setOaf(new Datasource(), oaf);
|
||||||
return setEntity(datasource, oaf)
|
return setEntity(datasource, oaf)
|
||||||
.setAccessinfopackage(m.getAccessinfopackageCount() > 0 ?
|
.setAccessinfopackage(m.getAccessinfopackageList()
|
||||||
m.getAccessinfopackageList()
|
.stream()
|
||||||
.stream()
|
.map(ProtoUtils::mapStringField)
|
||||||
.map(ProtoUtils::mapStringField)
|
.collect(Collectors.toList()))
|
||||||
.collect(Collectors.toList()) : null)
|
|
||||||
.setCertificates(mapStringField(m.getCertificates()))
|
.setCertificates(mapStringField(m.getCertificates()))
|
||||||
.setCitationguidelineurl(mapStringField(m.getCitationguidelineurl()))
|
.setCitationguidelineurl(mapStringField(m.getCitationguidelineurl()))
|
||||||
.setContactemail(mapStringField(m.getContactemail()))
|
.setContactemail(mapStringField(m.getContactemail()))
|
||||||
|
@ -94,36 +94,32 @@ public class ProtoConverter implements Serializable {
|
||||||
.setLogourl(mapStringField(m.getLogourl()))
|
.setLogourl(mapStringField(m.getLogourl()))
|
||||||
.setMissionstatementurl(mapStringField(m.getMissionstatementurl()))
|
.setMissionstatementurl(mapStringField(m.getMissionstatementurl()))
|
||||||
.setNamespaceprefix(mapStringField(m.getNamespaceprefix()))
|
.setNamespaceprefix(mapStringField(m.getNamespaceprefix()))
|
||||||
.setOdcontenttypes(m.getOdcontenttypesCount() > 0 ?
|
.setOdcontenttypes(m.getOdcontenttypesList()
|
||||||
m.getOdcontenttypesList()
|
.stream()
|
||||||
.stream()
|
.map(ProtoUtils::mapStringField)
|
||||||
.map(ProtoUtils::mapStringField)
|
.collect(Collectors.toList()))
|
||||||
.collect(Collectors.toList()) : null)
|
.setOdlanguages(m.getOdlanguagesList()
|
||||||
.setOdlanguages(m.getOdlanguagesCount() > 0 ?
|
.stream()
|
||||||
m.getOdlanguagesList()
|
.map(ProtoUtils::mapStringField)
|
||||||
.stream()
|
.collect(Collectors.toList()))
|
||||||
.map(ProtoUtils::mapStringField)
|
|
||||||
.collect(Collectors.toList()) : null)
|
|
||||||
.setOdnumberofitems(mapStringField(m.getOdnumberofitems()))
|
.setOdnumberofitems(mapStringField(m.getOdnumberofitems()))
|
||||||
.setOdnumberofitemsdate(mapStringField(m.getOdnumberofitemsdate()))
|
.setOdnumberofitemsdate(mapStringField(m.getOdnumberofitemsdate()))
|
||||||
.setOdpolicies(mapStringField(m.getOdpolicies()))
|
.setOdpolicies(mapStringField(m.getOdpolicies()))
|
||||||
.setOfficialname(mapStringField(m.getOfficialname()))
|
.setOfficialname(mapStringField(m.getOfficialname()))
|
||||||
.setOpenairecompatibility(mapQualifier(m.getOpenairecompatibility()))
|
.setOpenairecompatibility(mapQualifier(m.getOpenairecompatibility()))
|
||||||
.setPidsystems(mapStringField(m.getPidsystems()))
|
.setPidsystems(mapStringField(m.getPidsystems()))
|
||||||
.setPolicies(m.getPoliciesCount() > 0 ?
|
.setPolicies(m.getPoliciesList()
|
||||||
m.getPoliciesList()
|
.stream()
|
||||||
.stream()
|
.map(ProtoUtils::mapKV)
|
||||||
.map(ProtoUtils::mapKV)
|
.collect(Collectors.toList()))
|
||||||
.collect(Collectors.toList()) : null)
|
|
||||||
.setQualitymanagementkind(mapStringField(m.getQualitymanagementkind()))
|
.setQualitymanagementkind(mapStringField(m.getQualitymanagementkind()))
|
||||||
.setReleaseenddate(mapStringField(m.getReleaseenddate()))
|
.setReleaseenddate(mapStringField(m.getReleaseenddate()))
|
||||||
.setServiceprovider(mapBoolField(m.getServiceprovider()))
|
.setServiceprovider(mapBoolField(m.getServiceprovider()))
|
||||||
.setReleasestartdate(mapStringField(m.getReleasestartdate()))
|
.setReleasestartdate(mapStringField(m.getReleasestartdate()))
|
||||||
.setSubjects(m.getSubjectsCount() > 0 ?
|
.setSubjects(m.getSubjectsList()
|
||||||
m.getSubjectsList()
|
.stream()
|
||||||
.stream()
|
.map(ProtoUtils::mapStructuredProperty)
|
||||||
.map(ProtoUtils::mapStructuredProperty)
|
.collect(Collectors.toList()))
|
||||||
.collect(Collectors.toList()) : null)
|
|
||||||
.setVersioning(mapBoolField(m.getVersioning()))
|
.setVersioning(mapBoolField(m.getVersioning()))
|
||||||
.setWebsiteurl(mapStringField(m.getWebsiteurl()))
|
.setWebsiteurl(mapStringField(m.getWebsiteurl()))
|
||||||
.setJournal(mapJournal(m.getJournal()));
|
.setJournal(mapJournal(m.getJournal()));
|
||||||
|
@ -151,16 +147,14 @@ public class ProtoConverter implements Serializable {
|
||||||
.setFundedamount(m.getFundedamount())
|
.setFundedamount(m.getFundedamount())
|
||||||
.setTotalcost(m.getTotalcost())
|
.setTotalcost(m.getTotalcost())
|
||||||
.setKeywords(mapStringField(m.getKeywords()))
|
.setKeywords(mapStringField(m.getKeywords()))
|
||||||
.setSubjects(m.getSubjectsCount() > 0 ?
|
.setSubjects(m.getSubjectsList().stream()
|
||||||
m.getSubjectsList().stream()
|
.map(sp -> mapStructuredProperty(sp))
|
||||||
.map(sp -> mapStructuredProperty(sp))
|
.collect(Collectors.toList()))
|
||||||
.collect(Collectors.toList()) : null)
|
|
||||||
.setTitle(mapStringField(m.getTitle()))
|
.setTitle(mapStringField(m.getTitle()))
|
||||||
.setWebsiteurl(mapStringField(m.getWebsiteurl()))
|
.setWebsiteurl(mapStringField(m.getWebsiteurl()))
|
||||||
.setFundingtree(m.getFundingtreeCount() > 0 ?
|
.setFundingtree(m.getFundingtreeList().stream()
|
||||||
m.getFundingtreeList().stream()
|
.map(f -> mapStringField(f))
|
||||||
.map(f -> mapStringField(f))
|
.collect(Collectors.toList()))
|
||||||
.collect(Collectors.toList()) : null)
|
|
||||||
.setJsonextrainfo(mapStringField(m.getJsonextrainfo()))
|
.setJsonextrainfo(mapStringField(m.getJsonextrainfo()))
|
||||||
.setSummary(mapStringField(m.getSummary()))
|
.setSummary(mapStringField(m.getSummary()))
|
||||||
.setOptional1(mapStringField(m.getOptional1()))
|
.setOptional1(mapStringField(m.getOptional1()))
|
||||||
|
@ -179,7 +173,7 @@ public class ProtoConverter implements Serializable {
|
||||||
case "orp":
|
case "orp":
|
||||||
return createORP(oaf);
|
return createORP(oaf);
|
||||||
default:
|
default:
|
||||||
throw new RuntimeException("received unknown type :"+oaf.getEntity().getResult().getMetadata().getResulttype().getClassid());
|
throw new RuntimeException("received unknown type :" + oaf.getEntity().getResult().getMetadata().getResulttype().getClassid());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -63,8 +63,8 @@ public class ProtoUtils {
|
||||||
.setClassid(q.getClassid())
|
.setClassid(q.getClassid())
|
||||||
.setClassname(q.getClassname())
|
.setClassname(q.getClassname())
|
||||||
.setSchemeid(q.getSchemeid())
|
.setSchemeid(q.getSchemeid())
|
||||||
.setSchemename(q.getSchemename())
|
.setSchemename(q.getSchemename());
|
||||||
.setDataInfo(q.hasDataInfo() ? mapDataInfo(q.getDataInfo()) : null);
|
//.setDataInfo(q.hasDataInfo() ? mapDataInfo(q.getDataInfo()) : null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static StructuredProperty mapStructuredProperty(FieldTypeProtos.StructuredProperty sp) {
|
public static StructuredProperty mapStructuredProperty(FieldTypeProtos.StructuredProperty sp) {
|
||||||
|
@ -95,8 +95,8 @@ public class ProtoUtils {
|
||||||
.setIdentifier(originDescription.getIdentifier())
|
.setIdentifier(originDescription.getIdentifier())
|
||||||
.setDatestamp(originDescription.getDatestamp())
|
.setDatestamp(originDescription.getDatestamp())
|
||||||
.setMetadataNamespace(originDescription.getMetadataNamespace());
|
.setMetadataNamespace(originDescription.getMetadataNamespace());
|
||||||
if (originDescription.hasOriginDescription())
|
// if (originDescription.hasOriginDescription())
|
||||||
originDescriptionResult.setOriginDescription(mapOriginalDescription(originDescription.getOriginDescription()));
|
// originDescriptionResult.setOriginDescription(mapOriginalDescription(originDescription.getOriginDescription()));
|
||||||
return originDescriptionResult;
|
return originDescriptionResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,15 +1,21 @@
|
||||||
package eu.dnetlib.dhp.graph;
|
package eu.dnetlib.dhp.graph;
|
||||||
|
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
import org.apache.spark.api.java.function.PairFunction;
|
import org.apache.spark.api.java.function.PairFunction;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.apache.spark.sql.Encoder;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
import javax.xml.crypto.Data;
|
||||||
|
|
||||||
public class SparkGraphImporterJob {
|
public class SparkGraphImporterJob {
|
||||||
|
|
||||||
|
|
||||||
|
@ -30,20 +36,24 @@ public class SparkGraphImporterJob {
|
||||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||||
|
|
||||||
|
|
||||||
final JavaRDD<Tuple2<String, String>> inputRDD = sc.sequenceFile("file:///home/sandro/part-m-00000", Text.class, Text.class).map(item -> new Tuple2<>(item._1.toString(), item._2.toString()));
|
final String path = "file:///home/sandro/part-m-00000";
|
||||||
|
final JavaRDD<Tuple2<String, String>> inputRDD = sc.sequenceFile(path, Text.class, Text.class)
|
||||||
|
.map(item -> new Tuple2<>(item._1.toString(), item._2.toString()));
|
||||||
|
|
||||||
Tuple2<String, String> item = inputRDD
|
|
||||||
|
|
||||||
|
final JavaRDD<Datasource> datasources = inputRDD
|
||||||
.filter(s -> s._1().split("@")[2].equalsIgnoreCase("body"))
|
.filter(s -> s._1().split("@")[2].equalsIgnoreCase("body"))
|
||||||
.first();
|
.map(Tuple2::_2)
|
||||||
|
.map(ProtoConverter::convert)
|
||||||
System.out.println(item._1());
|
.filter(s-> s instanceof Datasource)
|
||||||
System.out.println(item._2());
|
.map(s->(Datasource)s);
|
||||||
|
final Encoder<Datasource> encoder = Encoders.bean(Datasource.class);
|
||||||
|
final Dataset<Datasource> mdstore = spark.createDataset(datasources.rdd(), encoder);
|
||||||
|
|
||||||
|
|
||||||
// .map(Tuple2::_2)
|
System.out.println(mdstore.count());
|
||||||
// .map(ProtoConverter::convert)
|
|
||||||
// .mapToPair((PairFunction<Oaf, String,Integer>) s-> new Tuple2<String, Integer>(s.getClass().getName(),1))
|
|
||||||
// .reduceByKey(Integer::sum).collect().forEach(System.out::println);
|
|
||||||
//
|
//
|
||||||
//
|
//
|
||||||
// .filter(s -> s instanceof Publication)
|
// .filter(s -> s instanceof Publication)
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package eu.dnetlib.dhp.graph;
|
package eu.dnetlib.dhp.graph;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
|
@ -23,6 +24,9 @@ public class ProtoConverterTest {
|
||||||
System.out.println(ds.getId());
|
System.out.println(ds.getId());
|
||||||
|
|
||||||
|
|
||||||
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
|
System.out.println(mapper.writeValueAsString(result));
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue