forked from antonis.lempesis/dnet-hadoop
FIxed merge of dhp dedup
This commit is contained in:
parent
8c9a56a0c8
commit
15d9106b3f
|
@ -0,0 +1,119 @@
|
|||
package eu.dnetlib.dedup;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Field;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
|
||||
import java.time.Year;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.reverseOrder;
|
||||
import static java.util.Map.Entry.comparingByValue;
|
||||
import static java.util.stream.Collectors.toMap;
|
||||
import static org.apache.commons.lang.StringUtils.endsWith;
|
||||
import static org.apache.commons.lang.StringUtils.substringBefore;
|
||||
|
||||
public class DatePicker {
|
||||
|
||||
private static final String DATE_PATTERN = "\\d{4}-\\d{2}-\\d{2}";
|
||||
private static final String DATE_DEFAULT_SUFFIX = "01-01";
|
||||
private static final int YEAR_LB = 1300;
|
||||
private static final int YEAR_UB = Year.now().getValue() + 5;
|
||||
|
||||
public static Field<String> pick(final Collection<String> dateofacceptance) {
|
||||
|
||||
final Map<String, Integer> frequencies = dateofacceptance
|
||||
.parallelStream()
|
||||
.filter(StringUtils::isNotBlank)
|
||||
.collect(
|
||||
Collectors.toConcurrentMap(
|
||||
w -> w, w -> 1, Integer::sum));
|
||||
|
||||
if (frequencies.isEmpty()) {
|
||||
return new Field<>();
|
||||
}
|
||||
|
||||
final Field<String> date = new Field<>();
|
||||
date.setValue(frequencies.keySet().iterator().next());
|
||||
|
||||
// let's sort this map by values first, filtering out invalid dates
|
||||
final Map<String, Integer> sorted = frequencies
|
||||
.entrySet()
|
||||
.stream()
|
||||
.filter(d -> StringUtils.isNotBlank(d.getKey()))
|
||||
.filter(d -> d.getKey().matches(DATE_PATTERN))
|
||||
.filter(d -> inRange(d.getKey()))
|
||||
.sorted(reverseOrder(comparingByValue()))
|
||||
.collect(
|
||||
toMap(
|
||||
Map.Entry::getKey,
|
||||
Map.Entry::getValue, (e1, e2) -> e2,
|
||||
LinkedHashMap::new));
|
||||
|
||||
// shortcut
|
||||
if (sorted.size() == 0) {
|
||||
return date;
|
||||
}
|
||||
|
||||
// voting method (1/3 + 1) wins
|
||||
if (sorted.size() >= 3) {
|
||||
final int acceptThreshold = (sorted.size() / 3) + 1;
|
||||
final List<String> accepted = sorted.entrySet().stream()
|
||||
.filter(e -> e.getValue() >= acceptThreshold)
|
||||
.map(e -> e.getKey())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// cannot find strong majority
|
||||
if (accepted.isEmpty()) {
|
||||
final int max = sorted.values().iterator().next();
|
||||
Optional<String> first = sorted.entrySet().stream()
|
||||
.filter(e -> e.getValue() == max && !endsWith(e.getKey(), DATE_DEFAULT_SUFFIX))
|
||||
.map(Map.Entry::getKey)
|
||||
.findFirst();
|
||||
if (first.isPresent()) {
|
||||
date.setValue(first.get());
|
||||
return date;
|
||||
}
|
||||
|
||||
date.setValue(sorted.keySet().iterator().next());
|
||||
return date;
|
||||
}
|
||||
|
||||
if (accepted.size() == 1) {
|
||||
date.setValue(accepted.get(0));
|
||||
return date;
|
||||
} else {
|
||||
final Optional<String> first = accepted.stream()
|
||||
.filter(d -> !endsWith(d, DATE_DEFAULT_SUFFIX))
|
||||
.findFirst();
|
||||
if (first.isPresent()) {
|
||||
date.setValue(first.get());
|
||||
return date;
|
||||
}
|
||||
|
||||
return date;
|
||||
}
|
||||
|
||||
//1st non YYYY-01-01 is returned
|
||||
} else {
|
||||
if (sorted.size() == 2) {
|
||||
for (Map.Entry<String, Integer> e : sorted.entrySet()) {
|
||||
if (!endsWith(e.getKey(), DATE_DEFAULT_SUFFIX)) {
|
||||
date.setValue(e.getKey());
|
||||
return date;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// none of the dates seems good enough, return the 1st one
|
||||
date.setValue(sorted.keySet().iterator().next());
|
||||
return date;
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean inRange(final String date) {
|
||||
final int year = Integer.parseInt(substringBefore(date, "-"));
|
||||
return year >= YEAR_LB && year <= YEAR_UB;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,283 @@
|
|||
package eu.dnetlib.dedup;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.google.common.collect.Lists;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.PairFunction;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
public class DedupRecordFactory {
|
||||
|
||||
public static JavaRDD<OafEntity> createDedupRecord(final JavaSparkContext sc, final SparkSession spark, final String mergeRelsInputPath, final String entitiesInputPath, final OafEntityType entityType, final DedupConfig dedupConf) {
|
||||
long ts = System.currentTimeMillis();
|
||||
//<id, json_entity>
|
||||
final JavaPairRDD<String, String> inputJsonEntities = sc.textFile(entitiesInputPath)
|
||||
.mapToPair((PairFunction<String, String, String>) it ->
|
||||
new Tuple2<String, String>(MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), it), it)
|
||||
);
|
||||
|
||||
//<source, target>: source is the dedup_id, target is the id of the mergedIn
|
||||
JavaPairRDD<String, String> mergeRels = spark
|
||||
.read().load(mergeRelsInputPath).as(Encoders.bean(Relation.class))
|
||||
.where("relClass=='merges'")
|
||||
.javaRDD()
|
||||
.mapToPair(
|
||||
(PairFunction<Relation, String, String>) r ->
|
||||
new Tuple2<String, String>(r.getTarget(), r.getSource())
|
||||
);
|
||||
|
||||
//<dedup_id, json_entity_merged>
|
||||
final JavaPairRDD<String, String> joinResult = mergeRels.join(inputJsonEntities).mapToPair((PairFunction<Tuple2<String, Tuple2<String, String>>, String, String>) Tuple2::_2);
|
||||
|
||||
JavaPairRDD<String, Iterable<String>> sortedJoinResult = joinResult.groupByKey();
|
||||
|
||||
switch (entityType) {
|
||||
case publication:
|
||||
return sortedJoinResult.map(p -> DedupRecordFactory.publicationMerger(p, ts));
|
||||
case dataset:
|
||||
return sortedJoinResult.map(d -> DedupRecordFactory.datasetMerger(d, ts));
|
||||
case project:
|
||||
return sortedJoinResult.map(p -> DedupRecordFactory.projectMerger(p, ts));
|
||||
case software:
|
||||
return sortedJoinResult.map(s -> DedupRecordFactory.softwareMerger(s, ts));
|
||||
case datasource:
|
||||
return sortedJoinResult.map(d -> DedupRecordFactory.datasourceMerger(d, ts));
|
||||
case organization:
|
||||
return sortedJoinResult.map(o -> DedupRecordFactory.organizationMerger(o, ts));
|
||||
case otherresearchproduct:
|
||||
return sortedJoinResult.map(o -> DedupRecordFactory.otherresearchproductMerger(o, ts));
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static Publication publicationMerger(Tuple2<String, Iterable<String>> e, final long ts) {
|
||||
|
||||
Publication p = new Publication(); //the result of the merge, to be returned at the end
|
||||
|
||||
p.setId(e._1());
|
||||
|
||||
final ObjectMapper mapper = new ObjectMapper();
|
||||
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
|
||||
|
||||
final Collection<String> dateofacceptance = Lists.newArrayList();
|
||||
|
||||
if (e._2() != null)
|
||||
e._2().forEach(pub -> {
|
||||
try {
|
||||
Publication publication = mapper.readValue(pub, Publication.class);
|
||||
|
||||
p.mergeFrom(publication);
|
||||
p.setAuthor(DedupUtility.mergeAuthor(p.getAuthor(), publication.getAuthor()));
|
||||
//add to the list if they are not null
|
||||
if (publication.getDateofacceptance() != null)
|
||||
dateofacceptance.add(publication.getDateofacceptance().getValue());
|
||||
} catch (Exception exc) {
|
||||
throw new RuntimeException(exc);
|
||||
}
|
||||
});
|
||||
p.setDateofacceptance(DatePicker.pick(dateofacceptance));
|
||||
if (p.getDataInfo() == null)
|
||||
p.setDataInfo(new DataInfo());
|
||||
p.getDataInfo().setTrust("0.9");
|
||||
p.setLastupdatetimestamp(ts);
|
||||
return p;
|
||||
}
|
||||
|
||||
private static Dataset datasetMerger(Tuple2<String, Iterable<String>> e, final long ts) {
|
||||
|
||||
Dataset d = new Dataset(); //the result of the merge, to be returned at the end
|
||||
|
||||
d.setId(e._1());
|
||||
|
||||
final ObjectMapper mapper = new ObjectMapper();
|
||||
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
|
||||
final Collection<String> dateofacceptance = Lists.newArrayList();
|
||||
|
||||
if (e._2() != null)
|
||||
e._2().forEach(dat -> {
|
||||
try {
|
||||
Dataset dataset = mapper.readValue(dat, Dataset.class);
|
||||
|
||||
d.mergeFrom(dataset);
|
||||
d.setAuthor(DedupUtility.mergeAuthor(d.getAuthor(), dataset.getAuthor()));
|
||||
//add to the list if they are not null
|
||||
if (dataset.getDateofacceptance() != null)
|
||||
dateofacceptance.add(dataset.getDateofacceptance().getValue());
|
||||
} catch (Exception exc) {
|
||||
throw new RuntimeException(exc);
|
||||
}
|
||||
});
|
||||
d.setDateofacceptance(DatePicker.pick(dateofacceptance));
|
||||
if (d.getDataInfo() == null)
|
||||
d.setDataInfo(new DataInfo());
|
||||
d.getDataInfo().setTrust("0.9");
|
||||
d.setLastupdatetimestamp(ts);
|
||||
return d;
|
||||
}
|
||||
|
||||
private static Project projectMerger(Tuple2<String, Iterable<String>> e, final long ts) {
|
||||
|
||||
Project p = new Project(); //the result of the merge, to be returned at the end
|
||||
|
||||
p.setId(e._1());
|
||||
|
||||
final ObjectMapper mapper = new ObjectMapper();
|
||||
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
if (e._2() != null)
|
||||
e._2().forEach(proj -> {
|
||||
try {
|
||||
Project project = mapper.readValue(proj, Project.class);
|
||||
|
||||
p.mergeFrom(project);
|
||||
} catch (Exception exc) {
|
||||
throw new RuntimeException(exc);
|
||||
}
|
||||
});
|
||||
if (p.getDataInfo() == null)
|
||||
p.setDataInfo(new DataInfo());
|
||||
p.getDataInfo().setTrust("0.9");
|
||||
p.setLastupdatetimestamp(ts);
|
||||
return p;
|
||||
}
|
||||
|
||||
private static Software softwareMerger(Tuple2<String, Iterable<String>> e, final long ts) {
|
||||
|
||||
Software s = new Software(); //the result of the merge, to be returned at the end
|
||||
|
||||
s.setId(e._1());
|
||||
final ObjectMapper mapper = new ObjectMapper();
|
||||
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
final Collection<String> dateofacceptance = Lists.newArrayList();
|
||||
if (e._2() != null)
|
||||
e._2().forEach(soft -> {
|
||||
try {
|
||||
Software software = mapper.readValue(soft, Software.class);
|
||||
|
||||
s.mergeFrom(software);
|
||||
s.setAuthor(DedupUtility.mergeAuthor(s.getAuthor(), software.getAuthor()));
|
||||
//add to the list if they are not null
|
||||
if (software.getDateofacceptance() != null)
|
||||
dateofacceptance.add(software.getDateofacceptance().getValue());
|
||||
} catch (Exception exc) {
|
||||
throw new RuntimeException(exc);
|
||||
}
|
||||
});
|
||||
s.setDateofacceptance(DatePicker.pick(dateofacceptance));
|
||||
if (s.getDataInfo() == null)
|
||||
s.setDataInfo(new DataInfo());
|
||||
s.getDataInfo().setTrust("0.9");
|
||||
s.setLastupdatetimestamp(ts);
|
||||
return s;
|
||||
}
|
||||
|
||||
private static Datasource datasourceMerger(Tuple2<String, Iterable<String>> e, final long ts) {
|
||||
Datasource d = new Datasource(); //the result of the merge, to be returned at the end
|
||||
d.setId(e._1());
|
||||
final ObjectMapper mapper = new ObjectMapper();
|
||||
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
if (e._2() != null)
|
||||
e._2().forEach(dat -> {
|
||||
try {
|
||||
Datasource datasource = mapper.readValue(dat, Datasource.class);
|
||||
|
||||
d.mergeFrom(datasource);
|
||||
} catch (Exception exc) {
|
||||
throw new RuntimeException(exc);
|
||||
}
|
||||
});
|
||||
if (d.getDataInfo() == null)
|
||||
d.setDataInfo(new DataInfo());
|
||||
d.getDataInfo().setTrust("0.9");
|
||||
d.setLastupdatetimestamp(ts);
|
||||
return d;
|
||||
}
|
||||
|
||||
private static Organization organizationMerger(Tuple2<String, Iterable<String>> e, final long ts) {
|
||||
|
||||
Organization o = new Organization(); //the result of the merge, to be returned at the end
|
||||
|
||||
o.setId(e._1());
|
||||
|
||||
final ObjectMapper mapper = new ObjectMapper();
|
||||
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
|
||||
|
||||
StringBuilder trust = new StringBuilder("0.0");
|
||||
|
||||
if (e._2() != null)
|
||||
e._2().forEach(pub -> {
|
||||
try {
|
||||
Organization organization = mapper.readValue(pub, Organization.class);
|
||||
|
||||
final String currentTrust = organization.getDataInfo().getTrust();
|
||||
if (!"1.0".equals(currentTrust)) {
|
||||
trust.setLength(0);
|
||||
trust.append(currentTrust);
|
||||
}
|
||||
o.mergeFrom(organization);
|
||||
|
||||
} catch (Exception exc) {
|
||||
throw new RuntimeException(exc);
|
||||
}
|
||||
});
|
||||
|
||||
if (o.getDataInfo() == null)
|
||||
{
|
||||
o.setDataInfo(new DataInfo());
|
||||
}
|
||||
if (o.getDataInfo() == null)
|
||||
o.setDataInfo(new DataInfo());
|
||||
o.getDataInfo().setTrust("0.9");
|
||||
o.setLastupdatetimestamp(ts);
|
||||
|
||||
return o;
|
||||
}
|
||||
|
||||
private static OtherResearchProduct otherresearchproductMerger(Tuple2<String, Iterable<String>> e, final long ts) {
|
||||
|
||||
OtherResearchProduct o = new OtherResearchProduct(); //the result of the merge, to be returned at the end
|
||||
|
||||
o.setId(e._1());
|
||||
|
||||
final ObjectMapper mapper = new ObjectMapper();
|
||||
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
|
||||
final Collection<String> dateofacceptance = Lists.newArrayList();
|
||||
|
||||
if (e._2() != null)
|
||||
e._2().forEach(orp -> {
|
||||
try {
|
||||
OtherResearchProduct otherResearchProduct = mapper.readValue(orp, OtherResearchProduct.class);
|
||||
|
||||
o.mergeFrom(otherResearchProduct);
|
||||
o.setAuthor(DedupUtility.mergeAuthor(o.getAuthor(), otherResearchProduct.getAuthor()));
|
||||
//add to the list if they are not null
|
||||
if (otherResearchProduct.getDateofacceptance() != null)
|
||||
dateofacceptance.add(otherResearchProduct.getDateofacceptance().getValue());
|
||||
} catch (Exception exc) {
|
||||
throw new RuntimeException(exc);
|
||||
}
|
||||
});
|
||||
if (o.getDataInfo() == null)
|
||||
o.setDataInfo(new DataInfo());
|
||||
o.setDateofacceptance(DatePicker.pick(dateofacceptance));
|
||||
o.getDataInfo().setTrust("0.9");
|
||||
o.setLastupdatetimestamp(ts);
|
||||
return o;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,217 @@
|
|||
package eu.dnetlib.dedup;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
import com.wcohen.ss.JaroWinkler;
|
||||
import eu.dnetlib.dhp.schema.oaf.Author;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
import eu.dnetlib.pace.model.MapDocument;
|
||||
import eu.dnetlib.pace.model.Person;
|
||||
import org.apache.commons.codec.binary.Hex;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.spark.SparkContext;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.util.LongAccumulator;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.StringWriter;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.MessageDigest;
|
||||
import java.text.Normalizer;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class DedupUtility {
|
||||
private static final Double THRESHOLD = 0.95;
|
||||
|
||||
public static Map<String, LongAccumulator> constructAccumulator(final DedupConfig dedupConf, final SparkContext context) {
|
||||
|
||||
Map<String, LongAccumulator> accumulators = new HashMap<>();
|
||||
|
||||
String acc1 = String.format("%s::%s", dedupConf.getWf().getEntityType(), "records per hash key = 1");
|
||||
accumulators.put(acc1, context.longAccumulator(acc1));
|
||||
String acc2 = String.format("%s::%s", dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField());
|
||||
accumulators.put(acc2, context.longAccumulator(acc2));
|
||||
String acc3 = String.format("%s::%s", dedupConf.getWf().getEntityType(), String.format("Skipped records for count(%s) >= %s", dedupConf.getWf().getOrderField(), dedupConf.getWf().getGroupMaxSize()));
|
||||
accumulators.put(acc3, context.longAccumulator(acc3));
|
||||
String acc4 = String.format("%s::%s", dedupConf.getWf().getEntityType(), "skip list");
|
||||
accumulators.put(acc4, context.longAccumulator(acc4));
|
||||
String acc5 = String.format("%s::%s", dedupConf.getWf().getEntityType(), "dedupSimilarity (x2)");
|
||||
accumulators.put(acc5, context.longAccumulator(acc5));
|
||||
String acc6 = String.format("%s::%s", dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold());
|
||||
accumulators.put(acc6, context.longAccumulator(acc6));
|
||||
|
||||
return accumulators;
|
||||
}
|
||||
|
||||
public static JavaRDD<String> loadDataFromHDFS(String path, JavaSparkContext context) {
|
||||
return context.textFile(path);
|
||||
}
|
||||
|
||||
public static void deleteIfExists(String path) throws IOException {
|
||||
Configuration conf = new Configuration();
|
||||
FileSystem fileSystem = FileSystem.get(conf);
|
||||
if (fileSystem.exists(new Path(path))) {
|
||||
fileSystem.delete(new Path(path), true);
|
||||
}
|
||||
}
|
||||
|
||||
public static DedupConfig loadConfigFromHDFS(String path) throws IOException {
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
FileSystem fileSystem = FileSystem.get(conf);
|
||||
FSDataInputStream inputStream = new FSDataInputStream(fileSystem.open(new Path(path)));
|
||||
|
||||
return DedupConfig.load(IOUtils.toString(inputStream, StandardCharsets.UTF_8.name()));
|
||||
|
||||
}
|
||||
|
||||
static <T> String readFromClasspath(final String filename, final Class<T> clazz) {
|
||||
final StringWriter sw = new StringWriter();
|
||||
try {
|
||||
IOUtils.copy(clazz.getResourceAsStream(filename), sw);
|
||||
return sw.toString();
|
||||
} catch (final IOException e) {
|
||||
throw new RuntimeException("cannot load resource from classpath: " + filename);
|
||||
}
|
||||
}
|
||||
|
||||
static Set<String> getGroupingKeys(DedupConfig conf, MapDocument doc) {
|
||||
return Sets.newHashSet(BlacklistAwareClusteringCombiner.filterAndCombine(doc, conf));
|
||||
}
|
||||
|
||||
public static String md5(final String s) {
|
||||
try {
|
||||
final MessageDigest md = MessageDigest.getInstance("MD5");
|
||||
md.update(s.getBytes("UTF-8"));
|
||||
return new String(Hex.encodeHex(md.digest()));
|
||||
} catch (final Exception e) {
|
||||
System.err.println("Error creating id");
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static List<Author> mergeAuthor(final List<Author> a, final List<Author> b) {
|
||||
int pa = countAuthorsPids(a);
|
||||
int pb = countAuthorsPids(b);
|
||||
List<Author> base, enrich;
|
||||
int sa = authorsSize(a);
|
||||
int sb = authorsSize(b);
|
||||
|
||||
if (pa == pb) {
|
||||
base = sa > sb ? a : b;
|
||||
enrich = sa > sb ? b : a;
|
||||
} else {
|
||||
base = pa > pb ? a : b;
|
||||
enrich = pa > pb ? b : a;
|
||||
}
|
||||
enrichPidFromList(base, enrich);
|
||||
return base;
|
||||
}
|
||||
|
||||
private static void enrichPidFromList(List<Author> base, List<Author> enrich) {
|
||||
if (base == null || enrich == null)
|
||||
return;
|
||||
final Map<String, Author> basePidAuthorMap = base.stream()
|
||||
.filter(a -> a.getPid() != null && a.getPid().size() > 0)
|
||||
.flatMap(a -> a.getPid()
|
||||
.stream()
|
||||
.map(p -> new Tuple2<>(p.toComparableString(), a))
|
||||
).collect(Collectors.toMap(Tuple2::_1, Tuple2::_2, (x1, x2) -> x1));
|
||||
|
||||
final List<Tuple2<StructuredProperty, Author>> pidToEnrich = enrich
|
||||
.stream()
|
||||
.filter(a -> a.getPid() != null && a.getPid().size() > 0)
|
||||
.flatMap(a -> a.getPid().stream().filter(p -> !basePidAuthorMap.containsKey(p.toComparableString())).map(p -> new Tuple2<>(p, a)))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
|
||||
pidToEnrich.forEach(a -> {
|
||||
Optional<Tuple2<Double, Author>> simAuhtor = base.stream().map(ba -> new Tuple2<>(sim(ba, a._2()), ba)).max(Comparator.comparing(Tuple2::_1));
|
||||
if (simAuhtor.isPresent() && simAuhtor.get()._1() > THRESHOLD) {
|
||||
Author r = simAuhtor.get()._2();
|
||||
r.getPid().add(a._1());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public static String createEntityPath(final String basePath, final String entityType) {
|
||||
return String.format("%s/%s", basePath, entityType);
|
||||
}
|
||||
|
||||
public static String createSimRelPath(final String basePath, final String entityType) {
|
||||
return String.format("%s/%s/simRel", basePath, entityType);
|
||||
}
|
||||
|
||||
public static String createMergeRelPath(final String basePath, final String entityType) {
|
||||
return String.format("%s/%s/mergeRel", basePath, entityType);
|
||||
}
|
||||
|
||||
private static Double sim(Author a, Author b) {
|
||||
|
||||
final Person pa = parse(a);
|
||||
final Person pb = parse(b);
|
||||
|
||||
if (pa.isAccurate() & pb.isAccurate()) {
|
||||
return new JaroWinkler().score(
|
||||
normalize(pa.getSurnameString()),
|
||||
normalize(pb.getSurnameString()));
|
||||
} else {
|
||||
return new JaroWinkler().score(
|
||||
normalize(pa.getNormalisedFullname()),
|
||||
normalize(pb.getNormalisedFullname()));
|
||||
}
|
||||
}
|
||||
|
||||
private static String normalize(final String s) {
|
||||
return nfd(s).toLowerCase()
|
||||
// do not compact the regexes in a single expression, would cause StackOverflowError in case of large input strings
|
||||
.replaceAll("(\\W)+", " ")
|
||||
.replaceAll("(\\p{InCombiningDiacriticalMarks})+", " ")
|
||||
.replaceAll("(\\p{Punct})+", " ")
|
||||
.replaceAll("(\\d)+", " ")
|
||||
.replaceAll("(\\n)+", " ")
|
||||
.trim();
|
||||
}
|
||||
|
||||
private static String nfd(final String s) {
|
||||
return Normalizer.normalize(s, Normalizer.Form.NFD);
|
||||
}
|
||||
|
||||
private static Person parse(Author author) {
|
||||
if (StringUtils.isNotBlank(author.getSurname())) {
|
||||
return new Person(author.getSurname() + ", " + author.getName(), false);
|
||||
} else {
|
||||
return new Person(author.getFullname(), false);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static int countAuthorsPids(List<Author> authors) {
|
||||
if (authors == null)
|
||||
return 0;
|
||||
|
||||
return (int) authors.stream().filter(DedupUtility::hasPid).count();
|
||||
}
|
||||
|
||||
private static int authorsSize(List<Author> authors) {
|
||||
if (authors == null)
|
||||
return 0;
|
||||
return authors.size();
|
||||
}
|
||||
|
||||
private static boolean hasPid(Author a) {
|
||||
if (a == null || a.getPid() == null || a.getPid().size() == 0)
|
||||
return false;
|
||||
return a.getPid().stream().anyMatch(p -> p != null && StringUtils.isNotBlank(p.getValue()));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,162 @@
|
|||
package eu.dnetlib.dedup;
|
||||
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
import eu.dnetlib.pace.model.Field;
|
||||
import eu.dnetlib.pace.model.MapDocument;
|
||||
import eu.dnetlib.pace.util.BlockProcessor;
|
||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.Function2;
|
||||
import org.apache.spark.api.java.function.PairFlatMapFunction;
|
||||
import org.apache.spark.api.java.function.PairFunction;
|
||||
import org.apache.spark.util.LongAccumulator;
|
||||
import scala.Serializable;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class Deduper implements Serializable {
|
||||
|
||||
private static final Log log = LogFactory.getLog(Deduper.class);
|
||||
|
||||
/**
|
||||
* @return the list of relations generated by the deduplication
|
||||
* @param: the spark context
|
||||
* @param: list of JSON entities to be deduped
|
||||
* @param: the dedup configuration
|
||||
*/
|
||||
public static JavaPairRDD<String, String> dedup(JavaSparkContext context, JavaRDD<String> entities, DedupConfig config) {
|
||||
|
||||
Map<String, LongAccumulator> accumulators = DedupUtility.constructAccumulator(config, context.sc());
|
||||
|
||||
//create vertexes of the graph: <ID, MapDocument>
|
||||
JavaPairRDD<String, MapDocument> mapDocs = mapToVertexes(context, entities, config);
|
||||
|
||||
|
||||
//create blocks for deduplication
|
||||
JavaPairRDD<String, Iterable<MapDocument>> blocks = createBlocks(context, mapDocs, config);
|
||||
|
||||
//create relations by comparing only elements in the same group
|
||||
return computeRelations(context, blocks, config);
|
||||
|
||||
// final RDD<Edge<String>> edgeRdd = relationRDD.map(it -> new Edge<>(it._1().hashCode(), it._2().hashCode(), "equalTo")).rdd();
|
||||
//
|
||||
// RDD<Tuple2<Object, MapDocument>> vertexes = mapDocs.mapToPair((PairFunction<Tuple2<String, MapDocument>, Object, MapDocument>) t -> new Tuple2<Object, MapDocument>((long) t._1().hashCode(), t._2())).rdd();
|
||||
// accumulators.forEach((name, acc) -> log.info(name + " -> " + acc.value()));
|
||||
//
|
||||
// return GraphProcessor.findCCs(vertexes, edgeRdd, 20).toJavaRDD();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the list of relations generated by the deduplication
|
||||
* @param: the spark context
|
||||
* @param: list of blocks
|
||||
* @param: the dedup configuration
|
||||
*/
|
||||
public static JavaPairRDD<String, String> computeRelations(JavaSparkContext context, JavaPairRDD<String, Iterable<MapDocument>> blocks, DedupConfig config) {
|
||||
|
||||
Map<String, LongAccumulator> accumulators = DedupUtility.constructAccumulator(config, context.sc());
|
||||
|
||||
return blocks.flatMapToPair((PairFlatMapFunction<Tuple2<String, Iterable<MapDocument>>, String, String>) it -> {
|
||||
final SparkReporter reporter = new SparkReporter(accumulators);
|
||||
new BlockProcessor(config).process(it._1(), it._2(), reporter);
|
||||
return reporter.getRelations().iterator();
|
||||
|
||||
}).mapToPair(
|
||||
(PairFunction<Tuple2<String, String>, String, Tuple2<String, String>>) item ->
|
||||
new Tuple2<String, Tuple2<String, String>>(item._1() + item._2(), item))
|
||||
.reduceByKey((a, b) -> a)
|
||||
.mapToPair((PairFunction<Tuple2<String, Tuple2<String, String>>, String, String>) Tuple2::_2);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return the list of blocks based on clustering of dedup configuration
|
||||
* @param: the spark context
|
||||
* @param: list of entities: <id, entity>
|
||||
* @param: the dedup configuration
|
||||
*/
|
||||
public static JavaPairRDD<String, Iterable<MapDocument>> createBlocks(JavaSparkContext context, JavaPairRDD<String, MapDocument> mapDocs, DedupConfig config) {
|
||||
return mapDocs
|
||||
//the reduce is just to be sure that we haven't document with same id
|
||||
.reduceByKey((a, b) -> a)
|
||||
.map(Tuple2::_2)
|
||||
//Clustering: from <id, doc> to List<groupkey,doc>
|
||||
.flatMapToPair((PairFlatMapFunction<MapDocument, String, MapDocument>) a ->
|
||||
DedupUtility.getGroupingKeys(config, a)
|
||||
.stream()
|
||||
.map(it -> new Tuple2<>(it, a))
|
||||
.collect(Collectors.toList())
|
||||
.iterator())
|
||||
.groupByKey();
|
||||
}
|
||||
|
||||
|
||||
public static JavaPairRDD<String, List<MapDocument>> createsortedBlocks(JavaSparkContext context, JavaPairRDD<String, MapDocument> mapDocs, DedupConfig config) {
|
||||
final String of = config.getWf().getOrderField();
|
||||
final int maxQueueSize = config.getWf().getGroupMaxSize();
|
||||
return mapDocs
|
||||
//the reduce is just to be sure that we haven't document with same id
|
||||
.reduceByKey((a, b) -> a)
|
||||
.map(Tuple2::_2)
|
||||
//Clustering: from <id, doc> to List<groupkey,doc>
|
||||
.flatMapToPair((PairFlatMapFunction<MapDocument, String, List<MapDocument>>) a ->
|
||||
DedupUtility.getGroupingKeys(config, a)
|
||||
.stream()
|
||||
.map(it -> {
|
||||
List<MapDocument> tmp = new ArrayList<>();
|
||||
tmp.add(a);
|
||||
return new Tuple2<>(it, tmp);
|
||||
}
|
||||
)
|
||||
.collect(Collectors.toList())
|
||||
.iterator())
|
||||
.reduceByKey((Function2<List<MapDocument>, List<MapDocument>, List<MapDocument>>) (v1, v2) -> {
|
||||
v1.addAll(v2);
|
||||
v1.sort(Comparator.comparing(a -> a.getFieldMap().get(of).stringValue()));
|
||||
if (v1.size() > maxQueueSize)
|
||||
return new ArrayList<>(v1.subList(0, maxQueueSize));
|
||||
return v1;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the list of vertexes: <id, mapDocument>
|
||||
* @param: the spark context
|
||||
* @param: list of JSON entities
|
||||
* @param: the dedup configuration
|
||||
*/
|
||||
public static JavaPairRDD<String, MapDocument> mapToVertexes(JavaSparkContext context, JavaRDD<String> entities, DedupConfig config) {
|
||||
|
||||
return entities.mapToPair((PairFunction<String, String, MapDocument>) s -> {
|
||||
|
||||
MapDocument mapDocument = MapDocumentUtil.asMapDocumentWithJPath(config, s);
|
||||
return new Tuple2<String, MapDocument>(mapDocument.getIdentifier(), mapDocument);
|
||||
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
public static JavaPairRDD<String, String> computeRelations2(JavaSparkContext context, JavaPairRDD<String, List<MapDocument>> blocks, DedupConfig config) {
|
||||
Map<String, LongAccumulator> accumulators = DedupUtility.constructAccumulator(config, context.sc());
|
||||
|
||||
return blocks.flatMapToPair((PairFlatMapFunction<Tuple2<String, List<MapDocument>>, String, String>) it -> {
|
||||
try {
|
||||
final SparkReporter reporter = new SparkReporter(accumulators);
|
||||
new BlockProcessor(config).processSortedBlock(it._1(), it._2(), reporter);
|
||||
return reporter.getRelations().iterator();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(it._2().get(0).getIdentifier(), e);
|
||||
}
|
||||
}).mapToPair(
|
||||
(PairFunction<Tuple2<String, String>, String, Tuple2<String, String>>) item ->
|
||||
new Tuple2<String, Tuple2<String, String>>(item._1() + item._2(), item))
|
||||
.reduceByKey((a, b) -> a)
|
||||
.mapToPair((PairFunction<Tuple2<String, Tuple2<String, String>>, String, String>) Tuple2::_2);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
package eu.dnetlib.dedup;
|
||||
|
||||
public enum OafEntityType {
|
||||
|
||||
datasource,
|
||||
organization,
|
||||
project,
|
||||
dataset,
|
||||
otherresearchproduct,
|
||||
software,
|
||||
publication
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
package eu.dnetlib.dedup;
|
||||
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.hash.HashFunction;
|
||||
import com.google.common.hash.Hashing;
|
||||
import eu.dnetlib.dedup.graph.ConnectedComponent;
|
||||
import eu.dnetlib.dedup.graph.GraphProcessor;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||
import org.apache.spark.api.java.function.PairFunction;
|
||||
import org.apache.spark.graphx.Edge;
|
||||
import org.apache.spark.rdd.RDD;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class SparkCreateConnectedComponent {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedup_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
final SparkSession spark = SparkSession
|
||||
.builder()
|
||||
.appName(SparkCreateConnectedComponent.class.getSimpleName())
|
||||
.master(parser.get("master"))
|
||||
.getOrCreate();
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
final String inputPath = parser.get("sourcePath");
|
||||
final String entity = parser.get("entity");
|
||||
final String targetPath = parser.get("targetPath");
|
||||
// final DedupConfig dedupConf = DedupConfig.load(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf2.json")));
|
||||
final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf"));
|
||||
|
||||
final JavaPairRDD<Object, String> vertexes = sc.textFile(inputPath + "/" + entity)
|
||||
.map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s))
|
||||
.mapToPair((PairFunction<String, Object, String>)
|
||||
s -> new Tuple2<Object, String>(getHashcode(s), s)
|
||||
);
|
||||
|
||||
final Dataset<Relation> similarityRelations = spark.read().load(DedupUtility.createSimRelPath(targetPath,entity)).as(Encoders.bean(Relation.class));
|
||||
final RDD<Edge<String>> edgeRdd = similarityRelations.javaRDD().map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass())).rdd();
|
||||
final JavaRDD<ConnectedComponent> cc = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, dedupConf.getWf().getMaxIterations()).toJavaRDD();
|
||||
final Dataset<Relation> mergeRelation = spark.createDataset(cc.filter(k->k.getDocIds().size()>1).flatMap((FlatMapFunction<ConnectedComponent, Relation>) c ->
|
||||
c.getDocIds()
|
||||
.stream()
|
||||
.flatMap(id -> {
|
||||
List<Relation> tmp = new ArrayList<>();
|
||||
Relation r = new Relation();
|
||||
r.setSource(c.getCcId());
|
||||
r.setTarget(id);
|
||||
r.setRelClass("merges");
|
||||
tmp.add(r);
|
||||
r = new Relation();
|
||||
r.setTarget(c.getCcId());
|
||||
r.setSource(id);
|
||||
r.setRelClass("isMergedIn");
|
||||
tmp.add(r);
|
||||
return tmp.stream();
|
||||
}).iterator()).rdd(), Encoders.bean(Relation.class));
|
||||
mergeRelation.write().mode("overwrite").save(DedupUtility.createMergeRelPath(targetPath,entity));
|
||||
}
|
||||
|
||||
public static long getHashcode(final String id) {
|
||||
return Hashing.murmur3_128().hashUnencodedChars(id).asLong();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
package eu.dnetlib.dedup;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
|
||||
public class SparkCreateDedupRecord {
|
||||
public static void main(String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateDedupRecord.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedupRecord_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
final SparkSession spark = SparkSession
|
||||
.builder()
|
||||
.appName(SparkCreateDedupRecord.class.getSimpleName())
|
||||
.master(parser.get("master"))
|
||||
.getOrCreate();
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
final String sourcePath = parser.get("sourcePath");
|
||||
final String entity = parser.get("entity");
|
||||
final String dedupPath = parser.get("dedupPath");
|
||||
final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf"));
|
||||
|
||||
final JavaRDD<OafEntity> dedupRecord = DedupRecordFactory.createDedupRecord(sc, spark, DedupUtility.createMergeRelPath(dedupPath,entity), DedupUtility.createEntityPath(sourcePath,entity), OafEntityType.valueOf(entity), dedupConf);
|
||||
dedupRecord.map(r-> {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
return mapper.writeValueAsString(r);
|
||||
}).saveAsTextFile(dedupPath+"/"+entity+"/dedup_records");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
package eu.dnetlib.dedup;
|
||||
|
||||
import com.google.common.hash.Hashing;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
import eu.dnetlib.pace.model.MapDocument;
|
||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.PairFunction;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
|
||||
/**
|
||||
* This Spark class creates similarity relations between entities, saving result
|
||||
*
|
||||
* param request:
|
||||
* sourcePath
|
||||
* entityType
|
||||
* target Path
|
||||
*/
|
||||
public class SparkCreateSimRels {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedup_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
final SparkSession spark = SparkSession
|
||||
.builder()
|
||||
.appName(SparkCreateSimRels.class.getSimpleName())
|
||||
.master(parser.get("master"))
|
||||
.getOrCreate();
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
final String inputPath = parser.get("sourcePath");
|
||||
final String entity = parser.get("entity");
|
||||
final String targetPath = parser.get("targetPath");
|
||||
// final DedupConfig dedupConf = DedupConfig.load(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
|
||||
final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf"));
|
||||
|
||||
|
||||
|
||||
JavaPairRDD<String, MapDocument> mapDocument = sc.textFile(inputPath + "/" + entity)
|
||||
.mapToPair(s->{
|
||||
MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf,s);
|
||||
return new Tuple2<>(d.getIdentifier(), d);});
|
||||
|
||||
//create blocks for deduplication
|
||||
JavaPairRDD<String, List<MapDocument>> blocks = Deduper.createsortedBlocks(sc, mapDocument, dedupConf);
|
||||
// JavaPairRDD<String, Iterable<MapDocument>> blocks = Deduper.createBlocks(sc, mapDocument, dedupConf);
|
||||
|
||||
//create relations by comparing only elements in the same group
|
||||
final JavaPairRDD<String,String> dedupRels = Deduper.computeRelations2(sc, blocks, dedupConf);
|
||||
// final JavaPairRDD<String,String> dedupRels = Deduper.computeRelations(sc, blocks, dedupConf);
|
||||
|
||||
final JavaRDD<Relation> isSimilarToRDD = dedupRels.map(simRel -> {
|
||||
final Relation r = new Relation();
|
||||
r.setSource(simRel._1());
|
||||
r.setTarget(simRel._2());
|
||||
r.setRelClass("isSimilarTo");
|
||||
return r;
|
||||
});
|
||||
|
||||
spark.createDataset(isSimilarToRDD.rdd(), Encoders.bean(Relation.class)).write().mode("overwrite").save( DedupUtility.createSimRelPath(targetPath,entity));
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
package eu.dnetlib.dedup;
|
||||
|
||||
import eu.dnetlib.pace.util.Reporter;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.spark.util.LongAccumulator;
|
||||
import scala.Serializable;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class SparkReporter implements Serializable, Reporter {
|
||||
|
||||
final List<Tuple2<String, String>> relations = new ArrayList<>();
|
||||
private static final Log log = LogFactory.getLog(SparkReporter.class);
|
||||
Map<String, LongAccumulator> accumulators;
|
||||
|
||||
public SparkReporter(Map<String, LongAccumulator> accumulators){
|
||||
this.accumulators = accumulators;
|
||||
}
|
||||
|
||||
public void incrementCounter(String counterGroup, String counterName, long delta, Map<String, LongAccumulator> accumulators) {
|
||||
|
||||
final String accumulatorName = String.format("%s::%s", counterGroup, counterName);
|
||||
if (accumulators.containsKey(accumulatorName)){
|
||||
accumulators.get(accumulatorName).add(delta);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementCounter(String counterGroup, String counterName, long delta) {
|
||||
|
||||
incrementCounter(counterGroup, counterName, delta, accumulators);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void emit(String type, String from, String to) {
|
||||
relations.add(new Tuple2<>(from, to));
|
||||
}
|
||||
|
||||
public List<Tuple2<String, String>> getRelations() {
|
||||
return relations;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,80 @@
|
|||
package eu.dnetlib.dedup.graph;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.dedup.DedupUtility;
|
||||
import eu.dnetlib.pace.util.PaceException;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.codehaus.jackson.annotate.JsonIgnore;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.Set;
|
||||
|
||||
public class ConnectedComponent implements Serializable {
|
||||
|
||||
private Set<String> docIds;
|
||||
private String ccId;
|
||||
|
||||
|
||||
public ConnectedComponent() {
|
||||
}
|
||||
|
||||
public ConnectedComponent(Set<String> docIds) {
|
||||
this.docIds = docIds;
|
||||
createID();
|
||||
}
|
||||
|
||||
public String createID() {
|
||||
if (docIds.size() > 1) {
|
||||
final String s = getMin();
|
||||
String prefix = s.split("\\|")[0];
|
||||
ccId =prefix + "|dedup_______::" + DedupUtility.md5(s);
|
||||
return ccId;
|
||||
} else {
|
||||
return docIds.iterator().next();
|
||||
}
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public String getMin(){
|
||||
|
||||
final StringBuilder min = new StringBuilder();
|
||||
docIds.forEach(i -> {
|
||||
if (StringUtils.isBlank(min.toString())) {
|
||||
min.append(i);
|
||||
} else {
|
||||
if (min.toString().compareTo(i) > 0) {
|
||||
min.setLength(0);
|
||||
min.append(i);
|
||||
}
|
||||
}
|
||||
});
|
||||
return min.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString(){
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
try {
|
||||
return mapper.writeValueAsString(this);
|
||||
} catch (IOException e) {
|
||||
throw new PaceException("Failed to create Json: ", e);
|
||||
}
|
||||
}
|
||||
|
||||
public Set<String> getDocIds() {
|
||||
return docIds;
|
||||
}
|
||||
|
||||
public void setDocIds(Set<String> docIds) {
|
||||
this.docIds = docIds;
|
||||
}
|
||||
|
||||
public String getCcId() {
|
||||
return ccId;
|
||||
}
|
||||
|
||||
public void setCcId(String ccId) {
|
||||
this.ccId = ccId;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
package eu.dnetlib.dedup.graph
|
||||
|
||||
import org.apache.spark.graphx._
|
||||
import org.apache.spark.rdd.RDD
|
||||
|
||||
import scala.collection.JavaConversions;
|
||||
|
||||
object GraphProcessor {
|
||||
|
||||
def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int): RDD[ConnectedComponent] = {
|
||||
val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby
|
||||
val cc = graph.connectedComponents(maxIterations).vertices
|
||||
|
||||
val joinResult = vertexes.leftOuterJoin(cc).map {
|
||||
case (id, (openaireId, cc)) => {
|
||||
if (cc.isEmpty) {
|
||||
(id, openaireId)
|
||||
}
|
||||
else {
|
||||
(cc.get, openaireId)
|
||||
}
|
||||
}
|
||||
}
|
||||
val connectedComponents = joinResult.groupByKey()
|
||||
.map[ConnectedComponent](cc => asConnectedComponent(cc))
|
||||
connectedComponents
|
||||
}
|
||||
|
||||
|
||||
|
||||
def asConnectedComponent(group: (VertexId, Iterable[String])): ConnectedComponent = {
|
||||
val docs = group._2.toSet[String]
|
||||
val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs));
|
||||
connectedComponent
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
<configuration>
|
||||
<property>
|
||||
<name>jobTracker</name>
|
||||
<value>yarnRM</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>nameNode</name>
|
||||
<value>hdfs://nameservice1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -25,10 +25,8 @@
|
|||
<description>memory for individual executor</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="DeleteWorkingPath"/>
|
||||
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
@ -163,33 +161,6 @@
|
|||
<ok to="replaceEntity"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<!-- <action name="updateDeletedByInferenceRelation">-->
|
||||
<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
|
||||
<!-- <job-tracker>${jobTracker}</job-tracker>-->
|
||||
<!-- <name-node>${nameNode}</name-node>-->
|
||||
<!-- <master>yarn-cluster</master>-->
|
||||
<!-- <mode>cluster</mode>-->
|
||||
<!-- <name>Update ${entity} set deleted by Inference</name>-->
|
||||
<!-- <class>eu.dnetlib.dedup.SparkUpdateEntityJob</class>-->
|
||||
<!-- <jar>dhp-dedup-${projectVersion}.jar</jar>-->
|
||||
<!-- <spark-opts>-->
|
||||
<!-- --executor-memory ${sparkExecutorMemory}-->
|
||||
<!-- --driver-memory=${sparkDriverMemory}-->
|
||||
<!-- ${sparkExtraOPT}-->
|
||||
<!-- </spark-opts>-->
|
||||
<!-- <arg>-mt</arg><arg>yarn-cluster</arg>-->
|
||||
<!-- <arg>--entityPath</arg><arg>${targetPath}/${entity}/relation_propagated</arg>-->
|
||||
<!-- <arg>--mergeRelPath</arg><arg>${targetPath}/${entity}/mergeRel</arg>-->
|
||||
<!-- <arg>--entity</arg><arg>relation</arg>-->
|
||||
<!-- <arg>--dedupRecordPath</arg><arg>${targetPath}/${entity}/dedup_records</arg>-->
|
||||
<!-- <arg>--targetPath</arg><arg>${targetPath}/${entity}/updated_relation</arg>-->
|
||||
<!-- </spark>-->
|
||||
<!-- <ok to="End"/>-->
|
||||
<!-- <error to="Kill"/>-->
|
||||
<!-- </action>-->
|
||||
|
||||
|
||||
<action name="replaceEntity">
|
||||
<fs>
|
||||
<delete path='${sourcePath}/${entity}'/>
|
|
@ -1,4 +1,4 @@
|
|||
package eu.dnetlib.dhp.graph.sx;
|
||||
package eu.dnetlib.dhp.sx.graph;
|
||||
|
||||
import com.mongodb.DBObject;
|
||||
import com.mongodb.MongoClient;
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package eu.dnetlib.dhp.graph.sx;
|
||||
package eu.dnetlib.dhp.sx.graph;
|
||||
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package eu.dnetlib.dhp.graph.sx;
|
||||
package eu.dnetlib.dhp.sx.graph;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package eu.dnetlib.dhp.graph.sx;
|
||||
package eu.dnetlib.dhp.sx.graph;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
package eu.dnetlib.dhp.graph.sx;
|
||||
package eu.dnetlib.dhp.sx.graph;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.graph.sx.parser.DatasetScholexplorerParser;
|
||||
import eu.dnetlib.dhp.graph.sx.parser.PublicationScholexplorerParser;
|
||||
import eu.dnetlib.dhp.sx.graph.parser.DatasetScholexplorerParser;
|
||||
import eu.dnetlib.dhp.sx.graph.parser.PublicationScholexplorerParser;
|
||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||
import eu.dnetlib.scholexplorer.relation.RelationMapper;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package eu.dnetlib.dhp.graph.sx.parser;
|
||||
package eu.dnetlib.dhp.sx.graph.parser;
|
||||
|
||||
|
||||
import eu.dnetlib.dhp.parser.utility.VtdUtilityParser;
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package eu.dnetlib.dhp.graph.sx.parser;
|
||||
package eu.dnetlib.dhp.sx.graph.parser;
|
||||
|
||||
import com.ximpleware.AutoPilot;
|
||||
import com.ximpleware.VTDGen;
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package eu.dnetlib.dhp.graph.sx.parser;
|
||||
package eu.dnetlib.dhp.sx.graph.parser;
|
||||
|
||||
import com.ximpleware.AutoPilot;
|
||||
import com.ximpleware.VTDGen;
|
||||
|
|
|
@ -55,7 +55,7 @@
|
|||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.graph.sx.ImportDataFromMongo</main-class>
|
||||
<main-class>eu.dnetlib.dhp.sx.graph.ImportDataFromMongo</main-class>
|
||||
<arg>-t</arg><arg>${targetPath}</arg>
|
||||
<arg>-n</arg><arg>${nameNode}</arg>
|
||||
<arg>-u</arg><arg>${user}</arg>
|
||||
|
|
|
@ -54,7 +54,7 @@
|
|||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Extract ${entities}</name>
|
||||
<class>eu.dnetlib.dhp.graph.sx.SparkExtractEntitiesJob</class>
|
||||
<class>eu.dnetlib.dhp.sx.graph.SparkExtractEntitiesJob</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory ${sparkExecutorMemory}
|
||||
|
|
|
@ -45,7 +45,7 @@
|
|||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Merge ${entity}</name>
|
||||
<class>eu.dnetlib.dhp.graph.sx.SparkScholexplorerCreateRawGraphJob</class>
|
||||
<class>eu.dnetlib.dhp.sx.graph.SparkScholexplorerCreateRawGraphJob</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts> --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT}</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
package eu.dnetlib.dhp.graph.sx;
|
||||
package eu.dnetlib.dhp.sx.graph;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.SerializationFeature;
|
||||
import eu.dnetlib.dhp.graph.sx.parser.DatasetScholexplorerParser;
|
||||
import eu.dnetlib.dhp.sx.graph.parser.DatasetScholexplorerParser;
|
||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||
import eu.dnetlib.scholexplorer.relation.RelationMapper;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package eu.dnetlib.dhp.graph.sx;
|
||||
package eu.dnetlib.dhp.sx.graph;
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package eu.dnetlib.dhp.graph.sx;
|
||||
package eu.dnetlib.dhp.sx.graph;
|
||||
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue