1
0
Fork 0

merge upstream

This commit is contained in:
Miriam Baglioni 2020-07-20 18:05:01 +02:00
commit 2a15494b16
39 changed files with 2681 additions and 272 deletions

View File

@ -14,6 +14,37 @@
<description>This module contains common schema classes meant to be used across the dnet-hadoop submodules</description> <description>This module contains common schema classes meant to be used across the dnet-hadoop submodules</description>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.0.1</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>initialize</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</build>
<dependencies> <dependencies>
<dependency> <dependency>

View File

@ -1,8 +1,6 @@
package eu.dnetlib.dhp.schema.common; package eu.dnetlib.dhp.schema.common;
import java.security.Key;
import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.Qualifier;

View File

@ -0,0 +1,90 @@
package eu.dnetlib.dhp.schema.scholexplorer
import eu.dnetlib.dhp.schema.oaf.{DataInfo, Field, KeyValue, Qualifier, StructuredProperty}
object OafUtils {
def generateKeyValue(key: String, value: String): KeyValue = {
val kv: KeyValue = new KeyValue()
kv.setKey(key)
kv.setValue(value)
kv.setDataInfo(generateDataInfo("0.9"))
kv
}
def generateDataInfo(trust: String = "0.9", invisibile: Boolean = false): DataInfo = {
val di = new DataInfo
di.setDeletedbyinference(false)
di.setInferred(false)
di.setInvisible(false)
di.setTrust(trust)
di.setProvenanceaction(createQualifier("sysimport:actionset", "dnet:provenanceActions"))
di
}
def createQualifier(cls: String, sch: String): Qualifier = {
createQualifier(cls, cls, sch, sch)
}
def createQualifier(classId: String, className: String, schemeId: String, schemeName: String): Qualifier = {
val q: Qualifier = new Qualifier
q.setClassid(classId)
q.setClassname(className)
q.setSchemeid(schemeId)
q.setSchemename(schemeName)
q
}
def asField[T](value: T): Field[T] = {
val tmp = new Field[T]
tmp.setValue(value)
tmp
}
def createSP(value: String, classId: String,className:String, schemeId: String, schemeName:String): StructuredProperty = {
val sp = new StructuredProperty
sp.setQualifier(createQualifier(classId,className, schemeId, schemeName))
sp.setValue(value)
sp
}
def createSP(value: String, classId: String,className:String, schemeId: String, schemeName:String, dataInfo: DataInfo): StructuredProperty = {
val sp = new StructuredProperty
sp.setQualifier(createQualifier(classId,className, schemeId, schemeName))
sp.setValue(value)
sp.setDataInfo(dataInfo)
sp
}
def createSP(value: String, classId: String, schemeId: String): StructuredProperty = {
val sp = new StructuredProperty
sp.setQualifier(createQualifier(classId, schemeId))
sp.setValue(value)
sp
}
def createSP(value: String, classId: String, schemeId: String, dataInfo: DataInfo): StructuredProperty = {
val sp = new StructuredProperty
sp.setQualifier(createQualifier(classId, schemeId))
sp.setValue(value)
sp.setDataInfo(dataInfo)
sp
}
}

View File

@ -34,7 +34,10 @@ public class EventFactory {
final MappedFields map = createMapFromResult(updateInfo); final MappedFields map = createMapFromResult(updateInfo);
final String eventId = calculateEventId( final String eventId = calculateEventId(
updateInfo.getTopicPath(), updateInfo.getTarget().getOpenaireId(), updateInfo.getHighlightValueAsString()); updateInfo.getTopicPath(), updateInfo.getTargetDs().getOpenaireId(), updateInfo
.getTarget()
.getOpenaireId(),
updateInfo.getHighlightValueAsString());
res.setEventId(eventId); res.setEventId(eventId);
res.setProducerId(PRODUCER_ID); res.setProducerId(PRODUCER_ID);
@ -93,11 +96,13 @@ public class EventFactory {
return map; return map;
} }
private static String calculateEventId(final String topic, final String publicationId, final String value) { private static String calculateEventId(final String topic, final String dsId, final String publicationId,
final String value) {
return "event-" return "event-"
+ DigestUtils.md5Hex(topic).substring(0, 6) + "-" + DigestUtils.md5Hex(topic).substring(0, 4) + "-"
+ DigestUtils.md5Hex(publicationId).substring(0, 8) + "-" + DigestUtils.md5Hex(dsId).substring(0, 4) + "-"
+ DigestUtils.md5Hex(value).substring(0, 8); + DigestUtils.md5Hex(publicationId).substring(0, 7) + "-"
+ DigestUtils.md5Hex(value).substring(0, 5);
} }
private static long calculateExpiryDate(final long now) { private static long calculateExpiryDate(final long now) {

View File

@ -64,157 +64,12 @@
</configuration> </configuration>
</global> </global>
<start to="join_entities_step0"/> <start to="generate_events"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<action name="join_entities_step0">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinStep0</name>
<class>eu.dnetlib.dhp.broker.oa.JoinStep0Job</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="join_entities_step1"/>
<error to="Kill"/>
</action>
<action name="join_entities_step1">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinStep1</name>
<class>eu.dnetlib.dhp.broker.oa.JoinStep1Job</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="join_entities_step2"/>
<error to="Kill"/>
</action>
<action name="join_entities_step2">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinStep2</name>
<class>eu.dnetlib.dhp.broker.oa.JoinStep2Job</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="join_entities_step3"/>
<error to="Kill"/>
</action>
<action name="join_entities_step3">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinStep3</name>
<class>eu.dnetlib.dhp.broker.oa.JoinStep3Job</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="join_entities_step4"/>
<error to="Kill"/>
</action>
<action name="join_entities_step4">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinStep4</name>
<class>eu.dnetlib.dhp.broker.oa.JoinStep4Job</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="prepare_groups"/>
<error to="Kill"/>
</action>
<action name="prepare_groups">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareGroupsJob</name>
<class>eu.dnetlib.dhp.broker.oa.PrepareGroupsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="generate_events"/>
<error to="Kill"/>
</action>
<action name="generate_events"> <action name="generate_events">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>

View File

@ -12,8 +12,6 @@ import org.apache.spark.sql.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.DataInfo;

View File

@ -9,6 +9,37 @@
<artifactId>dhp-graph-mapper</artifactId> <artifactId>dhp-graph-mapper</artifactId>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.0.1</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>initialize</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</build>
<dependencies> <dependencies>
<dependency> <dependency>
@ -61,6 +92,13 @@
<groupId>org.postgresql</groupId> <groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId> <artifactId>postgresql</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-jackson_2.11</artifactId>
<version>3.5.3</version>
</dependency>
</dependencies> </dependencies>

View File

@ -0,0 +1,162 @@
package eu.dnetlib.dhp.oa.graph.merge;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
/**
* Combines the content from two aggregator graph tables of the same type, entities (or relationships) with the same ids
* are picked preferring those from the BETA aggregator rather then from PROD. The identity of a relationship is defined
* by eu.dnetlib.dhp.schema.common.ModelSupport#idFn()
*/
public class MergeGraphSparkJob {
private static final Logger log = LoggerFactory.getLogger(CleanGraphSparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String PRIORITY_DEFAULT = "BETA"; // BETA | PROD
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
CleanGraphSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
String priority = Optional
.ofNullable(parser.get("priority"))
.orElse(PRIORITY_DEFAULT);
log.info("priority: {}", priority);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String betaInputPath = parser.get("betaInputPath");
log.info("betaInputPath: {}", betaInputPath);
String prodInputPath = parser.get("prodInputPath");
log.info("prodInputPath: {}", prodInputPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
mergeGraphTable(spark, priority, betaInputPath, prodInputPath, entityClazz, entityClazz, outputPath);
});
}
private static <P extends Oaf, B extends Oaf> void mergeGraphTable(
SparkSession spark,
String priority,
String betaInputPath,
String prodInputPath,
Class<P> p_clazz,
Class<B> b_clazz,
String outputPath) {
Dataset<Tuple2<String, B>> beta = readTableFromPath(spark, betaInputPath, b_clazz);
Dataset<Tuple2<String, P>> prod = readTableFromPath(spark, prodInputPath, p_clazz);
prod
.joinWith(beta, prod.col("_1").equalTo(beta.col("_1")), "full_outer")
.map((MapFunction<Tuple2<Tuple2<String, P>, Tuple2<String, B>>, P>) value -> {
Optional<P> p = Optional.ofNullable(value._1()).map(Tuple2::_2);
Optional<B> b = Optional.ofNullable(value._2()).map(Tuple2::_2);
switch (priority) {
default:
case "BETA":
return mergeWithPriorityToBETA(p, b);
case "PROD":
return mergeWithPriorityToPROD(p, b);
}
}, Encoders.bean(p_clazz))
.filter((FilterFunction<P>) Objects::nonNull)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
private static <P extends Oaf, B extends Oaf> P mergeWithPriorityToPROD(Optional<P> p, Optional<B> b) {
if (b.isPresent() & !p.isPresent()) {
return (P) b.get();
}
if (p.isPresent()) {
return p.get();
}
return null;
}
private static <P extends Oaf, B extends Oaf> P mergeWithPriorityToBETA(Optional<P> p, Optional<B> b) {
if (p.isPresent() & !b.isPresent()) {
return p.get();
}
if (b.isPresent()) {
return (P) b.get();
}
return null;
}
private static <T extends Oaf> Dataset<Tuple2<String, T>> readTableFromPath(
SparkSession spark, String inputEntityPath, Class<T> clazz) {
log.info("Reading Graph table from: {}", inputEntityPath);
return spark
.read()
.textFile(inputEntityPath)
.map(
(MapFunction<String, Tuple2<String, T>>) value -> {
final T t = OBJECT_MAPPER.readValue(value, clazz);
final String id = ModelSupport.idFn().apply(t);
return new Tuple2<>(id, t);
},
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}

View File

@ -1,36 +1,10 @@
package eu.dnetlib.dhp.oa.graph.raw; package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId; import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.*;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.dataInfo; import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.journal;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.keyValue;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listFields;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.oaiIProvenance;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASET_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_ACCESS_MODES;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PID_TYPES;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PRODUCED_BY;
import static eu.dnetlib.dhp.schema.common.ModelConstants.NOT_AVAILABLE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.ORP_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.OUTCOME;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PRODUCES;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PUBLICATION_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.REPOSITORY_PROVENANCE_ACTIONS;
import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_PROJECT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.SOFTWARE_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.UNKNOWN;
import java.util.ArrayList; import java.util.*;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.dom4j.Document; import org.dom4j.Document;
@ -40,24 +14,8 @@ import org.dom4j.Node;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.LicenseComparator; import eu.dnetlib.dhp.schema.common.LicenseComparator;
import eu.dnetlib.dhp.schema.oaf.Author; import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Context; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.GeoLocation;
import eu.dnetlib.dhp.schema.oaf.Instance;
import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.OAIProvenance;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
public abstract class AbstractMdRecordToOafMapper { public abstract class AbstractMdRecordToOafMapper {
@ -99,7 +57,6 @@ public abstract class AbstractMdRecordToOafMapper {
final Document doc = DocumentHelper final Document doc = DocumentHelper
.parseText(xml.replaceAll(DATACITE_SCHEMA_KERNEL_4, DATACITE_SCHEMA_KERNEL_3)); .parseText(xml.replaceAll(DATACITE_SCHEMA_KERNEL_4, DATACITE_SCHEMA_KERNEL_3));
final String type = doc.valueOf("//dr:CobjCategory/@type");
final KeyValue collectedFrom = getProvenanceDatasource( final KeyValue collectedFrom = getProvenanceDatasource(
doc, "//oaf:collectedFrom/@id", "//oaf:collectedFrom/@name"); doc, "//oaf:collectedFrom/@id", "//oaf:collectedFrom/@name");
@ -118,12 +75,32 @@ public abstract class AbstractMdRecordToOafMapper {
final DataInfo info = prepareDataInfo(doc, invisible); final DataInfo info = prepareDataInfo(doc, invisible);
final long lastUpdateTimestamp = new Date().getTime(); final long lastUpdateTimestamp = new Date().getTime();
return createOafs(doc, type, collectedFrom, hostedBy, info, lastUpdateTimestamp); final List<Instance> instances = prepareInstances(doc, info, collectedFrom, hostedBy);
final String type = getResultType(doc, instances);
return createOafs(doc, type, instances, collectedFrom, info, lastUpdateTimestamp);
} catch (final Exception e) { } catch (final Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
protected String getResultType(final Document doc, final List<Instance> instances) {
String type = doc.valueOf("//dr:CobjCategory/@type");
if (StringUtils.isBlank(type) & vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) {
String instanceType = instances
.stream()
.map(i -> i.getInstancetype().getClassid())
.findFirst()
.orElse("0000"); // Unknown
Qualifier resultType = vocs.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, instanceType);
return resultType.getClassid();
}
return type;
}
private KeyValue getProvenanceDatasource(final Document doc, final String xpathId, final String xpathName) { private KeyValue getProvenanceDatasource(final Document doc, final String xpathId, final String xpathName) {
final String dsId = doc.valueOf(xpathId); final String dsId = doc.valueOf(xpathId);
final String dsName = doc.valueOf(xpathName); final String dsName = doc.valueOf(xpathName);
@ -138,8 +115,8 @@ public abstract class AbstractMdRecordToOafMapper {
protected List<Oaf> createOafs( protected List<Oaf> createOafs(
final Document doc, final Document doc,
final String type, final String type,
final List<Instance> instances,
final KeyValue collectedFrom, final KeyValue collectedFrom,
final KeyValue hostedBy,
final DataInfo info, final DataInfo info,
final long lastUpdateTimestamp) { final long lastUpdateTimestamp) {
@ -148,14 +125,14 @@ public abstract class AbstractMdRecordToOafMapper {
switch (type.toLowerCase()) { switch (type.toLowerCase()) {
case "publication": case "publication":
final Publication p = new Publication(); final Publication p = new Publication();
populateResultFields(p, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp); populateResultFields(p, doc, instances, collectedFrom, info, lastUpdateTimestamp);
p.setResulttype(PUBLICATION_DEFAULT_RESULTTYPE); p.setResulttype(PUBLICATION_DEFAULT_RESULTTYPE);
p.setJournal(prepareJournal(doc, info)); p.setJournal(prepareJournal(doc, info));
oafs.add(p); oafs.add(p);
break; break;
case "dataset": case "dataset":
final Dataset d = new Dataset(); final Dataset d = new Dataset();
populateResultFields(d, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp); populateResultFields(d, doc, instances, collectedFrom, info, lastUpdateTimestamp);
d.setResulttype(DATASET_DEFAULT_RESULTTYPE); d.setResulttype(DATASET_DEFAULT_RESULTTYPE);
d.setStoragedate(prepareDatasetStorageDate(doc, info)); d.setStoragedate(prepareDatasetStorageDate(doc, info));
d.setDevice(prepareDatasetDevice(doc, info)); d.setDevice(prepareDatasetDevice(doc, info));
@ -168,7 +145,7 @@ public abstract class AbstractMdRecordToOafMapper {
break; break;
case "software": case "software":
final Software s = new Software(); final Software s = new Software();
populateResultFields(s, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp); populateResultFields(s, doc, instances, collectedFrom, info, lastUpdateTimestamp);
s.setResulttype(SOFTWARE_DEFAULT_RESULTTYPE); s.setResulttype(SOFTWARE_DEFAULT_RESULTTYPE);
s.setDocumentationUrl(prepareSoftwareDocumentationUrls(doc, info)); s.setDocumentationUrl(prepareSoftwareDocumentationUrls(doc, info));
s.setLicense(prepareSoftwareLicenses(doc, info)); s.setLicense(prepareSoftwareLicenses(doc, info));
@ -180,7 +157,7 @@ public abstract class AbstractMdRecordToOafMapper {
case "otherresearchproducts": case "otherresearchproducts":
default: default:
final OtherResearchProduct o = new OtherResearchProduct(); final OtherResearchProduct o = new OtherResearchProduct();
populateResultFields(o, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp); populateResultFields(o, doc, instances, collectedFrom, info, lastUpdateTimestamp);
o.setResulttype(ORP_DEFAULT_RESULTTYPE); o.setResulttype(ORP_DEFAULT_RESULTTYPE);
o.setContactperson(prepareOtherResearchProductContactPersons(doc, info)); o.setContactperson(prepareOtherResearchProductContactPersons(doc, info));
o.setContactgroup(prepareOtherResearchProductContactGroups(doc, info)); o.setContactgroup(prepareOtherResearchProductContactGroups(doc, info));
@ -259,14 +236,16 @@ public abstract class AbstractMdRecordToOafMapper {
private void populateResultFields( private void populateResultFields(
final Result r, final Result r,
final Document doc, final Document doc,
final List<Instance> instances,
final KeyValue collectedFrom, final KeyValue collectedFrom,
final KeyValue hostedBy,
final DataInfo info, final DataInfo info,
final long lastUpdateTimestamp) { final long lastUpdateTimestamp) {
r.setDataInfo(info); r.setDataInfo(info);
r.setLastupdatetimestamp(lastUpdateTimestamp); r.setLastupdatetimestamp(lastUpdateTimestamp);
r.setId(createOpenaireId(50, doc.valueOf("//dri:objIdentifier"), false)); r.setId(createOpenaireId(50, doc.valueOf("//dri:objIdentifier"), false));
r.setOriginalId(Arrays.asList(doc.valueOf("//dri:objIdentifier")));
r.setOriginalId(Arrays.asList(findOriginalId(doc)));
r.setCollectedfrom(Arrays.asList(collectedFrom)); r.setCollectedfrom(Arrays.asList(collectedFrom));
r.setPid(prepareResultPids(doc, info)); r.setPid(prepareResultPids(doc, info));
r.setDateofcollection(doc.valueOf("//dr:dateOfCollection")); r.setDateofcollection(doc.valueOf("//dr:dateOfCollection"));
@ -291,7 +270,7 @@ public abstract class AbstractMdRecordToOafMapper {
r.setCoverage(prepareCoverages(doc, info)); r.setCoverage(prepareCoverages(doc, info));
r.setContext(prepareContexts(doc, info)); r.setContext(prepareContexts(doc, info));
r.setExternalReference(new ArrayList<>()); // NOT PRESENT IN MDSTORES r.setExternalReference(new ArrayList<>()); // NOT PRESENT IN MDSTORES
final List<Instance> instances = prepareInstances(doc, info, collectedFrom, hostedBy);
r.setInstance(instances); r.setInstance(instances);
r.setBestaccessright(getBestAccessRights(instances)); r.setBestaccessright(getBestAccessRights(instances));
} }
@ -429,6 +408,18 @@ public abstract class AbstractMdRecordToOafMapper {
return null; return null;
} }
private String findOriginalId(final Document doc) {
final Node n = doc.selectSingleNode("//*[local-name()='provenance']/*[local-name()='originDescription']");
if (n != null) {
final String id = n.valueOf("./*[local-name()='identifier']");
if (StringUtils.isNotBlank(id)) {
return id;
}
}
return doc.valueOf("//*[local-name()='header']/*[local-name()='identifier']");
}
protected Qualifier prepareQualifier(final Node node, final String xpath, final String schemeId) { protected Qualifier prepareQualifier(final Node node, final String xpath, final String schemeId) {
return prepareQualifier(node.valueOf(xpath).trim(), schemeId); return prepareQualifier(node.valueOf(xpath).trim(), schemeId);
} }

View File

@ -4,7 +4,11 @@ package eu.dnetlib.dhp.oa.graph.raw.common;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -57,6 +61,7 @@ public class OafMapperUtils {
.stream(values) .stream(values)
.map(v -> field(v, info)) .map(v -> field(v, info))
.filter(Objects::nonNull) .filter(Objects::nonNull)
.filter(distinctByKey(f -> f.getValue()))
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
@ -65,6 +70,7 @@ public class OafMapperUtils {
.stream() .stream()
.map(v -> field(v, info)) .map(v -> field(v, info))
.filter(Objects::nonNull) .filter(Objects::nonNull)
.filter(distinctByKey(f -> f.getValue()))
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
@ -237,4 +243,10 @@ public class OafMapperUtils {
public static String asString(final Object o) { public static String asString(final Object o) {
return o == null ? "" : o.toString(); return o == null ? "" : o.toString();
} }
public static <T> Predicate<T> distinctByKey(
final Function<? super T, ?> keyExtractor) {
final Map<Object, Boolean> seen = new ConcurrentHashMap<>();
return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null;
}
} }

View File

@ -0,0 +1,89 @@
package eu.dnetlib.dhp.sx.ebi
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
object EBIAggregator {
def getDatasetAggregator(): Aggregator[(String, OafDataset), OafDataset, OafDataset] = new Aggregator[(String, OafDataset), OafDataset, OafDataset]{
override def zero: OafDataset = new OafDataset()
override def reduce(b: OafDataset, a: (String, OafDataset)): OafDataset = {
b.mergeFrom(a._2)
if (b.getId == null)
b.setId(a._2.getId)
b
}
override def merge(wx: OafDataset, wy: OafDataset): OafDataset = {
wx.mergeFrom(wy)
if(wx.getId == null && wy.getId.nonEmpty)
wx.setId(wy.getId)
wx
}
override def finish(reduction: OafDataset): OafDataset = reduction
override def bufferEncoder: Encoder[OafDataset] =
Encoders.kryo(classOf[OafDataset])
override def outputEncoder: Encoder[OafDataset] =
Encoders.kryo(classOf[OafDataset])
}
def getPublicationAggregator(): Aggregator[(String, Publication), Publication, Publication] = new Aggregator[(String, Publication), Publication, Publication]{
override def zero: Publication = new Publication()
override def reduce(b: Publication, a: (String, Publication)): Publication = {
b.mergeFrom(a._2)
if (b.getId == null)
b.setId(a._2.getId)
b
}
override def merge(wx: Publication, wy: Publication): Publication = {
wx.mergeFrom(wy)
if(wx.getId == null && wy.getId.nonEmpty)
wx.setId(wy.getId)
wx
}
override def finish(reduction: Publication): Publication = reduction
override def bufferEncoder: Encoder[Publication] =
Encoders.kryo(classOf[Publication])
override def outputEncoder: Encoder[Publication] =
Encoders.kryo(classOf[Publication])
}
def getRelationAggregator(): Aggregator[(String, Relation), Relation, Relation] = new Aggregator[(String, Relation), Relation, Relation]{
override def zero: Relation = new Relation()
override def reduce(b: Relation, a: (String, Relation)): Relation = {
a._2
}
override def merge(a: Relation, b: Relation): Relation = {
if(b!= null) b else a
}
override def finish(reduction: Relation): Relation = reduction
override def bufferEncoder: Encoder[Relation] =
Encoders.kryo(classOf[Relation])
override def outputEncoder: Encoder[Relation] =
Encoders.kryo(classOf[Relation])
}
}

View File

@ -0,0 +1,138 @@
package eu.dnetlib.dhp.sx.ebi
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Instance, KeyValue, Oaf}
import eu.dnetlib.dhp.schema.scholexplorer.OafUtils.createQualifier
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIRelation, OafUtils, ProvenaceInfo}
import eu.dnetlib.dhp.utils.DHPUtils
import eu.dnetlib.scholexplorer.relation.RelationMapper
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.json4s
import org.json4s.DefaultFormats
import org.json4s.JsonAST.{JField, JObject, JString}
import org.json4s.jackson.JsonMethods.parse
import scala.collection.JavaConverters._
object SparkAddLinkUpdates {
val relationMapper = RelationMapper.load
case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:String, turl:String, title:String, publisher:String) {}
def generatePubmedDLICollectedFrom(): KeyValue = {
OafUtils.generateKeyValue("dli_________::europe_pmc__", "Europe PMC")
}
def ebiLinksToOaf(input:(String, String)):List[Oaf] = {
val pmid :String = input._1
val input_json :String = input._2
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: json4s.JValue = parse(input_json)
val targets:List[EBILinks] = for {
JObject(link) <- json \\ "Category" \\ "Link"
JField("PublicationDate", JString(pubdate)) <- link
JField("RelationshipType", JObject(relationshipType)) <- link
JField("Name", JString(relname)) <- relationshipType
JField("Target", JObject(target)) <- link
JField("Identifier", JObject(identifier)) <- target
JField("ID", JString(tpid)) <- identifier
JField("IDScheme", JString(tpidtype)) <- identifier
JField("IDURL", JString(turl)) <- identifier
JField("Title", JString(title)) <- target
JField("Publisher", JObject(pub)) <- target
JField("Name", JString(publisher)) <- pub
} yield EBILinks(relname, pubdate, tpid, tpidtype, turl,title, publisher)
val dnetPublicationId = s"50|${DHPUtils.md5(s"$pmid::pmid")}"
targets.flatMap(l => {
val relation = new DLIRelation
val inverseRelation = new DLIRelation
val targetDnetId = s"50|${DHPUtils.md5(s"${l.tpid.toLowerCase.trim}::${l.tpidType.toLowerCase.trim}")}"
val relInfo = relationMapper.get(l.relation.toLowerCase)
val relationSemantic = relInfo.getOriginal
val inverseRelationSemantic = relInfo.getInverse
relation.setSource(dnetPublicationId)
relation.setTarget(targetDnetId)
relation.setRelClass("datacite")
relation.setRelType(relationSemantic)
relation.setCollectedfrom(List(generatePubmedDLICollectedFrom()).asJava)
inverseRelation.setSource(targetDnetId)
inverseRelation.setTarget(dnetPublicationId)
inverseRelation.setRelClass("datacite")
inverseRelation.setRelType(inverseRelationSemantic)
inverseRelation.setCollectedfrom(List(generatePubmedDLICollectedFrom()).asJava)
val d = new DLIDataset
d.setId(targetDnetId)
d.setDataInfo(OafUtils.generateDataInfo())
d.setPid(List(OafUtils.createSP(l.tpid.toLowerCase.trim, l.tpidType.toLowerCase.trim, "dnet:pid_types")).asJava)
d.setCompletionStatus("complete")
val pi = new ProvenaceInfo
pi.setId("dli_________::europe_pmc__")
pi.setName( "Europe PMC")
pi.setCompletionStatus("complete")
pi.setCollectionMode("collected")
d.setDlicollectedfrom(List(pi).asJava)
d.setCollectedfrom(List(generatePubmedDLICollectedFrom()).asJava)
d.setPublisher(OafUtils.asField(l.publisher))
d.setTitle(List(OafUtils.createSP(l.title, "main title", "dnet:dataCite_title")).asJava)
d.setDateofacceptance(OafUtils.asField(l.pubdate))
val i = new Instance
i.setCollectedfrom(generatePubmedDLICollectedFrom())
i.setDateofacceptance(d.getDateofacceptance)
i.setUrl(List(l.turl).asJava)
i.setInstancetype(createQualifier("0021", "Dataset", "dnet:publication_resource", "dnet:publication_resource"))
d.setInstance(List(i).asJava)
List(relation, inverseRelation, d)
})
}
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateEBIDataFrame.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(SparkCreateEBIDataFrame.getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val workingPath = parser.get("workingPath")
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
implicit val relEncoder: Encoder[DLIRelation] = Encoders.kryo(classOf[DLIRelation])
implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo(classOf[DLIDataset])
val ds:Dataset[(String,String)] = spark.read.load(s"$workingPath/baseline_links_updates").as[(String,String)](Encoders.tuple(Encoders.STRING, Encoders.STRING))
ds.flatMap(l =>ebiLinksToOaf(l)).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_links_updates_oaf")
ds.filter(s => s.isInstanceOf)
val oDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/baseline_links_updates_oaf").as[Oaf]
oDataset.filter(p =>p.isInstanceOf[DLIRelation]).map(p => p.asInstanceOf[DLIRelation]).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_links_updates_relation")
oDataset.filter(p =>p.isInstanceOf[DLIDataset]).map(p => p.asInstanceOf[DLIDataset]).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_links_updates_dataset")
}
}

View File

@ -0,0 +1,49 @@
package eu.dnetlib.dhp.sx.ebi
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import eu.dnetlib.dhp.sx.ebi.model.{PMArticle, PMAuthor, PMJournal, PMParser}
import scala.io.Source
import scala.xml.pull.XMLEventReader
object SparkCreateBaselineDataFrame {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateEBIDataFrame.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(SparkCreateEBIDataFrame.getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val sc = spark.sparkContext
val workingPath = parser.get("workingPath")
implicit val PMEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle])
implicit val PMJEncoder: Encoder[PMJournal] = Encoders.kryo(classOf[PMJournal])
implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor])
val k: RDD[(String, String)] = sc.wholeTextFiles(s"$workingPath/baseline",2000)
val ds:Dataset[PMArticle] = spark.createDataset(k.filter(i => i._1.endsWith(".gz")).flatMap(i =>{
val xml = new XMLEventReader(Source.fromBytes(i._2.getBytes()))
new PMParser(xml)
} ))
ds.write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_dataset")
}
}

View File

@ -0,0 +1,87 @@
package eu.dnetlib.dhp.sx.ebi
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset}
import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser}
import eu.dnetlib.scholexplorer.relation.RelationMapper
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
object SparkCreateEBIDataFrame {
def main(args: Array[String]): Unit = {
val logger: Logger = LoggerFactory.getLogger(SparkCreateEBIDataFrame.getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateEBIDataFrame.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(SparkCreateEBIDataFrame.getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val sc = spark.sparkContext
val workingPath = parser.get("workingPath")
val relationMapper = RelationMapper.load
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset])
implicit val pubEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication])
implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
logger.info("Extract Publication and relation from publication_xml")
val oafPubsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/publication_xml").map(s =>
{
new ObjectMapper().readValue(s, classOf[String])
}).flatMap(s => {
val d = new PublicationScholexplorerParser
d.parseObject(s, relationMapper).asScala.iterator})
val mapper = new ObjectMapper()
mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
spark.createDataset(oafPubsRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/oaf")
logger.info("Extract Publication and relation from dataset_xml")
val oafDatsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/dataset_xml").map(s =>
{
new ObjectMapper().readValue(s, classOf[String])
}).flatMap(s => {
val d = new DatasetScholexplorerParser
d.parseObject(s, relationMapper).asScala.iterator})
spark.createDataset(oafDatsRDD).write.mode(SaveMode.Append).save(s"$workingPath/oaf")
val dataset: Dataset[OafDataset] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[OafDataset]).map(d => d.asInstanceOf[OafDataset])
val publication: Dataset[Publication] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[Publication]).map(d => d.asInstanceOf[Publication])
val relations: Dataset[Relation] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[Relation]).map(d => d.asInstanceOf[Relation])
publication.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getPublicationAggregator().toColumn)
.map(p => p._2)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/publication")
dataset.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datasetEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getDatasetAggregator().toColumn)
.map(p => p._2)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/dataset")
relations.map(d => (s"${d.getSource}::${d.getRelType}::${d.getTarget}", d))(Encoders.tuple(Encoders.STRING, relEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getRelationAggregator().toColumn)
.map(p => p._2)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/relation")
relations.map(r => (r.getSource, r.getTarget))(Encoders.tuple(Encoders.STRING,Encoders.STRING))
}
}

View File

@ -0,0 +1,64 @@
package eu.dnetlib.dhp.sx.ebi.model;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class PMArticle implements Serializable {
private String pmid;
private String date;
private PMJournal journal;
private String title;
private String description;
private List<PMAuthor> authors = new ArrayList<>();
public String getPmid() {
return pmid;
}
public void setPmid(String pmid) {
this.pmid = pmid;
}
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public PMJournal getJournal() {
return journal;
}
public void setJournal(PMJournal journal) {
this.journal = journal;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public List<PMAuthor> getAuthors() {
return authors;
}
public void setAuthors(List<PMAuthor> authors) {
this.authors = authors;
}
}

View File

@ -0,0 +1,31 @@
package eu.dnetlib.dhp.sx.ebi.model;
import java.io.Serializable;
public class PMAuthor implements Serializable {
private String lastName;
private String foreName;
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
public String getForeName() {
return foreName;
}
public void setForeName(String foreName) {
this.foreName = foreName;
}
public String getFullName() {
return String.format("%s, %s", this.foreName, this.lastName);
}
}

View File

@ -0,0 +1,53 @@
package eu.dnetlib.dhp.sx.ebi.model;
import java.io.Serializable;
public class PMJournal implements Serializable {
private String issn;
private String volume;
private String issue;
private String date;
private String title;
public String getIssn() {
return issn;
}
public void setIssn(String issn) {
this.issn = issn;
}
public String getVolume() {
return volume;
}
public void setVolume(String volume) {
this.volume = volume;
}
public String getIssue() {
return issue;
}
public void setIssue(String issue) {
this.issue = issue;
}
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
}

View File

@ -0,0 +1,92 @@
package eu.dnetlib.dhp.sx.ebi.model
import scala.xml.pull.{EvElemEnd, EvElemStart, EvText, XMLEventReader}
class PMParser(xml:XMLEventReader) extends Iterator[PMArticle] {
var currentArticle:PMArticle = generateNextArticle()
override def hasNext: Boolean = currentArticle!= null
override def next(): PMArticle = {
val tmp = currentArticle
currentArticle = generateNextArticle()
tmp
}
def generateNextArticle():PMArticle = {
var currentAuthor: PMAuthor = null
var currentJournal: PMJournal = null
var currNode: String = null
var currentYear = "0"
var currentMonth = "01"
var currentDay = "01"
while (xml.hasNext) {
xml.next match {
case EvElemStart(_, label, _, _) =>
currNode = label
label match {
case "PubmedArticle" => currentArticle = new PMArticle
case "Author" => currentAuthor = new PMAuthor
case "Journal" => currentJournal = new PMJournal
case _ =>
}
case EvElemEnd(_, label) =>
label match {
case "PubmedArticle" => return currentArticle
case "Author" => currentArticle.getAuthors.add(currentAuthor)
case "Journal" => currentArticle.setJournal(currentJournal)
case "DateCompleted" => currentArticle.setDate(s"$currentYear-$currentMonth-$currentDay")
case "PubDate" => currentJournal.setDate(s"$currentYear-$currentMonth-$currentDay")
case _ =>
}
case EvText(text) =>
if (currNode!= null && text.trim.nonEmpty)
currNode match {
case "ArticleTitle" => {
if (currentArticle.getTitle==null)
currentArticle.setTitle(text.trim)
else
currentArticle.setTitle(currentArticle.getTitle + text.trim)
}
case "AbstractText" => {
if (currentArticle.getDescription==null)
currentArticle.setDescription(text.trim)
else
currentArticle.setDescription(currentArticle.getDescription + text.trim)
}
case "PMID" => currentArticle.setPmid(text.trim)
case "ISSN" => currentJournal.setIssn(text.trim)
case "Year" => currentYear = text.trim
case "Month" => currentMonth = text.trim
case "Day" => currentDay = text.trim
case "Volume" => currentJournal.setVolume( text.trim)
case "Issue" => currentJournal.setIssue (text.trim)
case "LastName" => {
if (currentAuthor != null)
currentAuthor.setLastName(text.trim)
}
case "ForeName" => if (currentAuthor != null)
currentAuthor.setForeName(text.trim)
case "Title" =>
if (currentJournal.getTitle==null)
currentJournal.setTitle(text.trim)
else
currentJournal.setTitle(currentJournal.getTitle + text.trim)
case _ =>
}
case _ =>
}
}
null
}
}

View File

@ -150,6 +150,17 @@ public abstract class AbstractScholexplorerParser {
return uk; return uk;
} }
protected Qualifier generateQualifier(final String classId, final String className, final String schemeId,
final String schemeName) {
final Qualifier q = new Qualifier();
q.setClassid(classId);
q.setClassid(className);
q.setSchemeid(schemeId);
q.setSchemename(schemeName);
return q;
}
protected void generateRelations( protected void generateRelations(
RelationMapper relationMapper, RelationMapper relationMapper,
Result parsedObject, Result parsedObject,

View File

@ -64,7 +64,6 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
currentDate.setQualifier(dateQualifier); currentDate.setQualifier(dateQualifier);
parsedObject.setRelevantdate(Collections.singletonList(currentDate)); parsedObject.setRelevantdate(Collections.singletonList(currentDate));
} }
final String completionStatus = VtdUtilityParser final String completionStatus = VtdUtilityParser
.getSingleValue(ap, vn, "//*[local-name()='completionStatus']"); .getSingleValue(ap, vn, "//*[local-name()='completionStatus']");
final String provisionMode = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='provisionMode']"); final String provisionMode = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='provisionMode']");
@ -149,6 +148,37 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
inferPid(currentPid); inferPid(currentPid);
parsedObject.setPid(Collections.singletonList(currentPid)); parsedObject.setPid(Collections.singletonList(currentPid));
String resolvedURL = null;
switch (currentPid.getQualifier().getClassname().toLowerCase()) {
case "uniprot":
resolvedURL = "https://www.uniprot.org/uniprot/" + currentPid.getValue();
break;
case "ena":
if (StringUtils.isNotBlank(currentPid.getValue()) && currentPid.getValue().length() > 7)
resolvedURL = "https://www.ebi.ac.uk/ena/data/view/" + currentPid.getValue().substring(0, 8);
break;
case "chembl":
resolvedURL = "https://www.ebi.ac.uk/chembl/compound_report_card/" + currentPid.getValue();
break;
case "ncbi-n":
resolvedURL = "https://www.ncbi.nlm.nih.gov/nuccore/" + currentPid.getValue();
break;
case "ncbi-p":
resolvedURL = "https://www.ncbi.nlm.nih.gov/nuccore/" + currentPid.getValue();
break;
case "genbank":
resolvedURL = "https://www.ncbi.nlm.nih.gov/nuccore/" + currentPid.getValue();
break;
case "pdb":
resolvedURL = "https://www.ncbi.nlm.nih.gov/nuccore/" + currentPid.getValue();
break;
case "url":
resolvedURL = currentPid.getValue();
break;
}
final String sourceId = generateId( final String sourceId = generateId(
currentPid.getValue(), currentPid.getQualifier().getClassid(), "dataset"); currentPid.getValue(), currentPid.getQualifier().getClassid(), "dataset");
parsedObject.setId(sourceId); parsedObject.setId(sourceId);
@ -251,6 +281,11 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
t -> { t -> {
final StructuredProperty st = new StructuredProperty(); final StructuredProperty st = new StructuredProperty();
st.setValue(t); st.setValue(t);
st
.setQualifier(
generateQualifier(
"main title", "main title", "dnet:dataCite_title",
"dnet:dataCite_title"));
return st; return st;
}) })
.collect(Collectors.toList())); .collect(Collectors.toList()));
@ -282,6 +317,13 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
.collect(Collectors.toList())); .collect(Collectors.toList()));
} }
if (StringUtils.isNotBlank(resolvedURL)) {
Instance i = new Instance();
i.setCollectedfrom(parsedObject.getCollectedfrom().get(0));
i.setUrl(Collections.singletonList(resolvedURL));
parsedObject.setInstance(Collections.singletonList(i));
}
result.add(parsedObject); result.add(parsedObject);
return result; return result;
} catch (Throwable e) { } catch (Throwable e) {

View File

@ -202,6 +202,11 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser
t -> { t -> {
final StructuredProperty st = new StructuredProperty(); final StructuredProperty st = new StructuredProperty();
st.setValue(t); st.setValue(t);
st
.setQualifier(
generateQualifier(
"main title", "main title", "dnet:dataCite_title",
"dnet:dataCite_title"));
return st; return st;
}) })
.collect(Collectors.toList())); .collect(Collectors.toList()));

View File

@ -0,0 +1,18 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -0,0 +1,293 @@
<workflow-app name="merge graphs" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>betaInputGgraphPath</name>
<description>the beta graph root path</description>
</property>
<property>
<name>prodInputGgraphPath</name>
<description>the production graph root path</description>
</property>
<property>
<name>graphOutputPath</name>
<description>the output merged graph root path</description>
</property>
<property>
<name>priority</name>
<description>decides from which infrastructure the content must win in case of ID clash</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<start to="fork_merge_graph"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<fork name="fork_merge_graph">
<path start="merge_publication"/>
<path start="merge_dataset"/>
<path start="merge_otherresearchproduct"/>
<path start="merge_software"/>
<path start="merge_datasource"/>
<path start="merge_organization"/>
<path start="merge_project"/>
<path start="merge_relation"/>
</fork>
<action name="merge_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge publications</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/publication</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/publication</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge datasets</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/dataset</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/dataset</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge otherresearchproducts</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/otherresearchproduct</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge softwares</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/software</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/software</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_datasource">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge datasources</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/datasource</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/datasource</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_organization">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge organizations</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/organization</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/organization</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_project">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge projects</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/project</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/project</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_relation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge relations</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/relation</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/relation</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<join name="wait_merge" to="End"/>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,38 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "bin",
"paramLongName": "betaInputPath",
"paramDescription": "the beta graph root path",
"paramRequired": true
},
{
"paramName": "pin",
"paramLongName": "prodInputPath",
"paramDescription": "the production graph root path",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the output merged graph root path",
"paramRequired": true
},
{
"paramName": "class",
"paramLongName": "graphTableClassName",
"paramDescription": "class name moelling the graph table",
"paramRequired": true
},
{
"paramName": "pr",
"paramLongName": "priority",
"paramDescription": "decides from which infrastructure the content must win in case of ID clash",
"paramRequired": false
}
]

View File

@ -0,0 +1,4 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}
]

View File

@ -0,0 +1,68 @@
<configuration>
<!-- OCEAN -->
<!--
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
</property>
-->
<!-- GARR -->
<property>
<name>jobTracker</name>
<value>yarn</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://hadoop-rm1.garr-pa1.d4science.org:8020</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://hadoop-edge3.garr-pa1.d4science.org:9083</value>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://hadoop-rm2.garr-pa1.d4science.org:19888</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>spark2EventLogDir</name>
<value>/user/spark/spark2ApplicationHistory</value>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
</property>
</configuration>

View File

@ -0,0 +1,97 @@
<workflow-app name="Create EBI Dataset" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>workingPath</name>
<description>the Working Path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<start to="GenerateUpdates"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="GenerateBaselineDataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Create Baselnie DataSet</name>
<class>eu.dnetlib.dhp.sx.ebi.SparkCreateBaselineDataFrame</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=1
--driver-memory=${sparkDriverMemory}
--executor-cores=${sparkExecutorCores}
${sparkExtraOPT}
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--master</arg><arg>yarn</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<action name="GenerateUpdates">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Create Baselnie DataSet</name>
<class>eu.dnetlib.dhp.sx.ebi.SparkAddLinkUpdates</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=1
--driver-memory=${sparkDriverMemory}
--executor-cores=${sparkExecutorCores}
${sparkExtraOPT}
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--master</arg><arg>yarn</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<action name="CreateEBIDataSet">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Create EBI DataSet</name>
<class>eu.dnetlib.dhp.sx.ebi.SparkCreateEBIDataFrame</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=1000
${sparkExtraOPT}
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--master</arg><arg>yarn</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -6,6 +6,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
@ -20,6 +21,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils; import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
@ -31,24 +33,25 @@ import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Software; import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class MappersTest { public class MappersTest {
@Mock
private ISLookUpService isLookUpService;
@Mock @Mock
private VocabularyGroup vocs; private VocabularyGroup vocs;
@BeforeEach @BeforeEach
public void setUp() throws Exception { public void setUp() throws Exception {
when(vocs.getTermAsQualifier(anyString(), anyString())) lenient().when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARIES_XQUERY)).thenReturn(vocs());
.thenAnswer( lenient()
invocation -> OafMapperUtils .when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARY_SYNONYMS_XQUERY))
.qualifier( .thenReturn(synonyms());
invocation.getArgument(1), invocation.getArgument(1), invocation.getArgument(0),
invocation.getArgument(0)));
when(vocs.termExists(anyString(), anyString())).thenReturn(true);
vocs = VocabularyGroup.loadVocsFromIS(isLookUpService);
} }
@Test @Test
@ -68,9 +71,14 @@ public class MappersTest {
final Relation r2 = (Relation) list.get(2); final Relation r2 = (Relation) list.get(2);
assertValidId(p.getId()); assertValidId(p.getId());
assertTrue(p.getOriginalId().size() == 1);
assertEquals("10.3897/oneeco.2.e13718", p.getOriginalId().get(0));
assertValidId(p.getCollectedfrom().get(0).getKey()); assertValidId(p.getCollectedfrom().get(0).getKey());
assertTrue(StringUtils.isNotBlank(p.getTitle().get(0).getValue())); assertTrue(StringUtils.isNotBlank(p.getTitle().get(0).getValue()));
assertFalse(p.getDataInfo().getInvisible()); assertFalse(p.getDataInfo().getInvisible());
assertTrue(p.getSource().size() == 1);
assertTrue(p.getAuthor().size() > 0); assertTrue(p.getAuthor().size() > 0);
final Optional<Author> author = p final Optional<Author> author = p
@ -79,6 +87,7 @@ public class MappersTest {
.filter(a -> a.getPid() != null && !a.getPid().isEmpty()) .filter(a -> a.getPid() != null && !a.getPid().isEmpty())
.findFirst(); .findFirst();
assertTrue(author.isPresent()); assertTrue(author.isPresent());
final StructuredProperty pid = author final StructuredProperty pid = author
.get() .get()
.getPid() .getPid()
@ -169,6 +178,8 @@ public class MappersTest {
final Relation r2 = (Relation) list.get(2); final Relation r2 = (Relation) list.get(2);
assertValidId(d.getId()); assertValidId(d.getId());
assertTrue(d.getOriginalId().size() == 1);
assertEquals("oai:zenodo.org:3234526", d.getOriginalId().get(0));
assertValidId(d.getCollectedfrom().get(0).getKey()); assertValidId(d.getCollectedfrom().get(0).getKey());
assertTrue(StringUtils.isNotBlank(d.getTitle().get(0).getValue())); assertTrue(StringUtils.isNotBlank(d.getTitle().get(0).getValue()));
assertTrue(d.getAuthor().size() > 0); assertTrue(d.getAuthor().size() > 0);
@ -261,4 +272,15 @@ public class MappersTest {
assertEquals(':', id.charAt(15)); assertEquals(':', id.charAt(15));
assertEquals(':', id.charAt(16)); assertEquals(':', id.charAt(16));
} }
private List<String> vocs() throws IOException {
return IOUtils
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt"));
}
private List<String> synonyms() throws IOException {
return IOUtils
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt"));
}
} }

View File

@ -0,0 +1,20 @@
package eu.dnetlib.dhp.sx.ebi
import org.junit.jupiter.api.Test
class TestEBI {
@Test
def testEBIData() = {
SparkAddLinkUpdates.main("-mt local[*] -w /home/sandro/Downloads".split(" "))
}
}

View File

@ -34,6 +34,8 @@
<dc:relation>info:eu-repo/semantics/altIdentifier/eissn/2367-8194</dc:relation> <dc:relation>info:eu-repo/semantics/altIdentifier/eissn/2367-8194</dc:relation>
<dc:relation>info:eu-repo/grantAgreement/EC/FP7/226852</dc:relation> <dc:relation>info:eu-repo/grantAgreement/EC/FP7/226852</dc:relation>
<dc:source>One Ecosystem 2: e13718</dc:source> <dc:source>One Ecosystem 2: e13718</dc:source>
<dc:source>One Ecosystem 2: e13718</dc:source>
<dc:source>One Ecosystem 2: e13718</dc:source>
<dc:subject>Ecosystem Services hotspots</dc:subject> <dc:subject>Ecosystem Services hotspots</dc:subject>
<dc:subject>Natura 2000</dc:subject> <dc:subject>Natura 2000</dc:subject>
<dc:subject>Quiet Protected Areas</dc:subject> <dc:subject>Quiet Protected Areas</dc:subject>
@ -47,7 +49,8 @@
<dc:subject>regulating services</dc:subject> <dc:subject>regulating services</dc:subject>
<dc:subject>supporting services</dc:subject> <dc:subject>supporting services</dc:subject>
<dc:type>Research Article</dc:type> <dc:type>Research Article</dc:type>
<dr:CobjCategory type="publication">0001</dr:CobjCategory> <!--<dr:CobjCategory type="publication">0001</dr:CobjCategory>-->
<dr:CobjCategory>0001</dr:CobjCategory>
<oaf:dateAccepted>2017-01-01</oaf:dateAccepted> <oaf:dateAccepted>2017-01-01</oaf:dateAccepted>
<oaf:projectid>corda_______::226852</oaf:projectid> <oaf:projectid>corda_______::226852</oaf:projectid>
<oaf:accessrights>OPEN</oaf:accessrights> <oaf:accessrights>OPEN</oaf:accessrights>

View File

@ -82,7 +82,8 @@
<p>All files are in MATLAB .mat format.</p></description> <p>All files are in MATLAB .mat format.</p></description>
</descriptions> </descriptions>
</resource> </resource>
<dr:CobjCategory type="dataset">0021</dr:CobjCategory> <!--<dr:CobjCategory type="dataset">0021</dr:CobjCategory>-->
<dr:CobjCategory>0021</dr:CobjCategory>
<oaf:dateAccepted>2019-01-01</oaf:dateAccepted> <oaf:dateAccepted>2019-01-01</oaf:dateAccepted>
<oaf:accessrights>OPEN</oaf:accessrights> <oaf:accessrights>OPEN</oaf:accessrights>
<oaf:language>und</oaf:language> <oaf:language>und</oaf:language>

View File

@ -52,7 +52,8 @@
subjectScheme="EDAM Ontology" valueURI="http://edamontology.org/topic_3534">Protein binding sites</datacite:subject> subjectScheme="EDAM Ontology" valueURI="http://edamontology.org/topic_3534">Protein binding sites</datacite:subject>
</datacite:subjects> </datacite:subjects>
</datacite:resource> </datacite:resource>
<dr:CobjCategory type="software">0029</dr:CobjCategory> <!--<dr:CobjCategory type="software">0029</dr:CobjCategory>-->
<dr:CobjCategory>0029</dr:CobjCategory>
<oaf:hostedBy id="rest________::bioTools" name="bio.tools"/> <oaf:hostedBy id="rest________::bioTools" name="bio.tools"/>
<oaf:collectedFrom id="rest________::bioTools" name="bio.tools"/> <oaf:collectedFrom id="rest________::bioTools" name="bio.tools"/>
<oaf:dateAccepted>2018-06-06</oaf:dateAccepted> <oaf:dateAccepted>2018-06-06</oaf:dateAccepted>

View File

@ -0,0 +1,55 @@
{
"Category": [
{
"Section": [
{
"Linklist": {
"Link": [
{
"LinkProvider": {
"Name": "Europe PMC"
},
"Target": {
"Publisher": {
"Name": "Altmetric"
},
"ImageURL": "https://api.altmetric.com/v1/donut/58578459_64.png",
"Identifier": {
"ID": "https://www.altmetric.com/details/58578459",
"IDScheme": "URL",
"IDURL": "https://www.altmetric.com/details/58578459"
},
"Type": {
"Name": "dataset"
},
"Title": "Optical clumped isotope thermometry of carbon dioxide"
},
"Source": {
"Identifier": {
"ID": "30886173",
"IDScheme": "PMID"
},
"Type": {
"Name": "literature"
}
},
"PublicationDate": "06-04-2019",
"RelationshipType": {
"Name": "IsReferencedBy"
},
"ObtainedBy": "ext_links"
}
]
},
"ObtainedBy": "ext_links",
"SectionLinkCount": 1,
"Tags": [
"altmetrics"
]
}
],
"CategoryLinkCount": 1,
"Name": "Altmetric"
}
]
}

View File

@ -0,0 +1,191 @@
{
"version": "6.3",
"hitCount": 4,
"request": {
"id": "28818901",
"source": "MED"
},
"dataLinkList": {
"Category": [
{
"Name": "Nucleotide Sequences",
"CategoryLinkCount": 3,
"Section": [
{
"ObtainedBy": "tm_accession",
"Tags": [
"supporting_data"
],
"SectionLinkCount": 1,
"Linklist": {
"Link": [
{
"ObtainedBy": "tm_accession",
"PublicationDate": "27-02-2020",
"LinkProvider": {
"Name": "Europe PMC"
},
"RelationshipType": {
"Name": "References"
},
"Source": {
"Type": {
"Name": "literature"
},
"Identifier": {
"ID": "28818901",
"IDScheme": "MED"
}
},
"Target": {
"Type": {
"Name": "dataset"
},
"Identifier": {
"ID": "AP008937",
"IDScheme": "ENA",
"IDURL": "http://identifiers.org/ena.embl/AP008937"
},
"Title": "AP008937",
"Publisher": {
"Name": "Europe PMC"
}
},
"Frequency": 1
}
]
}
},
{
"ObtainedBy": "submission",
"Tags": [
"related_data"
],
"SectionLinkCount": 2,
"CollectionURL": "http://www.ebi.ac.uk/ena/data/search?query=28818901",
"Linklist": {
"Link": [
{
"ObtainedBy": "submission",
"PublicationDate": "25-06-2018",
"LinkProvider": {
"Name": "Europe PMC"
},
"RelationshipType": {
"Name": "IsReferencedBy"
},
"Source": {
"Type": {
"Name": "literature"
},
"Identifier": {
"ID": "28818901",
"IDScheme": "PMID"
}
},
"Target": {
"Type": {
"Name": "dataset"
},
"Identifier": {
"ID": "NIWV01000000",
"IDScheme": "ENA",
"IDURL": "http://www.ebi.ac.uk/ena/data/view/NIWV01000000"
},
"Title": "Nucleotide sequences",
"Publisher": {
"Name": "ENA"
}
}
},
{
"ObtainedBy": "submission",
"PublicationDate": "25-06-2018",
"LinkProvider": {
"Name": "Europe PMC"
},
"RelationshipType": {
"Name": "IsReferencedBy"
},
"Source": {
"Type": {
"Name": "literature"
},
"Identifier": {
"ID": "28818901",
"IDScheme": "PMID"
}
},
"Target": {
"Type": {
"Name": "dataset"
},
"Identifier": {
"ID": "PRJNA390617",
"IDScheme": "ENA",
"IDURL": "http://www.ebi.ac.uk/ena/data/view/PRJNA390617"
},
"Title": "Lactobacillus fermentum strain:BFE 6620",
"Publisher": {
"Name": "ENA"
}
}
}
]
}
}
]
},
{
"Name": "BioStudies: supplemental material and supporting data",
"CategoryLinkCount": 1,
"Section": [
{
"ObtainedBy": "ext_links",
"Tags": [
"supporting_data"
],
"SectionLinkCount": 1,
"Linklist": {
"Link": [
{
"ObtainedBy": "ext_links",
"PublicationDate": "24-07-2018",
"LinkProvider": {
"Name": "Europe PMC"
},
"RelationshipType": {
"Name": "IsReferencedBy"
},
"Source": {
"Type": {
"Name": "literature"
},
"Identifier": {
"ID": "28818901",
"IDScheme": "PMID"
}
},
"Target": {
"Type": {
"Name": "dataset"
},
"Identifier": {
"ID": "http://www.ebi.ac.uk/biostudies/studies/S-EPMC5604774?xr=true",
"IDScheme": "URL",
"IDURL": "http://www.ebi.ac.uk/biostudies/studies/S-EPMC5604774?xr=true"
},
"Title": "Draft Genome Sequence of Lactobacillus fermentum BFE 6620, a Potential Starter Culture for African Vegetable Foods, Isolated from Fermented Cassava.",
"Publisher": {
"Name": "BioStudies: supplemental material and supporting data"
}
}
}
]
}
}
]
}
]
}
}

View File

@ -5,11 +5,12 @@ import java.time.format.DateTimeFormatter
import eu.dnetlib.dhp.common.PacePerson import eu.dnetlib.dhp.common.PacePerson
import eu.dnetlib.dhp.schema.action.AtomicAction import eu.dnetlib.dhp.schema.action.AtomicAction
import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Dataset, ExternalReference, Field, Instance, KeyValue, Oaf, Publication, Qualifier, Relation, StructuredProperty} import eu.dnetlib.dhp.schema.oaf.{Author, Dataset, ExternalReference, Field, Instance, KeyValue, Oaf, Publication, Qualifier, Relation, Result, StructuredProperty}
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation} import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
import eu.dnetlib.dhp.utils.DHPUtils import eu.dnetlib.dhp.utils.DHPUtils
import org.apache.commons.lang3.StringUtils import org.apache.commons.lang3.StringUtils
import org.codehaus.jackson.map.ObjectMapper import org.codehaus.jackson.map.ObjectMapper
import eu.dnetlib.dhp.schema.scholexplorer.OafUtils._
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
@ -99,6 +100,20 @@ object DLIToOAF {
) )
def fixInstance(r:Publication) :Publication = {
val collectedFrom = r.getCollectedfrom.asScala.head
r.getInstance().asScala.foreach(i => i.setCollectedfrom(collectedFrom))
r
}
def fixInstanceDataset(r:Dataset) :Dataset = {
val collectedFrom = r.getCollectedfrom.asScala.head
r.getInstance().asScala.foreach(i => i.setCollectedfrom(collectedFrom))
r
}
def toActionSet(item: Oaf): (String, String) = { def toActionSet(item: Oaf): (String, String) = {
val mapper = new ObjectMapper() val mapper = new ObjectMapper()
@ -412,46 +427,6 @@ object DLIToOAF {
} }
def generateKeyValue(key: String, value: String): KeyValue = {
val kv: KeyValue = new KeyValue()
kv.setKey(key)
kv.setValue(value)
kv.setDataInfo(generateDataInfo("0.9"))
kv
}
def generateDataInfo(trust: String = "0.9", invisibile: Boolean = false): DataInfo = {
val di = new DataInfo
di.setDeletedbyinference(false)
di.setInferred(false)
di.setInvisible(false)
di.setTrust(trust)
di.setProvenanceaction(createQualifier("sysimport:actionset", "dnet:provenanceActions"))
di
}
def createQualifier(cls: String, sch: String): Qualifier = {
createQualifier(cls, cls, sch, sch)
}
def createQualifier(classId: String, className: String, schemeId: String, schemeName: String): Qualifier = {
val q: Qualifier = new Qualifier
q.setClassid(classId)
q.setClassname(className)
q.setSchemeid(schemeId)
q.setSchemename(schemeName)
q
}
def asField[T](value: T): Field[T] = {
val tmp = new Field[T]
tmp.setValue(value)
tmp
} }
}

View File

@ -1,7 +1,7 @@
package eu.dnetlib.dhp.`export` package eu.dnetlib.dhp.`export`
import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset} import eu.dnetlib.dhp.schema.oaf.{Instance, Publication, Relation, Dataset => OafDataset}
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation} import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
import org.apache.commons.io.IOUtils import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.Text import org.apache.hadoop.io.Text
@ -166,10 +166,13 @@ object SparkExportContentForOpenAire {
}).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationAS") }).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationAS")
val fRels:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationAS").as[Relation].map(DLIToOAF.toActionSet)
val fpubs:Dataset[(String,String)] = spark.read.load(s"$workingPath/publicationAS").as[Publication].map(DLIToOAF.toActionSet)
val fdats:Dataset[(String,String)] = spark.read.load(s"$workingPath/datasetAS").as[OafDataset].map(DLIToOAF.toActionSet)
spark.read.load(s"$workingPath/publicationAS").as[Publication].map(DLIToOAF.fixInstance).write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationAS_fixed")
spark.read.load(s"$workingPath/datasetAS").as[OafDataset].map(DLIToOAF.fixInstanceDataset).write.mode(SaveMode.Overwrite).save(s"$workingPath/datasetAS_fixed")
val fRels:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationAS").as[Relation].map(DLIToOAF.toActionSet)
val fpubs:Dataset[(String,String)] = spark.read.load(s"$workingPath/publicationAS_fixed").as[Publication].map(DLIToOAF.toActionSet)
val fdats:Dataset[(String,String)] = spark.read.load(s"$workingPath/datasetAS_fixed").as[OafDataset].map(DLIToOAF.toActionSet)
fRels.union(fpubs).union(fdats).rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingPath/rawset", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec]) fRels.union(fpubs).union(fdats).rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingPath/rawset", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec])
} }

View File

@ -0,0 +1,779 @@
<RESOURCE_PROFILE>
<HEADER>
<RESOURCE_IDENTIFIER value="b05c97e6-69b5-497d-87fd-2137d3ff2c2e_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl"/>
<RESOURCE_TYPE value="WorkflowDSResourceType"/>
<RESOURCE_KIND value="WorkflowDSResources"/>
<RESOURCE_URI value=""/>
<DATE_OF_CREATION value="2020-06-15T16:35:09+00:00"/>
</HEADER>
<BODY>
<WORKFLOW_NAME>Graph Construction [HYBRID]</WORKFLOW_NAME>
<WORKFLOW_TYPE>Data Provision</WORKFLOW_TYPE>
<WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY>
<CONFIGURATION start="manual">
<NODE isStart="true" name="reuseProdContent" type="SetEnvParameter">
<DESCRIPTION>reuse cached content from the PROD aggregation system</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">reuseProdContent</PARAM>
<PARAM function="validValues(['true', 'false'])" managedBy="user" name="parameterValue" required="true" type="string">true</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="contentPathProd"/>
</ARCS>
</NODE>
<NODE name="contentPathProd" type="SetEnvParameter">
<DESCRIPTION>set the PROD aggregator content path</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">prodContentPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_aggregator</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="prodAggregatorGraphPath"/>
</ARCS>
</NODE>
<NODE name="prodAggregatorGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the path containing the PROD AGGREGATOR graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">prodAggregatorGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/00_prod_graph_aggregator</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="reuseBetaContent" type="SetEnvParameter">
<DESCRIPTION>reuse cached content from the BETA aggregation system</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">reuseBetaContent</PARAM>
<PARAM function="validValues(['true', 'false'])" managedBy="user" name="parameterValue" required="true" type="string">true</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="contentPathBeta"/>
</ARCS>
</NODE>
<NODE name="contentPathBeta" type="SetEnvParameter">
<DESCRIPTION>set the BETA aggregator content path</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">betaContentPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_aggregator</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="betaAggregatorGraphPath"/>
</ARCS>
</NODE>
<NODE name="betaAggregatorGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the path containing the BETA AGGREGATOR graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">betaAggregatorGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/00_beta_graph_aggregator</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="setIsLookUpUrl"/>
</ARCS>
</NODE>
<NODE name="setIsLookUpUrl" type="SetEnvParameter">
<DESCRIPTION>Set the IS lookup service address</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">isLookUpUrl</PARAM>
<PARAM managedBy="system" name="parameterValue" required="true" type="string">http://services.openaire.eu:8280/is/services/isLookUp?wsdl</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setMergedGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the MERGED graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">mergedGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/01_graph_merged</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setRawGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the RAW graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">rawGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/02_graph_raw</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setDedupGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the DEDUPED graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">dedupGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/03_graph_dedup</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setInferredGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the INFERRED graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">inferredGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/04_graph_inferred</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setConsistentGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the CONSISTENCY graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">consistentGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/05_graph_consistent</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setOrcidGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the ORCID enriched graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">orcidGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/06_graph_orcid</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setBulkTaggingGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the BULK TAGGED graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">bulkTaggingGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/07_graph_bulktagging</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setAffiliationGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the AFFILIATION from INSTITUTIONAL REPOS graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">affiliationGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/08_graph_affiliation</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setCommunityOrganizationGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the COMMUNITY from SELECTED SOURCES graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">communityOrganizationGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/09_graph_comunity_organization</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setFundingGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the FUNDING from SEMANTIC RELATION graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">fundingGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/10_graph_funding</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setCommunitySemRelGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the COMMUNITY from SEMANTIC RELATION graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">communitySemRelGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/11_graph_comunity_sem_rel</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setCountryGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the COUNTRY enriched graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">countryGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/12_graph_country</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setCleanedGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the CLEANED graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">cleanedGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/13_graph_cleaned</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setBlacklistedGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the blacklisted graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">blacklistedGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/core_provision/graph/14_graph_blacklisted</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setBulkTaggingPathMap" type="SetEnvParameter">
<DESCRIPTION>Set the map of paths for the Bulk Tagging</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">bulkTaggingPathMap</PARAM>
<PARAM managedBy="system" name="parameterValue" required="true" type="string">{"author" : "$['author'][*]['fullname']", "title" : "$['title'][*]['value']", "orcid" : "$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']", "contributor" : "$['contributor'][*]['value']", "description" : "$['description'][*]['value']"}</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setPropagationOrganizationCommunityMap" type="SetEnvParameter">
<DESCRIPTION>Set the map of associations organization, community list for the propagation of community to result through organization</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">propagationOrganizationCommunityMap</PARAM>
<PARAM managedBy="system" name="parameterValue" required="true" type="string">{"20|corda__h2020::3fb05a9524c3f790391261347852f638":["mes","euromarine"], "20|corda__h2020::e8dbe14cca9bf6fce09d468872f813f8":["mes","euromarine"], "20|snsf________::9b253f265e3bef5cae6d881fdf61aceb":["mes","euromarine"],"20|rcuk________::e054eea0a47665af8c3656b5785ccf76":["mes","euromarine"],"20|corda__h2020::edc18d67c9b11fb616ca9f6e1db1b151":["mes","euromarine"],"20|rcuk________::d5736d9da90521ddcdc7828a05a85e9a":["mes","euromarine"],"20|corda__h2020::f5d418d3aa1cf817ddefcc3fdc039f27":["mes","euromarine"],"20|snsf________::8fa091f8f25a846779acb4ea97b50aef":["mes","euromarine"],"20|corda__h2020::81e020977211c2c40fae2e1a50bffd71":["mes","euromarine"],"20|corda_______::81e020977211c2c40fae2e1a50bffd71":["mes","euromarine"],"20|snsf________::31d0a100e54e3cdb3c6f52d91e638c78":["mes","euromarine"],"20|corda__h2020::ea379ef91b8cc86f9ac5edc4169292db":["mes","euromarine"],"20|corda__h2020::f75ee2ee48e5cb0ec8c8d30aaa8fef70":["mes","euromarine"],"20|rcuk________::e16010089551a1a9182a94604fc0ea59":["mes","euromarine"],"20|corda__h2020::38531a2cce7c5c347ffc439b07c1f43b":["mes","euromarine"],"20|corda_______::38531a2cce7c5c347ffc439b07c1f43b":["mes","euromarine"],"20|grid________::b2cbbf5eadbbf87d534b022bad3191d7":["mes","euromarine"],"20|snsf________::74730ef1439d7f7636a8be58a6b471b8":["mes","euromarine"],"20|nsf_________::ad72e19043a5a467e35f9b444d11563e":["mes","euromarine"],"20|rcuk________::0fc3e92500290902a2d38ec2445e74c3":["mes","euromarine"],"20|grid________::ad2c29905da0eb3c06b3fa80cacd89ea":["mes","euromarine"],"20|corda__h2020::30b53e4d63d3724f00acb9cbaca40860":["mes","euromarine"],"20|corda__h2020::f60f84bee14ad93f0db0e49af1d5c317":["mes","euromarine"], "20|corda__h2020::7bf251ac3765b5e89d82270a1763d09f":["mes","euromarine"], "20|corda__h2020::65531bd11be9935948c7f2f4db1c1832":["mes","euromarine"], "20|corda__h2020::e0e98f86bbc76638bbb72a8fe2302946":["mes","euromarine"], "20|snsf________::3eb43582ac27601459a8d8b3e195724b":["mes","euromarine"], "20|corda__h2020::af2481dab65d06c8ea0ae02b5517b9b6":["mes","euromarine"], "20|corda__h2020::c19d05cfde69a50d3ebc89bd0ee49929":["mes","euromarine"], "20|corda__h2020::af0bfd9fc09f80d9488f56d71a9832f0":["mes","euromarine"], "20|rcuk________::f33c02afb0dc66c49d0ed97ca5dd5cb0":["beopen"],
"20|grid________::a867f78acdc5041b34acfe4f9a349157":["beopen"], "20|grid________::7bb116a1a9f95ab812bf9d2dea2be1ff":["beopen"], "20|corda__h2020::6ab0e0739dbe625b99a2ae45842164ad":["beopen"], "20|corda__h2020::8ba50792bc5f4d51d79fca47d860c602":["beopen"], "20|corda_______::8ba50792bc5f4d51d79fca47d860c602":["beopen"], "20|corda__h2020::e70e9114979e963eef24666657b807c3":["beopen"], "20|corda_______::e70e9114979e963eef24666657b807c3":["beopen"], "20|corda_______::15911e01e9744d57205825d77c218737":["beopen"], "20|opendoar____::056a41e24e2a9a67215e87bbee6a80ab":["beopen"], "20|opendoar____::7f67f2e6c6fbb0628f8160fcd3d92ae3":["beopen"], "20|grid________::a8ecfd7c084e561168bcbe6bf0daf3e3":["beopen"], "20|corda_______::7bbe6cc5d8ec1864739a04b0d020c9e9":["beopen"], "20|corda_______::3ff558e30c2e434d688539548300b050":["beopen"], "20|corda__h2020::5ffee5b3b83b33a8cf0e046877bd3a39":["beopen"], "20|corda__h2020::5187217e2e806a6df3579c46f82401bc":["beopen"], "20|grid________::5fa7e2709bcd945e26bfa18689adeec1":["beopen"], "20|corda_______::d8696683c53027438031a96ad27c3c07":["beopen"], "20|corda__h2020::d8696683c53027438031a96ad27c3c07":["beopen"], "20|rcuk________::23a79ebdfa59790864e4a485881568c1":["beopen"], "20|corda__h2020::b76cf8fe49590a966953c37e18608af9":["beopen"], "20|grid________::d2f0204126ee709244a488a4cd3b91c2":["beopen"], "20|corda__h2020::05aba9d2ed17533d15221e5655ac11e6":["beopen"], "20|grid________::802401579481dc32062bdee69f5e6a34":["beopen"], "20|corda__h2020::3f6d9d54cac975a517ba6b252c81582d":["beopen"]}
</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setDedupConfig" type="SetEnvParameter">
<DESCRIPTION>Set the dedup orchestrator name</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">dedupConfig</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">decisiontree-dedup-test</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="actionSetsRaw" type="SetEnvParameter">
<DESCRIPTION>declares the ActionSet ids to promote in the RAW graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">actionSetIdsRawGraph</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">scholexplorer-dump,gridac-dump,doiboost-organizations,doiboost,orcidworks-no-doi,iis-wos-entities,iis-entities-software,iis-entities-patent</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="actionSetsIIS" type="SetEnvParameter">
<DESCRIPTION>declares the ActionSet ids to promote in the INFERRED graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">actionSetIdsIISGraph</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">iis-researchinitiative,iis-document-citations,iis-document-affiliation,iis-document-classes,iis-document-similarities,iis-referenced-datasets-main,iis-referenced-datasets-preprocessing,iis-referenced-projects-main,iis-referenced-projects-preprocessing,iis-referenceextraction-pdb,document_software_url,iis-extracted-metadata,iis-communities,iis-referenced-patents,iis-covid-19</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isJoin="true" name="waitConfig">
<DESCRIPTION>wait configurations</DESCRIPTION>
<PARAMETERS/>
<ARCS>
<ARC to="betaAggregatorGraph"/>
<ARC to="prodAggregatorGraph"/>
</ARCS>
</NODE>
<NODE name="betaAggregatorGraph" type="SubmitHadoopJob">
<DESCRIPTION>create the AGGREGATOR graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'graphOutputPath' : 'betaAggregatorGraphPath',
'isLookupUrl' : 'isLookUpUrl',
'reuseContent' : 'reuseBetaContent',
'contentPath' : 'betaContentPath'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/graph/raw_all/oozie_app',
'mongoURL' : 'mongodb://beta.services.openaire.eu',
'mongoDb' : 'mdstore',
'postgresURL' : 'jdbc:postgresql://beta.services.openaire.eu:5432/dnet_openaireplus',
'postgresUser' : 'dnet',
'postgresPassword' : '',
'workingDir' : '/tmp/core_provision/working_dir/beta_aggregator'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitAggregatorGraph"/>
</ARCS>
</NODE>
<NODE name="prodAggregatorGraph" type="SubmitHadoopJob">
<DESCRIPTION>create the AGGREGATOR graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'graphOutputPath' : 'prodAggregatorGraphPath',
'isLookupUrl' : 'isLookUpUrl',
'reuseContent' : 'reuseProdContent',
'contentPath' : 'prodContentPath'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/graph/raw_all/oozie_app',
'mongoURL' : 'mongodb://services.openaire.eu',
'mongoDb' : 'mdstore',
'postgresURL' : 'jdbc:postgresql://postgresql.services.openaire.eu:5432/dnet_openaireplus',
'postgresUser' : 'dnet',
'postgresPassword' : '',
'workingDir' : '/tmp/core_provision/working_dir/prod_aggregator'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitAggregatorGraph"/>
</ARCS>
</NODE>
<NODE isJoin="true" name="waitAggregatorGraph">
<DESCRIPTION>wait configurations</DESCRIPTION>
<PARAMETERS/>
<ARCS>
<ARC to="mergeAggregatorGraphs"/>
</ARCS>
</NODE>
<NODE name="mergeAggregatorGraphs" type="SubmitHadoopJob">
<DESCRIPTION>create the AGGREGATOR graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'betaInputGgraphPath' : 'betaAggregatorGraphPath',
'prodInputGgraphPath' : 'prodAggregatorGraphPath',
'graphOutputPath' : 'mergedGraphPath'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/graph/merge/oozie_app',
'workingDir' : '/tmp/core_provision/working_dir/merge_graph'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="promoteActionsRaw"/>
</ARCS>
</NODE>
<NODE name="promoteActionsRaw" type="SubmitHadoopJob">
<DESCRIPTION>create the RAW graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'inputActionSetIds' : 'actionSetIdsRawGraph',
'inputGraphRootPath' : 'mergedGraphPath',
'outputGraphRootPath' : 'rawGraphPath',
'isLookupUrl' : 'isLookUpUrl'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/actionmanager/wf/main/oozie_app',
'sparkExecutorCores' : '3',
'sparkExecutorMemory' : '10G',
'activePromoteDatasetActionPayload' : 'true',
'activePromoteDatasourceActionPayload' : 'true',
'activePromoteOrganizationActionPayload' : 'true',
'activePromoteOtherResearchProductActionPayload' : 'true',
'activePromoteProjectActionPayload' : 'true',
'activePromotePublicationActionPayload' : 'true',
'activePromoteRelationActionPayload' : 'true',
'activePromoteResultActionPayload' : 'true',
'activePromoteSoftwareActionPayload' : 'true',
'mergeAndGetStrategy' : 'MERGE_FROM_AND_GET',
'workingDir' : '/tmp/core_provision/working_dir/promoteActionsRaw'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="duplicateScan"/>
</ARCS>
</NODE>
<NODE name="duplicateScan" type="SubmitHadoopJob">
<DESCRIPTION>search for duplicates in the raw graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'actionSetId' : 'dedupConfig',
'graphBasePath' : 'rawGraphPath',
'dedupGraphPath': 'dedupGraphPath',
'isLookUpUrl' : 'isLookUpUrl'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/dedup/scan/oozie_app',
'workingPath' : '/tmp/core_provision/working_dir/dedup'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="promoteActionsIIS"/>
</ARCS>
</NODE>
<NODE name="promoteActionsIIS" type="SubmitHadoopJob">
<DESCRIPTION>create the INFERRED graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'inputActionSetIds' : 'actionSetIdsIISGraph',
'inputGraphRootPath' : 'dedupGraphPath',
'outputGraphRootPath' : 'inferredGraphPath',
'isLookupUrl' : 'isLookUpUrl'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/actionmanager/wf/main/oozie_app',
'sparkExecutorCores' : '3',
'sparkExecutorMemory' : '10G',
'activePromoteDatasetActionPayload' : 'true',
'activePromoteDatasourceActionPayload' : 'true',
'activePromoteOrganizationActionPayload' : 'true',
'activePromoteOtherResearchProductActionPayload' : 'true',
'activePromoteProjectActionPayload' : 'true',
'activePromotePublicationActionPayload' : 'true',
'activePromoteRelationActionPayload' : 'true',
'activePromoteResultActionPayload' : 'true',
'activePromoteSoftwareActionPayload' : 'true',
'mergeAndGetStrategy' : 'MERGE_FROM_AND_GET',
'workingDir' : '/tmp/core_provision/working_dir/promoteActionsIIS'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="dedupConsistency"/>
</ARCS>
</NODE>
<NODE name="dedupConsistency" type="SubmitHadoopJob">
<DESCRIPTION>mark duplicates as deleted and redistribute the relationships</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'graphBasePath' : 'inferredGraphPath',
'dedupGraphPath': 'consistentGraphPath'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/dedup/consistency/oozie_app',
'workingPath' : '/tmp/core_provision/working_dir/dedup'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="orcidPropagation"/>
</ARCS>
</NODE>
<NODE name="orcidPropagation" type="SubmitHadoopJob">
<DESCRIPTION>propagates ORCID among results linked by allowedsemrels semantic relationships</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'sourcePath' : 'consistentGraphPath',
'outputPath': 'orcidGraphPath'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/enrichment/orcidtoresultfromsemrel/oozie_app',
'workingDir' : '/tmp/core_provision/working_dir/orcid',
'allowedsemrels' : 'isSupplementedBy;isSupplementTo',
'saveGraph' : 'true'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="bulkTagging"/>
</ARCS>
</NODE>
<NODE name="bulkTagging" type="SubmitHadoopJob">
<DESCRIPTION>mark results respecting some rules as belonging to communities</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'sourcePath' : 'orcidGraphPath',
'outputPath': 'bulkTaggingGraphPath',
'isLookUpUrl' : 'isLookUpUrl',
'pathMap' : 'bulkTaggingPathMap'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/enrichment/bulktag/oozie_app',
'workingDir' : '/tmp/core_provision/working_dir/bulktag'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="affiliationPropagation"/>
</ARCS>
</NODE>
<NODE name="affiliationPropagation" type="SubmitHadoopJob">
<DESCRIPTION>creates relashionships between results and organizations when the organizations are associated to institutional repositories</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'sourcePath' : 'bulkTaggingGraphPath',
'outputPath': 'affiliationGraphPath'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/enrichment/affiliation/oozie_app',
'workingDir' : '/tmp/core_provision/working_dir/affiliation',
'saveGraph' : 'true'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="communityOrganizationPropagation"/>
</ARCS>
</NODE>
<NODE name="communityOrganizationPropagation" type="SubmitHadoopJob">
<DESCRIPTION>marks as belonging to communities the result collected from datasources related to the organizations specified in the organizationCommunityMap</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'sourcePath' : 'affiliationGraphPath',
'outputPath': 'communityOrganizationGraphPath',
'organizationtoresultcommunitymap': 'propagationOrganizationCommunityMap'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/enrichment/community_organization/oozie_app',
'workingDir' : '/tmp/core_provision/working_dir/community_organization',
'saveGraph' : 'true'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="resultProjectPropagation"/>
</ARCS>
</NODE>
<NODE name="resultProjectPropagation" type="SubmitHadoopJob">
<DESCRIPTION>created relation between projects and results linked to other results trough allowedsemrel semantic relations linked to projects</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'sourcePath' : 'communityOrganizationGraphPath',
'outputPath': 'fundingGraphPath'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/enrichment/funding/oozie_app',
'workingDir' : '/tmp/core_provision/working_dir/funding',
'allowedsemrels' : 'isSupplementedBy;isSupplementTo',
'saveGraph' : 'true'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="communitySemrelPropagation"/>
</ARCS>
</NODE>
<NODE name="communitySemrelPropagation" type="SubmitHadoopJob">
<DESCRIPTION>tag as belonging to communitites result in in allowedsemrels relation with other result already linked to communities </DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'sourcePath' : 'fundingGraphPath',
'outputPath': 'communitySemRelGraphPath',
'isLookUpUrl' : 'isLookUpUrl'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/enrichment/community_semrel/oozie_app',
'workingDir' : '/tmp/core_provision/working_dir/community_semrel',
'allowedsemrels' : 'isSupplementedBy;isSupplementTo',
'saveGraph' : 'true'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="countryPropagation"/>
</ARCS>
</NODE>
<NODE name="countryPropagation" type="SubmitHadoopJob">
<DESCRIPTION>associated to results colleced from allowedtypes and those in the whithelist the country of the organization(s) handling the datasource it is collected from </DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'sourcePath' : 'communitySemRelGraphPath',
'outputPath': 'countryGraphPath'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/enrichment/country/oozie_app',
'sparkExecutorCores' : '3',
'sparkExecutorMemory' : '10G',
'workingDir' : '/tmp/core_provision/working_dir/country',
'allowedtypes' : 'pubsrepository::institutional',
'whitelist' : '10|opendoar____::300891a62162b960cf02ce3827bb363c',
'saveGraph' : 'true'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="graphCleaning"/>
</ARCS>
</NODE>
<NODE name="graphCleaning" type="SubmitHadoopJob">
<DESCRIPTION>clean the properties in the graph typed as Qualifier according to the vocabulary indicated in schemeid</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'graphInputPath' : 'countryGraphPath',
'graphOutputPath': 'cleanedGraphPath',
'isLookupUrl': 'isLookUpUrl'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/graph/clean/oozie_app',
'workingPath' : '/tmp/core_provision/working_dir/clean'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="blacklistRelations"/>
</ARCS>
</NODE>
<NODE name="blacklistRelations" type="SubmitHadoopJob">
<DESCRIPTION>removes blacklisted relations </DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'sourcePath' : 'cleanedGraphPath',
'outputPath': 'blacklistedGraphPath'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/enrichment/blacklist/oozie_app',
'workingDir' : '/tmp/core_provision/working_dir/blacklist',
'postgresURL' : 'jdbc:postgresql://beta.services.openaire.eu:5432/dnet_openaireplus',
'postgresUser' : 'dnet',
'postgresPassword' : ''
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="success"/>
</ARCS>
</NODE>
</CONFIGURATION>
<STATUS>
<LAST_EXECUTION_ID>wf_20200615_163630_609</LAST_EXECUTION_ID>
<LAST_EXECUTION_DATE>2020-06-15T17:08:00+00:00</LAST_EXECUTION_DATE>
<LAST_EXECUTION_STATUS>SUCCESS</LAST_EXECUTION_STATUS>
<LAST_EXECUTION_ERROR/>
</STATUS>
</BODY>
</RESOURCE_PROFILE>

View File

@ -7,7 +7,7 @@
<DATE_OF_CREATION value="2020-06-15T16:35:09+00:00"/> <DATE_OF_CREATION value="2020-06-15T16:35:09+00:00"/>
</HEADER> </HEADER>
<BODY> <BODY>
<WORKFLOW_NAME>Graph Construction [OCEAN]</WORKFLOW_NAME> <WORKFLOW_NAME>Graph Construction [PROD]</WORKFLOW_NAME>
<WORKFLOW_TYPE>Data Provision</WORKFLOW_TYPE> <WORKFLOW_TYPE>Data Provision</WORKFLOW_TYPE>
<WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY> <WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY>
<CONFIGURATION start="manual"> <CONFIGURATION start="manual">
@ -413,7 +413,7 @@
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM> <PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS> </PARAMETERS>
<ARCS> <ARCS>
<ARC to="graphCleaning"/> <ARC to="orcidPropagation"/>
</ARCS> </ARCS>
</NODE> </NODE>

View File

@ -315,7 +315,7 @@
<dependency> <dependency>
<groupId>eu.dnetlib</groupId> <groupId>eu.dnetlib</groupId>
<artifactId>dnet-pace-core</artifactId> <artifactId>dnet-pace-core</artifactId>
<version>4.0.2</version> <version>4.0.4</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>eu.dnetlib</groupId> <groupId>eu.dnetlib</groupId>