1
0
Fork 0

hostedby patching to work with the updated Crossref contents

This commit is contained in:
Claudio Atzori 2024-05-28 15:28:13 +02:00
parent fb266efbcb
commit db5e18c784
2 changed files with 29 additions and 15 deletions

View File

@ -85,7 +85,7 @@
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<fork name="fork_downloads_csv"> <fork name="fork_downloads_csv">
<path start="download_gold"/> <path start="download_gold"/>
<path start="download_doaj_json"/> <path start="download_doaj_json"/>
</fork> </fork>
@ -223,11 +223,13 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=15000
</spark-opts> </spark-opts>
<arg>--hostedByMapPath</arg><arg>${hostedByMapPath}</arg> <arg>--hostedByMapPath</arg><arg>${hostedByMapPath}</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg> <arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
@ -253,11 +255,13 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=15000
</spark-opts> </spark-opts>
<arg>--outputPath</arg><arg>${outputPath}/publication</arg> <arg>--outputPath</arg><arg>${outputPath}/publication</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg> <arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
@ -278,6 +282,7 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}

View File

@ -25,27 +25,36 @@ object SparkApplyHostedByMapToResult {
val i = p.getInstance().asScala val i = p.getInstance().asScala
if (i.size == 1) { if (i.size == 1) {
val inst: Instance = i.head val inst: Instance = i.head
inst.getHostedby.setKey(ei.getHostedById) patchInstance(p, ei, inst)
inst.getHostedby.setValue(ei.getName)
if (ei.getOpenAccess) {
inst.setAccessright(
OafMapperUtils.accessRight(
ModelConstants.ACCESS_RIGHT_OPEN,
"Open Access",
ModelConstants.DNET_ACCESS_MODES,
ModelConstants.DNET_ACCESS_MODES
)
)
inst.getAccessright.setOpenAccessRoute(OpenAccessRoute.gold)
p.setBestaccessright(OafMapperUtils.createBestAccessRights(p.getInstance()));
}
} else if (i.size == 2) {
if (i.map(ii => ii.getCollectedfrom.getValue).contains("UnpayWall")) {
val inst: Instance = i.filter(ii => "Crossref".equals(ii.getCollectedfrom.getValue)).head
patchInstance(p, ei, inst)
}
} }
} }
p p
})(Encoders.bean(classOf[Publication])) })(Encoders.bean(classOf[Publication]))
} }
private def patchInstance(p: Publication, ei: EntityInfo, inst: Instance): Unit = {
inst.getHostedby.setKey(ei.getHostedById)
inst.getHostedby.setValue(ei.getName)
if (ei.getOpenAccess) {
inst.setAccessright(
OafMapperUtils.accessRight(
ModelConstants.ACCESS_RIGHT_OPEN,
"Open Access",
ModelConstants.DNET_ACCESS_MODES,
ModelConstants.DNET_ACCESS_MODES
)
)
inst.getAccessright.setOpenAccessRoute(OpenAccessRoute.gold)
p.setBestaccessright(OafMapperUtils.createBestAccessRights(p.getInstance()));
}
}
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
val logger: Logger = LoggerFactory.getLogger(getClass) val logger: Logger = LoggerFactory.getLogger(getClass)