forked from antonis.lempesis/dnet-hadoop
implementation of the workflow for entity update and for relations update
This commit is contained in:
parent
6d879e2ee1
commit
e16e644faf
|
@ -122,6 +122,10 @@ public class DedupUtility {
|
|||
});
|
||||
}
|
||||
|
||||
public static String createDedupRecordPath(final String basePath, final String actionSetId, final String entityType) {
|
||||
return String.format("%s/%s/%s_deduprecord", basePath, actionSetId, entityType);
|
||||
}
|
||||
|
||||
public static String createEntityPath(final String basePath, final String entityType) {
|
||||
return String.format("%s/%s", basePath, entityType);
|
||||
}
|
||||
|
|
|
@ -5,9 +5,11 @@ import eu.dnetlib.dedup.graph.ConnectedComponent;
|
|||
import eu.dnetlib.dedup.graph.GraphProcessor;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
|
@ -18,6 +20,7 @@ import org.apache.spark.rdd.RDD;
|
|||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.dom4j.DocumentException;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -26,51 +29,72 @@ import java.util.List;
|
|||
public class SparkCreateConnectedComponent {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/createSimRels_parameters.json")));
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/createCC_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
final SparkSession spark = SparkSession
|
||||
.builder()
|
||||
.appName(SparkCreateConnectedComponent.class.getSimpleName())
|
||||
.master(parser.get("master"))
|
||||
.getOrCreate();
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
final String inputPath = parser.get("sourcePath");
|
||||
final String entity = parser.get("entity");
|
||||
final String targetPath = parser.get("targetPath");
|
||||
// final DedupConfig dedupConf = DedupConfig.load(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf2.json")));
|
||||
final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf"));
|
||||
new SparkCreateConnectedComponent().run(parser);
|
||||
}
|
||||
|
||||
final JavaPairRDD<Object, String> vertexes = sc.textFile(inputPath + "/" + entity)
|
||||
.map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s))
|
||||
.mapToPair((PairFunction<String, Object, String>)
|
||||
s -> new Tuple2<Object, String>(getHashcode(s), s)
|
||||
);
|
||||
private void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException {
|
||||
|
||||
final Dataset<Relation> similarityRelations = spark.read().load(DedupUtility.createSimRelPath(targetPath, "",entity)).as(Encoders.bean(Relation.class));
|
||||
final RDD<Edge<String>> edgeRdd = similarityRelations.javaRDD().map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass())).rdd();
|
||||
final JavaRDD<ConnectedComponent> cc = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, dedupConf.getWf().getMaxIterations()).toJavaRDD();
|
||||
final Dataset<Relation> mergeRelation = spark.createDataset(cc.filter(k->k.getDocIds().size()>1).flatMap((FlatMapFunction<ConnectedComponent, Relation>) c ->
|
||||
c.getDocIds()
|
||||
.stream()
|
||||
.flatMap(id -> {
|
||||
List<Relation> tmp = new ArrayList<>();
|
||||
Relation r = new Relation();
|
||||
r.setSource(c.getCcId());
|
||||
r.setTarget(id);
|
||||
r.setRelClass("merges");
|
||||
tmp.add(r);
|
||||
r = new Relation();
|
||||
r.setTarget(c.getCcId());
|
||||
r.setSource(id);
|
||||
r.setRelClass("isMergedIn");
|
||||
tmp.add(r);
|
||||
return tmp.stream();
|
||||
}).iterator()).rdd(), Encoders.bean(Relation.class));
|
||||
mergeRelation.write().mode("overwrite").save(DedupUtility.createMergeRelPath(targetPath,"",entity));
|
||||
final String graphBasePath = parser.get("graphBasePath");
|
||||
final String workingPath = parser.get("workingPath");
|
||||
final String isLookUpUrl = parser.get("isLookUpUrl");
|
||||
final String actionSetId = parser.get("actionSetId");
|
||||
|
||||
try (SparkSession spark = getSparkSession(parser)) {
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
|
||||
for (DedupConfig dedupConf: DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) {
|
||||
|
||||
final String entity = dedupConf.getWf().getEntityType();
|
||||
final String subEntity = dedupConf.getWf().getSubEntityValue();
|
||||
|
||||
final JavaPairRDD<Object, String> vertexes = sc.textFile(graphBasePath + "/" + subEntity)
|
||||
.map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s))
|
||||
.mapToPair((PairFunction<String, Object, String>)
|
||||
s -> new Tuple2<Object, String>(getHashcode(s), s)
|
||||
);
|
||||
|
||||
final Dataset<Relation> similarityRelations = spark.read().load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)).as(Encoders.bean(Relation.class));
|
||||
final RDD<Edge<String>> edgeRdd = similarityRelations.javaRDD().map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass())).rdd();
|
||||
final JavaRDD<ConnectedComponent> cc = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, dedupConf.getWf().getMaxIterations()).toJavaRDD();
|
||||
final Dataset<Relation> mergeRelation = spark.createDataset(cc.filter(k -> k.getDocIds().size() > 1).flatMap((FlatMapFunction<ConnectedComponent, Relation>) c ->
|
||||
c.getDocIds()
|
||||
.stream()
|
||||
.flatMap(id -> {
|
||||
List<Relation> tmp = new ArrayList<>();
|
||||
Relation r = new Relation();
|
||||
r.setSource(c.getCcId());
|
||||
r.setTarget(id);
|
||||
r.setRelClass("merges");
|
||||
tmp.add(r);
|
||||
r = new Relation();
|
||||
r.setTarget(c.getCcId());
|
||||
r.setSource(id);
|
||||
r.setRelClass("isMergedIn");
|
||||
tmp.add(r);
|
||||
return tmp.stream();
|
||||
}).iterator()).rdd(), Encoders.bean(Relation.class));
|
||||
mergeRelation.write().mode("overwrite").save(DedupUtility.createMergeRelPath(workingPath, actionSetId, entity));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static long getHashcode(final String id) {
|
||||
return Hashing.murmur3_128().hashUnencodedChars(id).asLong();
|
||||
}
|
||||
|
||||
private static SparkSession getSparkSession(ArgumentApplicationParser parser) {
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
return SparkSession
|
||||
.builder()
|
||||
.appName(SparkCreateSimRels.class.getSimpleName())
|
||||
.master(parser.get("master"))
|
||||
.config(conf)
|
||||
.enableHiveSupport()
|
||||
.getOrCreate();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,100 +0,0 @@
|
|||
package eu.dnetlib.dedup;
|
||||
|
||||
import com.google.common.hash.Hashing;
|
||||
import eu.dnetlib.dedup.graph.ConnectedComponent;
|
||||
import eu.dnetlib.dedup.graph.GraphProcessor;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||
import org.apache.spark.api.java.function.PairFunction;
|
||||
import org.apache.spark.graphx.Edge;
|
||||
import org.apache.spark.rdd.RDD;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.dom4j.DocumentException;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class SparkCreateConnectedComponent2 {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/createCC_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
new SparkCreateConnectedComponent2().run(parser);
|
||||
}
|
||||
|
||||
private void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException {
|
||||
|
||||
final String graphBasePath = parser.get("graphBasePath");
|
||||
final String workingPath = parser.get("workingPath");
|
||||
final String isLookUpUrl = parser.get("isLookUpUrl");
|
||||
final String actionSetId = parser.get("actionSetId");
|
||||
|
||||
try (SparkSession spark = getSparkSession(parser)) {
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
|
||||
|
||||
for (DedupConfig dedupConf: DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) {
|
||||
final String entity = dedupConf.getWf().getEntityType();
|
||||
final String subEntity = dedupConf.getWf().getSubEntityValue();
|
||||
|
||||
final JavaPairRDD<Object, String> vertexes = sc.textFile(graphBasePath + "/" + subEntity)
|
||||
.map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s))
|
||||
.mapToPair((PairFunction<String, Object, String>)
|
||||
s -> new Tuple2<Object, String>(getHashcode(s), s)
|
||||
);
|
||||
|
||||
final Dataset<Relation> similarityRelations = spark.read().load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)).as(Encoders.bean(Relation.class));
|
||||
final RDD<Edge<String>> edgeRdd = similarityRelations.javaRDD().map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass())).rdd();
|
||||
final JavaRDD<ConnectedComponent> cc = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, dedupConf.getWf().getMaxIterations()).toJavaRDD();
|
||||
final Dataset<Relation> mergeRelation = spark.createDataset(cc.filter(k -> k.getDocIds().size() > 1).flatMap((FlatMapFunction<ConnectedComponent, Relation>) c ->
|
||||
c.getDocIds()
|
||||
.stream()
|
||||
.flatMap(id -> {
|
||||
List<Relation> tmp = new ArrayList<>();
|
||||
Relation r = new Relation();
|
||||
r.setSource(c.getCcId());
|
||||
r.setTarget(id);
|
||||
r.setRelClass("merges");
|
||||
tmp.add(r);
|
||||
r = new Relation();
|
||||
r.setTarget(c.getCcId());
|
||||
r.setSource(id);
|
||||
r.setRelClass("isMergedIn");
|
||||
tmp.add(r);
|
||||
return tmp.stream();
|
||||
}).iterator()).rdd(), Encoders.bean(Relation.class));
|
||||
mergeRelation.write().mode("overwrite").save(DedupUtility.createMergeRelPath(workingPath, actionSetId, entity));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static long getHashcode(final String id) {
|
||||
return Hashing.murmur3_128().hashUnencodedChars(id).asLong();
|
||||
}
|
||||
|
||||
private static SparkSession getSparkSession(ArgumentApplicationParser parser) {
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
return SparkSession
|
||||
.builder()
|
||||
.appName(SparkCreateSimRels2.class.getSimpleName())
|
||||
.master(parser.get("master"))
|
||||
.config(conf)
|
||||
.enableHiveSupport()
|
||||
.getOrCreate();
|
||||
}
|
||||
}
|
|
@ -3,37 +3,57 @@ package eu.dnetlib.dedup;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.dom4j.DocumentException;
|
||||
|
||||
public class SparkCreateDedupRecord {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateDedupRecord.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedupRecord_parameters.json")));
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateDedupRecord.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/createDedupRecord_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
final SparkSession spark = SparkSession
|
||||
|
||||
new SparkCreateDedupRecord().run(parser);
|
||||
}
|
||||
|
||||
private void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException {
|
||||
|
||||
final String graphBasePath = parser.get("graphBasePath");
|
||||
final String isLookUpUrl = parser.get("isLookUpUrl");
|
||||
final String actionSetId = parser.get("actionSetId");
|
||||
final String workingPath = parser.get("workingPath");
|
||||
|
||||
try (SparkSession spark = getSparkSession(parser)) {
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
|
||||
for (DedupConfig dedupConf: DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) {
|
||||
String subEntity = dedupConf.getWf().getSubEntityValue();
|
||||
|
||||
final JavaRDD<OafEntity> dedupRecord =
|
||||
DedupRecordFactory.createDedupRecord(sc, spark, DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity), DedupUtility.createEntityPath(graphBasePath, subEntity), OafEntityType.valueOf(subEntity), dedupConf);
|
||||
dedupRecord.map(r -> {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
return mapper.writeValueAsString(r);
|
||||
}).saveAsTextFile(DedupUtility.createDedupRecordPath(workingPath, actionSetId, subEntity));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static SparkSession getSparkSession(ArgumentApplicationParser parser) {
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
return SparkSession
|
||||
.builder()
|
||||
.appName(SparkCreateDedupRecord.class.getSimpleName())
|
||||
.master(parser.get("master"))
|
||||
.config(conf)
|
||||
.enableHiveSupport()
|
||||
.getOrCreate();
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
final String sourcePath = parser.get("sourcePath");
|
||||
final String entity = parser.get("entity");
|
||||
final String dedupPath = parser.get("dedupPath");
|
||||
// final DedupConfig dedupConf = DedupConfig.load(IOUtils.toString(SparkCreateDedupRecord.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf2.json")));
|
||||
final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf"));
|
||||
|
||||
final JavaRDD<OafEntity> dedupRecord = DedupRecordFactory.createDedupRecord(sc, spark, DedupUtility.createMergeRelPath(dedupPath,entity), DedupUtility.createEntityPath(sourcePath,entity), OafEntityType.valueOf(entity), dedupConf);
|
||||
dedupRecord.map(r-> {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
return mapper.writeValueAsString(r);
|
||||
}).saveAsTextFile(dedupPath+"/"+entity+"_dedup_record_json");
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -1,71 +1,135 @@
|
|||
package eu.dnetlib.dedup;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
import eu.dnetlib.pace.model.MapDocument;
|
||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.compress.GzipCodec;
|
||||
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.dom4j.DocumentException;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
|
||||
public class SparkCreateSimRels implements Serializable {
|
||||
|
||||
/**
|
||||
* This Spark class creates similarity relations between entities, saving result
|
||||
*
|
||||
* param request:
|
||||
* sourcePath
|
||||
* entityType
|
||||
* target Path
|
||||
*/
|
||||
public class SparkCreateSimRels {
|
||||
private static final Log log = LogFactory.getLog(SparkCreateSimRels.class);
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/createSimRels_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
final SparkSession spark = SparkSession
|
||||
|
||||
new SparkCreateSimRels().run(parser);
|
||||
}
|
||||
|
||||
private void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException {
|
||||
|
||||
//read oozie parameters
|
||||
final String graphBasePath = parser.get("graphBasePath");
|
||||
final String rawSet = parser.get("rawSet");
|
||||
final String isLookUpUrl = parser.get("isLookUpUrl");
|
||||
final String actionSetId = parser.get("actionSetId");
|
||||
final String workingPath = parser.get("workingPath");
|
||||
|
||||
try (SparkSession spark = getSparkSession(parser)) {
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
|
||||
//create empty sequenceFile for the accumulation
|
||||
JavaRDD<Tuple2<Text,Text>> simRel = sc.emptyRDD();
|
||||
|
||||
//for each dedup configuration
|
||||
for (DedupConfig dedupConf: DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) {
|
||||
final String entity = dedupConf.getWf().getEntityType();
|
||||
final String subEntity = dedupConf.getWf().getSubEntityValue();
|
||||
|
||||
JavaPairRDD<String, MapDocument> mapDocument = sc.textFile(graphBasePath + "/" + subEntity)
|
||||
.mapToPair(s -> {
|
||||
MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
|
||||
return new Tuple2<>(d.getIdentifier(), d);
|
||||
});
|
||||
|
||||
//create blocks for deduplication
|
||||
JavaPairRDD<String, List<MapDocument>> blocks = Deduper.createsortedBlocks(sc, mapDocument, dedupConf);
|
||||
|
||||
//create relations by comparing only elements in the same group
|
||||
final JavaPairRDD<String, String> dedupRels = Deduper.computeRelations2(sc, blocks, dedupConf);
|
||||
|
||||
JavaRDD<Relation> relationsRDD = dedupRels.map(r -> createSimRel(r._1(), r._2(), entity));
|
||||
|
||||
//save the simrel in the workingdir
|
||||
spark.createDataset(relationsRDD.rdd(), Encoders.bean(Relation.class)).write().mode("overwrite").save( DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity));
|
||||
|
||||
//create atomic actions
|
||||
JavaRDD<Tuple2<Text, Text>> newSimRels = relationsRDD
|
||||
.map(this::createSequenceFileRow);
|
||||
|
||||
simRel = simRel.union(newSimRels);
|
||||
}
|
||||
|
||||
simRel.mapToPair(r -> r)
|
||||
.saveAsHadoopFile(rawSet, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public Tuple2<Text, Text> createSequenceFileRow(Relation relation) throws JsonProcessingException {
|
||||
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
String id = relation.getSource() + "@" + relation.getRelClass() + "@" + relation.getTarget();
|
||||
AtomicAction<Relation> aa = new AtomicAction<>(Relation.class, relation);
|
||||
|
||||
return new Tuple2<>(
|
||||
new Text(id),
|
||||
new Text(mapper.writeValueAsString(aa))
|
||||
);
|
||||
}
|
||||
|
||||
public Relation createSimRel(String source, String target, String entity){
|
||||
final Relation r = new Relation();
|
||||
r.setSource(source);
|
||||
r.setTarget(target);
|
||||
|
||||
switch(entity){
|
||||
case "result":
|
||||
r.setRelClass("resultResult_dedupSimilarity_isSimilarTo");
|
||||
break;
|
||||
case "organization":
|
||||
r.setRelClass("organizationOrganization_dedupSimilarity_isSimilarTo");
|
||||
break;
|
||||
default:
|
||||
r.setRelClass("isSimilarTo");
|
||||
break;
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
private static SparkSession getSparkSession(ArgumentApplicationParser parser) {
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
return SparkSession
|
||||
.builder()
|
||||
.appName(SparkCreateSimRels.class.getSimpleName())
|
||||
.master(parser.get("master"))
|
||||
.config(conf)
|
||||
.enableHiveSupport()
|
||||
.getOrCreate();
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
final String inputPath = parser.get("sourcePath");
|
||||
final String entity = parser.get("entity");
|
||||
final String targetPath = parser.get("targetPath");
|
||||
// final DedupConfig dedupConf = DedupConfig.load(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
|
||||
final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf"));
|
||||
|
||||
final long total = sc.textFile(inputPath + "/" + entity).count();
|
||||
|
||||
JavaPairRDD<String, MapDocument> mapDocument = sc.textFile(inputPath + "/" + entity)
|
||||
.mapToPair(s->{
|
||||
MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf,s);
|
||||
return new Tuple2<>(d.getIdentifier(), d);});
|
||||
|
||||
//create blocks for deduplication
|
||||
JavaPairRDD<String, List<MapDocument>> blocks = Deduper.createsortedBlocks(sc, mapDocument, dedupConf);
|
||||
// JavaPairRDD<String, Iterable<MapDocument>> blocks = Deduper.createBlocks(sc, mapDocument, dedupConf);
|
||||
|
||||
//create relations by comparing only elements in the same group
|
||||
final JavaPairRDD<String,String> dedupRels = Deduper.computeRelations2(sc, blocks, dedupConf);
|
||||
// final JavaPairRDD<String,String> dedupRels = Deduper.computeRelations(sc, blocks, dedupConf);
|
||||
|
||||
final JavaRDD<Relation> isSimilarToRDD = dedupRels.map(simRel -> {
|
||||
final Relation r = new Relation();
|
||||
r.setSource(simRel._1());
|
||||
r.setTarget(simRel._2());
|
||||
r.setRelClass("isSimilarTo");
|
||||
return r;
|
||||
});
|
||||
|
||||
spark.createDataset(isSimilarToRDD.rdd(), Encoders.bean(Relation.class)).write().mode("overwrite").save( DedupUtility.createSimRelPath(targetPath,entity));
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -1,144 +0,0 @@
|
|||
package eu.dnetlib.dedup;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.actionmanager.actions.AtomicAction;
|
||||
import eu.dnetlib.actionmanager.common.Agent;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
import eu.dnetlib.pace.model.MapDocument;
|
||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.compress.GzipCodec;
|
||||
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.dom4j.Document;
|
||||
import org.dom4j.DocumentException;
|
||||
import org.dom4j.Element;
|
||||
import org.dom4j.io.SAXReader;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.io.StringReader;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class SparkCreateSimRels2 implements Serializable {
|
||||
|
||||
private static final Log log = LogFactory.getLog(SparkCreateSimRels2.class);
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/createSimRels_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
new SparkCreateSimRels2().run(parser);
|
||||
}
|
||||
|
||||
private void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException {
|
||||
|
||||
//read oozie parameters
|
||||
final String graphBasePath = parser.get("graphBasePath");
|
||||
final String rawSet = parser.get("rawSet");
|
||||
final String isLookUpUrl = parser.get("isLookUpUrl");
|
||||
final String actionSetId = parser.get("actionSetId");
|
||||
final String workingPath = parser.get("workingPath");
|
||||
|
||||
try (SparkSession spark = getSparkSession(parser)) {
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
|
||||
//create empty sequenceFile for the accumulation
|
||||
JavaRDD<Tuple2<Text,Text>> simRel = sc.emptyRDD();
|
||||
|
||||
//for each dedup configuration
|
||||
for (DedupConfig dedupConf: DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) {
|
||||
final String entity = dedupConf.getWf().getEntityType();
|
||||
final String subEntity = dedupConf.getWf().getSubEntityValue();
|
||||
|
||||
JavaPairRDD<String, MapDocument> mapDocument = sc.textFile(graphBasePath + "/" + subEntity)
|
||||
.mapToPair(s -> {
|
||||
MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
|
||||
return new Tuple2<>(d.getIdentifier(), d);
|
||||
});
|
||||
|
||||
//create blocks for deduplication
|
||||
JavaPairRDD<String, List<MapDocument>> blocks = Deduper.createsortedBlocks(sc, mapDocument, dedupConf);
|
||||
|
||||
//create relations by comparing only elements in the same group
|
||||
final JavaPairRDD<String, String> dedupRels = Deduper.computeRelations2(sc, blocks, dedupConf);
|
||||
|
||||
JavaRDD<Relation> relationsRDD = dedupRels.map(r -> createSimRel(r._1(), r._2(), entity));
|
||||
|
||||
//save the simrel in the workingdir
|
||||
spark.createDataset(relationsRDD.rdd(), Encoders.bean(Relation.class)).write().mode("overwrite").save( DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity));
|
||||
|
||||
//create atomic actions
|
||||
JavaRDD<Tuple2<Text, Text>> newSimRels = relationsRDD
|
||||
.map(this::createSequenceFileRow);
|
||||
|
||||
simRel = simRel.union(newSimRels);
|
||||
}
|
||||
|
||||
simRel.mapToPair(r -> r)
|
||||
.saveAsHadoopFile(rawSet, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public Tuple2<Text, Text> createSequenceFileRow(Relation relation) throws JsonProcessingException {
|
||||
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
String id = relation.getSource() + "@" + relation.getRelClass() + "@" + relation.getTarget();
|
||||
//TODO do be replaced by the new implementation of AtomicAction
|
||||
AtomicAction aa = new AtomicAction("rawSet", new Agent("agentId", "agentName", Agent.AGENT_TYPE.service), relation.getSource(), relation.getRelClass(), relation.getTarget(), new ObjectMapper().writeValueAsString(relation).getBytes());
|
||||
|
||||
return new Tuple2<>(
|
||||
new Text(id),
|
||||
new Text(mapper.writeValueAsString(aa))
|
||||
);
|
||||
}
|
||||
|
||||
public Relation createSimRel(String source, String target, String entity){
|
||||
final Relation r = new Relation();
|
||||
r.setSource(source);
|
||||
r.setTarget(target);
|
||||
|
||||
switch(entity){
|
||||
case "result":
|
||||
r.setRelClass("resultResult_dedupSimilarity_isSimilarTo");
|
||||
break;
|
||||
case "organization":
|
||||
r.setRelClass("organizationOrganization_dedupSimilarity_isSimilarTo");
|
||||
break;
|
||||
default:
|
||||
r.setRelClass("isSimilarTo");
|
||||
break;
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
private static SparkSession getSparkSession(ArgumentApplicationParser parser) {
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
return SparkSession
|
||||
.builder()
|
||||
.appName(SparkCreateSimRels2.class.getSimpleName())
|
||||
.master(parser.get("master"))
|
||||
.config(conf)
|
||||
.enableHiveSupport()
|
||||
.getOrCreate();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,169 @@
|
|||
package eu.dnetlib.dedup;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.io.compress.GzipCodec;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.Optional;
|
||||
import org.apache.spark.api.java.function.Function;
|
||||
import org.apache.spark.api.java.function.PairFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class SparkPropagateRelation {
|
||||
|
||||
enum FieldType {
|
||||
SOURCE,
|
||||
TARGET
|
||||
}
|
||||
|
||||
final static String SOURCEJSONPATH = "$.source";
|
||||
final static String TARGETJSONPATH = "$.target";
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkPropagateRelation.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/propagateRelation_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
new SparkPropagateRelation().run(parser);
|
||||
}
|
||||
|
||||
public void run(ArgumentApplicationParser parser) {
|
||||
|
||||
final String graphBasePath = parser.get("graphBasePath");
|
||||
final String workingPath = parser.get("workingPath");
|
||||
final String dedupGraphPath = parser.get("dedupGraphPath");
|
||||
|
||||
try (SparkSession spark = getSparkSession(parser)) {
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
|
||||
final Dataset<Relation> mergeRels = spark.read().load(DedupUtility.createMergeRelPath(workingPath, "*", "*")).as(Encoders.bean(Relation.class));
|
||||
|
||||
final JavaPairRDD<String, String> mergedIds = mergeRels
|
||||
.where("relClass == 'merges'")
|
||||
.select(mergeRels.col("source"), mergeRels.col("target"))
|
||||
.distinct()
|
||||
.toJavaRDD()
|
||||
.mapToPair((PairFunction<Row, String, String>) r -> new Tuple2<>(r.getString(1), r.getString(0)));
|
||||
|
||||
JavaRDD<String> relations = sc.textFile(DedupUtility.createEntityPath(graphBasePath, "relation"));
|
||||
|
||||
JavaRDD<String> newRels = relations.mapToPair(
|
||||
(PairFunction<String, String, String>) s ->
|
||||
new Tuple2<>(MapDocumentUtil.getJPathString(SOURCEJSONPATH, s), s))
|
||||
.leftOuterJoin(mergedIds)
|
||||
.map((Function<Tuple2<String, Tuple2<String, Optional<String>>>, String>) v1 -> {
|
||||
if (v1._2()._2().isPresent()) {
|
||||
return replaceField(v1._2()._1(), v1._2()._2().get(), FieldType.SOURCE);
|
||||
}
|
||||
return v1._2()._1();
|
||||
})
|
||||
.mapToPair(
|
||||
(PairFunction<String, String, String>) s ->
|
||||
new Tuple2<>(MapDocumentUtil.getJPathString(TARGETJSONPATH, s), s))
|
||||
.leftOuterJoin(mergedIds)
|
||||
.map((Function<Tuple2<String, Tuple2<String, Optional<String>>>, String>) v1 -> {
|
||||
if (v1._2()._2().isPresent()) {
|
||||
return replaceField(v1._2()._1(), v1._2()._2().get(), FieldType.TARGET);
|
||||
}
|
||||
return v1._2()._1();
|
||||
}).filter(SparkPropagateRelation::containsDedup)
|
||||
.repartition(500);
|
||||
|
||||
//update deleted by inference
|
||||
relations = relations.mapToPair(
|
||||
(PairFunction<String, String, String>) s ->
|
||||
new Tuple2<>(MapDocumentUtil.getJPathString(SOURCEJSONPATH, s), s))
|
||||
.leftOuterJoin(mergedIds)
|
||||
.map((Function<Tuple2<String, Tuple2<String, Optional<String>>>, String>) v1 -> {
|
||||
if (v1._2()._2().isPresent()) {
|
||||
return updateDeletedByInference(v1._2()._1(), Relation.class);
|
||||
}
|
||||
return v1._2()._1();
|
||||
})
|
||||
.mapToPair(
|
||||
(PairFunction<String, String, String>) s ->
|
||||
new Tuple2<>(MapDocumentUtil.getJPathString(TARGETJSONPATH, s), s))
|
||||
.leftOuterJoin(mergedIds)
|
||||
.map((Function<Tuple2<String, Tuple2<String, Optional<String>>>, String>) v1 -> {
|
||||
if (v1._2()._2().isPresent()) {
|
||||
return updateDeletedByInference(v1._2()._1(), Relation.class);
|
||||
}
|
||||
return v1._2()._1();
|
||||
})
|
||||
.repartition(500);
|
||||
|
||||
newRels.union(relations).repartition(1000)
|
||||
.saveAsTextFile(DedupUtility.createEntityPath(dedupGraphPath, "relation"), GzipCodec.class);
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean containsDedup(final String json) {
|
||||
final String source = MapDocumentUtil.getJPathString(SOURCEJSONPATH, json);
|
||||
final String target = MapDocumentUtil.getJPathString(TARGETJSONPATH, json);
|
||||
|
||||
return source.toLowerCase().contains("dedup") || target.toLowerCase().contains("dedup");
|
||||
}
|
||||
|
||||
private static String replaceField(final String json, final String id, final FieldType type) {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
try {
|
||||
Relation relation = mapper.readValue(json, Relation.class);
|
||||
if (relation.getDataInfo() == null)
|
||||
relation.setDataInfo(new DataInfo());
|
||||
relation.getDataInfo().setDeletedbyinference(false);
|
||||
switch (type) {
|
||||
case SOURCE:
|
||||
relation.setSource(id);
|
||||
return mapper.writeValueAsString(relation);
|
||||
case TARGET:
|
||||
relation.setTarget(id);
|
||||
return mapper.writeValueAsString(relation);
|
||||
default:
|
||||
throw new IllegalArgumentException("");
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("unable to deserialize json relation: " + json, e);
|
||||
}
|
||||
}
|
||||
|
||||
private static SparkSession getSparkSession(ArgumentApplicationParser parser) {
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
return SparkSession
|
||||
.builder()
|
||||
.appName(SparkPropagateRelation.class.getSimpleName())
|
||||
.master(parser.get("master"))
|
||||
.config(conf)
|
||||
.enableHiveSupport()
|
||||
.getOrCreate();
|
||||
}
|
||||
|
||||
private static <T extends Oaf> String updateDeletedByInference(final String json, final Class<T> clazz) {
|
||||
final ObjectMapper mapper = new ObjectMapper();
|
||||
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
try {
|
||||
Oaf entity = mapper.readValue(json, clazz);
|
||||
if (entity.getDataInfo()== null)
|
||||
entity.setDataInfo(new DataInfo());
|
||||
entity.getDataInfo().setDeletedbyinference(true);
|
||||
return mapper.writeValueAsString(entity);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Unable to convert json", e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,121 @@
|
|||
package eu.dnetlib.dedup;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.io.compress.GzipCodec;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.PairFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.dom4j.DocumentException;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class SparkUpdateEntity {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkUpdateEntity.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/updateEntity_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
new SparkUpdateEntity().run(parser);
|
||||
}
|
||||
|
||||
public void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException {
|
||||
|
||||
final String graphBasePath = parser.get("graphBasePath");
|
||||
final String workingPath = parser.get("workingPath");
|
||||
final String dedupGraphPath = parser.get("dedupGraphPath");
|
||||
final String isLookUpUrl = parser.get("isLookUpUrl");
|
||||
final String actionSetId = parser.get("actionSetId");
|
||||
|
||||
try (SparkSession spark = getSparkSession(parser)) {
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
|
||||
for (DedupConfig dedupConf : DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) {
|
||||
|
||||
String subEntity = dedupConf.getWf().getSubEntityValue();
|
||||
|
||||
final Dataset<Relation> df = spark.read().load(DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity)).as(Encoders.bean(Relation.class));
|
||||
final JavaPairRDD<String, String> mergedIds = df
|
||||
.where("relClass == 'merges'")
|
||||
.select(df.col("target"))
|
||||
.distinct()
|
||||
.toJavaRDD()
|
||||
.mapToPair((PairFunction<Row, String, String>) r -> new Tuple2<>(r.getString(0), "d"));
|
||||
|
||||
final JavaRDD<String> sourceEntity = sc.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity));
|
||||
|
||||
final JavaRDD<String> dedupEntity = sc.textFile(DedupUtility.createDedupRecordPath(workingPath, actionSetId, subEntity));
|
||||
|
||||
JavaPairRDD<String, String> entitiesWithId = sourceEntity.mapToPair((PairFunction<String, String, String>) s -> new Tuple2<>(MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s), s));
|
||||
|
||||
Class<? extends Oaf> mainClass;
|
||||
switch (subEntity) {
|
||||
case "publication":
|
||||
mainClass = Publication.class;
|
||||
break;
|
||||
case "dataset":
|
||||
mainClass = eu.dnetlib.dhp.schema.oaf.Dataset.class;
|
||||
break;
|
||||
case "datasource":
|
||||
mainClass = Datasource.class;
|
||||
break;
|
||||
case "software":
|
||||
mainClass = Software.class;
|
||||
break;
|
||||
case "organization":
|
||||
mainClass = Organization.class;
|
||||
break;
|
||||
case "otherresearchproduct":
|
||||
mainClass = OtherResearchProduct.class;
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Illegal type " + subEntity);
|
||||
}
|
||||
|
||||
JavaRDD<String> map = entitiesWithId.leftOuterJoin(mergedIds).map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), mainClass) : k._2()._1());
|
||||
map.union(dedupEntity).saveAsTextFile(dedupGraphPath + "/" + subEntity, GzipCodec.class);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static <T extends Oaf> String updateDeletedByInference(final String json, final Class<T> clazz) {
|
||||
final ObjectMapper mapper = new ObjectMapper();
|
||||
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
try {
|
||||
Oaf entity = mapper.readValue(json, clazz);
|
||||
if (entity.getDataInfo()== null)
|
||||
entity.setDataInfo(new DataInfo());
|
||||
entity.getDataInfo().setDeletedbyinference(true);
|
||||
return mapper.writeValueAsString(entity);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Unable to convert json", e);
|
||||
}
|
||||
}
|
||||
|
||||
private static SparkSession getSparkSession(ArgumentApplicationParser parser) {
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
return SparkSession
|
||||
.builder()
|
||||
.appName(SparkUpdateEntity.class.getSimpleName())
|
||||
.master(parser.get("master"))
|
||||
.config(conf)
|
||||
.enableHiveSupport()
|
||||
.getOrCreate();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
[
|
||||
{
|
||||
"paramName": "mt",
|
||||
"paramLongName": "master",
|
||||
"paramDescription": "should be local or yarn",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "i",
|
||||
"paramLongName": "graphBasePath",
|
||||
"paramDescription": "the base path of raw graph",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "w",
|
||||
"paramLongName": "workingPath",
|
||||
"paramDescription": "the working directory path",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "la",
|
||||
"paramLongName": "isLookUpUrl",
|
||||
"paramDescription": "the url of the lookup service",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "asi",
|
||||
"paramLongName": "actionSetId",
|
||||
"paramDescription": "the id of the actionset (orchestrator)",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -0,0 +1,129 @@
|
|||
<workflow-app name="Build Root Records" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>graphBasePath</name>
|
||||
<description>the raw graph base path</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>isLookUpUrl</name>
|
||||
<description>the address of the lookUp service</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>actionSetId</name>
|
||||
<description>id of the actionSet</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>workingPath</name>
|
||||
<description>path of the working directory</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>dedupGraphPath</name>
|
||||
<description>path of the dedup graph</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<description>memory for individual executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="DeleteWorkingPath"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="DeleteWorkingPath">
|
||||
<fs>
|
||||
<delete path='${workingPath}/${actionSetId}/*_mergerel'/>
|
||||
<delete path='${workingPath}/${actionSetId}/*_deduprecord'/>
|
||||
</fs>
|
||||
<ok to="CreateMergeRel"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="CreateMergeRel">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Create Merge Relations</name>
|
||||
<class>eu.dnetlib.dedup.SparkCreateConnectedComponent</class>
|
||||
<jar>dhp-dedup-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory} --conf
|
||||
spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf
|
||||
spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf
|
||||
spark.sql.warehouse.dir="/user/hive/warehouse"
|
||||
</spark-opts>
|
||||
<arg>-mt</arg><arg>yarn-cluster</arg>
|
||||
<arg>--i</arg><arg>${graphBasePath}</arg>
|
||||
<arg>--w</arg><arg>${workingPath}</arg>
|
||||
<arg>--la</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--asi</arg><arg>${actionSetId}</arg>
|
||||
</spark>
|
||||
<ok to="CreateDedupRecord"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="CreateDedupRecord">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Create Dedup Record</name>
|
||||
<class>eu.dnetlib.dedup.SparkCreateDedupRecord</class>
|
||||
<jar>dhp-dedup-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory} --conf
|
||||
spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf
|
||||
spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf
|
||||
spark.sql.warehouse.dir="/user/hive/warehouse"
|
||||
</spark-opts>
|
||||
<arg>-mt</arg><arg>yarn-cluster</arg>
|
||||
<arg>--i</arg><arg>${graphBasePath}</arg>
|
||||
<arg>--w</arg><arg>${workingPath}</arg>
|
||||
<arg>--la</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--asi</arg><arg>${actionSetId}</arg>
|
||||
</spark>
|
||||
<ok to="UpdateEntity"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="UpdateEntity">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Create Dedup Record</name>
|
||||
<class>eu.dnetlib.dedup.SparkUpdateEntity</class>
|
||||
<jar>dhp-dedup-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory} --conf
|
||||
spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf
|
||||
spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf
|
||||
spark.sql.warehouse.dir="/user/hive/warehouse"
|
||||
</spark-opts>
|
||||
<arg>-mt</arg><arg>yarn-cluster</arg>
|
||||
<arg>--i</arg><arg>${graphBasePath}</arg>
|
||||
<arg>--w</arg><arg>${workingPath}</arg>
|
||||
<arg>--la</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--asi</arg><arg>${actionSetId}</arg>
|
||||
<arg>--o</arg><arg>${dedupGraphPath}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -55,7 +55,7 @@
|
|||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Create Similarity Relations</name>
|
||||
<class>eu.dnetlib.dedup.SparkCreateSimRels2</class>
|
||||
<class>eu.dnetlib.dedup.SparkCreateSimRels</class>
|
||||
<jar>dhp-dedup-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory} --conf
|
||||
|
@ -64,10 +64,11 @@
|
|||
spark.sql.warehouse.dir="/user/hive/warehouse"
|
||||
</spark-opts>
|
||||
<arg>-mt</arg><arg>yarn-cluster</arg>
|
||||
<arg>--i</arg><arg>${rawGraphBasePath}</arg>
|
||||
<arg>--i</arg><arg>${graphBasePath}</arg>
|
||||
<arg>--o</arg><arg>${rawSet}</arg>
|
||||
<arg>--la</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--asi</arg><arg>${actionSetId}</arg>
|
||||
<arg>--w</arg><arg>${workingPath}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
|
|
|
@ -0,0 +1,68 @@
|
|||
<workflow-app name="Create Similarity Relations" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>graphBasePath</name>
|
||||
<description>the raw graph base path</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>workingPath</name>
|
||||
<description>path for the working directory</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>dedupGraphPath</name>
|
||||
<description>path of the dedup graph</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<description>memory for individual executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="DeleteWorkingPath"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="DeleteWorkingPath">
|
||||
<fs>
|
||||
<delete path='${dedupGraphPath}/relation'/>
|
||||
</fs>
|
||||
<ok to="DuplicateScan"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="DuplicateScan">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Update Relations</name>
|
||||
<class>eu.dnetlib.dedup.SparkPropagateRelation</class>
|
||||
<jar>dhp-dedup-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory} --conf
|
||||
spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf
|
||||
spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf
|
||||
spark.sql.warehouse.dir="/user/hive/warehouse"
|
||||
</spark-opts>
|
||||
<arg>-mt</arg><arg>yarn-cluster</arg>
|
||||
<arg>--i</arg><arg>${graphBasePath}</arg>
|
||||
<arg>--o</arg><arg>${dedupGraphPath}</arg>
|
||||
<arg>--w</arg><arg>${workingPath}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -0,0 +1,26 @@
|
|||
[
|
||||
{
|
||||
"paramName": "mt",
|
||||
"paramLongName": "master",
|
||||
"paramDescription": "should be local or yarn",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "i",
|
||||
"paramLongName": "graphBasePath",
|
||||
"paramDescription": "the base path of raw graph",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "w",
|
||||
"paramLongName": "workingPath",
|
||||
"paramDescription": "the working directory path",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "o",
|
||||
"paramLongName": "dedupGraphPath",
|
||||
"paramDescription": "the path of the dedup graph",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -0,0 +1,38 @@
|
|||
[
|
||||
{
|
||||
"paramName": "mt",
|
||||
"paramLongName": "master",
|
||||
"paramDescription": "should be local or yarn",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "i",
|
||||
"paramLongName": "graphBasePath",
|
||||
"paramDescription": "the base path of raw graph",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "w",
|
||||
"paramLongName": "workingPath",
|
||||
"paramDescription": "the working directory path",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "la",
|
||||
"paramLongName": "isLookUpUrl",
|
||||
"paramDescriptions": "the url of the lookup service",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "asi",
|
||||
"paramLongName": "actionSetId",
|
||||
"paramDescriptions": "the id of the actionset (orchestrator)",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "o",
|
||||
"paramLongName": "dedupGraphPath",
|
||||
"paramDescription": "the path of the dedup graph",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -1,21 +1,14 @@
|
|||
package eu.dnetlib.dedup;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.hash.HashFunction;
|
||||
import com.google.common.hash.Hashing;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
public class SparkCreateDedupTest {
|
||||
|
||||
|
@ -27,22 +20,10 @@ public class SparkCreateDedupTest {
|
|||
configuration = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dedup/conf/org.curr.conf.json"));
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore
|
||||
public void createSimRelsTest() throws Exception {
|
||||
SparkCreateSimRels.main(new String[] {
|
||||
"-mt", "local[*]",
|
||||
"-s", "/Users/miconis/dumps",
|
||||
"-e", entity,
|
||||
"-c", ArgumentApplicationParser.compressArgument(configuration),
|
||||
"-t", "/tmp/dedup",
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore
|
||||
public void createSimRelsTest2() throws Exception {
|
||||
SparkCreateSimRels2.main(new String[] {
|
||||
SparkCreateSimRels.main(new String[] {
|
||||
"-mt", "local[*]",
|
||||
"-s", "/Users/miconis/dumps",
|
||||
"-e", entity,
|
||||
|
@ -98,4 +79,14 @@ public class SparkCreateDedupTest {
|
|||
System.out.println(hashFunction.hashUnencodedChars(s2).asLong());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJoinEntities() throws Exception{
|
||||
SparkJoinEntities.main(new String[] {
|
||||
"-mt", "local[*]",
|
||||
"-i", "/tmp/dedup",
|
||||
"-w", "/tmp/dedup",
|
||||
"-o", "/tmp/dedup",
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue