merge branch with beta

pull/151/head
Miriam Baglioni 3 years ago
commit 6bd1eca7e0

@ -67,6 +67,7 @@ public class AuthorMerger {
a -> a
.getPid()
.stream()
.filter(Objects::nonNull)
.map(p -> new Tuple2<>(pidToComparableString(p), a)))
.collect(Collectors.toMap(Tuple2::_1, Tuple2::_2, (x1, x2) -> x1));
@ -78,6 +79,7 @@ public class AuthorMerger {
a -> a
.getPid()
.stream()
.filter(Objects::nonNull)
.filter(p -> !basePidAuthorMap.containsKey(pidToComparableString(p)))
.map(p -> new Tuple2<>(p, a)))
.collect(Collectors.toList());
@ -150,7 +152,7 @@ public class AuthorMerger {
}
private static boolean hasPid(Author a) {
if (a == null || a.getPid() == null || a.getPid().size() == 0)
if (a == null || a.getPid() == null || a.getPid().isEmpty())
return false;
return a.getPid().stream().anyMatch(p -> p != null && StringUtils.isNotBlank(p.getValue()));
}
@ -159,7 +161,10 @@ public class AuthorMerger {
if (StringUtils.isNotBlank(author.getSurname())) {
return new Person(author.getSurname() + ", " + author.getName(), false);
} else {
return new Person(author.getFullname(), false);
if (StringUtils.isNotBlank(author.getFullname()))
return new Person(author.getFullname(), false);
else
return new Person("", false);
}
}

@ -1,9 +1,10 @@
package eu.dnetlib.dhp.actionmanager.datacite
import org.apache.commons.io.IOUtils
import org.apache.http.client.config.RequestConfig
import org.apache.http.client.methods.{HttpGet, HttpPost, HttpRequestBase, HttpUriRequest}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
import org.apache.http.impl.client.{HttpClientBuilder, HttpClients}
import java.io.IOException
@ -56,31 +57,31 @@ abstract class AbstractRestClient extends Iterator[String]{
private def doHTTPRequest[A <: HttpUriRequest](r: A) :String ={
val client = HttpClients.createDefault
val timeout = 60; // seconds
val config = RequestConfig.custom()
.setConnectTimeout(timeout * 1000)
.setConnectionRequestTimeout(timeout * 1000)
.setSocketTimeout(timeout * 1000).build()
val client =HttpClientBuilder.create().setDefaultRequestConfig(config).build()
var tries = 4
try {
while (tries > 0) {
while (tries > 0) {
println(s"requesting ${r.getURI}")
val response = client.execute(r)
println(s"get response with status${response.getStatusLine.getStatusCode}")
if (response.getStatusLine.getStatusCode > 400) {
tries -= 1
try {
val response = client.execute(r)
println(s"get response with status${response.getStatusLine.getStatusCode}")
if (response.getStatusLine.getStatusCode > 400) {
tries -= 1
}
else
return IOUtils.toString(response.getEntity.getContent)
} catch {
case e: Throwable =>
println(s"Error on requesting ${r.getURI}")
e.printStackTrace()
tries-=1
}
else
return IOUtils.toString(response.getEntity.getContent)
}
""
} catch {
case e: Throwable =>
throw new RuntimeException("Error on executing request ", e)
} finally try client.close()
catch {
case e: IOException =>
throw new RuntimeException("Unable to close client ", e)
}
}
}
getBufferData()
}

@ -367,7 +367,7 @@ object DataciteToOAFTransformation {
result.setDateofcollection(ISO8601FORMAT.format(d))
result.setDateoftransformation(ISO8601FORMAT.format(ts))
result.setDateoftransformation(ISO8601FORMAT.format(d))
result.setDataInfo(dataInfo)
val creators = (json \\ "creators").extractOrElse[List[CreatorType]](List())

@ -140,7 +140,7 @@ object ImportDatacite {
private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration, bs:Int): Long = {
var from:Long = timestamp * 1000
val delta:Long = 50000000L
val delta:Long = 100000000L
var client: DataciteAPIImporter = null
val now :Long =System.currentTimeMillis()
var i = 0

@ -0,0 +1,73 @@
package eu.dnetlib.dhp.actionmanager.scholix
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result}
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}
import scala.io.Source
object SparkCreateActionset {
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/actionset/generate_actionset.json")).mkString)
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val sourcePath = parser.get("sourcePath")
log.info(s"sourcePath -> $sourcePath")
val targetPath = parser.get("targetPath")
log.info(s"targetPath -> $targetPath")
val workingDirFolder = parser.get("workingDirFolder")
log.info(s"workingDirFolder -> $workingDirFolder")
implicit val oafEncoders: Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val resultEncoders: Encoder[Result] = Encoders.kryo[Result]
implicit val relationEncoders: Encoder[Relation] = Encoders.kryo[Relation]
import spark.implicits._
val relation = spark.read.load(s"$sourcePath/relation").as[Relation]
relation.filter(r => (r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
.flatMap(r => List(r.getSource, r.getTarget)).distinct().write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/id_relation")
val idRelation = spark.read.load(s"$workingDirFolder/id_relation").as[String]
log.info("extract source and target Identifier involved in relations")
log.info("save relation filtered")
relation.filter(r => (r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
.write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/actionSetOaf")
log.info("saving entities")
val entities: Dataset[(String, Result)] = spark.read.load(s"$sourcePath/entities/*").as[Result].map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING, resultEncoders))
entities.filter(r => r.isInstanceOf[Result]).map(r => r.asInstanceOf[Result])
entities
.joinWith(idRelation, entities("_1").equalTo(idRelation("value")))
.map(p => p._1._2)
.write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf")
}
}

@ -0,0 +1,86 @@
package eu.dnetlib.dhp.actionmanager.scholix
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.action.AtomicAction
import eu.dnetlib.dhp.schema.oaf.{Oaf, Dataset => OafDataset,Publication, Software, OtherResearchProduct, Relation}
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import scala.io.Source
object SparkSaveActionSet {
def toActionSet(item: Oaf): (String, String) = {
val mapper = new ObjectMapper()
item match {
case dataset: OafDataset =>
val a: AtomicAction[OafDataset] = new AtomicAction[OafDataset]
a.setClazz(classOf[OafDataset])
a.setPayload(dataset)
(dataset.getClass.getCanonicalName, mapper.writeValueAsString(a))
case publication: Publication =>
val a: AtomicAction[Publication] = new AtomicAction[Publication]
a.setClazz(classOf[Publication])
a.setPayload(publication)
(publication.getClass.getCanonicalName, mapper.writeValueAsString(a))
case software: Software =>
val a: AtomicAction[Software] = new AtomicAction[Software]
a.setClazz(classOf[Software])
a.setPayload(software)
(software.getClass.getCanonicalName, mapper.writeValueAsString(a))
case orp: OtherResearchProduct =>
val a: AtomicAction[OtherResearchProduct] = new AtomicAction[OtherResearchProduct]
a.setClazz(classOf[OtherResearchProduct])
a.setPayload(orp)
(orp.getClass.getCanonicalName, mapper.writeValueAsString(a))
case relation: Relation =>
val a: AtomicAction[Relation] = new AtomicAction[Relation]
a.setClazz(classOf[Relation])
a.setPayload(relation)
(relation.getClass.getCanonicalName, mapper.writeValueAsString(a))
case _ =>
null
}
}
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/actionset/save_actionset.json")).mkString)
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val sourcePath = parser.get("sourcePath")
log.info(s"sourcePath -> $sourcePath")
val targetPath = parser.get("targetPath")
log.info(s"targetPath -> $targetPath")
implicit val oafEncoders: Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val tEncoder: Encoder[(String, String)] = Encoders.tuple(Encoders.STRING, Encoders.STRING)
spark.read.load(sourcePath).as[Oaf]
.map(o => toActionSet(o))
.filter(o => o != null)
.rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$targetPath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text, Text]], classOf[GzipCodec])
}
}

@ -4,10 +4,6 @@
<name>mainPath</name>
<description>the working path of Datacite stores</description>
</property>
<property>
<name>oafTargetPath</name>
<description>the target path where the OAF records are stored</description>
</property>
<property>
<name>isLookupUrl</name>
<description>The IS lookUp service endopoint</description>
@ -17,26 +13,15 @@
<value>100</value>
<description>The request block size</description>
</property>
<property>
<name>exportLinks</name>
<value>false</value>
<description>instructs the transformation phase to produce the links or not</description>
</property>
</parameters>
<start to="resume_from"/>
<start to="ImportDatacite"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<decision name="resume_from">
<switch>
<case to="TransformDatacite">${wf:conf('resumeFrom') eq 'TransformDatacite'}</case>
<default to="ImportDatacite"/>
</switch>
</decision>
<action name="ImportDatacite">
<spark xmlns="uri:oozie:spark-action:0.2">
@ -60,11 +45,12 @@
<arg>--master</arg><arg>yarn-cluster</arg>
<arg>--blocksize</arg><arg>${blocksize}</arg>
</spark>
<ok to="TransformDatacite"/>
<ok to="TransformJob"/>
<error to="Kill"/>
</action>
<action name="TransformDatacite">
<action name="TransformJob">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
@ -82,9 +68,9 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${mainPath}/datacite_dump</arg>
<arg>--targetPath</arg><arg>${oafTargetPath}</arg>
<arg>--targetPath</arg><arg>${mainPath}/datacite_oaf</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--exportLinks</arg><arg>${exportLinks}</arg>
<arg>--exportLinks</arg><arg>true</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>

@ -0,0 +1,6 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"s", "paramLongName":"sourcePath","paramDescription": "source path", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingDirFolder","paramDescription": "the working Dir Folder", "paramRequired": true},
{"paramName":"t", "paramLongName":"targetPath","paramDescription": "the target path ", "paramRequired": true}
]

@ -0,0 +1,23 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

@ -0,0 +1,76 @@
<workflow-app name="Scholexplorer_to_ActionSet_Workflow" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the path of the consistent graph</description>
</property>
<property>
<name>workingDirFolder</name>
<description>the path of working dir ActionSet</description>
</property>
<property>
<name>outputPath</name>
<description>the path of Scholexplorer ActionSet</description>
</property>
</parameters>
<start to="createActionSet"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="createActionSet">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Create Action Set</name>
<class>eu.dnetlib.dhp.actionmanager.scholix.SparkCreateActionset</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--targetPath</arg><arg>${outputPath}</arg>
<arg>--workingDirFolder</arg><arg>${workingDirFolder}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="SaveActionSet"/>
<error to="Kill"/>
</action>
<action name="SaveActionSet">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Save Action Set</name>
<class>eu.dnetlib.dhp.actionmanager.scholix.SparkSaveActionSet</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDirFolder}/actionSetOaf</arg>
<arg>--targetPath</arg><arg>${outputPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

@ -0,0 +1,5 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"s", "paramLongName":"sourcePath","paramDescription": "source path", "paramRequired": true},
{"paramName":"t", "paramLongName":"targetPath","paramDescription": "the target path ", "paramRequired": true}
]

@ -3,13 +3,14 @@ package eu.dnetlib.dhp.actionmanager.datacite
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest
import eu.dnetlib.dhp.schema.oaf.Oaf
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.junit.jupiter.MockitoExtension
import java.text.SimpleDateFormat
import java.util.Locale
import scala.io.Source
@ExtendWith(Array(classOf[MockitoExtension]))
@ -22,6 +23,18 @@ class DataciteToOAFTest extends AbstractVocabularyTest{
super.setUpVocabulary()
}
@Test
def testDateMapping:Unit = {
val inputDate = "2021-07-14T11:52:54+0000"
val ISO8601FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ", Locale.US)
val dt = ISO8601FORMAT.parse(inputDate)
println(dt.getTime)
}
@Test
def testMapping() :Unit = {
val record =Source.fromInputStream(getClass.getResourceAsStream("record.json")).mkString

@ -2,6 +2,7 @@
package eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.schema.common.ModelConstants;
public class EnrichMissingDatasetIsReferencedBy extends AbstractEnrichMissingDataset {
@ -11,7 +12,7 @@ public class EnrichMissingDatasetIsReferencedBy extends AbstractEnrichMissingDat
@Override
protected boolean filterByType(final String relType) {
return relType.equals("isReferencedBy");
return relType.equals(ModelConstants.IS_REFERENCED_BY);
}
}

@ -2,6 +2,7 @@
package eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.schema.common.ModelConstants;
public class EnrichMissingDatasetIsRelatedTo extends AbstractEnrichMissingDataset {
@ -11,7 +12,7 @@ public class EnrichMissingDatasetIsRelatedTo extends AbstractEnrichMissingDatase
@Override
protected boolean filterByType(final String relType) {
return relType.equals("isRelatedTo");
return relType.equals(ModelConstants.IS_RELATED_TO);
}
}

@ -2,6 +2,7 @@
package eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.schema.common.ModelConstants;
public class EnrichMissingDatasetIsSupplementedBy extends AbstractEnrichMissingDataset {
@ -11,7 +12,7 @@ public class EnrichMissingDatasetIsSupplementedBy extends AbstractEnrichMissingD
@Override
protected boolean filterByType(final String relType) {
return relType.equals("isSupplementedBy");
return relType.equals(ModelConstants.IS_SUPPLEMENTED_BY);
}
}

@ -2,6 +2,7 @@
package eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.schema.common.ModelConstants;
public class EnrichMissingDatasetIsSupplementedTo extends AbstractEnrichMissingDataset {
@ -11,7 +12,7 @@ public class EnrichMissingDatasetIsSupplementedTo extends AbstractEnrichMissingD
@Override
protected boolean filterByType(final String relType) {
return relType.equals("isSupplementedTo");
return relType.equals(ModelConstants.IS_SUPPLEMENT_TO);
}
}

@ -2,6 +2,7 @@
package eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.schema.common.ModelConstants;
public class EnrichMissingDatasetReferences extends AbstractEnrichMissingDataset {
@ -11,7 +12,7 @@ public class EnrichMissingDatasetReferences extends AbstractEnrichMissingDataset
@Override
protected boolean filterByType(final String relType) {
return relType.equals("references");
return relType.equals(ModelConstants.REFERENCES);
}
}

@ -2,6 +2,7 @@
package eu.dnetlib.dhp.broker.oa.matchers.relatedPublications;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.schema.common.ModelConstants;
public class EnrichMissingPublicationIsReferencedBy extends AbstractEnrichMissingPublication {
@ -11,6 +12,6 @@ public class EnrichMissingPublicationIsReferencedBy extends AbstractEnrichMissin
@Override
protected boolean filterByType(final String relType) {
return relType.equals("isReferencedBy");
return relType.equals(ModelConstants.IS_REFERENCED_BY);
}
}

@ -2,6 +2,7 @@
package eu.dnetlib.dhp.broker.oa.matchers.relatedPublications;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.schema.common.ModelConstants;
public class EnrichMissingPublicationIsRelatedTo extends AbstractEnrichMissingPublication {
@ -11,7 +12,7 @@ public class EnrichMissingPublicationIsRelatedTo extends AbstractEnrichMissingPu
@Override
protected boolean filterByType(final String relType) {
return relType.equals("isRelatedTo");
return relType.equals(ModelConstants.IS_RELATED_TO);
}
}

@ -2,6 +2,7 @@
package eu.dnetlib.dhp.broker.oa.matchers.relatedPublications;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.schema.common.ModelConstants;
public class EnrichMissingPublicationIsSupplementedBy extends AbstractEnrichMissingPublication {
@ -11,6 +12,6 @@ public class EnrichMissingPublicationIsSupplementedBy extends AbstractEnrichMiss
@Override
protected boolean filterByType(final String relType) {
return relType.equals("isSupplementedBy");
return relType.equals(ModelConstants.IS_SUPPLEMENTED_BY);
}
}

@ -2,6 +2,7 @@
package eu.dnetlib.dhp.broker.oa.matchers.relatedPublications;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.schema.common.ModelConstants;
public class EnrichMissingPublicationIsSupplementedTo extends AbstractEnrichMissingPublication {
@ -11,7 +12,7 @@ public class EnrichMissingPublicationIsSupplementedTo extends AbstractEnrichMiss
@Override
protected boolean filterByType(final String relType) {
return relType.equals("isSupplementedTo");
return relType.equals(ModelConstants.IS_SUPPLEMENT_TO);
}
}

@ -2,6 +2,7 @@
package eu.dnetlib.dhp.broker.oa.matchers.relatedPublications;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.schema.common.ModelConstants;
public class EnrichMissingPublicationReferences extends AbstractEnrichMissingPublication {
@ -11,7 +12,7 @@ public class EnrichMissingPublicationReferences extends AbstractEnrichMissingPub
@Override
protected boolean filterByType(final String relType) {
return relType.equals("references");
return relType.equals(ModelConstants.REFERENCES);
}
}

@ -17,6 +17,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class ClusterUtils {
@ -52,15 +53,15 @@ public class ClusterUtils {
}
public static boolean isDedupRoot(final String id) {
return id.contains("dedup_wf_");
return id.contains("dedup");
}
public static final boolean isValidResultResultClass(final String s) {
return s.equals("isReferencedBy")
|| s.equals("isRelatedTo")
|| s.equals("references")
|| s.equals("isSupplementedBy")
|| s.equals("isSupplementedTo");
return s.equals(ModelConstants.IS_REFERENCED_BY)
|| s.equals(ModelConstants.IS_RELATED_TO)
|| s.equals(ModelConstants.REFERENCES)
|| s.equals(ModelConstants.IS_SUPPLEMENTED_BY)
|| s.equals(ModelConstants.IS_SUPPLEMENT_TO);
}
public static <T> T incrementAccumulator(final T o, final LongAccumulator acc) {

@ -23,6 +23,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
@ -77,48 +78,54 @@ public class SparkUpdateEntity extends AbstractSparkAction {
(type, clazz) -> {
final String outputPath = dedupGraphPath + "/" + type;
removeOutputDir(spark, outputPath);
JavaRDD<String> sourceEntity = sc
.textFile(DedupUtility.createEntityPath(graphBasePath, type.toString()));
if (mergeRelExists(workingPath, type.toString())) {
final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, "*", type.toString());
final String dedupRecordPath = DedupUtility
.createDedupRecordPath(workingPath, "*", type.toString());
final Dataset<Relation> rel = spark.read().load(mergeRelPath).as(Encoders.bean(Relation.class));
final JavaPairRDD<String, String> mergedIds = rel
.where("relClass == 'merges'")
.where("source != target")
.select(rel.col("target"))
.distinct()
.toJavaRDD()
.mapToPair(
(PairFunction<Row, String, String>) r -> new Tuple2<>(r.getString(0), "d"));
JavaPairRDD<String, String> entitiesWithId = sourceEntity
.mapToPair(
(PairFunction<String, String, String>) s -> new Tuple2<>(
MapDocumentUtil.getJPathString(IDJSONPATH, s), s));
if (type == EntityType.organization) // exclude root records from organizations
entitiesWithId = excludeRootOrgs(entitiesWithId, rel);
JavaRDD<String> map = entitiesWithId
.leftOuterJoin(mergedIds)
.map(k -> {
if (k._2()._2().isPresent()) {
return updateDeletedByInference(k._2()._1(), clazz);
}
return k._2()._1();
});
sourceEntity = map.union(sc.textFile(dedupRecordPath));
final String ip = DedupUtility.createEntityPath(graphBasePath, type.toString());
if (HdfsSupport.exists(ip, sc.hadoopConfiguration())) {
JavaRDD<String> sourceEntity = sc
.textFile(DedupUtility.createEntityPath(graphBasePath, type.toString()));
if (mergeRelExists(workingPath, type.toString())) {
final String mergeRelPath = DedupUtility
.createMergeRelPath(workingPath, "*", type.toString());
final String dedupRecordPath = DedupUtility
.createDedupRecordPath(workingPath, "*", type.toString());
final Dataset<Relation> rel = spark
.read()
.load(mergeRelPath)
.as(Encoders.bean(Relation.class));
final JavaPairRDD<String, String> mergedIds = rel
.where("relClass == 'merges'")
.where("source != target")
.select(rel.col("target"))
.distinct()
.toJavaRDD()
.mapToPair(
(PairFunction<Row, String, String>) r -> new Tuple2<>(r.getString(0), "d"));
JavaPairRDD<String, String> entitiesWithId = sourceEntity
.mapToPair(
(PairFunction<String, String, String>) s -> new Tuple2<>(
MapDocumentUtil.getJPathString(IDJSONPATH, s), s));
if (type == EntityType.organization) // exclude root records from organizations
entitiesWithId = excludeRootOrgs(entitiesWithId, rel);
JavaRDD<String> map = entitiesWithId
.leftOuterJoin(mergedIds)
.map(k -> {
if (k._2()._2().isPresent()) {
return updateDeletedByInference(k._2()._1(), clazz);
}
return k._2()._1();
});
sourceEntity = map.union(sc.textFile(dedupRecordPath));
}
sourceEntity.saveAsTextFile(outputPath, GzipCodec.class);
}
sourceEntity.saveAsTextFile(outputPath, GzipCodec.class);
});
}

@ -1,12 +1,16 @@
package eu.dnetlib.doiboost
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import eu.dnetlib.dhp.schema.action.AtomicAction
import eu.dnetlib.dhp.schema.oaf.{AccessRight, DataInfo, Dataset, Field, Instance, KeyValue, Oaf, Organization, Publication, Qualifier, Relation, Result, StructuredProperty}
import eu.dnetlib.dhp.schema.oaf.{AccessRight, DataInfo, Dataset, Field, Instance, KeyValue, Oaf, OpenAccessRoute, Organization, Publication, Qualifier, Relation, Result, StructuredProperty}
import eu.dnetlib.dhp.utils.DHPUtils
import org.apache.commons.lang3.StringUtils
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils
import eu.dnetlib.doiboost.DoiBoostMappingUtil.{getClosedAccessQualifier, getEmbargoedAccessQualifier, getUnknownQualifier}
import org.json4s
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
@ -118,14 +122,92 @@ object DoiBoostMappingUtil {
}
def decideAccessRight(lic : Field[String], date:String) : AccessRight = {
if(lic == null){
//Default value Unknown
return getUnknownQualifier()
}
val license : String = lic.getValue
//CC licenses
if(license.startsWith("cc") ||
license.startsWith("http://creativecommons.org/licenses") ||
license.startsWith("https://creativecommons.org/licenses") ||
//ACS Publications Author choice licenses (considered OPEN also by Unpaywall)
license.equals("http://pubs.acs.org/page/policy/authorchoice_ccby_termsofuse.html") ||
license.equals("http://pubs.acs.org/page/policy/authorchoice_termsofuse.html") ||
license.equals("http://pubs.acs.org/page/policy/authorchoice_ccbyncnd_termsofuse.html") ||
//APA (considered OPEN also by Unpaywall)
license.equals("http://www.apa.org/pubs/journals/resources/open-access.aspx")){
val oaq : AccessRight = getOpenAccessQualifier()
oaq.setOpenAccessRoute(OpenAccessRoute.hybrid)
return oaq
}
//OUP (BUT ONLY AFTER 12 MONTHS FROM THE PUBLICATION DATE, OTHERWISE THEY ARE EMBARGOED)
if(license.equals("https://academic.oup.com/journals/pages/open_access/funder_policies/chorus/standard_publication_model")){
val now = java.time.LocalDate.now
try{
val pub_date = LocalDate.parse(date, DateTimeFormatter.ofPattern("yyyy-MM-dd"))
if (((now.toEpochDay - pub_date.toEpochDay)/365.0) > 1){
val oaq : AccessRight = getOpenAccessQualifier()
oaq.setOpenAccessRoute(OpenAccessRoute.hybrid)
return oaq
}
else{
return getEmbargoedAccessQualifier()
}
}catch {
case e: Exception => {
try{
val pub_date = LocalDate.parse(date, DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'"))
if (((now.toEpochDay - pub_date.toEpochDay)/365.0) > 1){
val oaq : AccessRight = getOpenAccessQualifier()
oaq.setOpenAccessRoute(OpenAccessRoute.hybrid)
return oaq
}
else{
return getEmbargoedAccessQualifier()
}
}catch{
case ex: Exception => return getClosedAccessQualifier()
}
}
}
}
return getClosedAccessQualifier()
}
def getOpenAccessQualifier():AccessRight = {
OafMapperUtils.accessRight("OPEN","Open Access", ModelConstants.DNET_ACCESS_MODES, ModelConstants.DNET_ACCESS_MODES)
OafMapperUtils.accessRight(ModelConstants.ACCESS_RIGHT_OPEN,"Open Access", ModelConstants.DNET_ACCESS_MODES, ModelConstants.DNET_ACCESS_MODES)
}
def getRestrictedQualifier():AccessRight = {
OafMapperUtils.accessRight("RESTRICTED","Restricted",ModelConstants.DNET_ACCESS_MODES, ModelConstants.DNET_ACCESS_MODES)
OafMapperUtils.accessRight( "RESTRICTED","Restricted",ModelConstants.DNET_ACCESS_MODES, ModelConstants.DNET_ACCESS_MODES)
}
def getUnknownQualifier():AccessRight = {
OafMapperUtils.accessRight(ModelConstants.UNKNOWN, ModelConstants.NOT_AVAILABLE,ModelConstants.DNET_ACCESS_MODES, ModelConstants.DNET_ACCESS_MODES)
}
def getEmbargoedAccessQualifier():AccessRight = {
OafMapperUtils.accessRight("EMBARGO","Embargo",ModelConstants.DNET_ACCESS_MODES, ModelConstants.DNET_ACCESS_MODES)
}
def getClosedAccessQualifier():AccessRight = {
OafMapperUtils.accessRight("CLOSED","Closed Access", ModelConstants.DNET_ACCESS_MODES, ModelConstants.DNET_ACCESS_MODES)
}
@ -150,10 +232,11 @@ object DoiBoostMappingUtil {
if (item != null) {
hb.setValue(item.officialname)
hb.setKey(generateDSId(item.id))
if (item.openAccess)
if (item.openAccess) {
i.setAccessright(getOpenAccessQualifier())
val ar = getOpenAccessQualifier()
publication.setBestaccessright(OafMapperUtils.qualifier(ar.getClassid, ar.getClassname, ar.getSchemeid, ar.getSchemename))
i.getAccessright.setOpenAccessRoute(OpenAccessRoute.gold)
}
}
else {
hb = ModelConstants.UNKNOWN_REPOSITORY
@ -161,17 +244,8 @@ object DoiBoostMappingUtil {
i.setHostedby(hb)
})
val ar = publication.getInstance().asScala.filter(i => i.getInstancetype != null && i.getAccessright!= null && i.getAccessright.getClassid!= null).map(f=> f.getAccessright.getClassid)
if (ar.nonEmpty) {
if(ar.contains(ModelConstants.ACCESS_RIGHT_OPEN)){
val ar = getOpenAccessQualifier()
publication.setBestaccessright(OafMapperUtils.qualifier(ar.getClassid, ar.getClassname, ar.getSchemeid, ar.getSchemename))
}
else {
val ar = getRestrictedQualifier()
publication.setBestaccessright(OafMapperUtils.qualifier(ar.getClassid, ar.getClassname, ar.getSchemeid, ar.getSchemename))
}
}
publication.setBestaccessright(OafMapperUtils.createBestAccessRights(publication.getInstance()))
publication
}

@ -4,7 +4,7 @@ import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf._
import eu.dnetlib.dhp.schema.oaf.utils.{IdentifierFactory, OafMapperUtils}
import eu.dnetlib.dhp.utils.DHPUtils
import eu.dnetlib.doiboost.DoiBoostMappingUtil._
import eu.dnetlib.doiboost.DoiBoostMappingUtil.{decideAccessRight, _}
import org.apache.commons.lang.StringUtils
import org.json4s
import org.json4s.DefaultFormats
@ -168,12 +168,22 @@ case object Crossref2Oaf {
// Mapping instance
val instance = new Instance()
val license = for {
JString(lic) <- json \ "license" \ "URL"
} yield asField(lic)
val l = license.filter(d => StringUtils.isNotBlank(d.getValue))
if (l.nonEmpty)
instance.setLicense(l.head)
JObject(license) <- json \ "license"
JField("URL", JString(lic)) <- license
JField("content-version", JString(content_version)) <- license
} yield (asField(lic), content_version)
val l = license.filter(d => StringUtils.isNotBlank(d._1.getValue))
if (l.nonEmpty){
if (l exists (d => d._2.equals("vor"))){
for(d <- l){
if (d._2.equals("vor")){
instance.setLicense(d._1)
}
}
}
else{
instance.setLicense(l.head._1)}
}
// Ticket #6281 added pid to Instance
instance.setPid(result.getPid)
@ -185,7 +195,7 @@ case object Crossref2Oaf {
OafMapperUtils.qualifier("0001", "peerReviewed", ModelConstants.DNET_REVIEW_LEVELS, ModelConstants.DNET_REVIEW_LEVELS))
}
instance.setAccessright(getRestrictedQualifier())
instance.setAccessright(decideAccessRight(instance.getLicense, result.getDateofacceptance.getValue))
instance.setInstancetype(OafMapperUtils.qualifier(cobjCategory.substring(0, 4), cobjCategory.substring(5), ModelConstants.DNET_PUBLICATION_RESOURCE, ModelConstants.DNET_PUBLICATION_RESOURCE))
result.setResourcetype(OafMapperUtils.qualifier(cobjCategory.substring(0, 4), cobjCategory.substring(5), ModelConstants.DNET_PUBLICATION_RESOURCE, ModelConstants.DNET_PUBLICATION_RESOURCE))

@ -11,6 +11,7 @@ import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
import eu.dnetlib.doiboost.DoiBoostMappingUtil._
import eu.dnetlib.doiboost.uw.UnpayWallToOAF.get_unpaywall_color
@ -23,6 +24,21 @@ case class OALocation(evidence:Option[String], host_type:Option[String], is_best
object UnpayWallToOAF {
val logger: Logger = LoggerFactory.getLogger(getClass)
def get_unpaywall_color(input:String):Option[OpenAccessRoute] = {
if(input == null || input.equalsIgnoreCase("close"))
return None
if(input.equalsIgnoreCase("green"))
return Some(OpenAccessRoute.green)
if(input.equalsIgnoreCase("bronze"))
return Some(OpenAccessRoute.bronze)
if(input.equalsIgnoreCase("hybrid"))
return Some(OpenAccessRoute.hybrid)
else
return Some(OpenAccessRoute.gold)
}
def get_color(is_oa:Boolean, location: OALocation, journal_is_oa:Boolean):Option[OpenAccessRoute] = {
if (is_oa) {
if (location.host_type.isDefined) {
@ -65,7 +81,7 @@ object UnpayWallToOAF {
val oaLocation:OALocation = (json \ "best_oa_location").extractOrElse[OALocation](null)
val colour = get_color(is_oa, oaLocation, journal_is_oa)
val colour = get_unpaywall_color((json \ "oa_status").extractOrElse[String](null))
pub.setCollectedfrom(List(createUnpayWallCollectedFrom()).asJava)
pub.setDataInfo(generateDataInfo())

@ -1,4 +1,4 @@
<workflow-app name="Generate DOIBoost ActionSet" xmlns="uri:oozie:workflow:0.5">
<workflow-app name="Generate DOIBoost ActionSet for BETA - PREPROCESS" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sparkDriverMemory</name>

@ -1,4 +1,4 @@
<workflow-app name="Generate DOIBoost ActionSet for PROD" xmlns="uri:oozie:workflow:0.5">
<workflow-app name="Generate DOIBoost ActionSet for BETA - PROCESS" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sparkDriverMemory</name>
@ -99,7 +99,7 @@
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.sql.shuffle.partitions=7680
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -124,7 +124,7 @@
--executor-memory=${sparkExecutorIntersectionMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.sql.shuffle.partitions=7680
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}

@ -492,6 +492,124 @@ class CrossrefMappingTest {
}
@Test
def testLicenseVorClosed() :Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("publication_license_vor.json")).mkString
assertNotNull(json)
assertFalse(json.isEmpty);
val resultList: List[Oaf] = Crossref2Oaf.convert(json)
assertTrue(resultList.nonEmpty)
val item : Result = resultList.filter(p => p.isInstanceOf[Result]).head.asInstanceOf[Result]
mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
println(mapper.writeValueAsString(item))
assertTrue(item.getInstance().asScala exists (i => i.getLicense.getValue.equals("https://www.springer.com/vor")))
assertTrue(item.getInstance().asScala exists (i => i.getAccessright.getClassid.equals("CLOSED")))
assertTrue(item.getInstance().asScala exists (i => i.getAccessright.getOpenAccessRoute == null))
}
@Test
def testLicenseOpen() :Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("publication_license_open.json")).mkString
assertNotNull(json)
assertFalse(json.isEmpty);
val resultList: List[Oaf] = Crossref2Oaf.convert(json)
assertTrue(resultList.nonEmpty)
val item : Result = resultList.filter(p => p.isInstanceOf[Result]).head.asInstanceOf[Result]
assertTrue(item.getInstance().asScala exists (i => i.getLicense.getValue.equals("http://pubs.acs.org/page/policy/authorchoice_ccby_termsofuse.html")))
assertTrue(item.getInstance().asScala exists (i => i.getAccessright.getClassid.equals("OPEN")))
assertTrue(item.getInstance().asScala exists (i => i.getAccessright.getOpenAccessRoute == OpenAccessRoute.hybrid))
mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
println(mapper.writeValueAsString(item))
}
@Test
def testLicenseEmbargoOpen() :Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("publication_license_embargo_open.json")).mkString
assertNotNull(json)
assertFalse(json.isEmpty);
val resultList: List[Oaf] = Crossref2Oaf.convert(json)
assertTrue(resultList.nonEmpty)
val item : Result = resultList.filter(p => p.isInstanceOf[Result]).head.asInstanceOf[Result]
assertTrue(item.getInstance().asScala exists (i => i.getLicense.getValue.equals("https://academic.oup.com/journals/pages/open_access/funder_policies/chorus/standard_publication_model")))
assertTrue(item.getInstance().asScala exists (i => i.getAccessright.getClassid.equals("OPEN")))
assertTrue(item.getInstance().asScala exists (i => i.getAccessright.getOpenAccessRoute == OpenAccessRoute.hybrid))
mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
println(mapper.writeValueAsString(item))
}
@Test
def testLicenseEmbargo() :Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("publication_license_embargo.json")).mkString
assertNotNull(json)
assertFalse(json.isEmpty);
val resultList: List[Oaf] = Crossref2Oaf.convert(json)
assertTrue(resultList.nonEmpty)
val item : Result = resultList.filter(p => p.isInstanceOf[Result]).head.asInstanceOf[Result]
assertTrue(item.getInstance().asScala exists (i => i.getLicense.getValue.equals("https://academic.oup.com/journals/pages/open_access/funder_policies/chorus/standard_publication_model")))
assertTrue(item.getInstance().asScala exists (i => i.getAccessright.getClassid.equals("EMBARGO")))
assertTrue(item.getInstance().asScala exists (i => i.getAccessright.getOpenAccessRoute == null))
mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
println(mapper.writeValueAsString(item))
}
@Test
def testLicenseEmbargoDateTime() :Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("publication_license_embargo_datetime.json")).mkString
assertNotNull(json)
assertFalse(json.isEmpty);
val resultList: List[Oaf] = Crossref2Oaf.convert(json)
assertTrue(resultList.nonEmpty)
val item : Result = resultList.filter(p => p.isInstanceOf[Result]).head.asInstanceOf[Result]
assertTrue(item.getInstance().asScala exists (i => i.getLicense.getValue.equals("https://academic.oup.com/journals/pages/open_access/funder_policies/chorus/standard_publication_model")))
assertTrue(item.getInstance().asScala exists (i => i.getAccessright.getClassid.equals("EMBARGO")))
assertTrue(item.getInstance().asScala exists (i => i.getAccessright.getOpenAccessRoute == null))
mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
println(mapper.writeValueAsString(item))
}
}

@ -65,7 +65,6 @@ class MAGMappingTest {
val conf = new SparkConf()
conf.setMaster("local[*]")
conf.set("spark.driver.host", "localhost")
val spark: SparkSession =
SparkSession
.builder()
@ -93,7 +92,6 @@ class MAGMappingTest {
implicit val formats = DefaultFormats
val conf = new SparkConf()
conf.setMaster("local[*]")
conf.set("spark.driver.host", "localhost")
@ -103,7 +101,6 @@ class MAGMappingTest {
.appName(getClass.getSimpleName)
.config(conf)
.getOrCreate()
val path = getClass.getResource("duplicatedMagPapers.json").getPath
import org.apache.spark.sql.Encoders

@ -0,0 +1,42 @@
package eu.dnetlib.dhp.sx.graph
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, OtherResearchProduct, Publication, Result, Software, Dataset => OafDataset}
import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
object SparkConvertDatasetToJsonRDD {
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val sourcePath = parser.get("sourcePath")
log.info(s"sourcePath -> $sourcePath")
val targetPath = parser.get("targetPath")
log.info(s"targetPath -> $targetPath")
val resultObject = List("publication","dataset","software", "otherResearchProduct")
val mapper = new ObjectMapper()
implicit val oafEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
resultObject.foreach{item =>
spark.read.load(s"$sourcePath/$item").as[Result].map(r=> mapper.writeValueAsString(r))(Encoders.STRING).rdd.saveAsTextFile(s"$targetPath/${item.toLowerCase}", classOf[GzipCodec])
}
}
}

@ -0,0 +1,67 @@
package eu.dnetlib.dhp.sx.graph
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Relation, Result, Software, Dataset => OafDataset}
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
object SparkConvertRDDtoDataset {
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val sourcePath = parser.get("sourcePath")
log.info(s"sourcePath -> $sourcePath")
val t = parser.get("targetPath")
log.info(s"targetPath -> $t")
val entityPath = s"$t/entities"
val relPath = s"$t/relation"
val mapper = new ObjectMapper()
implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset])
implicit val publicationEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication])
implicit val relationEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
implicit val orpEncoder: Encoder[OtherResearchProduct] = Encoders.kryo(classOf[OtherResearchProduct])
implicit val softwareEncoder: Encoder[Software] = Encoders.kryo(classOf[Software])
log.info("Converting dataset")
val rddDataset =spark.sparkContext.textFile(s"$sourcePath/dataset").map(s => mapper.readValue(s, classOf[OafDataset]))
spark.createDataset(rddDataset).as[OafDataset].write.mode(SaveMode.Overwrite).save(s"$entityPath/dataset")
log.info("Converting publication")
val rddPublication =spark.sparkContext.textFile(s"$sourcePath/publication").map(s => mapper.readValue(s, classOf[Publication]))
spark.createDataset(rddPublication).as[Publication].write.mode(SaveMode.Overwrite).save(s"$entityPath/publication")
log.info("Converting software")
val rddSoftware =spark.sparkContext.textFile(s"$sourcePath/software").map(s => mapper.readValue(s, classOf[Software]))
spark.createDataset(rddSoftware).as[Software].write.mode(SaveMode.Overwrite).save(s"$entityPath/software")
log.info("Converting otherresearchproduct")
val rddOtherResearchProduct =spark.sparkContext.textFile(s"$sourcePath/otherresearchproduct").map(s => mapper.readValue(s, classOf[OtherResearchProduct]))
spark.createDataset(rddOtherResearchProduct).as[OtherResearchProduct].write.mode(SaveMode.Overwrite).save(s"$entityPath/otherresearchproduct")
log.info("Converting Relation")
val rddRelation =spark.sparkContext.textFile(s"$sourcePath/relation").map(s => mapper.readValue(s, classOf[Relation]))
spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath")
}
}

@ -70,7 +70,7 @@ object SparkCreateInputGraph {
resultObject.foreach { r =>
log.info(s"Make ${r._1} unique")
makeDatasetUnique(s"$targetPath/extracted/${r._1}",s"$targetPath/dedup/${r._1}",spark, r._2)
makeDatasetUnique(s"$targetPath/extracted/${r._1}",s"$targetPath/preprocess/${r._1}",spark, r._2)
}
}

@ -42,6 +42,7 @@ object SparkCreateScholix {
val relationDS: Dataset[(String, Relation)] = spark.read.load(relationPath).as[Relation]
.filter(r => (r.getDataInfo== null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
.map(r => (r.getSource, r))(Encoders.tuple(Encoders.STRING, relEncoder))
val summaryDS: Dataset[(String, ScholixSummary)] = spark.read.load(summaryPath).as[ScholixSummary]

@ -1,7 +1,7 @@
package eu.dnetlib.dhp.sx.graph
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Result
import eu.dnetlib.dhp.schema.oaf.{Oaf, Result}
import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary
import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils
import org.apache.commons.io.IOUtils
@ -29,11 +29,12 @@ object SparkCreateSummaryObject {
log.info(s"targetPath -> $targetPath")
implicit val resultEncoder:Encoder[Result] = Encoders.kryo[Result]
implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val summaryEncoder:Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary]
val ds:Dataset[Result] = spark.read.load(s"$sourcePath/*").as[Result]
val ds:Dataset[Result] = spark.read.load(s"$sourcePath/*").as[Result].filter(r=>r.getDataInfo== null || r.getDataInfo.getDeletedbyinference== false)
ds.repartition(6000).map(r => ScholixUtils.resultToSummary(r)).filter(s => s!= null).write.mode(SaveMode.Overwrite).save(targetPath)

@ -1,10 +1,17 @@
package eu.dnetlib.dhp.sx.graph
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Relation, Result}
import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.json4s
import org.json4s.DefaultFormats
import org.json4s.JsonAST.{JField, JObject, JString}
import org.json4s.jackson.JsonMethods.parse
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
@ -25,60 +32,109 @@ object SparkResolveRelation {
val relationPath = parser.get("relationPath")
log.info(s"sourcePath -> $relationPath")
val entityPath = parser.get("entityPath")
log.info(s"targetPath -> $entityPath")
log.info(s"entityPath -> $entityPath")
val workingPath = parser.get("workingPath")
log.info(s"workingPath -> $workingPath")
implicit val oafEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
import spark.implicits._
val entities:Dataset[Result] = spark.read.load(s"$entityPath/*").as[Result]
entities.flatMap(e => e.getPid.asScala
.map(p =>
convertPidToDNETIdentifier(p.getValue, p.getQualifier.getClassid))
.filter(s => s!= null)
.map(s => (s,e.getId))
).groupByKey(_._1)
.reduceGroups((x,y) => if (x._2.startsWith("50|doi") || x._2.startsWith("50|pmid")) x else y)
.map(s =>s._2)
.write
.mode(SaveMode.Overwrite)
.save(s"$workingPath/resolvedPid")
val rPid:Dataset[(String,String)] = spark.read.load(s"$workingPath/resolvedPid").as[(String,String)]
extractPidResolvedTableFromJsonRDD(spark, entityPath, workingPath)
val mappper = new ObjectMapper()
val rPid:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationResolvedPid").as[(String,String)]
val relationDs:Dataset[(String,Relation)] = spark.read.load(relationPath).as[Relation].map(r => (r.getSource.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
relationDs.joinWith(rPid, relationDs("_1").equalTo(rPid("_1")), "left").map{
relationDs.joinWith(rPid, relationDs("_1").equalTo(rPid("_2")), "left").map{
m =>
val sourceResolved = m._2
val currentRelation = m._1._2
if (sourceResolved!=null && sourceResolved._2.nonEmpty)
currentRelation.setSource(sourceResolved._2)
if (sourceResolved!=null && sourceResolved._1!=null && sourceResolved._1.nonEmpty)
currentRelation.setSource(sourceResolved._1)
currentRelation
}.write
.mode(SaveMode.Overwrite)
.save(s"$workingPath/resolvedSource")
.save(s"$workingPath/relationResolvedSource")
val relationSourceResolved:Dataset[(String,Relation)] = spark.read.load(s"$workingPath/resolvedSource").as[Relation].map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_1")), "left").map{
val relationSourceResolved:Dataset[(String,Relation)] = spark.read.load(s"$workingPath/relationResolvedSource").as[Relation].map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_2")), "left").map{
m =>
val targetResolved = m._2
val currentRelation = m._1._2
if (targetResolved!=null && targetResolved._2.nonEmpty)
currentRelation.setTarget(targetResolved._2)
if (targetResolved!=null && targetResolved._1.nonEmpty)
currentRelation.setTarget(targetResolved._1)
currentRelation
}.filter(r => r.getSource.startsWith("50")&& r.getTarget.startsWith("50"))
.write
.mode(SaveMode.Overwrite)
.save(s"$workingPath/resolvedRelation")
.save(s"$workingPath/relation_resolved")
spark.read.load(s"$workingPath/relation_resolved").as[Relation]
.map(r => mappper.writeValueAsString(r))
.rdd.saveAsTextFile(s"$workingPath/relation", classOf[GzipCodec])
}
private def extractPidsFromRecord(input:String):(String,List[(String,String)]) = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: json4s.JValue = parse(input)
val id:String = (json \ "id").extract[String]
val result: List[(String,String)] = for {
JObject(pids) <- json \ "pid"
JField("value", JString(pidValue)) <- pids
JField("qualifier", JObject(qualifier)) <- pids
JField("classname", JString(pidType)) <- qualifier
} yield (pidValue, pidType)
(id,result)
}
private def extractPidResolvedTableFromJsonRDD(spark: SparkSession, entityPath: String, workingPath: String) = {
import spark.implicits._
val d: RDD[(String,String)] = spark.sparkContext.textFile(s"$entityPath/*")
.map(i => extractPidsFromRecord(i))
.filter(s => s != null && s._1!= null && s._2!=null && s._2.nonEmpty)
.flatMap{ p =>
p._2.map(pid =>
(p._1, convertPidToDNETIdentifier(pid._1, pid._2))
)
}.filter(r =>r._1 != null || r._2 != null)
spark.createDataset(d)
.groupByKey(_._2)
.reduceGroups((x, y) => if (x._1.startsWith("50|doi") || x._1.startsWith("50|pmid")) x else y)
.map(s => s._2)
.write
.mode(SaveMode.Overwrite)
.save(s"$workingPath/relationResolvedPid")
}
/*
This method should be used once we finally convert everythings in Kryo dataset
instead of using rdd of json
*/
private def extractPidResolvedTableFromKryo(spark: SparkSession, entityPath: String, workingPath: String) = {
import spark.implicits._
implicit val oafEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
val entities: Dataset[Result] = spark.read.load(s"$entityPath/*").as[Result]
entities.flatMap(e => e.getPid.asScala
.map(p =>
convertPidToDNETIdentifier(p.getValue, p.getQualifier.getClassid))
.filter(s => s != null)
.map(s => (s, e.getId))
).groupByKey(_._1)
.reduceGroups((x, y) => if (x._2.startsWith("50|doi") || x._2.startsWith("50|pmid")) x else y)
.map(s => s._2)
.write
.mode(SaveMode.Overwrite)
.save(s"$workingPath/relationResolvedPid")
}
def convertPidToDNETIdentifier(pid:String, pidType: String):String = {
if (pid==null || pid.isEmpty || pidType== null || pidType.isEmpty)

@ -199,7 +199,7 @@ object BioDBToOAF {
d.setDateofacceptance(OafMapperUtils.field(i_date.get.date, DATA_INFO))
}
val relevant_dates: List[StructuredProperty] = dates.filter(d => !d.date_info.contains("entry version"))
.map(date => OafMapperUtils.structuredProperty(date.date, "UNKNOWN", "UNKNOWN", ModelConstants.DNET_DATACITE_DATE, ModelConstants.DNET_DATACITE_DATE, DATA_INFO))
.map(date => OafMapperUtils.structuredProperty(date.date, ModelConstants.UNKNOWN, ModelConstants.UNKNOWN, ModelConstants.DNET_DATACITE_DATE, ModelConstants.DNET_DATACITE_DATE, DATA_INFO))
if (relevant_dates != null && relevant_dates.nonEmpty)
d.setRelevantdate(relevant_dates.asJava)
d.setDateofacceptance(OafMapperUtils.field(i_date.get.date, DATA_INFO))
@ -218,12 +218,12 @@ object BioDBToOAF {
if (references_pmid != null && references_pmid.nonEmpty) {
val rel = createRelation(references_pmid.head, "pmid", d.getId, collectedFromMap("uniprot"), "relationship", "isRelatedTo", if (i_date.isDefined) i_date.get.date else null)
val rel = createRelation(references_pmid.head, "pmid", d.getId, collectedFromMap("uniprot"), ModelConstants.RELATIONSHIP, ModelConstants.IS_RELATED_TO, if (i_date.isDefined) i_date.get.date else null)
rel.getCollectedfrom
List(d, rel)
}
else if (references_doi != null && references_doi.nonEmpty) {
val rel = createRelation(references_doi.head, "doi", d.getId, collectedFromMap("uniprot"), "relationship", "isRelatedTo", if (i_date.isDefined) i_date.get.date else null)
val rel = createRelation(references_doi.head, "doi", d.getId, collectedFromMap("uniprot"), ModelConstants.RELATIONSHIP, ModelConstants.IS_RELATED_TO, if (i_date.isDefined) i_date.get.date else null)
List(d, rel)
}
else
@ -243,7 +243,7 @@ object BioDBToOAF {
rel.setCollectedfrom(List(collectedFromMap("pdb")).asJava)
rel.setDataInfo(DATA_INFO)
rel.setRelType("resultResult")
rel.setRelType(ModelConstants.RESULT_RESULT)
rel.setSubRelType(subRelType)
rel.setRelClass(relClass)
@ -263,7 +263,7 @@ object BioDBToOAF {
def createSupplementaryRelation(pid: String, pidType: String, sourceId: String, collectedFrom: KeyValue, date:String): Relation = {
createRelation(pid,pidType,sourceId,collectedFrom, "supplement","IsSupplementTo", date)
createRelation(pid,pidType,sourceId,collectedFrom, ModelConstants.SUPPLEMENT, ModelConstants.IS_SUPPLEMENT_TO, date)
}
@ -392,6 +392,6 @@ object BioDBToOAF {
i.setDateofacceptance(OafMapperUtils.field(GraphCleaningFunctions.cleanDate(input.date), DATA_INFO))
d.setDateofacceptance(OafMapperUtils.field(GraphCleaningFunctions.cleanDate(input.date), DATA_INFO))
List(d, createRelation(input.pmid, "pmid", d.getId, collectedFromMap("ebi"),"relationship", "isRelatedTo", GraphCleaningFunctions.cleanDate(input.date)))
List(d, createRelation(input.pmid, "pmid", d.getId, collectedFromMap("ebi"), ModelConstants.RELATIONSHIP, ModelConstants.IS_RELATED_TO, GraphCleaningFunctions.cleanDate(input.date)))
}
}

@ -16,7 +16,7 @@ object PubMedToOaf {
)
def createResult(cobjQualifier: Qualifier, vocabularies: VocabularyGroup): Result = {
val result_typologies = getVocabularyTerm("dnet:result_typologies", vocabularies, cobjQualifier.getClassid)
val result_typologies = getVocabularyTerm(ModelConstants.DNET_RESULT_TYPOLOGIES, vocabularies, cobjQualifier.getClassid)
result_typologies.getClassid match {
case "dataset" => new Dataset
case "publication" => new Publication
@ -68,11 +68,11 @@ object PubMedToOaf {
//else We have to find a terms that match the vocabulary otherwise we discard it
val ja = article.getPublicationTypes.asScala.find(s => "Journal Article".equalsIgnoreCase(s.getValue))
if (ja.isDefined) {
val cojbCategory = getVocabularyTerm("dnet:publication_resource", vocabularies, ja.get.getValue)
val cojbCategory = getVocabularyTerm(ModelConstants.DNET_PUBLICATION_RESOURCE, vocabularies, ja.get.getValue)
i.setInstancetype(cojbCategory)
} else {
val i_type = article.getPublicationTypes.asScala
.map(s => getVocabularyTerm("dnet:publication_resource", vocabularies, s.getValue))
.map(s => getVocabularyTerm(ModelConstants.DNET_PUBLICATION_RESOURCE, vocabularies, s.getValue))
.find(q => q != null)
if (i_type.isDefined)
i.setInstancetype(i_type.get)
@ -112,7 +112,7 @@ object PubMedToOaf {
if (article.getLanguage != null) {
val term = vocabularies.getSynonymAsQualifier("dnet:languages", article.getLanguage)
val term = vocabularies.getSynonymAsQualifier(ModelConstants.DNET_LANGUAGES, article.getLanguage)
if (term != null)
result.setLanguage(term)
}

@ -1,11 +1,10 @@
package eu.dnetlib.dhp.sx.graph.scholix
import eu.dnetlib.dhp.schema.oaf.{Dataset, Relation, Result, StructuredProperty}
import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixCollectedFrom, ScholixEntityId, ScholixIdentifier, ScholixRelationship, ScholixResource}
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Result, StructuredProperty}
import eu.dnetlib.dhp.schema.sx.scholix._
import eu.dnetlib.dhp.schema.sx.summary.{CollectedFromType, SchemeValue, ScholixSummary, Typology}
import eu.dnetlib.dhp.utils.DHPUtils
import org.apache.spark.sql.Encoders.bean
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}
import org.json4s
@ -301,14 +300,14 @@ object ScholixUtils {
if (r.getPid == null || r.getPid.isEmpty)
return null
val pids:List[ScholixIdentifier] = extractTypedIdentifierFromInstance(r)
if (pids.isEmpty)
val persistentIdentifiers:List[ScholixIdentifier] = extractTypedIdentifierFromInstance(r)
if (persistentIdentifiers.isEmpty)
return null
s.setLocalIdentifier(pids.asJava)
if (r.isInstanceOf[Dataset])
s.setTypology(Typology.dataset)
else
s.setLocalIdentifier(persistentIdentifiers.asJava)
if (r.isInstanceOf[Publication] )
s.setTypology(Typology.publication)
else
s.setTypology(Typology.dataset)
s.setSubType(r.getInstance().get(0).getInstancetype.getClassname)

@ -0,0 +1,5 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source Path", "paramRequired": true},
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the raw graph", "paramRequired": true}
]

@ -0,0 +1,85 @@
<workflow-app name="Create Raw Graph Step 1: extract Entities in raw graph" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the working dir base path</description>
</property>
<property>
<name>targetPath</name>
<description>the graph Raw base path</description>
</property>
</parameters>
<start to="ExtractEntities"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ExtractEntities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extract entities in raw graph</name>
<class>eu.dnetlib.dhp.sx.graph.SparkCreateInputGraph</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.shuffle.partitions=2000
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--master</arg><arg>yarn</arg>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--targetPath</arg><arg>${targetPath}</arg>
</spark>
<ok to="DropDedupPath"/>
<error to="Kill"/>
</action>
<action name="DropDedupPath">
<fs>
<delete path='${targetPath}/dedup'/>
<mkdir path='${targetPath}/dedup/'/>
</fs>
<ok to="GenerateInputGraphForDedup"/>
<error to="Kill"/>
</action>
<action name="GenerateInputGraphForDedup">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Generate Input Graph for deduplication</name>
<class>eu.dnetlib.dhp.sx.graph.SparkConvertDatasetToJsonRDD</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.shuffle.partitions=3000
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--master</arg><arg>yarn</arg>
<arg>--sourcePath</arg><arg>${targetPath}/preprocess</arg>
<arg>--targetPath</arg><arg>${targetPath}/dedup</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

@ -1,4 +1,4 @@
<workflow-app name="Create Raw Graph Step 1: extract Entities in raw graph" xmlns="uri:oozie:workflow:0.5">
<workflow-app name="Create Scholix final Graph" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
@ -6,29 +6,29 @@
</property>
<property>
<name>targetPath</name>
<description>the graph Raw base path</description>
<description>the final graph path</description>
</property>
</parameters>
<start to="ExtractEntities"/>
<start to="ImportDatasetEntities"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ExtractEntities">
<action name="ImportDatasetEntities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extract entities in raw graph</name>
<class>eu.dnetlib.dhp.sx.graph.SparkCreateInputGraph</class>
<name>Import JSONRDD to Dataset kryo</name>
<class>eu.dnetlib.dhp.sx.graph.SparkConvertRDDtoDataset</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.shuffle.partitions=2000
--conf spark.sql.shuffle.partitions=3000
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
@ -37,33 +37,6 @@
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--targetPath</arg><arg>${targetPath}</arg>
</spark>
<ok to="ResolveRelations"/>
<error to="Kill"/>
</action>
<action name="ResolveRelations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Resolve Relations in raw graph</name>
<class>eu.dnetlib.dhp.sx.graph.SparkResolveRelation</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.shuffle.partitions=3000
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--master</arg><arg>yarn</arg>
<arg>--relationPath</arg><arg>${targetPath}/extracted/relation</arg>
<arg>--workingPath</arg><arg>${targetPath}/resolved/</arg>
<arg>--entityPath</arg><arg>${targetPath}/dedup</arg>
</spark>
<ok to="CreateSummaries"/>
<error to="Kill"/>
</action>
@ -87,7 +60,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--master</arg><arg>yarn</arg>
<arg>--sourcePath</arg><arg>${targetPath}/dedup</arg>
<arg>--sourcePath</arg><arg>${targetPath}/entities</arg>
<arg>--targetPath</arg><arg>${targetPath}/provision/summaries</arg>
</spark>
<ok to="CreateScholix"/>
@ -114,7 +87,7 @@
<arg>--master</arg><arg>yarn</arg>
<arg>--summaryPath</arg><arg>${targetPath}/provision/summaries</arg>
<arg>--targetPath</arg><arg>${targetPath}/provision/scholix</arg>
<arg>--relationPath</arg><arg>${targetPath}/resolved/resolvedRelation</arg>
<arg>--relationPath</arg><arg>${targetPath}/relation</arg>
</spark>
<ok to="DropJSONPath"/>
@ -182,9 +155,5 @@
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

@ -0,0 +1,62 @@
<workflow-app name="Resolve Relation" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>entityPath</name>
<description>the path of deduplicate Entities</description>
</property>
<property>
<name>relationPath</name>
<description>the path of relation unresolved</description>
</property>
<property>
<name>targetPath</name>
<description>the path of relation unresolved</description>
</property>
</parameters>
<start to="DropRelFolder"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="DropRelFolder">
<fs>
<delete path='${targetPath}/relation'/>
<delete path='${targetPath}/relation_resolved'/>
<delete path='${targetPath}/resolvedSource'/>
<delete path='${targetPath}/resolvedPid'/>
</fs>
<ok to="ResolveRelations"/>
<error to="Kill"/>
</action>
<action name="ResolveRelations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Resolve Relations in raw graph</name>
<class>eu.dnetlib.dhp.sx.graph.SparkResolveRelation</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.shuffle.partitions=3000
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--master</arg><arg>yarn</arg>
<arg>--relationPath</arg><arg>${relationPath}</arg>
<arg>--workingPath</arg><arg>${targetPath}</arg>
<arg>--entityPath</arg><arg>${entityPath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

@ -1,120 +0,0 @@
<workflow-app name="Create Raw Graph Step 2: Map XML to OAF Entities" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>workingPath</name>
<description>the working path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
</parameters>
<start to="ExtractDLIPublication"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ExtractDLIPublication">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Extract DLI Entities (Publication)</name>
<class>eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=5000
${sparkExtraOPT}
</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>-e</arg><arg>publication</arg>
</spark>
<ok to="ExtractDLIDataset"/>
<error to="Kill"/>
</action>
<action name="ExtractDLIDataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Extract DLI Entities (Dataset)</name>
<class>eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=5000
${sparkExtraOPT}
</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>-e</arg><arg>dataset</arg>
</spark>
<ok to="ExtractDLIUnknown"/>
<error to="Kill"/>
</action>
<action name="ExtractDLIUnknown">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Extract DLI Entities (Unknown)</name>
<class>eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=5000
${sparkExtraOPT}
</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>-e</arg><arg>unknown</arg>
</spark>
<ok to="ExtractDLIRelation"/>
<error to="Kill"/>
</action>
<action name="ExtractDLIRelation">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Extract DLI Entities (Relation)</name>
<class>eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=5000
${sparkExtraOPT}
</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>-e</arg><arg>relation</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

@ -1,61 +0,0 @@
<workflow-app name="Create Raw Graph Final Step: Construct the Scholexplorer Raw Graph" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>targetPath</name>
<description>the source path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>entity</name>
<description>the entity to be merged</description>
</property>
</parameters>
<start to="DeleteTargetPath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="DeleteTargetPath">
<fs>
<mkdir path="${targetPath}"/>
<delete path='${targetPath}/${entity}'/>
</fs>
<ok to="MergeDLIEntities"/>
<error to="Kill"/>
</action>
<action name="MergeDLIEntities">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Merge ${entity}</name>
<class>eu.dnetlib.dhp.sx.graph.SparkScholexplorerCreateRawGraphJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT}</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/${entity}</arg>
<arg>--targetPath</arg><arg>${targetPath}/${entity}</arg>
<arg>--entity</arg><arg>${entity}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

@ -24,6 +24,7 @@ import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.clean.GraphCleaningFunctionsTest;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ -250,7 +251,24 @@ public class MappersTest {
final Relation r1 = (Relation) list.get(1);
final Relation r2 = (Relation) list.get(2);
assertEquals(d.getId(), r1.getSource());
assertEquals("40|corda_______::e06332dee33bec6c2ba4c98601053229", r1.getTarget());
assertEquals(ModelConstants.RESULT_PROJECT, r1.getRelType());
assertEquals(ModelConstants.OUTCOME, r1.getSubRelType());
assertEquals(ModelConstants.IS_PRODUCED_BY, r1.getRelClass());
assertTrue(r1.getValidated());
assertEquals("2020-01-01", r1.getValidationDate());
assertEquals(d.getId(), r2.getTarget());
assertEquals("40|corda_______::e06332dee33bec6c2ba4c98601053229", r2.getSource());
assertEquals(ModelConstants.RESULT_PROJECT, r2.getRelType());
assertEquals(ModelConstants.OUTCOME, r2.getSubRelType());
assertEquals(ModelConstants.PRODUCES, r2.getRelClass());
assertTrue(r2.getValidated());
assertEquals("2020-01-01", r2.getValidationDate());
assertValidId(d.getId());
assertEquals("50|doi_________::000374d100a9db469bd42b69dbb40b36", d.getId());
assertEquals(2, d.getOriginalId().size());
assertTrue(d.getOriginalId().stream().anyMatch(oid -> oid.equals("oai:zenodo.org:3234526")));
assertValidId(d.getCollectedfrom().get(0).getKey());
@ -304,10 +322,12 @@ public class MappersTest {
});
assertEquals("0001", d.getInstance().get(0).getRefereed().getClassid());
assertNotNull(d.getInstance().get(0).getPid());
assertTrue(d.getInstance().get(0).getPid().isEmpty());
assertFalse(d.getInstance().get(0).getPid().isEmpty());
assertEquals("doi", d.getInstance().get(0).getAlternateIdentifier().get(0).getQualifier().getClassid());
assertEquals("10.5281/zenodo.3234526", d.getInstance().get(0).getAlternateIdentifier().get(0).getValue());
assertEquals("doi", d.getInstance().get(0).getPid().get(0).getQualifier().getClassid());
assertEquals("10.5281/zenodo.3234526", d.getInstance().get(0).getPid().get(0).getValue());
assertTrue(d.getInstance().get(0).getAlternateIdentifier().isEmpty());
assertValidId(r1.getSource());
assertValidId(r1.getTarget());
@ -561,6 +581,31 @@ public class MappersTest {
assertNotNull(d.getInstance().get(0).getUrl());
}
@Test
void testEnermaps() throws IOException {
final String xml = IOUtils.toString(getClass().getResourceAsStream("enermaps.xml"));
final List<Oaf> list = new OdfToOafMapper(vocs, false, true).processMdRecord(xml);
System.out.println("***************");
System.out.println(new ObjectMapper().writeValueAsString(list));
System.out.println("***************");
assertEquals(1, list.size());
assertTrue(list.get(0) instanceof Dataset);
final Dataset d = (Dataset) list.get(0);
assertValidId(d.getId());
assertValidId(d.getCollectedfrom().get(0).getKey());
assertTrue(StringUtils.isNotBlank(d.getTitle().get(0).getValue()));
assertEquals(1, d.getAuthor().size());
assertEquals(1, d.getInstance().size());
assertNotNull(d.getInstance().get(0).getUrl());
assertNotNull(d.getContext());
assertTrue(StringUtils.isNotBlank(d.getContext().get(0).getId()));
assertEquals("enermaps::selection::tgs00004", d.getContext().get(0).getId());
}
@Test
void testClaimFromCrossref() throws IOException {
final String xml = IOUtils.toString(getClass().getResourceAsStream("oaf_claim_crossref.xml"));
@ -713,12 +758,11 @@ public class MappersTest {
}
private void assertValidId(final String id) {
System.out.println(id);
// System.out.println(id);
assertEquals(49, id.length());
assertEquals('|', id.charAt(2));
assertEquals(':', id.charAt(15));
assertEquals(':', id.charAt(16));
assertEquals(IdentifierFactory.ID_PREFIX_SEPARATOR, id.substring(2, 3));
assertEquals(IdentifierFactory.ID_SEPARATOR, id.substring(15, 17));
}
private List<String> vocs() throws IOException {

@ -9,6 +9,41 @@
<artifactId>dhp-graph-provision</artifactId>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.0.1</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>initialize</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<args>
<arg>-Xmax-classfile-name</arg>
<arg>200</arg>
</args>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>

@ -71,6 +71,9 @@ public class DropAndCreateESIndex {
log.info(STATUS_CODE_TEXT, response.getStatusLine());
}
log.info("Sleeping 60 seconds to avoid to lost the creation of index request");
Thread.sleep(60000);
try (CloseableHttpClient client = HttpClients.createDefault()) {
final String summaryConf = IOUtils

@ -21,6 +21,7 @@ import com.google.common.collect.Lists;
import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
import eu.dnetlib.dhp.oa.provision.utils.ContextDef;
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
import eu.dnetlib.dhp.schema.oaf.Dataset;

@ -13,7 +13,7 @@ echo "Getting file from " $SCRIPT_PATH
hdfs dfs -copyToLocal $SCRIPT_PATH
echo "Creating indicators"
impala-shell -d ${TARGET} -q "invalidate metadata"
impala-shell -q "invalidate metadata"
impala-shell -d ${TARGET} -q "show tables" --delimited | sed "s/^\(.*\)/compute stats ${TARGET}.\1;/" | impala-shell -c -f -
cat step16_7-createIndicatorsTables.sql | impala-shell -d $TARGET -f -
echo "Indicators created"

@ -57,12 +57,14 @@ UNION ALL
SELECT * FROM ${stats_db_name}.software_sources
UNION ALL
SELECT * FROM ${stats_db_name}.otherresearchproduct_sources;
--
-- ANALYZE TABLE ${stats_db_name}.publication_sources COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.publication_sources COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.dataset_sources COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.dataset_sources COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.software_sources COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.software_sources COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.otherresearchproduct_sources COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.otherresearchproduct_sources COMPUTE STATISTICS FOR COLUMNS;
create table ${stats_db_name}.result_orcid as
select distinct res.id, regexp_replace(res.orcid, 'http://orcid.org/' ,'') as orcid
from (
SELECT substr(res.id, 4) as id, auth_pid.value as orcid
FROM ${openaire_db_name}.result res
LATERAL VIEW explode(author) a as auth
LATERAL VIEW explode(auth.pid) ap as auth_pid
LATERAL VIEW explode(auth.pid.qualifier.classid) apt as author_pid_type
WHERE res.datainfo.deletedbyinference = FALSE and res.datainfo.invisible = FALSE and author_pid_type = 'orcid') as res

@ -33,13 +33,4 @@ select * from ${stats_db_name}.dataset_refereed
union all
select * from ${stats_db_name}.software_refereed
union all
select * from ${stats_db_name}.otherresearchproduct_refereed;
--
-- ANALYZE TABLE ${stats_db_name}.publication_refereed COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.publication_refereed COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.dataset_refereed COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.dataset_refereed COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.software_refereed COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.software_refereed COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.otherresearchproduct_refereed COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.otherresearchproduct_refereed COMPUTE STATISTICS FOR COLUMNS;
select * from ${stats_db_name}.otherresearchproduct_refereed;

@ -39,4 +39,198 @@ from publication p
join result_instance ri on ri.id = p.id
join datasource on datasource.id = ri.hostedby
where datasource.id like '%doajarticles%') tmp
on p.id= tmp.id;
on p.id= tmp.id;
create table indi_project_pubs_count stored as parquet as
select pr.id id, count(p.id) total_pubs from project_results pr
join publication p on p.id=pr.result
group by pr.id;
create table indi_project_datasets_count stored as parquet as
select pr.id id, count(d.id) total_datasets from project_results pr
join dataset d on d.id=pr.result
group by pr.id;
create table indi_project_software_count stored as parquet as
select pr.id id, count(s.id) total_software from project_results pr
join software s on s.id=pr.result
group by pr.id;
create table indi_project_otherresearch_count stored as parquet as
select pr.id id, count(o.id) total_other from project_results pr
join otherresearchproduct o on o.id=pr.result
group by pr.id;
create table indi_pub_avg_year_country_oa stored as parquet as
select year, country, round(OpenAccess/(OpenAccess+NonOpenAccess)*100,3) as averageOA,
round(NonOpenAccess/(OpenAccess+NonOpenAccess)*100,3) as averageNonOA
from
(SELECT year, country, SUM(CASE
WHEN bestlicence='Open Access' THEN 1
ELSE 0
END) AS OpenAccess, SUM(CASE
WHEN bestlicence<>'Open Access' THEN 1
ELSE 0
END) AS NonOpenAccess
FROM publication p
join result_organization ro on p.id=ro.id
join organization o on o.id=ro.organization
where cast(year as int)>=2003 and cast(year as int)<=2021
group by year, country) tmp;
create table indi_dataset_avg_year_country_oa stored as parquet as
select year, country, round(OpenAccess/(OpenAccess+NonOpenAccess)*100,3) as averageOA,
round(NonOpenAccess/(OpenAccess+NonOpenAccess)*100,3) as averageNonOA
from
(SELECT year, country, SUM(CASE
WHEN bestlicence='Open Access' THEN 1
ELSE 0
END) AS OpenAccess, SUM(CASE
WHEN bestlicence<>'Open Access' THEN 1
ELSE 0
END) AS NonOpenAccess
FROM dataset d
join result_organization ro on d.id=ro.id
join organization o on o.id=ro.organization
where cast(year as int)>=2003 and cast(year as int)<=2021
group by year, country) tmp;
create table indi_software_avg_year_country_oa stored as parquet as
select year, country, round(OpenAccess/(OpenAccess+NonOpenAccess)*100,3) as averageOA,
round(NonOpenAccess/(OpenAccess+NonOpenAccess)*100,3) as averageNonOA
from
(SELECT year, country, SUM(CASE
WHEN bestlicence='Open Access' THEN 1
ELSE 0
END) AS OpenAccess, SUM(CASE
WHEN bestlicence<>'Open Access' THEN 1
ELSE 0
END) AS NonOpenAccess
FROM software s
join result_organization ro on s.id=ro.id
join SOURCER.organization o on o.id=ro.organization
where cast(year as int)>=2003 and cast(year as int)<=2021
group by year, country) tmp;
create table indi_other_avg_year_country_oa stored as parquet as
select year, country, round(OpenAccess/(OpenAccess+NonOpenAccess)*100,3) as averageOA,
round(NonOpenAccess/(OpenAccess+NonOpenAccess)*100,3) as averageNonOA
from
(SELECT year, country, SUM(CASE
WHEN bestlicence='Open Access' THEN 1
ELSE 0
END) AS OpenAccess, SUM(CASE
WHEN bestlicence<>'Open Access' THEN 1
ELSE 0
END) AS NonOpenAccess
FROM otherresearchproduct orp
join result_organization ro on orp.id=ro.id
join organization o on o.id=ro.organization
where cast(year as int)>=2003 and cast(year as int)<=2021
group by year, country) tmp;
create table indi_pub_avg_year_context_oa stored as parquet as
with total as
(select count(distinct pc.id) no_of_pubs, year, c.name name, sum(count(distinct pc.id)) over(PARTITION by year) as total from publication_concepts pc
join context c on pc.concept like concat('%',c.id,'%')
join publication p on p.id=pc.id
where cast(year as int)>=2003 and cast(year as int)<=2021
group by c.name, year )
select year, name, round(no_of_pubs/total*100,3) averageofpubs
from total;
create table indi_dataset_avg_year_context_oa stored as parquet as
with total as
(select count(distinct pc.id) no_of_pubs, year, c.name name, sum(count(distinct pc.id)) over(PARTITION by year) as total from dataset_concepts pc
join context c on pc.concept like concat('%',c.id,'%')
join dataset p on p.id=pc.id
where cast(year as int)>=2003 and cast(year as int)<=2021
group by c.name, year )
select year, name, round(no_of_pubs/total*100,3) averageofdataset
from total;
create table indi_software_avg_year_context_oa stored as parquet as
with total as
(select count(distinct pc.id) no_of_pubs, year, c.name name, sum(count(distinct pc.id)) over(PARTITION by year) as total from software_concepts pc
join context c on pc.concept like concat('%',c.id,'%')
join software p on p.id=pc.id
where cast(year as int)>=2003 and cast(year as int)<=2021
group by c.name, year )
select year, name, round(no_of_pubs/total*100,3) averageofsoftware
from total;
create table indi_other_avg_year_context_oa stored as parquet as
with total as
(select count(distinct pc.id) no_of_pubs, year, c.name name, sum(count(distinct pc.id)) over(PARTITION by year) as total from otherresearchproduct_concepts pc
join context c on pc.concept like concat('%',c.id,'%')
join otherresearchproduct p on p.id=pc.id
where cast(year as int)>=2003 and cast(year as int)<=2021
group by c.name, year )
select year, name, round(no_of_pubs/total*100,3) averageofother
from total;
create table indi_other_avg_year_content_oa stored as parquet as
with total as
(select count(distinct pd.id) no_of_pubs, year, d.type type, sum(count(distinct pd.id)) over(PARTITION by year) as total
from otherresearchproduct_datasources pd
join datasource d on datasource=d.id
join otherresearchproduct p on p.id=pd.id
where cast(year as int)>=2003 and cast(year as int)<=2021
group by d.type, year)
select year, type, round(no_of_pubs/total*100,3) averageOfOtherresearchproduct
from total;
create table indi_software_avg_year_content_oa stored as parquet as
with total as
(select count(distinct pd.id) no_of_pubs, year, d.type type, sum(count(distinct pd.id)) over(PARTITION by year) as total
from software_datasources pd
join datasource d on datasource=d.id
join software p on p.id=pd.id
where cast(year as int)>=2003 and cast(year as int)<=2021
group by d.type, year)
select year, type, round(no_of_pubs/total*100,3) averageOfSoftware
from total;
create table indi_dataset_avg_year_content_oa stored as parquet as
with total as
(select count(distinct pd.id) no_of_pubs, year, d.type type, sum(count(distinct pd.id)) over(PARTITION by year) as total
from dataset_datasources pd
join datasource d on datasource=d.id
join dataset p on p.id=pd.id
where cast(year as int)>=2003 and cast(year as int)<=2021
group by d.type, year)
select year, type, round(no_of_pubs/total*100,3) averageOfDatasets
from total;
create table indi_pub_avg_year_content_oa stored as parquet as
with total as
(select count(distinct pd.id) no_of_pubs, year, d.type type, sum(count(distinct pd.id)) over(PARTITION by year) as total
from publication_datasources pd
join datasource d on datasource=d.id
join publication p on p.id=pd.id
where cast(year as int)>=2003 and cast(year as int)<=2021
group by d.type, year)
select year, type, round(no_of_pubs/total*100,3) averageOfPubs
from total;
create table indi_pub_has_cc_licence stored as parquet as
select distinct p.id, (case when lic='' or lic is null then 0 else 1 end) as has_cc_license
from publication p
left outer join (select p.id, license.type as lic from publication p
join publication_licenses as license on license.id = p.id
where lower(license.type) LIKE '%creativecommons.org%' OR lower(license.type) LIKE '%cc-%') tmp
on p.id= tmp.id;
create table indi_pub_has_cc_licence_url stored as parquet as
select distinct p.id, (case when lic_host='' or lic_host is null then 0 else 1 end) as has_cc_license_url
from publication p
left outer join (select p.id, lower(parse_url(license.type, "HOST")) as lic_host
from publication p
join publication_licenses as license on license.id = p.id
WHERE lower(parse_url(license.type, 'HOST')) = 'creativecommons.org') tmp
on p.id= tmp.id;
create table indi_pub_has_abstract stored as parquet as
select distinct publication.id, coalesce(abstract, 1) has_abstract
from publication;

@ -90,27 +90,8 @@ FROM ${openaire_db_name}.publication p
where p.datainfo.deletedbyinference = false;
CREATE TABLE ${stats_db_name}.publication_citations AS
SELECT substr(p.id, 4) AS id, xpath_string(citation.value, "//citation/id[@type='openaire']/@value") AS result
SELECT substr(p.id, 4) AS id, xpath_string(citation.value, "//citation/id[@type='openaire']/@value") AS cites
FROM ${openaire_db_name}.publication p
lateral view explode(p.extrainfo) citations AS citation
WHERE xpath_string(citation.value, "//citation/id[@type='openaire']/@value") != ""
and p.datainfo.deletedbyinference = false;
-- ANALYZE TABLE ${stats_db_name}.publication_tmp COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.publication_tmp COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.publication_classifications COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.publication_classifications COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.publication_concepts COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.publication_concepts COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.publication_datasources COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.publication_datasources COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.publication_languages COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.publication_languages COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.publication_oids COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.publication_oids COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.publication_pids COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.publication_pids COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.publication_topics COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.publication_topics COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.publication_citations COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.publication_citations COMPUTE STATISTICS FOR COLUMNS;
and p.datainfo.deletedbyinference = false;

@ -116,6 +116,13 @@ compute stats TARGET.indi_pub_doi_from_crossref;
create table TARGET.indi_pub_gold_oa as select * from SOURCE.indi_pub_gold_oa orig where exists (select 1 from TARGET.result r where r.id=orig.id);
compute stats TARGET.indi_pub_gold_oa;
create view TARGET.indi_dataset_avg_year_country_oa as select * from SOURCE.indi_dataset_avg_year_country_oa orig;
create view TARGET.indi_project_datasets_count as select * from SOURCE.indi_project_datasets_count orig;
create view TARGET.indi_project_otherresearch_count as select * from SOURCE.indi_project_otherresearch_count orig;
create view TARGET.indi_project_pubs_count as select * from SOURCE.indi_project_pubs_count orig;
create view TARGET.indi_project_software_count as select * from SOURCE.indi_project_software_count orig;
create view TARGET.indi_pub_avg_year_country_oa as select * from SOURCE.indi_pub_avg_year_country_oa orig;
--denorm
alter table TARGET.result rename to TARGET.res_tmp;

@ -41,7 +41,7 @@ FROM ${openaire_db_name}.dataset d
WHERE d.datainfo.deletedbyinference = FALSE;
CREATE TABLE ${stats_db_name}.dataset_citations AS
SELECT substr(d.id, 4) AS id, xpath_string(citation.value, "//citation/id[@type='openaire']/@value") AS result
SELECT substr(d.id, 4) AS id, xpath_string(citation.value, "//citation/id[@type='openaire']/@value") AS cites
FROM ${openaire_db_name}.dataset d
LATERAL VIEW explode(d.extrainfo) citations AS citation
WHERE xpath_string(citation.value, "//citation/id[@type='openaire']/@value") != ""
@ -95,21 +95,4 @@ CREATE TABLE ${stats_db_name}.dataset_topics AS
SELECT substr(p.id, 4) AS id, subjects.subject.qualifier.classname AS type, subjects.subject.value AS topic
FROM ${openaire_db_name}.dataset p
LATERAL VIEW explode(p.subject) subjects AS subject
where p.datainfo.deletedbyinference = false;
--
-- ANALYZE TABLE ${stats_db_name}.dataset_tmp COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.dataset_tmp COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.dataset_classifications COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.dataset_classifications COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.dataset_concepts COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.dataset_concepts COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.dataset_datasources COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.dataset_datasources COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.dataset_languages COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.dataset_languages COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.dataset_oids COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.dataset_oids COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.dataset_pids COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.dataset_pids COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.dataset_topics COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.dataset_topics COMPUTE STATISTICS FOR COLUMNS;
where p.datainfo.deletedbyinference = false;

@ -41,7 +41,7 @@ from ${openaire_db_name}.software s
where s.datainfo.deletedbyinference = false;
CREATE TABLE ${stats_db_name}.software_citations AS
SELECT substr(s.id, 4) as id, xpath_string(citation.value, "//citation/id[@type='openaire']/@value") AS RESULT
SELECT substr(s.id, 4) as id, xpath_string(citation.value, "//citation/id[@type='openaire']/@value") AS cites
FROM ${openaire_db_name}.software s
LATERAL VIEW explode(s.extrainfo) citations as citation
where xpath_string(citation.value, "//citation/id[@type='openaire']/@value") != ""
@ -95,21 +95,4 @@ CREATE TABLE ${stats_db_name}.software_topics AS
SELECT substr(p.id, 4) AS id, subjects.subject.qualifier.classname AS type, subjects.subject.value AS topic
FROM ${openaire_db_name}.software p
LATERAL VIEW explode(p.subject) subjects AS subject
where p.datainfo.deletedbyinference = false;
--
-- ANALYZE TABLE ${stats_db_name}.software_tmp COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.software_tmp COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.software_classifications COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.software_classifications COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.software_concepts COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.software_concepts COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.software_datasources COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.software_datasources COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.software_languages COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.software_languages COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.software_oids COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.software_oids COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.software_pids COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.software_pids COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.software_topics COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.software_topics COMPUTE STATISTICS FOR COLUMNS;
where p.datainfo.deletedbyinference = false;

@ -41,7 +41,7 @@ WHERE o.datainfo.deletedbyinference = FALSE;
-- Otherresearchproduct_citations
CREATE TABLE ${stats_db_name}.otherresearchproduct_citations AS
SELECT substr(o.id, 4) AS id, xpath_string(citation.value, "//citation/id[@type='openaire']/@value") AS RESULT
SELECT substr(o.id, 4) AS id, xpath_string(citation.value, "//citation/id[@type='openaire']/@value") AS cites
FROM ${openaire_db_name}.otherresearchproduct o LATERAL VIEW explode(o.extrainfo) citations AS citation
WHERE xpath_string(citation.value, "//citation/id[@type='openaire']/@value") != ""
and o.datainfo.deletedbyinference = false;
@ -86,21 +86,4 @@ where p.datainfo.deletedbyinference = false;
CREATE TABLE ${stats_db_name}.otherresearchproduct_topics AS
SELECT substr(p.id, 4) AS id, subjects.subject.qualifier.classname AS type, subjects.subject.value AS topic
FROM ${openaire_db_name}.otherresearchproduct p LATERAL VIEW explode(p.subject) subjects AS subject
where p.datainfo.deletedbyinference = false;
-- ANALYZE TABLE ${stats_db_name}.otherresearchproduct_tmp COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.otherresearchproduct_tmp COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.otherresearchproduct_classifications COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.otherresearchproduct_classifications COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.otherresearchproduct_concepts COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.otherresearchproduct_concepts COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.otherresearchproduct_datasources COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.otherresearchproduct_datasources COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.otherresearchproduct_languages COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.otherresearchproduct_languages COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.otherresearchproduct_oids COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.otherresearchproduct_oids COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.otherresearchproduct_pids COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.otherresearchproduct_pids COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.otherresearchproduct_topics COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.otherresearchproduct_topics COMPUTE STATISTICS FOR COLUMNS;
where p.datainfo.deletedbyinference = false;

@ -13,11 +13,17 @@ WHERE r.reltype = 'projectOrganization'
and r.datainfo.deletedbyinference = false;
CREATE TABLE ${stats_db_name}.project_results AS
SELECT substr(r.target, 4) AS id, substr(r.source, 4) AS result
SELECT substr(r.target, 4) AS id, substr(r.source, 4) AS result, r.datainfo.provenanceaction.classname as provenance
FROM ${openaire_db_name}.relation r
WHERE r.reltype = 'resultProject'
and r.datainfo.deletedbyinference = false;
create table ${stats_db_name}.project_classification as
select substr(p.id, 4) as id, class.h2020programme.code, class.level1, class.level2, class.level3
from ${openaire_db_name}.project p
lateral view explode(p.h2020classification) classifs as class
where p.datainfo.deletedbyinference=false and class.h2020programme is not null;
CREATE TABLE ${stats_db_name}.project_tmp
(
id STRING,

@ -130,12 +130,7 @@ WHERE r.reltype = 'resultOrganization'
and r.datainfo.deletedbyinference = false;
CREATE TABLE ${stats_db_name}.result_projects AS
select pr.result AS id, pr.id AS project, datediff(p.enddate, p.startdate) AS daysfromend
select pr.result AS id, pr.id AS project, datediff(p.enddate, p.startdate) AS daysfromend, pr.provenance as provenance
FROM ${stats_db_name}.result r
JOIN ${stats_db_name}.project_results pr ON r.id = pr.result
JOIN ${stats_db_name}.project_tmp p ON p.id = pr.id;
-- ANALYZE TABLE ${stats_db_name}.result_organization COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.result_organization COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.result_projects COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.result_projects COMPUTE STATISTICS FOR COLUMNS;
JOIN ${stats_db_name}.project_tmp p ON p.id = pr.id;

@ -17,7 +17,9 @@ CREATE TABLE ${stats_db_name}.datasource_tmp
`latitude` STRING,
`longitude` STRING,
`websiteurl` STRING,
`compatibility` STRING
`compatibility` STRING,
issn_printed STRING,
issn_online STRING
) CLUSTERED BY (id) INTO 100 buckets stored AS orc tblproperties ('transactional' = 'true');
-- Insert statement that takes into account the piwik_id of the openAIRE graph
@ -32,7 +34,9 @@ SELECT substr(d1.id, 4) AS id,
d1.latitude.value AS latitude,
d1.longitude.value AS longitude,
d1.websiteurl.value AS websiteurl,
d1.openairecompatibility.classid AS compatibility
d1.openairecompatibility.classid AS compatibility,
d1.journal.issnprinted AS issn_printed,
d1.journal.issnonline AS issn_online
FROM ${openaire_db_name}.datasource d1
LEFT OUTER JOIN
(SELECT id, split(originalidd, '\\:')[1] as piwik_id
@ -51,7 +55,7 @@ CREATE TABLE ${stats_db_name}.dual
INSERT INTO ${stats_db_name}.dual
VALUES ('X');
INSERT INTO ${stats_db_name}.datasource_tmp (`id`, `name`, `type`, `dateofvalidation`, `yearofvalidation`, `harvested`,
`piwik_id`, `latitude`, `longitude`, `websiteurl`, `compatibility`)
`piwik_id`, `latitude`, `longitude`, `websiteurl`, `compatibility`, `issn_printed`, `issn_online`)
SELECT 'other',
'Other',
'Repository',
@ -62,7 +66,9 @@ SELECT 'other',
NULL,
NULL,
NULL,
'unknown'
'unknown',
null,
null
FROM ${stats_db_name}.dual
WHERE 'other' not in (SELECT id FROM ${stats_db_name}.datasource_tmp WHERE name = 'Unknown Repository');
DROP TABLE ${stats_db_name}.dual;
@ -97,13 +103,4 @@ where d.datainfo.deletedbyinference = false;
CREATE OR REPLACE VIEW ${stats_db_name}.datasource_results AS
SELECT datasource AS id, id AS result
FROM ${stats_db_name}.result_datasources;
-- ANALYZE TABLE ${stats_db_name}.datasource_tmp COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.datasource_tmp COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.datasource_languages COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.datasource_languages COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.datasource_oids COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.datasource_oids COMPUTE STATISTICS FOR COLUMNS;
-- ANALYZE TABLE ${stats_db_name}.datasource_organizations COMPUTE STATISTICS;
-- ANALYZE TABLE ${stats_db_name}.datasource_organizations COMPUTE STATISTICS FOR COLUMNS;
FROM ${stats_db_name}.result_datasources;

@ -741,7 +741,7 @@
<mockito-core.version>3.3.3</mockito-core.version>
<mongodb.driver.version>3.4.2</mongodb.driver.version>
<vtd.version>[2.12,3.0)</vtd.version>
<dhp-schemas.version>[2.6.14]</dhp-schemas.version>
<dhp-schemas.version>[2.7.15]</dhp-schemas.version>
<dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
<dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
<dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>

Loading…
Cancel
Save