[graph provision] addded filter to exclude records marked with datainfo.deletedbyinference = true

This commit is contained in:
Claudio Atzori 2024-07-24 10:00:10 +02:00
parent ceb210993c
commit 01958a3e07
1 changed files with 9 additions and 0 deletions

View File

@ -11,6 +11,7 @@ import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.util.LongAccumulator;
@ -29,6 +30,8 @@ import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
import eu.dnetlib.dhp.oa.provision.model.TupleWrapper;
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.solr.SolrRecord;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ -115,6 +118,12 @@ public class PayloadConverterJob {
.read()
.load(toSeq(paths))
.as(Encoders.kryo(JoinedEntity.class))
.filter(
(FilterFunction<JoinedEntity>) je -> !Optional
.ofNullable(je.getEntity())
.map(Oaf::getDataInfo)
.map(DataInfo::getDeletedbyinference)
.orElse(false))
.map(
(MapFunction<JoinedEntity, Tuple2<String, SolrRecord>>) je -> new Tuple2<>(
recordFactory.build(je, validateXML),