forked from D-Net/dnet-hadoop
recert file SparkDedupTest.java
This commit is contained in:
parent
49910aedca
commit
f4bd2b5619
|
@ -12,6 +12,7 @@ import java.io.IOException;
|
|||
import java.io.Serializable;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
|
@ -26,13 +27,7 @@ import org.apache.spark.sql.Dataset;
|
|||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.MethodOrderer;
|
||||
import org.junit.jupiter.api.Order;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.TestMethodOrder;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
|
@ -102,7 +97,8 @@ public class SparkDedupTest implements Serializable {
|
|||
IOUtils
|
||||
.toString(
|
||||
SparkDedupTest.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml")));
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml")));
|
||||
|
||||
lenient()
|
||||
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization")))
|
||||
|
@ -110,7 +106,8 @@ public class SparkDedupTest implements Serializable {
|
|||
IOUtils
|
||||
.toString(
|
||||
SparkDedupTest.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
|
||||
|
||||
lenient()
|
||||
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication")))
|
||||
|
@ -118,7 +115,8 @@ public class SparkDedupTest implements Serializable {
|
|||
IOUtils
|
||||
.toString(
|
||||
SparkDedupTest.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json")));
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json")));
|
||||
|
||||
lenient()
|
||||
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("software")))
|
||||
|
@ -126,7 +124,8 @@ public class SparkDedupTest implements Serializable {
|
|||
IOUtils
|
||||
.toString(
|
||||
SparkDedupTest.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json")));
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json")));
|
||||
|
||||
lenient()
|
||||
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("dataset")))
|
||||
|
@ -134,7 +133,8 @@ public class SparkDedupTest implements Serializable {
|
|||
IOUtils
|
||||
.toString(
|
||||
SparkDedupTest.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json")));
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json")));
|
||||
|
||||
lenient()
|
||||
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("otherresearchproduct")))
|
||||
|
@ -142,21 +142,24 @@ public class SparkDedupTest implements Serializable {
|
|||
IOUtils
|
||||
.toString(
|
||||
SparkDedupTest.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json")));
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json")));
|
||||
}
|
||||
|
||||
@Test
|
||||
@Order(1)
|
||||
public void createSimRelsTest() throws Exception {
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
SparkCreateSimRels.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json")));
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json")));
|
||||
|
||||
parser
|
||||
.parseArgument(new String[] {
|
||||
.parseArgument(
|
||||
new String[] {
|
||||
"-i", testGraphBasePath,
|
||||
"-asi", testActionSetId,
|
||||
"-la", "lookupurl",
|
||||
|
@ -166,27 +169,27 @@ public class SparkDedupTest implements Serializable {
|
|||
|
||||
new SparkCreateSimRels(parser, spark).run(isLookUpService);
|
||||
|
||||
final long orgs_simrel = spark
|
||||
long orgs_simrel = spark
|
||||
.read()
|
||||
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization"))
|
||||
.count();
|
||||
|
||||
final long pubs_simrel = spark
|
||||
long pubs_simrel = spark
|
||||
.read()
|
||||
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "publication"))
|
||||
.count();
|
||||
|
||||
final long sw_simrel = spark
|
||||
long sw_simrel = spark
|
||||
.read()
|
||||
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "software"))
|
||||
.count();
|
||||
|
||||
final long ds_simrel = spark
|
||||
long ds_simrel = spark
|
||||
.read()
|
||||
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "dataset"))
|
||||
.count();
|
||||
|
||||
final long orp_simrel = spark
|
||||
long orp_simrel = spark
|
||||
.read()
|
||||
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "otherresearchproduct"))
|
||||
.count();
|
||||
|
@ -202,14 +205,16 @@ public class SparkDedupTest implements Serializable {
|
|||
@Order(2)
|
||||
public void cutMergeRelsTest() throws Exception {
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
SparkCreateMergeRels.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
|
||||
|
||||
parser
|
||||
.parseArgument(new String[] {
|
||||
.parseArgument(
|
||||
new String[] {
|
||||
"-i",
|
||||
testGraphBasePath,
|
||||
"-asi",
|
||||
|
@ -224,7 +229,7 @@ public class SparkDedupTest implements Serializable {
|
|||
|
||||
new SparkCreateMergeRels(parser, spark).run(isLookUpService);
|
||||
|
||||
final long orgs_mergerel = spark
|
||||
long orgs_mergerel = spark
|
||||
.read()
|
||||
.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
|
||||
.as(Encoders.bean(Relation.class))
|
||||
|
@ -235,7 +240,7 @@ public class SparkDedupTest implements Serializable {
|
|||
.where("cnt > 3")
|
||||
.count();
|
||||
|
||||
final long pubs_mergerel = spark
|
||||
long pubs_mergerel = spark
|
||||
.read()
|
||||
.load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")
|
||||
.as(Encoders.bean(Relation.class))
|
||||
|
@ -245,7 +250,7 @@ public class SparkDedupTest implements Serializable {
|
|||
.select("source", "cnt")
|
||||
.where("cnt > 3")
|
||||
.count();
|
||||
final long sw_mergerel = spark
|
||||
long sw_mergerel = spark
|
||||
.read()
|
||||
.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
|
||||
.as(Encoders.bean(Relation.class))
|
||||
|
@ -256,7 +261,7 @@ public class SparkDedupTest implements Serializable {
|
|||
.where("cnt > 3")
|
||||
.count();
|
||||
|
||||
final long ds_mergerel = spark
|
||||
long ds_mergerel = spark
|
||||
.read()
|
||||
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")
|
||||
.as(Encoders.bean(Relation.class))
|
||||
|
@ -267,7 +272,7 @@ public class SparkDedupTest implements Serializable {
|
|||
.where("cnt > 3")
|
||||
.count();
|
||||
|
||||
final long orp_mergerel = spark
|
||||
long orp_mergerel = spark
|
||||
.read()
|
||||
.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")
|
||||
.as(Encoders.bean(Relation.class))
|
||||
|
@ -296,14 +301,16 @@ public class SparkDedupTest implements Serializable {
|
|||
@Order(3)
|
||||
public void createMergeRelsTest() throws Exception {
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
SparkCreateMergeRels.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
|
||||
|
||||
parser
|
||||
.parseArgument(new String[] {
|
||||
.parseArgument(
|
||||
new String[] {
|
||||
"-i",
|
||||
testGraphBasePath,
|
||||
"-asi",
|
||||
|
@ -316,24 +323,24 @@ public class SparkDedupTest implements Serializable {
|
|||
|
||||
new SparkCreateMergeRels(parser, spark).run(isLookUpService);
|
||||
|
||||
final long orgs_mergerel = spark
|
||||
long orgs_mergerel = spark
|
||||
.read()
|
||||
.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
|
||||
.count();
|
||||
final long pubs_mergerel = spark
|
||||
long pubs_mergerel = spark
|
||||
.read()
|
||||
.load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")
|
||||
.count();
|
||||
final long sw_mergerel = spark
|
||||
long sw_mergerel = spark
|
||||
.read()
|
||||
.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
|
||||
.count();
|
||||
final long ds_mergerel = spark
|
||||
long ds_mergerel = spark
|
||||
.read()
|
||||
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")
|
||||
.count();
|
||||
|
||||
final long orp_mergerel = spark
|
||||
long orp_mergerel = spark
|
||||
.read()
|
||||
.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")
|
||||
.count();
|
||||
|
@ -350,13 +357,15 @@ public class SparkDedupTest implements Serializable {
|
|||
@Order(4)
|
||||
public void createDedupRecordTest() throws Exception {
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
SparkCreateDedupRecord.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json")));
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json")));
|
||||
parser
|
||||
.parseArgument(new String[] {
|
||||
.parseArgument(
|
||||
new String[] {
|
||||
"-i",
|
||||
testGraphBasePath,
|
||||
"-asi",
|
||||
|
@ -369,20 +378,19 @@ public class SparkDedupTest implements Serializable {
|
|||
|
||||
new SparkCreateDedupRecord(parser, spark).run(isLookUpService);
|
||||
|
||||
final long orgs_deduprecord = jsc
|
||||
long orgs_deduprecord = jsc
|
||||
.textFile(testOutputBasePath + "/" + testActionSetId + "/organization_deduprecord")
|
||||
.count();
|
||||
final long pubs_deduprecord = jsc
|
||||
long pubs_deduprecord = jsc
|
||||
.textFile(testOutputBasePath + "/" + testActionSetId + "/publication_deduprecord")
|
||||
.count();
|
||||
final long sw_deduprecord = jsc
|
||||
long sw_deduprecord = jsc
|
||||
.textFile(testOutputBasePath + "/" + testActionSetId + "/software_deduprecord")
|
||||
.count();
|
||||
final long ds_deduprecord = jsc
|
||||
.textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_deduprecord")
|
||||
.count();
|
||||
final long orp_deduprecord = jsc
|
||||
.textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_deduprecord")
|
||||
long ds_deduprecord = jsc.textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_deduprecord").count();
|
||||
long orp_deduprecord = jsc
|
||||
.textFile(
|
||||
testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_deduprecord")
|
||||
.count();
|
||||
|
||||
assertEquals(85, orgs_deduprecord);
|
||||
|
@ -396,27 +404,29 @@ public class SparkDedupTest implements Serializable {
|
|||
@Order(5)
|
||||
public void updateEntityTest() throws Exception {
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
SparkUpdateEntity.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")));
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")));
|
||||
parser
|
||||
.parseArgument(new String[] {
|
||||
.parseArgument(
|
||||
new String[] {
|
||||
"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath
|
||||
});
|
||||
|
||||
new SparkUpdateEntity(parser, spark).run(isLookUpService);
|
||||
|
||||
final long organizations = jsc.textFile(testDedupGraphBasePath + "/organization").count();
|
||||
final long publications = jsc.textFile(testDedupGraphBasePath + "/publication").count();
|
||||
final long projects = jsc.textFile(testDedupGraphBasePath + "/project").count();
|
||||
final long datasource = jsc.textFile(testDedupGraphBasePath + "/datasource").count();
|
||||
final long softwares = jsc.textFile(testDedupGraphBasePath + "/software").count();
|
||||
final long dataset = jsc.textFile(testDedupGraphBasePath + "/dataset").count();
|
||||
final long otherresearchproduct = jsc.textFile(testDedupGraphBasePath + "/otherresearchproduct").count();
|
||||
long organizations = jsc.textFile(testDedupGraphBasePath + "/organization").count();
|
||||
long publications = jsc.textFile(testDedupGraphBasePath + "/publication").count();
|
||||
long projects = jsc.textFile(testDedupGraphBasePath + "/project").count();
|
||||
long datasource = jsc.textFile(testDedupGraphBasePath + "/datasource").count();
|
||||
long softwares = jsc.textFile(testDedupGraphBasePath + "/software").count();
|
||||
long dataset = jsc.textFile(testDedupGraphBasePath + "/dataset").count();
|
||||
long otherresearchproduct = jsc.textFile(testDedupGraphBasePath + "/otherresearchproduct").count();
|
||||
|
||||
final long mergedOrgs = spark
|
||||
long mergedOrgs = spark
|
||||
.read()
|
||||
.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
|
||||
.as(Encoders.bean(Relation.class))
|
||||
|
@ -426,7 +436,7 @@ public class SparkDedupTest implements Serializable {
|
|||
.distinct()
|
||||
.count();
|
||||
|
||||
final long mergedPubs = spark
|
||||
long mergedPubs = spark
|
||||
.read()
|
||||
.load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")
|
||||
.as(Encoders.bean(Relation.class))
|
||||
|
@ -436,7 +446,7 @@ public class SparkDedupTest implements Serializable {
|
|||
.distinct()
|
||||
.count();
|
||||
|
||||
final long mergedSw = spark
|
||||
long mergedSw = spark
|
||||
.read()
|
||||
.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
|
||||
.as(Encoders.bean(Relation.class))
|
||||
|
@ -446,7 +456,7 @@ public class SparkDedupTest implements Serializable {
|
|||
.distinct()
|
||||
.count();
|
||||
|
||||
final long mergedDs = spark
|
||||
long mergedDs = spark
|
||||
.read()
|
||||
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")
|
||||
.as(Encoders.bean(Relation.class))
|
||||
|
@ -456,7 +466,7 @@ public class SparkDedupTest implements Serializable {
|
|||
.distinct()
|
||||
.count();
|
||||
|
||||
final long mergedOrp = spark
|
||||
long mergedOrp = spark
|
||||
.read()
|
||||
.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")
|
||||
.as(Encoders.bean(Relation.class))
|
||||
|
@ -474,27 +484,27 @@ public class SparkDedupTest implements Serializable {
|
|||
assertEquals(389, dataset);
|
||||
assertEquals(517, otherresearchproduct);
|
||||
|
||||
final long deletedOrgs = jsc
|
||||
long deletedOrgs = jsc
|
||||
.textFile(testDedupGraphBasePath + "/organization")
|
||||
.filter(this::isDeletedByInference)
|
||||
.count();
|
||||
|
||||
final long deletedPubs = jsc
|
||||
long deletedPubs = jsc
|
||||
.textFile(testDedupGraphBasePath + "/publication")
|
||||
.filter(this::isDeletedByInference)
|
||||
.count();
|
||||
|
||||
final long deletedSw = jsc
|
||||
long deletedSw = jsc
|
||||
.textFile(testDedupGraphBasePath + "/software")
|
||||
.filter(this::isDeletedByInference)
|
||||
.count();
|
||||
|
||||
final long deletedDs = jsc
|
||||
long deletedDs = jsc
|
||||
.textFile(testDedupGraphBasePath + "/dataset")
|
||||
.filter(this::isDeletedByInference)
|
||||
.count();
|
||||
|
||||
final long deletedOrp = jsc
|
||||
long deletedOrp = jsc
|
||||
.textFile(testDedupGraphBasePath + "/otherresearchproduct")
|
||||
.filter(this::isDeletedByInference)
|
||||
.count();
|
||||
|
@ -510,19 +520,21 @@ public class SparkDedupTest implements Serializable {
|
|||
@Order(6)
|
||||
public void propagateRelationTest() throws Exception {
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
SparkPropagateRelation.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json")));
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json")));
|
||||
parser
|
||||
.parseArgument(new String[] {
|
||||
.parseArgument(
|
||||
new String[] {
|
||||
"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath
|
||||
});
|
||||
|
||||
new SparkPropagateRelation(parser, spark).run(isLookUpService);
|
||||
|
||||
final long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
|
||||
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
|
||||
|
||||
assertEquals(4862, relations);
|
||||
|
||||
|
@ -536,9 +548,10 @@ public class SparkDedupTest implements Serializable {
|
|||
.select(mergeRels.col("target"))
|
||||
.distinct()
|
||||
.toJavaRDD()
|
||||
.mapToPair((PairFunction<Row, String, String>) r -> new Tuple2<>(r.getString(0), "d"));
|
||||
.mapToPair(
|
||||
(PairFunction<Row, String, String>) r -> new Tuple2<String, String>(r.getString(0), "d"));
|
||||
|
||||
final JavaRDD<String> toCheck = jsc
|
||||
JavaRDD<String> toCheck = jsc
|
||||
.textFile(testDedupGraphBasePath + "/relation")
|
||||
.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json))
|
||||
.join(mergedIds)
|
||||
|
@ -547,8 +560,8 @@ public class SparkDedupTest implements Serializable {
|
|||
.join(mergedIds)
|
||||
.map(t -> t._2()._1());
|
||||
|
||||
final long deletedbyinference = toCheck.filter(this::isDeletedByInference).count();
|
||||
final long updated = toCheck.count();
|
||||
long deletedbyinference = toCheck.filter(this::isDeletedByInference).count();
|
||||
long updated = toCheck.count();
|
||||
|
||||
assertEquals(updated, deletedbyinference);
|
||||
}
|
||||
|
@ -560,8 +573,8 @@ public class SparkDedupTest implements Serializable {
|
|||
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2);
|
||||
}
|
||||
|
||||
private void testUniqueness(final String path, final int expected_total, final int expected_unique) {
|
||||
final Dataset<Relation> rel = spark
|
||||
private void testUniqueness(String path, int expected_total, int expected_unique) {
|
||||
Dataset<Relation> rel = spark
|
||||
.read()
|
||||
.textFile(getClass().getResource(path).getPath())
|
||||
.map(
|
||||
|
@ -578,8 +591,7 @@ public class SparkDedupTest implements Serializable {
|
|||
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
|
||||
}
|
||||
|
||||
public boolean isDeletedByInference(final String s) {
|
||||
public boolean isDeletedByInference(String s) {
|
||||
return s.contains("\"deletedbyinference\":true");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue