1
0
Fork 0

Merge branch 'provision_indexing'

This commit is contained in:
Claudio Atzori 2020-06-25 12:22:41 +02:00
commit 7df2712824
18 changed files with 625 additions and 573 deletions

View File

@ -271,6 +271,26 @@ object DoiBoostMappingUtil {
} }
def createSP(value: String, classId: String,className:String, schemeId: String, schemeName:String): StructuredProperty = {
val sp = new StructuredProperty
sp.setQualifier(createQualifier(classId,className, schemeId, schemeName))
sp.setValue(value)
sp
}
def createSP(value: String, classId: String,className:String, schemeId: String, schemeName:String, dataInfo: DataInfo): StructuredProperty = {
val sp = new StructuredProperty
sp.setQualifier(createQualifier(classId,className, schemeId, schemeName))
sp.setValue(value)
sp.setDataInfo(dataInfo)
sp
}
def createSP(value: String, classId: String, schemeId: String): StructuredProperty = { def createSP(value: String, classId: String, schemeId: String): StructuredProperty = {
val sp = new StructuredProperty val sp = new StructuredProperty
sp.setQualifier(createQualifier(classId, schemeId)) sp.setQualifier(createQualifier(classId, schemeId))
@ -279,6 +299,8 @@ object DoiBoostMappingUtil {
} }
def createSP(value: String, classId: String, schemeId: String, dataInfo: DataInfo): StructuredProperty = { def createSP(value: String, classId: String, schemeId: String, dataInfo: DataInfo): StructuredProperty = {
val sp = new StructuredProperty val sp = new StructuredProperty
sp.setQualifier(createQualifier(classId, schemeId)) sp.setQualifier(createQualifier(classId, schemeId))

View File

@ -129,16 +129,16 @@ case object ConversionUtil {
val fieldOfStudy = item._2 val fieldOfStudy = item._2
if (fieldOfStudy != null && fieldOfStudy.subjects != null && fieldOfStudy.subjects.nonEmpty) { if (fieldOfStudy != null && fieldOfStudy.subjects != null && fieldOfStudy.subjects.nonEmpty) {
val p: List[StructuredProperty] = fieldOfStudy.subjects.flatMap(s => { val p: List[StructuredProperty] = fieldOfStudy.subjects.flatMap(s => {
val s1 = createSP(s.DisplayName, "keyword", "dnet:subject_classification_typologies") val s1 = createSP(s.DisplayName, "MAG","Microsoft Academic Graph classification", "dnet:subject_classification_typologies", "dnet:subject_classification_typologies")
val di = DoiBoostMappingUtil.generateDataInfo(s.Score.toString) val di = DoiBoostMappingUtil.generateDataInfo(s.Score.toString)
var resList: List[StructuredProperty] = List(s1) var resList: List[StructuredProperty] = List(s1)
if (s.MainType.isDefined) { if (s.MainType.isDefined) {
val maintp = s.MainType.get val maintp = s.MainType.get
val s2 = createSP(s.MainType.get, "keyword", "dnet:subject_classification_typologies") val s2 = createSP(s.MainType.get, "MAG","Microsoft Academic Graph classification", "dnet:subject_classification_typologies", "dnet:subject_classification_typologies")
s2.setDataInfo(di) s2.setDataInfo(di)
resList = resList ::: List(s2) resList = resList ::: List(s2)
if (maintp.contains(".")) { if (maintp.contains(".")) {
val s3 = createSP(maintp.split("\\.").head, "keyword", "dnet:subject_classification_typologies") val s3 = createSP(maintp.split("\\.").head, "MAG","Microsoft Academic Graph classification", "dnet:subject_classification_typologies", "dnet:subject_classification_typologies")
s3.setDataInfo(di) s3.setDataInfo(di)
resList = resList ::: List(s3) resList = resList ::: List(s3)
} }

View File

@ -43,7 +43,7 @@ object SparkPreProcessMAG {
val distinctPaper: Dataset[MagPapers] = spark.createDataset(result) val distinctPaper: Dataset[MagPapers] = spark.createDataset(result)
distinctPaper.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/Papers_distinct") distinctPaper.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/Papers_distinct")
logger.info("Phase 6) Enrich Publication with description") logger.info("Phase 0) Enrich Publication with description")
val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract] val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract]
pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract") pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract")

View File

@ -32,7 +32,7 @@
<start to="GenerateActionSet"/> <start to="CreateDOIBoost"/>
<kill name="Kill"> <kill name="Kill">

View File

@ -18,6 +18,9 @@ class CrossrefMappingTest {
val mapper = new ObjectMapper() val mapper = new ObjectMapper()
@Test @Test
def testFunderRelationshipsMapping(): Unit = { def testFunderRelationshipsMapping(): Unit = {
val template = Source.fromInputStream(getClass.getResourceAsStream("article_funder_template.json")).mkString val template = Source.fromInputStream(getClass.getResourceAsStream("article_funder_template.json")).mkString
@ -58,6 +61,27 @@ class CrossrefMappingTest {
} }
@Test
def testOrcidID() :Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("orcid_data.json")).mkString
assertNotNull(json)
assertFalse(json.isEmpty);
val resultList: List[Oaf] = Crossref2Oaf.convert(json)
assertTrue(resultList.nonEmpty)
val items = resultList.filter(p => p.isInstanceOf[Result])
mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
items.foreach(p => println(mapper.writeValueAsString(p)))
}
@Test @Test
def testEmptyTitle() :Unit = { def testEmptyTitle() :Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("empty_title.json")).mkString val json = Source.fromInputStream(getClass.getResourceAsStream("empty_title.json")).mkString

View File

@ -0,0 +1,271 @@
{
"DOI":"10.1016/j.carbpol.2020.115930",
"issued":{
"date-parts":[
[
2020,
4
]
]
},
"published-print":{
"date-parts":[
[
2020,
4
]
]
},
"prefix":"10.1016",
"subject":[
"Organic Chemistry",
"Materials Chemistry",
"Polymers and Plastics"
],
"author":[
{
"affiliation":[
],
"given":"Lei",
"family":"Fang",
"sequence":"first"
},
{
"affiliation":[
],
"given":"Hua",
"family":"Lin",
"sequence":"additional"
},
{
"affiliation":[
],
"given":"Zhenfeng",
"family":"Wu",
"sequence":"additional"
},
{
"affiliation":[
],
"given":"Zhen",
"family":"Wang",
"sequence":"additional"
},
{
"affiliation":[
],
"given":"Xinxin",
"family":"Fan",
"sequence":"additional"
},
{
"affiliation":[
],
"given":"Ziting",
"family":"Cheng",
"sequence":"additional"
},
{
"affiliation":[
],
"given":"Xiaoya",
"family":"Hou",
"sequence":"additional"
},
{
"authenticated-orcid":false,
"given":"Daquan",
"family":"Chen",
"sequence":"additional",
"affiliation":[
],
"ORCID":"http://orcid.org/0000-0002-6796-0204"
}
],
"reference-count":41,
"ISSN":[
"0144-8617"
],
"assertion":[
{
"name":"publisher",
"value":"Elsevier",
"label":"This article is maintained by"
},
{
"name":"articletitle",
"value":"In vitro/vivo evaluation of novel mitochondrial targeting charge-reversal polysaccharide-based antitumor nanoparticle",
"label":"Article Title"
},
{
"name":"journaltitle",
"value":"Carbohydrate Polymers",
"label":"Journal Title"
},
{
"name":"articlelink",
"value":"https://doi.org/10.1016/j.carbpol.2020.115930",
"label":"CrossRef DOI link to publisher maintained version"
},
{
"name":"content_type",
"value":"article",
"label":"Content Type"
},
{
"name":"copyright",
"value":"\\u00a9 2020 Elsevier Ltd. All rights reserved.",
"label":"Copyright"
}
],
"member":"78",
"source":"Crossref",
"score":1.0,
"deposited":{
"timestamp":1584590965000,
"date-time":"2020-03-19T04:09:25Z",
"date-parts":[
[
2020,
3,
19
]
]
},
"indexed":{
"timestamp":1584592912467,
"date-time":"2020-03-19T04:41:52Z",
"date-parts":[
[
2020,
3,
19
]
]
},
"type":"journal-article",
"URL":"http://dx.doi.org/10.1016/j.carbpol.2020.115930",
"is-referenced-by-count":0,
"volume":"234",
"issn-type":[
{
"type":"print",
"value":"0144-8617"
}
],
"link":[
{
"URL":"https://api.elsevier.com/content/article/PII:S0144861720301041?httpAccept=text/xml",
"intended-application":"text-mining",
"content-version":"vor",
"content-type":"text/xml"
},
{
"URL":"https://api.elsevier.com/content/article/PII:S0144861720301041?httpAccept=text/plain",
"intended-application":"text-mining",
"content-version":"vor",
"content-type":"text/plain"
}
],
"update-policy":"http://dx.doi.org/10.1016/elsevier_cm_policy",
"references-count":41,
"short-container-title":[
"Carbohydrate Polymers"
],
"publisher":"Elsevier BV",
"content-domain":{
"domain":[
"elsevier.com",
"sciencedirect.com"
],
"crossmark-restriction":true
},
"language":"en",
"license":[
{
"URL":"https://www.elsevier.com/tdm/userlicense/1.0/",
"start":{
"timestamp":1585699200000,
"date-time":"2020-04-01T00:00:00Z",
"date-parts":[
[
2020,
4,
1
]
]
},
"content-version":"tdm",
"delay-in-days":0
}
],
"created":{
"timestamp":1581759678000,
"date-time":"2020-02-15T09:41:18Z",
"date-parts":[
[
2020,
2,
15
]
]
},
"title":[
"In vitro/vivo evaluation of novel mitochondrial targeting charge-reversal polysaccharide-based antitumor nanoparticle"
],
"alternative-id":[
"S0144861720301041"
],
"container-title":[
"Carbohydrate Polymers"
],
"funder":[
{
"doi-asserted-by":"publisher",
"DOI":"10.13039/501100007129",
"name":"Natural Science Foundation of Shandong Province",
"award":[
"ZR2019ZD24",
"ZR2019YQ30"
]
},
{
"doi-asserted-by":"publisher",
"DOI":"10.13039/100010449",
"name":"Ministry of Education, Libya",
"award":[
]
},
{
"doi-asserted-by":"publisher",
"DOI":"10.13039/501100012249",
"name":"Jiangxi University of Traditional Chinese Medicine",
"award":[
"TCM-0906"
]
},
{
"name":"Taishan Scholar Program",
"award":[
"qnts20161035"
]
},
{
"name":"Open fund project of Key Laboratory of Modern Preparation of TCM",
"award":[
]
}
],
"page":"115930",
"article-number":"115930"
}

View File

@ -147,6 +147,9 @@ public class CleanGraphSparkJob {
if (Objects.isNull(i.getHostedby()) || StringUtils.isBlank(i.getHostedby().getKey())) { if (Objects.isNull(i.getHostedby()) || StringUtils.isBlank(i.getHostedby().getKey())) {
i.setHostedby(ModelConstants.UNKNOWN_REPOSITORY); i.setHostedby(ModelConstants.UNKNOWN_REPOSITORY);
} }
if (Objects.isNull(i.getRefereed())) {
i.setRefereed(qualifier("0000", "Unknown", ModelConstants.DNET_REVIEW_LEVELS));
}
} }
} }

View File

@ -25,9 +25,7 @@ import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntity; import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper; import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2; import scala.Tuple2;
@ -109,11 +107,12 @@ public class CreateRelatedEntitiesJob_phase1 {
Class<E> clazz, Class<E> clazz,
String outputPath) { String outputPath) {
Dataset<Tuple2<String, SortableRelation>> relsByTarget = readPathRelation(spark, inputRelationsPath) Dataset<Tuple2<String, Relation>> relsByTarget = readPathRelation(spark, inputRelationsPath)
.filter("dataInfo.deletedbyinference == false") .filter("dataInfo.deletedbyinference == false")
.map( .map(
(MapFunction<SortableRelation, Tuple2<String, SortableRelation>>) r -> new Tuple2<>(r.getTarget(), r), (MapFunction<Relation, Tuple2<String, Relation>>) r -> new Tuple2<>(r.getTarget(),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(SortableRelation.class))) r),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class)))
.cache(); .cache();
Dataset<Tuple2<String, RelatedEntity>> entities = readPathEntity(spark, inputEntityPath, clazz) Dataset<Tuple2<String, RelatedEntity>> entities = readPathEntity(spark, inputEntityPath, clazz)
@ -129,7 +128,7 @@ public class CreateRelatedEntitiesJob_phase1 {
relsByTarget relsByTarget
.joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner") .joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner")
.map( .map(
(MapFunction<Tuple2<Tuple2<String, SortableRelation>, Tuple2<String, RelatedEntity>>, RelatedEntityWrapper>) t -> new RelatedEntityWrapper( (MapFunction<Tuple2<Tuple2<String, Relation>, Tuple2<String, RelatedEntity>>, RelatedEntityWrapper>) t -> new RelatedEntityWrapper(
t._1()._2(), t._2()._2()), t._1()._2(), t._2()._2()),
Encoders.kryo(RelatedEntityWrapper.class)) Encoders.kryo(RelatedEntityWrapper.class))
.write() .write()
@ -232,11 +231,11 @@ public class CreateRelatedEntitiesJob_phase1 {
* @param relationPath * @param relationPath
* @return the Dataset<SortableRelation> containing all the relationships * @return the Dataset<SortableRelation> containing all the relationships
*/ */
private static Dataset<SortableRelation> readPathRelation( private static Dataset<Relation> readPathRelation(
SparkSession spark, final String relationPath) { SparkSession spark, final String relationPath) {
log.info("Reading relations from: {}", relationPath); log.info("Reading relations from: {}", relationPath);
return spark.read().load(relationPath).as(Encoders.bean(SortableRelation.class)); return spark.read().load(relationPath).as(Encoders.bean(Relation.class));
} }
private static void removeOutputDir(SparkSession spark, String path) { private static void removeOutputDir(SparkSession spark, String path) {

View File

@ -1,39 +1,33 @@
package eu.dnetlib.dhp.oa.provision; package eu.dnetlib.dhp.oa.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Splitter;
import java.util.*; import com.google.common.collect.Iterables;
import java.util.function.Function; import com.google.common.collect.Sets;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey;
import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner;
import eu.dnetlib.dhp.schema.oaf.Relation;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.rdd.RDD; import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.clearspring.analytics.util.Lists;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner;
import scala.Function1;
import scala.Tuple2; import scala.Tuple2;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
/** /**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The
* operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, and * operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, and
@ -133,22 +127,35 @@ public class PrepareRelationsJob {
SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int maxRelations, SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int maxRelations,
int relPartitions) { int relPartitions) {
RDD<SortableRelation> cappedRels = readPathRelationRDD(spark, inputRelationsPath) // group by SOURCE and apply limit
.repartition(relPartitions) RDD<Relation> bySource = readPathRelationRDD(spark, inputRelationsPath)
.filter(rel -> !rel.getDataInfo().getDeletedbyinference()) .filter(rel -> rel.getDataInfo().getDeletedbyinference() == false)
.filter(rel -> !relationFilter.contains(rel.getRelClass())) .filter(rel -> relationFilter.contains(rel.getRelClass()) == false)
// group by SOURCE and apply limit .mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getSource()), r))
.mapToPair(rel -> new Tuple2<>(rel.getSource(), rel)) .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions))
.groupByKey(new RelationPartitioner(relPartitions)) .groupBy(Tuple2::_1)
.flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator()) .map(Tuple2::_2)
// group by TARGET and apply limit .map(t -> Iterables.limit(t, maxRelations))
.mapToPair(rel -> new Tuple2<>(rel.getTarget(), rel)) .flatMap(Iterable::iterator)
.groupByKey(new RelationPartitioner(relPartitions)) .map(Tuple2::_2)
.flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator()) .rdd();
// group by TARGET and apply limit
RDD<Relation> byTarget = readPathRelationRDD(spark, inputRelationsPath)
.filter(rel -> rel.getDataInfo().getDeletedbyinference() == false)
.filter(rel -> relationFilter.contains(rel.getRelClass()) == false)
.mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getTarget()), r))
.repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions))
.groupBy(Tuple2::_1)
.map(Tuple2::_2)
.map(t -> Iterables.limit(t, maxRelations))
.flatMap(Iterable::iterator)
.map(Tuple2::_2)
.rdd(); .rdd();
spark spark
.createDataset(cappedRels, Encoders.bean(SortableRelation.class)) .createDataset(bySource.union(byTarget), Encoders.bean(Relation.class))
.repartition(relPartitions)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.parquet(outputPath); .parquet(outputPath);
@ -162,10 +169,10 @@ public class PrepareRelationsJob {
* @param inputPath * @param inputPath
* @return the JavaRDD<SortableRelation> containing all the relationships * @return the JavaRDD<SortableRelation> containing all the relationships
*/ */
private static JavaRDD<SortableRelation> readPathRelationRDD( private static JavaRDD<Relation> readPathRelationRDD(
SparkSession spark, final String inputPath) { SparkSession spark, final String inputPath) {
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
return sc.textFile(inputPath).map(s -> OBJECT_MAPPER.readValue(s, SortableRelation.class)); return sc.textFile(inputPath).map(s -> OBJECT_MAPPER.readValue(s, Relation.class));
} }
private static void removeOutputDir(SparkSession spark, String path) { private static void removeOutputDir(SparkSession spark, String path) {

View File

@ -19,7 +19,7 @@ public class ProvisionModelSupport {
RelatedEntityWrapper.class, RelatedEntityWrapper.class,
JoinedEntity.class, JoinedEntity.class,
RelatedEntity.class, RelatedEntity.class,
SortableRelation.class)); SortableRelationKey.class));
return modelClasses.toArray(new Class[] {}); return modelClasses.toArray(new Class[] {});
} }
} }

View File

@ -5,28 +5,30 @@ import java.io.Serializable;
import com.google.common.base.Objects; import com.google.common.base.Objects;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class RelatedEntityWrapper implements Serializable { public class RelatedEntityWrapper implements Serializable {
private SortableRelation relation; private Relation relation;
private RelatedEntity target; private RelatedEntity target;
public RelatedEntityWrapper() { public RelatedEntityWrapper() {
} }
public RelatedEntityWrapper(SortableRelation relation, RelatedEntity target) { public RelatedEntityWrapper(Relation relation, RelatedEntity target) {
this(null, relation, target); this(null, relation, target);
} }
public RelatedEntityWrapper(TypedRow entity, SortableRelation relation, RelatedEntity target) { public RelatedEntityWrapper(TypedRow entity, Relation relation, RelatedEntity target) {
this.relation = relation; this.relation = relation;
this.target = target; this.target = target;
} }
public SortableRelation getRelation() { public Relation getRelation() {
return relation; return relation;
} }
public void setRelation(SortableRelation relation) { public void setRelation(Relation relation) {
this.relation = relation; this.relation = relation;
} }

View File

@ -1,38 +0,0 @@
package eu.dnetlib.dhp.oa.provision.model;
import java.io.Serializable;
import java.util.Map;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class SortableRelation extends Relation implements Comparable<Relation>, Serializable {
private static final Map<String, Integer> weights = Maps.newHashMap();
static {
weights.put("outcome", 0);
weights.put("supplement", 1);
weights.put("affiliation", 2);
weights.put("relationship", 3);
weights.put("publicationDataset", 4);
weights.put("similarity", 5);
weights.put("provision", 6);
weights.put("participation", 7);
weights.put("dedup", 8);
}
@Override
public int compareTo(Relation o) {
return ComparisonChain
.start()
.compare(weights.get(getSubRelType()), weights.get(o.getSubRelType()))
.compare(getSource(), o.getSource())
.compare(getTarget(), o.getTarget())
.result();
}
}

View File

@ -0,0 +1,90 @@
package eu.dnetlib.dhp.oa.provision.model;
import java.io.Serializable;
import java.util.Map;
import java.util.Optional;
import com.google.common.base.Objects;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class SortableRelationKey implements Comparable<SortableRelationKey>, Serializable {
private static final Map<String, Integer> weights = Maps.newHashMap();
static {
weights.put("outcome", 0);
weights.put("supplement", 1);
weights.put("review", 2);
weights.put("citation", 3);
weights.put("affiliation", 4);
weights.put("relationship", 5);
weights.put("publicationDataset", 6);
weights.put("similarity", 7);
weights.put("provision", 8);
weights.put("participation", 9);
weights.put("dedup", 10);
}
private static final long serialVersionUID = 3232323;
private String groupingKey;
private String subRelType;
public static SortableRelationKey create(Relation r, String groupingKey) {
SortableRelationKey sr = new SortableRelationKey();
sr.setGroupingKey(groupingKey);
sr.setSubRelType(r.getSubRelType());
return sr;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
SortableRelationKey that = (SortableRelationKey) o;
return getGroupingKey().equals(that.getGroupingKey());
}
@Override
public int hashCode() {
return Objects.hashCode(getGroupingKey());
}
@Override
public int compareTo(SortableRelationKey o) {
return ComparisonChain
.start()
.compare(getGroupingKey(), o.getGroupingKey())
.compare(getWeight(this), getWeight(o))
.result();
}
private Integer getWeight(SortableRelationKey o) {
return Optional.ofNullable(weights.get(o.getSubRelType())).orElse(Integer.MAX_VALUE);
}
public String getSubRelType() {
return subRelType;
}
public void setSubRelType(String subRelType) {
this.subRelType = subRelType;
}
public String getGroupingKey() {
return groupingKey;
}
public void setGroupingKey(String groupingKey) {
this.groupingKey = groupingKey;
}
}

View File

@ -4,12 +4,16 @@ package eu.dnetlib.dhp.oa.provision.utils;
import org.apache.spark.Partitioner; import org.apache.spark.Partitioner;
import org.apache.spark.util.Utils; import org.apache.spark.util.Utils;
import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey;
/** /**
* Used in combination with SortableRelationKey, allows to partition the records by source id, therefore allowing to * Used in combination with SortableRelationKey, allows to partition the records by source id, therefore allowing to
* sort relations sharing the same source id by the ordering defined in SortableRelationKey. * sort relations sharing the same source id by the ordering defined in SortableRelationKey.
*/ */
public class RelationPartitioner extends Partitioner { public class RelationPartitioner extends Partitioner {
private static final long serialVersionUID = 343434456L;
private final int numPartitions; private final int numPartitions;
public RelationPartitioner(int numPartitions) { public RelationPartitioner(int numPartitions) {
@ -23,8 +27,18 @@ public class RelationPartitioner extends Partitioner {
@Override @Override
public int getPartition(Object key) { public int getPartition(Object key) {
String partitionKey = (String) key; SortableRelationKey partitionKey = (SortableRelationKey) key;
return Utils.nonNegativeMod(partitionKey.hashCode(), numPartitions()); return Utils.nonNegativeMod(partitionKey.getGroupingKey().hashCode(), numPartitions());
}
@Override
public boolean equals(Object obj) {
if (obj instanceof RelationPartitioner) {
RelationPartitioner p = (RelationPartitioner) obj;
if (p.numPartitions() == numPartitions())
return true;
}
return false;
} }
} }

View File

@ -276,7 +276,7 @@ public class XmlRecordFactory implements Serializable {
pidType, pidType,
pidValue pidValue
.toLowerCase() .toLowerCase()
.replaceAll("orcid", ""))); .replaceAll("^.*orcid\\.org\\/", "")));
} }
} }
}); });

View File

@ -97,18 +97,7 @@
</configuration> </configuration>
</global> </global>
<start to="resume_from"/> <start to="prepare_relations"/>
<decision name="resume_from">
<switch>
<case to="prepare_relations">${wf:conf('resumeFrom') eq 'prepare_relations'}</case>
<case to="fork_join_related_entities">${wf:conf('resumeFrom') eq 'fork_join_related_entities'}</case>
<case to="fork_join_all_entities">${wf:conf('resumeFrom') eq 'fork_join_all_entities'}</case>
<case to="convert_to_xml">${wf:conf('resumeFrom') eq 'convert_to_xml'}</case>
<case to="to_solr_index">${wf:conf('resumeFrom') eq 'to_solr_index'}</case>
<default to="prepare_relations"/>
</switch>
</decision>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -135,475 +124,12 @@
<arg>--outputPath</arg><arg>${workingDir}/relation</arg> <arg>--outputPath</arg><arg>${workingDir}/relation</arg>
<arg>--relPartitions</arg><arg>5000</arg> <arg>--relPartitions</arg><arg>5000</arg>
</spark> </spark>
<ok to="fork_join_related_entities"/>
<error to="Kill"/>
</action>
<fork name="fork_join_related_entities">
<path start="join_relation_publication"/>
<path start="join_relation_dataset"/>
<path start="join_relation_otherresearchproduct"/>
<path start="join_relation_software"/>
<path start="join_relation_datasource"/>
<path start="join_relation_organization"/>
<path start="join_relation_project"/>
</fork>
<action name="join_relation_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[relation.target = publication.id]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/publication</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
</action>
<action name="join_relation_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[relation.target = dataset.id]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/dataset</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
</action>
<action name="join_relation_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[relation.target = otherresearchproduct.id]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/otherresearchproduct</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
</action>
<action name="join_relation_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[relation.target = software.id]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/software</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
</action>
<action name="join_relation_datasource">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[relation.target = datasource.id]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/datasource</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/datasource</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
</action>
<action name="join_relation_organization">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[relation.target = organization.id]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/organization</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/organization</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
</action>
<action name="join_relation_project">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[relation.target = project.id]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/project</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/project</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
</action>
<join name="wait_joins" to="fork_join_all_entities"/>
<fork name="fork_join_all_entities">
<path start="join_publication_relations"/>
<path start="join_dataset_relations"/>
<path start="join_otherresearchproduct_relations"/>
<path start="join_software_relations"/>
<path start="join_datasource_relations"/>
<path start="join_organization_relations"/>
<path start="join_project_relations"/>
</fork>
<action name="join_publication_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[publication.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15360
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/publication</arg>
<arg>--numPartitions</arg><arg>30000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<action name="join_dataset_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[dataset.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/dataset</arg>
<arg>--numPartitions</arg><arg>20000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<action name="join_otherresearchproduct_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[otherresearchproduct.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/otherresearchproduct</arg>
<arg>--numPartitions</arg><arg>10000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<action name="join_software_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[software.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/software</arg>
<arg>--numPartitions</arg><arg>10000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<action name="join_datasource_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[datasource.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/datasource</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/datasource</arg>
<arg>--numPartitions</arg><arg>1000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<action name="join_organization_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[organization.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/organization</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/organization</arg>
<arg>--numPartitions</arg><arg>20000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<action name="join_project_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[project.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/project</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/project</arg>
<arg>--numPartitions</arg><arg>10000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<join name="wait_join_phase2" to="convert_to_xml"/>
<action name="convert_to_xml">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>convert_to_xml</name>
<class>eu.dnetlib.dhp.oa.provision.XmlConverterJob</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/join_entities</arg>
<arg>--outputPath</arg><arg>${workingDir}/xml</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--otherDsTypeId</arg><arg>${otherDsTypeId}</arg>
</spark>
<ok to="to_solr_index"/>
<error to="Kill"/>
</action>
<action name="to_solr_index">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>to_solr_index</name>
<class>eu.dnetlib.dhp.oa.provision.XmlIndexingJob</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemoryForIndexing}
--driver-memory=${sparkDriverMemoryForIndexing}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${sparkExecutorCoresForIndexing}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.speculation=false
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/xml</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--format</arg><arg>${format}</arg>
<arg>--batchSize</arg><arg>${batchSize}</arg>
</spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<end name="End"/> <end name="End"/>
</workflow-app> </workflow-app>

View File

@ -0,0 +1,42 @@
package eu.dnetlib.dhp.oa.provision;
import java.io.IOException;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.Test;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class SortableRelationKeyTest {
@Test
public void doTesSorting() throws IOException {
final ObjectMapper mapper = new ObjectMapper();
final String json = IOUtils.toString(this.getClass().getResourceAsStream("relations.json"));
final List<Relation> relations = mapper.readValue(json, new TypeReference<List<Relation>>() {
});
relations
.stream()
.map(r -> SortableRelationKey.create(r, r.getSource()))
.sorted()
.forEach(
it -> {
try {
System.out.println(mapper.writeValueAsString(it));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
});
}
}

View File

@ -0,0 +1,90 @@
[
{
"collectedfrom": [],
"dataInfo": {
"deletedbyinference": false,
"inferred": false,
"invisible": false,
"provenanceaction": {
"classid": "sysimport:crosswalk:entityregistry",
"classname": "Harvested",
"schemeid": "dnet:provenanceActions",
"schemename": "dnet:provenanceActions"
},
"trust": "0.9"
},
"lastupdatetimestamp": 1592688952862,
"properties": [],
"relClass": "hasAuthorInstitution",
"relType": "resultOrganization",
"source": "1",
"subRelType": "affiliation",
"target": "2"
},
{
"collectedfrom": [],
"dataInfo": {
"deletedbyinference": false,
"inferred": false,
"invisible": false,
"provenanceaction": {
"classid": "sysimport:crosswalk:entityregistry",
"classname": "Harvested",
"schemeid": "dnet:provenanceActions",
"schemename": "dnet:provenanceActions"
},
"trust": "0.9"
},
"lastupdatetimestamp": 1592688952862,
"properties": [],
"relClass": "isAuthorInstitutionOf",
"relType": "resultOrganization",
"source": "2",
"subRelType": "affiliation",
"target": "1"
},
{
"collectedfrom": [],
"dataInfo": {
"deletedbyinference": false,
"inferred": false,
"invisible": false,
"provenanceaction": {
"classid": "sysimport:crosswalk:entityregistry",
"classname": "Harvested",
"schemeid": "dnet:provenanceActions",
"schemename": "dnet:provenanceActions"
},
"trust": "0.9"
},
"lastupdatetimestamp": 1592688952862,
"properties": [],
"relClass": "isProducedBy",
"relType": "resultProject",
"source": "1",
"subRelType": "outcome",
"target": "2"
},
{
"collectedfrom": [],
"dataInfo": {
"deletedbyinference": false,
"inferred": false,
"invisible": false,
"provenanceaction": {
"classid": "sysimport:crosswalk:entityregistry",
"classname": "Harvested",
"schemeid": "dnet:provenanceActions",
"schemename": "dnet:provenanceActions"
},
"trust": "0.9"
},
"lastupdatetimestamp": 1592688952862,
"properties": [],
"relClass": "produces",
"relType": "resultProject",
"source": "2",
"subRelType": "outcome",
"target": "1"
}
]