fixed stuff

This commit is contained in:
Sandro La Bruzzo 2019-12-18 16:28:22 +01:00
parent 76efcde4fd
commit dd21db7036
6 changed files with 176 additions and 76 deletions

View File

@ -91,6 +91,8 @@ public class DedupRecordFactory {
}
});
p.setDateofacceptance(DatePicker.pick(dateofacceptance));
if (p.getDataInfo() == null)
p.setDataInfo(new DataInfo());
p.getDataInfo().setTrust("0.9");
p.setLastupdatetimestamp(ts);
return p;
@ -121,6 +123,8 @@ public class DedupRecordFactory {
}
});
d.setDateofacceptance(DatePicker.pick(dateofacceptance));
if (d.getDataInfo() == null)
d.setDataInfo(new DataInfo());
d.getDataInfo().setTrust("0.9");
d.setLastupdatetimestamp(ts);
return d;
@ -143,6 +147,8 @@ public class DedupRecordFactory {
throw new RuntimeException(exc);
}
});
if (p.getDataInfo() == null)
p.setDataInfo(new DataInfo());
p.getDataInfo().setTrust("0.9");
p.setLastupdatetimestamp(ts);
return p;
@ -170,6 +176,8 @@ public class DedupRecordFactory {
}
});
s.setDateofacceptance(DatePicker.pick(dateofacceptance));
if (s.getDataInfo() == null)
s.setDataInfo(new DataInfo());
s.getDataInfo().setTrust("0.9");
s.setLastupdatetimestamp(ts);
return s;
@ -189,6 +197,8 @@ public class DedupRecordFactory {
throw new RuntimeException(exc);
}
});
if (d.getDataInfo() == null)
d.setDataInfo(new DataInfo());
d.getDataInfo().setTrust("0.9");
d.setLastupdatetimestamp(ts);
return d;
@ -226,6 +236,8 @@ public class DedupRecordFactory {
{
o.setDataInfo(new DataInfo());
}
if (o.getDataInfo() == null)
o.setDataInfo(new DataInfo());
o.getDataInfo().setTrust("0.9");
o.setLastupdatetimestamp(ts);
@ -256,6 +268,8 @@ public class DedupRecordFactory {
throw new RuntimeException(exc);
}
});
if (o.getDataInfo() == null)
o.setDataInfo(new DataInfo());
o.setDateofacceptance(DatePicker.pick(dateofacceptance));
o.getDataInfo().setTrust("0.9");
o.setLastupdatetimestamp(ts);

View File

@ -119,7 +119,7 @@ public class Deduper implements Serializable {
.reduceByKey((Function2<List<MapDocument>, List<MapDocument>, List<MapDocument>>) (v1, v2) -> {
v1.addAll(v2);
v1.sort(Comparator.comparing(a -> a.getFieldMap().get(of).stringValue()));
if (v1.size()> maxQueueSize)
if (v1.size() > maxQueueSize)
return new ArrayList<>(v1.subList(0, maxQueueSize));
return v1;
});
@ -146,10 +146,13 @@ public class Deduper implements Serializable {
Map<String, LongAccumulator> accumulators = DedupUtility.constructAccumulator(config, context.sc());
return blocks.flatMapToPair((PairFlatMapFunction<Tuple2<String, List<MapDocument>>, String, String>) it -> {
final SparkReporter reporter = new SparkReporter(accumulators);
new BlockProcessor(config).processSortedBlock(it._1(), it._2(), reporter);
return reporter.getRelations().iterator();
try {
final SparkReporter reporter = new SparkReporter(accumulators);
new BlockProcessor(config).processSortedBlock(it._1(), it._2(), reporter);
return reporter.getRelations().iterator();
} catch (Exception e) {
throw new RuntimeException(it._2().get(0).getIdentifier(), e);
}
}).mapToPair(
(PairFunction<Tuple2<String, String>, String, Tuple2<String, String>>) item ->
new Tuple2<String, Tuple2<String, String>>(item._1() + item._2(), item))

View File

@ -1,5 +1,9 @@
package eu.dnetlib.dedup;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import eu.dnetlib.dedup.graph.ConnectedComponent;
import eu.dnetlib.dedup.graph.GraphProcessor;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
@ -43,18 +47,12 @@ public class SparkCreateConnectedComponent {
final JavaPairRDD<Object, String> vertexes = sc.textFile(inputPath + "/" + entity)
.map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s))
.mapToPair((PairFunction<String, Object, String>)
s -> new Tuple2<Object, String>((long) s.hashCode(), s)
s -> new Tuple2<Object, String>(getHashcode(s), s)
);
final Dataset<Relation> similarityRelations = spark.read().load(DedupUtility.createSimRelPath(targetPath,entity)).as(Encoders.bean(Relation.class));
final RDD<Edge<String>> edgeRdd = similarityRelations.javaRDD().map(it -> new Edge<>(it.getSource().hashCode(), it.getTarget().hashCode(), it.getRelClass())).rdd();
final JavaRDD<ConnectedComponent> cc = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, 20).toJavaRDD();
final RDD<Edge<String>> edgeRdd = similarityRelations.javaRDD().map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass())).rdd();
final JavaRDD<ConnectedComponent> cc = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, dedupConf.getWf().getMaxIterations()).toJavaRDD();
final Dataset<Relation> mergeRelation = spark.createDataset(cc.filter(k->k.getDocIds().size()>1).flatMap((FlatMapFunction<ConnectedComponent, Relation>) c ->
c.getDocIds()
.stream()
@ -72,9 +70,10 @@ public class SparkCreateConnectedComponent {
tmp.add(r);
return tmp.stream();
}).iterator()).rdd(), Encoders.bean(Relation.class));
mergeRelation.write().mode("overwrite").save(DedupUtility.createMergeRelPath(targetPath,entity));
}
public static long getHashcode(final String id) {
return Hashing.murmur3_128().hashUnencodedChars(id).asLong();
}
}

View File

@ -30,7 +30,7 @@
</property>
</parameters>
<start to="CreateDedupRecord"/>
<start to="CreateSimRels"/>
<kill name="Kill">
@ -103,7 +103,7 @@
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Create Connected Components</name>
<name>Create Dedup Record</name>
<class>eu.dnetlib.dedup.SparkCreateDedupRecord</class>
<jar>dhp-dedup-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores}

View File

@ -1,6 +1,8 @@
package eu.dnetlib.dedup;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Publication;
import org.apache.commons.io.FileUtils;
@ -67,5 +69,34 @@ public class SparkCreateDedupTest {
}
// [20|grid________::6031f94bef015a37783268ec1e75f17f, 20|nsf_________::b12be9edf414df8ee66b4c52a2d8da46]
// [20|grid________::672e1e5cef49e68f124d3da5225a7357, 20|grid________::7a402604c3853c7a0af14f88f56bf7e1]
// [20|grid________::2fc05b35e11d915b220a66356053eae2, 20|grid________::b02fb3176eb38f6c572722550c07e7ab]
// [20|grid________::bc86248ab2b8d7955dcaf592ba342262, 20|corda_______::45a8ec964029278fb938805182e247a8]
// [20|doajarticles::74551f800ad1c81a6cd31c5162887b7f, 20|rcuk________::86dc9a83df05a58917f38ca09f814617]
// [20|nsf_________::5e837d8e6444cc298db314ea54ad2f4a, 20|snsf________::7b54715f0ec5c6a0a44672f45d98be8d]
// [20|corda__h2020::7ee7e57bad06b92c1a568dd61e10ba8c, 20|snsf________::2d4a2695221a3ce0c749ee34e064c0b3]
// [20|corda_______::25220a523550176dac9e5432dac43596, 20|grid________::9782f16a46650cbbfaaa2315109507d1]
// [20|nih_________::88c3b664dcc7af9e827f94ac964cd66c, 20|grid________::238d3ac0a7d119d5c8342a647f5245f5]
// [20|rcuk________::0582c20fcfb270f9ec1b19b0f0dcd881, 20|nsf_________::9afa48ddf0bc2cd4f3c41dc41daabcdb]
// [20|rcuk________::fbc445f8d24e569bc8b640dba86ae978, 20|corda_______::5a8a4094f1b68a88fc56e65cea7ebfa0]
// [20|rcuk________::7485257cd5caaf6316ba8062feea801d, 20|grid________::dded811e5f5a4c9f7ca8f9955e52ade7]
// [20|nih_________::0576dd270d29d5b7c23dd15a827ccdb9, 20|corda_______::10ca69f6a4a121f75fdde1feee226ce0]
// [20|corda__h2020::0429f6addf10e9b2939d65c6fb097ffd, 20|grid________::6563ec73057624d5ccc0cd050b302181]
@Test
public void testHashCode() {
final String s1 = "20|grid________::6031f94bef015a37783268ec1e75f17f";
final String s2 = "20|nsf_________::b12be9edf414df8ee66b4c52a2d8da46";
final HashFunction hashFunction = Hashing.murmur3_128();
System.out.println( s1.hashCode());
System.out.println(hashFunction.hashUnencodedChars(s1).asLong());
System.out.println( s2.hashCode());
System.out.println(hashFunction.hashUnencodedChars(s2).asLong());
}
}

View File

@ -1,62 +1,102 @@
{
"wf" : {
"threshold" : "0.99",
"dedupRun" : "001",
"entityType" : "result",
"subEntityType" : "resulttype",
"subEntityValue" : "publication",
"orderField" : "title",
"queueMaxSize" : "2000",
"groupMaxSize" : "100",
"maxChildren" : "100",
"idPath": "$.id",
"slidingWindowSize" : "200",
"rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_isAffiliatedWith", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
"includeChildren" : "true"
},
"pace" : {
"clustering" : [
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
"wf": {
"threshold": "0.99",
"dedupRun": "001",
"entityType": "result",
"subEntityType": "resulttype",
"subEntityValue": "publication",
"orderField": "title",
"queueMaxSize": "2000",
"groupMaxSize": "100",
"maxChildren": "100",
"slidingWindowSize": "200",
"rootBuilder": [
"result",
"resultProject_outcome_isProducedBy",
"resultResult_publicationDataset_isRelatedTo",
"resultResult_similarity_isAmongTopNSimilarDocuments",
"resultResult_similarity_hasAmongTopNSimilarDocuments",
"resultOrganization_affiliation_isAffiliatedWith",
"resultResult_part_hasPart",
"resultResult_part_isPartOf",
"resultResult_supplement_isSupplementTo",
"resultResult_supplement_isSupplementedBy",
"resultResult_version_isVersionOf"
],
"decisionTree" : {
"includeChildren": "true",
"maxIterations": 20,
"idPath": "$.id"
},
"pace": {
"clustering": [
{
"name": "ngrampairs",
"fields": [
"title"
],
"params": {
"max": "1",
"ngramLen": "3"
}
},
{
"name": "suffixprefix",
"fields": [
"title"
],
"params": {
"max": "1",
"len": "3"
}
},
{
"name": "lowercase",
"fields": [
"doi"
],
"params": {}
}
],
"decisionTree": {
"start": {
"fields": [
{
"field": "pid",
"comparator": "pidMatch",
"weight": 1,
"comparator": "jsonListMatch",
"weight": 1.0,
"countIfUndefined": "false",
"params": {}
"params": {
"threshold": "0.5",
"jpath_value": "$.value",
"jpath_classid": "$.qualifier.classid"
}
}
],
"threshold": 1,
"aggregation": "SC",
"threshold": 1.0,
"aggregation": "MAX",
"positive": "MATCH",
"negative": "NO_MATCH",
"negative": "layer2",
"undefined": "layer2",
"ignoreUndefined": "false"
"ignoreUndefined": "true"
},
"layer2": {
"fields": [
{
"field": "title",
"comparator": "titleVersionMatch",
"weight": 1,
"countIfUndefined": "true",
"weight": 1.0,
"countIfUndefined": "false",
"params": {}
},
{
"field": "authors",
"comparator": "sizeMatch",
"weight": 1,
"weight": 1.0,
"countIfUndefined": "false",
"params": {}
}
],
"threshold": 1,
"threshold": 1.0,
"aggregation": "NC",
"positive": "layer3",
"negative": "NO_MATCH",
@ -67,30 +107,53 @@
"fields": [
{
"field": "title",
"comparator": "LevensteinTitle",
"weight": 1,
"comparator": "levensteinTitle",
"weight": 1.0,
"countIfUndefined": "true",
"params": {}
}
],
"threshold": 0.99,
"aggregation": "W_MEAN",
"aggregation": "SUM",
"positive": "MATCH",
"negative": "NO_MATCH",
"undefined": "NO_MATCH",
"ignoreUndefined": "false"
"ignoreUndefined": "true"
}
},
"model" : [
{ "name" : "doi", "type" : "String", "path" : "$.pid[?(@.qualifier.classid ==\"doi\")].value" },
{ "name" : "pid", "type" : "JSON","path" : "$.pid", "overrideMatch" : "true" },
{ "name" : "title", "type" : "String", "path" : "$.title[?(@.qualifier.classid ==\"main title\")].value", "length" : 250, "size" : 5 },
{ "name" : "authors", "type" : "List", "path" : "$.author[*].fullname", "size" : 200 },
{ "name" : "resulttype", "type" : "String", "path" : "$.resulttype.classid" }
"model": [
{
"name": "doi",
"type": "String",
"path": "$.pid[?(@.qualifier.classid == 'doi')].value"
},
{
"name": "pid",
"type": "JSON",
"path": "$.pid",
"overrideMatch": "true"
},
{
"name": "title",
"type": "String",
"path": "$.title[?(@.qualifier.classid == 'main title')].value",
"length": 250,
"size": 5
},
{
"name": "authors",
"type": "List",
"path": "$.author[*].fullname",
"size": 200
},
{
"name": "resulttype",
"type": "String",
"path": "$.resulttype.classid"
}
],
"synonyms": {},
"blacklists" : {
"title" : [
"blacklists": {
"title": [
"^Inside Front Cover$",
"(?i)^Poster presentations$",
"^THE ASSOCIATION AND THE GENERAL MEDICAL COUNCIL$",
@ -102,7 +165,6 @@
"^Cartas? ao editor Letters? to the Editor$",
"^Note from the Editor$",
"^Anesthesia Abstract$",
"^Annual report$",
"(?i)^“?THE RADICAL PREVENTION OF VENEREAL DISEASE\\.?”?$",
"(?i)^Graph and Table of Infectious Diseases?$",
@ -122,14 +184,12 @@
"^Cálculo de concentraciones en disoluciones acuosas. Ejercicio interactivo\\..*\\.$",
"(?i)^Genetic and functional analyses of SHANK2 mutations suggest a multiple hit model of Autism spectrum disorders?\\.?$",
"^Gushi hakubutsugaku$",
"^Starobosanski nadpisi u Bosni i Hercegovini \\(.*\\)$",
"^Intestinal spirocha?etosis$",
"^Treatment of Rodent Ulcer$",
"(?i)^\\W*Cloud Computing\\W*$",
"^Compendio mathematico : en que se contienen todas las materias mas principales de las Ciencias que tratan de la cantidad$",
"^Free Communications, Poster Presentations: Session [A-F]$",
"^“The Historical Aspects? of Quackery\\.?”$",
"^A designated centre for people with disabilities operated by St John of God Community Services (Limited|Ltd), Louth$",
"^P(er|re)-Mile Premiums for Auto Insurance\\.?$",
@ -150,10 +210,8 @@
"(?i)^Measurement of the pseudorapidity and centrality dependence of the transverse energy density in Pb(-?)Pb collisions at.*tev(\\.?)$",
"(?i)^Search for resonances decaying into top-quark pairs using fully hadronic decays in pp collisions with ATLAS at.*TeV$",
"(?i)^Search for neutral minimal supersymmetric standard model Higgs bosons decaying to tau pairs in pp collisions at.*tev$",
"(?i)^Relatório de Estágio (de|em) Angiologia e Cirurgia Vascular$",
"^Aus der AGMB$",
"^Znanstveno-stručni prilozi$",
"(?i)^Zhodnocení finanční situace podniku a návrhy na zlepšení$",
"(?i)^Evaluation of the Financial Situation in the Firm and Proposals to its Improvement$",
@ -190,7 +248,6 @@
"(?i)^RUBRIKA UREDNIKA$",
"^A Matching Model of the Academic Publication Market$",
"^Yōgaku kōyō$",
"^Internetový marketing$",
"^Internet marketing$",
"^Chūtō kokugo dokuhon$",
@ -223,21 +280,17 @@
"^Information System Assessment and Proposal for ICT Modification$",
"^Stresové zatížení pracovníků ve vybrané profesi$",
"^Stress load in a specific job$",
"^Sunday: Poster Sessions, Pt.*$",
"^Monday: Poster Sessions, Pt.*$",
"^Wednesday: Poster Sessions, Pt.*",
"^Tuesday: Poster Sessions, Pt.*$",
"^Analýza reklamy$",
"^Analysis of advertising$",
"^Shōgaku shūshinsho$",
"^Shōgaku sansū$",
"^Shintei joshi kokubun$",
"^Taishō joshi kokubun dokuhon$",
"^Joshi kokubun$",
"^Účetní uzávěrka a účetní závěrka v ČR$",
"(?i)^The \"?Causes\"? of Cancer$",
"^Normas para la publicación de artículos$",
@ -256,7 +309,6 @@
"^Abdominal [Aa]ortic [Aa]neurysms.*$",
"^Pseudomyxoma peritonei$",
"^Kazalo autora$",
"(?i)^uvodna riječ$",
"^Motivace jako způsob vedení lidí$",
"^Motivation as a leadership$",
@ -329,6 +381,7 @@
"(?i)^.*authors[']? reply\\.?$",
"(?i)^.*authors[']? response\\.?$"
]
}
},
"synonyms": {}
}
}