diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java index 9ee7c2deb..7d8be81ac 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java @@ -58,6 +58,18 @@ public class ModelSupport { oafTypes.put("relation", Relation.class); } + public static final Map idPrefixMap = Maps.newHashMap(); + + static { + idPrefixMap.put(Datasource.class, "10"); + idPrefixMap.put(Organization.class, "20"); + idPrefixMap.put(Project.class, "40"); + idPrefixMap.put(Dataset.class, "50"); + idPrefixMap.put(OtherResearchProduct.class, "50"); + idPrefixMap.put(Software.class, "50"); + idPrefixMap.put(Publication.class, "50"); + } + public static final Map entityIdPrefix = Maps.newHashMap(); static { @@ -289,6 +301,10 @@ public class ModelSupport { private ModelSupport() { } + public static String getIdPrefix(Class clazz) { + return idPrefixMap.get(clazz); + } + /** * Checks subclass-superclass relationship. * diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java index 07ddbb00e..b5587c6b7 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java @@ -10,6 +10,7 @@ public class Dataset extends Result implements Serializable { private Field storagedate; + // candidate for removal private Field device; private Field size; diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Software.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Software.java index 40332bf53..d25b5c9ce 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Software.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Software.java @@ -10,8 +10,10 @@ public class Software extends Result implements Serializable { private List> documentationUrl; + // candidate for removal private List license; + // candidate for removal private Field codeRepositoryUrl; private Qualifier programmingLanguage; diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java index a38e760c5..05fab47f0 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java @@ -12,16 +12,13 @@ import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.tuple.Pair; -import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Column; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.TypedColumn; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +52,9 @@ import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid; import 
eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject; import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; +import eu.dnetlib.dhp.broker.oa.util.EventGroup; +import eu.dnetlib.dhp.broker.oa.util.ResultAggregator; +import eu.dnetlib.dhp.broker.oa.util.ResultGroup; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.oaf.OafEntity; @@ -63,6 +63,7 @@ import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Software; +import scala.Tuple2; public class GenerateEventsApplication { @@ -87,25 +88,32 @@ public class GenerateEventsApplication { private static final UpdateMatcher>, ?> enrichMoreSoftware = new EnrichMoreSoftware(); private static final UpdateMatcher>, ?> enrichMisissingPublicationIsRelatedTo = new EnrichMissingPublicationIsRelatedTo(); - private static final UpdateMatcher>, ?> enrichMissingPublicationIsReferencedBy = new EnrichMissingPublicationIsReferencedBy(); + private static final UpdateMatcher>, ?> enrichMissingPublicationIsReferencedBy = + new EnrichMissingPublicationIsReferencedBy(); private static final UpdateMatcher>, ?> enrichMissingPublicationReferences = new EnrichMissingPublicationReferences(); - private static final UpdateMatcher>, ?> enrichMissingPublicationIsSupplementedTo = new EnrichMissingPublicationIsSupplementedTo(); - private static final UpdateMatcher>, ?> enrichMissingPublicationIsSupplementedBy = new EnrichMissingPublicationIsSupplementedBy(); + private static final UpdateMatcher>, ?> enrichMissingPublicationIsSupplementedTo = + new EnrichMissingPublicationIsSupplementedTo(); + private static final UpdateMatcher>, ?> enrichMissingPublicationIsSupplementedBy = + new EnrichMissingPublicationIsSupplementedBy(); - private static final UpdateMatcher>, ?> enrichMisissingDatasetIsRelatedTo = new EnrichMissingDatasetIsRelatedTo(); - private static final UpdateMatcher>, ?> enrichMissingDatasetIsReferencedBy = new EnrichMissingDatasetIsReferencedBy(); - private static final UpdateMatcher>, ?> enrichMissingDatasetReferences = new EnrichMissingDatasetReferences(); - private static final UpdateMatcher>, ?> enrichMissingDatasetIsSupplementedTo = new EnrichMissingDatasetIsSupplementedTo(); - private static final UpdateMatcher>, ?> enrichMissingDatasetIsSupplementedBy = new EnrichMissingDatasetIsSupplementedBy(); + private static final UpdateMatcher>, ?> enrichMisissingDatasetIsRelatedTo = + new EnrichMissingDatasetIsRelatedTo(); + private static final UpdateMatcher>, ?> enrichMissingDatasetIsReferencedBy = + new EnrichMissingDatasetIsReferencedBy(); + private static final UpdateMatcher>, ?> enrichMissingDatasetReferences = + new EnrichMissingDatasetReferences(); + private static final UpdateMatcher>, ?> enrichMissingDatasetIsSupplementedTo = + new EnrichMissingDatasetIsSupplementedTo(); + private static final UpdateMatcher>, ?> enrichMissingDatasetIsSupplementedBy = + new EnrichMissingDatasetIsSupplementedBy(); public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - GenerateEventsApplication.class - .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json"))); + .toString(GenerateEventsApplication.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json"))); 
parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -123,20 +131,20 @@ public class GenerateEventsApplication { final SparkConf conf = new SparkConf(); runWithSparkSession(conf, isSparkSessionManaged, spark -> { - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + removeOutputDir(spark, eventsPath); - final JavaRDD eventsRdd = sc.emptyRDD(); + Dataset all = spark.emptyDataset(Encoders.kryo(Event.class)); for (final Class r1 : BrokerConstants.RESULT_CLASSES) { - eventsRdd.union(generateSimpleEvents(spark, graphPath, r1)); + all = all.union(generateSimpleEvents(spark, graphPath, r1)); for (final Class r2 : BrokerConstants.RESULT_CLASSES) { - eventsRdd.union(generateRelationEvents(spark, graphPath, r1, r2)); + all = all.union(generateRelationEvents(spark, graphPath, r1, r2)); } } - eventsRdd.saveAsTextFile(eventsPath, GzipCodec.class); + all.write().mode(SaveMode.Overwrite).json(eventsPath); }); } @@ -145,63 +153,56 @@ public class GenerateEventsApplication { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } - private static JavaRDD generateSimpleEvents(final SparkSession spark, + private static Dataset generateSimpleEvents(final SparkSession spark, final String graphPath, final Class resultClazz) { - final Dataset results = readPath( spark, graphPath + "/" + resultClazz.getSimpleName().toLowerCase(), resultClazz) .filter(r -> r.getDataInfo().getDeletedbyinference()); + final Dataset results = readPath(spark, graphPath + "/" + resultClazz.getSimpleName().toLowerCase(), Result.class) .filter(r -> r.getDataInfo().getDeletedbyinference()); final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class) .filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); - final Column c = null; // TODO - - final Dataset aa = results - .joinWith(rels, results.col("id").equalTo(rels.col("source")), "inner") - .groupBy(rels.col("target")) - .agg(c) - .filter(x -> x.size() > 1) - // generateSimpleEvents(...) 
- // flatMap() - // toRdd() - ; - - return null; + final TypedColumn, ResultGroup> aggr = new ResultAggregator().toColumn(); + return results.joinWith(rels, results.col("id").equalTo(rels.col("source")), "inner") + .groupByKey((MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) + .agg(aggr) + .map((MapFunction, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class)) + .filter(ResultGroup::isValid) + .map((MapFunction) g -> GenerateEventsApplication.generateSimpleEvents(g), Encoders.kryo(EventGroup.class)) + .flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class)); } - private List generateSimpleEvents(final Collection children) { + private static EventGroup generateSimpleEvents(final ResultGroup results) { final List> list = new ArrayList<>(); - for (final Result target : children) { - list.addAll(enrichMissingAbstract.searchUpdatesForRecord(target, children)); - list.addAll(enrichMissingAuthorOrcid.searchUpdatesForRecord(target, children)); - list.addAll(enrichMissingOpenAccess.searchUpdatesForRecord(target, children)); - list.addAll(enrichMissingPid.searchUpdatesForRecord(target, children)); - list.addAll(enrichMissingPublicationDate.searchUpdatesForRecord(target, children)); - list.addAll(enrichMissingSubject.searchUpdatesForRecord(target, children)); - list.addAll(enrichMoreOpenAccess.searchUpdatesForRecord(target, children)); - list.addAll(enrichMorePid.searchUpdatesForRecord(target, children)); - list.addAll(enrichMoreSubject.searchUpdatesForRecord(target, children)); + for (final Result target : results.getData()) { + list.addAll(enrichMissingAbstract.searchUpdatesForRecord(target, results.getData())); + list.addAll(enrichMissingAuthorOrcid.searchUpdatesForRecord(target, results.getData())); + list.addAll(enrichMissingOpenAccess.searchUpdatesForRecord(target, results.getData())); + list.addAll(enrichMissingPid.searchUpdatesForRecord(target, results.getData())); + list.addAll(enrichMissingPublicationDate.searchUpdatesForRecord(target, results.getData())); + list.addAll(enrichMissingSubject.searchUpdatesForRecord(target, results.getData())); + list.addAll(enrichMoreOpenAccess.searchUpdatesForRecord(target, results.getData())); + list.addAll(enrichMorePid.searchUpdatesForRecord(target, results.getData())); + list.addAll(enrichMoreSubject.searchUpdatesForRecord(target, results.getData())); } - return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList()); + final EventGroup events = new EventGroup(); + list.stream().map(EventFactory::newBrokerEvent).forEach(events::addElement); + return events; } - private static JavaRDD generateRelationEvents( - final SparkSession spark, + private static Dataset generateRelationEvents(final SparkSession spark, final String graphPath, final Class sourceClass, final Class targetClass) { - final Dataset sources = readPath( - spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass) - .filter(r -> r.getDataInfo().getDeletedbyinference()); + final Dataset sources = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass) + .filter(r -> r.getDataInfo().getDeletedbyinference()); - final Dataset targets = readPath( - spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), targetClass); + final Dataset targets = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), targetClass); final Dataset mergedRels = readPath(spark, graphPath + "/relation", Relation.class) .filter(r -> 
r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventGroup.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventGroup.java new file mode 100644 index 000000000..9c7081c79 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventGroup.java @@ -0,0 +1,32 @@ +package eu.dnetlib.dhp.broker.oa.util; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import eu.dnetlib.dhp.broker.model.Event; + +public class EventGroup implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 765977943803533130L; + + private final List data = new ArrayList<>(); + + public List getData() { + return data; + } + + public EventGroup addElement(final Event elem) { + data.add(elem); + return this; + } + + public EventGroup addGroup(final EventGroup group) { + data.addAll(group.getData()); + return this; + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ResultAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ResultAggregator.java new file mode 100644 index 000000000..94685eeae --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ResultAggregator.java @@ -0,0 +1,50 @@ +package eu.dnetlib.dhp.broker.oa.util; + +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.expressions.Aggregator; + +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.Result; +import scala.Tuple2; + +public class ResultAggregator extends Aggregator, ResultGroup, ResultGroup> { + + /** + * + */ + private static final long serialVersionUID = -1492327874705585538L; + + @Override + public ResultGroup zero() { + return new ResultGroup(); + } + + @Override + public ResultGroup reduce(final ResultGroup group, final Tuple2 t) { + return group.addElement(t._1); + } + + @Override + public ResultGroup merge(final ResultGroup g1, final ResultGroup g2) { + return g1.addGroup(g2); + } + + @Override + public ResultGroup finish(final ResultGroup group) { + return group; + } + + @Override + public Encoder bufferEncoder() { + return Encoders.kryo(ResultGroup.class); + + } + + @Override + public Encoder outputEncoder() { + return Encoders.kryo(ResultGroup.class); + + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ResultGroup.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ResultGroup.java new file mode 100644 index 000000000..8fe7a5939 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ResultGroup.java @@ -0,0 +1,35 @@ +package eu.dnetlib.dhp.broker.oa.util; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import eu.dnetlib.dhp.schema.oaf.Result; + +public class ResultGroup implements Serializable { + + /** + * + */ + private static final long serialVersionUID = -3360828477088669296L; + + private final List data = new ArrayList<>(); + + public List getData() { + return data; + } + + public ResultGroup addElement(final Result elem) { + data.add(elem); + return this; + } + + public ResultGroup addGroup(final ResultGroup group) { + data.addAll(group.getData()); + return this; + } + + public boolean isValid() { + return data.size() > 1; + } +}
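Note: the EventGroup, ResultAggregator and ResultGroup classes above follow Spark SQL's typed Aggregator contract: zero() creates an empty buffer, reduce() folds one input row into it, merge() combines partial buffers across partitions, and finish() emits the final value. The following minimal sketch (an illustration only, not part of this changeset; the Group and GroupAggregator names are hypothetical) shows the same contract end to end on a toy key/value Dataset:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;

import scala.Tuple2;

public class AggregatorSketch {

	// Toy buffer/output type, mirroring ResultGroup.
	public static class Group implements Serializable {

		private final List<String> data = new ArrayList<>();

		public List<String> getData() { return data; }

		public Group addElement(final String s) { data.add(s); return this; }

		public Group addGroup(final Group g) { data.addAll(g.getData()); return this; }
	}

	// Toy aggregator, mirroring ResultAggregator: IN = (key, value) tuple, BUF = OUT = Group.
	public static class GroupAggregator extends Aggregator<Tuple2<String, String>, Group, Group> {

		@Override public Group zero() { return new Group(); } // empty buffer

		@Override public Group reduce(final Group g, final Tuple2<String, String> t) { return g.addElement(t._2); }

		@Override public Group merge(final Group g1, final Group g2) { return g1.addGroup(g2); } // combines partition-local buffers

		@Override public Group finish(final Group g) { return g; }

		@Override public Encoder<Group> bufferEncoder() { return Encoders.kryo(Group.class); }

		@Override public Encoder<Group> outputEncoder() { return Encoders.kryo(Group.class); }
	}

	public static void main(final String[] args) {
		final SparkSession spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate();

		final Dataset<Tuple2<String, String>> pairs = spark
			.createDataset(
				Arrays.asList(new Tuple2<>("k1", "a"), new Tuple2<>("k1", "b"), new Tuple2<>("k2", "c")),
				Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

		final TypedColumn<Tuple2<String, String>, Group> aggr = new GroupAggregator().toColumn();

		pairs
			.groupByKey((MapFunction<Tuple2<String, String>, String>) t -> t._1, Encoders.STRING())
			.agg(aggr) // yields Dataset<Tuple2<String, Group>>
			.map(
				(MapFunction<Tuple2<String, Group>, String>) t -> t._1 + " -> " + t._2.getData().size(),
				Encoders.STRING())
			.show(); // k1 -> 2, k2 -> 1 (row order not guaranteed)

		spark.stop();
	}
}

As in ResultAggregator, the kryo encoders keep the buffer and output opaque to Catalyst, trading columnar optimizations for the freedom to aggregate arbitrary Java beans.

diff --git 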
a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala index 90bfacdc9..7b21ecda2 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala @@ -319,15 +319,7 @@ object DoiBoostMappingUtil { def generateIdentifier (oaf: Result, doi: String): String = { val id = DHPUtils.md5 (doi.toLowerCase) - if (oaf.isInstanceOf[Dataset] ) - return s"60|${ - doiBoostNSPREFIX - }${ - SEPARATOR - }${ - id - }" - s"50|${ + return s"50|${ doiBoostNSPREFIX }${ SEPARATOR diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala index cc2c9d586..85997fa36 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala @@ -179,6 +179,9 @@ case object Crossref2Oaf { if (StringUtils.isNotBlank(issuedDate)) { instance.setDateofacceptance(asField(issuedDate)) } + else { + instance.setDateofacceptance(asField(createdDate.getValue)) + } val s: String = (json \ "URL").extract[String] val links: List[String] = ((for {JString(url) <- json \ "link" \ "URL"} yield url) ::: List(s)).filter(p => p != null).distinct if (links.nonEmpty) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala index 1c6e1b0e6..2419f86a3 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala @@ -129,16 +129,16 @@ case object ConversionUtil { val fieldOfStudy = item._2 if (fieldOfStudy != null && fieldOfStudy.subjects != null && fieldOfStudy.subjects.nonEmpty) { val p: List[StructuredProperty] = fieldOfStudy.subjects.flatMap(s => { - val s1 = createSP(s.DisplayName, "keywords", "dnet:subject_classification_typologies") + val s1 = createSP(s.DisplayName, "keyword", "dnet:subject_classification_typologies") val di = DoiBoostMappingUtil.generateDataInfo(s.Score.toString) var resList: List[StructuredProperty] = List(s1) if (s.MainType.isDefined) { val maintp = s.MainType.get - val s2 = createSP(s.MainType.get, "keywords", "dnet:subject_classification_typologies") + val s2 = createSP(s.MainType.get, "keyword", "dnet:subject_classification_typologies") s2.setDataInfo(di) resList = resList ::: List(s2) if (maintp.contains(".")) { - val s3 = createSP(maintp.split("\\.").head, "keywords", "dnet:subject_classification_typologies") + val s3 = createSP(maintp.split("\\.").head, "keyword", "dnet:subject_classification_typologies") s3.setDataInfo(di) resList = resList ::: List(s3) } @@ -190,7 +190,7 @@ case object ConversionUtil { pub.setPid(List(createSP(paper.Doi.toLowerCase, "doi", PID_TYPES)).asJava) pub.setOriginalId(List(paper.PaperId.toString, paper.Doi.toLowerCase).asJava) - //Set identifier as {50|60} | doiboost____::md5(DOI) + //Set identifier as 50|doiboost____::md5(DOI) pub.setId(generateIdentifier(pub, paper.Doi.toLowerCase)) val mainTitles = createSP(paper.PaperTitle, "main title", "dnet:dataCite_title") @@ -229,6 +229,8 @@ case object ConversionUtil { 
pub.setPublisher(asField(journal.Publisher.get)) if (journal.Issn.isDefined) j.setIssnPrinted(journal.Issn.get) + j.setVol(paper.Volume) + j.setIss(paper.Issue) pub.setJournal(j) } pub.setCollectedfrom(List(createMAGCollectedFrom()).asJava) @@ -247,7 +249,7 @@ case object ConversionUtil { pub.setPid(List(createSP(paper.Doi.toLowerCase, "doi", PID_TYPES)).asJava) pub.setOriginalId(List(paper.PaperId.toString, paper.Doi.toLowerCase).asJava) - //Set identifier as {50|60} | doiboost____::md5(DOI) + //Set identifier as 50|doiboost____::md5(DOI) pub.setId(generateIdentifier(pub, paper.Doi.toLowerCase)) val mainTitles = createSP(paper.PaperTitle, "main title", "dnet:dataCite_title") diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu.dnetlib.dhp.doiboost.mappings/crossref_mapping.csv b/dhp-workflows/dhp-doiboost/src/main/resources/eu.dnetlib.dhp.doiboost.mappings/crossref_mapping.csv new file mode 100644 index 000000000..d87ae5da6 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu.dnetlib.dhp.doiboost.mappings/crossref_mapping.csv @@ -0,0 +1,60 @@ +Crossref Field,Type,Required,Description (from Crossref),OAF field,Comments +publisher,String,Yes,Name of work's publisher,Result/Publisher, +title,Array of String,Yes,"Work titles, including translated titles","Result/Title with Qualifier(""main title"", ""dnet:dataCite_title"")", +original-title,Array of String,No,Work titles in the work's original publication language,"Result/Title with Qualifier(""alternative title"", ""dnet:dataCite_title"")", +short-title,Array of String,No,Short or abbreviated work titles,"Result/Title with Qualifier(""alternative title"", ""dnet:dataCite_title"")", +abstract,XML String,No,Abstract as a JSON string or a JATS XML snippet encoded into a JSON string,Result/description, +reference-count,Number,Yes,Deprecated. Same as references-count,"- ", +references-count,Number,Yes,Count of outbound references deposited with Crossref,N/A, +is-referenced-by-count,Number,Yes,Count of inbound references deposited with Crossref,N/A, +source,String,Yes,Currently always Crossref,Result/source, +prefix,String,Yes,DOI prefix identifier of the form http://id.crossref.org/prefix/DOI_PREFIX,N/A, +DOI,String,Yes,DOI of the work,OafEntity/originalId, +,,,,OafEntity/PID, +,,,,"Oaf/id ",Use to generate the OpenAIRE id in the form 50|doiboost____::md5(DOI) +URL,URL,Yes,URL form of the work's DOI,Instance/url, +member,String,Yes,Member identifier of the form http://id.crossref.org/member/MEMBER_ID,N/A, +type,String,Yes,"Enumeration, one of the type ids from https://api.crossref.org/v1/types",Instance/instancetype,Also use to map the record as OAF Publication or Dataset according to the mapping defined in eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala +created,Date,Yes,Date on which the DOI was first registered,"Result/relevantDate with Qualifier(""created"", ""dnet:dataCite_date"")", +,,,,"Result/dateofacceptance +Instance/dateofacceptance",If crossref.issued is blank +deposited,Date,Yes,Date on which the work metadata was most recently updated,N/A, +indexed,Date,Yes,"Date on which the work metadata was most recently indexed. Re-indexing does not imply a metadata change, see deposited for the most recent metadata change date",Result/lastupdatetimestamp, +issued,Partial Date,Yes,Earliest of published-print and published-online,Result/dateofacceptance,OAF dateofacceptance is used also for the publishing date. It's the date visualised in the OpenAIRE EXPLORE portal. 
+,,,,Instance/dateofacceptance, +posted,Partial Date,No,Date on which posted content was made available online,"Result/relevantDate with Qualifier(""available"", ""dnet:dataCite_date"")", +accepted,Partial Date,No,"Date on which a work was accepted, after being submitted, during a submission process","Result/relevantDate with Qualifier(""accepted"", ""dnet:dataCite_date"")", +subtitle,Array of String,No,"Work subtitles, including original language and translated","Result/Title with Qualifier(""subtitle"", ""dnet:dataCite_title"")", +container-title,Array of String,No,Full titles of the containing work (usually a book or journal),Publication/Journal/name only in case of Journal title; for book title see ISBN Mapping, +short-container-title,Array of String,No,Abbreviated titles of the containing work,N/A, +group-title,String,No,Group title for posted content,N/A, +issue,String,No,Issue number of an article's journal,Publication/Journal/iss, +volume,String,No,Volume number of an article's journal,Publication/Journal/vol, +page,String,No,Page numbers of an article within its journal,"Publication/Journal/sp +Publication/Journal/ep",Obtain start and end page by splitting by '-' +article-number,String,No,,N/A, +published-print,Partial Date,No,Date on which the work was published in print,"Result/relevantDate with Qualifier(""published-print"", ""dnet:dataCite_date"")", +published-online,Partial Date,No,Date on which the work was published online,"Result/relevantDate with Qualifier(""published-online"", ""dnet:dataCite_date"")", +subject,Array of String,No,"Subject category names, a controlled vocabulary from Sci-Val. Available for most journal articles","Result/subject with Qualifier(""keywords"", ""dnet:subject_classification_typologies""). ","Future improvements: map the controlled vocabulary instead of using the generic ""keywords"" qualifier" +ISSN,Array of String,No,,"Publication/Journal/issn +Publication/Journal/lissn +Publication/Journal/eissn",The mapping depends on the value of issn-type +issn-type,Array of ISSN with Type,No,List of ISSNs with ISSN type information,N/A,Its value guides the setting of the properties in Journal (see row above) +ISBN,Array of String,No,,Publication/source,"In case of Book we can map ISBN and container title on Publication/source using this syntax: container-title + ""ISBN: "" + ISBN" +archive,Array of String,No,,N/A, +license,Array of License,No,,Result/Instance/License, +funder,Array of Funder,No,,Relation,Whenever we are able to link to a funder or project integrated into OpenAIRE. 
Mapping to OpenAIRE funders and projects is in eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala.generateSimpleRelationFromAward +assertion,Array of Assertion,No,,N/A, +author,Array of Contributor,No,,Result/author (with orcid if available), +editor,Array of Contributor,No,,N/A, +chair,Array of Contributor,No,,N/A, +translator,Array of Contributor,No,,N/A, +update-to,Array of Update,No,,N/A, +update-policy,URL,No,Link to an update policy covering Crossmark updates for this work,N/A, +link,Array of Resource Link,No,URLs to full-text locations,Result/Instance/url, +clinical-trial-number,Array of Clinical Trial Number,No,,OafEntity/originalId, +alternative-id,String,No,Other identifiers for the work provided by the depositing member,OafEntity/originalId, +reference,Array of Reference,No,List of references made by the work,,Future improvement: map to references +content-domain,Content Domain,No,Information on domains that support Crossmark for this work,N/A, +relation,Relations,No,Relations to other works,Result/Instance/refereed,"if(relation.has-review) instance.refereed = ""peerReviewed"". " +review,Review,No,Peer review metadata,N/A, diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java index 99247b756..d9cc03cd5 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java @@ -6,23 +6,23 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapGroupsFunction; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.*; +import org.apache.spark.sql.expressions.Aggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; -import eu.dnetlib.dhp.oa.provision.model.JoinedEntity; -import eu.dnetlib.dhp.oa.provision.model.Tuple2; -import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.oa.provision.model.*; +import scala.Tuple2; +import scala.collection.JavaConverters; +import scala.collection.Seq; /** * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. 
The @@ -76,46 +76,31 @@ public class AdjacencyListBuilderJob { SparkConf conf = new SparkConf(); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + conf.registerKryoClasses(ProvisionModelSupport.getModelClasses()); runWithSparkSession( conf, isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - createAdjacencyLists(spark, inputPath, outputPath); + createAdjacencyListsKryo(spark, inputPath, outputPath); }); } - private static void createAdjacencyLists( + private static void createAdjacencyListsKryo( SparkSession spark, String inputPath, String outputPath) { log.info("Reading joined entities from: {}", inputPath); - spark - .read() - .load(inputPath) - .as(Encoders.bean(EntityRelEntity.class)) - .groupByKey( - (MapFunction) value -> value.getEntity().getId(), - Encoders.STRING()) - .mapGroups( - (MapGroupsFunction) (key, values) -> { - JoinedEntity j = new JoinedEntity(); - List links = new ArrayList<>(); - while (values.hasNext() && links.size() < MAX_LINKS) { - EntityRelEntity curr = values.next(); - if (j.getEntity() == null) { - j.setEntity(curr.getEntity()); - } - links.add(new Tuple2(curr.getRelation(), curr.getTarget())); - } - j.setLinks(links); - return j; - }, - Encoders.bean(JoinedEntity.class)) - .write() - .mode(SaveMode.Overwrite) - .parquet(outputPath); + + final List paths = HdfsSupport + .listFiles(inputPath, spark.sparkContext().hadoopConfiguration()); + + log.info("Found paths: {}", String.join(",", paths)); + + } + + private static Seq toSeq(List list) { + return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq(); } private static void removeOutputDir(SparkSession spark, String path) { diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java index 606fa4cc0..4d2633bc5 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java @@ -2,7 +2,6 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*; import java.util.List; import java.util.Objects; @@ -23,8 +22,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; +import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; import eu.dnetlib.dhp.oa.provision.model.RelatedEntity; +import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper; import eu.dnetlib.dhp.oa.provision.model.SortableRelation; import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelSupport; @@ -91,7 +91,7 @@ public class CreateRelatedEntitiesJob_phase1 { SparkConf conf = new SparkConf(); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + conf.registerKryoClasses(ProvisionModelSupport.getModelClasses()); runWithSparkSession( conf, @@ -120,7 +120,7 @@ public class CreateRelatedEntitiesJob_phase1 { .filter("dataInfo.invisible == false") .map( 
(MapFunction) value -> asRelatedEntity(value, clazz), - Encoders.bean(RelatedEntity.class)) + Encoders.kryo(RelatedEntity.class)) .map( (MapFunction>) e -> new Tuple2<>(e.getId(), e), Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class))) @@ -129,12 +129,12 @@ public class CreateRelatedEntitiesJob_phase1 { relsByTarget .joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner") .map( - (MapFunction, Tuple2>, EntityRelEntity>) t -> new EntityRelEntity( + (MapFunction, Tuple2>, RelatedEntityWrapper>) t -> new RelatedEntityWrapper( t._1()._2(), t._2()._2()), - Encoders.bean(EntityRelEntity.class)) + Encoders.kryo(RelatedEntityWrapper.class)) .write() .mode(SaveMode.Overwrite) - .parquet(outputPath + "/" + EntityType.fromClass(clazz)); + .parquet(outputPath); } private static Dataset readPathEntity( diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java index 403817019..5ef30d6e1 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java @@ -4,27 +4,32 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.List; +import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.*; import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.expressions.Aggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; +import eu.dnetlib.dhp.oa.provision.model.JoinedEntity; +import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; +import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper; import eu.dnetlib.dhp.oa.provision.model.TypedRow; +import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import scala.Tuple2; @@ -59,6 +64,12 @@ public class CreateRelatedEntitiesJob_phase2 { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final int MAX_EXTERNAL_ENTITIES = 50; + private static final int MAX_AUTHORS = 200; + private static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000; + private static final int MAX_TITLE_LENGTH = 5000; + private static final int MAX_ABSTRACT_LENGTH = 100000; + public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils @@ -78,8 +89,8 @@ public class CreateRelatedEntitiesJob_phase2 { String inputRelatedEntitiesPath = parser.get("inputRelatedEntitiesPath"); log.info("inputRelatedEntitiesPath: {}", inputRelatedEntitiesPath); - 
String inputGraphRootPath = parser.get("inputGraphRootPath"); - log.info("inputGraphRootPath: {}", inputGraphRootPath); + String inputEntityPath = parser.get("inputEntityPath"); + log.info("inputEntityPath: {}", inputEntityPath); String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); @@ -87,80 +98,112 @@ public class CreateRelatedEntitiesJob_phase2 { int numPartitions = Integer.parseInt(parser.get("numPartitions")); log.info("numPartitions: {}", numPartitions); + String graphTableClassName = parser.get("graphTableClassName"); + log.info("graphTableClassName: {}", graphTableClassName); + + Class entityClazz = (Class) Class.forName(graphTableClassName); + SparkConf conf = new SparkConf(); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + conf.registerKryoClasses(ProvisionModelSupport.getModelClasses()); runWithSparkSession( conf, isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - joinAllEntities( - spark, inputRelatedEntitiesPath, inputGraphRootPath, outputPath, numPartitions); + joinEntityWithRelatedEntities( + spark, inputRelatedEntitiesPath, inputEntityPath, outputPath, numPartitions, entityClazz); }); } - private static void joinAllEntities( + private static void joinEntityWithRelatedEntities( SparkSession spark, - String inputRelatedEntitiesPath, - String inputGraphRootPath, + String relatedEntitiesPath, + String entityPath, String outputPath, - int numPartitions) { + int numPartitions, + Class entityClazz) { - Dataset> entities = readAllEntities(spark, inputGraphRootPath, numPartitions); - Dataset> relsBySource = readRelatedEntities(spark, inputRelatedEntitiesPath); + Dataset> entities = readPathEntity(spark, entityPath, entityClazz); + Dataset> relatedEntities = readRelatedEntities( + spark, relatedEntitiesPath, entityClazz); + + TypedColumn aggregator = new AdjacencyListAggregator().toColumn(); entities - .joinWith(relsBySource, entities.col("_1").equalTo(relsBySource.col("_1")), "left_outer") + .joinWith(relatedEntities, entities.col("_1").equalTo(relatedEntities.col("_1")), "left_outer") + .map((MapFunction, Tuple2>, JoinedEntity>) value -> { + JoinedEntity je = new JoinedEntity(value._1()._2()); + Optional + .ofNullable(value._2()) + .map(Tuple2::_2) + .ifPresent(r -> je.getLinks().add(r)); + return je; + }, Encoders.kryo(JoinedEntity.class)) + .filter(filterEmptyEntityFn()) + .groupByKey( + (MapFunction) value -> value.getEntity().getId(), + Encoders.STRING()) + .agg(aggregator) .map( - (MapFunction, Tuple2>, EntityRelEntity>) value -> { - EntityRelEntity re = new EntityRelEntity(); - re.setEntity(value._1()._2()); - Optional related = Optional.ofNullable(value._2()).map(Tuple2::_2); - if (related.isPresent()) { - re.setRelation(related.get().getRelation()); - re.setTarget(related.get().getTarget()); - } - return re; - }, - Encoders.bean(EntityRelEntity.class)) - .repartition(numPartitions) - .filter( - (FilterFunction) value -> value.getEntity() != null - && StringUtils.isNotBlank(value.getEntity().getId())) + (MapFunction, JoinedEntity>) value -> value._2(), + Encoders.kryo(JoinedEntity.class)) + .filter(filterEmptyEntityFn()) .write() .mode(SaveMode.Overwrite) .parquet(outputPath); } - private static Dataset> readAllEntities( - SparkSession spark, String inputGraphPath, int numPartitions) { - Dataset publication = readPathEntity(spark, inputGraphPath + "/publication", Publication.class); - Dataset dataset = readPathEntity( - spark, 
inputGraphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); - Dataset other = readPathEntity( - spark, inputGraphPath + "/otherresearchproduct", OtherResearchProduct.class); - Dataset software = readPathEntity(spark, inputGraphPath + "/software", Software.class); - Dataset datasource = readPathEntity(spark, inputGraphPath + "/datasource", Datasource.class); - Dataset organization = readPathEntity(spark, inputGraphPath + "/organization", Organization.class); - Dataset project = readPathEntity(spark, inputGraphPath + "/project", Project.class); + public static class AdjacencyListAggregator extends Aggregator { + + @Override + public JoinedEntity zero() { + return new JoinedEntity(); + } + + @Override + public JoinedEntity reduce(JoinedEntity b, JoinedEntity a) { + return mergeAndGet(b, a); + } + + private JoinedEntity mergeAndGet(JoinedEntity b, JoinedEntity a) { + b + .setEntity( + Optional + .ofNullable(a.getEntity()) + .orElse( + Optional + .ofNullable(b.getEntity()) + .orElse(null))); + b.getLinks().addAll(a.getLinks()); + return b; + } + + @Override + public JoinedEntity merge(JoinedEntity b, JoinedEntity a) { + return mergeAndGet(b, a); + } + + @Override + public JoinedEntity finish(JoinedEntity j) { + return j; + } + + @Override + public Encoder bufferEncoder() { + return Encoders.kryo(JoinedEntity.class); + } + + @Override + public Encoder outputEncoder() { + return Encoders.kryo(JoinedEntity.class); + } - return publication - .union(dataset) - .union(other) - .union(software) - .union(datasource) - .union(organization) - .union(project) - .map( - (MapFunction>) value -> new Tuple2<>(value.getId(), value), - Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class))) - .repartition(numPartitions); } - private static Dataset> readRelatedEntities( - SparkSession spark, String inputRelatedEntitiesPath) { + private static Dataset> readRelatedEntities( + SparkSession spark, String inputRelatedEntitiesPath, Class entityClazz) { log.info("Reading related entities from: {}", inputRelatedEntitiesPath); @@ -169,17 +212,20 @@ public class CreateRelatedEntitiesJob_phase2 { log.info("Found paths: {}", String.join(",", paths)); + final String idPrefix = ModelSupport.getIdPrefix(entityClazz); + return spark .read() .load(toSeq(paths)) - .as(Encoders.bean(EntityRelEntity.class)) + .as(Encoders.kryo(RelatedEntityWrapper.class)) + .filter((FilterFunction) e -> e.getRelation().getSource().startsWith(idPrefix)) .map( - (MapFunction>) value -> new Tuple2<>( + (MapFunction>) value -> new Tuple2<>( value.getRelation().getSource(), value), - Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class))); + Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntityWrapper.class))); } - private static Dataset readPathEntity( + private static Dataset> readPathEntity( SparkSession spark, String inputEntityPath, Class entityClazz) { log.info("Reading Graph table from: {}", inputEntityPath); @@ -190,10 +236,79 @@ public class CreateRelatedEntitiesJob_phase2 { (MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), Encoders.bean(entityClazz)) .filter("dataInfo.invisible == false") + .map((MapFunction) e -> pruneOutliers(entityClazz, e), Encoders.bean(entityClazz)) .map( - (MapFunction) value -> getTypedRow( - StringUtils.substringAfterLast(inputEntityPath, "/"), value), - Encoders.bean(TypedRow.class)); + (MapFunction>) e -> new Tuple2<>(e.getId(), e), + Encoders.tuple(Encoders.STRING(), Encoders.kryo(entityClazz))); + } + + private static E pruneOutliers(Class 
entityClazz, E e) { + if (ModelSupport.isSubClass(entityClazz, Result.class)) { + Result r = (Result) e; + if (r.getExternalReference() != null) { + List refs = r + .getExternalReference() + .stream() + .limit(MAX_EXTERNAL_ENTITIES) + .collect(Collectors.toList()); + r.setExternalReference(refs); + } + if (r.getAuthor() != null) { + List authors = Lists.newArrayList(); + for (Author a : r.getAuthor()) { + a.setFullname(StringUtils.left(a.getFullname(), MAX_AUTHOR_FULLNAME_LENGTH)); + if (authors.size() < MAX_AUTHORS || hasORCID(a)) { + authors.add(a); + } + } + r.setAuthor(authors); + } + if (r.getDescription() != null) { + List> desc = r + .getDescription() + .stream() + .filter(Objects::nonNull) + .map(d -> { + d.setValue(StringUtils.left(d.getValue(), MAX_ABSTRACT_LENGTH)); + return d; + }) + .collect(Collectors.toList()); + r.setDescription(desc); + } + if (r.getTitle() != null) { + List titles = r + .getTitle() + .stream() + .filter(Objects::nonNull) + .map(t -> { + t.setValue(StringUtils.left(t.getValue(), MAX_TITLE_LENGTH)); + return t; + }) + .collect(Collectors.toList()); + r.setTitle(titles); + } + } + return e; + } + + private static boolean hasORCID(Author a) { + return a.getPid() != null && a + .getPid() + .stream() + .filter(Objects::nonNull) + .map(StructuredProperty::getQualifier) + .filter(Objects::nonNull) + .map(Qualifier::getClassid) + .filter(StringUtils::isNotBlank) + .anyMatch(c -> "orcid".equals(c.toLowerCase())); + } + + private static FilterFunction filterEmptyEntityFn() { + return (FilterFunction) v -> Objects.nonNull(v.getEntity()); + /* + * return (FilterFunction) v -> Optional .ofNullable(v.getEntity()) .map(e -> + * StringUtils.isNotBlank(e.getId())) .orElse(false); + */ } private static TypedRow getTypedRow(String type, OafEntity entity) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java index 72d68a389..6b184071a 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java @@ -3,9 +3,8 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import java.util.HashSet; -import java.util.Optional; -import java.util.Set; +import java.util.*; +import java.util.function.Function; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; @@ -20,6 +19,7 @@ import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.clearspring.analytics.util.Lists; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; @@ -27,9 +27,11 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Sets; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.FunctionalInterfaceSupport; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.oa.provision.model.SortableRelation; import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner; +import scala.Function1; import scala.Tuple2; /** @@ -111,37 +113,10 @@ public class PrepareRelationsJob { spark -> { removeOutputDir(spark, outputPath); prepareRelationsRDD( - spark, inputRelationsPath, outputPath, relationFilter, relPartitions, maxRelations); + 
spark, inputRelationsPath, outputPath, relationFilter, maxRelations, relPartitions); }); } - /** - * Dataset based implementation that prepares the graph relations by limiting the number of outgoing links and - * filtering the relation types according to the given criteria. - * - * @param spark the spark session - * @param inputRelationsPath source path for the graph relations - * @param outputPath output path for the processed relations - * @param relationFilter set of relation filters applied to the `relClass` field - * @param maxRelations maximum number of allowed outgoing edges - */ - private static void prepareRelations( - SparkSession spark, String inputRelationsPath, String outputPath, Set relationFilter, - int maxRelations) { - readPathRelation(spark, inputRelationsPath) - .filter("dataInfo.deletedbyinference == false") - .filter((FilterFunction) rel -> !relationFilter.contains(rel.getRelClass())) - .groupByKey( - (MapFunction) value -> value.getSource(), Encoders.STRING()) - .flatMapGroups( - (FlatMapGroupsFunction) (key, values) -> Iterators - .limit(values, maxRelations), - Encoders.bean(SortableRelation.class)) - .write() - .mode(SaveMode.Overwrite) - .parquet(outputPath); - } - /** * RDD based implementation that prepares the graph relations by limiting the number of outgoing links and filtering * the relation types according to the given criteria. Moreover, outgoing links kept within the given limit are @@ -152,50 +127,41 @@ public class PrepareRelationsJob { * @param outputPath output path for the processed relations * @param relationFilter set of relation filters applied to the `relClass` field * @param maxRelations maximum number of allowed outgoing edges + * @param relPartitions number of partitions for the output RDD */ - // TODO work in progress private static void prepareRelationsRDD( - SparkSession spark, String inputRelationsPath, String outputPath, Set relationFilter, int relPartitions, - int maxRelations) { - JavaRDD rels = readPathRelationRDD(spark, inputRelationsPath).repartition(relPartitions); - RelationPartitioner partitioner = new RelationPartitioner(rels.getNumPartitions()); + SparkSession spark, String inputRelationsPath, String outputPath, Set relationFilter, int maxRelations, + int relPartitions) { - // only consider those that are not virtually deleted - RDD d = rels + RDD cappedRels = readPathRelationRDD(spark, inputRelationsPath) + .repartition(relPartitions) .filter(rel -> !rel.getDataInfo().getDeletedbyinference()) .filter(rel -> !relationFilter.contains(rel.getRelClass())) - .mapToPair( - (PairFunction) rel -> new Tuple2<>(rel, rel)) - .groupByKey(partitioner) - .map(group -> Iterables.limit(group._2(), maxRelations)) - .flatMap(group -> group.iterator()) + // group by SOURCE and apply limit + .mapToPair(rel -> new Tuple2<>(rel.getSource(), rel)) + .groupByKey(new RelationPartitioner(relPartitions)) + .flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator()) + // group by TARGET and apply limit + .mapToPair(rel -> new Tuple2<>(rel.getTarget(), rel)) + .groupByKey(new RelationPartitioner(relPartitions)) + .flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator()) .rdd(); spark - .createDataset(d, Encoders.bean(SortableRelation.class)) + .createDataset(cappedRels, Encoders.bean(SortableRelation.class)) .write() .mode(SaveMode.Overwrite) .parquet(outputPath); } /** - * Reads a Dataset of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text + * Reads a JavaRDD of 
eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text * file, * * @param spark * @param inputPath - * @return the Dataset containing all the relationships + * @return the JavaRDD containing all the relationships */ - private static Dataset readPathRelation( - SparkSession spark, final String inputPath) { - return spark - .read() - .textFile(inputPath) - .map( - (MapFunction) value -> OBJECT_MAPPER.readValue(value, SortableRelation.class), - Encoders.bean(SortableRelation.class)); - } - private static JavaRDD readPathRelationRDD( SparkSession spark, final String inputPath) { JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java index a88b28592..a1ed7fd2a 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java @@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -32,6 +33,8 @@ import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory; import eu.dnetlib.dhp.schema.oaf.*; import scala.Tuple2; +import scala.collection.JavaConverters; +import scala.collection.Seq; /** * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. 
The @@ -89,6 +92,8 @@ public class XmlConverterJob { log.info("otherDsTypeId: {}", otherDsTypeId); SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ProvisionModelSupport.getModelClasses()); runWithSparkSession( conf, @@ -114,26 +119,18 @@ public class XmlConverterJob { schemaLocation, otherDsTypeId); + final List paths = HdfsSupport + .listFiles(inputPath, spark.sparkContext().hadoopConfiguration()); + + log.info("Found paths: {}", String.join(",", paths)); + spark .read() - .load(inputPath) - .as(Encoders.bean(JoinedEntity.class)) + .load(toSeq(paths)) + .as(Encoders.kryo(JoinedEntity.class)) .map( - (MapFunction) j -> { - if (j.getLinks() != null) { - j - .setLinks( - j - .getLinks() - .stream() - .filter(t -> t.getRelation() != null & t.getRelatedEntity() != null) - .collect(Collectors.toCollection(ArrayList::new))); - } - return j; - }, - Encoders.bean(JoinedEntity.class)) - .map( - (MapFunction>) je -> new Tuple2<>(je.getEntity().getId(), + (MapFunction>) je -> new Tuple2<>( + je.getEntity().getId(), recordFactory.build(je)), Encoders.tuple(Encoders.STRING(), Encoders.STRING())) .javaRDD() @@ -148,6 +145,10 @@ public class XmlConverterJob { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } + private static Seq toSeq(List list) { + return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq(); + } + private static Map prepareAccumulators(SparkContext sc) { Map accumulators = Maps.newHashMap(); accumulators diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java index e29ec9d19..2eb9cf38b 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java @@ -2,30 +2,40 @@ package eu.dnetlib.dhp.oa.provision.model; import java.io.Serializable; +import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; -public class JoinedEntity implements Serializable { +import eu.dnetlib.dhp.schema.oaf.OafEntity; - private TypedRow entity; +public class JoinedEntity implements Serializable { - private List links; + private E entity; + + private List links; public JoinedEntity() { + links = new LinkedList<>(); } - public TypedRow getEntity() { - return entity; - } - - public void setEntity(TypedRow entity) { + public JoinedEntity(E entity) { + this(); this.entity = entity; } - public List getLinks() { + public E getEntity() { + return entity; + } + + public void setEntity(E entity) { + this.entity = entity; + } + + public List getLinks() { return links; } - public void setLinks(List links) { + public void setLinks(List links) { this.links = links; } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java new file mode 100644 index 000000000..f9fde14e5 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java @@ -0,0 +1,25 @@ + +package eu.dnetlib.dhp.oa.provision.model; + +import java.util.List; + +import com.google.common.collect.Lists; + +import eu.dnetlib.dhp.schema.common.ModelSupport; + +public 
class ProvisionModelSupport { + + public static Class[] getModelClasses() { + List> modelClasses = Lists.newArrayList(ModelSupport.getOafModelClasses()); + modelClasses + .addAll( + Lists + .newArrayList( + TypedRow.class, + RelatedEntityWrapper.class, + JoinedEntity.class, + RelatedEntity.class, + SortableRelation.class)); + return modelClasses.toArray(new Class[] {}); + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java similarity index 56% rename from dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java rename to dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java index a6b3c5591..d708b6ed0 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java @@ -5,33 +5,23 @@ import java.io.Serializable; import com.google.common.base.Objects; -public class EntityRelEntity implements Serializable { +public class RelatedEntityWrapper implements Serializable { - private TypedRow entity; private SortableRelation relation; private RelatedEntity target; - public EntityRelEntity() { + public RelatedEntityWrapper() { } - public EntityRelEntity(SortableRelation relation, RelatedEntity target) { + public RelatedEntityWrapper(SortableRelation relation, RelatedEntity target) { this(null, relation, target); } - public EntityRelEntity(TypedRow entity, SortableRelation relation, RelatedEntity target) { - this.entity = entity; + public RelatedEntityWrapper(TypedRow entity, SortableRelation relation, RelatedEntity target) { this.relation = relation; this.target = target; } - public TypedRow getEntity() { - return entity; - } - - public void setEntity(TypedRow entity) { - this.entity = entity; - } - public SortableRelation getRelation() { return relation; } @@ -54,14 +44,13 @@ public class EntityRelEntity implements Serializable { return true; if (o == null || getClass() != o.getClass()) return false; - EntityRelEntity that = (EntityRelEntity) o; - return Objects.equal(entity, that.entity) - && Objects.equal(relation, that.relation) + RelatedEntityWrapper that = (RelatedEntityWrapper) o; + return Objects.equal(relation, that.relation) && Objects.equal(target, that.target); } @Override public int hashCode() { - return Objects.hashCode(entity, relation, target); + return Objects.hashCode(relation, target); } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java deleted file mode 100644 index 5ebe9c9eb..000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java +++ /dev/null @@ -1,53 +0,0 @@ - -package eu.dnetlib.dhp.oa.provision.model; - -import java.io.Serializable; -import java.util.Objects; - -import eu.dnetlib.dhp.schema.oaf.Relation; - -public class Tuple2 implements Serializable { - - private Relation relation; - - private RelatedEntity relatedEntity; - - public Tuple2() { - } - - public Tuple2(Relation relation, RelatedEntity relatedEntity) { - this.relation = relation; - this.relatedEntity = relatedEntity; - } - - public Relation getRelation() { - return 
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java
deleted file mode 100644
index 5ebe9c9eb..000000000
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java
+++ /dev/null
@@ -1,53 +0,0 @@
-
-package eu.dnetlib.dhp.oa.provision.model;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-import eu.dnetlib.dhp.schema.oaf.Relation;
-
-public class Tuple2 implements Serializable {
-
-	private Relation relation;
-
-	private RelatedEntity relatedEntity;
-
-	public Tuple2() {
-	}
-
-	public Tuple2(Relation relation, RelatedEntity relatedEntity) {
-		this.relation = relation;
-		this.relatedEntity = relatedEntity;
-	}
-
-	public Relation getRelation() {
-		return relation;
-	}
-
-	public void setRelation(Relation relation) {
-		this.relation = relation;
-	}
-
-	public RelatedEntity getRelatedEntity() {
-		return relatedEntity;
-	}
-
-	public void setRelatedEntity(RelatedEntity relatedEntity) {
-		this.relatedEntity = relatedEntity;
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o)
-			return true;
-		if (o == null || getClass() != o.getClass())
-			return false;
-		Tuple2 t2 = (Tuple2) o;
-		return getRelation().equals(t2.getRelation());
-	}
-
-	@Override
-	public int hashCode() {
-		return Objects.hash(getRelation().hashCode());
-	}
-}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java
index a09a27837..c7862b48a 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java
@@ -4,8 +4,6 @@ package eu.dnetlib.dhp.oa.provision.utils;
 import org.apache.spark.Partitioner;
 import org.apache.spark.util.Utils;
 
-import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
-
 /**
  * Used in combination with SortableRelationKey, allows to partition the records by source id, therefore allowing to
  * sort relations sharing the same source id by the ordering defined in SortableRelationKey.
@@ -25,6 +23,8 @@ public class RelationPartitioner extends Partitioner {
 
 	@Override
 	public int getPartition(Object key) {
-		return Utils.nonNegativeMod(((SortableRelation) key).getSource().hashCode(), numPartitions());
+		String partitionKey = (String) key;
+		return Utils.nonNegativeMod(partitionKey.hashCode(), numPartitions());
 	}
+
 }
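With SortableRelation gone from the partitioner, the key is now assumed to be the relation source id as a plain String. A standalone sketch equivalent to the updated getPartition (SourceIdPartitioner is an illustrative name, not part of the patch):

import org.apache.spark.Partitioner;
import org.apache.spark.util.Utils;

public class SourceIdPartitioner extends Partitioner {

	private final int numPartitions;

	public SourceIdPartitioner(int numPartitions) {
		this.numPartitions = numPartitions;
	}

	@Override
	public int numPartitions() {
		return numPartitions;
	}

	@Override
	public int getPartition(Object key) {
		// nonNegativeMod guards against negative hash codes (including Integer.MIN_VALUE)
		return Utils.nonNegativeMod(((String) key).hashCode(), numPartitions());
	}
}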
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java
index f99298130..d950a816d 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java
@@ -85,17 +85,19 @@ public class XmlRecordFactory implements Serializable {
 
 		final Set<String> contexts = Sets.newHashSet();
 
-		final OafEntity entity = toOafEntity(je.getEntity());
+		// final OafEntity entity = toOafEntity(je.getEntity());
+		OafEntity entity = je.getEntity();
 		TemplateFactory templateFactory = new TemplateFactory();
 		try {
-			final EntityType type = EntityType.valueOf(je.getEntity().getType());
+
+			final EntityType type = EntityType.fromClass(entity.getClass());
 
 			final List<String> metadata = metadata(type, entity, contexts);
 
 			// rels has to be processed before the contexts because they enrich the contextMap with
 			// the
 			// funding info.
-			final List<String> relations = je
-				.getLinks()
+			final List<RelatedEntityWrapper> links = je.getLinks();
+			final List<String> relations = links
 				.stream()
 				.filter(link -> !isDuplicate(link))
 				.map(link -> mapRelation(contexts, templateFactory, type, link))
@@ -975,10 +977,10 @@ public class XmlRecordFactory implements Serializable {
 			metadata.add(XmlSerializationUtils.mapQualifier("datasourcetypeui", dsType));
 		}
 
-	private List<String> mapFields(Tuple2 link, Set<String> contexts) {
+	private List<String> mapFields(RelatedEntityWrapper link, Set<String> contexts) {
 		final Relation rel = link.getRelation();
-		final RelatedEntity re = link.getRelatedEntity();
-		final String targetType = link.getRelatedEntity().getType();
+		final RelatedEntity re = link.getTarget();
+		final String targetType = link.getTarget().getType();
 
 		final List<String> metadata = Lists.newArrayList();
 		switch (EntityType.valueOf(targetType)) {
@@ -1089,9 +1091,10 @@ public class XmlRecordFactory implements Serializable {
 		return metadata;
 	}
 
-	private String mapRelation(Set<String> contexts, TemplateFactory templateFactory, EntityType type, Tuple2 link) {
+	private String mapRelation(Set<String> contexts, TemplateFactory templateFactory, EntityType type,
+		RelatedEntityWrapper link) {
 		final Relation rel = link.getRelation();
-		final String targetType = link.getRelatedEntity().getType();
+		final String targetType = link.getTarget().getType();
 		final String scheme = ModelSupport.getScheme(type.toString(), targetType);
 
 		if (StringUtils.isBlank(scheme)) {
@@ -1107,18 +1110,18 @@ public class XmlRecordFactory implements Serializable {
 
 	private List<String> listChildren(
 		final OafEntity entity, JoinedEntity je, TemplateFactory templateFactory) {
 
-		EntityType entityType = EntityType.valueOf(je.getEntity().getType());
+		final EntityType entityType = EntityType.fromClass(je.getEntity().getClass());
 
-		List<String> children = je
-			.getLinks()
+		final List<RelatedEntityWrapper> links = je.getLinks();
+		List<String> children = links
 			.stream()
 			.filter(link -> isDuplicate(link))
 			.map(link -> {
-				final String targetType = link.getRelatedEntity().getType();
+				final String targetType = link.getTarget().getType();
 				final String name = ModelSupport.getMainType(EntityType.valueOf(targetType));
 				final HashSet<String> fields = Sets.newHashSet(mapFields(link, null));
 				return templateFactory
-					.getChild(name, link.getRelatedEntity().getId(), Lists.newArrayList(fields));
+					.getChild(name, link.getTarget().getId(), Lists.newArrayList(fields));
 			})
 			.collect(Collectors.toCollection(ArrayList::new));
@@ -1227,7 +1230,7 @@ public class XmlRecordFactory implements Serializable {
 		return children;
 	}
 
-	private boolean isDuplicate(Tuple2 link) {
+	private boolean isDuplicate(RelatedEntityWrapper link) {
 		return REL_SUBTYPE_DEDUP.equalsIgnoreCase(link.getRelation().getSubRelType());
 	}
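EntityType.fromClass replaces the string-based EntityType.valueOf lookup above, resolving the enum constant directly from the model class instead of from a stored type string. A sketch of how such a lookup can work (illustrative only; the real mapping lives in the dhp-schemas EntityType enum):

public enum EntityTypeSketch {
	publication, dataset, otherresearchproduct, software, datasource, organization, project;

	public static EntityTypeSketch fromClass(Class<?> clazz) {
		// e.g. eu.dnetlib.dhp.schema.oaf.Publication -> "publication"
		return EntityTypeSketch.valueOf(clazz.getSimpleName().toLowerCase());
	}
}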
"graphTableClassName", + "paramDescription": "class name associated to the input entity path", "paramRequired": true }, { diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index 6983ecf53..0d5121cf1 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -103,8 +103,7 @@ ${wf:conf('resumeFrom') eq 'prepare_relations'} ${wf:conf('resumeFrom') eq 'fork_join_related_entities'} - ${wf:conf('resumeFrom') eq 'join_all_entities'} - ${wf:conf('resumeFrom') eq 'adjancency_lists'} + ${wf:conf('resumeFrom') eq 'fork_join_all_entities'} ${wf:conf('resumeFrom') eq 'convert_to_xml'} ${wf:conf('resumeFrom') eq 'to_solr_index'} @@ -134,7 +133,7 @@ --inputRelationsPath${inputGraphRootPath}/relation --outputPath${workingDir}/relation - --relPartitions3000 + --relPartitions5000 @@ -171,7 +170,7 @@ --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/publication --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication - --outputPath${workingDir}/join_partial + --outputPath${workingDir}/join_partial/publication @@ -198,7 +197,7 @@ --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/dataset --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset - --outputPath${workingDir}/join_partial + --outputPath${workingDir}/join_partial/dataset @@ -225,7 +224,7 @@ --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/otherresearchproduct --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct - --outputPath${workingDir}/join_partial + --outputPath${workingDir}/join_partial/otherresearchproduct @@ -252,7 +251,7 @@ --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/software --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software - --outputPath${workingDir}/join_partial + --outputPath${workingDir}/join_partial/software @@ -279,7 +278,7 @@ --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/datasource --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource - --outputPath${workingDir}/join_partial + --outputPath${workingDir}/join_partial/datasource @@ -306,7 +305,7 @@ --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/organization --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization - --outputPath${workingDir}/join_partial + --outputPath${workingDir}/join_partial/organization @@ -333,19 +332,57 @@ --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/project --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project - --outputPath${workingDir}/join_partial + --outputPath${workingDir}/join_partial/project - + - + + + + + + + + + + + yarn cluster - Join[entities.id = relatedEntity.source] + Join[publication.id = relatedEntity.source] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf 
+			--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+			--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+			--conf spark.sql.shuffle.partitions=15360
+			--conf spark.network.timeout=${sparkNetworkTimeout}
+
+			--inputEntityPath${inputGraphRootPath}/publication
+			--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication
+			--inputRelatedEntitiesPath${workingDir}/join_partial
+			--outputPath${workingDir}/join_entities/publication
+			--numPartitions30000
+
+
+
+
+
+
+
+		yarn
+		cluster
+		Join[dataset.id = relatedEntity.source]
 		eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2
 		dhp-graph-provision-${projectVersion}.jar
@@ -359,21 +396,22 @@
 			--conf spark.sql.shuffle.partitions=7680
 			--conf spark.network.timeout=${sparkNetworkTimeout}
 
-			--inputGraphRootPath${inputGraphRootPath}
+			--inputEntityPath${inputGraphRootPath}/dataset
+			--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset
 			--inputRelatedEntitiesPath${workingDir}/join_partial
-			--outputPath${workingDir}/join_entities
-			--numPartitions12000
+			--outputPath${workingDir}/join_entities/dataset
+			--numPartitions20000
 
-
+
 
-
+
 		yarn
 		cluster
-		build_adjacency_lists
-		eu.dnetlib.dhp.oa.provision.AdjacencyListBuilderJob
+		Join[otherresearchproduct.id = relatedEntity.source]
+		eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2
 		dhp-graph-provision-${projectVersion}.jar
 
 			--executor-cores=${sparkExecutorCoresForJoining}
 			--executor-memory=${sparkExecutorMemoryForJoining}
 			--driver-memory=${sparkDriverMemoryForJoining}
 			--conf spark.extraListeners=${spark2ExtraListeners}
 			--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
 			--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 			--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 			--conf spark.sql.shuffle.partitions=7680
 			--conf spark.network.timeout=${sparkNetworkTimeout}
@@ -386,13 +424,130 @@
 
-			--inputPath ${workingDir}/join_entities
-			--outputPath${workingDir}/joined
+			--inputEntityPath${inputGraphRootPath}/otherresearchproduct
+			--graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct
+			--inputRelatedEntitiesPath${workingDir}/join_partial
+			--outputPath${workingDir}/join_entities/otherresearchproduct
+			--numPartitions10000
 
-
+
+
+
+		yarn
+		cluster
+		Join[software.id = relatedEntity.source]
+		eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2
+		dhp-graph-provision-${projectVersion}.jar
+
+			--executor-cores=${sparkExecutorCoresForJoining}
+			--executor-memory=${sparkExecutorMemoryForJoining}
+			--driver-memory=${sparkDriverMemoryForJoining}
+			--conf spark.extraListeners=${spark2ExtraListeners}
+			--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+			--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+			--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+			--conf spark.sql.shuffle.partitions=3840
+			--conf spark.network.timeout=${sparkNetworkTimeout}
+
+			--inputEntityPath${inputGraphRootPath}/software
+			--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software
+			--inputRelatedEntitiesPath${workingDir}/join_partial
+			--outputPath${workingDir}/join_entities/software
+			--numPartitions10000
+
+
+
+
+
+
+
+		yarn
+		cluster
+		Join[datasource.id = relatedEntity.source]
+		eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2
+		dhp-graph-provision-${projectVersion}.jar
+
+			--executor-cores=${sparkExecutorCoresForJoining}
+			--executor-memory=${sparkExecutorMemoryForJoining}
+			--driver-memory=${sparkDriverMemoryForJoining}
+			--conf spark.extraListeners=${spark2ExtraListeners}
+			--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+			--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+			--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+			--conf spark.sql.shuffle.partitions=7680
+			--conf spark.network.timeout=${sparkNetworkTimeout}
+
+			--inputEntityPath${inputGraphRootPath}/datasource
+			--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource
+			--inputRelatedEntitiesPath${workingDir}/join_partial
+			--outputPath${workingDir}/join_entities/datasource
+			--numPartitions1000
+
+
+
+
+
+
+
+		yarn
+		cluster
+		Join[organization.id = relatedEntity.source]
+		eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2
+		dhp-graph-provision-${projectVersion}.jar
+
+			--executor-cores=${sparkExecutorCoresForJoining}
+			--executor-memory=${sparkExecutorMemoryForJoining}
+			--driver-memory=${sparkDriverMemoryForJoining}
+			--conf spark.extraListeners=${spark2ExtraListeners}
+			--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+			--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+			--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+			--conf spark.sql.shuffle.partitions=7680
+			--conf spark.network.timeout=${sparkNetworkTimeout}
+
+			--inputEntityPath${inputGraphRootPath}/organization
+			--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization
+			--inputRelatedEntitiesPath${workingDir}/join_partial
+			--outputPath${workingDir}/join_entities/organization
+			--numPartitions20000
+
+
+
+
+
+
+
+		yarn
+		cluster
+		Join[project.id = relatedEntity.source]
+		eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2
+		dhp-graph-provision-${projectVersion}.jar
+
+			--executor-cores=${sparkExecutorCoresForJoining}
+			--executor-memory=${sparkExecutorMemoryForJoining}
+			--driver-memory=${sparkDriverMemoryForJoining}
+			--conf spark.extraListeners=${spark2ExtraListeners}
+			--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+			--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+			--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+			--conf spark.sql.shuffle.partitions=3840
+			--conf spark.network.timeout=${sparkNetworkTimeout}
+
+			--inputEntityPath${inputGraphRootPath}/project
+			--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project
+			--inputRelatedEntitiesPath${workingDir}/join_partial
+			--outputPath${workingDir}/join_entities/project
+			--numPartitions10000
+
+
+
+
+
+
 		yarn
@@ -411,7 +566,7 @@
 			--conf spark.sql.shuffle.partitions=3840
 			--conf spark.network.timeout=${sparkNetworkTimeout}
 
-			--inputPath${workingDir}/joined
+			--inputPath${workingDir}/join_entities
 			--outputPath${workingDir}/xml
 			--isLookupUrl${isLookupUrl}
 			--otherDsTypeId${otherDsTypeId}
@@ -441,7 +596,7 @@
 			--conf spark.hadoop.mapreduce.reduce.speculative=false
 
 			--inputPath${workingDir}/xml
-			--isLookupUrl ${isLookupUrl}
+			--isLookupUrl${isLookupUrl}
 			--format${format}
 			--batchSize${batchSize}
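Each forked Join[<entity>.id = relatedEntity.source] action above runs CreateRelatedEntitiesJob_phase2 against one graph table and writes to ${workingDir}/join_entities/<type>. A rough, illustrative sketch of such a per-entity join, assuming bean-encodable stand-in types (Entity, Wrapper) with id/source columns; the actual job works on the OAF model classes with Kryo encoders and is not reproduced here:

import java.io.Serializable;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class Phase2JoinSketch {

	public static class Entity implements Serializable {
		private String id;

		public String getId() {
			return id;
		}

		public void setId(String id) {
			this.id = id;
		}
	}

	public static class Wrapper implements Serializable {
		private String source;

		public String getSource() {
			return source;
		}

		public void setSource(String source) {
			this.source = source;
		}
	}

	public static void run(SparkSession spark, String inputEntityPath, String inputRelatedEntitiesPath,
		String outputPath, int numPartitions) {

		Dataset<Entity> entities = spark.read().load(inputEntityPath).as(Encoders.bean(Entity.class));
		Dataset<Wrapper> related = spark.read().load(inputRelatedEntitiesPath).as(Encoders.bean(Wrapper.class));

		entities
			// left outer join keeps entities that have no related entities at all
			.joinWith(related, entities.col("id").equalTo(related.col("source")), "left_outer")
			// sized per entity type, mirroring the --numPartitions workflow argument
			.repartition(numPartitions)
			.write()
			.mode(SaveMode.Overwrite)
			// one output folder per entity type, as in ${workingDir}/join_entities/<type>
			.parquet(outputPath);
	}
}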
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java
index 885c7c425..9a115bfa6 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java
@@ -45,7 +45,6 @@ public class XmlRecordFactoryTest {
 
 		assertNotNull(doc);
 
-		System.out.println(doc.asXML());
-
+		// TODO add assertions based on values extracted from the XML record
 	}
 }
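One way the remaining TODO could be addressed: parse the produced record with dom4j and assert on values extracted via XPath. A sketch only, to be placed inside a test method that declares throws Exception; the xml variable, the XPath, and the expected field are assumptions, not taken from the actual test fixture:

import java.io.StringReader;

import org.dom4j.Document;
import org.dom4j.io.SAXReader;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;

// after building the record (xml is the serialized record string):
final Document doc = new SAXReader().read(new StringReader(xml));
assertNotNull(doc);
// hypothetical assertion: check that a field known to exist in the fixture is populated
assertFalse(doc.valueOf("//*[local-name() = 'title']").isEmpty());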