From 15d9106b3febd27ad11437bf1501db91fc5c338f Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Fri, 27 Mar 2020 13:48:44 +0100
Subject: [PATCH] Fixed merge of dhp dedup

---
 .../java/eu/dnetlib/dedup/DatePicker.java          | 119 ++++++++
 .../eu/dnetlib/dedup/DedupRecordFactory.java       | 283 ++++++++++++++++++
 .../java/eu/dnetlib/dedup/DedupUtility.java        | 217 ++++++++++++++
 .../main/java/eu/dnetlib/dedup/Deduper.java        | 162 ++++++++++
 .../java/eu/dnetlib/dedup/OafEntityType.java       |  15 +
 .../dedup/SparkCreateConnectedComponent.java       |  79 +++++
 .../dnetlib/dedup/SparkCreateDedupRecord.java      |  34 +++
 .../eu/dnetlib/dedup/SparkCreateSimRels.java       |  73 +++++
 .../java/eu/dnetlib/dedup/SparkReporter.java       |  47 +++
 .../dedup/graph/ConnectedComponent.java            |  80 +++++
 .../dnetlib/dedup/graph/GraphProcessor.scala       |  37 +++
 .../dedup_delete_by_inference_parameters.json      |   0
 .../dedup_propagate_relation_parameters.json       |   0
 .../dhp/sx/dedup/oozie_app/config-default.xml      |  18 ++
 .../dhp/{ => sx}/dedup/oozie_app/workflow.xml      |  29 --
 .../dhp/sx/graph/ImportDataFromMongo.java          |   2 +-
 .../dhp/sx/graph/SparkExtractEntitiesJob.java      |   2 +-
 .../sx/graph/SparkSXGeneratePidSimlarity.java      |   2 +-
 .../SparkScholexplorerCreateRawGraphJob.java       |   2 +-
 .../SparkScholexplorerGraphImporter.java           |   6 +-
 .../parser/AbstractScholexplorerParser.java        |   2 +-
 .../parser/DatasetScholexplorerParser.java         |   2 +-
 .../PublicationScholexplorerParser.java            |   2 +-
 .../dhp/sx/graph/step1/oozie_app/workflow.xml      |   2 +-
 .../dhp/sx/graph/step2/oozie_app/workflow.xml      |   2 +-
 .../dhp/sx/graph/step3/oozie_app/workflow.xml      |   2 +-
 .../dhp/sx/graph/ScholexplorerParserTest.java      |   4 +-
 .../SparkScholexplorerGraphImporterTest.java       |   2 +-
 ...parkScholexplorerMergeEntitiesJobTest.java      |   2 +-
 29 files changed, 1181 insertions(+), 46 deletions(-)
 create mode 100644 dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/DatePicker.java
 create mode 100644 dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java
 create mode 100644 dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/DedupUtility.java
 create mode 100644 dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/Deduper.java
 create mode 100644 dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/OafEntityType.java
 create mode 100644 dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java
 create mode 100644 dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateDedupRecord.java
 create mode 100644 dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java
 create mode 100644 dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkReporter.java
 create mode 100644 dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/ConnectedComponent.java
 create mode 100644 dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/GraphProcessor.scala
 rename dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/{ => sx}/dedup/dedup_delete_by_inference_parameters.json (100%)
 rename dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/{ => sx}/dedup/dedup_propagate_relation_parameters.json (100%)
 create mode 100644 dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/oozie_app/config-default.xml
 rename dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/{ => sx}/dedup/oozie_app/workflow.xml (83%)

diff
--git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/DatePicker.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/DatePicker.java new file mode 100644 index 000000000..73f178edc --- /dev/null +++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/DatePicker.java @@ -0,0 +1,119 @@ +package eu.dnetlib.dedup; + +import eu.dnetlib.dhp.schema.oaf.Field; +import org.apache.commons.lang.StringUtils; + +import java.time.Year; +import java.util.*; +import java.util.stream.Collectors; + +import static java.util.Collections.reverseOrder; +import static java.util.Map.Entry.comparingByValue; +import static java.util.stream.Collectors.toMap; +import static org.apache.commons.lang.StringUtils.endsWith; +import static org.apache.commons.lang.StringUtils.substringBefore; + +public class DatePicker { + + private static final String DATE_PATTERN = "\\d{4}-\\d{2}-\\d{2}"; + private static final String DATE_DEFAULT_SUFFIX = "01-01"; + private static final int YEAR_LB = 1300; + private static final int YEAR_UB = Year.now().getValue() + 5; + + public static Field pick(final Collection dateofacceptance) { + + final Map frequencies = dateofacceptance + .parallelStream() + .filter(StringUtils::isNotBlank) + .collect( + Collectors.toConcurrentMap( + w -> w, w -> 1, Integer::sum)); + + if (frequencies.isEmpty()) { + return new Field<>(); + } + + final Field date = new Field<>(); + date.setValue(frequencies.keySet().iterator().next()); + + // let's sort this map by values first, filtering out invalid dates + final Map sorted = frequencies + .entrySet() + .stream() + .filter(d -> StringUtils.isNotBlank(d.getKey())) + .filter(d -> d.getKey().matches(DATE_PATTERN)) + .filter(d -> inRange(d.getKey())) + .sorted(reverseOrder(comparingByValue())) + .collect( + toMap( + Map.Entry::getKey, + Map.Entry::getValue, (e1, e2) -> e2, + LinkedHashMap::new)); + + // shortcut + if (sorted.size() == 0) { + return date; + } + + // voting method (1/3 + 1) wins + if (sorted.size() >= 3) { + final int acceptThreshold = (sorted.size() / 3) + 1; + final List accepted = sorted.entrySet().stream() + .filter(e -> e.getValue() >= acceptThreshold) + .map(e -> e.getKey()) + .collect(Collectors.toList()); + + // cannot find strong majority + if (accepted.isEmpty()) { + final int max = sorted.values().iterator().next(); + Optional first = sorted.entrySet().stream() + .filter(e -> e.getValue() == max && !endsWith(e.getKey(), DATE_DEFAULT_SUFFIX)) + .map(Map.Entry::getKey) + .findFirst(); + if (first.isPresent()) { + date.setValue(first.get()); + return date; + } + + date.setValue(sorted.keySet().iterator().next()); + return date; + } + + if (accepted.size() == 1) { + date.setValue(accepted.get(0)); + return date; + } else { + final Optional first = accepted.stream() + .filter(d -> !endsWith(d, DATE_DEFAULT_SUFFIX)) + .findFirst(); + if (first.isPresent()) { + date.setValue(first.get()); + return date; + } + + return date; + } + + //1st non YYYY-01-01 is returned + } else { + if (sorted.size() == 2) { + for (Map.Entry e : sorted.entrySet()) { + if (!endsWith(e.getKey(), DATE_DEFAULT_SUFFIX)) { + date.setValue(e.getKey()); + return date; + } + } + } + + // none of the dates seems good enough, return the 1st one + date.setValue(sorted.keySet().iterator().next()); + return date; + } + } + + private static boolean inRange(final String date) { + final int year = Integer.parseInt(substringBefore(date, "-")); + return year >= YEAR_LB && year <= YEAR_UB; + } + +} \ No newline at end of file 
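For illustration only, not part of this patch: a minimal sketch of how the DatePicker voting above resolves a set of candidate dates. The DatePickerExample class is hypothetical and exists only for this note; it assumes the Field<String> getter getValue() that the patch already uses elsewhere. With three distinct candidates the accept threshold is (3 / 3) + 1 = 2, so the date occurring twice reaches a strong majority and is returned.

// DatePickerExample.java (illustrative only, not included in this patch)
package eu.dnetlib.dedup;

import eu.dnetlib.dhp.schema.oaf.Field;

import java.util.Arrays;

public class DatePickerExample {
    public static void main(String[] args) {
        // "2015-09-21" occurs twice and meets the (size / 3) + 1 threshold,
        // so it wins over the default-looking "2015-01-01" and the singleton "2016-03-05".
        Field<String> picked = DatePicker.pick(Arrays.asList(
                "2015-09-21", "2015-09-21", "2015-01-01", "2016-03-05"));
        System.out.println(picked.getValue());   // expected: 2015-09-21
    }
}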
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java new file mode 100644 index 000000000..ebb504078 --- /dev/null +++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java @@ -0,0 +1,283 @@ +package eu.dnetlib.dedup; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.google.common.collect.Lists; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.pace.config.DedupConfig; +import eu.dnetlib.pace.util.MapDocumentUtil; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import com.fasterxml.jackson.databind.ObjectMapper; +import scala.Tuple2; + +import java.util.Collection; + +public class DedupRecordFactory { + + public static JavaRDD createDedupRecord(final JavaSparkContext sc, final SparkSession spark, final String mergeRelsInputPath, final String entitiesInputPath, final OafEntityType entityType, final DedupConfig dedupConf) { + long ts = System.currentTimeMillis(); + // + final JavaPairRDD inputJsonEntities = sc.textFile(entitiesInputPath) + .mapToPair((PairFunction) it -> + new Tuple2(MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), it), it) + ); + + //: source is the dedup_id, target is the id of the mergedIn + JavaPairRDD mergeRels = spark + .read().load(mergeRelsInputPath).as(Encoders.bean(Relation.class)) + .where("relClass=='merges'") + .javaRDD() + .mapToPair( + (PairFunction) r -> + new Tuple2(r.getTarget(), r.getSource()) + ); + + // + final JavaPairRDD joinResult = mergeRels.join(inputJsonEntities).mapToPair((PairFunction>, String, String>) Tuple2::_2); + + JavaPairRDD> sortedJoinResult = joinResult.groupByKey(); + + switch (entityType) { + case publication: + return sortedJoinResult.map(p -> DedupRecordFactory.publicationMerger(p, ts)); + case dataset: + return sortedJoinResult.map(d -> DedupRecordFactory.datasetMerger(d, ts)); + case project: + return sortedJoinResult.map(p -> DedupRecordFactory.projectMerger(p, ts)); + case software: + return sortedJoinResult.map(s -> DedupRecordFactory.softwareMerger(s, ts)); + case datasource: + return sortedJoinResult.map(d -> DedupRecordFactory.datasourceMerger(d, ts)); + case organization: + return sortedJoinResult.map(o -> DedupRecordFactory.organizationMerger(o, ts)); + case otherresearchproduct: + return sortedJoinResult.map(o -> DedupRecordFactory.otherresearchproductMerger(o, ts)); + default: + return null; + } + + } + + private static Publication publicationMerger(Tuple2> e, final long ts) { + + Publication p = new Publication(); //the result of the merge, to be returned at the end + + p.setId(e._1()); + + final ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + + final Collection dateofacceptance = Lists.newArrayList(); + + if (e._2() != null) + e._2().forEach(pub -> { + try { + Publication publication = mapper.readValue(pub, Publication.class); + + p.mergeFrom(publication); + p.setAuthor(DedupUtility.mergeAuthor(p.getAuthor(), publication.getAuthor())); + //add to the list if they are not null + if (publication.getDateofacceptance() != null) + 
dateofacceptance.add(publication.getDateofacceptance().getValue()); + } catch (Exception exc) { + throw new RuntimeException(exc); + } + }); + p.setDateofacceptance(DatePicker.pick(dateofacceptance)); + if (p.getDataInfo() == null) + p.setDataInfo(new DataInfo()); + p.getDataInfo().setTrust("0.9"); + p.setLastupdatetimestamp(ts); + return p; + } + + private static Dataset datasetMerger(Tuple2> e, final long ts) { + + Dataset d = new Dataset(); //the result of the merge, to be returned at the end + + d.setId(e._1()); + + final ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + final Collection dateofacceptance = Lists.newArrayList(); + + if (e._2() != null) + e._2().forEach(dat -> { + try { + Dataset dataset = mapper.readValue(dat, Dataset.class); + + d.mergeFrom(dataset); + d.setAuthor(DedupUtility.mergeAuthor(d.getAuthor(), dataset.getAuthor())); + //add to the list if they are not null + if (dataset.getDateofacceptance() != null) + dateofacceptance.add(dataset.getDateofacceptance().getValue()); + } catch (Exception exc) { + throw new RuntimeException(exc); + } + }); + d.setDateofacceptance(DatePicker.pick(dateofacceptance)); + if (d.getDataInfo() == null) + d.setDataInfo(new DataInfo()); + d.getDataInfo().setTrust("0.9"); + d.setLastupdatetimestamp(ts); + return d; + } + + private static Project projectMerger(Tuple2> e, final long ts) { + + Project p = new Project(); //the result of the merge, to be returned at the end + + p.setId(e._1()); + + final ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + if (e._2() != null) + e._2().forEach(proj -> { + try { + Project project = mapper.readValue(proj, Project.class); + + p.mergeFrom(project); + } catch (Exception exc) { + throw new RuntimeException(exc); + } + }); + if (p.getDataInfo() == null) + p.setDataInfo(new DataInfo()); + p.getDataInfo().setTrust("0.9"); + p.setLastupdatetimestamp(ts); + return p; + } + + private static Software softwareMerger(Tuple2> e, final long ts) { + + Software s = new Software(); //the result of the merge, to be returned at the end + + s.setId(e._1()); + final ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + final Collection dateofacceptance = Lists.newArrayList(); + if (e._2() != null) + e._2().forEach(soft -> { + try { + Software software = mapper.readValue(soft, Software.class); + + s.mergeFrom(software); + s.setAuthor(DedupUtility.mergeAuthor(s.getAuthor(), software.getAuthor())); + //add to the list if they are not null + if (software.getDateofacceptance() != null) + dateofacceptance.add(software.getDateofacceptance().getValue()); + } catch (Exception exc) { + throw new RuntimeException(exc); + } + }); + s.setDateofacceptance(DatePicker.pick(dateofacceptance)); + if (s.getDataInfo() == null) + s.setDataInfo(new DataInfo()); + s.getDataInfo().setTrust("0.9"); + s.setLastupdatetimestamp(ts); + return s; + } + + private static Datasource datasourceMerger(Tuple2> e, final long ts) { + Datasource d = new Datasource(); //the result of the merge, to be returned at the end + d.setId(e._1()); + final ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + if (e._2() != null) + e._2().forEach(dat -> { + try { + Datasource datasource = mapper.readValue(dat, Datasource.class); + + d.mergeFrom(datasource); + } catch (Exception exc) { + throw 
new RuntimeException(exc); + } + }); + if (d.getDataInfo() == null) + d.setDataInfo(new DataInfo()); + d.getDataInfo().setTrust("0.9"); + d.setLastupdatetimestamp(ts); + return d; + } + + private static Organization organizationMerger(Tuple2> e, final long ts) { + + Organization o = new Organization(); //the result of the merge, to be returned at the end + + o.setId(e._1()); + + final ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + + StringBuilder trust = new StringBuilder("0.0"); + + if (e._2() != null) + e._2().forEach(pub -> { + try { + Organization organization = mapper.readValue(pub, Organization.class); + + final String currentTrust = organization.getDataInfo().getTrust(); + if (!"1.0".equals(currentTrust)) { + trust.setLength(0); + trust.append(currentTrust); + } + o.mergeFrom(organization); + + } catch (Exception exc) { + throw new RuntimeException(exc); + } + }); + + if (o.getDataInfo() == null) + { + o.setDataInfo(new DataInfo()); + } + if (o.getDataInfo() == null) + o.setDataInfo(new DataInfo()); + o.getDataInfo().setTrust("0.9"); + o.setLastupdatetimestamp(ts); + + return o; + } + + private static OtherResearchProduct otherresearchproductMerger(Tuple2> e, final long ts) { + + OtherResearchProduct o = new OtherResearchProduct(); //the result of the merge, to be returned at the end + + o.setId(e._1()); + + final ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + final Collection dateofacceptance = Lists.newArrayList(); + + if (e._2() != null) + e._2().forEach(orp -> { + try { + OtherResearchProduct otherResearchProduct = mapper.readValue(orp, OtherResearchProduct.class); + + o.mergeFrom(otherResearchProduct); + o.setAuthor(DedupUtility.mergeAuthor(o.getAuthor(), otherResearchProduct.getAuthor())); + //add to the list if they are not null + if (otherResearchProduct.getDateofacceptance() != null) + dateofacceptance.add(otherResearchProduct.getDateofacceptance().getValue()); + } catch (Exception exc) { + throw new RuntimeException(exc); + } + }); + if (o.getDataInfo() == null) + o.setDataInfo(new DataInfo()); + o.setDateofacceptance(DatePicker.pick(dateofacceptance)); + o.getDataInfo().setTrust("0.9"); + o.setLastupdatetimestamp(ts); + return o; + } + +} diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/DedupUtility.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/DedupUtility.java new file mode 100644 index 000000000..7ed102e03 --- /dev/null +++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/DedupUtility.java @@ -0,0 +1,217 @@ +package eu.dnetlib.dedup; + +import com.google.common.collect.Sets; +import com.wcohen.ss.JaroWinkler; +import eu.dnetlib.dhp.schema.oaf.Author; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner; +import eu.dnetlib.pace.config.DedupConfig; +import eu.dnetlib.pace.model.MapDocument; +import eu.dnetlib.pace.model.Person; +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import 
org.apache.spark.util.LongAccumulator; +import scala.Tuple2; + +import java.io.IOException; +import java.io.StringWriter; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.text.Normalizer; +import java.util.*; +import java.util.stream.Collectors; + +public class DedupUtility { + private static final Double THRESHOLD = 0.95; + + public static Map constructAccumulator(final DedupConfig dedupConf, final SparkContext context) { + + Map accumulators = new HashMap<>(); + + String acc1 = String.format("%s::%s", dedupConf.getWf().getEntityType(), "records per hash key = 1"); + accumulators.put(acc1, context.longAccumulator(acc1)); + String acc2 = String.format("%s::%s", dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField()); + accumulators.put(acc2, context.longAccumulator(acc2)); + String acc3 = String.format("%s::%s", dedupConf.getWf().getEntityType(), String.format("Skipped records for count(%s) >= %s", dedupConf.getWf().getOrderField(), dedupConf.getWf().getGroupMaxSize())); + accumulators.put(acc3, context.longAccumulator(acc3)); + String acc4 = String.format("%s::%s", dedupConf.getWf().getEntityType(), "skip list"); + accumulators.put(acc4, context.longAccumulator(acc4)); + String acc5 = String.format("%s::%s", dedupConf.getWf().getEntityType(), "dedupSimilarity (x2)"); + accumulators.put(acc5, context.longAccumulator(acc5)); + String acc6 = String.format("%s::%s", dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold()); + accumulators.put(acc6, context.longAccumulator(acc6)); + + return accumulators; + } + + public static JavaRDD loadDataFromHDFS(String path, JavaSparkContext context) { + return context.textFile(path); + } + + public static void deleteIfExists(String path) throws IOException { + Configuration conf = new Configuration(); + FileSystem fileSystem = FileSystem.get(conf); + if (fileSystem.exists(new Path(path))) { + fileSystem.delete(new Path(path), true); + } + } + + public static DedupConfig loadConfigFromHDFS(String path) throws IOException { + + Configuration conf = new Configuration(); + FileSystem fileSystem = FileSystem.get(conf); + FSDataInputStream inputStream = new FSDataInputStream(fileSystem.open(new Path(path))); + + return DedupConfig.load(IOUtils.toString(inputStream, StandardCharsets.UTF_8.name())); + + } + + static String readFromClasspath(final String filename, final Class clazz) { + final StringWriter sw = new StringWriter(); + try { + IOUtils.copy(clazz.getResourceAsStream(filename), sw); + return sw.toString(); + } catch (final IOException e) { + throw new RuntimeException("cannot load resource from classpath: " + filename); + } + } + + static Set getGroupingKeys(DedupConfig conf, MapDocument doc) { + return Sets.newHashSet(BlacklistAwareClusteringCombiner.filterAndCombine(doc, conf)); + } + + public static String md5(final String s) { + try { + final MessageDigest md = MessageDigest.getInstance("MD5"); + md.update(s.getBytes("UTF-8")); + return new String(Hex.encodeHex(md.digest())); + } catch (final Exception e) { + System.err.println("Error creating id"); + return null; + } + } + + + public static List mergeAuthor(final List a, final List b) { + int pa = countAuthorsPids(a); + int pb = countAuthorsPids(b); + List base, enrich; + int sa = authorsSize(a); + int sb = authorsSize(b); + + if (pa == pb) { + base = sa > sb ? a : b; + enrich = sa > sb ? b : a; + } else { + base = pa > pb ? a : b; + enrich = pa > pb ? 
b : a; + } + enrichPidFromList(base, enrich); + return base; + } + + private static void enrichPidFromList(List base, List enrich) { + if (base == null || enrich == null) + return; + final Map basePidAuthorMap = base.stream() + .filter(a -> a.getPid() != null && a.getPid().size() > 0) + .flatMap(a -> a.getPid() + .stream() + .map(p -> new Tuple2<>(p.toComparableString(), a)) + ).collect(Collectors.toMap(Tuple2::_1, Tuple2::_2, (x1, x2) -> x1)); + + final List> pidToEnrich = enrich + .stream() + .filter(a -> a.getPid() != null && a.getPid().size() > 0) + .flatMap(a -> a.getPid().stream().filter(p -> !basePidAuthorMap.containsKey(p.toComparableString())).map(p -> new Tuple2<>(p, a))) + .collect(Collectors.toList()); + + + pidToEnrich.forEach(a -> { + Optional> simAuhtor = base.stream().map(ba -> new Tuple2<>(sim(ba, a._2()), ba)).max(Comparator.comparing(Tuple2::_1)); + if (simAuhtor.isPresent() && simAuhtor.get()._1() > THRESHOLD) { + Author r = simAuhtor.get()._2(); + r.getPid().add(a._1()); + } + }); + } + + public static String createEntityPath(final String basePath, final String entityType) { + return String.format("%s/%s", basePath, entityType); + } + + public static String createSimRelPath(final String basePath, final String entityType) { + return String.format("%s/%s/simRel", basePath, entityType); + } + + public static String createMergeRelPath(final String basePath, final String entityType) { + return String.format("%s/%s/mergeRel", basePath, entityType); + } + + private static Double sim(Author a, Author b) { + + final Person pa = parse(a); + final Person pb = parse(b); + + if (pa.isAccurate() & pb.isAccurate()) { + return new JaroWinkler().score( + normalize(pa.getSurnameString()), + normalize(pb.getSurnameString())); + } else { + return new JaroWinkler().score( + normalize(pa.getNormalisedFullname()), + normalize(pb.getNormalisedFullname())); + } + } + + private static String normalize(final String s) { + return nfd(s).toLowerCase() + // do not compact the regexes in a single expression, would cause StackOverflowError in case of large input strings + .replaceAll("(\\W)+", " ") + .replaceAll("(\\p{InCombiningDiacriticalMarks})+", " ") + .replaceAll("(\\p{Punct})+", " ") + .replaceAll("(\\d)+", " ") + .replaceAll("(\\n)+", " ") + .trim(); + } + + private static String nfd(final String s) { + return Normalizer.normalize(s, Normalizer.Form.NFD); + } + + private static Person parse(Author author) { + if (StringUtils.isNotBlank(author.getSurname())) { + return new Person(author.getSurname() + ", " + author.getName(), false); + } else { + return new Person(author.getFullname(), false); + } + } + + + private static int countAuthorsPids(List authors) { + if (authors == null) + return 0; + + return (int) authors.stream().filter(DedupUtility::hasPid).count(); + } + + private static int authorsSize(List authors) { + if (authors == null) + return 0; + return authors.size(); + } + + private static boolean hasPid(Author a) { + if (a == null || a.getPid() == null || a.getPid().size() == 0) + return false; + return a.getPid().stream().anyMatch(p -> p != null && StringUtils.isNotBlank(p.getValue())); + } +} diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/Deduper.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/Deduper.java new file mode 100644 index 000000000..7206f892f --- /dev/null +++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/Deduper.java @@ -0,0 +1,162 @@ +package eu.dnetlib.dedup; + +import 
eu.dnetlib.pace.config.DedupConfig; +import eu.dnetlib.pace.model.Field; +import eu.dnetlib.pace.model.MapDocument; +import eu.dnetlib.pace.util.BlockProcessor; +import eu.dnetlib.pace.util.MapDocumentUtil; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFlatMapFunction; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.util.LongAccumulator; +import scala.Serializable; +import scala.Tuple2; + +import java.util.*; +import java.util.stream.Collectors; + +public class Deduper implements Serializable { + + private static final Log log = LogFactory.getLog(Deduper.class); + + /** + * @return the list of relations generated by the deduplication + * @param: the spark context + * @param: list of JSON entities to be deduped + * @param: the dedup configuration + */ + public static JavaPairRDD dedup(JavaSparkContext context, JavaRDD entities, DedupConfig config) { + + Map accumulators = DedupUtility.constructAccumulator(config, context.sc()); + + //create vertexes of the graph: + JavaPairRDD mapDocs = mapToVertexes(context, entities, config); + + + //create blocks for deduplication + JavaPairRDD> blocks = createBlocks(context, mapDocs, config); + + //create relations by comparing only elements in the same group + return computeRelations(context, blocks, config); + +// final RDD> edgeRdd = relationRDD.map(it -> new Edge<>(it._1().hashCode(), it._2().hashCode(), "equalTo")).rdd(); +// +// RDD> vertexes = mapDocs.mapToPair((PairFunction, Object, MapDocument>) t -> new Tuple2((long) t._1().hashCode(), t._2())).rdd(); +// accumulators.forEach((name, acc) -> log.info(name + " -> " + acc.value())); +// +// return GraphProcessor.findCCs(vertexes, edgeRdd, 20).toJavaRDD(); + } + + /** + * @return the list of relations generated by the deduplication + * @param: the spark context + * @param: list of blocks + * @param: the dedup configuration + */ + public static JavaPairRDD computeRelations(JavaSparkContext context, JavaPairRDD> blocks, DedupConfig config) { + + Map accumulators = DedupUtility.constructAccumulator(config, context.sc()); + + return blocks.flatMapToPair((PairFlatMapFunction>, String, String>) it -> { + final SparkReporter reporter = new SparkReporter(accumulators); + new BlockProcessor(config).process(it._1(), it._2(), reporter); + return reporter.getRelations().iterator(); + + }).mapToPair( + (PairFunction, String, Tuple2>) item -> + new Tuple2>(item._1() + item._2(), item)) + .reduceByKey((a, b) -> a) + .mapToPair((PairFunction>, String, String>) Tuple2::_2); + } + + + /** + * @return the list of blocks based on clustering of dedup configuration + * @param: the spark context + * @param: list of entities: + * @param: the dedup configuration + */ + public static JavaPairRDD> createBlocks(JavaSparkContext context, JavaPairRDD mapDocs, DedupConfig config) { + return mapDocs + //the reduce is just to be sure that we haven't document with same id + .reduceByKey((a, b) -> a) + .map(Tuple2::_2) + //Clustering: from to List + .flatMapToPair((PairFlatMapFunction) a -> + DedupUtility.getGroupingKeys(config, a) + .stream() + .map(it -> new Tuple2<>(it, a)) + .collect(Collectors.toList()) + .iterator()) + .groupByKey(); + } + + + public static JavaPairRDD> createsortedBlocks(JavaSparkContext context, 
JavaPairRDD mapDocs, DedupConfig config) { + final String of = config.getWf().getOrderField(); + final int maxQueueSize = config.getWf().getGroupMaxSize(); + return mapDocs + //the reduce is just to be sure that we haven't document with same id + .reduceByKey((a, b) -> a) + .map(Tuple2::_2) + //Clustering: from to List + .flatMapToPair((PairFlatMapFunction>) a -> + DedupUtility.getGroupingKeys(config, a) + .stream() + .map(it -> { + List tmp = new ArrayList<>(); + tmp.add(a); + return new Tuple2<>(it, tmp); + } + ) + .collect(Collectors.toList()) + .iterator()) + .reduceByKey((Function2, List, List>) (v1, v2) -> { + v1.addAll(v2); + v1.sort(Comparator.comparing(a -> a.getFieldMap().get(of).stringValue())); + if (v1.size() > maxQueueSize) + return new ArrayList<>(v1.subList(0, maxQueueSize)); + return v1; + }); + } + + /** + * @return the list of vertexes: + * @param: the spark context + * @param: list of JSON entities + * @param: the dedup configuration + */ + public static JavaPairRDD mapToVertexes(JavaSparkContext context, JavaRDD entities, DedupConfig config) { + + return entities.mapToPair((PairFunction) s -> { + + MapDocument mapDocument = MapDocumentUtil.asMapDocumentWithJPath(config, s); + return new Tuple2(mapDocument.getIdentifier(), mapDocument); + + + }); + } + + public static JavaPairRDD computeRelations2(JavaSparkContext context, JavaPairRDD> blocks, DedupConfig config) { + Map accumulators = DedupUtility.constructAccumulator(config, context.sc()); + + return blocks.flatMapToPair((PairFlatMapFunction>, String, String>) it -> { + try { + final SparkReporter reporter = new SparkReporter(accumulators); + new BlockProcessor(config).processSortedBlock(it._1(), it._2(), reporter); + return reporter.getRelations().iterator(); + } catch (Exception e) { + throw new RuntimeException(it._2().get(0).getIdentifier(), e); + } + }).mapToPair( + (PairFunction, String, Tuple2>) item -> + new Tuple2>(item._1() + item._2(), item)) + .reduceByKey((a, b) -> a) + .mapToPair((PairFunction>, String, String>) Tuple2::_2); + } +} \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/OafEntityType.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/OafEntityType.java new file mode 100644 index 000000000..fb347ed51 --- /dev/null +++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/OafEntityType.java @@ -0,0 +1,15 @@ +package eu.dnetlib.dedup; + +public enum OafEntityType { + + datasource, + organization, + project, + dataset, + otherresearchproduct, + software, + publication + + + +} diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java new file mode 100644 index 000000000..16e112c25 --- /dev/null +++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java @@ -0,0 +1,79 @@ +package eu.dnetlib.dedup; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; +import eu.dnetlib.dedup.graph.ConnectedComponent; +import eu.dnetlib.dedup.graph.GraphProcessor; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.pace.config.DedupConfig; +import eu.dnetlib.pace.util.MapDocumentUtil; +import 
org.apache.commons.io.IOUtils; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.graphx.Edge; +import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import scala.Tuple2; + +import java.util.ArrayList; +import java.util.List; + +public class SparkCreateConnectedComponent { + + public static void main(String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedup_parameters.json"))); + parser.parseArgument(args); + final SparkSession spark = SparkSession + .builder() + .appName(SparkCreateConnectedComponent.class.getSimpleName()) + .master(parser.get("master")) + .getOrCreate(); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final String inputPath = parser.get("sourcePath"); + final String entity = parser.get("entity"); + final String targetPath = parser.get("targetPath"); +// final DedupConfig dedupConf = DedupConfig.load(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf2.json"))); + final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf")); + + final JavaPairRDD vertexes = sc.textFile(inputPath + "/" + entity) + .map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s)) + .mapToPair((PairFunction) + s -> new Tuple2(getHashcode(s), s) + ); + + final Dataset similarityRelations = spark.read().load(DedupUtility.createSimRelPath(targetPath,entity)).as(Encoders.bean(Relation.class)); + final RDD> edgeRdd = similarityRelations.javaRDD().map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass())).rdd(); + final JavaRDD cc = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, dedupConf.getWf().getMaxIterations()).toJavaRDD(); + final Dataset mergeRelation = spark.createDataset(cc.filter(k->k.getDocIds().size()>1).flatMap((FlatMapFunction) c -> + c.getDocIds() + .stream() + .flatMap(id -> { + List tmp = new ArrayList<>(); + Relation r = new Relation(); + r.setSource(c.getCcId()); + r.setTarget(id); + r.setRelClass("merges"); + tmp.add(r); + r = new Relation(); + r.setTarget(c.getCcId()); + r.setSource(id); + r.setRelClass("isMergedIn"); + tmp.add(r); + return tmp.stream(); + }).iterator()).rdd(), Encoders.bean(Relation.class)); + mergeRelation.write().mode("overwrite").save(DedupUtility.createMergeRelPath(targetPath,entity)); + } + + public static long getHashcode(final String id) { + return Hashing.murmur3_128().hashUnencodedChars(id).asLong(); + } +} diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateDedupRecord.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateDedupRecord.java new file mode 100644 index 000000000..8e60df945 --- /dev/null +++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateDedupRecord.java @@ -0,0 +1,34 @@ +package eu.dnetlib.dedup; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.pace.config.DedupConfig; 
+import org.apache.commons.io.IOUtils; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; + +public class SparkCreateDedupRecord { + public static void main(String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateDedupRecord.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedupRecord_parameters.json"))); + parser.parseArgument(args); + final SparkSession spark = SparkSession + .builder() + .appName(SparkCreateDedupRecord.class.getSimpleName()) + .master(parser.get("master")) + .getOrCreate(); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final String sourcePath = parser.get("sourcePath"); + final String entity = parser.get("entity"); + final String dedupPath = parser.get("dedupPath"); + final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf")); + + final JavaRDD dedupRecord = DedupRecordFactory.createDedupRecord(sc, spark, DedupUtility.createMergeRelPath(dedupPath,entity), DedupUtility.createEntityPath(sourcePath,entity), OafEntityType.valueOf(entity), dedupConf); + dedupRecord.map(r-> { + ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(r); + }).saveAsTextFile(dedupPath+"/"+entity+"/dedup_records"); + } +} diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java new file mode 100644 index 000000000..2bdfa8759 --- /dev/null +++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java @@ -0,0 +1,73 @@ +package eu.dnetlib.dedup; + +import com.google.common.hash.Hashing; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.pace.config.DedupConfig; +import eu.dnetlib.pace.model.MapDocument; +import eu.dnetlib.pace.util.MapDocumentUtil; +import org.apache.commons.io.IOUtils; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import scala.Tuple2; + +import java.util.List; + + +/** + * This Spark class creates similarity relations between entities, saving result + * + * param request: + * sourcePath + * entityType + * target Path + */ +public class SparkCreateSimRels { + + public static void main(String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedup_parameters.json"))); + parser.parseArgument(args); + final SparkSession spark = SparkSession + .builder() + .appName(SparkCreateSimRels.class.getSimpleName()) + .master(parser.get("master")) + .getOrCreate(); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final String inputPath = parser.get("sourcePath"); + final String entity = parser.get("entity"); + final String targetPath = parser.get("targetPath"); +// final DedupConfig dedupConf = DedupConfig.load(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"))); + final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf")); + + + + JavaPairRDD 
mapDocument = sc.textFile(inputPath + "/" + entity) + .mapToPair(s->{ + MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf,s); + return new Tuple2<>(d.getIdentifier(), d);}); + + //create blocks for deduplication + JavaPairRDD> blocks = Deduper.createsortedBlocks(sc, mapDocument, dedupConf); +// JavaPairRDD> blocks = Deduper.createBlocks(sc, mapDocument, dedupConf); + + //create relations by comparing only elements in the same group + final JavaPairRDD dedupRels = Deduper.computeRelations2(sc, blocks, dedupConf); +// final JavaPairRDD dedupRels = Deduper.computeRelations(sc, blocks, dedupConf); + + final JavaRDD isSimilarToRDD = dedupRels.map(simRel -> { + final Relation r = new Relation(); + r.setSource(simRel._1()); + r.setTarget(simRel._2()); + r.setRelClass("isSimilarTo"); + return r; + }); + + spark.createDataset(isSimilarToRDD.rdd(), Encoders.bean(Relation.class)).write().mode("overwrite").save( DedupUtility.createSimRelPath(targetPath,entity)); + + } +} diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkReporter.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkReporter.java new file mode 100644 index 000000000..165a10b25 --- /dev/null +++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkReporter.java @@ -0,0 +1,47 @@ +package eu.dnetlib.dedup; + +import eu.dnetlib.pace.util.Reporter; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.spark.util.LongAccumulator; +import scala.Serializable; +import scala.Tuple2; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class SparkReporter implements Serializable, Reporter { + + final List> relations = new ArrayList<>(); + private static final Log log = LogFactory.getLog(SparkReporter.class); + Map accumulators; + + public SparkReporter(Map accumulators){ + this.accumulators = accumulators; + } + + public void incrementCounter(String counterGroup, String counterName, long delta, Map accumulators) { + + final String accumulatorName = String.format("%s::%s", counterGroup, counterName); + if (accumulators.containsKey(accumulatorName)){ + accumulators.get(accumulatorName).add(delta); + } + + } + + @Override + public void incrementCounter(String counterGroup, String counterName, long delta) { + + incrementCounter(counterGroup, counterName, delta, accumulators); + } + + @Override + public void emit(String type, String from, String to) { + relations.add(new Tuple2<>(from, to)); + } + + public List> getRelations() { + return relations; + } +} diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/ConnectedComponent.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/ConnectedComponent.java new file mode 100644 index 000000000..27a61c02d --- /dev/null +++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/ConnectedComponent.java @@ -0,0 +1,80 @@ +package eu.dnetlib.dedup.graph; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dedup.DedupUtility; +import eu.dnetlib.pace.util.PaceException; +import org.apache.commons.lang.StringUtils; +import org.codehaus.jackson.annotate.JsonIgnore; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Set; + +public class ConnectedComponent implements Serializable { + + private Set docIds; + private String ccId; + + + public ConnectedComponent() { + } + + public ConnectedComponent(Set 
docIds) { + this.docIds = docIds; + createID(); + } + + public String createID() { + if (docIds.size() > 1) { + final String s = getMin(); + String prefix = s.split("\\|")[0]; + ccId =prefix + "|dedup_______::" + DedupUtility.md5(s); + return ccId; + } else { + return docIds.iterator().next(); + } + } + + @JsonIgnore + public String getMin(){ + + final StringBuilder min = new StringBuilder(); + docIds.forEach(i -> { + if (StringUtils.isBlank(min.toString())) { + min.append(i); + } else { + if (min.toString().compareTo(i) > 0) { + min.setLength(0); + min.append(i); + } + } + }); + return min.toString(); + } + + @Override + public String toString(){ + ObjectMapper mapper = new ObjectMapper(); + try { + return mapper.writeValueAsString(this); + } catch (IOException e) { + throw new PaceException("Failed to create Json: ", e); + } + } + + public Set getDocIds() { + return docIds; + } + + public void setDocIds(Set docIds) { + this.docIds = docIds; + } + + public String getCcId() { + return ccId; + } + + public void setCcId(String ccId) { + this.ccId = ccId; + } +} diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/GraphProcessor.scala b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/GraphProcessor.scala new file mode 100644 index 000000000..38c695152 --- /dev/null +++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/GraphProcessor.scala @@ -0,0 +1,37 @@ +package eu.dnetlib.dedup.graph + +import org.apache.spark.graphx._ +import org.apache.spark.rdd.RDD + +import scala.collection.JavaConversions; + +object GraphProcessor { + + def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int): RDD[ConnectedComponent] = { + val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby + val cc = graph.connectedComponents(maxIterations).vertices + + val joinResult = vertexes.leftOuterJoin(cc).map { + case (id, (openaireId, cc)) => { + if (cc.isEmpty) { + (id, openaireId) + } + else { + (cc.get, openaireId) + } + } + } + val connectedComponents = joinResult.groupByKey() + .map[ConnectedComponent](cc => asConnectedComponent(cc)) + connectedComponents + } + + + + def asConnectedComponent(group: (VertexId, Iterable[String])): ConnectedComponent = { + val docs = group._2.toSet[String] + val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs)); + connectedComponent + } + +} \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/dedup/dedup_delete_by_inference_parameters.json b/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/dedup_delete_by_inference_parameters.json similarity index 100% rename from dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/dedup/dedup_delete_by_inference_parameters.json rename to dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/dedup_delete_by_inference_parameters.json diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/dedup/dedup_propagate_relation_parameters.json b/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/dedup_propagate_relation_parameters.json similarity index 100% rename from dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/dedup/dedup_propagate_relation_parameters.json rename to 
dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/dedup_propagate_relation_parameters.json diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/oozie_app/config-default.xml b/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/oozie_app/config-default.xml new file mode 100644 index 000000000..2e0ed9aee --- /dev/null +++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/oozie_app/config-default.xml @@ -0,0 +1,18 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/oozie_app/workflow.xml similarity index 83% rename from dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml rename to dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/oozie_app/workflow.xml index 46f334b1b..6c8dba653 100644 --- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/oozie_app/workflow.xml @@ -25,10 +25,8 @@ memory for individual executor - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] @@ -163,33 +161,6 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ImportDataFromMongo.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ImportDataFromMongo.java index 8994e9667..c313c139e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ImportDataFromMongo.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ImportDataFromMongo.java @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.graph.sx; +package eu.dnetlib.dhp.sx.graph; import com.mongodb.DBObject; import com.mongodb.MongoClient; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkExtractEntitiesJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkExtractEntitiesJob.java index 9f5a91d3c..f2a1aa4d7 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkExtractEntitiesJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkExtractEntitiesJob.java @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.graph.sx; +package eu.dnetlib.dhp.sx.graph; import com.jayway.jsonpath.JsonPath; import eu.dnetlib.dhp.application.ArgumentApplicationParser; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSXGeneratePidSimlarity.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSXGeneratePidSimlarity.java index c3e55ca2f..806140160 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSXGeneratePidSimlarity.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSXGeneratePidSimlarity.java @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.graph.sx; +package eu.dnetlib.dhp.sx.graph; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.utils.DHPUtils; diff --git 
a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerCreateRawGraphJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerCreateRawGraphJob.java index 5d8a35c1b..36d3cf540 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerCreateRawGraphJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerCreateRawGraphJob.java @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.graph.sx; +package eu.dnetlib.dhp.sx.graph; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerGraphImporter.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerGraphImporter.java index 96e2c0826..90606f1b8 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerGraphImporter.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerGraphImporter.java @@ -1,9 +1,9 @@ -package eu.dnetlib.dhp.graph.sx; +package eu.dnetlib.dhp.sx.graph; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.graph.sx.parser.DatasetScholexplorerParser; -import eu.dnetlib.dhp.graph.sx.parser.PublicationScholexplorerParser; +import eu.dnetlib.dhp.sx.graph.parser.DatasetScholexplorerParser; +import eu.dnetlib.dhp.sx.graph.parser.PublicationScholexplorerParser; import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.scholexplorer.relation.RelationMapper; import org.apache.commons.io.IOUtils; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java index f3f81013c..ca20c0aba 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.graph.sx.parser; +package eu.dnetlib.dhp.sx.graph.parser; import eu.dnetlib.dhp.parser.utility.VtdUtilityParser; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java index 7e3f06e22..2ba2bd519 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.graph.sx.parser; +package eu.dnetlib.dhp.sx.graph.parser; import com.ximpleware.AutoPilot; import com.ximpleware.VTDGen; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java index 456b19064..b8b38515b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java +++ 
b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.graph.sx.parser; +package eu.dnetlib.dhp.sx.graph.parser; import com.ximpleware.AutoPilot; import com.ximpleware.VTDGen; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml index 918cc652a..4da737c33 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml @@ -55,7 +55,7 @@ ${jobTracker} ${nameNode} - eu.dnetlib.dhp.graph.sx.ImportDataFromMongo + eu.dnetlib.dhp.sx.graph.ImportDataFromMongo -t${targetPath} -n${nameNode} -u${user} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml index 01fdec2ef..46e2dc3f9 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml @@ -54,7 +54,7 @@ yarn-cluster cluster Extract ${entities} - eu.dnetlib.dhp.graph.sx.SparkExtractEntitiesJob + eu.dnetlib.dhp.sx.graph.SparkExtractEntitiesJob dhp-graph-mapper-${projectVersion}.jar --executor-memory ${sparkExecutorMemory} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step3/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step3/oozie_app/workflow.xml index cf66ab6e6..4d54b2afb 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step3/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step3/oozie_app/workflow.xml @@ -45,7 +45,7 @@ yarn-cluster cluster Merge ${entity} - eu.dnetlib.dhp.graph.sx.SparkScholexplorerCreateRawGraphJob + eu.dnetlib.dhp.sx.graph.SparkScholexplorerCreateRawGraphJob dhp-graph-mapper-${projectVersion}.jar --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} -mt yarn-cluster diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/ScholexplorerParserTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/ScholexplorerParserTest.java index 0717efe4a..5741dd628 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/ScholexplorerParserTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/ScholexplorerParserTest.java @@ -1,9 +1,9 @@ -package eu.dnetlib.dhp.graph.sx; +package eu.dnetlib.dhp.sx.graph; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; -import eu.dnetlib.dhp.graph.sx.parser.DatasetScholexplorerParser; +import eu.dnetlib.dhp.sx.graph.parser.DatasetScholexplorerParser; import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.scholexplorer.relation.RelationMapper; import org.apache.commons.io.IOUtils; diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerGraphImporterTest.java 
b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerGraphImporterTest.java index f33340547..4c4d5372c 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerGraphImporterTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerGraphImporterTest.java @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.graph.sx; +package eu.dnetlib.dhp.sx.graph; diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerMergeEntitiesJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerMergeEntitiesJobTest.java index 623c38112..f080b36cb 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerMergeEntitiesJobTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerMergeEntitiesJobTest.java @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.graph.sx; +package eu.dnetlib.dhp.sx.graph;
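For orientation only, not part of this patch: the three new Spark jobs are intended to run in sequence. SparkCreateSimRels writes isSimilarTo relations under <targetPath>/<entity>/simRel, SparkCreateConnectedComponent turns them into merges/isMergedIn relations under <targetPath>/<entity>/mergeRel, and SparkCreateDedupRecord writes the merged records under <dedupPath>/<entity>/dedup_records. The sketch below is hypothetical (the class name and directory values are invented) and only exercises the path helpers added in DedupUtility.

// DedupPathsExample.java (illustrative only, not included in this patch)
package eu.dnetlib.dedup;

public class DedupPathsExample {
    public static void main(String[] args) {
        final String rawGraphPath = "/tmp/sx_graph_raw";      // hypothetical input graph location
        final String workingPath  = "/tmp/sx_dedup_working";  // hypothetical dedup working directory
        final String entity = OafEntityType.publication.name();

        // Entities read by the dedup jobs
        System.out.println(DedupUtility.createEntityPath(rawGraphPath, entity));   // /tmp/sx_graph_raw/publication
        // Written by SparkCreateSimRels, read by SparkCreateConnectedComponent
        System.out.println(DedupUtility.createSimRelPath(workingPath, entity));    // /tmp/sx_dedup_working/publication/simRel
        // Written by SparkCreateConnectedComponent, read by SparkCreateDedupRecord
        System.out.println(DedupUtility.createMergeRelPath(workingPath, entity));  // /tmp/sx_dedup_working/publication/mergeRel
    }
}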