package eu.dnetlib.dhp.oa.dedup;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.pace.config.DedupConfig;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.Collection;
import java.util.Iterator;

public class DedupRecordFactory {

    protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    public static <T extends OafEntity> Dataset<T> createDedupRecord(
            final SparkSession spark,
            final String mergeRelsInputPath,
            final String entitiesInputPath,
            final Class<T> clazz,
            final DedupConfig dedupConf) {

        long ts = System.currentTimeMillis();

        // <id, json_entity>: parse each JSON line and key the entity by its id
        Dataset<Tuple2<String, T>> entities = spark
                .read()
                .textFile(entitiesInputPath)
                .map(
                        (MapFunction<String, Tuple2<String, T>>) it -> {
                            T entity = OBJECT_MAPPER.readValue(it, clazz);
                            return new Tuple2<>(entity.getId(), entity);
                        },
                        Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));

        // <dedup_id, id>: source is the dedup_id, target is the id of the mergedIn entity
        Dataset<Tuple2<String, String>> mergeRels = spark
                .read()
                .load(mergeRelsInputPath)
                .as(Encoders.bean(Relation.class))
                .where("relClass == 'merges'")
                .map(
                        (MapFunction<Relation, Tuple2<String, String>>) r -> new Tuple2<>(r.getSource(), r.getTarget()),
                        Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

        // <dedup_id, json_entity_merged>: join the merge rels with the entities on the
        // merged entity id (the rel target), keep the dedup_id as the grouping key and
        // collapse each group into a single dedup record
        return mergeRels
                .joinWith(entities, mergeRels.col("_2").equalTo(entities.col("_1")), "left_outer")
                .filter(
                        (FilterFunction<Tuple2<Tuple2<String, String>, Tuple2<String, T>>>) value -> value._2() != null)
                .map(
                        (MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, T>>, Tuple2<String, T>>) value -> new Tuple2<>(
                                value._1()._1(), value._2()._2()),
                        Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)))
                .groupByKey(
                        (MapFunction<Tuple2<String, T>, String>) value -> value._1(), Encoders.STRING())
                .mapGroups(
                        (MapGroupsFunction<String, Tuple2<String, T>, T>) (key, values) -> entityMerger(key, values, ts, clazz),
                        Encoders.bean(clazz));
    }

    private static <T extends OafEntity> T entityMerger(
            String id, Iterator<Tuple2<String, T>> entities, final long ts, Class<T> clazz) {
        try {
            T entity = clazz.newInstance();
            entity.setId(id);
            if (entity.getDataInfo() == null) {
                entity.setDataInfo(new DataInfo());
            }
            entity.getDataInfo().setTrust("0.9");
            entity.setLastupdatetimestamp(ts);

            final Collection<String> dates = Lists.newArrayList();
            entities.forEachRemaining(t -> {
                T e = t._2();
                entity.mergeFrom(e);
                if (ModelSupport.isSubClass(e, Result.class)) {
                    Result r1 = (Result) e;
                    Result er = (Result) entity;
                    er.setAuthor(DedupUtility.mergeAuthor(er.getAuthor(), r1.getAuthor()));

                    // collect the acceptance date of the record being merged, not of the accumulator
                    if (r1.getDateofacceptance() != null) {
                        dates.add(r1.getDateofacceptance().getValue());
                    }
                }
            });

            if (ModelSupport.isSubClass(entity, Result.class)) {
                ((Result) entity).setDateofacceptance(DatePicker.pick(dates));
            }

            return entity;
        } catch (IllegalAccessException | InstantiationException e) {
            throw new RuntimeException(e);
        }
    }
}
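
// --- Usage sketch (illustrative addition, not part of the original class) ---
// A minimal sketch of how createDedupRecord above might be wired into a Spark job.
// The local master, the input/output paths and the choice of Publication as the
// entity type are assumptions made for the example; only createDedupRecord and
// OBJECT_MAPPER come from the class above. This is not the project's actual
// driver, just a self-contained illustration.
class DedupRecordFactoryUsageSketch {

    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("CreateDedupRecord")
                .master("local[*]") // assumption: local execution, for illustration only
                .getOrCreate();

        // hypothetical input locations: mergeRels stored as parquet, entities as JSON text
        Dataset<Publication> dedupRecords = DedupRecordFactory
                .createDedupRecord(
                        spark,
                        "/tmp/dedup/publication_mergerel",
                        "/tmp/graph/publication",
                        Publication.class,
                        null); // dedupConf is not read by createDedupRecord as written above

        // serialize the merged records back to JSON, one record per line
        dedupRecords
                .map(
                        (MapFunction<Publication, String>) p -> DedupRecordFactory.OBJECT_MAPPER
                                .writeValueAsString(p),
                        Encoders.STRING())
                .write()
                .text("/tmp/dedup/publication_deduprecord"); // hypothetical output path

        spark.stop();
    }
}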