BrBETA_dnet-hadoop/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/AbstractMigrationApplicatio...

81 lines
2.5 KiB
Java
Raw Normal View History

2020-03-02 16:12:14 +01:00
package eu.dnetlib.dhp.migration.utils;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.atomic.AtomicInteger;
2020-02-11 12:48:03 +01:00
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.codehaus.jackson.map.ObjectMapper;
import eu.dnetlib.dhp.schema.oaf.Oaf;
2020-03-02 16:12:14 +01:00
public class AbstractMigrationApplication implements Closeable {
private final AtomicInteger counter = new AtomicInteger(0);
2020-02-12 11:12:38 +01:00
private final Text key = new Text();
private final Text value = new Text();
private final SequenceFile.Writer writer;
2020-03-02 16:12:14 +01:00
private final ObjectMapper objectMapper = new ObjectMapper();
private static final Log log = LogFactory.getLog(AbstractMigrationApplication.class);
2020-02-11 12:48:03 +01:00
2020-03-02 16:12:14 +01:00
public AbstractMigrationApplication(final String hdfsPath, final String hdfsNameNode, final String hdfsUser) throws Exception {
2020-02-11 12:48:03 +01:00
log.info(String.format("Creating SequenceFile Writer, hdfsPath=%s, nameNode=%s, user=%s", hdfsPath, hdfsNameNode, hdfsUser));
this.writer = SequenceFile.createWriter(getConf(hdfsNameNode, hdfsUser), SequenceFile.Writer.file(new Path(hdfsPath)), SequenceFile.Writer
2020-02-12 11:12:38 +01:00
.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class));
}
private Configuration getConf(final String hdfsNameNode, final String hdfsUser) throws IOException {
final Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNode);
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
System.setProperty("HADOOP_USER_NAME", hdfsUser);
System.setProperty("hadoop.home.dir", "/");
FileSystem.get(URI.create(hdfsNameNode), conf);
return conf;
}
2020-03-02 16:12:14 +01:00
protected void emit(final String s, final String type) {
try {
2020-03-02 16:12:14 +01:00
key.set(counter.getAndIncrement() + ":" + type);
value.set(s);
writer.append(key, value);
} catch (final Exception e) {
2020-02-11 15:29:50 +01:00
throw new RuntimeException(e);
}
}
2020-03-02 16:12:14 +01:00
protected void emitOaf(final Oaf oaf) {
try {
emit(objectMapper.writeValueAsString(oaf), oaf.getClass().getSimpleName().toLowerCase());
} catch (final Exception e) {
throw new RuntimeException(e);
2020-02-12 11:12:38 +01:00
}
2020-01-21 14:17:05 +01:00
}
2020-03-02 16:12:14 +01:00
public ObjectMapper getObjectMapper() {
return objectMapper;
2020-01-21 14:17:05 +01:00
}
2020-03-02 16:12:14 +01:00
@Override
public void close() throws IOException {
writer.hflush();
writer.close();
2020-01-21 14:17:05 +01:00
}
}