Merge branch 'master' of code-repo.d4science.org:D-Net/dnet-hadoop

This commit is contained in:
miconis 2020-07-13 18:22:45 +02:00
commit 10e08ccf45
17 changed files with 335 additions and 95 deletions

View File

@ -59,7 +59,6 @@
<dependency> <dependency>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dnet-openaire-broker-common</artifactId> <artifactId>dnet-openaire-broker-common</artifactId>
<version>[3.0.0,)</version>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -0,0 +1,57 @@
package eu.dnetlib.dhp.oa.dedup;
import java.util.Objects;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class RelationAggregator extends Aggregator<Relation, Relation, Relation> {
private static Relation ZERO = new Relation();
@Override
public Relation zero() {
return ZERO;
}
@Override
public Relation reduce(Relation b, Relation a) {
return mergeRel(b, a);
}
@Override
public Relation merge(Relation b, Relation a) {
return mergeRel(b, a);
}
@Override
public Relation finish(Relation r) {
return r;
}
private Relation mergeRel(Relation b, Relation a) {
if (Objects.equals(b, ZERO)) {
return a;
}
if (Objects.equals(a, ZERO)) {
return b;
}
b.mergeFrom(a);
return b;
}
@Override
public Encoder<Relation> bufferEncoder() {
return Encoders.kryo(Relation.class);
}
@Override
public Encoder<Relation> outputEncoder() {
return Encoders.kryo(Relation.class);
}
}

View File

@ -5,11 +5,13 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Optional;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.graphx.Edge; import org.apache.spark.graphx.Edge;
import org.apache.spark.rdd.RDD; import org.apache.spark.rdd.RDD;
@ -75,7 +77,11 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
final String workingPath = parser.get("workingPath"); final String workingPath = parser.get("workingPath");
final String isLookUpUrl = parser.get("isLookUpUrl"); final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId"); final String actionSetId = parser.get("actionSetId");
int cut = Optional
.ofNullable(parser.get("cutConnectedComponent"))
.map(Integer::valueOf)
.orElse(0);
log.info("connected component cut: '{}'", cut);
log.info("graphBasePath: '{}'", graphBasePath); log.info("graphBasePath: '{}'", graphBasePath);
log.info("isLookUpUrl: '{}'", isLookUpUrl); log.info("isLookUpUrl: '{}'", isLookUpUrl);
log.info("actionSetId: '{}'", actionSetId); log.info("actionSetId: '{}'", actionSetId);
@ -100,8 +106,10 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
final RDD<Edge<String>> edgeRdd = spark final RDD<Edge<String>> edgeRdd = spark
.read() .read()
.load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)) .textFile(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity))
.as(Encoders.bean(Relation.class)) .map(
(MapFunction<String, Relation>) r -> OBJECT_MAPPER.readValue(r, Relation.class),
Encoders.bean(Relation.class))
.javaRDD() .javaRDD()
.map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass())) .map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass()))
.rdd(); .rdd();
@ -109,7 +117,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
final Dataset<Relation> mergeRels = spark final Dataset<Relation> mergeRels = spark
.createDataset( .createDataset(
GraphProcessor GraphProcessor
.findCCs(vertexes.rdd(), edgeRdd, maxIterations) .findCCs(vertexes.rdd(), edgeRdd, maxIterations, cut)
.toJavaRDD() .toJavaRDD()
.filter(k -> k.getDocIds().size() > 1) .filter(k -> k.getDocIds().size() > 1)
.flatMap(cc -> ccToMergeRel(cc, dedupConf)) .flatMap(cc -> ccToMergeRel(cc, dedupConf))
@ -117,6 +125,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
Encoders.bean(Relation.class)); Encoders.bean(Relation.class));
mergeRels.write().mode(SaveMode.Append).parquet(mergeRelPath); mergeRels.write().mode(SaveMode.Append).parquet(mergeRelPath);
} }
} }

View File

@ -2,6 +2,7 @@
package eu.dnetlib.dhp.oa.dedup; package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException; import java.io.IOException;
import java.util.Optional;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
@ -34,6 +35,8 @@ public class SparkCreateSimRels extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkCreateSimRels.class); private static final Logger log = LoggerFactory.getLogger(SparkCreateSimRels.class);
public static final int NUM_PARTITIONS = 1000;
public SparkCreateSimRels(ArgumentApplicationParser parser, SparkSession spark) { public SparkCreateSimRels(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark); super(parser, spark);
} }
@ -48,13 +51,6 @@ public class SparkCreateSimRels extends AbstractSparkAction {
parser.parseArgument(args); parser.parseArgument(args);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf
.registerKryoClasses(
new Class[] {
MapDocument.class, FieldListImpl.class, FieldValueImpl.class, Block.class
});
new SparkCreateSimRels(parser, getSparkSession(conf)) new SparkCreateSimRels(parser, getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
} }
@ -68,7 +64,12 @@ public class SparkCreateSimRels extends AbstractSparkAction {
final String isLookUpUrl = parser.get("isLookUpUrl"); final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId"); final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath"); final String workingPath = parser.get("workingPath");
final int numPartitions = Optional
.ofNullable(parser.get("numPartitions"))
.map(Integer::valueOf)
.orElse(NUM_PARTITIONS);
log.info("numPartitions: '{}'", numPartitions);
log.info("graphBasePath: '{}'", graphBasePath); log.info("graphBasePath: '{}'", graphBasePath);
log.info("isLookUpUrl: '{}'", isLookUpUrl); log.info("isLookUpUrl: '{}'", isLookUpUrl);
log.info("actionSetId: '{}'", actionSetId); log.info("actionSetId: '{}'", actionSetId);
@ -88,6 +89,7 @@ public class SparkCreateSimRels extends AbstractSparkAction {
JavaPairRDD<String, MapDocument> mapDocuments = sc JavaPairRDD<String, MapDocument> mapDocuments = sc
.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
.repartition(numPartitions)
.mapToPair( .mapToPair(
(PairFunction<String, String, MapDocument>) s -> { (PairFunction<String, String, MapDocument>) s -> {
MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s); MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
@ -95,19 +97,17 @@ public class SparkCreateSimRels extends AbstractSparkAction {
}); });
// create blocks for deduplication // create blocks for deduplication
JavaPairRDD<String, Block> blocks = Deduper.createSortedBlocks(mapDocuments, dedupConf); JavaPairRDD<String, Block> blocks = Deduper
.createSortedBlocks(mapDocuments, dedupConf)
.repartition(numPartitions);
// create relations by comparing only elements in the same group // create relations by comparing only elements in the same group
JavaRDD<Relation> relations = Deduper Deduper
.computeRelations(sc, blocks, dedupConf) .computeRelations(sc, blocks, dedupConf)
.map(t -> createSimRel(t._1(), t._2(), entity)); .map(t -> createSimRel(t._1(), t._2(), entity))
.repartition(numPartitions)
// save the simrel in the workingdir .map(r -> OBJECT_MAPPER.writeValueAsString(r))
spark .saveAsTextFile(outputPath);
.createDataset(relations.rdd(), Encoders.bean(Relation.class))
.write()
.mode(SaveMode.Append)
.save(outputPath);
} }
} }

View File

@ -4,12 +4,16 @@ package eu.dnetlib.dhp.oa.dedup;
import static org.apache.spark.sql.functions.col; import static org.apache.spark.sql.functions.col;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.DataInfo;
@ -95,7 +99,24 @@ public class SparkPropagateRelation extends AbstractSparkAction {
FieldType.TARGET, FieldType.TARGET,
getDeletedFn()); getDeletedFn());
save(newRels.union(updated).union(mergeRels), outputRelationPath, SaveMode.Overwrite); save(
distinctRelations(
newRels
.union(updated)
.union(mergeRels)
.map((MapFunction<Relation, Relation>) r -> r, Encoders.kryo(Relation.class))),
outputRelationPath, SaveMode.Overwrite);
}
private Dataset<Relation> distinctRelations(Dataset<Relation> rels) {
return rels
.filter(getRelationFilterFunction())
.groupByKey(
(MapFunction<Relation, String>) r -> String
.join(r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()),
Encoders.STRING())
.agg(new RelationAggregator().toColumn())
.map((MapFunction<Tuple2<String, Relation>, Relation>) t -> t._2(), Encoders.bean(Relation.class));
} }
private static Dataset<Relation> processDataset( private static Dataset<Relation> processDataset(
@ -112,6 +133,14 @@ public class SparkPropagateRelation extends AbstractSparkAction {
.map(mapFn, Encoders.bean(Relation.class)); .map(mapFn, Encoders.bean(Relation.class));
} }
private FilterFunction<Relation> getRelationFilterFunction() {
return (FilterFunction<Relation>) r -> StringUtils.isNotBlank(r.getSource()) ||
StringUtils.isNotBlank(r.getTarget()) ||
StringUtils.isNotBlank(r.getRelClass()) ||
StringUtils.isNotBlank(r.getSubRelType()) ||
StringUtils.isNotBlank(r.getRelClass());
}
private static MapFunction<String, Relation> patchRelFn() { private static MapFunction<String, Relation> patchRelFn() {
return value -> { return value -> {
final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class); final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);

View File

@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.dedup.graph;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.codehaus.jackson.annotate.JsonIgnore; import org.codehaus.jackson.annotate.JsonIgnore;
@ -18,12 +19,17 @@ public class ConnectedComponent implements Serializable {
private Set<String> docIds; private Set<String> docIds;
private String ccId; private String ccId;
public ConnectedComponent() { public ConnectedComponent(Set<String> docIds, final int cut) {
}
public ConnectedComponent(Set<String> docIds) {
this.docIds = docIds; this.docIds = docIds;
createID(); createID();
if (cut > 0 && docIds.size() > cut) {
this.docIds = docIds
.stream()
.filter(s -> !ccId.equalsIgnoreCase(s))
.limit(cut - 1)
.collect(Collectors.toSet());
this.docIds.add(ccId);
}
} }
public String createID() { public String createID() {
@ -41,6 +47,7 @@ public class ConnectedComponent implements Serializable {
public String getMin() { public String getMin() {
final StringBuilder min = new StringBuilder(); final StringBuilder min = new StringBuilder();
docIds docIds
.forEach( .forEach(
i -> { i -> {

View File

@ -7,7 +7,7 @@ import scala.collection.JavaConversions;
object GraphProcessor { object GraphProcessor {
def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int): RDD[ConnectedComponent] = { def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int, cut:Int): RDD[ConnectedComponent] = {
val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby
val cc = graph.connectedComponents(maxIterations).vertices val cc = graph.connectedComponents(maxIterations).vertices
@ -22,15 +22,15 @@ object GraphProcessor {
} }
} }
val connectedComponents = joinResult.groupByKey() val connectedComponents = joinResult.groupByKey()
.map[ConnectedComponent](cc => asConnectedComponent(cc)) .map[ConnectedComponent](cc => asConnectedComponent(cc, cut))
connectedComponents connectedComponents
} }
def asConnectedComponent(group: (VertexId, Iterable[String])): ConnectedComponent = { def asConnectedComponent(group: (VertexId, Iterable[String]), cut:Int): ConnectedComponent = {
val docs = group._2.toSet[String] val docs = group._2.toSet[String]
val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs)); val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs), cut);
connectedComponent connectedComponent
} }

View File

@ -17,6 +17,12 @@
"paramDescription": "the url for the lookup service", "paramDescription": "the url for the lookup service",
"paramRequired": true "paramRequired": true
}, },
{
"paramName": "cc",
"paramLongName": "cutConnectedComponent",
"paramDescription": "the number of maximum elements that belongs to a connected components",
"paramRequired": false
},
{ {
"paramName": "w", "paramName": "w",
"paramLongName": "workingPath", "paramLongName": "workingPath",

View File

@ -22,5 +22,11 @@
"paramLongName": "workingPath", "paramLongName": "workingPath",
"paramDescription": "path of the working directory", "paramDescription": "path of the working directory",
"paramRequired": true "paramRequired": true
},
{
"paramName": "np",
"paramLongName": "numPartitions",
"paramDescription": "number of partitions for the similarity relations intermediate phases",
"paramRequired": false
} }
] ]

View File

@ -20,6 +20,10 @@
<name>dedupGraphPath</name> <name>dedupGraphPath</name>
<description>path for the output graph</description> <description>path for the output graph</description>
</property> </property>
<property>
<name>cutConnectedComponent</name>
<description>max number of elements in a connected component</description>
</property>
<property> <property>
<name>sparkDriverMemory</name> <name>sparkDriverMemory</name>
<description>memory for driver process</description> <description>memory for driver process</description>
@ -106,10 +110,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--i</arg><arg>${graphBasePath}</arg> <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--la</arg><arg>${isLookUpUrl}</arg> <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--asi</arg><arg>${actionSetId}</arg> <arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--w</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--numPartitions</arg><arg>8000</arg>
</spark> </spark>
<ok to="CreateMergeRel"/> <ok to="CreateMergeRel"/>
<error to="Kill"/> <error to="Kill"/>
@ -132,10 +137,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--i</arg><arg>${graphBasePath}</arg> <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--w</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--la</arg><arg>${isLookUpUrl}</arg> <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--asi</arg><arg>${actionSetId}</arg> <arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--cutConnectedComponent</arg><arg>${cutConnectedComponent}</arg>
</spark> </spark>
<ok to="CreateDedupRecord"/> <ok to="CreateDedupRecord"/>
<error to="Kill"/> <error to="Kill"/>
@ -158,10 +164,10 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--i</arg><arg>${graphBasePath}</arg> <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--w</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--la</arg><arg>${isLookUpUrl}</arg> <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--asi</arg><arg>${actionSetId}</arg> <arg>--actionSetId</arg><arg>${actionSetId}</arg>
</spark> </spark>
<ok to="UpdateEntity"/> <ok to="UpdateEntity"/>
<error to="Kill"/> <error to="Kill"/>
@ -184,9 +190,9 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--i</arg><arg>${graphBasePath}</arg> <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--w</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--o</arg><arg>${dedupGraphPath}</arg> <arg>--dedupGraphPath</arg><arg>${dedupGraphPath}</arg>
</spark> </spark>
<ok to="copyRelations"/> <ok to="copyRelations"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -1,17 +1,17 @@
[ [
{ {
"paramName": "i", "paramName": "i",
"paramLongName": "graphBasePath", "paramLongName": "graphBasePath",
"paramDescription": "the base path of raw graph", "paramDescription": "the base path of raw graph",
"paramRequired": true "paramRequired": true
}, },
{ {
"paramName": "w", "paramName": "w",
"paramLongName": "workingPath", "paramLongName": "workingPath",
"paramDescription": "the working directory path", "paramDescription": "the working directory path",
"paramRequired": true "paramRequired": true
}, },
{ {
"paramName": "o", "paramName": "o",
"paramLongName": "dedupGraphPath", "paramLongName": "dedupGraphPath",
"paramDescription": "the path of the dedup graph", "paramDescription": "the path of the dedup graph",

View File

@ -3,6 +3,8 @@ package eu.dnetlib.dhp.oa.dedup;
import static java.nio.file.Files.createTempDirectory; import static java.nio.file.Files.createTempDirectory;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.lenient;
@ -11,6 +13,9 @@ import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
@ -18,6 +23,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
@ -71,11 +77,13 @@ public class SparkDedupTest implements Serializable {
FileUtils.deleteDirectory(new File(testOutputBasePath)); FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
final SparkConf conf = new SparkConf();
conf.set("spark.sql.shuffle.partitions", "200");
spark = SparkSession spark = SparkSession
.builder() .builder()
.appName(SparkDedupTest.class.getSimpleName()) .appName(SparkDedupTest.class.getSimpleName())
.master("local[*]") .master("local[*]")
.config(new SparkConf()) .config(conf)
.getOrCreate(); .getOrCreate();
jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -152,37 +160,42 @@ public class SparkDedupTest implements Serializable {
parser parser
.parseArgument( .parseArgument(
new String[] { new String[] {
"-i", "-i", testGraphBasePath,
testGraphBasePath, "-asi", testActionSetId,
"-asi", "-la", "lookupurl",
testActionSetId, "-w", testOutputBasePath,
"-la", "-np", "50"
"lookupurl",
"-w",
testOutputBasePath
}); });
new SparkCreateSimRels(parser, spark).run(isLookUpService); new SparkCreateSimRels(parser, spark).run(isLookUpService);
long orgs_simrel = spark long orgs_simrel = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel") .textFile(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
.count(); .count();
long pubs_simrel = spark long pubs_simrel = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel") .textFile(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
.count(); .count();
long sw_simrel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/software_simrel").count();
long ds_simrel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel").count(); long sw_simrel = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/software_simrel")
.count();
long ds_simrel = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel")
.count();
long orp_simrel = spark long orp_simrel = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel") .textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
.count(); .count();
assertEquals(3432, orgs_simrel); assertEquals(3432, orgs_simrel);
assertEquals(7054, pubs_simrel); assertEquals(7152, pubs_simrel);
assertEquals(344, sw_simrel); assertEquals(344, sw_simrel);
assertEquals(458, ds_simrel); assertEquals(458, ds_simrel);
assertEquals(6750, orp_simrel); assertEquals(6750, orp_simrel);
@ -190,6 +203,101 @@ public class SparkDedupTest implements Serializable {
@Test @Test
@Order(2) @Order(2)
public void cutMergeRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCreateMergeRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
parser
.parseArgument(
new String[] {
"-i",
testGraphBasePath,
"-asi",
testActionSetId,
"-la",
"lookupurl",
"-w",
testOutputBasePath,
"-cc",
"3"
});
new SparkCreateMergeRels(parser, spark).run(isLookUpService);
long orgs_mergerel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
.as(Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
.groupBy("source")
.agg(count("target").alias("cnt"))
.select("source", "cnt")
.where("cnt > 3")
.count();
long pubs_mergerel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")
.as(Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
.groupBy("source")
.agg(count("target").alias("cnt"))
.select("source", "cnt")
.where("cnt > 3")
.count();
long sw_mergerel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
.as(Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
.groupBy("source")
.agg(count("target").alias("cnt"))
.select("source", "cnt")
.where("cnt > 3")
.count();
long ds_mergerel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")
.as(Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
.groupBy("source")
.agg(count("target").alias("cnt"))
.select("source", "cnt")
.where("cnt > 3")
.count();
long orp_mergerel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")
.as(Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
.groupBy("source")
.agg(count("target").alias("cnt"))
.select("source", "cnt")
.where("cnt > 3")
.count();
assertEquals(0, orgs_mergerel);
assertEquals(0, pubs_mergerel);
assertEquals(0, sw_mergerel);
assertEquals(0, ds_mergerel);
assertEquals(0, orp_mergerel);
FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel"));
FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel"));
FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/software_mergerel"));
FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel"));
FileUtils
.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel"));
}
@Test
@Order(3)
public void createMergeRelsTest() throws Exception { public void createMergeRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser( ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -225,8 +333,10 @@ public class SparkDedupTest implements Serializable {
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
.count(); .count();
long ds_mergerel = spark
long ds_mergerel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel").count(); .read()
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")
.count();
long orp_mergerel = spark long orp_mergerel = spark
.read() .read()
@ -234,14 +344,14 @@ public class SparkDedupTest implements Serializable {
.count(); .count();
assertEquals(1276, orgs_mergerel); assertEquals(1276, orgs_mergerel);
assertEquals(1440, pubs_mergerel); assertEquals(1442, pubs_mergerel);
assertEquals(288, sw_mergerel); assertEquals(288, sw_mergerel);
assertEquals(472, ds_mergerel); assertEquals(472, ds_mergerel);
assertEquals(718, orp_mergerel); assertEquals(718, orp_mergerel);
} }
@Test @Test
@Order(3) @Order(4)
public void createDedupRecordTest() throws Exception { public void createDedupRecordTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser( ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -288,7 +398,7 @@ public class SparkDedupTest implements Serializable {
} }
@Test @Test
@Order(4) @Order(5)
public void updateEntityTest() throws Exception { public void updateEntityTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser( ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -404,7 +514,7 @@ public class SparkDedupTest implements Serializable {
} }
@Test @Test
@Order(5) @Order(6)
public void propagateRelationTest() throws Exception { public void propagateRelationTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser( ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -423,7 +533,7 @@ public class SparkDedupTest implements Serializable {
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
assertEquals(4971, relations); assertEquals(4866, relations);
// check deletedbyinference // check deletedbyinference
final Dataset<Relation> mergeRels = spark final Dataset<Relation> mergeRels = spark
@ -454,7 +564,7 @@ public class SparkDedupTest implements Serializable {
} }
@Test @Test
@Order(6) @Order(7)
public void testRelations() throws Exception { public void testRelations() throws Exception {
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10); testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10);
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2); testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2);

View File

@ -6,10 +6,10 @@
"subEntityType" : "resulttype", "subEntityType" : "resulttype",
"subEntityValue" : "dataset", "subEntityValue" : "dataset",
"orderField" : "title", "orderField" : "title",
"queueMaxSize" : "800", "queueMaxSize" : "100",
"groupMaxSize" : "100", "groupMaxSize" : "100",
"maxChildren" : "100", "maxChildren" : "100",
"slidingWindowSize" : "80", "slidingWindowSize" : "100",
"rootBuilder" : ["result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ], "rootBuilder" : ["result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
"includeChildren" : "true", "includeChildren" : "true",
"idPath" : "$.id", "idPath" : "$.id",
@ -17,7 +17,8 @@
}, },
"pace" : { "pace" : {
"clustering" : [ "clustering" : [
{ "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } }, { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } } { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
], ],
"decisionTree" : { "decisionTree" : {

View File

@ -6,10 +6,10 @@
"subEntityType" : "resulttype", "subEntityType" : "resulttype",
"subEntityValue" : "otherresearchproduct", "subEntityValue" : "otherresearchproduct",
"orderField" : "title", "orderField" : "title",
"queueMaxSize" : "800", "queueMaxSize" : "100",
"groupMaxSize" : "100", "groupMaxSize" : "100",
"maxChildren" : "100", "maxChildren" : "100",
"slidingWindowSize" : "80", "slidingWindowSize" : "100",
"rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ], "rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
"includeChildren" : "true", "includeChildren" : "true",
"idPath" : "$.id", "idPath" : "$.id",
@ -17,7 +17,8 @@
}, },
"pace" : { "pace" : {
"clustering" : [ "clustering" : [
{ "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } }, { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } } { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
], ],
"decisionTree" : { "decisionTree" : {

View File

@ -6,10 +6,10 @@
"subEntityType": "resulttype", "subEntityType": "resulttype",
"subEntityValue": "publication", "subEntityValue": "publication",
"orderField": "title", "orderField": "title",
"queueMaxSize": "800", "queueMaxSize": "100",
"groupMaxSize": "100", "groupMaxSize": "100",
"maxChildren": "100", "maxChildren": "100",
"slidingWindowSize": "80", "slidingWindowSize": "100",
"rootBuilder": [ "rootBuilder": [
"result", "result",
"resultProject_outcome_isProducedBy", "resultProject_outcome_isProducedBy",
@ -29,7 +29,8 @@
}, },
"pace": { "pace": {
"clustering" : [ "clustering" : [
{ "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } }, { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } } { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
], ],
"decisionTree": { "decisionTree": {

View File

@ -6,10 +6,10 @@
"subEntityType" : "resulttype", "subEntityType" : "resulttype",
"subEntityValue" : "software", "subEntityValue" : "software",
"orderField" : "title", "orderField" : "title",
"queueMaxSize" : "800", "queueMaxSize" : "100",
"groupMaxSize" : "100", "groupMaxSize" : "100",
"maxChildren" : "100", "maxChildren" : "100",
"slidingWindowSize" : "80", "slidingWindowSize" : "100",
"rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ], "rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
"includeChildren" : "true", "includeChildren" : "true",
"idPath" : "$.id", "idPath" : "$.id",
@ -17,8 +17,9 @@
}, },
"pace" : { "pace" : {
"clustering" : [ "clustering" : [
{ "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } }, { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
{ "name" : "lowercase", "fields" : [ "doi", "url" ], "params" : { } } { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
], ],
"decisionTree": { "decisionTree": {
"start": { "start": {

View File

@ -323,6 +323,12 @@
<version>[2.0.0,3.0.0)</version> <version>[2.0.0,3.0.0)</version>
</dependency> </dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dnet-openaire-broker-common</artifactId>
<version>${dnet.openaire.broker.common}</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.cxf</groupId> <groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http</artifactId> <artifactId>cxf-rt-transports-http</artifactId>
@ -618,5 +624,6 @@
<mockito-core.version>3.3.3</mockito-core.version> <mockito-core.version>3.3.3</mockito-core.version>
<mongodb.driver.version>3.4.2</mongodb.driver.version> <mongodb.driver.version>3.4.2</mongodb.driver.version>
<vtd.version>[2.12,3.0)</vtd.version> <vtd.version>[2.12,3.0)</vtd.version>
<dnet.openaire.broker.common>3.0.0</dnet.openaire.broker.common>
</properties> </properties>
</project> </project>