[aggregator graph] save invalid records aside for further inspection

This commit is contained in:
Claudio Atzori 2022-09-14 13:27:39 +02:00
parent b99a011345
commit a0919ed495
6 changed files with 45 additions and 6 deletions

View File

@ -107,7 +107,7 @@ public abstract class AbstractMdRecordToOafMapper {
this.forceOriginalId = false; this.forceOriginalId = false;
} }
public List<Oaf> processMdRecord(final String xml) throws DocumentException { public List<Oaf> processMdRecord(final String xml) {
DocumentFactory.getInstance().setXPathNamespaceURIs(nsContext); DocumentFactory.getInstance().setXPathNamespaceURIs(nsContext);
try { try {

View File

@ -16,6 +16,9 @@ import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException; import org.dom4j.DocumentException;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -76,6 +79,9 @@ public class GenerateEntitiesApplication {
final String targetPath = parser.get("targetPath"); final String targetPath = parser.get("targetPath");
log.info("targetPath: {}", targetPath); log.info("targetPath: {}", targetPath);
final String invalidPath = parser.get("invalidPath");
log.info("invalidPath: {}", invalidPath);
final String isLookupUrl = parser.get("isLookupUrl"); final String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: {}", isLookupUrl); log.info("isLookupUrl: {}", isLookupUrl);
@ -97,7 +103,8 @@ public class GenerateEntitiesApplication {
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> { runWithSparkSession(conf, isSparkSessionManaged, spark -> {
HdfsSupport.remove(targetPath, spark.sparkContext().hadoopConfiguration()); HdfsSupport.remove(targetPath, spark.sparkContext().hadoopConfiguration());
generateEntities(spark, vocs, sourcePaths, targetPath, shouldHashId, mode); HdfsSupport.remove(invalidPath, spark.sparkContext().hadoopConfiguration());
generateEntities(spark, vocs, sourcePaths, targetPath, invalidPath, shouldHashId, mode);
}); });
} }
@ -106,6 +113,7 @@ public class GenerateEntitiesApplication {
final VocabularyGroup vocs, final VocabularyGroup vocs,
final String sourcePaths, final String sourcePaths,
final String targetPath, final String targetPath,
final String invalidPath,
final boolean shouldHashId, final boolean shouldHashId,
final Mode mode) { final Mode mode) {
@ -121,6 +129,19 @@ public class GenerateEntitiesApplication {
JavaRDD<Oaf> inputRdd = sc.emptyRDD(); JavaRDD<Oaf> inputRdd = sc.emptyRDD();
for (final String sp : existingSourcePaths) { for (final String sp : existingSourcePaths) {
RDD<String> invalidRecords = sc
.sequenceFile(sp, Text.class, Text.class)
.map(k -> new Tuple2<>(k._1().toString(), k._2().toString()))
.map(k -> tryApplyMapping(k._1(), k._2(), shouldHashId, vocs))
.filter(Objects::nonNull)
.rdd();
spark
.createDataset(invalidRecords, Encoders.STRING())
.write()
.mode(SaveMode.Append)
.option("compression", "gzip")
.text(invalidPath);
inputRdd = inputRdd inputRdd = inputRdd
.union( .union(
sc sc
@ -159,7 +180,7 @@ public class GenerateEntitiesApplication {
final String id, final String id,
final String s, final String s,
final boolean shouldHashId, final boolean shouldHashId,
final VocabularyGroup vocs) throws DocumentException { final VocabularyGroup vocs) {
final String type = StringUtils.substringAfter(id, ":"); final String type = StringUtils.substringAfter(id, ":");
switch (type.toLowerCase()) { switch (type.toLowerCase()) {
@ -196,6 +217,18 @@ public class GenerateEntitiesApplication {
} }
} }
private static String tryApplyMapping(
final String id,
final String s,
final boolean shouldHashId,
final VocabularyGroup vocs) {
if (convertToListOaf(id, s, shouldHashId, vocs).isEmpty()) {
return s;
}
return null;
}
private static Oaf convertFromJson(final String s, final Class<? extends Oaf> clazz) { private static Oaf convertFromJson(final String s, final Class<? extends Oaf> clazz) {
try { try {
return OBJECT_MAPPER.readValue(s, clazz); return OBJECT_MAPPER.readValue(s, clazz);

View File

@ -17,6 +17,12 @@
"paramDescription": "the path of the target file", "paramDescription": "the path of the target file",
"paramRequired": true "paramRequired": true
}, },
{
"paramName": "i",
"paramLongName": "invalidPath",
"paramDescription": "the path of the invalid records file",
"paramRequired": false
},
{ {
"paramName": "isu", "paramName": "isu",
"paramLongName": "isLookupUrl", "paramLongName": "isLookupUrl",

View File

@ -468,6 +468,7 @@
</spark-opts> </spark-opts>
<arg>--sourcePaths</arg><arg>${contentPath}/db_claims,${contentPath}/oaf_claims,${contentPath}/odf_claims</arg> <arg>--sourcePaths</arg><arg>${contentPath}/db_claims,${contentPath}/oaf_claims,${contentPath}/odf_claims</arg>
<arg>--targetPath</arg><arg>${workingDir}/entities_claim</arg> <arg>--targetPath</arg><arg>${workingDir}/entities_claim</arg>
<arg>--invalidPath</arg><arg>${workingDir}/invalid_records_claim</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--shouldHashId</arg><arg>${shouldHashId}</arg> <arg>--shouldHashId</arg><arg>${shouldHashId}</arg>
<arg>--mode</arg><arg>claim</arg> <arg>--mode</arg><arg>claim</arg>
@ -517,6 +518,7 @@
</spark-opts> </spark-opts>
<arg>--sourcePaths</arg><arg>${contentPath}/db_openaire,${contentPath}/db_openorgs,${contentPath}/oaf_records,${contentPath}/odf_records,${contentPath}/oaf_records_hdfs,${contentPath}/odf_records_hdfs,${contentPath}/oaf_records_invisible</arg> <arg>--sourcePaths</arg><arg>${contentPath}/db_openaire,${contentPath}/db_openorgs,${contentPath}/oaf_records,${contentPath}/odf_records,${contentPath}/oaf_records_hdfs,${contentPath}/odf_records_hdfs,${contentPath}/oaf_records_invisible</arg>
<arg>--targetPath</arg><arg>${workingDir}/entities</arg> <arg>--targetPath</arg><arg>${workingDir}/entities</arg>
<arg>--invalidPath</arg><arg>${workingDir}/invalid_records</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--shouldHashId</arg><arg>${shouldHashId}</arg> <arg>--shouldHashId</arg><arg>${shouldHashId}</arg>
</spark> </spark>

View File

@ -12,7 +12,6 @@ import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import kotlin.jvm.Throws;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.dom4j.DocumentException; import org.dom4j.DocumentException;
@ -22,7 +21,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;

View File

@ -18,7 +18,6 @@ import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
@ -32,6 +31,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;