merge branch with fork master

This commit is contained in:
Miriam Baglioni 2020-06-09 15:41:14 +02:00
commit 206abba48c
25 changed files with 771 additions and 376 deletions

View File

@ -58,6 +58,18 @@ public class ModelSupport {
oafTypes.put("relation", Relation.class);
}
public static final Map<Class, String> idPrefixMap = Maps.newHashMap();
static {
idPrefixMap.put(Datasource.class, "10");
idPrefixMap.put(Organization.class, "20");
idPrefixMap.put(Project.class, "40");
idPrefixMap.put(Dataset.class, "50");
idPrefixMap.put(OtherResearchProduct.class, "50");
idPrefixMap.put(Software.class, "50");
idPrefixMap.put(Publication.class, "50");
}
public static final Map<String, String> entityIdPrefix = Maps.newHashMap();
static {
@ -289,6 +301,10 @@ public class ModelSupport {
private ModelSupport() {
}
public static <E extends OafEntity> String getIdPrefix(Class<E> clazz) {
return idPrefixMap.get(clazz);
}
/**
* Checks subclass-superclass relationship.
*

View File

@ -10,6 +10,7 @@ public class Dataset extends Result implements Serializable {
private Field<String> storagedate;
// candidate for removal
private Field<String> device;
private Field<String> size;

View File

@ -10,8 +10,10 @@ public class Software extends Result implements Serializable {
private List<Field<String>> documentationUrl;
// candidate for removal
private List<StructuredProperty> license;
// candidate for removal
private Field<String> codeRepositoryUrl;
private Qualifier programmingLanguage;

View File

@ -12,16 +12,13 @@ import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -55,6 +52,9 @@ import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import eu.dnetlib.dhp.broker.oa.util.EventGroup;
import eu.dnetlib.dhp.broker.oa.util.ResultAggregator;
import eu.dnetlib.dhp.broker.oa.util.ResultGroup;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
@ -63,6 +63,7 @@ import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
import scala.Tuple2;
public class GenerateEventsApplication {
@ -87,24 +88,31 @@ public class GenerateEventsApplication {
private static final UpdateMatcher<Pair<Result, List<Software>>, ?> enrichMoreSoftware = new EnrichMoreSoftware();
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMisissingPublicationIsRelatedTo = new EnrichMissingPublicationIsRelatedTo();
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsReferencedBy = new EnrichMissingPublicationIsReferencedBy();
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsReferencedBy =
new EnrichMissingPublicationIsReferencedBy();
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationReferences = new EnrichMissingPublicationReferences();
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsSupplementedTo = new EnrichMissingPublicationIsSupplementedTo();
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsSupplementedBy = new EnrichMissingPublicationIsSupplementedBy();
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsSupplementedTo =
new EnrichMissingPublicationIsSupplementedTo();
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsSupplementedBy =
new EnrichMissingPublicationIsSupplementedBy();
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMisissingDatasetIsRelatedTo = new EnrichMissingDatasetIsRelatedTo();
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsReferencedBy = new EnrichMissingDatasetIsReferencedBy();
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetReferences = new EnrichMissingDatasetReferences();
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsSupplementedTo = new EnrichMissingDatasetIsSupplementedTo();
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsSupplementedBy = new EnrichMissingDatasetIsSupplementedBy();
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMisissingDatasetIsRelatedTo =
new EnrichMissingDatasetIsRelatedTo();
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsReferencedBy =
new EnrichMissingDatasetIsReferencedBy();
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetReferences =
new EnrichMissingDatasetReferences();
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsSupplementedTo =
new EnrichMissingDatasetIsSupplementedTo();
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsSupplementedBy =
new EnrichMissingDatasetIsSupplementedBy();
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
GenerateEventsApplication.class
.toString(GenerateEventsApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json")));
parser.parseArgument(args);
@ -123,20 +131,20 @@ public class GenerateEventsApplication {
final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
removeOutputDir(spark, eventsPath);
final JavaRDD<Event> eventsRdd = sc.emptyRDD();
final Dataset<Event> all = spark.emptyDataset(Encoders.kryo(Event.class));
for (final Class<? extends Result> r1 : BrokerConstants.RESULT_CLASSES) {
eventsRdd.union(generateSimpleEvents(spark, graphPath, r1));
all.union(generateSimpleEvents(spark, graphPath, r1));
for (final Class<? extends Result> r2 : BrokerConstants.RESULT_CLASSES) {
eventsRdd.union(generateRelationEvents(spark, graphPath, r1, r2));
all.union(generateRelationEvents(spark, graphPath, r1, r2));
}
}
eventsRdd.saveAsTextFile(eventsPath, GzipCodec.class);
all.write().mode(SaveMode.Overwrite).json(eventsPath);
});
}
@ -145,63 +153,56 @@ public class GenerateEventsApplication {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
private static <R extends Result> JavaRDD<Event> generateSimpleEvents(final SparkSession spark,
private static <R extends Result> Dataset<Event> generateSimpleEvents(final SparkSession spark,
final String graphPath,
final Class<R> resultClazz) {
final Dataset<R> results = readPath(
spark, graphPath + "/" + resultClazz.getSimpleName().toLowerCase(), resultClazz)
final Dataset<Result> results = readPath(spark, graphPath + "/" + resultClazz.getSimpleName().toLowerCase(), Result.class)
.filter(r -> r.getDataInfo().getDeletedbyinference());
final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
final Column c = null; // TODO
final Dataset<Row> aa = results
.joinWith(rels, results.col("id").equalTo(rels.col("source")), "inner")
.groupBy(rels.col("target"))
.agg(c)
.filter(x -> x.size() > 1)
// generateSimpleEvents(...)
// flatMap()
// toRdd()
;
return null;
final TypedColumn<Tuple2<Result, Relation>, ResultGroup> aggr = new ResultAggregator().toColumn();
return results.joinWith(rels, results.col("id").equalTo(rels.col("source")), "inner")
.groupByKey((MapFunction<Tuple2<Result, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
.agg(aggr)
.map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class))
.filter(ResultGroup::isValid)
.map((MapFunction<ResultGroup, EventGroup>) g -> GenerateEventsApplication.generateSimpleEvents(g), Encoders.kryo(EventGroup.class))
.flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class));
}
private List<Event> generateSimpleEvents(final Collection<Result> children) {
private static EventGroup generateSimpleEvents(final ResultGroup results) {
final List<UpdateInfo<?>> list = new ArrayList<>();
for (final Result target : children) {
list.addAll(enrichMissingAbstract.searchUpdatesForRecord(target, children));
list.addAll(enrichMissingAuthorOrcid.searchUpdatesForRecord(target, children));
list.addAll(enrichMissingOpenAccess.searchUpdatesForRecord(target, children));
list.addAll(enrichMissingPid.searchUpdatesForRecord(target, children));
list.addAll(enrichMissingPublicationDate.searchUpdatesForRecord(target, children));
list.addAll(enrichMissingSubject.searchUpdatesForRecord(target, children));
list.addAll(enrichMoreOpenAccess.searchUpdatesForRecord(target, children));
list.addAll(enrichMorePid.searchUpdatesForRecord(target, children));
list.addAll(enrichMoreSubject.searchUpdatesForRecord(target, children));
for (final Result target : results.getData()) {
list.addAll(enrichMissingAbstract.searchUpdatesForRecord(target, results.getData()));
list.addAll(enrichMissingAuthorOrcid.searchUpdatesForRecord(target, results.getData()));
list.addAll(enrichMissingOpenAccess.searchUpdatesForRecord(target, results.getData()));
list.addAll(enrichMissingPid.searchUpdatesForRecord(target, results.getData()));
list.addAll(enrichMissingPublicationDate.searchUpdatesForRecord(target, results.getData()));
list.addAll(enrichMissingSubject.searchUpdatesForRecord(target, results.getData()));
list.addAll(enrichMoreOpenAccess.searchUpdatesForRecord(target, results.getData()));
list.addAll(enrichMorePid.searchUpdatesForRecord(target, results.getData()));
list.addAll(enrichMoreSubject.searchUpdatesForRecord(target, results.getData()));
}
return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
final EventGroup events = new EventGroup();
list.stream().map(EventFactory::newBrokerEvent).forEach(events::addElement);
return events;
}
private static <SRC extends Result, TRG extends OafEntity> JavaRDD<Event> generateRelationEvents(
final SparkSession spark,
private static <SRC extends Result, TRG extends OafEntity> Dataset<Event> generateRelationEvents(final SparkSession spark,
final String graphPath,
final Class<SRC> sourceClass,
final Class<TRG> targetClass) {
final Dataset<SRC> sources = readPath(
spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
final Dataset<SRC> sources = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
.filter(r -> r.getDataInfo().getDeletedbyinference());
final Dataset<TRG> targets = readPath(
spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), targetClass);
final Dataset<TRG> targets = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), targetClass);
final Dataset<Relation> mergedRels = readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));

View File

@ -0,0 +1,32 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import eu.dnetlib.dhp.broker.model.Event;
public class EventGroup implements Serializable {
/**
*
*/
private static final long serialVersionUID = 765977943803533130L;
private final List<Event> data = new ArrayList<>();
public List<Event> getData() {
return data;
}
public EventGroup addElement(final Event elem) {
data.add(elem);
return this;
}
public EventGroup addGroup(final EventGroup group) {
data.addAll(group.getData());
return this;
}
}

View File

@ -0,0 +1,50 @@
package eu.dnetlib.dhp.broker.oa.util;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import scala.Tuple2;
public class ResultAggregator extends Aggregator<Tuple2<Result, Relation>, ResultGroup, ResultGroup> {
/**
*
*/
private static final long serialVersionUID = -1492327874705585538L;
@Override
public ResultGroup zero() {
return new ResultGroup();
}
@Override
public ResultGroup reduce(final ResultGroup group, final Tuple2<Result, Relation> t) {
return group.addElement(t._1);
}
@Override
public ResultGroup merge(final ResultGroup g1, final ResultGroup g2) {
return g1.addGroup(g2);
}
@Override
public ResultGroup finish(final ResultGroup group) {
return group;
}
@Override
public Encoder<ResultGroup> bufferEncoder() {
return Encoders.kryo(ResultGroup.class);
}
@Override
public Encoder<ResultGroup> outputEncoder() {
return Encoders.kryo(ResultGroup.class);
}
}

View File

@ -0,0 +1,35 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import eu.dnetlib.dhp.schema.oaf.Result;
public class ResultGroup implements Serializable {
/**
*
*/
private static final long serialVersionUID = -3360828477088669296L;
private final List<Result> data = new ArrayList<>();
public List<Result> getData() {
return data;
}
public ResultGroup addElement(final Result elem) {
data.add(elem);
return this;
}
public ResultGroup addGroup(final ResultGroup group) {
data.addAll(group.getData());
return this;
}
public boolean isValid() {
return data.size() > 1;
}
}

View File

@ -319,15 +319,7 @@ object DoiBoostMappingUtil {
def generateIdentifier (oaf: Result, doi: String): String = {
val id = DHPUtils.md5 (doi.toLowerCase)
if (oaf.isInstanceOf[Dataset] )
return s"60|${
doiBoostNSPREFIX
}${
SEPARATOR
}${
id
}"
s"50|${
return s"50|${
doiBoostNSPREFIX
}${
SEPARATOR

View File

@ -179,6 +179,9 @@ case object Crossref2Oaf {
if (StringUtils.isNotBlank(issuedDate)) {
instance.setDateofacceptance(asField(issuedDate))
}
else {
instance.setDateofacceptance(asField(createdDate.getValue))
}
val s: String = (json \ "URL").extract[String]
val links: List[String] = ((for {JString(url) <- json \ "link" \ "URL"} yield url) ::: List(s)).filter(p => p != null).distinct
if (links.nonEmpty)

View File

@ -129,16 +129,16 @@ case object ConversionUtil {
val fieldOfStudy = item._2
if (fieldOfStudy != null && fieldOfStudy.subjects != null && fieldOfStudy.subjects.nonEmpty) {
val p: List[StructuredProperty] = fieldOfStudy.subjects.flatMap(s => {
val s1 = createSP(s.DisplayName, "keywords", "dnet:subject_classification_typologies")
val s1 = createSP(s.DisplayName, "keyword", "dnet:subject_classification_typologies")
val di = DoiBoostMappingUtil.generateDataInfo(s.Score.toString)
var resList: List[StructuredProperty] = List(s1)
if (s.MainType.isDefined) {
val maintp = s.MainType.get
val s2 = createSP(s.MainType.get, "keywords", "dnet:subject_classification_typologies")
val s2 = createSP(s.MainType.get, "keyword", "dnet:subject_classification_typologies")
s2.setDataInfo(di)
resList = resList ::: List(s2)
if (maintp.contains(".")) {
val s3 = createSP(maintp.split("\\.").head, "keywords", "dnet:subject_classification_typologies")
val s3 = createSP(maintp.split("\\.").head, "keyword", "dnet:subject_classification_typologies")
s3.setDataInfo(di)
resList = resList ::: List(s3)
}
@ -190,7 +190,7 @@ case object ConversionUtil {
pub.setPid(List(createSP(paper.Doi.toLowerCase, "doi", PID_TYPES)).asJava)
pub.setOriginalId(List(paper.PaperId.toString, paper.Doi.toLowerCase).asJava)
//Set identifier as {50|60} | doiboost____::md5(DOI)
//Set identifier as 50|doiboost____::md5(DOI)
pub.setId(generateIdentifier(pub, paper.Doi.toLowerCase))
val mainTitles = createSP(paper.PaperTitle, "main title", "dnet:dataCite_title")
@ -229,6 +229,8 @@ case object ConversionUtil {
pub.setPublisher(asField(journal.Publisher.get))
if (journal.Issn.isDefined)
j.setIssnPrinted(journal.Issn.get)
j.setVol(paper.Volume)
j.setIss(paper.Issue)
pub.setJournal(j)
}
pub.setCollectedfrom(List(createMAGCollectedFrom()).asJava)
@ -247,7 +249,7 @@ case object ConversionUtil {
pub.setPid(List(createSP(paper.Doi.toLowerCase, "doi", PID_TYPES)).asJava)
pub.setOriginalId(List(paper.PaperId.toString, paper.Doi.toLowerCase).asJava)
//Set identifier as {50|60} | doiboost____::md5(DOI)
//Set identifier as 50 | doiboost____::md5(DOI)
pub.setId(generateIdentifier(pub, paper.Doi.toLowerCase))
val mainTitles = createSP(paper.PaperTitle, "main title", "dnet:dataCite_title")

View File

@ -0,0 +1,60 @@
Crossref Field,Type,Required,Description (from Crossref),OAF field,Comments
publisher,String,Yes,Name of work's publisher,Result/Publisher,
title,Array of String,Yes,"Work titles, including translated titles","Result/Title with Qualifier(""main title"", ""dnet:dataCite_title"")",
original-title,Array of String,No,Work titles in the work's original publication language,"Result/Title with Qualifier(""alternative title"", ""dnet:dataCite_title"")",
short-title,Array of String,No,Short or abbreviated work titles,"Result/Title with Qualifier(""alternative title"", ""dnet:dataCite_title"")",
abstract,XML String,No,Abstract as a JSON string or a JATS XML snippet encoded into a JSON string,Result/description,
reference-count,Number,Yes,Deprecated Same as references-count,"- ",
references-count,Number,Yes,Count of outbound references deposited with Crossref,N/A,
is-referenced-by-count,Number,Yes,Count of inbound references deposited with Crossref,N/A,
source,String,Yes,Currently always Crossref,Result/source,
prefix,String,Yes,DOI prefix identifier of the form http://id.crossref.org/prefix/DOI_PREFIX,N/A,
DOI,String,Yes,DOI of the work,OafEntity/originalId,
,,,,OafEntity/PID,
,,,,"Oaf/id ",Use to generate the OpenAIRE id in the form 50|doiboost____::md5(DOI)
URL,URL,Yes,URL form of the work's DOI,Instance/url,
member,String,Yes,Member identifier of the form http://id.crossref.org/member/MEMBER_ID,N/A,
type,String,Yes,"Enumeration, one of the type ids from https://api.crossref.org/v1/types",Instance/instancetype,Also use to map the record as OAF Publication or Dataset according to the mapping defined in eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala
created,Date,Yes,Date on which the DOI was first registered,"Result/relevantDate with Qualifier(""created"", ""dnet:dataCite_date"")",
,,,,"Result/dateofacceptance
Instance/dateofacceptance",If crossref.issued is blank
deposited,Date,Yes,Date on which the work metadata was most recently updated,N/A,
indexed,Date,Yes,"Date on which the work metadata was most recently indexed. Re-indexing does not imply a metadata change, see deposited for the most recent metadata change date",Result/lastupdatetimestamp,
issued,Partial Date,Yes,Earliest of published-print and published-online,Result/dateofacceptance,OAF dateofacceptance is used also for the publishing date. It's the date visualised in the OpenAIRE EXPLORE portal.
,,,,Instance/dateofacceptance,
posted,Partial Date,No,Date on which posted content was made available online,"Result/relevantDate with Qualifier(""available"", ""dnet:dataCite_date"")",
accepted,Partial Date,No,"Date on which a work was accepted, after being submitted, during a submission process","Result/relevantDate with Qualifier(""accepted"", ""dnet:dataCite_date"")",
subtitle,Array of String,No,"Work subtitles, including original language and translated","Result/Title with Qualifier(""subtitle"", ""dnet:dataCite_title"")",
container-title,Array of String,No,Full titles of the containing work (usually a book or journal),Publication/Journal/name only in case of Journal title for book title see ISBN Mapping,
short-container-title,Array of String,No,Abbreviated titles of the containing work,N/A,
group-title,String,No,Group title for posted content,N/A,
issue,String,No,Issue number of an article's journal,Publication/Journal/iss,
volume,String,No,Volume number of an article's journal,Publication/Journal/vol,
page,String,No,Pages numbers of an article within its journal,"Publication/Journal/sp
Publication/Journal/ep",Obtain start and end page by splitting by '-'
article-number,String,No,,N/A,
published-print,Partial Date,No,Date on which the work was published in print,"Result/relevantDate with Qualifier(""published-print"", ""dnet:dataCite_date"")",
published-online,Partial Date,No,Date on which the work was published online,"Result/relevantDate with Qualifier(""published-online"", ""dnet:dataCite_date"")",
subject,Array of String,No,"Subject category names, a controlled vocabulary from Sci-Val. Available for most journal articles","Result/subject with Qualifier(""keywords"", ""dnet:subject_classification_typologies""). ","Future improvements: map the controlled vocabulary instead of using the generic ""keywords"" qualifier"
ISSN,Array of String,No,,"Publication/Journal/issn
Publication/Journal/lissn
Publication/Journal/eissn",The mapping depends on the value of issn-type
issn-type,Array of ISSN with Type,No,List of ISSNs with ISSN type information,N/A,Its value guides the setting of the properties in Journal (see row above)
ISBN,Array of String,No,,Publication/source,"In case of Book We can map ISBN and container title on Publication/source using this syntax container-title + ""ISBN: "" + ISBN"
archive,Array of String,No,,N/A,
license,Array of License,No,,Result/Instance/License,
funder,Array of Funder,No,,Relation,Whenever we are able to link to a funder or project integrated into OpenAIRE. Mapping to OpenAIRE funders and projects is in eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala.generateSimpleRelationFromAward
assertion,Array of Assertion,No,,N/A,
author,Array of Contributor,No,,Result/author (with orcid if available),
editor,Array of Contributor,No,,N/A,
chair,Array of Contributor,No,,N/A,
translator,Array of Contributor,No,,N/A,
update-to,Array of Update,No,,N/A,
update-policy,URL,No,Link to an update policy covering Crossmark updates for this work,N/A,
link,Array of Resource Link,No,URLs to full-text locations,Result/Instance/url,
clinical-trial-number,Array of Clinical Trial Number,No,,OafEntity/originalId,
alternative-id,String,No,Other identifiers for the work provided by the depositing member,OafEntity/originalId,
reference,Array of Reference,No,List of references made by the work,,Future improvement: map to references
content-domain,Content Domain,No,Information on domains that support Crossmark for this work,N/A,
relation,Relations,No,Relations to other works,Result/Instance/refereed,"if(relation.has-review) instance.refereed = ""peerReviewed"". "
review,Review,No,Peer review metadata,N/A,
1 Crossref Field Type Required Description (from Crossref) OAF field Comments
2 publisher String Yes Name of work's publisher Result/Publisher
3 title Array of String Yes Work titles, including translated titles Result/Title with Qualifier("main title", "dnet:dataCite_title")
4 original-title Array of String No Work titles in the work's original publication language Result/Title with Qualifier("alternative title", "dnet:dataCite_title")
5 short-title Array of String No Short or abbreviated work titles Result/Title with Qualifier("alternative title", "dnet:dataCite_title")
6 abstract XML String No Abstract as a JSON string or a JATS XML snippet encoded into a JSON string Result/description
7 reference-count Number Yes Deprecated Same as references-count -
8 references-count Number Yes Count of outbound references deposited with Crossref N/A
9 is-referenced-by-count Number Yes Count of inbound references deposited with Crossref N/A
10 source String Yes Currently always Crossref Result/source
11 prefix String Yes DOI prefix identifier of the form http://id.crossref.org/prefix/DOI_PREFIX N/A
12 DOI String Yes DOI of the work OafEntity/originalId
13 OafEntity/PID
14 Oaf/id Use to generate the OpenAIRE id in the form 50|doiboost____::md5(DOI)
15 URL URL Yes URL form of the work's DOI Instance/url
16 member String Yes Member identifier of the form http://id.crossref.org/member/MEMBER_ID N/A
17 type String Yes Enumeration, one of the type ids from https://api.crossref.org/v1/types Instance/instancetype Also use to map the record as OAF Publication or Dataset according to the mapping defined in eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala
18 created Date Yes Date on which the DOI was first registered Result/relevantDate with Qualifier("created", "dnet:dataCite_date")
19 Result/dateofacceptance Instance/dateofacceptance If crossref.issued is blank
20 deposited Date Yes Date on which the work metadata was most recently updated N/A
21 indexed Date Yes Date on which the work metadata was most recently indexed. Re-indexing does not imply a metadata change, see deposited for the most recent metadata change date Result/lastupdatetimestamp
22 issued Partial Date Yes Earliest of published-print and published-online Result/dateofacceptance OAF dateofacceptance is used also for the publishing date. It's the date visualised in the OpenAIRE EXPLORE portal.
23 Instance/dateofacceptance
24 posted Partial Date No Date on which posted content was made available online Result/relevantDate with Qualifier("available", "dnet:dataCite_date")
25 accepted Partial Date No Date on which a work was accepted, after being submitted, during a submission process Result/relevantDate with Qualifier("accepted", "dnet:dataCite_date")
26 subtitle Array of String No Work subtitles, including original language and translated Result/Title with Qualifier("subtitle", "dnet:dataCite_title")
27 container-title Array of String No Full titles of the containing work (usually a book or journal) Publication/Journal/name only in case of Journal title for book title see ISBN Mapping
28 short-container-title Array of String No Abbreviated titles of the containing work N/A
29 group-title String No Group title for posted content N/A
30 issue String No Issue number of an article's journal Publication/Journal/iss
31 volume String No Volume number of an article's journal Publication/Journal/vol
32 page String No Pages numbers of an article within its journal Publication/Journal/sp Publication/Journal/ep Obtain start and end page by splitting by '-'
33 article-number String No N/A
34 published-print Partial Date No Date on which the work was published in print Result/relevantDate with Qualifier("published-print", "dnet:dataCite_date")
35 published-online Partial Date No Date on which the work was published online Result/relevantDate with Qualifier("published-online", "dnet:dataCite_date")
36 subject Array of String No Subject category names, a controlled vocabulary from Sci-Val. Available for most journal articles Result/subject with Qualifier("keywords", "dnet:subject_classification_typologies"). Future improvements: map the controlled vocabulary instead of using the generic "keywords" qualifier
37 ISSN Array of String No Publication/Journal/issn Publication/Journal/lissn Publication/Journal/eissn The mapping depends on the value of issn-type
38 issn-type Array of ISSN with Type No List of ISSNs with ISSN type information N/A Its value guides the setting of the properties in Journal (see row above)
39 ISBN Array of String No Publication/source In case of Book We can map ISBN and container title on Publication/source using this syntax container-title + "ISBN: " + ISBN
40 archive Array of String No N/A
41 license Array of License No Result/Instance/License
42 funder Array of Funder No Relation Whenever we are able to link to a funder or project integrated into OpenAIRE. Mapping to OpenAIRE funders and projects is in eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala.generateSimpleRelationFromAward
43 assertion Array of Assertion No N/A
44 author Array of Contributor No Result/author (with orcid if available)
45 editor Array of Contributor No N/A
46 chair Array of Contributor No N/A
47 translator Array of Contributor No N/A
48 update-to Array of Update No N/A
49 update-policy URL No Link to an update policy covering Crossmark updates for this work N/A
50 link Array of Resource Link No URLs to full-text locations Result/Instance/url
51 clinical-trial-number Array of Clinical Trial Number No OafEntity/originalId
52 alternative-id String No Other identifiers for the work provided by the depositing member OafEntity/originalId
53 reference Array of Reference No List of references made by the work Future improvement: map to references
54 content-domain Content Domain No Information on domains that support Crossmark for this work N/A
55 relation Relations No Relations to other works Result/Instance/refereed if(relation.has-review) instance.refereed = "peerReviewed".
56 review Review No Peer review metadata N/A

View File

@ -6,23 +6,23 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity;
import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
import eu.dnetlib.dhp.oa.provision.model.Tuple2;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.oa.provision.model.*;
import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.Seq;
/**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The
@ -76,46 +76,31 @@ public class AdjacencyListBuilderJob {
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
conf.registerKryoClasses(ProvisionModelSupport.getModelClasses());
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
createAdjacencyLists(spark, inputPath, outputPath);
createAdjacencyListsKryo(spark, inputPath, outputPath);
});
}
private static void createAdjacencyLists(
private static void createAdjacencyListsKryo(
SparkSession spark, String inputPath, String outputPath) {
log.info("Reading joined entities from: {}", inputPath);
spark
.read()
.load(inputPath)
.as(Encoders.bean(EntityRelEntity.class))
.groupByKey(
(MapFunction<EntityRelEntity, String>) value -> value.getEntity().getId(),
Encoders.STRING())
.mapGroups(
(MapGroupsFunction<String, EntityRelEntity, JoinedEntity>) (key, values) -> {
JoinedEntity j = new JoinedEntity();
List<Tuple2> links = new ArrayList<>();
while (values.hasNext() && links.size() < MAX_LINKS) {
EntityRelEntity curr = values.next();
if (j.getEntity() == null) {
j.setEntity(curr.getEntity());
final List<String> paths = HdfsSupport
.listFiles(inputPath, spark.sparkContext().hadoopConfiguration());
log.info("Found paths: {}", String.join(",", paths));
}
links.add(new Tuple2(curr.getRelation(), curr.getTarget()));
}
j.setLinks(links);
return j;
},
Encoders.bean(JoinedEntity.class))
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
private static Seq<String> toSeq(List<String> list) {
return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq();
}
private static void removeOutputDir(SparkSession spark, String path) {

View File

@ -2,7 +2,6 @@
package eu.dnetlib.dhp.oa.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*;
import java.util.List;
import java.util.Objects;
@ -23,8 +22,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity;
import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
@ -91,7 +91,7 @@ public class CreateRelatedEntitiesJob_phase1 {
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
conf.registerKryoClasses(ProvisionModelSupport.getModelClasses());
runWithSparkSession(
conf,
@ -120,7 +120,7 @@ public class CreateRelatedEntitiesJob_phase1 {
.filter("dataInfo.invisible == false")
.map(
(MapFunction<E, RelatedEntity>) value -> asRelatedEntity(value, clazz),
Encoders.bean(RelatedEntity.class))
Encoders.kryo(RelatedEntity.class))
.map(
(MapFunction<RelatedEntity, Tuple2<String, RelatedEntity>>) e -> new Tuple2<>(e.getId(), e),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class)))
@ -129,12 +129,12 @@ public class CreateRelatedEntitiesJob_phase1 {
relsByTarget
.joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner")
.map(
(MapFunction<Tuple2<Tuple2<String, SortableRelation>, Tuple2<String, RelatedEntity>>, EntityRelEntity>) t -> new EntityRelEntity(
(MapFunction<Tuple2<Tuple2<String, SortableRelation>, Tuple2<String, RelatedEntity>>, RelatedEntityWrapper>) t -> new RelatedEntityWrapper(
t._1()._2(), t._2()._2()),
Encoders.bean(EntityRelEntity.class))
Encoders.kryo(RelatedEntityWrapper.class))
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath + "/" + EntityType.fromClass(clazz));
.parquet(outputPath);
}
private static <E extends OafEntity> Dataset<E> readPathEntity(

View File

@ -4,27 +4,32 @@ package eu.dnetlib.dhp.oa.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity;
import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
import eu.dnetlib.dhp.oa.provision.model.TypedRow;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
@ -59,6 +64,12 @@ public class CreateRelatedEntitiesJob_phase2 {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final int MAX_EXTERNAL_ENTITIES = 50;
private static final int MAX_AUTHORS = 200;
private static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000;
private static final int MAX_TITLE_LENGTH = 5000;
private static final int MAX_ABSTRACT_LENGTH = 100000;
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
@ -78,8 +89,8 @@ public class CreateRelatedEntitiesJob_phase2 {
String inputRelatedEntitiesPath = parser.get("inputRelatedEntitiesPath");
log.info("inputRelatedEntitiesPath: {}", inputRelatedEntitiesPath);
String inputGraphRootPath = parser.get("inputGraphRootPath");
log.info("inputGraphRootPath: {}", inputGraphRootPath);
String inputEntityPath = parser.get("inputEntityPath");
log.info("inputEntityPath: {}", inputEntityPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
@ -87,80 +98,112 @@ public class CreateRelatedEntitiesJob_phase2 {
int numPartitions = Integer.parseInt(parser.get("numPartitions"));
log.info("numPartitions: {}", numPartitions);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
conf.registerKryoClasses(ProvisionModelSupport.getModelClasses());
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
joinAllEntities(
spark, inputRelatedEntitiesPath, inputGraphRootPath, outputPath, numPartitions);
joinEntityWithRelatedEntities(
spark, inputRelatedEntitiesPath, inputEntityPath, outputPath, numPartitions, entityClazz);
});
}
private static void joinAllEntities(
private static <E extends OafEntity> void joinEntityWithRelatedEntities(
SparkSession spark,
String inputRelatedEntitiesPath,
String inputGraphRootPath,
String relatedEntitiesPath,
String entityPath,
String outputPath,
int numPartitions) {
int numPartitions,
Class<E> entityClazz) {
Dataset<Tuple2<String, TypedRow>> entities = readAllEntities(spark, inputGraphRootPath, numPartitions);
Dataset<Tuple2<String, EntityRelEntity>> relsBySource = readRelatedEntities(spark, inputRelatedEntitiesPath);
Dataset<Tuple2<String, E>> entities = readPathEntity(spark, entityPath, entityClazz);
Dataset<Tuple2<String, RelatedEntityWrapper>> relatedEntities = readRelatedEntities(
spark, relatedEntitiesPath, entityClazz);
TypedColumn<JoinedEntity, JoinedEntity> aggregator = new AdjacencyListAggregator().toColumn();
entities
.joinWith(relsBySource, entities.col("_1").equalTo(relsBySource.col("_1")), "left_outer")
.joinWith(relatedEntities, entities.col("_1").equalTo(relatedEntities.col("_1")), "left_outer")
.map((MapFunction<Tuple2<Tuple2<String, E>, Tuple2<String, RelatedEntityWrapper>>, JoinedEntity>) value -> {
JoinedEntity je = new JoinedEntity(value._1()._2());
Optional
.ofNullable(value._2())
.map(Tuple2::_2)
.ifPresent(r -> je.getLinks().add(r));
return je;
}, Encoders.kryo(JoinedEntity.class))
.filter(filterEmptyEntityFn())
.groupByKey(
(MapFunction<JoinedEntity, String>) value -> value.getEntity().getId(),
Encoders.STRING())
.agg(aggregator)
.map(
(MapFunction<Tuple2<Tuple2<String, TypedRow>, Tuple2<String, EntityRelEntity>>, EntityRelEntity>) value -> {
EntityRelEntity re = new EntityRelEntity();
re.setEntity(value._1()._2());
Optional<EntityRelEntity> related = Optional.ofNullable(value._2()).map(Tuple2::_2);
if (related.isPresent()) {
re.setRelation(related.get().getRelation());
re.setTarget(related.get().getTarget());
}
return re;
},
Encoders.bean(EntityRelEntity.class))
.repartition(numPartitions)
.filter(
(FilterFunction<EntityRelEntity>) value -> value.getEntity() != null
&& StringUtils.isNotBlank(value.getEntity().getId()))
(MapFunction<Tuple2<String, JoinedEntity>, JoinedEntity>) value -> value._2(),
Encoders.kryo(JoinedEntity.class))
.filter(filterEmptyEntityFn())
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
}
private static Dataset<Tuple2<String, TypedRow>> readAllEntities(
SparkSession spark, String inputGraphPath, int numPartitions) {
Dataset<TypedRow> publication = readPathEntity(spark, inputGraphPath + "/publication", Publication.class);
Dataset<TypedRow> dataset = readPathEntity(
spark, inputGraphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
Dataset<TypedRow> other = readPathEntity(
spark, inputGraphPath + "/otherresearchproduct", OtherResearchProduct.class);
Dataset<TypedRow> software = readPathEntity(spark, inputGraphPath + "/software", Software.class);
Dataset<TypedRow> datasource = readPathEntity(spark, inputGraphPath + "/datasource", Datasource.class);
Dataset<TypedRow> organization = readPathEntity(spark, inputGraphPath + "/organization", Organization.class);
Dataset<TypedRow> project = readPathEntity(spark, inputGraphPath + "/project", Project.class);
public static class AdjacencyListAggregator extends Aggregator<JoinedEntity, JoinedEntity, JoinedEntity> {
return publication
.union(dataset)
.union(other)
.union(software)
.union(datasource)
.union(organization)
.union(project)
.map(
(MapFunction<TypedRow, Tuple2<String, TypedRow>>) value -> new Tuple2<>(value.getId(), value),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class)))
.repartition(numPartitions);
@Override
public JoinedEntity zero() {
return new JoinedEntity();
}
private static Dataset<Tuple2<String, EntityRelEntity>> readRelatedEntities(
SparkSession spark, String inputRelatedEntitiesPath) {
@Override
public JoinedEntity reduce(JoinedEntity b, JoinedEntity a) {
return mergeAndGet(b, a);
}
private JoinedEntity mergeAndGet(JoinedEntity b, JoinedEntity a) {
b
.setEntity(
Optional
.ofNullable(a.getEntity())
.orElse(
Optional
.ofNullable(b.getEntity())
.orElse(null)));
b.getLinks().addAll(a.getLinks());
return b;
}
@Override
public JoinedEntity merge(JoinedEntity b, JoinedEntity a) {
return mergeAndGet(b, a);
}
@Override
public JoinedEntity finish(JoinedEntity j) {
return j;
}
@Override
public Encoder<JoinedEntity> bufferEncoder() {
return Encoders.kryo(JoinedEntity.class);
}
@Override
public Encoder<JoinedEntity> outputEncoder() {
return Encoders.kryo(JoinedEntity.class);
}
}
private static <E extends OafEntity> Dataset<Tuple2<String, RelatedEntityWrapper>> readRelatedEntities(
SparkSession spark, String inputRelatedEntitiesPath, Class<E> entityClazz) {
log.info("Reading related entities from: {}", inputRelatedEntitiesPath);
@ -169,17 +212,20 @@ public class CreateRelatedEntitiesJob_phase2 {
log.info("Found paths: {}", String.join(",", paths));
final String idPrefix = ModelSupport.getIdPrefix(entityClazz);
return spark
.read()
.load(toSeq(paths))
.as(Encoders.bean(EntityRelEntity.class))
.as(Encoders.kryo(RelatedEntityWrapper.class))
.filter((FilterFunction<RelatedEntityWrapper>) e -> e.getRelation().getSource().startsWith(idPrefix))
.map(
(MapFunction<EntityRelEntity, Tuple2<String, EntityRelEntity>>) value -> new Tuple2<>(
(MapFunction<RelatedEntityWrapper, Tuple2<String, RelatedEntityWrapper>>) value -> new Tuple2<>(
value.getRelation().getSource(), value),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class)));
Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntityWrapper.class)));
}
private static <E extends OafEntity> Dataset<TypedRow> readPathEntity(
private static <E extends OafEntity> Dataset<Tuple2<String, E>> readPathEntity(
SparkSession spark, String inputEntityPath, Class<E> entityClazz) {
log.info("Reading Graph table from: {}", inputEntityPath);
@ -190,10 +236,79 @@ public class CreateRelatedEntitiesJob_phase2 {
(MapFunction<String, E>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
Encoders.bean(entityClazz))
.filter("dataInfo.invisible == false")
.map((MapFunction<E, E>) e -> pruneOutliers(entityClazz, e), Encoders.bean(entityClazz))
.map(
(MapFunction<E, TypedRow>) value -> getTypedRow(
StringUtils.substringAfterLast(inputEntityPath, "/"), value),
Encoders.bean(TypedRow.class));
(MapFunction<E, Tuple2<String, E>>) e -> new Tuple2<>(e.getId(), e),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(entityClazz)));
}
private static <E extends OafEntity> E pruneOutliers(Class<E> entityClazz, E e) {
if (ModelSupport.isSubClass(entityClazz, Result.class)) {
Result r = (Result) e;
if (r.getExternalReference() != null) {
List<ExternalReference> refs = r
.getExternalReference()
.stream()
.limit(MAX_EXTERNAL_ENTITIES)
.collect(Collectors.toList());
r.setExternalReference(refs);
}
if (r.getAuthor() != null) {
List<Author> authors = Lists.newArrayList();
for (Author a : r.getAuthor()) {
a.setFullname(StringUtils.left(a.getFullname(), MAX_AUTHOR_FULLNAME_LENGTH));
if (authors.size() < MAX_AUTHORS || hasORCID(a)) {
authors.add(a);
}
}
r.setAuthor(authors);
}
if (r.getDescription() != null) {
List<Field<String>> desc = r
.getDescription()
.stream()
.filter(Objects::nonNull)
.map(d -> {
d.setValue(StringUtils.left(d.getValue(), MAX_ABSTRACT_LENGTH));
return d;
})
.collect(Collectors.toList());
r.setDescription(desc);
}
if (r.getTitle() != null) {
List<StructuredProperty> titles = r
.getTitle()
.stream()
.filter(Objects::nonNull)
.map(t -> {
t.setValue(StringUtils.left(t.getValue(), MAX_TITLE_LENGTH));
return t;
})
.collect(Collectors.toList());
r.setTitle(titles);
}
}
return e;
}
private static boolean hasORCID(Author a) {
return a.getPid() != null && a
.getPid()
.stream()
.filter(Objects::nonNull)
.map(StructuredProperty::getQualifier)
.filter(Objects::nonNull)
.map(Qualifier::getClassid)
.filter(StringUtils::isNotBlank)
.anyMatch(c -> "orcid".equals(c.toLowerCase()));
}
private static FilterFunction<JoinedEntity> filterEmptyEntityFn() {
return (FilterFunction<JoinedEntity>) v -> Objects.nonNull(v.getEntity());
/*
* return (FilterFunction<JoinedEntity>) v -> Optional .ofNullable(v.getEntity()) .map(e ->
* StringUtils.isNotBlank(e.getId())) .orElse(false);
*/
}
private static TypedRow getTypedRow(String type, OafEntity entity)

View File

@ -3,9 +3,8 @@ package eu.dnetlib.dhp.oa.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
@ -20,6 +19,7 @@ import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.clearspring.analytics.util.Lists;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
@ -27,9 +27,11 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner;
import scala.Function1;
import scala.Tuple2;
/**
@ -111,37 +113,10 @@ public class PrepareRelationsJob {
spark -> {
removeOutputDir(spark, outputPath);
prepareRelationsRDD(
spark, inputRelationsPath, outputPath, relationFilter, relPartitions, maxRelations);
spark, inputRelationsPath, outputPath, relationFilter, maxRelations, relPartitions);
});
}
/**
* Dataset based implementation that prepares the graph relations by limiting the number of outgoing links and
* filtering the relation types according to the given criteria.
*
* @param spark the spark session
* @param inputRelationsPath source path for the graph relations
* @param outputPath output path for the processed relations
* @param relationFilter set of relation filters applied to the `relClass` field
* @param maxRelations maximum number of allowed outgoing edges
*/
private static void prepareRelations(
SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter,
int maxRelations) {
readPathRelation(spark, inputRelationsPath)
.filter("dataInfo.deletedbyinference == false")
.filter((FilterFunction<SortableRelation>) rel -> !relationFilter.contains(rel.getRelClass()))
.groupByKey(
(MapFunction<SortableRelation, String>) value -> value.getSource(), Encoders.STRING())
.flatMapGroups(
(FlatMapGroupsFunction<String, SortableRelation, SortableRelation>) (key, values) -> Iterators
.limit(values, maxRelations),
Encoders.bean(SortableRelation.class))
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
}
/**
* RDD based implementation that prepares the graph relations by limiting the number of outgoing links and filtering
* the relation types according to the given criteria. Moreover, outgoing links kept within the given limit are
@ -152,50 +127,41 @@ public class PrepareRelationsJob {
* @param outputPath output path for the processed relations
* @param relationFilter set of relation filters applied to the `relClass` field
* @param maxRelations maximum number of allowed outgoing edges
* @param relPartitions number of partitions for the output RDD
*/
// TODO work in progress
private static void prepareRelationsRDD(
SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int relPartitions,
int maxRelations) {
JavaRDD<SortableRelation> rels = readPathRelationRDD(spark, inputRelationsPath).repartition(relPartitions);
RelationPartitioner partitioner = new RelationPartitioner(rels.getNumPartitions());
SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int maxRelations,
int relPartitions) {
// only consider those that are not virtually deleted
RDD<SortableRelation> d = rels
RDD<SortableRelation> cappedRels = readPathRelationRDD(spark, inputRelationsPath)
.repartition(relPartitions)
.filter(rel -> !rel.getDataInfo().getDeletedbyinference())
.filter(rel -> !relationFilter.contains(rel.getRelClass()))
.mapToPair(
(PairFunction<SortableRelation, SortableRelation, SortableRelation>) rel -> new Tuple2<>(rel, rel))
.groupByKey(partitioner)
.map(group -> Iterables.limit(group._2(), maxRelations))
.flatMap(group -> group.iterator())
// group by SOURCE and apply limit
.mapToPair(rel -> new Tuple2<>(rel.getSource(), rel))
.groupByKey(new RelationPartitioner(relPartitions))
.flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator())
// group by TARGET and apply limit
.mapToPair(rel -> new Tuple2<>(rel.getTarget(), rel))
.groupByKey(new RelationPartitioner(relPartitions))
.flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator())
.rdd();
spark
.createDataset(d, Encoders.bean(SortableRelation.class))
.createDataset(cappedRels, Encoders.bean(SortableRelation.class))
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
}
/**
* Reads a Dataset of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text
* Reads a JavaRDD of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text
* file,
*
* @param spark
* @param inputPath
* @return the Dataset<SortableRelation> containing all the relationships
* @return the JavaRDD<SortableRelation> containing all the relationships
*/
private static Dataset<SortableRelation> readPathRelation(
SparkSession spark, final String inputPath) {
return spark
.read()
.textFile(inputPath)
.map(
(MapFunction<String, SortableRelation>) value -> OBJECT_MAPPER.readValue(value, SortableRelation.class),
Encoders.bean(SortableRelation.class));
}
private static JavaRDD<SortableRelation> readPathRelationRDD(
SparkSession spark, final String inputPath) {
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

View File

@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@ -32,6 +33,8 @@ import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.Seq;
/**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The
@ -89,6 +92,8 @@ public class XmlConverterJob {
log.info("otherDsTypeId: {}", otherDsTypeId);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ProvisionModelSupport.getModelClasses());
runWithSparkSession(
conf,
@ -114,26 +119,18 @@ public class XmlConverterJob {
schemaLocation,
otherDsTypeId);
final List<String> paths = HdfsSupport
.listFiles(inputPath, spark.sparkContext().hadoopConfiguration());
log.info("Found paths: {}", String.join(",", paths));
spark
.read()
.load(inputPath)
.as(Encoders.bean(JoinedEntity.class))
.load(toSeq(paths))
.as(Encoders.kryo(JoinedEntity.class))
.map(
(MapFunction<JoinedEntity, JoinedEntity>) j -> {
if (j.getLinks() != null) {
j
.setLinks(
j
.getLinks()
.stream()
.filter(t -> t.getRelation() != null & t.getRelatedEntity() != null)
.collect(Collectors.toCollection(ArrayList::new)));
}
return j;
},
Encoders.bean(JoinedEntity.class))
.map(
(MapFunction<JoinedEntity, Tuple2<String, String>>) je -> new Tuple2<>(je.getEntity().getId(),
(MapFunction<JoinedEntity, Tuple2<String, String>>) je -> new Tuple2<>(
je.getEntity().getId(),
recordFactory.build(je)),
Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
.javaRDD()
@ -148,6 +145,10 @@ public class XmlConverterJob {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
private static Seq<String> toSeq(List<String> list) {
return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq();
}
private static Map<String, LongAccumulator> prepareAccumulators(SparkContext sc) {
Map<String, LongAccumulator> accumulators = Maps.newHashMap();
accumulators

View File

@ -2,30 +2,40 @@
package eu.dnetlib.dhp.oa.provision.model;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
public class JoinedEntity implements Serializable {
import eu.dnetlib.dhp.schema.oaf.OafEntity;
private TypedRow entity;
public class JoinedEntity<E extends OafEntity> implements Serializable {
private List<Tuple2> links;
private E entity;
private List<RelatedEntityWrapper> links;
public JoinedEntity() {
links = new LinkedList<>();
}
public TypedRow getEntity() {
return entity;
}
public void setEntity(TypedRow entity) {
public JoinedEntity(E entity) {
this();
this.entity = entity;
}
public List<Tuple2> getLinks() {
public E getEntity() {
return entity;
}
public void setEntity(E entity) {
this.entity = entity;
}
public List<RelatedEntityWrapper> getLinks() {
return links;
}
public void setLinks(List<Tuple2> links) {
public void setLinks(List<RelatedEntityWrapper> links) {
this.links = links;
}
}

View File

@ -0,0 +1,25 @@
package eu.dnetlib.dhp.oa.provision.model;
import java.util.List;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.schema.common.ModelSupport;
public class ProvisionModelSupport {
public static Class[] getModelClasses() {
List<Class<?>> modelClasses = Lists.newArrayList(ModelSupport.getOafModelClasses());
modelClasses
.addAll(
Lists
.newArrayList(
TypedRow.class,
RelatedEntityWrapper.class,
JoinedEntity.class,
RelatedEntity.class,
SortableRelation.class));
return modelClasses.toArray(new Class[] {});
}
}

View File

@ -5,33 +5,23 @@ import java.io.Serializable;
import com.google.common.base.Objects;
public class EntityRelEntity implements Serializable {
public class RelatedEntityWrapper implements Serializable {
private TypedRow entity;
private SortableRelation relation;
private RelatedEntity target;
public EntityRelEntity() {
public RelatedEntityWrapper() {
}
public EntityRelEntity(SortableRelation relation, RelatedEntity target) {
public RelatedEntityWrapper(SortableRelation relation, RelatedEntity target) {
this(null, relation, target);
}
public EntityRelEntity(TypedRow entity, SortableRelation relation, RelatedEntity target) {
this.entity = entity;
public RelatedEntityWrapper(TypedRow entity, SortableRelation relation, RelatedEntity target) {
this.relation = relation;
this.target = target;
}
public TypedRow getEntity() {
return entity;
}
public void setEntity(TypedRow entity) {
this.entity = entity;
}
public SortableRelation getRelation() {
return relation;
}
@ -54,14 +44,13 @@ public class EntityRelEntity implements Serializable {
return true;
if (o == null || getClass() != o.getClass())
return false;
EntityRelEntity that = (EntityRelEntity) o;
return Objects.equal(entity, that.entity)
&& Objects.equal(relation, that.relation)
RelatedEntityWrapper that = (RelatedEntityWrapper) o;
return Objects.equal(relation, that.relation)
&& Objects.equal(target, that.target);
}
@Override
public int hashCode() {
return Objects.hashCode(entity, relation, target);
return Objects.hashCode(relation, target);
}
}

View File

@ -1,53 +0,0 @@
package eu.dnetlib.dhp.oa.provision.model;
import java.io.Serializable;
import java.util.Objects;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class Tuple2 implements Serializable {
private Relation relation;
private RelatedEntity relatedEntity;
public Tuple2() {
}
public Tuple2(Relation relation, RelatedEntity relatedEntity) {
this.relation = relation;
this.relatedEntity = relatedEntity;
}
public Relation getRelation() {
return relation;
}
public void setRelation(Relation relation) {
this.relation = relation;
}
public RelatedEntity getRelatedEntity() {
return relatedEntity;
}
public void setRelatedEntity(RelatedEntity relatedEntity) {
this.relatedEntity = relatedEntity;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
Tuple2 t2 = (Tuple2) o;
return getRelation().equals(t2.getRelation());
}
@Override
public int hashCode() {
return Objects.hash(getRelation().hashCode());
}
}

View File

@ -4,8 +4,6 @@ package eu.dnetlib.dhp.oa.provision.utils;
import org.apache.spark.Partitioner;
import org.apache.spark.util.Utils;
import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
/**
* Used in combination with SortableRelationKey, allows to partition the records by source id, therefore allowing to
* sort relations sharing the same source id by the ordering defined in SortableRelationKey.
@ -25,6 +23,8 @@ public class RelationPartitioner extends Partitioner {
@Override
public int getPartition(Object key) {
return Utils.nonNegativeMod(((SortableRelation) key).getSource().hashCode(), numPartitions());
String partitionKey = (String) key;
return Utils.nonNegativeMod(partitionKey.hashCode(), numPartitions());
}
}

View File

@ -85,17 +85,19 @@ public class XmlRecordFactory implements Serializable {
final Set<String> contexts = Sets.newHashSet();
final OafEntity entity = toOafEntity(je.getEntity());
// final OafEntity entity = toOafEntity(je.getEntity());
OafEntity entity = je.getEntity();
TemplateFactory templateFactory = new TemplateFactory();
try {
final EntityType type = EntityType.valueOf(je.getEntity().getType());
final EntityType type = EntityType.fromClass(entity.getClass());
final List<String> metadata = metadata(type, entity, contexts);
// rels has to be processed before the contexts because they enrich the contextMap with
// the
// funding info.
final List<String> relations = je
.getLinks()
final List<RelatedEntityWrapper> links = je.getLinks();
final List<String> relations = links
.stream()
.filter(link -> !isDuplicate(link))
.map(link -> mapRelation(contexts, templateFactory, type, link))
@ -975,10 +977,10 @@ public class XmlRecordFactory implements Serializable {
metadata.add(XmlSerializationUtils.mapQualifier("datasourcetypeui", dsType));
}
private List<String> mapFields(Tuple2 link, Set<String> contexts) {
private List<String> mapFields(RelatedEntityWrapper link, Set<String> contexts) {
final Relation rel = link.getRelation();
final RelatedEntity re = link.getRelatedEntity();
final String targetType = link.getRelatedEntity().getType();
final RelatedEntity re = link.getTarget();
final String targetType = link.getTarget().getType();
final List<String> metadata = Lists.newArrayList();
switch (EntityType.valueOf(targetType)) {
@ -1089,9 +1091,10 @@ public class XmlRecordFactory implements Serializable {
return metadata;
}
private String mapRelation(Set<String> contexts, TemplateFactory templateFactory, EntityType type, Tuple2 link) {
private String mapRelation(Set<String> contexts, TemplateFactory templateFactory, EntityType type,
RelatedEntityWrapper link) {
final Relation rel = link.getRelation();
final String targetType = link.getRelatedEntity().getType();
final String targetType = link.getTarget().getType();
final String scheme = ModelSupport.getScheme(type.toString(), targetType);
if (StringUtils.isBlank(scheme)) {
@ -1107,18 +1110,18 @@ public class XmlRecordFactory implements Serializable {
private List<String> listChildren(
final OafEntity entity, JoinedEntity je, TemplateFactory templateFactory) {
EntityType entityType = EntityType.valueOf(je.getEntity().getType());
final EntityType entityType = EntityType.fromClass(je.getEntity().getClass());
List<String> children = je
.getLinks()
final List<RelatedEntityWrapper> links = je.getLinks();
List<String> children = links
.stream()
.filter(link -> isDuplicate(link))
.map(link -> {
final String targetType = link.getRelatedEntity().getType();
final String targetType = link.getTarget().getType();
final String name = ModelSupport.getMainType(EntityType.valueOf(targetType));
final HashSet<String> fields = Sets.newHashSet(mapFields(link, null));
return templateFactory
.getChild(name, link.getRelatedEntity().getId(), Lists.newArrayList(fields));
.getChild(name, link.getTarget().getId(), Lists.newArrayList(fields));
})
.collect(Collectors.toCollection(ArrayList::new));
@ -1227,7 +1230,7 @@ public class XmlRecordFactory implements Serializable {
return children;
}
private boolean isDuplicate(Tuple2 link) {
private boolean isDuplicate(RelatedEntityWrapper link) {
return REL_SUBTYPE_DEDUP.equalsIgnoreCase(link.getRelation().getSubRelType());
}

View File

@ -13,8 +13,14 @@
},
{
"paramName": "iep",
"paramLongName": "inputGraphRootPath",
"paramDescription": "root graph path",
"paramLongName": "inputEntityPath",
"paramDescription": "input Entity Path",
"paramRequired": true
},
{
"paramName": "clazz",
"paramLongName": "graphTableClassName",
"paramDescription": "class name associated to the input entity path",
"paramRequired": true
},
{

View File

@ -103,8 +103,7 @@
<switch>
<case to="prepare_relations">${wf:conf('resumeFrom') eq 'prepare_relations'}</case>
<case to="fork_join_related_entities">${wf:conf('resumeFrom') eq 'fork_join_related_entities'}</case>
<case to="join_all_entities">${wf:conf('resumeFrom') eq 'join_all_entities'}</case>
<case to="adjancency_lists">${wf:conf('resumeFrom') eq 'adjancency_lists'}</case>
<case to="fork_join_all_entities">${wf:conf('resumeFrom') eq 'fork_join_all_entities'}</case>
<case to="convert_to_xml">${wf:conf('resumeFrom') eq 'convert_to_xml'}</case>
<case to="to_solr_index">${wf:conf('resumeFrom') eq 'to_solr_index'}</case>
<default to="prepare_relations"/>
@ -134,7 +133,7 @@
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${inputGraphRootPath}/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation</arg>
<arg>--relPartitions</arg><arg>3000</arg>
<arg>--relPartitions</arg><arg>5000</arg>
</spark>
<ok to="fork_join_related_entities"/>
<error to="Kill"/>
@ -171,7 +170,7 @@
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/publication</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
@ -198,7 +197,7 @@
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/dataset</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
@ -225,7 +224,7 @@
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/otherresearchproduct</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
@ -252,7 +251,7 @@
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/software</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
@ -279,7 +278,7 @@
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/datasource</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/datasource</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
@ -306,7 +305,7 @@
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/organization</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/organization</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
@ -333,19 +332,57 @@
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/project</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/project</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
</action>
<join name="wait_joins" to="join_all_entities"/>
<join name="wait_joins" to="fork_join_all_entities"/>
<action name="join_all_entities">
<fork name="fork_join_all_entities">
<path start="join_publication_relations"/>
<path start="join_dataset_relations"/>
<path start="join_otherresearchproduct_relations"/>
<path start="join_software_relations"/>
<path start="join_datasource_relations"/>
<path start="join_organization_relations"/>
<path start="join_project_relations"/>
</fork>
<action name="join_publication_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[entities.id = relatedEntity.source]</name>
<name>Join[publication.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15360
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/publication</arg>
<arg>--numPartitions</arg><arg>30000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<action name="join_dataset_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[dataset.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
@ -359,21 +396,22 @@
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputGraphRootPath</arg><arg>${inputGraphRootPath}</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities</arg>
<arg>--numPartitions</arg><arg>12000</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/dataset</arg>
<arg>--numPartitions</arg><arg>20000</arg>
</spark>
<ok to="adjancency_lists"/>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<action name="adjancency_lists">
<action name="join_otherresearchproduct_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>build_adjacency_lists</name>
<class>eu.dnetlib.dhp.oa.provision.AdjacencyListBuilderJob</class>
<name>Join[otherresearchproduct.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
@ -386,13 +424,130 @@
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg> <arg>${workingDir}/join_entities</arg>
<arg>--outputPath</arg><arg>${workingDir}/joined</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/otherresearchproduct</arg>
<arg>--numPartitions</arg><arg>10000</arg>
</spark>
<ok to="convert_to_xml"/>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<action name="join_software_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[software.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/software</arg>
<arg>--numPartitions</arg><arg>10000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<action name="join_datasource_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[datasource.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/datasource</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/datasource</arg>
<arg>--numPartitions</arg><arg>1000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<action name="join_organization_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[organization.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/organization</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/organization</arg>
<arg>--numPartitions</arg><arg>20000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<action name="join_project_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[project.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/project</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/project</arg>
<arg>--numPartitions</arg><arg>10000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<join name="wait_join_phase2" to="convert_to_xml"/>
<action name="convert_to_xml">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
@ -411,7 +566,7 @@
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/joined</arg>
<arg>--inputPath</arg><arg>${workingDir}/join_entities</arg>
<arg>--outputPath</arg><arg>${workingDir}/xml</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--otherDsTypeId</arg><arg>${otherDsTypeId}</arg>
@ -441,7 +596,7 @@
--conf spark.hadoop.mapreduce.reduce.speculative=false
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/xml</arg>
<arg>--isLookupUrl</arg> <arg>${isLookupUrl}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--format</arg><arg>${format}</arg>
<arg>--batchSize</arg><arg>${batchSize}</arg>
</spark>

View File

@ -45,7 +45,6 @@ public class XmlRecordFactoryTest {
assertNotNull(doc);
System.out.println(doc.asXML());
// TODO add assertions based of values extracted from the XML record
}
}