From b2691a3b0a09c3d8bb04272e584491f190763f85 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 30 Jan 2020 17:46:29 +0100 Subject: [PATCH] save adjacency list as JoinedEntity --- .../job-override.properties | 3 +- .../eu/dnetlib/dhp/graph/EntityRelEntity.java | 1 + .../eu/dnetlib/dhp/graph/GraphJoiner.java | 224 ++++++++++++++---- .../dnetlib/dhp/graph/GraphMappingUtils.java | 137 +++++++++++ .../eu/dnetlib/dhp/graph/JoinedEntity.java | 44 ++++ .../eu/dnetlib/dhp/graph/MappingUtils.java | 103 -------- .../eu/dnetlib/dhp/graph/RelatedEntity.java | 57 ++++- ...Job.java => SparkXmlRecordBuilderJob.java} | 23 +- .../dhp/graph/input_graph_parameters.json | 2 +- .../dnetlib/dhp/graph/oozie_app/workflow.xml | 11 +- .../dnetlib/dhp/graph/MappingUtilsTest.java | 4 +- 11 files changed, 427 insertions(+), 182 deletions(-) create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/JoinedEntity.java delete mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/MappingUtils.java rename dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/{SparkGraphIndexingJob.java => SparkXmlRecordBuilderJob.java} (54%) diff --git a/dhp-workflows/dhp-graph-provision/job-override.properties b/dhp-workflows/dhp-graph-provision/job-override.properties index 1870b0e6e0..acaf167178 100644 --- a/dhp-workflows/dhp-graph-provision/job-override.properties +++ b/dhp-workflows/dhp-graph-provision/job-override.properties @@ -1,4 +1,5 @@ sparkDriverMemory=7G sparkExecutorMemory=7G hive_db_name=claudio -sourcePath=/tmp/db_openaireplus_services_beta.export.2019.11.06 \ No newline at end of file +sourcePath=/tmp/db_openaireplus_services_beta.export.2019.11.06 +outputPath=/tmp/openaire_provision \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityRelEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityRelEntity.java index b0711bbff2..285cacbc0c 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityRelEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityRelEntity.java @@ -3,6 +3,7 @@ package eu.dnetlib.dhp.graph; import java.io.Serializable; public class EntityRelEntity implements Serializable { + private TypedRow source; private TypedRow relation; private TypedRow target; diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java index 96d1f150a5..f7bf0da392 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java @@ -1,11 +1,12 @@ package eu.dnetlib.dhp.graph; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.jayway.jsonpath.DocumentContext; import com.jayway.jsonpath.JsonPath; -import eu.dnetlib.dhp.schema.oaf.Qualifier; +import eu.dnetlib.dhp.schema.oaf.*; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.api.java.JavaPairRDD; @@ -15,8 +16,10 @@ import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.SparkSession; import scala.Tuple2; +import java.io.IOException; import java.io.Serializable; import java.util.List; +import java.util.stream.Collectors; /** * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. @@ -40,21 +43,32 @@ public class GraphJoiner implements Serializable { public static final int MAX_RELS = 10; - public void join(final SparkSession spark, final String inputPath, final String hiveDbName, final String outPath) { + private SparkSession spark; - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + private String inputPath; + + private String outPath; + + public GraphJoiner(SparkSession spark, String inputPath, String outPath) { + this.spark = spark; + this.inputPath = inputPath; + this.outPath = outPath; + } + + public GraphJoiner adjacencyLists() { + final JavaSparkContext sc = new JavaSparkContext(getSpark().sparkContext()); // read each entity - JavaPairRDD datasource = readPathEntity(sc, inputPath, "datasource"); - JavaPairRDD organization = readPathEntity(sc, inputPath, "organization"); - JavaPairRDD project = readPathEntity(sc, inputPath, "project"); - JavaPairRDD dataset = readPathEntity(sc, inputPath, "dataset"); - JavaPairRDD otherresearchproduct = readPathEntity(sc, inputPath, "otherresearchproduct"); - JavaPairRDD software = readPathEntity(sc, inputPath, "software"); - JavaPairRDD publication = readPathEntity(sc, inputPath, "publication"); + JavaPairRDD datasource = readPathEntity(sc, getInputPath(), "datasource"); + JavaPairRDD organization = readPathEntity(sc, getInputPath(), "organization"); + JavaPairRDD project = readPathEntity(sc, getInputPath(), "project"); + JavaPairRDD dataset = readPathEntity(sc, getInputPath(), "dataset"); + JavaPairRDD otherresearchproduct = readPathEntity(sc, getInputPath(), "otherresearchproduct"); + JavaPairRDD software = readPathEntity(sc, getInputPath(), "software"); + JavaPairRDD publication = readPathEntity(sc, getInputPath(), "publication"); // create the union between all the entities - final String entitiesPath = outPath + "/entities"; + final String entitiesPath = getOutPath() + "/0_entities"; datasource .union(organization) .union(project) @@ -63,7 +77,7 @@ public class GraphJoiner implements Serializable { .union(software) .union(publication) .map(e -> new EntityRelEntity().setSource(e._2())) - .map(e -> new ObjectMapper().writeValueAsString(e)) + .map(GraphMappingUtils::serialize) .saveAsTextFile(entitiesPath, GzipCodec.class); JavaPairRDD entities = sc.textFile(entitiesPath) @@ -71,7 +85,7 @@ public class GraphJoiner implements Serializable { .mapToPair(t -> new Tuple2<>(t.getSource().getSourceId(), t)); // reads the relationships - final JavaPairRDD relation = readPathRelation(sc, inputPath) + final JavaPairRDD relation = readPathRelation(sc, getInputPath()) .filter(r -> !r.getDeleted()) //only consider those that are not virtually deleted .map(p -> new EntityRelEntity().setRelation(p)) .mapToPair(p -> new Tuple2<>(p.getRelation().getSourceId(), p)) @@ -80,45 +94,156 @@ public class GraphJoiner implements Serializable { .flatMap(p -> p.iterator()) .mapToPair(p -> new Tuple2<>(p.getRelation().getTargetId(), p)); - final String joinByTargetPath = outPath + "/join_by_target"; + final String joinByTargetPath = getOutPath() + "/1_join_by_target"; relation .join(entities .filter(e -> !e._2().getSource().getDeleted()) - /*.mapToPair(e -> new Tuple2<>(e._1(), new MappingUtils().pruneModel(e._2())))*/) + .mapToPair(e -> new Tuple2<>(e._1(), new GraphMappingUtils().pruneModel(e._2())))) .map(s -> new EntityRelEntity() .setRelation(s._2()._1().getRelation()) .setTarget(s._2()._2().getSource())) - .map(e -> new ObjectMapper().writeValueAsString(e)) + .map(GraphMappingUtils::serialize) .saveAsTextFile(joinByTargetPath, GzipCodec.class); JavaPairRDD bySource = sc.textFile(joinByTargetPath) .map(t -> new ObjectMapper().readValue(t, EntityRelEntity.class)) .mapToPair(t -> new Tuple2<>(t.getRelation().getSourceId(), t)); + final String linkedEntityPath = getOutPath() + "/2_linked_entities"; entities .union(bySource) .groupByKey() // by source id - .map(p -> { - final LinkedEntity e = new LinkedEntity(); - final List links = Lists.newArrayList(); - for(EntityRelEntity rel : p._2()) { - if (rel.hasMainEntity() & e.getEntity() == null) { - e.setEntity(rel.getSource()); - } - if (rel.hasRelatedEntity()) { - links.add(new Tuple() - .setRelation(rel.getRelation()) - .setTarget(rel.getTarget())); - } - } - e.setLinks(links); - if (e.getEntity() == null) { - throw new IllegalStateException("missing main entity on '" + p._1() + "'"); - } - return e; - }) - .map(e -> new ObjectMapper().writeValueAsString(e)) - .saveAsTextFile(outPath + "/linked_entities", GzipCodec.class); + .map(p -> toLinkedEntity(p)) + .map(e -> new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL).writeValueAsString(e)) + .saveAsTextFile(linkedEntityPath, GzipCodec.class); + + final String joinedEntitiesPath = getOutPath() + "/3_joined_entities"; + sc.textFile(linkedEntityPath) + .map(s -> new ObjectMapper().readValue(s, LinkedEntity.class)) + .map(l -> toJoinedEntity(l)) + .map(j -> new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL).writeValueAsString(j)) + .saveAsTextFile(joinedEntitiesPath); + + return this; + } + + public GraphJoiner asXML() { + final JavaSparkContext sc = new JavaSparkContext(getSpark().sparkContext()); + + final String joinedEntitiesPath = getOutPath() + "/3_joined_entities"; + sc.textFile(joinedEntitiesPath) + .map(s -> new ObjectMapper().readValue(s, LinkedEntity.class)) + .map(l -> toXML(l)) + .saveAsTextFile(getOutPath() + "/4_xml"); + + return this; + } + + private String toXML(LinkedEntity l) { + + return null; + } + + public SparkSession getSpark() { + return spark; + } + + public GraphJoiner setSpark(SparkSession spark) { + this.spark = spark; + return this; + } + + public String getInputPath() { + return inputPath; + } + + public GraphJoiner setInputPath(String inputPath) { + this.inputPath = inputPath; + return this; + } + + public String getOutPath() { + return outPath; + } + + public GraphJoiner setOutPath(String outPath) { + this.outPath = outPath; + return this; + } + + // HELPERS + + private OafEntity parseOaf(final String json, final String type) { + final ObjectMapper o = new ObjectMapper(); + try { + switch (type) { + case "publication": + return o.readValue(json, Publication.class); + case "dataset": + return o.readValue(json, Dataset.class); + case "otherresearchproduct": + return o.readValue(json, OtherResearchProduct.class); + case "software": + return o.readValue(json, Software.class); + case "datasource": + return o.readValue(json, Datasource.class); + case "organization": + return o.readValue(json, Organization.class); + case "project": + return o.readValue(json, Project.class); + default: + throw new IllegalArgumentException("invalid type: " + type); + } + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + } + + /** + * Converts the result of grouping pairs and the entities by source id to LinkedEntity + * @param p + * @return + */ + private LinkedEntity toLinkedEntity(Tuple2> p) { + final LinkedEntity e = new LinkedEntity(); + final List links = Lists.newArrayList(); + for(EntityRelEntity rel : p._2()) { + if (rel.hasMainEntity() & e.getEntity() == null) { + e.setEntity(rel.getSource()); + } + if (rel.hasRelatedEntity()) { + links.add(new Tuple() + .setRelation(rel.getRelation()) + .setTarget(rel.getTarget())); + } + } + e.setLinks(links); + if (e.getEntity() == null) { + throw new IllegalStateException("missing main entity on '" + p._1() + "'"); + } + return e; + } + + /** + * Converts a LinkedEntity to a JoinedEntity + * @param l + * @return + */ + private JoinedEntity toJoinedEntity(LinkedEntity l) { + return new JoinedEntity().setType(l.getEntity().getType()) + .setEntity(parseOaf(l.getEntity().getOaf(), l.getEntity().getType())) + .setLinks(l.getLinks() + .stream() + .map(t -> { + final ObjectMapper o = new ObjectMapper(); + try { + return new Tuple2<>( + o.readValue(t.getRelation().getOaf(), Relation.class), + o.readValue(t.getTarget().getOaf(), RelatedEntity.class)); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + }).collect(Collectors.toList())); } /** @@ -132,14 +257,14 @@ public class GraphJoiner implements Serializable { private JavaPairRDD readPathEntity(final JavaSparkContext sc, final String inputPath, final String type) { return sc.sequenceFile(inputPath + "/" + type, Text.class, Text.class) .mapToPair((PairFunction, String, TypedRow>) item -> { - - final String json = item._2().toString(); - final String id = JsonPath.read(json, "$.id"); + final String s = item._2().toString(); + final DocumentContext json = JsonPath.parse(s); + final String id = json.read("$.id"); return new Tuple2<>(id, new TypedRow() - .setSourceId(id) - .setDeleted(JsonPath.read(json, "$.dataInfo.deletedbyinference")) - .setType(type) - .setOaf(json)); + .setSourceId(id) + .setDeleted(json.read("$.dataInfo.deletedbyinference")) + .setType(type) + .setOaf(s)); }); } @@ -153,13 +278,14 @@ public class GraphJoiner implements Serializable { private JavaRDD readPathRelation(final JavaSparkContext sc, final String inputPath) { return sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) .map(item -> { - final String json = item._2().toString(); + final String s = item._2().toString(); + final DocumentContext json = JsonPath.parse(s); return new TypedRow() - .setSourceId(JsonPath.read(json, "$.source")) - .setTargetId(JsonPath.read(json, "$.target")) - .setDeleted(JsonPath.read(json, "$.dataInfo.deletedbyinference")) + .setSourceId(json.read("$.source")) + .setTargetId(json.read("$.target")) + .setDeleted(json.read("$.dataInfo.deletedbyinference")) .setType("relation") - .setOaf(json); + .setOaf(s); }); } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java index ab19ff2b51..e3622cd204 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java @@ -1,9 +1,18 @@ package eu.dnetlib.dhp.graph; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Maps; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.JsonPath; import eu.dnetlib.dhp.schema.oaf.*; +import net.minidev.json.JSONArray; +import org.apache.commons.lang3.StringUtils; +import java.util.LinkedHashMap; import java.util.Map; +import java.util.stream.Collectors; public class GraphMappingUtils { @@ -20,4 +29,132 @@ public class GraphMappingUtils { types.put("relation", Relation.class); } + public static EntityRelEntity pruneModel(EntityRelEntity e) { + + final DocumentContext j = JsonPath.parse(e.getSource().getOaf()); + final RelatedEntity re = new RelatedEntity().setId(j.read("$.id")).setType(e.getSource().getType()); + + switch (e.getSource().getType()) { + case "publication": + case "dataset": + case "otherresearchproduct": + case "software": + mapTitle(j, re); + re.setDateofacceptance(j.read("$.dateofacceptance.value")); + re.setPublisher(j.read("$.publisher.value")); + + JSONArray pids = j.read("$.pid"); + re.setPid(pids.stream() + .map(p -> asStructuredProperty((LinkedHashMap) p)) + .collect(Collectors.toList())); + + re.setResulttype(asQualifier(j.read("$.resulttype"))); + + JSONArray collfrom = j.read("$.collectedfrom"); + re.setCollectedfrom(collfrom.stream() + .map(c -> asKV((LinkedHashMap)c)) + .collect(Collectors.toList())); + + //TODO still to be mapped + //re.setCodeRepositoryUrl(j.read("$.coderepositoryurl")); + + break; + case "datasource": + re.setOfficialname(j.read("$.officialname.value")); + re.setWebsiteurl(j.read("$.websiteurl.value")); + re.setDatasourcetype(asQualifier(j.read("$.datasourcetype"))); + re.setOpenairecompatibility(asQualifier(j.read("$.openairecompatibility"))); + + break; + case "organization": + re.setLegalname(j.read("$.legalname.value")); + re.setLegalshortname(j.read("$.legalshortname.value")); + re.setCountry(asQualifier(j.read("$.country"))); + + break; + case "project": + re.setProjectTitle(j.read("$.title.value")); + re.setCode(j.read("$.code.value")); + re.setAcronym(j.read("$.acronym.value")); + re.setContracttype(asQualifier(j.read("$.contracttype"))); + + JSONArray f = j.read("$.fundingtree"); + if (!f.isEmpty()) { + re.setFundingtree(f.stream() + .map(s -> s.toString()) + .collect(Collectors.toList())); + } + + break; + } + return new EntityRelEntity().setSource( + new TypedRow() + .setSourceId(e.getSource().getSourceId()) + .setDeleted(e.getSource().getDeleted()) + .setType(e.getSource().getType()) + .setOaf(serialize(re))); + } + + private static KeyValue asKV(LinkedHashMap j) { + final KeyValue kv = new KeyValue(); + kv.setKey((String) j.get("key")); + kv.setValue((String) j.get("value")); + return kv; + } + + private static void mapTitle(DocumentContext j, RelatedEntity re) { + final JSONArray a = j.read("$.title"); + if (!a.isEmpty()) { + final StructuredProperty sp = asStructuredProperty((LinkedHashMap) a.get(0)); + if(StringUtils.isNotBlank(sp.getValue())) { + re.setTitle(sp); + } + } + } + + private static StructuredProperty asStructuredProperty(LinkedHashMap j) { + final StructuredProperty sp = new StructuredProperty(); + final String value = (String) j.get("value"); + if (StringUtils.isNotBlank(value)) { + sp.setValue((String) j.get("value")); + sp.setQualifier(asQualifier((LinkedHashMap) j.get("qualifier"))); + } + return sp; + } + + public static Qualifier asQualifier(LinkedHashMap j) { + final Qualifier q = new Qualifier(); + + final String classid = j.get("classid"); + if (StringUtils.isNotBlank(classid)) { + q.setClassid(classid); + } + + final String classname = j.get("classname"); + if (StringUtils.isNotBlank(classname)) { + q.setClassname(classname); + } + + final String schemeid = j.get("schemeid"); + if (StringUtils.isNotBlank(schemeid)) { + q.setSchemeid(schemeid); + } + + final String schemename = j.get("schemename"); + if (StringUtils.isNotBlank(schemename)) { + q.setSchemename(schemename); + } + return q; + } + + public static String serialize(final Object o) { + try { + return new ObjectMapper() + .setSerializationInclusion(JsonInclude.Include.NON_NULL) + .writeValueAsString(o); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("unable to serialize: " + o.toString(), e); + } + } + } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/JoinedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/JoinedEntity.java new file mode 100644 index 0000000000..d65eb64c88 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/JoinedEntity.java @@ -0,0 +1,44 @@ +package eu.dnetlib.dhp.graph; + +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.Relation; +import scala.Tuple2; + +import java.io.Serializable; +import java.util.List; + +public class JoinedEntity implements Serializable { + + private String type; + + private OafEntity entity; + + private List> links; + + public String getType() { + return type; + } + + public JoinedEntity setType(String type) { + this.type = type; + return this; + } + + public OafEntity getEntity() { + return entity; + } + + public JoinedEntity setEntity(OafEntity entity) { + this.entity = entity; + return this; + } + + public List> getLinks() { + return links; + } + + public JoinedEntity setLinks(List> links) { + this.links = links; + return this; + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/MappingUtils.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/MappingUtils.java deleted file mode 100644 index 756506c120..0000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/MappingUtils.java +++ /dev/null @@ -1,103 +0,0 @@ -package eu.dnetlib.dhp.graph; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.jayway.jsonpath.DocumentContext; -import com.jayway.jsonpath.JsonPath; -import eu.dnetlib.dhp.schema.oaf.KeyValue; -import eu.dnetlib.dhp.schema.oaf.Qualifier; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; -import net.minidev.json.JSONArray; - -import java.util.LinkedHashMap; -import java.util.stream.Collectors; - -public class MappingUtils { - - public EntityRelEntity pruneModel(EntityRelEntity e) throws JsonProcessingException { - - final DocumentContext j = JsonPath.parse(e.getSource().getOaf()); - final RelatedEntity re = new RelatedEntity(); - - switch (e.getSource().getType()) { - case "publication": - case "dataset": - case "otherresearchproduct": - case "software": - - mapTitle(j, re); - re.setDateofacceptance(j.read("$.dateofacceptance.value")); - re.setPublisher(j.read("$.publisher.value")); - - JSONArray pids = j.read("$.pid"); - re.setPid(pids.stream() - .map(p -> asStructuredProperty((LinkedHashMap) p)) - .collect(Collectors.toList())); - - re.setResulttype(asQualifier(j.read("$.resulttype"))); - - JSONArray collfrom = j.read("$.collectedfrom"); - re.setCollectedfrom(collfrom.stream() - .map(c -> asKV((LinkedHashMap)c)) - .collect(Collectors.toList())); - - //TODO still to be mapped - //re.setCodeRepositoryUrl(j.read("$.coderepositoryurl")); - - break; - case "datasource": - re.setOfficialname(j.read("$.officialname.value")); - re.setWebsiteurl(j.read("$.websiteurl.value")); - - re.setDatasourcetype(asQualifier(j.read("$.datasourcetype"))); - re.setOpenairecompatibility(asQualifier(j.read("$.openairecompatibility"))); - - break; - case "organization": - - break; - case "project": - mapTitle(j, re); - break; - } - - return new EntityRelEntity().setSource( - new TypedRow() - .setSourceId(e.getSource().getSourceId()) - .setDeleted(e.getSource().getDeleted()) - .setType(e.getSource().getType()) - .setOaf(new ObjectMapper().writeValueAsString(re))); - } - - private KeyValue asKV(LinkedHashMap j) { - final KeyValue kv = new KeyValue(); - kv.setKey((String) j.get("key")); - kv.setValue((String) j.get("value")); - return kv; - } - - private void mapTitle(DocumentContext j, RelatedEntity re) { - JSONArray a = j.read("$.title"); - if (!a.isEmpty()) { - re.setTitle(asStructuredProperty((LinkedHashMap) a.get(0))); - } - } - - private StructuredProperty asStructuredProperty(LinkedHashMap j) { - final StructuredProperty sp = new StructuredProperty(); - sp.setValue((String) j.get("value")); - sp.setQualifier(asQualifier((LinkedHashMap) j.get("qualifier"))); - return sp; - - } - - public Qualifier asQualifier(LinkedHashMap j) { - Qualifier q = new Qualifier(); - q.setClassid(j.get("classid")); - q.setClassname(j.get("classname")); - q.setSchemeid(j.get("schemeid")); - q.setSchemename(j.get("schemename")); - return q; - } - -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java index a441392b27..50b97daced 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java @@ -3,14 +3,22 @@ package eu.dnetlib.dhp.graph; import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import org.codehaus.jackson.map.ObjectMapper; +import java.io.IOException; import java.io.Serializable; import java.util.List; public class RelatedEntity implements Serializable { + private String id; + private String type; + + // common fields + private StructuredProperty title; + private String websiteurl; // datasource, organizations, projects + // results - private StructuredProperty title; // also for projects private String dateofacceptance; private String publisher; private List pid; @@ -20,11 +28,10 @@ public class RelatedEntity implements Serializable { // datasource private String officialname; - private String websiteurl; // also for organizations, projects private Qualifier datasourcetype; private Qualifier datasourcetypeui; - //private String aggregatortype; private Qualifier openairecompatibility; + //private String aggregatortype; // organization private String legalname; @@ -32,10 +39,28 @@ public class RelatedEntity implements Serializable { private Qualifier country; // project + private String projectTitle; private String code; private String acronym; private Qualifier contracttype; - private String fundingtree; + private List fundingtree; + + public static RelatedEntity parse(final String json) { + try { + return new ObjectMapper().readValue(json, RelatedEntity.class); + } catch (IOException e) { + throw new IllegalArgumentException("invalid RelatedEntity, cannot parse: " + json); + } + } + + public String getId() { + return id; + } + + public RelatedEntity setId(String id) { + this.id = id; + return this; + } public StructuredProperty getTitle() { return title; @@ -199,12 +224,30 @@ public class RelatedEntity implements Serializable { return this; } - public String getFundingtree() { + public List getFundingtree() { return fundingtree; } - public RelatedEntity setFundingtree(String fundingtree) { + public RelatedEntity setFundingtree(List fundingtree) { this.fundingtree = fundingtree; return this; } -} + + public String getProjectTitle() { + return projectTitle; + } + + public RelatedEntity setProjectTitle(String projectTitle) { + this.projectTitle = projectTitle; + return this; + } + + public String getType() { + return type; + } + + public RelatedEntity setType(String type) { + this.type = type; + return this; + } +} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlRecordBuilderJob.java similarity index 54% rename from dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java rename to dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlRecordBuilderJob.java index 1d55dda89b..38bc2bae2e 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlRecordBuilderJob.java @@ -7,37 +7,34 @@ import org.apache.hadoop.fs.Path; import org.apache.spark.SparkConf; import org.apache.spark.sql.SparkSession; -public class SparkGraphIndexingJob { - - private final static String OUTPUT_BASE_PATH = "/tmp/openaire_provision"; +public class SparkXmlRecordBuilderJob { public static void main(String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGraphIndexingJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_graph_parameters.json"))); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkXmlRecordBuilderJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_graph_parameters.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf() - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .set("hive.metastore.uris", parser.get("hive_metastore_uris")); + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); final SparkSession spark = SparkSession .builder() .config(conf) - .appName(SparkGraphIndexingJob.class.getSimpleName()) + .appName(SparkXmlRecordBuilderJob.class.getSimpleName()) .master(parser.get("master")) - .enableHiveSupport() .getOrCreate(); final String inputPath = parser.get("sourcePath"); - final String hiveDbName = parser.get("hive_db_name"); + final String outputPath = parser.get("outputPath"); final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration()); - if (fs.exists(new Path(OUTPUT_BASE_PATH))) { - fs.delete(new Path(OUTPUT_BASE_PATH), true); - fs.mkdirs(new Path(OUTPUT_BASE_PATH)); + if (fs.exists(new Path(outputPath))) { + fs.delete(new Path(outputPath), true); + fs.mkdirs(new Path(outputPath)); } - new GraphJoiner().join(spark, inputPath, hiveDbName, OUTPUT_BASE_PATH); + new GraphJoiner(spark, inputPath, outputPath) + .adjacencyLists(); } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json index a197abc781..3a02ab1a03 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json @@ -1,6 +1,6 @@ [ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, {"paramName":"h", "paramLongName":"hive_metastore_uris","paramDescription": "the hive metastore uris", "paramRequired": true}, - {"paramName":"db", "paramLongName":"hive_db_name", "paramDescription": "the target hive database name", "paramRequired": true}, + {"paramName":"o", "paramLongName":"outputPath", "paramDescription": "the path used to store temporary output files", "paramRequired": true}, {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true} ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml index a91759ade5..4b4d2c7bf3 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml @@ -26,20 +26,20 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + ${jobTracker} ${nameNode} yarn-cluster cluster - GraphIndexing - eu.dnetlib.dhp.graph.SparkGraphIndexingJob + build_adjacency_lists + eu.dnetlib.dhp.graph.SparkXmlRecordBuilderJob dhp-graph-provision-${projectVersion}.jar --executor-memory ${sparkExecutorMemory} @@ -53,8 +53,7 @@ -mt yarn-cluster --sourcePath${sourcePath} - --hive_db_name${hive_db_name} - --hive_metastore_uris${hive_metastore_uris} + --outputPath${outputPath} diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/graph/MappingUtilsTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/graph/MappingUtilsTest.java index 2edb0aa70a..0deb3d81a1 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/graph/MappingUtilsTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/graph/MappingUtilsTest.java @@ -9,11 +9,11 @@ import java.io.InputStreamReader; public class MappingUtilsTest { - private MappingUtils utils; + private GraphMappingUtils utils; @Before public void setUp() { - utils = new MappingUtils(); + utils = new GraphMappingUtils(); } @Test