WIP SparkCreateMergeRels distinct relations

This commit is contained in:
Claudio Atzori 2020-07-13 15:30:57 +02:00
parent d561b2dd21
commit 8a612d861a
9 changed files with 61 additions and 52 deletions

View File

@ -80,16 +80,10 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
try { try {
cut = Integer.parseInt(parser.get("cutConnectedComponent")); cut = Integer.parseInt(parser.get("cutConnectedComponent"));
} catch (Throwable e) { } catch (Throwable e) {
log.error("unable to parse "+parser.get(" cut-off threshold")); log.error("unable to parse " + parser.get(" cut-off threshold"));
} }
log.info("graphBasePath: '{}'", graphBasePath); log.info("graphBasePath: '{}'", graphBasePath);
log.info("isLookUpUrl: '{}'", isLookUpUrl); log.info("isLookUpUrl: '{}'", isLookUpUrl);
log.info("actionSetId: '{}'", actionSetId); log.info("actionSetId: '{}'", actionSetId);
@ -134,9 +128,6 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
mergeRels.write().mode(SaveMode.Append).parquet(mergeRelPath); mergeRels.write().mode(SaveMode.Append).parquet(mergeRelPath);
} }
} }

View File

@ -34,6 +34,8 @@ public class SparkCreateSimRels extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkCreateSimRels.class); private static final Logger log = LoggerFactory.getLogger(SparkCreateSimRels.class);
public static final int NUM_PARTITIONS = 10000;
public SparkCreateSimRels(ArgumentApplicationParser parser, SparkSession spark) { public SparkCreateSimRels(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark); super(parser, spark);
} }
@ -48,13 +50,6 @@ public class SparkCreateSimRels extends AbstractSparkAction {
parser.parseArgument(args); parser.parseArgument(args);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf
.registerKryoClasses(
new Class[] {
MapDocument.class, FieldListImpl.class, FieldValueImpl.class, Block.class
});
new SparkCreateSimRels(parser, getSparkSession(conf)) new SparkCreateSimRels(parser, getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
} }
@ -88,7 +83,7 @@ public class SparkCreateSimRels extends AbstractSparkAction {
JavaPairRDD<String, MapDocument> mapDocuments = sc JavaPairRDD<String, MapDocument> mapDocuments = sc
.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
.repartition(10000) .repartition(NUM_PARTITIONS)
.mapToPair( .mapToPair(
(PairFunction<String, String, MapDocument>) s -> { (PairFunction<String, String, MapDocument>) s -> {
MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s); MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
@ -98,21 +93,15 @@ public class SparkCreateSimRels extends AbstractSparkAction {
// create blocks for deduplication // create blocks for deduplication
JavaPairRDD<String, Block> blocks = Deduper JavaPairRDD<String, Block> blocks = Deduper
.createSortedBlocks(mapDocuments, dedupConf) .createSortedBlocks(mapDocuments, dedupConf)
.repartition(10000); .repartition(NUM_PARTITIONS);
// create relations by comparing only elements in the same group // create relations by comparing only elements in the same group
Deduper Deduper
.computeRelations(sc, blocks, dedupConf) .computeRelations(sc, blocks, dedupConf)
.map(t -> createSimRel(t._1(), t._2(), entity)) .map(t -> createSimRel(t._1(), t._2(), entity))
.repartition(10000) .repartition(NUM_PARTITIONS)
.map(r -> OBJECT_MAPPER.writeValueAsString(r)) .map(r -> OBJECT_MAPPER.writeValueAsString(r))
.saveAsTextFile(outputPath); .saveAsTextFile(outputPath);
// save the simrel in the workingdir
/*
* spark .createDataset(relations.rdd(), Encoders.bean(Relation.class)) .write() .mode(SaveMode.Append)
* .save(outputPath);
*/
} }
} }

View File

@ -4,7 +4,9 @@ package eu.dnetlib.dhp.oa.dedup;
import static org.apache.spark.sql.functions.col; import static org.apache.spark.sql.functions.col;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -95,11 +97,17 @@ public class SparkPropagateRelation extends AbstractSparkAction {
FieldType.TARGET, FieldType.TARGET,
getDeletedFn()); getDeletedFn());
save(distinctRelations(newRels.union(updated).union(mergeRels)), outputRelationPath, SaveMode.Overwrite); save(
newRels
.union(updated)
.union(mergeRels)
.map((MapFunction<Relation, Relation>) r -> r, Encoders.kryo(Relation.class)),
outputRelationPath, SaveMode.Overwrite);
} }
private Dataset<Relation> distinctRelations(Dataset<Relation> rels) { private Dataset<Relation> distinctRelations(Dataset<Relation> rels) {
return rels return rels
.filter(getRelationFilterFunction())
.groupByKey((MapFunction<Relation, String>) r -> ModelSupport.idFn().apply(r), Encoders.STRING()) .groupByKey((MapFunction<Relation, String>) r -> ModelSupport.idFn().apply(r), Encoders.STRING())
.agg(new RelationAggregator().toColumn()) .agg(new RelationAggregator().toColumn())
.map((MapFunction<Tuple2<String, Relation>, Relation>) t -> t._2(), Encoders.bean(Relation.class)); .map((MapFunction<Tuple2<String, Relation>, Relation>) t -> t._2(), Encoders.bean(Relation.class));
@ -119,6 +127,14 @@ public class SparkPropagateRelation extends AbstractSparkAction {
.map(mapFn, Encoders.bean(Relation.class)); .map(mapFn, Encoders.bean(Relation.class));
} }
private FilterFunction<Relation> getRelationFilterFunction() {
return (FilterFunction<Relation>) r -> StringUtils.isNotBlank(r.getSource()) ||
StringUtils.isNotBlank(r.getTarget()) ||
StringUtils.isNotBlank(r.getRelClass()) ||
StringUtils.isNotBlank(r.getSubRelType()) ||
StringUtils.isNotBlank(r.getRelClass());
}
private static MapFunction<String, Relation> patchRelFn() { private static MapFunction<String, Relation> patchRelFn() {
return value -> { return value -> {
final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class); final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);

View File

@ -22,12 +22,11 @@ public class ConnectedComponent implements Serializable {
public ConnectedComponent() { public ConnectedComponent() {
} }
public ConnectedComponent(Set<String> docIds, final int cut) { public ConnectedComponent(Set<String> docIds, final int cut) {
this.docIds = docIds; this.docIds = docIds;
createID(); createID();
if (cut > 0 && docIds.size() > cut){ if (cut > 0 && docIds.size() > cut) {
docIds = docIds.stream().filter(s -> !ccId.equalsIgnoreCase(s)).limit(cut -1).collect(Collectors.toSet()); docIds = docIds.stream().filter(s -> !ccId.equalsIgnoreCase(s)).limit(cut - 1).collect(Collectors.toSet());
docIds.add(ccId); docIds.add(ccId);
} }
} }

View File

@ -166,23 +166,31 @@ public class SparkDedupTest implements Serializable {
long orgs_simrel = spark long orgs_simrel = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel") .textFile(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
.count(); .count();
long pubs_simrel = spark long pubs_simrel = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel") .textFile(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
.count(); .count();
long sw_simrel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/software_simrel").count();
long ds_simrel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel").count(); long sw_simrel = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/software_simrel")
.count();
long ds_simrel = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel")
.count();
long orp_simrel = spark long orp_simrel = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel") .textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
.count(); .count();
assertEquals(3432, orgs_simrel); assertEquals(3432, orgs_simrel);
assertEquals(7054, pubs_simrel); assertEquals(7152, pubs_simrel);
assertEquals(344, sw_simrel); assertEquals(344, sw_simrel);
assertEquals(458, ds_simrel); assertEquals(458, ds_simrel);
assertEquals(6750, orp_simrel); assertEquals(6750, orp_simrel);
@ -225,8 +233,10 @@ public class SparkDedupTest implements Serializable {
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
.count(); .count();
long ds_mergerel = spark
long ds_mergerel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel").count(); .read()
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")
.count();
long orp_mergerel = spark long orp_mergerel = spark
.read() .read()
@ -234,7 +244,7 @@ public class SparkDedupTest implements Serializable {
.count(); .count();
assertEquals(1276, orgs_mergerel); assertEquals(1276, orgs_mergerel);
assertEquals(1440, pubs_mergerel); assertEquals(1442, pubs_mergerel);
assertEquals(288, sw_mergerel); assertEquals(288, sw_mergerel);
assertEquals(472, ds_mergerel); assertEquals(472, ds_mergerel);
assertEquals(718, orp_mergerel); assertEquals(718, orp_mergerel);

View File

@ -6,10 +6,10 @@
"subEntityType" : "resulttype", "subEntityType" : "resulttype",
"subEntityValue" : "dataset", "subEntityValue" : "dataset",
"orderField" : "title", "orderField" : "title",
"queueMaxSize" : "800", "queueMaxSize" : "100",
"groupMaxSize" : "100", "groupMaxSize" : "100",
"maxChildren" : "100", "maxChildren" : "100",
"slidingWindowSize" : "80", "slidingWindowSize" : "100",
"rootBuilder" : ["result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ], "rootBuilder" : ["result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
"includeChildren" : "true", "includeChildren" : "true",
"idPath" : "$.id", "idPath" : "$.id",
@ -17,7 +17,8 @@
}, },
"pace" : { "pace" : {
"clustering" : [ "clustering" : [
{ "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } }, { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } } { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
], ],
"decisionTree" : { "decisionTree" : {

View File

@ -6,10 +6,10 @@
"subEntityType" : "resulttype", "subEntityType" : "resulttype",
"subEntityValue" : "otherresearchproduct", "subEntityValue" : "otherresearchproduct",
"orderField" : "title", "orderField" : "title",
"queueMaxSize" : "800", "queueMaxSize" : "100",
"groupMaxSize" : "100", "groupMaxSize" : "100",
"maxChildren" : "100", "maxChildren" : "100",
"slidingWindowSize" : "80", "slidingWindowSize" : "100",
"rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ], "rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
"includeChildren" : "true", "includeChildren" : "true",
"idPath" : "$.id", "idPath" : "$.id",
@ -17,7 +17,8 @@
}, },
"pace" : { "pace" : {
"clustering" : [ "clustering" : [
{ "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } }, { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } } { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
], ],
"decisionTree" : { "decisionTree" : {

View File

@ -6,10 +6,10 @@
"subEntityType": "resulttype", "subEntityType": "resulttype",
"subEntityValue": "publication", "subEntityValue": "publication",
"orderField": "title", "orderField": "title",
"queueMaxSize": "800", "queueMaxSize": "100",
"groupMaxSize": "100", "groupMaxSize": "100",
"maxChildren": "100", "maxChildren": "100",
"slidingWindowSize": "80", "slidingWindowSize": "100",
"rootBuilder": [ "rootBuilder": [
"result", "result",
"resultProject_outcome_isProducedBy", "resultProject_outcome_isProducedBy",
@ -29,7 +29,8 @@
}, },
"pace": { "pace": {
"clustering" : [ "clustering" : [
{ "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } }, { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } } { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
], ],
"decisionTree": { "decisionTree": {

View File

@ -6,10 +6,10 @@
"subEntityType" : "resulttype", "subEntityType" : "resulttype",
"subEntityValue" : "software", "subEntityValue" : "software",
"orderField" : "title", "orderField" : "title",
"queueMaxSize" : "800", "queueMaxSize" : "100",
"groupMaxSize" : "100", "groupMaxSize" : "100",
"maxChildren" : "100", "maxChildren" : "100",
"slidingWindowSize" : "80", "slidingWindowSize" : "100",
"rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ], "rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
"includeChildren" : "true", "includeChildren" : "true",
"idPath" : "$.id", "idPath" : "$.id",
@ -17,8 +17,9 @@
}, },
"pace" : { "pace" : {
"clustering" : [ "clustering" : [
{ "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } }, { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
{ "name" : "lowercase", "fields" : [ "doi", "url" ], "params" : { } } { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
], ],
"decisionTree": { "decisionTree": {
"start": { "start": {