From db5e18c7848d990c562ddb6f9f92054ff0bca0d8 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 28 May 2024 15:28:13 +0200 Subject: [PATCH] hostedby patching to work with the updated Crossref contents --- .../graph/hostedbymap/oozie_app/workflow.xml | 7 +++- .../SparkApplyHostedByMapToResult.scala | 37 ++++++++++++------- 2 files changed, 29 insertions(+), 15 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hostedbymap/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hostedbymap/oozie_app/workflow.xml index c7fffed5b..e1c6967de 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hostedbymap/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hostedbymap/oozie_app/workflow.xml @@ -85,7 +85,7 @@ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + @@ -223,11 +223,13 @@ --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} + --conf spark.executor.memoryOverhead=${sparkExecutorMemory} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + --conf spark.sql.shuffle.partitions=15000 --hostedByMapPath${hostedByMapPath} --preparedInfoPath${workingDir}/preparedInfo @@ -253,11 +255,13 @@ --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} + --conf spark.executor.memoryOverhead=${sparkExecutorMemory} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + --conf spark.sql.shuffle.partitions=15000 --outputPath${outputPath}/publication --preparedInfoPath${workingDir}/preparedInfo @@ -278,6 +282,7 @@ --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} + --conf spark.executor.memoryOverhead=${sparkExecutorMemory} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToResult.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToResult.scala index a900fc241..803f95fc1 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToResult.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToResult.scala @@ -25,27 +25,36 @@ object SparkApplyHostedByMapToResult { val i = p.getInstance().asScala if (i.size == 1) { val inst: Instance = i.head - inst.getHostedby.setKey(ei.getHostedById) - inst.getHostedby.setValue(ei.getName) - if (ei.getOpenAccess) { - inst.setAccessright( - OafMapperUtils.accessRight( - ModelConstants.ACCESS_RIGHT_OPEN, - "Open Access", - ModelConstants.DNET_ACCESS_MODES, - ModelConstants.DNET_ACCESS_MODES - ) - ) - inst.getAccessright.setOpenAccessRoute(OpenAccessRoute.gold) - p.setBestaccessright(OafMapperUtils.createBestAccessRights(p.getInstance())); - } + patchInstance(p, ei, inst) + } else if (i.size == 2) { + if (i.map(ii => ii.getCollectedfrom.getValue).contains("UnpayWall")) { + val inst: Instance = i.filter(ii => "Crossref".equals(ii.getCollectedfrom.getValue)).head + patchInstance(p, ei, inst) + } } } p })(Encoders.bean(classOf[Publication])) } + private def patchInstance(p: Publication, ei: EntityInfo, inst: Instance): Unit = { + inst.getHostedby.setKey(ei.getHostedById) + inst.getHostedby.setValue(ei.getName) + if (ei.getOpenAccess) { + inst.setAccessright( + OafMapperUtils.accessRight( + ModelConstants.ACCESS_RIGHT_OPEN, + "Open Access", + ModelConstants.DNET_ACCESS_MODES, + ModelConstants.DNET_ACCESS_MODES + ) + ) + inst.getAccessright.setOpenAccessRoute(OpenAccessRoute.gold) + p.setBestaccessright(OafMapperUtils.createBestAccessRights(p.getInstance())); + } + } + def main(args: Array[String]): Unit = { val logger: Logger = LoggerFactory.getLogger(getClass)