diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopySimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopySimRels.java
new file mode 100644
index 0000000000..4409aaee72
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopySimRels.java
@@ -0,0 +1,110 @@
+package eu.dnetlib.dhp.oa.dedup;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import eu.dnetlib.pace.config.DedupConfig;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.dom4j.DocumentException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Optional;
+
+// copy simrels (verified) from relation to the workdir in order to make them available for the deduplication
+public class SparkCopySimRels extends AbstractSparkAction {
+
+    private static final Logger log = LoggerFactory.getLogger(SparkCopySimRels.class);
+
+    public SparkCopySimRels(ArgumentApplicationParser parser, SparkSession spark) {
+        super(parser, spark);
+    }
+
+    public static void main(String[] args) throws Exception {
+        ArgumentApplicationParser parser = new ArgumentApplicationParser(
+            IOUtils
+                .toString(
+                    SparkCreateSimRels.class
+                        .getResourceAsStream(
+                            "/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json")));
+        parser.parseArgument(args);
+
+        SparkConf conf = new SparkConf();
+        new SparkCopySimRels(parser, getSparkSession(conf))
+            .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
+    }
+
+    @Override
+    public void run(ISLookUpService isLookUpService)
+        throws DocumentException, IOException, ISLookUpException {
+
+        // read oozie parameters
+        final String graphBasePath = parser.get("graphBasePath");
+        final String isLookUpUrl = parser.get("isLookUpUrl");
+        final String actionSetId = parser.get("actionSetId");
+        final String workingPath = parser.get("workingPath");
+        final int numPartitions = Optional
+            .ofNullable(parser.get("numPartitions"))
+            .map(Integer::valueOf)
+            .orElse(NUM_PARTITIONS);
+
+        log.info("numPartitions: '{}'", numPartitions);
+        log.info("graphBasePath: '{}'", graphBasePath);
+        log.info("isLookUpUrl: '{}'", isLookUpUrl);
+        log.info("actionSetId: '{}'", actionSetId);
+        log.info("workingPath: '{}'", workingPath);
+
+        // for each dedup configuration
+        for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
+
+            final String entity = dedupConf.getWf().getEntityType();
+            final String subEntity = dedupConf.getWf().getSubEntityValue();
+            log.info("Copying simrels for: '{}'", subEntity);
+
+            final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity);
+            removeOutputDir(spark, outputPath);
+
+            final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
+
+            // keep only the verified similarity relations for the entity type at hand
+            JavaRDD<Relation> simRels = spark
+                .read()
+                .textFile(relationPath)
+                .map(patchRelFn(), Encoders.bean(Relation.class))
+                .toJavaRDD()
+                .filter(r -> filterRels(r, entity));
+
+            simRels.saveAsTextFile(outputPath);
+        }
+    }
+
+    private static MapFunction<String, Relation> patchRelFn() {
+        return value -> {
+            final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
+            if (rel.getDataInfo() == null) {
+                rel.setDataInfo(new DataInfo());
+            }
+            return rel;
+        };
+    }
+
+    private boolean filterRels(Relation rel, String entityType) {
+
+        switch (entityType) {
+            case "result":
+                if (rel.getRelClass().equals("isSimilarTo") && rel.getRelType().equals("resultResult")
+                    && rel.getSubRelType().equals("dedup"))
+                    return true;
+                break;
+            case "organization":
+                if (rel.getRelClass().equals("isSimilarTo") && rel.getRelType().equals("organizationOrganization")
+                    && rel.getSubRelType().equals("dedup"))
+                    return true;
+                break;
+            default:
+                return false;
+        }
+        return false;
+    }
+}
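Not part of the patch: a minimal sketch of how the filterRels predicate above is expected to classify a relation. It assumes the eu.dnetlib.dhp.schema.oaf.Relation bean with the same getters/setters used in SparkCopySimRels; the class name and values below are illustrative only.

    import eu.dnetlib.dhp.schema.oaf.Relation;

    public class SimRelFilterExample {
        public static void main(String[] args) {
            Relation rel = new Relation();
            rel.setRelClass("isSimilarTo");
            rel.setRelType("resultResult");
            rel.setSubRelType("dedup");

            // mirrors filterRels(rel, "result"): only verified dedup simrels of the
            // requested entity type are copied to the working directory
            boolean copied = "isSimilarTo".equals(rel.getRelClass())
                && "resultResult".equals(rel.getRelType())
                && "dedup".equals(rel.getSubRelType());

            System.out.println(copied); // true -> the relation would be copied
        }
    }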
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkRemoveDiffRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkRemoveDiffRels.java
new file mode 100644
index 0000000000..030f3b7833
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkRemoveDiffRels.java
@@ -0,0 +1,233 @@
+package eu.dnetlib.dhp.oa.dedup;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.dedup.graph.ConnectedComponent;
+import eu.dnetlib.dhp.oa.dedup.graph.GraphProcessor;
+import eu.dnetlib.dhp.oa.dedup.model.Block;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import eu.dnetlib.pace.config.DedupConfig;
+import eu.dnetlib.pace.model.MapDocument;
+import eu.dnetlib.pace.util.MapDocumentUtil;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.graphx.Edge;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.dom4j.DocumentException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels.DNET_PROVENANCE_ACTIONS;
+import static eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels.PROVENANCE_ACTION_CLASS;
+import static eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels.hash;
+
+// removes from the mergerels the pairs of ids that are connected by a verified "isDifferentFrom" relation
+public class SparkRemoveDiffRels extends AbstractSparkAction {
+
+    private static final Logger log = LoggerFactory.getLogger(SparkRemoveDiffRels.class);
+
+    public SparkRemoveDiffRels(ArgumentApplicationParser parser, SparkSession spark) {
+        super(parser, spark);
+    }
+
+    public static void main(String[] args) throws Exception {
+        ArgumentApplicationParser parser = new ArgumentApplicationParser(
+            IOUtils
+                .toString(
+                    SparkCreateSimRels.class
+                        .getResourceAsStream(
+                            "/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json")));
+        parser.parseArgument(args);
+
+        SparkConf conf = new SparkConf();
+        new SparkRemoveDiffRels(parser, getSparkSession(conf))
+            .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
+    }
+
+    @Override
+    public void run(ISLookUpService isLookUpService)
+        throws DocumentException, IOException, ISLookUpException {
+
+        // read oozie parameters
+        final String graphBasePath = parser.get("graphBasePath");
+        final String isLookUpUrl = parser.get("isLookUpUrl");
+        final String actionSetId = parser.get("actionSetId");
+        final String workingPath = parser.get("workingPath");
+        final int numPartitions = Optional
+            .ofNullable(parser.get("numPartitions"))
+            .map(Integer::valueOf)
+            .orElse(NUM_PARTITIONS);
+
+        log.info("numPartitions: '{}'", numPartitions);
+        log.info("graphBasePath: '{}'", graphBasePath);
+        log.info("isLookUpUrl: '{}'", isLookUpUrl);
+        log.info("actionSetId: '{}'", actionSetId);
+        log.info("workingPath: '{}'", workingPath);
+
+        // for each dedup configuration
+        for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
+
+            final String entity = dedupConf.getWf().getEntityType();
+            final String subEntity = dedupConf.getWf().getSubEntityValue();
+            log.info("Removing diffrels for: '{}'", subEntity);
+
+            final String mergeRelsPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
+
+            final String relationPath = DedupUtility.createEntityPath(graphBasePath, subEntity);
+
+            final int maxIterations = dedupConf.getWf().getMaxIterations();
+            log.info("Max iterations {}", maxIterations);
+
+            // "merges" relations produced by SparkCreateMergeRels
+            JavaRDD<Relation> mergeRelsRDD = spark
+                .read()
+                .load(mergeRelsPath)
+                .as(Encoders.bean(Relation.class))
+                .where("relClass == 'merges'")
+                .toJavaRDD();
+
+            // verified "isDifferentFrom" relations, keyed by the ordered pair of ids
+            JavaRDD<Tuple2<Tuple2<String, String>, String>> diffRelsRDD = spark
+                .read()
+                .textFile(relationPath)
+                .map(patchRelFn(), Encoders.bean(Relation.class))
+                .toJavaRDD()
+                .filter(r -> filterRels(r, entity))
+                .map(rel -> {
+                    if (rel.getSource().compareTo(rel.getTarget()) < 0)
+                        return new Tuple2<>(new Tuple2<>(rel.getSource(), rel.getTarget()), "diffRel");
+                    else
+                        return new Tuple2<>(new Tuple2<>(rel.getTarget(), rel.getSource()), "diffRel");
+                });
+
+            // all the ordered pairs of ids belonging to the same connected component, keyed by the root id
+            JavaRDD<Tuple2<Tuple2<String, String>, String>> flatMergeRels = mergeRelsRDD
+                .mapToPair(rel -> new Tuple2<>(rel.getSource(), rel.getTarget()))
+                .groupByKey()
+                .flatMap(g -> {
+                    List<Tuple2<Tuple2<String, String>, String>> rels = new ArrayList<>();
+
+                    List<String> ids = StreamSupport
+                        .stream(g._2().spliterator(), false)
+                        .collect(Collectors.toList());
+
+                    for (int i = 0; i < ids.size(); i++) {
+                        for (int j = i + 1; j < ids.size(); j++) {
+                            if (ids.get(i).compareTo(ids.get(j)) < 0)
+                                rels.add(new Tuple2<>(new Tuple2<>(ids.get(i), ids.get(j)), g._1()));
+                            else
+                                rels.add(new Tuple2<>(new Tuple2<>(ids.get(j), ids.get(i)), g._1()));
+                        }
+                    }
+                    return rels.iterator();
+                });
+
+            // drop the pairs that also appear among the diffrels, then rebuild the merge relations
+            JavaRDD<Relation> purgedMergeRels = flatMergeRels
+                .union(diffRelsRDD)
+                .mapToPair(rel -> new Tuple2<>(rel._1(), Arrays.asList(rel._2())))
+                .reduceByKey((a, b) -> {
+                    List<String> list = new ArrayList<>();
+                    list.addAll(a);
+                    list.addAll(b);
+                    return list;
+                })
+                .filter(rel -> rel._2().size() == 1)
+                .mapToPair(rel -> new Tuple2<>(rel._2().get(0), rel._1()))
+                .flatMap(rel -> {
+                    List<Tuple2<String, String>> rels = new ArrayList<>();
+                    String source = rel._1();
+                    rels.add(new Tuple2<>(source, rel._2()._1()));
+                    rels.add(new Tuple2<>(source, rel._2()._2()));
+                    return rels.iterator();
+                })
+                .distinct()
+                .flatMap(rel -> tupleToMergeRel(rel, dedupConf));
+
+            spark
+                .createDataset(purgedMergeRels.rdd(), Encoders.bean(Relation.class))
+                .write()
+                .mode(SaveMode.Overwrite)
+                .parquet(mergeRelsPath);
+        }
+    }
+
+    private static MapFunction<String, Relation> patchRelFn() {
+        return value -> {
+            final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
+            if (rel.getDataInfo() == null) {
+                rel.setDataInfo(new DataInfo());
+            }
+            return rel;
+        };
+    }
+
+    private boolean filterRels(Relation rel, String entityType) {
+
+        switch (entityType) {
+            case "result":
+                if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("resultResult")
+                    && rel.getSubRelType().equals("dedup"))
+                    return true;
+                break;
+            case "organization":
+                if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("organizationOrganization")
+                    && rel.getSubRelType().equals("dedup"))
+                    return true;
+                break;
+            default:
+                return false;
+        }
+        return false;
+    }
+
+    public Iterator<Relation> tupleToMergeRel(Tuple2<String, String> rel, DedupConfig dedupConf) {
+
+        List<Relation> rels = new ArrayList<>();
+
+        rels.add(rel(rel._1(), rel._2(), "merges", dedupConf));
+        rels.add(rel(rel._2(), rel._1(), "isMergedIn", dedupConf));
+
+        return rels.iterator();
+    }
+
+    private Relation rel(String source, String target, String relClass, DedupConfig dedupConf) {
+
+        String entityType = dedupConf.getWf().getEntityType();
+
+        Relation r = new Relation();
+        r.setSource(source);
+        r.setTarget(target);
+        r.setRelClass(relClass);
+        r.setRelType(entityType + entityType.substring(0, 1).toUpperCase() + entityType.substring(1));
+        r.setSubRelType("dedup");
+
+        DataInfo info = new DataInfo();
+        info.setDeletedbyinference(false);
+        info.setInferred(true);
+        info.setInvisible(false);
+        info.setInferenceprovenance(dedupConf.getWf().getConfigurationId());
+        Qualifier provenanceAction = new Qualifier();
+        provenanceAction.setClassid(PROVENANCE_ACTION_CLASS);
+        provenanceAction.setClassname(PROVENANCE_ACTION_CLASS);
+        provenanceAction.setSchemeid(DNET_PROVENANCE_ACTIONS);
+        provenanceAction.setSchemename(DNET_PROVENANCE_ACTIONS);
+        info.setProvenanceaction(provenanceAction);
+
+        // TODO calculate the trust value based on the similarity score of the elements in the CC
+        // info.setTrust();
+
+        r.setDataInfo(info);
+        return r;
+    }
+}
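Not part of the patch: a minimal, non-Spark sketch of the purge step above, using plain Java collections and made-up ids. Pairs expanded from the mergerels carry their root id as value, pairs coming from verified isDifferentFrom relations carry the marker "diffRel", and any pair that collects more than one value is dropped.

    import java.util.*;

    public class DiffRelPurgeSketch {
        public static void main(String[] args) {
            Map<String, List<String>> byPair = new HashMap<>();

            // pairs expanded from a connected component, keyed by "smallerId|biggerId", value = root id
            byPair.computeIfAbsent("idA|idB", k -> new ArrayList<>()).add("root1");
            byPair.computeIfAbsent("idA|idC", k -> new ArrayList<>()).add("root1");

            // a verified difference between idA and idB adds the "diffRel" marker under the same key
            byPair.computeIfAbsent("idA|idB", k -> new ArrayList<>()).add("diffRel");

            // keep only the pairs not contradicted by a diffRel (exactly one value collected)
            byPair.entrySet().stream()
                .filter(e -> e.getValue().size() == 1)
                .forEach(e -> System.out.println(e.getKey() + " -> kept under " + e.getValue().get(0)));
            // prints: idA|idC -> kept under root1
        }
    }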
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/config-default.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/config-default.xml
new file mode 100644
index 0000000000..2e0ed9aeea
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/config-default.xml
@@ -0,0 +1,18 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml
new file mode 100644
index 0000000000..092a5c30e9
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml
@@ -0,0 +1,324 @@
+
+    graphBasePath
+    the raw graph base path
+
+    isLookUpUrl
+    the address of the lookUp service
+
+    actionSetId
+    id of the actionSet
+
+    workingPath
+    path for the working directory
+
+    dedupGraphPath
+    path for the output graph
+
+    cutConnectedComponent
+    max number of elements in a connected component
+
+    sparkDriverMemory
+    memory for driver process
+
+    sparkExecutorMemory
+    memory for individual executor
+
+    sparkExecutorCores
+    number of cores used by single executor
+
+    oozieActionShareLibForSpark2
+    oozie action sharelib for spark 2.*
+
+    spark2ExtraListeners
+    com.cloudera.spark.lineage.NavigatorAppListener
+    spark 2.* extra listeners classname
+
+    spark2SqlQueryExecutionListeners
+    com.cloudera.spark.lineage.NavigatorQueryListener
+    spark 2.* sql query execution listeners classname
+
+    spark2YarnHistoryServerAddress
+    spark 2.* yarn history server address
+
+    spark2EventLogDir
+    spark 2.* event log dir location
+
+    ${jobTracker}
+    ${nameNode}
+
+    mapreduce.job.queuename
+    ${queueName}
+
+    oozie.launcher.mapred.job.queue.name
+    ${oozieLauncherQueueName}
+
+    oozie.action.sharelib.for.spark
+    ${oozieActionShareLibForSpark2}
+
+    Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+    yarn
+    cluster
+    Copy Similarity Relations
+    eu.dnetlib.dhp.oa.dedup.SparkCopySimRels
+    dhp-dedup-openaire-${projectVersion}.jar
+
+    --executor-memory=${sparkExecutorMemory}
+    --executor-cores=${sparkExecutorCores}
+    --driver-memory=${sparkDriverMemory}
+    --conf spark.extraListeners=${spark2ExtraListeners}
+    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+    --conf spark.sql.shuffle.partitions=3840
+
+    --graphBasePath ${graphBasePath}
+    --workingPath ${workingPath}
+    --isLookUpUrl ${isLookUpUrl}
+    --actionSetId ${actionSetId}
+    --numPartitions 8000
+
+    yarn
+    cluster
+    Create Dedup Record
+    eu.dnetlib.dhp.oa.dedup.SparkCreateDedupRecord
+    dhp-dedup-openaire-${projectVersion}.jar
+
+    --executor-memory=${sparkExecutorMemory}
+    --executor-cores=${sparkExecutorCores}
+    --driver-memory=${sparkDriverMemory}
+    --conf spark.extraListeners=${spark2ExtraListeners}
+    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+    --conf spark.sql.shuffle.partitions=3840
+
+    --graphBasePath ${graphBasePath}
+    --workingPath ${workingPath}
+    --isLookUpUrl ${isLookUpUrl}
+    --actionSetId ${actionSetId}
+
+    yarn
+    cluster
+    Update Entity
+    eu.dnetlib.dhp.oa.dedup.SparkUpdateEntity
+    dhp-dedup-openaire-${projectVersion}.jar
+
+    --executor-memory=${sparkExecutorMemory}
+    --executor-cores=${sparkExecutorCores}
+    --driver-memory=${sparkDriverMemory}
+    --conf spark.extraListeners=${spark2ExtraListeners}
+    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+    --conf spark.sql.shuffle.partitions=3840
+
+    --graphBasePath ${graphBasePath}
+    --workingPath ${workingPath}
+    --dedupGraphPath ${dedupGraphPath}
+
+    yarn
+    cluster
+    Create Similarity Relations
+    eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels
+    dhp-dedup-openaire-${projectVersion}.jar
+
+    --executor-memory=${sparkExecutorMemory}
+    --executor-cores=${sparkExecutorCores}
+    --driver-memory=${sparkDriverMemory}
+    --conf spark.extraListeners=${spark2ExtraListeners}
+    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+    --conf spark.sql.shuffle.partitions=3840
+
+    --graphBasePath ${graphBasePath}
+    --isLookUpUrl ${isLookUpUrl}
+    --actionSetId ${actionSetId}
+    --workingPath ${workingPath}
+    --numPartitions 8000
+
+    yarn
+    cluster
+    Create Merge Relations
+    eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels
+    dhp-dedup-openaire-${projectVersion}.jar
+
+    --executor-memory=${sparkExecutorMemory}
+    --executor-cores=${sparkExecutorCores}
+    --driver-memory=${sparkDriverMemory}
+    --conf spark.extraListeners=${spark2ExtraListeners}
+    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+    --conf spark.sql.shuffle.partitions=3840
+
+    --graphBasePath ${graphBasePath}
+    --workingPath ${workingPath}
+    --isLookUpUrl ${isLookUpUrl}
+    --actionSetId ${actionSetId}
+    --cutConnectedComponent ${cutConnectedComponent}
+
+    yarn
+    cluster
+    Create Merge Relations
+    eu.dnetlib.dhp.oa.dedup.SparkRemoveDiffRels
+    dhp-dedup-openaire-${projectVersion}.jar
+
+    --executor-memory=${sparkExecutorMemory}
+    --executor-cores=${sparkExecutorCores}
+    --driver-memory=${sparkDriverMemory}
+    --conf spark.extraListeners=${spark2ExtraListeners}
+    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+    --conf spark.sql.shuffle.partitions=3840
+
+    --graphBasePath ${graphBasePath}
+    --isLookUpUrl ${isLookUpUrl}
+    --actionSetId ${actionSetId}
+    --workingPath ${workingPath}
+    --numPartitions 8000
+
+    yarn
+    cluster
+    Prepare Organization Relations
+    eu.dnetlib.dhp.oa.dedup.SparkPrepareOrgRels
+    dhp-dedup-openaire-${projectVersion}.jar
+
+    --executor-memory=${sparkExecutorMemory}
+    --executor-cores=${sparkExecutorCores}
+    --driver-memory=${sparkDriverMemory}
+    --conf spark.extraListeners=${spark2ExtraListeners}
+    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+    --conf spark.sql.shuffle.partitions=3840
+
+    --graphBasePath ${graphBasePath}
+    --workingPath ${workingPath}
+    --isLookUpUrl ${isLookUpUrl}
+    --actionSetId ${actionSetId}
+    --dbUrl ${dbUrl}
+    --dbTable ${dbTable}
+    --dbUser ${dbUser}
+    --dbPwd ${dbPwd}
+    --numConnections 20
+
+    yarn
+    cluster
+    Prepare New Organizations
+    eu.dnetlib.dhp.oa.dedup.SparkPrepareNewOrgs
+    dhp-dedup-openaire-${projectVersion}.jar
+
+    --executor-memory=${sparkExecutorMemory}
+    --executor-cores=${sparkExecutorCores}
+    --driver-memory=${sparkDriverMemory}
+    --conf spark.extraListeners=${spark2ExtraListeners}
+    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+    --conf spark.sql.shuffle.partitions=3840
+
+    --graphBasePath ${graphBasePath}
+    --workingPath ${workingPath}
+    --isLookUpUrl ${isLookUpUrl}
+    --actionSetId ${actionSetId}
+    --apiUrl ${apiUrl}
+    --dbUrl ${dbUrl}
+    --dbTable ${dbTable}
+    --dbUser ${dbUser}
+    --dbPwd ${dbPwd}
+    --numConnections 20
\ No newline at end of file
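For reference, an illustrative set of submission properties for this workflow. The parameter names come from the workflow definition above; every value (paths, URLs, credentials, memory sizes) is a made-up example, not taken from the patch.

    # illustrative job.properties for the openorgs dedup workflow (example values only)
    graphBasePath=/tmp/example/graph/raw
    workingPath=/tmp/example/dedup/working
    dedupGraphPath=/tmp/example/graph/dedup
    isLookUpUrl=http://services.example.org/is/services/isLookUp?wsdl
    actionSetId=dedup-example-actionset
    cutConnectedComponent=200
    sparkDriverMemory=4G
    sparkExecutorMemory=7G
    sparkExecutorCores=4
    queueName=default
    oozieLauncherQueueName=default
    oozieActionShareLibForSpark2=spark2
    spark2ExtraListeners=com.cloudera.spark.lineage.NavigatorAppListener
    spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListener
    spark2YarnHistoryServerAddress=http://historyserver.example.org:18089
    spark2EventLogDir=/user/spark/spark2ApplicationHistory
    # database / api parameters used by the organization actions
    dbUrl=jdbc:postgresql://db.example.org:5432/openorgs
    dbTable=orgs_example
    dbUser=example_user
    dbPwd=example_password
    apiUrl=http://api.example.org/organizations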