Improvements and refactoring in Dedup #367

Merged
giambattista.bloisi merged 5 commits from dedup_increasenumofblocks into beta 2024-01-11 11:24:07 +01:00
2 changed files with 158 additions and 101 deletions
Showing only changes of commit 3c66e3bd7b - Show all commits

View File

@ -1,55 +1,82 @@
package eu.dnetlib.dhp.oa.dedup; package eu.dnetlib.dhp.oa.dedup;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.dedup.model.Identifier; import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.oa.merge.AuthorMerger; import eu.dnetlib.dhp.oa.merge.AuthorMerger;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Result;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*;
import scala.Tuple2; import scala.Tuple2;
import scala.Tuple3;
import scala.collection.JavaConversions;
import java.util.*;
import java.util.stream.Stream;
public class DedupRecordFactory { public class DedupRecordFactory {
public static final class DedupRecordReduceState {
public final String dedupId;
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() public final ArrayList<String> aliases = new ArrayList<>();
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
public final HashSet<String> acceptanceDate = new HashSet<>();
public OafEntity entity;
public DedupRecordReduceState(String dedupId, String id, OafEntity entity) {
this.dedupId = dedupId;
this.entity = entity;
if (entity == null) {
aliases.add(id);
} else {
if (Result.class.isAssignableFrom(entity.getClass())) {
Result result = (Result) entity;
if (result.getDateofacceptance() != null && StringUtils.isNotBlank(result.getDateofacceptance().getValue())) {
acceptanceDate.add(result.getDateofacceptance().getValue());
}
}
}
}
public String getDedupId() {
return dedupId;
}
}
private static final int MAX_ACCEPTANCE_DATE = 20;
private DedupRecordFactory() { private DedupRecordFactory() {
} }
public static <T extends OafEntity> Dataset<T> createDedupRecord( public static Dataset<OafEntity> createDedupRecord(
final SparkSession spark, final SparkSession spark,
final DataInfo dataInfo, final DataInfo dataInfo,
final String mergeRelsInputPath, final String mergeRelsInputPath,
final String entitiesInputPath, final String entitiesInputPath,
final Class<T> clazz) { final Class<OafEntity> clazz) {
long ts = System.currentTimeMillis(); final long ts = System.currentTimeMillis();
final Encoder<OafEntity> beanEncoder = Encoders.bean(clazz);
final Encoder<OafEntity> kryoEncoder = Encoders.kryo(clazz);
// <id, json_entity> // <id, json_entity>
Dataset<Row> entities = spark Dataset<Row> entities = spark
.read() .read()
.schema(Encoders.bean(clazz).schema()) .schema(Encoders.bean(clazz).schema())
.json(entitiesInputPath) .json(entitiesInputPath)
.as(Encoders.bean(clazz)) .as(beanEncoder)
.map( .map(
(MapFunction<T, Tuple2<String, T>>) entity -> { (MapFunction<OafEntity, Tuple2<String, OafEntity>>) entity -> {
return new Tuple2<>(entity.getId(), entity); return new Tuple2<>(entity.getId(), entity);
}, },
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz))) Encoders.tuple(Encoders.STRING(), kryoEncoder))
.selectExpr("_1 AS id", "_2 AS kryoObject"); .selectExpr("_1 AS id", "_2 AS kryoObject");
// <source, target>: source is the dedup_id, target is the id of the mergedIn // <source, target>: source is the dedup_id, target is the id of the mergedIn
@ -60,37 +87,67 @@ public class DedupRecordFactory {
.selectExpr("source as dedupId", "target as id"); .selectExpr("source as dedupId", "target as id");
return mergeRels return mergeRels
.join(entities, "id") .join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left")
.select("dedupId", "kryoObject") .select("dedupId", "id", "kryoObject")
.as(Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz))) .as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder))
.groupByKey((MapFunction<Tuple2<String, T>, String>) Tuple2::_1, Encoders.STRING()) .map((MapFunction<Tuple3<String, String, OafEntity>, DedupRecordReduceState>) t -> new DedupRecordReduceState(t._1(), t._2(), t._3()), Encoders.kryo(DedupRecordReduceState.class))
.groupByKey((MapFunction<DedupRecordReduceState, String>) DedupRecordReduceState::getDedupId, Encoders.STRING())
.reduceGroups( .reduceGroups(
(ReduceFunction<Tuple2<String, T>>) (t1, t2) -> new Tuple2<>(t1._1(), (ReduceFunction<DedupRecordReduceState>) (t1, t2) -> {
reduceEntity(t1._1(), t1._2(), t2._2(), clazz))) if (t1.entity == null) {
.map( t2.aliases.addAll(t1.aliases);
(MapFunction<Tuple2<String, Tuple2<String, T>>, T>) t -> { return t2;
T res = t._2()._2(); }
if (t1.acceptanceDate.size() < MAX_ACCEPTANCE_DATE) {
t1.acceptanceDate.addAll(t2.acceptanceDate);
}
t1.aliases.addAll(t2.aliases);
t1.entity = reduceEntity(t1.entity, t2.entity);
return t1;
}
)
.flatMap
((FlatMapFunction<Tuple2<String, DedupRecordReduceState>, OafEntity>) t -> {
String dedupId = t._1();
DedupRecordReduceState agg = t._2();
if (agg.acceptanceDate.size() >= MAX_ACCEPTANCE_DATE) {
return Collections.emptyIterator();
}
return Stream.concat(Stream.of(agg.getDedupId()), agg.aliases.stream())
.map(id -> {
try {
OafEntity res = (OafEntity) BeanUtils.cloneBean(agg.entity);
res.setId(id);
res.setDataInfo(dataInfo); res.setDataInfo(dataInfo);
res.setLastupdatetimestamp(ts); res.setLastupdatetimestamp(ts);
return res; return res;
}, } catch (Exception e) {
Encoders.bean(clazz)); throw new RuntimeException(e);
}
}).iterator();
}, beanEncoder);
} }
public static <T extends OafEntity> T reduceEntity( private static OafEntity reduceEntity(OafEntity entity, OafEntity duplicate) {
String id, T entity, T duplicate, Class<T> clazz) {
int compare = new IdentifierComparator() if (duplicate == null) {
return entity;
}
int compare = new IdentifierComparator<>()
.compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate)); .compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate));
if (compare > 0) { if (compare > 0) {
T swap = duplicate; OafEntity swap = duplicate;
duplicate = entity; duplicate = entity;
entity = swap; entity = swap;
} }
entity.mergeFrom(duplicate); entity.mergeFrom(duplicate);
entity.setId(id);
if (ModelSupport.isSubClass(duplicate, Result.class)) { if (ModelSupport.isSubClass(duplicate, Result.class)) {
Result re = (Result) entity; Result re = (Result) entity;
@ -111,16 +168,16 @@ public class DedupRecordFactory {
} }
public static <T extends OafEntity> T entityMerger( public static <T extends OafEntity> T entityMerger(
String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo, Class<T> clazz) String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo, Class<T> clazz) {
throws IllegalAccessException, InstantiationException, InvocationTargetException {
T base = entities.next()._2(); T base = entities.next()._2();
while (entities.hasNext()) { while (entities.hasNext()) {
T duplicate = entities.next()._2(); T duplicate = entities.next()._2();
if (duplicate != null) if (duplicate != null)
base = reduceEntity(id, base, duplicate, clazz); base = (T) reduceEntity(base, duplicate);
} }
base.setId(id);
base.setDataInfo(dataInfo); base.setDataInfo(dataInfo);
base.setLastupdatetimestamp(ts); base.setLastupdatetimestamp(ts);

View File

@ -611,7 +611,7 @@ public class SparkDedupTest implements Serializable {
assertEquals(91, pubs.count()); assertEquals(91, pubs.count());
assertEquals(47, sw_deduprecord); assertEquals(47, sw_deduprecord);
assertEquals(97, ds_deduprecord); assertEquals(97, ds_deduprecord);
assertEquals(93, orp_deduprecord); assertEquals(92, orp_deduprecord);
verifyRoot_1(mapper, pubs); verifyRoot_1(mapper, pubs);
@ -751,7 +751,7 @@ public class SparkDedupTest implements Serializable {
assertEquals(100, datasource); assertEquals(100, datasource);
assertEquals(196, softwares); assertEquals(196, softwares);
assertEquals(389, dataset); assertEquals(389, dataset);
assertEquals(521, otherresearchproduct); assertEquals(520, otherresearchproduct);
// System.out.println("publications = " + publications); // System.out.println("publications = " + publications);
// System.out.println("organizations = " + organizations); // System.out.println("organizations = " + organizations);