relation consistency workflow separated from dedup scan and creation of CCs

This commit is contained in:
Claudio Atzori 2020-04-17 13:12:44 +02:00
parent c92bfeeaee
commit 038ac7afd7
10 changed files with 330 additions and 163 deletions

View File

@ -22,6 +22,21 @@ public class HdfsSupport {
private HdfsSupport() { private HdfsSupport() {
} }
/**
* Checks a path (file or dir) exists on HDFS.
*
* @param path Path to be checked
* @param configuration Configuration of hadoop env
*/
public static boolean exists(String path, Configuration configuration) {
logger.info("Removing path: {}", path);
return rethrowAsRuntimeException(() -> {
Path f = new Path(path);
FileSystem fileSystem = FileSystem.get(configuration);
return fileSystem.exists(f);
});
}
/** /**
* Removes a path (file or dir) from HDFS. * Removes a path (file or dir) from HDFS.
* *

View File

@ -99,6 +99,8 @@ public class PartitionActionSetsByPayloadTypeJob {
List<String> inputActionSetPaths, List<String> inputActionSetPaths,
String outputPath) { String outputPath) {
inputActionSetPaths inputActionSetPaths
.stream()
.filter(path -> HdfsSupport.exists(path, spark.sparkContext().hadoopConfiguration()))
.forEach(inputActionSetPath -> { .forEach(inputActionSetPath -> {
Dataset<Row> actionDS = readActionSetFromPath(spark, inputActionSetPath); Dataset<Row> actionDS = readActionSetFromPath(spark, inputActionSetPath);
saveActions(actionDS, outputPath); saveActions(actionDS, outputPath);

View File

@ -1,7 +1,9 @@
package eu.dnetlib.dhp.oa.dedup; package eu.dnetlib.dhp.oa.dedup;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ -21,6 +23,9 @@ import java.util.List;
abstract class AbstractSparkAction implements Serializable { abstract class AbstractSparkAction implements Serializable {
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
public ArgumentApplicationParser parser; //parameters for the spark action public ArgumentApplicationParser parser; //parameters for the spark action
public SparkSession spark; //the spark session public SparkSession spark; //the spark session
@ -108,4 +113,8 @@ abstract class AbstractSparkAction implements Serializable {
.config(conf) .config(conf)
.getOrCreate(); .getOrCreate();
} }
protected static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
} }

View File

@ -1,6 +1,5 @@
package eu.dnetlib.dhp.oa.dedup; package eu.dnetlib.dhp.oa.dedup;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.dhp.utils.ISLookupClientFactory;
@ -42,26 +41,27 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
final String actionSetId = parser.get("actionSetId"); final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath"); final String workingPath = parser.get("workingPath");
System.out.println(String.format("graphBasePath: '%s'", graphBasePath)); log.info("graphBasePath: '{}'", graphBasePath);
System.out.println(String.format("isLookUpUrl: '%s'", isLookUpUrl)); log.info("isLookUpUrl: '{}'", isLookUpUrl);
System.out.println(String.format("actionSetId: '%s'", actionSetId)); log.info("actionSetId: '{}'", actionSetId);
System.out.println(String.format("workingPath: '%s'", workingPath)); log.info("workingPath: '{}'", workingPath);
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
for (DedupConfig dedupConf: getConfigurations(isLookUpService, actionSetId)) { for (DedupConfig dedupConf: getConfigurations(isLookUpService, actionSetId)) {
String subEntity = dedupConf.getWf().getSubEntityValue(); String subEntity = dedupConf.getWf().getSubEntityValue();
System.out.println(String.format("Creating deduprecords for: '%s'", subEntity)); log.info("Creating deduprecords for: '{}'", subEntity);
final String outputPath = DedupUtility.createDedupRecordPath(workingPath, actionSetId, subEntity);
removeOutputDir(spark, outputPath);
final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity); final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity); final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity);
final OafEntityType entityType = OafEntityType.valueOf(subEntity); final OafEntityType entityType = OafEntityType.valueOf(subEntity);
final JavaRDD<OafEntity> dedupRecord = final JavaRDD<OafEntity> dedupRecord =
DedupRecordFactory.createDedupRecord(sc, spark, mergeRelPath, entityPath, entityType, dedupConf); DedupRecordFactory.createDedupRecord(sc, spark, mergeRelPath, entityPath, entityType, dedupConf);
dedupRecord.map(r -> {
ObjectMapper mapper = new ObjectMapper(); dedupRecord.map(r -> OBJECT_MAPPER.writeValueAsString(r)).saveAsTextFile(outputPath);
return mapper.writeValueAsString(r);
}).saveAsTextFile(DedupUtility.createDedupRecordPath(workingPath, actionSetId, subEntity));
} }
} }

View File

@ -4,6 +4,8 @@ import com.google.common.hash.Hashing;
import eu.dnetlib.dhp.oa.dedup.graph.ConnectedComponent; import eu.dnetlib.dhp.oa.dedup.graph.ConnectedComponent;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.graph.GraphProcessor; import eu.dnetlib.dhp.oa.dedup.graph.GraphProcessor;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
@ -19,6 +21,7 @@ import org.apache.spark.graphx.Edge;
import org.apache.spark.rdd.RDD; import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException; import org.dom4j.DocumentException;
import scala.Tuple2; import scala.Tuple2;
@ -32,7 +35,9 @@ import java.util.List;
public class SparkCreateMergeRels extends AbstractSparkAction { public class SparkCreateMergeRels extends AbstractSparkAction {
public static final String PROVENANCE_ACTION_CLASS = "sysimport:dedup";
private static final Logger log = LoggerFactory.getLogger(SparkCreateMergeRels.class); private static final Logger log = LoggerFactory.getLogger(SparkCreateMergeRels.class);
public static final String DNET_PROVENANCE_ACTIONS = "dnet:provenanceActions";
public SparkCreateMergeRels(ArgumentApplicationParser parser, SparkSession spark) { public SparkCreateMergeRels(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark); super(parser, spark);
@ -44,7 +49,10 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"))); SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
parser.parseArgument(args); parser.parseArgument(args);
new SparkCreateMergeRels(parser, getSparkSession(parser)).run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); final String isLookUpUrl = parser.get("isLookUpUrl");
log.info("isLookupUrl {}", isLookUpUrl);
new SparkCreateMergeRels(parser, getSparkSession(parser)).run(ISLookupClientFactory.getLookUpService(isLookUpUrl));
} }
@Override @Override
@ -55,55 +63,91 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
final String isLookUpUrl = parser.get("isLookUpUrl"); final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId"); final String actionSetId = parser.get("actionSetId");
System.out.println(String.format("graphBasePath: '%s'", graphBasePath)); log.info("graphBasePath: '{}'", graphBasePath);
System.out.println(String.format("isLookUpUrl: '%s'", isLookUpUrl)); log.info("isLookUpUrl: '{}'", isLookUpUrl);
System.out.println(String.format("actionSetId: '%s'", actionSetId)); log.info("actionSetId: '{}'", actionSetId);
System.out.println(String.format("workingPath: '%s'", workingPath)); log.info("workingPath: '{}'", workingPath);
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
for (DedupConfig dedupConf: getConfigurations(isLookUpService, actionSetId)) { for (DedupConfig dedupConf: getConfigurations(isLookUpService, actionSetId)) {
final String subEntity = dedupConf.getWf().getSubEntityValue(); final String subEntity = dedupConf.getWf().getSubEntityValue();
System.out.println(String.format("Creating mergerels for: '%s'", subEntity));
log.info("Creating mergerels for: '{}'", subEntity);
final int maxIterations = dedupConf.getWf().getMaxIterations();
log.info("Max iterations {}", maxIterations);
final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
final JavaPairRDD<Object, String> vertexes = sc.textFile(graphBasePath + "/" + subEntity) final JavaPairRDD<Object, String> vertexes = sc.textFile(graphBasePath + "/" + subEntity)
.map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s)) .map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s))
.mapToPair((PairFunction<String, Object, String>) .mapToPair((PairFunction<String, Object, String>)
s -> new Tuple2<Object, String>(getHashcode(s), s) s -> new Tuple2<>(getHashcode(s), s));
);
final Dataset<Relation> similarityRelations = spark.read().load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)).as(Encoders.bean(Relation.class)); final Dataset<Relation> similarityRelations = spark
final RDD<Edge<String>> edgeRdd = similarityRelations.javaRDD().map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass())).rdd(); .read()
final JavaRDD<ConnectedComponent> cc = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, dedupConf.getWf().getMaxIterations()).toJavaRDD(); .load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity))
final Dataset<Relation> mergeRelation = spark.createDataset(cc.filter(k -> k.getDocIds().size() > 1) .as(Encoders.bean(Relation.class));
.flatMap(this::ccToMergeRel).rdd(), Encoders.bean(Relation.class));
mergeRelation final RDD<Edge<String>> edgeRdd = similarityRelations
.write().mode("overwrite") .javaRDD()
.save(DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity)); .map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass()))
.rdd();
final RDD<Relation> connectedComponents = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, maxIterations)
.toJavaRDD()
.filter(k -> k.getDocIds().size() > 1)
.flatMap(cc -> ccToMergeRel(cc, dedupConf))
.rdd();
spark
.createDataset(connectedComponents, Encoders.bean(Relation.class))
.write()
.mode(SaveMode.Append)
.save(mergeRelPath);
} }
} }
public Iterator<Relation> ccToMergeRel(ConnectedComponent cc){ public Iterator<Relation> ccToMergeRel(ConnectedComponent cc, DedupConfig dedupConf){
return cc.getDocIds() return cc.getDocIds()
.stream() .stream()
.flatMap(id -> { .flatMap(id -> {
List<Relation> tmp = new ArrayList<>(); List<Relation> tmp = new ArrayList<>();
Relation r = new Relation();
r.setSource(cc.getCcId()); tmp.add(rel(cc.getCcId(), id, "merges", dedupConf));
r.setTarget(id); tmp.add(rel(id, cc.getCcId(), "isMergedIn", dedupConf));
r.setRelClass("merges");
tmp.add(r);
r = new Relation();
r.setTarget(cc.getCcId());
r.setSource(id);
r.setRelClass("isMergedIn");
tmp.add(r);
return tmp.stream(); return tmp.stream();
}).iterator(); }).iterator();
} }
private Relation rel(String source, String target, String relClass, DedupConfig dedupConf) {
Relation r = new Relation();
r.setSource(source);
r.setTarget(target);
r.setRelClass(relClass);
r.setSubRelType("dedup");
DataInfo info = new DataInfo();
info.setDeletedbyinference(false);
info.setInferred(true);
info.setInvisible(false);
info.setInferenceprovenance(dedupConf.getWf().getConfigurationId());
Qualifier provenanceAction = new Qualifier();
provenanceAction.setClassid(PROVENANCE_ACTION_CLASS);
provenanceAction.setClassname(PROVENANCE_ACTION_CLASS);
provenanceAction.setSchemeid(DNET_PROVENANCE_ACTIONS);
provenanceAction.setSchemename(DNET_PROVENANCE_ACTIONS);
info.setProvenanceaction(provenanceAction);
//TODO calculate the trust value based on the similarity score of the elements in the CC
//info.setTrust();
r.setDataInfo(info);
return r;
}
public static long getHashcode(final String id) { public static long getHashcode(final String id) {
return Hashing.murmur3_128().hashString(id).asLong(); return Hashing.murmur3_128().hashString(id).asLong();
} }

View File

@ -4,6 +4,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
@ -18,6 +19,7 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException; import org.dom4j.DocumentException;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -53,24 +55,27 @@ public class SparkCreateSimRels extends AbstractSparkAction {
final String actionSetId = parser.get("actionSetId"); final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath"); final String workingPath = parser.get("workingPath");
System.out.println(String.format("graphBasePath: '%s'", graphBasePath)); log.info("graphBasePath: '{}'", graphBasePath);
System.out.println(String.format("isLookUpUrl: '%s'", isLookUpUrl)); log.info("isLookUpUrl: '{}'", isLookUpUrl);
System.out.println(String.format("actionSetId: '%s'", actionSetId)); log.info("actionSetId: '{}'", actionSetId);
System.out.println(String.format("workingPath: '%s'", workingPath)); log.info("workingPath: '{}'", workingPath);
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
//for each dedup configuration //for each dedup configuration
for (DedupConfig dedupConf: getConfigurations(isLookUpService, actionSetId)) { for (DedupConfig dedupConf: getConfigurations(isLookUpService, actionSetId)) {
final String entity = dedupConf.getWf().getEntityType(); final String entity = dedupConf.getWf().getEntityType();
final String subEntity = dedupConf.getWf().getSubEntityValue(); final String subEntity = dedupConf.getWf().getSubEntityValue();
System.out.println(String.format("Creating simrels for: '%s'", subEntity)); log.info("Creating simrels for: '{}'", subEntity);
final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity);
removeOutputDir(spark, outputPath);
JavaPairRDD<String, MapDocument> mapDocument = sc.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) JavaPairRDD<String, MapDocument> mapDocument = sc.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
.mapToPair((PairFunction<String, String, MapDocument>) s -> { .mapToPair((PairFunction<String, String, MapDocument>) s -> {
MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s); MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
return new Tuple2<String, MapDocument>(d.getIdentifier(), d); return new Tuple2<>(d.getIdentifier(), d);
}); });
//create blocks for deduplication //create blocks for deduplication
@ -84,46 +89,30 @@ public class SparkCreateSimRels extends AbstractSparkAction {
//save the simrel in the workingdir //save the simrel in the workingdir
spark.createDataset(relationsRDD.rdd(), Encoders.bean(Relation.class)) spark.createDataset(relationsRDD.rdd(), Encoders.bean(Relation.class))
.write() .write()
.mode("overwrite") .mode(SaveMode.Append)
.save(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)); .save(outputPath);
} }
} }
/**
* Utility method used to create an atomic action from a Relation object
* @param relation input relation
* @return A tuple2 with [id, json serialization of the atomic action]
* @throws JsonProcessingException
*/
public Tuple2<Text, Text> createSequenceFileRow(Relation relation) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
String id = relation.getSource() + "@" + relation.getRelClass() + "@" + relation.getTarget();
AtomicAction<Relation> aa = new AtomicAction<>(Relation.class, relation);
return new Tuple2<>(
new Text(id),
new Text(mapper.writeValueAsString(aa))
);
}
public Relation createSimRel(String source, String target, String entity) { public Relation createSimRel(String source, String target, String entity) {
final Relation r = new Relation(); final Relation r = new Relation();
r.setSource(source); r.setSource(source);
r.setTarget(target); r.setTarget(target);
r.setSubRelType("dedupSimilarity");
r.setRelClass("isSimilarTo");
r.setDataInfo(new DataInfo());
switch(entity){ switch(entity){
case "result": case "result":
r.setRelClass("resultResult_dedupSimilarity_isSimilarTo"); r.setRelType("resultResult");
break; break;
case "organization": case "organization":
r.setRelClass("organizationOrganization_dedupSimilarity_isSimilarTo"); r.setRelType("organizationOrganization");
break; break;
default: default:
r.setRelClass("isSimilarTo"); throw new IllegalArgumentException("unmanaged entity type: " + entity);
break;
} }
return r; return r;
} }
} }

View File

@ -27,9 +27,6 @@ public class SparkPropagateRelation extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkPropagateRelation.class); private static final Logger log = LoggerFactory.getLogger(SparkPropagateRelation.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
enum FieldType { enum FieldType {
SOURCE, SOURCE,
TARGET TARGET
@ -62,7 +59,7 @@ public class SparkPropagateRelation extends AbstractSparkAction {
log.info("dedupGraphPath: '{}'", dedupGraphPath); log.info("dedupGraphPath: '{}'", dedupGraphPath);
final String outputRelationPath = DedupUtility.createEntityPath(dedupGraphPath, "relation"); final String outputRelationPath = DedupUtility.createEntityPath(dedupGraphPath, "relation");
deletePath(outputRelationPath); removeOutputDir(spark, outputRelationPath);
Dataset<Relation> mergeRels = spark.read() Dataset<Relation> mergeRels = spark.read()
.load(DedupUtility.createMergeRelPath(workingPath, "*", "*")) .load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))

View File

@ -3,17 +3,18 @@ package eu.dnetlib.dhp.oa.dedup;
import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.util.MapDocumentUtil; import eu.dnetlib.pace.util.MapDocumentUtil;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
@ -27,7 +28,7 @@ import org.slf4j.LoggerFactory;
import scala.Tuple2; import scala.Tuple2;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.util.Map;
public class SparkUpdateEntity extends AbstractSparkAction { public class SparkUpdateEntity extends AbstractSparkAction {
@ -48,10 +49,61 @@ public class SparkUpdateEntity extends AbstractSparkAction {
new SparkUpdateEntity(parser, getSparkSession(parser)).run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); new SparkUpdateEntity(parser, getSparkSession(parser)).run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
} }
public boolean mergeRelExists(String basePath, String entity) throws IOException { public void run(ISLookUpService isLookUpService) throws IOException {
final String graphBasePath = parser.get("graphBasePath");
final String workingPath = parser.get("workingPath");
final String dedupGraphPath = parser.get("dedupGraphPath");
log.info("graphBasePath: '{}'", graphBasePath);
log.info("workingPath: '{}'", workingPath);
log.info("dedupGraphPath: '{}'", dedupGraphPath);
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
//for each entity
ModelSupport.entityTypes.forEach((entity, clazz) -> {
final String outputPath = dedupGraphPath + "/" + entity;
removeOutputDir(spark, outputPath);
JavaRDD<String> sourceEntity = sc.textFile(DedupUtility.createEntityPath(graphBasePath, entity.toString()));
if (mergeRelExists(workingPath, entity.toString())) {
final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, "*", entity.toString());
final String dedupRecordPath = DedupUtility.createDedupRecordPath(workingPath, "*", entity.toString());
final Dataset<Relation> rel = spark.read()
.load(mergeRelPath)
.as(Encoders.bean(Relation.class));
final JavaPairRDD<String, String> mergedIds = rel
.where("relClass == 'merges'")
.select(rel.col("target"))
.distinct()
.toJavaRDD()
.mapToPair((PairFunction<Row, String, String>) r -> new Tuple2<>(r.getString(0), "d"));
JavaPairRDD<String, String> entitiesWithId = sourceEntity
.mapToPair((PairFunction<String, String, String>) s -> new Tuple2<>(MapDocumentUtil.getJPathString(IDJSONPATH, s), s));
JavaRDD<String> map = entitiesWithId
.leftOuterJoin(mergedIds)
.map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), clazz) : k._2()._1());
sourceEntity = map.union(sc.textFile(dedupRecordPath));
}
sourceEntity.saveAsTextFile(outputPath, GzipCodec.class);
});
}
public boolean mergeRelExists(String basePath, String entity) {
boolean result = false; boolean result = false;
try {
FileSystem fileSystem = FileSystem.get(new Configuration()); FileSystem fileSystem = FileSystem.get(new Configuration());
FileStatus[] fileStatuses = fileSystem.listStatus(new Path(basePath)); FileStatus[] fileStatuses = fileSystem.listStatus(new Path(basePath));
@ -63,83 +115,21 @@ public class SparkUpdateEntity extends AbstractSparkAction {
} }
return result; return result;
} } catch (IOException e) {
throw new RuntimeException(e);
public void run(ISLookUpService isLookUpService) throws IOException {
final String graphBasePath = parser.get("graphBasePath");
final String workingPath = parser.get("workingPath");
final String dedupGraphPath = parser.get("dedupGraphPath");
System.out.println(String.format("graphBasePath: '%s'", graphBasePath));
System.out.println(String.format("workingPath: '%s'", workingPath));
System.out.println(String.format("dedupGraphPath: '%s'", dedupGraphPath));
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
//for each entity
for (OafEntityType entity: OafEntityType.values()) {
JavaRDD<String> sourceEntity = sc.textFile(DedupUtility.createEntityPath(graphBasePath, entity.toString()));
if (mergeRelExists(workingPath, entity.toString())) {
final Dataset<Relation> rel = spark.read().load(DedupUtility.createMergeRelPath(workingPath, "*", entity.toString())).as(Encoders.bean(Relation.class));
final JavaPairRDD<String, String> mergedIds = rel
.where("relClass == 'merges'")
.select(rel.col("target"))
.distinct()
.toJavaRDD()
.mapToPair((PairFunction<Row, String, String>) r -> new Tuple2<String, String>(r.getString(0), "d"));
final JavaRDD<String> dedupEntity = sc.textFile(DedupUtility.createDedupRecordPath(workingPath, "*", entity.toString()));
JavaPairRDD<String, String> entitiesWithId = sourceEntity.mapToPair((PairFunction<String, String, String>) s -> new Tuple2<String, String>(MapDocumentUtil.getJPathString(IDJSONPATH, s), s));
JavaRDD<String> map = entitiesWithId.leftOuterJoin(mergedIds).map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), getOafClass(entity)) : k._2()._1());
sourceEntity = map.union(dedupEntity);
}
sourceEntity.saveAsTextFile(dedupGraphPath + "/" + entity, GzipCodec.class);
}
}
public Class<? extends Oaf> getOafClass(OafEntityType className) {
switch (className.toString()) {
case "publication":
return Publication.class;
case "dataset":
return eu.dnetlib.dhp.schema.oaf.Dataset.class;
case "datasource":
return Datasource.class;
case "software":
return Software.class;
case "organization":
return Organization.class;
case "otherresearchproduct":
return OtherResearchProduct.class;
case "project":
return Project.class;
default:
throw new IllegalArgumentException("Illegal type " + className);
} }
} }
private static <T extends Oaf> String updateDeletedByInference(final String json, final Class<T> clazz) { private static <T extends OafEntity> String updateDeletedByInference(final String json, final Class<T> clazz) {
final ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
try { try {
Oaf entity = mapper.readValue(json, clazz); Oaf entity = OBJECT_MAPPER.readValue(json, clazz);
if (entity.getDataInfo()== null) if (entity.getDataInfo()== null)
entity.setDataInfo(new DataInfo()); entity.setDataInfo(new DataInfo());
entity.getDataInfo().setDeletedbyinference(true); entity.getDataInfo().setDeletedbyinference(true);
return mapper.writeValueAsString(entity); return OBJECT_MAPPER.writeValueAsString(entity);
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException("Unable to convert json", e); throw new RuntimeException("Unable to convert json", e);
} }
} }
} }

View File

@ -68,38 +68,12 @@
</configuration> </configuration>
</global> </global>
<start to="UpdateEntity"/> <start to="PropagateRelation"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<action name="UpdateEntity">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Update Entity</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkUpdateEntity</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>-mt</arg><arg>yarn</arg>
<arg>--i</arg><arg>${graphBasePath}</arg>
<arg>--w</arg><arg>${workingPath}</arg>
<arg>--o</arg><arg>${dedupGraphPath}</arg>
</spark>
<ok to="PropagateRelation"/>
<error to="Kill"/>
</action>
<action name="PropagateRelation"> <action name="PropagateRelation">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
@ -122,9 +96,112 @@
<arg>--o</arg><arg>${dedupGraphPath}</arg> <arg>--o</arg><arg>${dedupGraphPath}</arg>
<arg>--w</arg><arg>${workingPath}</arg> <arg>--w</arg><arg>${workingPath}</arg>
</spark> </spark>
<ok to="End"/> <ok to="fork_copy_entities"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<fork name="fork_copy_entities">
<path start="copy_datasource"/>
<path start="copy_project"/>
<path start="copy_organization"/>
<path start="copy_publication"/>
<path start="copy_dataset"/>
<path start="copy_software"/>
<path start="copy_otherresearchproduct"/>
</fork>
<action name="copy_datasource">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${dedupGraphPath}/datasource"/>
</prepare>
<arg>-pb</arg>
<arg>${graphBasePath}/datasource</arg>
<arg>${dedupGraphPath}/datasource</arg>
</distcp>
<ok to="wait_copy"/>
<error to="Kill"/>
</action>
<action name="copy_project">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${dedupGraphPath}/project"/>
</prepare>
<arg>-pb</arg>
<arg>${graphBasePath}/project</arg>
<arg>${dedupGraphPath}/project</arg>
</distcp>
<ok to="wait_copy"/>
<error to="Kill"/>
</action>
<action name="copy_organization">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${dedupGraphPath}/organization"/>
</prepare>
<arg>-pb</arg>
<arg>${graphBasePath}/organization</arg>
<arg>${dedupGraphPath}/organization</arg>
</distcp>
<ok to="wait_copy"/>
<error to="Kill"/>
</action>
<action name="copy_publication">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${dedupGraphPath}/publication"/>
</prepare>
<arg>-pb</arg>
<arg>${graphBasePath}/publication</arg>
<arg>${dedupGraphPath}/publication</arg>
</distcp>
<ok to="wait_copy"/>
<error to="Kill"/>
</action>
<action name="copy_dataset">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${dedupGraphPath}/dataset"/>
</prepare>
<arg>-pb</arg>
<arg>${graphBasePath}/dataset</arg>
<arg>${dedupGraphPath}/dataset</arg>
</distcp>
<ok to="wait_copy"/>
<error to="Kill"/>
</action>
<action name="copy_software">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${dedupGraphPath}/software"/>
</prepare>
<arg>-pb</arg>
<arg>${graphBasePath}/software</arg>
<arg>${dedupGraphPath}/software</arg>
</distcp>
<ok to="wait_copy"/>
<error to="Kill"/>
</action>
<action name="copy_otherresearchproduct">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${dedupGraphPath}/otherresearchproduct"/>
</prepare>
<arg>-pb</arg>
<arg>${graphBasePath}/otherresearchproduct</arg>
<arg>${dedupGraphPath}/otherresearchproduct</arg>
</distcp>
<ok to="wait_copy"/>
<error to="Kill"/>
</action>
<join name="wait_copy" to="End"/>
<end name="End"/> <end name="End"/>
</workflow-app> </workflow-app>

View File

@ -16,6 +16,10 @@
<name>workingPath</name> <name>workingPath</name>
<description>path for the working directory</description> <description>path for the working directory</description>
</property> </property>
<property>
<name>dedupGraphPath</name>
<description>path for the output graph</description>
</property>
<property> <property>
<name>sparkDriverMemory</name> <name>sparkDriverMemory</name>
<description>memory for driver process</description> <description>memory for driver process</description>
@ -146,6 +150,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>-mt</arg><arg>yarn</arg> <arg>-mt</arg><arg>yarn</arg>
<arg>--i</arg><arg>${graphBasePath}</arg> <arg>--i</arg><arg>${graphBasePath}</arg>
@ -153,6 +158,45 @@
<arg>--la</arg><arg>${isLookUpUrl}</arg> <arg>--la</arg><arg>${isLookUpUrl}</arg>
<arg>--asi</arg><arg>${actionSetId}</arg> <arg>--asi</arg><arg>${actionSetId}</arg>
</spark> </spark>
<ok to="UpdateEntity"/>
<error to="Kill"/>
</action>
<action name="UpdateEntity">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Update Entity</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkUpdateEntity</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>-mt</arg><arg>yarn</arg>
<arg>--i</arg><arg>${graphBasePath}</arg>
<arg>--w</arg><arg>${workingPath}</arg>
<arg>--o</arg><arg>${dedupGraphPath}</arg>
</spark>
<ok to="copyRelations"/>
<error to="Kill"/>
</action>
<action name="copyRelations">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${dedupGraphPath}/relation"/>
</prepare>
<arg>-pb</arg>
<arg>${graphBasePath}/relation</arg>
<arg>${dedupGraphPath}/relation</arg>
</distcp>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>