Orcid Update Procedure #394

Merged
sandro.labruzzo merged 13 commits from orcid_update into beta 2024-02-28 09:17:30 +01:00
2 changed files with 13 additions and 4 deletions
Showing only changes of commit e0753f19da - Show all commits

View File

@ -101,16 +101,18 @@ public class ORCIDWorker extends Thread {
SequenceFile.Writer file) throws IOException { SequenceFile.Writer file) throws IOException {
final String response = retrieveURL(id, url, token); final String response = retrieveURL(id, url, token);
if (response != null) { if (response != null) {
if (orcidId == null) {
if (orcidId == null || response == null) {
log.error("Thread {} {} {}", id, orcidId, response); log.error("Thread {} {} {}", id, orcidId, response);
throw new RuntimeException("null items "); throw new RuntimeException("null items ");
} }
if (file == null) { if (file == null) {
log.error("Thread {} file is null for {} URL:{}", id, url, orcidId); log.error("Thread {} file is null for {} URL:{}", id, url, orcidId);
} else } else {
file.append(new Text(orcidId), new Text(response)); file.append(new Text(orcidId), new Text(response));
file.hflush();
}
} }
} }

View File

@ -17,6 +17,8 @@ import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile;
@ -96,9 +98,14 @@ public class OrcidGetUpdatesFile {
urlConn.setConnectTimeout(clientParams.getConnectTimeOut() * 1000); urlConn.setConnectTimeout(clientParams.getConnectTimeOut() * 1000);
if (urlConn.getResponseCode() > 199 && urlConn.getResponseCode() < 300) { if (urlConn.getResponseCode() > 199 && urlConn.getResponseCode() < 300) {
InputStream input = urlConn.getInputStream(); InputStream input = urlConn.getInputStream();
Path hdfsWritePath = new Path("/tmp/orcid_updates.tar.gz");
final FSDataOutputStream fsDataOutputStream = fileSystem.create(hdfsWritePath, true);
IOUtils.copy(input, fsDataOutputStream);
FSDataInputStream updateFile = fileSystem.open(hdfsWritePath);
TarArchiveInputStream tais = new TarArchiveInputStream(new GzipCompressorInputStream( TarArchiveInputStream tais = new TarArchiveInputStream(new GzipCompressorInputStream(
new BufferedInputStream( new BufferedInputStream(
input))); updateFile.getWrappedStream())));
TarArchiveEntry entry; TarArchiveEntry entry;
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3000); BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3000);