2020-04-29 13:24:29 +02:00
|
|
|
|
2020-04-20 09:53:34 +02:00
|
|
|
package eu.dnetlib.doiboost.crossref;
|
2020-04-01 14:12:33 +02:00
|
|
|
|
2020-04-23 09:33:48 +02:00
|
|
|
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
|
|
|
|
2020-04-01 14:12:33 +02:00
|
|
|
public class CrossrefImporter {
|
|
|
|
|
2020-04-29 13:24:29 +02:00
|
|
|
public static void main(String[] args) throws Exception {
|
2020-04-01 14:12:33 +02:00
|
|
|
|
2020-04-29 13:24:29 +02:00
|
|
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
|
|
|
IOUtils
|
|
|
|
.toString(
|
|
|
|
CrossrefImporter.class
|
|
|
|
.getResourceAsStream(
|
|
|
|
"/eu/dnetlib/dhp/doiboost/import_from_es.json")));
|
2020-05-28 09:57:46 +02:00
|
|
|
|
2020-04-29 13:24:29 +02:00
|
|
|
parser.parseArgument(args);
|
2020-04-01 14:12:33 +02:00
|
|
|
|
2021-01-05 10:00:13 +01:00
|
|
|
final String namenode = parser.get("namenode");
|
|
|
|
System.out.println("namenode: " + namenode);
|
2020-04-20 09:53:34 +02:00
|
|
|
|
2021-01-05 10:00:13 +01:00
|
|
|
Path targetPath = new Path(parser.get("targetPath"));
|
|
|
|
System.out.println("targetPath: " + targetPath);
|
2020-04-01 14:12:33 +02:00
|
|
|
|
2021-01-05 10:00:13 +01:00
|
|
|
final Long timestamp = Optional
|
|
|
|
.ofNullable(parser.get("timestamp"))
|
|
|
|
.map(s -> {
|
|
|
|
try {
|
|
|
|
return Long.parseLong(s);
|
|
|
|
} catch (NumberFormatException e) {
|
|
|
|
return -1L;
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.orElse(-1L);
|
|
|
|
System.out.println("timestamp: " + timestamp);
|
|
|
|
|
|
|
|
final String esServer = parser.get("esServer");
|
|
|
|
System.out.println("esServer: " + esServer);
|
|
|
|
|
|
|
|
final String esIndex = parser.get("esIndex");
|
|
|
|
System.out.println("esIndex: " + esIndex);
|
2020-04-01 14:12:33 +02:00
|
|
|
|
2020-04-29 13:24:29 +02:00
|
|
|
// ====== Init HDFS File System Object
|
|
|
|
Configuration conf = new Configuration();
|
|
|
|
// Set FileSystem URI
|
2021-01-05 10:00:13 +01:00
|
|
|
conf.set("fs.defaultFS", namenode);
|
2020-04-29 13:24:29 +02:00
|
|
|
// Because of Maven
|
|
|
|
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
|
|
|
|
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
|
2020-04-01 14:12:33 +02:00
|
|
|
|
2021-01-05 10:00:13 +01:00
|
|
|
// "ip-90-147-167-25.ct1.garrservices.it", "crossref"
|
|
|
|
final ESClient client = new ESClient(esServer, esIndex, timestamp);
|
2020-04-01 14:12:33 +02:00
|
|
|
|
2020-04-29 13:24:29 +02:00
|
|
|
try (SequenceFile.Writer writer = SequenceFile
|
|
|
|
.createWriter(
|
|
|
|
conf,
|
2021-01-05 10:00:13 +01:00
|
|
|
SequenceFile.Writer.file(targetPath),
|
2020-04-29 13:24:29 +02:00
|
|
|
SequenceFile.Writer.keyClass(IntWritable.class),
|
|
|
|
SequenceFile.Writer.valueClass(Text.class))) {
|
2020-04-01 14:12:33 +02:00
|
|
|
|
2020-04-29 13:24:29 +02:00
|
|
|
int i = 0;
|
|
|
|
long start = System.currentTimeMillis();
|
|
|
|
long end = 0;
|
|
|
|
final IntWritable key = new IntWritable(i);
|
|
|
|
final Text value = new Text();
|
|
|
|
while (client.hasNext()) {
|
|
|
|
key.set(i++);
|
|
|
|
value.set(client.next());
|
|
|
|
writer.append(key, value);
|
2020-05-28 09:57:46 +02:00
|
|
|
if (i % 100000 == 0) {
|
2020-04-29 13:24:29 +02:00
|
|
|
end = System.currentTimeMillis();
|
|
|
|
final float time = (end - start) / 1000.0F;
|
2020-05-28 09:57:46 +02:00
|
|
|
System.out
|
2021-01-05 10:00:13 +01:00
|
|
|
.println(String.format("Imported %s records last 100000 imported in %s seconds", i, time));
|
2020-04-29 13:24:29 +02:00
|
|
|
start = System.currentTimeMillis();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2020-04-20 09:53:34 +02:00
|
|
|
|
2020-04-29 13:24:29 +02:00
|
|
|
public static String decompressBlob(final String blob) {
|
|
|
|
try {
|
|
|
|
byte[] byteArray = Base64.decodeBase64(blob.getBytes());
|
|
|
|
final Inflater decompresser = new Inflater();
|
|
|
|
decompresser.setInput(byteArray);
|
|
|
|
final ByteArrayOutputStream bos = new ByteArrayOutputStream(byteArray.length);
|
|
|
|
byte[] buffer = new byte[8192];
|
|
|
|
while (!decompresser.finished()) {
|
|
|
|
int size = decompresser.inflate(buffer);
|
|
|
|
bos.write(buffer, 0, size);
|
|
|
|
}
|
|
|
|
decompresser.end();
|
2021-05-14 10:58:12 +02:00
|
|
|
return bos.toString();
|
2020-04-29 13:24:29 +02:00
|
|
|
} catch (Throwable e) {
|
|
|
|
throw new RuntimeException("Wrong record:" + blob, e);
|
|
|
|
}
|
|
|
|
}
|
2020-04-01 14:12:33 +02:00
|
|
|
}
|