diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
index fe9bd74ced..d7ec49ccd0 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
@@ -82,7 +82,7 @@ public class DedupRecordFactory {
 
 		final Collection<String> dates = Lists.newArrayList();
 		final List<List<Author>> authors = Lists.newArrayList();
-		final List<Identifier<T>> bestPids = Lists.newArrayList(); // best pids list
+//		final List<Identifier<T>> bestPids = Lists.newArrayList(); // best pids list
 
 		entities
 			.forEachRemaining(
@@ -90,7 +90,7 @@ public class DedupRecordFactory {
 					T duplicate = t._2();
 
 					// prepare the list of pids to be used for the id generation
-					bestPids.add(Identifier.newInstance(duplicate));
+//					bestPids.add(Identifier.newInstance(duplicate));
 
 					entity.mergeFrom(duplicate);
 					if (ModelSupport.isSubClass(duplicate, Result.class)) {
@@ -109,7 +109,8 @@ public class DedupRecordFactory {
 			((Result) entity).setAuthor(AuthorMerger.merge(authors));
 		}
 
-		entity.setId(IdGenerator.generate(bestPids, id));
+//		entity.setId(IdGenerator.generate(bestPids, id));
+		entity.setId(id);
 
 		entity.setLastupdatetimestamp(ts);
 		entity.setDataInfo(dataInfo);
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
index c27464f90a..a58ce02318 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
@@ -7,6 +7,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 
+import eu.dnetlib.dhp.schema.common.EntityType;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -100,10 +102,8 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
 
 			final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
 
-			final JavaPairRDD<Object, String> vertexes = sc
-				.textFile(graphBasePath + "/" + subEntity)
-				.map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s))
-				.mapToPair((PairFunction<String, Object, String>) s -> new Tuple2<>(hash(s), s));
+			// each vertex now carries the whole entity JSON, not only its identifier
+			JavaPairRDD<Object, String> vertexes = createVertexes(sc, graphBasePath, subEntity, dedupConf);
 
 			final RDD<Edge<String>> edgeRdd = spark
 				.read()
@@ -116,9 +116,9 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
 			final Dataset<Relation> mergeRels = spark
 				.createDataset(
 					GraphProcessor
-						.findCCs(vertexes.rdd(), edgeRdd, maxIterations, cut)
+						.findCCs(vertexes.rdd(), edgeRdd, subEntity, maxIterations, cut)
 						.toJavaRDD()
-						.filter(k -> k.getDocIds().size() > 1)
+						.filter(k -> k.getEntities().size() > 1)
 						.flatMap(cc -> ccToMergeRel(cc, dedupConf))
 						.rdd(),
 					Encoders.bean(Relation.class));
@@ -128,16 +128,26 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
 		}
 	}
 
-	public Iterator<Relation> ccToMergeRel(ConnectedComponent cc, DedupConfig dedupConf) {
+	private JavaPairRDD<Object, String> createVertexes(JavaSparkContext sc, String graphBasePath, String subEntity, DedupConfig dedupConf) throws IOException {
+
+		return sc
+			.textFile(graphBasePath + "/" + subEntity)
+			.mapToPair(json -> {
+				String id = MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), json);
+				return new Tuple2<>(hash(id), json);
+			});
+	}
+
+	private Iterator<Relation> ccToMergeRel(ConnectedComponent cc, DedupConfig dedupConf) {
 		return cc
-			.getDocIds()
+			.getEntities()
 			.stream()
 			.flatMap(
-				id -> {
+				e -> {
 					List<Relation> tmp = new ArrayList<>();
-					tmp.add(rel(cc.getCcId(), id, ModelConstants.MERGES, dedupConf));
-					tmp.add(rel(id, cc.getCcId(), ModelConstants.IS_MERGED_IN, dedupConf));
+					tmp.add(rel(cc.getCcId(), MapDocumentUtil.getJPathString("$.id", e), ModelConstants.MERGES, dedupConf));
+					tmp.add(rel(MapDocumentUtil.getJPathString("$.id", e), cc.getCcId(), ModelConstants.IS_MERGED_IN, dedupConf));
					return tmp.stream();
 				})
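
With these changes each GraphX vertex carries the whole entity JSON instead of only its id, and `ccToMergeRel` pulls the id back out with a JSONPath lookup. A minimal, self-contained sketch of the relation fan-out it performs, with a plain string triple standing in for the `Relation` bean built by `rel(...)`, and assuming the `ModelConstants.MERGES`/`IS_MERGED_IN` constants resolve to `"merges"`/`"isMergedIn"`:

```java
import java.util.ArrayList;
import java.util.List;

public class MergeRelSketch {
	// For every member of a connected component, emit the two directed relations
	// the dedup graph needs: root MERGES member, member IS_MERGED_IN root.
	static List<String[]> ccToMergeRels(String ccId, List<String> memberIds) {
		List<String[]> rels = new ArrayList<>();
		for (String memberId : memberIds) {
			rels.add(new String[] { ccId, memberId, "merges" }); // assumed value of ModelConstants.MERGES
			rels.add(new String[] { memberId, ccId, "isMergedIn" }); // assumed value of ModelConstants.IS_MERGED_IN
		}
		return rels;
	}
}
```

A component of size n therefore contributes 2n merge relations, which is what the adjusted counts in `SparkDedupTest` measure.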
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
index 3f24adb939..e1b12412a4 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
@@ -3,10 +3,23 @@ package eu.dnetlib.dhp.oa.dedup.graph;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.google.common.collect.Lists;
+import eu.dnetlib.dhp.oa.dedup.IdGenerator;
+import eu.dnetlib.dhp.oa.dedup.model.Identifier;
+import eu.dnetlib.dhp.schema.common.EntityType;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import eu.dnetlib.pace.config.DedupConfig;
+import eu.dnetlib.pace.util.MapDocumentUtil;
 import org.apache.commons.lang.StringUtils;
+import org.apache.spark.api.java.function.MapFunction;
 import org.codehaus.jackson.annotate.JsonIgnore;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -17,30 +30,48 @@ import eu.dnetlib.pace.util.PaceException;
 
 public class ConnectedComponent implements Serializable {
 
-	private Set<String> docIds;
 	private String ccId;
+	private Set<String> entities;
 
-	public ConnectedComponent(Set<String> docIds, final int cut) {
-		this.docIds = docIds;
-		createID();
-		if (cut > 0 && docIds.size() > cut) {
-			this.docIds = docIds
-				.stream()
-				.filter(s -> !ccId.equalsIgnoreCase(s))
-				.limit(cut - 1)
-				.collect(Collectors.toSet());
-			this.docIds.add(ccId);
+	protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+		.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+	public <T extends OafEntity> ConnectedComponent(Set<String> entities, String subEntity, final int cut) {
+		this.entities = entities;
+		final Class<T> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
+
+		List<Identifier<T>> identifiers = Lists.newArrayList();
+
+		entities.forEach(e -> {
+			try {
+				T entity = OBJECT_MAPPER.readValue(e, clazz);
+				identifiers.add(Identifier.newInstance(entity));
+			} catch (IOException e1) {
+				// unparsable members contribute no identifier; the default id is used instead
+			}
+		});
+
+		this.ccId = IdGenerator.generate(
+			identifiers,
+			createDefaultID()
+		);
+
+		if (cut > 0 && entities.size() > cut) {
+			this.entities = entities
+				.stream()
+				.filter(e -> !ccId.equalsIgnoreCase(MapDocumentUtil.getJPathString("$.id", e)))
+				.limit(cut - 1)
+				.collect(Collectors.toSet());
 		}
 	}
 
-	public String createID() {
-		if (docIds.size() > 1) {
+	public String createDefaultID() {
+		if (entities.size() > 1) {
 			final String s = getMin();
 			String prefix = s.split("\\|")[0];
 			ccId = prefix + "|dedup_wf_001::" + DHPUtils.md5(s);
 			return ccId;
 		} else {
-			return docIds.iterator().next();
+			return MapDocumentUtil.getJPathString("$.id", entities.iterator().next());
 		}
 	}
 
@@ -49,15 +80,15 @@ public class ConnectedComponent implements Serializable {
 
 		final StringBuilder min = new StringBuilder();
 
-		docIds
+		entities
 			.forEach(
-				i -> {
+				e -> {
 					if (StringUtils.isBlank(min.toString())) {
-						min.append(i);
+						min.append(MapDocumentUtil.getJPathString("$.id", e));
 					} else {
-						if (min.toString().compareTo(i) > 0) {
+						if (min.toString().compareTo(MapDocumentUtil.getJPathString("$.id", e)) > 0) {
 							min.setLength(0);
-							min.append(i);
+							min.append(MapDocumentUtil.getJPathString("$.id", e));
 						}
 					}
 				});
@@ -74,12 +105,12 @@ public class ConnectedComponent implements Serializable {
 		}
 	}
 
-	public Set<String> getDocIds() {
-		return docIds;
+	public Set<String> getEntities() {
+		return entities;
 	}
 
-	public void setDocIds(Set<String> docIds) {
-		this.docIds = docIds;
+	public void setEntities(Set<String> entities) {
+		this.entities = entities;
 	}
 
 	public String getCcId() {
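
The root id is now minted while the component is built: each member is deserialized, contributes an `Identifier`, and `IdGenerator.generate` falls back to `createDefaultID()` when none qualifies. That fallback keeps the old `createID()` logic: take the lexicographically minimal member id, keep its type prefix, and mint a `dedup_wf_001` id from its MD5. A runnable sketch, assuming `DHPUtils.md5` is a lowercase hex MD5 digest of the UTF-8 bytes:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Set;
import java.util.TreeSet;

public class DefaultIdSketch {
	// Assumed behaviour of DHPUtils.md5: lowercase hex digest of the UTF-8 bytes.
	static String md5(String s) throws Exception {
		MessageDigest md = MessageDigest.getInstance("MD5");
		StringBuilder hex = new StringBuilder();
		for (byte b : md.digest(s.getBytes(StandardCharsets.UTF_8)))
			hex.append(String.format("%02x", b));
		return hex.toString();
	}

	// Fallback root id: "<typePrefix>|dedup_wf_001::<md5(minId)>".
	static String defaultCcId(Set<String> memberIds) throws Exception {
		String min = new TreeSet<>(memberIds).first(); // lexicographic minimum, like getMin()
		String prefix = min.split("\\|")[0];
		return prefix + "|dedup_wf_001::" + md5(min);
	}
}
```

For illustrative members `50|doi_________::x` and `50|pmid________::y` this keeps the `50` prefix and yields `50|dedup_wf_001::<md5 of the minimal id>`.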
s.split("\\|")[0]; ccId = prefix + "|dedup_wf_001::" + DHPUtils.md5(s); return ccId; } else { - return docIds.iterator().next(); + return MapDocumentUtil.getJPathString("$.id", entities.iterator().next()); } } @@ -49,15 +80,15 @@ public class ConnectedComponent implements Serializable { final StringBuilder min = new StringBuilder(); - docIds + entities .forEach( - i -> { + e -> { if (StringUtils.isBlank(min.toString())) { - min.append(i); + min.append(MapDocumentUtil.getJPathString("$.id", e)); } else { - if (min.toString().compareTo(i) > 0) { + if (min.toString().compareTo(MapDocumentUtil.getJPathString("$.id", e)) > 0) { min.setLength(0); - min.append(i); + min.append(MapDocumentUtil.getJPathString("$.id", e)); } } }); @@ -74,12 +105,12 @@ public class ConnectedComponent implements Serializable { } } - public Set getDocIds() { - return docIds; + public Set getEntities() { + return entities; } - public void setDocIds(Set docIds) { - this.docIds = docIds; + public void setEntities(Set docIds) { + this.entities = entities; } public String getCcId() { diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala index f4dd85d758..d8237410e8 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala @@ -7,7 +7,7 @@ import scala.collection.JavaConversions; object GraphProcessor { - def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int, cut:Int): RDD[ConnectedComponent] = { + def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], subEntity: String, maxIterations: Int, cut:Int): RDD[ConnectedComponent] = { val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby val cc = graph.connectedComponents(maxIterations).vertices @@ -22,15 +22,15 @@ object GraphProcessor { } } val connectedComponents = joinResult.groupByKey() - .map[ConnectedComponent](cc => asConnectedComponent(cc, cut)) + .map[ConnectedComponent](cc => asConnectedComponent(cc, subEntity, cut)) connectedComponents } - def asConnectedComponent(group: (VertexId, Iterable[String]), cut:Int): ConnectedComponent = { + def asConnectedComponent(group: (VertexId, Iterable[String]), subEntity: String,cut:Int): ConnectedComponent = { val docs = group._2.toSet[String] - val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs), cut); + val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs), subEntity, cut); connectedComponent } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java index 94a3bf613d..dded5a6b08 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java @@ -1,16 +1,7 @@ package eu.dnetlib.dhp.oa.dedup.model; -import java.io.Serializable; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.*; -import java.util.stream.Collectors; - -import org.apache.commons.lang3.StringUtils; - import 
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
index 851e72deeb..010c569f26 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
@@ -353,6 +353,7 @@ public class SparkDedupTest implements Serializable {
 		assertEquals(288, sw_mergerel);
 		assertEquals(472, ds_mergerel);
 		assertEquals(718, orp_mergerel);
+
 	}
 
 	@Test
@@ -538,7 +539,7 @@ public class SparkDedupTest implements Serializable {
 
 		long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
 
-		assertEquals(4858, relations);
+		assertEquals(4862, relations);
 
 		// check deletedbyinference
 		final Dataset<Relation> mergeRels = spark
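
Taken together, the patch moves root-id selection out of `DedupRecordFactory` (which now trusts the id it receives) and into `ConnectedComponent`, and the updated assertion (4858 to 4862 relations) reflects the merge relations produced under the differently minted root ids. The `Identifier` ordering and the `IdGenerator` internals are not part of this diff; a hedged sketch of the contract both call sites rely on, with the comparator and extractor left abstract:

```java
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

public class IdGeneratorSketch {
	// Assumed contract: pick the most trustworthy identifier if any member
	// produced one, otherwise fall back to the default dedup id.
	static <T> String generate(List<T> identifiers, String defaultId,
			Comparator<T> byPreference, Function<T, String> toId) {
		return identifiers.stream().min(byPreference).map(toId).orElse(defaultId);
	}
}
```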