diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java index 222794d64d..01065510ae 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java @@ -100,6 +100,11 @@ public class DedupUtility { return String.format("%s/%s/%s_mergerel", basePath, actionSetId, entityType); } + public static String createBlockStatsPath( + final String basePath, final String actionSetId, final String entityType) { + return String.format("%s/%s/%s_blockstats", basePath, actionSetId, entityType); + } + public static List getConfigurations(String isLookUpUrl, String orchestrator) throws ISLookUpException, DocumentException { final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookUpUrl); diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkBlockStats.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkBlockStats.java new file mode 100644 index 0000000000..bfd98e78ee --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkBlockStats.java @@ -0,0 +1,121 @@ +package eu.dnetlib.dhp.oa.dedup; + +import java.io.IOException; + +import eu.dnetlib.dhp.oa.dedup.model.BlockStats; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.dom4j.DocumentException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.dedup.model.Block; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import eu.dnetlib.pace.config.DedupConfig; +import eu.dnetlib.pace.model.FieldListImpl; +import eu.dnetlib.pace.model.FieldValueImpl; +import eu.dnetlib.pace.model.MapDocument; +import eu.dnetlib.pace.util.MapDocumentUtil; +import scala.Tuple2; + +public class SparkBlockStats extends AbstractSparkAction { + + private static final Logger log = LoggerFactory.getLogger(SparkCreateSimRels.class); + + public SparkBlockStats(ArgumentApplicationParser parser, SparkSession spark) { + super(parser, spark); + } + + public static void main(String[] args) throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateSimRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/createBlockStats_parameters.json"))); + parser.parseArgument(args); + + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf + .registerKryoClasses( + new Class[] { + MapDocument.class, FieldListImpl.class, FieldValueImpl.class, Block.class + }); + + new SparkCreateSimRels(parser, getSparkSession(conf)) + .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); + } + + @Override + public void run(ISLookUpService isLookUpService) + throws DocumentException, IOException, ISLookUpException { + + // read oozie parameters + final String graphBasePath = parser.get("graphBasePath"); + final String isLookUpUrl = parser.get("isLookUpUrl"); + final String actionSetId = parser.get("actionSetId"); + final String workingPath = parser.get("workingPath"); + + log.info("graphBasePath: '{}'", graphBasePath); + log.info("isLookUpUrl: '{}'", isLookUpUrl); + log.info("actionSetId: '{}'", actionSetId); + log.info("workingPath: '{}'", workingPath); + + // for each dedup configuration + for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) { + + final String subEntity = dedupConf.getWf().getSubEntityValue(); + log.info("Creating blockstats for: '{}'", subEntity); + + final String outputPath = DedupUtility.createBlockStatsPath(workingPath, actionSetId, subEntity); + removeOutputDir(spark, outputPath); + + JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaPairRDD mapDocuments = sc + .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) + .mapToPair( + (PairFunction) s -> { + MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s); + return new Tuple2<>(d.getIdentifier(), d); + }); + + // create blocks for deduplication + JavaPairRDD blocks = Deduper.createSortedBlocks(mapDocuments, dedupConf); + + JavaRDD blockStats = blocks.map(b -> + new BlockStats( + b._1(), + (long) b._2().getDocuments().size(), + computeComparisons( + (long) b._2().getDocuments().size(), (long) dedupConf.getWf().getSlidingWindowSize())) + ); + + // save the blockstats in the workingdir + spark + .createDataset(blockStats.rdd(), Encoders.bean(BlockStats.class)) + .write() + .mode(SaveMode.Overwrite) + .save(outputPath); + } + } + + public Long computeComparisons(Long blockSize, Long slidingWindowSize){ + + if (slidingWindowSize >= blockSize) + return (slidingWindowSize * (slidingWindowSize - 1)) / 2; + else { + return (blockSize - slidingWindowSize + 1) * (slidingWindowSize * (slidingWindowSize - 1)) / 2; + } + } +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/BlockStats.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/BlockStats.java new file mode 100644 index 0000000000..ef8505e8fa --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/BlockStats.java @@ -0,0 +1,42 @@ +package eu.dnetlib.dhp.oa.dedup.model; + +public class BlockStats { + + private String key; //key of the block + private Long size; //number of elements in the block + private Long comparisons; //number of comparisons in the block + + public BlockStats() { + } + + public BlockStats(String key, Long size, Long comparisons) { + this.key = key; + this.size = size; + this.comparisons = comparisons; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public Long getSize() { + return size; + } + + public void setSize(Long size) { + this.size = size; + } + + public Long getComparisons() { + return comparisons; + } + + public void setComparisons(Long comparisons) { + this.comparisons = comparisons; + } + +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createBlockStats_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createBlockStats_parameters.json new file mode 100644 index 0000000000..ce38dc6f00 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createBlockStats_parameters.json @@ -0,0 +1,26 @@ +[ + { + "paramName": "la", + "paramLongName": "isLookUpUrl", + "paramDescription": "address for the LookUp", + "paramRequired": true + }, + { + "paramName": "asi", + "paramLongName": "actionSetId", + "paramDescription": "action set identifier (name of the orchestrator)", + "paramRequired": true + }, + { + "paramName": "i", + "paramLongName": "graphBasePath", + "paramDescription": "the base path of the raw graph", + "paramRequired": true + }, + { + "paramName": "w", + "paramLongName": "workingPath", + "paramDescription": "path of the working directory", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/statistics/oozie_app/config-default.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/statistics/oozie_app/config-default.xml new file mode 100644 index 0000000000..2e0ed9aeea --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/statistics/oozie_app/config-default.xml @@ -0,0 +1,18 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/statistics/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/statistics/oozie_app/workflow.xml new file mode 100644 index 0000000000..e1019cd014 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/statistics/oozie_app/workflow.xml @@ -0,0 +1,111 @@ + + + + graphBasePath + the raw graph base path + + + isLookUpUrl + the address of the lookUp service + + + actionSetId + id of the actionSet + + + workingPath + path for the working directory + + + dedupGraphPath + path for the output graph + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + Create Similarity Relations + eu.dnetlib.dhp.oa.dedup.SparkBlockStats + dhp-dedup-openaire-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --i${graphBasePath} + --la${isLookUpUrl} + --asi${actionSetId} + --w${workingPath} + + + + + + + \ No newline at end of file