id generation of representative record moved to the SparkCreateMergeRels job

This commit is contained in:
miconis 2021-04-14 18:06:07 +02:00
parent 1542196a33
commit 3525a8f504
6 changed files with 91 additions and 51 deletions

View File

@ -82,7 +82,7 @@ public class DedupRecordFactory {
final Collection<String> dates = Lists.newArrayList(); final Collection<String> dates = Lists.newArrayList();
final List<List<Author>> authors = Lists.newArrayList(); final List<List<Author>> authors = Lists.newArrayList();
final List<Identifier<T>> bestPids = Lists.newArrayList(); // best pids list // final List<Identifier<T>> bestPids = Lists.newArrayList(); // best pids list
entities entities
.forEachRemaining( .forEachRemaining(
@ -90,7 +90,7 @@ public class DedupRecordFactory {
T duplicate = t._2(); T duplicate = t._2();
// prepare the list of pids to be used for the id generation // prepare the list of pids to be used for the id generation
bestPids.add(Identifier.newInstance(duplicate)); // bestPids.add(Identifier.newInstance(duplicate));
entity.mergeFrom(duplicate); entity.mergeFrom(duplicate);
if (ModelSupport.isSubClass(duplicate, Result.class)) { if (ModelSupport.isSubClass(duplicate, Result.class)) {
@ -109,7 +109,8 @@ public class DedupRecordFactory {
((Result) entity).setAuthor(AuthorMerger.merge(authors)); ((Result) entity).setAuthor(AuthorMerger.merge(authors));
} }
entity.setId(IdGenerator.generate(bestPids, id)); // entity.setId(IdGenerator.generate(bestPids, id));
entity.setId(id);
entity.setLastupdatetimestamp(ts); entity.setLastupdatetimestamp(ts);
entity.setDataInfo(dataInfo); entity.setDataInfo(dataInfo);

View File

@ -7,6 +7,8 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaPairRDD;
@ -100,10 +102,8 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity); final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
final JavaPairRDD<Object, String> vertexes = sc //<hash(id), json>
.textFile(graphBasePath + "/" + subEntity) JavaPairRDD<Object, String> vertexes = createVertexes(sc, graphBasePath, subEntity, dedupConf);
.map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s))
.mapToPair((PairFunction<String, Object, String>) s -> new Tuple2<>(hash(s), s));
final RDD<Edge<String>> edgeRdd = spark final RDD<Edge<String>> edgeRdd = spark
.read() .read()
@ -116,9 +116,9 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
final Dataset<Relation> mergeRels = spark final Dataset<Relation> mergeRels = spark
.createDataset( .createDataset(
GraphProcessor GraphProcessor
.findCCs(vertexes.rdd(), edgeRdd, maxIterations, cut) .findCCs(vertexes.rdd(), edgeRdd, subEntity, maxIterations, cut)
.toJavaRDD() .toJavaRDD()
.filter(k -> k.getDocIds().size() > 1) .filter(k -> k.getEntities().size() > 1)
.flatMap(cc -> ccToMergeRel(cc, dedupConf)) .flatMap(cc -> ccToMergeRel(cc, dedupConf))
.rdd(), .rdd(),
Encoders.bean(Relation.class)); Encoders.bean(Relation.class));
@ -128,16 +128,26 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
} }
} }
public Iterator<Relation> ccToMergeRel(ConnectedComponent cc, DedupConfig dedupConf) { private JavaPairRDD<Object, String> createVertexes(JavaSparkContext sc, String graphBasePath, String subEntity, DedupConfig dedupConf) throws IOException {
return sc
.textFile(graphBasePath + "/" + subEntity)
.mapToPair(json -> {
String id = MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), json);
return new Tuple2<>(hash(id), json);
});
}
private Iterator<Relation> ccToMergeRel(ConnectedComponent cc, DedupConfig dedupConf) {
return cc return cc
.getDocIds() .getEntities()
.stream() .stream()
.flatMap( .flatMap(
id -> { e -> {
List<Relation> tmp = new ArrayList<>(); List<Relation> tmp = new ArrayList<>();
tmp.add(rel(cc.getCcId(), id, ModelConstants.MERGES, dedupConf)); tmp.add(rel(cc.getCcId(), MapDocumentUtil.getJPathString("$.id", e), ModelConstants.MERGES, dedupConf));
tmp.add(rel(id, cc.getCcId(), ModelConstants.IS_MERGED_IN, dedupConf)); tmp.add(rel( MapDocumentUtil.getJPathString("$.id", e), cc.getCcId(), ModelConstants.IS_MERGED_IN, dedupConf));
return tmp.stream(); return tmp.stream();
}) })

View File

@ -3,10 +3,23 @@ package eu.dnetlib.dhp.oa.dedup.graph;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.oa.dedup.IdGenerator;
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.util.MapDocumentUtil;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.function.MapFunction;
import org.codehaus.jackson.annotate.JsonIgnore; import org.codehaus.jackson.annotate.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
@ -17,30 +30,48 @@ import eu.dnetlib.pace.util.PaceException;
public class ConnectedComponent implements Serializable { public class ConnectedComponent implements Serializable {
private Set<String> docIds;
private String ccId; private String ccId;
private Set<String> entities;
public ConnectedComponent(Set<String> docIds, final int cut) { protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
this.docIds = docIds; .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
createID();
if (cut > 0 && docIds.size() > cut) { public <T extends OafEntity> ConnectedComponent(Set<String> entities, String subEntity, final int cut) {
this.docIds = docIds this.entities = entities;
final Class<T> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
List<Identifier<T>> identifiers = Lists.newArrayList();
entities.forEach(e -> {
try {
T entity = OBJECT_MAPPER.readValue(e, clazz);
identifiers.add(Identifier.newInstance(entity));
} catch (IOException e1) {
}
});
this.ccId = IdGenerator.generate(
identifiers,
createDefaultID()
);
if (cut > 0 && entities.size() > cut) {
this.entities = entities
.stream() .stream()
.filter(s -> !ccId.equalsIgnoreCase(s)) .filter(e -> !ccId.equalsIgnoreCase(MapDocumentUtil.getJPathString("$.id", e)))
.limit(cut - 1) .limit(cut - 1)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
this.docIds.add(ccId);
} }
} }
public String createID() { public String createDefaultID() {
if (docIds.size() > 1) { if (entities.size() > 1) {
final String s = getMin(); final String s = getMin();
String prefix = s.split("\\|")[0]; String prefix = s.split("\\|")[0];
ccId = prefix + "|dedup_wf_001::" + DHPUtils.md5(s); ccId = prefix + "|dedup_wf_001::" + DHPUtils.md5(s);
return ccId; return ccId;
} else { } else {
return docIds.iterator().next(); return MapDocumentUtil.getJPathString("$.id", entities.iterator().next());
} }
} }
@ -49,15 +80,15 @@ public class ConnectedComponent implements Serializable {
final StringBuilder min = new StringBuilder(); final StringBuilder min = new StringBuilder();
docIds entities
.forEach( .forEach(
i -> { e -> {
if (StringUtils.isBlank(min.toString())) { if (StringUtils.isBlank(min.toString())) {
min.append(i); min.append(MapDocumentUtil.getJPathString("$.id", e));
} else { } else {
if (min.toString().compareTo(i) > 0) { if (min.toString().compareTo(MapDocumentUtil.getJPathString("$.id", e)) > 0) {
min.setLength(0); min.setLength(0);
min.append(i); min.append(MapDocumentUtil.getJPathString("$.id", e));
} }
} }
}); });
@ -74,12 +105,12 @@ public class ConnectedComponent implements Serializable {
} }
} }
public Set<String> getDocIds() { public Set<String> getEntities() {
return docIds; return entities;
} }
public void setDocIds(Set<String> docIds) { public void setEntities(Set<String> docIds) {
this.docIds = docIds; this.entities = entities;
} }
public String getCcId() { public String getCcId() {

View File

@ -7,7 +7,7 @@ import scala.collection.JavaConversions;
object GraphProcessor { object GraphProcessor {
def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int, cut:Int): RDD[ConnectedComponent] = { def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], subEntity: String, maxIterations: Int, cut:Int): RDD[ConnectedComponent] = {
val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby
val cc = graph.connectedComponents(maxIterations).vertices val cc = graph.connectedComponents(maxIterations).vertices
@ -22,15 +22,15 @@ object GraphProcessor {
} }
} }
val connectedComponents = joinResult.groupByKey() val connectedComponents = joinResult.groupByKey()
.map[ConnectedComponent](cc => asConnectedComponent(cc, cut)) .map[ConnectedComponent](cc => asConnectedComponent(cc, subEntity, cut))
connectedComponents connectedComponents
} }
def asConnectedComponent(group: (VertexId, Iterable[String]), cut:Int): ConnectedComponent = { def asConnectedComponent(group: (VertexId, Iterable[String]), subEntity: String,cut:Int): ConnectedComponent = {
val docs = group._2.toSet[String] val docs = group._2.toSet[String]
val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs), cut); val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs), subEntity, cut);
connectedComponent connectedComponent
} }

View File

@ -1,16 +1,7 @@
package eu.dnetlib.dhp.oa.dedup.model; package eu.dnetlib.dhp.oa.dedup.model;
import java.io.Serializable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import eu.dnetlib.dhp.oa.dedup.DatePicker; import eu.dnetlib.dhp.oa.dedup.DatePicker;
import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
@ -18,6 +9,12 @@ import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.PidComparator; import eu.dnetlib.dhp.schema.oaf.utils.PidComparator;
import eu.dnetlib.dhp.schema.oaf.utils.PidType; import eu.dnetlib.dhp.schema.oaf.utils.PidType;
import org.apache.commons.lang3.StringUtils;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.stream.Collectors;
public class Identifier<T extends OafEntity> implements Serializable, Comparable<Identifier> { public class Identifier<T extends OafEntity> implements Serializable, Comparable<Identifier> {

View File

@ -353,6 +353,7 @@ public class SparkDedupTest implements Serializable {
assertEquals(288, sw_mergerel); assertEquals(288, sw_mergerel);
assertEquals(472, ds_mergerel); assertEquals(472, ds_mergerel);
assertEquals(718, orp_mergerel); assertEquals(718, orp_mergerel);
} }
@Test @Test
@ -538,7 +539,7 @@ public class SparkDedupTest implements Serializable {
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
assertEquals(4858, relations); assertEquals(4862, relations);
// check deletedbyinference // check deletedbyinference
final Dataset<Relation> mergeRels = spark final Dataset<Relation> mergeRels = spark