1
0
Fork 0

updated wf Datacite Import to retrieve the block size as parameter

This commit is contained in:
Sandro La Bruzzo 2021-05-26 17:06:50 +02:00
parent 4f58418184
commit bced804151
2 changed files with 10 additions and 3 deletions

View File

@ -56,6 +56,7 @@ object ImportDatacite {
val hdfsTargetPath = new Path(targetPath)
log.info(s"hdfsTargetPath is $hdfsTargetPath")
val bs = if (parser.get("blocksize") == null) 100 else parser.get("blocksize").toInt
val spkipImport = parser.get("skipImport")
log.info(s"skipImport is $spkipImport")
@ -110,7 +111,7 @@ object ImportDatacite {
println(s"last Timestamp is $ts")
val cnt = if ("true".equalsIgnoreCase(spkipImport)) 1 else writeSequenceFile(hdfsTargetPath, ts, conf)
val cnt = if ("true".equalsIgnoreCase(spkipImport)) 1 else writeSequenceFile(hdfsTargetPath, ts, conf, bs)
println(s"Imported from Datacite API $cnt documents")
@ -137,7 +138,7 @@ object ImportDatacite {
}
}
private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration): Long = {
private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration, bs:Int): Long = {
var from:Long = timestamp * 1000
val delta:Long = 50000000L
var client: DataciteAPIImporter = null
@ -148,7 +149,7 @@ object ImportDatacite {
try {
var start: Long = System.currentTimeMillis
while (from < now) {
client = new DataciteAPIImporter(from, 100, from + delta)
client = new DataciteAPIImporter(from, bs, from + delta)
var end: Long = 0
val key: IntWritable = new IntWritable(i)
val value: Text = new Text

View File

@ -18,6 +18,12 @@
"paramDescription": "avoid to downlaod new items but apply the previous update",
"paramRequired": false
},
{
"paramName": "bs",
"paramLongName": "blocksize",
"paramDescription": "define the requests block size",
"paramRequired": false
},
{
"paramName": "n",
"paramLongName": "namenode",