From adcdd2d05e36a7e2b3997ec62b27a6bf770455d3 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Wed, 1 Apr 2020 14:56:57 +0200
Subject: [PATCH] WIP: reimplementing the adjacency list construction process
 using spark Datasets

---
 .../dnetlib/dhp/oa/provision/GraphJoiner.java | 291 --------------
 .../dhp/oa/provision/GraphJoiner_v2.java      | 328 ++++++++++++++++
 .../dhp/oa/provision/SparkXmlIndexingJob.java |   2 +-
 .../provision/SparkXmlRecordBuilderJob.java   |  47 ---
 .../SparkXmlRecordBuilderJob_v2.java          |  81 ++++
 .../oa/provision/model/EntityRelEntity.java   |  44 +--
 .../dhp/oa/provision/model/JoinedEntity.java  |   9 +-
 .../dhp/oa/provision/model/RelatedEntity.java | 362 ++++++++----------
 .../provision/model/SortableRelationKey.java  |  31 +-
 .../dhp/oa/provision/model/Tuple2.java        |  11 +-
 .../dhp/oa/provision/model/TypedRow.java      |  77 ++--
 .../oa/provision/utils/GraphMappingUtils.java |  26 +-
 .../oa/provision/utils/XmlRecordFactory.java  |   4 +-
 .../dhp/oa/provision/oozie_app/workflow.xml   |   3 +-
 14 files changed, 652 insertions(+), 664 deletions(-)
 delete mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner.java
 create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner_v2.java
 delete mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlRecordBuilderJob.java
 create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlRecordBuilderJob_v2.java

diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner.java
deleted file mode 100644
index def757da3..000000000
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner.java
+++ /dev/null
@@ -1,291 +0,0 @@
-package eu.dnetlib.dhp.oa.provision;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.jayway.jsonpath.DocumentContext;
-import com.jayway.jsonpath.JsonPath;
-import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
-import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils;
-import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner;
-import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
-import eu.dnetlib.dhp.oa.provision.model.*;
-import eu.dnetlib.dhp.schema.oaf.*;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.util.LongAccumulator;
-import scala.Tuple2;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Map;
-
-import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.asRelatedEntity;
-
-/**
- * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects.
- * The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization,
- * and all the possible relationships (similarity links produced by the Dedup process are excluded).
- *
- * The operation is implemented creating the union between the entity types (E), joined by the relationships (R), and again
- * by E, finally grouped by E.id;
- *
- * Different manipulations of the E and R sets are introduced to reduce the complexity of the operation
- * 1) treat the object payload as string, extracting only the necessary information beforehand using json path,
- *    it seems that deserializing it with jackson's object mapper has higher memory footprint.
- *
- * 2) only consider rels that are not virtually deleted ($.dataInfo.deletedbyinference == false)
- * 3) we only need a subset of fields from the related entities, so we introduce a distinction between E_source = S
- *    and E_target = T. Objects in T are heavily pruned by all the unnecessary information
- *
- * 4) perform the join as (((T.id join R.target) union S) groupby S.id) yield S -> [ ]
- */
-public class GraphJoiner implements Serializable {
-
-    private Map<String, LongAccumulator> accumulators = Maps.newHashMap();
-
-    public static final int MAX_RELS = 100;
-
-    public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd";
-
-    private SparkSession spark;
-
-    private ContextMapper contextMapper;
-
-    private String inputPath;
-
-    private String outPath;
-
-    private String otherDsTypeId;
-
-    public GraphJoiner(SparkSession spark, ContextMapper contextMapper, String otherDsTypeId, String inputPath, String outPath) {
-        this.spark = spark;
-        this.contextMapper = contextMapper;
-        this.otherDsTypeId = otherDsTypeId;
-        this.inputPath = inputPath;
-        this.outPath = outPath;
-
-        final SparkContext sc = spark.sparkContext();
-        prepareAccumulators(sc);
-    }
-
-    public GraphJoiner adjacencyLists() {
-        final JavaSparkContext jsc = new JavaSparkContext(getSpark().sparkContext());
-
-        // read each entity
-        JavaPairRDD<String, TypedRow> datasource = readPathEntity(jsc, getInputPath(), "datasource");
-        JavaPairRDD<String, TypedRow> organization = readPathEntity(jsc, getInputPath(), "organization");
-        JavaPairRDD<String, TypedRow> project = readPathEntity(jsc, getInputPath(), "project");
-        JavaPairRDD<String, TypedRow> dataset = readPathEntity(jsc, getInputPath(), "dataset");
-        JavaPairRDD<String, TypedRow> otherresearchproduct = readPathEntity(jsc, getInputPath(), "otherresearchproduct");
-        JavaPairRDD<String, TypedRow> software = readPathEntity(jsc, getInputPath(), "software");
-        JavaPairRDD<String, TypedRow> publication = readPathEntity(jsc, getInputPath(), "publication");
-
-        // create the union between all the entities
-        final String entitiesPath = getOutPath() + "/entities";
-        datasource
-                .union(organization)
-                .union(project)
-                .union(dataset)
-                .union(otherresearchproduct)
-                .union(software)
-                .union(publication)
-                .map(e -> new EntityRelEntity().setSource(e._2()))
-                .map(GraphMappingUtils::serialize)
-                .saveAsTextFile(entitiesPath, GzipCodec.class);
-
-        JavaPairRDD<String, EntityRelEntity> entities = jsc.textFile(entitiesPath)
-                .map(t -> new ObjectMapper().readValue(t, EntityRelEntity.class))
-                .mapToPair(t -> new Tuple2<>(t.getSource().getSourceId(), t));
-
-        final String relationPath = getOutPath() + "/relation";
-        // reads the relationships
-        final JavaPairRDD<SortableRelationKey, EntityRelEntity> rels = readPathRelation(jsc, getInputPath())
-                .filter(rel -> !rel.getDeleted()) //only consider those that are not virtually deleted
-                .map(p -> new EntityRelEntity().setRelation(p))
-                .mapToPair(p -> new Tuple2<>(SortableRelationKey.from(p), p));
-        rels
-                .groupByKey(new RelationPartitioner(rels.getNumPartitions()))
-                .map(p -> Iterables.limit(p._2(), MAX_RELS))
-                .flatMap(p -> p.iterator())
-                .map(s -> new ObjectMapper().writeValueAsString(s))
-                .saveAsTextFile(relationPath, GzipCodec.class);
-
-        final JavaPairRDD<String, EntityRelEntity> relation = jsc.textFile(relationPath)
-                .map(s -> new ObjectMapper().readValue(s, EntityRelEntity.class))
-                .mapToPair(p -> new Tuple2<>(p.getRelation().getTargetId(), p));
-
-        final String bySourcePath = getOutPath() + "/join_by_source";
-        relation
-                .join(entities
-                        .filter(e -> !e._2().getSource().getDeleted())
-                        .mapToPair(e -> new Tuple2<>(e._1(), asRelatedEntity(e._2()))))
-                .map(s -> new EntityRelEntity()
-                        .setRelation(s._2()._1().getRelation())
-                        .setTarget(s._2()._2().getSource()))
-                .map(j -> new ObjectMapper().writeValueAsString(j))
-                .saveAsTextFile(bySourcePath, GzipCodec.class);
-
-        JavaPairRDD<String, EntityRelEntity> bySource = jsc.textFile(bySourcePath)
-                .map(e -> getObjectMapper().readValue(e, EntityRelEntity.class))
-                .mapToPair(t -> new Tuple2<>(t.getRelation().getSourceId(), t));
-
-        final XmlRecordFactory recordFactory = new XmlRecordFactory(accumulators, contextMapper, false, schemaLocation, otherDsTypeId);
-        entities
-                .union(bySource)
-                .groupByKey()   // by source id
-                .map(l -> toJoinedEntity(l))
-                .mapToPair(je -> new Tuple2<>(
-                        new Text(je.getEntity().getId()),
-                        new Text(recordFactory.build(je))))
-                .saveAsHadoopFile(getOutPath() + "/xml", Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
-
-        return this;
-    }
-
-    public SparkSession getSpark() {
-        return spark;
-    }
-
-    public String getInputPath() {
-        return inputPath;
-    }
-
-    public String getOutPath() {
-        return outPath;
-    }
-
-    // HELPERS
-
-    private OafEntity parseOaf(final String json, final String type, final ObjectMapper mapper) {
-        try {
-            switch (GraphMappingUtils.EntityType.valueOf(type)) {
-                case publication:
-                    return mapper.readValue(json, Publication.class);
-                case dataset:
-                    return mapper.readValue(json, Dataset.class);
-                case otherresearchproduct:
-                    return mapper.readValue(json, OtherResearchProduct.class);
-                case software:
-                    return mapper.readValue(json, Software.class);
-                case datasource:
-                    return mapper.readValue(json, Datasource.class);
-                case organization:
-                    return mapper.readValue(json, Organization.class);
-                case project:
-                    return mapper.readValue(json, Project.class);
-                default:
-                    throw new IllegalArgumentException("invalid type: " + type);
-            }
-        } catch (IOException e) {
-            throw new IllegalArgumentException(e);
-        }
-    }
-
-    private JoinedEntity toJoinedEntity(Tuple2<String, Iterable<EntityRelEntity>> p) {
-        final ObjectMapper mapper = getObjectMapper();
-        final JoinedEntity j = new JoinedEntity();
-        final Links links = new Links();
-        for(EntityRelEntity rel : p._2()) {
-            if (rel.hasMainEntity() & j.getEntity() == null) {
-                j.setType(rel.getSource().getType());
-                j.setEntity(parseOaf(rel.getSource().getOaf(), rel.getSource().getType(), mapper));
-            }
-            if (rel.hasRelatedEntity()) {
-                try {
-                    links.add(
-                            new eu.dnetlib.dhp.oa.provision.model.Tuple2()
-                                    .setRelation(mapper.readValue(rel.getRelation().getOaf(), Relation.class))
-                                    .setRelatedEntity(mapper.readValue(rel.getTarget().getOaf(), RelatedEntity.class)));
-                } catch (IOException e) {
-                    throw new IllegalArgumentException(e);
-                }
-            }
-        }
-        j.setLinks(links);
-        if (j.getEntity() == null) {
-            throw new IllegalStateException("missing main entity on '" + p._1() + "'");
-        }
-        return j;
-    }
-
-    /**
-     * Reads a set of eu.dnetlib.dhp.schema.oaf.OafEntity objects from a sequence file,
-     * extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.model.TypedRow
-     * @param sc
-     * @param inputPath
-     * @param type
-     * @return the JavaPairRDD<String, TypedRow> indexed by entity identifier
-     */
-    private JavaPairRDD<String, TypedRow> readPathEntity(final JavaSparkContext sc, final String inputPath, final String type) {
-        return sc.textFile(inputPath + "/" + type)
-                .mapToPair((PairFunction<String, String, TypedRow>) s -> {
-                    final DocumentContext json = JsonPath.parse(s);
-                    final String id = json.read("$.id");
-                    return new Tuple2<>(id, new TypedRow()
-                            .setSourceId(id)
-                            .setDeleted(json.read("$.dataInfo.deletedbyinference"))
-                            .setType(type)
-                            .setOaf(s));
-                });
-    }
-
-    /**
-     * Reads a set of eu.dnetlib.dhp.schema.oaf.Relation objects from a sequence file,
-     * extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.model.TypedRow
-     * @param sc
-     * @param inputPath
-     * @return the JavaRDD<TypedRow> containing all the relationships
-     */
-    private JavaRDD<TypedRow> readPathRelation(final JavaSparkContext sc, final String inputPath) {
-        return sc.textFile(inputPath + "/relation")
-                .map(s -> {
-                    final DocumentContext json = JsonPath.parse(s);
-                    return new TypedRow()
-                            .setSourceId(json.read("$.source"))
-                            .setTargetId(json.read("$.target"))
-                            .setDeleted(json.read("$.dataInfo.deletedbyinference"))
-                            .setType("relation")
-                            .setRelType("$.relType")
-                            .setSubRelType("$.subRelType")
-                            .setRelClass("$.relClass")
-                            .setOaf(s);
-                });
-    }
-
-    private ObjectMapper getObjectMapper() {
-        return new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-    }
-
-    private void prepareAccumulators(SparkContext sc) {
-        accumulators.put("resultResult_similarity_isAmongTopNSimilarDocuments", sc.longAccumulator("resultResult_similarity_isAmongTopNSimilarDocuments"));
-        accumulators.put("resultResult_similarity_hasAmongTopNSimilarDocuments", sc.longAccumulator("resultResult_similarity_hasAmongTopNSimilarDocuments"));
-        accumulators.put("resultResult_supplement_isSupplementTo", sc.longAccumulator("resultResult_supplement_isSupplementTo"));
-        accumulators.put("resultResult_supplement_isSupplementedBy", sc.longAccumulator("resultResult_supplement_isSupplementedBy"));
-        accumulators.put("resultResult_dedup_isMergedIn", sc.longAccumulator("resultResult_dedup_isMergedIn"));
-        accumulators.put("resultResult_dedup_merges", sc.longAccumulator("resultResult_dedup_merges"));
-
-        accumulators.put("resultResult_publicationDataset_isRelatedTo", sc.longAccumulator("resultResult_publicationDataset_isRelatedTo"));
-        accumulators.put("resultResult_relationship_isRelatedTo", sc.longAccumulator("resultResult_relationship_isRelatedTo"));
-        accumulators.put("resultProject_outcome_isProducedBy", sc.longAccumulator("resultProject_outcome_isProducedBy"));
-        accumulators.put("resultProject_outcome_produces", sc.longAccumulator("resultProject_outcome_produces"));
-        accumulators.put("resultOrganization_affiliation_isAuthorInstitutionOf", sc.longAccumulator("resultOrganization_affiliation_isAuthorInstitutionOf"));
-
-        accumulators.put("resultOrganization_affiliation_hasAuthorInstitution", sc.longAccumulator("resultOrganization_affiliation_hasAuthorInstitution"));
-        accumulators.put("projectOrganization_participation_hasParticipant", sc.longAccumulator("projectOrganization_participation_hasParticipant"));
-        accumulators.put("projectOrganization_participation_isParticipant", sc.longAccumulator("projectOrganization_participation_isParticipant"));
-        accumulators.put("organizationOrganization_dedup_isMergedIn", sc.longAccumulator("organizationOrganization_dedup_isMergedIn"));
-        accumulators.put("organizationOrganization_dedup_merges", sc.longAccumulator("resultProject_outcome_produces"));
-        accumulators.put("datasourceOrganization_provision_isProvidedBy", sc.longAccumulator("datasourceOrganization_provision_isProvidedBy"));
-        accumulators.put("datasourceOrganization_provision_provides", sc.longAccumulator("datasourceOrganization_provision_provides"));
-    }
-
-}
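Note on the implementation removed above: it bounds per-node fan-out by grouping relations under a sortable key and truncating each group to MAX_RELS with Iterables.limit. A minimal, self-contained sketch of that capping idea, using toy String pairs and a local master (class name, data and configuration here are illustrative, not taken from the patch):

    import com.google.common.collect.Iterables;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    import java.util.Arrays;

    public class FanOutCapSketch {
        public static void main(String[] args) {
            try (JavaSparkContext sc = new JavaSparkContext("local[*]", "fan-out-cap")) {
                JavaPairRDD<String, String> rels = sc.parallelizePairs(Arrays.asList(
                        new Tuple2<>("s1", "t1"), new Tuple2<>("s1", "t2"),
                        new Tuple2<>("s1", "t3"), new Tuple2<>("s2", "t4")));
                int maxRels = 2; // analogous to GraphJoiner.MAX_RELS
                JavaPairRDD<String, String> capped = rels
                        .groupByKey()                                       // all rels of one source land in one group
                        .flatMapValues(vs -> Iterables.limit(vs, maxRels)); // keep at most maxRels per source
                capped.collect().forEach(System.out::println);
            }
        }
    }

The cap trims each group after the shuffle, so it bounds the size of the adjacency list being built rather than the shuffle itself.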
sc.longAccumulator("organizationOrganization_dedup_isMergedIn")); - accumulators.put("organizationOrganization_dedup_merges", sc.longAccumulator("resultProject_outcome_produces")); - accumulators.put("datasourceOrganization_provision_isProvidedBy", sc.longAccumulator("datasourceOrganization_provision_isProvidedBy")); - accumulators.put("datasourceOrganization_provision_provides", sc.longAccumulator("datasourceOrganization_provision_provides")); - } - -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner_v2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner_v2.java new file mode 100644 index 000000000..d67493f43 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner_v2.java @@ -0,0 +1,328 @@ +package eu.dnetlib.dhp.oa.provision; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Iterators; +import com.google.common.collect.Maps; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.JsonPath; +import eu.dnetlib.dhp.oa.provision.model.*; +import eu.dnetlib.dhp.oa.provision.utils.*; +import eu.dnetlib.dhp.schema.oaf.*; +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.*; +import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.*; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.types.*; +import org.apache.spark.util.LongAccumulator; +import scala.Tuple2; + +import java.io.IOException; +import java.io.Serializable; +import java.util.*; + +import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.asRelatedEntity; + +/** + * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. + * The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, + * and all the possible relationships (similarity links produced by the Dedup process are excluded). + * + * The operation is implemented creating the union between the entity types (E), joined by the relationships (R), and again + * by E, finally grouped by E.id; + * + * Different manipulations of the E and R sets are introduced to reduce the complexity of the operation + * 1) treat the object payload as string, extracting only the necessary information beforehand using json path, + * it seems that deserializing it with jackson's object mapper has higher memory footprint. + * + * 2) only consider rels that are not virtually deleted ($.dataInfo.deletedbyinference == false) + * 3) we only need a subset of fields from the related entities, so we introduce a distinction between E_source = S + * and E_target = T. 
Objects in T are heavily pruned by all the unnecessary information + * + * 4) perform the join as (((T.id join R.target) union S) groupby S.id) yield S -> [ ] + */ +public class GraphJoiner_v2 implements Serializable { + + public static final int LIMIT = 1000000; + private Map accumulators = Maps.newHashMap(); + + public static final int MAX_RELS = 100; + + public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd"; + + private static final StructType KV_SCHEMA = StructType$.MODULE$.apply( + Arrays.asList( + StructField$.MODULE$.apply("key", DataTypes.StringType, false, Metadata.empty()), + StructField$.MODULE$.apply("value", DataTypes.StringType, false, Metadata.empty()) + )); + + private static final StructType TYPED_ROW_SCHEMA = StructType$.MODULE$.apply( + Arrays.asList( + StructField$.MODULE$.apply("sourceId", DataTypes.StringType, false, Metadata.empty()), + StructField$.MODULE$.apply("targetId", DataTypes.StringType, true, Metadata.empty()), + StructField$.MODULE$.apply("deleted", DataTypes.BooleanType, false, Metadata.empty()), + StructField$.MODULE$.apply("type", DataTypes.StringType, false, Metadata.empty()), + StructField$.MODULE$.apply("relType", DataTypes.StringType, true, Metadata.empty()), + StructField$.MODULE$.apply("subRelType", DataTypes.StringType, true, Metadata.empty()), + StructField$.MODULE$.apply("relClass", DataTypes.StringType, true, Metadata.empty()), + StructField$.MODULE$.apply("oaf", DataTypes.BinaryType, false, Metadata.empty()) + )); + + private static final StructType ENTITY_REL_ENTITY_SCHEMA = StructType$.MODULE$.apply( + Arrays.asList( + StructField$.MODULE$.apply("source", TYPED_ROW_SCHEMA, false, Metadata.empty()), + StructField$.MODULE$.apply("relation", TYPED_ROW_SCHEMA, true, Metadata.empty()), + StructField$.MODULE$.apply("target", TYPED_ROW_SCHEMA, false, Metadata.empty()) + )); + + + private SparkSession spark; + + private ContextMapper contextMapper; + + private String inputPath; + + private String outPath; + + private String otherDsTypeId; + + public GraphJoiner_v2(SparkSession spark, ContextMapper contextMapper, String otherDsTypeId, String inputPath, String outPath) { + this.spark = spark; + this.contextMapper = contextMapper; + this.otherDsTypeId = otherDsTypeId; + this.inputPath = inputPath; + this.outPath = outPath; + + final SparkContext sc = spark.sparkContext(); + prepareAccumulators(sc); + } + + public GraphJoiner_v2 adjacencyLists() throws IOException { + + final JavaSparkContext jsc = JavaSparkContext.fromSparkContext(getSpark().sparkContext()); + + // read each entity + Dataset datasource = readPathEntity(jsc, getInputPath(), "datasource"); + Dataset organization = readPathEntity(jsc, getInputPath(), "organization"); + Dataset project = readPathEntity(jsc, getInputPath(), "project"); + Dataset dataset = readPathEntity(jsc, getInputPath(), "dataset"); + Dataset otherresearchproduct = readPathEntity(jsc, getInputPath(), "otherresearchproduct"); + Dataset software = readPathEntity(jsc, getInputPath(), "software"); + Dataset publication = readPathEntity(jsc, getInputPath(), "publication"); + + // create the union between all the entities + Dataset> entities = + datasource + .union(organization) + .union(project) + .union(dataset) + .union(otherresearchproduct) + .union(software) + .union(publication) + .map((MapFunction>) value -> new Tuple2<>( + value.getId(), + value), + Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class))) + .limit(LIMIT) + .cache(); + + System.out.println("Entities 
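The union above keys each entity by identifier through a tuple encoder: the key stays a plain string column while the payload travels as a kryo-serialized blob. A minimal sketch of the same pattern outside the joiner (the Person bean and its values are illustrative assumptions, not part of the patch):

    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;
    import scala.Tuple2;
    import java.io.Serializable;
    import java.util.Arrays;

    public class TupleEncoderSketch {
        public static class Person implements Serializable {   // toy payload bean
            private String id;
            private String name;
            public String getId() { return id; }
            public void setId(String id) { this.id = id; }
            public String getName() { return name; }
            public void setName(String name) { this.name = name; }
        }

        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().master("local[*]").appName("tuple-encoder").getOrCreate();
            Person p = new Person();
            p.setId("id-1");
            p.setName("alice");
            Dataset<Person> people = spark.createDataset(Arrays.asList(p), Encoders.bean(Person.class));
            // key by id: "_1" stays a queryable string column, "_2" becomes an opaque kryo binary column
            Dataset<Tuple2<String, Person>> keyed = people.map(
                    (MapFunction<Person, Tuple2<String, Person>>) v -> new Tuple2<>(v.getId(), v),
                    Encoders.tuple(Encoders.STRING(), Encoders.kryo(Person.class)));
            keyed.printSchema(); // shows _1: string, _2: binary
            spark.stop();
        }
    }

Keeping the key as a primitive column is what lets the later joinWith calls compare "_1" without deserializing the kryo payload.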
schema:"); + entities.printSchema(); + // reads the relationships + + Dataset rels = readPathRelation(jsc, getInputPath()) + .groupByKey((MapFunction) t -> SortableRelationKey.from(t), Encoders.kryo(SortableRelationKey.class)) + .flatMapGroups((FlatMapGroupsFunction) (key, values) -> Iterators.limit(values, MAX_RELS), Encoders.bean(Relation.class)) + .limit(LIMIT) + .cache(); + + System.out.println("Relation schema:"); + rels.printSchema(); + + Dataset> relsByTarget = rels + .map((MapFunction>) r -> new Tuple2<>(r.getTarget(), r), Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class))); + + System.out.println("Relation by target schema:"); + relsByTarget.printSchema(); + + Dataset> bySource = relsByTarget + .joinWith(entities, relsByTarget.col("_1").equalTo(entities.col("_1")), "inner") + .filter((FilterFunction, Tuple2>>) value -> value._2()._2().getDeleted() == false) + .map((MapFunction, Tuple2>, EntityRelEntity>) t -> { + EntityRelEntity e = new EntityRelEntity(); + e.setRelation(t._1()._2()); + e.setTarget(asRelatedEntity(t._2()._2())); + return e; + }, Encoders.bean(EntityRelEntity.class)) + .map((MapFunction>) e -> new Tuple2<>(e.getRelation().getSource(), e), + Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class))); + + System.out.println("bySource schema"); + bySource.printSchema(); + + Dataset joined = entities + .joinWith(bySource, entities.col("_1").equalTo(bySource.col("_1")), "left") + .map((MapFunction, Tuple2>, EntityRelEntity>) value -> { + EntityRelEntity re = new EntityRelEntity(); + re.setEntity(value._1()._2()); + Optional related = Optional.ofNullable(value._2()).map(Tuple2::_2); + if (related.isPresent()) { + re.setRelation(related.get().getRelation()); + re.setTarget(related.get().getTarget()); + } + return re; + }, Encoders.kryo(EntityRelEntity.class)); + + System.out.println("joined schema"); + joined.printSchema(); + //joined.write().json(getOutPath() + "/joined"); + + final Dataset grouped = joined + .groupByKey((MapFunction) e -> e.getEntity(), Encoders.kryo(TypedRow.class)) + .mapGroups((MapGroupsFunction) (key, values) -> toJoinedEntity(key, values), Encoders.kryo(JoinedEntity.class)); + + System.out.println("grouped schema"); + grouped.printSchema(); + + final XmlRecordFactory recordFactory = new XmlRecordFactory(accumulators, contextMapper, false, schemaLocation, otherDsTypeId); + grouped + .map((MapFunction) value -> recordFactory.build(value), Encoders.STRING()) + .limit(LIMIT) + .write() + .text(getOutPath() + "/xml"); + /* + .javaRDD() + .mapToPair((PairFunction, String, String>) t -> new Tuple2<>(t._1(), t._2())) + .saveAsHadoopFile(getOutPath() + "/xml", Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); + + */ + + return this; + } + + public SparkSession getSpark() { + return spark; + } + + public String getInputPath() { + return inputPath; + } + + public String getOutPath() { + return outPath; + } + + // HELPERS + + private JoinedEntity toJoinedEntity(TypedRow key, Iterator values) { + final ObjectMapper mapper = getObjectMapper(); + final JoinedEntity j = new JoinedEntity(); + j.setType(key.getType()); + j.setEntity(parseOaf(key.getOaf(), key.getType(), mapper)); + final Links links = new Links(); + values.forEachRemaining(rel -> links.add( + new eu.dnetlib.dhp.oa.provision.model.Tuple2( + rel.getRelation(), + rel.getTarget() + ))); + j.setLinks(links); + return j; + } + + private OafEntity parseOaf(final String json, final String type, final ObjectMapper mapper) { + try { + switch 
+
+    private OafEntity parseOaf(final String json, final String type, final ObjectMapper mapper) {
+        try {
+            switch (GraphMappingUtils.EntityType.valueOf(type)) {
+                case publication:
+                    return mapper.readValue(json, Publication.class);
+                case dataset:
+                    return mapper.readValue(json, eu.dnetlib.dhp.schema.oaf.Dataset.class);
+                case otherresearchproduct:
+                    return mapper.readValue(json, OtherResearchProduct.class);
+                case software:
+                    return mapper.readValue(json, Software.class);
+                case datasource:
+                    return mapper.readValue(json, Datasource.class);
+                case organization:
+                    return mapper.readValue(json, Organization.class);
+                case project:
+                    return mapper.readValue(json, Project.class);
+                default:
+                    throw new IllegalArgumentException("invalid type: " + type);
+            }
+        } catch (IOException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    /**
+     * Reads a set of eu.dnetlib.dhp.schema.oaf.OafEntity objects from a newline delimited json file,
+     * extracts the necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.model.TypedRow
+     * @param sc
+     * @param inputPath
+     * @param type
+     * @return the Dataset<TypedRow> of entities matching the given type
+     */
+    private Dataset<TypedRow> readPathEntity(final JavaSparkContext sc, final String inputPath, final String type) {
+        RDD<Row> rdd = sc.textFile(inputPath + "/" + type)
+                .map((Function<String, Row>) s -> RowFactory.create("", s))
+                .rdd();
+
+        return getSpark().createDataFrame(rdd, KV_SCHEMA)
+                .map((MapFunction<Row, TypedRow>) row -> {
+                    final String s = row.getAs("value");
+                    final DocumentContext json = JsonPath.parse(s);
+                    final TypedRow t = new TypedRow();
+                    t.setId(json.read("$.id"));
+                    t.setDeleted(json.read("$.dataInfo.deletedbyinference"));
+                    t.setType(type);
+                    t.setOaf(s);
+                    return t;
+                }, Encoders.bean(TypedRow.class));
+    }
+
+    /**
+     * Reads a set of eu.dnetlib.dhp.schema.oaf.Relation objects from a newline delimited json file,
+     * mapping each json onto a eu.dnetlib.dhp.schema.oaf.Relation instance
+     * @param sc
+     * @param inputPath
+     * @return the Dataset<Relation> containing all the relationships
+     */
+    private Dataset<Relation> readPathRelation(final JavaSparkContext sc, final String inputPath) {
+        final RDD<Row> rdd = sc.textFile(inputPath + "/relation")
+                .map((Function<String, Row>) s -> RowFactory.create("", s))
+                .rdd();
+
+        return getSpark().createDataFrame(rdd, KV_SCHEMA)
+                .map((MapFunction<Row, Relation>) value -> new ObjectMapper().readValue(value.getAs("value"), Relation.class), Encoders.bean(Relation.class));
+    }
+
+    private ObjectMapper getObjectMapper() {
+        return new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
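readPathEntity deliberately keeps the payload as a string and pulls out only the id and the deleted flag with JsonPath, instead of binding the whole record through Jackson. A standalone sketch of that selective extraction (the JSON sample is illustrative):

    import com.jayway.jsonpath.DocumentContext;
    import com.jayway.jsonpath.JsonPath;

    public class JsonPathSketch {
        public static void main(String[] args) {
            String json = "{\"id\":\"50|doi_________::abc\",\"dataInfo\":{\"deletedbyinference\":false},\"title\":[]}";
            DocumentContext ctx = JsonPath.parse(json); // parse once, read several paths
            String id = ctx.read("$.id");
            Boolean deleted = ctx.read("$.dataInfo.deletedbyinference");
            // the original string stays around as the payload; no full object graph is materialized
            System.out.println(id + " deleted=" + deleted);
        }
    }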
sc.longAccumulator("resultResult_relationship_isRelatedTo")); + accumulators.put("resultProject_outcome_isProducedBy", sc.longAccumulator("resultProject_outcome_isProducedBy")); + accumulators.put("resultProject_outcome_produces", sc.longAccumulator("resultProject_outcome_produces")); + accumulators.put("resultOrganization_affiliation_isAuthorInstitutionOf", sc.longAccumulator("resultOrganization_affiliation_isAuthorInstitutionOf")); + + accumulators.put("resultOrganization_affiliation_hasAuthorInstitution", sc.longAccumulator("resultOrganization_affiliation_hasAuthorInstitution")); + accumulators.put("projectOrganization_participation_hasParticipant", sc.longAccumulator("projectOrganization_participation_hasParticipant")); + accumulators.put("projectOrganization_participation_isParticipant", sc.longAccumulator("projectOrganization_participation_isParticipant")); + accumulators.put("organizationOrganization_dedup_isMergedIn", sc.longAccumulator("organizationOrganization_dedup_isMergedIn")); + accumulators.put("organizationOrganization_dedup_merges", sc.longAccumulator("resultProject_outcome_produces")); + accumulators.put("datasourceOrganization_provision_isProvidedBy", sc.longAccumulator("datasourceOrganization_provision_isProvidedBy")); + accumulators.put("datasourceOrganization_provision_provides", sc.longAccumulator("datasourceOrganization_provision_provides")); + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlIndexingJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlIndexingJob.java index cafbc8653..975ac7548 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlIndexingJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlIndexingJob.java @@ -84,7 +84,7 @@ public class SparkXmlIndexingJob { return SparkSession .builder() .config(conf) - .appName(SparkXmlRecordBuilderJob.class.getSimpleName()) + .appName(SparkXmlIndexingJob.class.getSimpleName()) .master(master) .getOrCreate(); } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlRecordBuilderJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlRecordBuilderJob.java deleted file mode 100644 index 0a898c0fc..000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlRecordBuilderJob.java +++ /dev/null @@ -1,47 +0,0 @@ -package eu.dnetlib.dhp.oa.provision; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.spark.SparkConf; -import org.apache.spark.sql.SparkSession; - -public class SparkXmlRecordBuilderJob { - - public static void main(String[] args) throws Exception { - - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils.toString( - SparkXmlRecordBuilderJob.class.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json"))); - parser.parseArgument(args); - - final String master = parser.get("master"); - final SparkConf conf = new SparkConf() - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - - try(SparkSession spark = getSession(conf, master)) { - - final String inputPath = parser.get("sourcePath"); - final String outputPath = 
parser.get("outputPath"); - final String isLookupUrl = parser.get("isLookupUrl"); - final String otherDsTypeId = parser.get("otherDsTypeId"); - - final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration()); - - new GraphJoiner(spark, ContextMapper.fromIS(isLookupUrl), otherDsTypeId, inputPath, outputPath) - .adjacencyLists(); - } - } - - private static SparkSession getSession(SparkConf conf, String master) { - return SparkSession - .builder() - .config(conf) - .appName(SparkXmlRecordBuilderJob.class.getSimpleName()) - .master(master) - .getOrCreate(); - } - -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlRecordBuilderJob_v2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlRecordBuilderJob_v2.java new file mode 100644 index 000000000..3b119cebb --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlRecordBuilderJob_v2.java @@ -0,0 +1,81 @@ +package eu.dnetlib.dhp.oa.provision; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.provision.model.*; +import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; +import eu.dnetlib.dhp.schema.oaf.*; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparkSession; + +public class SparkXmlRecordBuilderJob_v2 { + + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils.toString( + SparkXmlRecordBuilderJob_v2.class.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json"))); + parser.parseArgument(args); + + final String master = parser.get("master"); + try(SparkSession spark = getSession(master)) { + + final String inputPath = parser.get("sourcePath"); + final String outputPath = parser.get("outputPath"); + final String isLookupUrl = parser.get("isLookupUrl"); + final String otherDsTypeId = parser.get("otherDsTypeId"); + + new GraphJoiner_v2(spark, ContextMapper.fromIS(isLookupUrl), otherDsTypeId, inputPath, outputPath) + .adjacencyLists(); + } + } + + private static SparkSession getSession(String master) { + final SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.set("spark.sql.shuffle.partitions", "500"); + conf.registerKryoClasses(new Class[]{ + Author.class, + Context.class, + Country.class, + DataInfo.class, + eu.dnetlib.dhp.schema.oaf.Dataset.class, + Datasource.class, + ExternalReference.class, + ExtraInfo.class, + Field.class, + GeoLocation.class, + Instance.class, + Journal.class, + KeyValue.class, + Oaf.class, + OafEntity.class, + OAIProvenance.class, + Organization.class, + OriginDescription.class, + OtherResearchProduct.class, + Project.class, + Publication.class, + Qualifier.class, + Relation.class, + Result.class, + Software.class, + StructuredProperty.class, + + TypedRow.class, + EntityRelEntity.class, + JoinedEntity.class, + SortableRelationKey.class, + Tuple2.class, + Links.class, + RelatedEntity.class + }); + return SparkSession + .builder() + .config(conf) + .appName(SparkXmlRecordBuilderJob_v2.class.getSimpleName()) + .master(master) + .getOrCreate(); + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java index 
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java
index ba89eaa38..ddeec140b 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java
@@ -1,54 +1,36 @@
 package eu.dnetlib.dhp.oa.provision.model;
 
+import eu.dnetlib.dhp.schema.oaf.Relation;
+
 import java.io.Serializable;
 
 public class EntityRelEntity implements Serializable {
 
-    private TypedRow source;
-    private TypedRow relation;
-    private TypedRow target;
+    private TypedRow entity;
+    private Relation relation;
+    private RelatedEntity target;
 
-    public EntityRelEntity() {
+    public TypedRow getEntity() {
+        return entity;
     }
 
-    public EntityRelEntity(TypedRow source) {
-        this.source = source;
+    public void setEntity(TypedRow entity) {
+        this.entity = entity;
     }
 
-    //helpers
-    public Boolean hasMainEntity() {
-        return getSource() != null & getRelation() == null & getTarget() == null;
-    }
-
-    public Boolean hasRelatedEntity() {
-        return getSource() == null & getRelation() != null & getTarget() != null;
-    }
-
-
-    public TypedRow getSource() {
-        return source;
-    }
-
-    public EntityRelEntity setSource(TypedRow source) {
-        this.source = source;
-        return this;
-    }
-
-    public TypedRow getRelation() {
+    public Relation getRelation() {
         return relation;
     }
 
-    public EntityRelEntity setRelation(TypedRow relation) {
+    public void setRelation(Relation relation) {
         this.relation = relation;
-        return this;
     }
 
-    public TypedRow getTarget() {
+    public RelatedEntity getTarget() {
         return target;
     }
 
-    public EntityRelEntity setTarget(TypedRow target) {
+    public void setTarget(RelatedEntity target) {
         this.target = target;
-        return this;
     }
 }
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java
index 80b15a4d6..815863c67 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java
@@ -16,26 +16,23 @@ public class JoinedEntity implements Serializable {
         return type;
     }
 
-    public JoinedEntity setType(String type) {
+    public void setType(String type) {
         this.type = type;
-        return this;
     }
 
     public OafEntity getEntity() {
         return entity;
     }
 
-    public JoinedEntity setEntity(OafEntity entity) {
+    public void setEntity(OafEntity entity) {
         this.entity = entity;
-        return this;
     }
 
     public Links getLinks() {
         return links;
     }
 
-    public JoinedEntity setLinks(Links links) {
+    public void setLinks(Links links) {
         this.links = links;
-        return this;
     }
 }
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntity.java
index 75e9045e8..2e5b4186c 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntity.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntity.java
@@ -49,207 +49,183 @@ public class RelatedEntity implements Serializable {
         return id;
     }
 
-    public RelatedEntity setId(String id) {
+    public void setId(String id) {
         this.id = id;
-        return this;
-    }
-
-    public StructuredProperty getTitle() {
-        return title;
-    }
-
-    public RelatedEntity setTitle(StructuredProperty title) {
-        this.title = title;
-        return this;
-    }
-
-    public String getDateofacceptance() {
-        return dateofacceptance;
-    }
-
-    public RelatedEntity setDateofacceptance(String dateofacceptance) {
-        this.dateofacceptance = dateofacceptance;
-        return this;
-    }
-
-    public String getPublisher() {
-        return publisher;
-    }
-
-    public RelatedEntity setPublisher(String publisher) {
-        this.publisher = publisher;
-        return this;
-    }
-
-    public List<StructuredProperty> getPid() {
-        return pid;
-    }
-
-    public RelatedEntity setPid(List<StructuredProperty> pid) {
-        this.pid = pid;
-        return this;
-    }
-
-    public String getCodeRepositoryUrl() {
-        return codeRepositoryUrl;
-    }
-
-    public RelatedEntity setCodeRepositoryUrl(String codeRepositoryUrl) {
-        this.codeRepositoryUrl = codeRepositoryUrl;
-        return this;
-    }
-
-    public Qualifier getResulttype() {
-        return resulttype;
-    }
-
-    public RelatedEntity setResulttype(Qualifier resulttype) {
-        this.resulttype = resulttype;
-        return this;
-    }
-
-    public List<KeyValue> getCollectedfrom() {
-        return collectedfrom;
-    }
-
-    public RelatedEntity setCollectedfrom(List<KeyValue> collectedfrom) {
-        this.collectedfrom = collectedfrom;
-        return this;
-    }
-
-    public List<Instance> getInstances() {
-        return instances;
-    }
-
-    public RelatedEntity setInstances(List<Instance> instances) {
-        this.instances = instances;
-        return this;
-    }
-
-    public String getOfficialname() {
-        return officialname;
-    }
-
-    public RelatedEntity setOfficialname(String officialname) {
-        this.officialname = officialname;
-        return this;
-    }
-
-    public String getWebsiteurl() {
-        return websiteurl;
-    }
-
-    public RelatedEntity setWebsiteurl(String websiteurl) {
-        this.websiteurl = websiteurl;
-        return this;
-    }
-
-    public Qualifier getDatasourcetype() {
-        return datasourcetype;
-    }
-
-    public RelatedEntity setDatasourcetype(Qualifier datasourcetype) {
-        this.datasourcetype = datasourcetype;
-        return this;
-    }
-
-    public Qualifier getDatasourcetypeui() {
-        return datasourcetypeui;
-    }
-
-    public RelatedEntity setDatasourcetypeui(Qualifier datasourcetypeui) {
-        this.datasourcetypeui = datasourcetypeui;
-        return this;
-    }
-
-    public Qualifier getOpenairecompatibility() {
-        return openairecompatibility;
-    }
-
-    public RelatedEntity setOpenairecompatibility(Qualifier openairecompatibility) {
-        this.openairecompatibility = openairecompatibility;
-        return this;
-    }
-
-    public String getLegalname() {
-        return legalname;
-    }
-
-    public RelatedEntity setLegalname(String legalname) {
-        this.legalname = legalname;
-        return this;
-    }
-
-    public String getLegalshortname() {
-        return legalshortname;
-    }
-
-    public RelatedEntity setLegalshortname(String legalshortname) {
-        this.legalshortname = legalshortname;
-        return this;
-    }
-
-    public Qualifier getCountry() {
-        return country;
-    }
-
-    public RelatedEntity setCountry(Qualifier country) {
-        this.country = country;
-        return this;
-    }
-
-    public String getCode() {
-        return code;
-    }
-
-    public RelatedEntity setCode(String code) {
-        this.code = code;
-        return this;
-    }
-
-    public String getAcronym() {
-        return acronym;
-    }
-
-    public RelatedEntity setAcronym(String acronym) {
-        this.acronym = acronym;
-        return this;
-    }
-
-    public Qualifier getContracttype() {
-        return contracttype;
-    }
-
-    public RelatedEntity setContracttype(Qualifier contracttype) {
-        this.contracttype = contracttype;
-        return this;
-    }
-
-    public List<String> getFundingtree() {
-        return fundingtree;
-    }
-
-    public RelatedEntity setFundingtree(List<String> fundingtree) {
-        this.fundingtree = fundingtree;
-        return this;
-    }
-
-    public String getProjectTitle() {
-        return projectTitle;
-    }
-
-    public RelatedEntity setProjectTitle(String projectTitle) {
-        this.projectTitle = projectTitle;
-        return this;
     }
 
     public String getType() {
         return type;
     }
 
-    public RelatedEntity setType(String type) {
+    public void setType(String type) {
         this.type = type;
-        return this;
     }
 
+    public StructuredProperty getTitle() {
+        return title;
+    }
+
+    public void setTitle(StructuredProperty title) {
+        this.title = title;
+    }
+
+    public String getWebsiteurl() {
+        return websiteurl;
+    }
+
+    public void setWebsiteurl(String websiteurl) {
+        this.websiteurl = websiteurl;
+    }
+
+    public String getDateofacceptance() {
+        return dateofacceptance;
+    }
+
+    public void setDateofacceptance(String dateofacceptance) {
+        this.dateofacceptance = dateofacceptance;
+    }
+
+    public String getPublisher() {
+        return publisher;
+    }
+
+    public void setPublisher(String publisher) {
+        this.publisher = publisher;
+    }
+
+    public List<StructuredProperty> getPid() {
+        return pid;
+    }
+
+    public void setPid(List<StructuredProperty> pid) {
+        this.pid = pid;
+    }
+
+    public String getCodeRepositoryUrl() {
+        return codeRepositoryUrl;
+    }
+
+    public void setCodeRepositoryUrl(String codeRepositoryUrl) {
+        this.codeRepositoryUrl = codeRepositoryUrl;
+    }
+
+    public Qualifier getResulttype() {
+        return resulttype;
+    }
+
+    public void setResulttype(Qualifier resulttype) {
+        this.resulttype = resulttype;
+    }
+
+    public List<KeyValue> getCollectedfrom() {
+        return collectedfrom;
+    }
+
+    public void setCollectedfrom(List<KeyValue> collectedfrom) {
+        this.collectedfrom = collectedfrom;
+    }
+
+    public List<Instance> getInstances() {
+        return instances;
+    }
+
+    public void setInstances(List<Instance> instances) {
+        this.instances = instances;
+    }
+
+    public String getOfficialname() {
+        return officialname;
+    }
+
+    public void setOfficialname(String officialname) {
+        this.officialname = officialname;
+    }
+
+    public Qualifier getDatasourcetype() {
+        return datasourcetype;
+    }
+
+    public void setDatasourcetype(Qualifier datasourcetype) {
+        this.datasourcetype = datasourcetype;
+    }
+
+    public Qualifier getDatasourcetypeui() {
+        return datasourcetypeui;
+    }
+
+    public void setDatasourcetypeui(Qualifier datasourcetypeui) {
+        this.datasourcetypeui = datasourcetypeui;
+    }
+
+    public Qualifier getOpenairecompatibility() {
+        return openairecompatibility;
+    }
+
+    public void setOpenairecompatibility(Qualifier openairecompatibility) {
+        this.openairecompatibility = openairecompatibility;
+    }
+
+    public String getLegalname() {
+        return legalname;
+    }
+
+    public void setLegalname(String legalname) {
+        this.legalname = legalname;
+    }
+
+    public String getLegalshortname() {
+        return legalshortname;
+    }
+
+    public void setLegalshortname(String legalshortname) {
+        this.legalshortname = legalshortname;
+    }
+
+    public Qualifier getCountry() {
+        return country;
+    }
+
+    public void setCountry(Qualifier country) {
+        this.country = country;
+    }
+
+    public String getProjectTitle() {
+        return projectTitle;
+    }
+
+    public void setProjectTitle(String projectTitle) {
+        this.projectTitle = projectTitle;
+    }
+
+    public String getCode() {
+        return code;
+    }
+
+    public void setCode(String code) {
+        this.code = code;
+    }
+
+    public String getAcronym() {
+        return acronym;
+    }
+
+    public void setAcronym(String acronym) {
+        this.acronym = acronym;
+    }
+
+    public Qualifier getContracttype() {
+        return contracttype;
+    }
+
+    public void setContracttype(Qualifier contracttype) {
+        this.contracttype = contracttype;
+    }
+
+    public List<String> getFundingtree() {
+        return fundingtree;
+    }
+
+    public void setFundingtree(List<String> fundingtree) {
+        this.fundingtree = fundingtree;
+    }
 }
\ No newline at end of file
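The fluent setters (return this) are dropped throughout the model presumably because Encoders.bean introspects classes as JavaBeans: a public no-arg constructor plus conventional getter/void-setter pairs is what lets Spark map fields onto columns. A small sketch of the convention (Label is an illustrative bean, not part of the model):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;
    import java.io.Serializable;
    import java.util.Arrays;

    public class BeanEncoderSketch {
        public static class Label implements Serializable {
            private String id;     // each field needs a getter and a void setter
            private String text;
            public Label() { }     // public no-arg constructor expected by the encoder
            public String getId() { return id; }
            public void setId(String id) { this.id = id; }
            public String getText() { return text; }
            public void setText(String text) { this.text = text; }
        }

        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().master("local[*]").appName("bean-encoder").getOrCreate();
            Label l = new Label();
            l.setId("1");
            l.setText("hello");
            Dataset<Label> ds = spark.createDataset(Arrays.asList(l), Encoders.bean(Label.class));
            ds.printSchema(); // id and text become proper columns, unlike with Encoders.kryo
            spark.stop();
        }
    }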
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java
index 8169e57e0..fef9915e8 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java
@@ -2,6 +2,7 @@ package eu.dnetlib.dhp.oa.provision.model;
 
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Maps;
+import eu.dnetlib.dhp.schema.oaf.Relation;
 
 import java.io.Serializable;
 import java.util.Map;
@@ -33,58 +34,54 @@ public class SortableRelationKey implements Comparable<SortableRelationKey>, Ser
         weights.put("dedup", 8);
     }
 
-    public static SortableRelationKey from(final EntityRelEntity e) {
-        return new SortableRelationKey()
-                .setSourceId(e.getRelation().getSourceId())
-                .setTargetId(e.getRelation().getTargetId())
-                .setRelType(e.getRelation().getRelType())
-                .setSubRelType(e.getRelation().getSubRelType())
-                .setRelClass(e.getRelation().getRelClass());
+    public static SortableRelationKey from(final Relation r) {
+        final SortableRelationKey s = new SortableRelationKey();
+        s.setSourceId(r.getSource());
+        s.setTargetId(r.getTarget());
+        s.setRelType(r.getRelType());
+        s.setSubRelType(r.getSubRelType());
+        s.setRelClass(r.getRelClass());
+        return s;
     }
 
     public String getSourceId() {
         return sourceId;
     }
 
-    public SortableRelationKey setSourceId(String sourceId) {
+    public void setSourceId(String sourceId) {
         this.sourceId = sourceId;
-        return this;
     }
 
     public String getTargetId() {
         return targetId;
     }
 
-    public SortableRelationKey setTargetId(String targetId) {
+    public void setTargetId(String targetId) {
         this.targetId = targetId;
-        return this;
     }
 
     public String getRelType() {
         return relType;
     }
 
-    public SortableRelationKey setRelType(String relType) {
+    public void setRelType(String relType) {
         this.relType = relType;
-        return this;
     }
 
     public String getSubRelType() {
         return subRelType;
     }
 
-    public SortableRelationKey setSubRelType(String subRelType) {
+    public void setSubRelType(String subRelType) {
         this.subRelType = subRelType;
-        return this;
     }
 
     public String getRelClass() {
         return relClass;
     }
 
-    public SortableRelationKey setRelClass(String relClass) {
+    public void setRelClass(String relClass) {
         this.relClass = relClass;
-        return this;
     }
 
     @Override
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java
index ded976eea..db639f113 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java
@@ -8,21 +8,24 @@ public class Tuple2 {
 
     private RelatedEntity relatedEntity;
 
+    public Tuple2(Relation relation, RelatedEntity relatedEntity) {
+        this.relation = relation;
+        this.relatedEntity = relatedEntity;
+    }
+
     public Relation getRelation() {
         return relation;
     }
 
-    public Tuple2 setRelation(Relation relation) {
+    public void setRelation(Relation relation) {
         this.relation = relation;
-        return this;
     }
 
     public RelatedEntity getRelatedEntity() {
         return relatedEntity;
     }
 
-    public Tuple2 setRelatedEntity(RelatedEntity relatedEntity) {
+    public void setRelatedEntity(RelatedEntity relatedEntity) {
         this.relatedEntity = relatedEntity;
-        return this;
     }
 }
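SortableRelationKey carries a weight map so relation groups can be consumed in a stable, priority-driven order before the MAX_RELS cut. Its compareTo sits outside the hunk above; the following is only a sketch of how such a key typically composes an ordering with Guava's ComparisonChain (the field choice and weights are assumptions based on the visible members, not the class' actual code):

    import com.google.common.collect.ComparisonChain;
    import com.google.common.collect.Maps;
    import java.util.Map;

    public class WeightedKeySketch implements Comparable<WeightedKeySketch> {
        private static final Map<String, Integer> weights = Maps.newHashMap();
        static {
            weights.put("outcome", 0);  // illustrative weights, echoing the class' static block
            weights.put("dedup", 8);
        }

        private final String sourceId;
        private final String subRelType;

        public WeightedKeySketch(String sourceId, String subRelType) {
            this.sourceId = sourceId;
            this.subRelType = subRelType;
        }

        @Override
        public int compareTo(WeightedKeySketch o) {
            return ComparisonChain.start()
                    .compare(sourceId, o.sourceId)                                   // cluster by source first
                    .compare(weights.getOrDefault(subRelType, Integer.MAX_VALUE),
                             weights.getOrDefault(o.subRelType, Integer.MAX_VALUE))  // then by relation priority
                    .result();
        }
    }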
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/TypedRow.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/TypedRow.java
index e275fd9da..54f34802f 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/TypedRow.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/TypedRow.java
@@ -1,92 +1,61 @@
 package eu.dnetlib.dhp.oa.provision.model;
 
+import com.google.common.base.Objects;
+
 import java.io.Serializable;
 
 public class TypedRow implements Serializable {
 
-    private String sourceId;
-
-    private String targetId;
+    private String id;
 
     private Boolean deleted;
 
     private String type;
 
-    private String relType;
-
-    private String subRelType;
-
-    private String relClass;
-
     private String oaf;
 
-    public String getSourceId() {
-        return sourceId;
+    public String getId() {
+        return id;
     }
 
-    public TypedRow setSourceId(String sourceId) {
-        this.sourceId = sourceId;
-        return this;
-    }
-
-    public String getTargetId() {
-        return targetId;
-    }
-
-    public TypedRow setTargetId(String targetId) {
-        this.targetId = targetId;
-        return this;
+    public void setId(String id) {
+        this.id = id;
     }
 
     public Boolean getDeleted() {
         return deleted;
     }
 
-    public TypedRow setDeleted(Boolean deleted) {
+    public void setDeleted(Boolean deleted) {
         this.deleted = deleted;
-        return this;
     }
 
     public String getType() {
         return type;
     }
 
-    public TypedRow setType(String type) {
+    public void setType(String type) {
         this.type = type;
-        return this;
-    }
-
-    public String getRelType() {
-        return relType;
-    }
-
-    public TypedRow setRelType(String relType) {
-        this.relType = relType;
-        return this;
-    }
-
-    public String getSubRelType() {
-        return subRelType;
-    }
-
-    public TypedRow setSubRelType(String subRelType) {
-        this.subRelType = subRelType;
-        return this;
-    }
-
-    public String getRelClass() {
-        return relClass;
-    }
-
-    public TypedRow setRelClass(String relClass) {
-        this.relClass = relClass;
-        return this;
     }
 
     public String getOaf() {
         return oaf;
     }
 
-    public TypedRow setOaf(String oaf) {
+    public void setOaf(String oaf) {
         this.oaf = oaf;
-        return this;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        TypedRow typedRow2 = (TypedRow) o;
+        return Objects.equal(id, typedRow2.id);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(id);
     }
 }
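The new equals/hashCode make TypedRow equality rest on the id alone, so an entity can act as a grouping key regardless of its payload. One caveat worth noting: when the grouping key travels kryo-encoded (as in GraphJoiner_v2's groupByKey), Spark clusters on the serialized bytes, so equals/hashCode matter chiefly on the JVM side, e.g. in collections or RDD-style grouping. A toy check of the id-based equality (values illustrative):

    import com.google.common.base.Objects;

    public class IdEqualitySketch {
        static final class Node {
            final String id;
            final String payload;
            Node(String id, String payload) { this.id = id; this.payload = payload; }
            @Override
            public boolean equals(Object o) {
                if (this == o) return true;
                if (!(o instanceof Node)) return false;
                return Objects.equal(id, ((Node) o).id); // identity rests on the id alone
            }
            @Override
            public int hashCode() { return Objects.hashCode(id); }
        }

        public static void main(String[] args) {
            Node a = new Node("50|abc", "{...}");
            Node b = new Node("50|abc", "{... different payload ...}");
            System.out.println(a.equals(b)); // true: same id, equal nodes
        }
    }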
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java
index a48c812fc..27b42e69d 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java
@@ -3,14 +3,11 @@ package eu.dnetlib.dhp.oa.provision.utils;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Predicate;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.jayway.jsonpath.DocumentContext;
 import com.jayway.jsonpath.JsonPath;
-import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity;
-import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
-import eu.dnetlib.dhp.oa.provision.model.TypedRow;
+import eu.dnetlib.dhp.oa.provision.model.*;
 import eu.dnetlib.dhp.schema.oaf.*;
 import net.minidev.json.JSONArray;
 import org.apache.commons.lang3.StringUtils;
@@ -66,14 +63,14 @@ public class GraphMappingUtils {
         return MainEntityType.result.name().equals(getMainType(type));
     }
 
-    public static Predicate<String> instanceFilter = s -> instanceFieldFilter.contains(s);
+    public static RelatedEntity asRelatedEntity(TypedRow e) {
 
-    public static EntityRelEntity asRelatedEntity(EntityRelEntity e) {
+        final DocumentContext j = JsonPath.parse(e.getOaf());
+        final RelatedEntity re = new RelatedEntity();
+        re.setId(j.read("$.id"));
+        re.setType(e.getType());
 
-        final DocumentContext j = JsonPath.parse(e.getSource().getOaf());
-        final RelatedEntity re = new RelatedEntity().setId(j.read("$.id")).setType(e.getSource().getType());
-
-        switch (EntityType.valueOf(e.getSource().getType())) {
+        switch (EntityType.valueOf(e.getType())) {
             case publication:
             case dataset:
             case otherresearchproduct:
@@ -147,14 +144,11 @@ public class GraphMappingUtils {
             break;
         }
 
-        return new EntityRelEntity().setSource(
-                new TypedRow()
-                        .setSourceId(e.getSource().getSourceId())
-                        .setDeleted(e.getSource().getDeleted())
-                        .setType(e.getSource().getType())
-                        .setOaf(serialize(re)));
+
+        return re;
     }
 
+
     private static KeyValue asKV(LinkedHashMap<String, Object> j) {
         final KeyValue kv = new KeyValue();
         kv.setKey((String) j.get("key"));
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java
index ffbe54904..f2b3aa2e7 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java
@@ -7,9 +7,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.mycila.xmltool.XMLDoc;
 import com.mycila.xmltool.XMLTag;
-import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
-import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
-import eu.dnetlib.dhp.oa.provision.model.Tuple2;
+import eu.dnetlib.dhp.oa.provision.model.*;
 import eu.dnetlib.dhp.schema.oaf.Result;
 import eu.dnetlib.dhp.schema.oaf.*;
 import org.apache.commons.lang3.StringUtils;
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
index a28174cce..e981c450e 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
@@ -65,7 +65,7 @@
             <master>yarn</master>
             <mode>cluster</mode>
             <name>build_adjacency_lists</name>
-            <class>eu.dnetlib.dhp.oa.provision.SparkXmlRecordBuilderJob</class>
+            <class>eu.dnetlib.dhp.oa.provision.SparkXmlRecordBuilderJob_v2</class>
             <jar>dhp-graph-provision-${projectVersion}.jar</jar>
             <spark-opts>
                 --executor-cores ${sparkExecutorCoresForJoining}
                 --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.network.timeout=10000000
             </spark-opts>
             <arg>-mt</arg> <arg>yarn</arg>
             <arg>-is</arg> <arg>${isLookupUrl}</arg>
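A note on the added spark.network.timeout=10000000: Spark interprets this property as seconds when no unit is given (the default is 120s), so a bare 10000000 amounts to several months; if the intent was ten thousand seconds or milliseconds, an explicit unit keeps it unambiguous. A sketch with an explicit unit (the value is illustrative):

    import org.apache.spark.SparkConf;

    public class TimeoutConfSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setAppName("timeout-conf")
                    .setMaster("local[*]")
                    // explicit unit: long shuffles on a busy cluster may need more than the 120s default
                    .set("spark.network.timeout", "10000s");
            System.out.println(conf.get("spark.network.timeout"));
        }
    }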