diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java index ab6f54b92..f1f59b398 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java @@ -6,11 +6,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.IOException; import java.io.StringReader; import java.text.SimpleDateFormat; -import java.util.Arrays; -import java.util.Date; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; @@ -24,6 +20,7 @@ import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; @@ -110,6 +107,7 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication .read() .parquet(validPaths) .map((MapFunction) MigrateHdfsMdstoresApplication::enrichRecord, Encoders.STRING()) + .filter((FilterFunction) Objects::nonNull) .toJavaRDD() .mapToPair(xml -> new Tuple2<>(new Text(UUID.randomUUID() + ":" + type), new Text(xml))) // .coalesce(1) @@ -135,13 +133,14 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication reader.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true); final Document doc = reader.read(new StringReader(xml)); final Element head = (Element) doc.selectSingleNode("//*[local-name() = 'header']"); + head.addElement(new QName("objIdentifier", DRI_NS_PREFIX)).addText(r.getAs("id")); head.addElement(new QName("dateOfCollection", DRI_NS_PREFIX)).addText(collDate); head.addElement(new QName("dateOfTransformation", DRI_NS_PREFIX)).addText(tranDate); return doc.asXML(); } catch (final Exception e) { log.error("Error patching record: " + xml); - throw new RuntimeException("Error patching record: " + xml, e); + return null; } }