diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java index ed3ac8231..5f81669e9 100644 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java @@ -91,6 +91,8 @@ public class DedupRecordFactory { } }); p.setDateofacceptance(DatePicker.pick(dateofacceptance)); + if (p.getDataInfo() == null) + p.setDataInfo(new DataInfo()); p.getDataInfo().setTrust("0.9"); p.setLastupdatetimestamp(ts); return p; @@ -121,6 +123,8 @@ public class DedupRecordFactory { } }); d.setDateofacceptance(DatePicker.pick(dateofacceptance)); + if (d.getDataInfo() == null) + d.setDataInfo(new DataInfo()); d.getDataInfo().setTrust("0.9"); d.setLastupdatetimestamp(ts); return d; @@ -143,6 +147,8 @@ public class DedupRecordFactory { throw new RuntimeException(exc); } }); + if (p.getDataInfo() == null) + p.setDataInfo(new DataInfo()); p.getDataInfo().setTrust("0.9"); p.setLastupdatetimestamp(ts); return p; @@ -170,6 +176,8 @@ public class DedupRecordFactory { } }); s.setDateofacceptance(DatePicker.pick(dateofacceptance)); + if (s.getDataInfo() == null) + s.setDataInfo(new DataInfo()); s.getDataInfo().setTrust("0.9"); s.setLastupdatetimestamp(ts); return s; @@ -189,6 +197,8 @@ public class DedupRecordFactory { throw new RuntimeException(exc); } }); + if (d.getDataInfo() == null) + d.setDataInfo(new DataInfo()); d.getDataInfo().setTrust("0.9"); d.setLastupdatetimestamp(ts); return d; @@ -226,6 +236,8 @@ public class DedupRecordFactory { { o.setDataInfo(new DataInfo()); } + if (o.getDataInfo() == null) + o.setDataInfo(new DataInfo()); o.getDataInfo().setTrust("0.9"); o.setLastupdatetimestamp(ts); @@ -256,6 +268,8 @@ public class DedupRecordFactory { throw new RuntimeException(exc); } }); + if (o.getDataInfo() == null) + o.setDataInfo(new DataInfo()); o.setDateofacceptance(DatePicker.pick(dateofacceptance)); o.getDataInfo().setTrust("0.9"); o.setLastupdatetimestamp(ts); diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/Deduper.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/Deduper.java index 51b991da5..7206f892f 100644 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/Deduper.java +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/Deduper.java @@ -119,7 +119,7 @@ public class Deduper implements Serializable { .reduceByKey((Function2, List, List>) (v1, v2) -> { v1.addAll(v2); v1.sort(Comparator.comparing(a -> a.getFieldMap().get(of).stringValue())); - if (v1.size()> maxQueueSize) + if (v1.size() > maxQueueSize) return new ArrayList<>(v1.subList(0, maxQueueSize)); return v1; }); @@ -146,10 +146,13 @@ public class Deduper implements Serializable { Map accumulators = DedupUtility.constructAccumulator(config, context.sc()); return blocks.flatMapToPair((PairFlatMapFunction>, String, String>) it -> { - final SparkReporter reporter = new SparkReporter(accumulators); - new BlockProcessor(config).processSortedBlock(it._1(), it._2(), reporter); - return reporter.getRelations().iterator(); - + try { + final SparkReporter reporter = new SparkReporter(accumulators); + new BlockProcessor(config).processSortedBlock(it._1(), it._2(), reporter); + return reporter.getRelations().iterator(); + } catch (Exception e) { + throw new RuntimeException(it._2().get(0).getIdentifier(), e); + } }).mapToPair( (PairFunction, String, Tuple2>) item -> new Tuple2>(item._1() + item._2(), item)) diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java index 9783e93d6..16e112c25 100644 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java @@ -1,5 +1,9 @@ package eu.dnetlib.dedup; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; import eu.dnetlib.dedup.graph.ConnectedComponent; import eu.dnetlib.dedup.graph.GraphProcessor; import eu.dnetlib.dhp.application.ArgumentApplicationParser; @@ -43,18 +47,12 @@ public class SparkCreateConnectedComponent { final JavaPairRDD vertexes = sc.textFile(inputPath + "/" + entity) .map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s)) .mapToPair((PairFunction) - s -> new Tuple2((long) s.hashCode(), s) + s -> new Tuple2(getHashcode(s), s) ); final Dataset similarityRelations = spark.read().load(DedupUtility.createSimRelPath(targetPath,entity)).as(Encoders.bean(Relation.class)); - - - final RDD> edgeRdd = similarityRelations.javaRDD().map(it -> new Edge<>(it.getSource().hashCode(), it.getTarget().hashCode(), it.getRelClass())).rdd(); - - - final JavaRDD cc = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, 20).toJavaRDD(); - - + final RDD> edgeRdd = similarityRelations.javaRDD().map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass())).rdd(); + final JavaRDD cc = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, dedupConf.getWf().getMaxIterations()).toJavaRDD(); final Dataset mergeRelation = spark.createDataset(cc.filter(k->k.getDocIds().size()>1).flatMap((FlatMapFunction) c -> c.getDocIds() .stream() @@ -72,9 +70,10 @@ public class SparkCreateConnectedComponent { tmp.add(r); return tmp.stream(); }).iterator()).rdd(), Encoders.bean(Relation.class)); - mergeRelation.write().mode("overwrite").save(DedupUtility.createMergeRelPath(targetPath,entity)); + } - + public static long getHashcode(final String id) { + return Hashing.murmur3_128().hashUnencodedChars(id).asLong(); } } diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml index 0e75aa072..5a00a5967 100644 --- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml @@ -30,7 +30,7 @@ - + @@ -103,7 +103,7 @@ ${nameNode} yarn-cluster cluster - Create Connected Components + Create Dedup Record eu.dnetlib.dedup.SparkCreateDedupRecord dhp-dedup-${projectVersion}.jar --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} diff --git a/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dedup/SparkCreateDedupTest.java b/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dedup/SparkCreateDedupTest.java index 19522e275..f294b10fe 100644 --- a/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dedup/SparkCreateDedupTest.java +++ b/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dedup/SparkCreateDedupTest.java @@ -1,6 +1,8 @@ package eu.dnetlib.dedup; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.Publication; import org.apache.commons.io.FileUtils; @@ -67,5 +69,34 @@ public class SparkCreateDedupTest { } +// [20|grid________::6031f94bef015a37783268ec1e75f17f, 20|nsf_________::b12be9edf414df8ee66b4c52a2d8da46] +// [20|grid________::672e1e5cef49e68f124d3da5225a7357, 20|grid________::7a402604c3853c7a0af14f88f56bf7e1] +// [20|grid________::2fc05b35e11d915b220a66356053eae2, 20|grid________::b02fb3176eb38f6c572722550c07e7ab] +// [20|grid________::bc86248ab2b8d7955dcaf592ba342262, 20|corda_______::45a8ec964029278fb938805182e247a8] +// [20|doajarticles::74551f800ad1c81a6cd31c5162887b7f, 20|rcuk________::86dc9a83df05a58917f38ca09f814617] +// [20|nsf_________::5e837d8e6444cc298db314ea54ad2f4a, 20|snsf________::7b54715f0ec5c6a0a44672f45d98be8d] +// [20|corda__h2020::7ee7e57bad06b92c1a568dd61e10ba8c, 20|snsf________::2d4a2695221a3ce0c749ee34e064c0b3] +// [20|corda_______::25220a523550176dac9e5432dac43596, 20|grid________::9782f16a46650cbbfaaa2315109507d1] +// [20|nih_________::88c3b664dcc7af9e827f94ac964cd66c, 20|grid________::238d3ac0a7d119d5c8342a647f5245f5] +// [20|rcuk________::0582c20fcfb270f9ec1b19b0f0dcd881, 20|nsf_________::9afa48ddf0bc2cd4f3c41dc41daabcdb] +// [20|rcuk________::fbc445f8d24e569bc8b640dba86ae978, 20|corda_______::5a8a4094f1b68a88fc56e65cea7ebfa0] +// [20|rcuk________::7485257cd5caaf6316ba8062feea801d, 20|grid________::dded811e5f5a4c9f7ca8f9955e52ade7] +// [20|nih_________::0576dd270d29d5b7c23dd15a827ccdb9, 20|corda_______::10ca69f6a4a121f75fdde1feee226ce0] +// [20|corda__h2020::0429f6addf10e9b2939d65c6fb097ffd, 20|grid________::6563ec73057624d5ccc0cd050b302181] + + @Test + public void testHashCode() { + final String s1 = "20|grid________::6031f94bef015a37783268ec1e75f17f"; + final String s2 = "20|nsf_________::b12be9edf414df8ee66b4c52a2d8da46"; + + final HashFunction hashFunction = Hashing.murmur3_128(); + + System.out.println( s1.hashCode()); + System.out.println(hashFunction.hashUnencodedChars(s1).asLong()); + System.out.println( s2.hashCode()); + System.out.println(hashFunction.hashUnencodedChars(s2).asLong()); + + } + } diff --git a/dhp-workflows/dhp-dedup/src/test/resources/eu/dnetlib/dedup/conf/pub_dt.curr.conf.json b/dhp-workflows/dhp-dedup/src/test/resources/eu/dnetlib/dedup/conf/pub_dt.curr.conf.json index 568b0e962..18b048e9e 100644 --- a/dhp-workflows/dhp-dedup/src/test/resources/eu/dnetlib/dedup/conf/pub_dt.curr.conf.json +++ b/dhp-workflows/dhp-dedup/src/test/resources/eu/dnetlib/dedup/conf/pub_dt.curr.conf.json @@ -1,62 +1,102 @@ { - "wf" : { - "threshold" : "0.99", - "dedupRun" : "001", - "entityType" : "result", - "subEntityType" : "resulttype", - "subEntityValue" : "publication", - "orderField" : "title", - "queueMaxSize" : "2000", - "groupMaxSize" : "100", - "maxChildren" : "100", - "idPath": "$.id", - "slidingWindowSize" : "200", - "rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_isAffiliatedWith", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ], - "includeChildren" : "true" - }, - "pace" : { - "clustering" : [ - { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} }, - { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } }, - { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } } + "wf": { + "threshold": "0.99", + "dedupRun": "001", + "entityType": "result", + "subEntityType": "resulttype", + "subEntityValue": "publication", + "orderField": "title", + "queueMaxSize": "2000", + "groupMaxSize": "100", + "maxChildren": "100", + "slidingWindowSize": "200", + "rootBuilder": [ + "result", + "resultProject_outcome_isProducedBy", + "resultResult_publicationDataset_isRelatedTo", + "resultResult_similarity_isAmongTopNSimilarDocuments", + "resultResult_similarity_hasAmongTopNSimilarDocuments", + "resultOrganization_affiliation_isAffiliatedWith", + "resultResult_part_hasPart", + "resultResult_part_isPartOf", + "resultResult_supplement_isSupplementTo", + "resultResult_supplement_isSupplementedBy", + "resultResult_version_isVersionOf" ], - - "decisionTree" : { + "includeChildren": "true", + "maxIterations": 20, + "idPath": "$.id" + }, + "pace": { + "clustering": [ + { + "name": "ngrampairs", + "fields": [ + "title" + ], + "params": { + "max": "1", + "ngramLen": "3" + } + }, + { + "name": "suffixprefix", + "fields": [ + "title" + ], + "params": { + "max": "1", + "len": "3" + } + }, + { + "name": "lowercase", + "fields": [ + "doi" + ], + "params": {} + } + ], + "decisionTree": { "start": { "fields": [ { "field": "pid", - "comparator": "pidMatch", - "weight": 1, + "comparator": "jsonListMatch", + "weight": 1.0, "countIfUndefined": "false", - "params": {} + "params": { + "threshold": "0.5", + "jpath_value": "$.value", + "jpath_classid": "$.qualifier.classid" + } } ], - "threshold": 1, - "aggregation": "SC", + "threshold": 1.0, + "aggregation": "MAX", "positive": "MATCH", - "negative": "NO_MATCH", + "negative": "layer2", "undefined": "layer2", - "ignoreUndefined": "false" + "ignoreUndefined": "true" }, "layer2": { "fields": [ { "field": "title", "comparator": "titleVersionMatch", - "weight": 1, - "countIfUndefined": "true", + "weight": 1.0, + "countIfUndefined": "false", "params": {} }, { "field": "authors", "comparator": "sizeMatch", - "weight": 1, + "weight": 1.0, "countIfUndefined": "false", "params": {} } ], - "threshold": 1, + "threshold": 1.0, "aggregation": "NC", "positive": "layer3", "negative": "NO_MATCH", @@ -67,30 +107,53 @@ "fields": [ { "field": "title", - "comparator": "LevensteinTitle", - "weight": 1, + "comparator": "levensteinTitle", + "weight": 1.0, "countIfUndefined": "true", "params": {} } ], "threshold": 0.99, - "aggregation": "W_MEAN", + "aggregation": "SUM", "positive": "MATCH", "negative": "NO_MATCH", "undefined": "NO_MATCH", - "ignoreUndefined": "false" + "ignoreUndefined": "true" } }, - "model" : [ - { "name" : "doi", "type" : "String", "path" : "$.pid[?(@.qualifier.classid ==\"doi\")].value" }, - { "name" : "pid", "type" : "JSON","path" : "$.pid", "overrideMatch" : "true" }, - { "name" : "title", "type" : "String", "path" : "$.title[?(@.qualifier.classid ==\"main title\")].value", "length" : 250, "size" : 5 }, - { "name" : "authors", "type" : "List", "path" : "$.author[*].fullname", "size" : 200 }, - { "name" : "resulttype", "type" : "String", "path" : "$.resulttype.classid" } + "model": [ + { + "name": "doi", + "type": "String", + "path": "$.pid[?(@.qualifier.classid == 'doi')].value" + }, + { + "name": "pid", + "type": "JSON", + "path": "$.pid", + "overrideMatch": "true" + }, + { + "name": "title", + "type": "String", + "path": "$.title[?(@.qualifier.classid == 'main title')].value", + "length": 250, + "size": 5 + }, + { + "name": "authors", + "type": "List", + "path": "$.author[*].fullname", + "size": 200 + }, + { + "name": "resulttype", + "type": "String", + "path": "$.resulttype.classid" + } ], - "synonyms": {}, - "blacklists" : { - "title" : [ + "blacklists": { + "title": [ "^Inside Front Cover$", "(?i)^Poster presentations$", "^THE ASSOCIATION AND THE GENERAL MEDICAL COUNCIL$", @@ -102,7 +165,6 @@ "^Cartas? ao editor Letters? to the Editor$", "^Note from the Editor$", "^Anesthesia Abstract$", - "^Annual report$", "(?i)^“?THE RADICAL PREVENTION OF VENEREAL DISEASE\\.?”?$", "(?i)^Graph and Table of Infectious Diseases?$", @@ -122,14 +184,12 @@ "^Cálculo de concentraciones en disoluciones acuosas. Ejercicio interactivo\\..*\\.$", "(?i)^Genetic and functional analyses of SHANK2 mutations suggest a multiple hit model of Autism spectrum disorders?\\.?$", "^Gushi hakubutsugaku$", - "^Starobosanski nadpisi u Bosni i Hercegovini \\(.*\\)$", "^Intestinal spirocha?etosis$", "^Treatment of Rodent Ulcer$", "(?i)^\\W*Cloud Computing\\W*$", "^Compendio mathematico : en que se contienen todas las materias mas principales de las Ciencias que tratan de la cantidad$", "^Free Communications, Poster Presentations: Session [A-F]$", - "^“The Historical Aspects? of Quackery\\.?”$", "^A designated centre for people with disabilities operated by St John of God Community Services (Limited|Ltd), Louth$", "^P(er|re)-Mile Premiums for Auto Insurance\\.?$", @@ -150,10 +210,8 @@ "(?i)^Measurement of the pseudorapidity and centrality dependence of the transverse energy density in Pb(-?)Pb collisions at.*tev(\\.?)$", "(?i)^Search for resonances decaying into top-quark pairs using fully hadronic decays in pp collisions with ATLAS at.*TeV$", "(?i)^Search for neutral minimal supersymmetric standard model Higgs bosons decaying to tau pairs in pp collisions at.*tev$", - "(?i)^Relatório de Estágio (de|em) Angiologia e Cirurgia Vascular$", "^Aus der AGMB$", - "^Znanstveno-stručni prilozi$", "(?i)^Zhodnocení finanční situace podniku a návrhy na zlepšení$", "(?i)^Evaluation of the Financial Situation in the Firm and Proposals to its Improvement$", @@ -190,7 +248,6 @@ "(?i)^RUBRIKA UREDNIKA$", "^A Matching Model of the Academic Publication Market$", "^Yōgaku kōyō$", - "^Internetový marketing$", "^Internet marketing$", "^Chūtō kokugo dokuhon$", @@ -223,21 +280,17 @@ "^Information System Assessment and Proposal for ICT Modification$", "^Stresové zatížení pracovníků ve vybrané profesi$", "^Stress load in a specific job$", - "^Sunday: Poster Sessions, Pt.*$", "^Monday: Poster Sessions, Pt.*$", "^Wednesday: Poster Sessions, Pt.*", "^Tuesday: Poster Sessions, Pt.*$", - "^Analýza reklamy$", "^Analysis of advertising$", - "^Shōgaku shūshinsho$", "^Shōgaku sansū$", "^Shintei joshi kokubun$", "^Taishō joshi kokubun dokuhon$", "^Joshi kokubun$", - "^Účetní uzávěrka a účetní závěrka v ČR$", "(?i)^The \"?Causes\"? of Cancer$", "^Normas para la publicación de artículos$", @@ -256,7 +309,6 @@ "^Abdominal [Aa]ortic [Aa]neurysms.*$", "^Pseudomyxoma peritonei$", "^Kazalo autora$", - "(?i)^uvodna riječ$", "^Motivace jako způsob vedení lidí$", "^Motivation as a leadership$", @@ -329,6 +381,7 @@ "(?i)^.*authors['’′]? reply\\.?$", "(?i)^.*authors['’′]? response\\.?$" ] - } + }, + "synonyms": {} } } \ No newline at end of file