Merge branch 'master' of code-repo.d4science.org:D-Net/dnet-hadoop

This commit is contained in:
Sandro La Bruzzo 2020-02-19 10:13:45 +01:00
commit 9a2d74ac82
49 changed files with 3593 additions and 75 deletions

2
.gitignore vendored
View File

@ -20,5 +20,5 @@
/*/build
/build
spark-warehouse
/*/*/job-override.properties
/**/job-override.properties

View File

@ -42,6 +42,10 @@
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
<dependency>
<groupId>net.sf.saxon</groupId>
<artifactId>Saxon-HE</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,32 @@
package eu.dnetlib.dhp.utils.saxon;
import net.sf.saxon.expr.XPathContext;
import net.sf.saxon.lib.ExtensionFunctionCall;
import net.sf.saxon.lib.ExtensionFunctionDefinition;
import net.sf.saxon.om.Sequence;
import net.sf.saxon.om.StructuredQName;
import net.sf.saxon.trans.XPathException;
public abstract class AbstractExtensionFunction extends ExtensionFunctionDefinition {
public static String DEFAULT_SAXON_EXT_NS_URI = "http://www.d-net.research-infrastructures.eu/saxon-extension";
public abstract String getName();
public abstract Sequence doCall(XPathContext context, Sequence[] arguments) throws XPathException;
@Override
public StructuredQName getFunctionQName() {
return new StructuredQName("dnet", DEFAULT_SAXON_EXT_NS_URI, getName());
}
@Override
public ExtensionFunctionCall makeCallExpression() {
return new ExtensionFunctionCall() {
@Override
public Sequence call(XPathContext context, Sequence[] arguments) throws XPathException {
return doCall(context, arguments);
}
};
}
}

View File

@ -0,0 +1,67 @@
package eu.dnetlib.dhp.utils.saxon;
import net.sf.saxon.expr.XPathContext;
import net.sf.saxon.om.Item;
import net.sf.saxon.om.Sequence;
import net.sf.saxon.trans.XPathException;
import net.sf.saxon.value.SequenceType;
import net.sf.saxon.value.StringValue;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.GregorianCalendar;
public class ExtractYear extends AbstractExtensionFunction {
private static final String[] dateFormats = { "yyyy-MM-dd", "yyyy/MM/dd" };
@Override
public String getName() {
return "extractYear";
}
@Override
public Sequence doCall(XPathContext context, Sequence[] arguments) throws XPathException {
if (arguments == null | arguments.length == 0) {
return new StringValue("");
}
final Item item = arguments[0].head();
if (item == null) {
return new StringValue("");
}
return new StringValue(_year(item.getStringValue()));
}
@Override
public int getMinimumNumberOfArguments() {
return 0;
}
@Override
public int getMaximumNumberOfArguments() {
return 1;
}
@Override
public SequenceType[] getArgumentTypes() {
return new SequenceType[] { SequenceType.OPTIONAL_ITEM };
}
@Override
public SequenceType getResultType(SequenceType[] suppliedArgumentTypes) {
return SequenceType.SINGLE_STRING;
}
private String _year(String s) {
Calendar c = new GregorianCalendar();
for (String format : dateFormats) {
try {
c.setTime(new SimpleDateFormat(format).parse(s));
String year = String.valueOf(c.get(Calendar.YEAR));
return year;
} catch (ParseException e) {}
}
return "";
}
}

View File

@ -0,0 +1,66 @@
package eu.dnetlib.dhp.utils.saxon;
import net.sf.saxon.expr.XPathContext;
import net.sf.saxon.om.Sequence;
import net.sf.saxon.trans.XPathException;
import net.sf.saxon.value.SequenceType;
import net.sf.saxon.value.StringValue;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
public class NormalizeDate extends AbstractExtensionFunction {
private static final String[] normalizeDateFormats = { "yyyy-MM-dd'T'hh:mm:ss", "yyyy-MM-dd", "yyyy/MM/dd", "yyyy" };
private static final String normalizeOutFormat = new String("yyyy-MM-dd'T'hh:mm:ss'Z'");
@Override
public String getName() {
return "normalizeDate";
}
@Override
public Sequence doCall(XPathContext context, Sequence[] arguments) throws XPathException {
if (arguments == null | arguments.length == 0) {
return new StringValue("");
}
String s = arguments[0].head().getStringValue();
return new StringValue(_year(s));
}
@Override
public int getMinimumNumberOfArguments() {
return 0;
}
@Override
public int getMaximumNumberOfArguments() {
return 1;
}
@Override
public SequenceType[] getArgumentTypes() {
return new SequenceType[] { SequenceType.OPTIONAL_ITEM };
}
@Override
public SequenceType getResultType(SequenceType[] suppliedArgumentTypes) {
return SequenceType.SINGLE_STRING;
}
private String _year(String s) {
final String date = s != null ? s.trim() : "";
for (String format : normalizeDateFormats) {
try {
Date parse = new SimpleDateFormat(format).parse(date);
String res = new SimpleDateFormat(normalizeOutFormat).format(parse);
return res;
} catch (ParseException e) {}
}
return "";
}
}

View File

@ -0,0 +1,60 @@
package eu.dnetlib.dhp.utils.saxon;
import net.sf.saxon.expr.XPathContext;
import net.sf.saxon.om.Item;
import net.sf.saxon.om.Sequence;
import net.sf.saxon.trans.XPathException;
import net.sf.saxon.value.SequenceType;
import net.sf.saxon.value.StringValue;
import org.apache.commons.lang3.StringUtils;
public class PickFirst extends AbstractExtensionFunction {
@Override
public String getName() {
return "pickFirst";
}
@Override
public Sequence doCall(XPathContext context, Sequence[] arguments) throws XPathException {
if (arguments == null | arguments.length == 0) {
return new StringValue("");
}
final String s1 = getValue(arguments[0]);
final String s2 = getValue(arguments[1]);
return new StringValue(StringUtils.isNotBlank(s1) ? s1 : StringUtils.isNotBlank(s2) ? s2 : "");
}
private String getValue(final Sequence arg) throws XPathException {
if (arg != null) {
final Item item = arg.head();
if (item != null) {
return item.getStringValue();
}
}
return "";
}
@Override
public int getMinimumNumberOfArguments() {
return 0;
}
@Override
public int getMaximumNumberOfArguments() {
return 2;
}
@Override
public SequenceType[] getArgumentTypes() {
return new SequenceType[] { SequenceType.OPTIONAL_ITEM };
}
@Override
public SequenceType getResultType(SequenceType[] suppliedArgumentTypes) {
return SequenceType.SINGLE_STRING;
}
}

View File

@ -0,0 +1,30 @@
package eu.dnetlib.dhp.utils.saxon;
import net.sf.saxon.Configuration;
import net.sf.saxon.TransformerFactoryImpl;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.stream.StreamSource;
import java.io.StringReader;
public class SaxonTransformerFactory {
/**
* Creates the index record transformer from the given XSLT
* @param xslt
* @return
* @throws TransformerException
*/
public static Transformer newInstance(final String xslt) throws TransformerException {
final TransformerFactoryImpl factory = new TransformerFactoryImpl();
final Configuration conf = factory.getConfiguration();
conf.registerExtensionFunction(new ExtractYear());
conf.registerExtensionFunction(new NormalizeDate());
conf.registerExtensionFunction(new PickFirst());
return factory.newTransformer(new StreamSource(new StringReader(xslt)));
}
}

View File

@ -1,3 +1,11 @@
Description of the project
--------------------------
This project defines **serialization schemas** of Avro data store files that are used to pass data between workflow nodes in the system.
This project defines **object schemas** of the OpenAIRE main entities and the relationships that intercur among them.
Namely it defines the model for
- **research product (result)** which subclasses in publication, dataset, other research product, software
- **data source** object describing the data provider (institutional repository, aggregators, cris systems)
- **organization** research bodies managing a data source or participating to a research project
- **project** research project
Te serialization of such objects (data store files) are used to pass data between workflow nodes in the processing pipeline.

View File

@ -26,6 +26,11 @@
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>

View File

@ -1,5 +1,6 @@
package eu.dnetlib.dhp.schema.oaf;
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.commons.lang3.StringUtils;
import java.io.Serializable;
@ -36,7 +37,7 @@ public class GeoLocation implements Serializable {
this.place = place;
}
@JsonIgnore
public boolean isBlank() {
return StringUtils.isBlank(point) &&
StringUtils.isBlank(box) &&

View File

@ -1,5 +1,6 @@
package eu.dnetlib.dhp.schema.oaf;
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.commons.lang3.StringUtils;
import java.io.Serializable;
@ -40,6 +41,7 @@ public class KeyValue implements Serializable {
return isBlank()?"":String.format("%s::%s", key != null ? key.toLowerCase() : "", value != null ? value.toLowerCase() : "");
}
@JsonIgnore
public boolean isBlank() {
return StringUtils.isBlank(key) && StringUtils.isBlank(value);
}

View File

@ -1,5 +1,6 @@
package eu.dnetlib.dhp.schema.oaf;
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.commons.lang3.StringUtils;
import java.io.Serializable;
@ -50,6 +51,8 @@ public class Qualifier implements Serializable {
schemeid != null ? schemeid : "",
schemename != null ? schemename : "");
}
@JsonIgnore
public boolean isBlank() {
return StringUtils.isBlank(classid) &&
StringUtils.isBlank(classname) &&

View File

@ -113,7 +113,7 @@
<arg>-pguser</arg><arg>${postgresUser}</arg>
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
</java>
<ok to="End"/>
<ok to="ExtractPublication"/>
<error to="Kill"/>
</action>

View File

@ -1,24 +1,19 @@
package eu.dnetlib.dhp.graph;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import scala.Tuple2;
public class SparkGraphImporterJob {
public static void main(final String[] args) throws Exception {
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils.toString(SparkGraphImporterJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_graph_parameters.json")));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGraphImporterJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_graph_parameters.json")));
parser.parseArgument(args);
final SparkSession spark = SparkSession
.builder()
@ -36,13 +31,8 @@ public class SparkGraphImporterJob {
// Read the input file and convert it into RDD of serializable object
GraphMappingUtils.types.forEach((name, clazz) -> {
final JavaRDD<Tuple2<String, String>> inputRDD = sc.sequenceFile(inputPath + "/" + name, Text.class, Text.class)
.map(item -> new Tuple2<>(item._1.toString(), item._2.toString()));
spark.createDataset(inputRDD
.filter(s -> s._1().equals(clazz.getName()))
.map(Tuple2::_2)
.map(s -> new ObjectMapper().readValue(s, clazz))
spark.createDataset(sc.sequenceFile(inputPath + "/" + name, Text.class, Text.class)
.map(s -> new ObjectMapper().readValue(s._2().toString(), clazz))
.rdd(), Encoders.bean(clazz))
.write()
.mode(SaveMode.Overwrite)

View File

@ -0,0 +1,10 @@
sparkDriverMemory=8G
sparkExecutorMemory=8G
#isLookupUrl=http://services.openaire.eu:8280/is/services/isLookUp
isLookupUrl=http://beta.services.openaire.eu:8280/is/services/isLookUp?wsdl
sourcePath=/tmp/db_openaireplus_services.export_dhp.2020.02.03
outputPath=/tmp/openaire_provision
format=TMF
batchSize=2000
sparkExecutorCoresForIndexing=64
reuseRecords=true

View File

@ -0,0 +1,92 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId>
<version>1.1.6-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dhp-graph-provision</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
</dependency>
<dependency>
<groupId>dom4j</groupId>
<artifactId>dom4j</artifactId>
</dependency>
<dependency>
<groupId>jaxen</groupId>
<artifactId>jaxen</artifactId>
</dependency>
<dependency>
<groupId>com.mycila.xmltool</groupId>
<artifactId>xmltool</artifactId>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>stringtemplate</artifactId>
</dependency>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-solrj</artifactId>
</dependency>
<dependency>
<groupId>com.lucidworks.spark</groupId>
<artifactId>spark-solr</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
</dependency>
<dependency>
<groupId>org.noggit</groupId>
<artifactId>noggit</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib</groupId>
<artifactId>cnr-rmi-api</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-schemas</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,257 @@
package eu.dnetlib.dhp.graph;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import eu.dnetlib.dhp.graph.model.*;
import eu.dnetlib.dhp.graph.utils.ContextMapper;
import eu.dnetlib.dhp.graph.utils.GraphMappingUtils;
import eu.dnetlib.dhp.graph.utils.XmlRecordFactory;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.graph.utils.GraphMappingUtils.asRelatedEntity;
/**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects.
* The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization,
* and all the possible relationships (similarity links produced by the Dedup process are excluded).
*
* The operation is implemented creating the union between the entity types (E), joined by the relationships (R), and again
* by E, finally grouped by E.id;
*
* Different manipulations of the E and R sets are introduced to reduce the complexity of the operation
* 1) treat the object payload as string, extracting only the necessary information beforehand using json path,
* it seems that deserializing it with jackson's object mapper has higher memory footprint.
*
* 2) only consider rels that are not virtually deleted ($.dataInfo.deletedbyinference == false)
* 3) we only need a subset of fields from the related entities, so we introduce a distinction between E_source = S
* and E_target = T. Objects in T are heavily pruned by all the unnecessary information
*
* 4) perform the join as (((T join R) union S) groupby S.id) yield S -> [ <T, R> ]
*/
public class GraphJoiner implements Serializable {
public static final int MAX_RELS = 100;
public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd";
private SparkSession spark;
private ContextMapper contextMapper;
private String inputPath;
private String outPath;
public GraphJoiner(SparkSession spark, ContextMapper contextMapper, String inputPath, String outPath) {
this.spark = spark;
this.contextMapper = contextMapper;
this.inputPath = inputPath;
this.outPath = outPath;
}
public GraphJoiner adjacencyLists() {
final JavaSparkContext sc = new JavaSparkContext(getSpark().sparkContext());
// read each entity
JavaPairRDD<String, TypedRow> datasource = readPathEntity(sc, getInputPath(), "datasource");
JavaPairRDD<String, TypedRow> organization = readPathEntity(sc, getInputPath(), "organization");
JavaPairRDD<String, TypedRow> project = readPathEntity(sc, getInputPath(), "project");
JavaPairRDD<String, TypedRow> dataset = readPathEntity(sc, getInputPath(), "dataset");
JavaPairRDD<String, TypedRow> otherresearchproduct = readPathEntity(sc, getInputPath(), "otherresearchproduct");
JavaPairRDD<String, TypedRow> software = readPathEntity(sc, getInputPath(), "software");
JavaPairRDD<String, TypedRow> publication = readPathEntity(sc, getInputPath(), "publication");
// create the union between all the entities
final String entitiesPath = getOutPath() + "/entities";
datasource
.union(organization)
.union(project)
.union(dataset)
.union(otherresearchproduct)
.union(software)
.union(publication)
.map(e -> new EntityRelEntity().setSource(e._2()))
.map(GraphMappingUtils::serialize)
.saveAsTextFile(entitiesPath, GzipCodec.class);
JavaPairRDD<String, EntityRelEntity> entities = sc.textFile(entitiesPath)
.map(t -> new ObjectMapper().readValue(t, EntityRelEntity.class))
.mapToPair(t -> new Tuple2<>(t.getSource().getSourceId(), t));
// reads the relationships
final JavaPairRDD<String, EntityRelEntity> relation = readPathRelation(sc, getInputPath())
.filter(r -> !r.getDeleted()) //only consider those that are not virtually deleted
.map(p -> new EntityRelEntity().setRelation(p))
.mapToPair(p -> new Tuple2<>(p.getRelation().getSourceId(), p))
.groupByKey()
.map(p -> Iterables.limit(p._2(), MAX_RELS))
.flatMap(p -> p.iterator())
.mapToPair(p -> new Tuple2<>(p.getRelation().getTargetId(), p));
//final String bySource = getOutPath() + "/1_join_by_target";
JavaPairRDD<String, EntityRelEntity> bySource = relation
.join(entities
.filter(e -> !e._2().getSource().getDeleted())
.mapToPair(e -> new Tuple2<>(e._1(), asRelatedEntity(e._2()))))
.map(s -> new EntityRelEntity()
.setRelation(s._2()._1().getRelation())
.setTarget(s._2()._2().getSource()))
.mapToPair(t -> new Tuple2<>(t.getRelation().getSourceId(), t));
final XmlRecordFactory recordFactory = new XmlRecordFactory(contextMapper, false, schemaLocation, new HashSet<>());
entities
.union(bySource)
.groupByKey() // by source id
.map(l -> toJoinedEntity(l))
.mapToPair(je -> new Tuple2<>(
new Text(je.getEntity().getId()),
new Text(recordFactory.build(je))))
.saveAsHadoopFile(getOutPath() + "/xml", Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
return this;
}
public GraphJoiner asXML() {
final JavaSparkContext sc = new JavaSparkContext(getSpark().sparkContext());
final XmlRecordFactory recordFactory = new XmlRecordFactory(contextMapper, true, "", new HashSet<>());
final ObjectMapper mapper = new ObjectMapper();
final String joinedEntitiesPath = getOutPath() + "/1_joined_entities";
sc.textFile(joinedEntitiesPath)
.map(s -> mapper.readValue(s, JoinedEntity.class))
.mapToPair(je -> new Tuple2<>(new Text(je.getEntity().getId()), new Text(recordFactory.build(je))))
.saveAsHadoopFile(getOutPath() + "/2_xml", Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
return this;
}
public SparkSession getSpark() {
return spark;
}
public String getInputPath() {
return inputPath;
}
public String getOutPath() {
return outPath;
}
// HELPERS
private OafEntity parseOaf(final String json, final String type) {
final ObjectMapper o = new ObjectMapper();
try {
switch (GraphMappingUtils.EntityType.valueOf(type)) {
case publication:
return o.readValue(json, Publication.class);
case dataset:
return o.readValue(json, Dataset.class);
case otherresearchproduct:
return o.readValue(json, OtherResearchProduct.class);
case software:
return o.readValue(json, Software.class);
case datasource:
return o.readValue(json, Datasource.class);
case organization:
return o.readValue(json, Organization.class);
case project:
return o.readValue(json, Project.class);
default:
throw new IllegalArgumentException("invalid type: " + type);
}
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
private JoinedEntity toJoinedEntity(Tuple2<String, Iterable<EntityRelEntity>> p) {
final ObjectMapper o = new ObjectMapper();
final JoinedEntity j = new JoinedEntity();
final Links links2 = new Links();
for(EntityRelEntity rel : p._2()) {
if (rel.hasMainEntity() & j.getEntity() == null) {
j.setType(rel.getSource().getType());
j.setEntity(parseOaf(rel.getSource().getOaf(), rel.getSource().getType()));
}
if (rel.hasRelatedEntity()) {
try {
links2.add(
new eu.dnetlib.dhp.graph.model.Tuple2()
.setRelation(o.readValue(rel.getRelation().getOaf(), Relation.class))
.setRelatedEntity(o.readValue(rel.getTarget().getOaf(), RelatedEntity.class)));
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
}
j.setLinks(links2);
if (j.getEntity() == null) {
throw new IllegalStateException("missing main entity on '" + p._1() + "'");
}
return j;
}
/**
* Reads a set of eu.dnetlib.dhp.schema.oaf.OafEntity objects from a sequence file <className, entity json serialization>,
* extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.model.TypedRow
* @param sc
* @param inputPath
* @param type
* @return the JavaPairRDD<String, TypedRow> indexed by entity identifier
*/
private JavaPairRDD<String, TypedRow> readPathEntity(final JavaSparkContext sc, final String inputPath, final String type) {
return sc.sequenceFile(inputPath + "/" + type, Text.class, Text.class)
.mapToPair((PairFunction<Tuple2<Text, Text>, String, TypedRow>) item -> {
final String s = item._2().toString();
final DocumentContext json = JsonPath.parse(s);
final String id = json.read("$.id");
return new Tuple2<>(id, new TypedRow()
.setSourceId(id)
.setDeleted(json.read("$.dataInfo.deletedbyinference"))
.setType(type)
.setOaf(s));
});
}
/**
* Reads a set of eu.dnetlib.dhp.schema.oaf.Relation objects from a sequence file <className, relation json serialization>,
* extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.model.TypedRow
* @param sc
* @param inputPath
* @return the JavaRDD<TypedRow> containing all the relationships
*/
private JavaRDD<TypedRow> readPathRelation(final JavaSparkContext sc, final String inputPath) {
return sc.sequenceFile(inputPath + "/relation", Text.class, Text.class)
.map(item -> {
final String s = item._2().toString();
final DocumentContext json = JsonPath.parse(s);
return new TypedRow()
.setSourceId(json.read("$.source"))
.setTargetId(json.read("$.target"))
.setDeleted(json.read("$.dataInfo.deletedbyinference"))
.setType("relation")
.setOaf(s);
});
}
}

View File

@ -0,0 +1,188 @@
package eu.dnetlib.dhp.graph;
import com.lucidworks.spark.util.SolrSupport;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.graph.utils.ISLookupClientFactory;
import eu.dnetlib.dhp.graph.utils.StreamingInputDocumentFactory;
import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.solr.common.SolrInputDocument;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.SparkSession;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.text.SimpleDateFormat;
import java.util.Date;
public class SparkXmlIndexingJob {
private static final Log log = LogFactory.getLog(SparkXmlIndexingJob.class);
private static final Integer DEFAULT_BATCH_SIZE = 1000;
private static final String LAYOUT = "index";
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkXmlIndexingJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_params_update_index.json")));
parser.parseArgument(args);
final String inputPath = parser.get("sourcePath");
final String isLookupUrl = parser.get("isLookupUrl");
final String format = parser.get("format");
final Integer batchSize = parser.getObjectMap().containsKey("batchSize") ? Integer.valueOf(parser.get("batchSize")) : DEFAULT_BATCH_SIZE;
final ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl);
final String fields = getLayoutSource(isLookup, format);
final String xslt = getLayoutTransformer(isLookup);
final String dsId = getDsId(format, isLookup);
final String zkHost = getZkHost(isLookup);
final String version = getRecordDatestamp();
final String indexRecordXslt = getLayoutTransformer(format, fields, xslt);
log.info("indexRecordTransformer: " + indexRecordXslt);
final String master = parser.get("master");
final SparkConf conf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
try(SparkSession spark = getSession(conf, master)) {
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
RDD<SolrInputDocument> docs = sc.sequenceFile(inputPath, Text.class, Text.class)
.map(t -> t._2().toString())
.map(s -> toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), s))
.map(s -> new StreamingInputDocumentFactory(version, dsId).parseDocument(s))
.rdd();
SolrSupport.indexDocs(zkHost, format + "-" + LAYOUT + "-openaire", batchSize, docs);
}
}
private static SparkSession getSession(SparkConf conf, String master) {
return SparkSession
.builder()
.config(conf)
.appName(SparkXmlRecordBuilderJob.class.getSimpleName())
.master(master)
.getOrCreate();
}
private static String toIndexRecord(Transformer tr, final String record) {
final StreamResult res = new StreamResult(new StringWriter());
try {
tr.transform(new StreamSource(new StringReader(record)), res);
return res.getWriter().toString();
} catch (Throwable e) {
System.out.println("XPathException on record:\n" + record);
throw new IllegalArgumentException(e);
}
}
/**
* Creates the XSLT responsible for building the index xml records.
*
* @param format Metadata format name (DMF|TMF)
* @param xslt xslt for building the index record transformer
* @param fields the list of fields
* @return the javax.xml.transform.Transformer
* @throws ISLookUpException could happen
* @throws IOException could happen
* @throws TransformerException could happen
*/
private static String getLayoutTransformer(String format, String fields, String xslt) throws TransformerException {
final Transformer layoutTransformer = SaxonTransformerFactory.newInstance(xslt);
final StreamResult layoutToXsltXslt = new StreamResult(new StringWriter());
layoutTransformer.setParameter("format", format);
layoutTransformer.transform(new StreamSource(new StringReader(fields)), layoutToXsltXslt);
return layoutToXsltXslt.getWriter().toString();
}
/**
* method return a solr-compatible string representation of a date, used to mark all records as indexed today
* @return the parsed date
*/
public static String getRecordDatestamp() {
return new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss'Z'").format(new Date());
}
/**
* Method retrieves from the information system the list of fields associated to the given MDFormat name
*
* @param isLookup the ISLookup service stub
* @param format the Metadata format name
* @return the string representation of the list of fields to be indexed
*
* @throws ISLookUpDocumentNotFoundException
* @throws ISLookUpException
*/
private static String getLayoutSource(final ISLookUpService isLookup, final String format) throws ISLookUpDocumentNotFoundException, ISLookUpException {
return doLookup(isLookup, String.format(
"collection('')//RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and .//NAME='%s']//LAYOUT[@name='%s']", format, LAYOUT));
}
/**
* Method retrieves from the information system the openaireLayoutToRecordStylesheet
*
* @param isLookup the ISLookup service stub
* @return the string representation of the XSLT contained in the transformation rule profile
*
* @throws ISLookUpDocumentNotFoundException
* @throws ISLookUpException
*/
private static String getLayoutTransformer(ISLookUpService isLookup) throws ISLookUpException {
return doLookup(isLookup, "collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType')" +
"//RESOURCE_PROFILE[./BODY/CONFIGURATION/SCRIPT/TITLE/text() = 'openaireLayoutToRecordStylesheet']//CODE/node()");
}
/**
* Method retrieves from the information system the IndexDS profile ID associated to the given MDFormat name
* @param format
* @param isLookup
* @return the IndexDS identifier
* @throws ISLookUpException
*/
private static String getDsId(String format, ISLookUpService isLookup) throws ISLookUpException {
return doLookup(isLookup, String.format("collection('/db/DRIVER/IndexDSResources/IndexDSResourceType')" +
"//RESOURCE_PROFILE[./BODY/CONFIGURATION/METADATA_FORMAT/text() = '%s']//RESOURCE_IDENTIFIER/@value/string()", format));
}
/**
* Method retrieves from the information system the zookeeper quorum of the Solr server
* @param isLookup
* @return the zookeeper quorum of the Solr server
* @throws ISLookUpException
*/
private static String getZkHost(ISLookUpService isLookup) throws ISLookUpException {
return doLookup(isLookup, "for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()");
}
private static String doLookup(ISLookUpService isLookup, String xquery) throws ISLookUpException {
log.info(String.format("running xquery: %s", xquery));
final String res = isLookup.getResourceProfileByQuery(xquery);
log.info(String.format("got response (100 chars): %s", StringUtils.left(res, 100) + " ..."));
return res;
}
}

View File

@ -0,0 +1,48 @@
package eu.dnetlib.dhp.graph;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.graph.utils.ContextMapper;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
public class SparkXmlRecordBuilderJob {
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkXmlRecordBuilderJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_params_build_adjacency_lists.json")));
parser.parseArgument(args);
final String master = parser.get("master");
final SparkConf conf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
try(SparkSession spark = getSession(conf, master)) {
final String inputPath = parser.get("sourcePath");
final String outputPath = parser.get("outputPath");
final String isLookupUrl = parser.get("isLookupUrl");
final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
if (fs.exists(new Path(outputPath))) {
fs.delete(new Path(outputPath), true);
fs.mkdirs(new Path(outputPath));
}
new GraphJoiner(spark, ContextMapper.fromIS(isLookupUrl), inputPath, outputPath)
.adjacencyLists();
}
}
private static SparkSession getSession(SparkConf conf, String master) {
return SparkSession
.builder()
.config(conf)
.appName(SparkXmlRecordBuilderJob.class.getSimpleName())
.master(master)
.getOrCreate();
}
}

View File

@ -0,0 +1,54 @@
package eu.dnetlib.dhp.graph.model;
import java.io.Serializable;
public class EntityRelEntity implements Serializable {
private TypedRow source;
private TypedRow relation;
private TypedRow target;
public EntityRelEntity() {
}
public EntityRelEntity(TypedRow source) {
this.source = source;
}
//helpers
public Boolean hasMainEntity() {
return getSource() != null & getRelation() == null & getTarget() == null;
}
public Boolean hasRelatedEntity() {
return getSource() == null & getRelation() != null & getTarget() != null;
}
public TypedRow getSource() {
return source;
}
public EntityRelEntity setSource(TypedRow source) {
this.source = source;
return this;
}
public TypedRow getRelation() {
return relation;
}
public EntityRelEntity setRelation(TypedRow relation) {
this.relation = relation;
return this;
}
public TypedRow getTarget() {
return target;
}
public EntityRelEntity setTarget(TypedRow target) {
this.target = target;
return this;
}
}

View File

@ -0,0 +1,41 @@
package eu.dnetlib.dhp.graph.model;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import java.io.Serializable;
public class JoinedEntity implements Serializable {
private String type;
private OafEntity entity;
private Links links;
public String getType() {
return type;
}
public JoinedEntity setType(String type) {
this.type = type;
return this;
}
public OafEntity getEntity() {
return entity;
}
public JoinedEntity setEntity(OafEntity entity) {
this.entity = entity;
return this;
}
public Links getLinks() {
return links;
}
public JoinedEntity setLinks(Links links) {
this.links = links;
return this;
}
}

View File

@ -0,0 +1,6 @@
package eu.dnetlib.dhp.graph.model;
import java.util.ArrayList;
public class Links extends ArrayList<Tuple2> {
}

View File

@ -0,0 +1,257 @@
package eu.dnetlib.dhp.graph.model;
import eu.dnetlib.dhp.schema.oaf.Instance;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
public class RelatedEntity implements Serializable {
private String id;
private String type;
// common fields
private StructuredProperty title;
private String websiteurl; // datasource, organizations, projects
// results
private String dateofacceptance;
private String publisher;
private List<StructuredProperty> pid;
private String codeRepositoryUrl;
private Qualifier resulttype;
private List<KeyValue> collectedfrom;
private List<Instance> instances;
// datasource
private String officialname;
private Qualifier datasourcetype;
private Qualifier datasourcetypeui;
private Qualifier openairecompatibility;
//private String aggregatortype;
// organization
private String legalname;
private String legalshortname;
private Qualifier country;
// project
private String projectTitle;
private String code;
private String acronym;
private Qualifier contracttype;
private List<String> fundingtree;
public String getId() {
return id;
}
public RelatedEntity setId(String id) {
this.id = id;
return this;
}
public StructuredProperty getTitle() {
return title;
}
public RelatedEntity setTitle(StructuredProperty title) {
this.title = title;
return this;
}
public String getDateofacceptance() {
return dateofacceptance;
}
public RelatedEntity setDateofacceptance(String dateofacceptance) {
this.dateofacceptance = dateofacceptance;
return this;
}
public String getPublisher() {
return publisher;
}
public RelatedEntity setPublisher(String publisher) {
this.publisher = publisher;
return this;
}
public List<StructuredProperty> getPid() {
return pid;
}
public RelatedEntity setPid(List<StructuredProperty> pid) {
this.pid = pid;
return this;
}
public String getCodeRepositoryUrl() {
return codeRepositoryUrl;
}
public RelatedEntity setCodeRepositoryUrl(String codeRepositoryUrl) {
this.codeRepositoryUrl = codeRepositoryUrl;
return this;
}
public Qualifier getResulttype() {
return resulttype;
}
public RelatedEntity setResulttype(Qualifier resulttype) {
this.resulttype = resulttype;
return this;
}
public List<KeyValue> getCollectedfrom() {
return collectedfrom;
}
public RelatedEntity setCollectedfrom(List<KeyValue> collectedfrom) {
this.collectedfrom = collectedfrom;
return this;
}
public List<Instance> getInstances() {
return instances;
}
public RelatedEntity setInstances(List<Instance> instances) {
this.instances = instances;
return this;
}
public String getOfficialname() {
return officialname;
}
public RelatedEntity setOfficialname(String officialname) {
this.officialname = officialname;
return this;
}
public String getWebsiteurl() {
return websiteurl;
}
public RelatedEntity setWebsiteurl(String websiteurl) {
this.websiteurl = websiteurl;
return this;
}
public Qualifier getDatasourcetype() {
return datasourcetype;
}
public RelatedEntity setDatasourcetype(Qualifier datasourcetype) {
this.datasourcetype = datasourcetype;
return this;
}
public Qualifier getDatasourcetypeui() {
return datasourcetypeui;
}
public RelatedEntity setDatasourcetypeui(Qualifier datasourcetypeui) {
this.datasourcetypeui = datasourcetypeui;
return this;
}
public Qualifier getOpenairecompatibility() {
return openairecompatibility;
}
public RelatedEntity setOpenairecompatibility(Qualifier openairecompatibility) {
this.openairecompatibility = openairecompatibility;
return this;
}
public String getLegalname() {
return legalname;
}
public RelatedEntity setLegalname(String legalname) {
this.legalname = legalname;
return this;
}
public String getLegalshortname() {
return legalshortname;
}
public RelatedEntity setLegalshortname(String legalshortname) {
this.legalshortname = legalshortname;
return this;
}
public Qualifier getCountry() {
return country;
}
public RelatedEntity setCountry(Qualifier country) {
this.country = country;
return this;
}
public String getCode() {
return code;
}
public RelatedEntity setCode(String code) {
this.code = code;
return this;
}
public String getAcronym() {
return acronym;
}
public RelatedEntity setAcronym(String acronym) {
this.acronym = acronym;
return this;
}
public Qualifier getContracttype() {
return contracttype;
}
public RelatedEntity setContracttype(Qualifier contracttype) {
this.contracttype = contracttype;
return this;
}
public List<String> getFundingtree() {
return fundingtree;
}
public RelatedEntity setFundingtree(List<String> fundingtree) {
this.fundingtree = fundingtree;
return this;
}
public String getProjectTitle() {
return projectTitle;
}
public RelatedEntity setProjectTitle(String projectTitle) {
this.projectTitle = projectTitle;
return this;
}
public String getType() {
return type;
}
public RelatedEntity setType(String type) {
this.type = type;
return this;
}
}

View File

@ -0,0 +1,28 @@
package eu.dnetlib.dhp.graph.model;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class Tuple2 {
private Relation relation;
private RelatedEntity relatedEntity;
public Relation getRelation() {
return relation;
}
public Tuple2 setRelation(Relation relation) {
this.relation = relation;
return this;
}
public RelatedEntity getRelatedEntity() {
return relatedEntity;
}
public Tuple2 setRelatedEntity(RelatedEntity relatedEntity) {
this.relatedEntity = relatedEntity;
return this;
}
}

View File

@ -0,0 +1,61 @@
package eu.dnetlib.dhp.graph.model;
import java.io.Serializable;
public class TypedRow implements Serializable {
private String sourceId;
private String targetId;
private Boolean deleted;
private String type;
private String oaf;
public String getSourceId() {
return sourceId;
}
public TypedRow setSourceId(String sourceId) {
this.sourceId = sourceId;
return this;
}
public String getTargetId() {
return targetId;
}
public TypedRow setTargetId(String targetId) {
this.targetId = targetId;
return this;
}
public Boolean getDeleted() {
return deleted;
}
public TypedRow setDeleted(Boolean deleted) {
this.deleted = deleted;
return this;
}
public String getType() {
return type;
}
public TypedRow setType(String type) {
this.type = type;
return this;
}
public String getOaf() {
return oaf;
}
public TypedRow setOaf(String oaf) {
this.oaf = oaf;
return this;
}
}

View File

@ -0,0 +1,51 @@
package eu.dnetlib.dhp.graph.utils;
import java.io.Serializable;
public class ContextDef implements Serializable {
private String id;
private String label;
private String name;
private String type;
public ContextDef(final String id, final String label, final String name, final String type) {
super();
this.setId(id);
this.setLabel(label);
this.setName(name);
this.setType(type);
}
public String getLabel() {
return label;
}
public void setLabel(final String label) {
this.label = label;
}
public String getId() {
return id;
}
public void setId(final String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(final String name) {
this.name = name;
}
public String getType() {
return type;
}
public void setType(final String type) {
this.type = type;
}
}

View File

@ -0,0 +1,45 @@
package eu.dnetlib.dhp.graph.utils;
import com.google.common.base.Joiner;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;
import java.io.Serializable;
import java.io.StringReader;
import java.util.HashMap;
public class ContextMapper extends HashMap<String, ContextDef> implements Serializable {
private static final long serialVersionUID = 2159682308502487305L;
private final static String XQUERY = "for $x in //RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='ContextDSResourceType']//*[name()='context' or name()='category' or name()='concept'] return <entry id=\"{$x/@id}\" label=\"{$x/@label|$x/@name}\" name=\"{$x/name()}\" type=\"{$x/@type}\"/>";
public static ContextMapper fromIS(final String isLookupUrl) throws DocumentException, ISLookUpException {
ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
StringBuilder sb = new StringBuilder("<ContextDSResources>");
Joiner.on("").appendTo(sb, isLookUp.quickSearchProfile(XQUERY));
sb.append("</ContextDSResources>");
return fromXml(sb.toString());
}
public static ContextMapper fromXml(final String xml) throws DocumentException {
final ContextMapper contextMapper = new ContextMapper();
final Document doc = new SAXReader().read(new StringReader(xml));
for (Object o : doc.selectNodes("//entry")) {
Node node = (Node) o;
String id = node.valueOf("./@id");
String label = node.valueOf("./@label");
String name = node.valueOf("./@name");
String type = node.valueOf("./@type") + "";
contextMapper.put(id, new ContextDef(id, label, name, type));
}
return contextMapper;
}
}

View File

@ -0,0 +1,254 @@
package eu.dnetlib.dhp.graph.utils;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import eu.dnetlib.dhp.graph.model.EntityRelEntity;
import eu.dnetlib.dhp.graph.model.RelatedEntity;
import eu.dnetlib.dhp.graph.model.TypedRow;
import eu.dnetlib.dhp.schema.oaf.*;
import net.minidev.json.JSONArray;
import org.apache.commons.lang3.StringUtils;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.*;
public class GraphMappingUtils {
public enum EntityType {
publication, dataset, otherresearchproduct, software, datasource, organization, project
}
public enum MainEntityType {
result, datasource, organization, project
}
public static Set<String> authorPidTypes = Sets.newHashSet("orcid", "magidentifier");
public static Set<String> instanceFieldFilter = Sets.newHashSet("instancetype", "hostedby", "license", "accessright", "collectedfrom", "dateofacceptance", "distributionlocation");
private static BiMap<String, String> relClassMapping = HashBiMap.create();
static {
relClassMapping.put("isAuthorInstitutionOf", "hasAuthorInstitution");
relClassMapping.put("isMergedIn", "merges");
relClassMapping.put("isProducedBy", "produces");
relClassMapping.put("hasParticipant", "isParticipant");
relClassMapping.put("isProvidedBy", "provides");
relClassMapping.put("isRelatedTo", "isRelatedTo");
relClassMapping.put("isAmongTopNSimilarDocuments", "hasAmongTopNSimilarDocuments");
relClassMapping.put("isRelatedTo", "isRelatedTo");
relClassMapping.put("isSupplementTo", "isSupplementedBy");
}
public static String getInverseRelClass(final String relClass) {
String res = relClassMapping.get(relClass);
if (isNotBlank(res)) {
return res;
}
res = relClassMapping.inverse().get(relClass);
if (isNotBlank(res)) {
return res;
}
throw new IllegalArgumentException("unable to find an inverse relationship class for term: " + relClass);
}
private static final String schemeTemplate = "dnet:%s_%s_relations";
private static Map<EntityType, MainEntityType> entityMapping = Maps.newHashMap();
static {
entityMapping.put(EntityType.publication, MainEntityType.result);
entityMapping.put(EntityType.dataset, MainEntityType.result);
entityMapping.put(EntityType.otherresearchproduct, MainEntityType.result);
entityMapping.put(EntityType.software, MainEntityType.result);
entityMapping.put(EntityType.datasource, MainEntityType.datasource);
entityMapping.put(EntityType.organization, MainEntityType.organization);
entityMapping.put(EntityType.project, MainEntityType.project);
}
public static String getScheme(final String sourceType, final String targetType) {
return String.format(schemeTemplate,
entityMapping.get(EntityType.valueOf(sourceType)).name(),
entityMapping.get(EntityType.valueOf(targetType)).name());
}
public static String getMainType(final String type) {
return entityMapping.get(EntityType.valueOf(type)).name();
}
public static boolean isResult(String type) {
return MainEntityType.result.name().equals(getMainType(type));
}
public static Predicate<String> instanceFilter = s -> instanceFieldFilter.contains(s);
public static EntityRelEntity asRelatedEntity(EntityRelEntity e) {
final DocumentContext j = JsonPath.parse(e.getSource().getOaf());
final RelatedEntity re = new RelatedEntity().setId(j.read("$.id")).setType(e.getSource().getType());
switch (EntityType.valueOf(e.getSource().getType())) {
case publication:
case dataset:
case otherresearchproduct:
case software:
mapTitle(j, re);
re.setDateofacceptance(j.read("$.dateofacceptance.value"));
re.setPublisher(j.read("$.publisher.value"));
JSONArray pids = j.read("$.pid");
re.setPid(pids.stream()
.map(p -> asStructuredProperty((LinkedHashMap<String, Object>) p))
.collect(Collectors.toList()));
re.setResulttype(asQualifier(j.read("$.resulttype")));
JSONArray collfrom = j.read("$.collectedfrom");
re.setCollectedfrom(collfrom.stream()
.map(c -> asKV((LinkedHashMap<String, Object>) c))
.collect(Collectors.toList()));
// will throw exception when the instance is not found
JSONArray instances = j.read("$.instance");
re.setInstances(instances.stream()
.map(i -> {
final LinkedHashMap<String, Object> p = (LinkedHashMap<String, Object>) i;
final Field<String> license = new Field<String>();
license.setValue((String) ((LinkedHashMap<String, Object>) p.get("license")).get("value"));
final Instance instance = new Instance();
instance.setLicense(license);
instance.setAccessright(asQualifier((LinkedHashMap<String, String>) p.get("accessright")));
instance.setInstancetype(asQualifier((LinkedHashMap<String, String>) p.get("instancetype")));
instance.setHostedby(asKV((LinkedHashMap<String, Object>) p.get("hostedby")));
//TODO mapping of distributionlocation
instance.setCollectedfrom(asKV((LinkedHashMap<String, Object>) p.get("collectedfrom")));
Field<String> dateofacceptance = new Field<String>();
dateofacceptance.setValue((String) ((LinkedHashMap<String, Object>) p.get("dateofacceptance")).get("value"));
instance.setDateofacceptance(dateofacceptance);
return instance;
}).collect(Collectors.toList()));
//TODO still to be mapped
//re.setCodeRepositoryUrl(j.read("$.coderepositoryurl"));
break;
case datasource:
re.setOfficialname(j.read("$.officialname.value"));
re.setWebsiteurl(j.read("$.websiteurl.value"));
re.setDatasourcetype(asQualifier(j.read("$.datasourcetype")));
re.setOpenairecompatibility(asQualifier(j.read("$.openairecompatibility")));
break;
case organization:
re.setLegalname(j.read("$.legalname.value"));
re.setLegalshortname(j.read("$.legalshortname.value"));
re.setCountry(asQualifier(j.read("$.country")));
break;
case project:
re.setProjectTitle(j.read("$.title.value"));
re.setCode(j.read("$.code.value"));
re.setAcronym(j.read("$.acronym.value"));
re.setContracttype(asQualifier(j.read("$.contracttype")));
JSONArray f = j.read("$.fundingtree");
if (!f.isEmpty()) {
re.setFundingtree(f.stream()
.map(s -> ((LinkedHashMap<String, String>) s).get("value"))
.collect(Collectors.toList()));
}
break;
}
return new EntityRelEntity().setSource(
new TypedRow()
.setSourceId(e.getSource().getSourceId())
.setDeleted(e.getSource().getDeleted())
.setType(e.getSource().getType())
.setOaf(serialize(re)));
}
private static KeyValue asKV(LinkedHashMap<String, Object> j) {
final KeyValue kv = new KeyValue();
kv.setKey((String) j.get("key"));
kv.setValue((String) j.get("value"));
return kv;
}
private static void mapTitle(DocumentContext j, RelatedEntity re) {
final JSONArray a = j.read("$.title");
if (!a.isEmpty()) {
final StructuredProperty sp = asStructuredProperty((LinkedHashMap<String, Object>) a.get(0));
if (StringUtils.isNotBlank(sp.getValue())) {
re.setTitle(sp);
}
}
}
private static StructuredProperty asStructuredProperty(LinkedHashMap<String, Object> j) {
final StructuredProperty sp = new StructuredProperty();
final String value = (String) j.get("value");
if (StringUtils.isNotBlank(value)) {
sp.setValue((String) j.get("value"));
sp.setQualifier(asQualifier((LinkedHashMap<String, String>) j.get("qualifier")));
}
return sp;
}
public static Qualifier asQualifier(LinkedHashMap<String, String> j) {
final Qualifier q = new Qualifier();
final String classid = j.get("classid");
if (StringUtils.isNotBlank(classid)) {
q.setClassid(classid);
}
final String classname = j.get("classname");
if (StringUtils.isNotBlank(classname)) {
q.setClassname(classname);
}
final String schemeid = j.get("schemeid");
if (StringUtils.isNotBlank(schemeid)) {
q.setSchemeid(schemeid);
}
final String schemename = j.get("schemename");
if (StringUtils.isNotBlank(schemename)) {
q.setSchemename(schemename);
}
return q;
}
public static String serialize(final Object o) {
try {
return new ObjectMapper()
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
.writeValueAsString(o);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("unable to serialize: " + o.toString(), e);
}
}
public static String removePrefix(final String s) {
if (s.contains("|")) return substringAfter(s, "|");
return s;
}
}

View File

@ -0,0 +1,24 @@
package eu.dnetlib.dhp.graph.utils;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
public class ISLookupClientFactory {
private static final Log log = LogFactory.getLog(ISLookupClientFactory.class);
public static ISLookUpService getLookUpService(final String isLookupUrl) {
return getServiceStub(ISLookUpService.class, isLookupUrl);
}
@SuppressWarnings("unchecked")
private static <T> T getServiceStub(final Class<T> clazz, final String endpoint) {
log.info(String.format("creating %s stub from %s", clazz.getName(), endpoint));
final JaxWsProxyFactoryBean jaxWsProxyFactory = new JaxWsProxyFactoryBean();
jaxWsProxyFactory.setServiceClass(clazz);
jaxWsProxyFactory.setAddress(endpoint);
return (T) jaxWsProxyFactory.create();
}
}

View File

@ -0,0 +1,49 @@
package eu.dnetlib.dhp.graph.utils;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import java.util.Comparator;
public class LicenseComparator implements Comparator<Qualifier> {
@Override
public int compare(Qualifier left, Qualifier right) {
if (left == null && right == null) return 0;
if (left == null) return 1;
if (right == null) return -1;
String lClass = left.getClassid();
String rClass = right.getClassid();
if (lClass.equals(rClass)) return 0;
if (lClass.equals("OPEN SOURCE")) return -1;
if (rClass.equals("OPEN SOURCE")) return 1;
if (lClass.equals("OPEN")) return -1;
if (rClass.equals("OPEN")) return 1;
if (lClass.equals("6MONTHS")) return -1;
if (rClass.equals("6MONTHS")) return 1;
if (lClass.equals("12MONTHS")) return -1;
if (rClass.equals("12MONTHS")) return 1;
if (lClass.equals("EMBARGO")) return -1;
if (rClass.equals("EMBARGO")) return 1;
if (lClass.equals("RESTRICTED")) return -1;
if (rClass.equals("RESTRICTED")) return 1;
if (lClass.equals("CLOSED")) return -1;
if (rClass.equals("CLOSED")) return 1;
if (lClass.equals("UNKNOWN")) return -1;
if (rClass.equals("UNKNOWN")) return 1;
// Else (but unlikely), lexicographical ordering will do.
return lClass.compareTo(rClass);
}
}

View File

@ -0,0 +1,253 @@
package eu.dnetlib.dhp.graph.utils;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import javax.xml.stream.*;
import javax.xml.stream.events.Namespace;
import javax.xml.stream.events.StartElement;
import javax.xml.stream.events.XMLEvent;
import com.google.common.collect.Lists;
import org.apache.solr.common.SolrInputDocument;
/**
* Optimized version of the document parser, drop in replacement of InputDocumentFactory.
*
* <p>
* Faster because:
* </p>
* <ul>
* <li>Doesn't create a DOM for the full document</li>
* <li>Doesn't execute xpaths agains the DOM</li>
* <li>Quickly serialize the 'result' element directly in a string.</li>
* <li>Uses less memory: less pressure on GC and allows more threads to process this in parallel</li>
* </ul>
*
* <p>
* This class is fully reentrant and can be invoked in parallel.
* </p>
*
* @author claudio
*
*/
public class StreamingInputDocumentFactory {
private static final String INDEX_FIELD_PREFIX = "__";
private static final String DS_VERSION = INDEX_FIELD_PREFIX + "dsversion";
private static final String DS_ID = INDEX_FIELD_PREFIX + "dsid";
private static final String RESULT = "result";
private static final String INDEX_RESULT = INDEX_FIELD_PREFIX + RESULT;
private static final String INDEX_RECORD_ID = INDEX_FIELD_PREFIX + "indexrecordidentifier";
private static final String outFormat = new String("yyyy-MM-dd'T'hh:mm:ss'Z'");
private final static List<String> dateFormats = Arrays.asList("yyyy-MM-dd'T'hh:mm:ss", "yyyy-MM-dd", "dd-MM-yyyy", "dd/MM/yyyy", "yyyy");
private static final String DEFAULTDNETRESULT = "dnetResult";
private static final String TARGETFIELDS = "targetFields";
private static final String INDEX_RECORD_ID_ELEMENT = "indexRecordIdentifier";
private static final String ROOT_ELEMENT = "indexRecord";
private static final int MAX_FIELD_LENGTH = 25000;
private ThreadLocal<XMLInputFactory> inputFactory = ThreadLocal.withInitial(() -> XMLInputFactory.newInstance());
private ThreadLocal<XMLOutputFactory> outputFactory = ThreadLocal.withInitial(() -> XMLOutputFactory.newInstance());
private ThreadLocal<XMLEventFactory> eventFactory = ThreadLocal.withInitial(() -> XMLEventFactory.newInstance());
private String version;
private String dsId;
private String resultName = DEFAULTDNETRESULT;
public StreamingInputDocumentFactory(final String version, final String dsId) {
this(version, dsId, DEFAULTDNETRESULT);
}
public StreamingInputDocumentFactory(final String version, final String dsId, final String resultName) {
this.version = version;
this.dsId = dsId;
this.resultName = resultName;
}
public SolrInputDocument parseDocument(final String inputDocument) {
final StringWriter results = new StringWriter();
final List<Namespace> nsList = Lists.newLinkedList();
try {
XMLEventReader parser = inputFactory.get().createXMLEventReader(new StringReader(inputDocument));
final SolrInputDocument indexDocument = new SolrInputDocument(new HashMap<>());
while (parser.hasNext()) {
final XMLEvent event = parser.nextEvent();
if ((event != null) && event.isStartElement()) {
final String localName = event.asStartElement().getName().getLocalPart();
if (ROOT_ELEMENT.equals(localName)) {
nsList.addAll(getNamespaces(event));
} else if (INDEX_RECORD_ID_ELEMENT.equals(localName)) {
final XMLEvent text = parser.nextEvent();
String recordId = getText(text);
indexDocument.addField(INDEX_RECORD_ID, recordId);
} else if (TARGETFIELDS.equals(localName)) {
parseTargetFields(indexDocument, parser);
} else if (resultName.equals(localName)) {
copyResult(indexDocument, results, parser, nsList, resultName);
}
}
}
if (version != null) {
indexDocument.addField(DS_VERSION, version);
}
if (dsId != null) {
indexDocument.addField(DS_ID, dsId);
}
if (!indexDocument.containsKey(INDEX_RECORD_ID)) {
indexDocument.clear();
System.err.println("missing indexrecord id:\n" + inputDocument);
}
return indexDocument;
} catch (XMLStreamException e) {
return new SolrInputDocument();
}
}
private List<Namespace> getNamespaces(final XMLEvent event) {
final List<Namespace> res = Lists.newLinkedList();
@SuppressWarnings("unchecked")
Iterator<Namespace> nsIter = event.asStartElement().getNamespaces();
while (nsIter.hasNext()) {
Namespace ns = nsIter.next();
res.add(ns);
}
return res;
}
/**
* Parse the targetFields block and add fields to the solr document.
*
* @param indexDocument
* @param parser
* @throws XMLStreamException
*/
protected void parseTargetFields(final SolrInputDocument indexDocument, final XMLEventReader parser) throws XMLStreamException {
boolean hasFields = false;
while (parser.hasNext()) {
final XMLEvent targetEvent = parser.nextEvent();
if (targetEvent.isEndElement() && targetEvent.asEndElement().getName().getLocalPart().equals(TARGETFIELDS)) {
break;
}
if (targetEvent.isStartElement()) {
final String fieldName = targetEvent.asStartElement().getName().getLocalPart();
final XMLEvent text = parser.nextEvent();
String data = getText(text);
addField(indexDocument, fieldName, data);
hasFields = true;
}
}
if (!hasFields) {
indexDocument.clear();
}
}
/**
* Copy the /indexRecord/result element and children, preserving namespace declarations etc.
*
* @param indexDocument
* @param results
* @param parser
* @param nsList
* @throws XMLStreamException
*/
protected void copyResult(final SolrInputDocument indexDocument,
final StringWriter results,
final XMLEventReader parser,
final List<Namespace> nsList,
final String dnetResult) throws XMLStreamException {
final XMLEventWriter writer = outputFactory.get().createXMLEventWriter(results);
for (Namespace ns : nsList) {
eventFactory.get().createNamespace(ns.getPrefix(), ns.getNamespaceURI());
}
StartElement newRecord = eventFactory.get().createStartElement("", null, RESULT, null, nsList.iterator());
// new root record
writer.add(newRecord);
// copy the rest as it is
while (parser.hasNext()) {
final XMLEvent resultEvent = parser.nextEvent();
// TODO: replace with depth tracking instead of close tag tracking.
if (resultEvent.isEndElement() && resultEvent.asEndElement().getName().getLocalPart().equals(dnetResult)) {
writer.add(eventFactory.get().createEndElement("", null, RESULT));
break;
}
writer.add(resultEvent);
}
writer.close();
indexDocument.addField(INDEX_RESULT, results.toString());
}
/**
* Helper used to add a field to a solr doc. It avoids to add empy fields
*
* @param indexDocument
* @param field
* @param value
*/
private final void addField(final SolrInputDocument indexDocument, final String field, final String value) {
String cleaned = value.trim();
if (!cleaned.isEmpty()) {
// log.info("\n\n adding field " + field.toLowerCase() + " value: " + cleaned + "\n");
indexDocument.addField(field.toLowerCase(), cleaned);
}
}
/**
* Helper used to get the string from a text element.
*
* @param text
* @return the
*/
protected final String getText(final XMLEvent text) {
if (text.isEndElement()) // log.warn("skipping because isEndOfElement " + text.asEndElement().getName().getLocalPart());
return "";
final String data = text.asCharacters().getData();
if (data != null && data.length() > MAX_FIELD_LENGTH) {
return data.substring(0, MAX_FIELD_LENGTH);
}
return data;
}
}

View File

@ -0,0 +1,107 @@
package eu.dnetlib.dhp.graph.utils;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import org.apache.commons.lang3.StringUtils;
import org.stringtemplate.v4.ST;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.graph.utils.GraphMappingUtils.removePrefix;
import static eu.dnetlib.dhp.graph.utils.XmlSerializationUtils.escapeXml;
public class TemplateFactory {
private TemplateResources resources;
private final static char DELIMITER = '$';
public TemplateFactory() {
try {
resources = new TemplateResources();
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
public String buildBody(final String type, final List<String> metadata, final List<String> rels, final List<String> children, final List<String> extraInfo) {
ST body = getTemplate(resources.getEntity());
body.add("name", type);
body.add("metadata", metadata);
body.add("rels", rels);
body.add("children", children);
body.add("extrainfo", extraInfo);
return body.render();
}
public String getChild(final String name, final String id, final List<String> metadata) {
return getTemplate(resources.getChild())
.add("name", name)
.add("hasId", !(id == null))
.add("id", id != null ? escapeXml(removePrefix(id)) : "")
.add("metadata", metadata)
.render();
}
public String buildRecord(
final OafEntity entity,
final String schemaLocation,
final String body) {
return getTemplate(resources.getRecord())
.add("id", escapeXml(removePrefix(entity.getId())))
.add("dateofcollection", entity.getDateofcollection())
.add("dateoftransformation", entity.getDateoftransformation())
.add("schemaLocation", schemaLocation)
.add("it", body)
.render();
}
public String getRel(final String type,
final String objIdentifier,
final Collection<String> fields,
final String semanticclass,
final String semantischeme,
final DataInfo info) {
return getTemplate(resources.getRel())
.add("type", type)
.add("objIdentifier", escapeXml(removePrefix(objIdentifier)))
.add("class", semanticclass)
.add("scheme", semantischeme)
.add("metadata", fields)
.add("inferred", info.getInferred())
.add("trust", info.getTrust())
.add("inferenceprovenance", info.getInferenceprovenance())
.add("provenanceaction", info.getProvenanceaction() != null ? info.getProvenanceaction().getClassid() : "")
.render();
}
public String getInstance(final String resultId, final List<String> instancemetadata, final List<String> webresources) {
return getTemplate(resources.getInstance())
.add("instanceId", escapeXml(removePrefix(resultId)))
.add("metadata", instancemetadata)
.add("webresources", webresources
.stream()
.filter(StringUtils::isNotBlank)
.map(w -> getWebResource(w))
.collect(Collectors.toList()))
.render();
}
private String getWebResource(final String identifier) {
return getTemplate(resources.getWebresource())
.add("identifier", escapeXml(identifier))
.render();
}
// HELPERS
private ST getTemplate(final String res) {
return new ST(res, DELIMITER, DELIMITER);
}
}

View File

@ -0,0 +1,54 @@
package eu.dnetlib.dhp.graph.utils;
import com.google.common.io.Resources;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class TemplateResources {
private String record = read("eu/dnetlib/dhp/graph/template/record.st");
private String instance = read("eu/dnetlib/dhp/graph/template/instance.st");
private String rel = read("eu/dnetlib/dhp/graph/template/rel.st");
private String webresource = read("eu/dnetlib/dhp/graph/template/webresource.st");
private String child = read("eu/dnetlib/dhp/graph/template/child.st");
private String entity = read("eu/dnetlib/dhp/graph/template/entity.st");
private static String read(final String classpathResource) throws IOException {
return Resources.toString(Resources.getResource(classpathResource), StandardCharsets.UTF_8);
}
public TemplateResources() throws IOException {
}
public String getEntity() {
return entity;
}
public String getRecord() {
return record;
}
public String getInstance() {
return instance;
}
public String getRel() {
return rel;
}
public String getWebresource() {
return webresource;
}
public String getChild() {
return child;
}
}

View File

@ -0,0 +1,962 @@
package eu.dnetlib.dhp.graph.utils;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.mycila.xmltool.XMLDoc;
import com.mycila.xmltool.XMLTag;
import eu.dnetlib.dhp.graph.model.JoinedEntity;
import eu.dnetlib.dhp.graph.model.RelatedEntity;
import eu.dnetlib.dhp.graph.model.Tuple2;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.*;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.Node;
import org.dom4j.io.OutputFormat;
import org.dom4j.io.SAXReader;
import org.dom4j.io.XMLWriter;
import javax.xml.transform.*;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import java.io.IOException;
import java.io.Serializable;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.graph.utils.GraphMappingUtils.*;
import static eu.dnetlib.dhp.graph.utils.XmlSerializationUtils.*;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.commons.lang3.StringUtils.substringBefore;
public class XmlRecordFactory implements Serializable {
private Set<String> specialDatasourceTypes;
private ContextMapper contextMapper;
private String schemaLocation;
private Set<String> contextes = Sets.newHashSet();
private boolean indent = false;
public XmlRecordFactory(
final ContextMapper contextMapper, final boolean indent,
final String schemaLocation, final Set<String> otherDatasourceTypesUForUI) {
this.contextMapper = contextMapper;
this.schemaLocation = schemaLocation;
this.specialDatasourceTypes = otherDatasourceTypesUForUI;
this.indent = indent;
}
public String build(final JoinedEntity je) {
final OafEntity entity = je.getEntity();
TemplateFactory templateFactory = new TemplateFactory();
try {
final List<String> metadata = metadata(je.getType(), entity);
// rels has to be processed before the contexts because they enrich the contextMap with the funding info.
final List<String> relations = listRelations(je, templateFactory);
metadata.addAll(buildContexts(getMainType(je.getType())));
metadata.add(parseDataInfo(entity.getDataInfo()));
final String body = templateFactory.buildBody(
getMainType(je.getType()),
metadata,
relations,
listChildren(je, templateFactory), listExtraInfo(je));
return printXML(templateFactory.buildRecord(entity, schemaLocation, body), indent);
} catch (final Throwable e) {
throw new RuntimeException(String.format("error building record '%s'", entity.getId()), e);
}
}
private String printXML(String xml, boolean indent) {
try {
final Document doc = new SAXReader().read(new StringReader(xml));
OutputFormat format = indent ? OutputFormat.createPrettyPrint() : OutputFormat.createCompactFormat();
format.setExpandEmptyElements(false);
format.setSuppressDeclaration(true);
StringWriter sw = new StringWriter();
XMLWriter writer = new XMLWriter(sw, format);
writer.write(doc);
return sw.toString();
} catch (IOException | DocumentException e) {
throw new IllegalArgumentException("Unable to indent XML. Invalid record:\n" + xml, e);
}
}
private List<String> metadata(final String type, final OafEntity entity) {
final List<String> metadata = Lists.newArrayList();
if (entity.getCollectedfrom() != null) {
metadata.addAll(entity.getCollectedfrom()
.stream()
.map(kv -> mapKeyValue("collectedfrom", kv))
.collect(Collectors.toList()));
}
if (entity.getOriginalId() != null) {
metadata.addAll(entity.getOriginalId()
.stream()
.map(s -> asXmlElement("originalId", s))
.collect(Collectors.toList()));
}
if (entity.getPid() != null) {
metadata.addAll(entity.getPid()
.stream()
.map(p -> mapStructuredProperty("pid", p))
.collect(Collectors.toList()));
}
if (GraphMappingUtils.isResult(type)) {
final Result r = (Result) entity;
if (r.getTitle() != null) {
metadata.addAll(r.getTitle()
.stream()
.map(t -> mapStructuredProperty("title", t))
.collect(Collectors.toList()));
}
if (r.getBestaccessright() != null) {
metadata.add(mapQualifier("bestaccessright", r.getBestaccessright()));
}
if (r.getAuthor() != null) {
metadata.addAll(r.getAuthor()
.stream()
.map(a -> {
final StringBuilder sb = new StringBuilder("<creator rank=\"" + a.getRank() + "\"");
if (isNotBlank(a.getName())) {
sb.append(" name=\"" + escapeXml(a.getName()) + "\"");
}
if (isNotBlank(a.getSurname())) {
sb.append(" surname=\"" + escapeXml(a.getSurname()) + "\"");
}
if (a.getPid() != null) {
a.getPid().stream()
.filter(sp -> isNotBlank(sp.getQualifier().getClassid()) && isNotBlank(sp.getValue()))
.forEach(sp -> {
String pidType = escapeXml(sp.getQualifier().getClassid()).replaceAll("\\W", "");
String pidValue = escapeXml(sp.getValue());
// ugly hack: some records provide swapped pidtype and pidvalue
if (authorPidTypes.contains(pidValue.toLowerCase().trim())) {
sb.append(String.format(" %s=\"%s\"", pidValue, pidType));
} else {
pidType = pidType.replaceAll("\\W", "").replaceAll("\\d", "");
if (isNotBlank(pidType)) {
sb.append(String.format(" %s=\"%s\"",
pidType,
pidValue.toLowerCase().replaceAll("orcid", "")));
}
}
});
}
sb.append(">" + escapeXml(a.getFullname()) + "</creator>");
return sb.toString();
}).collect(Collectors.toList()));
}
if (r.getContributor() != null) {
metadata.addAll(r.getContributor()
.stream()
.map(c -> asXmlElement("contributor", c.getValue()))
.collect(Collectors.toList()));
}
if (r.getCountry() != null) {
metadata.addAll(r.getCountry()
.stream()
.map(c -> mapQualifier("country", c))
.collect(Collectors.toList()));
}
if (r.getCoverage() != null) {
metadata.addAll(r.getCoverage()
.stream()
.map(c -> asXmlElement("coverage", c.getValue()))
.collect(Collectors.toList()));
}
if (r.getDateofacceptance() != null) {
metadata.add(asXmlElement("dateofacceptance", r.getDateofacceptance().getValue()));
}
if (r.getDescription() != null) {
metadata.addAll(r.getDescription()
.stream()
.map(c -> asXmlElement("description", c.getValue()))
.collect(Collectors.toList()));
}
if (r.getEmbargoenddate() != null) {
metadata.add(asXmlElement("embargoenddate", r.getEmbargoenddate().getValue()));
}
if (r.getSubject() != null) {
metadata.addAll(r.getSubject()
.stream()
.map(s -> mapStructuredProperty("subject", s))
.collect(Collectors.toList()));
}
if (r.getLanguage() != null) {
metadata.add(mapQualifier("language", r.getLanguage()));
}
if (r.getRelevantdate() != null) {
metadata.addAll(r.getRelevantdate()
.stream()
.map(s -> mapStructuredProperty("relevantdate", s))
.collect(Collectors.toList()));
}
if (r.getPublisher() != null) {
metadata.add(asXmlElement("publisher", r.getPublisher().getValue()));
}
if (r.getSource() != null) {
metadata.addAll(r.getSource()
.stream()
.map(c -> asXmlElement("source", c.getValue()))
.collect(Collectors.toList()));
}
if (r.getFormat() != null) {
metadata.addAll(r.getFormat()
.stream()
.map(c -> asXmlElement("format", c.getValue()))
.collect(Collectors.toList()));
}
if (r.getResulttype() != null) {
metadata.add(mapQualifier("resulttype", r.getResulttype()));
}
if (r.getResourcetype() != null) {
metadata.add(mapQualifier("resourcetype", r.getResourcetype()));
}
metadata.add(mapQualifier("bestaccessright", getBestAccessright(r)));
if (r.getContext() != null) {
contextes.addAll(r.getContext()
.stream()
.map(c -> c.getId())
.collect(Collectors.toList()));
if (contextes.contains("dh-ch::subcommunity::2")) {
contextes.add("clarin");
}
}
}
switch (EntityType.valueOf(type)) {
case publication:
final Publication pub = (Publication) entity;
if (pub.getJournal() != null) {
final Journal j = pub.getJournal();
metadata.add(mapJournal(j));
}
break;
case dataset:
final Dataset d = (Dataset) entity;
if (d.getDevice() != null) {
metadata.add(asXmlElement("device", d.getDevice().getValue()));
}
if (d.getLastmetadataupdate() != null) {
metadata.add(asXmlElement("lastmetadataupdate", d.getLastmetadataupdate().getValue()));
}
if (d.getMetadataversionnumber() != null) {
metadata.add(asXmlElement("metadataversionnumber", d.getMetadataversionnumber().getValue()));
}
if (d.getSize() != null) {
metadata.add(asXmlElement("size", d.getSize().getValue()));
}
if (d.getStoragedate() != null) {
metadata.add(asXmlElement("storagedate", d.getStoragedate().getValue()));
}
if (d.getVersion() != null) {
metadata.add(asXmlElement("version", d.getVersion().getValue()));
}
//TODO d.getGeolocation()
break;
case otherresearchproduct:
final OtherResearchProduct orp = (OtherResearchProduct) entity;
if (orp.getContactperson() != null) {
metadata.addAll(orp.getContactperson()
.stream()
.map(c -> asXmlElement("contactperson", c.getValue()))
.collect(Collectors.toList()));
}
if (orp.getContactgroup() != null) {
metadata.addAll(orp.getContactgroup()
.stream()
.map(c -> asXmlElement("contactgroup", c.getValue()))
.collect(Collectors.toList()));
}
if (orp.getTool() != null) {
metadata.addAll(orp.getTool()
.stream()
.map(c -> asXmlElement("tool", c.getValue()))
.collect(Collectors.toList()));
}
break;
case software:
final Software s = (Software) entity;
if (s.getDocumentationUrl() != null) {
metadata.addAll(s.getDocumentationUrl()
.stream()
.map(c -> asXmlElement("documentationUrl", c.getValue()))
.collect(Collectors.toList()));
}
if (s.getLicense() != null) {
metadata.addAll(s.getLicense()
.stream()
.map(l -> mapStructuredProperty("license", l))
.collect(Collectors.toList()));
}
if (s.getCodeRepositoryUrl() != null) {
metadata.add(asXmlElement("codeRepositoryUrl", s.getCodeRepositoryUrl().getValue()));
}
if (s.getProgrammingLanguage() != null) {
metadata.add(mapQualifier("programmingLanguage", s.getProgrammingLanguage()));
}
break;
case datasource:
final Datasource ds = (Datasource) entity;
if (ds.getDatasourcetype() != null) {
mapDatasourceType(metadata, ds.getDatasourcetype());
}
if (ds.getOpenairecompatibility() != null) {
metadata.add(mapQualifier("openairecompatibility", ds.getOpenairecompatibility()));
}
if (ds.getOfficialname() != null) {
metadata.add(asXmlElement("officialname", ds.getOfficialname().getValue()));
}
if (ds.getEnglishname() != null) {
metadata.add(asXmlElement("englishname", ds.getEnglishname().getValue()));
}
if (ds.getWebsiteurl() != null) {
metadata.add(asXmlElement("websiteurl", ds.getWebsiteurl().getValue()));
}
if (ds.getLogourl() != null) {
metadata.add(asXmlElement("logourl", ds.getLogourl().getValue()));
}
if (ds.getContactemail() != null) {
metadata.add(asXmlElement("contactemail", ds.getContactemail().getValue()));
}
if (ds.getNamespaceprefix() != null) {
metadata.add(asXmlElement("namespaceprefix", ds.getNamespaceprefix().getValue()));
}
if (ds.getLatitude() != null) {
metadata.add(asXmlElement("latitude", ds.getLatitude().getValue()));
}
if (ds.getLongitude() != null) {
metadata.add(asXmlElement("longitude", ds.getLongitude().getValue()));
}
if (ds.getDateofvalidation() != null) {
metadata.add(asXmlElement("dateofvalidation", ds.getDateofvalidation().getValue()));
}
if (ds.getDescription() != null) {
metadata.add(asXmlElement("description", ds.getDescription().getValue()));
}
if (ds.getOdnumberofitems() != null) {
metadata.add(asXmlElement("odnumberofitems", ds.getOdnumberofitems().getValue()));
}
if (ds.getOdnumberofitemsdate() != null) {
metadata.add(asXmlElement("odnumberofitemsdate", ds.getOdnumberofitemsdate().getValue()));
}
if (ds.getOdpolicies() != null) {
metadata.add(asXmlElement("odpolicies", ds.getOdpolicies().getValue()));
}
if (ds.getOdlanguages() != null) {
metadata.addAll(ds.getOdlanguages()
.stream()
.map(c -> asXmlElement("odlanguages", c.getValue()))
.collect(Collectors.toList()));
}
if (ds.getOdcontenttypes() != null) {
metadata.addAll(ds.getOdcontenttypes()
.stream()
.map(c -> asXmlElement("odcontenttypes", c.getValue()))
.collect(Collectors.toList()));
}
if (ds.getAccessinfopackage() != null) {
metadata.addAll(ds.getAccessinfopackage()
.stream()
.map(c -> asXmlElement("accessinfopackage", c.getValue()))
.collect(Collectors.toList()));
}
if (ds.getReleaseenddate() != null) {
metadata.add(asXmlElement("releasestartdate", ds.getReleaseenddate().getValue()));
}
if (ds.getReleaseenddate() != null) {
metadata.add(asXmlElement("releaseenddate", ds.getReleaseenddate().getValue()));
}
if (ds.getMissionstatementurl() != null) {
metadata.add(asXmlElement("missionstatementurl", ds.getMissionstatementurl().getValue()));
}
if (ds.getDataprovider() != null) {
metadata.add(asXmlElement("dataprovider", ds.getDataprovider().getValue().toString()));
}
if (ds.getServiceprovider() != null) {
metadata.add(asXmlElement("serviceprovider", ds.getServiceprovider().getValue().toString()));
}
if (ds.getDatabaseaccesstype() != null) {
metadata.add(asXmlElement("databaseaccesstype", ds.getDatabaseaccesstype().getValue()));
}
if (ds.getDatauploadtype() != null) {
metadata.add(asXmlElement("datauploadtype", ds.getDatauploadtype().getValue()));
}
if (ds.getDatabaseaccessrestriction() != null) {
metadata.add(asXmlElement("databaseaccessrestriction", ds.getDatabaseaccessrestriction().getValue()));
}
if (ds.getDatauploadrestriction() != null) {
metadata.add(asXmlElement("datauploadrestriction", ds.getDatauploadrestriction().getValue()));
}
if (ds.getVersioning() != null) {
metadata.add(asXmlElement("versioning", ds.getVersioning().getValue().toString()));
}
if (ds.getCitationguidelineurl() != null) {
metadata.add(asXmlElement("citationguidelineurl", ds.getCitationguidelineurl().getValue()));
}
if (ds.getQualitymanagementkind() != null) {
metadata.add(asXmlElement("qualitymanagementkind", ds.getQualitymanagementkind().getValue()));
}
if (ds.getPidsystems() != null) {
metadata.add(asXmlElement("pidsystems", ds.getPidsystems().getValue()));
}
if (ds.getCertificates() != null) {
metadata.add(asXmlElement("certificates", ds.getCertificates().getValue()));
}
if (ds.getPolicies() != null) {
metadata.addAll(ds.getPolicies()
.stream()
.map(kv -> mapKeyValue("policies", kv))
.collect(Collectors.toList()));
}
if (ds.getJournal() != null) {
metadata.add(mapJournal(ds.getJournal()));
}
if (ds.getSubjects() != null) {
metadata.addAll(ds.getSubjects()
.stream()
.map(sp -> mapStructuredProperty("subject", sp))
.collect(Collectors.toList()));
}
break;
case organization:
final Organization o = (Organization) entity;
if (o.getLegalshortname() != null) {
metadata.add(asXmlElement("legalshortname", o.getLegalshortname().getValue()));
}
if (o.getLegalname() != null) {
metadata.add(asXmlElement("legalname", o.getLegalname().getValue()));
}
if (o.getAlternativeNames() != null) {
metadata.addAll(o.getAlternativeNames()
.stream()
.map(c -> asXmlElement("alternativeNames", c.getValue()))
.collect(Collectors.toList()));
}
if (o.getWebsiteurl() != null) {
metadata.add(asXmlElement("websiteurl", o.getWebsiteurl().getValue()));
}
if (o.getLogourl() != null) {
metadata.add(asXmlElement("websiteurl", o.getLogourl().getValue()));
}
if (o.getEclegalbody() != null) {
metadata.add(asXmlElement("eclegalbody", o.getEclegalbody().getValue()));
}
if (o.getEclegalperson() != null) {
metadata.add(asXmlElement("eclegalperson", o.getEclegalperson().getValue()));
}
if (o.getEcnonprofit() != null) {
metadata.add(asXmlElement("ecnonprofit", o.getEcnonprofit().getValue()));
}
if (o.getEcresearchorganization() != null) {
metadata.add(asXmlElement("ecresearchorganization", o.getEcresearchorganization().getValue()));
}
if (o.getEchighereducation() != null) {
metadata.add(asXmlElement("echighereducation", o.getEchighereducation().getValue()));
}
if (o.getEcinternationalorganization() != null) {
metadata.add(asXmlElement("ecinternationalorganizationeurinterests", o.getEcinternationalorganization().getValue()));
}
if (o.getEcinternationalorganization() != null) {
metadata.add(asXmlElement("ecinternationalorganization", o.getEcinternationalorganization().getValue()));
}
if (o.getEcenterprise() != null) {
metadata.add(asXmlElement("ecenterprise", o.getEcenterprise().getValue()));
}
if (o.getEcsmevalidated() != null) {
metadata.add(asXmlElement("ecsmevalidated", o.getEcsmevalidated().getValue()));
}
if (o.getEcnutscode() != null) {
metadata.add(asXmlElement("ecnutscode", o.getEcnutscode().getValue()));
}
if (o.getCountry() != null) {
metadata.add(mapQualifier("country", o.getCountry()));
}
break;
case project:
final Project p = (Project) entity;
if (p.getWebsiteurl() != null) {
metadata.add(asXmlElement("websiteurl", p.getWebsiteurl().getValue()));
}
if (p.getCode() != null) {
metadata.add(asXmlElement("code", p.getCode().getValue()));
}
if (p.getAcronym() != null) {
metadata.add(asXmlElement("acronym", p.getAcronym().getValue()));
}
if (p.getTitle() != null) {
metadata.add(asXmlElement("title", p.getTitle().getValue()));
}
if (p.getStartdate() != null) {
metadata.add(asXmlElement("startdate", p.getStartdate().getValue()));
}
if (p.getEnddate() != null) {
metadata.add(asXmlElement("enddate", p.getEnddate().getValue()));
}
if (p.getCallidentifier() != null) {
metadata.add(asXmlElement("callidentifier", p.getCallidentifier().getValue()));
}
if (p.getKeywords() != null) {
metadata.add(asXmlElement("keywords", p.getKeywords().getValue()));
}
if (p.getDuration() != null) {
metadata.add(asXmlElement("duration", p.getDuration().getValue()));
}
if (p.getEcarticle29_3() != null) {
metadata.add(asXmlElement("ecarticle29_3", p.getEcarticle29_3().getValue()));
}
if (p.getSubjects() != null) {
metadata.addAll(p.getSubjects()
.stream()
.map(sp -> mapStructuredProperty("subject", sp))
.collect(Collectors.toList()));
}
if (p.getContracttype() != null) {
metadata.add(mapQualifier("contracttype", p.getContracttype()));
}
if (p.getEcsc39() != null) {
metadata.add(asXmlElement("ecsc39", p.getEcsc39().getValue()));
}
if (p.getContactfullname() != null) {
metadata.add(asXmlElement("contactfullname", p.getContactfullname().getValue()));
}
if (p.getContactfax() != null) {
metadata.add(asXmlElement("contactfax", p.getContactfax().getValue()));
}
if (p.getContactphone() != null) {
metadata.add(asXmlElement("contactphone", p.getContactphone().getValue()));
}
if (p.getContactemail() != null) {
metadata.add(asXmlElement("contactemail", p.getContactemail().getValue()));
}
if (p.getSummary() != null) {
metadata.add(asXmlElement("summary", p.getSummary().getValue()));
}
if (p.getCurrency() != null) {
metadata.add(asXmlElement("currency", p.getCurrency().getValue()));
}
if (p.getTotalcost() != null) {
metadata.add(asXmlElement("totalcost", p.getTotalcost().toString()));
}
if (p.getFundedamount() != null) {
metadata.add(asXmlElement("fundedamount", p.getFundedamount().toString()));
}
if (p.getFundingtree() != null) {
metadata.addAll(p.getFundingtree()
.stream()
.map(ft -> asXmlElement("fundingtree", ft.getValue()))
.collect(Collectors.toList()));
}
break;
default:
throw new IllegalArgumentException("invalid entity type: " + type);
}
return metadata;
}
private void mapDatasourceType(List<String> metadata, final Qualifier dsType) {
metadata.add(mapQualifier("datasourcetype", dsType));
if (specialDatasourceTypes.contains(dsType.getClassid())) {
dsType.setClassid("other");
dsType.setClassname("other");
}
metadata.add(mapQualifier("datasourcetypeui", dsType));
}
private Qualifier getBestAccessright(final Result r) {
Qualifier bestAccessRight = new Qualifier();
bestAccessRight.setClassid("UNKNOWN");
bestAccessRight.setClassname("not available");
bestAccessRight.setSchemeid("dnet:access_modes");
bestAccessRight.setSchemename("dnet:access_modes");
final LicenseComparator lc = new LicenseComparator();
for (final Instance instance : r.getInstance()) {
if (lc.compare(bestAccessRight, instance.getAccessright()) > 0) {
bestAccessRight = instance.getAccessright();
}
}
return bestAccessRight;
}
private List<String> listRelations(final JoinedEntity je, TemplateFactory templateFactory) {
final List<String> rels = Lists.newArrayList();
for (final Tuple2 link : je.getLinks()) {
final Relation rel = link.getRelation();
final RelatedEntity re = link.getRelatedEntity();
final String targetType = link.getRelatedEntity().getType();
final List<String> metadata = Lists.newArrayList();
switch (EntityType.valueOf(targetType)) {
case publication:
case dataset:
case otherresearchproduct:
case software:
if (re.getTitle() != null && isNotBlank(re.getTitle().getValue())) {
metadata.add(mapStructuredProperty("title", re.getTitle()));
}
if (isNotBlank(re.getDateofacceptance())) {
metadata.add(asXmlElement("dateofacceptance", re.getDateofacceptance()));
}
if (isNotBlank(re.getPublisher())) {
metadata.add(asXmlElement("publisher", re.getPublisher()));
}
if (isNotBlank(re.getCodeRepositoryUrl())) {
metadata.add(asXmlElement("coderepositoryurl", re.getCodeRepositoryUrl()));
}
if (re.getResulttype() != null & !re.getResulttype().isBlank()) {
metadata.add(mapQualifier("resulttype", re.getResulttype()));
}
if (re.getCollectedfrom() != null) {
metadata.addAll(re.getCollectedfrom()
.stream()
.map(kv -> mapKeyValue("collectedfrom", kv))
.collect(Collectors.toList()));
}
if (re.getPid() != null) {
metadata.addAll(re.getPid()
.stream()
.map(p -> mapStructuredProperty("pid", p))
.collect(Collectors.toList()));
}
break;
case datasource:
if (isNotBlank(re.getOfficialname())) {
metadata.add(asXmlElement("officialname", re.getOfficialname()));
}
if (re.getDatasourcetype() != null & !re.getDatasourcetype().isBlank()) {
mapDatasourceType(metadata, re.getDatasourcetype());
}
if (re.getOpenairecompatibility() != null & !re.getOpenairecompatibility().isBlank()) {
metadata.add(mapQualifier("openairecompatibility", re.getOpenairecompatibility()));
}
break;
case organization:
if (isNotBlank(re.getLegalname())) {
metadata.add(asXmlElement("legalname", re.getLegalname()));
}
if (isNotBlank(re.getLegalshortname())) {
metadata.add(asXmlElement("legalshortname", re.getLegalshortname()));
}
if (re.getCountry() != null & !re.getCountry().isBlank()) {
metadata.add(mapQualifier("country", re.getCountry()));
}
break;
case project:
if (isNotBlank(re.getProjectTitle())) {
metadata.add(asXmlElement("title", re.getProjectTitle()));
}
if (isNotBlank(re.getCode())) {
metadata.add(asXmlElement("code", re.getCode()));
}
if (isNotBlank(re.getAcronym())) {
metadata.add(asXmlElement("acronym", re.getAcronym()));
}
if (re.getContracttype() != null & !re.getContracttype().isBlank()) {
metadata.add(mapQualifier("contracttype", re.getContracttype()));
}
if (re.getFundingtree() != null) {
metadata.addAll(re.getFundingtree()
.stream()
.peek(ft -> fillContextMap(ft))
.map(ft -> getRelFundingTree(ft))
.collect(Collectors.toList()));
}
break;
default:
throw new IllegalArgumentException("invalid target type: " + targetType);
}
final DataInfo info = rel.getDataInfo();
rels.add(templateFactory.getRel(
targetType,
rel.getTarget(),
Sets.newHashSet(metadata),
getInverseRelClass(rel.getRelClass()),
getScheme(targetType, re.getType()),
info));
}
return rels;
}
private List<String> listChildren(final JoinedEntity je, TemplateFactory templateFactory) {
final List<String> children = Lists.newArrayList();
if (MainEntityType.result.toString().equals(getMainType(je.getType()))) {
final List<Instance> instances = ((Result) je.getEntity()).getInstance();
if (instances != null) {
for (final Instance instance : ((Result) je.getEntity()).getInstance()) {
final List<String> fields = Lists.newArrayList();
if (instance.getAccessright() != null && !instance.getAccessright().isBlank()) {
fields.add(mapQualifier("accessright", instance.getAccessright()));
}
if (instance.getCollectedfrom() != null) {
fields.add(mapKeyValue("collectedfrom", instance.getCollectedfrom()));
}
if (instance.getHostedby() != null) {
fields.add(mapKeyValue("hostedby", instance.getHostedby()));
}
if (instance.getDateofacceptance() != null && isNotBlank(instance.getDateofacceptance().getValue())) {
fields.add(asXmlElement("dateofacceptance", instance.getDateofacceptance().getValue()));
}
if (instance.getInstancetype() != null && !instance.getInstancetype().isBlank()) {
fields.add(mapQualifier("instancetype", instance.getInstancetype()));
}
if (isNotBlank(instance.getDistributionlocation())) {
fields.add(asXmlElement("distributionlocation", instance.getDistributionlocation()));
}
if (instance.getRefereed() != null && isNotBlank(instance.getRefereed().getValue())) {
fields.add(asXmlElement("refereed", instance.getRefereed().getValue()));
}
if (instance.getProcessingchargeamount() != null && isNotBlank(instance.getProcessingchargeamount().getValue())) {
fields.add(asXmlElement("processingchargeamount", instance.getProcessingchargeamount().getValue()));
}
if (instance.getProcessingchargecurrency() != null && isNotBlank(instance.getProcessingchargecurrency().getValue())) {
fields.add(asXmlElement("processingchargecurrency", instance.getProcessingchargecurrency().getValue()));
}
children.add(templateFactory.getInstance(instance.getHostedby().getKey(), fields, instance.getUrl()));
}
}
final List<ExternalReference> ext = ((Result) je.getEntity()).getExternalReference();
if (ext != null) {
for (final ExternalReference er : ((Result) je.getEntity()).getExternalReference()) {
final List<String> fields = Lists.newArrayList();
if (isNotBlank(er.getSitename())) {
fields.add(asXmlElement("sitename", er.getSitename()));
}
if (isNotBlank(er.getLabel())) {
fields.add(asXmlElement("label", er.getLabel()));
}
if (isNotBlank(er.getUrl())) {
fields.add(asXmlElement("url", er.getUrl()));
}
if (isNotBlank(er.getDescription())) {
fields.add(asXmlElement("description", er.getDescription()));
}
if (isNotBlank(er.getUrl())) {
fields.add(mapQualifier("qualifier", er.getQualifier()));
}
if (isNotBlank(er.getRefidentifier())) {
fields.add(asXmlElement("refidentifier", er.getRefidentifier()));
}
if (isNotBlank(er.getQuery())) {
fields.add(asXmlElement("query", er.getQuery()));
}
children.add(templateFactory.getChild("externalreference", null, fields));
}
}
}
return children;
}
private List<String> listExtraInfo(JoinedEntity je) {
final List<ExtraInfo> extraInfo = je.getEntity().getExtraInfo();
return extraInfo != null ? extraInfo
.stream()
.map(e -> mapExtraInfo(e))
.collect(Collectors.toList()) : Lists.newArrayList();
}
private List<String> buildContexts(final String type) {
final List<String> res = Lists.newArrayList();
if ((contextMapper != null) && !contextMapper.isEmpty() && MainEntityType.result.toString().equals(type)) {
XMLTag document = XMLDoc.newDocument(true).addRoot("contextRoot");
for (final String context : contextes) {
String id = "";
for (final String token : Splitter.on("::").split(context)) {
id += token;
final ContextDef def = contextMapper.get(id);
if (def == null) {
continue;
// throw new IllegalStateException(String.format("cannot find context for id '%s'", id));
}
if (def.getName().equals("context")) {
final String xpath = "//context/@id='" + def.getId() + "'";
if (!document.gotoRoot().rawXpathBoolean(xpath, new Object())) {
document = addContextDef(document.gotoRoot(), def);
}
}
if (def.getName().equals("category")) {
final String rootId = substringBefore(def.getId(), "::");
document = addContextDef(document.gotoRoot().gotoTag("//context[./@id='" + rootId + "']", new Object()), def);
}
if (def.getName().equals("concept")) {
document = addContextDef(document, def).gotoParent();
}
id += "::";
}
}
final Transformer transformer = getTransformer();
for (final org.w3c.dom.Element x : document.gotoRoot().getChildElement()) {
try {
res.add(asStringElement(x, transformer));
} catch (final TransformerException e) {
throw new RuntimeException(e);
}
}
}
return res;
}
private Transformer getTransformer() {
try {
Transformer transformer = TransformerFactory.newInstance().newTransformer();
transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
return transformer;
} catch (TransformerConfigurationException e) {
throw new IllegalStateException("unable to create javax.xml.transform.Transformer", e);
}
}
private XMLTag addContextDef(final XMLTag tag, final ContextDef def) {
tag.addTag(def.getName()).addAttribute("id", def.getId()).addAttribute("label", def.getLabel());
if ((def.getType() != null) && !def.getType().isEmpty()) {
tag.addAttribute("type", def.getType());
}
return tag;
}
private String asStringElement(final org.w3c.dom.Element element, final Transformer transformer) throws TransformerException {
final StringWriter buffer = new StringWriter();
transformer.transform(new DOMSource(element), new StreamResult(buffer));
return buffer.toString();
}
private void fillContextMap(final String xmlTree) {
Document fundingPath;
try {
fundingPath = new SAXReader().read(new StringReader(xmlTree));
} catch (final DocumentException e) {
throw new RuntimeException(e);
}
try {
final Node funder = fundingPath.selectSingleNode("//funder");
if (funder != null) {
final String funderShortName = funder.valueOf("./shortname");
contextes.add(funderShortName);
contextMapper.put(funderShortName, new ContextDef(funderShortName, funder.valueOf("./name"), "context", "funding"));
final Node level0 = fundingPath.selectSingleNode("//funding_level_0");
if (level0 != null) {
final String level0Id = Joiner.on("::").join(funderShortName, level0.valueOf("./name"));
contextMapper.put(level0Id, new ContextDef(level0Id, level0.valueOf("./description"), "category", ""));
final Node level1 = fundingPath.selectSingleNode("//funding_level_1");
if (level1 == null) {
contextes.add(level0Id);
} else {
final String level1Id = Joiner.on("::").join(level0Id, level1.valueOf("./name"));
contextMapper.put(level1Id, new ContextDef(level1Id, level1.valueOf("./description"), "concept", ""));
final Node level2 = fundingPath.selectSingleNode("//funding_level_2");
if (level2 == null) {
contextes.add(level1Id);
} else {
final String level2Id = Joiner.on("::").join(level1Id, level2.valueOf("./name"));
contextMapper.put(level2Id, new ContextDef(level2Id, level2.valueOf("./description"), "concept", ""));
contextes.add(level2Id);
}
}
}
}
} catch (final NullPointerException e) {
throw new IllegalArgumentException("malformed funding path: " + xmlTree, e);
}
}
@SuppressWarnings("unchecked")
private String getRelFundingTree(final String xmlTree) {
String funding = "<funding>";
try {
final Document ftree = new SAXReader().read(new StringReader(xmlTree));
funding = "<funding>";
funding += getFunderElement(ftree);
for (final Object o : Lists.reverse(ftree.selectNodes("//fundingtree//*[starts-with(local-name(),'funding_level_')]"))) {
final Element e = (Element) o;
final String _id = e.valueOf("./id");
funding += "<" + e.getName() + " name=\"" + escapeXml(e.valueOf("./name")) + "\">" + escapeXml(_id) + "</" + e.getName() + ">";
}
} catch (final DocumentException e) {
throw new IllegalArgumentException("unable to parse funding tree: " + xmlTree + "\n" + e.getMessage());
} finally {
funding += "</funding>";
}
return funding;
}
private String getFunderElement(final Document ftree) {
final String funderId = ftree.valueOf("//fundingtree/funder/id/text()");
final String funderShortName = ftree.valueOf("//fundingtree/funder/shortname/text()");
final String funderName = ftree.valueOf("//fundingtree/funder/name/text()");
final String funderJurisdiction = ftree.valueOf("//fundingtree/funder/jurisdiction/text()");
return "<funder id=\"" + escapeXml(funderId) + "\" shortname=\"" + escapeXml(funderShortName) + "\" name=\"" + escapeXml(funderName)
+ "\" jurisdiction=\"" + escapeXml(funderJurisdiction) + "\" />";
}
}

View File

@ -0,0 +1,151 @@
package eu.dnetlib.dhp.graph.utils;
import eu.dnetlib.dhp.schema.oaf.*;
import static eu.dnetlib.dhp.graph.utils.GraphMappingUtils.removePrefix;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class XmlSerializationUtils {
// XML 1.0
// #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD] | [#x10000-#x10FFFF]
private final static String xml10pattern = "[^"
+ "\u0009\r\n"
+ "\u0020-\uD7FF"
+ "\uE000-\uFFFD"
+ "\ud800\udc00-\udbff\udfff"
+ "]";
public static String mapJournal(Journal j) {
final String attrs = new StringBuilder()
.append(attr("issn", j.getIssnPrinted()))
.append(attr("eissn", j.getIssnOnline()))
.append(attr("lissn", j.getIssnLinking()))
.append(attr("ep", j.getEp()))
.append(attr("iss", j.getIss()))
.append(attr("sp", j.getSp()))
.append(attr("vol", j.getVol()))
.toString()
.trim();
return new StringBuilder()
.append("<journal")
.append(isNotBlank(attrs) ? (" " + attrs) : "")
.append(">")
.append(escapeXml(j.getName()))
.append("</journal>")
.toString();
}
private static String attr(final String name, final String value) {
return isNotBlank(value) ? name + "=\"" + escapeXml(value) + "\" " : "";
}
public static String mapStructuredProperty(String name, StructuredProperty t) {
return asXmlElement(name, t.getValue(), t.getQualifier(), t.getDataInfo() != null ? t.getDataInfo() : null);
}
public static String mapQualifier(String name, Qualifier q) {
return asXmlElement(name, "", q, null);
}
public static String escapeXml(final String value) {
return value
.replaceAll("&", "&amp;")
.replaceAll("<", "&lt;")
.replaceAll(">", "&gt;")
.replaceAll("\"", "&quot;")
.replaceAll("'", "&apos;")
.replaceAll(xml10pattern, "");
}
public static String parseDataInfo(final DataInfo dataInfo) {
return new StringBuilder()
.append("<datainfo>")
.append(asXmlElement("inferred", dataInfo.getInferred() + ""))
.append(asXmlElement("deletedbyinference", dataInfo.getDeletedbyinference() + ""))
.append(asXmlElement("trust", dataInfo.getTrust() + ""))
.append(asXmlElement("inferenceprovenance", dataInfo.getInferenceprovenance() + ""))
.append(asXmlElement("provenanceaction", null, dataInfo.getProvenanceaction(), null))
.append("</datainfo>")
.toString();
}
private static StringBuilder dataInfoAsAttributes(final StringBuilder sb, final DataInfo info) {
return sb
.append(attr("inferred", info.getInferred() != null ? info.getInferred().toString() : ""))
.append(attr("inferenceprovenance", info.getInferenceprovenance()))
.append(attr("provenanceaction", info.getProvenanceaction() != null ? info.getProvenanceaction().getClassid() : ""))
.append(attr("trust", info.getTrust()));
}
public static String mapKeyValue(final String name, final KeyValue kv) {
return new StringBuilder()
.append("<")
.append(name)
.append(" name=\"")
.append(escapeXml(kv.getValue()))
.append("\" id=\"")
.append(escapeXml(removePrefix(kv.getKey())))
.append("\"/>")
.toString();
}
public static String mapExtraInfo(final ExtraInfo e) {
return new StringBuilder("<extraInfo ")
.append("name=\"" + e.getName() + "\" ")
.append("typology=\"" + e.getTypology() + "\" ")
.append("provenance=\"" + e.getProvenance() + "\" ")
.append("trust=\"" + e.getTrust() + "\"")
.append(">")
.append(e.getValue())
.append("</extraInfo>")
.toString();
}
public static String asXmlElement(final String name, final String value) {
return asXmlElement(name, value, null, null);
}
public static String asXmlElement(final String name, final String value, final Qualifier q, final DataInfo info) {
StringBuilder sb = new StringBuilder();
sb.append("<");
sb.append(name);
if (q != null) {
sb.append(getAttributes(q));
}
if (info != null) {
sb
.append(" ")
.append(attr("inferred", info.getInferred() != null ? info.getInferred().toString() : ""))
.append(attr("inferenceprovenance", info.getInferenceprovenance()))
.append(attr("provenanceaction", info.getProvenanceaction() != null ? info.getProvenanceaction().getClassid() : ""))
.append(attr("trust", info.getTrust()));
}
if (isBlank(value)) {
sb.append("/>");
return sb.toString();
}
sb.append(">");
sb.append(escapeXml(value));
sb.append("</");
sb.append(name);
sb.append(">");
return sb.toString();
}
public static String getAttributes(final Qualifier q) {
if (q == null || q.isBlank()) return "";
return new StringBuilder(" ")
.append(attr("classid", q.getClassid()))
.append(attr("classname", q.getClassname()))
.append(attr("schemeid", q.getSchemeid()))
.append(attr("schemename", q.getSchemename()))
.toString();
}
}

View File

@ -0,0 +1 @@
net.sf.saxon.TransformerFactoryImpl

View File

@ -0,0 +1,6 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true},
{"paramName":"o", "paramLongName":"outputPath", "paramDescription": "the path used to store temporary output files", "paramRequired": true},
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequence file to read", "paramRequired": true}
]

View File

@ -0,0 +1,7 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true},
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequence file to read the XML records", "paramRequired": true},
{"paramName":"f", "paramLongName":"format", "paramDescription": "MDFormat name found in the IS profile", "paramRequired": true},
{"paramName":"b", "paramLongName":"batchSize", "paramDescription": "size of the batch of documents sent to solr", "paramRequired": false}
]

View File

@ -0,0 +1,34 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hive_db_name</name>
<value>openaire</value>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18088</value>
</property>
<property>
<name>spark2EventLogDir</name>
<value>/user/spark/applicationHistory</value>
</property>
</configuration>

View File

@ -0,0 +1,99 @@
<workflow-app name="index_infospace_graph" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>hive_db_name</name>
<description>the target hive database name</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<start to="reuse_records"/>
<decision name="reuse_records">
<switch>
<case to="adjancency_lists">${wf:conf('reuseRecords') eq false}</case>
<case to="to_solr_index">${wf:conf('reuseRecords') eq true}</case>
<default to="adjancency_lists"/>
</switch>
</decision>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="adjancency_lists">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>cluster</mode>
<name>build_adjacency_lists</name>
<class>eu.dnetlib.dhp.graph.SparkXmlRecordBuilderJob</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>-mt</arg> <arg>yarn</arg>
<arg>-is</arg> <arg>${isLookupUrl}</arg>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
</spark>
<ok to="to_solr_index"/>
<error to="Kill"/>
</action>
<action name="to_solr_index">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>cluster</mode>
<name>to_solr_index</name>
<class>eu.dnetlib.dhp.graph.SparkXmlIndexingJob</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.dynamicAllocation.maxExecutors=${sparkExecutorCoresForIndexing}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>-mt</arg> <arg>yarn</arg>
<arg>-is</arg> <arg>${isLookupUrl}</arg>
<arg>--sourcePath</arg><arg>${outputPath}/xml</arg>
<arg>--format</arg><arg>${format}</arg>
<arg>--batchSize</arg><arg>${batchSize}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,3 @@
<name$if(hasId)$ objidentifier="$id$"$else$$endif>>
$metadata:{ it | $it$ }$
</name>

View File

@ -0,0 +1,10 @@
<oaf:$name$>
$metadata:{ it | $it$ }$
<rels>
$rels:{ it | $it$ }$
</rels>
<children>
$children:{ it | $it$ }$
</children>
</oaf:$name$>
$extrainfo:{ it | $it$ }$

View File

@ -0,0 +1,4 @@
<instance id="$instanceId$">
$metadata:{ it | $it$ }$
$webresources:{ it | $it$ }$
</instance>

View File

@ -0,0 +1,17 @@
<?xml version="1.0"?>
<record>
<result xmlns:dri="http://www.driver-repository.eu/namespace/dri" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<header>
<dri:objIdentifier>$id$</dri:objIdentifier>
<dri:dateOfCollection>$dateofcollection$</dri:dateOfCollection>
<dri:dateOfTransformation>$dateoftransformation$</dri:dateOfTransformation>
</header>
<metadata>
<oaf:entity xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:oaf="http://namespace.openaire.eu/oaf"
xsi:schemaLocation="http://namespace.openaire.eu/oaf $schemaLocation$">
$it$
</oaf:entity>
</metadata>
</result>
</record>

View File

@ -0,0 +1,4 @@
<rel inferred="$inferred$" trust="$trust$" inferenceprovenance="$inferenceprovenance$" provenanceaction="$provenanceaction$">
<to class="$class$" scheme="$scheme$" type="$type$">$objIdentifier$</to>
$metadata:{ it | $it$ }$
</rel>

View File

@ -0,0 +1,3 @@
<webresource>
<url>$identifier$</url>
</webresource>

View File

@ -18,6 +18,7 @@
<module>dhp-distcp</module>
<module>dhp-graph-mapper</module>
<module>dhp-dedup</module>
<module>dhp-graph-provision</module>
</modules>
<pluginRepositories>

75
pom.xml
View File

@ -98,6 +98,12 @@
<version>${dhp.hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${dhp.hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
@ -151,7 +157,7 @@
<dependency>
<groupId>net.sf.saxon</groupId>
<artifactId>Saxon-HE</artifactId>
<version>9.5.1-5</version>
<version>9.9.1-6</version>
</dependency>
<dependency>
@ -172,6 +178,56 @@
<version>1.1.6</version>
</dependency>
<dependency>
<groupId>com.mycila.xmltool</groupId>
<artifactId>xmltool</artifactId>
<version>3.3</version>
</dependency>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-solrj</artifactId>
<version>7.5.0</version>
<exclusions>
<exclusion>
<artifactId>*</artifactId>
<groupId>*</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.lucidworks.spark</groupId>
<artifactId>spark-solr</artifactId>
<version>3.6.0</version>
<exclusions>
<exclusion>
<artifactId>*</artifactId>
<groupId>*</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.5.3</version>
</dependency>
<dependency>
<groupId>org.noggit</groupId>
<artifactId>noggit</artifactId>
<version>0.8</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.11</version>
</dependency>
<dependency>
<groupId>net.schmizz</groupId>
<artifactId>sshj</artifactId>
@ -204,8 +260,17 @@
<artifactId>dnet-pace-core</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>eu.dnetlib</groupId>
<artifactId>cnr-rmi-api</artifactId>
<version>[2.0.0,3.0.0)</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http</artifactId>
<version>3.1.5</version>
</dependency>
<dependency>
<groupId>javax.persistence</groupId>
<artifactId>javax.persistence-api</artifactId>
@ -233,12 +298,16 @@
<artifactId>secondstring</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>${mongodb.driver.version}</version>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>stringtemplate</artifactId>
<version>4.0</version>
</dependency>
<dependency>
<groupId>org.apache.oozie</groupId>