Ignore non-processable records

This commit is contained in:
Claudio Atzori 2023-03-01 14:49:51 +01:00
parent 7d263f265e
commit 6f488547a7
1 changed file with 5 additions and 6 deletions

View File

@ -6,11 +6,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException; import java.io.IOException;
import java.io.StringReader; import java.io.StringReader;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Arrays; import java.util.*;
import java.util.Date;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
@ -24,6 +20,7 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row; import org.apache.spark.sql.Row;
@ -110,6 +107,7 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication
.read() .read()
.parquet(validPaths) .parquet(validPaths)
.map((MapFunction<Row, String>) MigrateHdfsMdstoresApplication::enrichRecord, Encoders.STRING()) .map((MapFunction<Row, String>) MigrateHdfsMdstoresApplication::enrichRecord, Encoders.STRING())
.filter((FilterFunction<String>) Objects::nonNull)
.toJavaRDD() .toJavaRDD()
.mapToPair(xml -> new Tuple2<>(new Text(UUID.randomUUID() + ":" + type), new Text(xml))) .mapToPair(xml -> new Tuple2<>(new Text(UUID.randomUUID() + ":" + type), new Text(xml)))
// .coalesce(1) // .coalesce(1)
@ -135,13 +133,14 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication
reader.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true); reader.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
final Document doc = reader.read(new StringReader(xml)); final Document doc = reader.read(new StringReader(xml));
final Element head = (Element) doc.selectSingleNode("//*[local-name() = 'header']"); final Element head = (Element) doc.selectSingleNode("//*[local-name() = 'header']");
head.addElement(new QName("objIdentifier", DRI_NS_PREFIX)).addText(r.getAs("id")); head.addElement(new QName("objIdentifier", DRI_NS_PREFIX)).addText(r.getAs("id"));
head.addElement(new QName("dateOfCollection", DRI_NS_PREFIX)).addText(collDate); head.addElement(new QName("dateOfCollection", DRI_NS_PREFIX)).addText(collDate);
head.addElement(new QName("dateOfTransformation", DRI_NS_PREFIX)).addText(tranDate); head.addElement(new QName("dateOfTransformation", DRI_NS_PREFIX)).addText(tranDate);
return doc.asXML(); return doc.asXML();
} catch (final Exception e) { } catch (final Exception e) {
log.error("Error patching record: " + xml); log.error("Error patching record: " + xml);
throw new RuntimeException("Error patching record: " + xml, e); return null;
} }
} }