forked from D-Net/dnet-hadoop
Merge remote-tracking branch 'origin/stable_ids' into stable_ids
commit 644aa8f40c
@@ -57,7 +57,7 @@ public class IdentifierFactory implements Serializable {
     }
 
     public static List<StructuredProperty> getPids(List<StructuredProperty> pid, KeyValue collectedFrom) {
-        return pidFromInstance(pid, collectedFrom).distinct().collect(Collectors.toList());
+        return pidFromInstance(pid, collectedFrom, true).distinct().collect(Collectors.toList());
     }
 
     public static <T extends Result> String createDOIBoostIdentifier(T entity) {
@@ -104,7 +104,7 @@ public class IdentifierFactory implements Serializable {
         checkArgument(StringUtils.isNoneBlank(entity.getId()), "missing entity identifier");
 
-        final Map<String, List<StructuredProperty>> pids = extractPids(entity);
+        final Map<String, Set<StructuredProperty>> pids = extractPids(entity);
 
         return pids
             .values()
@@ -125,7 +125,7 @@ public class IdentifierFactory implements Serializable {
             .orElseGet(entity::getId);
     }
 
-    private static <T extends OafEntity> Map<String, List<StructuredProperty>> extractPids(T entity) {
+    private static <T extends OafEntity> Map<String, Set<StructuredProperty>> extractPids(T entity) {
         if (entity instanceof Result) {
             return Optional
                 .ofNullable(((Result) entity).getInstance())
@@ -142,23 +142,24 @@ public class IdentifierFactory implements Serializable {
                     Collectors
                         .groupingBy(
                             p -> p.getQualifier().getClassid(),
-                            Collectors.mapping(p -> p, Collectors.toList())));
+                            Collectors.mapping(p -> p, Collectors.toCollection(HashSet::new))));
         }
     }
 
-    private static Map<String, List<StructuredProperty>> mapPids(List<Instance> instance) {
+    private static Map<String, Set<StructuredProperty>> mapPids(List<Instance> instance) {
         return instance
             .stream()
-            .map(i -> pidFromInstance(i.getPid(), i.getCollectedfrom()))
+            .map(i -> pidFromInstance(i.getPid(), i.getCollectedfrom(), false))
             .flatMap(Function.identity())
             .collect(
                 Collectors
                     .groupingBy(
                         p -> p.getQualifier().getClassid(),
-                        Collectors.mapping(p -> p, Collectors.toList())));
+                        Collectors.mapping(p -> p, Collectors.toCollection(HashSet::new))));
     }
 
-    private static Stream<StructuredProperty> pidFromInstance(List<StructuredProperty> pid, KeyValue collectedFrom) {
+    private static Stream<StructuredProperty> pidFromInstance(List<StructuredProperty> pid, KeyValue collectedFrom,
+        boolean mapHandles) {
         return Optional
             .ofNullable(pid)
             .map(
@@ -167,16 +168,16 @@ public class IdentifierFactory implements Serializable {
                     // filter away PIDs provided by a DS that is not considered an authority for the
                     // given PID Type
                     .filter(p -> {
-                        return shouldFilterPid(collectedFrom, p);
+                        return shouldFilterPid(collectedFrom, p, mapHandles);
                     })
                     .map(CleaningFunctions::normalizePidValue)
                     .filter(CleaningFunctions::pidFilter))
             .orElse(Stream.empty());
     }
 
-    private static boolean shouldFilterPid(KeyValue collectedFrom, StructuredProperty p) {
+    private static boolean shouldFilterPid(KeyValue collectedFrom, StructuredProperty p, boolean mapHandles) {
         final PidType pType = PidType.tryValueOf(p.getQualifier().getClassid());
-        return pType.equals(PidType.handle) || Optional.ofNullable(collectedFrom).isPresent() &&
+        return (mapHandles && pType.equals(PidType.handle)) || Optional.ofNullable(collectedFrom).isPresent() &&
             Optional
                 .ofNullable(PID_AUTHORITY.get(pType))
                 .map(authorities -> {

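Note on the collector change above: switching from toList() to toCollection(HashSet::new) means that the same PID repeated across several instances of one result (a handle attached to two instances, as in the new publication_doi4.json fixture) collapses to a single entry per classid, while the new mapHandles flag keeps handles when getPids is called directly but leaves them to the authority check when mapPids aggregates instance-level PIDs. A minimal stand-alone sketch of the grouping effect, not part of the commit: it uses plain JDK Map.Entry pairs in place of StructuredProperty (which, in the real code, needs equals/hashCode for the set to deduplicate), and the class name PidGroupingSketch and sample values are made up for illustration.

    import java.util.*;
    import java.util.stream.Collectors;

    public class PidGroupingSketch {

        public static void main(String[] args) {
            // each entry stands in for a StructuredProperty: (classid, value)
            List<Map.Entry<String, String>> pidsFromAllInstances = Arrays.asList(
                new AbstractMap.SimpleEntry<>("doi", "10.1016/j.cmet.2010.03.013"),
                new AbstractMap.SimpleEntry<>("handle", "11012/83840"), // same handle on two instances
                new AbstractMap.SimpleEntry<>("handle", "11012/83840"));

            // groupingBy + mapping into a HashSet: duplicates collapse per classid
            Map<String, Set<Map.Entry<String, String>>> byClassid = pidsFromAllInstances
                .stream()
                .collect(
                    Collectors.groupingBy(
                        Map.Entry::getKey,
                        Collectors.mapping(p -> p, Collectors.toCollection(HashSet::new))));

            System.out.println(byClassid.get("handle").size()); // prints 1, not 2
        }
    }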
@@ -31,6 +31,9 @@ public class IdentifierFactoryTest {
         verifyIdentifier(
             "publication_doi3.json", "50|pmc_________::94e4cb08c93f8733b48e2445d04002ac", true);
 
+        verifyIdentifier(
+            "publication_doi4.json", "50|od______2852::38861c44e6052a8d49f59a4c39ba5e66", true);
+
         verifyIdentifier(
             "publication_pmc1.json", "50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1f", true);
 
@@ -0,0 +1,37 @@
+{
+	"id": "50|od______2852::38861c44e6052a8d49f59a4c39ba5e66",
+	"instance": [
+		{
+			"collectedfrom": {
+				"key": "10|openaire____::1234",
+				"value": "Zenodo"
+			},
+			"pid": [
+				{
+					"qualifier": {"classid": "doi"},
+					"value": "10.1016/j.cmet.2010.03.013"
+				},
+				{
+					"qualifier": {"classid": "handle"},
+					"value": "11012/83840"
+				}
+			]
+		},
+		{
+			"collectedfrom": {
+				"key": "10|opendoar____::2852",
+				"value": "Digital library of Brno University of Technology"
+			},
+			"pid": [
+				{
+					"qualifier": {"classid": "pmc"},
+					"value": "21459329"
+				},
+				{
+					"qualifier": {"classid": "handle"},
+					"value": "11012/83840"
+				}
+			]
+		}
+	]
+}
@@ -36,7 +36,6 @@ public class ModelConstants {
     public static final String DNET_COUNTRY_TYPE = "dnet:countries";
     public static final String DNET_REVIEW_LEVELS = "dnet:review_levels";
     public static final String DNET_PROGRAMMING_LANGUAGES = "dnet:programming_languages";
-    public static final String DNET_PROVENANCEACTIONS = "dnet:provenanceActions";
     public static final String DNET_EXTERNAL_REF_TYPES = "dnet:externalReference_typologies";
 
     public static final String SYSIMPORT_CROSSWALK_REPOSITORY = "sysimport:crosswalk:repository";
@@ -50,7 +49,7 @@ public class ModelConstants {
     public static final String PROVENANCE_DEDUP = "sysimport:dedup";
 
     public static final Qualifier PROVENANCE_ACTION_SET_QUALIFIER = qualifier(
-        SYSIMPORT_ACTIONSET, SYSIMPORT_ACTIONSET, DNET_PROVENANCEACTIONS, DNET_PROVENANCEACTIONS);
+        SYSIMPORT_ACTIONSET, SYSIMPORT_ACTIONSET, DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS);
 
     public static final String DATASET_RESULTTYPE_CLASSID = "dataset";
     public static final String PUBLICATION_RESULTTYPE_CLASSID = "publication";
@@ -82,16 +82,12 @@ public class DedupRecordFactory {
 
         final Collection<String> dates = Lists.newArrayList();
         final List<List<Author>> authors = Lists.newArrayList();
-        final List<Identifier<T>> bestPids = Lists.newArrayList(); // best pids list
 
         entities
             .forEachRemaining(
                 t -> {
                     T duplicate = t._2();
 
-                    // prepare the list of pids to be used for the id generation
-                    bestPids.add(Identifier.newInstance(duplicate));
-
                     entity.mergeFrom(duplicate);
                     if (ModelSupport.isSubClass(duplicate, Result.class)) {
                         Result r1 = (Result) duplicate;
@@ -109,7 +105,7 @@ public class DedupRecordFactory {
             ((Result) entity).setAuthor(AuthorMerger.merge(authors));
         }
 
-        entity.setId(IdGenerator.generate(bestPids, id));
+        entity.setId(id);
 
         entity.setLastupdatetimestamp(ts);
         entity.setDataInfo(dataInfo);
@@ -1,6 +1,9 @@
 
 package eu.dnetlib.dhp.oa.dedup;
 
+import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
+import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVENANCE_DEDUP;
+
 import java.io.IOException;
 
 import org.apache.commons.io.IOUtils;
@@ -27,8 +30,6 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
     private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);
 
     public static final String ROOT_TRUST = "0.8";
-    public static final String PROVENANCE_ACTION_CLASS = "sysimport:dedup";
-    public static final String PROVENANCE_ACTIONS = "dnet:provenanceActions";
 
     public SparkCreateDedupRecord(ArgumentApplicationParser parser, SparkSession spark) {
         super(parser, spark);
@@ -94,10 +95,10 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
         info.setTrust(ROOT_TRUST);
         info.setInferenceprovenance(dedupConf.getWf().getConfigurationId());
         Qualifier provenance = new Qualifier();
-        provenance.setClassid(PROVENANCE_ACTION_CLASS);
-        provenance.setClassname(PROVENANCE_ACTION_CLASS);
-        provenance.setSchemeid(PROVENANCE_ACTIONS);
-        provenance.setSchemename(PROVENANCE_ACTIONS);
+        provenance.setClassid(PROVENANCE_DEDUP);
+        provenance.setClassname(PROVENANCE_DEDUP);
+        provenance.setSchemeid(DNET_PROVENANCE_ACTIONS);
+        provenance.setSchemename(DNET_PROVENANCE_ACTIONS);
         info.setProvenanceaction(provenance);
         return info;
     }

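Both dedup jobs now take the provenance vocabulary from ModelConstants instead of local string literals. A hedged sketch of how the repeated setter block could be factored into one shared helper, using only the calls visible in this diff; the class and method names (DedupProvenanceSketch, dedupProvenanceAction, dedupDataInfo) are illustration-only and not part of the commit.

    import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
    import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVENANCE_DEDUP;

    import eu.dnetlib.dhp.schema.oaf.DataInfo;
    import eu.dnetlib.dhp.schema.oaf.Qualifier;

    public class DedupProvenanceSketch {

        // the same qualifier built in SparkCreateDedupRecord and SparkCreateMergeRels
        public static Qualifier dedupProvenanceAction() {
            Qualifier provenance = new Qualifier();
            provenance.setClassid(PROVENANCE_DEDUP);
            provenance.setClassname(PROVENANCE_DEDUP);
            provenance.setSchemeid(DNET_PROVENANCE_ACTIONS);
            provenance.setSchemename(DNET_PROVENANCE_ACTIONS);
            return provenance;
        }

        public static DataInfo dedupDataInfo(String trust, String inferenceProvenance) {
            DataInfo info = new DataInfo();
            info.setTrust(trust);
            info.setInferenceprovenance(inferenceProvenance);
            info.setProvenanceaction(dedupProvenanceAction());
            return info;
        }
    }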
@@ -1,18 +1,20 @@
 
 package eu.dnetlib.dhp.oa.dedup;
 
+import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
+import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVENANCE_DEDUP;
+
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Optional;
+import java.util.*;
+import java.util.stream.Collectors;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.graphx.Edge;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
@@ -23,14 +25,18 @@ import org.dom4j.DocumentException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
 import com.google.common.hash.Hashing;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.oa.dedup.graph.ConnectedComponent;
 import eu.dnetlib.dhp.oa.dedup.graph.GraphProcessor;
+import eu.dnetlib.dhp.oa.dedup.model.Identifier;
+import eu.dnetlib.dhp.schema.common.EntityType;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
 import eu.dnetlib.dhp.schema.oaf.Qualifier;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||||
|
|
||||||
public class SparkCreateMergeRels extends AbstractSparkAction {
|
public class SparkCreateMergeRels extends AbstractSparkAction {
|
||||||
|
|
||||||
public static final String PROVENANCE_ACTION_CLASS = "sysimport:dedup";
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(SparkCreateMergeRels.class);
|
private static final Logger log = LoggerFactory.getLogger(SparkCreateMergeRels.class);
|
||||||
public static final String DNET_PROVENANCE_ACTIONS = "dnet:provenanceActions";
|
|
||||||
|
|
||||||
public SparkCreateMergeRels(ArgumentApplicationParser parser, SparkSession spark) {
|
public SparkCreateMergeRels(ArgumentApplicationParser parser, SparkSession spark) {
|
||||||
super(parser, spark);
|
super(parser, spark);
|
||||||
|
@@ -92,6 +96,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
 
         for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
             final String subEntity = dedupConf.getWf().getSubEntityValue();
+            final Class<OafEntity> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
 
             log.info("Creating mergerels for: '{}'", subEntity);
 
@@ -100,10 +105,8 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
 
             final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
 
-            final JavaPairRDD<Object, String> vertexes = sc
-                .textFile(graphBasePath + "/" + subEntity)
-                .map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s))
-                .mapToPair((PairFunction<String, Object, String>) s -> new Tuple2<>(hash(s), s));
+            // <hash(id), id>
+            JavaPairRDD<Object, String> vertexes = createVertexes(sc, graphBasePath, subEntity, dedupConf);
 
             final RDD<Edge<String>> edgeRdd = spark
                 .read()
@@ -113,14 +116,42 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
                 .map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass()))
                 .rdd();
 
-            final Dataset<Relation> mergeRels = spark
+            Dataset<Tuple2<String, String>> rawMergeRels = spark
                 .createDataset(
                     GraphProcessor
                         .findCCs(vertexes.rdd(), edgeRdd, maxIterations, cut)
                         .toJavaRDD()
-                        .filter(k -> k.getDocIds().size() > 1)
-                        .flatMap(cc -> ccToMergeRel(cc, dedupConf))
+                        .filter(k -> k.getIds().size() > 1)
+                        .flatMap(this::ccToRels)
                         .rdd(),
+                    Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
+
+            Dataset<Tuple2<String, OafEntity>> entities = spark
+                .read()
+                .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
+                .map(
+                    (MapFunction<String, Tuple2<String, OafEntity>>) it -> {
+                        OafEntity entity = OBJECT_MAPPER.readValue(it, clazz);
+                        return new Tuple2<>(entity.getId(), entity);
+                    },
+                    Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
+
+            Dataset<Relation> mergeRels = rawMergeRels
+                .joinWith(entities, rawMergeRels.col("_2").equalTo(entities.col("_1")), "inner")
+                // <tmp_source,target>,<target,entity>
+                .map(
+                    (MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, OafEntity>>, Tuple2<String, OafEntity>>) value -> new Tuple2<>(
+                        value._1()._1(), value._2()._2()),
+                    Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)))
+                // <tmp_source,entity>
+                .groupByKey(
+                    (MapFunction<Tuple2<String, OafEntity>, String>) Tuple2::_1, Encoders.STRING())
+                .mapGroups(
+                    (MapGroupsFunction<String, Tuple2<String, OafEntity>, ConnectedComponent>) this::generateID,
+                    Encoders.bean(ConnectedComponent.class))
+                // <root_id, list(target)>
+                .flatMap(
+                    (FlatMapFunction<ConnectedComponent, Relation>) cc -> ccToMergeRel(cc, dedupConf),
                     Encoders.bean(Relation.class));
 
             mergeRels.write().mode(SaveMode.Append).parquet(mergeRelPath);

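In words: the raw <tmp_source, target> pairs coming out of the connected components are joined with the parsed entities, grouped by the temporary source key, and each group is handed to generateID so the root id is picked from the best PID among the grouped entities rather than hashed from the component. A rough plain-Java analogue of that dataflow, not part of the commit: ordinary collections stand in for the Spark Datasets, and pickRootId is an assumed stand-in for IdGenerator.generate; MergeRelDataflowSketch and the sample ids are made up for illustration.

    import java.util.*;
    import java.util.function.BiFunction;
    import java.util.stream.Collectors;

    public class MergeRelDataflowSketch {

        public static void main(String[] args) {
            // rawMergeRels: <tmp_source, target> pairs from the connected components
            List<String[]> rawMergeRels = Arrays.asList(
                new String[] { "tmp_1", "id_A" },
                new String[] { "tmp_1", "id_B" });

            // entities: <id, entity payload>, here just a readable label
            Map<String, String> entities = new HashMap<>();
            entities.put("id_A", "entityA");
            entities.put("id_B", "entityB");

            // stand-in for IdGenerator.generate: pick a root id from the grouped entity ids
            BiFunction<String, List<String>, String> pickRootId = (key, ids) -> ids
                .stream()
                .sorted()
                .findFirst()
                .orElse(key);

            // "join" on the target id, group by the temporary source, then emit <root, target> pairs
            Map<String, List<String>> grouped = rawMergeRels
                .stream()
                .filter(rel -> entities.containsKey(rel[1]))
                .collect(Collectors.groupingBy(rel -> rel[0], Collectors.mapping(rel -> rel[1], Collectors.toList())));

            grouped.forEach((tmpSource, targets) -> {
                String root = pickRootId.apply(tmpSource, targets);
                targets.forEach(target -> System.out.println(root + " merges " + target));
            });
        }
    }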
@@ -128,9 +159,45 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
         }
     }
 
-    public Iterator<Relation> ccToMergeRel(ConnectedComponent cc, DedupConfig dedupConf) {
+    private <T extends OafEntity> ConnectedComponent generateID(String key, Iterator<Tuple2<String, T>> values) {
+
+        List<Identifier<T>> identifiers = Lists.newArrayList(values).stream().map(v -> {
+            T entity = v._2();
+            Identifier<T> identifier = Identifier.newInstance(entity);
+            return identifier;
+        }).collect(Collectors.toList());
+
+        String rootID = IdGenerator.generate(identifiers, key);
+
+        if (Objects.equals(rootID, key))
+            throw new IllegalStateException("generated default ID: " + rootID);
+
+        return new ConnectedComponent(rootID,
+            identifiers.stream().map(i -> i.getEntity().getId()).collect(Collectors.toSet()));
+    }
+
+    private JavaPairRDD<Object, String> createVertexes(JavaSparkContext sc, String graphBasePath, String subEntity,
+        DedupConfig dedupConf) {
+
+        return sc
+            .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
+            .mapToPair(json -> {
+                String id = MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), json);
+                return new Tuple2<>(hash(id), id);
+            });
+    }
+
+    private Iterator<Tuple2<String, String>> ccToRels(ConnectedComponent cc) {
         return cc
-            .getDocIds()
+            .getIds()
+            .stream()
+            .map(id -> new Tuple2<>(cc.getCcId(), id))
+            .iterator();
+    }
+
+    private Iterator<Relation> ccToMergeRel(ConnectedComponent cc, DedupConfig dedupConf) {
+        return cc
+            .getIds()
             .stream()
             .flatMap(
                 id -> {
@@ -161,8 +228,8 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
                     info.setInvisible(false);
                     info.setInferenceprovenance(dedupConf.getWf().getConfigurationId());
                     Qualifier provenanceAction = new Qualifier();
-                    provenanceAction.setClassid(PROVENANCE_ACTION_CLASS);
-                    provenanceAction.setClassname(PROVENANCE_ACTION_CLASS);
+                    provenanceAction.setClassid(PROVENANCE_DEDUP);
+                    provenanceAction.setClassname(PROVENANCE_DEDUP);
                     provenanceAction.setSchemeid(DNET_PROVENANCE_ACTIONS);
                     provenanceAction.setSchemename(DNET_PROVENANCE_ACTIONS);
                     info.setProvenanceaction(provenanceAction);

@@ -3,44 +3,66 @@ package eu.dnetlib.dhp.oa.dedup.graph;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.spark.api.java.function.MapFunction;
 import org.codehaus.jackson.annotate.JsonIgnore;
 
+import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
 
 import eu.dnetlib.dhp.oa.dedup.DedupUtility;
+import eu.dnetlib.dhp.oa.dedup.IdGenerator;
+import eu.dnetlib.dhp.oa.dedup.model.Identifier;
+import eu.dnetlib.dhp.schema.common.EntityType;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
 import eu.dnetlib.dhp.utils.DHPUtils;
+import eu.dnetlib.pace.config.DedupConfig;
+import eu.dnetlib.pace.util.MapDocumentUtil;
 import eu.dnetlib.pace.util.PaceException;
 
 public class ConnectedComponent implements Serializable {
 
-    private Set<String> docIds;
     private String ccId;
+    private Set<String> ids;
 
-    public ConnectedComponent(Set<String> docIds, final int cut) {
-        this.docIds = docIds;
-        createID();
-        if (cut > 0 && docIds.size() > cut) {
-            this.docIds = docIds
+    private static final String CONNECTED_COMPONENT_ID_PREFIX = "connect_comp";
+
+    public ConnectedComponent(Set<String> ids, final int cut) {
+        this.ids = ids;
+
+        this.ccId = createDefaultID();
+
+        if (cut > 0 && ids.size() > cut) {
+            this.ids = ids
                 .stream()
-                .filter(s -> !ccId.equalsIgnoreCase(s))
+                .filter(id -> !ccId.equalsIgnoreCase(id))
                 .limit(cut - 1)
                 .collect(Collectors.toSet());
-            this.docIds.add(ccId);
+            // this.ids.add(ccId); ??
         }
     }
 
-    public String createID() {
-        if (docIds.size() > 1) {
+    public ConnectedComponent(String ccId, Set<String> ids) {
+        this.ccId = ccId;
+        this.ids = ids;
+    }
+
+    public String createDefaultID() {
+        if (ids.size() > 1) {
             final String s = getMin();
             String prefix = s.split("\\|")[0];
-            ccId = prefix + "|dedup_wf_001::" + DHPUtils.md5(s);
+            ccId = prefix + "|" + CONNECTED_COMPONENT_ID_PREFIX + "::" + DHPUtils.md5(s);
             return ccId;
         } else {
            return ids.iterator().next();
         }
     }

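The default component id now keeps the entity-type prefix of the lexicographically smallest member and appends an md5 of that id under the connect_comp namespace (previously dedup_wf_001); SparkCreateMergeRels.generateID above throws if the final root still equals this default, which is how the pipeline flags a group whose root could not be derived from the entities' PIDs. A small stand-alone sketch of the id shape, not part of the commit: java.security.MessageDigest is used as an assumed stand-in for DHPUtils.md5, and ConnectedComponentIdSketch plus the sample ids are made up for illustration.

    import java.math.BigInteger;
    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.util.Arrays;
    import java.util.Set;
    import java.util.TreeSet;

    public class ConnectedComponentIdSketch {

        public static void main(String[] args) throws Exception {
            Set<String> ids = new TreeSet<>(Arrays.asList("50|doi_________::abc", "50|od______2852::xyz"));

            String min = ids.iterator().next();   // lexicographic minimum, as getMin() computes
            String prefix = min.split("\\|")[0];  // "50"

            // stand-in for DHPUtils.md5(min)
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(min.getBytes(StandardCharsets.UTF_8));
            String md5 = String.format("%032x", new BigInteger(1, digest));

            // e.g. 50|connect_comp::<md5>; generateID later replaces this default with a PID-based root
            System.out.println(prefix + "|connect_comp::" + md5);
        }
    }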
@@ -49,15 +71,15 @@ public class ConnectedComponent implements Serializable {
 
         final StringBuilder min = new StringBuilder();
 
-        docIds
+        ids
             .forEach(
-                i -> {
+                id -> {
                     if (StringUtils.isBlank(min.toString())) {
-                        min.append(i);
+                        min.append(id);
                     } else {
-                        if (min.toString().compareTo(i) > 0) {
+                        if (min.toString().compareTo(id) > 0) {
                             min.setLength(0);
-                            min.append(i);
+                            min.append(id);
                         }
                     }
                 });

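Reading aid, not part of the commit: the StringBuilder loop above is just a lexicographic minimum over the component's ids. A tiny equivalent sketch with streams (GetMinSketch and the sample ids are illustration-only):

    import java.util.Arrays;
    import java.util.List;

    public class GetMinSketch {
        public static void main(String[] args) {
            List<String> ids = Arrays.asList("50|b", "50|a", "50|c");
            // same result as ConnectedComponent.getMin(): the lexicographically smallest id
            String min = ids.stream().min(String::compareTo).orElseThrow(IllegalStateException::new);
            System.out.println(min); // 50|a
        }
    }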
@@ -74,12 +96,12 @@ public class ConnectedComponent implements Serializable {
         }
     }
 
-    public Set<String> getDocIds() {
-        return docIds;
+    public Set<String> getIds() {
+        return ids;
     }
 
-    public void setDocIds(Set<String> docIds) {
-        this.docIds = docIds;
+    public void setIds(Set<String> ids) {
+        this.ids = ids;
     }
 
     public String getCcId() {
@@ -2,7 +2,6 @@
 package eu.dnetlib.dhp.oa.dedup.model;
 
 import java.io.Serializable;
-import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.*;
 import java.util.stream.Collectors;
@@ -55,7 +55,6 @@ public class SparkDedupTest implements Serializable {
     private static String testOutputBasePath;
     private static String testDedupGraphBasePath;
     private static final String testActionSetId = "test-orchestrator";
-    private static String testDedupAssertionsBasePath;
 
     @BeforeAll
     public static void cleanUp() throws IOException, URISyntaxException {
@@ -70,10 +69,6 @@ public class SparkDedupTest implements Serializable {
         testDedupGraphBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
             .toAbsolutePath()
             .toString();
-        testDedupAssertionsBasePath = Paths
-            .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/assertions").toURI())
-            .toFile()
-            .getAbsolutePath();
 
         FileUtils.deleteDirectory(new File(testOutputBasePath));
         FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
@@ -353,6 +348,7 @@ public class SparkDedupTest implements Serializable {
         assertEquals(288, sw_mergerel);
         assertEquals(472, ds_mergerel);
         assertEquals(718, orp_mergerel);
+
     }
 
     @Test
@@ -538,7 +534,7 @@ public class SparkDedupTest implements Serializable {
 
         long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
 
-        assertEquals(4858, relations);
+        assertEquals(4862, relations);
 
         // check deletedbyinference
         final Dataset<Relation> mergeRels = spark
@@ -10,6 +10,7 @@ export SOURCE=$1
 export SHADOW=$2
 
 echo "Updating shadow database"
+impala-shell -q "invalidate metadata"
 impala-shell -d ${SOURCE} -q "invalidate metadata"
 impala-shell -d ${SOURCE} -q "show tables" --delimited | sed "s/^\(.*\)/compute stats ${SOURCE}.\1;/" | impala-shell -c -f -
 impala-shell -q "create database if not exists ${SHADOW}"