From c01dd0c92524c21bfe8f06649a6d75f942ffbcc7 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 20 Oct 2021 13:55:07 +0200 Subject: [PATCH 01/14] registered oaf model classes for the KryoSerializer --- .../eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java index 4cff88d82..25b343c1a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java @@ -76,6 +76,9 @@ public class CopyHdfsOafApplication extends AbstractMigrationApplication { final Set paths = mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation); final SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + runWithSparkSession(conf, isSparkSessionManaged, spark -> processPaths(spark, vocs, hdfsPath, paths)); } From 00b78b9c588e0c36264a3cd128419d91b6dd91c4 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 20 Oct 2021 14:04:45 +0200 Subject: [PATCH 02/14] cleanup: mapping contents in the graph already defined in the OAF graph model doesn't require to be aware of the vocabularies --- .../dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java | 9 +-------- .../dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json | 6 ------ .../dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml | 1 - 3 files changed, 1 insertion(+), 15 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java index 25b343c1a..c016e2156 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java @@ -67,23 +67,16 @@ public class CopyHdfsOafApplication extends AbstractMigrationApplication { final String hdfsPath = parser.get("hdfsPath"); log.info("hdfsPath: {}", hdfsPath); - final String isLookupUrl = parser.get("isLookupUrl"); - log.info("isLookupUrl: {}", isLookupUrl); - - final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl); - final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookupService); - final Set paths = mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation); final SparkConf conf = new SparkConf(); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.registerKryoClasses(ModelSupport.getOafModelClasses()); - runWithSparkSession(conf, isSparkSessionManaged, spark -> processPaths(spark, vocs, hdfsPath, paths)); + runWithSparkSession(conf, isSparkSessionManaged, spark -> processPaths(spark, hdfsPath, paths)); } public static void processPaths(final SparkSession spark, - final VocabularyGroup vocs, final String outputPath, final Set paths) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json index 1e862198f..1d89017c5 
100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json @@ -28,11 +28,5 @@ "paramLongName": "mdInterpretation", "paramDescription": "metadata interpretation", "paramRequired": true - }, - { - "paramName": "isu", - "paramLongName": "isLookupUrl", - "paramDescription": "the url of the ISLookupService", - "paramRequired": true } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml index c4cca52f6..137c69ed8 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml @@ -569,7 +569,6 @@ --mdFormatOAF --mdLayoutstore --mdInterpretationgraph - --isLookupUrl${isLookupUrl} From 4f8970f8ed73acd2c89c23af6c209d0d4443c59c Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 20 Oct 2021 14:14:53 +0200 Subject: [PATCH 03/14] [stats] reducing the step22 wait time --- .../eu/dnetlib/dhp/oa/graph/stats/oozie_app/updateCache.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/updateCache.sh b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/updateCache.sh index dc19f84b4..03aa535e1 100644 --- a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/updateCache.sh +++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/updateCache.sh @@ -1,4 +1,4 @@ #!/usr/bin/env bash curl --request GET $1/cache/updateCache -sleep 20h \ No newline at end of file +sleep 6h \ No newline at end of file From ae4e99a4715811e193ee7892f679125cd8dbbdaa Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Wed, 20 Oct 2021 17:12:08 +0200 Subject: [PATCH 04/14] Adapted workflow of resolution of PID to work into OpenAIRE data workflow - Added relations in both verse on all Scholexplorer datasources --- .../java/eu/dnetlib/dhp/utils/DHPUtils.java | 9 + .../DataciteToOAFTransformation.scala | 11 +- .../GenerateDataciteDatasetSpark.scala | 19 ++- .../dhp/collection/CollectionUtils.scala | 43 +++++ .../bio/SparkTransformBioDatabaseToOAF.scala | 9 +- .../dhp/sx/bio/ebi/SparkEBILinksToOaf.scala | 2 + .../collection/oozie_app/config-default.xml | 23 +++ .../collection/oozie_app/workflow.xml | 52 ++++++ .../datacite/generate_dataset_params.json | 4 +- .../oozie_app/config-default.xml | 23 +++ .../transformation/oozie_app/workflow.xml | 126 ++++++++++++++ .../resolution/SparkResolveRelation.scala | 152 +++++++++++++++++ .../dhp/sx/graph/SparkResolveRelation.scala | 154 ------------------ .../resolution}/oozie_app/config-default.xml | 0 .../graph/resolution}/oozie_app/workflow.xml | 34 +--- .../resolution}/resolve_relations_params.json | 3 +- .../sx/graph/scholix/ScholixGraphTest.scala | 2 +- 17 files changed, 468 insertions(+), 198 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectionUtils.scala create mode 100644 
dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/workflow.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/workflow.xml create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala rename dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/{sx/graph/resolverelation => oa/graph/resolution}/oozie_app/config-default.xml (100%) rename dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/{sx/graph/resolverelation => oa/graph/resolution}/oozie_app/workflow.xml (54%) rename dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/{sx/graph => oa/graph/resolution}/resolve_relations_params.json (50%) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java index 6a86f30df..c53affbad 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java @@ -10,6 +10,7 @@ import java.util.Properties; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; +import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.binary.Base64OutputStream; import org.apache.commons.codec.binary.Hex; @@ -56,6 +57,14 @@ public class DHPUtils { return String.format("%s::%s", nsPrefix, DHPUtils.md5(originalId)); } + + public static String generateUnresolvedIdentifier(final String pid, final String pidType) { + + final String cleanedPid = CleaningFunctions.normalizePidValue(pidType, pid); + + return String.format("unresolved::%s::%s", cleanedPid, pidType.toLowerCase().trim()); + } + public static String getJPathString(final String jsonPath, final String json) { try { Object o = JsonPath.read(json, jsonPath); diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala index cfdd98d30..e3729e5b7 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala @@ -325,8 +325,9 @@ object DataciteToOAFTransformation { val grantId = m.matcher(awardUri).replaceAll("$2") val targetId = s"$p${DHPUtils.md5(grantId)}" List( - generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, dataInfo), - generateRelation(targetId, sourceId, "produces", DATACITE_COLLECTED_FROM, dataInfo) + generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, dataInfo) +// REMOVED INVERSE RELATION since there is a specific method that should generate later +// generateRelation(targetId, sourceId, "produces", DATACITE_COLLECTED_FROM, dataInfo) ) } else @@ -580,11 
+581,11 @@ object DataciteToOAFTransformation { rel.setProperties(List(dateProps).asJava) rel.setSource(id) - rel.setTarget(s"unresolved::${r.relatedIdentifier}::${r.relatedIdentifierType}") + rel.setTarget(DHPUtils.generateUnresolvedIdentifier(r.relatedIdentifier,r.relatedIdentifierType)) rel.setCollectedfrom(List(DATACITE_COLLECTED_FROM).asJava) - rel.getCollectedfrom.asScala.map(c => c.getValue)(collection.breakOut) + rel.getCollectedfrom.asScala.map(c => c.getValue).toList rel - })(collection breakOut) + }).toList } def generateDataInfo(trust: String): DataInfo = { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala index 2cabc7879..65d00c4d1 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala @@ -1,9 +1,13 @@ package eu.dnetlib.dhp.actionmanager.datacite +import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH, MDSTORE_SIZE_PATH} import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup -import eu.dnetlib.dhp.schema.mdstore.MetadataRecord +import eu.dnetlib.dhp.collection.CollectionUtils.fixRelations +import eu.dnetlib.dhp.schema.mdstore.{MDStoreVersion, MetadataRecord} import eu.dnetlib.dhp.schema.oaf.Oaf +import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile import eu.dnetlib.dhp.utils.ISLookupClientFactory import org.apache.spark.SparkConf import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} @@ -21,7 +25,6 @@ object GenerateDataciteDatasetSpark { parser.parseArgument(args) val master = parser.get("master") val sourcePath = parser.get("sourcePath") - val targetPath = parser.get("targetPath") val exportLinks = "true".equalsIgnoreCase(parser.get("exportLinks")) val isLookupUrl: String = parser.get("isLookupUrl") log.info("isLookupUrl: {}", isLookupUrl) @@ -39,10 +42,22 @@ object GenerateDataciteDatasetSpark { import spark.implicits._ + val mdstoreOutputVersion = parser.get("mdstoreOutputVersion") + val mapper = new ObjectMapper() + val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion]) + val outputBasePath = cleanedMdStoreVersion.getHdfsPath + + log.info("outputBasePath: {}", outputBasePath) + val targetPath = s"$outputBasePath/$MDSTORE_DATA_PATH" + spark.read.load(sourcePath).as[DataciteType] .filter(d => d.isActive) .flatMap(d => DataciteToOAFTransformation.generateOAF(d.json, d.timestamp, d.timestamp, vocabularies, exportLinks)) .filter(d => d != null) + .flatMap(i=> fixRelations(i)).filter(i => i != null) .write.mode(SaveMode.Overwrite).save(targetPath) + + val total_items =spark.read.load(targetPath).as[Oaf].count() + writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$total_items", outputBasePath + MDSTORE_SIZE_PATH) } } \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectionUtils.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectionUtils.scala new file mode 100644 index 000000000..e212d7e2a --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectionUtils.scala @@ -0,0 +1,43 @@ 
+package eu.dnetlib.dhp.collection + +import eu.dnetlib.dhp.schema.common.ModelSupport +import eu.dnetlib.dhp.schema.oaf.{Oaf, OafEntity, Relation} + +object CollectionUtils { + + /** + * This method in pipeline to the transformation phase, + * generates relations in both verse, typically it should be a phase of flatMap + * + * @param i input OAF + * @return + * If the input OAF is an entity -> List(i) + * If the input OAF is a relation -> List(relation, inverseRelation) + * + */ + + def fixRelations(i: Oaf): List[Oaf] = { + if (i.isInstanceOf[OafEntity]) + return List(i) + else { + val r: Relation = i.asInstanceOf[Relation] + val currentRel = ModelSupport.findRelation(r.getRelClass) + if (currentRel != null) { + + // Cleaning relation + r.setRelType(currentRel.getRelType) + r.setSubRelType(currentRel.getSubReltype) + r.setRelClass(currentRel.getRelClass) + val inverse = new Relation + inverse.setSource(r.getTarget) + inverse.setTarget(r.getSource) + inverse.setRelType(currentRel.getRelType) + inverse.setSubRelType(currentRel.getSubReltype) + inverse.setRelClass(currentRel.getInverseRelClass) + return List(r, inverse) + } + } + List() + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala index 7a62437a3..8ae8285e3 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala @@ -3,6 +3,7 @@ package eu.dnetlib.dhp.sx.bio import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.oaf.Oaf import BioDBToOAF.ScholixResolved +import eu.dnetlib.dhp.collection.CollectionUtils import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} @@ -35,13 +36,13 @@ object SparkTransformBioDatabaseToOAF { import spark.implicits._ database.toUpperCase() match { case "UNIPROT" => - spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.uniprotToOAF(i))).write.mode(SaveMode.Overwrite).save(targetPath) + spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.uniprotToOAF(i))).flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath) case "PDB" => - spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.pdbTOOaf(i))).write.mode(SaveMode.Overwrite).save(targetPath) + spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.pdbTOOaf(i))).flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath) case "SCHOLIX" => - spark.read.load(dbPath).as[ScholixResolved].map(i => BioDBToOAF.scholixResolvedToOAF(i)).write.mode(SaveMode.Overwrite).save(targetPath) + spark.read.load(dbPath).as[ScholixResolved].map(i => BioDBToOAF.scholixResolvedToOAF(i)).flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath) case "CROSSREF_LINKS" => - spark.createDataset(sc.textFile(dbPath).map(i => BioDBToOAF.crossrefLinksToOaf(i))).write.mode(SaveMode.Overwrite).save(targetPath) + spark.createDataset(sc.textFile(dbPath).map(i => BioDBToOAF.crossrefLinksToOaf(i))).flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath) } } diff --git 
a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala index b19bfc23a..8da617ca0 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala @@ -5,6 +5,7 @@ import eu.dnetlib.dhp.schema.oaf.Oaf import eu.dnetlib.dhp.sx.bio.BioDBToOAF import eu.dnetlib.dhp.sx.bio.BioDBToOAF.EBILinkItem import BioDBToOAF.EBILinkItem +import eu.dnetlib.dhp.collection.CollectionUtils import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.sql._ @@ -37,6 +38,7 @@ object SparkEBILinksToOaf { ebLinks.flatMap(j => BioDBToOAF.parse_ebi_links(j.links)) .filter(p => BioDBToOAF.EBITargetLinksFilter(p)) .flatMap(p => BioDBToOAF.convertEBILinksToOaf(p)) + .flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null) .write.mode(SaveMode.Overwrite).save(targetPath) } } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/config-default.xml new file mode 100644 index 000000000..dd3c32c62 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/config-default.xml @@ -0,0 +1,23 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + + oozie.launcher.mapreduce.user.classpath.first + true + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/workflow.xml new file mode 100644 index 000000000..41a2e2291 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/workflow.xml @@ -0,0 +1,52 @@ + + + + mainPath + the working path of Datacite stores + + + isLookupUrl + The IS lookUp service endopoint + + + blocksize + 100 + The request block size + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + yarn-cluster + cluster + ImportDatacite + eu.dnetlib.dhp.actionmanager.datacite.ImportDatacite + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --targetPath${mainPath}/datacite_update + --dataciteDumpPath${mainPath}/datacite_dump + --namenode${nameNode} + --masteryarn-cluster + --blocksize${blocksize} + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json index 67e7f37dc..04dc8b942 100644 --- 
a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json @@ -7,8 +7,8 @@ }, { - "paramName": "t", - "paramLongName": "targetPath", + "paramName": "mo", + "paramLongName": "mdstoreOutputVersion", "paramDescription": "the target mdstore path", "paramRequired": true }, diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/config-default.xml new file mode 100644 index 000000000..dd3c32c62 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/config-default.xml @@ -0,0 +1,23 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + + oozie.launcher.mapreduce.user.classpath.first + true + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/workflow.xml new file mode 100644 index 000000000..aeb824a41 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/workflow.xml @@ -0,0 +1,126 @@ + + + + mainPath + the working path of Datacite stores + + + isLookupUrl + The IS lookUp service endopoint + + + mdStoreOutputId + the identifier of the cleaned MDStore + + + mdStoreManagerURI + the path of the cleaned mdstore + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode + --actionNEW_VERSION + --mdStoreID${mdStoreOutputId} + --mdStoreManagerURI${mdStoreManagerURI} + + + + + + + + + yarn-cluster + cluster + TransformJob + eu.dnetlib.dhp.actionmanager.datacite.GenerateDataciteDatasetSpark + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePath${mainPath}/datacite_dump + --mdstoreOutputVersion${wf:actionData('StartTransaction')['mdStoreVersion']} + --isLookupUrl${isLookupUrl} + --exportLinkstrue + --masteryarn-cluster + + + + + + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode + --actionCOMMIT + --namenode${nameNode} + --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']} + --mdStoreManagerURI${mdStoreManagerURI} + + + + + + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode + --actionREAD_UNLOCK + --mdStoreManagerURI${mdStoreManagerURI} + --readMDStoreId${wf:actionData('BeginRead')['mdStoreReadLockVersion']} + + + + + 
+ + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode + --actionROLLBACK + --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']} + --mdStoreManagerURI${mdStoreManagerURI} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala new file mode 100644 index 000000000..e87f46b00 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala @@ -0,0 +1,152 @@ +package eu.dnetlib.dhp.oa.graph.resolution + +import com.fasterxml.jackson.databind.ObjectMapper +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.common.HdfsSupport +import eu.dnetlib.dhp.schema.oaf.{Relation, Result} +import eu.dnetlib.dhp.utils.DHPUtils +import org.apache.commons.io.IOUtils +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.SparkConf +import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ +import org.json4s +import org.json4s.DefaultFormats +import org.json4s.JsonAST.{JField, JObject, JString} +import org.json4s.jackson.JsonMethods.parse +import org.slf4j.{Logger, LoggerFactory} + +object SparkResolveRelation { + def main(args: Array[String]): Unit = { + val log: Logger = LoggerFactory.getLogger(getClass) + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/resolution/resolve_relations_params.json"))) + parser.parseArgument(args) + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + + val graphBasePath = parser.get("graphBasePath") + log.info(s"graphBasePath -> $graphBasePath") + val workingPath = parser.get("workingPath") + log.info(s"workingPath -> $workingPath") + + implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation]) + import spark.implicits._ + + + //CLEANING TEMPORARY FOLDER + HdfsSupport.remove(workingPath, spark.sparkContext.hadoopConfiguration) + val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration) + fs.mkdirs(new Path(workingPath)) + + extractPidResolvedTableFromJsonRDD(spark, graphBasePath, workingPath) + + val mapper: ObjectMapper = new ObjectMapper() + + val rPid: Dataset[(String, String)] = spark.read.load(s"$workingPath/relationResolvedPid").as[(String, String)] + + val relationDs: Dataset[(String, Relation)] = spark.read.text(s"$graphBasePath/relation").as[String] + .map(s => mapper.readValue(s, classOf[Relation])).as[Relation] + .map(r => (r.getSource.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder)) + + relationDs.joinWith(rPid, relationDs("_1").equalTo(rPid("_2")), "left").map { + m => + val sourceResolved = m._2 + val currentRelation = m._1._2 + if (sourceResolved != null && sourceResolved._1 != null && sourceResolved._1.nonEmpty) + currentRelation.setSource(sourceResolved._1) + currentRelation + }.write + .mode(SaveMode.Overwrite) + .save(s"$workingPath/relationResolvedSource") + + + val relationSourceResolved: Dataset[(String, Relation)] = spark.read.load(s"$workingPath/relationResolvedSource").as[Relation] + .map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder)) + 
relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_2")), "left").map { + m => + val targetResolved = m._2 + val currentRelation = m._1._2 + if (targetResolved != null && targetResolved._1.nonEmpty) + currentRelation.setTarget(targetResolved._1) + currentRelation + }.filter(r => !r.getSource.startsWith("unresolved") && !r.getTarget.startsWith("unresolved")) + .write + .mode(SaveMode.Overwrite) + .save(s"$workingPath/relation_resolved") + + + // TO BE conservative we keep the original relation in the working dir + // and save the relation resolved on the graphBasePath + //In future this two line of code should be removed + + fs.rename(new Path(s"$graphBasePath/relation"), new Path(s"$workingPath/relation")) + + spark.read.load(s"$workingPath/relation_resolved").as[Relation] + .map(r => mapper.writeValueAsString(r)) + .write + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .text(s"$graphBasePath/relation") + } + + + def extractPidsFromRecord(input: String): (String, List[(String, String)]) = { + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(input) + val id: String = (json \ "id").extract[String] + val result: List[(String, String)] = for { + JObject(pids) <- json \\ "instance" \ "pid" + JField("value", JString(pidValue)) <- pids + JField("qualifier", JObject(qualifier)) <- pids + JField("classname", JString(pidType)) <- qualifier + } yield (pidValue, pidType) + + val alternateIds: List[(String, String)] = for { + JObject(pids) <- json \\ "alternateIdentifier" + JField("value", JString(pidValue)) <- pids + JField("qualifier", JObject(qualifier)) <- pids + JField("classname", JString(pidType)) <- qualifier + } yield (pidValue, pidType) + + (id, result ::: alternateIds) + } + + + private def isRelation(input: String): Boolean = { + + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(input) + val source = (json \ "source").extractOrElse[String](null) + + source != null + } + + private def extractPidResolvedTableFromJsonRDD(spark: SparkSession, graphPath: String, workingPath: String) = { + import spark.implicits._ + + val d: RDD[(String, String)] = spark.sparkContext.textFile(s"$graphPath/*") + .filter(i => !isRelation(i)) + .map(i => extractPidsFromRecord(i)) + .filter(s => s != null && s._1 != null && s._2 != null && s._2.nonEmpty) + .flatMap { p => + p._2.map(pid => + (p._1, DHPUtils.generateUnresolvedIdentifier(pid._1, pid._2)) + ) + }.filter(r => r._1 != null || r._2 != null) + + spark.createDataset(d) + .groupByKey(_._2) + .reduceGroups((x, y) => if (x._1.startsWith("50|doi") || x._1.startsWith("50|pmid")) x else y) + .map(s => s._2) + .write + .mode(SaveMode.Overwrite) + .save(s"$workingPath/relationResolvedPid") + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala deleted file mode 100644 index 1b13b81c7..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala +++ /dev/null @@ -1,154 +0,0 @@ -package eu.dnetlib.dhp.sx.graph - -import com.fasterxml.jackson.databind.ObjectMapper -import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.schema.oaf.{Relation, Result} -import org.apache.commons.io.IOUtils -import org.apache.hadoop.io.compress.GzipCodec -import 
org.apache.spark.SparkConf -import org.apache.spark.rdd.RDD -import org.apache.spark.sql._ -import org.json4s -import org.json4s.DefaultFormats -import org.json4s.JsonAST.{JField, JObject, JString} -import org.json4s.jackson.JsonMethods.parse -import org.slf4j.{Logger, LoggerFactory} - -import scala.collection.JavaConverters._ -object SparkResolveRelation { - def main(args: Array[String]): Unit = { - val log: Logger = LoggerFactory.getLogger(getClass) - val conf: SparkConf = new SparkConf() - val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json"))) - parser.parseArgument(args) - val spark: SparkSession = - SparkSession - .builder() - .config(conf) - .appName(getClass.getSimpleName) - .master(parser.get("master")).getOrCreate() - - - val relationPath = parser.get("relationPath") - log.info(s"sourcePath -> $relationPath") - val entityPath = parser.get("entityPath") - log.info(s"entityPath -> $entityPath") - val workingPath = parser.get("workingPath") - log.info(s"workingPath -> $workingPath") - - implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation]) - import spark.implicits._ - - - extractPidResolvedTableFromJsonRDD(spark, entityPath, workingPath) - - val mappper = new ObjectMapper() - - val rPid:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationResolvedPid").as[(String,String)] - - val relationDs:Dataset[(String,Relation)] = spark.read.load(relationPath).as[Relation].map(r => (r.getSource.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder)) - - relationDs.joinWith(rPid, relationDs("_1").equalTo(rPid("_2")), "left").map{ - m => - val sourceResolved = m._2 - val currentRelation = m._1._2 - if (sourceResolved!=null && sourceResolved._1!=null && sourceResolved._1.nonEmpty) - currentRelation.setSource(sourceResolved._1) - currentRelation - }.write - .mode(SaveMode.Overwrite) - .save(s"$workingPath/relationResolvedSource") - - - val relationSourceResolved:Dataset[(String,Relation)] = spark.read.load(s"$workingPath/relationResolvedSource").as[Relation].map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder)) - relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_2")), "left").map{ - m => - val targetResolved = m._2 - val currentRelation = m._1._2 - if (targetResolved!=null && targetResolved._1.nonEmpty) - currentRelation.setTarget(targetResolved._1) - currentRelation - }.filter(r => r.getSource.startsWith("50")&& r.getTarget.startsWith("50")) - .write - .mode(SaveMode.Overwrite) - .save(s"$workingPath/relation_resolved") - - spark.read.load(s"$workingPath/relation_resolved").as[Relation] - .map(r => mappper.writeValueAsString(r)) - .rdd.saveAsTextFile(s"$workingPath/relation", classOf[GzipCodec]) - - } - - - def extractPidsFromRecord(input:String):(String,List[(String,String)]) = { - implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats - lazy val json: json4s.JValue = parse(input) - val id:String = (json \ "id").extract[String] - val result: List[(String,String)] = for { - JObject(pids) <- json \ "pid" - JField("value", JString(pidValue)) <- pids - JField("qualifier", JObject(qualifier)) <- pids - JField("classname", JString(pidType)) <- qualifier - } yield (pidValue, pidType) - - val alternateIds: List[(String,String)] = for { - JObject(pids) <- json \\ "alternateIdentifier" - JField("value", JString(pidValue)) <- pids - JField("qualifier", JObject(qualifier)) <- pids - 
JField("classname", JString(pidType)) <- qualifier - } yield (pidValue, pidType) - - (id,result:::alternateIds) - } - - private def extractPidResolvedTableFromJsonRDD(spark: SparkSession, entityPath: String, workingPath: String) = { - import spark.implicits._ - - val d: RDD[(String,String)] = spark.sparkContext.textFile(s"$entityPath/*") - .map(i => extractPidsFromRecord(i)) - .filter(s => s != null && s._1!= null && s._2!=null && s._2.nonEmpty) - .flatMap{ p => - p._2.map(pid => - (p._1, convertPidToDNETIdentifier(pid._1, pid._2)) - ) - }.filter(r =>r._1 != null || r._2 != null) - - spark.createDataset(d) - .groupByKey(_._2) - .reduceGroups((x, y) => if (x._1.startsWith("50|doi") || x._1.startsWith("50|pmid")) x else y) - .map(s => s._2) - .write - .mode(SaveMode.Overwrite) - .save(s"$workingPath/relationResolvedPid") - } - - - /* - This method should be used once we finally convert everythings in Kryo dataset - instead of using rdd of json - */ - private def extractPidResolvedTableFromKryo(spark: SparkSession, entityPath: String, workingPath: String) = { - import spark.implicits._ - implicit val oafEncoder: Encoder[Result] = Encoders.kryo(classOf[Result]) - val entities: Dataset[Result] = spark.read.load(s"$entityPath/*").as[Result] - entities.flatMap(e => e.getPid.asScala - .map(p => - convertPidToDNETIdentifier(p.getValue, p.getQualifier.getClassid)) - .filter(s => s != null) - .map(s => (s, e.getId)) - ).groupByKey(_._1) - .reduceGroups((x, y) => if (x._2.startsWith("50|doi") || x._2.startsWith("50|pmid")) x else y) - .map(s => s._2) - .write - .mode(SaveMode.Overwrite) - .save(s"$workingPath/relationResolvedPid") - } - - def convertPidToDNETIdentifier(pid:String, pidType: String):String = { - if (pid==null || pid.isEmpty || pidType== null || pidType.isEmpty) - null - else - s"unresolved::${pid.toLowerCase}::${pidType.toLowerCase}" - } - -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/config-default.xml rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml similarity index 54% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml index 7683ff94c..e9e1a8ede 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml @@ -1,44 +1,23 @@ - entityPath - the path of deduplicate Entities + graphBasePath + the path of the graph - - relationPath - the path of relation unresolved - - - targetPath - the path of relation unresolved - - - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - - - - - yarn cluster Resolve Relations in raw graph - 
eu.dnetlib.dhp.sx.graph.SparkResolveRelation + eu.dnetlib.dhp.oa.graph.resolution.SparkResolveRelation dhp-graph-mapper-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} @@ -51,9 +30,8 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --masteryarn - --relationPath${relationPath} - --workingPath${targetPath} - --entityPath${entityPath} + --graphBasePath${graphBasePath} + --workingPath${workingDir} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/resolve_relations_params.json similarity index 50% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/resolve_relations_params.json index f211adb9a..1fbe20648 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/resolve_relations_params.json @@ -1,6 +1,5 @@ [ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, - {"paramName":"r", "paramLongName":"relationPath", "paramDescription": "the source Path", "paramRequired": true}, {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the source Path", "paramRequired": true}, - {"paramName":"e", "paramLongName":"entityPath", "paramDescription": "the path of the raw graph", "paramRequired": true} + {"paramName":"g", "paramLongName":"graphBasePath", "paramDescription": "the path of the raw graph", "paramRequired": true} ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixGraphTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixGraphTest.scala index 5b7fbe1cf..bd7e4fd09 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixGraphTest.scala +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixGraphTest.scala @@ -1,10 +1,10 @@ package eu.dnetlib.dhp.sx.graph.scholix import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper, SerializationFeature} +import eu.dnetlib.dhp.oa.graph.resolution.SparkResolveRelation import eu.dnetlib.dhp.schema.oaf.{Relation, Result} import eu.dnetlib.dhp.schema.sx.scholix.Scholix import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary -import eu.dnetlib.dhp.sx.graph.SparkResolveRelation import eu.dnetlib.dhp.sx.graph.bio.pubmed.AbstractVocabularyTest import org.json4s import org.json4s.DefaultFormats From ab3a99d3e9463fce361dc3cd1f2872ddbaefdac0 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Wed, 20 Oct 2021 17:19:47 +0200 Subject: [PATCH 05/14] removed old datacite oozie workflow --- .../actionset/oozie_app/config-default.xml | 23 ----- .../datacite/actionset/oozie_app/workflow.xml | 46 ---------- .../datacite/oozie_app/config-default.xml | 23 ----- .../datacite/oozie_app/workflow.xml | 81 ------------------ .../scholix/oozie_app/config-default.xml | 23 ----- .../datacite/scholix/oozie_app/workflow.xml | 84 ------------------- 6 files changed, 280 deletions(-) delete mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/actionset/oozie_app/config-default.xml 
delete mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/actionset/oozie_app/workflow.xml delete mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/config-default.xml delete mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/workflow.xml delete mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/config-default.xml delete mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/actionset/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/actionset/oozie_app/config-default.xml deleted file mode 100644 index dd3c32c62..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/actionset/oozie_app/config-default.xml +++ /dev/null @@ -1,23 +0,0 @@ - - - jobTracker - yarnRM - - - nameNode - hdfs://nameservice1 - - - oozie.use.system.libpath - true - - - oozie.action.sharelib.for.spark - spark2 - - - - oozie.launcher.mapreduce.user.classpath.first - true - - \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/actionset/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/actionset/oozie_app/workflow.xml deleted file mode 100644 index 3c58ace7b..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/actionset/oozie_app/workflow.xml +++ /dev/null @@ -1,46 +0,0 @@ - - - - sourcePath - the working path of Datacite stores - - - outputPath - the path of Datacite ActionSet - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - yarn-cluster - cluster - ExportDataset - eu.dnetlib.dhp.actionmanager.datacite.ExportActionSetJobNode - dhp-aggregation-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --sourcePath${sourcePath} - --targetPath${outputPath} - --masteryarn-cluster - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/config-default.xml deleted file mode 100644 index dd3c32c62..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/config-default.xml +++ /dev/null @@ -1,23 +0,0 @@ - - - jobTracker - yarnRM - - - nameNode - hdfs://nameservice1 - - - oozie.use.system.libpath - true - - - oozie.action.sharelib.for.spark - spark2 - - - - oozie.launcher.mapreduce.user.classpath.first - true - - \ No newline at end of file diff --git 
a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/workflow.xml deleted file mode 100644 index c6332ff7d..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/workflow.xml +++ /dev/null @@ -1,81 +0,0 @@ - - - - mainPath - the working path of Datacite stores - - - isLookupUrl - The IS lookUp service endopoint - - - blocksize - 100 - The request block size - - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - yarn-cluster - cluster - ImportDatacite - eu.dnetlib.dhp.actionmanager.datacite.ImportDatacite - dhp-aggregation-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --targetPath${mainPath}/datacite_update - --dataciteDumpPath${mainPath}/datacite_dump - --namenode${nameNode} - --masteryarn-cluster - --blocksize${blocksize} - - - - - - - - - yarn-cluster - cluster - TransformJob - eu.dnetlib.dhp.actionmanager.datacite.GenerateDataciteDatasetSpark - dhp-aggregation-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --sourcePath${mainPath}/datacite_dump - --targetPath${mainPath}/datacite_oaf - --isLookupUrl${isLookupUrl} - --exportLinksfalse - --masteryarn-cluster - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/config-default.xml deleted file mode 100644 index dd3c32c62..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/config-default.xml +++ /dev/null @@ -1,23 +0,0 @@ - - - jobTracker - yarnRM - - - nameNode - hdfs://nameservice1 - - - oozie.use.system.libpath - true - - - oozie.action.sharelib.for.spark - spark2 - - - - oozie.launcher.mapreduce.user.classpath.first - true - - \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/workflow.xml deleted file mode 100644 index 397288c69..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/workflow.xml +++ /dev/null @@ -1,84 +0,0 @@ - - - - datacitePath - the path of Datacite spark dataset - - - isLookupUrl - The IS lookUp service endopoint - - - crossrefPath - the path of Crossref spark dataset - - - - targetPath - the path of Crossref spark 
dataset - - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - yarn-cluster - cluster - ImportDatacite - eu.dnetlib.dhp.actionmanager.datacite.GenerateDataciteDatasetSpark - dhp-aggregation-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --sourcePath${datacitePath} - --targetPath${targetPath}/datacite_oaf - --isLookupUrl${isLookupUrl} - --exportLinkstrue - --masteryarn-cluster - - - - - - - - - yarn-cluster - cluster - FilterCrossrefEntities - eu.dnetlib.dhp.actionmanager.datacite.FilterCrossrefEntitiesSpark - dhp-aggregation-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --sourcePath${crossrefPath} - --targetPath${targetPath}/crossref_oaf - --masteryarn-cluster - - - - - - - \ No newline at end of file From aeeebd573b2538e78c81ec8b9680ab5f4b4a794d Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Wed, 20 Oct 2021 17:37:42 +0200 Subject: [PATCH 06/14] code refactor renamed datacite package --- .../datacite/ExportActionSetJobNode.scala | 41 ----------------- .../FilterCrossrefEntitiesSpark.scala | 46 ------------------- .../datacite/AbstractRestClient.scala | 8 ++-- .../datacite/DataciteAPIImporter.scala | 4 +- .../DataciteToOAFTransformation.scala | 2 +- .../GenerateDataciteDatasetSpark.scala | 19 ++++---- .../datacite/ImportDatacite.scala | 21 ++++----- .../SparkDownloadUpdateDatacite.scala | 16 +++---- .../collection/oozie_app/config-default.xml | 0 .../collection/oozie_app/workflow.xml | 2 +- .../datacite/datacite_filter | 0 .../datacite/exportDataset_parameters.json | 0 .../datacite/filter_crossref_param.json | 0 .../datacite/generate_dataset_params.json | 0 .../datacite/hostedBy_map.json | 0 .../datacite/import_from_api.json | 0 .../oozie_app/config-default.xml | 0 .../transformation/oozie_app/workflow.xml | 2 +- .../datacite/DataciteToOAFTest.scala | 5 +- 19 files changed, 36 insertions(+), 130 deletions(-) delete mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ExportActionSetJobNode.scala delete mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/FilterCrossrefEntitiesSpark.scala rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/{actionmanager => }/datacite/AbstractRestClient.scala (90%) rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/{actionmanager => }/datacite/DataciteAPIImporter.scala (95%) rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/{actionmanager => }/datacite/DataciteToOAFTransformation.scala (99%) rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/{actionmanager => }/datacite/GenerateDataciteDatasetSpark.scala (86%) rename 
dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/{actionmanager => }/datacite/ImportDatacite.scala (93%) rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/{actionmanager => }/datacite/SparkDownloadUpdateDatacite.scala (68%) rename dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/{actionmanager => }/datacite/collection/oozie_app/config-default.xml (100%) rename dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/{actionmanager => }/datacite/collection/oozie_app/workflow.xml (95%) rename dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/{actionmanager => }/datacite/datacite_filter (100%) rename dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/{actionmanager => }/datacite/exportDataset_parameters.json (100%) rename dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/{actionmanager => }/datacite/filter_crossref_param.json (100%) rename dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/{actionmanager => }/datacite/generate_dataset_params.json (100%) rename dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/{actionmanager => }/datacite/hostedBy_map.json (100%) rename dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/{actionmanager => }/datacite/import_from_api.json (100%) rename dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/{actionmanager => }/datacite/transformation/oozie_app/config-default.xml (100%) rename dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/{actionmanager => }/datacite/transformation/oozie_app/workflow.xml (97%) rename dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/{actionmanager => }/datacite/DataciteToOAFTest.scala (88%) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ExportActionSetJobNode.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ExportActionSetJobNode.scala deleted file mode 100644 index 9f0d25735..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ExportActionSetJobNode.scala +++ /dev/null @@ -1,41 +0,0 @@ -package eu.dnetlib.dhp.actionmanager.datacite - -import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.schema.oaf.Oaf -import org.apache.hadoop.io.Text -import org.apache.hadoop.io.compress.GzipCodec -import org.apache.hadoop.mapred.SequenceFileOutputFormat -import org.apache.spark.SparkConf -import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} -import org.slf4j.{Logger, LoggerFactory} - -import scala.io.Source - -object ExportActionSetJobNode { - - val log: Logger = LoggerFactory.getLogger(ExportActionSetJobNode.getClass) - - def main(args: Array[String]): Unit = { - val conf = new SparkConf - val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/exportDataset_parameters.json")).mkString) - parser.parseArgument(args) - val master = parser.get("master") - val sourcePath = parser.get("sourcePath") - val targetPath = parser.get("targetPath") - - val spark: SparkSession = SparkSession.builder().config(conf) - .appName(ExportActionSetJobNode.getClass.getSimpleName) - .master(master) - .getOrCreate() - implicit val resEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] - implicit val tEncoder:Encoder[(String,String)] = Encoders.tuple(Encoders.STRING,Encoders.STRING) - - spark.read.load(sourcePath).as[Oaf] - .map(o 
=>DataciteToOAFTransformation.toActionSet(o)) - .filter(o => o!= null) - .rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$targetPath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec]) - - - } - -} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/FilterCrossrefEntitiesSpark.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/FilterCrossrefEntitiesSpark.scala deleted file mode 100644 index 5860c50ac..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/FilterCrossrefEntitiesSpark.scala +++ /dev/null @@ -1,46 +0,0 @@ -package eu.dnetlib.dhp.actionmanager.datacite - -import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup -import eu.dnetlib.dhp.schema.mdstore.MetadataRecord -import eu.dnetlib.dhp.schema.oaf.{Oaf, Result} -import eu.dnetlib.dhp.utils.ISLookupClientFactory -import org.apache.spark.SparkConf -import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} -import org.slf4j.{Logger, LoggerFactory} - -import scala.io.Source - -object FilterCrossrefEntitiesSpark { - - val log: Logger = LoggerFactory.getLogger(getClass.getClass) - - def main(args: Array[String]): Unit = { - val conf = new SparkConf - val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/filter_crossref_param.json")).mkString) - parser.parseArgument(args) - val master = parser.get("master") - val sourcePath = parser.get("sourcePath") - log.info("sourcePath: {}", sourcePath) - val targetPath = parser.get("targetPath") - log.info("targetPath: {}", targetPath) - - - - val spark: SparkSession = SparkSession.builder().config(conf) - .appName(getClass.getSimpleName) - .master(master) - .getOrCreate() - - - - implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] - implicit val resEncoder: Encoder[Result] = Encoders.kryo[Result] - - val d:Dataset[Oaf]= spark.read.load(sourcePath).as[Oaf] - - d.filter(r => r.isInstanceOf[Result]).map(r => r.asInstanceOf[Result]).write.mode(SaveMode.Overwrite).save(targetPath) - - } - -} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/AbstractRestClient.scala similarity index 90% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/AbstractRestClient.scala index bae41b218..6a9b8e3e5 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/AbstractRestClient.scala @@ -1,12 +1,10 @@ -package eu.dnetlib.dhp.actionmanager.datacite +package eu.dnetlib.dhp.datacite import org.apache.commons.io.IOUtils import org.apache.http.client.config.RequestConfig -import org.apache.http.client.methods.{HttpGet, HttpPost, HttpRequestBase, HttpUriRequest} +import org.apache.http.client.methods.{HttpGet, HttpPost, HttpUriRequest} import org.apache.http.entity.StringEntity -import org.apache.http.impl.client.{HttpClientBuilder, HttpClients} - -import java.io.IOException +import 
org.apache.http.impl.client.HttpClientBuilder abstract class AbstractRestClient extends Iterator[String] { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteAPIImporter.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteAPIImporter.scala similarity index 95% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteAPIImporter.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteAPIImporter.scala index 36ec9e8c3..7ec44a6ff 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteAPIImporter.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteAPIImporter.scala @@ -1,7 +1,7 @@ -package eu.dnetlib.dhp.actionmanager.datacite +package eu.dnetlib.dhp.datacite -import org.json4s.{DefaultFormats, JValue} import org.json4s.jackson.JsonMethods.{compact, parse, render} +import org.json4s.{DefaultFormats, JValue} class DataciteAPIImporter(timestamp: Long = 0, blocks: Long = 10, until:Long = -1) extends AbstractRestClient { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteToOAFTransformation.scala similarity index 99% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteToOAFTransformation.scala index e3729e5b7..6ce4920ed 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteToOAFTransformation.scala @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.actionmanager.datacite +package eu.dnetlib.dhp.datacite import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala similarity index 86% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala index 65d00c4d1..a63627d1c 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala @@ -1,10 +1,11 @@ -package eu.dnetlib.dhp.actionmanager.datacite +package eu.dnetlib.dhp.datacite import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH, MDSTORE_SIZE_PATH} -import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup import eu.dnetlib.dhp.collection.CollectionUtils.fixRelations +import eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH +import eu.dnetlib.dhp.common.Constants.MDSTORE_SIZE_PATH +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup import eu.dnetlib.dhp.schema.mdstore.{MDStoreVersion, MetadataRecord} 
import eu.dnetlib.dhp.schema.oaf.Oaf import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile @@ -21,7 +22,7 @@ object GenerateDataciteDatasetSpark { def main(args: Array[String]): Unit = { val conf = new SparkConf - val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json")).mkString) + val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/datacite/generate_dataset_params.json")).mkString) parser.parseArgument(args) val master = parser.get("master") val sourcePath = parser.get("sourcePath") @@ -36,12 +37,12 @@ object GenerateDataciteDatasetSpark { .master(master) .getOrCreate() + import spark.implicits._ + implicit val mrEncoder: Encoder[MetadataRecord] = Encoders.kryo[MetadataRecord] implicit val resEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] - import spark.implicits._ - val mdstoreOutputVersion = parser.get("mdstoreOutputVersion") val mapper = new ObjectMapper() val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion]) @@ -54,10 +55,10 @@ object GenerateDataciteDatasetSpark { .filter(d => d.isActive) .flatMap(d => DataciteToOAFTransformation.generateOAF(d.json, d.timestamp, d.timestamp, vocabularies, exportLinks)) .filter(d => d != null) - .flatMap(i=> fixRelations(i)).filter(i => i != null) + .flatMap(i => fixRelations(i)).filter(i => i != null) .write.mode(SaveMode.Overwrite).save(targetPath) - val total_items =spark.read.load(targetPath).as[Oaf].count() + val total_items = spark.read.load(targetPath).as[Oaf].count() writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$total_items", outputBasePath + MDSTORE_SIZE_PATH) } -} \ No newline at end of file +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/ImportDatacite.scala similarity index 93% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/ImportDatacite.scala index 2b73d2955..018b4958a 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/ImportDatacite.scala @@ -1,6 +1,5 @@ -package eu.dnetlib.dhp.actionmanager.datacite +package eu.dnetlib.dhp.datacite -import eu.dnetlib.dhp.actionmanager.datacite.DataciteToOAFTransformation.df_it import eu.dnetlib.dhp.application.ArgumentApplicationParser import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, LocalFileSystem, Path} @@ -9,14 +8,14 @@ import org.apache.hadoop.io.{IntWritable, SequenceFile, Text} import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.expressions.Aggregator +import org.apache.spark.sql.functions.max import org.apache.spark.sql.{Dataset, Encoder, SaveMode, SparkSession} import org.json4s.DefaultFormats import org.json4s.jackson.JsonMethods.parse -import org.apache.spark.sql.functions.max import org.slf4j.{Logger, LoggerFactory} -import java.time.format.DateTimeFormatter._ -import java.time.{LocalDate, LocalDateTime, ZoneOffset} +import java.time.format.DateTimeFormatter.ISO_DATE_TIME +import java.time.{LocalDateTime, ZoneOffset} import scala.io.Source object ImportDatacite { @@ -138,11 +137,11 @@ 
object ImportDatacite { } } - private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration, bs:Int): Long = { - var from:Long = timestamp * 1000 - val delta:Long = 100000000L + private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration, bs: Int): Long = { + var from: Long = timestamp * 1000 + val delta: Long = 100000000L var client: DataciteAPIImporter = null - val now :Long =System.currentTimeMillis() + val now: Long = System.currentTimeMillis() var i = 0 try { val writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(hdfsTargetPath), SequenceFile.Writer.keyClass(classOf[IntWritable]), SequenceFile.Writer.valueClass(classOf[Text])) @@ -168,7 +167,7 @@ object ImportDatacite { start = System.currentTimeMillis } } - println(s"updating from value: $from -> ${from+delta}") + println(s"updating from value: $from -> ${from + delta}") from = from + delta } } catch { @@ -183,4 +182,4 @@ object ImportDatacite { i } -} \ No newline at end of file +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/SparkDownloadUpdateDatacite.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/SparkDownloadUpdateDatacite.scala similarity index 68% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/SparkDownloadUpdateDatacite.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/SparkDownloadUpdateDatacite.scala index 0b459fcf6..d46e5423d 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/SparkDownloadUpdateDatacite.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/SparkDownloadUpdateDatacite.scala @@ -1,18 +1,14 @@ -package eu.dnetlib.dhp.actionmanager.datacite - +package eu.dnetlib.dhp.datacite import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.oaf.{Oaf, Result} -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.LocalFileSystem -import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.spark.SparkConf -import org.apache.spark.sql.{Encoder, Encoders, SparkSession} import org.apache.spark.sql.functions.max +import org.apache.spark.sql.{Encoder, Encoders, SparkSession} import org.slf4j.{Logger, LoggerFactory} import java.text.SimpleDateFormat -import java.util.{Date, Locale} +import java.util.Locale import scala.io.Source object SparkDownloadUpdateDatacite { @@ -21,7 +17,7 @@ object SparkDownloadUpdateDatacite { def main(args: Array[String]): Unit = { val conf = new SparkConf - val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json")).mkString) + val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/datacite/generate_dataset_params.json")).mkString) parser.parseArgument(args) val master = parser.get("master") val sourcePath = parser.get("sourcePath") @@ -42,9 +38,9 @@ object SparkDownloadUpdateDatacite { import spark.implicits._ - val maxDate:String = spark.read.load(workingPath).as[Oaf].filter(s => s.isInstanceOf[Result]).map(r => r.asInstanceOf[Result].getDateofcollection).select(max("value")).first().getString(0) + val maxDate: String = spark.read.load(workingPath).as[Oaf].filter(s => s.isInstanceOf[Result]).map(r => 
r.asInstanceOf[Result].getDateofcollection).select(max("value")).first().getString(0) val ISO8601FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ", Locale.US) - val string_to_date =ISO8601FORMAT.parse(maxDate) + val string_to_date = ISO8601FORMAT.parse(maxDate) val ts = string_to_date.getTime diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/collection/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/config-default.xml rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/collection/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/collection/oozie_app/workflow.xml similarity index 95% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/workflow.xml rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/collection/oozie_app/workflow.xml index 41a2e2291..6989eed66 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/collection/oozie_app/workflow.xml @@ -28,7 +28,7 @@ yarn-cluster cluster ImportDatacite - eu.dnetlib.dhp.actionmanager.datacite.ImportDatacite + eu.dnetlib.dhp.datacite.ImportDatacite dhp-aggregation-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/datacite_filter b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/datacite_filter similarity index 100% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/datacite_filter rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/datacite_filter diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/exportDataset_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/exportDataset_parameters.json similarity index 100% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/exportDataset_parameters.json rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/exportDataset_parameters.json diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/filter_crossref_param.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/filter_crossref_param.json similarity index 100% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/filter_crossref_param.json rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/filter_crossref_param.json diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/generate_dataset_params.json similarity index 100% rename from 
dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/generate_dataset_params.json diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/hostedBy_map.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/hostedBy_map.json similarity index 100% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/hostedBy_map.json rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/hostedBy_map.json diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/import_from_api.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/import_from_api.json similarity index 100% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/import_from_api.json rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/import_from_api.json diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/transformation/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/config-default.xml rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/transformation/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/transformation/oozie_app/workflow.xml similarity index 97% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/workflow.xml rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/transformation/oozie_app/workflow.xml index aeb824a41..d5bae7305 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/transformation/oozie_app/workflow.xml @@ -47,7 +47,7 @@ yarn-cluster cluster TransformJob - eu.dnetlib.dhp.actionmanager.datacite.GenerateDataciteDatasetSpark + eu.dnetlib.dhp.datacite.GenerateDataciteDatasetSpark dhp-aggregation-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTest.scala b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/datacite/DataciteToOAFTest.scala similarity index 88% rename from dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTest.scala rename to dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/datacite/DataciteToOAFTest.scala index a795a910d..f21e9eab1 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTest.scala +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/datacite/DataciteToOAFTest.scala @@ -1,8 +1,7 @@ -package eu.dnetlib.dhp.actionmanager.datacite +package 
eu.dnetlib.dhp.datacite -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.databind.SerializationFeature +import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature} import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest import eu.dnetlib.dhp.schema.oaf.Oaf import org.junit.jupiter.api.extension.ExtendWith From ac36aa7d1c9324b1b018a71eeb3b4f94b05165a6 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 21 Oct 2021 11:35:02 +0200 Subject: [PATCH 07/14] fixed wrong Encoding during a map phase --- .../eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java index c016e2156..77971a5a4 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java @@ -102,7 +102,7 @@ public class CopyHdfsOafApplication extends AbstractMigrationApplication { for (Map.Entry e : ModelSupport.oafTypes.entrySet()) { oaf .filter((FilterFunction) o -> o.getClass().getSimpleName().toLowerCase().equals(e.getKey())) - .map((MapFunction) OBJECT_MAPPER::writeValueAsString, Encoders.bean(e.getValue())) + .map((MapFunction) OBJECT_MAPPER::writeValueAsString, Encoders.STRING()) .write() .option("compression", "gzip") .mode(SaveMode.Append) From 3702fe478d2082cd28b1f27bf0d094600d2e8b9b Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 21 Oct 2021 12:05:02 +0200 Subject: [PATCH 08/14] cleanup --- .../oa/graph/raw/CopyHdfsOafApplication.java | 20 +------------------ 1 file changed, 1 insertion(+), 19 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java index 77971a5a4..792264e18 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java @@ -110,23 +110,5 @@ public class CopyHdfsOafApplication extends AbstractMigrationApplication { } } } - - private static Relation getInverse(Relation rel, VocabularyGroup vocs) { - final Relation inverse = new Relation(); - - inverse.setProperties(rel.getProperties()); - inverse.setValidated(rel.getValidated()); - inverse.setValidationDate(rel.getValidationDate()); - inverse.setCollectedfrom(rel.getCollectedfrom()); - inverse.setDataInfo(rel.getDataInfo()); - inverse.setLastupdatetimestamp(rel.getLastupdatetimestamp()); - - inverse.setSource(rel.getTarget()); - inverse.setTarget(rel.getSource()); - inverse.setRelType(rel.getRelType()); - inverse.setSubRelType(rel.getSubRelType()); - - return inverse; - } - + } From d147295c2f9b531b8da7973436694c0354c79ba9 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 21 Oct 2021 14:15:57 +0200 Subject: [PATCH 09/14] avoiding java.io.NotSerializableException: java.util.HashMap --- .../dhp/oa/graph/raw/CopyHdfsOafApplication.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java 
b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java index 792264e18..31ebcbc6e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java @@ -31,6 +31,7 @@ import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import scala.Tuple2; public class CopyHdfsOafApplication extends AbstractMigrationApplication { @@ -73,10 +74,13 @@ public class CopyHdfsOafApplication extends AbstractMigrationApplication { conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.registerKryoClasses(ModelSupport.getOafModelClasses()); - runWithSparkSession(conf, isSparkSessionManaged, spark -> processPaths(spark, hdfsPath, paths)); + final List oafTypes = Lists.newArrayList(ModelSupport.oafTypes.keySet()); + + runWithSparkSession(conf, isSparkSessionManaged, spark -> processPaths(spark, oafTypes, hdfsPath, paths)); } public static void processPaths(final SparkSession spark, + final List oafTypes, final String outputPath, final Set paths) { @@ -99,16 +103,16 @@ public class CopyHdfsOafApplication extends AbstractMigrationApplication { .as(Encoders.kryo(Oaf.class)); // dispatch each entity type individually in the respective graph subdirectory in append mode - for (Map.Entry e : ModelSupport.oafTypes.entrySet()) { + for (String type : oafTypes) { oaf - .filter((FilterFunction) o -> o.getClass().getSimpleName().toLowerCase().equals(e.getKey())) + .filter((FilterFunction) o -> o.getClass().getSimpleName().toLowerCase().equals(type)) .map((MapFunction) OBJECT_MAPPER::writeValueAsString, Encoders.STRING()) .write() .option("compression", "gzip") .mode(SaveMode.Append) - .text(outputPath + "/" + e.getKey()); + .text(outputPath + "/" + type); } } } - + } From 6b34ba737eae62a3c8c61dc279fbfdd17233aa18 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 21 Oct 2021 14:16:18 +0200 Subject: [PATCH 10/14] minor --- .../src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java | 2 +- dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java index 0b2cd571f..654fdd5ac 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java @@ -28,7 +28,7 @@ public class HdfsSupport { * @param configuration Configuration of hadoop env */ public static boolean exists(String path, Configuration configuration) { - logger.info("Removing path: {}", path); + logger.info("Checking existence for path: {}", path); return rethrowAsRuntimeException( () -> { Path f = new Path(path); diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java index c53affbad..4fee87ed7 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java @@ -10,7 +10,6 @@ import java.util.Properties; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; -import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; import org.apache.commons.codec.binary.Base64; 
import org.apache.commons.codec.binary.Base64OutputStream; import org.apache.commons.codec.binary.Hex; @@ -27,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Maps; import com.jayway.jsonpath.JsonPath; +import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; import net.minidev.json.JSONArray; import scala.collection.JavaConverters; import scala.collection.Seq; @@ -57,7 +57,6 @@ public class DHPUtils { return String.format("%s::%s", nsPrefix, DHPUtils.md5(originalId)); } - public static String generateUnresolvedIdentifier(final String pid, final String pidType) { final String cleanedPid = CleaningFunctions.normalizePidValue(pidType, pid); From 034304b33af2b8fdec46cafc8f3c81f4fb704c2e Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Tue, 26 Oct 2021 09:40:47 +0200 Subject: [PATCH 11/14] conflict resolved on merge --- .../java/eu/dnetlib/dhp/utils/DHPUtils.java | 82 ++++++++---- .../dhp/sx/bio/ebi/oozie_app/workflow.xml | 4 +- .../dhp/sx/bio/pubmed/oozie_app/workflow.xml | 2 +- .../oa/graph/raw/CopyHdfsOafApplication.java | 118 ------------------ .../raw/CopyHdfsOafSparkApplication.scala | 74 +++++++++++ .../common/AbstractMigrationApplication.java | 23 +--- .../oa/graph/copy_hdfs_oaf_parameters.json | 6 + .../oa/graph/raw_all/oozie_app/workflow.xml | 3 +- 8 files changed, 147 insertions(+), 165 deletions(-) delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java index 4fee87ed7..66e4cb780 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java @@ -1,36 +1,35 @@ package eu.dnetlib.dhp.utils; -import java.io.*; -import java.nio.charset.StandardCharsets; -import java.security.MessageDigest; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; - -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.codec.binary.Base64OutputStream; -import org.apache.commons.codec.binary.Hex; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.SaveMode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Maps; import com.jayway.jsonpath.JsonPath; - +import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo; import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; import net.minidev.json.JSONArray; +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.SaveMode; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; import scala.collection.JavaConverters; import scala.collection.Seq; +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.util.*; +import java.util.stream.Collectors; + public class DHPUtils { private static final Logger log = LoggerFactory.getLogger(DHPUtils.class); @@ -53,6 +52,45 @@ public class DHPUtils { } } + /** + * Retrieves from the metadata store manager application the list of paths associated with mdstores characterized + * by he given format, layout, interpretation + * @param mdstoreManagerUrl the URL of the mdstore manager service + * @param format the mdstore format + * @param layout the mdstore layout + * @param interpretation the mdstore interpretation + * @param includeEmpty include Empty mdstores + * @return the set of hdfs paths + * @throws IOException in case of HTTP communication issues + */ + public static Set mdstorePaths(final String mdstoreManagerUrl, + final String format, + final String layout, + final String interpretation, + boolean includeEmpty) throws IOException { + final String url = mdstoreManagerUrl + "/mdstores/"; + final ObjectMapper objectMapper = new ObjectMapper(); + + final HttpGet req = new HttpGet(url); + + try (final CloseableHttpClient client = HttpClients.createDefault()) { + try (final CloseableHttpResponse response = client.execute(req)) { + final String json = IOUtils.toString(response.getEntity().getContent()); + final MDStoreWithInfo[] mdstores = objectMapper.readValue(json, MDStoreWithInfo[].class); + return Arrays + .stream(mdstores) + .filter(md -> md.getFormat().equalsIgnoreCase(format)) + .filter(md -> md.getLayout().equalsIgnoreCase(layout)) + .filter(md -> md.getInterpretation().equalsIgnoreCase(interpretation)) + .filter(md -> StringUtils.isNotBlank(md.getHdfsPath())) + .filter(md -> StringUtils.isNotBlank(md.getCurrentVersion())) + .filter(md -> includeEmpty || md.getSize() > 0) + .map(md -> md.getHdfsPath() + "/" + md.getCurrentVersion() + "/store") + .collect(Collectors.toSet()); + } + } + } + public static String generateIdentifier(final String originalId, final String nsPrefix) { return String.format("%s::%s", nsPrefix, DHPUtils.md5(originalId)); } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml index 73b1a3b60..4b47ae38e 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml @@ -52,7 +52,7 @@ yarn-cluster cluster Incremental Download EBI Links - eu.dnetllib.dhp.sx.bio.ebi.SparkDownloadEBILinks + eu.dnetlib.dhp.sx.bio.ebi.SparkDownloadEBILinks dhp-aggregation-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} @@ -85,7 +85,7 @@ yarn-cluster cluster Create OAF DataSet - eu.dnetllib.dhp.sx.bio.ebi.SparkEBILinksToOaf + eu.dnetlib.dhp.sx.bio.ebi.SparkEBILinksToOaf dhp-aggregation-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml index 21fd2d153..8915a090b 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml +++ 
b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml @@ -30,7 +30,7 @@ yarn cluster Convert Baseline to OAF Dataset - eu.dnetllib.dhp.sx.bio.ebi.SparkCreateBaselineDataFrame + eu.dnetlib.dhp.sx.bio.ebi.SparkCreateBaselineDataFrame dhp-aggregation-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java deleted file mode 100644 index 31ebcbc6e..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java +++ /dev/null @@ -1,118 +0,0 @@ - -package eu.dnetlib.dhp.oa.graph.raw; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.util.*; - -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.FilterFunction; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.clearspring.analytics.util.Lists; -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; -import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; -import eu.dnetlib.dhp.schema.common.EntityType; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.Oaf; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import scala.Tuple2; - -public class CopyHdfsOafApplication extends AbstractMigrationApplication { - - private static final Logger log = LoggerFactory.getLogger(CopyHdfsOafApplication.class); - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - public static void main(final String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - CopyHdfsOafApplication.class - .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json"))); - parser.parseArgument(args); - - final Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - final String mdstoreManagerUrl = parser.get("mdstoreManagerUrl"); - log.info("mdstoreManagerUrl: {}", mdstoreManagerUrl); - - final String mdFormat = parser.get("mdFormat"); - log.info("mdFormat: {}", mdFormat); - - final String mdLayout = parser.get("mdLayout"); - log.info("mdLayout: {}", mdLayout); - - final String mdInterpretation = parser.get("mdInterpretation"); - log.info("mdInterpretation: {}", mdInterpretation); - - final String hdfsPath = parser.get("hdfsPath"); - log.info("hdfsPath: {}", hdfsPath); - - final Set paths = mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation); - - final SparkConf conf = new SparkConf(); - conf.set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(ModelSupport.getOafModelClasses()); - - final List oafTypes = Lists.newArrayList(ModelSupport.oafTypes.keySet()); - - runWithSparkSession(conf, isSparkSessionManaged, spark -> processPaths(spark, oafTypes, hdfsPath, paths)); - } - - public static void processPaths(final SparkSession spark, - final List oafTypes, - final String outputPath, - final Set paths) { - - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - - log.info("Found {} mdstores", paths.size()); - paths.forEach(log::info); - - final String[] validPaths = paths - .stream() - .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration())) - .toArray(String[]::new); - log.info("Non empty mdstores {}", validPaths.length); - - if (validPaths.length > 0) { - // load the dataset - Dataset oaf = spark - .read() - .load(validPaths) - .as(Encoders.kryo(Oaf.class)); - - // dispatch each entity type individually in the respective graph subdirectory in append mode - for (String type : oafTypes) { - oaf - .filter((FilterFunction) o -> o.getClass().getSimpleName().toLowerCase().equals(type)) - .map((MapFunction) OBJECT_MAPPER::writeValueAsString, Encoders.STRING()) - .write() - .option("compression", "gzip") - .mode(SaveMode.Append) - .text(outputPath + "/" + type); - } - } - } - -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala new file mode 100644 index 000000000..c7ad1890d --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala @@ -0,0 +1,74 @@ +package eu.dnetlib.dhp.oa.graph.raw + +import com.fasterxml.jackson.databind.ObjectMapper +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.common.HdfsSupport +import eu.dnetlib.dhp.schema.common.ModelSupport +import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo +import eu.dnetlib.dhp.schema.oaf.Oaf +import eu.dnetlib.dhp.utils.DHPUtils +import org.apache.commons.io.IOUtils +import org.apache.commons.lang3.StringUtils +import org.apache.http.client.methods.HttpGet +import org.apache.http.impl.client.HttpClients +import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} +import org.apache.spark.{SparkConf, SparkContext} +import org.slf4j.LoggerFactory + +import scala.collection.JavaConverters._ +import scala.io.Source + +object CopyHdfsOafSparkApplication { + + def main(args: Array[String]): Unit = { + val log = LoggerFactory.getLogger(getClass) + val conf = new SparkConf() + val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json")).mkString) + parser.parseArgument(args) + + val spark = + SparkSession + .builder() + .config(conf) + .appName(getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + val sc: SparkContext = spark.sparkContext + + val mdstoreManagerUrl = parser.get("mdstoreManagerUrl") + log.info("mdstoreManagerUrl: {}", mdstoreManagerUrl) + + val mdFormat = parser.get("mdFormat") + log.info("mdFormat: {}", mdFormat) + + val mdLayout = parser.get("mdLayout") + log.info("mdLayout: {}", mdLayout) + + val mdInterpretation = parser.get("mdInterpretation") + log.info("mdInterpretation: {}", mdInterpretation) + + val hdfsPath = parser.get("hdfsPath") + log.info("hdfsPath: {}", 
hdfsPath) + + implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] + + val paths = DHPUtils.mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation, true).asScala + + val validPaths: List[String] = paths.filter(p => HdfsSupport.exists(p, sc.hadoopConfiguration)).toList + + if (validPaths.nonEmpty) { + val oaf = spark.read.load(validPaths: _*).as[Oaf] + val mapper = new ObjectMapper() + val l =ModelSupport.oafTypes.entrySet.asScala.map(e => e.getKey).toList + l.foreach( + e => + oaf.filter(o => o.getClass.getSimpleName.equalsIgnoreCase(e)) + .map(s => mapper.writeValueAsString(s))(Encoders.STRING) + .write + .option("compression", "gzip") + .mode(SaveMode.Append) + .text(s"$hdfsPath/${e}") + ) + } + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java index 8cd495e08..cba64899b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java @@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo; import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.utils.DHPUtils; public class AbstractMigrationApplication implements Closeable { @@ -71,27 +72,7 @@ public class AbstractMigrationApplication implements Closeable { final String format, final String layout, final String interpretation) throws IOException { - final String url = mdstoreManagerUrl + "/mdstores/"; - final ObjectMapper objectMapper = new ObjectMapper(); - - final HttpGet req = new HttpGet(url); - - try (final CloseableHttpClient client = HttpClients.createDefault()) { - try (final CloseableHttpResponse response = client.execute(req)) { - final String json = IOUtils.toString(response.getEntity().getContent()); - final MDStoreWithInfo[] mdstores = objectMapper.readValue(json, MDStoreWithInfo[].class); - return Arrays - .stream(mdstores) - .filter(md -> md.getFormat().equalsIgnoreCase(format)) - .filter(md -> md.getLayout().equalsIgnoreCase(layout)) - .filter(md -> md.getInterpretation().equalsIgnoreCase(interpretation)) - .filter(md -> StringUtils.isNotBlank(md.getHdfsPath())) - .filter(md -> StringUtils.isNotBlank(md.getCurrentVersion())) - .filter(md -> md.getSize() > 0) - .map(md -> md.getHdfsPath() + "/" + md.getCurrentVersion() + "/store") - .collect(Collectors.toSet()); - } - } + return DHPUtils.mdstorePaths(mdstoreManagerUrl, format, layout, interpretation, false); } private Configuration getConf() { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json index 1d89017c5..d1b16b09a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json @@ -23,6 +23,12 @@ "paramDescription": "metadata layout", "paramRequired": true }, + { + "paramName": "m", + "paramLongName": "master", + "paramDescription": "should be yarn or local", + "paramRequired": true + }, { "paramName": "i", "paramLongName": "mdInterpretation", diff --git 
a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml index 137c69ed8..307e26267 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml @@ -553,7 +553,7 @@ yarn cluster ImportOAF_hdfs_graph - eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafApplication + eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafSparkApplication dhp-graph-mapper-${projectVersion}.jar --executor-memory ${sparkExecutorMemory} @@ -568,6 +568,7 @@ --mdstoreManagerUrl${mdstoreManagerUrl} --mdFormatOAF --mdLayoutstore + --masteryarn --mdInterpretationgraph From aafdffa6b3c3d423a1b9a0085b4d6540bb723e13 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Tue, 26 Oct 2021 09:45:46 +0200 Subject: [PATCH 12/14] resolved conflict --- .../java/eu/dnetlib/dhp/utils/DHPUtils.java | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java index 66e4cb780..5a59bc0df 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java @@ -1,12 +1,12 @@ package eu.dnetlib.dhp.utils; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Maps; -import com.jayway.jsonpath.JsonPath; -import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo; -import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; -import net.minidev.json.JSONArray; +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.util.*; +import java.util.stream.Collectors; + import org.apache.commons.codec.binary.Hex; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -21,15 +21,17 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.SaveMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Maps; +import com.jayway.jsonpath.JsonPath; + +import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo; +import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; +import net.minidev.json.JSONArray; import scala.collection.JavaConverters; import scala.collection.Seq; -import java.io.*; -import java.nio.charset.StandardCharsets; -import java.security.MessageDigest; -import java.util.*; -import java.util.stream.Collectors; - public class DHPUtils { private static final Logger log = LoggerFactory.getLogger(DHPUtils.class); From 4acfa8fa2e317874caabe209f1d6c685b36d9a66 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Tue, 26 Oct 2021 17:51:20 +0200 Subject: [PATCH 13/14] Scholexplorer Datasource Aggregation: - Added collectedfrom in the inverse relation generated Relation resolution: - increased number of partitions in workflow.xml - using classid instead of classname to build the pid-dnetId mapping --- .../java/eu/dnetlib/dhp/collection/CollectionUtils.scala | 6 ++++++ .../dhp/oa/graph/resolution/SparkResolveRelation.scala | 4 ++-- .../dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml | 2 +- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git 
a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectionUtils.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectionUtils.scala index e212d7e2a..11ecfd6cb 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectionUtils.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectionUtils.scala @@ -34,6 +34,12 @@ object CollectionUtils { inverse.setRelType(currentRel.getRelType) inverse.setSubRelType(currentRel.getSubReltype) inverse.setRelClass(currentRel.getInverseRelClass) + inverse.setCollectedfrom(r.getCollectedfrom) + inverse.setDataInfo(r.getDataInfo) + inverse.setProperties(r.getProperties) + inverse.setLastupdatetimestamp(r.getLastupdatetimestamp) + inverse.setValidated(r.getValidated) + inverse.setValidationDate(r.getValidationDate) return List(r, inverse) } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala index e87f46b00..5ca7d9782 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala @@ -104,14 +104,14 @@ object SparkResolveRelation { JObject(pids) <- json \\ "instance" \ "pid" JField("value", JString(pidValue)) <- pids JField("qualifier", JObject(qualifier)) <- pids - JField("classname", JString(pidType)) <- qualifier + JField("classid", JString(pidType)) <- qualifier } yield (pidValue, pidType) val alternateIds: List[(String, String)] = for { JObject(pids) <- json \\ "alternateIdentifier" JField("value", JString(pidValue)) <- pids JField("qualifier", JObject(qualifier)) <- pids - JField("classname", JString(pidType)) <- qualifier + JField("classid", JString(pidType)) <- qualifier } yield (pidValue, pidType) (id, result ::: alternateIds) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml index e9e1a8ede..31cc53ae3 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml @@ -24,7 +24,7 @@ --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.shuffle.partitions=3000 + --conf spark.sql.shuffle.partitions=8000 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} From 1be9aa0a5f2d747ad7eb41191956d14a4f3c0943 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Tue, 26 Oct 2021 17:52:20 +0200 Subject: [PATCH 14/14] Removed filter of datacite items from the raw graph merging phase, Datacite is not an actionset anymore in beta --- .../dhp/oa/graph/merge/MergeGraphTableSparkJob.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java 
b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java
index ef419a042..474944260 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java
@@ -127,13 +127,6 @@ public class MergeGraphTableSparkJob {
 				}
 			}, Encoders.bean(p_clazz))
 			.filter((FilterFunction<P>) Objects::nonNull)
-			.filter((FilterFunction<P>) o -> {
-				HashSet<String> collectedFromNames = Optional
-					.ofNullable(o.getCollectedfrom())
-					.map(c -> c.stream().map(KeyValue::getValue).collect(Collectors.toCollection(HashSet::new)))
-					.orElse(new HashSet<>());
-				return !collectedFromNames.contains("Datacite");
-			})
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
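
The closure-serialization fix introduced by PATCH 09 ("avoiding java.io.NotSerializableException: java.util.HashMap") can be illustrated in isolation. The sketch below is not part of the patch series and uses assumed, simplified names (TypeDispatchSketch, a toy DataFrame with a "type" column) instead of the dhp model classes; it only mirrors the pattern: the keys of a java.util.HashMap are copied into a serializable List on the driver, so the Spark closures capture plain Strings rather than HashMap entries.

// Editorial sketch under the assumptions stated above; not the project's code.
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConverters._

object TypeDispatchSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("type-dispatch-sketch").getOrCreate()
    import spark.implicits._

    // Stand-in for ModelSupport.oafTypes: a java.util.HashMap keyed by entity type name.
    val oafTypes = new java.util.HashMap[String, String]()
    oafTypes.put("publication", "Publication")
    oafTypes.put("dataset", "Dataset")

    // Copy the keys into a plain Scala List on the driver, as the fix does,
    // so the filter closures below capture only Strings and serialize cleanly.
    val typeNames: List[String] = oafTypes.keySet().asScala.toList

    // Toy records standing in for the mixed Oaf entities read from the mdstores.
    val records = Seq(("publication", "p1"), ("dataset", "d1"), ("publication", "p2")).toDF("type", "id")

    typeNames.foreach { t =>
      // Dispatch each type separately; here we just count instead of appending per-type text output.
      println(s"$t -> ${records.filter($"type" === t).count()}")
    }

    spark.stop()
  }
}

The same materialise-then-iterate pattern is what CopyHdfsOafSparkApplication ends up using, where ModelSupport.oafTypes.entrySet.asScala.map(e => e.getKey).toList is built before the per-type filter/write loop.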