diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/ORCIDWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/ORCIDWorker.java index 5442620437..db6475dfa8 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/ORCIDWorker.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/ORCIDWorker.java @@ -101,16 +101,18 @@ public class ORCIDWorker extends Thread { SequenceFile.Writer file) throws IOException { final String response = retrieveURL(id, url, token); if (response != null) { - - if (orcidId == null || response == null) { + if (orcidId == null) { log.error("Thread {} {} {}", id, orcidId, response); throw new RuntimeException("null items "); } if (file == null) { log.error("Thread {} file is null for {} URL:{}", id, url, orcidId); - } else + } else { file.append(new Text(orcidId), new Text(response)); + file.hflush(); + } + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/OrcidGetUpdatesFile.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/OrcidGetUpdatesFile.java index 6438d737cf..388d32b4e2 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/OrcidGetUpdatesFile.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/OrcidGetUpdatesFile.java @@ -17,6 +17,8 @@ import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; @@ -96,9 +98,14 @@ public class OrcidGetUpdatesFile { urlConn.setConnectTimeout(clientParams.getConnectTimeOut() * 1000); if (urlConn.getResponseCode() > 199 && urlConn.getResponseCode() < 300) { InputStream input = urlConn.getInputStream(); + + Path hdfsWritePath = new Path("/tmp/orcid_updates.tar.gz"); + final FSDataOutputStream fsDataOutputStream = fileSystem.create(hdfsWritePath, true); + IOUtils.copy(input, fsDataOutputStream); + FSDataInputStream updateFile = fileSystem.open(hdfsWritePath); TarArchiveInputStream tais = new TarArchiveInputStream(new GzipCompressorInputStream( new BufferedInputStream( - input))); + updateFile.getWrappedStream()))); TarArchiveEntry entry; BlockingQueue queue = new ArrayBlockingQueue(3000);