mergin with branch beta

pull/159/head
Miriam Baglioni 2 years ago
commit 8905a39bf3

@ -22,9 +22,20 @@
<id>dnet45-releases</id>
<url>https://maven.d4science.org/nexus/content/repositories/dnet45-releases</url>
</repository>
<site>
<id>DHPSite</id>
<url>${dhp.site.stage.path}/dhp-build/dhp-code-style</url>
</site>
</distributionManagement>
<build>
<extensions>
<extension>
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-ssh</artifactId>
<version>2.10</version>
</extension>
</extensions>
<pluginManagement>
<plugins>
<plugin>
@ -35,7 +46,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
<version>3.9.1</version>
</plugin>
</plugins>
</pluginManagement>
@ -43,6 +54,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<dhp.site.stage.path>sftp://dnet-hadoop@static-web.d4science.org/dnet-hadoop</dhp.site.stage.path>
</properties>
</project>

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="ISO-8859-1"?>
<project xmlns="http://maven.apache.org/DECORATION/1.8.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/DECORATION/1.8.0 https://maven.apache.org/xsd/decoration-1.8.0.xsd"
name="DHP-Aggregation">
<skin>
<groupId>org.apache.maven.skins</groupId>
<artifactId>maven-fluido-skin</artifactId>
<version>1.8</version>
</skin>
<poweredBy>
<logo name="OpenAIRE Research Graph" href="https://graph.openaire.eu/"
img="https://graph.openaire.eu/assets/common-assets/logo-large-graph.png"/>
</poweredBy>
<body>
<links>
<item name="Code" href="https://code-repo.d4science.org/" />
</links>
<menu ref="modules" />
<menu ref="reports"/>
</body>
</project>

@ -10,6 +10,9 @@
<packaging>pom</packaging>
<description>This module is a container for the build tools used in dnet-hadoop</description>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
</properties>
<modules>
<module>dhp-code-style</module>
@ -17,4 +20,12 @@
<module>dhp-build-properties-maven-plugin</module>
</modules>
<distributionManagement>
<site>
<id>DHPSite</id>
<url>${dhp.site.stage.path}/dhp-build/</url>
</site>
</distributionManagement>
</project>

@ -0,0 +1,22 @@
<?xml version="1.0" encoding="ISO-8859-1"?>
<project xmlns="http://maven.apache.org/DECORATION/1.8.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/DECORATION/1.8.0 https://maven.apache.org/xsd/decoration-1.8.0.xsd"
name="DHP-Aggregation">
<skin>
<groupId>org.apache.maven.skins</groupId>
<artifactId>maven-fluido-skin</artifactId>
<version>1.8</version>
</skin>
<poweredBy>
<logo name="OpenAIRE Research Graph" href="https://graph.openaire.eu/"
img="https://graph.openaire.eu/assets/common-assets/logo-large-graph.png"/>
</poweredBy>
<body>
<links>
<item name="Code" href="https://code-repo.d4science.org/" />
</links>
<menu ref="modules" />
<menu ref="reports"/>
</body>
</project>

@ -13,7 +13,51 @@
<artifactId>dhp-common</artifactId>
<packaging>jar</packaging>
<distributionManagement>
<site>
<id>DHPSite</id>
<url>${dhp.site.stage.path}/dhp-common</url>
</site>
</distributionManagement>
<description>This module contains common utilities meant to be used across the dnet-hadoop submodules</description>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>${net.alchim31.maven.version}</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>initialize</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
<execution>
<id>scala-doc</id>
<phase>process-resources</phase> <!-- or wherever -->
<goals>
<goal>doc</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>

@ -0,0 +1,72 @@
package eu.dnetlib.dhp.application
import scala.io.Source
/**
* This is the main Interface SparkApplication
* where all the Spark Scala class should inherit
*
*/
trait SparkScalaApplication {
/**
* This is the path in the classpath of the json
* describes all the argument needed to run
*/
val propertyPath: String
/**
* Utility to parse the arguments using the
* property json in the classpath identified from
* the variable propertyPath
*
* @param args the list of arguments
*/
def parseArguments(args: Array[String]): ArgumentApplicationParser = {
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream(propertyPath)).mkString)
parser.parseArgument(args)
parser
}
/**
* Here all the spark applications runs this method
* where the whole logic of the spark node is defined
*/
def run(): Unit
}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.Logger
abstract class AbstractScalaApplication (val propertyPath:String, val args:Array[String], log:Logger) extends SparkScalaApplication {
var parser: ArgumentApplicationParser = null
var spark:SparkSession = null
def initialize():SparkScalaApplication = {
parser = parseArguments(args)
spark = createSparkSession()
this
}
/**
* Utility for creating a spark session starting from parser
*
* @return a spark Session
*/
private def createSparkSession():SparkSession = {
require(parser!= null)
val conf:SparkConf = new SparkConf()
val master = parser.get("master")
log.info(s"Creating Spark session: Master: $master")
SparkSession.builder().config(conf)
.appName(getClass.getSimpleName)
.master(master)
.getOrCreate()
}
}

@ -0,0 +1,134 @@
package eu.dnetlib.dhp.datacite
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.{DataInfo, KeyValue}
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils
import java.io.InputStream
import java.time.format.DateTimeFormatter
import java.util.Locale
import java.util.regex.Pattern
import scala.io.Source
/**
* This class represent the dataModel of the input Dataset of Datacite
* @param doi THE DOI
* @param timestamp timestamp of last update date
* @param isActive the record is active or deleted
* @param json the json native records
*/
case class DataciteType(doi: String, timestamp: Long, isActive: Boolean, json: String) {}
/*
The following class are utility class used for the mapping from
json datacite to OAF Shema
*/
case class RelatedIdentifierType(relationType: String, relatedIdentifier: String, relatedIdentifierType: String) {}
case class NameIdentifiersType(nameIdentifierScheme: Option[String], schemeUri: Option[String], nameIdentifier: Option[String]) {}
case class CreatorType(nameType: Option[String], nameIdentifiers: Option[List[NameIdentifiersType]], name: Option[String], familyName: Option[String], givenName: Option[String], affiliation: Option[List[String]]) {}
case class TitleType(title: Option[String], titleType: Option[String], lang: Option[String]) {}
case class SubjectType(subject: Option[String], subjectScheme: Option[String]) {}
case class DescriptionType(descriptionType: Option[String], description: Option[String]) {}
case class FundingReferenceType(funderIdentifierType: Option[String], awardTitle: Option[String], awardUri: Option[String], funderName: Option[String], funderIdentifier: Option[String], awardNumber: Option[String]) {}
case class DateType(date: Option[String], dateType: Option[String]) {}
case class OAFRelations(relation:String, inverse:String, relType:String)
class DataciteModelConstants extends Serializable {
}
object DataciteModelConstants {
val REL_TYPE_VALUE:String = "resultResult"
val DATE_RELATION_KEY = "RelationDate"
val DATACITE_FILTER_PATH = "/eu/dnetlib/dhp/datacite/datacite_filter"
val DOI_CLASS = "doi"
val SUBJ_CLASS = "keywords"
val DATACITE_NAME = "Datacite"
val dataInfo: DataInfo = dataciteDataInfo("0.9")
val DATACITE_COLLECTED_FROM: KeyValue = OafMapperUtils.keyValue(ModelConstants.DATACITE_ID, DATACITE_NAME)
val subRelTypeMapping: Map[String,OAFRelations] = Map(
ModelConstants.REFERENCES -> OAFRelations(ModelConstants.REFERENCES, ModelConstants.IS_REFERENCED_BY, ModelConstants.RELATIONSHIP),
ModelConstants.IS_REFERENCED_BY -> OAFRelations(ModelConstants.IS_REFERENCED_BY,ModelConstants.REFERENCES, ModelConstants.RELATIONSHIP),
ModelConstants.IS_SUPPLEMENTED_BY -> OAFRelations(ModelConstants.IS_SUPPLEMENTED_BY,ModelConstants.IS_SUPPLEMENT_TO,ModelConstants.SUPPLEMENT),
ModelConstants.IS_SUPPLEMENT_TO -> OAFRelations(ModelConstants.IS_SUPPLEMENT_TO,ModelConstants.IS_SUPPLEMENTED_BY,ModelConstants.SUPPLEMENT),
ModelConstants.HAS_PART -> OAFRelations(ModelConstants.HAS_PART,ModelConstants.IS_PART_OF, ModelConstants.PART),
ModelConstants.IS_PART_OF -> OAFRelations(ModelConstants.IS_PART_OF,ModelConstants.HAS_PART, ModelConstants.PART),
ModelConstants.IS_VERSION_OF-> OAFRelations(ModelConstants.IS_VERSION_OF,ModelConstants.HAS_VERSION,ModelConstants.VERSION),
ModelConstants.HAS_VERSION-> OAFRelations(ModelConstants.HAS_VERSION,ModelConstants.IS_VERSION_OF,ModelConstants.VERSION),
ModelConstants.IS_IDENTICAL_TO -> OAFRelations(ModelConstants.IS_IDENTICAL_TO,ModelConstants.IS_IDENTICAL_TO, ModelConstants.RELATIONSHIP),
ModelConstants.IS_CONTINUED_BY -> OAFRelations(ModelConstants.IS_CONTINUED_BY,ModelConstants.CONTINUES, ModelConstants.RELATIONSHIP),
ModelConstants.CONTINUES -> OAFRelations(ModelConstants.CONTINUES,ModelConstants.IS_CONTINUED_BY, ModelConstants.RELATIONSHIP),
ModelConstants.IS_NEW_VERSION_OF-> OAFRelations(ModelConstants.IS_NEW_VERSION_OF,ModelConstants.IS_PREVIOUS_VERSION_OF, ModelConstants.VERSION),
ModelConstants.IS_PREVIOUS_VERSION_OF ->OAFRelations(ModelConstants.IS_PREVIOUS_VERSION_OF,ModelConstants.IS_NEW_VERSION_OF, ModelConstants.VERSION),
ModelConstants.IS_DOCUMENTED_BY -> OAFRelations(ModelConstants.IS_DOCUMENTED_BY,ModelConstants.DOCUMENTS, ModelConstants.RELATIONSHIP),
ModelConstants.DOCUMENTS -> OAFRelations(ModelConstants.DOCUMENTS,ModelConstants.IS_DOCUMENTED_BY, ModelConstants.RELATIONSHIP),
ModelConstants.IS_SOURCE_OF -> OAFRelations(ModelConstants.IS_SOURCE_OF,ModelConstants.IS_DERIVED_FROM, ModelConstants.VERSION),
ModelConstants.IS_DERIVED_FROM -> OAFRelations(ModelConstants.IS_DERIVED_FROM,ModelConstants.IS_SOURCE_OF, ModelConstants.VERSION),
ModelConstants.CITES -> OAFRelations(ModelConstants.CITES,ModelConstants.IS_CITED_BY, ModelConstants.CITATION),
ModelConstants.IS_CITED_BY -> OAFRelations(ModelConstants.IS_CITED_BY,ModelConstants.CITES, ModelConstants.CITATION),
ModelConstants.IS_VARIANT_FORM_OF -> OAFRelations(ModelConstants.IS_VARIANT_FORM_OF,ModelConstants.IS_DERIVED_FROM, ModelConstants.VERSION),
ModelConstants.IS_OBSOLETED_BY -> OAFRelations(ModelConstants.IS_OBSOLETED_BY,ModelConstants.IS_NEW_VERSION_OF, ModelConstants.VERSION),
ModelConstants.REVIEWS -> OAFRelations(ModelConstants.REVIEWS,ModelConstants.IS_REVIEWED_BY, ModelConstants.REVIEW),
ModelConstants.IS_REVIEWED_BY -> OAFRelations(ModelConstants.IS_REVIEWED_BY,ModelConstants.REVIEWS, ModelConstants.REVIEW),
ModelConstants.DOCUMENTS -> OAFRelations(ModelConstants.DOCUMENTS,ModelConstants.IS_DOCUMENTED_BY, ModelConstants.RELATIONSHIP),
ModelConstants.IS_DOCUMENTED_BY -> OAFRelations(ModelConstants.IS_DOCUMENTED_BY,ModelConstants.DOCUMENTS, ModelConstants.RELATIONSHIP),
ModelConstants.COMPILES -> OAFRelations(ModelConstants.COMPILES,ModelConstants.IS_COMPILED_BY, ModelConstants.RELATIONSHIP),
ModelConstants.IS_COMPILED_BY -> OAFRelations(ModelConstants.IS_COMPILED_BY,ModelConstants.COMPILES, ModelConstants.RELATIONSHIP)
)
val datacite_filter: List[String] = {
val stream: InputStream = getClass.getResourceAsStream(DATACITE_FILTER_PATH)
require(stream!= null)
Source.fromInputStream(stream).getLines().toList
}
def dataciteDataInfo(trust: String): DataInfo = OafMapperUtils.dataInfo(false,null, false, false, ModelConstants.PROVENANCE_ACTION_SET_QUALIFIER, trust)
val df_en: DateTimeFormatter = DateTimeFormatter.ofPattern("[MM-dd-yyyy][MM/dd/yyyy][dd-MM-yy][dd-MMM-yyyy][dd/MMM/yyyy][dd-MMM-yy][dd/MMM/yy][dd-MM-yy][dd/MM/yy][dd-MM-yyyy][dd/MM/yyyy][yyyy-MM-dd][yyyy/MM/dd]", Locale.ENGLISH)
val df_it: DateTimeFormatter = DateTimeFormatter.ofPattern("[dd-MM-yyyy][dd/MM/yyyy]", Locale.ITALIAN)
val funder_regex: List[(Pattern, String)] = List(
(Pattern.compile("(info:eu-repo/grantagreement/ec/h2020/)(\\d\\d\\d\\d\\d\\d)(.*)", Pattern.MULTILINE | Pattern.CASE_INSENSITIVE), "40|corda__h2020::"),
(Pattern.compile("(info:eu-repo/grantagreement/ec/fp7/)(\\d\\d\\d\\d\\d\\d)(.*)", Pattern.MULTILINE | Pattern.CASE_INSENSITIVE), "40|corda_______::")
)
val Date_regex: List[Pattern] = List(
//Y-M-D
Pattern.compile("(18|19|20)\\d\\d([- /.])(0[1-9]|1[012])\\2(0[1-9]|[12][0-9]|3[01])", Pattern.MULTILINE),
//M-D-Y
Pattern.compile("((0[1-9]|1[012])|([1-9]))([- /.])(0[1-9]|[12][0-9]|3[01])([- /.])(18|19|20)?\\d\\d", Pattern.MULTILINE),
//D-M-Y
Pattern.compile("(?:(?:31(/|-|\\.)(?:0?[13578]|1[02]|(?:Jan|Mar|May|Jul|Aug|Oct|Dec)))\\1|(?:(?:29|30)(/|-|\\.)(?:0?[1,3-9]|1[0-2]|(?:Jan|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec))\\2))(?:(?:1[6-9]|[2-9]\\d)?\\d{2})|(?:29(/|-|\\.)(?:0?2|(?:Feb))\\3(?:(?:(?:1[6-9]|[2-9]\\d)?(?:0[48]|[2468][048]|[13579][26])|(?:(?:16|[2468][048]|[3579][26])00))))|(?:0?[1-9]|1\\d|2[0-8])(/|-|\\.)(?:(?:0?[1-9]|(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep))|(?:1[0-2]|(?:Oct|Nov|Dec)))\\4(?:(?:1[6-9]|[2-9]\\d)?\\d{2})", Pattern.MULTILINE),
//Y
Pattern.compile("(19|20)\\d\\d", Pattern.MULTILINE)
)
}

@ -2,6 +2,7 @@ package eu.dnetlib.dhp.datacite
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
import eu.dnetlib.dhp.datacite.DataciteModelConstants._
import eu.dnetlib.dhp.schema.action.AtomicAction
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.utils.{IdentifierFactory, OafMapperUtils}
@ -12,121 +13,30 @@ import org.json4s.DefaultFormats
import org.json4s.JsonAST.{JField, JObject, JString}
import org.json4s.jackson.JsonMethods.parse
import java.nio.charset.CodingErrorAction
import java.text.SimpleDateFormat
import java.time.LocalDate
import java.time.chrono.ThaiBuddhistDate
import java.time.format.DateTimeFormatter
import java.util.regex.Pattern
import java.util.{Date, Locale}
import scala.collection.JavaConverters._
import scala.io.{Codec, Source}
import scala.language.postfixOps
case class DataciteType(doi: String, timestamp: Long, isActive: Boolean, json: String) {}
case class RelatedIdentifierType(relationType: String, relatedIdentifier: String, relatedIdentifierType: String) {}
case class NameIdentifiersType(nameIdentifierScheme: Option[String], schemeUri: Option[String], nameIdentifier: Option[String]) {}
case class CreatorType(nameType: Option[String], nameIdentifiers: Option[List[NameIdentifiersType]], name: Option[String], familyName: Option[String], givenName: Option[String], affiliation: Option[List[String]]) {}
case class TitleType(title: Option[String], titleType: Option[String], lang: Option[String]) {}
case class SubjectType(subject: Option[String], subjectScheme: Option[String]) {}
case class DescriptionType(descriptionType: Option[String], description: Option[String]) {}
case class FundingReferenceType(funderIdentifierType: Option[String], awardTitle: Option[String], awardUri: Option[String], funderName: Option[String], funderIdentifier: Option[String], awardNumber: Option[String]) {}
case class DateType(date: Option[String], dateType: Option[String]) {}
case class HostedByMapType(openaire_id: String, datacite_name: String, official_name: String, similarity: Option[Float]) {}
object DataciteToOAFTransformation {
val REL_TYPE_VALUE:String = "resultResult"
val DATE_RELATION_KEY = "RelationDate"
val subRelTypeMapping: Map[String,(String,String)] = Map(
"References" ->("IsReferencedBy","relationship"),
"IsSupplementTo" ->("IsSupplementedBy","supplement"),
"IsPartOf" ->("HasPart","part"),
"HasPart" ->("IsPartOf","part"),
"IsVersionOf" ->("HasVersion","version"),
"HasVersion" ->("IsVersionOf","version"),
"IsIdenticalTo" ->("IsIdenticalTo","relationship"),
"IsPreviousVersionOf" ->("IsNewVersionOf","version"),
"IsContinuedBy" ->("Continues","relationship"),
"Continues" ->("IsContinuedBy","relationship"),
"IsNewVersionOf" ->("IsPreviousVersionOf","version"),
"IsSupplementedBy" ->("IsSupplementTo","supplement"),
"IsDocumentedBy" ->("Documents","relationship"),
"IsSourceOf" ->("IsDerivedFrom","relationship"),
"Cites" ->("IsCitedBy","citation"),
"IsCitedBy" ->("Cites","citation"),
"IsDerivedFrom" ->("IsSourceOf","relationship"),
"IsVariantFormOf" ->("IsDerivedFrom","version"),
"IsReferencedBy" ->("References","relationship"),
"IsObsoletedBy" ->("IsNewVersionOf","version"),
"Reviews" ->("IsReviewedBy","review"),
"Documents" ->("IsDocumentedBy","relationship"),
"IsCompiledBy" ->("Compiles","relationship"),
"Compiles" ->("IsCompiledBy","relationship"),
"IsReviewedBy" ->("Reviews","review")
)
implicit val codec: Codec = Codec("UTF-8")
codec.onMalformedInput(CodingErrorAction.REPLACE)
codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
val DOI_CLASS = "doi"
val SUBJ_CLASS = "keywords"
val j_filter: List[String] = {
val s = Source.fromInputStream(getClass.getResourceAsStream("datacite_filter")).mkString
s.lines.toList
}
val mapper = new ObjectMapper()
val unknown_repository: HostedByMapType = HostedByMapType(ModelConstants.UNKNOWN_REPOSITORY_ORIGINALID, ModelConstants.UNKNOWN_REPOSITORY.getValue, ModelConstants.UNKNOWN_REPOSITORY.getValue, Some(1.0F))
val dataInfo: DataInfo = generateDataInfo("0.9")
val DATACITE_COLLECTED_FROM: KeyValue = OafMapperUtils.keyValue(ModelConstants.DATACITE_ID, "Datacite")
val hostedByMap: Map[String, HostedByMapType] = {
val s = Source.fromInputStream(getClass.getResourceAsStream("hostedBy_map.json")).mkString
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: org.json4s.JValue = parse(s)
json.extract[Map[String, HostedByMapType]]
}
val df_en: DateTimeFormatter = DateTimeFormatter.ofPattern("[MM-dd-yyyy][MM/dd/yyyy][dd-MM-yy][dd-MMM-yyyy][dd/MMM/yyyy][dd-MMM-yy][dd/MMM/yy][dd-MM-yy][dd/MM/yy][dd-MM-yyyy][dd/MM/yyyy][yyyy-MM-dd][yyyy/MM/dd]", Locale.ENGLISH)
val df_it: DateTimeFormatter = DateTimeFormatter.ofPattern("[dd-MM-yyyy][dd/MM/yyyy]", Locale.ITALIAN)
val funder_regex: List[(Pattern, String)] = List(
(Pattern.compile("(info:eu-repo/grantagreement/ec/h2020/)(\\d\\d\\d\\d\\d\\d)(.*)", Pattern.MULTILINE | Pattern.CASE_INSENSITIVE), "40|corda__h2020::"),
(Pattern.compile("(info:eu-repo/grantagreement/ec/fp7/)(\\d\\d\\d\\d\\d\\d)(.*)", Pattern.MULTILINE | Pattern.CASE_INSENSITIVE), "40|corda_______::")
)
val Date_regex: List[Pattern] = List(
//Y-M-D
Pattern.compile("(18|19|20)\\d\\d([- /.])(0[1-9]|1[012])\\2(0[1-9]|[12][0-9]|3[01])", Pattern.MULTILINE),
//M-D-Y
Pattern.compile("((0[1-9]|1[012])|([1-9]))([- /.])(0[1-9]|[12][0-9]|3[01])([- /.])(18|19|20)?\\d\\d", Pattern.MULTILINE),
//D-M-Y
Pattern.compile("(?:(?:31(/|-|\\.)(?:0?[13578]|1[02]|(?:Jan|Mar|May|Jul|Aug|Oct|Dec)))\\1|(?:(?:29|30)(/|-|\\.)(?:0?[1,3-9]|1[0-2]|(?:Jan|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec))\\2))(?:(?:1[6-9]|[2-9]\\d)?\\d{2})|(?:29(/|-|\\.)(?:0?2|(?:Feb))\\3(?:(?:(?:1[6-9]|[2-9]\\d)?(?:0[48]|[2468][048]|[13579][26])|(?:(?:16|[2468][048]|[3579][26])00))))|(?:0?[1-9]|1\\d|2[0-8])(/|-|\\.)(?:(?:0?[1-9]|(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep))|(?:1[0-2]|(?:Oct|Nov|Dec)))\\4(?:(?:1[6-9]|[2-9]\\d)?\\d{2})", Pattern.MULTILINE),
//Y
Pattern.compile("(19|20)\\d\\d", Pattern.MULTILINE)
)
def filter_json(json: String): Boolean = {
j_filter.exists(f => json.contains(f))
/**
* This method should skip record if json contains invalid text
* defined in gile datacite_filter
* @param json
* @return True if the record should be skipped
*/
def skip_record(json: String): Boolean = {
datacite_filter.exists(f => json.contains(f))
}
@deprecated("this method will be removed", "dhp")
def toActionSet(item: Oaf): (String, String) = {
val mapper = new ObjectMapper()
@ -206,6 +116,8 @@ object DataciteToOAFTransformation {
case _: Throwable => ""
}
}
def getTypeQualifier(resourceType: String, resourceTypeGeneral: String, schemaOrg: String, vocabularies: VocabularyGroup): (Qualifier, Qualifier) = {
if (resourceType != null && resourceType.nonEmpty) {
val typeQualifier = vocabularies.getSynonymAsQualifier(ModelConstants.DNET_PUBLICATION_RESOURCE, resourceType)
@ -324,11 +236,7 @@ object DataciteToOAFTransformation {
val p = match_pattern.get._2
val grantId = m.matcher(awardUri).replaceAll("$2")
val targetId = s"$p${DHPUtils.md5(grantId)}"
List(
generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, dataInfo)
// REMOVED INVERSE RELATION since there is a specific method that should generate later
// generateRelation(targetId, sourceId, "produces", DATACITE_COLLECTED_FROM, dataInfo)
)
List( generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, dataInfo) )
}
else
List()
@ -337,7 +245,7 @@ object DataciteToOAFTransformation {
def generateOAF(input: String, ts: Long, dateOfCollection: Long, vocabularies: VocabularyGroup, exportLinks: Boolean): List[Oaf] = {
if (filter_json(input))
if (skip_record(input))
return List()
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
@ -516,8 +424,8 @@ object DataciteToOAFTransformation {
val access_rights_qualifier = if (aRights.isDefined) aRights.get else OafMapperUtils.accessRight(ModelConstants.UNKNOWN, ModelConstants.NOT_AVAILABLE, ModelConstants.DNET_ACCESS_MODES, ModelConstants.DNET_ACCESS_MODES)
if (client.isDefined) {
val hb = hostedByMap.getOrElse(client.get.toUpperCase(), unknown_repository)
instance.setHostedby(OafMapperUtils.keyValue(generateDSId(hb.openaire_id), hb.official_name))
instance.setHostedby(OafMapperUtils.keyValue(generateDSId(ModelConstants.UNKNOWN_REPOSITORY_ORIGINALID), ModelConstants.UNKNOWN_REPOSITORY.getValue))
instance.setCollectedfrom(DATACITE_COLLECTED_FROM)
instance.setUrl(List(s"https://dx.doi.org/$doi").asJava)
instance.setAccessright(access_rights_qualifier)
@ -571,7 +479,7 @@ object DataciteToOAFTransformation {
rel.setCollectedfrom(List(DATACITE_COLLECTED_FROM).asJava)
rel.setDataInfo(dataInfo)
val subRelType = subRelTypeMapping(r.relationType)._2
val subRelType = subRelTypeMapping(r.relationType).relType
rel.setRelType(REL_TYPE_VALUE)
rel.setSubRelType(subRelType)
rel.setRelClass(r.relationType)
@ -585,18 +493,9 @@ object DataciteToOAFTransformation {
rel.setCollectedfrom(List(DATACITE_COLLECTED_FROM).asJava)
rel.getCollectedfrom.asScala.map(c => c.getValue).toList
rel
}).toList
})
}
def generateDataInfo(trust: String): DataInfo = {
val di = new DataInfo
di.setDeletedbyinference(false)
di.setInferred(false)
di.setInvisible(false)
di.setTrust(trust)
di.setProvenanceaction(ModelConstants.PROVENANCE_ACTION_SET_QUALIFIER)
di
}
def generateDSId(input: String): String = {
val b = StringUtils.substringBefore(input, "::")

@ -1,64 +1,94 @@
package eu.dnetlib.dhp.datacite
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.application.AbstractScalaApplication
import eu.dnetlib.dhp.collection.CollectionUtils.fixRelations
import eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH
import eu.dnetlib.dhp.common.Constants.MDSTORE_SIZE_PATH
import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH, MDSTORE_SIZE_PATH}
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
import eu.dnetlib.dhp.schema.mdstore.{MDStoreVersion, MetadataRecord}
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile
import eu.dnetlib.dhp.utils.ISLookupClientFactory
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import scala.io.Source
object GenerateDataciteDatasetSpark {
val log: Logger = LoggerFactory.getLogger(GenerateDataciteDatasetSpark.getClass)
class GenerateDataciteDatasetSpark (propertyPath:String, args:Array[String], log:Logger) extends AbstractScalaApplication(propertyPath, args, log:Logger) {
/**
* Here all the spark applications runs this method
* where the whole logic of the spark node is defined
*/
override def run(): Unit = {
def main(args: Array[String]): Unit = {
val conf = new SparkConf
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/datacite/generate_dataset_params.json")).mkString)
parser.parseArgument(args)
val master = parser.get("master")
val sourcePath = parser.get("sourcePath")
log.info(s"SourcePath is '$sourcePath'")
val exportLinks = "true".equalsIgnoreCase(parser.get("exportLinks"))
log.info(s"exportLinks is '$exportLinks'")
val isLookupUrl: String = parser.get("isLookupUrl")
log.info("isLookupUrl: {}", isLookupUrl)
val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl)
val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService)
val spark: SparkSession = SparkSession.builder().config(conf)
.appName(GenerateDataciteDatasetSpark.getClass.getSimpleName)
.master(master)
.getOrCreate()
import spark.implicits._
implicit val mrEncoder: Encoder[MetadataRecord] = Encoders.kryo[MetadataRecord]
implicit val resEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
require(vocabularies != null)
val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'")
val mapper = new ObjectMapper()
val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
val outputBasePath = cleanedMdStoreVersion.getHdfsPath
log.info("outputBasePath: {}", outputBasePath)
log.info(s"outputBasePath is '$outputBasePath'")
val targetPath = s"$outputBasePath/$MDSTORE_DATA_PATH"
log.info(s"targetPath is '$targetPath'")
generateDataciteDataset(sourcePath, exportLinks, vocabularies, targetPath, spark)
reportTotalSize(targetPath, outputBasePath)
}
/**
* For working with MDStore we need to store in a file on hdfs the size of
* the current dataset
* @param targetPath
* @param outputBasePath
*/
def reportTotalSize( targetPath: String, outputBasePath: String ):Unit = {
val total_items = spark.read.load(targetPath).count()
writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$total_items", outputBasePath + MDSTORE_SIZE_PATH)
}
/**
* Generate the transformed and cleaned OAF Dataset from the native one
* @param sourcePath sourcePath of the native Dataset in format JSON/Datacite
* @param exportLinks If true it generates unresolved links
* @param vocabularies vocabularies for cleaning
* @param targetPath the targetPath of the result Dataset
*/
def generateDataciteDataset(sourcePath: String, exportLinks: Boolean, vocabularies: VocabularyGroup, targetPath: String, spark:SparkSession):Unit = {
require(spark!= null)
import spark.implicits._
implicit val mrEncoder: Encoder[MetadataRecord] = Encoders.kryo[MetadataRecord]
implicit val resEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
spark.read.load(sourcePath).as[DataciteType]
.filter(d => d.isActive)
.flatMap(d => DataciteToOAFTransformation.generateOAF(d.json, d.timestamp, d.timestamp, vocabularies, exportLinks))
.filter(d => d != null)
.flatMap(i => fixRelations(i)).filter(i => i != null)
.write.mode(SaveMode.Overwrite).save(targetPath)
}
val total_items = spark.read.load(targetPath).as[Oaf].count()
writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$total_items", outputBasePath + MDSTORE_SIZE_PATH)
}
object GenerateDataciteDatasetSpark {
val log: Logger = LoggerFactory.getLogger(GenerateDataciteDatasetSpark.getClass)
def main(args: Array[String]): Unit = {
new GenerateDataciteDatasetSpark("/eu/dnetlib/dhp/datacite/generate_dataset_params.json", args, log).initialize().run()
}
}

@ -7,3 +7,6 @@ log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
log4j.logger.org.apache.spark=FATAL
log4j.logger.org.spark_project=FATAL

@ -1,9 +1,20 @@
##DHP-Aggregation
This module defines a set of oozie workflows for the **collection** and **transformation** of metadata records.
This module defines a set of oozie workflows for
Both workflows interact with the Metadata Store Manager (MdSM) to handle the logical transactions required to ensure
1. the **collection** and **transformation** of metadata records.
2. the **integration** of new external information in the result
### Collection and Transformation
The workflows interact with the Metadata Store Manager (MdSM) to handle the logical transactions required to ensure
the consistency of the read/write operations on the data as the MdSM in fact keeps track of the logical-physical mapping
of each MDStore.
It defines [mappings](mappings.md) for transformation of different datasource (See mapping section).
It defines [mappings](mappings.md) for transformation of different datasource (See mapping section).
### Integration of external information in the result
The workflows create new entity in the OpenAIRE format (OAF) whose aim is to enrich the result already contained in the graph.
See integration section for more insight

@ -0,0 +1,36 @@
DHP Aggregation - Integration method
=====================================
The integration method can be applied every time new information, which is not aggregated from the repositories
nor computed directly by OpenAIRE, should be added to the results of the graph.
The information integrated so far is:
1. Article impact measures
1. [Bip!Finder](https://dl.acm.org/doi/10.1145/3357384.3357850) scores
2. Result Subjects
1. Integration of Fields of Science and Techonology ([FOS](https://www.qnrf.org/en-us/FOS)) classification in
results subjects.
The method always consists in the creation of a new entity in the OpenAIRE format (OAF entity) containing only the id
and the element in the OAF model that should be used to map the information we want to integrate.
The id is set by using a particular encoding of the given PID
*unresolved::[pid]::[pidtype]*
where
1. *unresolved* is a constant value
2. *pid* is the persistent id value, e.g. 10.5281/zenodo.4707307
3. *pidtype* is the persistent id type, e.g. doi
Such entities are matched against those available in the graph using the result.instance.pid values.
This mechanism can be used to integrate enrichments produced as associated by a given PID.
If a match will be found with one of the results already in the graph that said result will be enriched with the information
present in the new OAF.
All the entities for which a match is not found are discarded.

@ -4,13 +4,13 @@ This section describes the mapping implemented for [MEDLINE/PubMed](https://pubm
Collection
---------
The native data is collected from [ftp baseline](https://ftp.ncbi.nlm.nih.gov/pubmed/baseline/) containing XML with
the following [shcema](https://www.nlm.nih.gov/bsd/licensee/elements_descriptions.html)
the following [schema](https://www.nlm.nih.gov/bsd/licensee/elements_descriptions.html)
Parsing
-------
The resposible class of parsing is [PMParser](./scaladocs/#eu.dnetlib.dhp.sx.bio.pubmed.PMParser) that generates
an intermediate mapping of PubMed Article defined [here](/apidocs/eu/dnetlib/dhp/sx/bio/pubmed/package-summary.html)
The resposible class of parsing is [PMParser](/dnet-hadoop/scaladocs/#eu.dnetlib.dhp.sx.bio.pubmed.PMParser) that generates
an intermediate mapping of PubMed Article defined [here](/dnet-hadoop/apidocs/eu/dnetlib/dhp/sx/bio/pubmed/package-summary.html)
Mapping
@ -50,6 +50,10 @@ The table below describes the mapping from the XML Native to the OAF mapping
|//Author/FullName| author.Forename| Concatenation of forname + lastName if exist |
|FOR ALL AUTHOR | author.rank| sequential number starting from 1|
#TODO
Missing item mapped

@ -20,7 +20,9 @@
<item name="Pubmed" href="pubmed.html"/>
<item name="Datacite" href="datacite.html"/>
</item>
<item name="Release Notes" href="release-notes.html" />
<item name="Integration" href="integration.html" collapse="true">
</item>
<item name="General Information" href="about.html"/>
<item name="JavaDoc" href="apidocs/" />

@ -4,24 +4,38 @@ package eu.dnetlib.dhp.datacite
import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest
import eu.dnetlib.dhp.schema.oaf.Oaf
import org.apache.commons.io.FileUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, count}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{BeforeEach, Test}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.junit.jupiter.MockitoExtension
import org.slf4j.{Logger, LoggerFactory}
import java.nio.file.{Files, Path}
import java.text.SimpleDateFormat
import java.util.Locale
import scala.io.Source
import org.junit.jupiter.api.Assertions._
@ExtendWith(Array(classOf[MockitoExtension]))
class DataciteToOAFTest extends AbstractVocabularyTest{
private var workingDir:Path = null
val log: Logger = LoggerFactory.getLogger(getClass)
@BeforeEach
def setUp() :Unit = {
workingDir= Files.createTempDirectory(getClass.getSimpleName)
super.setUpVocabulary()
}
@AfterEach
def tearDown() :Unit = {
FileUtils.deleteDirectory(workingDir.toFile)
}
@Test
def testDateMapping:Unit = {
@ -35,11 +49,54 @@ class DataciteToOAFTest extends AbstractVocabularyTest{
@Test
def testMapping() :Unit = {
val record =Source.fromInputStream(getClass.getResourceAsStream("record.json")).mkString
def testConvert(): Unit = {
val path = getClass.getResource("/eu/dnetlib/dhp/actionmanager/datacite/dataset").getPath
val conf = new SparkConf()
val spark:SparkSession = SparkSession.builder().config(conf)
.appName(getClass.getSimpleName)
.master("local[*]")
.getOrCreate()
implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf]
val instance = new GenerateDataciteDatasetSpark(null, null, log)
val targetPath = s"$workingDir/result"
instance.generateDataciteDataset(path, exportLinks = true, vocabularies,targetPath, spark)
import spark.implicits._
val nativeSize =spark.read.load(path).count()
assertEquals(100, nativeSize)
val result:Dataset[Oaf] = spark.read.load(targetPath).as[Oaf]
result.map(s => s.getClass.getSimpleName).groupBy(col("value").alias("class")).agg(count("value").alias("Total")).show(false)
val t = spark.read.load(targetPath).count()
assertTrue(t >0)
spark.stop()
}
@Test
def testMapping() :Unit = {
val record =Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/record.json")).mkString
val mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT)
val res:List[Oaf] =DataciteToOAFTransformation.generateOAF(record, 0L,0L, vocabularies, true )

@ -0,0 +1,38 @@
package eu.dnetlib.dhp.blacklist;
import java.util.Arrays;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.common.RelationInverse;
public class BlacklistRelationTest {
@Test
public void testRelationInverseLookup() {
final List<String> rels = Arrays
.asList(
"resultResult_relationship_IsRelatedTo",
"resultOrganization_affiliation_isAuthorInstitutionOf",
"resultOrganization_affiliation_hasAuthorInstitution",
"datasourceOrganization_provision_isProvidedBy",
"projectOrganization_participation_hasParticipant",
"resultProject_outcome_produces",
"resultProject_outcome_isProducedBy");
rels.forEach(r -> {
RelationInverse inverse = ModelSupport.relationInverseMap.get(r);
Assertions.assertNotNull(inverse);
Assertions.assertNotNull(inverse.getRelType());
Assertions.assertNotNull(inverse.getSubReltype());
Assertions.assertNotNull(inverse.getRelClass());
});
}
}

@ -19,7 +19,7 @@ import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
@ExtendWith(MockitoExtension.class)
class UpdateMatcherTest {
public class UpdateMatcherTest {
UpdateMatcher<String> matcher = new EnrichMissingPublicationDate();

@ -11,7 +11,7 @@ import org.junit.jupiter.api.Test;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
class EnrichMissingPublicationDateTest {
public class EnrichMissingPublicationDateTest {
final EnrichMissingPublicationDate matcher = new EnrichMissingPublicationDate();

@ -8,7 +8,7 @@ import java.util.Arrays;
import org.junit.jupiter.api.Test;
class SubscriptionUtilsTest {
public class SubscriptionUtilsTest {
@Test
void testVerifyListSimilar() {

@ -9,7 +9,7 @@ import eu.dnetlib.broker.objects.OaBrokerAuthor;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerTypedValue;
class TrustUtilsTest {
public class TrustUtilsTest {
private static final double THRESHOLD = 0.95;

@ -139,14 +139,28 @@ abstract class AbstractSparkAction implements Serializable {
protected boolean isOpenorgs(Relation rel) {
return Optional
.ofNullable(rel.getCollectedfrom())
.map(
c -> c
.stream()
.filter(Objects::nonNull)
.anyMatch(kv -> ModelConstants.OPENORGS_NAME.equals(kv.getValue())))
.map(c -> isCollectedFromOpenOrgs(c))
.orElse(false);
}
protected boolean isOpenorgsDedupRel(Relation rel) {
return isOpenorgs(rel) && isOpenOrgsDedupMergeRelation(rel);
}
private boolean isCollectedFromOpenOrgs(List<KeyValue> c) {
return c
.stream()
.filter(Objects::nonNull)
.anyMatch(kv -> ModelConstants.OPENORGS_NAME.equals(kv.getValue()));
}
private boolean isOpenOrgsDedupMergeRelation(Relation rel) {
return ModelConstants.ORG_ORG_RELTYPE.equals(rel.getRelType()) &&
ModelConstants.DEDUP.equals(rel.getSubRelType())
&& (ModelConstants.IS_MERGED_IN.equals(rel.getRelClass()) ||
ModelConstants.MERGES.equals(rel.getRelClass()));
}
protected static Boolean parseECField(Field<String> field) {
if (field == null)
return null;

@ -14,6 +14,9 @@ import org.xml.sax.SAXException;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ -152,4 +155,25 @@ public class DedupUtility {
return o1.compareTo(o2);
}
public static Relation createSimRel(String source, String target, String entity) {
final Relation r = new Relation();
r.setSource(source);
r.setTarget(target);
r.setSubRelType("dedupSimilarity");
r.setRelClass(ModelConstants.IS_SIMILAR_TO);
r.setDataInfo(new DataInfo());
switch (entity) {
case "result":
r.setRelType(ModelConstants.RESULT_RESULT);
break;
case "organization":
r.setRelType(ModelConstants.ORG_ORG_RELTYPE);
break;
default:
throw new IllegalArgumentException("unmanaged entity type: " + entity);
}
return r;
}
}

@ -61,9 +61,11 @@ public class SparkCopyRelationsNoOpenorgs extends AbstractSparkAction {
.textFile(relationPath)
.map(patchRelFn(), Encoders.bean(Relation.class))
.toJavaRDD()
.filter(x -> !isOpenorgs(x));
.filter(x -> !isOpenorgsDedupRel(x));
log.info("Number of non-Openorgs relations collected: {}", simRels.count());
if (log.isDebugEnabled()) {
log.debug("Number of non-Openorgs relations collected: {}", simRels.count());
}
spark
.createDataset(simRels.rdd(), Encoders.bean(Relation.class))

@ -20,6 +20,7 @@ import org.xml.sax.SAXException;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.model.Block;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
@ -102,7 +103,7 @@ public class SparkCreateSimRels extends AbstractSparkAction {
.createDataset(
Deduper
.computeRelations(sc, blocks, dedupConf)
.map(t -> createSimRel(t._1(), t._2(), entity))
.map(t -> DedupUtility.createSimRel(t._1(), t._2(), entity))
.repartition(numPartitions)
.rdd(),
Encoders.bean(Relation.class));
@ -111,24 +112,4 @@ public class SparkCreateSimRels extends AbstractSparkAction {
}
}
private Relation createSimRel(String source, String target, String entity) {
final Relation r = new Relation();
r.setSource(source);
r.setTarget(target);
r.setSubRelType("dedupSimilarity");
r.setRelClass("isSimilarTo");
r.setDataInfo(new DataInfo());
switch (entity) {
case "result":
r.setRelType("resultResult");
break;
case "organization":
r.setRelType("organizationOrganization");
break;
default:
throw new IllegalArgumentException("unmanaged entity type: " + entity);
}
return r;
}
}

@ -124,31 +124,12 @@ public class SparkWhitelistSimRels extends AbstractSparkAction {
Dataset<Relation> whiteListSimRels = whiteListRels2
.map(
(MapFunction<Tuple2<String, String>, Relation>) r -> createSimRel(r._1(), r._2(), entity),
(MapFunction<Tuple2<String, String>, Relation>) r -> DedupUtility
.createSimRel(r._1(), r._2(), entity),
Encoders.bean(Relation.class));
saveParquet(whiteListSimRels, outputPath, SaveMode.Append);
}
}
private Relation createSimRel(String source, String target, String entity) {
final Relation r = new Relation();
r.setSource(source);
r.setTarget(target);
r.setSubRelType("dedupSimilarity");
r.setRelClass("isSimilarTo");
r.setDataInfo(new DataInfo());
switch (entity) {
case "result":
r.setRelType("resultResult");
break;
case "organization":
r.setRelType("organizationOrganization");
break;
default:
throw new IllegalArgumentException("unmanaged entity type: " + entity);
}
return r;
}
}

@ -11,6 +11,8 @@ import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
@ -29,6 +31,8 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
@ -226,9 +230,10 @@ public class SparkOpenorgsProvisionTest implements Serializable {
new SparkCopyRelationsNoOpenorgs(parser, spark).run(isLookUpService);
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
final JavaRDD<String> rels = jsc.textFile(testDedupGraphBasePath + "/relation");
assertEquals(2382, rels.count());
assertEquals(2380, relations);
}
@Test
@ -250,7 +255,7 @@ public class SparkOpenorgsProvisionTest implements Serializable {
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
assertEquals(4894, relations);
assertEquals(4896, relations);
// check deletedbyinference
final Dataset<Relation> mergeRels = spark

@ -2518,3 +2518,5 @@
{"subRelType": "dedup", "relClass": "isMergedIn", "dataInfo": {"provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.990"}, "target": "20|openorgs____::5c351d85f02db01ca291acd119f0bd78", "lastupdatetimestamp": 1617801137807, "relType": "organizationOrganization", "source": "20|opendoar____::37248e2f6987b18670dd2b8a51d6ef55", "validationDate": null, "collectedfrom": [{"dataInfo": null, "key": "10|openaire____::0362fcdb3076765d9c0041ad331553e8", "value": "OpenOrgs Database"}], "validated": false, "properties": []}
{"subRelType": "dedup", "relClass": "merges", "dataInfo": {"provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.990"}, "target": "20|corda_______::6acb33e6ea8c6fcdabc891c80d083c64", "lastupdatetimestamp": 1617801137807, "relType": "organizationOrganization", "source": "20|openorgs____::e38c1a27fcb0f0ab218828e4f5fc7be9", "validationDate": null, "collectedfrom": [{"dataInfo": null, "key": "10|openaire____::0362fcdb3076765d9c0041ad331553e8", "value": "OpenOrgs Database"}], "validated": false, "properties": []}
{"subRelType": "dedup", "relClass": "isMergedIn", "dataInfo": {"provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.990"}, "target": "20|openorgs____::e38c1a27fcb0f0ab218828e4f5fc7be9", "lastupdatetimestamp": 1617801137807, "relType": "organizationOrganization", "source": "20|corda_______::6acb33e6ea8c6fcdabc891c80d083c64", "validationDate": null, "collectedfrom": [{"dataInfo": null, "key": "10|openaire____::0362fcdb3076765d9c0041ad331553e8", "value": "OpenOrgs Database"}], "validated": false, "properties": []}
{"subRelType": "relationship", "relClass": "IsParentOf", "dataInfo": {"provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.990"}, "target": "20|openorgs____::e38c1a27fcb0f0ab218828e4f5fc7be9", "lastupdatetimestamp": 1617801137807, "relType": "organizationOrganization", "source": "20|corda_______::6acb33e6ea8c6fcdabc891c80d083c64", "validationDate": null, "collectedfrom": [{"dataInfo": null, "key": "10|openaire____::0362fcdb3076765d9c0041ad331553e8", "value": "OpenOrgs Database"}], "validated": false, "properties": []}
{"subRelType": "relationship", "relClass": "IsChildOf", "dataInfo": {"provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.990"}, "target": "20|corda_______::6acb33e6ea8c6fcdabc891c80d083c64", "lastupdatetimestamp": 1617801137807, "relType": "organizationOrganization", "source": "20|openorgs____::e38c1a27fcb0f0ab218828e4f5fc7be9", "validationDate": null, "collectedfrom": [{"dataInfo": null, "key": "10|openaire____::0362fcdb3076765d9c0041ad331553e8", "value": "OpenOrgs Database"}], "validated": false, "properties": []}

@ -146,7 +146,7 @@ object SparkProcessMAG {
.save(s"$workingPath/mag_publication")
spark.read.load(s"$workingPath/mag_publication").as[Publication]
.filter(p => p.getId == null)
.filter(p => p.getId != null)
.groupByKey(p => p.getId)
.reduceGroups((a:Publication, b:Publication) => ConversionUtil.mergePublication(a,b))
.map(_._2)

@ -0,0 +1,26 @@
<?xml version="1.0" encoding="ISO-8859-1"?>
<project xmlns="http://maven.apache.org/DECORATION/1.8.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/DECORATION/1.8.0 https://maven.apache.org/xsd/decoration-1.8.0.xsd"
name="DHP-Aggregation">
<skin>
<groupId>org.apache.maven.skins</groupId>
<artifactId>maven-fluido-skin</artifactId>
<version>1.8</version>
</skin>
<poweredBy>
<logo name="OpenAIRE Research Graph" href="https://graph.openaire.eu/"
img="https://graph.openaire.eu/assets/common-assets/logo-large-graph.png"/>
</poweredBy>
<body>
<links>
<item name="Code" href="https://code-repo.d4science.org/" />
</links>
<menu name="Documentation">
<item name="Link1 Collapsable" href="about.html" collapse="true">
<item name="item1" href="pubmed.html"/>
<item name="item2" href="datacite.html"/>
</item>
</menu>
<menu ref="reports"/>
</body>
</project>

@ -30,6 +30,11 @@ public class OafCleaner implements Serializable {
}
} else if (hasMapping(o, mapping)) {
mapping.get(o.getClass()).accept(o);
for (final Field f : getAllFields(o.getClass())) {
f.setAccessible(true);
final Object val = f.get(o);
navigate(val, mapping);
}
} else {
for (final Field f : getAllFields(o.getClass())) {
f.setAccessible(true);

@ -14,7 +14,7 @@ import org.slf4j.{Logger, LoggerFactory}
object SparkResolveEntities {
val mapper = new ObjectMapper()
val entities = List(EntityType.dataset,EntityType.publication, EntityType.software, EntityType.otherresearchproduct)
val entities = List(EntityType.dataset, EntityType.publication, EntityType.software, EntityType.otherresearchproduct)
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
@ -36,25 +36,19 @@ object SparkResolveEntities {
val unresolvedPath = parser.get("unresolvedPath")
log.info(s"unresolvedPath -> $unresolvedPath")
val targetPath = parser.get("targetPath")
log.info(s"targetPath -> $targetPath")
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.mkdirs(new Path(workingPath))
resolveEntities(spark, workingPath, unresolvedPath)
generateResolvedEntities(spark, workingPath, graphBasePath)
// TO BE conservative we keep the original entities in the working dir
// and save the resolved entities on the graphBasePath
//In future these lines of code should be removed
entities.foreach {
e =>
fs.rename(new Path(s"$graphBasePath/$e"), new Path(s"$workingPath/${e}_old"))
fs.rename(new Path(s"$workingPath/resolvedGraph/$e"), new Path(s"$graphBasePath/$e"))
}
}
generateResolvedEntities(spark, workingPath, graphBasePath, targetPath)
}
def resolveEntities(spark: SparkSession, workingPath: String, unresolvedPath: String) = {
def resolveEntities(spark: SparkSession, workingPath: String, unresolvedPath: String) = {
implicit val resEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
import spark.implicits._
@ -71,37 +65,42 @@ def resolveEntities(spark: SparkSession, workingPath: String, unresolvedPath: St
}
def deserializeObject(input:String, entity:EntityType ) :Result = {
def deserializeObject(input: String, entity: EntityType): Result = {
entity match {
case EntityType.publication => mapper.readValue(input, classOf[Publication])
case EntityType.dataset => mapper.readValue(input, classOf[OafDataset])
case EntityType.software=> mapper.readValue(input, classOf[Software])
case EntityType.otherresearchproduct=> mapper.readValue(input, classOf[OtherResearchProduct])
}
entity match {
case EntityType.publication => mapper.readValue(input, classOf[Publication])
case EntityType.dataset => mapper.readValue(input, classOf[OafDataset])
case EntityType.software => mapper.readValue(input, classOf[Software])
case EntityType.otherresearchproduct => mapper.readValue(input, classOf[OtherResearchProduct])
}
}
def generateResolvedEntities(spark:SparkSession, workingPath: String, graphBasePath:String) = {
def generateResolvedEntities(spark: SparkSession, workingPath: String, graphBasePath: String, targetPath:String) = {
implicit val resEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
import spark.implicits._
val re:Dataset[Result] = spark.read.load(s"$workingPath/resolvedEntities").as[Result]
val re: Dataset[(String, Result)] = spark.read.load(s"$workingPath/resolvedEntities").as[Result].map(r => (r.getId, r))(Encoders.tuple(Encoders.STRING, resEncoder))
entities.foreach {
e =>
spark.read.text(s"$graphBasePath/$e").as[String]
.map(s => deserializeObject(s, e))
.union(re)
.groupByKey(_.getId)
.reduceGroups {
(x, y) =>
x.mergeFrom(y)
x
}.map(_._2)
.filter(r => r.getClass.getSimpleName.toLowerCase != "result")
.map(r => mapper.writeValueAsString(r))(Encoders.STRING)
.write.mode(SaveMode.Overwrite).option("compression", "gzip").text(s"$workingPath/resolvedGraph/$e")
e => {
val currentEntityDataset: Dataset[(String, Result)] = spark.read.text(s"$graphBasePath/$e").as[String].map(s => deserializeObject(s, e)).map(r => (r.getId, r))(Encoders.tuple(Encoders.STRING, resEncoder))
currentEntityDataset.joinWith(re, currentEntityDataset("_1").equalTo(re("_1")), "left").map(k => {
val a = k._1
val b = k._2
if (b == null)
a._2
else {
a._2.mergeFrom(b._2)
a._2
}
}).map(r => mapper.writeValueAsString(r))(Encoders.STRING)
.write.mode(SaveMode.Overwrite).option("compression", "gzip").text(s"$targetPath/$e")
}
}
}
}

@ -35,6 +35,9 @@ object SparkResolveRelation {
val workingPath = parser.get("workingPath")
log.info(s"workingPath -> $workingPath")
val targetPath = parser.get("targetPath")
log.info(s"targetPath -> $targetPath")
implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
import spark.implicits._
@ -80,20 +83,13 @@ object SparkResolveRelation {
.mode(SaveMode.Overwrite)
.save(s"$workingPath/relation_resolved")
// TO BE conservative we keep the original relation in the working dir
// and save the relation resolved on the graphBasePath
//In future this two line of code should be removed
fs.rename(new Path(s"$graphBasePath/relation"), new Path(s"$workingPath/relation"))
spark.read.load(s"$workingPath/relation_resolved").as[Relation]
.filter(r => !r.getSource.startsWith("unresolved") && !r.getTarget.startsWith("unresolved"))
.map(r => mapper.writeValueAsString(r))
.write
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.text(s"$graphBasePath/relation")
.text(s"$targetPath/relation")
}
def extractInstanceCF(input: String): List[(String, String)] = {

@ -8,3 +8,12 @@ CREATE VIEW IF NOT EXISTS ${hiveDbName}.result as
select id, originalid, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance, measures from ${hiveDbName}.software s
union all
select id, originalid, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance, measures from ${hiveDbName}.otherresearchproduct o;
ANALYZE TABLE ${hiveDbName}.datasource COMPUTE STATISTICS;
ANALYZE TABLE ${hiveDbName}.organization COMPUTE STATISTICS;
ANALYZE TABLE ${hiveDbName}.project COMPUTE STATISTICS;
ANALYZE TABLE ${hiveDbName}.publication COMPUTE STATISTICS;
ANALYZE TABLE ${hiveDbName}.dataset COMPUTE STATISTICS;
ANALYZE TABLE ${hiveDbName}.otherresearchproduct COMPUTE STATISTICS;
ANALYZE TABLE ${hiveDbName}.software COMPUTE STATISTICS;
ANALYZE TABLE ${hiveDbName}.relation COMPUTE STATISTICS;

@ -292,7 +292,7 @@
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Import table project</name>
<name>Import table relation</name>
<class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableImporterJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>

@ -8,6 +8,15 @@
<name>unresolvedPath</name>
<description>the path of the unresolved Entities</description>
</property>
<property>
<name>targetPath</name>
<description>the target path after resolution</description>
</property>
<property>
<name>shouldResolveEntities</name>
<value>true</value>
<description>allows to activate/deactivate the resolution process over the entities</description>
</property>
</parameters>
<start to="ResolveRelations"/>
@ -36,11 +45,20 @@
<arg>--master</arg><arg>yarn</arg>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingDir}</arg>
<arg>--targetPath</arg><arg>${targetPath}</arg>
</spark>
<ok to="ResolveEntities"/>
<ok to="decision_resolveEntities"/>
<error to="Kill"/>
</action>
<decision name="decision_resolveEntities">
<switch>
<case to="copy_result">${wf:conf('shouldResolveEntities') eq false}</case>
<case to="ResolveEntities">${wf:conf('shouldResolveEntities') eq true}</case>
<default to="ResolveEntities"/>
</switch>
</decision>
<action name="ResolveEntities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
@ -62,11 +80,91 @@
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--unresolvedPath</arg><arg>${unresolvedPath}</arg>
<arg>--workingPath</arg><arg>${workingDir}</arg>
<arg>--targetPath</arg><arg>${targetPath}</arg>
</spark>
<ok to="End"/>
<ok to="copy_entities"/>
<error to="Kill"/>
</action>
<end name="End"/>
<fork name="copy_result">
<path start="copy_publication"/>
<path start="copy_dataset"/>
<path start="copy_otherresearchproduct"/>
<path start="copy_software"/>
</fork>
<action name="copy_publication">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/publication</arg>
<arg>${nameNode}/${targetPath}/publication</arg>
</distcp>
<ok to="copy_wait_result"/>
<error to="Kill"/>
</action>
<action name="copy_dataset">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/dataset</arg>
<arg>${nameNode}/${targetPath}/dataset</arg>
</distcp>
<ok to="copy_wait_result"/>
<error to="Kill"/>
</action>
<action name="copy_otherresearchproduct">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/otherresearchproduct</arg>
<arg>${nameNode}/${targetPath}/otherresearchproduct</arg>
</distcp>
<ok to="copy_wait_result"/>
<error to="Kill"/>
</action>
<action name="copy_software">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/software</arg>
<arg>${nameNode}/${targetPath}/software</arg>
</distcp>
<ok to="copy_wait_result"/>
<error to="Kill"/>
</action>
<join name="copy_wait_result" to="copy_entities"/>
<fork name="copy_entities">
<path start="copy_organization"/>
<path start="copy_projects"/>
<path start="copy_datasource"/>
</fork>
<action name="copy_organization">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/organization</arg>
<arg>${nameNode}/${targetPath}/organization</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<action name="copy_projects">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/project</arg>
<arg>${nameNode}/${targetPath}/project</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<action name="copy_datasource">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/datasource</arg>
<arg>${nameNode}/${targetPath}/datasource</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<join name="copy_wait" to="End"/>
<end name="End"/>
</workflow-app>

@ -2,5 +2,6 @@
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the source Path", "paramRequired": true},
{"paramName":"u", "paramLongName":"unresolvedPath", "paramDescription": "the source Path", "paramRequired": true},
{"paramName":"g", "paramLongName":"graphBasePath", "paramDescription": "the path of the raw graph", "paramRequired": true}
{"paramName":"g", "paramLongName":"graphBasePath", "paramDescription": "the path of the raw graph", "paramRequired": true},
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the target path", "paramRequired": true}
]

@ -1,5 +1,6 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the source Path", "paramRequired": true},
{"paramName":"g", "paramLongName":"graphBasePath", "paramDescription": "the path of the raw graph", "paramRequired": true}
{"paramName":"g", "paramLongName":"graphBasePath", "paramDescription": "the path of the raw graph", "paramRequired": true},
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the target path", "paramRequired": true}
]

@ -12,6 +12,8 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ -66,6 +68,9 @@ public class GraphCleaningFunctionsTest {
Relation r_out = OafCleaner.apply(r_in, mapping);
assertTrue(vocabularies.getTerms(ModelConstants.DNET_RELATION_RELCLASS).contains(r_out.getRelClass()));
assertTrue(vocabularies.getTerms(ModelConstants.DNET_RELATION_SUBRELTYPE).contains(r_out.getSubRelType()));
assertEquals("iis", r_out.getDataInfo().getProvenanceaction().getClassid());
assertEquals("Inferred by OpenAIRE", r_out.getDataInfo().getProvenanceaction().getClassname());
}
}
@ -222,4 +227,27 @@ public class GraphCleaningFunctionsTest {
.readLines(
GraphCleaningFunctionsTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt"));
}
@Test
public void testCleanDoiBoost() throws IOException {
String json = IOUtils
.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/doiboostpub.json"));
Publication p_in = MAPPER.readValue(json, Publication.class);
Publication p_out = OafCleaner.apply(GraphCleaningFunctions.fixVocabularyNames(p_in), mapping);
Publication cleaned = GraphCleaningFunctions.cleanup(p_out);
Assertions.assertEquals(true, GraphCleaningFunctions.filter(cleaned));
}
@Test
public void testCleanDoiBoost2() throws IOException {
String json = IOUtils
.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/doiboostpub2.json"));
Publication p_in = MAPPER.readValue(json, Publication.class);
Publication p_out = OafCleaner.apply(GraphCleaningFunctions.fixVocabularyNames(p_in), mapping);
Publication cleaned = GraphCleaningFunctions.cleanup(p_out);
Assertions.assertEquals(true, GraphCleaningFunctions.filter(cleaned));
}
}

@ -4,7 +4,7 @@ package eu.dnetlib.dhp.oa.graph.resolution
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.schema.common.EntityType
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils
import eu.dnetlib.dhp.schema.oaf.{Result, StructuredProperty}
import eu.dnetlib.dhp.schema.oaf.{Publication, Result, StructuredProperty}
import org.apache.commons.io.FileUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
@ -146,27 +146,47 @@ class ResolveEntitiesTest extends Serializable {
implicit val resEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
val m = new ObjectMapper()
SparkResolveEntities.resolveEntities(spark,s"$workingDir/work", s"$workingDir/updates" )
SparkResolveEntities.generateResolvedEntities(spark,s"$workingDir/work",s"$workingDir/graph" )
SparkResolveEntities.generateResolvedEntities(spark,s"$workingDir/work",s"$workingDir/graph", s"$workingDir/target" )
val pubDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/publication").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.publication))
val pubDS:Dataset[Result] = spark.read.text(s"$workingDir/target/publication").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.publication))
val t = pubDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count()
var ct = pubDS.count()
var et = pubDS.filter(p => p.getTitle!= null && p.getTitle.asScala.forall(t => t.getValue != null && t.getValue.nonEmpty)).count()
val datDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/dataset").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.dataset))
assertEquals(ct, et)
val datDS:Dataset[Result] = spark.read.text(s"$workingDir/target/dataset").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.dataset))
val td = datDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count()
ct = datDS.count()
et = datDS.filter(p => p.getTitle!= null && p.getTitle.asScala.forall(t => t.getValue != null && t.getValue.nonEmpty)).count()
assertEquals(ct, et)
val softDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/software").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.software))
val softDS:Dataset[Result] = spark.read.text(s"$workingDir/target/software").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.software))
val ts = softDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count()
ct = softDS.count()
et = softDS.filter(p => p.getTitle!= null && p.getTitle.asScala.forall(t => t.getValue != null && t.getValue.nonEmpty)).count()
assertEquals(ct, et)
val orpDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/otherresearchproduct").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.otherresearchproduct))
val orpDS:Dataset[Result] = spark.read.text(s"$workingDir/target/otherresearchproduct").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.otherresearchproduct))
val to = orpDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count()
ct = orpDS.count()
et = orpDS.filter(p => p.getTitle!= null && p.getTitle.asScala.forall(t => t.getValue != null && t.getValue.nonEmpty)).count()
assertEquals(ct, et)
assertEquals(0, t)
assertEquals(2, td)
assertEquals(1, ts)
@ -178,6 +198,32 @@ class ResolveEntitiesTest extends Serializable {
@Test
def testMerge():Unit = {
val r = new Result
r.setSubject(List(OafMapperUtils.structuredProperty(FAKE_SUBJECT, OafMapperUtils.qualifier("fos","fosCS", "fossSchema", "fossiFIgo"), null)).asJava)
val mapper = new ObjectMapper()
val p = mapper.readValue(Source.fromInputStream(this.getClass.getResourceAsStream(s"publication")).mkString.lines.next(), classOf[Publication])
r.mergeFrom(p)
println(mapper.writeValueAsString(r))
}

@ -0,0 +1 @@
{"context": [], "dataInfo": {"invisible": false, "trust": "0.9", "provenanceaction": {"classid": "sysimport:actionset", "classname": "sysimport:actionset", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "inferred": false, "deletedbyinference": false}, "resourcetype": {"classid": "0001", "classname": "Article", "schemeid": "dnet:publication_resource", "schemename": "dnet:publication_resource"}, "pid": [{"qualifier": {"classid": "doi", "classname": "doi", "schemeid": "dnet:pid_types", "schemename": "dnet:pid_types"}, "value": "10.1097/00132586-197308000-00003"}], "contributor": [], "bestaccessright": {"classid": "UNKNOWN", "classname": "not available", "schemeid": "dnet:access_modes", "schemename": "dnet:access_modes"}, "relevantdate": [{"qualifier": {"classid": "created", "classname": "created", "schemeid": "dnet:dataCite_date", "schemename": "dnet:dataCite_date"}, "value": "2006-11-06T11:36:37Z"}], "collectedfrom": [{"key": "10|openaire____::081b82f96300b6a6e3d282bad31cb6e2", "value": "Crossref"}], "id": "50|doi_________::b0baa0eb88a5788f0b8815560d2a32f2", "subject": [], "lastupdatetimestamp": 1620353302565, "author": [{"fullname": "N. S. AGRUSS", "surname": "AGRUSS", "name": "N. S.", "rank": 1}, {"fullname": "E. Y. ROSIN", "surname": "ROSIN", "name": "E. Y.", "rank": 2}, {"fullname": "R. J. ADOLPH", "surname": "ADOLPH", "name": "R. J.", "rank": 3}, {"fullname": "N. O. FOWLER", "surname": "FOWLER", "name": "N. O.", "rank": 4}], "instance": [{"hostedby": {"key": "10|issn___print::b8cee613d4f898f8c03956d57ea69be2", "value": "Survey of Anesthesiology"}, "url": ["https://doi.org/10.1097/00132586-197308000-00003"], "pid": [{"qualifier": {"classid": "doi", "classname": "doi", "schemeid": "dnet:pid_types", "schemename": "dnet:pid_types"}, "value": "10.1097/00132586-197308000-00003"}], "dateofacceptance": {"value": "2006-11-06T11:36:37Z"}, "collectedfrom": {"key": "10|openaire____::081b82f96300b6a6e3d282bad31cb6e2", "value": "Crossref"}, "accessright": {"classid": "UNKNOWN", "classname": "not available", "schemeid": "dnet:access_modes", "schemename": "dnet:access_modes"}, "instancetype": {"classid": "0001", "classname": "Article", "schemeid": "dnet:publication_resource", "schemename": "dnet:publication_resource"}}], "dateofcollection": "2021-05-07T02:08:22Z", "fulltext": [], "description": [], "format": [], "journal": {"issnPrinted": "0039-6206", "vol": "17", "sp": "304", "name": "Survey of Anesthesiology"}, "measures": [], "coverage": [], "externalReference": [], "publisher": {"value": "Ovid Technologies (Wolters Kluwer Health)"}, "resulttype": {"classid": "publication", "classname": "publication", "schemeid": "dnet:result_typologies", "schemename": "dnet:result_typologies"}, "country": [], "extraInfo": [], "originalId": ["10.1097/00132586-197308000-00003", "50|doiboost____::b0baa0eb88a5788f0b8815560d2a32f2"], "source": [{"value": "Crossref"}], "dateofacceptance": {"value": "2006-11-06T11:36:37Z"}, "title": [{"qualifier": {"classid": "main title", "classname": "main title", "schemeid": "dnet:dataCite_title", "schemename": "dnet:dataCite_title"}, "value": "SIGNIFICANCE OF CHRONIC SINUS BRADYCARDIA IN ELDERLY PEOPLE"}]}

@ -0,0 +1 @@
{"context": [], "dataInfo": {"invisible": false, "trust": "0.9", "provenanceaction": {"classid": "sysimport:actionset", "classname": "sysimport:actionset", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "inferred": false, "deletedbyinference": false}, "resourcetype": {"classid": "0001", "classname": "Article", "schemeid": "dnet:publication_resource", "schemename": "dnet:publication_resource"}, "pid": [{"qualifier": {"classid": "doi", "classname": "doi", "schemeid": "dnet:pid_types", "schemename": "dnet:pid_types"}, "value": "10.2143/tvg.62.1.5002364"}], "contributor": [], "bestaccessright": {"classid": "UNKNOWN", "classname": "not available", "schemeid": "dnet:access_modes", "schemename": "dnet:access_modes"}, "relevantdate": [{"qualifier": {"classid": "created", "classname": "created", "schemeid": "dnet:dataCite_date", "schemename": "dnet:dataCite_date"}, "value": "2007-08-20T08:35:04Z"}, {"qualifier": {"classid": "published-online", "classname": "published-online", "schemeid": "dnet:dataCite_date", "schemename": "dnet:dataCite_date"}, "value": "2006-01-01"}], "collectedfrom": [{"key": "10|openaire____::081b82f96300b6a6e3d282bad31cb6e2", "value": "Crossref"}], "id": "50|doi_________::4972b0ca81b96b225aed8038bb965656", "subject": [{"qualifier": {"classid": "keywords", "classname": "keywords", "schemeid": "dnet:subject_classification_typologies", "schemename": "dnet:subject_classification_typologies"}, "value": "General Medicine"}], "lastupdatetimestamp": 1620381522840, "author": [{"fullname": "null VERHAMME P", "surname": "VERHAMME P", "rank": 1}], "instance": [{"hostedby": {"key": "10|issn__online::7ec728ad1ac65c60cd563a5137111125", "value": "Tijdschrift voor Geneeskunde"}, "url": ["https://doi.org/10.2143/tvg.62.1.5002364"], "pid": [{"qualifier": {"classid": "doi", "classname": "doi", "schemeid": "dnet:pid_types", "schemename": "dnet:pid_types"}, "value": "10.2143/tvg.62.1.5002364"}], "dateofacceptance": {"value": "2006-01-01"}, "collectedfrom": {"key": "10|openaire____::081b82f96300b6a6e3d282bad31cb6e2", "value": "Crossref"}, "accessright": {"classid": "UNKNOWN", "classname": "not available", "schemeid": "dnet:access_modes", "schemename": "dnet:access_modes"}, "instancetype": {"classid": "0001", "classname": "Article", "schemeid": "dnet:publication_resource", "schemename": "dnet:publication_resource"}}], "dateofcollection": "2021-05-07T09:58:42Z", "fulltext": [], "description": [], "format": [], "journal": {"vol": "62", "sp": "55", "issnOnline": "0371-683X", "ep": "61", "name": "Tijdschrift voor Geneeskunde"}, "measures": [], "coverage": [], "externalReference": [], "publisher": {"value": "Peeters Publishers"}, "resulttype": {"classid": "publication", "classname": "publication", "schemeid": "dnet:result_typologies", "schemename": "dnet:result_typologies"}, "country": [], "extraInfo": [], "originalId": ["10.2143/tvg.62.1.5002364", "50|doiboost____::4972b0ca81b96b225aed8038bb965656"], "source": [{"value": "Crossref"}], "dateofacceptance": {"value": "2006-01-01"}, "title": [{"qualifier": {"classid": "main title", "classname": "main title", "schemeid": "dnet:dataCite_title", "schemename": "dnet:dataCite_title"}, "value": "Antitrombotica: nieuwe moleculen"}]}

@ -1,10 +1,10 @@
{"relType":"resultResult","subRelType":"citation","relClass":"cites","source":"50|4ScienceCRIS::f66f1bd369679b5b077dcdf006089556","target":"50|openaire____::007a4870b31056f89b768cf508e1538e"}
{"relType":"resultResult","subRelType":"citation","relClass":"isCitedBy","source":"50|openaire____::007a4870b31056f89b768cf508e1538e","target":"50|4ScienceCRIS::f66f1bd369679b5b077dcdf006089556"}
{"relType":"resultResult","subRelType":"supplement","relClass":"isSupplementTo","source":"50|openaire____::007a4870b31056f89b768cf508e1538e","target":"50|4ScienceCRIS::f66f1bd369679b5b077dcdf006089556"}
{"relType":"resultResult","subRelType":"supplement","relClass":"isSupplementedBy","source":"50|openaire____::007a4870b31056f89b768cf508e1538e","target":"50|4ScienceCRIS::f66f1bd369679b5b077dcdf006089556"}
{"relType":"resultResult","subRelType":"part","relClass":"isPartOf","source":"50|openaire____::007a4870b31056f89b768cf508e1538e","target":"50|4ScienceCRIS::f66f1bd369679b5b077dcdf006089556"}
{"relType":"resultResult","subRelType":"part","relClass":"hasPart","source":"50|openaire____::007a4870b31056f89b768cf508e1538e","target":"50|4ScienceCRIS::f66f1bd369679b5b077dcdf006089556"}
{"relType":"resultResult","subRelType":"review","relClass":"isReviewedBy","source":"50|openaire____::007a4870b31056f89b768cf508e1538e","target":"50|4ScienceCRIS::f66f1bd369679b5b077dcdf006089556"}
{"relType":"resultResult","subRelType":"review","relClass":"reviews","source":"50|openaire____::007a4870b31056f89b768cf508e1538e","target":"50|4ScienceCRIS::f66f1bd369679b5b077dcdf006089556"}
{"relType":"resultResult","subRelType":"relationship","relClass":"isRelatedTo","source":"50|openaire____::007a4870b31056f89b768cf508e1538e","target":"50|4ScienceCRIS::f66f1bd369679b5b077dcdf006089556"}
{"relType":"resultResult","subRelType":"publicationDataset","relClass":"isRelatedTo","source":"50|openaire____::007a4870b31056f89b768cf508e1538e","target":"50|4ScienceCRIS::f66f1bd369679b5b077dcdf006089556"}
{"relType":"resultResult","subRelType":"citation","relClass":"cites","source":"50|4ScienceCRIS::f66f1bd369679b5b077dcdf006089556","target":"50|openaire____::007a4870b31056f89b768cf508e1538e","dataInfo": {"provenanceaction": {"classid": "iis", "classname": "erroneous label to be cleaned","schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}}}
{"relType":"resultResult","subRelType":"citation","relClass":"isCitedBy","source":"50|openaire____::007a4870b31056f89b768cf508e1538e","target":"50|4ScienceCRIS::f66f1bd369679b5b077dcdf006089556","dataInfo": {"provenanceaction": {"classid": "iis", "classname": "erroneous label to be cleaned","schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}}}
{"relType":"resultResult","subRelType":"supplement","relClass":"isSupplementTo","source":"50|openaire____::007a4870b31056f89b768cf508e1538e","target":"50|4ScienceCRIS::f66f1bd369679b5b077dcdf006089556","dataInfo": {"provenanceaction": {"classid": "iis", "classname": "erroneous label to be cleaned","schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}}}
{"relType":"resultResult","subRelType":"supplement","relClass":"isSupplementedBy","source":"50|openaire____::007a4870b31056f89b768cf508e1538e","target":"50|4ScienceCRIS::f66f1bd369679b5b077dcdf006089556","dataInfo": {"provenanceaction": {"classid": "iis", "classname": "erroneous label to be cleaned","schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}}}
{"relType":"resultResult","subRelType":"part","relClass":"isPartOf","source":"50|openaire____::007a4870b31056f89b768cf508e1538e","target":"50|4ScienceCRIS::f66f1bd369679b5b077dcdf006089556","dataInfo": {"provenanceaction": {"classid": "iis", "classname": "erroneous label to be cleaned","schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}}}
{"relType":"resultResult","subRelType":"part","relClass":"hasPart","source":"50|openaire____::007a4870b31056f89b768cf508e1538e","target":"50|4ScienceCRIS::f66f1bd369679b5b077dcdf006089556","dataInfo": {"provenanceaction": {"classid": "iis", "classname": "erroneous label to be cleaned","schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}}}
{"relType":"resultResult","subRelType":"review","relClass":"isReviewedBy","source":"50|openaire____::007a4870b31056f89b768cf508e1538e","target":"50|4ScienceCRIS::f66f1bd369679b5b077dcdf006089556","dataInfo": {"provenanceaction": {"classid": "iis", "classname": "erroneous label to be cleaned","schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}}}
{"relType":"resultResult","subRelType":"review","relClass":"reviews","source":"50|openaire____::007a4870b31056f89b768cf508e1538e","target":"50|4ScienceCRIS::f66f1bd369679b5b077dcdf006089556","dataInfo": {"provenanceaction": {"classid": "iis", "classname": "erroneous label to be cleaned","schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}}}
{"relType":"resultResult","subRelType":"relationship","relClass":"isRelatedTo","source":"50|openaire____::007a4870b31056f89b768cf508e1538e","target":"50|4ScienceCRIS::f66f1bd369679b5b077dcdf006089556","dataInfo": {"provenanceaction": {"classid": "iis", "classname": "erroneous label to be cleaned","schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}}}
{"relType":"resultResult","subRelType":"publicationDataset","relClass":"isRelatedTo","source":"50|openaire____::007a4870b31056f89b768cf508e1538e","target":"50|4ScienceCRIS::f66f1bd369679b5b077dcdf006089556","dataInfo": {"provenanceaction": {"classid": "iis", "classname": "erroneous label to be cleaned","schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}}}

@ -1241,4 +1241,5 @@ dnet:relation_relClass @=@ Reviews @=@ reviews
dnet:relation_relClass @=@ IsSupplementTo @=@ isSupplementTo
dnet:relation_relClass @=@ IsSupplementedBy @=@ isSupplementedBy
dnet:relation_relClass @=@ IsRelatedTo @=@ isRelatedTo
dnet:relation_subRelType @=@ relationship @=@ publicationDataset
dnet:relation_subRelType @=@ relationship @=@ publicationDataset
dnet:provenanceActions @=@ iis @=@ erroneous label to be cleaned

@ -8,7 +8,7 @@ join result_instance ri on ri.id = p.id
join datasource on datasource.id = ri.hostedby
where datasource.type like '%Repository%'
and (ri.accessright = 'Open Access'
or ri.accessright = 'Embargo')) tmp
or ri.accessright = 'Embargo' or ri.accessright = 'Open Source')) tmp
on p.id= tmp.id;
create table indi_pub_grey_lit stored as parquet as
@ -41,178 +41,178 @@ join datasource on datasource.id = ri.hostedby
where datasource.id like '%doajarticles%') tmp
on p.id= tmp.id;
create table indi_project_pubs_count stored as parquet as
select pr.id id, count(p.id) total_pubs from project_results pr
join publication p on p.id=pr.result
group by pr.id;
create table indi_project_datasets_count stored as parquet as
select pr.id id, count(d.id) total_datasets from project_results pr
join dataset d on d.id=pr.result
group by pr.id;
create table indi_project_software_count stored as parquet as
select pr.id id, count(s.id) total_software from project_results pr
join software s on s.id=pr.result
group by pr.id;
create table indi_project_otherresearch_count stored as parquet as
select pr.id id, count(o.id) total_other from project_results pr
join otherresearchproduct o on o.id=pr.result
group by pr.id;
create table indi_pub_avg_year_country_oa stored as parquet as
select year, country, round(OpenAccess/(OpenAccess+NonOpenAccess)*100,3) as averageOA,
round(NonOpenAccess/(OpenAccess+NonOpenAccess)*100,3) as averageNonOA
from
(SELECT year, country, SUM(CASE
WHEN bestlicence='Open Access' THEN 1
ELSE 0
END) AS OpenAccess, SUM(CASE
WHEN bestlicence<>'Open Access' THEN 1
ELSE 0
END) AS NonOpenAccess
FROM publication p
join result_organization ro on p.id=ro.id
join organization o on o.id=ro.organization
where cast(year as int)>=2003 and cast(year as int)<=2021
group by year, country) tmp;
create table indi_dataset_avg_year_country_oa stored as parquet as
select year, country, round(OpenAccess/(OpenAccess+NonOpenAccess)*100,3) as averageOA,
round(NonOpenAccess/(OpenAccess+NonOpenAccess)*100,3) as averageNonOA
from
(SELECT year, country, SUM(CASE
WHEN bestlicence='Open Access' THEN 1
ELSE 0
END) AS OpenAccess, SUM(CASE
WHEN bestlicence<>'Open Access' THEN 1
ELSE 0
END) AS NonOpenAccess
FROM dataset d
join result_organization ro on d.id=ro.id
join organization o on o.id=ro.organization
where cast(year as int)>=2003 and cast(year as int)<=2021
group by year, country) tmp;
create table indi_software_avg_year_country_oa stored as parquet as
select year, country, round(OpenAccess/(OpenAccess+NonOpenAccess)*100,3) as averageOA,
round(NonOpenAccess/(OpenAccess+NonOpenAccess)*100,3) as averageNonOA
from
(SELECT year, country, SUM(CASE
WHEN bestlicence='Open Access' THEN 1
ELSE 0
END) AS OpenAccess, SUM(CASE
WHEN bestlicence<>'Open Access' THEN 1
ELSE 0
END) AS NonOpenAccess
FROM software s
join result_organization ro on s.id=ro.id
join organization o on o.id=ro.organization
where cast(year as int)>=2003 and cast(year as int)<=2021
group by year, country) tmp;
create table indi_other_avg_year_country_oa stored as parquet as
select year, country, round(OpenAccess/(OpenAccess+NonOpenAccess)*100,3) as averageOA,
round(NonOpenAccess/(OpenAccess+NonOpenAccess)*100,3) as averageNonOA
from
(SELECT year, country, SUM(CASE
WHEN bestlicence='Open Access' THEN 1
ELSE 0
END) AS OpenAccess, SUM(CASE
WHEN bestlicence<>'Open Access' THEN 1
ELSE 0
END) AS NonOpenAccess
FROM otherresearchproduct orp
join result_organization ro on orp.id=ro.id
join organization o on o.id=ro.organization
where cast(year as int)>=2003 and cast(year as int)<=2021
group by year, country) tmp;
create table indi_pub_avg_year_context_oa stored as parquet as
with total as
(select count(distinct pc.id) no_of_pubs, year, c.name name, sum(count(distinct pc.id)) over(PARTITION by year) as total from publication_concepts pc
join context c on pc.concept like concat('%',c.id,'%')
join publication p on p.id=pc.id
where cast(year as int)>=2003 and cast(year as int)<=2021
group by c.name, year )
select year, name, round(no_of_pubs/total*100,3) averageofpubs
from total;
create table indi_dataset_avg_year_context_oa stored as parquet as
with total as
(select count(distinct pc.id) no_of_pubs, year, c.name name, sum(count(distinct pc.id)) over(PARTITION by year) as total from dataset_concepts pc
join context c on pc.concept like concat('%',c.id,'%')
join dataset p on p.id=pc.id
where cast(year as int)>=2003 and cast(year as int)<=2021
group by c.name, year )
select year, name, round(no_of_pubs/total*100,3) averageofdataset
from total;
create table indi_software_avg_year_context_oa stored as parquet as
with total as
(select count(distinct pc.id) no_of_pubs, year, c.name name, sum(count(distinct pc.id)) over(PARTITION by year) as total from software_concepts pc
join context c on pc.concept like concat('%',c.id,'%')
join software p on p.id=pc.id
where cast(year as int)>=2003 and cast(year as int)<=2021
group by c.name, year )
select year, name, round(no_of_pubs/total*100,3) averageofsoftware
from total;
create table indi_other_avg_year_context_oa stored as parquet as
with total as
(select count(distinct pc.id) no_of_pubs, year, c.name name, sum(count(distinct pc.id)) over(PARTITION by year) as total from otherresearchproduct_concepts pc
join context c on pc.concept like concat('%',c.id,'%')
join otherresearchproduct p on p.id=pc.id
where cast(year as int)>=2003 and cast(year as int)<=2021
group by c.name, year )
select year, name, round(no_of_pubs/total*100,3) averageofother
from total;
create table indi_other_avg_year_content_oa stored as parquet as
with total as
(select count(distinct pd.id) no_of_pubs, year, d.type type, sum(count(distinct pd.id)) over(PARTITION by year) as total
from otherresearchproduct_datasources pd
join datasource d on datasource=d.id
join otherresearchproduct p on p.id=pd.id
where cast(year as int)>=2003 and cast(year as int)<=2021
group by d.type, year)
select year, type, round(no_of_pubs/total*100,3) averageOfOtherresearchproduct
from total;
create table indi_software_avg_year_content_oa stored as parquet as
with total as
(select count(distinct pd.id) no_of_pubs, year, d.type type, sum(count(distinct pd.id)) over(PARTITION by year) as total
from software_datasources pd
join datasource d on datasource=d.id
join software p on p.id=pd.id
where cast(year as int)>=2003 and cast(year as int)<=2021
group by d.type, year)
select year, type, round(no_of_pubs/total*100,3) averageOfSoftware
from total;
create table indi_dataset_avg_year_content_oa stored as parquet as
with total as
(select count(distinct pd.id) no_of_pubs, year, d.type type, sum(count(distinct pd.id)) over(PARTITION by year) as total
from dataset_datasources pd
join datasource d on datasource=d.id
join dataset p on p.id=pd.id
where cast(year as int)>=2003 and cast(year as int)<=2021
group by d.type, year)
select year, type, round(no_of_pubs/total*100,3) averageOfDatasets
from total;
create table indi_pub_avg_year_content_oa stored as parquet as
with total as
(select count(distinct pd.id) no_of_pubs, year, d.type type, sum(count(distinct pd.id)) over(PARTITION by year) as total
from publication_datasources pd
join datasource d on datasource=d.id
join publication p on p.id=pd.id
where cast(year as int)>=2003 and cast(year as int)<=2021
group by d.type, year)
select year, type, round(no_of_pubs/total*100,3) averageOfPubs
from total;
--create table indi_project_pubs_count stored as parquet as
--select pr.id id, count(p.id) total_pubs from project_results pr
--join publication p on p.id=pr.result
--group by pr.id;
--create table indi_project_datasets_count stored as parquet as
--select pr.id id, count(d.id) total_datasets from project_results pr
--join dataset d on d.id=pr.result
--group by pr.id;
--create table indi_project_software_count stored as parquet as
--select pr.id id, count(s.id) total_software from project_results pr
--join software s on s.id=pr.result
--group by pr.id;
--create table indi_project_otherresearch_count stored as parquet as
--select pr.id id, count(o.id) total_other from project_results pr
--join otherresearchproduct o on o.id=pr.result
--group by pr.id;
--create table indi_pub_avg_year_country_oa stored as parquet as
--select year, country, round(OpenAccess/(OpenAccess+NonOpenAccess)*100,3) as averageOA,
--round(NonOpenAccess/(OpenAccess+NonOpenAccess)*100,3) as averageNonOA
--from
--(SELECT year, country, SUM(CASE
--WHEN bestlicence='Open Access' THEN 1
--ELSE 0
--END) AS OpenAccess, SUM(CASE
--WHEN bestlicence<>'Open Access' THEN 1
--ELSE 0
--END) AS NonOpenAccess
--FROM publication p
--join result_organization ro on p.id=ro.id
--join organization o on o.id=ro.organization
--where cast(year as int)>=2003 and cast(year as int)<=2021
--group by year, country) tmp;
--create table indi_dataset_avg_year_country_oa stored as parquet as
--select year, country, round(OpenAccess/(OpenAccess+NonOpenAccess)*100,3) as averageOA,
--round(NonOpenAccess/(OpenAccess+NonOpenAccess)*100,3) as averageNonOA
--from
--(SELECT year, country, SUM(CASE
--WHEN bestlicence='Open Access' THEN 1
--ELSE 0
--END) AS OpenAccess, SUM(CASE
--WHEN bestlicence<>'Open Access' THEN 1
--ELSE 0
--END) AS NonOpenAccess
--FROM dataset d
--join result_organization ro on d.id=ro.id
--join organization o on o.id=ro.organization
--where cast(year as int)>=2003 and cast(year as int)<=2021
--group by year, country) tmp;
--create table indi_software_avg_year_country_oa stored as parquet as
--select year, country, round(OpenAccess/(OpenAccess+NonOpenAccess)*100,3) as averageOA,
--round(NonOpenAccess/(OpenAccess+NonOpenAccess)*100,3) as averageNonOA
--from
-- (SELECT year, country, SUM(CASE
--WHEN bestlicence='Open Access' THEN 1
-- ELSE 0
--END) AS OpenAccess, SUM(CASE
-- WHEN bestlicence<>'Open Access' THEN 1
-- ELSE 0
-- END) AS NonOpenAccess
-- FROM software s
-- join result_organization ro on s.id=ro.id
-- join organization o on o.id=ro.organization
-- where cast(year as int)>=2003 and cast(year as int)<=2021
-- group by year, country) tmp;
--create table indi_other_avg_year_country_oa stored as parquet as
--select year, country, round(OpenAccess/(OpenAccess+NonOpenAccess)*100,3) as averageOA,
--round(NonOpenAccess/(OpenAccess+NonOpenAccess)*100,3) as averageNonOA
-- from
-- (SELECT year, country, SUM(CASE
-- WHEN bestlicence='Open Access' THEN 1
-- ELSE 0
-- END) AS OpenAccess, SUM(CASE
-- WHEN bestlicence<>'Open Access' THEN 1
-- ELSE 0
-- END) AS NonOpenAccess
-- FROM otherresearchproduct orp
-- join result_organization ro on orp.id=ro.id
-- join organization o on o.id=ro.organization
-- where cast(year as int)>=2003 and cast(year as int)<=2021
-- group by year, country) tmp;
--create table indi_pub_avg_year_context_oa stored as parquet as
--with total as
--(select count(distinct pc.id) no_of_pubs, year, c.name name, sum(count(distinct pc.id)) over(PARTITION by year) as total from publication_concepts pc
--join context c on pc.concept like concat('%',c.id,'%')
--join publication p on p.id=pc.id
--where cast(year as int)>=2003 and cast(year as int)<=2021
--group by c.name, year )
--select year, name, round(no_of_pubs/total*100,3) averageofpubs
--from total;
--create table indi_dataset_avg_year_context_oa stored as parquet as
--with total as
--(select count(distinct pc.id) no_of_pubs, year, c.name name, sum(count(distinct pc.id)) over(PARTITION by year) as total from dataset_concepts pc
--join context c on pc.concept like concat('%',c.id,'%')
--join dataset p on p.id=pc.id
--where cast(year as int)>=2003 and cast(year as int)<=2021
--group by c.name, year )
--select year, name, round(no_of_pubs/total*100,3) averageofdataset
--from total;
--create table indi_software_avg_year_context_oa stored as parquet as
--with total as
--(select count(distinct pc.id) no_of_pubs, year, c.name name, sum(count(distinct pc.id)) over(PARTITION by year) as total from software_concepts pc
--join context c on pc.concept like concat('%',c.id,'%')
--join software p on p.id=pc.id
--where cast(year as int)>=2003 and cast(year as int)<=2021
--group by c.name, year )
--select year, name, round(no_of_pubs/total*100,3) averageofsoftware
--from total;
--create table indi_other_avg_year_context_oa stored as parquet as
--with total as
--(select count(distinct pc.id) no_of_pubs, year, c.name name, sum(count(distinct pc.id)) over(PARTITION by year) as total from otherresearchproduct_concepts pc
--join context c on pc.concept like concat('%',c.id,'%')
--join otherresearchproduct p on p.id=pc.id
--where cast(year as int)>=2003 and cast(year as int)<=2021
--group by c.name, year )
--select year, name, round(no_of_pubs/total*100,3) averageofother
--from total;
--create table indi_other_avg_year_content_oa stored as parquet as
--with total as
--(select count(distinct pd.id) no_of_pubs, year, d.type type, sum(count(distinct pd.id)) over(PARTITION by year) as total
--from otherresearchproduct_datasources pd
--join datasource d on datasource=d.id
--join otherresearchproduct p on p.id=pd.id
--where cast(year as int)>=2003 and cast(year as int)<=2021
--group by d.type, year)
--select year, type, round(no_of_pubs/total*100,3) averageOfOtherresearchproduct
--from total;
--create table indi_software_avg_year_content_oa stored as parquet as
--with total as
--(select count(distinct pd.id) no_of_pubs, year, d.type type, sum(count(distinct pd.id)) over(PARTITION by year) as total
--from software_datasources pd
--join datasource d on datasource=d.id
--join software p on p.id=pd.id
--where cast(year as int)>=2003 and cast(year as int)<=2021
--group by d.type, year)
--select year, type, round(no_of_pubs/total*100,3) averageOfSoftware
--from total;
--create table indi_dataset_avg_year_content_oa stored as parquet as
--with total as
--(select count(distinct pd.id) no_of_pubs, year, d.type type, sum(count(distinct pd.id)) over(PARTITION by year) as total
--from dataset_datasources pd
--join datasource d on datasource=d.id
--join dataset p on p.id=pd.id
--where cast(year as int)>=2003 and cast(year as int)<=2021
--group by d.type, year)
--select year, type, round(no_of_pubs/total*100,3) averageOfDatasets
--from total;
--create table indi_pub_avg_year_content_oa stored as parquet as
--with total as
--(select count(distinct pd.id) no_of_pubs, year, d.type type, sum(count(distinct pd.id)) over(PARTITION by year) as total
--from publication_datasources pd
--join datasource d on datasource=d.id
--join publication p on p.id=pd.id
--where cast(year as int)>=2003 and cast(year as int)<=2021
--group by d.type, year)
--select year, type, round(no_of_pubs/total*100,3) averageOfPubs
--from total;
create table indi_pub_has_cc_licence stored as parquet as
select distinct p.id, (case when lic='' or lic is null then 0 else 1 end) as has_cc_license
@ -231,11 +231,40 @@ join publication_licenses as license on license.id = p.id
WHERE lower(parse_url(license.type, 'HOST')) = 'creativecommons.org') tmp
on p.id= tmp.id;
-- EOSC-TR1.1-02M:
-- ## Indicator: has_cc_license. Creative Commons licensing has become a
-- de facto standard in scholarly communication and is promoted by many initiatives
-- like Plan S. This indicator might be only useful when applied
-- to openly available publications.
--create table indi_pub_has_cc_licence_tr stored as parquet as
--select distinct p.id, case when lic='' or lic is null then 0 else 1 end as has_cc_license_tr
--from publication p
--left outer join (select p.id, license.type as lic from publication p
--join publication_licenses as license on license.id = p.id
--where lower(license.type) LIKE '%creativecommons.org%' OR lower(license.type) LIKE '%cc-%') tmp
--on p.id= tmp.id
-- #EOSC-F2-01M_cc Rich metadata for scholarly publications
-- ## Indicator: has_cc_license. Creative Commons licensing has become a
-- de facto standard in scholarly communication and is promoted by many initiatives
-- like Plan S. This indicator might be only useful when applied
-- to openly available publications.
-- Same indicator as EOSC-TR1.1-02M (Najko's instructions)
-- create table indi_pub_has_cc_licence_f stored as parquet as
-- select
-- distinct p.id, case when lic='' or lic is null then 0 else 1 end as has_cc_license_f
-- from publication p
-- left outer join (selectp.id,license.type as lic from publication p
-- join publication_licenses as license on license.id = p.id
-- where lower(license.type) LIKE '%creativecommons.org%' OR lower(license.type) LIKE '%cc-%') tmp
-- on p.id= tmp.id
create table indi_pub_has_abstract stored as parquet as
select distinct publication.id, coalesce(abstract, 1) has_abstract
from publication;
create table indi_with_orcid stored as parquet as
create table indi_result_with_orcid stored as parquet as
select distinct r.id, coalesce(has_orcid, 0) as has_orcid
from result r
left outer join (select id, 1 as has_orcid from result_orcid) tmp
@ -270,13 +299,64 @@ join tmp as o2 on o1.result=o2.result
where o1.id<>o2.id
group by o1.id, o2.id, o1.type
create table indi_result_org_country_collab stored as parquet as
with tmp as
(select o.id as id, o.country , ro.id as result,r.type from organization o
join result_organization ro on o.id=ro.organization
join result r on r.id=ro.id where o.country <> 'UNKNOWN')
select o1.id org1,o2.country country2, o1.type, count(distinct o1.result) as collaborations
from tmp as o1
join tmp as o2 on o1.result=o2.result
where o1.id<>o2.id and o1.country<>o2.country
group by o1.id, o1.type,o2.country
create table indi_funder_country_collab stored as parquet as
with tmp as (select funder, project, country from organization_projects op
join organization o on o.id=op.id
join project p on p.id=op.project
where country <> 'UNKNOWN')
select f1.funder, f1.country, f2.country, count(distinct f1.project) as collaborations
from tmp as f1
join tmp as f2 on f1.project=f2.project
where f1.country<>f2.country
group by f1.funder, f2.country, f1.country
create table indi_pub_diamond stored as parquet as
select distinct pd.id, coalesce(in_diamond_journal, 0) as in_diamond_journal
from publication_datasources pd
left outer join (
select pd.id, 1 as in_diamond_journal from publication_datasources pd
join datasource d on d.id=pd.datasource
join stats_ext.plan_s_jn ps where (ps.issn_print=d.issn_printed and ps.issn_online=d.issn_online)
and (ps.journal_is_in_doaj=true or ps.journal_is_oa=true) and ps.has_apc=false) tmp
on pd.id=tmp.id
create table indi_pub_hybrid stored as parquet as
select distinct pd.id, coalesce(is_hybrid, 0) as is_hybrid
from publication_datasources pd
left outer join (
select pd.id, 1 as is_hybrid from publication_datasources pd
join datasource d on d.id=pd.datasource
join stats_ext.plan_s_jn ps where (ps.issn_print=d.issn_printed and ps.issn_online=d.issn_online)
and (ps.journal_is_in_doaj=false and ps.journal_is_oa=false)) tmp
on pd.id=tmp.id
create table indi_is_gold_oa stored as parquet as
(select distinct pd.id, coalesce(gold_oa, 0) as gold_oa
from publication_datasources pd
left outer join (
select pd.id, 1 as gold_oa from publication_datasources pd
join datasource d on d.id=pd.datasource
join stats_ext.plan_s_jn ps on (ps.issn_print=d.issn_printed or ps.issn_online=d.issn_online)
where ps.journal_is_in_doaj is true or ps.journal_is_oa is true) tmp
on pd.id=tmp.id)
create table indi_pub_in_transformative stored as parquet as
select distinct pd.id, coalesce(is_transformative, 0) as is_transformative
from publication pd
left outer join (
select pd.id, 1 as is_transformative from publication_datasources pd
join datasource d on d.id=pd.datasource
join stats_ext.plan_s_jn ps where (ps.issn_print=d.issn_printed and ps.issn_online=d.issn_online)
and ps.is_transformative_journal=true) tmp
on pd.id=tmp.id
create table indi_pub_closed_other_open stored as parquet as
select distinct ri.id, coalesce(pub_closed_other_open, 0) as pub_closed_other_open from result_instance ri
left outer join
(select ri.id, 1 as pub_closed_other_open from result_instance ri
join publication p on p.id=ri.id
join datasource d on ri.hostedby=d.id
where d.type like '%Journal%' and ri.accessright='Closed Access' and
(p.bestlicence='Open Access' or p.bestlicence='Open Source')) tmp
on tmp.id=ri.id

@ -105,23 +105,6 @@ create table TARGET.project_results stored as parquet as select id as result, pr
compute stats TARGET.project_results;
-- indicators
create view TARGET.indi_dataset_avg_year_content_oa as select * from SOURCE.indi_dataset_avg_year_content_oa orig;
create view TARGET.indi_dataset_avg_year_context_oa as select * from SOURCE.indi_dataset_avg_year_context_oa orig;
create view TARGET.indi_dataset_avg_year_country_oa as select * from SOURCE.indi_dataset_avg_year_country_oa orig;
create view TARGET.indi_other_avg_year_content_oa as select * from SOURCE.indi_other_avg_year_content_oa orig;
create view TARGET.indi_other_avg_year_context_oa as select * from SOURCE.indi_other_avg_year_context_oa orig;
create view TARGET.indi_other_avg_year_country_oa as select * from SOURCE.indi_other_avg_year_country_oa orig;
create view TARGET.indi_project_datasets_count as select * from SOURCE.indi_project_datasets_count orig;
create view TARGET.indi_project_otherresearch_count as select * from SOURCE.indi_project_otherresearch_count orig;
create view TARGET.indi_project_pubs_count as select * from SOURCE.indi_project_pubs_count orig;
create view TARGET.indi_project_software_count as select * from SOURCE.indi_project_software_count orig;
create view TARGET.indi_pub_avg_year_content_oa as select * from SOURCE.indi_pub_avg_year_content_oa orig;
create view TARGET.indi_pub_avg_year_context_oa as select * from SOURCE.indi_pub_avg_year_context_oa orig;
create view TARGET.indi_pub_avg_year_country_oa as select * from SOURCE.indi_pub_avg_year_country_oa orig;
create table TARGET.indi_pub_green_oa stored as parquet as select * from SOURCE.indi_pub_green_oa orig where exists (select 1 from TARGET.result r where r.id=orig.id);
compute stats TARGET.indi_pub_green_oa;
create table TARGET.indi_pub_grey_lit stored as parquet as select * from SOURCE.indi_pub_grey_lit orig where exists (select 1 from TARGET.result r where r.id=orig.id);
@ -137,9 +120,20 @@ compute stats TARGET.indi_pub_has_cc_licence;
create table TARGET.indi_pub_has_cc_licence_url stored as parquet as select * from SOURCE.indi_pub_has_cc_licence_url orig where exists (select 1 from TARGET.result r where r.id=orig.id);
compute stats TARGET.indi_pub_has_cc_licence_url;
create view TARGET.indi_software_avg_year_content_oa as select * from SOURCE.indi_software_avg_year_content_oa orig;
create view TARGET.indi_software_avg_year_context_oa as select * from SOURCE.indi_software_avg_year_context_oa orig;
create view TARGET.indi_software_avg_year_country_oa as select * from SOURCE.indi_software_avg_year_country_oa orig;
create view TARGET.indi_funder_country_collab stored as select * from SOURCE.indi_funder_country_collab;
create table TARGET.indi_result_with_orcid stored as parquet as select * from SOURCE.indi_result_with_orcid orig where exists (select 1 from TARGET.result r where r.id=orig.id);
compute stats TARGET.indi_result_with_orcid;
create table TARGET.indi_funded_result_with_fundref stored as parquet as select * from SOURCE.indi_funded_result_with_fundref orig where exists (select 1 from TARGET.result r where r.id=orig.id);
compute stats TARGET.indi_funded_result_with_fundref;
create table TARGET.indi_pub_diamond stored as parquet as select * from SOURCE.indi_pub_diamond orig where exists (select 1 from TARGET.result r where r.id=orig.id);
compute stats TARGET.indi_pub_diamond;
create table TARGET.indi_pub_hybrid stored as parquet as select * from SOURCE.indi_pub_hybrid orig where exists (select 1 from TARGET.result r where r.id=orig.id);
compute stats TARGET.indi_pub_hybrid;
create table TARGET.indi_pub_in_transformative stored as parquet as select * from SOURCE.indi_pub_in_transformative orig where exists (select 1 from TARGET.result r where r.id=orig.id);
compute stats TARGET.indi_pub_in_transformative;
create table TARGET.indi_pub_closed_other_open stored as parquet as select * from SOURCE.indi_pub_closed_other_open orig where exists (select 1 from TARGET.result r where r.id=orig.id);
compute stats TARGET.indi_pub_closed_other_open;
--denorm
alter table TARGET.result rename to TARGET.res_tmp;

@ -15,6 +15,13 @@
<description>This module is the container for the oozie workflow definitions in dnet-hadoop project</description>
<distributionManagement>
<site>
<id>DHPSite</id>
<url>${dhp.site.stage.path}/dhp-workflows</url>
</site>
</distributionManagement>
<modules>
<module>dhp-workflow-profiles</module>
<module>dhp-aggregation</module>

@ -0,0 +1,25 @@
<?xml version="1.0" encoding="ISO-8859-1"?>
<project xmlns="http://maven.apache.org/DECORATION/1.8.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/DECORATION/1.8.0 https://maven.apache.org/xsd/decoration-1.8.0.xsd"
name="DHP-Aggregation">
<skin>
<groupId>org.apache.maven.skins</groupId>
<artifactId>maven-fluido-skin</artifactId>
<version>1.8</version>
</skin>
<poweredBy>
<logo name="OpenAIRE Research Graph" href="https://graph.openaire.eu/"
img="https://graph.openaire.eu/assets/common-assets/logo-large-graph.png"/>
</poweredBy>
<body>
<links>
<item name="Code" href="https://code-repo.d4science.org/" />
</links>
<menu name="APIDocs">
<item name="JavaDoc" href="apidocs/" />
<item name="ScalaDoc" href="scaladocs/" />
</menu>
<menu ref="modules" />
<menu ref="reports"/>
</body>
</project>

@ -719,6 +719,10 @@
<id>dnet45-releases</id>
<url>https://maven.d4science.org/nexus/content/repositories/dnet45-releases</url>
</repository>
<site>
<id>DHPSite</id>
<url>${dhp.site.stage.path}/</url>
</site>
</distributionManagement>
<reporting>
<plugins>
@ -734,6 +738,7 @@
</reporting>
<properties>
<dhp.site.stage.path>sftp://dnet-hadoop@static-web.d4science.org/dnet-hadoop</dhp.site.stage.path>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.plugin.version>3.6.0</maven.compiler.plugin.version>

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="ISO-8859-1"?>
<project xmlns="http://maven.apache.org/DECORATION/1.8.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/DECORATION/1.8.0 https://maven.apache.org/xsd/decoration-1.8.0.xsd"
name="DHP-Aggregation">
<skin>
<groupId>org.apache.maven.skins</groupId>
<artifactId>maven-fluido-skin</artifactId>
<version>1.8</version>
</skin>
<poweredBy>
<logo name="OpenAIRE Research Graph" href="https://graph.openaire.eu/"
img="https://graph.openaire.eu/assets/common-assets/logo-large-graph.png"/>
</poweredBy>
<body>
<links>
<item name="Code" href="https://code-repo.d4science.org/" />
</links>
<menu ref="modules" />
<menu ref="reports"/>
</body>
</project>
Loading…
Cancel
Save