1
0
Fork 0

implemented test for cut of connected component

This commit is contained in:
Sandro La Bruzzo 2020-07-13 15:28:17 +02:00
parent d561b2dd21
commit 9ef2385022
4 changed files with 107 additions and 23 deletions

View File

@ -80,16 +80,10 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
try {
cut = Integer.parseInt(parser.get("cutConnectedComponent"));
} catch (Throwable e) {
log.error("unable to parse " + parser.get(" cut-off threshold"));
}
log.info("graphBasePath: '{}'", graphBasePath);
log.info("isLookUpUrl: '{}'", isLookUpUrl);
log.info("actionSetId: '{}'", actionSetId);
@ -134,9 +128,6 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
mergeRels.write().mode(SaveMode.Append).parquet(mergeRelPath);
}
}

View File

@ -19,16 +19,16 @@ public class ConnectedComponent implements Serializable {
private Set<String> docIds;
private String ccId;
public ConnectedComponent() {
}
public ConnectedComponent(Set<String> docIds, final int cut) {
this.docIds = docIds;
createID();
if (cut > 0 && docIds.size() > cut) {
docIds = docIds.stream().filter(s -> !ccId.equalsIgnoreCase(s)).limit(cut -1).collect(Collectors.toSet());
docIds.add(ccId);
this.docIds = docIds
.stream()
.filter(s -> !ccId.equalsIgnoreCase(s))
.limit(cut - 1)
.collect(Collectors.toSet());
this.docIds.add(ccId);
}
}

View File

@ -22,8 +22,7 @@
"paramLongName": "cutConnectedComponent",
"paramDescription": "the number of maximum elements that belongs to a connected components",
"paramRequired": false
}
,
},
{
"paramName": "w",
"paramLongName": "workingPath",

View File

@ -3,6 +3,8 @@ package eu.dnetlib.dhp.oa.dedup;
import static java.nio.file.Files.createTempDirectory;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.lenient;
@ -11,6 +13,9 @@ import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
@ -18,6 +23,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
@ -190,6 +196,94 @@ public class SparkDedupTest implements Serializable {
@Test
@Order(2)
public void cutMergeRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCreateMergeRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
parser
.parseArgument(
new String[] {
"-i",
testGraphBasePath,
"-asi",
testActionSetId,
"-la",
"lookupurl",
"-w",
testOutputBasePath,
"-cc",
"3"
});
new SparkCreateMergeRels(parser, spark).run(isLookUpService);
long orgs_mergerel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
.as(Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
.groupBy("source")
.agg(count("target").alias("cnt"))
.select("source", "cnt")
.where("cnt > 3")
.count();
long pubs_mergerel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")
.as(Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
.groupBy("source")
.agg(count("target").alias("cnt"))
.select("source", "cnt")
.where("cnt > 3")
.count();
long sw_mergerel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
.as(Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
.groupBy("source")
.agg(count("target").alias("cnt"))
.select("source", "cnt")
.where("cnt > 3")
.count();
long ds_mergerel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")
.as(Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
.groupBy("source")
.agg(count("target").alias("cnt"))
.select("source", "cnt")
.where("cnt > 3")
.count();
long orp_mergerel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")
.as(Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
.groupBy("source")
.agg(count("target").alias("cnt"))
.select("source", "cnt")
.where("cnt > 3")
.count();
assertEquals(0, orgs_mergerel);
assertEquals(0, pubs_mergerel);
assertEquals(0, sw_mergerel);
assertEquals(0, ds_mergerel);
assertEquals(0, orp_mergerel);
}
@Test
@Order(3)
public void createMergeRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -241,7 +335,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
@Order(3)
@Order(4)
public void createDedupRecordTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -288,7 +382,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
@Order(4)
@Order(5)
public void updateEntityTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -404,7 +498,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
@Order(5)
@Order(6)
public void propagateRelationTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -454,7 +548,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
@Order(6)
@Order(7)
public void testRelations() throws Exception {
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10);
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2);