dnet-hadoop/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefImporter.java

package eu.dnetlib.doiboost.crossref;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.Inflater;

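/**
 * Imports Crossref records from an ElasticSearch index into an HDFS
 * SequenceFile of (IntWritable, Text) pairs, where the key is a progressive
 * record counter and the value is the raw record returned by {@link ESClient}.
 */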
public class CrossrefImporter {
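    /**
     * Expects the arguments declared in import_from_es.json: "namenode" (the
     * HDFS URI), "targetPath" (the SequenceFile to write) and an optional
     * "timestamp" that, when present, is forwarded to the {@link ESClient}.
     */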
    public static void main(String[] args) throws Exception {
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
                IOUtils.toString(CrossrefImporter.class.getResourceAsStream(
                        "/eu/dnetlib/dhp/doiboost/import_from_es.json")));
        final Logger logger = LoggerFactory.getLogger(CrossrefImporter.class);

        parser.parseArgument(args);
        final String hdfsuri = parser.get("namenode");
        logger.info("HDFS URI: " + hdfsuri);

        final Path hdfswritepath = new Path(parser.get("targetPath"));
        logger.info("TargetPath: " + hdfswritepath);

        final Long timestamp = StringUtils.isNotBlank(parser.get("timestamp"))
                ? Long.parseLong(parser.get("timestamp"))
                : -1L;
        if (timestamp > 0) {
            logger.info("Timestamp added " + timestamp);
        }
2020-04-01 14:12:33 +02:00
// ====== Init HDFS File System Object
Configuration conf = new Configuration();
// Set FileSystem URI
conf.set("fs.defaultFS", hdfsuri);
// Because of Maven
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
ESClient client = timestamp>0?new ESClient("ip-90-147-167-25.ct1.garrservices.it", "crossref", timestamp):new ESClient("ip-90-147-167-25.ct1.garrservices.it", "crossref");
2020-04-01 14:12:33 +02:00
        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(hdfswritepath),
                SequenceFile.Writer.keyClass(IntWritable.class),
                SequenceFile.Writer.valueClass(Text.class))) {
            int i = 0;
            long start = System.currentTimeMillis();
            long end;
            final IntWritable key = new IntWritable(i);
            final Text value = new Text();
            while (client.hasNext()) {
                key.set(i++);
                value.set(client.next());
                writer.append(key, value);
                // Log throughput every million records
                if (i % 1000000 == 0) {
                    end = System.currentTimeMillis();
                    final float time = (end - start) / 1000.0F;
                    logger.info(String.format(
                            "Imported %d records; last 1000000 imported in %f seconds", i, time));
                    start = System.currentTimeMillis();
                }
            }
        }
    }
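
    /**
     * Decodes a Base64-encoded, deflate-compressed blob and returns the
     * inflated payload as a string (assumed here to be UTF-8 text).
     */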
    public static String decompressBlob(final String blob) {
        try {
            // charset made explicit: the blobs are expected to be UTF-8 JSON
            byte[] byteArray = Base64.decodeBase64(blob.getBytes(StandardCharsets.UTF_8));
            final Inflater decompressor = new Inflater();
            decompressor.setInput(byteArray);
            final ByteArrayOutputStream bos = new ByteArrayOutputStream(byteArray.length);
            byte[] buffer = new byte[8192];
            while (!decompressor.finished()) {
                int size = decompressor.inflate(buffer);
                bos.write(buffer, 0, size);
            }
            byte[] unzippedData = bos.toByteArray();
            decompressor.end();
            return new String(unzippedData, StandardCharsets.UTF_8);
        } catch (Exception e) {
            throw new RuntimeException("Wrong record: " + blob, e);
        }
    }
}