1
0
Fork 0

dataset based provision WIP

This commit is contained in:
Claudio Atzori 2020-04-06 15:33:31 +02:00
parent c8f4b95464
commit ca345aaad3
10 changed files with 124 additions and 118 deletions

View File

@ -3,31 +3,25 @@ package eu.dnetlib.dhp.oa.provision;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.*; import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity;
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils; import eu.dnetlib.dhp.oa.provision.model.Tuple2;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.rdd.RDD; import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.Encode;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.util.ArrayList;
import java.util.Iterator; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*; import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.getKryoClasses;
/** /**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects.
@ -43,14 +37,19 @@ import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*;
* can be linked at most to 100 other objects * can be linked at most to 100 other objects
* *
* 2) JoinRelationEntityByTargetJob: * 2) JoinRelationEntityByTargetJob:
* prepare tuples [source entity - relation - target entity] (S - R - T): * (phase 1): prepare tuples [relation - target entity] (R - T):
* for each entity type E_i * for each entity type E_i
* join (R.target = E_i.id), * map E_i as RelatedEntity T_i to simplify the model and extracting only the necessary information
* map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i] * join (R.target = T_i.id)
* join (E_i.id = [R - T_i].source), where E_i becomes the source entity S * save the tuples (R_i, T_i)
* (phase 2):
* create the union of all the entity types E, hash by id
* read the tuples (R, T), hash by R.source
* join E.id = (R, T).source, where E becomes the Source Entity S
* save the tuples (S, R, T)
* *
* 3) AdjacencyListBuilderJob: * 3) AdjacencyListBuilderJob:
* given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mappnig the result as JoinedEntity * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity
* *
* 4) XmlConverterJob: * 4) XmlConverterJob:
* convert the JoinedEntities as XML records * convert the JoinedEntities as XML records
@ -59,7 +58,6 @@ public class AdjacencyListBuilderJob {
private static final Logger log = LoggerFactory.getLogger(AdjacencyListBuilderJob.class); private static final Logger log = LoggerFactory.getLogger(AdjacencyListBuilderJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static final int MAX_LINKS = 100; public static final int MAX_LINKS = 100;
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
@ -91,7 +89,6 @@ public class AdjacencyListBuilderJob {
removeOutputDir(spark, outputPath); removeOutputDir(spark, outputPath);
createAdjacencyLists(spark, inputPath, outputPath); createAdjacencyLists(spark, inputPath, outputPath);
}); });
} }
private static void createAdjacencyLists(SparkSession spark, String inputPath, String outputPath) { private static void createAdjacencyLists(SparkSession spark, String inputPath, String outputPath) {
@ -103,7 +100,7 @@ public class AdjacencyListBuilderJob {
.groupByKey((MapFunction<EntityRelEntity, String>) value -> value.getEntity().getId(), Encoders.STRING()) .groupByKey((MapFunction<EntityRelEntity, String>) value -> value.getEntity().getId(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, EntityRelEntity, JoinedEntity>) (key, values) -> { .mapGroups((MapGroupsFunction<String, EntityRelEntity, JoinedEntity>) (key, values) -> {
JoinedEntity j = new JoinedEntity(); JoinedEntity j = new JoinedEntity();
Links links = new Links(); List<Tuple2> links = new ArrayList<>();
while (values.hasNext() && links.size() < MAX_LINKS) { while (values.hasNext() && links.size() < MAX_LINKS) {
EntityRelEntity curr = values.next(); EntityRelEntity curr = values.next();
if (j.getEntity() == null) { if (j.getEntity() == null) {

View File

@ -4,9 +4,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
import eu.dnetlib.dhp.oa.provision.model.SortableRelation; import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils; import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FilterFunction;
@ -37,22 +37,22 @@ import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*;
* only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity * only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity
* can be linked at most to 100 other objects * can be linked at most to 100 other objects
* *
* 2) CreateRelatedEntitiesJob_phase1: * 2) JoinRelationEntityByTargetJob:
* prepare tuples [relation - target entity] (R - T): * (phase 1): prepare tuples [relation - target entity] (R - T):
* for each entity type E_i * for each entity type E_i
* join (R.target = E_i.id), * map E_i as RelatedEntity T_i to simplify the model and extracting only the necessary information
* map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i] * join (R.target = T_i.id)
* save the tuples (R_i, T_i)
* (phase 2):
* create the union of all the entity types E, hash by id
* read the tuples (R, T), hash by R.source
* join E.id = (R, T).source, where E becomes the Source Entity S
* save the tuples (S, R, T)
* *
* 3) CreateRelatedEntitiesJob_phase2: * 3) AdjacencyListBuilderJob:
* prepare tuples [source entity - relation - target entity] (S - R - T):
* create the union of the each entity type, hash by id (S)
* for each [R - T_i] produced in phase1
* join S.id = [R - T_i].source to produce (S_i - R - T_i)
*
* 4) AdjacencyListBuilderJob:
* given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity
* *
* 5) XmlConverterJob: * 4) XmlConverterJob:
* convert the JoinedEntities as XML records * convert the JoinedEntities as XML records
*/ */
public class CreateRelatedEntitiesJob_phase1 { public class CreateRelatedEntitiesJob_phase1 {
@ -103,20 +103,21 @@ public class CreateRelatedEntitiesJob_phase1 {
private static <E extends OafEntity> void joinRelationEntity(SparkSession spark, String inputRelationsPath, String inputEntityPath, Class<E> entityClazz, String outputPath) { private static <E extends OafEntity> void joinRelationEntity(SparkSession spark, String inputRelationsPath, String inputEntityPath, Class<E> entityClazz, String outputPath) {
Dataset<Tuple2<String, SortableRelation>> relsByTarget = readPathRelation(spark, inputRelationsPath) Dataset<Tuple2<String, SortableRelation>> relsByTarget = readPathRelation(spark, inputRelationsPath)
.filter((FilterFunction<SortableRelation>) value -> value.getDataInfo().getDeletedbyinference() == false)
.map((MapFunction<SortableRelation, Tuple2<String, SortableRelation>>) r -> new Tuple2<>(r.getTarget(), r), .map((MapFunction<SortableRelation, Tuple2<String, SortableRelation>>) r -> new Tuple2<>(r.getTarget(), r),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(SortableRelation.class))); Encoders.tuple(Encoders.STRING(), Encoders.kryo(SortableRelation.class)))
.cache();
Dataset<Tuple2<String, E>> entities = readPathEntity(spark, inputEntityPath, entityClazz) Dataset<Tuple2<String, RelatedEntity>> entities = readPathEntity(spark, inputEntityPath, entityClazz)
.map((MapFunction<E, Tuple2<String, E>>) e -> new Tuple2<>(e.getId(), e), .map((MapFunction<E, RelatedEntity>) value -> asRelatedEntity(value, entityClazz), Encoders.bean(RelatedEntity.class))
Encoders.tuple(Encoders.STRING(), Encoders.kryo(entityClazz))) .map((MapFunction<RelatedEntity, Tuple2<String, RelatedEntity>>) e -> new Tuple2<>(e.getId(), e),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class)))
.cache(); .cache();
relsByTarget relsByTarget
.joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner") .joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner")
.filter((FilterFunction<Tuple2<Tuple2<String, SortableRelation>, Tuple2<String, E>>>) .map((MapFunction<Tuple2<Tuple2<String, SortableRelation>, Tuple2<String, RelatedEntity>>, EntityRelEntity>)
value -> value._2()._2().getDataInfo().getDeletedbyinference() == false) t -> new EntityRelEntity(t._1()._2(), t._2()._2()),
.map((MapFunction<Tuple2<Tuple2<String, SortableRelation>, Tuple2<String, E>>, EntityRelEntity>)
t -> new EntityRelEntity(t._1()._2(), GraphMappingUtils.asRelatedEntity(t._2()._2(), entityClazz)),
Encoders.bean(EntityRelEntity.class)) Encoders.bean(EntityRelEntity.class))
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)

View File

@ -6,11 +6,9 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity;
import eu.dnetlib.dhp.oa.provision.model.TypedRow; import eu.dnetlib.dhp.oa.provision.model.TypedRow;
import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
@ -25,12 +23,10 @@ import scala.collection.JavaConverters;
import scala.collection.Seq; import scala.collection.Seq;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.function.Function;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*; import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.getKryoClasses;
/** /**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects.
@ -45,24 +41,22 @@ import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*;
* only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity * only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity
* can be linked at most to 100 other objects * can be linked at most to 100 other objects
* *
* 2) CreateRelatedEntitiesJob_phase1: * 2) JoinRelationEntityByTargetJob:
* prepare tuples [relation - target entity] (R - T): * (phase 1): prepare tuples [relation - target entity] (R - T):
* for each entity type E_i * for each entity type E_i
* join (R.target = E_i.id), * map E_i as RelatedEntity T_i to simplify the model and extracting only the necessary information
* map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i] * join (R.target = T_i.id)
* save the tuples [R - T_i] in append mode * save the tuples (R_i, T_i)
* (phase 2):
* create the union of all the entity types E, hash by id
* read the tuples (R, T), hash by R.source
* join E.id = (R, T).source, where E becomes the Source Entity S
* save the tuples (S, R, T)
* *
* 3) CreateRelatedEntitiesJob_phase2: * 3) AdjacencyListBuilderJob:
* prepare tuples [source entity - relation - target entity] (S - R - T): * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity
* create the union of the each entity type, hash by id (S)
* for each [R - T_i] produced in phase1
* join S.id = [R - T_i].source to produce (S_i - R - T_i)
* save in append mode
* *
* 4) AdjacencyListBuilderJob: * 4) XmlConverterJob:
* given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mappnig the result as JoinedEntity
*
* 5) XmlConverterJob:
* convert the JoinedEntities as XML records * convert the JoinedEntities as XML records
*/ */
public class CreateRelatedEntitiesJob_phase2 { public class CreateRelatedEntitiesJob_phase2 {

View File

@ -1,31 +1,22 @@
package eu.dnetlib.dhp.oa.provision; package eu.dnetlib.dhp.oa.provision;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.SortableRelation; import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapGroupsFunction; import org.apache.spark.api.java.function.FlatMapGroupsFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.Dataset;
import org.apache.spark.rdd.RDD; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.*; import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.math.Ordering;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional; import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
@ -44,14 +35,19 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
* can be linked at most to 100 other objects * can be linked at most to 100 other objects
* *
* 2) JoinRelationEntityByTargetJob: * 2) JoinRelationEntityByTargetJob:
* prepare tuples [source entity - relation - target entity] (S - R - T): * (phase 1): prepare tuples [relation - target entity] (R - T):
* for each entity type E_i * for each entity type E_i
* join (R.target = E_i.id), * map E_i as RelatedEntity T_i to simplify the model and extracting only the necessary information
* map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i] * join (R.target = T_i.id)
* join (E_i.id = [R - T_i].source), where E_i becomes the source entity S * save the tuples (R_i, T_i)
* (phase 2):
* create the union of all the entity types E, hash by id
* read the tuples (R, T), hash by R.source
* join E.id = (R, T).source, where E becomes the Source Entity S
* save the tuples (S, R, T)
* *
* 3) AdjacencyListBuilderJob: * 3) AdjacencyListBuilderJob:
* given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mappnig the result as JoinedEntity * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity
* *
* 4) XmlConverterJob: * 4) XmlConverterJob:
* convert the JoinedEntities as XML records * convert the JoinedEntities as XML records

View File

@ -1,5 +1,6 @@
package eu.dnetlib.dhp.oa.provision; package eu.dnetlib.dhp.oa.provision;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
@ -27,8 +28,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
@ -37,23 +41,25 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
* The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, * The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization,
* and all the possible relationships (similarity links produced by the Dedup process are excluded). * and all the possible relationships (similarity links produced by the Dedup process are excluded).
* *
* The operation is implemented by sequentially joining one entity type at time (E) with the relationships (R), and again
* by E, finally grouped by E.id;
*
* The workflow is organized in different parts aimed to to reduce the complexity of the operation * The workflow is organized in different parts aimed to to reduce the complexity of the operation
* 1) PrepareRelationsJob: * 1) PrepareRelationsJob:
* only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity * only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity
* can be linked at most to 100 other objects * can be linked at most to 100 other objects
* *
* 2) JoinRelationEntityByTargetJob: * 2) JoinRelationEntityByTargetJob:
* prepare tuples [source entity - relation - target entity] (S - R - T): * (phase 1): prepare tuples [relation - target entity] (R - T):
* for each entity type E_i * for each entity type E_i
* join (R.target = E_i.id), * map E_i as RelatedEntity T_i to simplify the model and extracting only the necessary information
* map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i] * join (R.target = T_i.id)
* join (E_i.id = [R - T_i].source), where E_i becomes the source entity S * save the tuples (R_i, T_i)
* (phase 2):
* create the union of all the entity types E, hash by id
* read the tuples (R, T), hash by R.source
* join E.id = (R, T).source, where E becomes the Source Entity S
* save the tuples (S, R, T)
* *
* 3) AdjacencyListBuilderJob: * 3) AdjacencyListBuilderJob:
* given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mappnig the result as JoinedEntity * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity
* *
* 4) XmlConverterJob: * 4) XmlConverterJob:
* convert the JoinedEntities as XML records * convert the JoinedEntities as XML records
@ -62,6 +68,8 @@ public class XmlConverterJob {
private static final Logger log = LoggerFactory.getLogger(XmlConverterJob.class); private static final Logger log = LoggerFactory.getLogger(XmlConverterJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd"; public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd";
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
@ -107,12 +115,31 @@ public class XmlConverterJob {
spark.read() spark.read()
.load(inputPath) .load(inputPath)
.as(Encoders.bean(JoinedEntity.class)) .as(Encoders.bean(JoinedEntity.class))
/* .map((MapFunction<JoinedEntity, String>) value -> OBJECT_MAPPER.writeValueAsString(value), Encoders.STRING())
.write()
.option("codec", "org.apache.hadoop.io.compress.GzipCodec")
.text("/tmp/json");
spark.read()
.textFile("/tmp/json")
.map((MapFunction<String, JoinedEntity>) value -> OBJECT_MAPPER.readValue(value, JoinedEntity.class), Encoders.bean(JoinedEntity.class))
.map((MapFunction<JoinedEntity, JoinedEntity>) j -> {
if (j.getLinks() != null) {
j.setLinks(j.getLinks()
.stream()
.filter(t -> t.getRelation() != null & t.getRelatedEntity() != null)
.collect(Collectors.toCollection(ArrayList::new)));
}
return j;
}, Encoders.bean(JoinedEntity.class))
*/
.map((MapFunction<JoinedEntity, Tuple2<String, String>>) je -> new Tuple2<>( .map((MapFunction<JoinedEntity, Tuple2<String, String>>) je -> new Tuple2<>(
je.getEntity().getId(), je.getEntity().getId(),
recordFactory.build(je) recordFactory.build(je)
), Encoders.tuple(Encoders.STRING(), Encoders.STRING())) ), Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
.javaRDD() .javaRDD()
.mapToPair((PairFunction<Tuple2<String, String>, String, String>) t -> t) .mapToPair((PairFunction<Tuple2<String, String>, Text, Text>) t -> new Tuple2<>(new Text(t._1()), new Text(t._2())))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
} }

View File

@ -1,12 +1,13 @@
package eu.dnetlib.dhp.oa.provision.model; package eu.dnetlib.dhp.oa.provision.model;
import java.io.Serializable; import java.io.Serializable;
import java.util.List;
public class JoinedEntity implements Serializable { public class JoinedEntity implements Serializable {
private TypedRow entity; private TypedRow entity;
private Links links; private List<Tuple2> links;
public JoinedEntity() { public JoinedEntity() {
} }
@ -19,11 +20,11 @@ public class JoinedEntity implements Serializable {
this.entity = entity; this.entity = entity;
} }
public Links getLinks() { public List<Tuple2> getLinks() {
return links; return links;
} }
public void setLinks(Links links) { public void setLinks(List<Tuple2> links) {
this.links = links; this.links = links;
} }
} }

View File

@ -1,10 +0,0 @@
package eu.dnetlib.dhp.oa.provision.model;
import java.io.Serializable;
import java.util.HashSet;
public class Links extends HashSet<Tuple2> implements Serializable {
public Links() {
}
}

View File

@ -114,7 +114,7 @@ public class GraphMappingUtils {
final RelatedEntity re = new RelatedEntity(); final RelatedEntity re = new RelatedEntity();
re.setId(entity.getId()); re.setId(entity.getId());
re.setType(clazz.getName()); re.setType(EntityType.fromClass(clazz).name());
re.setPid(entity.getPid()); re.setPid(entity.getPid());
re.setCollectedfrom(entity.getCollectedfrom()); re.setCollectedfrom(entity.getCollectedfrom());
@ -125,16 +125,16 @@ public class GraphMappingUtils {
case otherresearchproduct: case otherresearchproduct:
case software: case software:
Result r = (Result) entity; Result result = (Result) entity;
if (r.getTitle() == null && !r.getTitle().isEmpty()) { if (result.getTitle() == null && !result.getTitle().isEmpty()) {
re.setTitle(r.getTitle().stream().findFirst().get()); re.setTitle(result.getTitle().stream().findFirst().get());
} }
re.setDateofacceptance(getValue(r.getDateofacceptance())); re.setDateofacceptance(getValue(result.getDateofacceptance()));
re.setPublisher(getValue(r.getPublisher())); re.setPublisher(getValue(result.getPublisher()));
re.setResulttype(re.getResulttype()); re.setResulttype(result.getResulttype());
re.setInstances(re.getInstances()); re.setInstances(result.getInstance());
//TODO still to be mapped //TODO still to be mapped
//re.setCodeRepositoryUrl(j.read("$.coderepositoryurl")); //re.setCodeRepositoryUrl(j.read("$.coderepositoryurl"));

View File

@ -694,7 +694,7 @@ public class XmlRecordFactory implements Serializable {
if (isNotBlank(re.getCodeRepositoryUrl())) { if (isNotBlank(re.getCodeRepositoryUrl())) {
metadata.add(XmlSerializationUtils.asXmlElement("coderepositoryurl", re.getCodeRepositoryUrl())); metadata.add(XmlSerializationUtils.asXmlElement("coderepositoryurl", re.getCodeRepositoryUrl()));
} }
if (re.getResulttype() != null & !re.getResulttype().isBlank()) { if (re.getResulttype() != null & re.getResulttype().isBlank()) {
metadata.add(XmlSerializationUtils.mapQualifier("resulttype", re.getResulttype())); metadata.add(XmlSerializationUtils.mapQualifier("resulttype", re.getResulttype()));
} }
if (re.getCollectedfrom() != null) { if (re.getCollectedfrom() != null) {

View File

@ -75,7 +75,7 @@
<switch> <switch>
<case to="prepare_relations">${wf:conf('reuseRecords') eq false}</case> <case to="prepare_relations">${wf:conf('reuseRecords') eq false}</case>
<case to="to_solr_index">${wf:conf('reuseRecords') eq true}</case> <case to="to_solr_index">${wf:conf('reuseRecords') eq true}</case>
<default to="adjancency_lists"/> <default to="prepare_relations"/>
</switch> </switch>
</decision> </decision>
@ -132,7 +132,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg> <arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@ -324,7 +324,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputGraphRootPath</arg><arg>${inputGraphRootPath}</arg> <arg>--inputGraphRootPath</arg><arg>${inputGraphRootPath}</arg>
@ -351,7 +351,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputPath</arg> <arg>${workingDir}/join_entities</arg> <arg>--inputPath</arg> <arg>${workingDir}/join_entities</arg>
@ -365,7 +365,7 @@
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>build_adjacency_lists</name> <name>convert_to_xml</name>
<class>eu.dnetlib.dhp.oa.provision.XmlConverterJob</class> <class>eu.dnetlib.dhp.oa.provision.XmlConverterJob</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar> <jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>