implementation of a new workflow to compute statistics on the blocks

This commit is contained in:
miconis 2020-07-13 18:22:34 +02:00
parent 2c4ed9a043
commit 9258e4f095
6 changed files with 323 additions and 0 deletions

View File

@ -100,6 +100,11 @@ public class DedupUtility {
return String.format("%s/%s/%s_mergerel", basePath, actionSetId, entityType);
}
public static String createBlockStatsPath(
final String basePath, final String actionSetId, final String entityType) {
return String.format("%s/%s/%s_blockstats", basePath, actionSetId, entityType);
}
public static List<DedupConfig> getConfigurations(String isLookUpUrl, String orchestrator)
throws ISLookUpException, DocumentException {
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookUpUrl);

View File

@ -0,0 +1,121 @@
package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException;
import eu.dnetlib.dhp.oa.dedup.model.BlockStats;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.model.Block;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.FieldListImpl;
import eu.dnetlib.pace.model.FieldValueImpl;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2;
public class SparkBlockStats extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkCreateSimRels.class);
public SparkBlockStats(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
}
public static void main(String[] args) throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCreateSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/createBlockStats_parameters.json")));
parser.parseArgument(args);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf
.registerKryoClasses(
new Class[] {
MapDocument.class, FieldListImpl.class, FieldValueImpl.class, Block.class
});
new SparkCreateSimRels(parser, getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
}
@Override
public void run(ISLookUpService isLookUpService)
throws DocumentException, IOException, ISLookUpException {
// read oozie parameters
final String graphBasePath = parser.get("graphBasePath");
final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath");
log.info("graphBasePath: '{}'", graphBasePath);
log.info("isLookUpUrl: '{}'", isLookUpUrl);
log.info("actionSetId: '{}'", actionSetId);
log.info("workingPath: '{}'", workingPath);
// for each dedup configuration
for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
final String subEntity = dedupConf.getWf().getSubEntityValue();
log.info("Creating blockstats for: '{}'", subEntity);
final String outputPath = DedupUtility.createBlockStatsPath(workingPath, actionSetId, subEntity);
removeOutputDir(spark, outputPath);
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaPairRDD<String, MapDocument> mapDocuments = sc
.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
.mapToPair(
(PairFunction<String, String, MapDocument>) s -> {
MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
return new Tuple2<>(d.getIdentifier(), d);
});
// create blocks for deduplication
JavaPairRDD<String, Block> blocks = Deduper.createSortedBlocks(mapDocuments, dedupConf);
JavaRDD<BlockStats> blockStats = blocks.map(b ->
new BlockStats(
b._1(),
(long) b._2().getDocuments().size(),
computeComparisons(
(long) b._2().getDocuments().size(), (long) dedupConf.getWf().getSlidingWindowSize()))
);
// save the blockstats in the workingdir
spark
.createDataset(blockStats.rdd(), Encoders.bean(BlockStats.class))
.write()
.mode(SaveMode.Overwrite)
.save(outputPath);
}
}
public Long computeComparisons(Long blockSize, Long slidingWindowSize){
if (slidingWindowSize >= blockSize)
return (slidingWindowSize * (slidingWindowSize - 1)) / 2;
else {
return (blockSize - slidingWindowSize + 1) * (slidingWindowSize * (slidingWindowSize - 1)) / 2;
}
}
}

View File

@ -0,0 +1,42 @@
package eu.dnetlib.dhp.oa.dedup.model;
public class BlockStats {
private String key; //key of the block
private Long size; //number of elements in the block
private Long comparisons; //number of comparisons in the block
public BlockStats() {
}
public BlockStats(String key, Long size, Long comparisons) {
this.key = key;
this.size = size;
this.comparisons = comparisons;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public Long getSize() {
return size;
}
public void setSize(Long size) {
this.size = size;
}
public Long getComparisons() {
return comparisons;
}
public void setComparisons(Long comparisons) {
this.comparisons = comparisons;
}
}

View File

@ -0,0 +1,26 @@
[
{
"paramName": "la",
"paramLongName": "isLookUpUrl",
"paramDescription": "address for the LookUp",
"paramRequired": true
},
{
"paramName": "asi",
"paramLongName": "actionSetId",
"paramDescription": "action set identifier (name of the orchestrator)",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "graphBasePath",
"paramDescription": "the base path of the raw graph",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workingPath",
"paramDescription": "path of the working directory",
"paramRequired": true
}
]

View File

@ -0,0 +1,18 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -0,0 +1,111 @@
<workflow-app name="Duplicate Scan" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphBasePath</name>
<description>the raw graph base path</description>
</property>
<property>
<name>isLookUpUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>actionSetId</name>
<description>id of the actionSet</description>
</property>
<property>
<name>workingPath</name>
<description>path for the working directory</description>
</property>
<property>
<name>dedupGraphPath</name>
<description>path for the output graph</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="CreateBlockStats"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="CreateBlockStats">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Create Similarity Relations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkBlockStats</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--i</arg><arg>${graphBasePath}</arg>
<arg>--la</arg><arg>${isLookUpUrl}</arg>
<arg>--asi</arg><arg>${actionSetId}</arg>
<arg>--w</arg><arg>${workingPath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>