From bced8041511f7ff2b8b671360119d68f6b552270 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Wed, 26 May 2021 17:06:50 +0200 Subject: [PATCH] updated wf Datacite Import to retrieve the block size as parameter --- .../dhp/actionmanager/datacite/ImportDatacite.scala | 7 ++++--- .../dhp/actionmanager/datacite/import_from_api.json | 6 ++++++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala index d6101ba7a..931ac06f6 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala @@ -56,6 +56,7 @@ object ImportDatacite { val hdfsTargetPath = new Path(targetPath) log.info(s"hdfsTargetPath is $hdfsTargetPath") + val bs = if (parser.get("blocksize") == null) 100 else parser.get("blocksize").toInt val spkipImport = parser.get("skipImport") log.info(s"skipImport is $spkipImport") @@ -110,7 +111,7 @@ object ImportDatacite { println(s"last Timestamp is $ts") - val cnt = if ("true".equalsIgnoreCase(spkipImport)) 1 else writeSequenceFile(hdfsTargetPath, ts, conf) + val cnt = if ("true".equalsIgnoreCase(spkipImport)) 1 else writeSequenceFile(hdfsTargetPath, ts, conf, bs) println(s"Imported from Datacite API $cnt documents") @@ -137,7 +138,7 @@ object ImportDatacite { } } - private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration): Long = { + private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration, bs:Int): Long = { var from:Long = timestamp * 1000 val delta:Long = 50000000L var client: DataciteAPIImporter = null @@ -148,7 +149,7 @@ object ImportDatacite { try { var start: Long = System.currentTimeMillis while (from < now) { - client = new DataciteAPIImporter(from, 100, from + delta) + client = new DataciteAPIImporter(from, bs, from + delta) var end: Long = 0 val key: IntWritable = new IntWritable(i) val value: Text = new Text diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/import_from_api.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/import_from_api.json index 69fb039ba..a37ae4bba 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/import_from_api.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/import_from_api.json @@ -18,6 +18,12 @@ "paramDescription": "avoid to downlaod new items but apply the previous update", "paramRequired": false }, + { + "paramName": "bs", + "paramLongName": "blocksize", + "paramDescription": "define the requests block size", + "paramRequired": false + }, { "paramName": "n", "paramLongName": "namenode",