[aggregator graph] save invalid records aside for further inspection

This commit is contained in:
Claudio Atzori 2022-09-14 17:11:26 +02:00
parent a0919ed495
commit c48f6e9c57
2 changed files with 6 additions and 4 deletions

View File

@@ -143,7 +143,7 @@ public abstract class AbstractMdRecordToOafMapper {
return createOafs(doc, type, instances, collectedFrom, info, lastUpdateTimestamp); return createOafs(doc, type, instances, collectedFrom, info, lastUpdateTimestamp);
} catch (DocumentException e) { } catch (DocumentException e) {
log.error("Error with record:\n" + xml); log.error("Error with record:\n" + xml);
return Lists.newArrayList(); return null;
} }
} }

View File

@@ -126,8 +126,6 @@ public class GenerateEntitiesApplication {
log.info("Generate entities from files:"); log.info("Generate entities from files:");
existingSourcePaths.forEach(log::info); existingSourcePaths.forEach(log::info);
JavaRDD<Oaf> inputRdd = sc.emptyRDD();
for (final String sp : existingSourcePaths) { for (final String sp : existingSourcePaths) {
RDD<String> invalidRecords = sc RDD<String> invalidRecords = sc
.sequenceFile(sp, Text.class, Text.class) .sequenceFile(sp, Text.class, Text.class)
@@ -141,7 +139,11 @@ public class GenerateEntitiesApplication {
.mode(SaveMode.Append) .mode(SaveMode.Append)
.option("compression", "gzip") .option("compression", "gzip")
.text(invalidPath); .text(invalidPath);
}
JavaRDD<Oaf> inputRdd = sc.emptyRDD();
for (final String sp : existingSourcePaths) {
inputRdd = inputRdd inputRdd = inputRdd
.union( .union(
sc sc
@@ -223,7 +225,7 @@ public class GenerateEntitiesApplication {
final boolean shouldHashId, final boolean shouldHashId,
final VocabularyGroup vocs) { final VocabularyGroup vocs) {
if (convertToListOaf(id, s, shouldHashId, vocs).isEmpty()) { if (Objects.isNull(convertToListOaf(id, s, shouldHashId, vocs))) {
return s; return s;
} }
return null; return null;