Compare commits

...

31 Commits

Author SHA1 Message Date
Michele Artini e96527325d saved a query for openaire production database 2020-02-19 09:55:58 +01:00
Claudio Atzori ed76521d9b removed stale test resources, will be re-added later on 2020-02-18 11:51:08 +01:00
Claudio Atzori 0f364605ff removed stale tests, need to reimplemente them anyway 2020-02-18 11:48:19 +01:00
Claudio Atzori 6a288625e5 fixed workflow outgoing node 2020-02-17 15:04:33 +01:00
Claudio Atzori 1b18fd4d54 sync with master branch 2020-02-17 13:49:46 +01:00
Claudio Atzori 5bae30f399 adding readme for dhp-schema 2020-02-17 13:38:33 +01:00
Sandro La Bruzzo 4f04759738 Merge branch 'master' of code-repo.d4science.org:D-Net/dnet-hadoop 2020-02-17 12:31:58 +01:00
Sandro La Bruzzo 76ee85141a added oozie job for DNET migration and implemented Spark job for extracting entities 2020-02-17 12:31:44 +01:00
Claudio Atzori c460e2d281 Aggiornare 'dhp-workflows/docs/oozie-installer.markdown' 2020-02-17 11:54:48 +01:00
Sandro La Bruzzo fe93c709f1 Merge branch 'master' of michele.artini/dnet-hadoop into master 2020-02-17 10:43:08 +01:00
Claudio Atzori 56d1810a66 working procedure for records indexing using Spark, via lib com.lucidworks.spark:spark-solr 2020-02-14 12:28:52 +01:00
Claudio Atzori 1ee1baa8c0 Merge branch 'master' into provision_indexing 2020-02-13 18:17:07 +01:00
Claudio Atzori 11cfd6bd9a integrated changes from master branch 2020-02-13 17:27:07 +01:00
Claudio Atzori 1fee6e2b7e implemented XML records construction and serialization, indexing WIP 2020-02-13 16:53:27 +01:00
Claudio Atzori 956da2f923 added Saxon-HE extension functions and Transformer factory class 2020-02-13 16:49:45 +01:00
Claudio Atzori d3b96f102b builder pattern screws up the Parquet schema inference method, avoid using it in the bean definitions 2020-02-04 14:10:58 +01:00
Claudio Atzori ed290ca8d7 builder pattern 2020-02-03 10:35:51 +01:00
Claudio Atzori 7ba0f44d05 WIP 2020-01-30 18:21:07 +01:00
Claudio Atzori 49ef2f4eb1 removed input parameter specification, SparkXmlRecordBuilderJob doesn't need hive 2020-01-30 18:20:26 +01:00
Claudio Atzori b5e1e2e5b2 reintegrated changes from fcbc4ccd70 2020-01-30 18:11:04 +01:00
Claudio Atzori 7bacd6812e Merge branch 'provision_indexing' of https://code-repo.d4science.org/D-Net/dnet-hadoop into HEAD
 Conflicts:
	dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java
	dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/MappingUtils.java
	dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java
	dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlRecordBuilderJob.java
2020-01-30 17:59:46 +01:00
Claudio Atzori b2691a3b0a save adjacency list as JoinedEntity 2020-01-30 17:46:29 +01:00
Claudio Atzori 1ecca69f49 added annotation to ignore method during the serialization 2020-01-30 17:45:28 +01:00
Claudio Atzori 8c2aff99b0 joining entities using T x R x S, WIP: last representation based on LinkedEntity type 2020-01-29 15:40:33 +01:00
Claudio Atzori fcbc4ccd70 a bit of docs doesn't hurt 2020-01-24 08:43:23 +01:00
Claudio Atzori a55f5fecc6 joining entities using T x R x S method with groupByKey, WIP: making target objects (T) have lower memory footprint 2020-01-24 08:17:53 +01:00
Claudio Atzori 799929c1e3 joining entities using T x R x S method with groupByKey 2020-01-21 16:35:44 +01:00
Claudio Atzori 1cd6899480 merged from master 2020-01-17 14:25:57 +01:00
Claudio Atzori 63c0db4ff8 instance URLs must be repeatable 2020-01-16 15:54:53 +02:00
Claudio Atzori 97c239ee0d WIP: trying to find a way to build the records for the index 2020-01-16 12:02:28 +02:00
Claudio Atzori 7ba586d2e5 oozie workflow aimed to build the adjacency lists representation of the graph, needed to build the records to be indexed 2019-12-17 16:24:49 +01:00
55 changed files with 4077 additions and 84 deletions

2
.gitignore vendored
View File

@ -20,5 +20,5 @@
/*/build
/build
spark-warehouse
/*/*/job-override.properties
/**/job-override.properties

View File

@ -42,6 +42,10 @@
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
<dependency>
<groupId>net.sf.saxon</groupId>
<artifactId>Saxon-HE</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,32 @@
package eu.dnetlib.dhp.utils.saxon;
import net.sf.saxon.expr.XPathContext;
import net.sf.saxon.lib.ExtensionFunctionCall;
import net.sf.saxon.lib.ExtensionFunctionDefinition;
import net.sf.saxon.om.Sequence;
import net.sf.saxon.om.StructuredQName;
import net.sf.saxon.trans.XPathException;
public abstract class AbstractExtensionFunction extends ExtensionFunctionDefinition {
public static String DEFAULT_SAXON_EXT_NS_URI = "http://www.d-net.research-infrastructures.eu/saxon-extension";
public abstract String getName();
public abstract Sequence doCall(XPathContext context, Sequence[] arguments) throws XPathException;
@Override
public StructuredQName getFunctionQName() {
return new StructuredQName("dnet", DEFAULT_SAXON_EXT_NS_URI, getName());
}
@Override
public ExtensionFunctionCall makeCallExpression() {
return new ExtensionFunctionCall() {
@Override
public Sequence call(XPathContext context, Sequence[] arguments) throws XPathException {
return doCall(context, arguments);
}
};
}
}

View File

@ -0,0 +1,67 @@
package eu.dnetlib.dhp.utils.saxon;
import net.sf.saxon.expr.XPathContext;
import net.sf.saxon.om.Item;
import net.sf.saxon.om.Sequence;
import net.sf.saxon.trans.XPathException;
import net.sf.saxon.value.SequenceType;
import net.sf.saxon.value.StringValue;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.GregorianCalendar;
public class ExtractYear extends AbstractExtensionFunction {
private static final String[] dateFormats = { "yyyy-MM-dd", "yyyy/MM/dd" };
@Override
public String getName() {
return "extractYear";
}
@Override
public Sequence doCall(XPathContext context, Sequence[] arguments) throws XPathException {
if (arguments == null | arguments.length == 0) {
return new StringValue("");
}
final Item item = arguments[0].head();
if (item == null) {
return new StringValue("");
}
return new StringValue(_year(item.getStringValue()));
}
@Override
public int getMinimumNumberOfArguments() {
return 0;
}
@Override
public int getMaximumNumberOfArguments() {
return 1;
}
@Override
public SequenceType[] getArgumentTypes() {
return new SequenceType[] { SequenceType.OPTIONAL_ITEM };
}
@Override
public SequenceType getResultType(SequenceType[] suppliedArgumentTypes) {
return SequenceType.SINGLE_STRING;
}
private String _year(String s) {
Calendar c = new GregorianCalendar();
for (String format : dateFormats) {
try {
c.setTime(new SimpleDateFormat(format).parse(s));
String year = String.valueOf(c.get(Calendar.YEAR));
return year;
} catch (ParseException e) {}
}
return "";
}
}

View File

@ -0,0 +1,66 @@
package eu.dnetlib.dhp.utils.saxon;
import net.sf.saxon.expr.XPathContext;
import net.sf.saxon.om.Sequence;
import net.sf.saxon.trans.XPathException;
import net.sf.saxon.value.SequenceType;
import net.sf.saxon.value.StringValue;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
public class NormalizeDate extends AbstractExtensionFunction {
private static final String[] normalizeDateFormats = { "yyyy-MM-dd'T'hh:mm:ss", "yyyy-MM-dd", "yyyy/MM/dd", "yyyy" };
private static final String normalizeOutFormat = new String("yyyy-MM-dd'T'hh:mm:ss'Z'");
@Override
public String getName() {
return "normalizeDate";
}
@Override
public Sequence doCall(XPathContext context, Sequence[] arguments) throws XPathException {
if (arguments == null | arguments.length == 0) {
return new StringValue("");
}
String s = arguments[0].head().getStringValue();
return new StringValue(_year(s));
}
@Override
public int getMinimumNumberOfArguments() {
return 0;
}
@Override
public int getMaximumNumberOfArguments() {
return 1;
}
@Override
public SequenceType[] getArgumentTypes() {
return new SequenceType[] { SequenceType.OPTIONAL_ITEM };
}
@Override
public SequenceType getResultType(SequenceType[] suppliedArgumentTypes) {
return SequenceType.SINGLE_STRING;
}
private String _year(String s) {
final String date = s != null ? s.trim() : "";
for (String format : normalizeDateFormats) {
try {
Date parse = new SimpleDateFormat(format).parse(date);
String res = new SimpleDateFormat(normalizeOutFormat).format(parse);
return res;
} catch (ParseException e) {}
}
return "";
}
}

View File

@ -0,0 +1,60 @@
package eu.dnetlib.dhp.utils.saxon;
import net.sf.saxon.expr.XPathContext;
import net.sf.saxon.om.Item;
import net.sf.saxon.om.Sequence;
import net.sf.saxon.trans.XPathException;
import net.sf.saxon.value.SequenceType;
import net.sf.saxon.value.StringValue;
import org.apache.commons.lang3.StringUtils;
public class PickFirst extends AbstractExtensionFunction {
@Override
public String getName() {
return "pickFirst";
}
@Override
public Sequence doCall(XPathContext context, Sequence[] arguments) throws XPathException {
if (arguments == null | arguments.length == 0) {
return new StringValue("");
}
final String s1 = getValue(arguments[0]);
final String s2 = getValue(arguments[1]);
return new StringValue(StringUtils.isNotBlank(s1) ? s1 : StringUtils.isNotBlank(s2) ? s2 : "");
}
private String getValue(final Sequence arg) throws XPathException {
if (arg != null) {
final Item item = arg.head();
if (item != null) {
return item.getStringValue();
}
}
return "";
}
@Override
public int getMinimumNumberOfArguments() {
return 0;
}
@Override
public int getMaximumNumberOfArguments() {
return 2;
}
@Override
public SequenceType[] getArgumentTypes() {
return new SequenceType[] { SequenceType.OPTIONAL_ITEM };
}
@Override
public SequenceType getResultType(SequenceType[] suppliedArgumentTypes) {
return SequenceType.SINGLE_STRING;
}
}

View File

@ -0,0 +1,30 @@
package eu.dnetlib.dhp.utils.saxon;
import net.sf.saxon.Configuration;
import net.sf.saxon.TransformerFactoryImpl;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.stream.StreamSource;
import java.io.StringReader;
public class SaxonTransformerFactory {
/**
* Creates the index record transformer from the given XSLT
* @param xslt
* @return
* @throws TransformerException
*/
public static Transformer newInstance(final String xslt) throws TransformerException {
final TransformerFactoryImpl factory = new TransformerFactoryImpl();
final Configuration conf = factory.getConfiguration();
conf.registerExtensionFunction(new ExtractYear());
conf.registerExtensionFunction(new NormalizeDate());
conf.registerExtensionFunction(new PickFirst());
return factory.newTransformer(new StreamSource(new StringReader(xslt)));
}
}

View File

@ -1,3 +1,11 @@
Description of the project
--------------------------
This project defines **serialization schemas** of Avro data store files that are used to pass data between workflow nodes in the system.
This project defines **object schemas** of the OpenAIRE main entities and the relationships that intercur among them.
Namely it defines the model for
- **research product (result)** which subclasses in publication, dataset, other research product, software
- **data source** object describing the data provider (institutional repository, aggregators, cris systems)
- **organization** research bodies managing a data source or participating to a research project
- **project** research project
Te serialization of such objects (data store files) are used to pass data between workflow nodes in the processing pipeline.

View File

@ -26,6 +26,11 @@
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>

View File

@ -40,9 +40,9 @@ public class Datasource extends OafEntity implements Serializable {
private List<Field<String>> odlanguages;
private List< Field<String>> odcontenttypes;
private List<Field<String>> odcontenttypes;
private List< Field<String>> accessinfopackage;
private List<Field<String>> accessinfopackage;
// re3data fields
private Field<String> releasestartdate;

View File

@ -1,5 +1,6 @@
package eu.dnetlib.dhp.schema.oaf;
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.commons.lang3.StringUtils;
import java.io.Serializable;
@ -36,7 +37,7 @@ public class GeoLocation implements Serializable {
this.place = place;
}
@JsonIgnore
public boolean isBlank() {
return StringUtils.isBlank(point) &&
StringUtils.isBlank(box) &&

View File

@ -1,5 +1,6 @@
package eu.dnetlib.dhp.schema.oaf;
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.commons.lang3.StringUtils;
import java.io.Serializable;
@ -40,6 +41,7 @@ public class KeyValue implements Serializable {
return isBlank()?"":String.format("%s::%s", key != null ? key.toLowerCase() : "", value != null ? value.toLowerCase() : "");
}
@JsonIgnore
public boolean isBlank() {
return StringUtils.isBlank(key) && StringUtils.isBlank(value);
}

View File

@ -1,5 +1,6 @@
package eu.dnetlib.dhp.schema.oaf;
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.commons.lang3.StringUtils;
import java.io.Serializable;
@ -50,6 +51,8 @@ public class Qualifier implements Serializable {
schemeid != null ? schemeid : "",
schemename != null ? schemename : "");
}
@JsonIgnore
public boolean isBlank() {
return StringUtils.isBlank(classid) &&
StringUtils.isBlank(classname) &&

View File

@ -0,0 +1,56 @@
package eu.dnetlib.dhp.migration;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
public class ExtractEntitiesFromHDFSJob {
private static List<String> folderNames = Arrays.asList("db_entities", "oaf_entities", "odf_entities");
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils.toString(MigrateMongoMdstoresApplication.class.getResourceAsStream("/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json")));
parser.parseArgument(args);
final SparkSession spark = SparkSession
.builder()
.appName(ExtractEntitiesFromHDFSJob.class.getSimpleName())
.master(parser.get("master"))
.getOrCreate();
final String sourcePath = parser.get("sourcePath");
final String targetPath = parser.get("graphRawPath");
final String entity = parser.get("entity");
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<String> inputRdd = sc.emptyRDD();
folderNames.forEach(p -> inputRdd.union(
sc.sequenceFile(sourcePath+"/"+p, Text.class, Text.class)
.map(k -> new Tuple2<>(k._1().toString(), k._2().toString()))
.filter(k -> isEntityType(k._1(), entity))
.map(Tuple2::_2))
);
inputRdd.saveAsTextFile(targetPath+"/"+entity);
}
private static boolean isEntityType(final String item, final String entity) {
return StringUtils.substringAfter(item, ":").equalsIgnoreCase(entity);
}
}

View File

@ -0,0 +1,26 @@
[
{
"paramName": "s",
"paramLongName": "sourcePath",
"paramDescription": "the HDFS source path which contains the sequential file",
"paramRequired": true
},
{
"paramName": "mt",
"paramLongName": "master",
"paramDescription": "should be local or yarn",
"paramRequired": true
},
{
"paramName": "g",
"paramLongName": "graphRawPath",
"paramDescription": "the path of the graph Raw in hdfs",
"paramRequired": true
},
{
"paramName": "e",
"paramLongName": "entity",
"paramDescription": "The entity to extract",
"paramRequired": true
}
]

View File

@ -0,0 +1,22 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hdfsUser</name>
<value>dnet</value>
</property>
</configuration>

View File

@ -0,0 +1,282 @@
<workflow-app name="import Entities from aggretor to HDFS" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>workingPath</name>
<description>the base path to store hdfs file</description>
</property>
<property>
<name>graphRawPath</name>
<description>the graph Raw base path</description>
</property>
<property>
<name>postgresURL</name>
<description>the postgres URL to access to the database</description>
</property>
<property>
<name>postgresUser</name>
<description>the user postgres</description>
</property>
<property>
<name>postgresPassword</name>
<description>the password postgres</description>
</property>
<property>
<name>mongourl</name>
<description>mongoDB url, example: mongodb://[username:password@]host[:port]</description>
</property>
<property>
<name>mongoDb</name>
<description>mongo database</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<start to="ResetWorkingPath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ResetWorkingPath">
<fs>
<delete path='${workingPath}'/>
<mkdir path='${workingPath}'/>
</fs>
<ok to="ImportEntitiesFromPostgres"/>
<error to="Kill"/>
</action>
<action name="ImportEntitiesFromPostgres">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.MigrateDbEntitiesApplication</main-class>
<arg>-p</arg><arg>${workingPath}/db_entities</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-u</arg><arg>${hdfsUser}</arg>
<arg>-dburl</arg><arg>${postgresURL}</arg>
<arg>-dbuser</arg><arg>${postgresUser}</arg>
<arg>-dbpasswd</arg><arg>${postgresPassword}</arg>
</java>
<ok to="ImportODFEntitiesFromMongoDB"/>
<error to="Kill"/>
</action>
<action name="ImportODFEntitiesFromMongoDB">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${workingPath}/odf_entities</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-u</arg><arg>${hdfsUser}</arg>
<arg>-mongourl</arg><arg>${mongourl}</arg>
<arg>-db</arg><arg>${mongoDb}</arg>
<arg>-f</arg><arg>ODF</arg>
<arg>-l</arg><arg>store</arg>
<arg>-i</arg><arg>cleaned</arg>
<arg>-pgurl</arg><arg>${postgresURL}</arg>
<arg>-pguser</arg><arg>${postgresUser}</arg>
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
</java>
<ok to="ImportOAFEntitiesFromMongoDB"/>
<error to="Kill"/>
</action>
<action name="ImportOAFEntitiesFromMongoDB">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${workingPath}/oaf_entities</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-u</arg><arg>${hdfsUser}</arg>
<arg>-mongourl</arg><arg>${mongourl}</arg>
<arg>-db</arg><arg>${mongoDb}</arg>
<arg>-f</arg><arg>OAF</arg>
<arg>-l</arg><arg>store</arg>
<arg>-i</arg><arg>cleaned</arg>
<arg>-pgurl</arg><arg>${postgresURL}</arg>
<arg>-pguser</arg><arg>${postgresUser}</arg>
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
</java>
<ok to="ExtractPublication"/>
<error to="Kill"/>
</action>
<action name="ExtractPublication">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ExtractEntities: publication</name>
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingPath}</arg>
<arg>-g</arg><arg>${graphRawPath}/publication</arg>
<arg>-e</arg><arg>publication</arg>
</spark>
<ok to="ExtractDataset"/>
<error to="Kill"/>
</action>
<action name="ExtractDataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ExtractEntities: dataset</name>
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingPath}</arg>
<arg>-g</arg><arg>${graphRawPath}/dataset</arg>
<arg>-e</arg><arg>dataset</arg>
</spark>
<ok to="ExtractSoftware"/>
<error to="Kill"/>
</action>
<action name="ExtractSoftware">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ExtractEntities: software</name>
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingPath}</arg>
<arg>-g</arg><arg>${graphRawPath}/software</arg>
<arg>-e</arg><arg>software</arg>
</spark>
<ok to="ExtractORP"/>
<error to="Kill"/>
</action>
<action name="ExtractORP">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ExtractEntities: otherresearchproduct</name>
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingPath}</arg>
<arg>-g</arg><arg>${graphRawPath}/otherresearchproduct</arg>
<arg>-e</arg><arg>otherresearchproduct</arg>
</spark>
<ok to="ExtractDatasource"/>
<error to="Kill"/>
</action>
<action name="ExtractDatasource">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ExtractEntities: datasource</name>
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingPath}</arg>
<arg>-g</arg><arg>${graphRawPath}/datasource</arg>
<arg>-e</arg><arg>datasource</arg>
</spark>
<ok to="ExtractOrganization"/>
<error to="Kill"/>
</action>
<action name="ExtractOrganization">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ExtractEntities: organization</name>
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingPath}</arg>
<arg>-g</arg><arg>${graphRawPath}/organization</arg>
<arg>-e</arg><arg>organization</arg>
</spark>
<ok to="ExtractProject"/>
<error to="Kill"/>
</action>
<action name="ExtractProject">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ExtractEntities: project</name>
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingPath}</arg>
<arg>-g</arg><arg>${graphRawPath}/project</arg>
<arg>-e</arg><arg>project</arg>
</spark>
<ok to="ExtractRelation"/>
<error to="Kill"/>
</action>
<action name="ExtractRelation">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ExtractEntities: relation</name>
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingPath}</arg>
<arg>-g</arg><arg>${graphRawPath}/relation</arg>
<arg>-e</arg><arg>relation</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,90 @@
SELECT
p.id AS projectid,
p.code AS code,
p.websiteurl AS websiteurl,
p.acronym AS acronym,
p.title AS title,
p.startdate AS startdate,
p.enddate AS enddate,
p.call_identifier AS callidentifier,
p.keywords AS keywords,
p.duration AS duration,
p.ec_sc39 AS ecsc39,
p.oa_mandate_for_publications AS oamandatepublications,
p.ec_article29_3 AS ecarticle29_3,
p.dateofcollection AS dateofcollection,
p.lastupdate AS dateoftransformation,
p.inferred AS inferred,
p.deletedbyinference AS deletedbyinference,
p.trust AS trust,
p.inferenceprovenance AS inferenceprovenance,
p.optional1 AS optional1,
p.optional2 AS optional2,
p.jsonextrainfo AS jsonextrainfo,
p.contactfullname AS contactfullname,
p.contactfax AS contactfax,
p.contactphone AS contactphone,
p.contactemail AS contactemail,
p.summary AS summary,
p.currency AS currency,
p.totalcost AS totalcost,
p.fundedamount AS fundedamount,
dc.id AS collectedfromid,
dc.officialname AS collectedfromname,
ctc.code || '@@@' || ctc.name || '@@@' || cts.code || '@@@' || cts.name AS contracttype,
pac.code || '@@@' || pac.name || '@@@' || pas.code || '@@@' || pas.name AS provenanceaction,
array_agg(DISTINCT i.pid || '###' || i.issuertype) AS pid,
array_agg(DISTINCT s.name || '###' || sc.code || '@@@' || sc.name || '@@@' || ss.code || '@@@' || ss.name) AS subjects,
array_agg(DISTINCT fp.path) AS fundingtree
FROM projects p
LEFT OUTER JOIN class pac ON (pac.code = p.provenanceactionclass)
LEFT OUTER JOIN scheme pas ON (pas.code = p.provenanceactionscheme)
LEFT OUTER JOIN projectpids pp ON (pp.project = p.id)
LEFT OUTER JOIN dsm_identities i ON (i.pid = pp.pid)
LEFT OUTER JOIN dsm_datasources dc ON (dc.id = p.collectedfrom)
LEFT OUTER JOIN project_fundingpath pf ON (pf.project = p.id)
LEFT OUTER JOIN fundingpaths fp ON (fp.id = pf.funding)
LEFT OUTER JOIN project_subject ps ON (ps.project = p.id)
LEFT OUTER JOIN subjects s ON (s.id = ps.subject)
LEFT OUTER JOIN class sc ON (sc.code = s.semanticclass)
LEFT OUTER JOIN scheme ss ON (ss.code = s.semanticscheme)
LEFT OUTER JOIN class ctc ON (ctc.code = p.contracttypeclass)
LEFT OUTER JOIN scheme cts ON (cts.code = p.contracttypescheme)
GROUP BY
p.id,
p.code,
p.websiteurl,
p.acronym,
p.title,
p.startdate,
p.enddate,
p.call_identifier,
p.keywords,
p.duration,
p.ec_sc39,
p.oa_mandate_for_publications,
p.ec_article29_3,
p.dateofcollection,
p.inferred,
p.deletedbyinference,
p.trust,
p.inferenceprovenance,
p.contactfullname,
p.contactfax,
p.contactphone,
p.contactemail,
p.summary,
p.currency,
p.totalcost,
p.fundedamount,
dc.id,
dc.officialname,
pac.code, pac.name, pas.code, pas.name,
ctc.code, ctc.name, cts.code, cts.name;

View File

@ -18,13 +18,13 @@ public class GraphMappingUtils {
public final static Map<String, Class> types = Maps.newHashMap();
static {
types.put("datasource", Datasource.class);
types.put("organization", Organization.class);
types.put("datasource", Datasource.class);
types.put("organization", Organization.class);
types.put("project", Project.class);
types.put("dataset", Dataset.class);
types.put("otherresearchproduct", OtherResearchProduct.class);
types.put("software", Software.class);
types.put("publication", Publication.class);
types.put("dataset", Dataset.class);
types.put("otherresearchproduct", OtherResearchProduct.class);
types.put("software", Software.class);
types.put("publication", Publication.class);
types.put("relation", Relation.class);
}

View File

@ -1,53 +1,43 @@
package eu.dnetlib.dhp.graph;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import scala.Tuple2;
public class SparkGraphImporterJob {
public static void main(final String[] args) throws Exception {
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils.toString(SparkGraphImporterJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_graph_parameters.json")));
parser.parseArgument(args);
final SparkSession spark = SparkSession
.builder()
.appName(SparkGraphImporterJob.class.getSimpleName())
.master(parser.get("master"))
.config("hive.metastore.uris", parser.get("hive_metastore_uris"))
.enableHiveSupport()
.getOrCreate();
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGraphImporterJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_graph_parameters.json")));
parser.parseArgument(args);
final SparkSession spark = SparkSession
.builder()
.appName(SparkGraphImporterJob.class.getSimpleName())
.master(parser.get("master"))
.config("hive.metastore.uris", parser.get("hive_metastore_uris"))
.enableHiveSupport()
.getOrCreate();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final String inputPath = parser.get("sourcePath");
final String hiveDbName = parser.get("hive_db_name");
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final String inputPath = parser.get("sourcePath");
final String hiveDbName = parser.get("hive_db_name");
spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName));
spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName));
// Read the input file and convert it into RDD of serializable object
GraphMappingUtils.types.forEach((name, clazz) -> {
final JavaRDD<Tuple2<String, String>> inputRDD = sc.sequenceFile(inputPath + "/" + name, Text.class, Text.class)
.map(item -> new Tuple2<>(item._1.toString(), item._2.toString()));
// Read the input file and convert it into RDD of serializable object
GraphMappingUtils.types.forEach((name, clazz) -> {
spark.createDataset(sc.sequenceFile(inputPath + "/" + name, Text.class, Text.class)
.map(s -> new ObjectMapper().readValue(s._2().toString(), clazz))
.rdd(), Encoders.bean(clazz))
.write()
.mode(SaveMode.Overwrite)
.saveAsTable(hiveDbName + "." + name);
});
spark.createDataset(inputRDD
.filter(s -> s._1().equals(clazz.getName()))
.map(Tuple2::_2)
.map(s -> new ObjectMapper().readValue(s, clazz))
.rdd(), Encoders.bean(clazz))
.write()
.mode(SaveMode.Overwrite)
.saveAsTable(hiveDbName + "." + name);
});
}
}
}

View File

@ -0,0 +1,10 @@
sparkDriverMemory=8G
sparkExecutorMemory=8G
#isLookupUrl=http://services.openaire.eu:8280/is/services/isLookUp
isLookupUrl=http://beta.services.openaire.eu:8280/is/services/isLookUp?wsdl
sourcePath=/tmp/db_openaireplus_services.export_dhp.2020.02.03
outputPath=/tmp/openaire_provision
format=TMF
batchSize=2000
sparkExecutorCoresForIndexing=64
reuseRecords=true

View File

@ -0,0 +1,92 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId>
<version>1.1.6-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dhp-graph-provision</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
</dependency>
<dependency>
<groupId>dom4j</groupId>
<artifactId>dom4j</artifactId>
</dependency>
<dependency>
<groupId>jaxen</groupId>
<artifactId>jaxen</artifactId>
</dependency>
<dependency>
<groupId>com.mycila.xmltool</groupId>
<artifactId>xmltool</artifactId>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>stringtemplate</artifactId>
</dependency>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-solrj</artifactId>
</dependency>
<dependency>
<groupId>com.lucidworks.spark</groupId>
<artifactId>spark-solr</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
</dependency>
<dependency>
<groupId>org.noggit</groupId>
<artifactId>noggit</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib</groupId>
<artifactId>cnr-rmi-api</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-schemas</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,257 @@
package eu.dnetlib.dhp.graph;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import eu.dnetlib.dhp.graph.model.*;
import eu.dnetlib.dhp.graph.utils.ContextMapper;
import eu.dnetlib.dhp.graph.utils.GraphMappingUtils;
import eu.dnetlib.dhp.graph.utils.XmlRecordFactory;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.graph.utils.GraphMappingUtils.asRelatedEntity;
/**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects.
* The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization,
* and all the possible relationships (similarity links produced by the Dedup process are excluded).
*
* The operation is implemented creating the union between the entity types (E), joined by the relationships (R), and again
* by E, finally grouped by E.id;
*
* Different manipulations of the E and R sets are introduced to reduce the complexity of the operation
* 1) treat the object payload as string, extracting only the necessary information beforehand using json path,
* it seems that deserializing it with jackson's object mapper has higher memory footprint.
*
* 2) only consider rels that are not virtually deleted ($.dataInfo.deletedbyinference == false)
* 3) we only need a subset of fields from the related entities, so we introduce a distinction between E_source = S
* and E_target = T. Objects in T are heavily pruned by all the unnecessary information
*
* 4) perform the join as (((T join R) union S) groupby S.id) yield S -> [ <T, R> ]
*/
public class GraphJoiner implements Serializable {
public static final int MAX_RELS = 100;
public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd";
private SparkSession spark;
private ContextMapper contextMapper;
private String inputPath;
private String outPath;
public GraphJoiner(SparkSession spark, ContextMapper contextMapper, String inputPath, String outPath) {
this.spark = spark;
this.contextMapper = contextMapper;
this.inputPath = inputPath;
this.outPath = outPath;
}
public GraphJoiner adjacencyLists() {
final JavaSparkContext sc = new JavaSparkContext(getSpark().sparkContext());
// read each entity
JavaPairRDD<String, TypedRow> datasource = readPathEntity(sc, getInputPath(), "datasource");
JavaPairRDD<String, TypedRow> organization = readPathEntity(sc, getInputPath(), "organization");
JavaPairRDD<String, TypedRow> project = readPathEntity(sc, getInputPath(), "project");
JavaPairRDD<String, TypedRow> dataset = readPathEntity(sc, getInputPath(), "dataset");
JavaPairRDD<String, TypedRow> otherresearchproduct = readPathEntity(sc, getInputPath(), "otherresearchproduct");
JavaPairRDD<String, TypedRow> software = readPathEntity(sc, getInputPath(), "software");
JavaPairRDD<String, TypedRow> publication = readPathEntity(sc, getInputPath(), "publication");
// create the union between all the entities
final String entitiesPath = getOutPath() + "/entities";
datasource
.union(organization)
.union(project)
.union(dataset)
.union(otherresearchproduct)
.union(software)
.union(publication)
.map(e -> new EntityRelEntity().setSource(e._2()))
.map(GraphMappingUtils::serialize)
.saveAsTextFile(entitiesPath, GzipCodec.class);
JavaPairRDD<String, EntityRelEntity> entities = sc.textFile(entitiesPath)
.map(t -> new ObjectMapper().readValue(t, EntityRelEntity.class))
.mapToPair(t -> new Tuple2<>(t.getSource().getSourceId(), t));
// reads the relationships
final JavaPairRDD<String, EntityRelEntity> relation = readPathRelation(sc, getInputPath())
.filter(r -> !r.getDeleted()) //only consider those that are not virtually deleted
.map(p -> new EntityRelEntity().setRelation(p))
.mapToPair(p -> new Tuple2<>(p.getRelation().getSourceId(), p))
.groupByKey()
.map(p -> Iterables.limit(p._2(), MAX_RELS))
.flatMap(p -> p.iterator())
.mapToPair(p -> new Tuple2<>(p.getRelation().getTargetId(), p));
//final String bySource = getOutPath() + "/1_join_by_target";
JavaPairRDD<String, EntityRelEntity> bySource = relation
.join(entities
.filter(e -> !e._2().getSource().getDeleted())
.mapToPair(e -> new Tuple2<>(e._1(), asRelatedEntity(e._2()))))
.map(s -> new EntityRelEntity()
.setRelation(s._2()._1().getRelation())
.setTarget(s._2()._2().getSource()))
.mapToPair(t -> new Tuple2<>(t.getRelation().getSourceId(), t));
final XmlRecordFactory recordFactory = new XmlRecordFactory(contextMapper, false, schemaLocation, new HashSet<>());
entities
.union(bySource)
.groupByKey() // by source id
.map(l -> toJoinedEntity(l))
.mapToPair(je -> new Tuple2<>(
new Text(je.getEntity().getId()),
new Text(recordFactory.build(je))))
.saveAsHadoopFile(getOutPath() + "/xml", Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
return this;
}
public GraphJoiner asXML() {
final JavaSparkContext sc = new JavaSparkContext(getSpark().sparkContext());
final XmlRecordFactory recordFactory = new XmlRecordFactory(contextMapper, true, "", new HashSet<>());
final ObjectMapper mapper = new ObjectMapper();
final String joinedEntitiesPath = getOutPath() + "/1_joined_entities";
sc.textFile(joinedEntitiesPath)
.map(s -> mapper.readValue(s, JoinedEntity.class))
.mapToPair(je -> new Tuple2<>(new Text(je.getEntity().getId()), new Text(recordFactory.build(je))))
.saveAsHadoopFile(getOutPath() + "/2_xml", Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
return this;
}
public SparkSession getSpark() {
return spark;
}
public String getInputPath() {
return inputPath;
}
public String getOutPath() {
return outPath;
}
// HELPERS
private OafEntity parseOaf(final String json, final String type) {
final ObjectMapper o = new ObjectMapper();
try {
switch (GraphMappingUtils.EntityType.valueOf(type)) {
case publication:
return o.readValue(json, Publication.class);
case dataset:
return o.readValue(json, Dataset.class);
case otherresearchproduct:
return o.readValue(json, OtherResearchProduct.class);
case software:
return o.readValue(json, Software.class);
case datasource:
return o.readValue(json, Datasource.class);
case organization:
return o.readValue(json, Organization.class);
case project:
return o.readValue(json, Project.class);
default:
throw new IllegalArgumentException("invalid type: " + type);
}
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
private JoinedEntity toJoinedEntity(Tuple2<String, Iterable<EntityRelEntity>> p) {
final ObjectMapper o = new ObjectMapper();
final JoinedEntity j = new JoinedEntity();
final Links links2 = new Links();
for(EntityRelEntity rel : p._2()) {
if (rel.hasMainEntity() & j.getEntity() == null) {
j.setType(rel.getSource().getType());
j.setEntity(parseOaf(rel.getSource().getOaf(), rel.getSource().getType()));
}
if (rel.hasRelatedEntity()) {
try {
links2.add(
new eu.dnetlib.dhp.graph.model.Tuple2()
.setRelation(o.readValue(rel.getRelation().getOaf(), Relation.class))
.setRelatedEntity(o.readValue(rel.getTarget().getOaf(), RelatedEntity.class)));
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
}
j.setLinks(links2);
if (j.getEntity() == null) {
throw new IllegalStateException("missing main entity on '" + p._1() + "'");
}
return j;
}
/**
* Reads a set of eu.dnetlib.dhp.schema.oaf.OafEntity objects from a sequence file <className, entity json serialization>,
* extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.model.TypedRow
* @param sc
* @param inputPath
* @param type
* @return the JavaPairRDD<String, TypedRow> indexed by entity identifier
*/
private JavaPairRDD<String, TypedRow> readPathEntity(final JavaSparkContext sc, final String inputPath, final String type) {
return sc.sequenceFile(inputPath + "/" + type, Text.class, Text.class)
.mapToPair((PairFunction<Tuple2<Text, Text>, String, TypedRow>) item -> {
final String s = item._2().toString();
final DocumentContext json = JsonPath.parse(s);
final String id = json.read("$.id");
return new Tuple2<>(id, new TypedRow()
.setSourceId(id)
.setDeleted(json.read("$.dataInfo.deletedbyinference"))
.setType(type)
.setOaf(s));
});
}
/**
* Reads a set of eu.dnetlib.dhp.schema.oaf.Relation objects from a sequence file <className, relation json serialization>,
* extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.model.TypedRow
* @param sc
* @param inputPath
* @return the JavaRDD<TypedRow> containing all the relationships
*/
private JavaRDD<TypedRow> readPathRelation(final JavaSparkContext sc, final String inputPath) {
return sc.sequenceFile(inputPath + "/relation", Text.class, Text.class)
.map(item -> {
final String s = item._2().toString();
final DocumentContext json = JsonPath.parse(s);
return new TypedRow()
.setSourceId(json.read("$.source"))
.setTargetId(json.read("$.target"))
.setDeleted(json.read("$.dataInfo.deletedbyinference"))
.setType("relation")
.setOaf(s);
});
}
}

View File

@ -0,0 +1,188 @@
package eu.dnetlib.dhp.graph;
import com.lucidworks.spark.util.SolrSupport;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.graph.utils.ISLookupClientFactory;
import eu.dnetlib.dhp.graph.utils.StreamingInputDocumentFactory;
import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.solr.common.SolrInputDocument;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.SparkSession;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.text.SimpleDateFormat;
import java.util.Date;
public class SparkXmlIndexingJob {
private static final Log log = LogFactory.getLog(SparkXmlIndexingJob.class);
private static final Integer DEFAULT_BATCH_SIZE = 1000;
private static final String LAYOUT = "index";
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkXmlIndexingJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_params_update_index.json")));
parser.parseArgument(args);
final String inputPath = parser.get("sourcePath");
final String isLookupUrl = parser.get("isLookupUrl");
final String format = parser.get("format");
final Integer batchSize = parser.getObjectMap().containsKey("batchSize") ? Integer.valueOf(parser.get("batchSize")) : DEFAULT_BATCH_SIZE;
final ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl);
final String fields = getLayoutSource(isLookup, format);
final String xslt = getLayoutTransformer(isLookup);
final String dsId = getDsId(format, isLookup);
final String zkHost = getZkHost(isLookup);
final String version = getRecordDatestamp();
final String indexRecordXslt = getLayoutTransformer(format, fields, xslt);
log.info("indexRecordTransformer: " + indexRecordXslt);
final String master = parser.get("master");
final SparkConf conf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
try(SparkSession spark = getSession(conf, master)) {
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
RDD<SolrInputDocument> docs = sc.sequenceFile(inputPath, Text.class, Text.class)
.map(t -> t._2().toString())
.map(s -> toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), s))
.map(s -> new StreamingInputDocumentFactory(version, dsId).parseDocument(s))
.rdd();
SolrSupport.indexDocs(zkHost, format + "-" + LAYOUT + "-openaire", batchSize, docs);
}
}
private static SparkSession getSession(SparkConf conf, String master) {
return SparkSession
.builder()
.config(conf)
.appName(SparkXmlRecordBuilderJob.class.getSimpleName())
.master(master)
.getOrCreate();
}
private static String toIndexRecord(Transformer tr, final String record) {
final StreamResult res = new StreamResult(new StringWriter());
try {
tr.transform(new StreamSource(new StringReader(record)), res);
return res.getWriter().toString();
} catch (Throwable e) {
System.out.println("XPathException on record:\n" + record);
throw new IllegalArgumentException(e);
}
}
/**
* Creates the XSLT responsible for building the index xml records.
*
* @param format Metadata format name (DMF|TMF)
* @param xslt xslt for building the index record transformer
* @param fields the list of fields
* @return the javax.xml.transform.Transformer
* @throws ISLookUpException could happen
* @throws IOException could happen
* @throws TransformerException could happen
*/
private static String getLayoutTransformer(String format, String fields, String xslt) throws TransformerException {
final Transformer layoutTransformer = SaxonTransformerFactory.newInstance(xslt);
final StreamResult layoutToXsltXslt = new StreamResult(new StringWriter());
layoutTransformer.setParameter("format", format);
layoutTransformer.transform(new StreamSource(new StringReader(fields)), layoutToXsltXslt);
return layoutToXsltXslt.getWriter().toString();
}
/**
* method return a solr-compatible string representation of a date, used to mark all records as indexed today
* @return the parsed date
*/
public static String getRecordDatestamp() {
return new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss'Z'").format(new Date());
}
/**
* Method retrieves from the information system the list of fields associated to the given MDFormat name
*
* @param isLookup the ISLookup service stub
* @param format the Metadata format name
* @return the string representation of the list of fields to be indexed
*
* @throws ISLookUpDocumentNotFoundException
* @throws ISLookUpException
*/
private static String getLayoutSource(final ISLookUpService isLookup, final String format) throws ISLookUpDocumentNotFoundException, ISLookUpException {
return doLookup(isLookup, String.format(
"collection('')//RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and .//NAME='%s']//LAYOUT[@name='%s']", format, LAYOUT));
}
/**
* Method retrieves from the information system the openaireLayoutToRecordStylesheet
*
* @param isLookup the ISLookup service stub
* @return the string representation of the XSLT contained in the transformation rule profile
*
* @throws ISLookUpDocumentNotFoundException
* @throws ISLookUpException
*/
private static String getLayoutTransformer(ISLookUpService isLookup) throws ISLookUpException {
return doLookup(isLookup, "collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType')" +
"//RESOURCE_PROFILE[./BODY/CONFIGURATION/SCRIPT/TITLE/text() = 'openaireLayoutToRecordStylesheet']//CODE/node()");
}
/**
* Method retrieves from the information system the IndexDS profile ID associated to the given MDFormat name
* @param format
* @param isLookup
* @return the IndexDS identifier
* @throws ISLookUpException
*/
private static String getDsId(String format, ISLookUpService isLookup) throws ISLookUpException {
return doLookup(isLookup, String.format("collection('/db/DRIVER/IndexDSResources/IndexDSResourceType')" +
"//RESOURCE_PROFILE[./BODY/CONFIGURATION/METADATA_FORMAT/text() = '%s']//RESOURCE_IDENTIFIER/@value/string()", format));
}
/**
* Method retrieves from the information system the zookeeper quorum of the Solr server
* @param isLookup
* @return the zookeeper quorum of the Solr server
* @throws ISLookUpException
*/
private static String getZkHost(ISLookUpService isLookup) throws ISLookUpException {
return doLookup(isLookup, "for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()");
}
private static String doLookup(ISLookUpService isLookup, String xquery) throws ISLookUpException {
log.info(String.format("running xquery: %s", xquery));
final String res = isLookup.getResourceProfileByQuery(xquery);
log.info(String.format("got response (100 chars): %s", StringUtils.left(res, 100) + " ..."));
return res;
}
}

View File

@ -0,0 +1,48 @@
package eu.dnetlib.dhp.graph;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.graph.utils.ContextMapper;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
public class SparkXmlRecordBuilderJob {
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkXmlRecordBuilderJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_params_build_adjacency_lists.json")));
parser.parseArgument(args);
final String master = parser.get("master");
final SparkConf conf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
try(SparkSession spark = getSession(conf, master)) {
final String inputPath = parser.get("sourcePath");
final String outputPath = parser.get("outputPath");
final String isLookupUrl = parser.get("isLookupUrl");
final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
if (fs.exists(new Path(outputPath))) {
fs.delete(new Path(outputPath), true);
fs.mkdirs(new Path(outputPath));
}
new GraphJoiner(spark, ContextMapper.fromIS(isLookupUrl), inputPath, outputPath)
.adjacencyLists();
}
}
private static SparkSession getSession(SparkConf conf, String master) {
return SparkSession
.builder()
.config(conf)
.appName(SparkXmlRecordBuilderJob.class.getSimpleName())
.master(master)
.getOrCreate();
}
}

View File

@ -0,0 +1,54 @@
package eu.dnetlib.dhp.graph.model;
import java.io.Serializable;
public class EntityRelEntity implements Serializable {
private TypedRow source;
private TypedRow relation;
private TypedRow target;
public EntityRelEntity() {
}
public EntityRelEntity(TypedRow source) {
this.source = source;
}
//helpers
public Boolean hasMainEntity() {
return getSource() != null & getRelation() == null & getTarget() == null;
}
public Boolean hasRelatedEntity() {
return getSource() == null & getRelation() != null & getTarget() != null;
}
public TypedRow getSource() {
return source;
}
public EntityRelEntity setSource(TypedRow source) {
this.source = source;
return this;
}
public TypedRow getRelation() {
return relation;
}
public EntityRelEntity setRelation(TypedRow relation) {
this.relation = relation;
return this;
}
public TypedRow getTarget() {
return target;
}
public EntityRelEntity setTarget(TypedRow target) {
this.target = target;
return this;
}
}

View File

@ -0,0 +1,41 @@
package eu.dnetlib.dhp.graph.model;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import java.io.Serializable;
public class JoinedEntity implements Serializable {
private String type;
private OafEntity entity;
private Links links;
public String getType() {
return type;
}
public JoinedEntity setType(String type) {
this.type = type;
return this;
}
public OafEntity getEntity() {
return entity;
}
public JoinedEntity setEntity(OafEntity entity) {
this.entity = entity;
return this;
}
public Links getLinks() {
return links;
}
public JoinedEntity setLinks(Links links) {
this.links = links;
return this;
}
}

View File

@ -0,0 +1,6 @@
package eu.dnetlib.dhp.graph.model;
import java.util.ArrayList;
public class Links extends ArrayList<Tuple2> {
}

View File

@ -0,0 +1,257 @@
package eu.dnetlib.dhp.graph.model;
import eu.dnetlib.dhp.schema.oaf.Instance;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
public class RelatedEntity implements Serializable {
private String id;
private String type;
// common fields
private StructuredProperty title;
private String websiteurl; // datasource, organizations, projects
// results
private String dateofacceptance;
private String publisher;
private List<StructuredProperty> pid;
private String codeRepositoryUrl;
private Qualifier resulttype;
private List<KeyValue> collectedfrom;
private List<Instance> instances;
// datasource
private String officialname;
private Qualifier datasourcetype;
private Qualifier datasourcetypeui;
private Qualifier openairecompatibility;
//private String aggregatortype;
// organization
private String legalname;
private String legalshortname;
private Qualifier country;
// project
private String projectTitle;
private String code;
private String acronym;
private Qualifier contracttype;
private List<String> fundingtree;
public String getId() {
return id;
}
public RelatedEntity setId(String id) {
this.id = id;
return this;
}
public StructuredProperty getTitle() {
return title;
}
public RelatedEntity setTitle(StructuredProperty title) {
this.title = title;
return this;
}
public String getDateofacceptance() {
return dateofacceptance;
}
public RelatedEntity setDateofacceptance(String dateofacceptance) {
this.dateofacceptance = dateofacceptance;
return this;
}
public String getPublisher() {
return publisher;
}
public RelatedEntity setPublisher(String publisher) {
this.publisher = publisher;
return this;
}
public List<StructuredProperty> getPid() {
return pid;
}
public RelatedEntity setPid(List<StructuredProperty> pid) {
this.pid = pid;
return this;
}
public String getCodeRepositoryUrl() {
return codeRepositoryUrl;
}
public RelatedEntity setCodeRepositoryUrl(String codeRepositoryUrl) {
this.codeRepositoryUrl = codeRepositoryUrl;
return this;
}
public Qualifier getResulttype() {
return resulttype;
}
public RelatedEntity setResulttype(Qualifier resulttype) {
this.resulttype = resulttype;
return this;
}
public List<KeyValue> getCollectedfrom() {
return collectedfrom;
}
public RelatedEntity setCollectedfrom(List<KeyValue> collectedfrom) {
this.collectedfrom = collectedfrom;
return this;
}
public List<Instance> getInstances() {
return instances;
}
public RelatedEntity setInstances(List<Instance> instances) {
this.instances = instances;
return this;
}
public String getOfficialname() {
return officialname;
}
public RelatedEntity setOfficialname(String officialname) {
this.officialname = officialname;
return this;
}
public String getWebsiteurl() {
return websiteurl;
}
public RelatedEntity setWebsiteurl(String websiteurl) {
this.websiteurl = websiteurl;
return this;
}
public Qualifier getDatasourcetype() {
return datasourcetype;
}
public RelatedEntity setDatasourcetype(Qualifier datasourcetype) {
this.datasourcetype = datasourcetype;
return this;
}
public Qualifier getDatasourcetypeui() {
return datasourcetypeui;
}
public RelatedEntity setDatasourcetypeui(Qualifier datasourcetypeui) {
this.datasourcetypeui = datasourcetypeui;
return this;
}
public Qualifier getOpenairecompatibility() {
return openairecompatibility;
}
public RelatedEntity setOpenairecompatibility(Qualifier openairecompatibility) {
this.openairecompatibility = openairecompatibility;
return this;
}
public String getLegalname() {
return legalname;
}
public RelatedEntity setLegalname(String legalname) {
this.legalname = legalname;
return this;
}
public String getLegalshortname() {
return legalshortname;
}
public RelatedEntity setLegalshortname(String legalshortname) {
this.legalshortname = legalshortname;
return this;
}
public Qualifier getCountry() {
return country;
}
public RelatedEntity setCountry(Qualifier country) {
this.country = country;
return this;
}
public String getCode() {
return code;
}
public RelatedEntity setCode(String code) {
this.code = code;
return this;
}
public String getAcronym() {
return acronym;
}
public RelatedEntity setAcronym(String acronym) {
this.acronym = acronym;
return this;
}
public Qualifier getContracttype() {
return contracttype;
}
public RelatedEntity setContracttype(Qualifier contracttype) {
this.contracttype = contracttype;
return this;
}
public List<String> getFundingtree() {
return fundingtree;
}
public RelatedEntity setFundingtree(List<String> fundingtree) {
this.fundingtree = fundingtree;
return this;
}
public String getProjectTitle() {
return projectTitle;
}
public RelatedEntity setProjectTitle(String projectTitle) {
this.projectTitle = projectTitle;
return this;
}
public String getType() {
return type;
}
public RelatedEntity setType(String type) {
this.type = type;
return this;
}
}

View File

@ -0,0 +1,28 @@
package eu.dnetlib.dhp.graph.model;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class Tuple2 {
private Relation relation;
private RelatedEntity relatedEntity;
public Relation getRelation() {
return relation;
}
public Tuple2 setRelation(Relation relation) {
this.relation = relation;
return this;
}
public RelatedEntity getRelatedEntity() {
return relatedEntity;
}
public Tuple2 setRelatedEntity(RelatedEntity relatedEntity) {
this.relatedEntity = relatedEntity;
return this;
}
}

View File

@ -0,0 +1,61 @@
package eu.dnetlib.dhp.graph.model;
import java.io.Serializable;
public class TypedRow implements Serializable {
private String sourceId;
private String targetId;
private Boolean deleted;
private String type;
private String oaf;
public String getSourceId() {
return sourceId;
}
public TypedRow setSourceId(String sourceId) {
this.sourceId = sourceId;
return this;
}
public String getTargetId() {
return targetId;
}
public TypedRow setTargetId(String targetId) {
this.targetId = targetId;
return this;
}
public Boolean getDeleted() {
return deleted;
}
public TypedRow setDeleted(Boolean deleted) {
this.deleted = deleted;
return this;
}
public String getType() {
return type;
}
public TypedRow setType(String type) {
this.type = type;
return this;
}
public String getOaf() {
return oaf;
}
public TypedRow setOaf(String oaf) {
this.oaf = oaf;
return this;
}
}

View File

@ -0,0 +1,51 @@
package eu.dnetlib.dhp.graph.utils;
import java.io.Serializable;
public class ContextDef implements Serializable {
private String id;
private String label;
private String name;
private String type;
public ContextDef(final String id, final String label, final String name, final String type) {
super();
this.setId(id);
this.setLabel(label);
this.setName(name);
this.setType(type);
}
public String getLabel() {
return label;
}
public void setLabel(final String label) {
this.label = label;
}
public String getId() {
return id;
}
public void setId(final String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(final String name) {
this.name = name;
}
public String getType() {
return type;
}
public void setType(final String type) {
this.type = type;
}
}

View File

@ -0,0 +1,45 @@
package eu.dnetlib.dhp.graph.utils;
import com.google.common.base.Joiner;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;
import java.io.Serializable;
import java.io.StringReader;
import java.util.HashMap;
public class ContextMapper extends HashMap<String, ContextDef> implements Serializable {
private static final long serialVersionUID = 2159682308502487305L;
private final static String XQUERY = "for $x in //RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='ContextDSResourceType']//*[name()='context' or name()='category' or name()='concept'] return <entry id=\"{$x/@id}\" label=\"{$x/@label|$x/@name}\" name=\"{$x/name()}\" type=\"{$x/@type}\"/>";
public static ContextMapper fromIS(final String isLookupUrl) throws DocumentException, ISLookUpException {
ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
StringBuilder sb = new StringBuilder("<ContextDSResources>");
Joiner.on("").appendTo(sb, isLookUp.quickSearchProfile(XQUERY));
sb.append("</ContextDSResources>");
return fromXml(sb.toString());
}
public static ContextMapper fromXml(final String xml) throws DocumentException {
final ContextMapper contextMapper = new ContextMapper();
final Document doc = new SAXReader().read(new StringReader(xml));
for (Object o : doc.selectNodes("//entry")) {
Node node = (Node) o;
String id = node.valueOf("./@id");
String label = node.valueOf("./@label");
String name = node.valueOf("./@name");
String type = node.valueOf("./@type") + "";
contextMapper.put(id, new ContextDef(id, label, name, type));
}
return contextMapper;
}
}

View File

@ -0,0 +1,254 @@
package eu.dnetlib.dhp.graph.utils;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import eu.dnetlib.dhp.graph.model.EntityRelEntity;
import eu.dnetlib.dhp.graph.model.RelatedEntity;
import eu.dnetlib.dhp.graph.model.TypedRow;
import eu.dnetlib.dhp.schema.oaf.*;
import net.minidev.json.JSONArray;
import org.apache.commons.lang3.StringUtils;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.*;
public class GraphMappingUtils {
public enum EntityType {
publication, dataset, otherresearchproduct, software, datasource, organization, project
}
public enum MainEntityType {
result, datasource, organization, project
}
public static Set<String> authorPidTypes = Sets.newHashSet("orcid", "magidentifier");
public static Set<String> instanceFieldFilter = Sets.newHashSet("instancetype", "hostedby", "license", "accessright", "collectedfrom", "dateofacceptance", "distributionlocation");
private static BiMap<String, String> relClassMapping = HashBiMap.create();
static {
relClassMapping.put("isAuthorInstitutionOf", "hasAuthorInstitution");
relClassMapping.put("isMergedIn", "merges");
relClassMapping.put("isProducedBy", "produces");
relClassMapping.put("hasParticipant", "isParticipant");
relClassMapping.put("isProvidedBy", "provides");
relClassMapping.put("isRelatedTo", "isRelatedTo");
relClassMapping.put("isAmongTopNSimilarDocuments", "hasAmongTopNSimilarDocuments");
relClassMapping.put("isRelatedTo", "isRelatedTo");
relClassMapping.put("isSupplementTo", "isSupplementedBy");
}
public static String getInverseRelClass(final String relClass) {
String res = relClassMapping.get(relClass);
if (isNotBlank(res)) {
return res;
}
res = relClassMapping.inverse().get(relClass);
if (isNotBlank(res)) {
return res;
}
throw new IllegalArgumentException("unable to find an inverse relationship class for term: " + relClass);
}
private static final String schemeTemplate = "dnet:%s_%s_relations";
private static Map<EntityType, MainEntityType> entityMapping = Maps.newHashMap();
static {
entityMapping.put(EntityType.publication, MainEntityType.result);
entityMapping.put(EntityType.dataset, MainEntityType.result);
entityMapping.put(EntityType.otherresearchproduct, MainEntityType.result);
entityMapping.put(EntityType.software, MainEntityType.result);
entityMapping.put(EntityType.datasource, MainEntityType.datasource);
entityMapping.put(EntityType.organization, MainEntityType.organization);
entityMapping.put(EntityType.project, MainEntityType.project);
}
public static String getScheme(final String sourceType, final String targetType) {
return String.format(schemeTemplate,
entityMapping.get(EntityType.valueOf(sourceType)).name(),
entityMapping.get(EntityType.valueOf(targetType)).name());
}
public static String getMainType(final String type) {
return entityMapping.get(EntityType.valueOf(type)).name();
}
public static boolean isResult(String type) {
return MainEntityType.result.name().equals(getMainType(type));
}
public static Predicate<String> instanceFilter = s -> instanceFieldFilter.contains(s);
public static EntityRelEntity asRelatedEntity(EntityRelEntity e) {
final DocumentContext j = JsonPath.parse(e.getSource().getOaf());
final RelatedEntity re = new RelatedEntity().setId(j.read("$.id")).setType(e.getSource().getType());
switch (EntityType.valueOf(e.getSource().getType())) {
case publication:
case dataset:
case otherresearchproduct:
case software:
mapTitle(j, re);
re.setDateofacceptance(j.read("$.dateofacceptance.value"));
re.setPublisher(j.read("$.publisher.value"));
JSONArray pids = j.read("$.pid");
re.setPid(pids.stream()
.map(p -> asStructuredProperty((LinkedHashMap<String, Object>) p))
.collect(Collectors.toList()));
re.setResulttype(asQualifier(j.read("$.resulttype")));
JSONArray collfrom = j.read("$.collectedfrom");
re.setCollectedfrom(collfrom.stream()
.map(c -> asKV((LinkedHashMap<String, Object>) c))
.collect(Collectors.toList()));
// will throw exception when the instance is not found
JSONArray instances = j.read("$.instance");
re.setInstances(instances.stream()
.map(i -> {
final LinkedHashMap<String, Object> p = (LinkedHashMap<String, Object>) i;
final Field<String> license = new Field<String>();
license.setValue((String) ((LinkedHashMap<String, Object>) p.get("license")).get("value"));
final Instance instance = new Instance();
instance.setLicense(license);
instance.setAccessright(asQualifier((LinkedHashMap<String, String>) p.get("accessright")));
instance.setInstancetype(asQualifier((LinkedHashMap<String, String>) p.get("instancetype")));
instance.setHostedby(asKV((LinkedHashMap<String, Object>) p.get("hostedby")));
//TODO mapping of distributionlocation
instance.setCollectedfrom(asKV((LinkedHashMap<String, Object>) p.get("collectedfrom")));
Field<String> dateofacceptance = new Field<String>();
dateofacceptance.setValue((String) ((LinkedHashMap<String, Object>) p.get("dateofacceptance")).get("value"));
instance.setDateofacceptance(dateofacceptance);
return instance;
}).collect(Collectors.toList()));
//TODO still to be mapped
//re.setCodeRepositoryUrl(j.read("$.coderepositoryurl"));
break;
case datasource:
re.setOfficialname(j.read("$.officialname.value"));
re.setWebsiteurl(j.read("$.websiteurl.value"));
re.setDatasourcetype(asQualifier(j.read("$.datasourcetype")));
re.setOpenairecompatibility(asQualifier(j.read("$.openairecompatibility")));
break;
case organization:
re.setLegalname(j.read("$.legalname.value"));
re.setLegalshortname(j.read("$.legalshortname.value"));
re.setCountry(asQualifier(j.read("$.country")));
break;
case project:
re.setProjectTitle(j.read("$.title.value"));
re.setCode(j.read("$.code.value"));
re.setAcronym(j.read("$.acronym.value"));
re.setContracttype(asQualifier(j.read("$.contracttype")));
JSONArray f = j.read("$.fundingtree");
if (!f.isEmpty()) {
re.setFundingtree(f.stream()
.map(s -> ((LinkedHashMap<String, String>) s).get("value"))
.collect(Collectors.toList()));
}
break;
}
return new EntityRelEntity().setSource(
new TypedRow()
.setSourceId(e.getSource().getSourceId())
.setDeleted(e.getSource().getDeleted())
.setType(e.getSource().getType())
.setOaf(serialize(re)));
}
private static KeyValue asKV(LinkedHashMap<String, Object> j) {
final KeyValue kv = new KeyValue();
kv.setKey((String) j.get("key"));
kv.setValue((String) j.get("value"));
return kv;
}
private static void mapTitle(DocumentContext j, RelatedEntity re) {
final JSONArray a = j.read("$.title");
if (!a.isEmpty()) {
final StructuredProperty sp = asStructuredProperty((LinkedHashMap<String, Object>) a.get(0));
if (StringUtils.isNotBlank(sp.getValue())) {
re.setTitle(sp);
}
}
}
private static StructuredProperty asStructuredProperty(LinkedHashMap<String, Object> j) {
final StructuredProperty sp = new StructuredProperty();
final String value = (String) j.get("value");
if (StringUtils.isNotBlank(value)) {
sp.setValue((String) j.get("value"));
sp.setQualifier(asQualifier((LinkedHashMap<String, String>) j.get("qualifier")));
}
return sp;
}
public static Qualifier asQualifier(LinkedHashMap<String, String> j) {
final Qualifier q = new Qualifier();
final String classid = j.get("classid");
if (StringUtils.isNotBlank(classid)) {
q.setClassid(classid);
}
final String classname = j.get("classname");
if (StringUtils.isNotBlank(classname)) {
q.setClassname(classname);
}
final String schemeid = j.get("schemeid");
if (StringUtils.isNotBlank(schemeid)) {
q.setSchemeid(schemeid);
}
final String schemename = j.get("schemename");
if (StringUtils.isNotBlank(schemename)) {
q.setSchemename(schemename);
}
return q;
}
public static String serialize(final Object o) {
try {
return new ObjectMapper()
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
.writeValueAsString(o);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("unable to serialize: " + o.toString(), e);
}
}
public static String removePrefix(final String s) {
if (s.contains("|")) return substringAfter(s, "|");
return s;
}
}

View File

@ -0,0 +1,24 @@
package eu.dnetlib.dhp.graph.utils;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
public class ISLookupClientFactory {
private static final Log log = LogFactory.getLog(ISLookupClientFactory.class);
public static ISLookUpService getLookUpService(final String isLookupUrl) {
return getServiceStub(ISLookUpService.class, isLookupUrl);
}
@SuppressWarnings("unchecked")
private static <T> T getServiceStub(final Class<T> clazz, final String endpoint) {
log.info(String.format("creating %s stub from %s", clazz.getName(), endpoint));
final JaxWsProxyFactoryBean jaxWsProxyFactory = new JaxWsProxyFactoryBean();
jaxWsProxyFactory.setServiceClass(clazz);
jaxWsProxyFactory.setAddress(endpoint);
return (T) jaxWsProxyFactory.create();
}
}

View File

@ -0,0 +1,49 @@
package eu.dnetlib.dhp.graph.utils;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import java.util.Comparator;
public class LicenseComparator implements Comparator<Qualifier> {
@Override
public int compare(Qualifier left, Qualifier right) {
if (left == null && right == null) return 0;
if (left == null) return 1;
if (right == null) return -1;
String lClass = left.getClassid();
String rClass = right.getClassid();
if (lClass.equals(rClass)) return 0;
if (lClass.equals("OPEN SOURCE")) return -1;
if (rClass.equals("OPEN SOURCE")) return 1;
if (lClass.equals("OPEN")) return -1;
if (rClass.equals("OPEN")) return 1;
if (lClass.equals("6MONTHS")) return -1;
if (rClass.equals("6MONTHS")) return 1;
if (lClass.equals("12MONTHS")) return -1;
if (rClass.equals("12MONTHS")) return 1;
if (lClass.equals("EMBARGO")) return -1;
if (rClass.equals("EMBARGO")) return 1;
if (lClass.equals("RESTRICTED")) return -1;
if (rClass.equals("RESTRICTED")) return 1;
if (lClass.equals("CLOSED")) return -1;
if (rClass.equals("CLOSED")) return 1;
if (lClass.equals("UNKNOWN")) return -1;
if (rClass.equals("UNKNOWN")) return 1;
// Else (but unlikely), lexicographical ordering will do.
return lClass.compareTo(rClass);
}
}

View File

@ -0,0 +1,253 @@
package eu.dnetlib.dhp.graph.utils;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import javax.xml.stream.*;
import javax.xml.stream.events.Namespace;
import javax.xml.stream.events.StartElement;
import javax.xml.stream.events.XMLEvent;
import com.google.common.collect.Lists;
import org.apache.solr.common.SolrInputDocument;
/**
* Optimized version of the document parser, drop in replacement of InputDocumentFactory.
*
* <p>
* Faster because:
* </p>
* <ul>
* <li>Doesn't create a DOM for the full document</li>
* <li>Doesn't execute xpaths agains the DOM</li>
* <li>Quickly serialize the 'result' element directly in a string.</li>
* <li>Uses less memory: less pressure on GC and allows more threads to process this in parallel</li>
* </ul>
*
* <p>
* This class is fully reentrant and can be invoked in parallel.
* </p>
*
* @author claudio
*
*/
public class StreamingInputDocumentFactory {
private static final String INDEX_FIELD_PREFIX = "__";
private static final String DS_VERSION = INDEX_FIELD_PREFIX + "dsversion";
private static final String DS_ID = INDEX_FIELD_PREFIX + "dsid";
private static final String RESULT = "result";
private static final String INDEX_RESULT = INDEX_FIELD_PREFIX + RESULT;
private static final String INDEX_RECORD_ID = INDEX_FIELD_PREFIX + "indexrecordidentifier";
private static final String outFormat = new String("yyyy-MM-dd'T'hh:mm:ss'Z'");
private final static List<String> dateFormats = Arrays.asList("yyyy-MM-dd'T'hh:mm:ss", "yyyy-MM-dd", "dd-MM-yyyy", "dd/MM/yyyy", "yyyy");
private static final String DEFAULTDNETRESULT = "dnetResult";
private static final String TARGETFIELDS = "targetFields";
private static final String INDEX_RECORD_ID_ELEMENT = "indexRecordIdentifier";
private static final String ROOT_ELEMENT = "indexRecord";
private static final int MAX_FIELD_LENGTH = 25000;
private ThreadLocal<XMLInputFactory> inputFactory = ThreadLocal.withInitial(() -> XMLInputFactory.newInstance());
private ThreadLocal<XMLOutputFactory> outputFactory = ThreadLocal.withInitial(() -> XMLOutputFactory.newInstance());
private ThreadLocal<XMLEventFactory> eventFactory = ThreadLocal.withInitial(() -> XMLEventFactory.newInstance());
private String version;
private String dsId;
private String resultName = DEFAULTDNETRESULT;
public StreamingInputDocumentFactory(final String version, final String dsId) {
this(version, dsId, DEFAULTDNETRESULT);
}
public StreamingInputDocumentFactory(final String version, final String dsId, final String resultName) {
this.version = version;
this.dsId = dsId;
this.resultName = resultName;
}
public SolrInputDocument parseDocument(final String inputDocument) {
final StringWriter results = new StringWriter();
final List<Namespace> nsList = Lists.newLinkedList();
try {
XMLEventReader parser = inputFactory.get().createXMLEventReader(new StringReader(inputDocument));
final SolrInputDocument indexDocument = new SolrInputDocument(new HashMap<>());
while (parser.hasNext()) {
final XMLEvent event = parser.nextEvent();
if ((event != null) && event.isStartElement()) {
final String localName = event.asStartElement().getName().getLocalPart();
if (ROOT_ELEMENT.equals(localName)) {
nsList.addAll(getNamespaces(event));
} else if (INDEX_RECORD_ID_ELEMENT.equals(localName)) {
final XMLEvent text = parser.nextEvent();
String recordId = getText(text);
indexDocument.addField(INDEX_RECORD_ID, recordId);
} else if (TARGETFIELDS.equals(localName)) {
parseTargetFields(indexDocument, parser);
} else if (resultName.equals(localName)) {
copyResult(indexDocument, results, parser, nsList, resultName);
}
}
}
if (version != null) {
indexDocument.addField(DS_VERSION, version);
}
if (dsId != null) {
indexDocument.addField(DS_ID, dsId);
}
if (!indexDocument.containsKey(INDEX_RECORD_ID)) {
indexDocument.clear();
System.err.println("missing indexrecord id:\n" + inputDocument);
}
return indexDocument;
} catch (XMLStreamException e) {
return new SolrInputDocument();
}
}
private List<Namespace> getNamespaces(final XMLEvent event) {
final List<Namespace> res = Lists.newLinkedList();
@SuppressWarnings("unchecked")
Iterator<Namespace> nsIter = event.asStartElement().getNamespaces();
while (nsIter.hasNext()) {
Namespace ns = nsIter.next();
res.add(ns);
}
return res;
}
/**
* Parse the targetFields block and add fields to the solr document.
*
* @param indexDocument
* @param parser
* @throws XMLStreamException
*/
protected void parseTargetFields(final SolrInputDocument indexDocument, final XMLEventReader parser) throws XMLStreamException {
boolean hasFields = false;
while (parser.hasNext()) {
final XMLEvent targetEvent = parser.nextEvent();
if (targetEvent.isEndElement() && targetEvent.asEndElement().getName().getLocalPart().equals(TARGETFIELDS)) {
break;
}
if (targetEvent.isStartElement()) {
final String fieldName = targetEvent.asStartElement().getName().getLocalPart();
final XMLEvent text = parser.nextEvent();
String data = getText(text);
addField(indexDocument, fieldName, data);
hasFields = true;
}
}
if (!hasFields) {
indexDocument.clear();
}
}
/**
* Copy the /indexRecord/result element and children, preserving namespace declarations etc.
*
* @param indexDocument
* @param results
* @param parser
* @param nsList
* @throws XMLStreamException
*/
protected void copyResult(final SolrInputDocument indexDocument,
final StringWriter results,
final XMLEventReader parser,
final List<Namespace> nsList,
final String dnetResult) throws XMLStreamException {
final XMLEventWriter writer = outputFactory.get().createXMLEventWriter(results);
for (Namespace ns : nsList) {
eventFactory.get().createNamespace(ns.getPrefix(), ns.getNamespaceURI());
}
StartElement newRecord = eventFactory.get().createStartElement("", null, RESULT, null, nsList.iterator());
// new root record
writer.add(newRecord);
// copy the rest as it is
while (parser.hasNext()) {
final XMLEvent resultEvent = parser.nextEvent();
// TODO: replace with depth tracking instead of close tag tracking.
if (resultEvent.isEndElement() && resultEvent.asEndElement().getName().getLocalPart().equals(dnetResult)) {
writer.add(eventFactory.get().createEndElement("", null, RESULT));
break;
}
writer.add(resultEvent);
}
writer.close();
indexDocument.addField(INDEX_RESULT, results.toString());
}
/**
* Helper used to add a field to a solr doc. It avoids to add empy fields
*
* @param indexDocument
* @param field
* @param value
*/
private final void addField(final SolrInputDocument indexDocument, final String field, final String value) {
String cleaned = value.trim();
if (!cleaned.isEmpty()) {
// log.info("\n\n adding field " + field.toLowerCase() + " value: " + cleaned + "\n");
indexDocument.addField(field.toLowerCase(), cleaned);
}
}
/**
* Helper used to get the string from a text element.
*
* @param text
* @return the
*/
protected final String getText(final XMLEvent text) {
if (text.isEndElement()) // log.warn("skipping because isEndOfElement " + text.asEndElement().getName().getLocalPart());
return "";
final String data = text.asCharacters().getData();
if (data != null && data.length() > MAX_FIELD_LENGTH) {
return data.substring(0, MAX_FIELD_LENGTH);
}
return data;
}
}

View File

@ -0,0 +1,107 @@
package eu.dnetlib.dhp.graph.utils;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import org.apache.commons.lang3.StringUtils;
import org.stringtemplate.v4.ST;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.graph.utils.GraphMappingUtils.removePrefix;
import static eu.dnetlib.dhp.graph.utils.XmlSerializationUtils.escapeXml;
public class TemplateFactory {
private TemplateResources resources;
private final static char DELIMITER = '$';
public TemplateFactory() {
try {
resources = new TemplateResources();
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
public String buildBody(final String type, final List<String> metadata, final List<String> rels, final List<String> children, final List<String> extraInfo) {
ST body = getTemplate(resources.getEntity());
body.add("name", type);
body.add("metadata", metadata);
body.add("rels", rels);
body.add("children", children);
body.add("extrainfo", extraInfo);
return body.render();
}
public String getChild(final String name, final String id, final List<String> metadata) {
return getTemplate(resources.getChild())
.add("name", name)
.add("hasId", !(id == null))
.add("id", id != null ? escapeXml(removePrefix(id)) : "")
.add("metadata", metadata)
.render();
}
public String buildRecord(
final OafEntity entity,
final String schemaLocation,
final String body) {
return getTemplate(resources.getRecord())
.add("id", escapeXml(removePrefix(entity.getId())))
.add("dateofcollection", entity.getDateofcollection())
.add("dateoftransformation", entity.getDateoftransformation())
.add("schemaLocation", schemaLocation)
.add("it", body)
.render();
}
public String getRel(final String type,
final String objIdentifier,
final Collection<String> fields,
final String semanticclass,
final String semantischeme,
final DataInfo info) {
return getTemplate(resources.getRel())
.add("type", type)
.add("objIdentifier", escapeXml(removePrefix(objIdentifier)))
.add("class", semanticclass)
.add("scheme", semantischeme)
.add("metadata", fields)
.add("inferred", info.getInferred())
.add("trust", info.getTrust())
.add("inferenceprovenance", info.getInferenceprovenance())
.add("provenanceaction", info.getProvenanceaction() != null ? info.getProvenanceaction().getClassid() : "")
.render();
}
public String getInstance(final String resultId, final List<String> instancemetadata, final List<String> webresources) {
return getTemplate(resources.getInstance())
.add("instanceId", escapeXml(removePrefix(resultId)))
.add("metadata", instancemetadata)
.add("webresources", webresources
.stream()
.filter(StringUtils::isNotBlank)
.map(w -> getWebResource(w))
.collect(Collectors.toList()))
.render();
}
private String getWebResource(final String identifier) {
return getTemplate(resources.getWebresource())
.add("identifier", escapeXml(identifier))
.render();
}
// HELPERS
private ST getTemplate(final String res) {
return new ST(res, DELIMITER, DELIMITER);
}
}

View File

@ -0,0 +1,54 @@
package eu.dnetlib.dhp.graph.utils;
import com.google.common.io.Resources;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class TemplateResources {
private String record = read("eu/dnetlib/dhp/graph/template/record.st");
private String instance = read("eu/dnetlib/dhp/graph/template/instance.st");
private String rel = read("eu/dnetlib/dhp/graph/template/rel.st");
private String webresource = read("eu/dnetlib/dhp/graph/template/webresource.st");
private String child = read("eu/dnetlib/dhp/graph/template/child.st");
private String entity = read("eu/dnetlib/dhp/graph/template/entity.st");
private static String read(final String classpathResource) throws IOException {
return Resources.toString(Resources.getResource(classpathResource), StandardCharsets.UTF_8);
}
public TemplateResources() throws IOException {
}
public String getEntity() {
return entity;
}
public String getRecord() {
return record;
}
public String getInstance() {
return instance;
}
public String getRel() {
return rel;
}
public String getWebresource() {
return webresource;
}
public String getChild() {
return child;
}
}

View File

@ -0,0 +1,962 @@
package eu.dnetlib.dhp.graph.utils;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.mycila.xmltool.XMLDoc;
import com.mycila.xmltool.XMLTag;
import eu.dnetlib.dhp.graph.model.JoinedEntity;
import eu.dnetlib.dhp.graph.model.RelatedEntity;
import eu.dnetlib.dhp.graph.model.Tuple2;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.*;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.Node;
import org.dom4j.io.OutputFormat;
import org.dom4j.io.SAXReader;
import org.dom4j.io.XMLWriter;
import javax.xml.transform.*;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import java.io.IOException;
import java.io.Serializable;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.graph.utils.GraphMappingUtils.*;
import static eu.dnetlib.dhp.graph.utils.XmlSerializationUtils.*;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.commons.lang3.StringUtils.substringBefore;
public class XmlRecordFactory implements Serializable {
private Set<String> specialDatasourceTypes;
private ContextMapper contextMapper;
private String schemaLocation;
private Set<String> contextes = Sets.newHashSet();
private boolean indent = false;
public XmlRecordFactory(
final ContextMapper contextMapper, final boolean indent,
final String schemaLocation, final Set<String> otherDatasourceTypesUForUI) {
this.contextMapper = contextMapper;
this.schemaLocation = schemaLocation;
this.specialDatasourceTypes = otherDatasourceTypesUForUI;
this.indent = indent;
}
public String build(final JoinedEntity je) {
final OafEntity entity = je.getEntity();
TemplateFactory templateFactory = new TemplateFactory();
try {
final List<String> metadata = metadata(je.getType(), entity);
// rels has to be processed before the contexts because they enrich the contextMap with the funding info.
final List<String> relations = listRelations(je, templateFactory);
metadata.addAll(buildContexts(getMainType(je.getType())));
metadata.add(parseDataInfo(entity.getDataInfo()));
final String body = templateFactory.buildBody(
getMainType(je.getType()),
metadata,
relations,
listChildren(je, templateFactory), listExtraInfo(je));
return printXML(templateFactory.buildRecord(entity, schemaLocation, body), indent);
} catch (final Throwable e) {
throw new RuntimeException(String.format("error building record '%s'", entity.getId()), e);
}
}
private String printXML(String xml, boolean indent) {
try {
final Document doc = new SAXReader().read(new StringReader(xml));
OutputFormat format = indent ? OutputFormat.createPrettyPrint() : OutputFormat.createCompactFormat();
format.setExpandEmptyElements(false);
format.setSuppressDeclaration(true);
StringWriter sw = new StringWriter();
XMLWriter writer = new XMLWriter(sw, format);
writer.write(doc);
return sw.toString();
} catch (IOException | DocumentException e) {
throw new IllegalArgumentException("Unable to indent XML. Invalid record:\n" + xml, e);
}
}
private List<String> metadata(final String type, final OafEntity entity) {
final List<String> metadata = Lists.newArrayList();
if (entity.getCollectedfrom() != null) {
metadata.addAll(entity.getCollectedfrom()
.stream()
.map(kv -> mapKeyValue("collectedfrom", kv))
.collect(Collectors.toList()));
}
if (entity.getOriginalId() != null) {
metadata.addAll(entity.getOriginalId()
.stream()
.map(s -> asXmlElement("originalId", s))
.collect(Collectors.toList()));
}
if (entity.getPid() != null) {
metadata.addAll(entity.getPid()
.stream()
.map(p -> mapStructuredProperty("pid", p))
.collect(Collectors.toList()));
}
if (GraphMappingUtils.isResult(type)) {
final Result r = (Result) entity;
if (r.getTitle() != null) {
metadata.addAll(r.getTitle()
.stream()
.map(t -> mapStructuredProperty("title", t))
.collect(Collectors.toList()));
}
if (r.getBestaccessright() != null) {
metadata.add(mapQualifier("bestaccessright", r.getBestaccessright()));
}
if (r.getAuthor() != null) {
metadata.addAll(r.getAuthor()
.stream()
.map(a -> {
final StringBuilder sb = new StringBuilder("<creator rank=\"" + a.getRank() + "\"");
if (isNotBlank(a.getName())) {
sb.append(" name=\"" + escapeXml(a.getName()) + "\"");
}
if (isNotBlank(a.getSurname())) {
sb.append(" surname=\"" + escapeXml(a.getSurname()) + "\"");
}
if (a.getPid() != null) {
a.getPid().stream()
.filter(sp -> isNotBlank(sp.getQualifier().getClassid()) && isNotBlank(sp.getValue()))
.forEach(sp -> {
String pidType = escapeXml(sp.getQualifier().getClassid()).replaceAll("\\W", "");
String pidValue = escapeXml(sp.getValue());
// ugly hack: some records provide swapped pidtype and pidvalue
if (authorPidTypes.contains(pidValue.toLowerCase().trim())) {
sb.append(String.format(" %s=\"%s\"", pidValue, pidType));
} else {
pidType = pidType.replaceAll("\\W", "").replaceAll("\\d", "");
if (isNotBlank(pidType)) {
sb.append(String.format(" %s=\"%s\"",
pidType,
pidValue.toLowerCase().replaceAll("orcid", "")));
}
}
});
}
sb.append(">" + escapeXml(a.getFullname()) + "</creator>");
return sb.toString();
}).collect(Collectors.toList()));
}
if (r.getContributor() != null) {
metadata.addAll(r.getContributor()
.stream()
.map(c -> asXmlElement("contributor", c.getValue()))
.collect(Collectors.toList()));
}
if (r.getCountry() != null) {
metadata.addAll(r.getCountry()
.stream()
.map(c -> mapQualifier("country", c))
.collect(Collectors.toList()));
}
if (r.getCoverage() != null) {
metadata.addAll(r.getCoverage()
.stream()
.map(c -> asXmlElement("coverage", c.getValue()))
.collect(Collectors.toList()));
}
if (r.getDateofacceptance() != null) {
metadata.add(asXmlElement("dateofacceptance", r.getDateofacceptance().getValue()));
}
if (r.getDescription() != null) {
metadata.addAll(r.getDescription()
.stream()
.map(c -> asXmlElement("description", c.getValue()))
.collect(Collectors.toList()));
}
if (r.getEmbargoenddate() != null) {
metadata.add(asXmlElement("embargoenddate", r.getEmbargoenddate().getValue()));
}
if (r.getSubject() != null) {
metadata.addAll(r.getSubject()
.stream()
.map(s -> mapStructuredProperty("subject", s))
.collect(Collectors.toList()));
}
if (r.getLanguage() != null) {
metadata.add(mapQualifier("language", r.getLanguage()));
}
if (r.getRelevantdate() != null) {
metadata.addAll(r.getRelevantdate()
.stream()
.map(s -> mapStructuredProperty("relevantdate", s))
.collect(Collectors.toList()));
}
if (r.getPublisher() != null) {
metadata.add(asXmlElement("publisher", r.getPublisher().getValue()));
}
if (r.getSource() != null) {
metadata.addAll(r.getSource()
.stream()
.map(c -> asXmlElement("source", c.getValue()))
.collect(Collectors.toList()));
}
if (r.getFormat() != null) {
metadata.addAll(r.getFormat()
.stream()
.map(c -> asXmlElement("format", c.getValue()))
.collect(Collectors.toList()));
}
if (r.getResulttype() != null) {
metadata.add(mapQualifier("resulttype", r.getResulttype()));
}
if (r.getResourcetype() != null) {
metadata.add(mapQualifier("resourcetype", r.getResourcetype()));
}
metadata.add(mapQualifier("bestaccessright", getBestAccessright(r)));
if (r.getContext() != null) {
contextes.addAll(r.getContext()
.stream()
.map(c -> c.getId())
.collect(Collectors.toList()));
if (contextes.contains("dh-ch::subcommunity::2")) {
contextes.add("clarin");
}
}
}
switch (EntityType.valueOf(type)) {
case publication:
final Publication pub = (Publication) entity;
if (pub.getJournal() != null) {
final Journal j = pub.getJournal();
metadata.add(mapJournal(j));
}
break;
case dataset:
final Dataset d = (Dataset) entity;
if (d.getDevice() != null) {
metadata.add(asXmlElement("device", d.getDevice().getValue()));
}
if (d.getLastmetadataupdate() != null) {
metadata.add(asXmlElement("lastmetadataupdate", d.getLastmetadataupdate().getValue()));
}
if (d.getMetadataversionnumber() != null) {
metadata.add(asXmlElement("metadataversionnumber", d.getMetadataversionnumber().getValue()));
}
if (d.getSize() != null) {
metadata.add(asXmlElement("size", d.getSize().getValue()));
}
if (d.getStoragedate() != null) {
metadata.add(asXmlElement("storagedate", d.getStoragedate().getValue()));
}
if (d.getVersion() != null) {
metadata.add(asXmlElement("version", d.getVersion().getValue()));
}
//TODO d.getGeolocation()
break;
case otherresearchproduct:
final OtherResearchProduct orp = (OtherResearchProduct) entity;
if (orp.getContactperson() != null) {
metadata.addAll(orp.getContactperson()
.stream()
.map(c -> asXmlElement("contactperson", c.getValue()))
.collect(Collectors.toList()));
}
if (orp.getContactgroup() != null) {
metadata.addAll(orp.getContactgroup()
.stream()
.map(c -> asXmlElement("contactgroup", c.getValue()))
.collect(Collectors.toList()));
}
if (orp.getTool() != null) {
metadata.addAll(orp.getTool()
.stream()
.map(c -> asXmlElement("tool", c.getValue()))
.collect(Collectors.toList()));
}
break;
case software:
final Software s = (Software) entity;
if (s.getDocumentationUrl() != null) {
metadata.addAll(s.getDocumentationUrl()
.stream()
.map(c -> asXmlElement("documentationUrl", c.getValue()))
.collect(Collectors.toList()));
}
if (s.getLicense() != null) {
metadata.addAll(s.getLicense()
.stream()
.map(l -> mapStructuredProperty("license", l))
.collect(Collectors.toList()));
}
if (s.getCodeRepositoryUrl() != null) {
metadata.add(asXmlElement("codeRepositoryUrl", s.getCodeRepositoryUrl().getValue()));
}
if (s.getProgrammingLanguage() != null) {
metadata.add(mapQualifier("programmingLanguage", s.getProgrammingLanguage()));
}
break;
case datasource:
final Datasource ds = (Datasource) entity;
if (ds.getDatasourcetype() != null) {
mapDatasourceType(metadata, ds.getDatasourcetype());
}
if (ds.getOpenairecompatibility() != null) {
metadata.add(mapQualifier("openairecompatibility", ds.getOpenairecompatibility()));
}
if (ds.getOfficialname() != null) {
metadata.add(asXmlElement("officialname", ds.getOfficialname().getValue()));
}
if (ds.getEnglishname() != null) {
metadata.add(asXmlElement("englishname", ds.getEnglishname().getValue()));
}
if (ds.getWebsiteurl() != null) {
metadata.add(asXmlElement("websiteurl", ds.getWebsiteurl().getValue()));
}
if (ds.getLogourl() != null) {
metadata.add(asXmlElement("logourl", ds.getLogourl().getValue()));
}
if (ds.getContactemail() != null) {
metadata.add(asXmlElement("contactemail", ds.getContactemail().getValue()));
}
if (ds.getNamespaceprefix() != null) {
metadata.add(asXmlElement("namespaceprefix", ds.getNamespaceprefix().getValue()));
}
if (ds.getLatitude() != null) {
metadata.add(asXmlElement("latitude", ds.getLatitude().getValue()));
}
if (ds.getLongitude() != null) {
metadata.add(asXmlElement("longitude", ds.getLongitude().getValue()));
}
if (ds.getDateofvalidation() != null) {
metadata.add(asXmlElement("dateofvalidation", ds.getDateofvalidation().getValue()));
}
if (ds.getDescription() != null) {
metadata.add(asXmlElement("description", ds.getDescription().getValue()));
}
if (ds.getOdnumberofitems() != null) {
metadata.add(asXmlElement("odnumberofitems", ds.getOdnumberofitems().getValue()));
}
if (ds.getOdnumberofitemsdate() != null) {
metadata.add(asXmlElement("odnumberofitemsdate", ds.getOdnumberofitemsdate().getValue()));
}
if (ds.getOdpolicies() != null) {
metadata.add(asXmlElement("odpolicies", ds.getOdpolicies().getValue()));
}
if (ds.getOdlanguages() != null) {
metadata.addAll(ds.getOdlanguages()
.stream()
.map(c -> asXmlElement("odlanguages", c.getValue()))
.collect(Collectors.toList()));
}
if (ds.getOdcontenttypes() != null) {
metadata.addAll(ds.getOdcontenttypes()
.stream()
.map(c -> asXmlElement("odcontenttypes", c.getValue()))
.collect(Collectors.toList()));
}
if (ds.getAccessinfopackage() != null) {
metadata.addAll(ds.getAccessinfopackage()
.stream()
.map(c -> asXmlElement("accessinfopackage", c.getValue()))
.collect(Collectors.toList()));
}
if (ds.getReleaseenddate() != null) {
metadata.add(asXmlElement("releasestartdate", ds.getReleaseenddate().getValue()));
}
if (ds.getReleaseenddate() != null) {
metadata.add(asXmlElement("releaseenddate", ds.getReleaseenddate().getValue()));
}
if (ds.getMissionstatementurl() != null) {
metadata.add(asXmlElement("missionstatementurl", ds.getMissionstatementurl().getValue()));
}
if (ds.getDataprovider() != null) {
metadata.add(asXmlElement("dataprovider", ds.getDataprovider().getValue().toString()));
}
if (ds.getServiceprovider() != null) {
metadata.add(asXmlElement("serviceprovider", ds.getServiceprovider().getValue().toString()));
}
if (ds.getDatabaseaccesstype() != null) {
metadata.add(asXmlElement("databaseaccesstype", ds.getDatabaseaccesstype().getValue()));
}
if (ds.getDatauploadtype() != null) {
metadata.add(asXmlElement("datauploadtype", ds.getDatauploadtype().getValue()));
}
if (ds.getDatabaseaccessrestriction() != null) {
metadata.add(asXmlElement("databaseaccessrestriction", ds.getDatabaseaccessrestriction().getValue()));
}
if (ds.getDatauploadrestriction() != null) {
metadata.add(asXmlElement("datauploadrestriction", ds.getDatauploadrestriction().getValue()));
}
if (ds.getVersioning() != null) {
metadata.add(asXmlElement("versioning", ds.getVersioning().getValue().toString()));
}
if (ds.getCitationguidelineurl() != null) {
metadata.add(asXmlElement("citationguidelineurl", ds.getCitationguidelineurl().getValue()));
}
if (ds.getQualitymanagementkind() != null) {
metadata.add(asXmlElement("qualitymanagementkind", ds.getQualitymanagementkind().getValue()));
}
if (ds.getPidsystems() != null) {
metadata.add(asXmlElement("pidsystems", ds.getPidsystems().getValue()));
}
if (ds.getCertificates() != null) {
metadata.add(asXmlElement("certificates", ds.getCertificates().getValue()));
}
if (ds.getPolicies() != null) {
metadata.addAll(ds.getPolicies()
.stream()
.map(kv -> mapKeyValue("policies", kv))
.collect(Collectors.toList()));
}
if (ds.getJournal() != null) {
metadata.add(mapJournal(ds.getJournal()));
}
if (ds.getSubjects() != null) {
metadata.addAll(ds.getSubjects()
.stream()
.map(sp -> mapStructuredProperty("subject", sp))
.collect(Collectors.toList()));
}
break;
case organization:
final Organization o = (Organization) entity;
if (o.getLegalshortname() != null) {
metadata.add(asXmlElement("legalshortname", o.getLegalshortname().getValue()));
}
if (o.getLegalname() != null) {
metadata.add(asXmlElement("legalname", o.getLegalname().getValue()));
}
if (o.getAlternativeNames() != null) {
metadata.addAll(o.getAlternativeNames()
.stream()
.map(c -> asXmlElement("alternativeNames", c.getValue()))
.collect(Collectors.toList()));
}
if (o.getWebsiteurl() != null) {
metadata.add(asXmlElement("websiteurl", o.getWebsiteurl().getValue()));
}
if (o.getLogourl() != null) {
metadata.add(asXmlElement("websiteurl", o.getLogourl().getValue()));
}
if (o.getEclegalbody() != null) {
metadata.add(asXmlElement("eclegalbody", o.getEclegalbody().getValue()));
}
if (o.getEclegalperson() != null) {
metadata.add(asXmlElement("eclegalperson", o.getEclegalperson().getValue()));
}
if (o.getEcnonprofit() != null) {
metadata.add(asXmlElement("ecnonprofit", o.getEcnonprofit().getValue()));
}
if (o.getEcresearchorganization() != null) {
metadata.add(asXmlElement("ecresearchorganization", o.getEcresearchorganization().getValue()));
}
if (o.getEchighereducation() != null) {
metadata.add(asXmlElement("echighereducation", o.getEchighereducation().getValue()));
}
if (o.getEcinternationalorganization() != null) {
metadata.add(asXmlElement("ecinternationalorganizationeurinterests", o.getEcinternationalorganization().getValue()));
}
if (o.getEcinternationalorganization() != null) {
metadata.add(asXmlElement("ecinternationalorganization", o.getEcinternationalorganization().getValue()));
}
if (o.getEcenterprise() != null) {
metadata.add(asXmlElement("ecenterprise", o.getEcenterprise().getValue()));
}
if (o.getEcsmevalidated() != null) {
metadata.add(asXmlElement("ecsmevalidated", o.getEcsmevalidated().getValue()));
}
if (o.getEcnutscode() != null) {
metadata.add(asXmlElement("ecnutscode", o.getEcnutscode().getValue()));
}
if (o.getCountry() != null) {
metadata.add(mapQualifier("country", o.getCountry()));
}
break;
case project:
final Project p = (Project) entity;
if (p.getWebsiteurl() != null) {
metadata.add(asXmlElement("websiteurl", p.getWebsiteurl().getValue()));
}
if (p.getCode() != null) {
metadata.add(asXmlElement("code", p.getCode().getValue()));
}
if (p.getAcronym() != null) {
metadata.add(asXmlElement("acronym", p.getAcronym().getValue()));
}
if (p.getTitle() != null) {
metadata.add(asXmlElement("title", p.getTitle().getValue()));
}
if (p.getStartdate() != null) {
metadata.add(asXmlElement("startdate", p.getStartdate().getValue()));
}
if (p.getEnddate() != null) {
metadata.add(asXmlElement("enddate", p.getEnddate().getValue()));
}
if (p.getCallidentifier() != null) {
metadata.add(asXmlElement("callidentifier", p.getCallidentifier().getValue()));
}
if (p.getKeywords() != null) {
metadata.add(asXmlElement("keywords", p.getKeywords().getValue()));
}
if (p.getDuration() != null) {
metadata.add(asXmlElement("duration", p.getDuration().getValue()));
}
if (p.getEcarticle29_3() != null) {
metadata.add(asXmlElement("ecarticle29_3", p.getEcarticle29_3().getValue()));
}
if (p.getSubjects() != null) {
metadata.addAll(p.getSubjects()
.stream()
.map(sp -> mapStructuredProperty("subject", sp))
.collect(Collectors.toList()));
}
if (p.getContracttype() != null) {
metadata.add(mapQualifier("contracttype", p.getContracttype()));
}
if (p.getEcsc39() != null) {
metadata.add(asXmlElement("ecsc39", p.getEcsc39().getValue()));
}
if (p.getContactfullname() != null) {
metadata.add(asXmlElement("contactfullname", p.getContactfullname().getValue()));
}
if (p.getContactfax() != null) {
metadata.add(asXmlElement("contactfax", p.getContactfax().getValue()));
}
if (p.getContactphone() != null) {
metadata.add(asXmlElement("contactphone", p.getContactphone().getValue()));
}
if (p.getContactemail() != null) {
metadata.add(asXmlElement("contactemail", p.getContactemail().getValue()));
}
if (p.getSummary() != null) {
metadata.add(asXmlElement("summary", p.getSummary().getValue()));
}
if (p.getCurrency() != null) {
metadata.add(asXmlElement("currency", p.getCurrency().getValue()));
}
if (p.getTotalcost() != null) {
metadata.add(asXmlElement("totalcost", p.getTotalcost().toString()));
}
if (p.getFundedamount() != null) {
metadata.add(asXmlElement("fundedamount", p.getFundedamount().toString()));
}
if (p.getFundingtree() != null) {
metadata.addAll(p.getFundingtree()
.stream()
.map(ft -> asXmlElement("fundingtree", ft.getValue()))
.collect(Collectors.toList()));
}
break;
default:
throw new IllegalArgumentException("invalid entity type: " + type);
}
return metadata;
}
private void mapDatasourceType(List<String> metadata, final Qualifier dsType) {
metadata.add(mapQualifier("datasourcetype", dsType));
if (specialDatasourceTypes.contains(dsType.getClassid())) {
dsType.setClassid("other");
dsType.setClassname("other");
}
metadata.add(mapQualifier("datasourcetypeui", dsType));
}
private Qualifier getBestAccessright(final Result r) {
Qualifier bestAccessRight = new Qualifier();
bestAccessRight.setClassid("UNKNOWN");
bestAccessRight.setClassname("not available");
bestAccessRight.setSchemeid("dnet:access_modes");
bestAccessRight.setSchemename("dnet:access_modes");
final LicenseComparator lc = new LicenseComparator();
for (final Instance instance : r.getInstance()) {
if (lc.compare(bestAccessRight, instance.getAccessright()) > 0) {
bestAccessRight = instance.getAccessright();
}
}
return bestAccessRight;
}
private List<String> listRelations(final JoinedEntity je, TemplateFactory templateFactory) {
final List<String> rels = Lists.newArrayList();
for (final Tuple2 link : je.getLinks()) {
final Relation rel = link.getRelation();
final RelatedEntity re = link.getRelatedEntity();
final String targetType = link.getRelatedEntity().getType();
final List<String> metadata = Lists.newArrayList();
switch (EntityType.valueOf(targetType)) {
case publication:
case dataset:
case otherresearchproduct:
case software:
if (re.getTitle() != null && isNotBlank(re.getTitle().getValue())) {
metadata.add(mapStructuredProperty("title", re.getTitle()));
}
if (isNotBlank(re.getDateofacceptance())) {
metadata.add(asXmlElement("dateofacceptance", re.getDateofacceptance()));
}
if (isNotBlank(re.getPublisher())) {
metadata.add(asXmlElement("publisher", re.getPublisher()));
}
if (isNotBlank(re.getCodeRepositoryUrl())) {
metadata.add(asXmlElement("coderepositoryurl", re.getCodeRepositoryUrl()));
}
if (re.getResulttype() != null & !re.getResulttype().isBlank()) {
metadata.add(mapQualifier("resulttype", re.getResulttype()));
}
if (re.getCollectedfrom() != null) {
metadata.addAll(re.getCollectedfrom()
.stream()
.map(kv -> mapKeyValue("collectedfrom", kv))
.collect(Collectors.toList()));
}
if (re.getPid() != null) {
metadata.addAll(re.getPid()
.stream()
.map(p -> mapStructuredProperty("pid", p))
.collect(Collectors.toList()));
}
break;
case datasource:
if (isNotBlank(re.getOfficialname())) {
metadata.add(asXmlElement("officialname", re.getOfficialname()));
}
if (re.getDatasourcetype() != null & !re.getDatasourcetype().isBlank()) {
mapDatasourceType(metadata, re.getDatasourcetype());
}
if (re.getOpenairecompatibility() != null & !re.getOpenairecompatibility().isBlank()) {
metadata.add(mapQualifier("openairecompatibility", re.getOpenairecompatibility()));
}
break;
case organization:
if (isNotBlank(re.getLegalname())) {
metadata.add(asXmlElement("legalname", re.getLegalname()));
}
if (isNotBlank(re.getLegalshortname())) {
metadata.add(asXmlElement("legalshortname", re.getLegalshortname()));
}
if (re.getCountry() != null & !re.getCountry().isBlank()) {
metadata.add(mapQualifier("country", re.getCountry()));
}
break;
case project:
if (isNotBlank(re.getProjectTitle())) {
metadata.add(asXmlElement("title", re.getProjectTitle()));
}
if (isNotBlank(re.getCode())) {
metadata.add(asXmlElement("code", re.getCode()));
}
if (isNotBlank(re.getAcronym())) {
metadata.add(asXmlElement("acronym", re.getAcronym()));
}
if (re.getContracttype() != null & !re.getContracttype().isBlank()) {
metadata.add(mapQualifier("contracttype", re.getContracttype()));
}
if (re.getFundingtree() != null) {
metadata.addAll(re.getFundingtree()
.stream()
.peek(ft -> fillContextMap(ft))
.map(ft -> getRelFundingTree(ft))
.collect(Collectors.toList()));
}
break;
default:
throw new IllegalArgumentException("invalid target type: " + targetType);
}
final DataInfo info = rel.getDataInfo();
rels.add(templateFactory.getRel(
targetType,
rel.getTarget(),
Sets.newHashSet(metadata),
getInverseRelClass(rel.getRelClass()),
getScheme(targetType, re.getType()),
info));
}
return rels;
}
private List<String> listChildren(final JoinedEntity je, TemplateFactory templateFactory) {
final List<String> children = Lists.newArrayList();
if (MainEntityType.result.toString().equals(getMainType(je.getType()))) {
final List<Instance> instances = ((Result) je.getEntity()).getInstance();
if (instances != null) {
for (final Instance instance : ((Result) je.getEntity()).getInstance()) {
final List<String> fields = Lists.newArrayList();
if (instance.getAccessright() != null && !instance.getAccessright().isBlank()) {
fields.add(mapQualifier("accessright", instance.getAccessright()));
}
if (instance.getCollectedfrom() != null) {
fields.add(mapKeyValue("collectedfrom", instance.getCollectedfrom()));
}
if (instance.getHostedby() != null) {
fields.add(mapKeyValue("hostedby", instance.getHostedby()));
}
if (instance.getDateofacceptance() != null && isNotBlank(instance.getDateofacceptance().getValue())) {
fields.add(asXmlElement("dateofacceptance", instance.getDateofacceptance().getValue()));
}
if (instance.getInstancetype() != null && !instance.getInstancetype().isBlank()) {
fields.add(mapQualifier("instancetype", instance.getInstancetype()));
}
if (isNotBlank(instance.getDistributionlocation())) {
fields.add(asXmlElement("distributionlocation", instance.getDistributionlocation()));
}
if (instance.getRefereed() != null && isNotBlank(instance.getRefereed().getValue())) {
fields.add(asXmlElement("refereed", instance.getRefereed().getValue()));
}
if (instance.getProcessingchargeamount() != null && isNotBlank(instance.getProcessingchargeamount().getValue())) {
fields.add(asXmlElement("processingchargeamount", instance.getProcessingchargeamount().getValue()));
}
if (instance.getProcessingchargecurrency() != null && isNotBlank(instance.getProcessingchargecurrency().getValue())) {
fields.add(asXmlElement("processingchargecurrency", instance.getProcessingchargecurrency().getValue()));
}
children.add(templateFactory.getInstance(instance.getHostedby().getKey(), fields, instance.getUrl()));
}
}
final List<ExternalReference> ext = ((Result) je.getEntity()).getExternalReference();
if (ext != null) {
for (final ExternalReference er : ((Result) je.getEntity()).getExternalReference()) {
final List<String> fields = Lists.newArrayList();
if (isNotBlank(er.getSitename())) {
fields.add(asXmlElement("sitename", er.getSitename()));
}
if (isNotBlank(er.getLabel())) {
fields.add(asXmlElement("label", er.getLabel()));
}
if (isNotBlank(er.getUrl())) {
fields.add(asXmlElement("url", er.getUrl()));
}
if (isNotBlank(er.getDescription())) {
fields.add(asXmlElement("description", er.getDescription()));
}
if (isNotBlank(er.getUrl())) {
fields.add(mapQualifier("qualifier", er.getQualifier()));
}
if (isNotBlank(er.getRefidentifier())) {
fields.add(asXmlElement("refidentifier", er.getRefidentifier()));
}
if (isNotBlank(er.getQuery())) {
fields.add(asXmlElement("query", er.getQuery()));
}
children.add(templateFactory.getChild("externalreference", null, fields));
}
}
}
return children;
}
private List<String> listExtraInfo(JoinedEntity je) {
final List<ExtraInfo> extraInfo = je.getEntity().getExtraInfo();
return extraInfo != null ? extraInfo
.stream()
.map(e -> mapExtraInfo(e))
.collect(Collectors.toList()) : Lists.newArrayList();
}
private List<String> buildContexts(final String type) {
final List<String> res = Lists.newArrayList();
if ((contextMapper != null) && !contextMapper.isEmpty() && MainEntityType.result.toString().equals(type)) {
XMLTag document = XMLDoc.newDocument(true).addRoot("contextRoot");
for (final String context : contextes) {
String id = "";
for (final String token : Splitter.on("::").split(context)) {
id += token;
final ContextDef def = contextMapper.get(id);
if (def == null) {
continue;
// throw new IllegalStateException(String.format("cannot find context for id '%s'", id));
}
if (def.getName().equals("context")) {
final String xpath = "//context/@id='" + def.getId() + "'";
if (!document.gotoRoot().rawXpathBoolean(xpath, new Object())) {
document = addContextDef(document.gotoRoot(), def);
}
}
if (def.getName().equals("category")) {
final String rootId = substringBefore(def.getId(), "::");
document = addContextDef(document.gotoRoot().gotoTag("//context[./@id='" + rootId + "']", new Object()), def);
}
if (def.getName().equals("concept")) {
document = addContextDef(document, def).gotoParent();
}
id += "::";
}
}
final Transformer transformer = getTransformer();
for (final org.w3c.dom.Element x : document.gotoRoot().getChildElement()) {
try {
res.add(asStringElement(x, transformer));
} catch (final TransformerException e) {
throw new RuntimeException(e);
}
}
}
return res;
}
private Transformer getTransformer() {
try {
Transformer transformer = TransformerFactory.newInstance().newTransformer();
transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
return transformer;
} catch (TransformerConfigurationException e) {
throw new IllegalStateException("unable to create javax.xml.transform.Transformer", e);
}
}
private XMLTag addContextDef(final XMLTag tag, final ContextDef def) {
tag.addTag(def.getName()).addAttribute("id", def.getId()).addAttribute("label", def.getLabel());
if ((def.getType() != null) && !def.getType().isEmpty()) {
tag.addAttribute("type", def.getType());
}
return tag;
}
private String asStringElement(final org.w3c.dom.Element element, final Transformer transformer) throws TransformerException {
final StringWriter buffer = new StringWriter();
transformer.transform(new DOMSource(element), new StreamResult(buffer));
return buffer.toString();
}
private void fillContextMap(final String xmlTree) {
Document fundingPath;
try {
fundingPath = new SAXReader().read(new StringReader(xmlTree));
} catch (final DocumentException e) {
throw new RuntimeException(e);
}
try {
final Node funder = fundingPath.selectSingleNode("//funder");
if (funder != null) {
final String funderShortName = funder.valueOf("./shortname");
contextes.add(funderShortName);
contextMapper.put(funderShortName, new ContextDef(funderShortName, funder.valueOf("./name"), "context", "funding"));
final Node level0 = fundingPath.selectSingleNode("//funding_level_0");
if (level0 != null) {
final String level0Id = Joiner.on("::").join(funderShortName, level0.valueOf("./name"));
contextMapper.put(level0Id, new ContextDef(level0Id, level0.valueOf("./description"), "category", ""));
final Node level1 = fundingPath.selectSingleNode("//funding_level_1");
if (level1 == null) {
contextes.add(level0Id);
} else {
final String level1Id = Joiner.on("::").join(level0Id, level1.valueOf("./name"));
contextMapper.put(level1Id, new ContextDef(level1Id, level1.valueOf("./description"), "concept", ""));
final Node level2 = fundingPath.selectSingleNode("//funding_level_2");
if (level2 == null) {
contextes.add(level1Id);
} else {
final String level2Id = Joiner.on("::").join(level1Id, level2.valueOf("./name"));
contextMapper.put(level2Id, new ContextDef(level2Id, level2.valueOf("./description"), "concept", ""));
contextes.add(level2Id);
}
}
}
}
} catch (final NullPointerException e) {
throw new IllegalArgumentException("malformed funding path: " + xmlTree, e);
}
}
@SuppressWarnings("unchecked")
private String getRelFundingTree(final String xmlTree) {
String funding = "<funding>";
try {
final Document ftree = new SAXReader().read(new StringReader(xmlTree));
funding = "<funding>";
funding += getFunderElement(ftree);
for (final Object o : Lists.reverse(ftree.selectNodes("//fundingtree//*[starts-with(local-name(),'funding_level_')]"))) {
final Element e = (Element) o;
final String _id = e.valueOf("./id");
funding += "<" + e.getName() + " name=\"" + escapeXml(e.valueOf("./name")) + "\">" + escapeXml(_id) + "</" + e.getName() + ">";
}
} catch (final DocumentException e) {
throw new IllegalArgumentException("unable to parse funding tree: " + xmlTree + "\n" + e.getMessage());
} finally {
funding += "</funding>";
}
return funding;
}
private String getFunderElement(final Document ftree) {
final String funderId = ftree.valueOf("//fundingtree/funder/id/text()");
final String funderShortName = ftree.valueOf("//fundingtree/funder/shortname/text()");
final String funderName = ftree.valueOf("//fundingtree/funder/name/text()");
final String funderJurisdiction = ftree.valueOf("//fundingtree/funder/jurisdiction/text()");
return "<funder id=\"" + escapeXml(funderId) + "\" shortname=\"" + escapeXml(funderShortName) + "\" name=\"" + escapeXml(funderName)
+ "\" jurisdiction=\"" + escapeXml(funderJurisdiction) + "\" />";
}
}

View File

@ -0,0 +1,151 @@
package eu.dnetlib.dhp.graph.utils;
import eu.dnetlib.dhp.schema.oaf.*;
import static eu.dnetlib.dhp.graph.utils.GraphMappingUtils.removePrefix;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class XmlSerializationUtils {
// XML 1.0
// #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD] | [#x10000-#x10FFFF]
private final static String xml10pattern = "[^"
+ "\u0009\r\n"
+ "\u0020-\uD7FF"
+ "\uE000-\uFFFD"
+ "\ud800\udc00-\udbff\udfff"
+ "]";
public static String mapJournal(Journal j) {
final String attrs = new StringBuilder()
.append(attr("issn", j.getIssnPrinted()))
.append(attr("eissn", j.getIssnOnline()))
.append(attr("lissn", j.getIssnLinking()))
.append(attr("ep", j.getEp()))
.append(attr("iss", j.getIss()))
.append(attr("sp", j.getSp()))
.append(attr("vol", j.getVol()))
.toString()
.trim();
return new StringBuilder()
.append("<journal")
.append(isNotBlank(attrs) ? (" " + attrs) : "")
.append(">")
.append(escapeXml(j.getName()))
.append("</journal>")
.toString();
}
private static String attr(final String name, final String value) {
return isNotBlank(value) ? name + "=\"" + escapeXml(value) + "\" " : "";
}
public static String mapStructuredProperty(String name, StructuredProperty t) {
return asXmlElement(name, t.getValue(), t.getQualifier(), t.getDataInfo() != null ? t.getDataInfo() : null);
}
public static String mapQualifier(String name, Qualifier q) {
return asXmlElement(name, "", q, null);
}
public static String escapeXml(final String value) {
return value
.replaceAll("&", "&amp;")
.replaceAll("<", "&lt;")
.replaceAll(">", "&gt;")
.replaceAll("\"", "&quot;")
.replaceAll("'", "&apos;")
.replaceAll(xml10pattern, "");
}
public static String parseDataInfo(final DataInfo dataInfo) {
return new StringBuilder()
.append("<datainfo>")
.append(asXmlElement("inferred", dataInfo.getInferred() + ""))
.append(asXmlElement("deletedbyinference", dataInfo.getDeletedbyinference() + ""))
.append(asXmlElement("trust", dataInfo.getTrust() + ""))
.append(asXmlElement("inferenceprovenance", dataInfo.getInferenceprovenance() + ""))
.append(asXmlElement("provenanceaction", null, dataInfo.getProvenanceaction(), null))
.append("</datainfo>")
.toString();
}
private static StringBuilder dataInfoAsAttributes(final StringBuilder sb, final DataInfo info) {
return sb
.append(attr("inferred", info.getInferred() != null ? info.getInferred().toString() : ""))
.append(attr("inferenceprovenance", info.getInferenceprovenance()))
.append(attr("provenanceaction", info.getProvenanceaction() != null ? info.getProvenanceaction().getClassid() : ""))
.append(attr("trust", info.getTrust()));
}
public static String mapKeyValue(final String name, final KeyValue kv) {
return new StringBuilder()
.append("<")
.append(name)
.append(" name=\"")
.append(escapeXml(kv.getValue()))
.append("\" id=\"")
.append(escapeXml(removePrefix(kv.getKey())))
.append("\"/>")
.toString();
}
public static String mapExtraInfo(final ExtraInfo e) {
return new StringBuilder("<extraInfo ")
.append("name=\"" + e.getName() + "\" ")
.append("typology=\"" + e.getTypology() + "\" ")
.append("provenance=\"" + e.getProvenance() + "\" ")
.append("trust=\"" + e.getTrust() + "\"")
.append(">")
.append(e.getValue())
.append("</extraInfo>")
.toString();
}
public static String asXmlElement(final String name, final String value) {
return asXmlElement(name, value, null, null);
}
public static String asXmlElement(final String name, final String value, final Qualifier q, final DataInfo info) {
StringBuilder sb = new StringBuilder();
sb.append("<");
sb.append(name);
if (q != null) {
sb.append(getAttributes(q));
}
if (info != null) {
sb
.append(" ")
.append(attr("inferred", info.getInferred() != null ? info.getInferred().toString() : ""))
.append(attr("inferenceprovenance", info.getInferenceprovenance()))
.append(attr("provenanceaction", info.getProvenanceaction() != null ? info.getProvenanceaction().getClassid() : ""))
.append(attr("trust", info.getTrust()));
}
if (isBlank(value)) {
sb.append("/>");
return sb.toString();
}
sb.append(">");
sb.append(escapeXml(value));
sb.append("</");
sb.append(name);
sb.append(">");
return sb.toString();
}
public static String getAttributes(final Qualifier q) {
if (q == null || q.isBlank()) return "";
return new StringBuilder(" ")
.append(attr("classid", q.getClassid()))
.append(attr("classname", q.getClassname()))
.append(attr("schemeid", q.getSchemeid()))
.append(attr("schemename", q.getSchemename()))
.toString();
}
}

View File

@ -0,0 +1 @@
net.sf.saxon.TransformerFactoryImpl

View File

@ -0,0 +1,6 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true},
{"paramName":"o", "paramLongName":"outputPath", "paramDescription": "the path used to store temporary output files", "paramRequired": true},
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequence file to read", "paramRequired": true}
]

View File

@ -0,0 +1,7 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true},
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequence file to read the XML records", "paramRequired": true},
{"paramName":"f", "paramLongName":"format", "paramDescription": "MDFormat name found in the IS profile", "paramRequired": true},
{"paramName":"b", "paramLongName":"batchSize", "paramDescription": "size of the batch of documents sent to solr", "paramRequired": false}
]

View File

@ -0,0 +1,34 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hive_db_name</name>
<value>openaire</value>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18088</value>
</property>
<property>
<name>spark2EventLogDir</name>
<value>/user/spark/applicationHistory</value>
</property>
</configuration>

View File

@ -0,0 +1,99 @@
<workflow-app name="index_infospace_graph" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>hive_db_name</name>
<description>the target hive database name</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<start to="reuse_records"/>
<decision name="reuse_records">
<switch>
<case to="adjancency_lists">${wf:conf('reuseRecords') eq false}</case>
<case to="to_solr_index">${wf:conf('reuseRecords') eq true}</case>
<default to="adjancency_lists"/>
</switch>
</decision>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="adjancency_lists">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>cluster</mode>
<name>build_adjacency_lists</name>
<class>eu.dnetlib.dhp.graph.SparkXmlRecordBuilderJob</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>-mt</arg> <arg>yarn</arg>
<arg>-is</arg> <arg>${isLookupUrl}</arg>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
</spark>
<ok to="to_solr_index"/>
<error to="Kill"/>
</action>
<action name="to_solr_index">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>cluster</mode>
<name>to_solr_index</name>
<class>eu.dnetlib.dhp.graph.SparkXmlIndexingJob</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.dynamicAllocation.maxExecutors=${sparkExecutorCoresForIndexing}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>-mt</arg> <arg>yarn</arg>
<arg>-is</arg> <arg>${isLookupUrl}</arg>
<arg>--sourcePath</arg><arg>${outputPath}/xml</arg>
<arg>--format</arg><arg>${format}</arg>
<arg>--batchSize</arg><arg>${batchSize}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,3 @@
<name$if(hasId)$ objidentifier="$id$"$else$$endif>>
$metadata:{ it | $it$ }$
</name>

View File

@ -0,0 +1,10 @@
<oaf:$name$>
$metadata:{ it | $it$ }$
<rels>
$rels:{ it | $it$ }$
</rels>
<children>
$children:{ it | $it$ }$
</children>
</oaf:$name$>
$extrainfo:{ it | $it$ }$

View File

@ -0,0 +1,4 @@
<instance id="$instanceId$">
$metadata:{ it | $it$ }$
$webresources:{ it | $it$ }$
</instance>

View File

@ -0,0 +1,17 @@
<?xml version="1.0"?>
<record>
<result xmlns:dri="http://www.driver-repository.eu/namespace/dri" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<header>
<dri:objIdentifier>$id$</dri:objIdentifier>
<dri:dateOfCollection>$dateofcollection$</dri:dateOfCollection>
<dri:dateOfTransformation>$dateoftransformation$</dri:dateOfTransformation>
</header>
<metadata>
<oaf:entity xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:oaf="http://namespace.openaire.eu/oaf"
xsi:schemaLocation="http://namespace.openaire.eu/oaf $schemaLocation$">
$it$
</oaf:entity>
</metadata>
</result>
</record>

View File

@ -0,0 +1,4 @@
<rel inferred="$inferred$" trust="$trust$" inferenceprovenance="$inferenceprovenance$" provenanceaction="$provenanceaction$">
<to class="$class$" scheme="$scheme$" type="$type$">$objIdentifier$</to>
$metadata:{ it | $it$ }$
</rel>

View File

@ -0,0 +1,3 @@
<webresource>
<url>$identifier$</url>
</webresource>

View File

@ -10,9 +10,8 @@ This module is automatically executed when running:
on module having set:
<parent>
<groupId>eu.dnetlib</groupId>
<artifactId>dhp-wf</artifactId>
<version>1.0.0-SNAPSHOT</version>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-workflows</artifactId>
</parent>
in `pom.xml` file. `oozie-package` profile initializes oozie workflow packaging, `workflow.source.dir` property points to a workflow (notice: this is not a relative path but a classpath to directory usually holding `oozie_app` subdirectory).

View File

@ -18,6 +18,7 @@
<module>dhp-distcp</module>
<module>dhp-graph-mapper</module>
<module>dhp-dedup</module>
<module>dhp-graph-provision</module>
</modules>
<pluginRepositories>

137
pom.xml
View File

@ -100,7 +100,13 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<artifactId>hadoop-common</artifactId>
<version>${dhp.hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${dhp.hadoop.version}</version>
<scope>provided</scope>
</dependency>
@ -148,11 +154,11 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>net.sf.saxon</groupId>
<artifactId>Saxon-HE</artifactId>
<version>9.5.1-5</version>
</dependency>
<dependency>
<groupId>net.sf.saxon</groupId>
<artifactId>Saxon-HE</artifactId>
<version>9.9.1-6</version>
</dependency>
<dependency>
<groupId>dom4j</groupId>
@ -173,6 +179,56 @@
</dependency>
<dependency>
<groupId>com.mycila.xmltool</groupId>
<artifactId>xmltool</artifactId>
<version>3.3</version>
</dependency>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-solrj</artifactId>
<version>7.5.0</version>
<exclusions>
<exclusion>
<artifactId>*</artifactId>
<groupId>*</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.lucidworks.spark</groupId>
<artifactId>spark-solr</artifactId>
<version>3.6.0</version>
<exclusions>
<exclusion>
<artifactId>*</artifactId>
<groupId>*</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.5.3</version>
</dependency>
<dependency>
<groupId>org.noggit</groupId>
<artifactId>noggit</artifactId>
<version>0.8</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.11</version>
</dependency>
<dependency>
<groupId>net.schmizz</groupId>
<artifactId>sshj</artifactId>
<version>0.10.0</version>
@ -202,10 +258,19 @@
<dependency>
<groupId>eu.dnetlib</groupId>
<artifactId>dnet-pace-core</artifactId>
<version>4.0.0-SNAPSHOT</version>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>eu.dnetlib</groupId>
<artifactId>cnr-rmi-api</artifactId>
<version>[2.0.0,3.0.0)</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http</artifactId>
<version>3.1.5</version>
</dependency>
<dependency>
<groupId>javax.persistence</groupId>
<artifactId>javax.persistence-api</artifactId>
@ -213,32 +278,36 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>com.arakelian</groupId>
<artifactId>java-jq</artifactId>
<version>0.10.1</version>
</dependency>
<dependency>
<groupId>edu.cmu</groupId>
<artifactId>secondstring</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>${mongodb.driver.version}</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>com.arakelian</groupId>
<artifactId>java-jq</artifactId>
<version>0.10.1</version>
</dependency>
<dependency>
<groupId>edu.cmu</groupId>
<artifactId>secondstring</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>${mongodb.driver.version}</version>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>stringtemplate</artifactId>
<version>4.0</version>
</dependency>
<dependency>
<groupId>org.apache.oozie</groupId>