forked from antonis.lempesis/dnet-hadoop
removed circular dependencies
This commit is contained in:
parent
a10d071cf4
commit
09ffda03a2
|
@ -65,7 +65,7 @@ public class Datasource extends OafEntity implements Serializable {
|
|||
private Field<String> databaseaccessrestriction;
|
||||
|
||||
// {feeRequired, registration, other}
|
||||
private Field<String> datauploadrestriction9;
|
||||
private Field<String> datauploadrestriction;
|
||||
|
||||
private Field<Boolean> versioning;
|
||||
|
||||
|
@ -298,12 +298,12 @@ public class Datasource extends OafEntity implements Serializable {
|
|||
this.databaseaccessrestriction = databaseaccessrestriction;
|
||||
}
|
||||
|
||||
public Field<String> getDatauploadrestriction9() {
|
||||
return datauploadrestriction9;
|
||||
public Field<String> getDatauploadrestriction() {
|
||||
return datauploadrestriction;
|
||||
}
|
||||
|
||||
public void setDatauploadrestriction9(Field<String> datauploadrestriction9) {
|
||||
this.datauploadrestriction9 = datauploadrestriction9;
|
||||
public void setDatauploadrestriction(Field<String> datauploadrestriction) {
|
||||
this.datauploadrestriction = datauploadrestriction;
|
||||
}
|
||||
|
||||
public Field<Boolean> getVersioning() {
|
||||
|
|
|
@ -32,95 +32,107 @@ public class Journal implements Serializable {
|
|||
return name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
public Journal setName(String name) {
|
||||
this.name = name;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getIssnPrinted() {
|
||||
return issnPrinted;
|
||||
}
|
||||
|
||||
public void setIssnPrinted(String issnPrinted) {
|
||||
public Journal setIssnPrinted(String issnPrinted) {
|
||||
this.issnPrinted = issnPrinted;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getIssnOnline() {
|
||||
return issnOnline;
|
||||
}
|
||||
|
||||
public void setIssnOnline(String issnOnline) {
|
||||
public Journal setIssnOnline(String issnOnline) {
|
||||
this.issnOnline = issnOnline;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getIssnLinking() {
|
||||
return issnLinking;
|
||||
}
|
||||
|
||||
public void setIssnLinking(String issnLinking) {
|
||||
public Journal setIssnLinking(String issnLinking) {
|
||||
this.issnLinking = issnLinking;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getEp() {
|
||||
return ep;
|
||||
}
|
||||
|
||||
public void setEp(String ep) {
|
||||
public Journal setEp(String ep) {
|
||||
this.ep = ep;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getIss() {
|
||||
return iss;
|
||||
}
|
||||
|
||||
public void setIss(String iss) {
|
||||
public Journal setIss(String iss) {
|
||||
this.iss = iss;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getSp() {
|
||||
return sp;
|
||||
}
|
||||
|
||||
public void setSp(String sp) {
|
||||
public Journal setSp(String sp) {
|
||||
this.sp = sp;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getVol() {
|
||||
return vol;
|
||||
}
|
||||
|
||||
public void setVol(String vol) {
|
||||
public Journal setVol(String vol) {
|
||||
this.vol = vol;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getEdition() {
|
||||
return edition;
|
||||
}
|
||||
|
||||
public void setEdition(String edition) {
|
||||
public Journal setEdition(String edition) {
|
||||
this.edition = edition;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getConferenceplace() {
|
||||
return conferenceplace;
|
||||
}
|
||||
|
||||
public void setConferenceplace(String conferenceplace) {
|
||||
public Journal setConferenceplace(String conferenceplace) {
|
||||
this.conferenceplace = conferenceplace;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getConferencedate() {
|
||||
return conferencedate;
|
||||
}
|
||||
|
||||
public void setConferencedate(String conferencedate) {
|
||||
public Journal setConferencedate(String conferencedate) {
|
||||
this.conferencedate = conferencedate;
|
||||
return this;
|
||||
}
|
||||
|
||||
public DataInfo getDataInfo() {
|
||||
return dataInfo;
|
||||
}
|
||||
|
||||
public void setDataInfo(DataInfo dataInfo) {
|
||||
public Journal setDataInfo(DataInfo dataInfo) {
|
||||
this.dataInfo = dataInfo;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ public abstract class OafEntity extends Oaf implements Serializable {
|
|||
private String dateoftransformation;
|
||||
|
||||
//TODO remove this field
|
||||
private List<OafEntity> children;
|
||||
// private List<OafEntity> children;
|
||||
|
||||
private List<ExtraInfo> extraInfo;
|
||||
|
||||
|
@ -73,13 +73,13 @@ public abstract class OafEntity extends Oaf implements Serializable {
|
|||
this.dateoftransformation = dateoftransformation;
|
||||
}
|
||||
|
||||
public List<OafEntity> getChildren() {
|
||||
return children;
|
||||
}
|
||||
|
||||
public void setChildren(List<OafEntity> children) {
|
||||
this.children = children;
|
||||
}
|
||||
// public List<OafEntity> getChildren() {
|
||||
// return children;
|
||||
// }
|
||||
//
|
||||
// public void setChildren(List<OafEntity> children) {
|
||||
// this.children = children;
|
||||
// }
|
||||
|
||||
public List<ExtraInfo> getExtraInfo() {
|
||||
return extraInfo;
|
||||
|
|
|
@ -16,7 +16,7 @@ public class OriginDescription implements Serializable {
|
|||
|
||||
private String metadataNamespace;
|
||||
|
||||
private OriginDescription originDescription;
|
||||
//private OriginDescription originDescription;
|
||||
|
||||
public String getHarvestDate() {
|
||||
return harvestDate;
|
||||
|
@ -72,12 +72,12 @@ public class OriginDescription implements Serializable {
|
|||
return this;
|
||||
}
|
||||
|
||||
public OriginDescription getOriginDescription() {
|
||||
return originDescription;
|
||||
}
|
||||
|
||||
public OriginDescription setOriginDescription(OriginDescription originDescription) {
|
||||
this.originDescription = originDescription;
|
||||
return this;
|
||||
}
|
||||
// public OriginDescription getOriginDescription() {
|
||||
// return originDescription;
|
||||
// }
|
||||
//
|
||||
// public OriginDescription setOriginDescription(OriginDescription originDescription) {
|
||||
// this.originDescription = originDescription;
|
||||
// return this;
|
||||
// }
|
||||
}
|
||||
|
|
|
@ -9,7 +9,7 @@ public class Qualifier implements Serializable {
|
|||
private String schemeid;
|
||||
private String schemename;
|
||||
|
||||
private DataInfo dataInfo;
|
||||
// private DataInfo dataInfo;
|
||||
|
||||
public String getClassid() {
|
||||
return classid;
|
||||
|
@ -47,12 +47,12 @@ public class Qualifier implements Serializable {
|
|||
return this;
|
||||
}
|
||||
|
||||
public DataInfo getDataInfo() {
|
||||
return dataInfo;
|
||||
}
|
||||
|
||||
public Qualifier setDataInfo(DataInfo dataInfo) {
|
||||
this.dataInfo = dataInfo;
|
||||
return this;
|
||||
}
|
||||
// public DataInfo getDataInfo() {
|
||||
// return dataInfo;
|
||||
// }
|
||||
//
|
||||
// public Qualifier setDataInfo(DataInfo dataInfo) {
|
||||
// this.dataInfo = dataInfo;
|
||||
// return this;
|
||||
// }
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ public class ProtoConverter implements Serializable {
|
|||
if (oaf.getKind() == KindProtos.Kind.entity)
|
||||
return convertEntity(oaf);
|
||||
else {
|
||||
return convertRelation(oaf);
|
||||
return convertRelation(oaf);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
|
@ -40,8 +40,8 @@ public class ProtoConverter implements Serializable {
|
|||
.setRelClass(r.getRelClass())
|
||||
.setCollectedFrom(r.getCollectedfromCount() > 0 ?
|
||||
r.getCollectedfromList().stream()
|
||||
.map(kv -> mapKV(kv))
|
||||
.collect(Collectors.toList()) : null);
|
||||
.map(kv -> mapKV(kv))
|
||||
.collect(Collectors.toList()) : null);
|
||||
}
|
||||
|
||||
private static OafEntity convertEntity(OafProtos.Oaf oaf) {
|
||||
|
@ -151,6 +151,28 @@ public class ProtoConverter implements Serializable {
|
|||
|
||||
//TODO r3data fields
|
||||
|
||||
result.setReleasestartdate(mapStringField(datasource.getReleasestartdate()));
|
||||
result.setReleaseenddate(mapStringField(datasource.getReleaseenddate()));
|
||||
result.setMissionstatementurl(mapStringField(datasource.getMissionstatementurl()));
|
||||
result.setDataprovider(mapBoolField(datasource.getDataprovider()));
|
||||
result.setServiceprovider(mapBoolField(datasource.getServiceprovider()));
|
||||
result.setDatabaseaccesstype(mapStringField(datasource.getDatabaseaccesstype()));
|
||||
result.setDatauploadtype(mapStringField(datasource.getDatauploadtype()));
|
||||
result.setDatabaseaccessrestriction(mapStringField(datasource.getDatabaseaccessrestriction()));
|
||||
result.setDatauploadrestriction(mapStringField(datasource.getDatauploadrestriction()));
|
||||
result.setVersioning(mapBoolField(datasource.getVersioning()));
|
||||
result.setCitationguidelineurl(mapStringField(datasource.getCitationguidelineurl()));
|
||||
result.setQualitymanagementkind(mapStringField(datasource.getQualitymanagementkind()));
|
||||
result.setPidsystems(mapStringField(datasource.getPidsystems()));
|
||||
result.setCertificates(mapStringField(datasource.getCertificates()));
|
||||
result.setPolicies(datasource.getPoliciesList()
|
||||
.stream()
|
||||
.map(ProtoUtils::mapKV)
|
||||
.collect(Collectors.toList())
|
||||
);
|
||||
|
||||
result.setJournal(mapJournal(datasource.getJournal()));
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -180,14 +202,14 @@ public class ProtoConverter implements Serializable {
|
|||
.setKeywords(mapStringField(m.getKeywords()))
|
||||
.setSubjects(m.getSubjectsCount() > 0 ?
|
||||
m.getSubjectsList().stream()
|
||||
.map(sp -> mapStructuredProperty(sp))
|
||||
.collect(Collectors.toList()) : null)
|
||||
.map(sp -> mapStructuredProperty(sp))
|
||||
.collect(Collectors.toList()) : null)
|
||||
.setTitle(mapStringField(m.getTitle()))
|
||||
.setWebsiteurl(mapStringField(m.getWebsiteurl()))
|
||||
.setFundingtree(m.getFundingtreeCount() > 0 ?
|
||||
m.getFundingtreeList().stream()
|
||||
.map(f -> mapStringField(f))
|
||||
.collect(Collectors.toList()) : null)
|
||||
.map(f -> mapStringField(f))
|
||||
.collect(Collectors.toList()) : null)
|
||||
.setJsonextrainfo(mapStringField(m.getJsonextrainfo()))
|
||||
.setSummary(mapStringField(m.getSummary()))
|
||||
.setOptional1(mapStringField(m.getOptional1()))
|
||||
|
@ -206,7 +228,7 @@ public class ProtoConverter implements Serializable {
|
|||
case "orp":
|
||||
return createORP(oaf);
|
||||
default:
|
||||
throw new RuntimeException("received unknown type :"+oaf.getEntity().getResult().getMetadata().getResulttype().getClassid());
|
||||
throw new RuntimeException("received unknown type :" + oaf.getEntity().getResult().getMetadata().getResulttype().getClassid());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -34,8 +34,8 @@ public class ProtoUtils {
|
|||
.setClassid(q.getClassid())
|
||||
.setClassname(q.getClassname())
|
||||
.setSchemeid(q.getSchemeid())
|
||||
.setSchemename(q.getSchemename())
|
||||
.setDataInfo(q.hasDataInfo() ? mapDataInfo(q.getDataInfo()) : null);
|
||||
.setSchemename(q.getSchemename());
|
||||
//.setDataInfo(q.hasDataInfo() ? mapDataInfo(q.getDataInfo()) : null);
|
||||
}
|
||||
|
||||
public static StructuredProperty mapStructuredProperty(FieldTypeProtos.StructuredProperty sp) {
|
||||
|
@ -68,10 +68,30 @@ public class ProtoUtils {
|
|||
.setIdentifier(originDescription.getIdentifier())
|
||||
.setDatestamp(originDescription.getDatestamp())
|
||||
.setMetadataNamespace(originDescription.getMetadataNamespace());
|
||||
if (originDescription.hasOriginDescription())
|
||||
originDescriptionResult.setOriginDescription(mapOriginalDescription(originDescription.getOriginDescription()));
|
||||
// if (originDescription.hasOriginDescription())
|
||||
// originDescriptionResult.setOriginDescription(mapOriginalDescription(originDescription.getOriginDescription()));
|
||||
return originDescriptionResult;
|
||||
|
||||
}
|
||||
|
||||
public static Journal mapJournal(FieldTypeProtos.Journal journal) {
|
||||
return new Journal()
|
||||
.setName(journal.getName())
|
||||
.setIssnPrinted(journal.getIssnPrinted())
|
||||
.setIssnOnline(journal.getIssnOnline())
|
||||
.setIssnLinking(journal.getIssnLinking())
|
||||
.setEp(journal.getEp())
|
||||
.setIss(journal.getIss())
|
||||
.setSp(journal.getSp())
|
||||
.setVol(journal.getVol())
|
||||
.setEdition(journal.getEdition())
|
||||
.setConferenceplace(journal.getConferenceplace())
|
||||
.setConferencedate(journal.getConferencedate())
|
||||
.setDataInfo(mapDataInfo(journal.getDataInfo()))
|
||||
;
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
public static Field<String> mapStringField(FieldTypeProtos.StringField s) {
|
||||
|
|
|
@ -1,15 +1,21 @@
|
|||
package eu.dnetlib.dhp.graph;
|
||||
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.PairFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoder;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import scala.Tuple2;
|
||||
|
||||
import javax.xml.crypto.Data;
|
||||
|
||||
public class SparkGraphImporterJob {
|
||||
|
||||
|
||||
|
@ -32,18 +38,19 @@ public class SparkGraphImporterJob {
|
|||
|
||||
final JavaRDD<Tuple2<String, String>> inputRDD = sc.sequenceFile("file:///home/sandro/part-m-00000", Text.class, Text.class).map(item -> new Tuple2<>(item._1.toString(), item._2.toString()));
|
||||
|
||||
Tuple2<String, String> item = inputRDD
|
||||
final JavaRDD<Datasource> datasources = inputRDD
|
||||
.filter(s -> s._1().split("@")[2].equalsIgnoreCase("body"))
|
||||
.first();
|
||||
|
||||
System.out.println(item._1());
|
||||
System.out.println(item._2());
|
||||
.map(Tuple2::_2)
|
||||
.map(ProtoConverter::convert)
|
||||
.filter(s-> s instanceof Datasource)
|
||||
.map(s->(Datasource)s);
|
||||
final Encoder<Datasource> encoder = Encoders.bean(Datasource.class);
|
||||
final Dataset<Datasource> mdstore = spark.createDataset(datasources.rdd(), encoder);
|
||||
|
||||
|
||||
// .map(Tuple2::_2)
|
||||
// .map(ProtoConverter::convert)
|
||||
// .mapToPair((PairFunction<Oaf, String,Integer>) s-> new Tuple2<String, Integer>(s.getClass().getName(),1))
|
||||
// .reduceByKey(Integer::sum).collect().forEach(System.out::println);
|
||||
System.out.println(mdstore.count());
|
||||
|
||||
//
|
||||
//
|
||||
// .filter(s -> s instanceof Publication)
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package eu.dnetlib.dhp.graph;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
|
@ -23,6 +24,9 @@ public class ProtoConverterTest {
|
|||
System.out.println(ds.getId());
|
||||
|
||||
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
System.out.println(mapper.writeValueAsString(result));
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue