diff --git a/.gitignore b/.gitignore index 3f00d9729..66fe55aa9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,8 @@ .DS_Store .idea *.iml +*.ipr +*.iws *~ .classpath /*/.classpath @@ -18,5 +20,5 @@ /*/build /build spark-warehouse -/*/*/job-override.properties +/**/job-override.properties diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java index a6a4e9291..5401b71c1 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java @@ -33,17 +33,12 @@ public class SparkGraphImporterJob { // Read the input file and convert it into RDD of serializable object GraphMappingUtils.types.forEach((name, clazz) -> { - final JavaRDD> inputRDD = sc.sequenceFile(inputPath + "/" + name, Text.class, Text.class) - .map(item -> new Tuple2<>(item._1.toString(), item._2.toString())); - - spark.createDataset(inputRDD - .filter(s -> s._1().equals(clazz.getName())) - .map(Tuple2::_2) - .map(s -> new ObjectMapper().readValue(s, clazz)) + spark.createDataset(sc.sequenceFile(inputPath + "/" + name, Text.class, Text.class) + .map(s -> new ObjectMapper().readValue(s._2().toString(), clazz)) .rdd(), Encoders.bean(clazz)) - .write() - .mode(SaveMode.Overwrite) - .saveAsTable(hiveDbName + "." + name); + .write() + .mode(SaveMode.Overwrite) + .saveAsTable(hiveDbName + "." + name); }); } diff --git a/dhp-workflows/dhp-graph-provision/job-override.properties b/dhp-workflows/dhp-graph-provision/job-override.properties index acaf16717..c7b173a14 100644 --- a/dhp-workflows/dhp-graph-provision/job-override.properties +++ b/dhp-workflows/dhp-graph-provision/job-override.properties @@ -1,5 +1,11 @@ -sparkDriverMemory=7G -sparkExecutorMemory=7G -hive_db_name=claudio -sourcePath=/tmp/db_openaireplus_services_beta.export.2019.11.06 -outputPath=/tmp/openaire_provision \ No newline at end of file +sparkDriverMemory=8G +sparkExecutorMemory=8G +#isLookupUrl=http://services.openaire.eu:8280/is/services/isLookUp +isLookupUrl=http://beta.services.openaire.eu:8280/is/services/isLookUp?wsdl +sourcePath=/tmp/db_openaireplus_services.export_dhp.2020.02.03 +outputPath=/tmp/openaire_provision +format=TMF +batchSize=1000 +sparkExecutorCoresForIndexing=1 +sparkExecutorInstances=10 +reuseRecords=false \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/pom.xml b/dhp-workflows/dhp-graph-provision/pom.xml index 62d8ac2ae..5e6beb249 100644 --- a/dhp-workflows/dhp-graph-provision/pom.xml +++ b/dhp-workflows/dhp-graph-provision/pom.xml @@ -23,6 +23,52 @@ com.jayway.jsonpath json-path + + dom4j + dom4j + + + jaxen + jaxen + + + com.mycila.xmltool + xmltool + + + org.antlr + stringtemplate + + + org.apache.solr + solr-solrj + + + com.lucidworks.spark + spark-solr + + + + org.apache.httpcomponents + httpclient + + + org.noggit + noggit + + + org.apache.zookeeper + zookeeper + + + + org.apache.cxf + cxf-rt-transports-http + + + eu.dnetlib + cnr-rmi-api + eu.dnetlib.dhp diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java index f7bf0da39..062c8886b 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java @@ -6,9 +6,14 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.jayway.jsonpath.DocumentContext; import com.jayway.jsonpath.JsonPath; +import eu.dnetlib.dhp.graph.model.*; +import eu.dnetlib.dhp.graph.utils.ContextMapper; +import eu.dnetlib.dhp.graph.utils.GraphMappingUtils; +import eu.dnetlib.dhp.graph.utils.XmlRecordFactory; import eu.dnetlib.dhp.schema.oaf.*; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -18,9 +23,12 @@ import scala.Tuple2; import java.io.IOException; import java.io.Serializable; +import java.util.HashSet; import java.util.List; import java.util.stream.Collectors; +import static eu.dnetlib.dhp.graph.utils.GraphMappingUtils.asRelatedEntity; + /** * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. * The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, @@ -41,16 +49,21 @@ import java.util.stream.Collectors; */ public class GraphJoiner implements Serializable { - public static final int MAX_RELS = 10; + public static final int MAX_RELS = 100; + + public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd"; private SparkSession spark; + private ContextMapper contextMapper; + private String inputPath; private String outPath; - public GraphJoiner(SparkSession spark, String inputPath, String outPath) { + public GraphJoiner(SparkSession spark, ContextMapper contextMapper, String inputPath, String outPath) { this.spark = spark; + this.contextMapper = contextMapper; this.inputPath = inputPath; this.outPath = outPath; } @@ -68,7 +81,7 @@ public class GraphJoiner implements Serializable { JavaPairRDD publication = readPathEntity(sc, getInputPath(), "publication"); // create the union between all the entities - final String entitiesPath = getOutPath() + "/0_entities"; + final String entitiesPath = getOutPath() + "/entities"; datasource .union(organization) .union(project) @@ -94,102 +107,74 @@ public class GraphJoiner implements Serializable { .flatMap(p -> p.iterator()) .mapToPair(p -> new Tuple2<>(p.getRelation().getTargetId(), p)); - final String joinByTargetPath = getOutPath() + "/1_join_by_target"; - relation + //final String bySource = getOutPath() + "/1_join_by_target"; + JavaPairRDD bySource = relation .join(entities .filter(e -> !e._2().getSource().getDeleted()) - .mapToPair(e -> new Tuple2<>(e._1(), new GraphMappingUtils().pruneModel(e._2())))) + .mapToPair(e -> new Tuple2<>(e._1(), asRelatedEntity(e._2())))) .map(s -> new EntityRelEntity() .setRelation(s._2()._1().getRelation()) .setTarget(s._2()._2().getSource())) - .map(GraphMappingUtils::serialize) - .saveAsTextFile(joinByTargetPath, GzipCodec.class); - - JavaPairRDD bySource = sc.textFile(joinByTargetPath) - .map(t -> new ObjectMapper().readValue(t, EntityRelEntity.class)) .mapToPair(t -> new Tuple2<>(t.getRelation().getSourceId(), t)); - final String linkedEntityPath = getOutPath() + "/2_linked_entities"; + final XmlRecordFactory recordFactory = new XmlRecordFactory(contextMapper, false, schemaLocation, new HashSet<>()); entities .union(bySource) .groupByKey() // by source id - .map(p -> toLinkedEntity(p)) - .map(e -> new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL).writeValueAsString(e)) - .saveAsTextFile(linkedEntityPath, GzipCodec.class); - - final String joinedEntitiesPath = getOutPath() + "/3_joined_entities"; - sc.textFile(linkedEntityPath) - .map(s -> new ObjectMapper().readValue(s, LinkedEntity.class)) .map(l -> toJoinedEntity(l)) - .map(j -> new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL).writeValueAsString(j)) - .saveAsTextFile(joinedEntitiesPath); + .mapToPair(je -> new Tuple2<>( + new Text(je.getEntity().getId()), + new Text(recordFactory.build(je)))) + .saveAsHadoopFile(getOutPath() + "/xml", Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); return this; } public GraphJoiner asXML() { final JavaSparkContext sc = new JavaSparkContext(getSpark().sparkContext()); + final XmlRecordFactory recordFactory = new XmlRecordFactory(contextMapper, true, "", new HashSet<>()); + final ObjectMapper mapper = new ObjectMapper(); - final String joinedEntitiesPath = getOutPath() + "/3_joined_entities"; + final String joinedEntitiesPath = getOutPath() + "/1_joined_entities"; sc.textFile(joinedEntitiesPath) - .map(s -> new ObjectMapper().readValue(s, LinkedEntity.class)) - .map(l -> toXML(l)) - .saveAsTextFile(getOutPath() + "/4_xml"); + .map(s -> mapper.readValue(s, JoinedEntity.class)) + .mapToPair(je -> new Tuple2<>(new Text(je.getEntity().getId()), new Text(recordFactory.build(je)))) + .saveAsHadoopFile(getOutPath() + "/2_xml", Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); return this; } - private String toXML(LinkedEntity l) { - - return null; - } - public SparkSession getSpark() { return spark; } - public GraphJoiner setSpark(SparkSession spark) { - this.spark = spark; - return this; - } - public String getInputPath() { return inputPath; } - public GraphJoiner setInputPath(String inputPath) { - this.inputPath = inputPath; - return this; - } - public String getOutPath() { return outPath; } - public GraphJoiner setOutPath(String outPath) { - this.outPath = outPath; - return this; - } - // HELPERS private OafEntity parseOaf(final String json, final String type) { final ObjectMapper o = new ObjectMapper(); try { - switch (type) { - case "publication": + switch (GraphMappingUtils.EntityType.valueOf(type)) { + case publication: return o.readValue(json, Publication.class); - case "dataset": + case dataset: return o.readValue(json, Dataset.class); - case "otherresearchproduct": + case otherresearchproduct: return o.readValue(json, OtherResearchProduct.class); - case "software": + case software: return o.readValue(json, Software.class); - case "datasource": + case datasource: return o.readValue(json, Datasource.class); - case "organization": + case organization: return o.readValue(json, Organization.class); - case "project": + case project: return o.readValue(json, Project.class); default: throw new IllegalArgumentException("invalid type: " + type); @@ -199,56 +184,36 @@ public class GraphJoiner implements Serializable { } } - /** - * Converts the result of grouping pairs and the entities by source id to LinkedEntity - * @param p - * @return - */ - private LinkedEntity toLinkedEntity(Tuple2> p) { - final LinkedEntity e = new LinkedEntity(); - final List links = Lists.newArrayList(); + private JoinedEntity toJoinedEntity(Tuple2> p) { + final ObjectMapper o = new ObjectMapper(); + final JoinedEntity j = new JoinedEntity(); + final Links links2 = new Links(); for(EntityRelEntity rel : p._2()) { - if (rel.hasMainEntity() & e.getEntity() == null) { - e.setEntity(rel.getSource()); + if (rel.hasMainEntity() & j.getEntity() == null) { + j.setType(rel.getSource().getType()); + j.setEntity(parseOaf(rel.getSource().getOaf(), rel.getSource().getType())); } if (rel.hasRelatedEntity()) { - links.add(new Tuple() - .setRelation(rel.getRelation()) - .setTarget(rel.getTarget())); + try { + links2.add( + new eu.dnetlib.dhp.graph.model.Tuple2() + .setRelation(o.readValue(rel.getRelation().getOaf(), Relation.class)) + .setRelatedEntity(o.readValue(rel.getTarget().getOaf(), RelatedEntity.class))); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } } } - e.setLinks(links); - if (e.getEntity() == null) { + j.setLinks(links2); + if (j.getEntity() == null) { throw new IllegalStateException("missing main entity on '" + p._1() + "'"); } - return e; - } - - /** - * Converts a LinkedEntity to a JoinedEntity - * @param l - * @return - */ - private JoinedEntity toJoinedEntity(LinkedEntity l) { - return new JoinedEntity().setType(l.getEntity().getType()) - .setEntity(parseOaf(l.getEntity().getOaf(), l.getEntity().getType())) - .setLinks(l.getLinks() - .stream() - .map(t -> { - final ObjectMapper o = new ObjectMapper(); - try { - return new Tuple2<>( - o.readValue(t.getRelation().getOaf(), Relation.class), - o.readValue(t.getTarget().getOaf(), RelatedEntity.class)); - } catch (IOException e) { - throw new IllegalArgumentException(e); - } - }).collect(Collectors.toList())); + return j; } /** * Reads a set of eu.dnetlib.dhp.schema.oaf.OafEntity objects from a sequence file , - * extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.TypedRow + * extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.model.TypedRow * @param sc * @param inputPath * @param type @@ -270,7 +235,7 @@ public class GraphJoiner implements Serializable { /** * Reads a set of eu.dnetlib.dhp.schema.oaf.Relation objects from a sequence file , - * extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.TypedRow + * extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.model.TypedRow * @param sc * @param inputPath * @return the JavaRDD containing all the relationships diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java deleted file mode 100644 index e3622cd20..000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java +++ /dev/null @@ -1,160 +0,0 @@ -package eu.dnetlib.dhp.graph; - -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Maps; -import com.jayway.jsonpath.DocumentContext; -import com.jayway.jsonpath.JsonPath; -import eu.dnetlib.dhp.schema.oaf.*; -import net.minidev.json.JSONArray; -import org.apache.commons.lang3.StringUtils; - -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.stream.Collectors; - -public class GraphMappingUtils { - - public final static Map types = Maps.newHashMap(); - - static { - types.put("datasource", Datasource.class); - types.put("organization", Organization.class); - types.put("project", Project.class); - types.put("dataset", Dataset.class); - types.put("otherresearchproduct", OtherResearchProduct.class); - types.put("software", Software.class); - types.put("publication", Publication.class); - types.put("relation", Relation.class); - } - - public static EntityRelEntity pruneModel(EntityRelEntity e) { - - final DocumentContext j = JsonPath.parse(e.getSource().getOaf()); - final RelatedEntity re = new RelatedEntity().setId(j.read("$.id")).setType(e.getSource().getType()); - - switch (e.getSource().getType()) { - case "publication": - case "dataset": - case "otherresearchproduct": - case "software": - mapTitle(j, re); - re.setDateofacceptance(j.read("$.dateofacceptance.value")); - re.setPublisher(j.read("$.publisher.value")); - - JSONArray pids = j.read("$.pid"); - re.setPid(pids.stream() - .map(p -> asStructuredProperty((LinkedHashMap) p)) - .collect(Collectors.toList())); - - re.setResulttype(asQualifier(j.read("$.resulttype"))); - - JSONArray collfrom = j.read("$.collectedfrom"); - re.setCollectedfrom(collfrom.stream() - .map(c -> asKV((LinkedHashMap)c)) - .collect(Collectors.toList())); - - //TODO still to be mapped - //re.setCodeRepositoryUrl(j.read("$.coderepositoryurl")); - - break; - case "datasource": - re.setOfficialname(j.read("$.officialname.value")); - re.setWebsiteurl(j.read("$.websiteurl.value")); - re.setDatasourcetype(asQualifier(j.read("$.datasourcetype"))); - re.setOpenairecompatibility(asQualifier(j.read("$.openairecompatibility"))); - - break; - case "organization": - re.setLegalname(j.read("$.legalname.value")); - re.setLegalshortname(j.read("$.legalshortname.value")); - re.setCountry(asQualifier(j.read("$.country"))); - - break; - case "project": - re.setProjectTitle(j.read("$.title.value")); - re.setCode(j.read("$.code.value")); - re.setAcronym(j.read("$.acronym.value")); - re.setContracttype(asQualifier(j.read("$.contracttype"))); - - JSONArray f = j.read("$.fundingtree"); - if (!f.isEmpty()) { - re.setFundingtree(f.stream() - .map(s -> s.toString()) - .collect(Collectors.toList())); - } - - break; - } - return new EntityRelEntity().setSource( - new TypedRow() - .setSourceId(e.getSource().getSourceId()) - .setDeleted(e.getSource().getDeleted()) - .setType(e.getSource().getType()) - .setOaf(serialize(re))); - } - - private static KeyValue asKV(LinkedHashMap j) { - final KeyValue kv = new KeyValue(); - kv.setKey((String) j.get("key")); - kv.setValue((String) j.get("value")); - return kv; - } - - private static void mapTitle(DocumentContext j, RelatedEntity re) { - final JSONArray a = j.read("$.title"); - if (!a.isEmpty()) { - final StructuredProperty sp = asStructuredProperty((LinkedHashMap) a.get(0)); - if(StringUtils.isNotBlank(sp.getValue())) { - re.setTitle(sp); - } - } - } - - private static StructuredProperty asStructuredProperty(LinkedHashMap j) { - final StructuredProperty sp = new StructuredProperty(); - final String value = (String) j.get("value"); - if (StringUtils.isNotBlank(value)) { - sp.setValue((String) j.get("value")); - sp.setQualifier(asQualifier((LinkedHashMap) j.get("qualifier"))); - } - return sp; - } - - public static Qualifier asQualifier(LinkedHashMap j) { - final Qualifier q = new Qualifier(); - - final String classid = j.get("classid"); - if (StringUtils.isNotBlank(classid)) { - q.setClassid(classid); - } - - final String classname = j.get("classname"); - if (StringUtils.isNotBlank(classname)) { - q.setClassname(classname); - } - - final String schemeid = j.get("schemeid"); - if (StringUtils.isNotBlank(schemeid)) { - q.setSchemeid(schemeid); - } - - final String schemename = j.get("schemename"); - if (StringUtils.isNotBlank(schemename)) { - q.setSchemename(schemename); - } - return q; - } - - public static String serialize(final Object o) { - try { - return new ObjectMapper() - .setSerializationInclusion(JsonInclude.Include.NON_NULL) - .writeValueAsString(o); - } catch (JsonProcessingException e) { - throw new IllegalArgumentException("unable to serialize: " + o.toString(), e); - } - } - -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntity.java deleted file mode 100644 index 9e6fc0d38..000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/LinkedEntity.java +++ /dev/null @@ -1,29 +0,0 @@ -package eu.dnetlib.dhp.graph; - -import java.io.Serializable; -import java.util.List; - -public class LinkedEntity implements Serializable { - - private TypedRow entity; - - private List links; - - public TypedRow getEntity() { - return entity; - } - - public LinkedEntity setEntity(TypedRow entity) { - this.entity = entity; - return this; - } - - public List getLinks() { - return links; - } - - public LinkedEntity setLinks(List links) { - this.links = links; - return this; - } -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlIndexingJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlIndexingJob.java new file mode 100644 index 000000000..e13f8bbe2 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlIndexingJob.java @@ -0,0 +1,188 @@ +package eu.dnetlib.dhp.graph; + +import com.lucidworks.spark.util.SolrSupport; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.graph.utils.ISLookupClientFactory; +import eu.dnetlib.dhp.graph.utils.StreamingInputDocumentFactory; +import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.Text; +import org.apache.solr.common.SolrInputDocument; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.SparkSession; + +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerException; +import javax.xml.transform.stream.StreamResult; +import javax.xml.transform.stream.StreamSource; +import java.io.IOException; +import java.io.StringReader; +import java.io.StringWriter; +import java.text.SimpleDateFormat; +import java.util.Date; + +public class SparkXmlIndexingJob { + + private static final Log log = LogFactory.getLog(SparkXmlIndexingJob.class); + + private static final Integer DEFAULT_BATCH_SIZE = 1000; + + private static final String LAYOUT = "index"; + + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkXmlIndexingJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_params_update_index.json"))); + parser.parseArgument(args); + + final String inputPath = parser.get("sourcePath"); + final String isLookupUrl = parser.get("isLookupUrl"); + final String format = parser.get("format"); + final Integer batchSize = parser.getObjectMap().containsKey("batckSize") ? Integer.valueOf(parser.get("batchSize")) : DEFAULT_BATCH_SIZE; + + final ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl); + final String fields = getLayoutSource(isLookup, format); + final String xslt = getLayoutTransformer(isLookup); + + final String dsId = getDsId(format, isLookup); + final String zkHost = getZkHost(isLookup); + final String version = getRecordDatestamp(); + + final String indexRecordXslt = getLayoutTransformer(format, fields, xslt); + + log.info("indexRecordTransformer: " + indexRecordXslt); + + final String master = parser.get("master"); + final SparkConf conf = new SparkConf() + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + + try(SparkSession spark = getSession(conf, master)) { + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + RDD docs = sc.sequenceFile(inputPath, Text.class, Text.class) + .map(t -> t._2().toString()) + .map(s -> toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), s)) + .map(s -> new StreamingInputDocumentFactory(version, dsId).parseDocument(s)) + .rdd(); + + SolrSupport.indexDocs(zkHost, format + "-" + LAYOUT + "-openaire", batchSize, docs); + } + } + + private static SparkSession getSession(SparkConf conf, String master) { + return SparkSession + .builder() + .config(conf) + .appName(SparkXmlRecordBuilderJob.class.getSimpleName()) + .master(master) + .getOrCreate(); + } + + private static String toIndexRecord(Transformer tr, final String record) { + final StreamResult res = new StreamResult(new StringWriter()); + try { + tr.transform(new StreamSource(new StringReader(record)), res); + return res.getWriter().toString(); + } catch (Throwable e) { + System.out.println("XPathException on record:\n" + record); + throw new IllegalArgumentException(e); + } + } + + /** + * Creates the XSLT responsible for building the index xml records. + * + * @param format Metadata format name (DMF|TMF) + * @param xslt xslt for building the index record transformer + * @param fields the list of fields + * @return the javax.xml.transform.Transformer + * @throws ISLookUpException could happen + * @throws IOException could happen + * @throws TransformerException could happen + */ + private static String getLayoutTransformer(String format, String fields, String xslt) throws TransformerException { + + final Transformer layoutTransformer = SaxonTransformerFactory.newInstance(xslt); + final StreamResult layoutToXsltXslt = new StreamResult(new StringWriter()); + + layoutTransformer.setParameter("format", format); + layoutTransformer.transform(new StreamSource(new StringReader(fields)), layoutToXsltXslt); + + return layoutToXsltXslt.getWriter().toString(); + } + + /** + * method return a solr-compatible string representation of a date, used to mark all records as indexed today + * @return the parsed date + */ + public static String getRecordDatestamp() { + return new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss'Z'").format(new Date()); + } + + /** + * Method retrieves from the information system the list of fields associated to the given MDFormat name + * + * @param isLookup the ISLookup service stub + * @param format the Metadata format name + * @return the string representation of the list of fields to be indexed + * + * @throws ISLookUpDocumentNotFoundException + * @throws ISLookUpException + */ + private static String getLayoutSource(final ISLookUpService isLookup, final String format) throws ISLookUpDocumentNotFoundException, ISLookUpException { + return doLookup(isLookup, String.format( + "collection('')//RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and .//NAME='%s']//LAYOUT[@name='%s']", format, LAYOUT)); + } + + /** + * Method retrieves from the information system the openaireLayoutToRecordStylesheet + * + * @param isLookup the ISLookup service stub + * @return the string representation of the XSLT contained in the transformation rule profile + * + * @throws ISLookUpDocumentNotFoundException + * @throws ISLookUpException + */ + private static String getLayoutTransformer(ISLookUpService isLookup) throws ISLookUpException { + return doLookup(isLookup, "collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType')" + + "//RESOURCE_PROFILE[./BODY/CONFIGURATION/SCRIPT/TITLE/text() = 'openaireLayoutToRecordStylesheet']//CODE/node()"); + } + + /** + * Method retrieves from the information system the IndexDS profile ID associated to the given MDFormat name + * @param format + * @param isLookup + * @return the IndexDS identifier + * @throws ISLookUpException + */ + private static String getDsId(String format, ISLookUpService isLookup) throws ISLookUpException { + return doLookup(isLookup, String.format("collection('/db/DRIVER/IndexDSResources/IndexDSResourceType')" + + "//RESOURCE_PROFILE[./BODY/CONFIGURATION/METADATA_FORMAT/text() = '%s']//RESOURCE_IDENTIFIER/@value/string()", format)); + } + + /** + * Method retrieves from the information system the zookeeper quorum of the Solr server + * @param isLookup + * @return the zookeeper quorum of the Solr server + * @throws ISLookUpException + */ + private static String getZkHost(ISLookUpService isLookup) throws ISLookUpException { + return doLookup(isLookup, "for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()"); + } + + private static String doLookup(ISLookUpService isLookup, String xquery) throws ISLookUpException { + log.info(String.format("running xquery: %s", xquery)); + final String res = isLookup.getResourceProfileByQuery(xquery); + log.info(String.format("got response (100 chars): %s", StringUtils.left(res, 100) + " ...")); + return res; + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlRecordBuilderJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlRecordBuilderJob.java index 38bc2bae2..0b2180f19 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlRecordBuilderJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlRecordBuilderJob.java @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.graph; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.graph.utils.ContextMapper; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -11,30 +12,37 @@ public class SparkXmlRecordBuilderJob { public static void main(String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkXmlRecordBuilderJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_graph_parameters.json"))); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkXmlRecordBuilderJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_params_build_adjacency_lists.json"))); parser.parseArgument(args); + final String master = parser.get("master"); final SparkConf conf = new SparkConf() .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - final SparkSession spark = SparkSession + try(SparkSession spark = getSession(conf, master)) { + + final String inputPath = parser.get("sourcePath"); + final String outputPath = parser.get("outputPath"); + final String isLookupUrl = parser.get("isLookupUrl"); + + final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration()); + if (fs.exists(new Path(outputPath))) { + fs.delete(new Path(outputPath), true); + fs.mkdirs(new Path(outputPath)); + } + + new GraphJoiner(spark, ContextMapper.fromIS(isLookupUrl), inputPath, outputPath) + .adjacencyLists(); + } + } + + private static SparkSession getSession(SparkConf conf, String master) { + return SparkSession .builder() .config(conf) .appName(SparkXmlRecordBuilderJob.class.getSimpleName()) - .master(parser.get("master")) + .master(master) .getOrCreate(); - - final String inputPath = parser.get("sourcePath"); - final String outputPath = parser.get("outputPath"); - - final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration()); - if (fs.exists(new Path(outputPath))) { - fs.delete(new Path(outputPath), true); - fs.mkdirs(new Path(outputPath)); - } - - new GraphJoiner(spark, inputPath, outputPath) - .adjacencyLists(); } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Tuple.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Tuple.java deleted file mode 100644 index 1eb0491a7..000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/Tuple.java +++ /dev/null @@ -1,29 +0,0 @@ -package eu.dnetlib.dhp.graph; - -import java.io.Serializable; - -public class Tuple implements Serializable { - - private TypedRow relation; - - private TypedRow target; - - - public TypedRow getRelation() { - return relation; - } - - public Tuple setRelation(TypedRow relation) { - this.relation = relation; - return this; - } - - public TypedRow getTarget() { - return target; - } - - public Tuple setTarget(TypedRow target) { - this.target = target; - return this; - } -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/TupleWrapper.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/TupleWrapper.java deleted file mode 100644 index eb60e1474..000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/TupleWrapper.java +++ /dev/null @@ -1,29 +0,0 @@ -package eu.dnetlib.dhp.graph; - -import java.io.Serializable; - -public class TupleWrapper implements Serializable { - - private TypedRow relation; - - private TypedRow target; - - - public TypedRow getRelation() { - return relation; - } - - public TupleWrapper setRelation(TypedRow relation) { - this.relation = relation; - return this; - } - - public TypedRow getTarget() { - return target; - } - - public TupleWrapper setTarget(TypedRow target) { - this.target = target; - return this; - } -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityRelEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/EntityRelEntity.java similarity index 96% rename from dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityRelEntity.java rename to dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/EntityRelEntity.java index 285cacbc0..8c08337e2 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityRelEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/EntityRelEntity.java @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.graph; +package eu.dnetlib.dhp.graph.model; import java.io.Serializable; @@ -15,7 +15,6 @@ public class EntityRelEntity implements Serializable { this.source = source; } - //helpers public Boolean hasMainEntity() { return getSource() != null & getRelation() == null & getTarget() == null; diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/JoinedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/JoinedEntity.java similarity index 65% rename from dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/JoinedEntity.java rename to dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/JoinedEntity.java index d65eb64c8..f89273a0d 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/JoinedEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/JoinedEntity.java @@ -1,11 +1,8 @@ -package eu.dnetlib.dhp.graph; +package eu.dnetlib.dhp.graph.model; import eu.dnetlib.dhp.schema.oaf.OafEntity; -import eu.dnetlib.dhp.schema.oaf.Relation; -import scala.Tuple2; import java.io.Serializable; -import java.util.List; public class JoinedEntity implements Serializable { @@ -13,7 +10,7 @@ public class JoinedEntity implements Serializable { private OafEntity entity; - private List> links; + private Links links; public String getType() { return type; @@ -33,11 +30,11 @@ public class JoinedEntity implements Serializable { return this; } - public List> getLinks() { + public Links getLinks() { return links; } - public JoinedEntity setLinks(List> links) { + public JoinedEntity setLinks(Links links) { this.links = links; return this; } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/Links.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/Links.java new file mode 100644 index 000000000..96ad67b0c --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/Links.java @@ -0,0 +1,6 @@ +package eu.dnetlib.dhp.graph.model; + +import java.util.ArrayList; + +public class Links extends ArrayList { +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/RelatedEntity.java similarity index 94% rename from dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java rename to dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/RelatedEntity.java index 50b97dace..baeff1c6a 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/RelatedEntity.java @@ -1,5 +1,6 @@ -package eu.dnetlib.dhp.graph; +package eu.dnetlib.dhp.graph.model; +import eu.dnetlib.dhp.schema.oaf.Instance; import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; @@ -25,6 +26,7 @@ public class RelatedEntity implements Serializable { private String codeRepositoryUrl; private Qualifier resulttype; private List collectedfrom; + private List instances; // datasource private String officialname; @@ -45,14 +47,6 @@ public class RelatedEntity implements Serializable { private Qualifier contracttype; private List fundingtree; - public static RelatedEntity parse(final String json) { - try { - return new ObjectMapper().readValue(json, RelatedEntity.class); - } catch (IOException e) { - throw new IllegalArgumentException("invalid RelatedEntity, cannot parse: " + json); - } - } - public String getId() { return id; } @@ -125,6 +119,15 @@ public class RelatedEntity implements Serializable { return this; } + public List getInstances() { + return instances; + } + + public RelatedEntity setInstances(List instances) { + this.instances = instances; + return this; + } + public String getOfficialname() { return officialname; } @@ -250,4 +253,5 @@ public class RelatedEntity implements Serializable { this.type = type; return this; } + } \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/Tuple2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/Tuple2.java new file mode 100644 index 000000000..ab965808b --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/Tuple2.java @@ -0,0 +1,28 @@ +package eu.dnetlib.dhp.graph.model; + +import eu.dnetlib.dhp.schema.oaf.Relation; + +public class Tuple2 { + + private Relation relation; + + private RelatedEntity relatedEntity; + + public Relation getRelation() { + return relation; + } + + public Tuple2 setRelation(Relation relation) { + this.relation = relation; + return this; + } + + public RelatedEntity getRelatedEntity() { + return relatedEntity; + } + + public Tuple2 setRelatedEntity(RelatedEntity relatedEntity) { + this.relatedEntity = relatedEntity; + return this; + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/TypedRow.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/TypedRow.java similarity index 96% rename from dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/TypedRow.java rename to dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/TypedRow.java index 1acbbce93..3651e28c9 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/TypedRow.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/model/TypedRow.java @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.graph; +package eu.dnetlib.dhp.graph.model; import java.io.Serializable; diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/ContextDef.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/ContextDef.java new file mode 100644 index 000000000..05d9456f6 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/ContextDef.java @@ -0,0 +1,51 @@ +package eu.dnetlib.dhp.graph.utils; + +import java.io.Serializable; + +public class ContextDef implements Serializable { + + private String id; + private String label; + private String name; + private String type; + + public ContextDef(final String id, final String label, final String name, final String type) { + super(); + this.setId(id); + this.setLabel(label); + this.setName(name); + this.setType(type); + } + + public String getLabel() { + return label; + } + + public void setLabel(final String label) { + this.label = label; + } + + public String getId() { + return id; + } + + public void setId(final String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(final String name) { + this.name = name; + } + + public String getType() { + return type; + } + + public void setType(final String type) { + this.type = type; + } +} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/ContextMapper.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/ContextMapper.java new file mode 100644 index 000000000..0c3a481d0 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/ContextMapper.java @@ -0,0 +1,45 @@ +package eu.dnetlib.dhp.graph.utils; + +import com.google.common.base.Joiner; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import org.dom4j.Document; +import org.dom4j.DocumentException; +import org.dom4j.Node; +import org.dom4j.io.SAXReader; + +import java.io.Serializable; +import java.io.StringReader; +import java.util.HashMap; + +public class ContextMapper extends HashMap implements Serializable { + + private static final long serialVersionUID = 2159682308502487305L; + + private final static String XQUERY = "for $x in //RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='ContextDSResourceType']//*[name()='context' or name()='category' or name()='concept'] return "; + + public static ContextMapper fromIS(final String isLookupUrl) throws DocumentException, ISLookUpException { + ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl); + StringBuilder sb = new StringBuilder(""); + Joiner.on("").appendTo(sb, isLookUp.quickSearchProfile(XQUERY)); + sb.append(""); + return fromXml(sb.toString()); + } + + public static ContextMapper fromXml(final String xml) throws DocumentException { + final ContextMapper contextMapper = new ContextMapper(); + + final Document doc = new SAXReader().read(new StringReader(xml)); + for (Object o : doc.selectNodes("//entry")) { + Node node = (Node) o; + String id = node.valueOf("./@id"); + String label = node.valueOf("./@label"); + String name = node.valueOf("./@name"); + String type = node.valueOf("./@type") + ""; + + contextMapper.put(id, new ContextDef(id, label, name, type)); + } + return contextMapper; + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/GraphMappingUtils.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/GraphMappingUtils.java new file mode 100644 index 000000000..0921fe105 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/GraphMappingUtils.java @@ -0,0 +1,254 @@ +package eu.dnetlib.dhp.graph.utils; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Predicate; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.JsonPath; +import eu.dnetlib.dhp.graph.model.EntityRelEntity; +import eu.dnetlib.dhp.graph.model.RelatedEntity; +import eu.dnetlib.dhp.graph.model.TypedRow; +import eu.dnetlib.dhp.schema.oaf.*; +import net.minidev.json.JSONArray; +import org.apache.commons.lang3.StringUtils; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.commons.lang3.StringUtils.*; + +public class GraphMappingUtils { + + public enum EntityType { + publication, dataset, otherresearchproduct, software, datasource, organization, project + } + + public enum MainEntityType { + result, datasource, organization, project + } + + public static Set authorPidTypes = Sets.newHashSet("orcid", "magidentifier"); + + public static Set instanceFieldFilter = Sets.newHashSet("instancetype", "hostedby", "license", "accessright", "collectedfrom", "dateofacceptance", "distributionlocation"); + + private static BiMap relClassMapping = HashBiMap.create(); + + static { + relClassMapping.put("isAuthorInstitutionOf", "hasAuthorInstitution"); + relClassMapping.put("isMergedIn", "merges"); + relClassMapping.put("isProducedBy", "produces"); + relClassMapping.put("hasParticipant", "isParticipant"); + relClassMapping.put("isProvidedBy", "provides"); + relClassMapping.put("isRelatedTo", "isRelatedTo"); + relClassMapping.put("isAmongTopNSimilarDocuments", "hasAmongTopNSimilarDocuments"); + relClassMapping.put("isRelatedTo", "isRelatedTo"); + relClassMapping.put("isSupplementTo", "isSupplementedBy"); + } + + public static String getInverseRelClass(final String relClass) { + String res = relClassMapping.get(relClass); + if (isNotBlank(res)) { + return res; + } + res = relClassMapping.inverse().get(relClass); + + if (isNotBlank(res)) { + return res; + } + + throw new IllegalArgumentException("unable to find an inverse relationship class for term: " + relClass); + } + + private static final String schemeTemplate = "dnet:%s_%s_relations"; + + private static Map entityMapping = Maps.newHashMap(); + + static { + entityMapping.put(EntityType.publication, MainEntityType.result); + entityMapping.put(EntityType.dataset, MainEntityType.result); + entityMapping.put(EntityType.otherresearchproduct, MainEntityType.result); + entityMapping.put(EntityType.software, MainEntityType.result); + entityMapping.put(EntityType.datasource, MainEntityType.datasource); + entityMapping.put(EntityType.organization, MainEntityType.organization); + entityMapping.put(EntityType.project, MainEntityType.project); + } + + public static String getScheme(final String sourceType, final String targetType) { + return String.format(schemeTemplate, + entityMapping.get(EntityType.valueOf(sourceType)).name(), + entityMapping.get(EntityType.valueOf(targetType)).name()); + } + + public static String getMainType(final String type) { + return entityMapping.get(EntityType.valueOf(type)).name(); + } + + public static boolean isResult(String type) { + return MainEntityType.result.name().equals(getMainType(type)); + } + + public static Predicate instanceFilter = s -> instanceFieldFilter.contains(s); + + public static EntityRelEntity asRelatedEntity(EntityRelEntity e) { + + final DocumentContext j = JsonPath.parse(e.getSource().getOaf()); + final RelatedEntity re = new RelatedEntity().setId(j.read("$.id")).setType(e.getSource().getType()); + + switch (EntityType.valueOf(e.getSource().getType())) { + case publication: + case dataset: + case otherresearchproduct: + case software: + mapTitle(j, re); + re.setDateofacceptance(j.read("$.dateofacceptance.value")); + re.setPublisher(j.read("$.publisher.value")); + + JSONArray pids = j.read("$.pid"); + re.setPid(pids.stream() + .map(p -> asStructuredProperty((LinkedHashMap) p)) + .collect(Collectors.toList())); + + re.setResulttype(asQualifier(j.read("$.resulttype"))); + + JSONArray collfrom = j.read("$.collectedfrom"); + re.setCollectedfrom(collfrom.stream() + .map(c -> asKV((LinkedHashMap) c)) + .collect(Collectors.toList())); + + // will throw exception when the instance is not found + JSONArray instances = j.read("$.instance"); + re.setInstances(instances.stream() + .map(i -> { + final LinkedHashMap p = (LinkedHashMap) i; + final Field license = new Field(); + license.setValue((String) ((LinkedHashMap) p.get("license")).get("value")); + final Instance instance = new Instance(); + instance.setLicense(license); + instance.setAccessright(asQualifier((LinkedHashMap) p.get("accessright"))); + instance.setInstancetype(asQualifier((LinkedHashMap) p.get("instancetype"))); + instance.setHostedby(asKV((LinkedHashMap) p.get("hostedby"))); + //TODO mapping of distributionlocation + instance.setCollectedfrom(asKV((LinkedHashMap) p.get("collectedfrom"))); + + Field dateofacceptance = new Field(); + dateofacceptance.setValue((String) ((LinkedHashMap) p.get("dateofacceptance")).get("value")); + instance.setDateofacceptance(dateofacceptance); + return instance; + }).collect(Collectors.toList())); + + //TODO still to be mapped + //re.setCodeRepositoryUrl(j.read("$.coderepositoryurl")); + + break; + case datasource: + re.setOfficialname(j.read("$.officialname.value")); + re.setWebsiteurl(j.read("$.websiteurl.value")); + re.setDatasourcetype(asQualifier(j.read("$.datasourcetype"))); + re.setOpenairecompatibility(asQualifier(j.read("$.openairecompatibility"))); + + break; + case organization: + re.setLegalname(j.read("$.legalname.value")); + re.setLegalshortname(j.read("$.legalshortname.value")); + re.setCountry(asQualifier(j.read("$.country"))); + + break; + case project: + re.setProjectTitle(j.read("$.title.value")); + re.setCode(j.read("$.code.value")); + re.setAcronym(j.read("$.acronym.value")); + re.setContracttype(asQualifier(j.read("$.contracttype"))); + + JSONArray f = j.read("$.fundingtree"); + if (!f.isEmpty()) { + re.setFundingtree(f.stream() + .map(s -> ((LinkedHashMap) s).get("value")) + .collect(Collectors.toList())); + } + + break; + } + return new EntityRelEntity().setSource( + new TypedRow() + .setSourceId(e.getSource().getSourceId()) + .setDeleted(e.getSource().getDeleted()) + .setType(e.getSource().getType()) + .setOaf(serialize(re))); + } + + private static KeyValue asKV(LinkedHashMap j) { + final KeyValue kv = new KeyValue(); + kv.setKey((String) j.get("key")); + kv.setValue((String) j.get("value")); + return kv; + } + + private static void mapTitle(DocumentContext j, RelatedEntity re) { + final JSONArray a = j.read("$.title"); + if (!a.isEmpty()) { + final StructuredProperty sp = asStructuredProperty((LinkedHashMap) a.get(0)); + if (StringUtils.isNotBlank(sp.getValue())) { + re.setTitle(sp); + } + } + } + + private static StructuredProperty asStructuredProperty(LinkedHashMap j) { + final StructuredProperty sp = new StructuredProperty(); + final String value = (String) j.get("value"); + if (StringUtils.isNotBlank(value)) { + sp.setValue((String) j.get("value")); + sp.setQualifier(asQualifier((LinkedHashMap) j.get("qualifier"))); + } + return sp; + } + + public static Qualifier asQualifier(LinkedHashMap j) { + final Qualifier q = new Qualifier(); + + final String classid = j.get("classid"); + if (StringUtils.isNotBlank(classid)) { + q.setClassid(classid); + } + + final String classname = j.get("classname"); + if (StringUtils.isNotBlank(classname)) { + q.setClassname(classname); + } + + final String schemeid = j.get("schemeid"); + if (StringUtils.isNotBlank(schemeid)) { + q.setSchemeid(schemeid); + } + + final String schemename = j.get("schemename"); + if (StringUtils.isNotBlank(schemename)) { + q.setSchemename(schemename); + } + return q; + } + + public static String serialize(final Object o) { + try { + return new ObjectMapper() + .setSerializationInclusion(JsonInclude.Include.NON_NULL) + .writeValueAsString(o); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("unable to serialize: " + o.toString(), e); + } + } + + public static String removePrefix(final String s) { + if (s.contains("|")) return substringAfter(s, "|"); + return s; + } + + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/ISLookupClientFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/ISLookupClientFactory.java new file mode 100644 index 000000000..d87f29452 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/ISLookupClientFactory.java @@ -0,0 +1,24 @@ +package eu.dnetlib.dhp.graph.utils; + +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.cxf.jaxws.JaxWsProxyFactoryBean; + +public class ISLookupClientFactory { + + private static final Log log = LogFactory.getLog(ISLookupClientFactory.class); + + public static ISLookUpService getLookUpService(final String isLookupUrl) { + return getServiceStub(ISLookUpService.class, isLookupUrl); + } + + @SuppressWarnings("unchecked") + private static T getServiceStub(final Class clazz, final String endpoint) { + log.info(String.format("creating %s stub from %s", clazz.getName(), endpoint)); + final JaxWsProxyFactoryBean jaxWsProxyFactory = new JaxWsProxyFactoryBean(); + jaxWsProxyFactory.setServiceClass(clazz); + jaxWsProxyFactory.setAddress(endpoint); + return (T) jaxWsProxyFactory.create(); + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/LicenseComparator.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/LicenseComparator.java new file mode 100644 index 000000000..c4cbfadea --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/LicenseComparator.java @@ -0,0 +1,49 @@ +package eu.dnetlib.dhp.graph.utils; + +import eu.dnetlib.dhp.schema.oaf.Qualifier; + +import java.util.Comparator; + +public class LicenseComparator implements Comparator { + + @Override + public int compare(Qualifier left, Qualifier right) { + + if (left == null && right == null) return 0; + if (left == null) return 1; + if (right == null) return -1; + + String lClass = left.getClassid(); + String rClass = right.getClassid(); + + if (lClass.equals(rClass)) return 0; + + if (lClass.equals("OPEN SOURCE")) return -1; + if (rClass.equals("OPEN SOURCE")) return 1; + + if (lClass.equals("OPEN")) return -1; + if (rClass.equals("OPEN")) return 1; + + if (lClass.equals("6MONTHS")) return -1; + if (rClass.equals("6MONTHS")) return 1; + + if (lClass.equals("12MONTHS")) return -1; + if (rClass.equals("12MONTHS")) return 1; + + if (lClass.equals("EMBARGO")) return -1; + if (rClass.equals("EMBARGO")) return 1; + + if (lClass.equals("RESTRICTED")) return -1; + if (rClass.equals("RESTRICTED")) return 1; + + if (lClass.equals("CLOSED")) return -1; + if (rClass.equals("CLOSED")) return 1; + + if (lClass.equals("UNKNOWN")) return -1; + if (rClass.equals("UNKNOWN")) return 1; + + // Else (but unlikely), lexicographical ordering will do. + return lClass.compareTo(rClass); + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/StreamingInputDocumentFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/StreamingInputDocumentFactory.java new file mode 100644 index 000000000..736c9fc28 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/StreamingInputDocumentFactory.java @@ -0,0 +1,253 @@ +package eu.dnetlib.dhp.graph.utils; + +import java.io.StringReader; +import java.io.StringWriter; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import javax.xml.stream.*; +import javax.xml.stream.events.Namespace; +import javax.xml.stream.events.StartElement; +import javax.xml.stream.events.XMLEvent; + +import com.google.common.collect.Lists; +import org.apache.solr.common.SolrInputDocument; + +/** + * Optimized version of the document parser, drop in replacement of InputDocumentFactory. + * + *

+ * Faster because: + *

+ *
    + *
  • Doesn't create a DOM for the full document
  • + *
  • Doesn't execute xpaths agains the DOM
  • + *
  • Quickly serialize the 'result' element directly in a string.
  • + *
  • Uses less memory: less pressure on GC and allows more threads to process this in parallel
  • + *
+ * + *

+ * This class is fully reentrant and can be invoked in parallel. + *

+ * + * @author claudio + * + */ +public class StreamingInputDocumentFactory { + + private static final String INDEX_FIELD_PREFIX = "__"; + + private static final String DS_VERSION = INDEX_FIELD_PREFIX + "dsversion"; + + private static final String DS_ID = INDEX_FIELD_PREFIX + "dsid"; + + private static final String RESULT = "result"; + + private static final String INDEX_RESULT = INDEX_FIELD_PREFIX + RESULT; + + private static final String INDEX_RECORD_ID = INDEX_FIELD_PREFIX + "indexrecordidentifier"; + + private static final String outFormat = new String("yyyy-MM-dd'T'hh:mm:ss'Z'"); + + private final static List dateFormats = Arrays.asList("yyyy-MM-dd'T'hh:mm:ss", "yyyy-MM-dd", "dd-MM-yyyy", "dd/MM/yyyy", "yyyy"); + + private static final String DEFAULTDNETRESULT = "dnetResult"; + + private static final String TARGETFIELDS = "targetFields"; + + private static final String INDEX_RECORD_ID_ELEMENT = "indexRecordIdentifier"; + + private static final String ROOT_ELEMENT = "indexRecord"; + + private static final int MAX_FIELD_LENGTH = 25000; + + private ThreadLocal inputFactory = ThreadLocal.withInitial(() -> XMLInputFactory.newInstance()); + + private ThreadLocal outputFactory = ThreadLocal.withInitial(() -> XMLOutputFactory.newInstance()); + + private ThreadLocal eventFactory = ThreadLocal.withInitial(() -> XMLEventFactory.newInstance()); + + private String version; + + private String dsId; + + private String resultName = DEFAULTDNETRESULT; + + public StreamingInputDocumentFactory(final String version, final String dsId) { + this(version, dsId, DEFAULTDNETRESULT); + } + + public StreamingInputDocumentFactory(final String version, final String dsId, final String resultName) { + this.version = version; + this.dsId = dsId; + this.resultName = resultName; + } + + public SolrInputDocument parseDocument(final String inputDocument) { + + final StringWriter results = new StringWriter(); + final List nsList = Lists.newLinkedList(); + try { + + XMLEventReader parser = inputFactory.get().createXMLEventReader(new StringReader(inputDocument)); + + final SolrInputDocument indexDocument = new SolrInputDocument(new HashMap<>()); + + while (parser.hasNext()) { + final XMLEvent event = parser.nextEvent(); + if ((event != null) && event.isStartElement()) { + final String localName = event.asStartElement().getName().getLocalPart(); + + if (ROOT_ELEMENT.equals(localName)) { + nsList.addAll(getNamespaces(event)); + } else if (INDEX_RECORD_ID_ELEMENT.equals(localName)) { + final XMLEvent text = parser.nextEvent(); + String recordId = getText(text); + indexDocument.addField(INDEX_RECORD_ID, recordId); + } else if (TARGETFIELDS.equals(localName)) { + parseTargetFields(indexDocument, parser); + } else if (resultName.equals(localName)) { + copyResult(indexDocument, results, parser, nsList, resultName); + } + } + } + + if (version != null) { + indexDocument.addField(DS_VERSION, version); + } + + if (dsId != null) { + indexDocument.addField(DS_ID, dsId); + } + + if (!indexDocument.containsKey(INDEX_RECORD_ID)) { + indexDocument.clear(); + System.err.println("missing indexrecord id:\n" + inputDocument); + } + + return indexDocument; + } catch (XMLStreamException e) { + return new SolrInputDocument(); + } + } + + private List getNamespaces(final XMLEvent event) { + final List res = Lists.newLinkedList(); + @SuppressWarnings("unchecked") + Iterator nsIter = event.asStartElement().getNamespaces(); + while (nsIter.hasNext()) { + Namespace ns = nsIter.next(); + res.add(ns); + } + return res; + } + + /** + * Parse the targetFields block and add fields to the solr document. + * + * @param indexDocument + * @param parser + * @throws XMLStreamException + */ + protected void parseTargetFields(final SolrInputDocument indexDocument, final XMLEventReader parser) throws XMLStreamException { + + boolean hasFields = false; + + while (parser.hasNext()) { + final XMLEvent targetEvent = parser.nextEvent(); + if (targetEvent.isEndElement() && targetEvent.asEndElement().getName().getLocalPart().equals(TARGETFIELDS)) { + break; + } + + if (targetEvent.isStartElement()) { + final String fieldName = targetEvent.asStartElement().getName().getLocalPart(); + final XMLEvent text = parser.nextEvent(); + + String data = getText(text); + + addField(indexDocument, fieldName, data); + hasFields = true; + } + } + + if (!hasFields) { + indexDocument.clear(); + } + } + + /** + * Copy the /indexRecord/result element and children, preserving namespace declarations etc. + * + * @param indexDocument + * @param results + * @param parser + * @param nsList + * @throws XMLStreamException + */ + protected void copyResult(final SolrInputDocument indexDocument, + final StringWriter results, + final XMLEventReader parser, + final List nsList, + final String dnetResult) throws XMLStreamException { + final XMLEventWriter writer = outputFactory.get().createXMLEventWriter(results); + + for (Namespace ns : nsList) { + eventFactory.get().createNamespace(ns.getPrefix(), ns.getNamespaceURI()); + } + + StartElement newRecord = eventFactory.get().createStartElement("", null, RESULT, null, nsList.iterator()); + + // new root record + writer.add(newRecord); + + // copy the rest as it is + while (parser.hasNext()) { + final XMLEvent resultEvent = parser.nextEvent(); + + // TODO: replace with depth tracking instead of close tag tracking. + if (resultEvent.isEndElement() && resultEvent.asEndElement().getName().getLocalPart().equals(dnetResult)) { + writer.add(eventFactory.get().createEndElement("", null, RESULT)); + break; + } + + writer.add(resultEvent); + } + writer.close(); + indexDocument.addField(INDEX_RESULT, results.toString()); + } + + /** + * Helper used to add a field to a solr doc. It avoids to add empy fields + * + * @param indexDocument + * @param field + * @param value + */ + private final void addField(final SolrInputDocument indexDocument, final String field, final String value) { + String cleaned = value.trim(); + if (!cleaned.isEmpty()) { + // log.info("\n\n adding field " + field.toLowerCase() + " value: " + cleaned + "\n"); + indexDocument.addField(field.toLowerCase(), cleaned); + } + } + + /** + * Helper used to get the string from a text element. + * + * @param text + * @return the + */ + protected final String getText(final XMLEvent text) { + if (text.isEndElement()) // log.warn("skipping because isEndOfElement " + text.asEndElement().getName().getLocalPart()); + return ""; + + final String data = text.asCharacters().getData(); + if (data != null && data.length() > MAX_FIELD_LENGTH) { + return data.substring(0, MAX_FIELD_LENGTH); + } + + return data; + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/TemplateFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/TemplateFactory.java new file mode 100644 index 000000000..27c55fab7 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/TemplateFactory.java @@ -0,0 +1,107 @@ +package eu.dnetlib.dhp.graph.utils; + +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import org.apache.commons.lang3.StringUtils; +import org.stringtemplate.v4.ST; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +import static eu.dnetlib.dhp.graph.utils.GraphMappingUtils.removePrefix; +import static eu.dnetlib.dhp.graph.utils.XmlSerializationUtils.escapeXml; + +public class TemplateFactory { + + private TemplateResources resources; + + private final static char DELIMITER = '$'; + + public TemplateFactory() { + try { + resources = new TemplateResources(); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + + public String buildBody(final String type, final List metadata, final List rels, final List children, final List extraInfo) { + ST body = getTemplate(resources.getEntity()); + + body.add("name", type); + body.add("metadata", metadata); + body.add("rels", rels); + body.add("children", children); + body.add("extrainfo", extraInfo); + + return body.render(); + } + + public String getChild(final String name, final String id, final List metadata) { + return getTemplate(resources.getChild()) + .add("name", name) + .add("hasId", !(id == null)) + .add("id", id != null ? escapeXml(removePrefix(id)) : "") + .add("metadata", metadata) + .render(); + } + + public String buildRecord( + final OafEntity entity, + final String schemaLocation, + final String body) { + return getTemplate(resources.getRecord()) + .add("id", escapeXml(removePrefix(entity.getId()))) + .add("dateofcollection", entity.getDateofcollection()) + .add("dateoftransformation", entity.getDateoftransformation()) + .add("schemaLocation", schemaLocation) + .add("it", body) + .render(); + } + + public String getRel(final String type, + final String objIdentifier, + final Collection fields, + final String semanticclass, + final String semantischeme, + final DataInfo info) { + return getTemplate(resources.getRel()) + .add("type", type) + .add("objIdentifier", escapeXml(removePrefix(objIdentifier))) + .add("class", semanticclass) + .add("scheme", semantischeme) + .add("metadata", fields) + .add("inferred", info.getInferred()) + .add("trust", info.getTrust()) + .add("inferenceprovenance", info.getInferenceprovenance()) + .add("provenanceaction", info.getProvenanceaction() != null ? info.getProvenanceaction().getClassid() : "") + .render(); + } + + public String getInstance(final String resultId, final List instancemetadata, final List webresources) { + return getTemplate(resources.getInstance()) + .add("instanceId", escapeXml(removePrefix(resultId))) + .add("metadata", instancemetadata) + .add("webresources", webresources + .stream() + .filter(StringUtils::isNotBlank) + .map(w -> getWebResource(w)) + .collect(Collectors.toList())) + .render(); + } + + private String getWebResource(final String identifier) { + return getTemplate(resources.getWebresource()) + .add("identifier", escapeXml(identifier)) + .render(); + } + + // HELPERS + + private ST getTemplate(final String res) { + return new ST(res, DELIMITER, DELIMITER); + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/TemplateResources.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/TemplateResources.java new file mode 100644 index 000000000..92aaedfd3 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/TemplateResources.java @@ -0,0 +1,54 @@ +package eu.dnetlib.dhp.graph.utils; + +import com.google.common.io.Resources; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +public class TemplateResources { + + private String record = read("eu/dnetlib/dhp/graph/template/record.st"); + + private String instance = read("eu/dnetlib/dhp/graph/template/instance.st"); + + private String rel = read("eu/dnetlib/dhp/graph/template/rel.st"); + + private String webresource = read("eu/dnetlib/dhp/graph/template/webresource.st"); + + private String child = read("eu/dnetlib/dhp/graph/template/child.st"); + + private String entity = read("eu/dnetlib/dhp/graph/template/entity.st"); + + private static String read(final String classpathResource) throws IOException { + return Resources.toString(Resources.getResource(classpathResource), StandardCharsets.UTF_8); + } + + public TemplateResources() throws IOException { + + } + + public String getEntity() { + return entity; + } + + public String getRecord() { + return record; + } + + public String getInstance() { + return instance; + } + + public String getRel() { + return rel; + } + + public String getWebresource() { + return webresource; + } + + public String getChild() { + return child; + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/XmlRecordFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/XmlRecordFactory.java new file mode 100644 index 000000000..bd4f8ec6c --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/XmlRecordFactory.java @@ -0,0 +1,962 @@ +package eu.dnetlib.dhp.graph.utils; + +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.mycila.xmltool.XMLDoc; +import com.mycila.xmltool.XMLTag; +import eu.dnetlib.dhp.graph.model.JoinedEntity; +import eu.dnetlib.dhp.graph.model.RelatedEntity; +import eu.dnetlib.dhp.graph.model.Tuple2; +import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.schema.oaf.*; +import org.dom4j.Document; +import org.dom4j.DocumentException; +import org.dom4j.Element; +import org.dom4j.Node; +import org.dom4j.io.OutputFormat; +import org.dom4j.io.SAXReader; +import org.dom4j.io.XMLWriter; + +import javax.xml.transform.*; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; +import java.io.IOException; +import java.io.Serializable; +import java.io.StringReader; +import java.io.StringWriter; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static eu.dnetlib.dhp.graph.utils.GraphMappingUtils.*; +import static eu.dnetlib.dhp.graph.utils.XmlSerializationUtils.*; +import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.commons.lang3.StringUtils.substringBefore; + +public class XmlRecordFactory implements Serializable { + + private Set specialDatasourceTypes; + + private ContextMapper contextMapper; + + private String schemaLocation; + + private Set contextes = Sets.newHashSet(); + + private boolean indent = false; + + public XmlRecordFactory( + final ContextMapper contextMapper, final boolean indent, + final String schemaLocation, final Set otherDatasourceTypesUForUI) { + + this.contextMapper = contextMapper; + this.schemaLocation = schemaLocation; + this.specialDatasourceTypes = otherDatasourceTypesUForUI; + + this.indent = indent; + } + + public String build(final JoinedEntity je) { + final OafEntity entity = je.getEntity(); + TemplateFactory templateFactory = new TemplateFactory(); + try { + final List metadata = metadata(je.getType(), entity); + + // rels has to be processed before the contexts because they enrich the contextMap with the funding info. + final List relations = listRelations(je, templateFactory); + + metadata.addAll(buildContexts(getMainType(je.getType()))); + metadata.add(parseDataInfo(entity.getDataInfo())); + + final String body = templateFactory.buildBody( + getMainType(je.getType()), + metadata, + relations, + listChildren(je, templateFactory), listExtraInfo(je)); + + return printXML(templateFactory.buildRecord(entity, schemaLocation, body), indent); + } catch (final Throwable e) { + throw new RuntimeException(String.format("error building record '%s'", entity.getId()), e); + } + } + + private String printXML(String xml, boolean indent) { + try { + final Document doc = new SAXReader().read(new StringReader(xml)); + OutputFormat format = indent ? OutputFormat.createPrettyPrint() : OutputFormat.createCompactFormat(); + format.setExpandEmptyElements(false); + format.setSuppressDeclaration(true); + StringWriter sw = new StringWriter(); + XMLWriter writer = new XMLWriter(sw, format); + writer.write(doc); + return sw.toString(); + } catch (IOException | DocumentException e) { + throw new IllegalArgumentException("Unable to indent XML. Invalid record:\n" + xml, e); + } + } + + private List metadata(final String type, final OafEntity entity) { + + final List metadata = Lists.newArrayList(); + + if (entity.getCollectedfrom() != null) { + metadata.addAll(entity.getCollectedfrom() + .stream() + .map(kv -> mapKeyValue("collectedfrom", kv)) + .collect(Collectors.toList())); + } + if (entity.getOriginalId() != null) { + metadata.addAll(entity.getOriginalId() + .stream() + .map(s -> asXmlElement("originalId", s)) + .collect(Collectors.toList())); + } + if (entity.getPid() != null) { + metadata.addAll(entity.getPid() + .stream() + .map(p -> mapStructuredProperty("pid", p)) + .collect(Collectors.toList())); + } + + if (GraphMappingUtils.isResult(type)) { + final Result r = (Result) entity; + + if (r.getTitle() != null) { + metadata.addAll(r.getTitle() + .stream() + .map(t -> mapStructuredProperty("title", t)) + .collect(Collectors.toList())); + } + if (r.getAuthor() != null) { + metadata.addAll(r.getAuthor() + .stream() + .map(a -> { + final StringBuilder sb = new StringBuilder(" isNotBlank(sp.getQualifier().getClassid()) && isNotBlank(sp.getValue())) + .forEach(sp -> { + String pidType = escapeXml(sp.getQualifier().getClassid()).replaceAll("\\W", ""); + String pidValue = escapeXml(sp.getValue()); + + // ugly hack: some records provide swapped pidtype and pidvalue + if (authorPidTypes.contains(pidValue.toLowerCase().trim())) { + sb.append(String.format(" %s=\"%s\"", pidValue, pidType)); + } else { + pidType = pidType.replaceAll("\\W", "").replaceAll("\\d", ""); + if (isNotBlank(pidType)) { + sb.append(String.format(" %s=\"%s\"", + pidType, + pidValue.toLowerCase().replaceAll("orcid", ""))); + } + } + }); + } + sb.append(">" + escapeXml(a.getFullname()) + ""); + return sb.toString(); + }).collect(Collectors.toList())); + } + if (r.getContributor() != null) { + metadata.addAll(r.getContributor() + .stream() + .map(c -> asXmlElement("contributor", c.getValue())) + .collect(Collectors.toList())); + } + if (r.getCountry() != null) { + metadata.addAll(r.getCountry() + .stream() + .map(c -> mapQualifier("country", c)) + .collect(Collectors.toList())); + } + if (r.getCoverage() != null) { + metadata.addAll(r.getCoverage() + .stream() + .map(c -> asXmlElement("coverage", c.getValue())) + .collect(Collectors.toList())); + } + if (r.getDateofacceptance() != null) { + metadata.add(asXmlElement("dateofacceptance", r.getDateofacceptance().getValue())); + } + if (r.getDescription() != null) { + metadata.addAll(r.getDescription() + .stream() + .map(c -> asXmlElement("description", c.getValue())) + .collect(Collectors.toList())); + } + if (r.getEmbargoenddate() != null) { + metadata.add(asXmlElement("embargoenddate", r.getEmbargoenddate().getValue())); + } + if (r.getSubject() != null) { + metadata.addAll(r.getSubject() + .stream() + .map(s -> mapStructuredProperty("subject", s)) + .collect(Collectors.toList())); + } + if (r.getLanguage() != null) { + metadata.add(mapQualifier("language", r.getLanguage())); + } + if (r.getRelevantdate() != null) { + metadata.addAll(r.getRelevantdate() + .stream() + .map(s -> mapStructuredProperty("relevantdate", s)) + .collect(Collectors.toList())); + } + if (r.getPublisher() != null) { + metadata.add(asXmlElement("publisher", r.getPublisher().getValue())); + } + if (r.getSource() != null) { + metadata.addAll(r.getSource() + .stream() + .map(c -> asXmlElement("source", c.getValue())) + .collect(Collectors.toList())); + } + if (r.getFormat() != null) { + metadata.addAll(r.getFormat() + .stream() + .map(c -> asXmlElement("format", c.getValue())) + .collect(Collectors.toList())); + } + if (r.getResulttype() != null) { + metadata.add(mapQualifier("resulttype", r.getResulttype())); + } + if (r.getResourcetype() != null) { + metadata.add(mapQualifier("resourcetype", r.getResourcetype())); + } + if (r.getRefereed() != null) { + metadata.add(asXmlElement("refereed", r.getRefereed().getValue())); + } + if (r.getProcessingchargeamount() != null) { + metadata.add(asXmlElement("processingchargeamount", r.getProcessingchargeamount().getValue())); + } + if (r.getProcessingchargecurrency() != null) { + metadata.add(asXmlElement("processingchargecurrency", r.getProcessingchargecurrency().getValue())); + } + + metadata.add(mapQualifier("bestaccessright", getBestAccessright(r))); + + if (r.getContext() != null) { + contextes.addAll(r.getContext() + .stream() + .map(c -> c.getId()) + .collect(Collectors.toList())); + if (contextes.contains("dh-ch::subcommunity::2")) { + contextes.add("clarin"); + } + } + } + + switch (EntityType.valueOf(type)) { + case publication: + final Publication pub = (Publication) entity; + + if (pub.getJournal() != null) { + final Journal j = pub.getJournal(); + metadata.add(mapJournal(j)); + } + + break; + case dataset: + final Dataset d = (Dataset) entity; + if (d.getDevice() != null) { + metadata.add(asXmlElement("device", d.getDevice().getValue())); + } + if (d.getLastmetadataupdate() != null) { + metadata.add(asXmlElement("lastmetadataupdate", d.getLastmetadataupdate().getValue())); + } + if (d.getMetadataversionnumber() != null) { + metadata.add(asXmlElement("metadataversionnumber", d.getMetadataversionnumber().getValue())); + } + if (d.getSize() != null) { + metadata.add(asXmlElement("size", d.getSize().getValue())); + } + if (d.getStoragedate() != null) { + metadata.add(asXmlElement("storagedate", d.getStoragedate().getValue())); + } + if (d.getVersion() != null) { + metadata.add(asXmlElement("version", d.getVersion().getValue())); + } + //TODO d.getGeolocation() + + break; + case otherresearchproduct: + final OtherResearchProduct orp = (OtherResearchProduct) entity; + + if (orp.getContactperson() != null) { + metadata.addAll(orp.getContactperson() + .stream() + .map(c -> asXmlElement("contactperson", c.getValue())) + .collect(Collectors.toList())); + } + + if (orp.getContactgroup() != null) { + metadata.addAll(orp.getContactgroup() + .stream() + .map(c -> asXmlElement("contactgroup", c.getValue())) + .collect(Collectors.toList())); + } + if (orp.getTool() != null) { + metadata.addAll(orp.getTool() + .stream() + .map(c -> asXmlElement("tool", c.getValue())) + .collect(Collectors.toList())); + } + break; + case software: + final Software s = (Software) entity; + + if (s.getDocumentationUrl() != null) { + metadata.addAll(s.getDocumentationUrl() + .stream() + .map(c -> asXmlElement("documentationUrl", c.getValue())) + .collect(Collectors.toList())); + } + if (s.getLicense() != null) { + metadata.addAll(s.getLicense() + .stream() + .map(l -> mapStructuredProperty("license", l)) + .collect(Collectors.toList())); + } + if (s.getCodeRepositoryUrl() != null) { + metadata.add(asXmlElement("codeRepositoryUrl", s.getCodeRepositoryUrl().getValue())); + } + if (s.getProgrammingLanguage() != null) { + metadata.add(mapQualifier("programmingLanguage", s.getProgrammingLanguage())); + } + break; + case datasource: + final Datasource ds = (Datasource) entity; + + if (ds.getDatasourcetype() != null) { + mapDatasourceType(metadata, ds.getDatasourcetype()); + } + if (ds.getOpenairecompatibility() != null) { + metadata.add(mapQualifier("openairecompatibility", ds.getOpenairecompatibility())); + } + if (ds.getOfficialname() != null) { + metadata.add(asXmlElement("officialname", ds.getOfficialname().getValue())); + } + if (ds.getEnglishname() != null) { + metadata.add(asXmlElement("englishname", ds.getEnglishname().getValue())); + } + if (ds.getWebsiteurl() != null) { + metadata.add(asXmlElement("websiteurl", ds.getWebsiteurl().getValue())); + } + if (ds.getLogourl() != null) { + metadata.add(asXmlElement("logourl", ds.getLogourl().getValue())); + } + if (ds.getContactemail() != null) { + metadata.add(asXmlElement("contactemail", ds.getContactemail().getValue())); + } + if (ds.getNamespaceprefix() != null) { + metadata.add(asXmlElement("namespaceprefix", ds.getNamespaceprefix().getValue())); + } + if (ds.getLatitude() != null) { + metadata.add(asXmlElement("latitude", ds.getLatitude().getValue())); + } + if (ds.getLongitude() != null) { + metadata.add(asXmlElement("longitude", ds.getLongitude().getValue())); + } + if (ds.getDateofvalidation() != null) { + metadata.add(asXmlElement("dateofvalidation", ds.getDateofvalidation().getValue())); + } + if (ds.getDescription() != null) { + metadata.add(asXmlElement("description", ds.getDescription().getValue())); + } + if (ds.getOdnumberofitems() != null) { + metadata.add(asXmlElement("odnumberofitems", ds.getOdnumberofitems().getValue())); + } + if (ds.getOdnumberofitemsdate() != null) { + metadata.add(asXmlElement("odnumberofitemsdate", ds.getOdnumberofitemsdate().getValue())); + } + if (ds.getOdpolicies() != null) { + metadata.add(asXmlElement("odpolicies", ds.getOdpolicies().getValue())); + } + if (ds.getOdlanguages() != null) { + metadata.addAll(ds.getOdlanguages() + .stream() + .map(c -> asXmlElement("odlanguages", c.getValue())) + .collect(Collectors.toList())); + } + if (ds.getOdcontenttypes() != null) { + metadata.addAll(ds.getOdcontenttypes() + .stream() + .map(c -> asXmlElement("odcontenttypes", c.getValue())) + .collect(Collectors.toList())); + } + if (ds.getAccessinfopackage() != null) { + metadata.addAll(ds.getAccessinfopackage() + .stream() + .map(c -> asXmlElement("accessinfopackage", c.getValue())) + .collect(Collectors.toList())); + } + if (ds.getReleaseenddate() != null) { + metadata.add(asXmlElement("releasestartdate", ds.getReleaseenddate().getValue())); + } + if (ds.getReleaseenddate() != null) { + metadata.add(asXmlElement("releaseenddate", ds.getReleaseenddate().getValue())); + } + if (ds.getMissionstatementurl() != null) { + metadata.add(asXmlElement("missionstatementurl", ds.getMissionstatementurl().getValue())); + } + if (ds.getDataprovider() != null) { + metadata.add(asXmlElement("dataprovider", ds.getDataprovider().getValue().toString())); + } + if (ds.getServiceprovider() != null) { + metadata.add(asXmlElement("serviceprovider", ds.getServiceprovider().getValue().toString())); + } + if (ds.getDatabaseaccesstype() != null) { + metadata.add(asXmlElement("databaseaccesstype", ds.getDatabaseaccesstype().getValue())); + } + if (ds.getDatauploadtype() != null) { + metadata.add(asXmlElement("datauploadtype", ds.getDatauploadtype().getValue())); + } + if (ds.getDatabaseaccessrestriction() != null) { + metadata.add(asXmlElement("databaseaccessrestriction", ds.getDatabaseaccessrestriction().getValue())); + } + if (ds.getDatauploadrestriction() != null) { + metadata.add(asXmlElement("datauploadrestriction", ds.getDatauploadrestriction().getValue())); + } + if (ds.getVersioning() != null) { + metadata.add(asXmlElement("versioning", ds.getVersioning().getValue().toString())); + } + if (ds.getCitationguidelineurl() != null) { + metadata.add(asXmlElement("citationguidelineurl", ds.getCitationguidelineurl().getValue())); + } + if (ds.getQualitymanagementkind() != null) { + metadata.add(asXmlElement("qualitymanagementkind", ds.getQualitymanagementkind().getValue())); + } + if (ds.getPidsystems() != null) { + metadata.add(asXmlElement("pidsystems", ds.getPidsystems().getValue())); + } + if (ds.getCertificates() != null) { + metadata.add(asXmlElement("certificates", ds.getCertificates().getValue())); + } + if (ds.getPolicies() != null) { + metadata.addAll(ds.getPolicies() + .stream() + .map(kv -> mapKeyValue("policies", kv)) + .collect(Collectors.toList())); + } + if (ds.getJournal() != null) { + metadata.add(mapJournal(ds.getJournal())); + } + if (ds.getSubjects() != null) { + metadata.addAll(ds.getSubjects() + .stream() + .map(sp -> mapStructuredProperty("subject", sp)) + .collect(Collectors.toList())); + } + + break; + case organization: + final Organization o = (Organization) entity; + + if (o.getLegalshortname() != null) { + metadata.add(asXmlElement("legalshortname", o.getLegalshortname().getValue())); + } + if (o.getLegalname() != null) { + metadata.add(asXmlElement("legalname", o.getLegalname().getValue())); + } + if (o.getAlternativeNames() != null) { + metadata.addAll(o.getAlternativeNames() + .stream() + .map(c -> asXmlElement("alternativeNames", c.getValue())) + .collect(Collectors.toList())); + } + if (o.getWebsiteurl() != null) { + metadata.add(asXmlElement("websiteurl", o.getWebsiteurl().getValue())); + } + if (o.getLogourl() != null) { + metadata.add(asXmlElement("websiteurl", o.getLogourl().getValue())); + } + + if (o.getEclegalbody() != null) { + metadata.add(asXmlElement("eclegalbody", o.getEclegalbody().getValue())); + } + if (o.getEclegalperson() != null) { + metadata.add(asXmlElement("eclegalperson", o.getEclegalperson().getValue())); + } + if (o.getEcnonprofit() != null) { + metadata.add(asXmlElement("ecnonprofit", o.getEcnonprofit().getValue())); + } + if (o.getEcresearchorganization() != null) { + metadata.add(asXmlElement("ecresearchorganization", o.getEcresearchorganization().getValue())); + } + if (o.getEchighereducation() != null) { + metadata.add(asXmlElement("echighereducation", o.getEchighereducation().getValue())); + } + if (o.getEcinternationalorganization() != null) { + metadata.add(asXmlElement("ecinternationalorganizationeurinterests", o.getEcinternationalorganization().getValue())); + } + if (o.getEcinternationalorganization() != null) { + metadata.add(asXmlElement("ecinternationalorganization", o.getEcinternationalorganization().getValue())); + } + if (o.getEcenterprise() != null) { + metadata.add(asXmlElement("ecenterprise", o.getEcenterprise().getValue())); + } + if (o.getEcsmevalidated() != null) { + metadata.add(asXmlElement("ecsmevalidated", o.getEcsmevalidated().getValue())); + } + if (o.getEcnutscode() != null) { + metadata.add(asXmlElement("ecnutscode", o.getEcnutscode().getValue())); + } + if (o.getCountry() != null) { + metadata.add(mapQualifier("country", o.getCountry())); + } + + break; + case project: + + final Project p = (Project) entity; + + if (p.getWebsiteurl() != null) { + metadata.add(asXmlElement("websiteurl", p.getWebsiteurl().getValue())); + } + if (p.getCode() != null) { + metadata.add(asXmlElement("code", p.getCode().getValue())); + } + if (p.getAcronym() != null) { + metadata.add(asXmlElement("acronym", p.getAcronym().getValue())); + } + if (p.getTitle() != null) { + metadata.add(asXmlElement("title", p.getTitle().getValue())); + } + if (p.getStartdate() != null) { + metadata.add(asXmlElement("startdate", p.getStartdate().getValue())); + } + if (p.getEnddate() != null) { + metadata.add(asXmlElement("enddate", p.getEnddate().getValue())); + } + if (p.getCallidentifier() != null) { + metadata.add(asXmlElement("callidentifier", p.getCallidentifier().getValue())); + } + if (p.getKeywords() != null) { + metadata.add(asXmlElement("keywords", p.getKeywords().getValue())); + } + if (p.getDuration() != null) { + metadata.add(asXmlElement("duration", p.getDuration().getValue())); + } + if (p.getEcsc39() != null) { + metadata.add(asXmlElement("ecsc39", p.getEcsc39().getValue())); + } + if (p.getEcarticle29_3() != null) { + metadata.add(asXmlElement("ecarticle29_3", p.getEcarticle29_3().getValue())); + } + if (p.getSubjects() != null) { + metadata.addAll(p.getSubjects() + .stream() + .map(sp -> mapStructuredProperty("subject", sp)) + .collect(Collectors.toList())); + } + if (p.getContracttype() != null) { + metadata.add(mapQualifier("contracttype", p.getContracttype())); + } + if (p.getEcsc39() != null) { + metadata.add(asXmlElement("ecsc39", p.getEcsc39().getValue())); + } + if (p.getContactfullname() != null) { + metadata.add(asXmlElement("contactfullname", p.getContactfullname().getValue())); + } + if (p.getContactfax() != null) { + metadata.add(asXmlElement("contactfax", p.getContactfax().getValue())); + } + if (p.getContactphone() != null) { + metadata.add(asXmlElement("contactphone", p.getContactphone().getValue())); + } + if (p.getContactemail() != null) { + metadata.add(asXmlElement("contactemail", p.getContactemail().getValue())); + } + if (p.getSummary() != null) { + metadata.add(asXmlElement("summary", p.getSummary().getValue())); + } + if (p.getCurrency() != null) { + metadata.add(asXmlElement("currency", p.getCurrency().getValue())); + } + if (p.getTotalcost() != null) { + metadata.add(asXmlElement("totalcost", p.getTotalcost().toString())); + } + if (p.getFundedamount() != null) { + metadata.add(asXmlElement("fundedamount", p.getFundedamount().toString())); + } + if (p.getFundingtree() != null) { + metadata.addAll(p.getFundingtree() + .stream() + .map(ft -> asXmlElement("fundingtree", ft.getValue())) + .collect(Collectors.toList())); + } + + break; + default: + throw new IllegalArgumentException("invalid entity type: " + type); + } + + return metadata; + } + + private void mapDatasourceType(List metadata, final Qualifier dsType) { + metadata.add(mapQualifier("datasourcetype", dsType)); + + if (specialDatasourceTypes.contains(dsType.getClassid())) { + dsType.setClassid("other"); + dsType.setClassname("other"); + } + metadata.add(mapQualifier("datasourcetypeui", dsType)); + } + + private Qualifier getBestAccessright(final Result r) { + Qualifier bestAccessRight = new Qualifier(); + bestAccessRight.setClassid("UNKNOWN"); + bestAccessRight.setClassname("not available"); + bestAccessRight.setSchemeid("dnet:access_modes"); + bestAccessRight.setSchemename("dnet:access_modes"); + + final LicenseComparator lc = new LicenseComparator(); + for (final Instance instance : r.getInstance()) { + if (lc.compare(bestAccessRight, instance.getAccessright()) > 0) { + bestAccessRight = instance.getAccessright(); + } + } + return bestAccessRight; + } + + private List listRelations(final JoinedEntity je, TemplateFactory templateFactory) { + final List rels = Lists.newArrayList(); + + for (final Tuple2 link : je.getLinks()) { + + final Relation rel = link.getRelation(); + final RelatedEntity re = link.getRelatedEntity(); + final String targetType = link.getRelatedEntity().getType(); + + final List metadata = Lists.newArrayList(); + switch (EntityType.valueOf(targetType)) { + case publication: + case dataset: + case otherresearchproduct: + case software: + if (re.getTitle() != null && isNotBlank(re.getTitle().getValue())) { + metadata.add(mapStructuredProperty("title", re.getTitle())); + } + if (isNotBlank(re.getDateofacceptance())) { + metadata.add(asXmlElement("dateofacceptance", re.getDateofacceptance())); + } + if (isNotBlank(re.getPublisher())) { + metadata.add(asXmlElement("publisher", re.getPublisher())); + } + if (isNotBlank(re.getCodeRepositoryUrl())) { + metadata.add(asXmlElement("coderepositoryurl", re.getCodeRepositoryUrl())); + } + if (re.getResulttype() != null & !re.getResulttype().isBlank()) { + metadata.add(mapQualifier("resulttype", re.getResulttype())); + } + if (re.getCollectedfrom() != null) { + metadata.addAll(re.getCollectedfrom() + .stream() + .map(kv -> mapKeyValue("collectedfrom", kv)) + .collect(Collectors.toList())); + } + if (re.getPid() != null) { + metadata.addAll(re.getPid() + .stream() + .map(p -> mapStructuredProperty("pid", p)) + .collect(Collectors.toList())); + } + break; + case datasource: + if (isNotBlank(re.getOfficialname())) { + metadata.add(asXmlElement("officialname", re.getOfficialname())); + } + if (re.getDatasourcetype() != null & !re.getDatasourcetype().isBlank()) { + mapDatasourceType(metadata, re.getDatasourcetype()); + } + if (re.getOpenairecompatibility() != null & !re.getOpenairecompatibility().isBlank()) { + metadata.add(mapQualifier("openairecompatibility", re.getOpenairecompatibility())); + } + break; + case organization: + if (isNotBlank(re.getLegalname())) { + metadata.add(asXmlElement("legalname", re.getLegalname())); + } + if (isNotBlank(re.getLegalshortname())) { + metadata.add(asXmlElement("legalshortname", re.getLegalshortname())); + } + if (re.getCountry() != null & !re.getCountry().isBlank()) { + metadata.add(mapQualifier("country", re.getCountry())); + } + break; + case project: + if (isNotBlank(re.getProjectTitle())) { + metadata.add(asXmlElement("title", re.getProjectTitle())); + } + if (isNotBlank(re.getCode())) { + metadata.add(asXmlElement("code", re.getCode())); + } + if (isNotBlank(re.getAcronym())) { + metadata.add(asXmlElement("acronym", re.getAcronym())); + } + if (re.getContracttype() != null & !re.getContracttype().isBlank()) { + metadata.add(mapQualifier("contracttype", re.getContracttype())); + } + if (re.getFundingtree() != null) { + metadata.addAll(re.getFundingtree() + .stream() + .peek(ft -> fillContextMap(ft)) + .map(ft -> getRelFundingTree(ft)) + .collect(Collectors.toList())); + } + break; + default: + throw new IllegalArgumentException("invalid target type: " + targetType); + + } + final DataInfo info = rel.getDataInfo(); + + rels.add(templateFactory.getRel( + targetType, + rel.getTarget(), + Sets.newHashSet(metadata), + getInverseRelClass(rel.getRelClass()), + getScheme(targetType, re.getType()), + info)); + } + return rels; + } + + private List listChildren(final JoinedEntity je, TemplateFactory templateFactory) { + + final List children = Lists.newArrayList(); + + if (MainEntityType.result.toString().equals(getMainType(je.getType()))) { + final List instances = ((Result) je.getEntity()).getInstance(); + if (instances != null) { + for (final Instance instance : ((Result) je.getEntity()).getInstance()) { + + final List fields = Lists.newArrayList(); + + if (instance.getAccessright() != null && !instance.getAccessright().isBlank()) { + fields.add(mapQualifier("accessright", instance.getAccessright())); + } + if (instance.getCollectedfrom() != null) { + fields.add(mapKeyValue("collectedfrom", instance.getCollectedfrom())); + } + if (instance.getHostedby() != null) { + fields.add(mapKeyValue("hostedby", instance.getHostedby())); + } + if (instance.getDateofacceptance() != null && isNotBlank(instance.getDateofacceptance().getValue())) { + fields.add(asXmlElement("dateofacceptance", instance.getDateofacceptance().getValue())); + } + if (instance.getInstancetype() != null && !instance.getInstancetype().isBlank()) { + fields.add(mapQualifier("instancetype", instance.getInstancetype())); + } + if (isNotBlank(instance.getDistributionlocation())) { + fields.add(asXmlElement("distributionlocation", instance.getDistributionlocation())); + } + + children.add(templateFactory.getInstance(instance.getHostedby().getKey(), fields, instance.getUrl())); + } + } + final List ext = ((Result) je.getEntity()).getExternalReference(); + if (ext != null) { + for (final ExternalReference er : ((Result) je.getEntity()).getExternalReference()) { + + final List fields = Lists.newArrayList(); + + if (isNotBlank(er.getSitename())) { + fields.add(asXmlElement("sitename", er.getSitename())); + } + if (isNotBlank(er.getLabel())) { + fields.add(asXmlElement("label", er.getLabel())); + } + if (isNotBlank(er.getUrl())) { + fields.add(asXmlElement("url", er.getUrl())); + } + if (isNotBlank(er.getDescription())) { + fields.add(asXmlElement("description", er.getDescription())); + } + if (isNotBlank(er.getUrl())) { + fields.add(mapQualifier("qualifier", er.getQualifier())); + } + if (isNotBlank(er.getRefidentifier())) { + fields.add(asXmlElement("refidentifier", er.getRefidentifier())); + } + if (isNotBlank(er.getQuery())) { + fields.add(asXmlElement("query", er.getQuery())); + } + + children.add(templateFactory.getChild("externalreference", null, fields)); + } + } + } + + return children; + } + + private List listExtraInfo(JoinedEntity je) { + final List extraInfo = je.getEntity().getExtraInfo(); + return extraInfo != null ? extraInfo + .stream() + .map(e -> mapExtraInfo(e)) + .collect(Collectors.toList()) : Lists.newArrayList(); + } + + private List buildContexts(final String type) { + final List res = Lists.newArrayList(); + + if ((contextMapper != null) && !contextMapper.isEmpty() && MainEntityType.result.toString().equals(type)) { + + XMLTag document = XMLDoc.newDocument(true).addRoot("contextRoot"); + + for (final String context : contextes) { + + String id = ""; + for (final String token : Splitter.on("::").split(context)) { + id += token; + + final ContextDef def = contextMapper.get(id); + + if (def == null) { + continue; + // throw new IllegalStateException(String.format("cannot find context for id '%s'", id)); + } + + if (def.getName().equals("context")) { + final String xpath = "//context/@id='" + def.getId() + "'"; + if (!document.gotoRoot().rawXpathBoolean(xpath, new Object())) { + document = addContextDef(document.gotoRoot(), def); + } + } + + if (def.getName().equals("category")) { + final String rootId = substringBefore(def.getId(), "::"); + document = addContextDef(document.gotoRoot().gotoTag("//context[./@id='" + rootId + "']", new Object()), def); + } + + if (def.getName().equals("concept")) { + document = addContextDef(document, def).gotoParent(); + } + id += "::"; + } + } + final Transformer transformer = getTransformer(); + for (final org.w3c.dom.Element x : document.gotoRoot().getChildElement()) { + try { + res.add(asStringElement(x, transformer)); + } catch (final TransformerException e) { + throw new RuntimeException(e); + } + } + } + + return res; + } + + private Transformer getTransformer() { + try { + Transformer transformer = TransformerFactory.newInstance().newTransformer(); + transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes"); + return transformer; + } catch (TransformerConfigurationException e) { + throw new IllegalStateException("unable to create javax.xml.transform.Transformer", e); + } + } + + private XMLTag addContextDef(final XMLTag tag, final ContextDef def) { + tag.addTag(def.getName()).addAttribute("id", def.getId()).addAttribute("label", def.getLabel()); + if ((def.getType() != null) && !def.getType().isEmpty()) { + tag.addAttribute("type", def.getType()); + } + return tag; + } + + private String asStringElement(final org.w3c.dom.Element element, final Transformer transformer) throws TransformerException { + final StringWriter buffer = new StringWriter(); + transformer.transform(new DOMSource(element), new StreamResult(buffer)); + return buffer.toString(); + } + + private void fillContextMap(final String xmlTree) { + + Document fundingPath; + try { + fundingPath = new SAXReader().read(new StringReader(xmlTree)); + } catch (final DocumentException e) { + throw new RuntimeException(e); + } + try { + final Node funder = fundingPath.selectSingleNode("//funder"); + + if (funder != null) { + + final String funderShortName = funder.valueOf("./shortname"); + contextes.add(funderShortName); + + contextMapper.put(funderShortName, new ContextDef(funderShortName, funder.valueOf("./name"), "context", "funding")); + final Node level0 = fundingPath.selectSingleNode("//funding_level_0"); + if (level0 != null) { + final String level0Id = Joiner.on("::").join(funderShortName, level0.valueOf("./name")); + contextMapper.put(level0Id, new ContextDef(level0Id, level0.valueOf("./description"), "category", "")); + final Node level1 = fundingPath.selectSingleNode("//funding_level_1"); + if (level1 == null) { + contextes.add(level0Id); + } else { + final String level1Id = Joiner.on("::").join(level0Id, level1.valueOf("./name")); + contextMapper.put(level1Id, new ContextDef(level1Id, level1.valueOf("./description"), "concept", "")); + final Node level2 = fundingPath.selectSingleNode("//funding_level_2"); + if (level2 == null) { + contextes.add(level1Id); + } else { + final String level2Id = Joiner.on("::").join(level1Id, level2.valueOf("./name")); + contextMapper.put(level2Id, new ContextDef(level2Id, level2.valueOf("./description"), "concept", "")); + contextes.add(level2Id); + } + } + } + } + } catch (final NullPointerException e) { + throw new IllegalArgumentException("malformed funding path: " + xmlTree, e); + } + } + + + + @SuppressWarnings("unchecked") + private String getRelFundingTree(final String xmlTree) { + String funding = ""; + try { + final Document ftree = new SAXReader().read(new StringReader(xmlTree)); + funding = ""; + + funding += getFunderElement(ftree); + + for (final Object o : Lists.reverse(ftree.selectNodes("//fundingtree//*[starts-with(local-name(),'funding_level_')]"))) { + final Element e = (Element) o; + final String _id = e.valueOf("./id"); + funding += "<" + e.getName() + " name=\"" + escapeXml(e.valueOf("./name")) + "\">" + escapeXml(_id) + ""; + } + } catch (final DocumentException e) { + throw new IllegalArgumentException("unable to parse funding tree: " + xmlTree + "\n" + e.getMessage()); + } finally { + funding += ""; + } + return funding; + } + + private String getFunderElement(final Document ftree) { + final String funderId = ftree.valueOf("//fundingtree/funder/id/text()"); + final String funderShortName = ftree.valueOf("//fundingtree/funder/shortname/text()"); + final String funderName = ftree.valueOf("//fundingtree/funder/name/text()"); + final String funderJurisdiction = ftree.valueOf("//fundingtree/funder/jurisdiction/text()"); + + return ""; + } + +} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/XmlSerializationUtils.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/XmlSerializationUtils.java new file mode 100644 index 000000000..3088828ab --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/XmlSerializationUtils.java @@ -0,0 +1,151 @@ +package eu.dnetlib.dhp.graph.utils; + +import eu.dnetlib.dhp.schema.oaf.*; + +import static eu.dnetlib.dhp.graph.utils.GraphMappingUtils.removePrefix; +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + +public class XmlSerializationUtils { + + // XML 1.0 + // #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD] | [#x10000-#x10FFFF] + private final static String xml10pattern = "[^" + + "\u0009\r\n" + + "\u0020-\uD7FF" + + "\uE000-\uFFFD" + + "\ud800\udc00-\udbff\udfff" + + "]"; + + public static String mapJournal(Journal j) { + final String attrs = new StringBuilder() + .append(attr("issn", j.getIssnPrinted())) + .append(attr("eissn", j.getIssnOnline())) + .append(attr("lissn", j.getIssnLinking())) + .append(attr("ep", j.getEp())) + .append(attr("iss", j.getIss())) + .append(attr("sp", j.getSp())) + .append(attr("vol", j.getVol())) + .toString() + .trim(); + + return new StringBuilder() + .append("") + .append(escapeXml(j.getName())) + .append("") + .toString(); + } + + private static String attr(final String name, final String value) { + return isNotBlank(value) ? name + "=\"" + escapeXml(value) + "\" " : ""; + } + + public static String mapStructuredProperty(String name, StructuredProperty t) { + return asXmlElement(name, t.getValue(), t.getQualifier(), t.getDataInfo() != null ? t.getDataInfo() : null); + } + + public static String mapQualifier(String name, Qualifier q) { + return asXmlElement(name, "", q, null); + } + + public static String escapeXml(final String value) { + return value + .replaceAll("&", "&") + .replaceAll("<", "<") + .replaceAll(">", ">") + .replaceAll("\"", """) + .replaceAll("'", "'") + .replaceAll(xml10pattern, ""); + } + + public static String parseDataInfo(final DataInfo dataInfo) { + return new StringBuilder() + .append("") + .append(asXmlElement("inferred", dataInfo.getInferred() + "")) + .append(asXmlElement("deletedbyinference", dataInfo.getDeletedbyinference() + "")) + .append(asXmlElement("trust", dataInfo.getTrust() + "")) + .append(asXmlElement("inferenceprovenance", dataInfo.getInferenceprovenance() + "")) + .append(asXmlElement("provenanceaction", null, dataInfo.getProvenanceaction(), null)) + .append("") + .toString(); + } + + private static StringBuilder dataInfoAsAttributes(final StringBuilder sb, final DataInfo info) { + return sb + .append(attr("inferred", info.getInferred() != null ? info.getInferred().toString() : "")) + .append(attr("inferenceprovenance", info.getInferenceprovenance())) + .append(attr("provenanceaction", info.getProvenanceaction() != null ? info.getProvenanceaction().getClassid() : "")) + .append(attr("trust", info.getTrust())); + } + + public static String mapKeyValue(final String name, final KeyValue kv) { + return new StringBuilder() + .append("<") + .append(name) + .append(" name=\"") + .append(escapeXml(kv.getValue())) + .append("\" id=\"") + .append(escapeXml(removePrefix(kv.getKey()))) + .append("\"/>") + .toString(); + } + + public static String mapExtraInfo(final ExtraInfo e) { + return new StringBuilder("") + .append(e.getValue()) + .append("") + .toString(); + } + + public static String asXmlElement(final String name, final String value) { + return asXmlElement(name, value, null, null); + } + + public static String asXmlElement(final String name, final String value, final Qualifier q, final DataInfo info) { + StringBuilder sb = new StringBuilder(); + sb.append("<"); + sb.append(name); + if (q != null) { + sb.append(getAttributes(q)); + } + if (info != null) { + sb + .append(" ") + .append(attr("inferred", info.getInferred() != null ? info.getInferred().toString() : "")) + .append(attr("inferenceprovenance", info.getInferenceprovenance())) + .append(attr("provenanceaction", info.getProvenanceaction() != null ? info.getProvenanceaction().getClassid() : "")) + .append(attr("trust", info.getTrust())); + } + if (isBlank(value)) { + sb.append("/>"); + return sb.toString(); + } + + sb.append(">"); + sb.append(escapeXml(value)); + sb.append(""); + + return sb.toString(); + } + + public static String getAttributes(final Qualifier q) { + if (q == null || q.isBlank()) return ""; + + return new StringBuilder(" ") + .append(attr("classid", q.getClassid())) + .append(attr("classname", q.getClassname())) + .append(attr("schemeid", q.getSchemeid())) + .append(attr("schemename", q.getSchemename())) + .toString(); + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/META-INF/services/javax.xml.transform.TransformerFactory b/dhp-workflows/dhp-graph-provision/src/main/resources/META-INF/services/javax.xml.transform.TransformerFactory new file mode 100644 index 000000000..b53ca855f --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/META-INF/services/javax.xml.transform.TransformerFactory @@ -0,0 +1 @@ +net.sf.saxon.TransformerFactoryImpl \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/input_params_build_adjacency_lists.json similarity index 65% rename from dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json rename to dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/input_params_build_adjacency_lists.json index cbd4285bf..e63322028 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/input_params_build_adjacency_lists.json @@ -1,5 +1,6 @@ [ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true}, {"paramName":"o", "paramLongName":"outputPath", "paramDescription": "the path used to store temporary output files", "paramRequired": true}, - {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true} + {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequence file to read", "paramRequired": true} ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/input_params_update_index.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/input_params_update_index.json new file mode 100644 index 000000000..0d45e9e29 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/input_params_update_index.json @@ -0,0 +1,7 @@ +[ + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true}, + {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequence file to read the XML records", "paramRequired": true}, + {"paramName":"f", "paramLongName":"format", "paramDescription": "MDFormat name found in the IS profile", "paramRequired": true}, + {"paramName":"b", "paramLongName":"batchSize", "paramDescription": "size of the batch of documents sent to solr", "paramRequired": false} +] diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml index 4b4d2c7bf..fee463868 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml @@ -26,7 +26,15 @@ - + + + + + ${wf:conf('reuseRecords') eq false} + ${wf:conf('reuseRecords') eq true} + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] @@ -36,7 +44,7 @@ ${jobTracker} ${nameNode} - yarn-cluster + yarn cluster build_adjacency_lists eu.dnetlib.dhp.graph.SparkXmlRecordBuilderJob @@ -47,14 +55,43 @@ --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" - --conf spark.sql.warehouse.dir="/user/hive/warehouse" --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - -mt yarn-cluster + -mt yarn + -is ${isLookupUrl} --sourcePath${sourcePath} --outputPath${outputPath} + + + + + + + ${jobTracker} + ${nameNode} + yarn + cluster + to_solr_index + eu.dnetlib.dhp.graph.SparkXmlIndexingJob + dhp-graph-provision-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCoresForIndexing} + --driver-memory=${sparkDriverMemory} + --conf spark.executor.instances=${sparkExecutorInstances} + --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" + --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + -mt yarn + -is ${isLookupUrl} + --sourcePath${outputPath}/xml + --format${format} + --batchSize${batchSize} + diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/template/child.st b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/template/child.st new file mode 100644 index 000000000..89f81e16b --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/template/child.st @@ -0,0 +1,3 @@ +> + $metadata:{ it | $it$ }$ + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/template/entity.st b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/template/entity.st new file mode 100644 index 000000000..d16f3c3e0 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/template/entity.st @@ -0,0 +1,10 @@ + + $metadata:{ it | $it$ }$ + + $rels:{ it | $it$ }$ + + + $children:{ it | $it$ }$ + + +$extrainfo:{ it | $it$ }$ \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/template/instance.st b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/template/instance.st new file mode 100644 index 000000000..64bed05b4 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/template/instance.st @@ -0,0 +1,4 @@ + + $metadata:{ it | $it$ }$ + $webresources:{ it | $it$ }$ + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/template/record.st b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/template/record.st new file mode 100644 index 000000000..dea68eab8 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/template/record.st @@ -0,0 +1,17 @@ + + + +
+ $id$ + $dateofcollection$ + $dateoftransformation$ +
+ + + $it$ + + +
+
\ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/template/rel.st b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/template/rel.st new file mode 100644 index 000000000..af19ba497 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/template/rel.st @@ -0,0 +1,4 @@ + + $objIdentifier$ + $metadata:{ it | $it$ }$ + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/template/webresource.st b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/template/webresource.st new file mode 100644 index 000000000..7ff6c5d7f --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/template/webresource.st @@ -0,0 +1,3 @@ + + $identifier$ + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/graph/MappingUtilsTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/graph/MappingUtilsTest.java index fdff4d984..a9d696bea 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/graph/MappingUtilsTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/graph/MappingUtilsTest.java @@ -1,5 +1,8 @@ package eu.dnetlib.dhp.graph; +import eu.dnetlib.dhp.graph.model.EntityRelEntity; +import eu.dnetlib.dhp.graph.model.RelatedEntity; +import eu.dnetlib.dhp.graph.utils.GraphMappingUtils; import org.codehaus.jackson.map.ObjectMapper; import org.junit.Before; import org.junit.Test; @@ -23,23 +26,34 @@ public class MappingUtilsTest { final EntityRelEntity e = new ObjectMapper().readValue(in, EntityRelEntity.class); e.getSource().setType("datasource"); - final EntityRelEntity out = utils.pruneModel(e); + final EntityRelEntity out = utils.asRelatedEntity(e); + System.out.println(out); + + } + + //@Test + public void testOafMappingResult() throws IOException { + + final InputStreamReader in = new InputStreamReader(getClass().getResourceAsStream("result.json")); + final EntityRelEntity e = new ObjectMapper().readValue(in, EntityRelEntity.class); + + final EntityRelEntity out = utils.asRelatedEntity(e); System.out.println(out); } @Test - public void testOafMappinResult() throws IOException { + public void testOafMappingSoftware() throws IOException { - final InputStreamReader in = new InputStreamReader(getClass().getResourceAsStream("result.json")); + final InputStreamReader in = new InputStreamReader(getClass().getResourceAsStream("software.json")); final EntityRelEntity e = new ObjectMapper().readValue(in, EntityRelEntity.class); - e.getSource().setType("otherresearchproduct"); - final EntityRelEntity out = utils.pruneModel(e); + final EntityRelEntity out = utils.asRelatedEntity(e); System.out.println(out); } + @Test public void testParseRelatedEntity() throws IOException { diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/graph/XmlRecordFactoryTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/graph/XmlRecordFactoryTest.java new file mode 100644 index 000000000..2a3c343ec --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/graph/XmlRecordFactoryTest.java @@ -0,0 +1,55 @@ +package eu.dnetlib.dhp.graph; + +import eu.dnetlib.dhp.graph.utils.ContextMapper; +import org.apache.commons.io.FileUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.spark.sql.SparkSession; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +public class XmlRecordFactoryTest { + + private static final Log log = LogFactory.getLog(XmlRecordFactoryTest.class); + + private Path testDir; + + @Before + public void setup() throws IOException { + testDir = Files.createTempDirectory(getClass().getSimpleName()); + log.info("created test directory " + testDir.toString()); + } + + @After + public void tearDown() throws IOException { + FileUtils.deleteDirectory(testDir.toFile()); + log.info("deleted test directory " + testDir.toString()); + } + + @Test + public void testXmlSerialization() throws Exception { + + final SparkSession spark = SparkSession + .builder() + .appName(SparkXmlRecordBuilderJob.class.getSimpleName()) + .master("local[*]") + .getOrCreate(); + + final String inputDir = testDir.toString() + "/3_joined_entities"; + FileUtils.forceMkdir(new File(inputDir)); + FileUtils.copyFile(new File("/Users/claudio/Downloads/joined_entities-part-00000"), new File(inputDir + "/joined_entities-part-00000")); + + final ContextMapper ctx = ContextMapper.fromIS("https://dev-openaire.d4science.org:443/is/services/isLookUp"); + + final GraphJoiner g = new GraphJoiner(spark, ctx, inputDir, testDir.toString()); + + g.asXML(); + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/graph/software.json b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/graph/software.json new file mode 100644 index 000000000..0065b6799 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/graph/software.json @@ -0,0 +1 @@ +{"type":"software","entity":{"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk:repository","classname":"sysimport:crosswalk:repository","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":0,"id":"50|od______2659::05817f64c43a918a07483340b5726f77","originalId":["oai:zenodo.org:204139"],"collectedfrom":[{"key":"10|opendoar____::358aee4cc897452c00244351e4d91f69","value":"ZENODO","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"","inferenceprovenance":"","provenanceaction":{"classid":"","classname":"","schemeid":"","schemename":""}}}],"pid":[],"extraInfo":[],"author":[],"resulttype":{"classid":"software","classname":"software","schemeid":"dnet:result_typologies","schemename":"dnet:result_typologies"},"language":{"classid":"und","classname":"Undetermined","schemeid":"dnet:languages","schemename":"dnet:languages"},"country":[],"subject":[],"title":[],"relevantdate":[],"description":[],"dateofacceptance":{"value":"2016-01-01","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"","inferenceprovenance":"","provenanceaction":{"classid":"","classname":"","schemeid":"","schemename":""}}},"publisher":{"value":"","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"","inferenceprovenance":"","provenanceaction":{"classid":"","classname":"","schemeid":"","schemename":""}}},"embargoenddate":{"value":"","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"","inferenceprovenance":"","provenanceaction":{"classid":"","classname":"","schemeid":"","schemename":""}}},"source":[],"fulltext":[],"format":[],"contributor":[],"resourcetype":{"classid":"","classname":"","schemeid":"","schemename":""},"coverage":[],"refereed":{"value":"","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"","inferenceprovenance":"","provenanceaction":{"classid":"","classname":"","schemeid":"","schemename":""}}},"context":[],"instance":[{"license":{"value":"","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"","inferenceprovenance":"","provenanceaction":{"classid":"","classname":"","schemeid":"","schemename":""}}},"accessright":{"classid":"OPEN","classname":"Open Access","schemeid":"dnet:access_modes","schemename":"dnet:access_modes"},"instancetype":{"classid":"0029","classname":"Software","schemeid":"dnet:publication_resource","schemename":"dnet:publication_resource"},"hostedby":{"key":"10|opendoar____::358aee4cc897452c00244351e4d91f69","value":"ZENODO","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"","inferenceprovenance":"","provenanceaction":{"classid":"","classname":"","schemeid":"","schemename":""}}},"url":[],"distributionlocation":"","collectedfrom":{"key":"10|opendoar____::358aee4cc897452c00244351e4d91f69","value":"ZENODO","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"","inferenceprovenance":"","provenanceaction":{"classid":"","classname":"","schemeid":"","schemename":""}}},"dateofacceptance":{"value":"2016-01-01","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"","inferenceprovenance":"","provenanceaction":{"classid":"","classname":"","schemeid":"","schemename":""}}}}],"documentationUrl":[],"license":[],"codeRepositoryUrl":{"value":"","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"","inferenceprovenance":"","provenanceaction":{"classid":"","classname":"","schemeid":"","schemename":""}}},"programmingLanguage":{"classid":"","classname":"","schemeid":"","schemename":""}},"links":[{"relation":{"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk:repository","classname":"sysimport:crosswalk:repository","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":0,"relType":"resultProject","subRelType":"outcome","relClass":"isProducedBy","source":"50|od______2659::05817f64c43a918a07483340b5726f77","target":"40|corda__h2020::e2a38892773e6541ec7c07aa605ad581"},"relatedEntity":{"id":"40|corda__h2020::e2a38892773e6541ec7c07aa605ad581","type":"project","projectTitle":"Engaging the EGI Community towards an Open Science Commons","code":"654142","acronym":"EGI-Engage","contracttype":{},"fundingtree":["{value=ec__________::ECECEuropean CommissionEUec__________::EC::H2020::RIAResearch and Innovation actionRIAec:h2020toasec__________::EC::H2020H2020Horizon 2020 Framework Programmeec:h2020fundings, dataInfo={invisible=false, inferred=false, deletedbyinference=false, trust=, inferenceprovenance=, provenanceaction={classid=, classname=, schemeid=, schemename=}}}"]}},{"relation":{"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk:repository","classname":"sysimport:crosswalk:repository","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":0,"relType":"resultProject","subRelType":"outcome","relClass":"isProducedBy","source":"50|od______2659::05817f64c43a918a07483340b5726f77","target":"40|corda_______::4d31ccb13726266f9098129756e03f43"},"relatedEntity":{"id":"40|corda_______::4d31ccb13726266f9098129756e03f43","type":"project","projectTitle":"Common Operations of Environmental Research Infrastructures","code":"283465","acronym":"ENVRI","contracttype":{},"fundingtree":["{value=ec__________::ECECEuropean CommissionEUec__________::EC::FP7::SP4::INFRAResearch InfrastructuresINFRAec:programec__________::EC::FP7::SP4SP4-CapacitiesSP4ec:specificprogramec__________::EC::FP7SEVENTH FRAMEWORK PROGRAMMEFP7ec:frameworkprogram, dataInfo={invisible=false, inferred=false, deletedbyinference=false, trust=, inferenceprovenance=, provenanceaction={classid=, classname=, schemeid=, schemename=}}}"]}},{"relation":{"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk:repository","classname":"sysimport:crosswalk:repository","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":0,"relType":"resultProject","subRelType":"outcome","relClass":"isProducedBy","source":"50|od______2659::05817f64c43a918a07483340b5726f77","target":"40|corda_______::5af7655a8e0e871cf16072b4b6ab9b41"},"relatedEntity":{"id":"40|corda_______::5af7655a8e0e871cf16072b4b6ab9b41","type":"project","projectTitle":"Data e-Infrastructure Initiative for Fisheries Management and Conservation of Marine Living Resources","code":"283644","acronym":"IMARINE","contracttype":{},"fundingtree":["{value=ec__________::ECECEuropean CommissionEUec__________::EC::FP7::SP4::INFRAResearch InfrastructuresINFRAec:programec__________::EC::FP7::SP4SP4-CapacitiesSP4ec:specificprogramec__________::EC::FP7SEVENTH FRAMEWORK PROGRAMMEFP7ec:frameworkprogram, dataInfo={invisible=false, inferred=false, deletedbyinference=false, trust=, inferenceprovenance=, provenanceaction={classid=, classname=, schemeid=, schemename=}}}"]}},{"relation":{"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk:repository","classname":"sysimport:crosswalk:repository","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":0,"relType":"resultProject","subRelType":"outcome","relClass":"isProducedBy","source":"50|od______2659::05817f64c43a918a07483340b5726f77","target":"40|corda_______::e8da2e3e130ad3b1a650487d9ff126e4"},"relatedEntity":{"id":"40|corda_______::e8da2e3e130ad3b1a650487d9ff126e4","type":"project","projectTitle":"EU-Brazil Open Data and Cloud Computing e-Infrastructure for Biodiversity","code":"288754","acronym":"EUBRAZILOPENBIO","contracttype":{},"fundingtree":["{value=ec__________::ECECEuropean CommissionEUec__________::EC::FP7::SP1::ICTInformation and Communication TechnologiesICTec:programec__________::EC::FP7::SP1SP1-CooperationSP1ec:specificprogramec__________::EC::FP7SEVENTH FRAMEWORK PROGRAMMEFP7ec:frameworkprogram, dataInfo={invisible=false, inferred=false, deletedbyinference=false, trust=, inferenceprovenance=, provenanceaction={classid=, classname=, schemeid=, schemename=}}}"]}},{"relation":{"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk:repository","classname":"sysimport:crosswalk:repository","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":0,"relType":"resultProject","subRelType":"outcome","relClass":"isProducedBy","source":"50|od______2659::05817f64c43a918a07483340b5726f77","target":"40|corda_______::15463ed3cba51f042181197cfabb2ff5"},"relatedEntity":{"id":"40|corda_______::15463ed3cba51f042181197cfabb2ff5","type":"project","projectTitle":"Data Infrastructure Ecosystem for Science","code":"239019","acronym":"D4SCIENCE-II","contracttype":{},"fundingtree":["{value=ec__________::ECECEuropean CommissionEUec__________::EC::FP7::SP4::INFRAResearch InfrastructuresINFRAec:programec__________::EC::FP7::SP4SP4-CapacitiesSP4ec:specificprogramec__________::EC::FP7SEVENTH FRAMEWORK PROGRAMMEFP7ec:frameworkprogram, dataInfo={invisible=false, inferred=false, deletedbyinference=false, trust=, inferenceprovenance=, provenanceaction={classid=, classname=, schemeid=, schemename=}}}"]}},{"relation":{"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk:repository","classname":"sysimport:crosswalk:repository","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":0,"relType":"resultProject","subRelType":"outcome","relClass":"isProducedBy","source":"50|od______2659::05817f64c43a918a07483340b5726f77","target":"40|corda__h2020::4d46893df18bb77f5d817b8ce98ac56c"},"relatedEntity":{"id":"40|corda__h2020::4d46893df18bb77f5d817b8ce98ac56c","type":"project","projectTitle":"Pooling Activities, Resources and Tools for Heritage E-research Networking, Optimization and Synergies","code":"654119","acronym":"PARTHENOS","contracttype":{},"fundingtree":["{value=ec__________::ECECEuropean CommissionEUec__________::EC::H2020::RIAResearch and Innovation actionRIAec:h2020toasec__________::EC::H2020H2020Horizon 2020 Framework Programmeec:h2020fundings, dataInfo={invisible=false, inferred=false, deletedbyinference=false, trust=, inferenceprovenance=, provenanceaction={classid=, classname=, schemeid=, schemename=}}}"]}},{"relation":{"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk:repository","classname":"sysimport:crosswalk:repository","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":0,"relType":"resultProject","subRelType":"outcome","relClass":"isProducedBy","source":"50|od______2659::05817f64c43a918a07483340b5726f77","target":"40|corda_______::7f18b83690e3a18134b9a3db66d882d3"},"relatedEntity":{"id":"40|corda_______::7f18b83690e3a18134b9a3db66d882d3","type":"project","projectTitle":"DIstributed colLaboratories Infrastructure on Grid ENabled Technology 4 Science","code":"212488","acronym":"D4SCIENCE","contracttype":{},"fundingtree":["{value=ec__________::ECECEuropean CommissionEUec__________::EC::FP7::SP4::INFRAResearch InfrastructuresINFRAec:programec__________::EC::FP7::SP4SP4-CapacitiesSP4ec:specificprogramec__________::EC::FP7SEVENTH FRAMEWORK PROGRAMMEFP7ec:frameworkprogram, dataInfo={invisible=false, inferred=false, deletedbyinference=false, trust=, inferenceprovenance=, provenanceaction={classid=, classname=, schemeid=, schemename=}}}"]}},{"relation":{"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk:repository","classname":"sysimport:crosswalk:repository","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":0,"relType":"resultProject","subRelType":"outcome","relClass":"isProducedBy","source":"50|od______2659::05817f64c43a918a07483340b5726f77","target":"40|corda__h2020::6729c0ee95de7724deb60454bb4179de"},"relatedEntity":{"id":"40|corda__h2020::6729c0ee95de7724deb60454bb4179de","type":"project","projectTitle":"Building Research environments for fostering Innovation, Decision making, Governance and Education to support Blue growth","code":"675680","acronym":"BlueBRIDGE","contracttype":{},"fundingtree":["{value=ec__________::ECECEuropean CommissionEUec__________::EC::H2020::RIAResearch and Innovation actionRIAec:h2020toasec__________::EC::H2020H2020Horizon 2020 Framework Programmeec:h2020fundings, dataInfo={invisible=false, inferred=false, deletedbyinference=false, trust=, inferenceprovenance=, provenanceaction={classid=, classname=, schemeid=, schemename=}}}"]}},{"relation":{"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk:repository","classname":"sysimport:crosswalk:repository","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":0,"relType":"resultProject","subRelType":"outcome","relClass":"isProducedBy","source":"50|od______2659::05817f64c43a918a07483340b5726f77","target":"40|corda__h2020::0da81b3ad78047f577dd405e8a2d7f07"},"relatedEntity":{"id":"40|corda__h2020::0da81b3ad78047f577dd405e8a2d7f07","type":"project","projectTitle":"Environmental Research Infrastructures Providing Shared Solutions for Science and Society","code":"654182","acronym":"ENVRI PLUS","contracttype":{},"fundingtree":["{value=ec__________::ECECEuropean CommissionEUec__________::EC::H2020::RIAResearch and Innovation actionRIAec:h2020toasec__________::EC::H2020H2020Horizon 2020 Framework Programmeec:h2020fundings, dataInfo={invisible=false, inferred=false, deletedbyinference=false, trust=, inferenceprovenance=, provenanceaction={classid=, classname=, schemeid=, schemename=}}}"]}},{"relation":{"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk:repository","classname":"sysimport:crosswalk:repository","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":0,"relType":"resultProject","subRelType":"outcome","relClass":"isProducedBy","source":"50|od______2659::05817f64c43a918a07483340b5726f77","target":"40|corda__h2020::e7f5e7755409fc74eea9d168ab795634"},"relatedEntity":{"id":"40|corda__h2020::e7f5e7755409fc74eea9d168ab795634","type":"project","projectTitle":"SoBigData Research Infrastructure","code":"654024","acronym":"SoBigData","contracttype":{},"fundingtree":["{value=ec__________::ECECEuropean CommissionEUec__________::EC::H2020::RIAResearch and Innovation actionRIAec:h2020toasec__________::EC::H2020H2020Horizon 2020 Framework Programmeec:h2020fundings, dataInfo={invisible=false, inferred=false, deletedbyinference=false, trust=, inferenceprovenance=, provenanceaction={classid=, classname=, schemeid=, schemename=}}}"]}}]} \ No newline at end of file diff --git a/pom.xml b/pom.xml index aedf5ebff..f14877500 100644 --- a/pom.xml +++ b/pom.xml @@ -96,6 +96,12 @@ ${dhp.hadoop.version} provided
+ + org.apache.hadoop + hadoop-common + ${dhp.hadoop.version} + provided + org.apache.hadoop hadoop-client @@ -149,7 +155,7 @@ net.sf.saxon Saxon-HE - 9.5.1-5 + 9.9.1-6 @@ -170,6 +176,51 @@ 1.1.6 + + com.mycila.xmltool + xmltool + 3.3 + + + + org.apache.solr + solr-solrj + 7.5.0 + + + * + * + + + + + com.lucidworks.spark + spark-solr + 3.6.0 + + + * + * + + + + + + org.apache.httpcomponents + httpclient + 4.5.3 + + + org.noggit + noggit + 0.8 + + + org.apache.zookeeper + zookeeper + 3.4.11 + + net.schmizz sshj @@ -202,8 +253,17 @@ dnet-pace-core 4.0.0-SNAPSHOT + + eu.dnetlib + cnr-rmi-api + [2.0.0,3.0.0) + - + + org.apache.cxf + cxf-rt-transports-http + 3.1.5 + javax.persistence javax.persistence-api @@ -231,6 +291,11 @@ secondstring 1.0.0 + + org.antlr + stringtemplate + 4.0 + org.apache.oozie