adjusted parameters for the dedup stats workflow

Claudio Atzori 2020-07-13 19:26:46 +02:00
parent 03ecfa5ebd
commit 66f9f6d323
4 changed files with 237 additions and 246 deletions

View File: SparkBlockStats.java

@@ -1,8 +1,8 @@
 package eu.dnetlib.dhp.oa.dedup;
 
 import java.io.IOException;
 
-import eu.dnetlib.dhp.oa.dedup.model.BlockStats;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -15,8 +15,10 @@ import org.apache.spark.sql.SparkSession;
 import org.dom4j.DocumentException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.oa.dedup.model.Block;
+import eu.dnetlib.dhp.oa.dedup.model.BlockStats;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@@ -29,93 +31,87 @@ import scala.Tuple2;
 public class SparkBlockStats extends AbstractSparkAction {
 
     private static final Logger log = LoggerFactory.getLogger(SparkCreateSimRels.class);
 
     public SparkBlockStats(ArgumentApplicationParser parser, SparkSession spark) {
         super(parser, spark);
     }
 
     public static void main(String[] args) throws Exception {
         ArgumentApplicationParser parser = new ArgumentApplicationParser(
             IOUtils
                 .toString(
                     SparkCreateSimRels.class
                         .getResourceAsStream(
                             "/eu/dnetlib/dhp/oa/dedup/createBlockStats_parameters.json")));
         parser.parseArgument(args);
 
         SparkConf conf = new SparkConf();
-        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-        conf
-            .registerKryoClasses(
-                new Class[] {
-                    MapDocument.class, FieldListImpl.class, FieldValueImpl.class, Block.class
-                });
 
         new SparkCreateSimRels(parser, getSparkSession(conf))
             .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
     }
 
     @Override
     public void run(ISLookUpService isLookUpService)
         throws DocumentException, IOException, ISLookUpException {
 
         // read oozie parameters
         final String graphBasePath = parser.get("graphBasePath");
         final String isLookUpUrl = parser.get("isLookUpUrl");
         final String actionSetId = parser.get("actionSetId");
         final String workingPath = parser.get("workingPath");
 
         log.info("graphBasePath: '{}'", graphBasePath);
         log.info("isLookUpUrl: '{}'", isLookUpUrl);
         log.info("actionSetId: '{}'", actionSetId);
         log.info("workingPath: '{}'", workingPath);
 
         // for each dedup configuration
         for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
 
             final String subEntity = dedupConf.getWf().getSubEntityValue();
             log.info("Creating blockstats for: '{}'", subEntity);
 
             final String outputPath = DedupUtility.createBlockStatsPath(workingPath, actionSetId, subEntity);
             removeOutputDir(spark, outputPath);
 
             JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
             JavaPairRDD<String, MapDocument> mapDocuments = sc
                 .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
                 .mapToPair(
                     (PairFunction<String, String, MapDocument>) s -> {
                         MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
                         return new Tuple2<>(d.getIdentifier(), d);
                     });
 
             // create blocks for deduplication
             JavaPairRDD<String, Block> blocks = Deduper.createSortedBlocks(mapDocuments, dedupConf);
 
-            JavaRDD<BlockStats> blockStats = blocks.map(b ->
-                new BlockStats(
-                    b._1(),
-                    (long) b._2().getDocuments().size(),
-                    computeComparisons(
-                        (long) b._2().getDocuments().size(), (long) dedupConf.getWf().getSlidingWindowSize()))
-            );
+            JavaRDD<BlockStats> blockStats = blocks
+                .map(
+                    b -> new BlockStats(
+                        b._1(),
+                        (long) b._2().getDocuments().size(),
+                        computeComparisons(
+                            (long) b._2().getDocuments().size(), (long) dedupConf.getWf().getSlidingWindowSize())));
 
             // save the blockstats in the workingdir
             spark
                 .createDataset(blockStats.rdd(), Encoders.bean(BlockStats.class))
                 .write()
                 .mode(SaveMode.Overwrite)
                 .save(outputPath);
         }
     }
 
-    public Long computeComparisons(Long blockSize, Long slidingWindowSize){
+    public Long computeComparisons(Long blockSize, Long slidingWindowSize) {
         if (slidingWindowSize >= blockSize)
             return (slidingWindowSize * (slidingWindowSize - 1)) / 2;
         else {
             return (blockSize - slidingWindowSize + 1) * (slidingWindowSize * (slidingWindowSize - 1)) / 2;
         }
     }
 }
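
A note on the numbers this job emits: computeComparisons above estimates, per block, how many pairwise comparisons the sliding window produces. The following standalone sketch (not part of the commit) exercises the same formula: with window size w over a block of size n, a window that covers the whole block yields w*(w-1)/2 pairs, while a window sliding across a larger block yields (n - w + 1) * w*(w-1)/2.

// Standalone sketch of the comparison-count formula used by SparkBlockStats.
public class ComputeComparisonsSketch {

    // mirrors SparkBlockStats.computeComparisons from the diff above
    static long computeComparisons(long blockSize, long slidingWindowSize) {
        if (slidingWindowSize >= blockSize)
            return (slidingWindowSize * (slidingWindowSize - 1)) / 2;
        return (blockSize - slidingWindowSize + 1) * (slidingWindowSize * (slidingWindowSize - 1)) / 2;
    }

    public static void main(String[] args) {
        System.out.println(computeComparisons(3, 5));   // 10 = 5*4/2 (window covers the block)
        System.out.println(computeComparisons(100, 5)); // 960 = (100 - 5 + 1) * 10
    }
}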

View File: BlockStats.java

@@ -1,44 +1,45 @@
 package eu.dnetlib.dhp.oa.dedup.model;
 
 import java.io.Serializable;
 
 public class BlockStats implements Serializable {
 
-    private String key; //key of the block
-    private Long size; //number of elements in the block
-    private Long comparisons; //number of comparisons in the block
+    private String key; // key of the block
+    private Long size; // number of elements in the block
+    private Long comparisons; // number of comparisons in the block
 
     public BlockStats() {
     }
 
     public BlockStats(String key, Long size, Long comparisons) {
         this.key = key;
         this.size = size;
         this.comparisons = comparisons;
     }
 
     public String getKey() {
         return key;
     }
 
     public void setKey(String key) {
         this.key = key;
     }
 
     public Long getSize() {
         return size;
     }
 
     public void setSize(Long size) {
         this.size = size;
     }
 
     public Long getComparisons() {
         return comparisons;
     }
 
     public void setComparisons(Long comparisons) {
         this.comparisons = comparisons;
     }
 }
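
The JavaBean shape of BlockStats (no-arg constructor plus getter/setter pairs) is what allows the job to call Encoders.bean(BlockStats.class). A minimal local sketch of the same write path (not part of the commit; the output path here is illustrative):

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import eu.dnetlib.dhp.oa.dedup.model.BlockStats;

public class BlockStatsEncoderSketch {

    public static void main(String[] args) {
        SparkSession spark = SparkSession
            .builder()
            .master("local[*]")
            .appName("blockstats-sketch")
            .getOrCreate();

        // Encoders.bean derives the schema (key, size, comparisons) from the getters
        Dataset<BlockStats> ds = spark
            .createDataset(
                Arrays.asList(new BlockStats("key::1", 5L, 10L)),
                Encoders.bean(BlockStats.class));

        // same write mode as the job above; the default output format is parquet
        ds.write().mode(SaveMode.Overwrite).save("/tmp/blockstats-sketch");

        spark.stop();
    }
}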

View File: workflow.xml (Oozie workflow definition)

@@ -1,4 +1,4 @@
-<workflow-app name="Duplicate Scan" xmlns="uri:oozie:workflow:0.5">
+<workflow-app name="Create dedup blocks" xmlns="uri:oozie:workflow:0.5">
     <parameters>
         <property>
             <name>graphBasePath</name>
@@ -12,14 +12,6 @@
             <name>actionSetId</name>
             <description>id of the actionSet</description>
         </property>
-        <property>
-            <name>workingPath</name>
-            <description>path for the working directory</description>
-        </property>
-        <property>
-            <name>dedupGraphPath</name>
-            <description>path for the output graph</description>
-        </property>
         <property>
             <name>sparkDriverMemory</name>
             <description>memory for driver process</description>
@@ -85,7 +77,7 @@
             <spark xmlns="uri:oozie:spark-action:0.2">
                 <master>yarn</master>
                 <mode>cluster</mode>
-                <name>Create Similarity Relations</name>
+                <name>Create deduplication blocks</name>
                 <class>eu.dnetlib.dhp.oa.dedup.SparkBlockStats</class>
                 <jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
                 <spark-opts>
@@ -101,7 +93,7 @@
                 <arg>--i</arg><arg>${graphBasePath}</arg>
                 <arg>--la</arg><arg>${isLookUpUrl}</arg>
                 <arg>--asi</arg><arg>${actionSetId}</arg>
-                <arg>--w</arg><arg>${workingPath}</arg>
+                <arg>--w</arg><arg>${workingDir}</arg>
             </spark>
             <ok to="End"/>
             <error to="Kill"/>
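
This is the parameter adjustment the commit message refers to: the job still reads workingPath internally, but the workflow now passes ${workingDir} (presumably defined elsewhere in the workflow) instead of the removed user-supplied workingPath property, and the unused dedupGraphPath property is dropped. The --i/--la/--asi/--w shorthands are resolved by ArgumentApplicationParser against createBlockStats_parameters.json, which this commit does not touch. A plausible sketch of that file, assuming the paramName/paramLongName layout used by ArgumentApplicationParser elsewhere in dnet-hadoop, with long names matching the parser.get(...) calls in SparkBlockStats.run():

[
  {"paramName": "i",   "paramLongName": "graphBasePath", "paramDescription": "the base path of the graph", "paramRequired": true},
  {"paramName": "la",  "paramLongName": "isLookUpUrl",   "paramDescription": "the URL of the IS lookup service", "paramRequired": true},
  {"paramName": "asi", "paramLongName": "actionSetId",   "paramDescription": "the id of the orchestrating actionset", "paramRequired": true},
  {"paramName": "w",   "paramLongName": "workingPath",   "paramDescription": "path of the working directory", "paramRequired": true}
]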

View File: SparkStatsTest.java

@@ -1,8 +1,17 @@
 package eu.dnetlib.dhp.oa.dedup;
 
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import static java.nio.file.Files.createTempDirectory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.lenient;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
@@ -14,162 +23,155 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URISyntaxException;
-import java.nio.file.Paths;
-
-import static java.nio.file.Files.createTempDirectory;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Mockito.lenient;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 
 @ExtendWith(MockitoExtension.class)
 public class SparkStatsTest implements Serializable {
 
     @Mock(serializable = true)
     ISLookUpService isLookUpService;
 
     private static SparkSession spark;
     private static JavaSparkContext jsc;
 
     private static String testGraphBasePath;
     private static String testOutputBasePath;
     private static final String testActionSetId = "test-orchestrator";
 
     @BeforeAll
     public static void cleanUp() throws IOException, URISyntaxException {
 
         testGraphBasePath = Paths
             .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/entities").toURI())
             .toFile()
             .getAbsolutePath();
 
         testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
             .toAbsolutePath()
             .toString();
 
         FileUtils.deleteDirectory(new File(testOutputBasePath));
 
         final SparkConf conf = new SparkConf();
         conf.set("spark.sql.shuffle.partitions", "200");
         spark = SparkSession
             .builder()
             .appName(SparkDedupTest.class.getSimpleName())
             .master("local[*]")
             .config(conf)
             .getOrCreate();
 
         jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
     }
 
     @BeforeEach
     public void setUp() throws IOException, ISLookUpException {
 
         lenient()
             .when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId)))
             .thenReturn(
                 IOUtils
                     .toString(
                         SparkDedupTest.class
                             .getResourceAsStream(
                                 "/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml")));
 
         lenient()
             .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization")))
             .thenReturn(
                 IOUtils
                     .toString(
                         SparkDedupTest.class
                             .getResourceAsStream(
                                 "/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
 
         lenient()
             .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication")))
             .thenReturn(
                 IOUtils
                     .toString(
                         SparkDedupTest.class
                             .getResourceAsStream(
                                 "/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json")));
 
         lenient()
             .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("software")))
             .thenReturn(
                 IOUtils
                     .toString(
                         SparkDedupTest.class
                             .getResourceAsStream(
                                 "/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json")));
 
         lenient()
             .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("dataset")))
             .thenReturn(
                 IOUtils
                     .toString(
                         SparkDedupTest.class
                             .getResourceAsStream(
                                 "/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json")));
 
         lenient()
             .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("otherresearchproduct")))
             .thenReturn(
                 IOUtils
                     .toString(
                         SparkDedupTest.class
                             .getResourceAsStream(
                                 "/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json")));
     }
 
     @Test
     public void createBlockStatsTest() throws Exception {
 
         ArgumentApplicationParser parser = new ArgumentApplicationParser(
             IOUtils
                 .toString(
                     SparkCreateSimRels.class
                         .getResourceAsStream(
                             "/eu/dnetlib/dhp/oa/dedup/createBlockStats_parameters.json")));
         parser
             .parseArgument(
                 new String[] {
                     "-i", testGraphBasePath,
                     "-asi", testActionSetId,
                     "-la", "lookupurl",
                     "-w", testOutputBasePath
                 });
 
         new SparkBlockStats(parser, spark).run(isLookUpService);
 
         long orgs_blocks = spark
             .read()
             .textFile(testOutputBasePath + "/" + testActionSetId + "/organization_blockstats")
             .count();
 
         long pubs_blocks = spark
             .read()
             .textFile(testOutputBasePath + "/" + testActionSetId + "/publication_blockstats")
             .count();
 
         long sw_blocks = spark
             .read()
             .textFile(testOutputBasePath + "/" + testActionSetId + "/software_blockstats")
             .count();
 
         long ds_blocks = spark
             .read()
             .textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_blockstats")
             .count();
 
         long orp_blocks = spark
             .read()
             .textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_blockstats")
             .count();
 
         assertEquals(121, orgs_blocks);
         assertEquals(110, pubs_blocks);
         assertEquals(21, sw_blocks);
         assertEquals(67, ds_blocks);
         assertEquals(55, orp_blocks);
     }
 }