WIP: reimplementing the adjacency list construction process using spark Datasets
This commit is contained in:
parent
377e1ba840
commit
adcdd2d05e
|
@ -1,291 +0,0 @@
|
||||||
package eu.dnetlib.dhp.oa.provision;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
import com.google.common.collect.Iterables;
|
|
||||||
import com.google.common.collect.Maps;
|
|
||||||
import com.jayway.jsonpath.DocumentContext;
|
|
||||||
import com.jayway.jsonpath.JsonPath;
|
|
||||||
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
|
|
||||||
import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils;
|
|
||||||
import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner;
|
|
||||||
import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
|
|
||||||
import eu.dnetlib.dhp.oa.provision.model.*;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
|
||||||
import org.apache.hadoop.io.Text;
|
|
||||||
import org.apache.hadoop.io.compress.GzipCodec;
|
|
||||||
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
|
||||||
import org.apache.spark.SparkContext;
|
|
||||||
import org.apache.spark.api.java.JavaPairRDD;
|
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
|
||||||
import org.apache.spark.api.java.function.PairFunction;
|
|
||||||
import org.apache.spark.sql.SparkSession;
|
|
||||||
import org.apache.spark.util.LongAccumulator;
|
|
||||||
import scala.Tuple2;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.Serializable;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.asRelatedEntity;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects.
|
|
||||||
* The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization)
|
|
||||||
* and all the possible relationships (similarity links produced by the Dedup process are excluded).
|
|
||||||
*
|
|
||||||
* The operation is implemented creating the union between the entity types (E), joined by the relationships (R), and again
|
|
||||||
* by E, finally grouped by E.id;
|
|
||||||
*
|
|
||||||
* Different manipulations of the E and R sets are introduced to reduce the complexity of the operation
|
|
||||||
* 1) treat the object payload as string, extracting only the necessary information beforehand using json path,
|
|
||||||
* it seems that deserializing it with jackson's object mapper has higher memory footprint.
|
|
||||||
*
|
|
||||||
* 2) only consider rels that are not virtually deleted ($.dataInfo.deletedbyinference == false)
|
|
||||||
* 3) we only need a subset of fields from the related entities, so we introduce a distinction between E_source = S
|
|
||||||
* and E_target = T. Objects in T are heavily pruned of all the unnecessary information
|
|
||||||
*
|
|
||||||
* 4) perform the join as (((T.id join R.target) union S) groupby S.id) yield S -> [ <T, R> ]
|
|
||||||
*/
|
|
||||||
public class GraphJoiner implements Serializable {
|
|
||||||
|
|
||||||
private Map<String, LongAccumulator> accumulators = Maps.newHashMap();
|
|
||||||
|
|
||||||
public static final int MAX_RELS = 100;
|
|
||||||
|
|
||||||
public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd";
|
|
||||||
|
|
||||||
private SparkSession spark;
|
|
||||||
|
|
||||||
private ContextMapper contextMapper;
|
|
||||||
|
|
||||||
private String inputPath;
|
|
||||||
|
|
||||||
private String outPath;
|
|
||||||
|
|
||||||
private String otherDsTypeId;
|
|
||||||
|
|
||||||
/**
 * Creates a joiner bound to the given Spark session and registers the relation accumulators.
 *
 * @param spark         session used to read and process the graph
 * @param contextMapper handed over to the XML record factory
 * @param otherDsTypeId handed over to the XML record factory
 * @param inputPath     base path; entities and relations are read from its sub-directories
 * @param outPath       base path under which intermediate and final outputs are written
 */
public GraphJoiner(SparkSession spark, ContextMapper contextMapper, String otherDsTypeId, String inputPath, String outPath) {
    this.inputPath = inputPath;
    this.outPath = outPath;
    this.otherDsTypeId = otherDsTypeId;
    this.contextMapper = contextMapper;
    this.spark = spark;

    // the accumulators are created on the underlying SparkContext
    prepareAccumulators(spark.sparkContext());
}
|
|
||||||
|
|
||||||
public GraphJoiner adjacencyLists() {
|
|
||||||
final JavaSparkContext jsc = new JavaSparkContext(getSpark().sparkContext());
|
|
||||||
|
|
||||||
// read each entity
|
|
||||||
JavaPairRDD<String, TypedRow> datasource = readPathEntity(jsc, getInputPath(), "datasource");
|
|
||||||
JavaPairRDD<String, TypedRow> organization = readPathEntity(jsc, getInputPath(), "organization");
|
|
||||||
JavaPairRDD<String, TypedRow> project = readPathEntity(jsc, getInputPath(), "project");
|
|
||||||
JavaPairRDD<String, TypedRow> dataset = readPathEntity(jsc, getInputPath(), "dataset");
|
|
||||||
JavaPairRDD<String, TypedRow> otherresearchproduct = readPathEntity(jsc, getInputPath(), "otherresearchproduct");
|
|
||||||
JavaPairRDD<String, TypedRow> software = readPathEntity(jsc, getInputPath(), "software");
|
|
||||||
JavaPairRDD<String, TypedRow> publication = readPathEntity(jsc, getInputPath(), "publication");
|
|
||||||
|
|
||||||
// create the union between all the entities
|
|
||||||
final String entitiesPath = getOutPath() + "/entities";
|
|
||||||
datasource
|
|
||||||
.union(organization)
|
|
||||||
.union(project)
|
|
||||||
.union(dataset)
|
|
||||||
.union(otherresearchproduct)
|
|
||||||
.union(software)
|
|
||||||
.union(publication)
|
|
||||||
.map(e -> new EntityRelEntity().setSource(e._2()))
|
|
||||||
.map(GraphMappingUtils::serialize)
|
|
||||||
.saveAsTextFile(entitiesPath, GzipCodec.class);
|
|
||||||
|
|
||||||
JavaPairRDD<String, EntityRelEntity> entities = jsc.textFile(entitiesPath)
|
|
||||||
.map(t -> new ObjectMapper().readValue(t, EntityRelEntity.class))
|
|
||||||
.mapToPair(t -> new Tuple2<>(t.getSource().getSourceId(), t));
|
|
||||||
|
|
||||||
final String relationPath = getOutPath() + "/relation";
|
|
||||||
// reads the relationships
|
|
||||||
final JavaPairRDD<SortableRelationKey, EntityRelEntity> rels = readPathRelation(jsc, getInputPath())
|
|
||||||
.filter(rel -> !rel.getDeleted()) //only consider those that are not virtually deleted
|
|
||||||
.map(p -> new EntityRelEntity().setRelation(p))
|
|
||||||
.mapToPair(p -> new Tuple2<>(SortableRelationKey.from(p), p));
|
|
||||||
rels
|
|
||||||
.groupByKey(new RelationPartitioner(rels.getNumPartitions()))
|
|
||||||
.map(p -> Iterables.limit(p._2(), MAX_RELS))
|
|
||||||
.flatMap(p -> p.iterator())
|
|
||||||
.map(s -> new ObjectMapper().writeValueAsString(s))
|
|
||||||
.saveAsTextFile(relationPath, GzipCodec.class);
|
|
||||||
|
|
||||||
final JavaPairRDD<String, EntityRelEntity> relation = jsc.textFile(relationPath)
|
|
||||||
.map(s -> new ObjectMapper().readValue(s, EntityRelEntity.class))
|
|
||||||
.mapToPair(p -> new Tuple2<>(p.getRelation().getTargetId(), p));
|
|
||||||
|
|
||||||
final String bySourcePath = getOutPath() + "/join_by_source";
|
|
||||||
relation
|
|
||||||
.join(entities
|
|
||||||
.filter(e -> !e._2().getSource().getDeleted())
|
|
||||||
.mapToPair(e -> new Tuple2<>(e._1(), asRelatedEntity(e._2()))))
|
|
||||||
.map(s -> new EntityRelEntity()
|
|
||||||
.setRelation(s._2()._1().getRelation())
|
|
||||||
.setTarget(s._2()._2().getSource()))
|
|
||||||
.map(j -> new ObjectMapper().writeValueAsString(j))
|
|
||||||
.saveAsTextFile(bySourcePath, GzipCodec.class);
|
|
||||||
|
|
||||||
JavaPairRDD<String, EntityRelEntity> bySource = jsc.textFile(bySourcePath)
|
|
||||||
.map(e -> getObjectMapper().readValue(e, EntityRelEntity.class))
|
|
||||||
.mapToPair(t -> new Tuple2<>(t.getRelation().getSourceId(), t));
|
|
||||||
|
|
||||||
final XmlRecordFactory recordFactory = new XmlRecordFactory(accumulators, contextMapper, false, schemaLocation, otherDsTypeId);
|
|
||||||
entities
|
|
||||||
.union(bySource)
|
|
||||||
.groupByKey() // by source id
|
|
||||||
.map(l -> toJoinedEntity(l))
|
|
||||||
.mapToPair(je -> new Tuple2<>(
|
|
||||||
new Text(je.getEntity().getId()),
|
|
||||||
new Text(recordFactory.build(je))))
|
|
||||||
.saveAsHadoopFile(getOutPath() + "/xml", Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
|
|
||||||
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public SparkSession getSpark() {
|
|
||||||
return spark;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getInputPath() {
|
|
||||||
return inputPath;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getOutPath() {
|
|
||||||
return outPath;
|
|
||||||
}
|
|
||||||
|
|
||||||
// HELPERS
|
|
||||||
|
|
||||||
private OafEntity parseOaf(final String json, final String type, final ObjectMapper mapper) {
|
|
||||||
try {
|
|
||||||
switch (GraphMappingUtils.EntityType.valueOf(type)) {
|
|
||||||
case publication:
|
|
||||||
return mapper.readValue(json, Publication.class);
|
|
||||||
case dataset:
|
|
||||||
return mapper.readValue(json, Dataset.class);
|
|
||||||
case otherresearchproduct:
|
|
||||||
return mapper.readValue(json, OtherResearchProduct.class);
|
|
||||||
case software:
|
|
||||||
return mapper.readValue(json, Software.class);
|
|
||||||
case datasource:
|
|
||||||
return mapper.readValue(json, Datasource.class);
|
|
||||||
case organization:
|
|
||||||
return mapper.readValue(json, Organization.class);
|
|
||||||
case project:
|
|
||||||
return mapper.readValue(json, Project.class);
|
|
||||||
default:
|
|
||||||
throw new IllegalArgumentException("invalid type: " + type);
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new IllegalArgumentException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private JoinedEntity toJoinedEntity(Tuple2<String, Iterable<EntityRelEntity>> p) {
|
|
||||||
final ObjectMapper mapper = getObjectMapper();
|
|
||||||
final JoinedEntity j = new JoinedEntity();
|
|
||||||
final Links links = new Links();
|
|
||||||
for(EntityRelEntity rel : p._2()) {
|
|
||||||
if (rel.hasMainEntity() & j.getEntity() == null) {
|
|
||||||
j.setType(rel.getSource().getType());
|
|
||||||
j.setEntity(parseOaf(rel.getSource().getOaf(), rel.getSource().getType(), mapper));
|
|
||||||
}
|
|
||||||
if (rel.hasRelatedEntity()) {
|
|
||||||
try {
|
|
||||||
links.add(
|
|
||||||
new eu.dnetlib.dhp.oa.provision.model.Tuple2()
|
|
||||||
.setRelation(mapper.readValue(rel.getRelation().getOaf(), Relation.class))
|
|
||||||
.setRelatedEntity(mapper.readValue(rel.getTarget().getOaf(), RelatedEntity.class)));
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new IllegalArgumentException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
j.setLinks(links);
|
|
||||||
if (j.getEntity() == null) {
|
|
||||||
throw new IllegalStateException("missing main entity on '" + p._1() + "'");
|
|
||||||
}
|
|
||||||
return j;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Reads a set of eu.dnetlib.dhp.schema.oaf.OafEntity objects from a sequence file <className, entity json serialization>,
|
|
||||||
* extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.model.TypedRow
|
|
||||||
* @param sc
|
|
||||||
* @param inputPath
|
|
||||||
* @param type
|
|
||||||
* @return the JavaPairRDD<String, TypedRow> indexed by entity identifier
|
|
||||||
*/
|
|
||||||
private JavaPairRDD<String, TypedRow> readPathEntity(final JavaSparkContext sc, final String inputPath, final String type) {
|
|
||||||
return sc.textFile(inputPath + "/" + type)
|
|
||||||
.mapToPair((PairFunction<String, String, TypedRow>) s -> {
|
|
||||||
final DocumentContext json = JsonPath.parse(s);
|
|
||||||
final String id = json.read("$.id");
|
|
||||||
return new Tuple2<>(id, new TypedRow()
|
|
||||||
.setSourceId(id)
|
|
||||||
.setDeleted(json.read("$.dataInfo.deletedbyinference"))
|
|
||||||
.setType(type)
|
|
||||||
.setOaf(s));
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Reads a set of eu.dnetlib.dhp.schema.oaf.Relation objects from a sequence file <className, relation json serialization>,
|
|
||||||
* extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.model.TypedRow
|
|
||||||
* @param sc
|
|
||||||
* @param inputPath
|
|
||||||
* @return the JavaRDD<TypedRow> containing all the relationships
|
|
||||||
*/
|
|
||||||
private JavaRDD<TypedRow> readPathRelation(final JavaSparkContext sc, final String inputPath) {
|
|
||||||
return sc.textFile(inputPath + "/relation")
|
|
||||||
.map(s -> {
|
|
||||||
final DocumentContext json = JsonPath.parse(s);
|
|
||||||
return new TypedRow()
|
|
||||||
.setSourceId(json.read("$.source"))
|
|
||||||
.setTargetId(json.read("$.target"))
|
|
||||||
.setDeleted(json.read("$.dataInfo.deletedbyinference"))
|
|
||||||
.setType("relation")
|
|
||||||
.setRelType("$.relType")
|
|
||||||
.setSubRelType("$.subRelType")
|
|
||||||
.setRelClass("$.relClass")
|
|
||||||
.setOaf(s);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private ObjectMapper getObjectMapper() {
|
|
||||||
return new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void prepareAccumulators(SparkContext sc) {
|
|
||||||
accumulators.put("resultResult_similarity_isAmongTopNSimilarDocuments", sc.longAccumulator("resultResult_similarity_isAmongTopNSimilarDocuments"));
|
|
||||||
accumulators.put("resultResult_similarity_hasAmongTopNSimilarDocuments", sc.longAccumulator("resultResult_similarity_hasAmongTopNSimilarDocuments"));
|
|
||||||
accumulators.put("resultResult_supplement_isSupplementTo", sc.longAccumulator("resultResult_supplement_isSupplementTo"));
|
|
||||||
accumulators.put("resultResult_supplement_isSupplementedBy", sc.longAccumulator("resultResult_supplement_isSupplementedBy"));
|
|
||||||
accumulators.put("resultResult_dedup_isMergedIn", sc.longAccumulator("resultResult_dedup_isMergedIn"));
|
|
||||||
accumulators.put("resultResult_dedup_merges", sc.longAccumulator("resultResult_dedup_merges"));
|
|
||||||
|
|
||||||
accumulators.put("resultResult_publicationDataset_isRelatedTo", sc.longAccumulator("resultResult_publicationDataset_isRelatedTo"));
|
|
||||||
accumulators.put("resultResult_relationship_isRelatedTo", sc.longAccumulator("resultResult_relationship_isRelatedTo"));
|
|
||||||
accumulators.put("resultProject_outcome_isProducedBy", sc.longAccumulator("resultProject_outcome_isProducedBy"));
|
|
||||||
accumulators.put("resultProject_outcome_produces", sc.longAccumulator("resultProject_outcome_produces"));
|
|
||||||
accumulators.put("resultOrganization_affiliation_isAuthorInstitutionOf", sc.longAccumulator("resultOrganization_affiliation_isAuthorInstitutionOf"));
|
|
||||||
|
|
||||||
accumulators.put("resultOrganization_affiliation_hasAuthorInstitution", sc.longAccumulator("resultOrganization_affiliation_hasAuthorInstitution"));
|
|
||||||
accumulators.put("projectOrganization_participation_hasParticipant", sc.longAccumulator("projectOrganization_participation_hasParticipant"));
|
|
||||||
accumulators.put("projectOrganization_participation_isParticipant", sc.longAccumulator("projectOrganization_participation_isParticipant"));
|
|
||||||
accumulators.put("organizationOrganization_dedup_isMergedIn", sc.longAccumulator("organizationOrganization_dedup_isMergedIn"));
|
|
||||||
accumulators.put("organizationOrganization_dedup_merges", sc.longAccumulator("resultProject_outcome_produces"));
|
|
||||||
accumulators.put("datasourceOrganization_provision_isProvidedBy", sc.longAccumulator("datasourceOrganization_provision_isProvidedBy"));
|
|
||||||
accumulators.put("datasourceOrganization_provision_provides", sc.longAccumulator("datasourceOrganization_provision_provides"));
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -0,0 +1,328 @@
|
||||||
|
package eu.dnetlib.dhp.oa.provision;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.common.collect.Iterators;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
import com.jayway.jsonpath.DocumentContext;
|
||||||
|
import com.jayway.jsonpath.JsonPath;
|
||||||
|
import eu.dnetlib.dhp.oa.provision.model.*;
|
||||||
|
import eu.dnetlib.dhp.oa.provision.utils.*;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
|
import org.apache.spark.SparkContext;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.api.java.function.*;
|
||||||
|
import org.apache.spark.rdd.RDD;
|
||||||
|
import org.apache.spark.sql.*;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.apache.spark.sql.types.*;
|
||||||
|
import org.apache.spark.util.LongAccumulator;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.asRelatedEntity;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects.
|
||||||
|
* The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization)
|
||||||
|
* and all the possible relationships (similarity links produced by the Dedup process are excluded).
|
||||||
|
*
|
||||||
|
* The operation is implemented creating the union between the entity types (E), joined by the relationships (R), and again
|
||||||
|
* by E, finally grouped by E.id;
|
||||||
|
*
|
||||||
|
* Different manipulations of the E and R sets are introduced to reduce the complexity of the operation
|
||||||
|
* 1) treat the object payload as string, extracting only the necessary information beforehand using json path,
|
||||||
|
* it seems that deserializing it with jackson's object mapper has higher memory footprint.
|
||||||
|
*
|
||||||
|
* 2) only consider rels that are not virtually deleted ($.dataInfo.deletedbyinference == false)
|
||||||
|
* 3) we only need a subset of fields from the related entities, so we introduce a distinction between E_source = S
|
||||||
|
* and E_target = T. Objects in T are heavily pruned of all the unnecessary information
|
||||||
|
*
|
||||||
|
* 4) perform the join as (((T.id join R.target) union S) groupby S.id) yield S -> [ <T, R> ]
|
||||||
|
*/
|
||||||
|
public class GraphJoiner_v2 implements Serializable {
|
||||||
|
|
||||||
|
public static final int LIMIT = 1000000;
|
||||||
|
private Map<String, LongAccumulator> accumulators = Maps.newHashMap();
|
||||||
|
|
||||||
|
public static final int MAX_RELS = 100;
|
||||||
|
|
||||||
|
public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd";
|
||||||
|
|
||||||
|
private static final StructType KV_SCHEMA = StructType$.MODULE$.apply(
|
||||||
|
Arrays.asList(
|
||||||
|
StructField$.MODULE$.apply("key", DataTypes.StringType, false, Metadata.empty()),
|
||||||
|
StructField$.MODULE$.apply("value", DataTypes.StringType, false, Metadata.empty())
|
||||||
|
));
|
||||||
|
|
||||||
|
private static final StructType TYPED_ROW_SCHEMA = StructType$.MODULE$.apply(
|
||||||
|
Arrays.asList(
|
||||||
|
StructField$.MODULE$.apply("sourceId", DataTypes.StringType, false, Metadata.empty()),
|
||||||
|
StructField$.MODULE$.apply("targetId", DataTypes.StringType, true, Metadata.empty()),
|
||||||
|
StructField$.MODULE$.apply("deleted", DataTypes.BooleanType, false, Metadata.empty()),
|
||||||
|
StructField$.MODULE$.apply("type", DataTypes.StringType, false, Metadata.empty()),
|
||||||
|
StructField$.MODULE$.apply("relType", DataTypes.StringType, true, Metadata.empty()),
|
||||||
|
StructField$.MODULE$.apply("subRelType", DataTypes.StringType, true, Metadata.empty()),
|
||||||
|
StructField$.MODULE$.apply("relClass", DataTypes.StringType, true, Metadata.empty()),
|
||||||
|
StructField$.MODULE$.apply("oaf", DataTypes.BinaryType, false, Metadata.empty())
|
||||||
|
));
|
||||||
|
|
||||||
|
private static final StructType ENTITY_REL_ENTITY_SCHEMA = StructType$.MODULE$.apply(
|
||||||
|
Arrays.asList(
|
||||||
|
StructField$.MODULE$.apply("source", TYPED_ROW_SCHEMA, false, Metadata.empty()),
|
||||||
|
StructField$.MODULE$.apply("relation", TYPED_ROW_SCHEMA, true, Metadata.empty()),
|
||||||
|
StructField$.MODULE$.apply("target", TYPED_ROW_SCHEMA, false, Metadata.empty())
|
||||||
|
));
|
||||||
|
|
||||||
|
|
||||||
|
private SparkSession spark;
|
||||||
|
|
||||||
|
private ContextMapper contextMapper;
|
||||||
|
|
||||||
|
private String inputPath;
|
||||||
|
|
||||||
|
private String outPath;
|
||||||
|
|
||||||
|
private String otherDsTypeId;
|
||||||
|
|
||||||
|
/**
 * Creates a joiner bound to the given Spark session and registers the relation accumulators.
 *
 * @param spark         session used to read and process the graph
 * @param contextMapper handed over to the XML record factory
 * @param otherDsTypeId handed over to the XML record factory
 * @param inputPath     base path; entities and relations are read from its sub-directories
 * @param outPath       base path under which the output is written
 */
public GraphJoiner_v2(SparkSession spark, ContextMapper contextMapper, String otherDsTypeId, String inputPath, String outPath) {
    this.inputPath = inputPath;
    this.outPath = outPath;
    this.otherDsTypeId = otherDsTypeId;
    this.contextMapper = contextMapper;
    this.spark = spark;

    // the accumulators are created on the underlying SparkContext
    prepareAccumulators(spark.sparkContext());
}
|
||||||
|
|
||||||
|
public GraphJoiner_v2 adjacencyLists() throws IOException {
|
||||||
|
|
||||||
|
final JavaSparkContext jsc = JavaSparkContext.fromSparkContext(getSpark().sparkContext());
|
||||||
|
|
||||||
|
// read each entity
|
||||||
|
Dataset<TypedRow> datasource = readPathEntity(jsc, getInputPath(), "datasource");
|
||||||
|
Dataset<TypedRow> organization = readPathEntity(jsc, getInputPath(), "organization");
|
||||||
|
Dataset<TypedRow> project = readPathEntity(jsc, getInputPath(), "project");
|
||||||
|
Dataset<TypedRow> dataset = readPathEntity(jsc, getInputPath(), "dataset");
|
||||||
|
Dataset<TypedRow> otherresearchproduct = readPathEntity(jsc, getInputPath(), "otherresearchproduct");
|
||||||
|
Dataset<TypedRow> software = readPathEntity(jsc, getInputPath(), "software");
|
||||||
|
Dataset<TypedRow> publication = readPathEntity(jsc, getInputPath(), "publication");
|
||||||
|
|
||||||
|
// create the union between all the entities
|
||||||
|
Dataset<Tuple2<String, TypedRow>> entities =
|
||||||
|
datasource
|
||||||
|
.union(organization)
|
||||||
|
.union(project)
|
||||||
|
.union(dataset)
|
||||||
|
.union(otherresearchproduct)
|
||||||
|
.union(software)
|
||||||
|
.union(publication)
|
||||||
|
.map((MapFunction<TypedRow, Tuple2<String, TypedRow>>) value -> new Tuple2<>(
|
||||||
|
value.getId(),
|
||||||
|
value),
|
||||||
|
Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class)))
|
||||||
|
.limit(LIMIT)
|
||||||
|
.cache();
|
||||||
|
|
||||||
|
System.out.println("Entities schema:");
|
||||||
|
entities.printSchema();
|
||||||
|
// reads the relationships
|
||||||
|
|
||||||
|
Dataset<Relation> rels = readPathRelation(jsc, getInputPath())
|
||||||
|
.groupByKey((MapFunction<Relation, SortableRelationKey>) t -> SortableRelationKey.from(t), Encoders.kryo(SortableRelationKey.class))
|
||||||
|
.flatMapGroups((FlatMapGroupsFunction<SortableRelationKey, Relation, Relation>) (key, values) -> Iterators.limit(values, MAX_RELS), Encoders.bean(Relation.class))
|
||||||
|
.limit(LIMIT)
|
||||||
|
.cache();
|
||||||
|
|
||||||
|
System.out.println("Relation schema:");
|
||||||
|
rels.printSchema();
|
||||||
|
|
||||||
|
Dataset<Tuple2<String, Relation>> relsByTarget = rels
|
||||||
|
.map((MapFunction<Relation, Tuple2<String, Relation>>) r -> new Tuple2<>(r.getTarget(), r), Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class)));
|
||||||
|
|
||||||
|
System.out.println("Relation by target schema:");
|
||||||
|
relsByTarget.printSchema();
|
||||||
|
|
||||||
|
Dataset<Tuple2<String, EntityRelEntity>> bySource = relsByTarget
|
||||||
|
.joinWith(entities, relsByTarget.col("_1").equalTo(entities.col("_1")), "inner")
|
||||||
|
.filter((FilterFunction<Tuple2<Tuple2<String, Relation>, Tuple2<String, TypedRow>>>) value -> value._2()._2().getDeleted() == false)
|
||||||
|
.map((MapFunction<Tuple2<Tuple2<String, Relation>, Tuple2<String, TypedRow>>, EntityRelEntity>) t -> {
|
||||||
|
EntityRelEntity e = new EntityRelEntity();
|
||||||
|
e.setRelation(t._1()._2());
|
||||||
|
e.setTarget(asRelatedEntity(t._2()._2()));
|
||||||
|
return e;
|
||||||
|
}, Encoders.bean(EntityRelEntity.class))
|
||||||
|
.map((MapFunction<EntityRelEntity, Tuple2<String, EntityRelEntity>>) e -> new Tuple2<>(e.getRelation().getSource(), e),
|
||||||
|
Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class)));
|
||||||
|
|
||||||
|
System.out.println("bySource schema");
|
||||||
|
bySource.printSchema();
|
||||||
|
|
||||||
|
Dataset<EntityRelEntity> joined = entities
|
||||||
|
.joinWith(bySource, entities.col("_1").equalTo(bySource.col("_1")), "left")
|
||||||
|
.map((MapFunction<Tuple2<Tuple2<String, TypedRow>, Tuple2<String, EntityRelEntity>>, EntityRelEntity>) value -> {
|
||||||
|
EntityRelEntity re = new EntityRelEntity();
|
||||||
|
re.setEntity(value._1()._2());
|
||||||
|
Optional<EntityRelEntity> related = Optional.ofNullable(value._2()).map(Tuple2::_2);
|
||||||
|
if (related.isPresent()) {
|
||||||
|
re.setRelation(related.get().getRelation());
|
||||||
|
re.setTarget(related.get().getTarget());
|
||||||
|
}
|
||||||
|
return re;
|
||||||
|
}, Encoders.kryo(EntityRelEntity.class));
|
||||||
|
|
||||||
|
System.out.println("joined schema");
|
||||||
|
joined.printSchema();
|
||||||
|
//joined.write().json(getOutPath() + "/joined");
|
||||||
|
|
||||||
|
final Dataset<JoinedEntity> grouped = joined
|
||||||
|
.groupByKey((MapFunction<EntityRelEntity, TypedRow>) e -> e.getEntity(), Encoders.kryo(TypedRow.class))
|
||||||
|
.mapGroups((MapGroupsFunction<TypedRow, EntityRelEntity, JoinedEntity>) (key, values) -> toJoinedEntity(key, values), Encoders.kryo(JoinedEntity.class));
|
||||||
|
|
||||||
|
System.out.println("grouped schema");
|
||||||
|
grouped.printSchema();
|
||||||
|
|
||||||
|
final XmlRecordFactory recordFactory = new XmlRecordFactory(accumulators, contextMapper, false, schemaLocation, otherDsTypeId);
|
||||||
|
grouped
|
||||||
|
.map((MapFunction<JoinedEntity, String>) value -> recordFactory.build(value), Encoders.STRING())
|
||||||
|
.limit(LIMIT)
|
||||||
|
.write()
|
||||||
|
.text(getOutPath() + "/xml");
|
||||||
|
/*
|
||||||
|
.javaRDD()
|
||||||
|
.mapToPair((PairFunction<Tuple2<String, String>, String, String>) t -> new Tuple2<>(t._1(), t._2()))
|
||||||
|
.saveAsHadoopFile(getOutPath() + "/xml", Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SparkSession getSpark() {
|
||||||
|
return spark;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getInputPath() {
|
||||||
|
return inputPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getOutPath() {
|
||||||
|
return outPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
// HELPERS
|
||||||
|
|
||||||
|
private JoinedEntity toJoinedEntity(TypedRow key, Iterator<EntityRelEntity> values) {
|
||||||
|
final ObjectMapper mapper = getObjectMapper();
|
||||||
|
final JoinedEntity j = new JoinedEntity();
|
||||||
|
j.setType(key.getType());
|
||||||
|
j.setEntity(parseOaf(key.getOaf(), key.getType(), mapper));
|
||||||
|
final Links links = new Links();
|
||||||
|
values.forEachRemaining(rel -> links.add(
|
||||||
|
new eu.dnetlib.dhp.oa.provision.model.Tuple2(
|
||||||
|
rel.getRelation(),
|
||||||
|
rel.getTarget()
|
||||||
|
)));
|
||||||
|
j.setLinks(links);
|
||||||
|
return j;
|
||||||
|
}
|
||||||
|
|
||||||
|
private OafEntity parseOaf(final String json, final String type, final ObjectMapper mapper) {
|
||||||
|
try {
|
||||||
|
switch (GraphMappingUtils.EntityType.valueOf(type)) {
|
||||||
|
case publication:
|
||||||
|
return mapper.readValue(json, Publication.class);
|
||||||
|
case dataset:
|
||||||
|
return mapper.readValue(json, eu.dnetlib.dhp.schema.oaf.Dataset.class);
|
||||||
|
case otherresearchproduct:
|
||||||
|
return mapper.readValue(json, OtherResearchProduct.class);
|
||||||
|
case software:
|
||||||
|
return mapper.readValue(json, Software.class);
|
||||||
|
case datasource:
|
||||||
|
return mapper.readValue(json, Datasource.class);
|
||||||
|
case organization:
|
||||||
|
return mapper.readValue(json, Organization.class);
|
||||||
|
case project:
|
||||||
|
return mapper.readValue(json, Project.class);
|
||||||
|
default:
|
||||||
|
throw new IllegalArgumentException("invalid type: " + type);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new IllegalArgumentException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads a set of eu.dnetlib.dhp.schema.oaf.OafEntity objects from a new line delimited json file,
|
||||||
|
* extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.model.TypedRow
|
||||||
|
* @param sc
|
||||||
|
* @param inputPath
|
||||||
|
* @param type
|
||||||
|
* @return the JavaPairRDD<String, TypedRow> indexed by entity identifier
|
||||||
|
*/
|
||||||
|
private Dataset<TypedRow> readPathEntity(final JavaSparkContext sc, final String inputPath, final String type) {
|
||||||
|
RDD<Row> rdd = sc.textFile(inputPath + "/" + type)
|
||||||
|
.map((Function<String, Row>) s -> RowFactory.create("", s))
|
||||||
|
.rdd();
|
||||||
|
|
||||||
|
return getSpark().createDataFrame(rdd, KV_SCHEMA)
|
||||||
|
.map((MapFunction<Row, TypedRow>) row -> {
|
||||||
|
final String s = row.getAs("value");
|
||||||
|
final DocumentContext json = JsonPath.parse(s);
|
||||||
|
final TypedRow t = new TypedRow();
|
||||||
|
t.setId(json.read("$.id"));
|
||||||
|
t.setDeleted(json.read("$.dataInfo.deletedbyinference"));
|
||||||
|
t.setType(type);
|
||||||
|
t.setOaf(s);
|
||||||
|
return t;
|
||||||
|
}, Encoders.bean(TypedRow.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads a set of eu.dnetlib.dhp.schema.oaf.Relation objects from a sequence file <className, relation json serialization>,
|
||||||
|
* extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.model.TypedRow
|
||||||
|
* @param sc
|
||||||
|
* @param inputPath
|
||||||
|
* @return the JavaRDD<TypedRow> containing all the relationships
|
||||||
|
*/
|
||||||
|
private Dataset<Relation> readPathRelation(final JavaSparkContext sc, final String inputPath) {
|
||||||
|
final RDD<Row> rdd = sc.textFile(inputPath + "/relation")
|
||||||
|
.map((Function<String, Row>) s -> RowFactory.create("", s))
|
||||||
|
.rdd();
|
||||||
|
|
||||||
|
return getSpark().createDataFrame(rdd, KV_SCHEMA)
|
||||||
|
.map((MapFunction<Row, Relation>) value -> new ObjectMapper().readValue(value.<String>getAs("value"), Relation.class), Encoders.bean(Relation.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
private ObjectMapper getObjectMapper() {
|
||||||
|
return new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void prepareAccumulators(SparkContext sc) {
|
||||||
|
accumulators.put("resultResult_similarity_isAmongTopNSimilarDocuments", sc.longAccumulator("resultResult_similarity_isAmongTopNSimilarDocuments"));
|
||||||
|
accumulators.put("resultResult_similarity_hasAmongTopNSimilarDocuments", sc.longAccumulator("resultResult_similarity_hasAmongTopNSimilarDocuments"));
|
||||||
|
accumulators.put("resultResult_supplement_isSupplementTo", sc.longAccumulator("resultResult_supplement_isSupplementTo"));
|
||||||
|
accumulators.put("resultResult_supplement_isSupplementedBy", sc.longAccumulator("resultResult_supplement_isSupplementedBy"));
|
||||||
|
accumulators.put("resultResult_dedup_isMergedIn", sc.longAccumulator("resultResult_dedup_isMergedIn"));
|
||||||
|
accumulators.put("resultResult_dedup_merges", sc.longAccumulator("resultResult_dedup_merges"));
|
||||||
|
|
||||||
|
accumulators.put("resultResult_publicationDataset_isRelatedTo", sc.longAccumulator("resultResult_publicationDataset_isRelatedTo"));
|
||||||
|
accumulators.put("resultResult_relationship_isRelatedTo", sc.longAccumulator("resultResult_relationship_isRelatedTo"));
|
||||||
|
accumulators.put("resultProject_outcome_isProducedBy", sc.longAccumulator("resultProject_outcome_isProducedBy"));
|
||||||
|
accumulators.put("resultProject_outcome_produces", sc.longAccumulator("resultProject_outcome_produces"));
|
||||||
|
accumulators.put("resultOrganization_affiliation_isAuthorInstitutionOf", sc.longAccumulator("resultOrganization_affiliation_isAuthorInstitutionOf"));
|
||||||
|
|
||||||
|
accumulators.put("resultOrganization_affiliation_hasAuthorInstitution", sc.longAccumulator("resultOrganization_affiliation_hasAuthorInstitution"));
|
||||||
|
accumulators.put("projectOrganization_participation_hasParticipant", sc.longAccumulator("projectOrganization_participation_hasParticipant"));
|
||||||
|
accumulators.put("projectOrganization_participation_isParticipant", sc.longAccumulator("projectOrganization_participation_isParticipant"));
|
||||||
|
accumulators.put("organizationOrganization_dedup_isMergedIn", sc.longAccumulator("organizationOrganization_dedup_isMergedIn"));
|
||||||
|
accumulators.put("organizationOrganization_dedup_merges", sc.longAccumulator("resultProject_outcome_produces"));
|
||||||
|
accumulators.put("datasourceOrganization_provision_isProvidedBy", sc.longAccumulator("datasourceOrganization_provision_isProvidedBy"));
|
||||||
|
accumulators.put("datasourceOrganization_provision_provides", sc.longAccumulator("datasourceOrganization_provision_provides"));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -84,7 +84,7 @@ public class SparkXmlIndexingJob {
|
||||||
return SparkSession
|
return SparkSession
|
||||||
.builder()
|
.builder()
|
||||||
.config(conf)
|
.config(conf)
|
||||||
.appName(SparkXmlRecordBuilderJob.class.getSimpleName())
|
.appName(SparkXmlIndexingJob.class.getSimpleName())
|
||||||
.master(master)
|
.master(master)
|
||||||
.getOrCreate();
|
.getOrCreate();
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,47 +0,0 @@
|
||||||
package eu.dnetlib.dhp.oa.provision;
|
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
|
||||||
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
|
|
||||||
import org.apache.commons.io.IOUtils;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.spark.SparkConf;
|
|
||||||
import org.apache.spark.sql.SparkSession;
|
|
||||||
|
|
||||||
public class SparkXmlRecordBuilderJob {
|
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
|
||||||
|
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
|
||||||
IOUtils.toString(
|
|
||||||
SparkXmlRecordBuilderJob.class.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json")));
|
|
||||||
parser.parseArgument(args);
|
|
||||||
|
|
||||||
final String master = parser.get("master");
|
|
||||||
final SparkConf conf = new SparkConf()
|
|
||||||
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
|
||||||
|
|
||||||
try(SparkSession spark = getSession(conf, master)) {
|
|
||||||
|
|
||||||
final String inputPath = parser.get("sourcePath");
|
|
||||||
final String outputPath = parser.get("outputPath");
|
|
||||||
final String isLookupUrl = parser.get("isLookupUrl");
|
|
||||||
final String otherDsTypeId = parser.get("otherDsTypeId");
|
|
||||||
|
|
||||||
final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
|
|
||||||
|
|
||||||
new GraphJoiner(spark, ContextMapper.fromIS(isLookupUrl), otherDsTypeId, inputPath, outputPath)
|
|
||||||
.adjacencyLists();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static SparkSession getSession(SparkConf conf, String master) {
|
|
||||||
return SparkSession
|
|
||||||
.builder()
|
|
||||||
.config(conf)
|
|
||||||
.appName(SparkXmlRecordBuilderJob.class.getSimpleName())
|
|
||||||
.master(master)
|
|
||||||
.getOrCreate();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -0,0 +1,81 @@
|
||||||
|
package eu.dnetlib.dhp.oa.provision;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.oa.provision.model.*;
|
||||||
|
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
|
||||||
|
public class SparkXmlRecordBuilderJob_v2 {
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
IOUtils.toString(
|
||||||
|
SparkXmlRecordBuilderJob_v2.class.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json")));
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
final String master = parser.get("master");
|
||||||
|
try(SparkSession spark = getSession(master)) {
|
||||||
|
|
||||||
|
final String inputPath = parser.get("sourcePath");
|
||||||
|
final String outputPath = parser.get("outputPath");
|
||||||
|
final String isLookupUrl = parser.get("isLookupUrl");
|
||||||
|
final String otherDsTypeId = parser.get("otherDsTypeId");
|
||||||
|
|
||||||
|
new GraphJoiner_v2(spark, ContextMapper.fromIS(isLookupUrl), otherDsTypeId, inputPath, outputPath)
|
||||||
|
.adjacencyLists();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static SparkSession getSession(String master) {
|
||||||
|
final SparkConf conf = new SparkConf();
|
||||||
|
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
||||||
|
conf.set("spark.sql.shuffle.partitions", "500");
|
||||||
|
conf.registerKryoClasses(new Class[]{
|
||||||
|
Author.class,
|
||||||
|
Context.class,
|
||||||
|
Country.class,
|
||||||
|
DataInfo.class,
|
||||||
|
eu.dnetlib.dhp.schema.oaf.Dataset.class,
|
||||||
|
Datasource.class,
|
||||||
|
ExternalReference.class,
|
||||||
|
ExtraInfo.class,
|
||||||
|
Field.class,
|
||||||
|
GeoLocation.class,
|
||||||
|
Instance.class,
|
||||||
|
Journal.class,
|
||||||
|
KeyValue.class,
|
||||||
|
Oaf.class,
|
||||||
|
OafEntity.class,
|
||||||
|
OAIProvenance.class,
|
||||||
|
Organization.class,
|
||||||
|
OriginDescription.class,
|
||||||
|
OtherResearchProduct.class,
|
||||||
|
Project.class,
|
||||||
|
Publication.class,
|
||||||
|
Qualifier.class,
|
||||||
|
Relation.class,
|
||||||
|
Result.class,
|
||||||
|
Software.class,
|
||||||
|
StructuredProperty.class,
|
||||||
|
|
||||||
|
TypedRow.class,
|
||||||
|
EntityRelEntity.class,
|
||||||
|
JoinedEntity.class,
|
||||||
|
SortableRelationKey.class,
|
||||||
|
Tuple2.class,
|
||||||
|
Links.class,
|
||||||
|
RelatedEntity.class
|
||||||
|
});
|
||||||
|
return SparkSession
|
||||||
|
.builder()
|
||||||
|
.config(conf)
|
||||||
|
.appName(SparkXmlRecordBuilderJob_v2.class.getSimpleName())
|
||||||
|
.master(master)
|
||||||
|
.getOrCreate();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -1,54 +1,36 @@
|
||||||
package eu.dnetlib.dhp.oa.provision.model;
|
package eu.dnetlib.dhp.oa.provision.model;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
||||||
public class EntityRelEntity implements Serializable {
|
public class EntityRelEntity implements Serializable {
|
||||||
|
|
||||||
private TypedRow source;
|
private TypedRow entity;
|
||||||
private TypedRow relation;
|
private Relation relation;
|
||||||
private TypedRow target;
|
private RelatedEntity target;
|
||||||
|
|
||||||
public EntityRelEntity() {
|
public TypedRow getEntity() {
|
||||||
|
return entity;
|
||||||
}
|
}
|
||||||
|
|
||||||
public EntityRelEntity(TypedRow source) {
|
public void setEntity(TypedRow entity) {
|
||||||
this.source = source;
|
this.entity = entity;
|
||||||
}
|
}
|
||||||
|
|
||||||
//helpers
|
public Relation getRelation() {
|
||||||
public Boolean hasMainEntity() {
|
|
||||||
return getSource() != null & getRelation() == null & getTarget() == null;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Boolean hasRelatedEntity() {
|
|
||||||
return getSource() == null & getRelation() != null & getTarget() != null;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public TypedRow getSource() {
|
|
||||||
return source;
|
|
||||||
}
|
|
||||||
|
|
||||||
public EntityRelEntity setSource(TypedRow source) {
|
|
||||||
this.source = source;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public TypedRow getRelation() {
|
|
||||||
return relation;
|
return relation;
|
||||||
}
|
}
|
||||||
|
|
||||||
public EntityRelEntity setRelation(TypedRow relation) {
|
public void setRelation(Relation relation) {
|
||||||
this.relation = relation;
|
this.relation = relation;
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public TypedRow getTarget() {
|
public RelatedEntity getTarget() {
|
||||||
return target;
|
return target;
|
||||||
}
|
}
|
||||||
|
|
||||||
public EntityRelEntity setTarget(TypedRow target) {
|
public void setTarget(RelatedEntity target) {
|
||||||
this.target = target;
|
this.target = target;
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,26 +16,23 @@ public class JoinedEntity implements Serializable {
|
||||||
return type;
|
return type;
|
||||||
}
|
}
|
||||||
|
|
||||||
public JoinedEntity setType(String type) {
|
public void setType(String type) {
|
||||||
this.type = type;
|
this.type = type;
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public OafEntity getEntity() {
|
public OafEntity getEntity() {
|
||||||
return entity;
|
return entity;
|
||||||
}
|
}
|
||||||
|
|
||||||
public JoinedEntity setEntity(OafEntity entity) {
|
public void setEntity(OafEntity entity) {
|
||||||
this.entity = entity;
|
this.entity = entity;
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Links getLinks() {
|
public Links getLinks() {
|
||||||
return links;
|
return links;
|
||||||
}
|
}
|
||||||
|
|
||||||
public JoinedEntity setLinks(Links links) {
|
public void setLinks(Links links) {
|
||||||
this.links = links;
|
this.links = links;
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,207 +49,183 @@ public class RelatedEntity implements Serializable {
|
||||||
return id;
|
return id;
|
||||||
}
|
}
|
||||||
|
|
||||||
public RelatedEntity setId(String id) {
|
public void setId(String id) {
|
||||||
this.id = id;
|
this.id = id;
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public StructuredProperty getTitle() {
|
|
||||||
return title;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RelatedEntity setTitle(StructuredProperty title) {
|
|
||||||
this.title = title;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getDateofacceptance() {
|
|
||||||
return dateofacceptance;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RelatedEntity setDateofacceptance(String dateofacceptance) {
|
|
||||||
this.dateofacceptance = dateofacceptance;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getPublisher() {
|
|
||||||
return publisher;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RelatedEntity setPublisher(String publisher) {
|
|
||||||
this.publisher = publisher;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<StructuredProperty> getPid() {
|
|
||||||
return pid;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RelatedEntity setPid(List<StructuredProperty> pid) {
|
|
||||||
this.pid = pid;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getCodeRepositoryUrl() {
|
|
||||||
return codeRepositoryUrl;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RelatedEntity setCodeRepositoryUrl(String codeRepositoryUrl) {
|
|
||||||
this.codeRepositoryUrl = codeRepositoryUrl;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Qualifier getResulttype() {
|
|
||||||
return resulttype;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RelatedEntity setResulttype(Qualifier resulttype) {
|
|
||||||
this.resulttype = resulttype;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<KeyValue> getCollectedfrom() {
|
|
||||||
return collectedfrom;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RelatedEntity setCollectedfrom(List<KeyValue> collectedfrom) {
|
|
||||||
this.collectedfrom = collectedfrom;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<Instance> getInstances() {
|
|
||||||
return instances;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RelatedEntity setInstances(List<Instance> instances) {
|
|
||||||
this.instances = instances;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getOfficialname() {
|
|
||||||
return officialname;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RelatedEntity setOfficialname(String officialname) {
|
|
||||||
this.officialname = officialname;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getWebsiteurl() {
|
|
||||||
return websiteurl;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RelatedEntity setWebsiteurl(String websiteurl) {
|
|
||||||
this.websiteurl = websiteurl;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Qualifier getDatasourcetype() {
|
|
||||||
return datasourcetype;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RelatedEntity setDatasourcetype(Qualifier datasourcetype) {
|
|
||||||
this.datasourcetype = datasourcetype;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Qualifier getDatasourcetypeui() {
|
|
||||||
return datasourcetypeui;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RelatedEntity setDatasourcetypeui(Qualifier datasourcetypeui) {
|
|
||||||
this.datasourcetypeui = datasourcetypeui;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Qualifier getOpenairecompatibility() {
|
|
||||||
return openairecompatibility;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RelatedEntity setOpenairecompatibility(Qualifier openairecompatibility) {
|
|
||||||
this.openairecompatibility = openairecompatibility;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getLegalname() {
|
|
||||||
return legalname;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RelatedEntity setLegalname(String legalname) {
|
|
||||||
this.legalname = legalname;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getLegalshortname() {
|
|
||||||
return legalshortname;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RelatedEntity setLegalshortname(String legalshortname) {
|
|
||||||
this.legalshortname = legalshortname;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Qualifier getCountry() {
|
|
||||||
return country;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RelatedEntity setCountry(Qualifier country) {
|
|
||||||
this.country = country;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getCode() {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RelatedEntity setCode(String code) {
|
|
||||||
this.code = code;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getAcronym() {
|
|
||||||
return acronym;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RelatedEntity setAcronym(String acronym) {
|
|
||||||
this.acronym = acronym;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Qualifier getContracttype() {
|
|
||||||
return contracttype;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RelatedEntity setContracttype(Qualifier contracttype) {
|
|
||||||
this.contracttype = contracttype;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<String> getFundingtree() {
|
|
||||||
return fundingtree;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RelatedEntity setFundingtree(List<String> fundingtree) {
|
|
||||||
this.fundingtree = fundingtree;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getProjectTitle() {
|
|
||||||
return projectTitle;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RelatedEntity setProjectTitle(String projectTitle) {
|
|
||||||
this.projectTitle = projectTitle;
|
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getType() {
|
public String getType() {
|
||||||
return type;
|
return type;
|
||||||
}
|
}
|
||||||
|
|
||||||
public RelatedEntity setType(String type) {
|
public void setType(String type) {
|
||||||
this.type = type;
|
this.type = type;
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public StructuredProperty getTitle() {
|
||||||
|
return title;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTitle(StructuredProperty title) {
|
||||||
|
this.title = title;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getWebsiteurl() {
|
||||||
|
return websiteurl;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setWebsiteurl(String websiteurl) {
|
||||||
|
this.websiteurl = websiteurl;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getDateofacceptance() {
|
||||||
|
return dateofacceptance;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDateofacceptance(String dateofacceptance) {
|
||||||
|
this.dateofacceptance = dateofacceptance;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getPublisher() {
|
||||||
|
return publisher;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPublisher(String publisher) {
|
||||||
|
this.publisher = publisher;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<StructuredProperty> getPid() {
|
||||||
|
return pid;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPid(List<StructuredProperty> pid) {
|
||||||
|
this.pid = pid;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getCodeRepositoryUrl() {
|
||||||
|
return codeRepositoryUrl;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCodeRepositoryUrl(String codeRepositoryUrl) {
|
||||||
|
this.codeRepositoryUrl = codeRepositoryUrl;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Qualifier getResulttype() {
|
||||||
|
return resulttype;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setResulttype(Qualifier resulttype) {
|
||||||
|
this.resulttype = resulttype;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<KeyValue> getCollectedfrom() {
|
||||||
|
return collectedfrom;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCollectedfrom(List<KeyValue> collectedfrom) {
|
||||||
|
this.collectedfrom = collectedfrom;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Instance> getInstances() {
|
||||||
|
return instances;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setInstances(List<Instance> instances) {
|
||||||
|
this.instances = instances;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getOfficialname() {
|
||||||
|
return officialname;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setOfficialname(String officialname) {
|
||||||
|
this.officialname = officialname;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Qualifier getDatasourcetype() {
|
||||||
|
return datasourcetype;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDatasourcetype(Qualifier datasourcetype) {
|
||||||
|
this.datasourcetype = datasourcetype;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Qualifier getDatasourcetypeui() {
|
||||||
|
return datasourcetypeui;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDatasourcetypeui(Qualifier datasourcetypeui) {
|
||||||
|
this.datasourcetypeui = datasourcetypeui;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Qualifier getOpenairecompatibility() {
|
||||||
|
return openairecompatibility;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setOpenairecompatibility(Qualifier openairecompatibility) {
|
||||||
|
this.openairecompatibility = openairecompatibility;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getLegalname() {
|
||||||
|
return legalname;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLegalname(String legalname) {
|
||||||
|
this.legalname = legalname;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getLegalshortname() {
|
||||||
|
return legalshortname;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLegalshortname(String legalshortname) {
|
||||||
|
this.legalshortname = legalshortname;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Qualifier getCountry() {
|
||||||
|
return country;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCountry(Qualifier country) {
|
||||||
|
this.country = country;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getProjectTitle() {
|
||||||
|
return projectTitle;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setProjectTitle(String projectTitle) {
|
||||||
|
this.projectTitle = projectTitle;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getCode() {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCode(String code) {
|
||||||
|
this.code = code;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getAcronym() {
|
||||||
|
return acronym;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAcronym(String acronym) {
|
||||||
|
this.acronym = acronym;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Qualifier getContracttype() {
|
||||||
|
return contracttype;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setContracttype(Qualifier contracttype) {
|
||||||
|
this.contracttype = contracttype;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<String> getFundingtree() {
|
||||||
|
return fundingtree;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setFundingtree(List<String> fundingtree) {
|
||||||
|
this.fundingtree = fundingtree;
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -2,6 +2,7 @@ package eu.dnetlib.dhp.oa.provision.model;
|
||||||
|
|
||||||
import com.google.common.collect.ComparisonChain;
|
import com.google.common.collect.ComparisonChain;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -33,58 +34,54 @@ public class SortableRelationKey implements Comparable<SortableRelationKey>, Ser
|
||||||
weights.put("dedup", 8);
|
weights.put("dedup", 8);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static SortableRelationKey from(final EntityRelEntity e) {
|
public static SortableRelationKey from(final Relation r) {
|
||||||
return new SortableRelationKey()
|
final SortableRelationKey s = new SortableRelationKey();
|
||||||
.setSourceId(e.getRelation().getSourceId())
|
s.setSourceId(r.getSource());
|
||||||
.setTargetId(e.getRelation().getTargetId())
|
s.setTargetId(r.getTarget());
|
||||||
.setRelType(e.getRelation().getRelType())
|
s.setRelType(r.getRelType());
|
||||||
.setSubRelType(e.getRelation().getSubRelType())
|
s.setSubRelType(r.getSubRelType());
|
||||||
.setRelClass(e.getRelation().getRelClass());
|
s.setRelClass(r.getRelClass());
|
||||||
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getSourceId() {
|
public String getSourceId() {
|
||||||
return sourceId;
|
return sourceId;
|
||||||
}
|
}
|
||||||
|
|
||||||
public SortableRelationKey setSourceId(String sourceId) {
|
public void setSourceId(String sourceId) {
|
||||||
this.sourceId = sourceId;
|
this.sourceId = sourceId;
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getTargetId() {
|
public String getTargetId() {
|
||||||
return targetId;
|
return targetId;
|
||||||
}
|
}
|
||||||
|
|
||||||
public SortableRelationKey setTargetId(String targetId) {
|
public void setTargetId(String targetId) {
|
||||||
this.targetId = targetId;
|
this.targetId = targetId;
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getRelType() {
|
public String getRelType() {
|
||||||
return relType;
|
return relType;
|
||||||
}
|
}
|
||||||
|
|
||||||
public SortableRelationKey setRelType(String relType) {
|
public void setRelType(String relType) {
|
||||||
this.relType = relType;
|
this.relType = relType;
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getSubRelType() {
|
public String getSubRelType() {
|
||||||
return subRelType;
|
return subRelType;
|
||||||
}
|
}
|
||||||
|
|
||||||
public SortableRelationKey setSubRelType(String subRelType) {
|
public void setSubRelType(String subRelType) {
|
||||||
this.subRelType = subRelType;
|
this.subRelType = subRelType;
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getRelClass() {
|
public String getRelClass() {
|
||||||
return relClass;
|
return relClass;
|
||||||
}
|
}
|
||||||
|
|
||||||
public SortableRelationKey setRelClass(String relClass) {
|
public void setRelClass(String relClass) {
|
||||||
this.relClass = relClass;
|
this.relClass = relClass;
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -8,21 +8,24 @@ public class Tuple2 {
|
||||||
|
|
||||||
private RelatedEntity relatedEntity;
|
private RelatedEntity relatedEntity;
|
||||||
|
|
||||||
|
public Tuple2(Relation relation, RelatedEntity relatedEntity) {
|
||||||
|
this.relation = relation;
|
||||||
|
this.relatedEntity = relatedEntity;
|
||||||
|
}
|
||||||
|
|
||||||
public Relation getRelation() {
|
public Relation getRelation() {
|
||||||
return relation;
|
return relation;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Tuple2 setRelation(Relation relation) {
|
public void setRelation(Relation relation) {
|
||||||
this.relation = relation;
|
this.relation = relation;
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public RelatedEntity getRelatedEntity() {
|
public RelatedEntity getRelatedEntity() {
|
||||||
return relatedEntity;
|
return relatedEntity;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Tuple2 setRelatedEntity(RelatedEntity relatedEntity) {
|
public void setRelatedEntity(RelatedEntity relatedEntity) {
|
||||||
this.relatedEntity = relatedEntity;
|
this.relatedEntity = relatedEntity;
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,92 +1,61 @@
|
||||||
package eu.dnetlib.dhp.oa.provision.model;
|
package eu.dnetlib.dhp.oa.provision.model;
|
||||||
|
|
||||||
|
import com.google.common.base.Objects;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
||||||
public class TypedRow implements Serializable {
|
public class TypedRow implements Serializable {
|
||||||
|
|
||||||
private String sourceId;
|
private String id;
|
||||||
|
|
||||||
private String targetId;
|
|
||||||
|
|
||||||
private Boolean deleted;
|
private Boolean deleted;
|
||||||
|
|
||||||
private String type;
|
private String type;
|
||||||
|
|
||||||
private String relType;
|
|
||||||
private String subRelType;
|
|
||||||
private String relClass;
|
|
||||||
|
|
||||||
private String oaf;
|
private String oaf;
|
||||||
|
|
||||||
public String getSourceId() {
|
public String getId() {
|
||||||
return sourceId;
|
return id;
|
||||||
}
|
}
|
||||||
|
|
||||||
public TypedRow setSourceId(String sourceId) {
|
public void setId(String id) {
|
||||||
this.sourceId = sourceId;
|
this.id = id;
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getTargetId() {
|
|
||||||
return targetId;
|
|
||||||
}
|
|
||||||
|
|
||||||
public TypedRow setTargetId(String targetId) {
|
|
||||||
this.targetId = targetId;
|
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Boolean getDeleted() {
|
public Boolean getDeleted() {
|
||||||
return deleted;
|
return deleted;
|
||||||
}
|
}
|
||||||
|
|
||||||
public TypedRow setDeleted(Boolean deleted) {
|
public void setDeleted(Boolean deleted) {
|
||||||
this.deleted = deleted;
|
this.deleted = deleted;
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getType() {
|
public String getType() {
|
||||||
return type;
|
return type;
|
||||||
}
|
}
|
||||||
|
|
||||||
public TypedRow setType(String type) {
|
public void setType(String type) {
|
||||||
this.type = type;
|
this.type = type;
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getRelType() {
|
|
||||||
return relType;
|
|
||||||
}
|
|
||||||
|
|
||||||
public TypedRow setRelType(String relType) {
|
|
||||||
this.relType = relType;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getSubRelType() {
|
|
||||||
return subRelType;
|
|
||||||
}
|
|
||||||
|
|
||||||
public TypedRow setSubRelType(String subRelType) {
|
|
||||||
this.subRelType = subRelType;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getRelClass() {
|
|
||||||
return relClass;
|
|
||||||
}
|
|
||||||
|
|
||||||
public TypedRow setRelClass(String relClass) {
|
|
||||||
this.relClass = relClass;
|
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getOaf() {
|
public String getOaf() {
|
||||||
return oaf;
|
return oaf;
|
||||||
}
|
}
|
||||||
|
|
||||||
public TypedRow setOaf(String oaf) {
|
public void setOaf(String oaf) {
|
||||||
this.oaf = oaf;
|
this.oaf = oaf;
|
||||||
return this;
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) return true;
|
||||||
|
if (o == null || getClass() != o.getClass()) return false;
|
||||||
|
TypedRow typedRow2 = (TypedRow) o;
|
||||||
|
return Objects.equal(id, typedRow2.id);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hashCode(id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,14 +3,11 @@ package eu.dnetlib.dhp.oa.provision.utils;
|
||||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.base.Predicate;
|
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import com.jayway.jsonpath.DocumentContext;
|
import com.jayway.jsonpath.DocumentContext;
|
||||||
import com.jayway.jsonpath.JsonPath;
|
import com.jayway.jsonpath.JsonPath;
|
||||||
import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity;
|
import eu.dnetlib.dhp.oa.provision.model.*;
|
||||||
import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
|
|
||||||
import eu.dnetlib.dhp.oa.provision.model.TypedRow;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
import net.minidev.json.JSONArray;
|
import net.minidev.json.JSONArray;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
@ -66,14 +63,14 @@ public class GraphMappingUtils {
|
||||||
return MainEntityType.result.name().equals(getMainType(type));
|
return MainEntityType.result.name().equals(getMainType(type));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Predicate<String> instanceFilter = s -> instanceFieldFilter.contains(s);
|
public static RelatedEntity asRelatedEntity(TypedRow e) {
|
||||||
|
|
||||||
public static EntityRelEntity asRelatedEntity(EntityRelEntity e) {
|
final DocumentContext j = JsonPath.parse(e.getOaf());
|
||||||
|
final RelatedEntity re = new RelatedEntity();
|
||||||
|
re.setId(j.read("$.id"));
|
||||||
|
re.setType(e.getType());
|
||||||
|
|
||||||
final DocumentContext j = JsonPath.parse(e.getSource().getOaf());
|
switch (EntityType.valueOf(e.getType())) {
|
||||||
final RelatedEntity re = new RelatedEntity().setId(j.read("$.id")).setType(e.getSource().getType());
|
|
||||||
|
|
||||||
switch (EntityType.valueOf(e.getSource().getType())) {
|
|
||||||
case publication:
|
case publication:
|
||||||
case dataset:
|
case dataset:
|
||||||
case otherresearchproduct:
|
case otherresearchproduct:
|
||||||
|
@ -147,14 +144,11 @@ public class GraphMappingUtils {
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
return new EntityRelEntity().setSource(
|
|
||||||
new TypedRow()
|
return re;
|
||||||
.setSourceId(e.getSource().getSourceId())
|
|
||||||
.setDeleted(e.getSource().getDeleted())
|
|
||||||
.setType(e.getSource().getType())
|
|
||||||
.setOaf(serialize(re)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static KeyValue asKV(LinkedHashMap<String, Object> j) {
|
private static KeyValue asKV(LinkedHashMap<String, Object> j) {
|
||||||
final KeyValue kv = new KeyValue();
|
final KeyValue kv = new KeyValue();
|
||||||
kv.setKey((String) j.get("key"));
|
kv.setKey((String) j.get("key"));
|
||||||
|
|
|
@ -7,9 +7,7 @@ import com.google.common.collect.Maps;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import com.mycila.xmltool.XMLDoc;
|
import com.mycila.xmltool.XMLDoc;
|
||||||
import com.mycila.xmltool.XMLTag;
|
import com.mycila.xmltool.XMLTag;
|
||||||
import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
|
import eu.dnetlib.dhp.oa.provision.model.*;
|
||||||
import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
|
|
||||||
import eu.dnetlib.dhp.oa.provision.model.Tuple2;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
|
@ -65,7 +65,7 @@
|
||||||
<master>yarn</master>
|
<master>yarn</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>build_adjacency_lists</name>
|
<name>build_adjacency_lists</name>
|
||||||
<class>eu.dnetlib.dhp.oa.provision.SparkXmlRecordBuilderJob</class>
|
<class>eu.dnetlib.dhp.oa.provision.SparkXmlRecordBuilderJob_v2</class>
|
||||||
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
|
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
--executor-cores ${sparkExecutorCoresForJoining}
|
--executor-cores ${sparkExecutorCoresForJoining}
|
||||||
|
@ -75,6 +75,7 @@
|
||||||
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
|
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.network.timeout=10000000
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>-mt</arg> <arg>yarn</arg>
|
<arg>-mt</arg> <arg>yarn</arg>
|
||||||
<arg>-is</arg> <arg>${isLookupUrl}</arg>
|
<arg>-is</arg> <arg>${isLookupUrl}</arg>
|
||||||
|
|
Loading…
Reference in New Issue