From 8fea29177ccd9d98940151862cea853167bdc045 Mon Sep 17 00:00:00 2001 From: miconis Date: Mon, 18 Jan 2021 16:48:08 +0100 Subject: [PATCH] refactoring, minor changes and implementation of the wf for openorgs with integration of organization phases into the scan wf --- .../dhp/oa/dedup/SparkCopyOpenorgs.java | 100 ++++++++++++++++++ ...arkCopySimRels.java => SparkCopyRels.java} | 53 +++++----- .../dhp/oa/dedup/copyOpenorgs_parameters.json | 26 +++++ .../dhp/oa/dedup/copyRels_parameters.json | 32 ++++++ .../oa/dedup/openorgs/oozie_app/workflow.xml | 65 +----------- .../dhp/oa/dedup/scan/oozie_app/workflow.xml | 55 ++++++++++ 6 files changed, 247 insertions(+), 84 deletions(-) create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java rename dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/{SparkCopySimRels.java => SparkCopyRels.java} (70%) create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/copyOpenorgs_parameters.json create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/copyRels_parameters.json diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java new file mode 100644 index 000000000..12ae4e73a --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java @@ -0,0 +1,100 @@ +package eu.dnetlib.dhp.oa.dedup; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.EntityType; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.dom4j.DocumentException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Optional; + +public class SparkCopyOpenorgs extends AbstractSparkAction{ + private static final Logger log = LoggerFactory.getLogger(SparkCopyRels.class); + + public SparkCopyOpenorgs(ArgumentApplicationParser parser, SparkSession spark) { + super(parser, spark); + } + + public static void main(String[] args) throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateSimRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/copyOpenorgs_parameters.json"))); + parser.parseArgument(args); + + SparkConf conf = new SparkConf(); + new SparkCopyOpenorgs(parser, getSparkSession(conf)) + .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); + } + + @Override + public void run(ISLookUpService isLookUpService) + throws DocumentException, IOException, ISLookUpException { + + // read oozie parameters + final String graphBasePath = parser.get("graphBasePath"); + final String actionSetId = parser.get("actionSetId"); + final String workingPath = parser.get("workingPath"); + final int numPartitions = Optional + .ofNullable(parser.get("numPartitions")) + .map(Integer::valueOf) + .orElse(NUM_PARTITIONS); + + log.info("numPartitions: '{}'", numPartitions); + log.info("graphBasePath: '{}'", graphBasePath); + log.info("actionSetId: '{}'", actionSetId); + log.info("workingPath: '{}'", workingPath); + + String subEntity = "organization"; + log.info("Copying openorgs to the working dir"); + + final String outputPath = DedupUtility.createDedupRecordPath(workingPath, actionSetId, subEntity); + removeOutputDir(spark, outputPath); + + final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity); + + final Class clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity)); + + filterEntities(spark, entityPath, clazz) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); + + } + + public static Dataset filterEntities( + final SparkSession spark, + final String entitiesInputPath, + final Class clazz) { + + // + Dataset entities = spark + .read() + .textFile(entitiesInputPath) + .map( + (MapFunction) it -> { + T entity = OBJECT_MAPPER.readValue(it, clazz); + return entity; + }, + Encoders.kryo(clazz)); + + return entities.filter(entities.col("id").contains("openorgs____")); + } + +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopySimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRels.java similarity index 70% rename from dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopySimRels.java rename to dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRels.java index 4409aaee7..802085ab9 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopySimRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRels.java @@ -6,12 +6,10 @@ import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import eu.dnetlib.pace.config.DedupConfig; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import org.dom4j.DocumentException; @@ -22,10 +20,10 @@ import java.io.IOException; import java.util.Optional; //copy simrels (verified) from relation to the workdir in order to make them available for the deduplication -public class SparkCopySimRels extends AbstractSparkAction{ - private static final Logger log = LoggerFactory.getLogger(SparkCopySimRels.class); +public class SparkCopyRels extends AbstractSparkAction{ + private static final Logger log = LoggerFactory.getLogger(SparkCopyRels.class); - public SparkCopySimRels(ArgumentApplicationParser parser, SparkSession spark) { + public SparkCopyRels(ArgumentApplicationParser parser, SparkSession spark) { super(parser, spark); } @@ -33,13 +31,13 @@ public class SparkCopySimRels extends AbstractSparkAction{ ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils .toString( - SparkCreateSimRels.class + SparkCopyRels.class .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json"))); + "/eu/dnetlib/dhp/oa/dedup/copyRels_parameters.json"))); parser.parseArgument(args); SparkConf conf = new SparkConf(); - new SparkCreateSimRels(parser, getSparkSession(conf)) + new SparkCopyRels(parser, getSparkSession(conf)) .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); } @@ -49,9 +47,10 @@ public class SparkCopySimRels extends AbstractSparkAction{ // read oozie parameters final String graphBasePath = parser.get("graphBasePath"); - final String isLookUpUrl = parser.get("isLookUpUrl"); final String actionSetId = parser.get("actionSetId"); final String workingPath = parser.get("workingPath"); + final String destination = parser.get("destination"); + final String entity = parser.get("entityType"); final int numPartitions = Optional .ofNullable(parser.get("numPartitions")) .map(Integer::valueOf) @@ -59,26 +58,32 @@ public class SparkCopySimRels extends AbstractSparkAction{ log.info("numPartitions: '{}'", numPartitions); log.info("graphBasePath: '{}'", graphBasePath); - log.info("isLookUpUrl: '{}'", isLookUpUrl); log.info("actionSetId: '{}'", actionSetId); log.info("workingPath: '{}'", workingPath); + log.info("entity: '{}'", entity); - // for each dedup configuration - for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) { + log.info("Copying " + destination + " for: '{}'", entity); - final String entity = dedupConf.getWf().getEntityType(); - final String subEntity = dedupConf.getWf().getSubEntityValue(); - log.info("Copying simrels for: '{}'", subEntity); - - final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity); - removeOutputDir(spark, outputPath); - - final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation"); - - JavaRDD simRels = spark.read().textFile(relationPath).map(patchRelFn(), Encoders.bean(Relation.class)).toJavaRDD().filter(r -> filterRels(r, entity)); - - simRels.saveAsTextFile(outputPath); + final String outputPath; + if (destination.contains("mergerel")) { + outputPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, entity); } + else { + outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, entity); + } + + removeOutputDir(spark, outputPath); + + final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation"); + + JavaRDD simRels = + spark.read() + .textFile(relationPath) + .map(patchRelFn(), Encoders.bean(Relation.class)) + .toJavaRDD() + .filter(r -> filterRels(r, entity)); + + simRels.saveAsTextFile(outputPath); } private static MapFunction patchRelFn() { diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/copyOpenorgs_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/copyOpenorgs_parameters.json new file mode 100644 index 000000000..e45efca01 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/copyOpenorgs_parameters.json @@ -0,0 +1,26 @@ +[ + { + "paramName": "asi", + "paramLongName": "actionSetId", + "paramDescription": "action set identifier (name of the orchestrator)", + "paramRequired": true + }, + { + "paramName": "i", + "paramLongName": "graphBasePath", + "paramDescription": "the base path of the raw graph", + "paramRequired": true + }, + { + "paramName": "w", + "paramLongName": "workingPath", + "paramDescription": "path of the working directory", + "paramRequired": true + }, + { + "paramName": "np", + "paramLongName": "numPartitions", + "paramDescription": "number of partitions for the similarity relations intermediate phases", + "paramRequired": false + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/copyRels_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/copyRels_parameters.json new file mode 100644 index 000000000..715b0e74e --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/copyRels_parameters.json @@ -0,0 +1,32 @@ +[ + { + "paramName": "asi", + "paramLongName": "actionSetId", + "paramDescription": "action set identifier (name of the orchestrator)", + "paramRequired": true + }, + { + "paramName": "i", + "paramLongName": "graphBasePath", + "paramDescription": "the base path of the raw graph", + "paramRequired": true + }, + { + "paramName": "w", + "paramLongName": "workingPath", + "paramDescription": "path of the working directory", + "paramRequired": true + }, + { + "paramName": "e", + "paramLongName": "entityType", + "paramDescription": "type of the entity for the merge relations", + "paramRequired": true + }, + { + "paramName": "np", + "paramLongName": "numPartitions", + "paramDescription": "number of partitions for the similarity relations intermediate phases", + "paramRequired": false + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml index 092a5c30e..2f961face 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml @@ -93,13 +93,12 @@ - - + yarn cluster - Copy Similarity Relations - eu.dnetlib.dhp.oa.dedup.SparkCopySimRels + Copy Merge Relations + eu.dnetlib.dhp.oa.dedup.SparkCopyRels dhp-dedup-openaire-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} @@ -113,65 +112,11 @@ --graphBasePath${graphBasePath} --workingPath${workingPath} - --isLookUpUrl${isLookUpUrl} --actionSetId${actionSetId} + --entityTypeorganization + --destinationsimrel --numPartitions8000 - - - - - - - - - - - yarn - cluster - Create Dedup Record - eu.dnetlib.dhp.oa.dedup.SparkCreateDedupRecord - dhp-dedup-openaire-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - - --graphBasePath${graphBasePath} - --workingPath${workingPath} - --isLookUpUrl${isLookUpUrl} - --actionSetId${actionSetId} - - - - - - - - yarn - cluster - Update Entity - eu.dnetlib.dhp.oa.dedup.SparkUpdateEntity - dhp-dedup-openaire-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - - --graphBasePath${graphBasePath} - --workingPath${workingPath} - --dedupGraphPath${dedupGraphPath} - diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml index c42ce1263..d22f05ca8 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml @@ -169,6 +169,61 @@ --isLookUpUrl${isLookUpUrl} --actionSetId${actionSetId} + + + + + + + + yarn + cluster + Copy Merge Relations + eu.dnetlib.dhp.oa.dedup.SparkCopyRels + dhp-dedup-openaire-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphBasePath${graphBasePath} + --workingPath${workingPath} + --actionSetId${actionSetId} + --entityTypeorganization + --destinationmergerel + --numPartitions8000 + + + + + + + + yarn + cluster + Copy Entities + eu.dnetlib.dhp.oa.dedup.SparkCopyOpenorgs + dhp-dedup-openaire-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphBasePath${graphBasePath} + --workingPath${workingPath} + --isLookUpUrl${isLookUpUrl} + --actionSetId${actionSetId} +