diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMigrationExecutor.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMigrationExecutor.java index 3367399c62..11c1fb6aea 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMigrationExecutor.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMigrationExecutor.java @@ -6,6 +6,7 @@ import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -15,7 +16,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.codehaus.jackson.map.ObjectMapper; @@ -36,7 +36,7 @@ public class AbstractMigrationExecutor implements Closeable { private final AtomicInteger counter = new AtomicInteger(0); - private final IntWritable key = new IntWritable(counter.get()); + private final Text key = new Text(); private final Text value = new Text(); @@ -51,7 +51,7 @@ public class AbstractMigrationExecutor implements Closeable { log.info(String.format("Creating SequenceFile Writer, hdfsPath=%s, nameNode=%s, user=%s", hdfsPath, hdfsNameNode, hdfsUser)); this.writer = SequenceFile.createWriter(getConf(hdfsNameNode, hdfsUser), SequenceFile.Writer.file(new Path(hdfsPath)), SequenceFile.Writer - .keyClass(IntWritable.class), SequenceFile.Writer.valueClass(Text.class)); + .keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class)); } private Configuration getConf(final String hdfsNameNode, final String hdfsUser) throws IOException { @@ -67,7 +67,7 @@ public class AbstractMigrationExecutor implements Closeable { protected void emitOaf(final Oaf oaf) { try { - key.set(counter.getAndIncrement()); + key.set(counter.getAndIncrement() + ":" + oaf.getClass().getSimpleName().toLowerCase()); value.set(objectMapper.writeValueAsString(oaf)); writer.append(key, value); } catch (final Exception e) { @@ -99,6 +99,8 @@ public class AbstractMigrationExecutor implements Closeable { } public static Field field(final T value, final DataInfo info) { + if (value == null || StringUtils.isBlank(value.toString())) { return null; } + final Field field = new Field<>(); field.setValue(value); field.setDataInfo(info); @@ -106,7 +108,7 @@ public class AbstractMigrationExecutor implements Closeable { } public static List> listFields(final DataInfo info, final String... values) { - return Arrays.stream(values).map(v -> field(v, info)).collect(Collectors.toList()); + return Arrays.stream(values).map(v -> field(v, info)).filter(Objects::nonNull).collect(Collectors.toList()); } public static Qualifier qualifier(final String classid, final String classname, final String schemeid, final String schemename) { @@ -124,10 +126,12 @@ public class AbstractMigrationExecutor implements Closeable { final String schemeid, final String schemename, final DataInfo dataInfo) { + return structuredProperty(value, qualifier(classid, classname, schemeid, schemename), dataInfo); } public static StructuredProperty structuredProperty(final String value, final Qualifier qualifier, final DataInfo dataInfo) { + if (value == null) { return null; } final StructuredProperty sp = new StructuredProperty(); sp.setValue(value); sp.setQualifier(qualifier); @@ -178,20 +182,25 @@ public class AbstractMigrationExecutor implements Closeable { final String conferenceplace, final String conferencedate, final DataInfo dataInfo) { - final Journal j = new Journal(); - j.setName(name); - j.setIssnPrinted(issnPrinted); - j.setIssnOnline(issnOnline); - j.setIssnLinking(issnLinking); - j.setEp(ep); - j.setIss(iss); - j.setSp(sp); - j.setVol(vol); - j.setEdition(edition); - j.setConferenceplace(conferenceplace); - j.setConferencedate(conferencedate); - j.setDataInfo(dataInfo); - return j; + + if (StringUtils.isNotBlank(name) || StringUtils.isNotBlank(issnPrinted) || StringUtils.isNotBlank(issnOnline) || StringUtils.isNotBlank(issnLinking)) { + final Journal j = new Journal(); + j.setName(name); + j.setIssnPrinted(issnPrinted); + j.setIssnOnline(issnOnline); + j.setIssnLinking(issnLinking); + j.setEp(ep); + j.setIss(iss); + j.setSp(sp); + j.setVol(vol); + j.setEdition(edition); + j.setConferenceplace(conferenceplace); + j.setConferencedate(conferencedate); + j.setDataInfo(dataInfo); + return j; + } else { + return null; + } } public static DataInfo dataInfo(final Boolean deletedbyinference,