From e0753f19da7a33978d57016a6781076458a09186 Mon Sep 17 00:00:00 2001
From: "sandro.labruzzo" <sandro.labruzzo@isti.cnr.it>
Date: Sat, 13 Jan 2024 09:27:08 +0100
Subject: [PATCH] Fixed error of connection timeout

---
 .../eu/dnetlib/dhp/collection/orcid/ORCIDWorker.java       | 8 +++++---
 .../dhp/collection/orcid/OrcidGetUpdatesFile.java          | 9 ++++++++-
 2 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/ORCIDWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/ORCIDWorker.java
index 544262043..db6475dfa 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/ORCIDWorker.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/ORCIDWorker.java
@@ -101,16 +101,18 @@ public class ORCIDWorker extends Thread {
 		SequenceFile.Writer file) throws IOException {
 		final String response = retrieveURL(id, url, token);
 		if (response != null) {
-
-			if (orcidId == null || response == null) {
+			if (orcidId == null) {
 				log.error("Thread {} {} {}", id, orcidId, response);
 				throw new RuntimeException("null items ");
 			}
 
 			if (file == null) {
 				log.error("Thread {} file is null for {} URL:{}", id, url, orcidId);
-			} else
+			} else {
 				file.append(new Text(orcidId), new Text(response));
+				file.hflush();
+			}
+
 		}
 	}
 
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/OrcidGetUpdatesFile.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/OrcidGetUpdatesFile.java
index 6438d737c..388d32b4e 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/OrcidGetUpdatesFile.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/OrcidGetUpdatesFile.java
@@ -17,6 +17,8 @@ import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
@@ -96,9 +98,14 @@ public class OrcidGetUpdatesFile {
 			urlConn.setConnectTimeout(clientParams.getConnectTimeOut() * 1000);
 			if (urlConn.getResponseCode() > 199 && urlConn.getResponseCode() < 300) {
 				InputStream input = urlConn.getInputStream();
+
+				Path hdfsWritePath = new Path("/tmp/orcid_updates.tar.gz");
+				final FSDataOutputStream fsDataOutputStream = fileSystem.create(hdfsWritePath, true);
+				IOUtils.copy(input, fsDataOutputStream);
+				FSDataInputStream updateFile = fileSystem.open(hdfsWritePath);
 				TarArchiveInputStream tais = new TarArchiveInputStream(new GzipCompressorInputStream(
 					new BufferedInputStream(
-						input)));
+						updateFile.getWrappedStream())));
 				TarArchiveEntry entry;
 
 				BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3000);