WIP SparkCreateMergeRels distinct relations

This commit is contained in:
Claudio Atzori 2020-07-13 15:31:38 +02:00
commit 4c101a9d66
3 changed files with 105 additions and 11 deletions

View File

@ -19,15 +19,16 @@ public class ConnectedComponent implements Serializable {
private Set<String> docIds; private Set<String> docIds;
private String ccId; private String ccId;
public ConnectedComponent() {
}
public ConnectedComponent(Set<String> docIds, final int cut) { public ConnectedComponent(Set<String> docIds, final int cut) {
this.docIds = docIds; this.docIds = docIds;
createID(); createID();
if (cut > 0 && docIds.size() > cut) { if (cut > 0 && docIds.size() > cut) {
docIds = docIds.stream().filter(s -> !ccId.equalsIgnoreCase(s)).limit(cut - 1).collect(Collectors.toSet()); this.docIds = docIds
docIds.add(ccId); .stream()
.filter(s -> !ccId.equalsIgnoreCase(s))
.limit(cut - 1)
.collect(Collectors.toSet());
this.docIds.add(ccId);
} }
} }

View File

@ -22,8 +22,7 @@
"paramLongName": "cutConnectedComponent", "paramLongName": "cutConnectedComponent",
"paramDescription": "the number of maximum elements that belongs to a connected components", "paramDescription": "the number of maximum elements that belongs to a connected components",
"paramRequired": false "paramRequired": false
} },
,
{ {
"paramName": "w", "paramName": "w",
"paramLongName": "workingPath", "paramLongName": "workingPath",

View File

@ -3,6 +3,8 @@ package eu.dnetlib.dhp.oa.dedup;
import static java.nio.file.Files.createTempDirectory; import static java.nio.file.Files.createTempDirectory;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.lenient;
@ -11,6 +13,9 @@ import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
@ -18,6 +23,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
@ -198,6 +204,94 @@ public class SparkDedupTest implements Serializable {
@Test @Test
@Order(2) @Order(2)
public void cutMergeRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCreateMergeRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
parser
.parseArgument(
new String[] {
"-i",
testGraphBasePath,
"-asi",
testActionSetId,
"-la",
"lookupurl",
"-w",
testOutputBasePath,
"-cc",
"3"
});
new SparkCreateMergeRels(parser, spark).run(isLookUpService);
long orgs_mergerel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
.as(Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
.groupBy("source")
.agg(count("target").alias("cnt"))
.select("source", "cnt")
.where("cnt > 3")
.count();
long pubs_mergerel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")
.as(Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
.groupBy("source")
.agg(count("target").alias("cnt"))
.select("source", "cnt")
.where("cnt > 3")
.count();
long sw_mergerel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
.as(Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
.groupBy("source")
.agg(count("target").alias("cnt"))
.select("source", "cnt")
.where("cnt > 3")
.count();
long ds_mergerel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")
.as(Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
.groupBy("source")
.agg(count("target").alias("cnt"))
.select("source", "cnt")
.where("cnt > 3")
.count();
long orp_mergerel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")
.as(Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
.groupBy("source")
.agg(count("target").alias("cnt"))
.select("source", "cnt")
.where("cnt > 3")
.count();
assertEquals(0, orgs_mergerel);
assertEquals(0, pubs_mergerel);
assertEquals(0, sw_mergerel);
assertEquals(0, ds_mergerel);
assertEquals(0, orp_mergerel);
}
@Test
@Order(3)
public void createMergeRelsTest() throws Exception { public void createMergeRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser( ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -251,7 +345,7 @@ public class SparkDedupTest implements Serializable {
} }
@Test @Test
@Order(3) @Order(4)
public void createDedupRecordTest() throws Exception { public void createDedupRecordTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser( ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -298,7 +392,7 @@ public class SparkDedupTest implements Serializable {
} }
@Test @Test
@Order(4) @Order(5)
public void updateEntityTest() throws Exception { public void updateEntityTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser( ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -414,7 +508,7 @@ public class SparkDedupTest implements Serializable {
} }
@Test @Test
@Order(5) @Order(6)
public void propagateRelationTest() throws Exception { public void propagateRelationTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser( ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -464,7 +558,7 @@ public class SparkDedupTest implements Serializable {
} }
@Test @Test
@Order(6) @Order(7)
public void testRelations() throws Exception { public void testRelations() throws Exception {
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10); testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10);
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2); testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2);