From bc7cfd597581ad2c864468010c7e7758364f6fb1 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Mon, 2 Mar 2020 17:03:07 +0100
Subject: [PATCH] indexing workflow WIP: fixed projects fundingtree xml
 conversion, prioritized links between results and projects when limiting
 them to 100 in the join procedure

---
 .../job-override.properties                   |   8 +-
 .../eu/dnetlib/dhp/graph/GraphJoiner.java     | 140 +++++++++++-------
 .../dhp/graph/SparkXmlRecordBuilderJob.java   |   4 +-
 .../dhp/graph/model/SortableRelationKey.java  |  99 ++++++++++++-
 .../eu/dnetlib/dhp/graph/model/TypedRow.java  |  31 ++++
 .../dhp/graph/utils/GraphMappingUtils.java    |   5 +
 .../dhp/graph/utils/RelationPartitioner.java  |  27 +++-
 .../dhp/graph/utils/XmlRecordFactory.java     |  40 ++++-
 .../input_params_build_adjacency_lists.json   |   9 +-
 .../dnetlib/dhp/graph/oozie_app/workflow.xml  |   2 +
 .../eu/dnetlib/dhp/graph/GraphJoinerTest.java |  34 +++++
 11 files changed, 331 insertions(+), 68 deletions(-)

diff --git a/dhp-workflows/dhp-graph-provision/job-override.properties b/dhp-workflows/dhp-graph-provision/job-override.properties
index b5ab07982..68816c224 100644
--- a/dhp-workflows/dhp-graph-provision/job-override.properties
+++ b/dhp-workflows/dhp-graph-provision/job-override.properties
@@ -1,10 +1,12 @@
-sparkDriverMemory=8G
-sparkExecutorMemory=8G
+sparkDriverMemory=10G
+sparkExecutorMemory=15G
 #isLookupUrl=http://services.openaire.eu:8280/is/services/isLookUp
 isLookupUrl=http://beta.services.openaire.eu:8280/is/services/isLookUp?wsdl
 sourcePath=/tmp/db_openaireplus_services.export_dhp.2020.02.03
 outputPath=/tmp/openaire_provision
 format=TMF
 batchSize=2000
+sparkExecutorCoresForJoining=128
 sparkExecutorCoresForIndexing=64
-reuseRecords=true
\ No newline at end of file
+reuseRecords=false
+otherDsTypeId=scholarcomminfra, infospace, pubsrepository::mock, entityregistry, entityregistry::projects, entityregistry::repositories, websource
\ No newline at end of file

diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java
index 062c8886b..246613ce4 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java
@@ -1,31 +1,32 @@
 package eu.dnetlib.dhp.graph;
 
-import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.jayway.jsonpath.DocumentContext;
 import com.jayway.jsonpath.JsonPath;
 import eu.dnetlib.dhp.graph.model.*;
 import eu.dnetlib.dhp.graph.utils.ContextMapper;
 import eu.dnetlib.dhp.graph.utils.GraphMappingUtils;
+import eu.dnetlib.dhp.graph.utils.RelationPartitioner;
 import eu.dnetlib.dhp.graph.utils.XmlRecordFactory;
 import eu.dnetlib.dhp.schema.oaf.*;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.util.LongAccumulator;
 import scala.Tuple2;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.HashSet;
-import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Map;
 
 import static eu.dnetlib.dhp.graph.utils.GraphMappingUtils.asRelatedEntity;
@@ -49,6 +50,8 @@ import static eu.dnetlib.dhp.graph.utils.GraphMappingUtils.asRelatedEntity;
  */
 public class GraphJoiner implements Serializable {
 
+    private Map<String, LongAccumulator> accumulators = Maps.newHashMap();
+
     public static final int MAX_RELS = 100;
 
     public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd";
@@ -61,24 +64,30 @@ public class GraphJoiner implements Serializable {
 
     private String outPath;
 
-    public GraphJoiner(SparkSession spark, ContextMapper contextMapper, String inputPath, String outPath) {
+    private String otherDsTypeId;
+
+    public GraphJoiner(SparkSession spark, ContextMapper contextMapper, String otherDsTypeId, String inputPath, String outPath) {
         this.spark = spark;
         this.contextMapper = contextMapper;
+        this.otherDsTypeId = otherDsTypeId;
         this.inputPath = inputPath;
         this.outPath = outPath;
+
+        final SparkContext sc = spark.sparkContext();
+        prepareAccumulators(sc);
     }
 
     public GraphJoiner adjacencyLists() {
-        final JavaSparkContext sc = new JavaSparkContext(getSpark().sparkContext());
+        final JavaSparkContext jsc = new JavaSparkContext(getSpark().sparkContext());
 
         // read each entity
-        JavaPairRDD<String, TypedRow> datasource = readPathEntity(sc, getInputPath(), "datasource");
-        JavaPairRDD<String, TypedRow> organization = readPathEntity(sc, getInputPath(), "organization");
-        JavaPairRDD<String, TypedRow> project = readPathEntity(sc, getInputPath(), "project");
-        JavaPairRDD<String, TypedRow> dataset = readPathEntity(sc, getInputPath(), "dataset");
-        JavaPairRDD<String, TypedRow> otherresearchproduct = readPathEntity(sc, getInputPath(), "otherresearchproduct");
-        JavaPairRDD<String, TypedRow> software = readPathEntity(sc, getInputPath(), "software");
-        JavaPairRDD<String, TypedRow> publication = readPathEntity(sc, getInputPath(), "publication");
+        JavaPairRDD<String, TypedRow> datasource = readPathEntity(jsc, getInputPath(), "datasource");
+        JavaPairRDD<String, TypedRow> organization = readPathEntity(jsc, getInputPath(), "organization");
+        JavaPairRDD<String, TypedRow> project = readPathEntity(jsc, getInputPath(), "project");
+        JavaPairRDD<String, TypedRow> dataset = readPathEntity(jsc, getInputPath(), "dataset");
+        JavaPairRDD<String, TypedRow> otherresearchproduct = readPathEntity(jsc, getInputPath(), "otherresearchproduct");
+        JavaPairRDD<String, TypedRow> software = readPathEntity(jsc, getInputPath(), "software");
+        JavaPairRDD<String, TypedRow> publication = readPathEntity(jsc, getInputPath(), "publication");
 
         // create the union between all the entities
         final String entitiesPath = getOutPath() + "/entities";
@@ -93,31 +102,43 @@ public class GraphJoiner implements Serializable {
             .map(GraphMappingUtils::serialize)
             .saveAsTextFile(entitiesPath, GzipCodec.class);
 
-        JavaPairRDD<String, EntityRelEntity> entities = sc.textFile(entitiesPath)
+        JavaPairRDD<String, EntityRelEntity> entities = jsc.textFile(entitiesPath)
             .map(t -> new ObjectMapper().readValue(t, EntityRelEntity.class))
             .mapToPair(t -> new Tuple2<>(t.getSource().getSourceId(), t));
 
+        final String relationPath = getOutPath() + "/relation";
         // reads the relationships
-        final JavaPairRDD<String, EntityRelEntity> relation = readPathRelation(sc, getInputPath())
-            .filter(r -> !r.getDeleted()) //only consider those that are not virtually deleted
+        final JavaPairRDD<SortableRelationKey, EntityRelEntity> rels = readPathRelation(jsc, getInputPath())
+            .filter(rel -> !rel.getDeleted()) //only consider those that are not virtually deleted
            .map(p -> new EntityRelEntity().setRelation(p))
-            .mapToPair(p -> new Tuple2<>(p.getRelation().getSourceId(), p))
-            .groupByKey()
+            .mapToPair(p -> new Tuple2<>(SortableRelationKey.from(p), p));
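Keying the relations by SortableRelationKey rather than by plain source id is what lets the truncation in the next hunk keep the most valuable links: when rows sharing a source id are ordered by the weight of their subRelType, project links ("outcome") come first and similarity links last, so project links survive the cut at MAX_RELS. A minimal standalone sketch of that idea, assuming Java 9+ and a weights map mirroring the one this patch introduces in SortableRelationKey:

    import java.util.*;
    import java.util.stream.Collectors;

    // Sketch: truncate a relation list to 100 entries after sorting by
    // subRelType weight, so low-weight types ("outcome") survive the cut.
    public class RelPriorityDemo {
        static final Map<String, Integer> WEIGHTS = Map.of(
                "outcome", 0, "supplement", 1, "publicationDataset", 2,
                "relationship", 3, "similarity", 4, "affiliation", 5,
                "provision", 6, "participation", 7, "dedup", 8);

        public static void main(String[] args) {
            List<String> rels = new ArrayList<>(Collections.nCopies(150, "similarity"));
            rels.add("outcome"); // a project link buried among 150 similarity links

            List<String> kept = rels.stream()
                    .sorted(Comparator.comparing(WEIGHTS::get))
                    .limit(100)
                    .collect(Collectors.toList());

            System.out.println(kept.get(0)); // "outcome" is kept first
        }
    }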
+        rels
+            .groupByKey(new RelationPartitioner(rels.getNumPartitions()))
             .map(p -> Iterables.limit(p._2(), MAX_RELS))
             .flatMap(p -> p.iterator())
+            .map(s -> new ObjectMapper().writeValueAsString(s))
+            .saveAsTextFile(relationPath, GzipCodec.class);
+
+        final JavaPairRDD<String, EntityRelEntity> relation = jsc.textFile(relationPath)
+            .map(s -> new ObjectMapper().readValue(s, EntityRelEntity.class))
             .mapToPair(p -> new Tuple2<>(p.getRelation().getTargetId(), p));
 
-        //final String bySource = getOutPath() + "/1_join_by_target";
-        JavaPairRDD<String, EntityRelEntity> bySource = relation
+        final String bySourcePath = getOutPath() + "/join_by_source";
+        relation
             .join(entities
                 .filter(e -> !e._2().getSource().getDeleted())
                 .mapToPair(e -> new Tuple2<>(e._1(), asRelatedEntity(e._2()))))
             .map(s -> new EntityRelEntity()
                 .setRelation(s._2()._1().getRelation())
                 .setTarget(s._2()._2().getSource()))
+            .map(j -> new ObjectMapper().writeValueAsString(j))
+            .saveAsTextFile(bySourcePath, GzipCodec.class);
+
+        JavaPairRDD<String, EntityRelEntity> bySource = jsc.textFile(bySourcePath)
+            .map(e -> getObjectMapper().readValue(e, EntityRelEntity.class))
             .mapToPair(t -> new Tuple2<>(t.getRelation().getSourceId(), t));
 
-        final XmlRecordFactory recordFactory = new XmlRecordFactory(contextMapper, false, schemaLocation, new HashSet<>());
+        final XmlRecordFactory recordFactory = new XmlRecordFactory(accumulators, contextMapper, false, schemaLocation, otherDsTypeId);
         entities
             .union(bySource)
             .groupByKey()   // by source id
@@ -130,20 +151,6 @@ public class GraphJoiner implements Serializable {
         return this;
     }
 
-    public GraphJoiner asXML() {
-        final JavaSparkContext sc = new JavaSparkContext(getSpark().sparkContext());
-        final XmlRecordFactory recordFactory = new XmlRecordFactory(contextMapper, true, "", new HashSet<>());
-        final ObjectMapper mapper = new ObjectMapper();
-
-        final String joinedEntitiesPath = getOutPath() + "/1_joined_entities";
-        sc.textFile(joinedEntitiesPath)
-            .map(s -> mapper.readValue(s, JoinedEntity.class))
-            .mapToPair(je -> new Tuple2<>(new Text(je.getEntity().getId()), new Text(recordFactory.build(je))))
-            .saveAsHadoopFile(getOutPath() + "/2_xml", Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
-
-        return this;
-    }
-
     public SparkSession getSpark() {
         return spark;
     }
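Note the pattern used for both the relation and join_by_source stages above: each intermediate result is written out as gzipped JSON text and immediately read back, which cuts the RDD lineage between the heavy shuffle stages at the price of extra I/O. A minimal sketch of that round trip, with an illustrative path and payload (not part of the patch):

    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;

    // Sketch: materialize an RDD as gzipped text, then read it back so the
    // lineage restarts from the files. The path must not exist beforehand.
    public class CheckpointDemo {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().master("local[*]").appName("checkpoint").getOrCreate();
            JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

            String path = "/tmp/checkpoint_demo";
            jsc.parallelize(java.util.Arrays.asList("{\"id\":1}", "{\"id\":2}"))
               .saveAsTextFile(path, GzipCodec.class);

            JavaRDD<String> reloaded = jsc.textFile(path); // lineage now starts here
            System.out.println(reloaded.count());
            spark.stop();
        }
    }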
@@ -158,24 +165,23 @@ public class GraphJoiner implements Serializable {
 
     // HELPERS
 
-    private OafEntity parseOaf(final String json, final String type) {
-        final ObjectMapper o = new ObjectMapper();
+    private OafEntity parseOaf(final String json, final String type, final ObjectMapper mapper) {
         try {
             switch (GraphMappingUtils.EntityType.valueOf(type)) {
                 case publication:
-                    return o.readValue(json, Publication.class);
+                    return mapper.readValue(json, Publication.class);
                 case dataset:
-                    return o.readValue(json, Dataset.class);
+                    return mapper.readValue(json, Dataset.class);
                 case otherresearchproduct:
-                    return o.readValue(json, OtherResearchProduct.class);
+                    return mapper.readValue(json, OtherResearchProduct.class);
                 case software:
-                    return o.readValue(json, Software.class);
+                    return mapper.readValue(json, Software.class);
                 case datasource:
-                    return o.readValue(json, Datasource.class);
+                    return mapper.readValue(json, Datasource.class);
                 case organization:
-                    return o.readValue(json, Organization.class);
+                    return mapper.readValue(json, Organization.class);
                 case project:
-                    return o.readValue(json, Project.class);
+                    return mapper.readValue(json, Project.class);
                 default:
                     throw new IllegalArgumentException("invalid type: " + type);
             }
@@ -185,26 +191,26 @@ public class GraphJoiner implements Serializable {
     }
 
     private JoinedEntity toJoinedEntity(Tuple2<String, Iterable<EntityRelEntity>> p) {
-        final ObjectMapper o = new ObjectMapper();
+        final ObjectMapper mapper = getObjectMapper();
         final JoinedEntity j = new JoinedEntity();
-        final Links links2 = new Links();
+        final Links links = new Links();
         for(EntityRelEntity rel : p._2()) {
             if (rel.hasMainEntity() & j.getEntity() == null) {
                 j.setType(rel.getSource().getType());
-                j.setEntity(parseOaf(rel.getSource().getOaf(), rel.getSource().getType()));
+                j.setEntity(parseOaf(rel.getSource().getOaf(), rel.getSource().getType(), mapper));
             }
             if (rel.hasRelatedEntity()) {
                 try {
-                    links2.add(
+                    links.add(
                         new eu.dnetlib.dhp.graph.model.Tuple2()
-                            .setRelation(o.readValue(rel.getRelation().getOaf(), Relation.class))
-                            .setRelatedEntity(o.readValue(rel.getTarget().getOaf(), RelatedEntity.class)));
+                            .setRelation(mapper.readValue(rel.getRelation().getOaf(), Relation.class))
+                            .setRelatedEntity(mapper.readValue(rel.getTarget().getOaf(), RelatedEntity.class)));
                 } catch (IOException e) {
                     throw new IllegalArgumentException(e);
                 }
             }
         }
-        j.setLinks(links2);
+        j.setLinks(links);
         if (j.getEntity() == null) {
             throw new IllegalStateException("missing main entity on '" + p._1() + "'");
         }
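toJoinedEntity now reuses the single ObjectMapper obtained from getObjectMapper(), which is configured to ignore unknown properties so that serialized records carrying fields the local model does not know remain readable. A minimal sketch of that Jackson behaviour, with an illustrative Rel bean:

    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;

    // Sketch: a lenient mapper tolerates extra JSON fields; a default
    // mapper would throw UnrecognizedPropertyException on "relClass".
    public class LenientMapperDemo {
        public static class Rel { public String sourceId; }

        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper()
                    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
            Rel r = mapper.readValue(
                    "{\"sourceId\":\"40|corda__h2020::1\",\"relClass\":\"isProducedBy\"}", Rel.class);
            System.out.println(r.sourceId); // extra field silently dropped
        }
    }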
accumulators.put("organizationOrganization_dedup_isMergedIn", sc.longAccumulator("organizationOrganization_dedup_isMergedIn")); + accumulators.put("organizationOrganization_dedup_merges", sc.longAccumulator("resultProject_outcome_produces")); + accumulators.put("datasourceOrganization_provision_isProvidedBy", sc.longAccumulator("datasourceOrganization_provision_isProvidedBy")); + accumulators.put("datasourceOrganization_provision_provides", sc.longAccumulator("datasourceOrganization_provision_provides")); + } + } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlRecordBuilderJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlRecordBuilderJob.java index 0b2180f19..5fa3e6385 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlRecordBuilderJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlRecordBuilderJob.java @@ -24,6 +24,7 @@ public class SparkXmlRecordBuilderJob { final String inputPath = parser.get("sourcePath"); final String outputPath = parser.get("outputPath"); final String isLookupUrl = parser.get("isLookupUrl"); + final String otherDsTypeId = parser.get("otherDsTypeId"); final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration()); if (fs.exists(new Path(outputPath))) { @@ -31,8 +32,9 @@ public class SparkXmlRecordBuilderJob { fs.mkdirs(new Path(outputPath)); } - new GraphJoiner(spark, ContextMapper.fromIS(isLookupUrl), inputPath, outputPath) + new GraphJoiner(spark, ContextMapper.fromIS(isLookupUrl), otherDsTypeId, inputPath, outputPath) .adjacencyLists(); + //.asXML(); } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/SortableRelationKey.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/SortableRelationKey.java index ea56c6d12..6bfbab547 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/SortableRelationKey.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/SortableRelationKey.java @@ -1,4 +1,99 @@ -package eu.dnetlib.dhp.graph; +package eu.dnetlib.dhp.graph.model; + +import com.google.common.collect.ComparisonChain; +import com.google.common.collect.Maps; + +import java.io.Serializable; +import java.util.Map; + +/** + * Allows to sort relationships according to the priority defined in weights map. 
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/SortableRelationKey.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/SortableRelationKey.java
index ea56c6d12..6bfbab547 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/SortableRelationKey.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/SortableRelationKey.java
@@ -1,4 +1,99 @@
-package eu.dnetlib.dhp.graph;
+package eu.dnetlib.dhp.graph.model;
+
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.Maps;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Allows sorting relationships according to the priority defined in the weights map.
+ */
+public class SortableRelationKey implements Comparable<SortableRelationKey>, Serializable {
+
+    private String sourceId;
+    private String targetId;
+
+    private String relType;
+    private String subRelType;
+    private String relClass;
+
+    private final static Map<String, Integer> weights = Maps.newHashMap();
+
+    static {
+        weights.put("outcome", 0);
+        weights.put("supplement", 1);
+        weights.put("publicationDataset", 2);
+        weights.put("relationship", 3);
+        weights.put("similarity", 4);
+        weights.put("affiliation", 5);
+
+        weights.put("provision", 6);
+        weights.put("participation", 7);
+        weights.put("dedup", 8);
+    }
+
+    public static SortableRelationKey from(final EntityRelEntity e) {
+        return new SortableRelationKey()
+            .setSourceId(e.getRelation().getSourceId())
+            .setTargetId(e.getRelation().getTargetId())
+            .setRelType(e.getRelation().getRelType())
+            .setSubRelType(e.getRelation().getSubRelType())
+            .setRelClass(e.getRelation().getRelClass());
+    }
+
+    public String getSourceId() {
+        return sourceId;
+    }
+
+    public SortableRelationKey setSourceId(String sourceId) {
+        this.sourceId = sourceId;
+        return this;
+    }
+
+    public String getTargetId() {
+        return targetId;
+    }
+
+    public SortableRelationKey setTargetId(String targetId) {
+        this.targetId = targetId;
+        return this;
+    }
+
+    public String getRelType() {
+        return relType;
+    }
+
+    public SortableRelationKey setRelType(String relType) {
+        this.relType = relType;
+        return this;
+    }
+
+    public String getSubRelType() {
+        return subRelType;
+    }
+
+    public SortableRelationKey setSubRelType(String subRelType) {
+        this.subRelType = subRelType;
+        return this;
+    }
+
+    public String getRelClass() {
+        return relClass;
+    }
+
+    public SortableRelationKey setRelClass(String relClass) {
+        this.relClass = relClass;
+        return this;
+    }
+
+    @Override
+    public int compareTo(SortableRelationKey o) {
+        return ComparisonChain.start()
+            .compare(weights.get(getSubRelType()), weights.get(o.getSubRelType()))
+            .compare(getSourceId(), o.getSourceId())
+            .compare(getTargetId(), o.getTargetId())
+            .result();
+    }
 
-public class SortableRelationKey {
 }

diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/TypedRow.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/TypedRow.java
index 3651e28c9..8205c38ef 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/TypedRow.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/TypedRow.java
@@ -12,6 +12,10 @@ public class TypedRow implements Serializable {
 
     private String type;
 
+    private String relType;
+    private String subRelType;
+    private String relClass;
+
     private String oaf;
 
     public String getSourceId() {
@@ -50,6 +54,33 @@ public class TypedRow implements Serializable {
         return this;
     }
 
+    public String getRelType() {
+        return relType;
+    }
+
+    public TypedRow setRelType(String relType) {
+        this.relType = relType;
+        return this;
+    }
+
+    public String getSubRelType() {
+        return subRelType;
+    }
+
+    public TypedRow setSubRelType(String subRelType) {
+        this.subRelType = subRelType;
+        return this;
+    }
+
+    public String getRelClass() {
+        return relClass;
+    }
+
+    public TypedRow setRelClass(String relClass) {
+        this.relClass = relClass;
+        return this;
+    }
+
     public String getOaf() {
         return oaf;
     }
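The compareTo above chains three comparisons: the subRelType weight decides the ordering, with source and target ids as tie-breakers so the ordering stays total and deterministic. A minimal sketch of the same ComparisonChain pattern, with the weights map reduced to two illustrative entries:

    import com.google.common.collect.ComparisonChain;
    import java.util.Map;

    // Sketch: compare by subRelType weight first, then source id, then target id.
    public class KeyOrderDemo {
        static final Map<String, Integer> WEIGHTS = Map.of("outcome", 0, "dedup", 8);

        static int compare(String[] a, String[] b) { // {subRelType, sourceId, targetId}
            return ComparisonChain.start()
                    .compare(WEIGHTS.get(a[0]), WEIGHTS.get(b[0]))
                    .compare(a[1], b[1])
                    .compare(a[2], b[2])
                    .result();
        }

        public static void main(String[] args) {
            System.out.println(compare(
                    new String[]{"outcome", "50|a", "40|b"},
                    new String[]{"dedup", "50|a", "50|c"})); // negative: outcome sorts first
        }
    }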
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/GraphMappingUtils.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/GraphMappingUtils.java
index 0921fe105..2069ad9f7 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/GraphMappingUtils.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/GraphMappingUtils.java
@@ -26,6 +26,8 @@ import static org.apache.commons.lang3.StringUtils.*;
 
 public class GraphMappingUtils {
 
+    public static final String SEPARATOR = "_";
+
     public enum EntityType {
         publication, dataset, otherresearchproduct, software, datasource, organization, project
     }
@@ -250,5 +252,8 @@ public class GraphMappingUtils {
         return s;
     }
 
+    public static String getRelDescriptor(String relType, String subRelType, String relClass) {
+        return relType + SEPARATOR + subRelType + SEPARATOR + relClass;
+    }
 }

diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/RelationPartitioner.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/RelationPartitioner.java
index 0a7c6dcaf..f4b1514d0 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/RelationPartitioner.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/RelationPartitioner.java
@@ -1,4 +1,29 @@
 package eu.dnetlib.dhp.graph.utils;
 
-public class RelationPartitioner {
+import eu.dnetlib.dhp.graph.model.SortableRelationKey;
+import org.apache.spark.Partitioner;
+import org.apache.spark.util.Utils;
+
+/**
+ * Used in combination with SortableRelationKey; partitions the records by source id, so that
+ * relations sharing the same source id can be sorted according to the ordering defined in SortableRelationKey.
+ */
+public class RelationPartitioner extends Partitioner {
+
+    private int numPartitions;
+
+    public RelationPartitioner(int numPartitions) {
+        this.numPartitions = numPartitions;
+    }
+
+    @Override
+    public int numPartitions() {
+        return numPartitions;
+    }
+
+    @Override
+    public int getPartition(Object key) {
+        return Utils.nonNegativeMod(((SortableRelationKey) key).getSourceId().hashCode(), numPartitions());
+    }
 }
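getPartition derives the partition solely from the source id portion of the key, so every relation of a given entity lands in the same partition even though the full keys differ; the grouping in GraphJoiner relies on exactly this. A minimal sketch of the routing, where Math.floorMod stands in for Spark's Utils.nonNegativeMod:

    // Sketch: all keys sharing a source id map to the same partition,
    // regardless of target id or relation type.
    public class PartitionDemo {
        public static void main(String[] args) {
            int numPartitions = 8;
            String sourceId = "50|doajarticles::abc"; // illustrative id
            int p1 = Math.floorMod(sourceId.hashCode(), numPartitions);
            int p2 = Math.floorMod(sourceId.hashCode(), numPartitions);
            System.out.println(p1 == p2); // true: routing is stable per source id
        }
    }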
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/XmlRecordFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/XmlRecordFactory.java
index df34b08d3..b8ffd7f7a 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/XmlRecordFactory.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/XmlRecordFactory.java
@@ -3,6 +3,7 @@ package eu.dnetlib.dhp.graph.utils;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.mycila.xmltool.XMLDoc;
 import com.mycila.xmltool.XMLTag;
@@ -11,6 +12,8 @@ import eu.dnetlib.dhp.graph.model.RelatedEntity;
 import eu.dnetlib.dhp.graph.model.Tuple2;
 import eu.dnetlib.dhp.schema.oaf.Result;
 import eu.dnetlib.dhp.schema.oaf.*;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.util.LongAccumulator;
 import org.dom4j.Document;
 import org.dom4j.DocumentException;
 import org.dom4j.Element;
@@ -27,6 +30,7 @@ import java.io.Serializable;
 import java.io.StringReader;
 import java.io.StringWriter;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -37,6 +41,8 @@ import static org.apache.commons.lang3.StringUtils.substringBefore;
 
 public class XmlRecordFactory implements Serializable {
 
+    private Map<String, LongAccumulator> accumulators;
+
     private Set<String> specialDatasourceTypes;
 
     private ContextMapper contextMapper;
@@ -47,11 +53,20 @@ public class XmlRecordFactory implements Serializable {
 
     public XmlRecordFactory(
             final ContextMapper contextMapper, final boolean indent,
-            final String schemaLocation, final Set<String> otherDatasourceTypesUForUI) {
+            final String schemaLocation, final String otherDatasourceTypesUForUI) {
+        this(Maps.newHashMap(), contextMapper, indent, schemaLocation, otherDatasourceTypesUForUI);
+    }
+
+    public XmlRecordFactory(
+            final Map<String, LongAccumulator> accumulators,
+            final ContextMapper contextMapper, final boolean indent,
+            final String schemaLocation, final String otherDatasourceTypesUForUI) {
+
+        this.accumulators = accumulators;
         this.contextMapper = contextMapper;
         this.schemaLocation = schemaLocation;
-        this.specialDatasourceTypes = otherDatasourceTypesUForUI;
+        this.specialDatasourceTypes = Sets.newHashSet(Splitter.on(",").trimResults().split(otherDatasourceTypesUForUI));
 
         this.indent = indent;
     }
@@ -583,7 +598,7 @@ public class XmlRecordFactory implements Serializable {
             if (p.getFundingtree() != null) {
                 metadata.addAll(p.getFundingtree()
                     .stream()
-                    .map(ft -> asXmlElement("fundingtree", ft.getValue()))
+                    .map(ft -> ft.getValue())
                     .collect(Collectors.toList()));
             }
 
@@ -713,12 +728,27 @@ public class XmlRecordFactory implements Serializable {
             }
 
             final DataInfo info = rel.getDataInfo();
+            final String inverseRelClass = getInverseRelClass(rel.getRelClass());
+            final String scheme = getScheme(re.getType(), targetType);
+
+            if (StringUtils.isBlank(inverseRelClass)) {
+                throw new IllegalArgumentException("missing inverse for: " + rel.getRelClass());
+            }
+            if (StringUtils.isBlank(scheme)) {
+                throw new IllegalArgumentException(String.format("missing scheme for: <%s - %s>", re.getType(), targetType));
+            }
+
+            final String accumulatorName = getRelDescriptor(rel.getRelType(), rel.getSubRelType(), rel.getRelClass());
+            if (accumulators.containsKey(accumulatorName)) {
+                accumulators.get(accumulatorName).add(1);
+            }
+
             rels.add(templateFactory.getRel(
                 targetType,
                 rel.getTarget(),
                 Sets.newHashSet(metadata),
-                getInverseRelClass(rel.getRelClass()),
-                getScheme(targetType, re.getType()),
+                inverseRelClass,
+                scheme,
                 info));
         }
         return rels;
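The UI datasource types are now injected as a single comma-separated string and split with Guava on the consumer side, which is what lets otherDsTypeId travel through the Oozie workflow as one property. A minimal sketch of the splitting, using a shortened value from job-override.properties:

    import com.google.common.base.Splitter;
    import com.google.common.collect.Sets;
    import java.util.Set;

    // Sketch: turn the comma-separated otherDsTypeId property into the
    // specialDatasourceTypes set, trimming whitespace around the entries.
    public class SplitterDemo {
        public static void main(String[] args) {
            String otherDsTypeId = "scholarcomminfra, infospace, pubsrepository::mock, entityregistry";
            Set<String> types = Sets.newHashSet(Splitter.on(",").trimResults().split(otherDsTypeId));
            System.out.println(types.contains("infospace")); // true, spaces trimmed
        }
    }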
"paramLongName":"outputPath", "paramDescription": "the path used to store temporary output files", "paramRequired": true}, + {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequence file to read", "paramRequired": true}, + {"paramName":"t", "paramLongName":"otherDsTypeId", "paramDescription": "list of datasource types to populate field datasourcetypeui", "paramRequired": true} ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml index 350358944..e3d0ca192 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml @@ -53,6 +53,7 @@ --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} + --conf spark.dynamicAllocation.maxExecutors=${sparkExecutorCoresForJoining} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} @@ -60,6 +61,7 @@ -mt yarn -is ${isLookupUrl} + -t ${otherDsTypeId} --sourcePath${sourcePath} --outputPath${outputPath} diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/graph/GraphJoinerTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/graph/GraphJoinerTest.java index e8c1c088c..147ac801c 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/graph/GraphJoinerTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/graph/GraphJoinerTest.java @@ -1,4 +1,38 @@ package eu.dnetlib.dhp.graph; +import org.junit.Before; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + public class GraphJoinerTest { + + private ClassLoader cl = getClass().getClassLoader(); + private Path workingDir; + private Path inputDir; + private Path outputDir; + + @Before + public void before() throws IOException { + workingDir = Files.createTempDirectory("promote_action_set"); + inputDir = workingDir.resolve("input"); + outputDir = workingDir.resolve("output"); + } + + private static void copyFiles(Path source, Path target) throws IOException { + Files.list(source).forEach(f -> { + try { + if (Files.isDirectory(f)) { + Path subTarget = Files.createDirectories(target.resolve(f.getFileName())); + copyFiles(f, subTarget); + } else { + Files.copy(f, target.resolve(f.getFileName())); + } + } catch (IOException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + }); + } }