diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hostedbymap/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hostedbymap/oozie_app/workflow.xml
index c7fffed5b..e1c6967de 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hostedbymap/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hostedbymap/oozie_app/workflow.xml
@@ -85,7 +85,7 @@
Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-
+
@@ -223,11 +223,13 @@
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
+ --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+ --conf spark.sql.shuffle.partitions=15000
--hostedByMapPath${hostedByMapPath}
--preparedInfoPath${workingDir}/preparedInfo
@@ -253,11 +255,13 @@
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
+ --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+ --conf spark.sql.shuffle.partitions=15000
--outputPath${outputPath}/publication
--preparedInfoPath${workingDir}/preparedInfo
@@ -278,6 +282,7 @@
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
+ --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToResult.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToResult.scala
index a900fc241..803f95fc1 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToResult.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToResult.scala
@@ -25,27 +25,36 @@ object SparkApplyHostedByMapToResult {
val i = p.getInstance().asScala
if (i.size == 1) {
val inst: Instance = i.head
- inst.getHostedby.setKey(ei.getHostedById)
- inst.getHostedby.setValue(ei.getName)
- if (ei.getOpenAccess) {
- inst.setAccessright(
- OafMapperUtils.accessRight(
- ModelConstants.ACCESS_RIGHT_OPEN,
- "Open Access",
- ModelConstants.DNET_ACCESS_MODES,
- ModelConstants.DNET_ACCESS_MODES
- )
- )
- inst.getAccessright.setOpenAccessRoute(OpenAccessRoute.gold)
- p.setBestaccessright(OafMapperUtils.createBestAccessRights(p.getInstance()));
- }
+ patchInstance(p, ei, inst)
+ } else if (i.size == 2) {
+ if (i.map(ii => ii.getCollectedfrom.getValue).contains("UnpayWall")) {
+            i.find(ii => "Crossref".equals(ii.getCollectedfrom.getValue))
+              .foreach(inst => patchInstance(p, ei, inst))
+ }
}
}
p
})(Encoders.bean(classOf[Publication]))
}
+ private def patchInstance(p: Publication, ei: EntityInfo, inst: Instance): Unit = {
+ inst.getHostedby.setKey(ei.getHostedById)
+ inst.getHostedby.setValue(ei.getName)
+ if (ei.getOpenAccess) {
+ inst.setAccessright(
+ OafMapperUtils.accessRight(
+ ModelConstants.ACCESS_RIGHT_OPEN,
+ "Open Access",
+ ModelConstants.DNET_ACCESS_MODES,
+ ModelConstants.DNET_ACCESS_MODES
+ )
+ )
+ inst.getAccessright.setOpenAccessRoute(OpenAccessRoute.gold)
+ p.setBestaccessright(OafMapperUtils.createBestAccessRights(p.getInstance()));
+ }
+ }
+
def main(args: Array[String]): Unit = {
val logger: Logger = LoggerFactory.getLogger(getClass)