diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java index d9fb24078e..4c12d1dc65 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java @@ -1,130 +1,187 @@ package eu.dnetlib.dhp.oa.dedup; -import java.lang.reflect.InvocationTargetException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.api.java.function.ReduceFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; - -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; - import eu.dnetlib.dhp.oa.dedup.model.Identifier; import eu.dnetlib.dhp.oa.merge.AuthorMerger; import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.Author; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.Result; +import org.apache.commons.beanutils.BeanUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.ReduceFunction; +import org.apache.spark.sql.*; import scala.Tuple2; +import scala.Tuple3; +import scala.collection.JavaConversions; + +import java.util.*; +import java.util.stream.Stream; public class DedupRecordFactory { + public static final class DedupRecordReduceState { + public final String dedupId; - protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + public final ArrayList aliases = new ArrayList<>(); - private DedupRecordFactory() { - } + public final HashSet acceptanceDate = new HashSet<>(); - public static Dataset createDedupRecord( - final SparkSession spark, - final DataInfo dataInfo, - final String mergeRelsInputPath, - final String entitiesInputPath, - final Class clazz) { + public OafEntity entity; - long ts = System.currentTimeMillis(); + public DedupRecordReduceState(String dedupId, String id, OafEntity entity) { + this.dedupId = dedupId; + this.entity = entity; + if (entity == null) { + aliases.add(id); + } else { + if (Result.class.isAssignableFrom(entity.getClass())) { + Result result = (Result) entity; + if (result.getDateofacceptance() != null && StringUtils.isNotBlank(result.getDateofacceptance().getValue())) { + acceptanceDate.add(result.getDateofacceptance().getValue()); + } + } + } + } - // - Dataset entities = spark - .read() - .schema(Encoders.bean(clazz).schema()) - .json(entitiesInputPath) - .as(Encoders.bean(clazz)) - .map( - (MapFunction>) entity -> { - return new Tuple2<>(entity.getId(), entity); - }, - Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz))) - .selectExpr("_1 AS id", "_2 AS kryoObject"); + public String getDedupId() { + return dedupId; + } + } + private static final int MAX_ACCEPTANCE_DATE = 20; - // : source is the dedup_id, target is the id of the mergedIn - Dataset mergeRels = spark - .read() - .load(mergeRelsInputPath) - .where("relClass == 'merges'") - .selectExpr("source as dedupId", "target as id"); + private DedupRecordFactory() { + } - return mergeRels - .join(entities, "id") - .select("dedupId", "kryoObject") - .as(Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz))) - .groupByKey((MapFunction, String>) Tuple2::_1, Encoders.STRING()) - .reduceGroups( - (ReduceFunction>) (t1, t2) -> new Tuple2<>(t1._1(), - reduceEntity(t1._1(), t1._2(), t2._2(), clazz))) - .map( - (MapFunction>, T>) t -> { - T res = t._2()._2(); - res.setDataInfo(dataInfo); - res.setLastupdatetimestamp(ts); - return res; - }, - Encoders.bean(clazz)); - } + public static Dataset createDedupRecord( + final SparkSession spark, + final DataInfo dataInfo, + final String mergeRelsInputPath, + final String entitiesInputPath, + final Class clazz) { - public static T reduceEntity( - String id, T entity, T duplicate, Class clazz) { + final long ts = System.currentTimeMillis(); + final Encoder beanEncoder = Encoders.bean(clazz); + final Encoder kryoEncoder = Encoders.kryo(clazz); - int compare = new IdentifierComparator() - .compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate)); + // + Dataset entities = spark + .read() + .schema(Encoders.bean(clazz).schema()) + .json(entitiesInputPath) + .as(beanEncoder) + .map( + (MapFunction>) entity -> { + return new Tuple2<>(entity.getId(), entity); + }, + Encoders.tuple(Encoders.STRING(), kryoEncoder)) + .selectExpr("_1 AS id", "_2 AS kryoObject"); - if (compare > 0) { - T swap = duplicate; - duplicate = entity; - entity = swap; + // : source is the dedup_id, target is the id of the mergedIn + Dataset mergeRels = spark + .read() + .load(mergeRelsInputPath) + .where("relClass == 'merges'") + .selectExpr("source as dedupId", "target as id"); + + return mergeRels + .join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left") + .select("dedupId", "id", "kryoObject") + .as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder)) + .map((MapFunction, DedupRecordReduceState>) t -> new DedupRecordReduceState(t._1(), t._2(), t._3()), Encoders.kryo(DedupRecordReduceState.class)) + .groupByKey((MapFunction) DedupRecordReduceState::getDedupId, Encoders.STRING()) + .reduceGroups( + (ReduceFunction) (t1, t2) -> { + if (t1.entity == null) { + t2.aliases.addAll(t1.aliases); + return t2; + } + if (t1.acceptanceDate.size() < MAX_ACCEPTANCE_DATE) { + t1.acceptanceDate.addAll(t2.acceptanceDate); + } + t1.aliases.addAll(t2.aliases); + t1.entity = reduceEntity(t1.entity, t2.entity); + + return t1; + } + ) + .flatMap + ((FlatMapFunction, OafEntity>) t -> { + String dedupId = t._1(); + DedupRecordReduceState agg = t._2(); + + if (agg.acceptanceDate.size() >= MAX_ACCEPTANCE_DATE) { + return Collections.emptyIterator(); + } + + return Stream.concat(Stream.of(agg.getDedupId()), agg.aliases.stream()) + .map(id -> { + try { + OafEntity res = (OafEntity) BeanUtils.cloneBean(agg.entity); + res.setId(id); + res.setDataInfo(dataInfo); + res.setLastupdatetimestamp(ts); + return res; + } catch (Exception e) { + throw new RuntimeException(e); + } + }).iterator(); + }, beanEncoder); + } + + private static OafEntity reduceEntity(OafEntity entity, OafEntity duplicate) { + + if (duplicate == null) { + return entity; } - entity.mergeFrom(duplicate); - entity.setId(id); - if (ModelSupport.isSubClass(duplicate, Result.class)) { - Result re = (Result) entity; - Result rd = (Result) duplicate; + int compare = new IdentifierComparator<>() + .compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate)); - List> authors = new ArrayList<>(); - if (re.getAuthor() != null) { - authors.add(re.getAuthor()); - } - if (rd.getAuthor() != null) { - authors.add(rd.getAuthor()); - } + if (compare > 0) { + OafEntity swap = duplicate; + duplicate = entity; + entity = swap; + } - re.setAuthor(AuthorMerger.merge(authors)); - } + entity.mergeFrom(duplicate); - return entity; - } + if (ModelSupport.isSubClass(duplicate, Result.class)) { + Result re = (Result) entity; + Result rd = (Result) duplicate; - public static T entityMerger( - String id, Iterator> entities, long ts, DataInfo dataInfo, Class clazz) - throws IllegalAccessException, InstantiationException, InvocationTargetException { - T base = entities.next()._2(); + List> authors = new ArrayList<>(); + if (re.getAuthor() != null) { + authors.add(re.getAuthor()); + } + if (rd.getAuthor() != null) { + authors.add(rd.getAuthor()); + } - while (entities.hasNext()) { - T duplicate = entities.next()._2(); - if (duplicate != null) - base = reduceEntity(id, base, duplicate, clazz); - } + re.setAuthor(AuthorMerger.merge(authors)); + } - base.setDataInfo(dataInfo); - base.setLastupdatetimestamp(ts); + return entity; + } - return base; - } + public static T entityMerger( + String id, Iterator> entities, long ts, DataInfo dataInfo, Class clazz) { + T base = entities.next()._2(); + + while (entities.hasNext()) { + T duplicate = entities.next()._2(); + if (duplicate != null) + base = (T) reduceEntity(base, duplicate); + } + + base.setId(id); + base.setDataInfo(dataInfo); + base.setLastupdatetimestamp(ts); + + return base; + } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index bd5a04e62f..8b3480e60b 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -611,7 +611,7 @@ public class SparkDedupTest implements Serializable { assertEquals(91, pubs.count()); assertEquals(47, sw_deduprecord); assertEquals(97, ds_deduprecord); - assertEquals(93, orp_deduprecord); + assertEquals(92, orp_deduprecord); verifyRoot_1(mapper, pubs); @@ -751,7 +751,7 @@ public class SparkDedupTest implements Serializable { assertEquals(100, datasource); assertEquals(196, softwares); assertEquals(389, dataset); - assertEquals(521, otherresearchproduct); + assertEquals(520, otherresearchproduct); // System.out.println("publications = " + publications); // System.out.println("organizations = " + organizations);