dataset based provision WIP

This commit is contained in:
Claudio Atzori 2020-04-06 08:59:58 +02:00
parent eb2f5f3198
commit c8f4b95464
13 changed files with 224 additions and 180 deletions

View File

@ -9,19 +9,22 @@ import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.*;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.Encode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.IOException;
import java.util.Iterator;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*;
@ -57,6 +60,7 @@ public class AdjacencyListBuilderJob {
private static final Logger log = LoggerFactory.getLogger(AdjacencyListBuilderJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static final int MAX_LINKS = 100;
public static void main(String[] args) throws Exception {
@ -92,72 +96,27 @@ public class AdjacencyListBuilderJob {
private static void createAdjacencyLists(SparkSession spark, String inputPath, String outputPath) {
RDD<JoinedEntity> joined = spark.read()
log.info("Reading joined entities from: {}", inputPath);
spark.read()
.load(inputPath)
.as(Encoders.kryo(EntityRelEntity.class))
.javaRDD()
.map(e -> getJoinedEntity(e))
.mapToPair(e -> new Tuple2<>(e.getEntity().getId(), e))
.reduceByKey((j1, j2) -> getJoinedEntity(j1, j2))
.map(Tuple2::_2)
.rdd();
spark.createDataset(joined, Encoders.bean(JoinedEntity.class))
.as(Encoders.bean(EntityRelEntity.class))
.groupByKey((MapFunction<EntityRelEntity, String>) value -> value.getEntity().getId(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, EntityRelEntity, JoinedEntity>) (key, values) -> {
JoinedEntity j = new JoinedEntity();
Links links = new Links();
while (values.hasNext() && links.size() < MAX_LINKS) {
EntityRelEntity curr = values.next();
if (j.getEntity() == null) {
j.setEntity(curr.getEntity());
}
links.add(new Tuple2(curr.getRelation(), curr.getTarget()));
}
j.setLinks(links);
return j;
}, Encoders.bean(JoinedEntity.class))
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
}
private static JoinedEntity getJoinedEntity(JoinedEntity j1, JoinedEntity j2) {
JoinedEntity je = new JoinedEntity();
je.setEntity(je.getEntity());
je.setType(j1.getType());
Links links = new Links();
links.addAll(j1.getLinks());
links.addAll(j2.getLinks());
return je;
}
private static JoinedEntity getJoinedEntity(EntityRelEntity e) {
JoinedEntity j = new JoinedEntity();
j.setEntity(toOafEntity(e.getEntity()));
j.setType(EntityType.valueOf(e.getEntity().getType()));
Links links = new Links();
links.add(new eu.dnetlib.dhp.oa.provision.model.Tuple2(e.getRelation(), e.getTarget()));
j.setLinks(links);
return j;
}
private static OafEntity toOafEntity(TypedRow typedRow) {
return parseOaf(typedRow.getOaf(), typedRow.getType());
}
private static OafEntity parseOaf(final String json, final String type) {
try {
switch (GraphMappingUtils.EntityType.valueOf(type)) {
case publication:
return OBJECT_MAPPER.readValue(json, Publication.class);
case dataset:
return OBJECT_MAPPER.readValue(json, Dataset.class);
case otherresearchproduct:
return OBJECT_MAPPER.readValue(json, OtherResearchProduct.class);
case software:
return OBJECT_MAPPER.readValue(json, Software.class);
case datasource:
return OBJECT_MAPPER.readValue(json, Datasource.class);
case organization:
return OBJECT_MAPPER.readValue(json, Organization.class);
case project:
return OBJECT_MAPPER.readValue(json, Project.class);
default:
throw new IllegalArgumentException("invalid type: " + type);
}
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
private static void removeOutputDir(SparkSession spark, String path) {

View File

@ -42,17 +42,15 @@ import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*;
* for each entity type E_i
* join (R.target = E_i.id),
* map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i]
* save the tuples [R - T_i] in append mode
*
* 3) CreateRelatedEntitiesJob_phase2:
* prepare tuples [source entity - relation - target entity] (S - R - T):
* create the union of the each entity type, hash by id (S)
* for each [R - T_i] produced in phase1
* join S.id = [R - T_i].source to produce (S_i - R - T_i)
* save in append mode
*
* 4) AdjacencyListBuilderJob:
* given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mappnig the result as JoinedEntity
* given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity
*
* 5) XmlConverterJob:
* convert the JoinedEntities as XML records
@ -121,8 +119,8 @@ public class CreateRelatedEntitiesJob_phase1 {
t -> new EntityRelEntity(t._1()._2(), GraphMappingUtils.asRelatedEntity(t._2()._2(), entityClazz)),
Encoders.bean(EntityRelEntity.class))
.write()
.mode(SaveMode.Append)
.parquet(outputPath);
.mode(SaveMode.Overwrite)
.parquet(outputPath + "/" + EntityType.fromClass(entityClazz));
}
private static <E extends OafEntity> Dataset<E> readPathEntity(SparkSession spark, String inputEntityPath, Class<E> entityClazz) {

View File

@ -1,5 +1,6 @@
package eu.dnetlib.dhp.oa.provision;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
@ -8,7 +9,10 @@ import eu.dnetlib.dhp.oa.provision.model.TypedRow;
import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@ -17,7 +21,10 @@ import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
@ -68,7 +75,7 @@ public class CreateRelatedEntitiesJob_phase2 {
String jsonConfiguration = IOUtils.toString(
PrepareRelationsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase1.json"));
.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
@ -81,14 +88,14 @@ public class CreateRelatedEntitiesJob_phase2 {
String inputRelatedEntitiesPath = parser.get("inputRelatedEntitiesPath");
log.info("inputRelatedEntitiesPath: {}", inputRelatedEntitiesPath);
String inputGraphPath = parser.get("inputGraphPath");
log.info("inputGraphPath: {}", inputGraphPath);
String inputGraphRootPath = parser.get("inputGraphRootPath");
log.info("inputGraphRootPath: {}", inputGraphRootPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
int numPartitions = Integer.parseInt(parser.get("numPartitions"));
log.info("numPartitions: {}", numPartitions);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
@ -97,14 +104,14 @@ public class CreateRelatedEntitiesJob_phase2 {
runWithSparkSession(conf, isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
joinAllEntities(spark, inputRelatedEntitiesPath, inputGraphPath, outputPath);
joinAllEntities(spark, inputRelatedEntitiesPath, inputGraphRootPath, outputPath, numPartitions);
});
}
private static void joinAllEntities(SparkSession spark, String inputRelatedEntitiesPath, String inputGraphPath, String outputPath) {
private static void joinAllEntities(SparkSession spark, String inputRelatedEntitiesPath, String inputGraphRootPath, String outputPath, int numPartitions) {
Dataset<Tuple2<String, TypedRow>> entities = readAllEntities(spark, inputGraphRootPath, numPartitions);
Dataset<Tuple2<String, EntityRelEntity>> relsBySource = readRelatedEntities(spark, inputRelatedEntitiesPath);
Dataset<Tuple2<String, TypedRow>> entities = readAllEntities(spark, inputGraphPath);
entities
.joinWith(relsBySource, entities.col("_1").equalTo(relsBySource.col("_1")), "left_outer")
@ -118,51 +125,76 @@ public class CreateRelatedEntitiesJob_phase2 {
}
return re;
}, Encoders.bean(EntityRelEntity.class))
.repartition(numPartitions)
.filter((FilterFunction<EntityRelEntity>) value -> value.getEntity() != null && StringUtils.isNotBlank(value.getEntity().getId()))
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
}
private static Dataset<Tuple2<String, TypedRow>> readAllEntities(SparkSession spark, String inputGraphPath) {
return GraphMappingUtils.entityTypes.entrySet()
.stream()
.map((Function<Map.Entry<GraphMappingUtils.EntityType, Class>, Dataset<TypedRow>>)
e -> readPathEntity(spark, inputGraphPath + "/" + e.getKey().name(), e.getValue())
.map((MapFunction<OafEntity, TypedRow>) entity -> {
TypedRow t = new TypedRow();
t.setType(e.getKey().name());
t.setDeleted(entity.getDataInfo().getDeletedbyinference());
t.setId(entity.getId());
t.setOaf(OBJECT_MAPPER.writeValueAsString(entity));
return t;
}, Encoders.bean(TypedRow.class)))
.reduce(spark.emptyDataset(Encoders.bean(TypedRow.class)), Dataset::union)
private static Dataset<Tuple2<String, TypedRow>> readAllEntities(SparkSession spark, String inputGraphPath, int numPartitions) {
Dataset<TypedRow> publication = readPathEntity(spark, inputGraphPath + "/publication", Publication.class);
Dataset<TypedRow> dataset = readPathEntity(spark, inputGraphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
Dataset<TypedRow> other = readPathEntity(spark, inputGraphPath + "/otherresearchproduct", OtherResearchProduct.class);
Dataset<TypedRow> software = readPathEntity(spark, inputGraphPath + "/software", Software.class);
Dataset<TypedRow> datasource = readPathEntity(spark, inputGraphPath + "/datasource", Datasource.class);
Dataset<TypedRow> organization = readPathEntity(spark, inputGraphPath + "/organization", Organization.class);
Dataset<TypedRow> project = readPathEntity(spark, inputGraphPath + "/project", Project.class);
return publication
.union(dataset)
.union(other)
.union(software)
.union(datasource)
.union(organization)
.union(project)
.map((MapFunction<TypedRow, Tuple2<String, TypedRow>>)
value -> new Tuple2<>(value.getId(), value),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class)));
value -> new Tuple2<>(value.getId(), value),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class)))
.repartition(numPartitions);
}
private static Dataset<Tuple2<String, EntityRelEntity>> readRelatedEntities(SparkSession spark, String inputRelatedEntitiesPath) {
log.info("Reading related entities from: {}", inputRelatedEntitiesPath);
final List<String> paths = HdfsSupport.listFiles(inputRelatedEntitiesPath, spark.sparkContext().hadoopConfiguration());
log.info("Found paths: {}", String.join(",", paths));
return spark.read()
.load(inputRelatedEntitiesPath)
.as(Encoders.kryo(EntityRelEntity.class))
.load(toSeq(paths))
.as(Encoders.bean(EntityRelEntity.class))
.map((MapFunction<EntityRelEntity, Tuple2<String, EntityRelEntity>>)
value -> new Tuple2<>(value.getRelation().getSource(), value),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class)));
value -> new Tuple2<>(value.getRelation().getSource(), value),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class)));
}
private static <E extends OafEntity> Dataset<E> readPathEntity(SparkSession spark, String inputEntityPath, Class<E> entityClazz) {
private static <E extends OafEntity> Dataset<TypedRow> readPathEntity(SparkSession spark, String inputEntityPath, Class<E> entityClazz) {
log.info("Reading Graph table from: {}", inputEntityPath);
return spark
.read()
.textFile(inputEntityPath)
.map((MapFunction<String, E>) value -> OBJECT_MAPPER.readValue(value, entityClazz), Encoders.bean(entityClazz));
.map((MapFunction<String, E>) value -> OBJECT_MAPPER.readValue(value, entityClazz), Encoders.bean(entityClazz))
.map((MapFunction<E, TypedRow>) value -> getTypedRow(StringUtils.substringAfterLast(inputEntityPath, "/"), value), Encoders.bean(TypedRow.class));
}
private static TypedRow getTypedRow(String type, OafEntity entity) throws JsonProcessingException {
TypedRow t = new TypedRow();
t.setType(type);
t.setDeleted(entity.getDataInfo().getDeletedbyinference());
t.setId(entity.getId());
t.setOaf(OBJECT_MAPPER.writeValueAsString(entity));
return t;
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
private static Seq<String> toSeq(List<String> list) {
return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq();
}
}

View File

@ -1,25 +1,30 @@
package eu.dnetlib.dhp.oa.provision;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapGroupsFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.math.Ordering;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
@ -78,34 +83,24 @@ public class PrepareRelationsJob {
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
int numPartitions = Integer.parseInt(parser.get("relPartitions"));
log.info("relPartitions: {}", numPartitions);
SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
prepareRelationsFromPaths(spark, inputRelationsPath, outputPath);
prepareRelationsFromPaths(spark, inputRelationsPath, outputPath, numPartitions);
});
}
private static void prepareRelationsFromPaths(SparkSession spark, String inputRelationsPath, String outputPath) {
RDD<SortableRelation> rels = readPathRelation(spark, inputRelationsPath)
.filter((FilterFunction<SortableRelation>) r -> r.getDataInfo().getDeletedbyinference() == false)
.javaRDD()
.mapToPair((PairFunction<SortableRelation, String, List<SortableRelation>>) rel -> new Tuple2<>(
rel.getSource(),
Lists.newArrayList(rel)))
.reduceByKey((v1, v2) -> {
v1.addAll(v2);
v1.sort(SortableRelation::compareTo);
if (v1.size() > MAX_RELS) {
return v1.subList(0, MAX_RELS);
}
return new ArrayList<>(v1.subList(0, MAX_RELS));
})
.flatMap(r -> r._2().iterator())
.rdd();
spark.createDataset(rels, Encoders.bean(SortableRelation.class))
private static void prepareRelationsFromPaths(SparkSession spark, String inputRelationsPath, String outputPath, int numPartitions) {
readPathRelation(spark, inputRelationsPath)
.filter((FilterFunction<SortableRelation>) value -> value.getDataInfo().getDeletedbyinference() == false)
.groupByKey((MapFunction<SortableRelation, String>) value -> value.getSource(), Encoders.STRING())
.flatMapGroups((FlatMapGroupsFunction<String, SortableRelation, SortableRelation>) (key, values) -> Iterators.limit(values, MAX_RELS), Encoders.bean(SortableRelation.class))
.repartition(numPartitions)
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
@ -121,8 +116,7 @@ public class PrepareRelationsJob {
private static Dataset<SortableRelation> readPathRelation(SparkSession spark, final String inputPath) {
return spark.read()
.textFile(inputPath)
.map((MapFunction<String, SortableRelation>) s -> OBJECT_MAPPER.readValue(s, SortableRelation.class),
Encoders.bean(SortableRelation.class));
.map((MapFunction<String, SortableRelation>) value -> OBJECT_MAPPER.readValue(value, SortableRelation.class), Encoders.bean(SortableRelation.class));
}
private static void removeOutputDir(SparkSession spark, String path) {

View File

@ -1,31 +1,21 @@
package eu.dnetlib.dhp.oa.provision.model;
import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import java.io.Serializable;
public class JoinedEntity implements Serializable {
private GraphMappingUtils.EntityType type;
private OafEntity entity;
private TypedRow entity;
private Links links;
public GraphMappingUtils.EntityType getType() {
return type;
public JoinedEntity() {
}
public void setType(GraphMappingUtils.EntityType type) {
this.type = type;
}
public OafEntity getEntity() {
public TypedRow getEntity() {
return entity;
}
public void setEntity(OafEntity entity) {
public void setEntity(TypedRow entity) {
this.entity = entity;
}

View File

@ -1,6 +1,10 @@
package eu.dnetlib.dhp.oa.provision.model;
import java.io.Serializable;
import java.util.HashSet;
public class Links extends HashSet<Tuple2> {
public class Links extends HashSet<Tuple2> implements Serializable {
public Links() {
}
}

View File

@ -4,9 +4,10 @@ import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.schema.oaf.Relation;
import java.io.Serializable;
import java.util.Map;
public class SortableRelation extends Relation implements Comparable<Relation> {
public class SortableRelation extends Relation implements Comparable<Relation>, Serializable {
private final static Map<String, Integer> weights = Maps.newHashMap();

View File

@ -11,6 +11,9 @@ public class Tuple2 implements Serializable {
private RelatedEntity relatedEntity;
public Tuple2() {
}
public Tuple2(Relation relation, RelatedEntity relatedEntity) {
this.relation = relation;
this.relatedEntity = relatedEntity;

View File

@ -102,11 +102,11 @@ public class GraphMappingUtils {
entityMapping.get(EntityType.valueOf(targetType)).name());
}
public static String getMainType(final String type) {
return entityMapping.get(EntityType.valueOf(type)).name();
public static String getMainType(final EntityType type) {
return entityMapping.get(type).name();
}
public static boolean isResult(String type) {
public static boolean isResult(EntityType type) {
return MainEntityType.result.name().equals(getMainType(type));
}

View File

@ -1,5 +1,6 @@
package eu.dnetlib.dhp.oa.provision.utils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
@ -48,6 +49,8 @@ public class XmlRecordFactory implements Serializable {
private boolean indent = false;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public XmlRecordFactory(
final ContextMapper contextMapper, final boolean indent,
final String schemaLocation, final String otherDatasourceTypesUForUI) {
@ -72,22 +75,24 @@ public class XmlRecordFactory implements Serializable {
final Set<String> contexts = Sets.newHashSet();
final OafEntity entity = je.getEntity();
final OafEntity entity = toOafEntity(je.getEntity());
TemplateFactory templateFactory = new TemplateFactory();
try {
final List<String> metadata = metadata(je.getType(), entity, contexts);
final EntityType type = GraphMappingUtils.EntityType.valueOf(je.getEntity().getType());
final List<String> metadata = metadata(type, entity, contexts);
// rels has to be processed before the contexts because they enrich the contextMap with the funding info.
final List<String> relations = listRelations(je, templateFactory, contexts);
metadata.addAll(buildContexts(getMainType(je.getType()), contexts));
final String mainType = getMainType(type);
metadata.addAll(buildContexts(mainType, contexts));
metadata.add(XmlSerializationUtils.parseDataInfo(entity.getDataInfo()));
final String body = templateFactory.buildBody(
getMainType(je.getType()),
mainType,
metadata,
relations,
listChildren(je, templateFactory), listExtraInfo(je));
listChildren(entity, je.getEntity().getType(), templateFactory), listExtraInfo(entity));
return printXML(templateFactory.buildRecord(entity, schemaLocation, body), indent);
} catch (final Throwable e) {
@ -95,6 +100,35 @@ public class XmlRecordFactory implements Serializable {
}
}
private static OafEntity toOafEntity(TypedRow typedRow) {
return parseOaf(typedRow.getOaf(), typedRow.getType());
}
private static OafEntity parseOaf(final String json, final String type) {
try {
switch (GraphMappingUtils.EntityType.valueOf(type)) {
case publication:
return OBJECT_MAPPER.readValue(json, Publication.class);
case dataset:
return OBJECT_MAPPER.readValue(json, Dataset.class);
case otherresearchproduct:
return OBJECT_MAPPER.readValue(json, OtherResearchProduct.class);
case software:
return OBJECT_MAPPER.readValue(json, Software.class);
case datasource:
return OBJECT_MAPPER.readValue(json, Datasource.class);
case organization:
return OBJECT_MAPPER.readValue(json, Organization.class);
case project:
return OBJECT_MAPPER.readValue(json, Project.class);
default:
throw new IllegalArgumentException("invalid type: " + type);
}
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
private String printXML(String xml, boolean indent) {
try {
final Document doc = new SAXReader().read(new StringReader(xml));
@ -110,7 +144,7 @@ public class XmlRecordFactory implements Serializable {
}
}
private List<String> metadata(final String type, final OafEntity entity, final Set<String> contexts) {
private List<String> metadata(final EntityType type, final OafEntity entity, final Set<String> contexts) {
final List<String> metadata = Lists.newArrayList();
@ -262,7 +296,7 @@ public class XmlRecordFactory implements Serializable {
metadata.add(XmlSerializationUtils.mapQualifier("bestaccessright", getBestAccessright(r)));
}
switch (EntityType.valueOf(type)) {
switch (type) {
case publication:
final Publication pub = (Publication) entity;
@ -746,14 +780,14 @@ public class XmlRecordFactory implements Serializable {
return rels;
}
private List<String> listChildren(final JoinedEntity je, TemplateFactory templateFactory) {
private List<String> listChildren(final OafEntity entity, String type, TemplateFactory templateFactory) {
final List<String> children = Lists.newArrayList();
if (MainEntityType.result.toString().equals(getMainType(je.getType()))) {
final List<Instance> instances = ((Result) je.getEntity()).getInstance();
EntityType entityType = EntityType.valueOf(type);
if (MainEntityType.result.toString().equals(getMainType(entityType))) {
final List<Instance> instances = ((Result) entity).getInstance();
if (instances != null) {
for (final Instance instance : ((Result) je.getEntity()).getInstance()) {
for (final Instance instance : ((Result) entity).getInstance()) {
final List<String> fields = Lists.newArrayList();
@ -788,9 +822,9 @@ public class XmlRecordFactory implements Serializable {
children.add(templateFactory.getInstance(instance.getHostedby().getKey(), fields, instance.getUrl()));
}
}
final List<ExternalReference> ext = ((Result) je.getEntity()).getExternalReference();
final List<ExternalReference> ext = ((Result) entity).getExternalReference();
if (ext != null) {
for (final ExternalReference er : ((Result) je.getEntity()).getExternalReference()) {
for (final ExternalReference er : ((Result) entity).getExternalReference()) {
final List<String> fields = Lists.newArrayList();
@ -824,8 +858,8 @@ public class XmlRecordFactory implements Serializable {
return children;
}
private List<String> listExtraInfo(JoinedEntity je) {
final List<ExtraInfo> extraInfo = je.getEntity().getExtraInfo();
private List<String> listExtraInfo(OafEntity entity) {
final List<ExtraInfo> extraInfo = entity.getExtraInfo();
return extraInfo != null ? extraInfo
.stream()
.map(e -> XmlSerializationUtils.mapExtraInfo(e))

View File

@ -16,5 +16,11 @@
"paramLongName": "outputPath",
"paramDescription": "root output location for prepared relations",
"paramRequired": true
},
{
"paramName": "rp",
"paramLongName": "relPartitions",
"paramDescription": "number or partitions for the relations Dataset",
"paramRequired": true
}
]

View File

@ -13,7 +13,7 @@
},
{
"paramName": "iep",
"paramLongName": "inputGraphPath",
"paramLongName": "inputGraphRootPath",
"paramDescription": "root graph path",
"paramRequired": true
},
@ -22,5 +22,11 @@
"paramLongName": "outputPath",
"paramDescription": "root output location for prepared relations",
"paramRequired": true
},
{
"paramName": "np",
"paramLongName": "numPartitions",
"paramDescription": "number of partitions to use for the output",
"paramRequired": true
}
]

View File

@ -5,6 +5,10 @@
<name>inputGraphRootPath</name>
<description>root location of input materialized graph</description>
</property>
<property>
<name>isLookupUrl</name>
<description>URL for the isLookup service</description>
</property>
<property>
<name>sparkDriverMemoryForJoining</name>
@ -97,6 +101,7 @@
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${inputGraphRootPath}/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation</arg>
<arg>--relPartitions</arg><arg>3000</arg>
</spark>
<ok to="fork_join_related_entities"/>
<error to="Kill"/>
@ -128,13 +133,14 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relations</arg>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
</spark>
<ok to="join_relation"/>
<ok to="wait_joins"/>
<error to="Kill"/>
</action>
@ -154,13 +160,14 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relations</arg>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
</spark>
<ok to="join_relation"/>
<ok to="wait_joins"/>
<error to="Kill"/>
</action>
@ -180,13 +187,14 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relations</arg>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
</spark>
<ok to="join_relation"/>
<ok to="wait_joins"/>
<error to="Kill"/>
</action>
@ -206,13 +214,14 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relations</arg>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
</spark>
<ok to="join_relation"/>
<ok to="wait_joins"/>
<error to="Kill"/>
</action>
@ -232,13 +241,14 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relations</arg>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/datasource</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
</spark>
<ok to="join_relation"/>
<ok to="wait_joins"/>
<error to="Kill"/>
</action>
@ -258,13 +268,14 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relations</arg>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/organization</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
</spark>
<ok to="join_relation"/>
<ok to="wait_joins"/>
<error to="Kill"/>
</action>
@ -284,17 +295,19 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relations</arg>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/project</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
</spark>
<ok to="join_relation"/>
<ok to="wait_joins"/>
<error to="Kill"/>
</action>
<join name="join_relation" to="join_all_entities"/>
<join name="wait_joins" to="join_all_entities"/>
<action name="join_all_entities">
<spark xmlns="uri:oozie:spark-action:0.2">
@ -312,10 +325,12 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}</arg>
<arg>--inputGraphRootPath</arg><arg>${inputGraphRootPath}</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities</arg>
<arg>--numPartitions</arg><arg>12000</arg>
</spark>
<ok to="adjancency_lists"/>
<error to="Kill"/>
@ -337,6 +352,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg> <arg>${workingDir}/join_entities</arg>
<arg>--outputPath</arg><arg>${workingDir}/joined</arg>
@ -361,6 +377,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/joined</arg>
<arg>--outputPath</arg><arg>${workingDir}/xml</arg>