forked from antonis.lempesis/dnet-hadoop
simplifications
This commit is contained in:
parent
06c2fd6df9
commit
69336195d3
|
@ -6,6 +6,7 @@ import java.net.URI;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
@ -15,7 +16,6 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.io.IntWritable;
|
|
||||||
import org.apache.hadoop.io.SequenceFile;
|
import org.apache.hadoop.io.SequenceFile;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.codehaus.jackson.map.ObjectMapper;
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
|
@ -36,7 +36,7 @@ public class AbstractMigrationExecutor implements Closeable {
|
||||||
|
|
||||||
private final AtomicInteger counter = new AtomicInteger(0);
|
private final AtomicInteger counter = new AtomicInteger(0);
|
||||||
|
|
||||||
private final IntWritable key = new IntWritable(counter.get());
|
private final Text key = new Text();
|
||||||
|
|
||||||
private final Text value = new Text();
|
private final Text value = new Text();
|
||||||
|
|
||||||
|
@ -51,7 +51,7 @@ public class AbstractMigrationExecutor implements Closeable {
|
||||||
log.info(String.format("Creating SequenceFile Writer, hdfsPath=%s, nameNode=%s, user=%s", hdfsPath, hdfsNameNode, hdfsUser));
|
log.info(String.format("Creating SequenceFile Writer, hdfsPath=%s, nameNode=%s, user=%s", hdfsPath, hdfsNameNode, hdfsUser));
|
||||||
|
|
||||||
this.writer = SequenceFile.createWriter(getConf(hdfsNameNode, hdfsUser), SequenceFile.Writer.file(new Path(hdfsPath)), SequenceFile.Writer
|
this.writer = SequenceFile.createWriter(getConf(hdfsNameNode, hdfsUser), SequenceFile.Writer.file(new Path(hdfsPath)), SequenceFile.Writer
|
||||||
.keyClass(IntWritable.class), SequenceFile.Writer.valueClass(Text.class));
|
.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
private Configuration getConf(final String hdfsNameNode, final String hdfsUser) throws IOException {
|
private Configuration getConf(final String hdfsNameNode, final String hdfsUser) throws IOException {
|
||||||
|
@ -67,7 +67,7 @@ public class AbstractMigrationExecutor implements Closeable {
|
||||||
|
|
||||||
protected void emitOaf(final Oaf oaf) {
|
protected void emitOaf(final Oaf oaf) {
|
||||||
try {
|
try {
|
||||||
key.set(counter.getAndIncrement());
|
key.set(counter.getAndIncrement() + ":" + oaf.getClass().getSimpleName().toLowerCase());
|
||||||
value.set(objectMapper.writeValueAsString(oaf));
|
value.set(objectMapper.writeValueAsString(oaf));
|
||||||
writer.append(key, value);
|
writer.append(key, value);
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
|
@ -99,6 +99,8 @@ public class AbstractMigrationExecutor implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <T> Field<T> field(final T value, final DataInfo info) {
|
public static <T> Field<T> field(final T value, final DataInfo info) {
|
||||||
|
if (value == null || StringUtils.isBlank(value.toString())) { return null; }
|
||||||
|
|
||||||
final Field<T> field = new Field<>();
|
final Field<T> field = new Field<>();
|
||||||
field.setValue(value);
|
field.setValue(value);
|
||||||
field.setDataInfo(info);
|
field.setDataInfo(info);
|
||||||
|
@ -106,7 +108,7 @@ public class AbstractMigrationExecutor implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static List<Field<String>> listFields(final DataInfo info, final String... values) {
|
public static List<Field<String>> listFields(final DataInfo info, final String... values) {
|
||||||
return Arrays.stream(values).map(v -> field(v, info)).collect(Collectors.toList());
|
return Arrays.stream(values).map(v -> field(v, info)).filter(Objects::nonNull).collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Qualifier qualifier(final String classid, final String classname, final String schemeid, final String schemename) {
|
public static Qualifier qualifier(final String classid, final String classname, final String schemeid, final String schemename) {
|
||||||
|
@ -124,10 +126,12 @@ public class AbstractMigrationExecutor implements Closeable {
|
||||||
final String schemeid,
|
final String schemeid,
|
||||||
final String schemename,
|
final String schemename,
|
||||||
final DataInfo dataInfo) {
|
final DataInfo dataInfo) {
|
||||||
|
|
||||||
return structuredProperty(value, qualifier(classid, classname, schemeid, schemename), dataInfo);
|
return structuredProperty(value, qualifier(classid, classname, schemeid, schemename), dataInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static StructuredProperty structuredProperty(final String value, final Qualifier qualifier, final DataInfo dataInfo) {
|
public static StructuredProperty structuredProperty(final String value, final Qualifier qualifier, final DataInfo dataInfo) {
|
||||||
|
if (value == null) { return null; }
|
||||||
final StructuredProperty sp = new StructuredProperty();
|
final StructuredProperty sp = new StructuredProperty();
|
||||||
sp.setValue(value);
|
sp.setValue(value);
|
||||||
sp.setQualifier(qualifier);
|
sp.setQualifier(qualifier);
|
||||||
|
@ -178,20 +182,25 @@ public class AbstractMigrationExecutor implements Closeable {
|
||||||
final String conferenceplace,
|
final String conferenceplace,
|
||||||
final String conferencedate,
|
final String conferencedate,
|
||||||
final DataInfo dataInfo) {
|
final DataInfo dataInfo) {
|
||||||
final Journal j = new Journal();
|
|
||||||
j.setName(name);
|
if (StringUtils.isNotBlank(name) || StringUtils.isNotBlank(issnPrinted) || StringUtils.isNotBlank(issnOnline) || StringUtils.isNotBlank(issnLinking)) {
|
||||||
j.setIssnPrinted(issnPrinted);
|
final Journal j = new Journal();
|
||||||
j.setIssnOnline(issnOnline);
|
j.setName(name);
|
||||||
j.setIssnLinking(issnLinking);
|
j.setIssnPrinted(issnPrinted);
|
||||||
j.setEp(ep);
|
j.setIssnOnline(issnOnline);
|
||||||
j.setIss(iss);
|
j.setIssnLinking(issnLinking);
|
||||||
j.setSp(sp);
|
j.setEp(ep);
|
||||||
j.setVol(vol);
|
j.setIss(iss);
|
||||||
j.setEdition(edition);
|
j.setSp(sp);
|
||||||
j.setConferenceplace(conferenceplace);
|
j.setVol(vol);
|
||||||
j.setConferencedate(conferencedate);
|
j.setEdition(edition);
|
||||||
j.setDataInfo(dataInfo);
|
j.setConferenceplace(conferenceplace);
|
||||||
return j;
|
j.setConferencedate(conferencedate);
|
||||||
|
j.setDataInfo(dataInfo);
|
||||||
|
return j;
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static DataInfo dataInfo(final Boolean deletedbyinference,
|
public static DataInfo dataInfo(final Boolean deletedbyinference,
|
||||||
|
|
Loading…
Reference in New Issue