package eu.dnetlib.dhp.oa.dedup;

import java.util.*;
import java.util.stream.Stream;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*;

import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.oa.merge.AuthorMerger;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.JavaConversions;

/**
 * Builds the dedup (representative) records: it joins the merge relations with the graph entities,
 * merges each group of duplicates into a single record and, for every alias, emits a copy of the
 * representative record flagged as deleted by inference.
 */
public class DedupRecordFactory {

	/** Accumulator used while reducing the entities belonging to the same dedup group. */
	public static final class DedupRecordReduceState {

		public final String dedupId;

		public final ArrayList<String> aliases = new ArrayList<>();

		public final HashSet<String> acceptanceDate = new HashSet<>();

		public OafEntity entity;

		public DedupRecordReduceState(String dedupId, String id, OafEntity entity) {
			this.dedupId = dedupId;
			this.entity = entity;
			if (entity == null) {
				aliases.add(id);
			} else {
				if (Result.class.isAssignableFrom(entity.getClass())) {
					Result result = (Result) entity;
					if (result.getDateofacceptance() != null
						&& StringUtils.isNotBlank(result.getDateofacceptance().getValue())) {
						acceptanceDate.add(result.getDateofacceptance().getValue());
					}
				}
			}
		}

		public String getDedupId() {
			return dedupId;
		}
	}

	// dedup groups collecting at least this many distinct acceptance dates are skipped
	private static final int MAX_ACCEPTANCE_DATE = 20;

	private DedupRecordFactory() {
	}

	public static Dataset<OafEntity> createDedupRecord(
		final SparkSession spark,
		final DataInfo dataInfo,
		final String mergeRelsInputPath,
		final String entitiesInputPath,
		final Class<OafEntity> clazz) {

		final long ts = System.currentTimeMillis();
		final Encoder<OafEntity> beanEncoder = Encoders.bean(clazz);
		final Encoder<OafEntity> kryoEncoder = Encoders.kryo(clazz);

		// <id, kryoObject>: the input entities keyed by their identifier
		Dataset<Row> entities = spark
			.read()
			.schema(Encoders.bean(clazz).schema())
			.json(entitiesInputPath)
			.as(beanEncoder)
			.map(
				(MapFunction<OafEntity, Tuple2<String, OafEntity>>) entity -> new Tuple2<>(entity.getId(), entity),
				Encoders.tuple(Encoders.STRING(), kryoEncoder))
			.selectExpr("_1 AS id", "_2 AS kryoObject");

		// <source, target>: source is the dedup_id, target is the id of the merged entity
		Dataset<Row> mergeRels = spark
			.read()
			.load(mergeRelsInputPath)
			.where("relClass == 'merges'")
			.selectExpr("source as dedupId", "target as id");

		return mergeRels
			.join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left")
			.select("dedupId", "id", "kryoObject")
			.as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder))
			.map(
				(MapFunction<Tuple3<String, String, OafEntity>, DedupRecordReduceState>) t -> new DedupRecordReduceState(
					t._1(), t._2(), t._3()),
				Encoders.kryo(DedupRecordReduceState.class))
			.groupByKey(
				(MapFunction<DedupRecordReduceState, String>) DedupRecordReduceState::getDedupId, Encoders.STRING())
			.reduceGroups(
				(ReduceFunction<DedupRecordReduceState>) (t1, t2) -> {
					if (t1.entity == null) {
						t2.aliases.addAll(t1.aliases);
						return t2;
					}
					if (t1.acceptanceDate.size() < MAX_ACCEPTANCE_DATE) {
						t1.acceptanceDate.addAll(t2.acceptanceDate);
					}
					t1.aliases.addAll(t2.aliases);
					t1.entity = reduceEntity(t1.entity, t2.entity);

					return t1;
				})
			.flatMap((FlatMapFunction<Tuple2<String, DedupRecordReduceState>, OafEntity>) t -> {
				DedupRecordReduceState agg = t._2();

				if (agg.acceptanceDate.size() >= MAX_ACCEPTANCE_DATE) {
					return Collections.emptyIterator();
				}

				// emit the representative record plus, for each alias, a copy flagged as deleted by inference
				return Stream
					.concat(
						Stream
							.of(agg.getDedupId())
							.map(id -> createDedupOafEntity(id, agg.entity, dataInfo, ts)),
						agg.aliases
							.stream()
							.map(id -> createMergedDedupAliasOafEntity(id, agg.entity, dataInfo, ts)))
					.iterator();
			}, beanEncoder);
	}

	private static OafEntity createDedupOafEntity(String id, OafEntity base, DataInfo dataInfo, long ts) {
		try {
			OafEntity res = (OafEntity) BeanUtils.cloneBean(base);
			res.setId(id);
			res.setDataInfo(dataInfo);
			res.setLastupdatetimestamp(ts);
			return res;
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	private static OafEntity createMergedDedupAliasOafEntity(String id, OafEntity base, DataInfo dataInfo, long ts) {
		try {
			OafEntity res = createDedupOafEntity(id, base, dataInfo, ts);
			DataInfo ds = (DataInfo) BeanUtils.cloneBean(dataInfo);
			ds.setDeletedbyinference(true);
			res.setDataInfo(ds);
			return res;
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	private static OafEntity reduceEntity(OafEntity entity, OafEntity duplicate) {

		if (duplicate == null) {
			return entity;
		}

		// keep as base the entity whose identifier has the higher priority
		int compare = new IdentifierComparator<>()
			.compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate));

		if (compare > 0) {
			OafEntity swap = duplicate;
			duplicate = entity;
			entity = swap;
		}

		entity = MergeUtils.checkedMerge(entity, duplicate);

		if (ModelSupport.isSubClass(duplicate, Result.class)) {
			Result re = (Result) entity;
			Result rd = (Result) duplicate;

			List<List<Author>> authors = new ArrayList<>();
			if (re.getAuthor() != null) {
				authors.add(re.getAuthor());
			}
			if (rd.getAuthor() != null) {
				authors.add(rd.getAuthor());
			}

			re.setAuthor(AuthorMerger.merge(authors));
		}

		return entity;
	}

	public static <T extends OafEntity> T entityMerger(
		String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo, Class<T> clazz) {

		T base = entities.next()._2();

		while (entities.hasNext()) {
			T duplicate = entities.next()._2();
			if (duplicate != null)
				base = (T) reduceEntity(base, duplicate);
		}

		base.setId(id);
		base.setDataInfo(dataInfo);
		base.setLastupdatetimestamp(ts);

		return base;
	}
}