From 038ac7afd71a01efd02dca666c27dca8044d09e0 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Fri, 17 Apr 2020 13:12:44 +0200
Subject: [PATCH] relation consistency workflow separated from dedup scan and
 creation of CCs

---
 .../eu/dnetlib/dhp/common/HdfsSupport.java    |  15 ++
 .../PartitionActionSetsByPayloadTypeJob.java  |   2 +
 .../dhp/oa/dedup/AbstractSparkAction.java     |   9 ++
 .../dhp/oa/dedup/SparkCreateDedupRecord.java  |  20 +--
 .../dhp/oa/dedup/SparkCreateMergeRels.java    | 100 +++++++++----
 .../dhp/oa/dedup/SparkCreateSimRels.java      |  55 +++-----
 .../dhp/oa/dedup/SparkPropagateRelation.java  |   5 +-
 .../dhp/oa/dedup/SparkUpdateEntity.java       | 110 +++++++--------
 .../dedup/consistency/oozie_app/workflow.xml  | 133 ++++++++++++++----
 .../dhp/oa/dedup/scan/oozie_app/workflow.xml  |  44 ++++++
 10 files changed, 330 insertions(+), 163 deletions(-)

diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java
index 05beaa51e..f6b94c921 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java
@@ -22,6 +22,21 @@ public class HdfsSupport {
     private HdfsSupport() {
     }
 
+    /**
+     * Checks whether a path (file or dir) exists on HDFS.
+     *
+     * @param path Path to be checked
+     * @param configuration Configuration of the hadoop env
+     */
+    public static boolean exists(String path, Configuration configuration) {
+        logger.info("Checking existence for path: {}", path);
+        return rethrowAsRuntimeException(() -> {
+            Path f = new Path(path);
+            FileSystem fileSystem = FileSystem.get(configuration);
+            return fileSystem.exists(f);
+        });
+    }
+
     /**
      * Removes a path (file or dir) from HDFS.
      *
diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJob.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJob.java
index 31a4da190..7c4d0616c 100644
--- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJob.java
+++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJob.java
@@ -99,6 +99,8 @@ public class PartitionActionSetsByPayloadTypeJob {
                                          List<String> inputActionSetPaths,
                                          String outputPath) {
         inputActionSetPaths
+                .stream()
+                .filter(path -> HdfsSupport.exists(path, spark.sparkContext().hadoopConfiguration()))
                 .forEach(inputActionSetPath -> {
                     Dataset actionDS = readActionSetFromPath(spark, inputActionSetPath);
                     saveActions(actionDS, outputPath);
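The new HdfsSupport.exists check lets PartitionActionSetsByPayloadTypeJob skip action sets that were never materialized instead of failing on a missing input. A minimal usage sketch of the same guard outside the job; the path and the local master are assumptions made for the example, not part of this patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.SparkSession;

import eu.dnetlib.dhp.common.HdfsSupport;

public class ExistsCheckExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("exists-check")
                .master("local[*]") // local master only for the sake of the example
                .getOrCreate();

        Configuration conf = spark.sparkContext().hadoopConfiguration();
        String path = "/tmp/actionsets/example"; // hypothetical path

        // Skip inputs that were never produced instead of failing the whole job
        if (HdfsSupport.exists(path, conf)) {
            spark.read().textFile(path).show();
        }

        spark.stop();
    }
}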
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java
index 8549839fc..6213f342a 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java
@@ -1,7 +1,9 @@
 package eu.dnetlib.dhp.oa.dedup;
 
+import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
 import eu.dnetlib.dhp.schema.oaf.*;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@@ -21,6 +23,9 @@ import java.util.List;
 
 abstract class AbstractSparkAction implements Serializable {
 
+    protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
     public ArgumentApplicationParser parser; //parameters for the spark action
     public SparkSession spark;               //the spark session
 
@@ -108,4 +113,8 @@ abstract class AbstractSparkAction implements Serializable {
                 .config(conf)
                 .getOrCreate();
     }
+
+    protected static void removeOutputDir(SparkSession spark, String path) {
+        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+    }
 }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java
index 22e6f145e..02cf6587a 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java
@@ -1,6 +1,5 @@
 package eu.dnetlib.dhp.oa.dedup;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.oaf.OafEntity;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
@@ -42,26 +41,27 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
         final String actionSetId = parser.get("actionSetId");
         final String workingPath = parser.get("workingPath");
 
-        System.out.println(String.format("graphBasePath: '%s'", graphBasePath));
-        System.out.println(String.format("isLookUpUrl: '%s'", isLookUpUrl));
-        System.out.println(String.format("actionSetId: '%s'", actionSetId));
-        System.out.println(String.format("workingPath: '%s'", workingPath));
+        log.info("graphBasePath: '{}'", graphBasePath);
+        log.info("isLookUpUrl: '{}'", isLookUpUrl);
+        log.info("actionSetId: '{}'", actionSetId);
+        log.info("workingPath: '{}'", workingPath);
 
         final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
 
         for (DedupConfig dedupConf: getConfigurations(isLookUpService, actionSetId)) {
             String subEntity = dedupConf.getWf().getSubEntityValue();
-            System.out.println(String.format("Creating deduprecords for: '%s'", subEntity));
+            log.info("Creating deduprecords for: '{}'", subEntity);
+
+            final String outputPath = DedupUtility.createDedupRecordPath(workingPath, actionSetId, subEntity);
+            removeOutputDir(spark, outputPath);
 
             final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
             final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity);
             final OafEntityType entityType = OafEntityType.valueOf(subEntity);
             final JavaRDD<OafEntity> dedupRecord = DedupRecordFactory.createDedupRecord(sc, spark, mergeRelPath, entityPath, entityType, dedupConf);
-            dedupRecord.map(r -> {
-                ObjectMapper mapper = new ObjectMapper();
-                return mapper.writeValueAsString(r);
-            }).saveAsTextFile(DedupUtility.createDedupRecordPath(workingPath, actionSetId, subEntity));
+
+            dedupRecord.map(r -> OBJECT_MAPPER.writeValueAsString(r)).saveAsTextFile(outputPath);
         }
     }
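Hoisting a single preconfigured ObjectMapper into AbstractSparkAction avoids rebuilding a mapper per record, as the old SparkCreateDedupRecord lambda did; Jackson's ObjectMapper is thread-safe once configured. A small sketch of the same pattern in isolation; the Record bean is hypothetical:

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

public class MapperReuseExample {

    // One shared, thread-safe instance configured once, mirroring the constant the patch
    // moves into AbstractSparkAction
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    // Hypothetical bean standing in for an OafEntity
    public static class Record {
        public String id;
    }

    public static void main(String[] args) throws Exception {
        // An unknown field ("extra") no longer fails deserialization
        Record r = OBJECT_MAPPER.readValue("{\"id\":\"1\",\"extra\":true}", Record.class);
        System.out.println(OBJECT_MAPPER.writeValueAsString(r));
    }
}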
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
index fb607a87e..ef82b4eaf 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
@@ -4,6 +4,8 @@
 import com.google.common.hash.Hashing;
 import eu.dnetlib.dhp.oa.dedup.graph.ConnectedComponent;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.oa.dedup.graph.GraphProcessor;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
@@ -19,6 +21,7 @@
 import org.apache.spark.graphx.Edge;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 import org.dom4j.DocumentException;
 import scala.Tuple2;
@@ -32,7 +35,9 @@ import java.util.List;
 
 public class SparkCreateMergeRels extends AbstractSparkAction {
 
+    public static final String PROVENANCE_ACTION_CLASS = "sysimport:dedup";
     private static final Logger log = LoggerFactory.getLogger(SparkCreateMergeRels.class);
+    public static final String DNET_PROVENANCE_ACTIONS = "dnet:provenanceActions";
 
     public SparkCreateMergeRels(ArgumentApplicationParser parser, SparkSession spark) {
         super(parser, spark);
@@ -44,7 +49,10 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
                 SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
         parser.parseArgument(args);
 
-        new SparkCreateMergeRels(parser, getSparkSession(parser)).run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
+        final String isLookUpUrl = parser.get("isLookUpUrl");
+        log.info("isLookupUrl {}", isLookUpUrl);
+
+        new SparkCreateMergeRels(parser, getSparkSession(parser)).run(ISLookupClientFactory.getLookUpService(isLookUpUrl));
     }
 
     @Override
@@ -55,55 +63,91 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
         final String isLookUpUrl = parser.get("isLookUpUrl");
         final String actionSetId = parser.get("actionSetId");
 
-        System.out.println(String.format("graphBasePath: '%s'", graphBasePath));
-        System.out.println(String.format("isLookUpUrl: '%s'", isLookUpUrl));
-        System.out.println(String.format("actionSetId: '%s'", actionSetId));
-        System.out.println(String.format("workingPath: '%s'", workingPath));
+        log.info("graphBasePath: '{}'", graphBasePath);
+        log.info("isLookUpUrl: '{}'", isLookUpUrl);
+        log.info("actionSetId: '{}'", actionSetId);
+        log.info("workingPath: '{}'", workingPath);
 
-        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
         for (DedupConfig dedupConf: getConfigurations(isLookUpService, actionSetId)) {
             final String subEntity = dedupConf.getWf().getSubEntityValue();
-            System.out.println(String.format("Creating mergerels for: '%s'", subEntity));
+            log.info("Creating mergerels for: '{}'", subEntity);
+
+            final int maxIterations = dedupConf.getWf().getMaxIterations();
+            log.info("Max iterations {}", maxIterations);
+
+            final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
 
             final JavaPairRDD<Object, String> vertexes = sc.textFile(graphBasePath + "/" + subEntity)
                     .map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s))
                     .mapToPair((PairFunction<String, Object, String>)
-                            s -> new Tuple2(getHashcode(s), s)
-                    );
+                            s -> new Tuple2<>(getHashcode(s), s));
 
-            final Dataset<Relation> similarityRelations = spark.read().load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)).as(Encoders.bean(Relation.class));
-            final RDD<Edge<String>> edgeRdd = similarityRelations.javaRDD().map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass())).rdd();
-            final JavaRDD<ConnectedComponent> cc = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, dedupConf.getWf().getMaxIterations()).toJavaRDD();
-            final Dataset<Relation> mergeRelation = spark.createDataset(cc.filter(k -> k.getDocIds().size() > 1)
-                    .flatMap(this::ccToMergeRel).rdd(), Encoders.bean(Relation.class));
+            final Dataset<Relation> similarityRelations = spark
+                    .read()
+                    .load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity))
+                    .as(Encoders.bean(Relation.class));
 
-            mergeRelation
-                    .write().mode("overwrite")
-                    .save(DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity));
+            final RDD<Edge<String>> edgeRdd = similarityRelations
+                    .javaRDD()
+                    .map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass()))
+                    .rdd();
+
+            final RDD<Relation> connectedComponents = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, maxIterations)
+                    .toJavaRDD()
+                    .filter(k -> k.getDocIds().size() > 1)
+                    .flatMap(cc -> ccToMergeRel(cc, dedupConf))
+                    .rdd();
+
+            spark
+                    .createDataset(connectedComponents, Encoders.bean(Relation.class))
+                    .write()
+                    .mode(SaveMode.Append)
+                    .save(mergeRelPath);
         }
     }
 
-    public Iterator<Relation> ccToMergeRel(ConnectedComponent cc){
+    public Iterator<Relation> ccToMergeRel(ConnectedComponent cc, DedupConfig dedupConf){
         return cc.getDocIds()
                 .stream()
                 .flatMap(id -> {
                     List<Relation> tmp = new ArrayList<>();
-                    Relation r = new Relation();
-                    r.setSource(cc.getCcId());
-                    r.setTarget(id);
-                    r.setRelClass("merges");
-                    tmp.add(r);
-                    r = new Relation();
-                    r.setTarget(cc.getCcId());
-                    r.setSource(id);
-                    r.setRelClass("isMergedIn");
-                    tmp.add(r);
+
+                    tmp.add(rel(cc.getCcId(), id, "merges", dedupConf));
+                    tmp.add(rel(id, cc.getCcId(), "isMergedIn", dedupConf));
+
                     return tmp.stream();
                 }).iterator();
     }
 
+    private Relation rel(String source, String target, String relClass, DedupConfig dedupConf) {
+        Relation r = new Relation();
+        r.setSource(source);
+        r.setTarget(target);
+        r.setRelClass(relClass);
+        r.setSubRelType("dedup");
+
+        DataInfo info = new DataInfo();
+        info.setDeletedbyinference(false);
+        info.setInferred(true);
+        info.setInvisible(false);
+        info.setInferenceprovenance(dedupConf.getWf().getConfigurationId());
+
+        Qualifier provenanceAction = new Qualifier();
+        provenanceAction.setClassid(PROVENANCE_ACTION_CLASS);
+        provenanceAction.setClassname(PROVENANCE_ACTION_CLASS);
+        provenanceAction.setSchemeid(DNET_PROVENANCE_ACTIONS);
+        provenanceAction.setSchemename(DNET_PROVENANCE_ACTIONS);
+        info.setProvenanceaction(provenanceAction);
+
+        //TODO calculate the trust value based on the similarity score of the elements in the CC
+        //info.setTrust();
+
+        r.setDataInfo(info);
+        return r;
+    }
+
     public static long getHashcode(final String id) {
         return Hashing.murmur3_128().hashString(id).asLong();
     }
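For every connected component with more than one member, ccToMergeRel now emits a symmetric, provenance-stamped pair of relations through the rel() helper. A sketch of that pairing alone, assuming the dhp-schemas Relation bean used above; both ids are made up:

import java.util.Arrays;
import java.util.List;

import eu.dnetlib.dhp.schema.oaf.Relation;

public class MergeRelPairExample {
    public static void main(String[] args) {
        String ccId = "dedup_wf_001::abc";   // hypothetical connected component id
        String docId = "50|doi_________::1"; // hypothetical member of the component

        // The "merges" direction: the representative merges the member
        Relation merges = new Relation();
        merges.setSource(ccId);
        merges.setTarget(docId);
        merges.setRelClass("merges");
        merges.setSubRelType("dedup");

        // The inverse "isMergedIn" direction: the member points back to the representative
        Relation isMergedIn = new Relation();
        isMergedIn.setSource(docId);
        isMergedIn.setTarget(ccId);
        isMergedIn.setRelClass("isMergedIn");
        isMergedIn.setSubRelType("dedup");

        List<Relation> pair = Arrays.asList(merges, isMergedIn);
        pair.forEach(r -> System.out.println(r.getSource() + " --" + r.getRelClass() + "--> " + r.getTarget()));
    }
}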
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
index a2b7e7b5d..de842d822 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
@@ -4,6 +4,7 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.action.AtomicAction;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
@@ -18,6 +19,7 @@
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 import org.dom4j.DocumentException;
 import org.slf4j.Logger;
@@ -53,24 +55,27 @@ public class SparkCreateSimRels extends AbstractSparkAction {
         final String actionSetId = parser.get("actionSetId");
         final String workingPath = parser.get("workingPath");
 
-        System.out.println(String.format("graphBasePath: '%s'", graphBasePath));
-        System.out.println(String.format("isLookUpUrl: '%s'", isLookUpUrl));
-        System.out.println(String.format("actionSetId: '%s'", actionSetId));
-        System.out.println(String.format("workingPath: '%s'", workingPath));
+        log.info("graphBasePath: '{}'", graphBasePath);
+        log.info("isLookUpUrl: '{}'", isLookUpUrl);
+        log.info("actionSetId: '{}'", actionSetId);
+        log.info("workingPath: '{}'", workingPath);
 
-        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
         //for each dedup configuration
         for (DedupConfig dedupConf: getConfigurations(isLookUpService, actionSetId)) {
             final String entity = dedupConf.getWf().getEntityType();
             final String subEntity = dedupConf.getWf().getSubEntityValue();
-            System.out.println(String.format("Creating simrels for: '%s'", subEntity));
+            log.info("Creating simrels for: '{}'", subEntity);
+
+            final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity);
+            removeOutputDir(spark, outputPath);
 
             JavaPairRDD<String, MapDocument> mapDocument = sc.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
                     .mapToPair((PairFunction<String, String, MapDocument>) s -> {
                         MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
-                        return new Tuple2(d.getIdentifier(), d);
+                        return new Tuple2<>(d.getIdentifier(), d);
                     });
 
             //create blocks for deduplication
@@ -84,46 +89,30 @@ public class SparkCreateSimRels extends AbstractSparkAction {
             //save the simrel in the workingdir
             spark.createDataset(relationsRDD.rdd(), Encoders.bean(Relation.class))
                     .write()
-                    .mode("overwrite")
-                    .save(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity));
+                    .mode(SaveMode.Append)
+                    .save(outputPath);
         }
     }
 
-    /**
-     * Utility method used to create an atomic action from a Relation object
-     * @param relation input relation
-     * @return A tuple2 with [id, json serialization of the atomic action]
-     * @throws JsonProcessingException
-     */
-    public Tuple2<Text, Text> createSequenceFileRow(Relation relation) throws JsonProcessingException {
-
-        ObjectMapper mapper = new ObjectMapper();
-
-        String id = relation.getSource() + "@" + relation.getRelClass() + "@" + relation.getTarget();
-        AtomicAction<Relation> aa = new AtomicAction<>(Relation.class, relation);
-
-        return new Tuple2<>(
-                new Text(id),
-                new Text(mapper.writeValueAsString(aa))
-        );
-    }
-
-    public Relation createSimRel(String source, String target, String entity){
+    public Relation createSimRel(String source, String target, String entity) {
         final Relation r = new Relation();
         r.setSource(source);
         r.setTarget(target);
+        r.setSubRelType("dedupSimilarity");
+        r.setRelClass("isSimilarTo");
+        r.setDataInfo(new DataInfo());
+
         switch(entity){
             case "result":
-                r.setRelClass("resultResult_dedupSimilarity_isSimilarTo");
+                r.setRelType("resultResult");
                 break;
             case "organization":
-                r.setRelClass("organizationOrganization_dedupSimilarity_isSimilarTo");
+                r.setRelType("organizationOrganization");
                 break;
             default:
-                r.setRelClass("isSimilarTo");
-                break;
+                throw new IllegalArgumentException("unmanaged entity type: " + entity);
         }
         return r;
     }
+
 }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
index 4e012293b..371d70ba2 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
@@ -27,9 +27,6 @@ public class SparkPropagateRelation extends AbstractSparkAction {
 
     private static final Logger log = LoggerFactory.getLogger(SparkPropagateRelation.class);
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
-            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-
     enum FieldType {
         SOURCE,
         TARGET
@@ -62,7 +59,7 @@ public class SparkPropagateRelation extends AbstractSparkAction {
         log.info("dedupGraphPath: '{}'", dedupGraphPath);
 
         final String outputRelationPath = DedupUtility.createEntityPath(dedupGraphPath, "relation");
-        deletePath(outputRelationPath);
+        removeOutputDir(spark, outputRelationPath);
 
         Dataset<Relation> mergeRels = spark.read()
                 .load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
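createSimRel now spells the relation taxonomy out over three fields (relType/subRelType/relClass) instead of packing it into a single relClass string. A minimal sketch of the resulting shape for a "result" pair, assuming the dhp-schemas Relation bean used above; the ids are made up:

import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Relation;

public class SimRelShapeExample {
    public static void main(String[] args) {
        Relation r = new Relation();
        r.setSource("50|doi_________::1"); // hypothetical ids
        r.setTarget("50|doi_________::2");
        r.setRelType("resultResult");      // previously encoded inside relClass
        r.setSubRelType("dedupSimilarity");
        r.setRelClass("isSimilarTo");
        r.setDataInfo(new DataInfo());

        // Prints: resultResult/dedupSimilarity/isSimilarTo
        System.out.println(r.getRelType() + "/" + r.getSubRelType() + "/" + r.getRelClass());
    }
}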
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java
index c2e1df78c..5347489e7 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java
@@ -3,17 +3,18 @@
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.EntityType;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.*;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import eu.dnetlib.pace.util.MapDocumentUtil;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -27,7 +28,7 @@
 import scala.Tuple2;
 
 import java.io.IOException;
-import java.io.Serializable;
+import java.util.Map;
 
 public class SparkUpdateEntity extends AbstractSparkAction {
 
@@ -48,98 +49,87 @@ public class SparkUpdateEntity extends AbstractSparkAction {
         new SparkUpdateEntity(parser, getSparkSession(parser)).run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
     }
 
-    public boolean mergeRelExists(String basePath, String entity) throws IOException {
-
-        boolean result = false;
-
-        FileSystem fileSystem = FileSystem.get(new Configuration());
-
-        FileStatus[] fileStatuses = fileSystem.listStatus(new Path(basePath));
-
-        for (FileStatus fs : fileStatuses) {
-            if (fs.isDirectory())
-                if (fileSystem.exists(new Path(DedupUtility.createMergeRelPath(basePath, fs.getPath().getName(), entity))))
-                    result = true;
-        }
-
-        return result;
-    }
-
     public void run(ISLookUpService isLookUpService) throws IOException {
 
         final String graphBasePath = parser.get("graphBasePath");
         final String workingPath = parser.get("workingPath");
         final String dedupGraphPath = parser.get("dedupGraphPath");
 
-        System.out.println(String.format("graphBasePath: '%s'", graphBasePath));
-        System.out.println(String.format("workingPath: '%s'", workingPath));
-        System.out.println(String.format("dedupGraphPath: '%s'", dedupGraphPath));
+        log.info("graphBasePath: '{}'", graphBasePath);
+        log.info("workingPath: '{}'", workingPath);
+        log.info("dedupGraphPath: '{}'", dedupGraphPath);
 
         final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
 
         //for each entity
-        for (OafEntityType entity: OafEntityType.values()) {
+        ModelSupport.entityTypes.forEach((entity, clazz) -> {
+
+            final String outputPath = dedupGraphPath + "/" + entity;
+            removeOutputDir(spark, outputPath);
 
             JavaRDD<String> sourceEntity = sc.textFile(DedupUtility.createEntityPath(graphBasePath, entity.toString()));
 
             if (mergeRelExists(workingPath, entity.toString())) {
 
-                final Dataset<Relation> rel = spark.read().load(DedupUtility.createMergeRelPath(workingPath, "*", entity.toString())).as(Encoders.bean(Relation.class));
+                final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, "*", entity.toString());
+                final String dedupRecordPath = DedupUtility.createDedupRecordPath(workingPath, "*", entity.toString());
+
+                final Dataset<Relation> rel = spark.read()
+                        .load(mergeRelPath)
+                        .as(Encoders.bean(Relation.class));
 
                 final JavaPairRDD<String, String> mergedIds = rel
                         .where("relClass == 'merges'")
                         .select(rel.col("target"))
                         .distinct()
                         .toJavaRDD()
-                        .mapToPair((PairFunction<Row, String, String>) r -> new Tuple2(r.getString(0), "d"));
+                        .mapToPair((PairFunction<Row, String, String>) r -> new Tuple2<>(r.getString(0), "d"));
 
-                final JavaRDD<String> dedupEntity = sc.textFile(DedupUtility.createDedupRecordPath(workingPath, "*", entity.toString()));
+                JavaPairRDD<String, String> entitiesWithId = sourceEntity
+                        .mapToPair((PairFunction<String, String, String>) s -> new Tuple2<>(MapDocumentUtil.getJPathString(IDJSONPATH, s), s));
 
-                JavaPairRDD<String, String> entitiesWithId = sourceEntity.mapToPair((PairFunction<String, String, String>) s -> new Tuple2(MapDocumentUtil.getJPathString(IDJSONPATH, s), s));
-
-                JavaRDD<String> map = entitiesWithId.leftOuterJoin(mergedIds).map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), getOafClass(entity)) : k._2()._1());
-                sourceEntity = map.union(dedupEntity);
+                JavaRDD<String> map = entitiesWithId
+                        .leftOuterJoin(mergedIds)
+                        .map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), clazz) : k._2()._1());
+                sourceEntity = map.union(sc.textFile(dedupRecordPath));
             }
 
-            sourceEntity.saveAsTextFile(dedupGraphPath + "/" + entity, GzipCodec.class);
+            sourceEntity.saveAsTextFile(outputPath, GzipCodec.class);
 
-        }
+        });
     }
 
-    public Class<? extends Oaf> getOafClass(OafEntityType className) {
-        switch (className.toString()) {
-            case "publication":
-                return Publication.class;
-            case "dataset":
-                return eu.dnetlib.dhp.schema.oaf.Dataset.class;
-            case "datasource":
-                return Datasource.class;
-            case "software":
-                return Software.class;
-            case "organization":
-                return Organization.class;
-            case "otherresearchproduct":
-                return OtherResearchProduct.class;
-            case "project":
-                return Project.class;
-            default:
-                throw new IllegalArgumentException("Illegal type " + className);
-        }
-    }
+    public boolean mergeRelExists(String basePath, String entity) {
 
-    private static String updateDeletedByInference(final String json, final Class<? extends Oaf> clazz) {
-        final ObjectMapper mapper = new ObjectMapper();
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        boolean result = false;
         try {
-            Oaf entity = mapper.readValue(json, clazz);
+            FileSystem fileSystem = FileSystem.get(new Configuration());
+
+            FileStatus[] fileStatuses = fileSystem.listStatus(new Path(basePath));
+
+            for (FileStatus fs : fileStatuses) {
+                if (fs.isDirectory())
+                    if (fileSystem.exists(new Path(DedupUtility.createMergeRelPath(basePath, fs.getPath().getName(), entity))))
+                        result = true;
+            }
+
+            return result;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static String updateDeletedByInference(final String json, final Class<? extends Oaf> clazz) {
+        try {
+            Oaf entity = OBJECT_MAPPER.readValue(json, clazz);
             if (entity.getDataInfo() == null)
                 entity.setDataInfo(new DataInfo());
             entity.getDataInfo().setDeletedbyinference(true);
-            return mapper.writeValueAsString(entity);
+            return OBJECT_MAPPER.writeValueAsString(entity);
         } catch (IOException e) {
             throw new RuntimeException("Unable to convert json", e);
         }
     }
+
 }
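The leftOuterJoin above keeps every source record and flags only those matched by a 'merges' relation; updateDeletedByInference rewrites the JSON accordingly. A compact sketch of that flagging step in isolation, assuming the dhp-schemas Publication bean; the input JSON is made up:

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Publication;

public class DeletedByInferenceExample {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    public static void main(String[] args) throws Exception {
        String json = "{\"id\":\"50|doi_________::1\"}"; // hypothetical record
        Oaf entity = OBJECT_MAPPER.readValue(json, Publication.class);
        if (entity.getDataInfo() == null)
            entity.setDataInfo(new DataInfo());
        // The record is kept in the output graph, but marked as superseded by a dedup record
        entity.getDataInfo().setDeletedbyinference(true);
        System.out.println(OBJECT_MAPPER.writeValueAsString(entity));
    }
}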
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml
index 69411701b..df6877dfa 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml
@@ -68,38 +68,12 @@
-
+
 
         Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
 
-
-
-            yarn
-            cluster
-            Update Entity
-            eu.dnetlib.dhp.oa.dedup.SparkUpdateEntity
-            dhp-dedup-openaire-${projectVersion}.jar
-
-                --executor-memory=${sparkExecutorMemory}
-                --executor-cores=${sparkExecutorCores}
-                --driver-memory=${sparkDriverMemory}
-                --conf spark.extraListeners=${spark2ExtraListeners}
-                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
-                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
-                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-                --conf spark.sql.shuffle.partitions=3840
-
-            -mtyarn
-            --i${graphBasePath}
-            --w${workingPath}
-            --o${dedupGraphPath}
-
-
-
-
             yarn
@@ -122,9 +96,112 @@
             --o${dedupGraphPath}
             --w${workingPath}
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+            -pb
+            ${graphBasePath}/datasource
+            ${dedupGraphPath}/datasource
+
+
+
+
+
+
+
+
+
+
+            -pb
+            ${graphBasePath}/project
+            ${dedupGraphPath}/project
+
+
+
+
+
+
+
+
+            -pb
+            ${graphBasePath}/organization
+            ${dedupGraphPath}/organization
+
+
+
+
+
+
+
+
+
+
+            -pb
+            ${graphBasePath}/publication
+            ${dedupGraphPath}/publication
+
+
+
+
+
+
+
+
+
+
+            -pb
+            ${graphBasePath}/dataset
+            ${dedupGraphPath}/dataset
+
+
+
+
+
+
+
+
+
+
+            -pb
+            ${graphBasePath}/software
+            ${dedupGraphPath}/software
+
+
+
+
+
+
+
+
+
+
+            -pb
+            ${graphBasePath}/otherresearchproduct
+            ${dedupGraphPath}/otherresearchproduct
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
index 6c74a204d..6790dde32 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
@@ -16,6 +16,10 @@
             workingPath
             path for the working directory
 
+
+            dedupGraphPath
+            path for the output graph
+
 
             sparkDriverMemory
             memory for driver process
@@ -146,6 +150,7 @@
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.shuffle.partitions=3840
 
             -mtyarn
             --i${graphBasePath}
             --w${workingPath}
             --la${isLookUpUrl}
             --asi${actionSetId}
@@ -153,6 +158,45 @@
 
 
+
+
+
+            yarn
+            cluster
+            Update Entity
+            eu.dnetlib.dhp.oa.dedup.SparkUpdateEntity
+            dhp-dedup-openaire-${projectVersion}.jar
+
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.shuffle.partitions=3840
+
+            -mtyarn
+            --i${graphBasePath}
+            --w${workingPath}
+            --o${dedupGraphPath}
+
+
+
+
+
+
+
+
+
+
+            -pb
+            ${graphBasePath}/relation
+            ${dedupGraphPath}/relation
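SparkUpdateEntity writes each entity of the dedup graph with saveAsTextFile(outputPath, GzipCodec.class), i.e. as gzipped JSON text, and the distcp actions above copy those directories as-is. Hadoop applies its compression codecs transparently on read, so a consumer needs no special handling; a minimal reader sketch, where the path and the local master are hypothetical:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class ReadDedupGraphExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("read-dedup-graph")
                .master("local[*]") // local master only for the sake of the example
                .getOrCreate();
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        // textFile decompresses the gzipped part files transparently
        JavaRDD<String> publications = sc.textFile("/tmp/dedupGraph/publication"); // hypothetical path
        System.out.println("records: " + publications.count());

        spark.stop();
    }
}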