merge with master

This commit is contained in:
Miriam Baglioni 2021-07-27 12:34:59 +02:00
commit 35e395eae8
36 changed files with 548 additions and 754 deletions

View File

@ -98,7 +98,7 @@ public class GraphCleaningFunctions extends CleaningFunctions {
Result r = (Result) value; Result r = (Result) value;
if (Objects.nonNull(r.getTitle()) && r.getTitle().isEmpty()) { if (Objects.isNull(r.getTitle()) || r.getTitle().isEmpty()) {
return false; return false;
} }

View File

@ -532,11 +532,11 @@ object DataciteToOAFTransformation {
JField("awardUri", JString(awardUri)) <- fundingReferences JField("awardUri", JString(awardUri)) <- fundingReferences
} yield awardUri } yield awardUri
result.setId(IdentifierFactory.createIdentifier(result))
var relations: List[Relation] = awardUris.flatMap(a => get_projectRelation(a, result.getId)).filter(r => r != null) var relations: List[Relation] = awardUris.flatMap(a => get_projectRelation(a, result.getId)).filter(r => r != null)
fix_figshare(result) fix_figshare(result)
result.setId(IdentifierFactory.createIdentifier(result))
if (result.getId == null) if (result.getId == null)
return List() return List()

View File

@ -4,6 +4,10 @@
<name>mainPath</name> <name>mainPath</name>
<description>the working path of Datacite stores</description> <description>the working path of Datacite stores</description>
</property> </property>
<property>
<name>oafTargetPath</name>
<description>the target path where the OAF records are stored</description>
</property>
<property> <property>
<name>isLookupUrl</name> <name>isLookupUrl</name>
<description>The IS lookUp service endopoint</description> <description>The IS lookUp service endopoint</description>
@ -56,7 +60,7 @@
<arg>--master</arg><arg>yarn-cluster</arg> <arg>--master</arg><arg>yarn-cluster</arg>
<arg>--blocksize</arg><arg>${blocksize}</arg> <arg>--blocksize</arg><arg>${blocksize}</arg>
</spark> </spark>
<ok to="TransformJob"/> <ok to="TransformDatacite"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
@ -78,7 +82,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${mainPath}/datacite_dump</arg> <arg>--sourcePath</arg><arg>${mainPath}/datacite_dump</arg>
<arg>--targetPath</arg><arg>${mainPath}/datacite_oaf</arg> <arg>--targetPath</arg><arg>${oafTargetPath}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--exportLinks</arg><arg>${exportLinks}</arg> <arg>--exportLinks</arg><arg>${exportLinks}</arg>
<arg>--master</arg><arg>yarn-cluster</arg> <arg>--master</arg><arg>yarn-cluster</arg>

View File

@ -21,7 +21,7 @@ object SparkMapDumpIntoOAF {
val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass) val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass)
val conf: SparkConf = new SparkConf() val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkMapDumpIntoOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json"))) val parser = new ArgumentApplicationParser(IOUtils.toString(SparkMapDumpIntoOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_crossref_dump_to_oaf_params.json")))
parser.parseArgument(args) parser.parseArgument(args)
val spark: SparkSession = val spark: SparkSession =
SparkSession SparkSession

View File

@ -1,52 +1,19 @@
package eu.dnetlib.doiboost.orcid package eu.dnetlib.doiboost.orcid
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.oa.merge.AuthorMerger
import eu.dnetlib.dhp.schema.oaf.Publication import eu.dnetlib.dhp.schema.oaf.Publication
import eu.dnetlib.dhp.schema.orcid.OrcidDOI
import org.apache.commons.io.IOUtils import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
object SparkConvertORCIDToOAF { object SparkConvertORCIDToOAF {
val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass) val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass)
def fixORCIDItem(item :ORCIDItem):ORCIDItem = {
new ORCIDItem(item.doi, item.authors.groupBy(_.oid).map(_._2.head).toList)
} def run(spark:SparkSession, workingPath:String, targetPath:String) :Unit = {
def run(spark:SparkSession,sourcePath:String,workingPath:String, targetPath:String):Unit = {
import spark.implicits._
implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication] implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
import spark.implicits._
val inputRDD:RDD[OrcidAuthor] = spark.sparkContext.textFile(s"$sourcePath/authors").map(s => ORCIDToOAF.convertORCIDAuthor(s)).filter(s => s!= null).filter(s => ORCIDToOAF.authorValid(s))
spark.createDataset(inputRDD).as[OrcidAuthor].write.mode(SaveMode.Overwrite).save(s"$workingPath/author")
val res = spark.sparkContext.textFile(s"$sourcePath/works").flatMap(s => ORCIDToOAF.extractDOIWorks(s)).filter(s => s!= null)
spark.createDataset(res).as[OrcidWork].write.mode(SaveMode.Overwrite).save(s"$workingPath/works")
val authors :Dataset[OrcidAuthor] = spark.read.load(s"$workingPath/author").as[OrcidAuthor]
val works :Dataset[OrcidWork] = spark.read.load(s"$workingPath/works").as[OrcidWork]
works.joinWith(authors, authors("oid").equalTo(works("oid")))
.map(i =>{
val doi = i._1.doi
var author = i._2
(doi, author)
}).groupBy(col("_1").alias("doi"))
.agg(collect_list(col("_2")).alias("authors")).as[ORCIDItem]
.map(s => fixORCIDItem(s))
.write.mode(SaveMode.Overwrite).save(s"$workingPath/orcidworksWithAuthor")
val dataset: Dataset[ORCIDItem] =spark.read.load(s"$workingPath/orcidworksWithAuthor").as[ORCIDItem] val dataset: Dataset[ORCIDItem] =spark.read.load(s"$workingPath/orcidworksWithAuthor").as[ORCIDItem]
logger.info("Converting ORCID to OAF") logger.info("Converting ORCID to OAF")
@ -55,7 +22,7 @@ object SparkConvertORCIDToOAF {
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf() val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertORCIDToOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json"))) val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertORCIDToOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_orcid_to_oaf_params.json")))
parser.parseArgument(args) parser.parseArgument(args)
val spark: SparkSession = val spark: SparkSession =
SparkSession SparkSession
@ -65,10 +32,10 @@ object SparkConvertORCIDToOAF {
.master(parser.get("master")).getOrCreate() .master(parser.get("master")).getOrCreate()
val sourcePath = parser.get("sourcePath")
val workingPath = parser.get("workingPath") val workingPath = parser.get("workingPath")
val targetPath = parser.get("targetPath") val targetPath = parser.get("targetPath")
run(spark, sourcePath, workingPath, targetPath)
run(spark,workingPath, targetPath)
} }

View File

@ -0,0 +1,70 @@
package eu.dnetlib.doiboost.orcid
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.oa.merge.AuthorMerger
import eu.dnetlib.dhp.schema.oaf.Publication
import eu.dnetlib.dhp.schema.orcid.OrcidDOI
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
object SparkPreprocessORCID {
val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass)
def fixORCIDItem(item :ORCIDItem):ORCIDItem = {
ORCIDItem(item.doi, item.authors.groupBy(_.oid).map(_._2.head).toList)
}
def run(spark:SparkSession,sourcePath:String,workingPath:String):Unit = {
import spark.implicits._
implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
val inputRDD:RDD[OrcidAuthor] = spark.sparkContext.textFile(s"$sourcePath/authors").map(s => ORCIDToOAF.convertORCIDAuthor(s)).filter(s => s!= null).filter(s => ORCIDToOAF.authorValid(s))
spark.createDataset(inputRDD).as[OrcidAuthor].write.mode(SaveMode.Overwrite).save(s"$workingPath/author")
val res = spark.sparkContext.textFile(s"$sourcePath/works").flatMap(s => ORCIDToOAF.extractDOIWorks(s)).filter(s => s!= null)
spark.createDataset(res).as[OrcidWork].write.mode(SaveMode.Overwrite).save(s"$workingPath/works")
val authors :Dataset[OrcidAuthor] = spark.read.load(s"$workingPath/author").as[OrcidAuthor]
val works :Dataset[OrcidWork] = spark.read.load(s"$workingPath/works").as[OrcidWork]
works.joinWith(authors, authors("oid").equalTo(works("oid")))
.map(i =>{
val doi = i._1.doi
val author = i._2
(doi, author)
}).groupBy(col("_1").alias("doi"))
.agg(collect_list(col("_2")).alias("authors")).as[ORCIDItem]
.map(s => fixORCIDItem(s))
.write.mode(SaveMode.Overwrite).save(s"$workingPath/orcidworksWithAuthor")
}
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertORCIDToOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/preprocess_orcid_params.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val sourcePath = parser.get("sourcePath")
val workingPath = parser.get("workingPath")
run(spark, sourcePath, workingPath)
}
}

View File

@ -4,6 +4,7 @@ package eu.dnetlib.doiboost.orcidnodoi;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
@ -32,10 +33,7 @@ import com.google.gson.JsonParser;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.orcid.AuthorData; import eu.dnetlib.dhp.schema.orcid.*;
import eu.dnetlib.dhp.schema.orcid.AuthorSummary;
import eu.dnetlib.dhp.schema.orcid.Work;
import eu.dnetlib.dhp.schema.orcid.WorkDetail;
import eu.dnetlib.doiboost.orcid.json.JsonHelper; import eu.dnetlib.doiboost.orcid.json.JsonHelper;
import eu.dnetlib.doiboost.orcid.util.HDFSUtil; import eu.dnetlib.doiboost.orcid.util.HDFSUtil;
import eu.dnetlib.doiboost.orcidnodoi.oaf.PublicationToOaf; import eu.dnetlib.doiboost.orcidnodoi.oaf.PublicationToOaf;
@ -111,6 +109,10 @@ public class SparkGenEnrichedOrcidWorks {
Encoders.bean(WorkDetail.class)); Encoders.bean(WorkDetail.class));
logger.info("Works data loaded: " + workDataset.count()); logger.info("Works data loaded: " + workDataset.count());
final LongAccumulator warnNotFoundContributors = spark
.sparkContext()
.longAccumulator("warnNotFoundContributors");
JavaRDD<Tuple2<String, String>> enrichedWorksRDD = workDataset JavaRDD<Tuple2<String, String>> enrichedWorksRDD = workDataset
.joinWith( .joinWith(
authorDataset, authorDataset,
@ -119,7 +121,21 @@ public class SparkGenEnrichedOrcidWorks {
(MapFunction<Tuple2<WorkDetail, AuthorData>, Tuple2<String, String>>) value -> { (MapFunction<Tuple2<WorkDetail, AuthorData>, Tuple2<String, String>>) value -> {
WorkDetail w = value._1; WorkDetail w = value._1;
AuthorData a = value._2; AuthorData a = value._2;
AuthorMatcher.match(a, w.getContributors()); if (w.getContributors() == null
|| (w.getContributors() != null && w.getContributors().size() == 0)) {
Contributor c = new Contributor();
c.setName(a.getName());
c.setSurname(a.getSurname());
c.setCreditName(a.getCreditName());
c.setOid(a.getOid());
List<Contributor> contributors = Arrays.asList(c);
w.setContributors(contributors);
if (warnNotFoundContributors != null) {
warnNotFoundContributors.add(1);
}
} else {
AuthorMatcher.match(a, w.getContributors());
}
return new Tuple2<>(a.getOid(), JsonHelper.createOidWork(w)); return new Tuple2<>(a.getOid(), JsonHelper.createOidWork(w));
}, },
Encoders.tuple(Encoders.STRING(), Encoders.STRING())) Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
@ -172,7 +188,7 @@ public class SparkGenEnrichedOrcidWorks {
OBJECT_MAPPER.writeValueAsString(new AtomicAction<>(Publication.class, p)))) OBJECT_MAPPER.writeValueAsString(new AtomicAction<>(Publication.class, p))))
.mapToPair(t -> new Tuple2(new Text(t._1()), new Text(t._2()))) .mapToPair(t -> new Tuple2(new Text(t._1()), new Text(t._2())))
.saveAsNewAPIHadoopFile( .saveAsNewAPIHadoopFile(
workingPath.concat(outputEnrichedWorksPath), outputEnrichedWorksPath,
Text.class, Text.class,
Text.class, Text.class,
SequenceFileOutputFormat.class, SequenceFileOutputFormat.class,
@ -180,6 +196,7 @@ public class SparkGenEnrichedOrcidWorks {
logger.info("parsedPublications: " + parsedPublications.value().toString()); logger.info("parsedPublications: " + parsedPublications.value().toString());
logger.info("enrichedPublications: " + enrichedPublications.value().toString()); logger.info("enrichedPublications: " + enrichedPublications.value().toString());
logger.info("warnNotFoundContributors: " + warnNotFoundContributors.value().toString());
logger.info("errorsGeneric: " + errorsGeneric.value().toString()); logger.info("errorsGeneric: " + errorsGeneric.value().toString());
logger.info("errorsInvalidTitle: " + errorsInvalidTitle.value().toString()); logger.info("errorsInvalidTitle: " + errorsInvalidTitle.value().toString());
logger.info("errorsNotFoundAuthors: " + errorsNotFoundAuthors.value().toString()); logger.info("errorsNotFoundAuthors: " + errorsNotFoundAuthors.value().toString());

View File

@ -18,7 +18,7 @@ object SparkMapUnpayWallToOAF {
val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass) val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass)
val conf: SparkConf = new SparkConf() val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkMapDumpIntoOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json"))) val parser = new ArgumentApplicationParser(IOUtils.toString(SparkMapDumpIntoOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_uw_to_oaf_params.json")))
parser.parseArgument(args) parser.parseArgument(args)
val spark: SparkSession = val spark: SparkSession =
SparkSession SparkSession

View File

@ -0,0 +1,6 @@
[
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the OAF Orcid transformed", "paramRequired": true},
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source path ", "paramRequired": false},
{"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
]

View File

@ -0,0 +1,6 @@
[
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the OAF Orcid transformed", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the working path ", "paramRequired": false},
{"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
]

View File

@ -0,0 +1,6 @@
[
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the OAF Orcid transformed", "paramRequired": true},
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source path ", "paramRequired": false},
{"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
]

View File

@ -1,101 +0,0 @@
<workflow-app name="import Crossref from index into HDFS" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>workingPath</name>
<description>the working dir base path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>timestamp</name>
<description>Timestamp for incremental Harvesting</description>
</property>
</parameters>
<start to="ImportCrossRef"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ImportCrossRef">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.doiboost.crossref.CrossrefImporter</main-class>
<arg>-t</arg><arg>${workingPath}/input/crossref/index_update</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-ts</arg><arg>${timestamp}</arg>
</java>
<ok to="GenerateDataset"/>
<error to="Kill"/>
</action>
<action name="GenerateDataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ExtractCrossrefToOAF</name>
<class>eu.dnetlib.doiboost.crossref.CrossrefDataset</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
${sparkExtraOPT}
</spark-opts>
<arg>--workingPath</arg><arg>/data/doiboost/input/crossref</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="RenameDataset"/>
<error to="Kill"/>
</action>
<action name="RenameDataset">
<fs>
<delete path='${workingPath}/input/crossref/crossref_ds'/>
<move source="${workingPath}/input/crossref/crossref_ds_updated"
target="${workingPath}/input/crossref/crossref_ds"/>
</fs>
<ok to="ConvertCrossrefToOAF"/>
<error to="Kill"/>
</action>
<action name="ConvertCrossrefToOAF">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ConvertCrossrefToOAF</name>
<class>eu.dnetlib.doiboost.crossref.SparkMapDumpIntoOAF</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
${sparkExtraOPT}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingPath}/input/crossref/crossref_ds</arg>
<arg>--targetPath</arg><arg>${workingPath}/process/</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -1,38 +0,0 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
</property>
<property>
<name>spark2EventLogDir</name>
<value>/user/spark/spark2ApplicationHistory</value>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
</property>
</configuration>

View File

@ -1,96 +0,0 @@
<workflow-app name="Create DOIBoostActionSet" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>hostedByMapPath</name>
<description>the Hosted By Map Path</description>
</property>
<property>
<name>affiliationPath</name>
<description>the Affliation Path</description>
</property>
<property>
<name>paperAffiliationPath</name>
<description>the paperAffiliation Path</description>
</property>
<property>
<name>workingDirPath</name>
<description>the Working Path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<start to="CreateDOIBoost"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="CreateDOIBoost">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Create DOIBoost Infospace</name>
<class>eu.dnetlib.doiboost.SparkGenerateDoiBoost</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
${sparkExtraOPT}
</spark-opts>
<arg>--hostedByMapPath</arg><arg>${hostedByMapPath}</arg>
<arg>--affiliationPath</arg><arg>${affiliationPath}</arg>
<arg>--paperAffiliationPath</arg><arg>${paperAffiliationPath}</arg>
<arg>--workingDirPath</arg><arg>${workingDirPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="GenerateActionSet"/>
<error to="Kill"/>
</action>
<action name="GenerateActionSet">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Generate DOIBoost ActionSet</name>
<class>eu.dnetlib.doiboost.SparkGenerateDOIBoostActionSet</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
${sparkExtraOPT}
</spark-opts>
<arg>--dbPublicationPath</arg><arg>${workingDirPath}/doiBoostPublicationFiltered</arg>
<arg>--dbDatasetPath</arg><arg>${workingDirPath}/crossrefDataset</arg>
<arg>--crossRefRelation</arg><arg>${workingDirPath}/crossrefRelation</arg>
<arg>--dbaffiliationRelationPath</arg><arg>${workingDirPath}/doiBoostPublicationAffiliation</arg>
<arg>-do</arg><arg>${workingDirPath}/doiBoostOrganization</arg>
<arg>--targetPath</arg><arg>${workingDirPath}/actionDataSet</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -1,42 +0,0 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>oozie.wf.rerun.failnodes</name>
<value>false</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
</property>
<property>
<name>spark2EventLogDir</name>
<value>/user/spark/spark2ApplicationHistory</value>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
</property>
</configuration>

View File

@ -1,92 +0,0 @@
<workflow-app name="import MAG into HDFS" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the working dir base path</description>
</property>
<property>
<name>targetPath</name>
<description>the working dir base path</description>
</property>
<property>
<name>workingPath</name>
<description>the working dir base path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<start to="ResetWorkingPath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ResetWorkingPath">
<fs>
<delete path='${workingPath}'/>
<mkdir path='${workingPath}'/>
</fs>
<ok to="ConvertMagToDataset"/>
<error to="Kill"/>
</action>
<action name="ConvertMagToDataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Convert Mag to Dataset</name>
<class>eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
${sparkExtraOPT}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--targetPath</arg><arg>${workingPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="PreprocessMag"/>
<error to="Kill"/>
</action>
<action name="PreprocessMag">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Convert Mag to OAF Dataset</name>
<class>eu.dnetlib.doiboost.mag.SparkPreProcessMAG</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
${sparkExtraOPT}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}/process</arg>
<arg>--targetPath</arg><arg>${targetPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -34,7 +34,7 @@
<master>yarn-cluster</master> <master>yarn-cluster</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Convert ORCID to Dataset</name> <name>Convert ORCID to Dataset</name>
<class>eu.dnetlib.doiboost.orcid.SparkConvertORCIDToOAF</class> <class>eu.dnetlib.doiboost.orcid.SparkPreprocessORCID</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar> <jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}

View File

@ -7,9 +7,14 @@
</property> </property>
<property> <property>
<name>outputPath</name> <name>outputPath</name>
<value>/data/orcid_activities_2020/no_doi_dataset_prod/</value>
<description>path where to store the action set</description> <description>path where to store the action set</description>
</property> </property>
<property>
<name>processOutputPath</name>
<value>/data/orcid_activities_2020/process_no_doi_dataset_prod</value>
<description>temporary path where to store the action set</description>
</property>
<property> <property>
<name>spark2GenNoDoiDatasetMaxExecutors</name> <name>spark2GenNoDoiDatasetMaxExecutors</name>
<value>40</value> <value>40</value>
@ -66,7 +71,7 @@
<action name="ResetWorkingPath"> <action name="ResetWorkingPath">
<fs> <fs>
<delete path='${workingPath}/no_doi_dataset'/> <delete path='${processOutputPath}'/>
</fs> </fs>
<ok to="GenOrcidNoDoiDataset"/> <ok to="GenOrcidNoDoiDataset"/>
<error to="Kill"/> <error to="Kill"/>
@ -92,7 +97,7 @@
<arg>--workingPath</arg><arg>${workingPath}/</arg> <arg>--workingPath</arg><arg>${workingPath}/</arg>
<arg>--hdfsServerUri</arg><arg>${nameNode}</arg> <arg>--hdfsServerUri</arg><arg>${nameNode}</arg>
<arg>--orcidDataFolder</arg><arg>last_orcid_dataset</arg> <arg>--orcidDataFolder</arg><arg>last_orcid_dataset</arg>
<arg>--outputEnrichedWorksPath</arg><arg>no_doi_dataset</arg> <arg>--outputEnrichedWorksPath</arg><arg>${processOutputPath}</arg>
</spark> </spark>
<ok to="importOrcidNoDoi"/> <ok to="importOrcidNoDoi"/>
<error to="Kill"/> <error to="Kill"/>
@ -100,7 +105,7 @@
<action name="importOrcidNoDoi"> <action name="importOrcidNoDoi">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${workingPath}/no_doi_dataset/*</arg> <arg>${processOutputPath}/*</arg>
<arg>${outputPath}</arg> <arg>${outputPath}</arg>
</distcp> </distcp>
<ok to="End"/> <ok to="End"/>

View File

@ -0,0 +1,216 @@
<workflow-app name="Generate DOIBoost ActionSet" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<!-- Crossref Parameters -->
<property>
<name>inputPathCrossref</name>
<description>the Crossref input path</description>
</property>
<property>
<name>crossrefDumpPath</name>
<description>the Crossref dump path</description>
</property>
<!-- MAG Parameters -->
<property>
<name>MAGDumpPath</name>
<description>the MAG dump working path</description>
</property>
<property>
<name>inputPathMAG</name>
<description>the MAG working path</description>
</property>
<!-- ORCID Parameters -->
<property>
<name>inputPathOrcid</name>
<description>the ORCID input path</description>
</property>
<property>
<name>workingPathOrcid</name>
<description>the ORCID working path</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="resume_from"/>
<decision name="resume_from">
<switch>
<case to="UnpackCrossrefEntries">${wf:conf('resumeFrom') eq 'UnpackCrossrefEntries'}</case>
<case to="GenerateCrossrefDataset">${wf:conf('resumeFrom') eq 'GenerateCrossrefDataset'}</case>
<case to="ResetMagWorkingPath">${wf:conf('resumeFrom') eq 'ResetMagWorkingPath'}</case>
<case to="ConvertMagToDataset">${wf:conf('resumeFrom') eq 'ConvertMagToDataset'}</case>
<case to="PreProcessORCID">${wf:conf('resumeFrom') eq 'PreProcessORCID'}</case>
<default to="ImportCrossRef"/>
</switch>
</decision>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ImportCrossRef">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.doiboost.crossref.ExtractCrossrefRecords</main-class>
<arg>--hdfsServerUri</arg><arg>${nameNode}</arg>
<arg>--crossrefFileNameTarGz</arg><arg>${crossrefDumpPath}/crossref.tar.gz</arg>
<arg>--workingPath</arg><arg>${crossrefDumpPath}</arg>
<arg>--outputPath</arg><arg>${crossrefDumpPath}/files/</arg>
</java>
<ok to="UnpackCrossrefEntries"/>
<error to="Kill"/>
</action>
<action name="UnpackCrossrefEntries">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>SparkUnpackCrossrefEntries</name>
<class>eu.dnetlib.doiboost.crossref.UnpackCrtossrefEntries</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--master</arg><arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${crossrefDumpPath}/files</arg>
<arg>--targetPath</arg><arg>${crossrefDumpPath}/crossref_unpack/</arg>
</spark>
<ok to="GenerateCrossrefDataset"/>
<error to="Kill"/>
</action>
<action name="GenerateCrossrefDataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>SparkGenerateCrossrefDataset</name>
<class>eu.dnetlib.doiboost.crossref.GenerateCrossrefDataset</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=7G
--executor-cores=2
--driver-memory=7G
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--master</arg><arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${crossrefDumpPath}/crossref_unpack/</arg>
<arg>--targetPath</arg><arg>${inputPathCrossref}/crossref_ds</arg>
</spark>
<ok to="removeFiles"/>
<error to="Kill"/>
</action>
<action name="removeFiles">
<fs>
<!-- <delete path="${crossrefDumpPath}/files"/>-->
<delete path="${crossrefDumpPath}/crossref_unpack/"/>
</fs>
<ok to="ResetMagWorkingPath"/>
<error to="Kill"/>
</action>
<!-- MAG SECTION -->
<action name="ResetMagWorkingPath">
<fs>
<delete path="${inputPathMAG}/dataset"/>
<delete path="${inputPathMAG}/process"/>
</fs>
<ok to="ConvertMagToDataset"/>
<error to="Kill"/>
</action>
<action name="ConvertMagToDataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Convert Mag to Dataset</name>
<class>eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${MAGDumpPath}</arg>
<arg>--targetPath</arg><arg>${inputPathMAG}/dataset</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="PreProcessORCID"/>
<error to="Kill"/>
</action>
<!-- ORCID SECTION -->
<action name="PreProcessORCID">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Convert ORCID to Dataset</name>
<class>eu.dnetlib.doiboost.orcid.SparkPreprocessORCID</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${inputPathOrcid}</arg>
<arg>--workingPath</arg><arg>${workingPathOrcid}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -1,7 +1,6 @@
[ [
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the Orcid Input file", "paramRequired": true}, {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the Orcid Input file", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the working path ", "paramRequired": false}, {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the working path ", "paramRequired": false},
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the working dir path", "paramRequired": true}, {"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
{"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
] ]

View File

@ -1,4 +1,4 @@
<workflow-app name="Generate DOIBoost ActionSet" xmlns="uri:oozie:workflow:0.5"> <workflow-app name="Generate DOIBoost ActionSet for PROD" xmlns="uri:oozie:workflow:0.5">
<parameters> <parameters>
<property> <property>
<name>sparkDriverMemory</name> <name>sparkDriverMemory</name>
@ -17,8 +17,6 @@
<name>sparkExecutorCores</name> <name>sparkExecutorCores</name>
<description>number of cores used by single executor</description> <description>number of cores used by single executor</description>
</property> </property>
<!-- Itersection Parameters --> <!-- Itersection Parameters -->
<property> <property>
<name>workingPath</name> <name>workingPath</name>
@ -40,29 +38,8 @@
<name>inputPathCrossref</name> <name>inputPathCrossref</name>
<description>the Crossref input path</description> <description>the Crossref input path</description>
</property> </property>
<property>
<name>crossrefDumpPath</name>
<description>the Crossref dump path</description>
</property>
<!-- <property>-->
<!-- <name>crossrefTimestamp</name>-->
<!-- <description>Timestamp for the Crossref incremental Harvesting</description>-->
<!-- </property>-->
<!-- <property>-->
<!-- <name>esServer</name>-->
<!-- <description>elasticsearch server url for the Crossref Harvesting</description>-->
<!-- </property>-->
<!-- <property>-->
<!-- <name>esIndex</name>-->
<!-- <description>elasticsearch index name for the Crossref Harvesting</description>-->
<!-- </property>-->
<!-- MAG Parameters --> <!-- MAG Parameters -->
<property>
<name>MAGDumpPath</name>
<description>the MAG dump working path</description>
</property>
<property> <property>
<name>inputPathMAG</name> <name>inputPathMAG</name>
<description>the MAG working path</description> <description>the MAG working path</description>
@ -76,11 +53,6 @@
</property> </property>
<!-- ORCID Parameters --> <!-- ORCID Parameters -->
<property>
<name>inputPathOrcid</name>
<description>the ORCID input path</description>
</property>
<property> <property>
<name>workingPathOrcid</name> <name>workingPathOrcid</name>
<description>the ORCID working path</description> <description>the ORCID working path</description>
@ -103,15 +75,12 @@
<decision name="resume_from"> <decision name="resume_from">
<switch> <switch>
<case to="ConvertCrossrefToOAF">${wf:conf('resumeFrom') eq 'ConvertCrossrefToOAF'}</case>
<case to="ResetMagWorkingPath">${wf:conf('resumeFrom') eq 'ResetMagWorkingPath'}</case>
<case to="ProcessMAG">${wf:conf('resumeFrom') eq 'PreprocessMag'}</case> <case to="ProcessMAG">${wf:conf('resumeFrom') eq 'PreprocessMag'}</case>
<case to="ProcessUW">${wf:conf('resumeFrom') eq 'PreprocessUW'}</case> <case to="ProcessUW">${wf:conf('resumeFrom') eq 'PreprocessUW'}</case>
<case to="ProcessORCID">${wf:conf('resumeFrom') eq 'PreprocessORCID'}</case> <case to="ProcessORCID">${wf:conf('resumeFrom') eq 'ProcessORCID'}</case>
<case to="CreateDOIBoost">${wf:conf('resumeFrom') eq 'CreateDOIBoost'}</case> <case to="CreateDOIBoost">${wf:conf('resumeFrom') eq 'CreateDOIBoost'}</case>
<case to="GenerateActionSet">${wf:conf('resumeFrom') eq 'GenerateActionSet'}</case> <case to="GenerateActionSet">${wf:conf('resumeFrom') eq 'GenerateActionSet'}</case>
<case to="GenerateCrossrefDataset">${wf:conf('resumeFrom') eq 'GenerateCrossrefDataset'}</case> <default to="ConvertCrossrefToOAF"/>
<default to="ImportCrossRef"/>
</switch> </switch>
</decision> </decision>
@ -119,170 +88,6 @@
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<!-- <action name="ImportCrossRef">-->
<!-- <java>-->
<!-- <main-class>eu.dnetlib.doiboost.crossref.CrossrefImporter</main-class>-->
<!-- <arg>&#45;&#45;targetPath</arg><arg>${inputPathCrossref}/index_update</arg>-->
<!-- <arg>&#45;&#45;namenode</arg><arg>${nameNode}</arg>-->
<!-- <arg>&#45;&#45;esServer</arg><arg>${esServer}</arg>-->
<!-- <arg>&#45;&#45;esIndex</arg><arg>${esIndex}</arg>-->
<!-- <arg>&#45;&#45;timestamp</arg><arg>${crossrefTimestamp}</arg>-->
<!-- </java>-->
<!-- <ok to="GenerateCrossrefDataset"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<action name="ImportCrossRef">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.doiboost.crossref.ExtractCrossrefRecords</main-class>
<arg>--hdfsServerUri</arg><arg>${nameNode}</arg>
<arg>--crossrefFileNameTarGz</arg><arg>${crossrefDumpPath}/crossref.tar.gz</arg>
<arg>--workingPath</arg><arg>${crossrefDumpPath}</arg>
<arg>--outputPath</arg><arg>${crossrefDumpPath}/files/</arg>
</java>
<ok to="UnpackCrossrefEntries"/>
<error to="Kill"/>
</action>
<action name="UnpackCrossrefEntries">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>SparkUnpackCrossrefEntries</name>
<class>eu.dnetlib.doiboost.crossref.UnpackCrtossrefEntries</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--master</arg><arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${crossrefDumpPath}/files</arg>
<arg>--targetPath</arg><arg>${crossrefDumpPath}/crossref_unpack/</arg>
</spark>
<ok to="GenerateCrossrefDataset"/>
<error to="Kill"/>
</action>
<action name="GenerateCrossrefDataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>SparkGenerateCrossrefDataset</name>
<class>eu.dnetlib.doiboost.crossref.GenerateCrossrefDataset</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=7G
--executor-cores=2
--driver-memory=7G
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--master</arg><arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${crossrefDumpPath}/crossref_unpack/</arg>
<arg>--targetPath</arg><arg>${inputPathCrossref}/crossref_ds</arg>
</spark>
<ok to="removeFiles"/>
<error to="Kill"/>
</action>
<action name="removeFiles">
<fs>
<!-- <delete path="${crossrefDumpPath}/files"/>-->
<delete path="${crossrefDumpPath}/crossref_unpack/"/>
</fs>
<ok to="ResetMagWorkingPath"/>
<error to="Kill"/>
</action>
<!-- CROSSREF SECTION -->
<!-- <action name="GenerateCrossrefDataset">-->
<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
<!-- <master>yarn-cluster</master>-->
<!-- <mode>cluster</mode>-->
<!-- <name>GenerateCrossrefDataset</name>-->
<!-- <class>eu.dnetlib.doiboost.crossref.CrossrefDataset</class>-->
<!-- <jar>dhp-doiboost-${projectVersion}.jar</jar>-->
<!-- <spark-opts>-->
<!-- &#45;&#45;executor-memory=${sparkExecutorMemory}-->
<!-- &#45;&#45;executor-cores=${sparkExecutorCores}-->
<!-- &#45;&#45;driver-memory=${sparkDriverMemory}-->
<!-- &#45;&#45;conf spark.sql.shuffle.partitions=3840-->
<!-- &#45;&#45;conf spark.extraListeners=${spark2ExtraListeners}-->
<!-- &#45;&#45;conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}-->
<!-- &#45;&#45;conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}-->
<!-- &#45;&#45;conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}-->
<!-- </spark-opts>-->
<!-- <arg>&#45;&#45;workingPath</arg><arg>${inputPathCrossref}</arg>-->
<!-- <arg>&#45;&#45;master</arg><arg>yarn-cluster</arg>-->
<!-- </spark>-->
<!-- <ok to="RenameDataset"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<!-- <action name="RenameDataset">-->
<!-- <fs>-->
<!-- <delete path="${inputPathCrossref}/crossref_ds"/>-->
<!-- <move source="${inputPathCrossref}/crossref_ds_updated"-->
<!-- target="${inputPathCrossref}/crossref_ds"/>-->
<!-- </fs>-->
<!-- <ok to="ResetMagWorkingPath"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<!-- MAG SECTION -->
<action name="ResetMagWorkingPath">
<fs>
<delete path="${inputPathMAG}/dataset"/>
<delete path="${inputPathMAG}/process"/>
</fs>
<ok to="ConvertMagToDataset"/>
<error to="Kill"/>
</action>
<action name="ConvertMagToDataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Convert Mag to Dataset</name>
<class>eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${MAGDumpPath}</arg>
<arg>--targetPath</arg><arg>${inputPathMAG}/dataset</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="ConvertCrossrefToOAF"/>
<error to="Kill"/>
</action>
<action name="ConvertCrossrefToOAF"> <action name="ConvertCrossrefToOAF">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master> <master>yarn-cluster</master>
@ -326,7 +131,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${inputPathMAG}/dataset</arg> <arg>--sourcePath</arg><arg>${inputPathMAG}/dataset</arg>
<arg>--workingPath</arg><arg>${inputPathMAG}/process</arg> <arg>--workingPath</arg><arg>${inputPathMAG}/process_p</arg>
<arg>--targetPath</arg><arg>${workingPath}</arg> <arg>--targetPath</arg><arg>${workingPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg> <arg>--master</arg><arg>yarn-cluster</arg>
</spark> </spark>
@ -380,7 +185,6 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${inputPathOrcid}</arg>
<arg>--workingPath</arg><arg>${workingPathOrcid}</arg> <arg>--workingPath</arg><arg>${workingPathOrcid}</arg>
<arg>--targetPath</arg><arg>${workingPath}/orcidPublication</arg> <arg>--targetPath</arg><arg>${workingPath}/orcidPublication</arg>
<arg>--master</arg><arg>yarn-cluster</arg> <arg>--master</arg><arg>yarn-cluster</arg>

View File

@ -1,38 +0,0 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
</property>
<property>
<name>spark2EventLogDir</name>
<value>/user/spark/spark2ApplicationHistory</value>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
</property>
</configuration>

View File

@ -1,55 +0,0 @@
<workflow-app name="import UnpayWall into HDFS" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the working dir base path</description>
</property>
<property>
<name>targetPath</name>
<description>the working dir base path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<start to="PreprocessUW"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="PreprocessUW">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Convert UnpayWall to Dataset</name>
<class>eu.dnetlib.doiboost.uw.SparkMapUnpayWallToOAF</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
${sparkExtraOPT}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/uw_extracted</arg>
<arg>--targetPath</arg><arg>${targetPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -1,22 +1,15 @@
package eu.dnetlib.doiboost.mag package eu.dnetlib.doiboost.mag
import java.sql.Timestamp import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, SparkSession}
import eu.dnetlib.dhp.schema.oaf.Publication import org.codehaus.jackson.map.ObjectMapper
import org.apache.htrace.fasterxml.jackson.databind.SerializationFeature
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.java.function.MapFunction
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
import org.junit.jupiter.api.Test
import org.slf4j.{Logger, LoggerFactory}
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.apache.spark.sql.functions._ import org.junit.jupiter.api.Test
import org.json4s.DefaultFormats
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._ import java.sql.Timestamp
import scala.io.Source import scala.io.Source
import scala.reflect.ClassTag
import scala.util.matching.Regex
@ -65,13 +58,13 @@ class MAGMappingTest {
@Test @Test
def normalizeDoiTest():Unit = { def normalizeDoiTest():Unit = {
import org.json4s.jackson.Serialization.write
import org.json4s.DefaultFormats
implicit val formats = DefaultFormats implicit val formats = DefaultFormats
val conf = new SparkConf()
val conf = new SparkConf().setAppName("test").setMaster("local[*]").set("spark.driver.host", "localhost") conf.setMaster("local[*]")
conf.set("spark.driver.host", "localhost")
val spark: SparkSession = val spark: SparkSession =
SparkSession SparkSession
@ -96,20 +89,21 @@ class MAGMappingTest {
@Test @Test
def normalizeDoiTest2():Unit = { def normalizeDoiTest2():Unit = {
import org.json4s.jackson.Serialization.write
import org.json4s.DefaultFormats import org.json4s.DefaultFormats
implicit val formats = DefaultFormats implicit val formats = DefaultFormats
val conf = new SparkConf().setAppName("test").setMaster("local[*]").set("spark.driver.host", "localhost") val conf = new SparkConf()
conf.setMaster("local[*]")
conf.set("spark.driver.host", "localhost")
val spark: SparkSession =
SparkSession
.builder()
.appName(getClass.getSimpleName)
.config(conf)
.getOrCreate()
val spark: SparkSession =
SparkSession
.builder()
.appName(getClass.getSimpleName)
.config(conf)
.getOrCreate()
val path = getClass.getResource("duplicatedMagPapers.json").getPath val path = getClass.getResource("duplicatedMagPapers.json").getPath
import org.apache.spark.sql.Encoders import org.apache.spark.sql.Encoders

View File

@ -46,7 +46,9 @@ class MappingORCIDToOAFTest {
implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication] implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
import spark.implicits._ import spark.implicits._
SparkConvertORCIDToOAF.run( spark,sourcePath, workingPath, targetPath) SparkPreprocessORCID.run( spark,sourcePath, workingPath)
SparkConvertORCIDToOAF.run(spark, workingPath,targetPath)
val mapper = new ObjectMapper() val mapper = new ObjectMapper()
@ -62,6 +64,8 @@ class MappingORCIDToOAFTest {
println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(p.first())) println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(p.first()))
spark.close() spark.close()
} }

View File

@ -71,6 +71,8 @@ public abstract class AbstractMdRecordToOafMapper {
private final boolean shouldHashId; private final boolean shouldHashId;
private final boolean forceOriginalId;
protected static final String DATACITE_SCHEMA_KERNEL_4 = "http://datacite.org/schema/kernel-4"; protected static final String DATACITE_SCHEMA_KERNEL_4 = "http://datacite.org/schema/kernel-4";
protected static final String DATACITE_SCHEMA_KERNEL_4_SLASH = "http://datacite.org/schema/kernel-4/"; protected static final String DATACITE_SCHEMA_KERNEL_4_SLASH = "http://datacite.org/schema/kernel-4/";
protected static final String DATACITE_SCHEMA_KERNEL_3 = "http://datacite.org/schema/kernel-3"; protected static final String DATACITE_SCHEMA_KERNEL_3 = "http://datacite.org/schema/kernel-3";
@ -98,11 +100,20 @@ public abstract class AbstractMdRecordToOafMapper {
nsContext.put("datacite", DATACITE_SCHEMA_KERNEL_3); nsContext.put("datacite", DATACITE_SCHEMA_KERNEL_3);
} }
protected AbstractMdRecordToOafMapper(final VocabularyGroup vocs, final boolean invisible,
final boolean shouldHashId, final boolean forceOriginalId) {
this.vocs = vocs;
this.invisible = invisible;
this.shouldHashId = shouldHashId;
this.forceOriginalId = forceOriginalId;
}
protected AbstractMdRecordToOafMapper(final VocabularyGroup vocs, final boolean invisible, protected AbstractMdRecordToOafMapper(final VocabularyGroup vocs, final boolean invisible,
final boolean shouldHashId) { final boolean shouldHashId) {
this.vocs = vocs; this.vocs = vocs;
this.invisible = invisible; this.invisible = invisible;
this.shouldHashId = shouldHashId; this.shouldHashId = shouldHashId;
this.forceOriginalId = false;
} }
public List<Oaf> processMdRecord(final String xml) { public List<Oaf> processMdRecord(final String xml) {
@ -190,10 +201,16 @@ public abstract class AbstractMdRecordToOafMapper {
final long lastUpdateTimestamp) { final long lastUpdateTimestamp) {
final OafEntity entity = createEntity(doc, type, instances, collectedFrom, info, lastUpdateTimestamp); final OafEntity entity = createEntity(doc, type, instances, collectedFrom, info, lastUpdateTimestamp);
final String id = IdentifierFactory.createIdentifier(entity, shouldHashId);
if (!id.equals(entity.getId())) { final Set<String> originalId = Sets.newHashSet(entity.getOriginalId());
entity.getOriginalId().add(entity.getId()); originalId.add(entity.getId());
entity.setId(id); entity.setOriginalId(Lists.newArrayList(originalId));
if (!forceOriginalId) {
final String id = IdentifierFactory.createIdentifier(entity, shouldHashId);
if (!id.equals(entity.getId())) {
entity.setId(id);
}
} }
final List<Oaf> oafs = Lists.newArrayList(entity); final List<Oaf> oafs = Lists.newArrayList(entity);

View File

@ -163,11 +163,13 @@ public class GenerateEntitiesApplication {
switch (type.toLowerCase()) { switch (type.toLowerCase()) {
case "oaf-store-cleaned": case "oaf-store-cleaned":
case "oaf-store-claim":
return new OafToOafMapper(vocs, false, shouldHashId).processMdRecord(s); return new OafToOafMapper(vocs, false, shouldHashId).processMdRecord(s);
case "oaf-store-claim":
return new OafToOafMapper(vocs, false, shouldHashId, true).processMdRecord(s);
case "odf-store-cleaned": case "odf-store-cleaned":
case "odf-store-claim":
return new OdfToOafMapper(vocs, false, shouldHashId).processMdRecord(s); return new OdfToOafMapper(vocs, false, shouldHashId).processMdRecord(s);
case "odf-store-claim":
return new OdfToOafMapper(vocs, false, shouldHashId, true).processMdRecord(s);
case "oaf-store-intersection": case "oaf-store-intersection":
return new OafToOafMapper(vocs, true, shouldHashId).processMdRecord(s); return new OafToOafMapper(vocs, true, shouldHashId).processMdRecord(s);
case "odf-store-intersection": case "odf-store-intersection":

View File

@ -27,6 +27,11 @@ import eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits;
public class OafToOafMapper extends AbstractMdRecordToOafMapper { public class OafToOafMapper extends AbstractMdRecordToOafMapper {
public OafToOafMapper(final VocabularyGroup vocs, final boolean invisible, final boolean shouldHashId,
final boolean forceOrginalId) {
super(vocs, invisible, shouldHashId, forceOrginalId);
}
public OafToOafMapper(final VocabularyGroup vocs, final boolean invisible, final boolean shouldHashId) { public OafToOafMapper(final VocabularyGroup vocs, final boolean invisible, final boolean shouldHashId) {
super(vocs, invisible, shouldHashId); super(vocs, invisible, shouldHashId);
} }

View File

@ -22,6 +22,11 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
public static final String HTTP_DX_DOI_PREIFX = "http://dx.doi.org/"; public static final String HTTP_DX_DOI_PREIFX = "http://dx.doi.org/";
public OdfToOafMapper(final VocabularyGroup vocs, final boolean invisible, final boolean shouldHashId,
final boolean forceOrginalId) {
super(vocs, invisible, shouldHashId, forceOrginalId);
}
public OdfToOafMapper(final VocabularyGroup vocs, final boolean invisible, final boolean shouldHashId) { public OdfToOafMapper(final VocabularyGroup vocs, final boolean invisible, final boolean shouldHashId) {
super(vocs, invisible, shouldHashId); super(vocs, invisible, shouldHashId);
} }

View File

@ -567,6 +567,31 @@ public class MappersTest {
assertNotNull(d.getInstance().get(0).getUrl()); assertNotNull(d.getInstance().get(0).getUrl());
} }
@Test
void testEnermaps() throws IOException {
final String xml = IOUtils.toString(getClass().getResourceAsStream("enermaps.xml"));
final List<Oaf> list = new OdfToOafMapper(vocs, false, true).processMdRecord(xml);
System.out.println("***************");
System.out.println(new ObjectMapper().writeValueAsString(list));
System.out.println("***************");
assertEquals(1, list.size());
assertTrue(list.get(0) instanceof Dataset);
final Dataset d = (Dataset) list.get(0);
assertValidId(d.getId());
assertValidId(d.getCollectedfrom().get(0).getKey());
assertTrue(StringUtils.isNotBlank(d.getTitle().get(0).getValue()));
assertEquals(1, d.getAuthor().size());
assertEquals(1, d.getInstance().size());
assertNotNull(d.getInstance().get(0).getUrl());
assertNotNull(d.getContext());
assertTrue(StringUtils.isNotBlank(d.getContext().get(0).getId()));
assertEquals("enermaps::selection::tgs00004", d.getContext().get(0).getId());
}
@Test @Test
void testClaimFromCrossref() throws IOException { void testClaimFromCrossref() throws IOException {
final String xml = IOUtils.toString(getClass().getResourceAsStream("oaf_claim_crossref.xml")); final String xml = IOUtils.toString(getClass().getResourceAsStream("oaf_claim_crossref.xml"));

View File

@ -0,0 +1,72 @@
<?xml version="1.0" encoding="UTF-8"?>
<record xmlns="http://datacite.org/schema/kernel-4"
xmlns:dr="http://www.driver-repository.eu/namespace/dr" xmlns:oaf="http://namespace.openaire.eu/oaf">
<oai:header xmlns="http://namespace.openaire.eu/"
xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
xmlns:oai="http://www.openarchives.org/OAI/2.0/"
xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<dri:objIdentifier>enermaps____::04149ee428d07360314c2cb3ba95d41e</dri:objIdentifier>
<dri:recordIdentifier>tgs00004</dri:recordIdentifier>
<dri:dateOfCollection>2021-07-20T18:43:12.096+02:00</dri:dateOfCollection>
<oaf:datasourceprefix>enermaps____</oaf:datasourceprefix>
</oai:header>
<metadata>
<resource>
<identifier identifierType="URL">https://ec.europa.eu/eurostat/web/products-datasets/-/tgs00004</identifier>
<creators>
<creator>
<creatorName>Statistical Office of the European Union (Eurostat)</creatorName>
</creator>
</creators>
<titles>
<title>
Regional GDP
</title>
</titles>
<publisher>Statistical Office of the European Union (Eurostat)</publisher>
<publicationYear>2020</publicationYear>
<dates>
<date dateType="Issued">2020-10-07</date>
</dates>
<resourceType resourceTypeGeneral="Dataset"/>
<rightsList>
<rights rightsURI="info:eu-repo/semantics/openAccess">OPEN</rights>
<rights rightsURI="https://creativecommons.org/licenses/by/4.0/">Creative Commons Attribution 4.0 International</rights>
</rightsList>
<descriptions>
<description descriptionType="Abstract" xml:lang="EN">GDP expressed in PPS (purchasing power standards) eliminates differences in price levels between countries. Calculations on a per inhabitant basis allow for the comparison of economies and regions significantly different in absolute size. GDP per inhabitant in PPS is the key variable for determining the eligibility of NUTS 2 regions in the framework of the European Unions structural policy.</description>
</descriptions>
<dr:CobjCategory type="dataset">0021</dr:CobjCategory>
<oaf:dateAccepted>2020-10-07</oaf:dateAccepted>
<oaf:accessrights>OPEN</oaf:accessrights>
<oaf:license>Creative Commons Attribution 4.0 International</oaf:license>
<oaf:hostedBy
id="openaire____::1256f046-bf1f-4afc-8b47-d0b147148b18" name="Unknown Repository"/>
<oaf:collectedFrom id="enermaps____::db" name="Enermaps"/>
<oaf:concept id="enermaps::selection::tgs00004"/>
</resource>
</metadata>
<about xmlns="" xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
xmlns:oai="http://www.openarchives.org/OAI/2.0/"
xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">
<originDescription altered="true" harvestDate="2021-07-20T18:43:12.096+02:00">
<baseURL>https%3A%2F%2Flab.idiap.ch%2Fenermaps%2Fapi%2Fdatacite</baseURL>
<identifier/>
<datestamp/>
<metadataNamespace/>
</originDescription>
</provenance>
<oaf:datainfo>
<oaf:inferred>false</oaf:inferred>
<oaf:deletedbyinference>false</oaf:deletedbyinference>
<oaf:trust>0.9</oaf:trust>
<oaf:inferenceprovenance/>
<oaf:provenanceaction classid="sysimport:crosswalk"
classname="sysimport:crosswalk"
schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions"/>
</oaf:datainfo>
</about>
</record>

View File

@ -16,6 +16,7 @@ import javax.xml.transform.*;
import javax.xml.transform.dom.DOMSource; import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult; import javax.xml.transform.stream.StreamResult;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.spark.util.LongAccumulator; import org.apache.spark.util.LongAccumulator;
import org.dom4j.Document; import org.dom4j.Document;
@ -183,6 +184,7 @@ public class XmlRecordFactory implements Serializable {
.getOriginalId() .getOriginalId()
.stream() .stream()
.filter(Objects::nonNull) .filter(Objects::nonNull)
.filter(id -> !id.matches("^\\d{2}" + IdentifierFactory.ID_PREFIX_SEPARATOR))
.map(s -> XmlSerializationUtils.asXmlElement("originalId", s)) .map(s -> XmlSerializationUtils.asXmlElement("originalId", s))
.collect(Collectors.toList())); .collect(Collectors.toList()));
} }

View File

@ -7,6 +7,8 @@ import java.io.IOException;
import java.io.StringReader; import java.io.StringReader;
import java.util.List; import java.util.List;
import eu.dnetlib.dhp.oa.provision.utils.ContextDef;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.dom4j.Document; import org.dom4j.Document;
import org.dom4j.DocumentException; import org.dom4j.DocumentException;
@ -131,4 +133,31 @@ public class XmlRecordFactoryTest {
System.out.println(doc.asXML()); System.out.println(doc.asXML());
assertEquals("", doc.valueOf("//rel/validated")); assertEquals("", doc.valueOf("//rel/validated"));
} }
@Test
public void testEnermapsRecord() throws IOException, DocumentException {
String contextmap = "<entries><entry id=\"enermaps\" label=\"Energy Research\" name=\"context\" type=\"community\"/>" +
"<entry id=\"enermaps::selection\" label=\"Featured dataset\" name=\"category\"/>"+
"<entry id=\"enermaps::selection::tgs00004\" label=\"Dataset title\" name=\"concept\"/>"+
"</entries>";
ContextMapper contextMapper = ContextMapper.fromXml(contextmap);
XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false, XmlConverterJob.schemaLocation,
otherDsTypeId);
Dataset d = OBJECT_MAPPER
.readValue(IOUtils.toString(getClass().getResourceAsStream("enermaps.json")), Dataset.class);
JoinedEntity je = new JoinedEntity<>(d);
String xml = xmlRecordFactory.build(je);
assertNotNull(xml);
Document doc = new SAXReader().read(new StringReader(xml));
assertNotNull(doc);
System.out.println(doc.asXML());
assertEquals("enermaps::selection::tgs00004", doc.valueOf("//concept/@id"));
}
} }

File diff suppressed because one or more lines are too long