merge done

This commit is contained in:
miconis 2020-07-13 19:57:02 +02:00
commit b52c246aed
4 changed files with 226 additions and 228 deletions

View File

@ -1,8 +1,8 @@
package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException;
import eu.dnetlib.dhp.oa.dedup.model.BlockStats;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
@ -15,8 +15,10 @@ import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.model.Block;
import eu.dnetlib.dhp.oa.dedup.model.BlockStats;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ -56,60 +58,6 @@ public class SparkBlockStats extends AbstractSparkAction {
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
}
@Override
public void run(ISLookUpService isLookUpService)
throws DocumentException, IOException, ISLookUpException {
// read oozie parameters
final String graphBasePath = parser.get("graphBasePath");
final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath");
log.info("graphBasePath: '{}'", graphBasePath);
log.info("isLookUpUrl: '{}'", isLookUpUrl);
log.info("actionSetId: '{}'", actionSetId);
log.info("workingPath: '{}'", workingPath);
// for each dedup configuration
for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
final String subEntity = dedupConf.getWf().getSubEntityValue();
log.info("Creating blockstats for: '{}'", subEntity);
final String outputPath = DedupUtility.createBlockStatsPath(workingPath, actionSetId, subEntity);
removeOutputDir(spark, outputPath);
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaPairRDD<String, MapDocument> mapDocuments = sc
.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
.mapToPair(
(PairFunction<String, String, MapDocument>) s -> {
MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
return new Tuple2<>(d.getIdentifier(), d);
});
// create blocks for deduplication
JavaPairRDD<String, Block> blocks = Deduper.createSortedBlocks(mapDocuments, dedupConf);
JavaRDD<BlockStats> blockStats = blocks.map(b ->
new BlockStats(
b._1(),
(long) b._2().getDocuments().size(),
computeComparisons(
(long) b._2().getDocuments().size(), (long) dedupConf.getWf().getSlidingWindowSize()))
);
// save the blockstats in the workingdir
spark
.createDataset(blockStats.rdd(), Encoders.bean(BlockStats.class))
.write()
.mode(SaveMode.Overwrite)
.save(outputPath);
}
}
public Long computeComparisons(Long blockSize, Long slidingWindowSize){
if (slidingWindowSize >= blockSize)
@ -118,4 +66,59 @@ public class SparkBlockStats extends AbstractSparkAction {
return (blockSize - slidingWindowSize + 1) * (slidingWindowSize * (slidingWindowSize - 1)) / 2;
}
}
@Override
public void run(ISLookUpService isLookUpService)
throws DocumentException, IOException, ISLookUpException {
// read oozie parameters
final String graphBasePath = parser.get("graphBasePath");
final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath");
log.info("graphBasePath: '{}'", graphBasePath);
log.info("isLookUpUrl: '{}'", isLookUpUrl);
log.info("actionSetId: '{}'", actionSetId);
log.info("workingPath: '{}'", workingPath);
// for each dedup configuration
for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
final String subEntity = dedupConf.getWf().getSubEntityValue();
log.info("Creating blockstats for: '{}'", subEntity);
final String outputPath = DedupUtility.createBlockStatsPath(workingPath, actionSetId, subEntity);
removeOutputDir(spark, outputPath);
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaPairRDD<String, MapDocument> mapDocuments = sc
.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
.mapToPair(
(PairFunction<String, String, MapDocument>) s -> {
MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
return new Tuple2<>(d.getIdentifier(), d);
});
// create blocks for deduplication
JavaPairRDD<String, Block> blocks = Deduper.createSortedBlocks(mapDocuments, dedupConf);
JavaRDD<BlockStats> blockStats = blocks
.map(
b -> new BlockStats(
b._1(),
(long) b._2().getDocuments().size(),
computeComparisons(
(long) b._2().getDocuments().size(), (long) dedupConf.getWf().getSlidingWindowSize())));
// save the blockstats in the workingdir
spark
.createDataset(blockStats.rdd(), Encoders.bean(BlockStats.class))
.write()
.mode(SaveMode.Overwrite)
.save(outputPath);
}
}
}

View File

@ -1,44 +1,45 @@
package eu.dnetlib.dhp.oa.dedup.model;
import java.io.Serializable;
public class BlockStats implements Serializable {
private String key; //key of the block
private Long size; //number of elements in the block
private Long comparisons; //number of comparisons in the block
private String key; // key of the block
private Long size; // number of elements in the block
private Long comparisons; // number of comparisons in the block
public BlockStats() {
}
public BlockStats() {
}
public BlockStats(String key, Long size, Long comparisons) {
this.key = key;
this.size = size;
this.comparisons = comparisons;
}
public BlockStats(String key, Long size, Long comparisons) {
this.key = key;
this.size = size;
this.comparisons = comparisons;
}
public String getKey() {
return key;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public void setKey(String key) {
this.key = key;
}
public Long getSize() {
return size;
}
public Long getSize() {
return size;
}
public void setSize(Long size) {
this.size = size;
}
public void setSize(Long size) {
this.size = size;
}
public Long getComparisons() {
return comparisons;
}
public Long getComparisons() {
return comparisons;
}
public void setComparisons(Long comparisons) {
this.comparisons = comparisons;
}
public void setComparisons(Long comparisons) {
this.comparisons = comparisons;
}
}

View File

@ -1,4 +1,4 @@
<workflow-app name="Duplicate Scan" xmlns="uri:oozie:workflow:0.5">
<workflow-app name="Create dedup blocks" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphBasePath</name>
@ -12,14 +12,6 @@
<name>actionSetId</name>
<description>id of the actionSet</description>
</property>
<property>
<name>workingPath</name>
<description>path for the working directory</description>
</property>
<property>
<name>dedupGraphPath</name>
<description>path for the output graph</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
@ -85,7 +77,7 @@
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Create Similarity Relations</name>
<name>Create deduplication blocks</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkBlockStats</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
@ -101,7 +93,7 @@
<arg>--i</arg><arg>${graphBasePath}</arg>
<arg>--la</arg><arg>${isLookUpUrl}</arg>
<arg>--asi</arg><arg>${actionSetId}</arg>
<arg>--w</arg><arg>${workingPath}</arg>
<arg>--w</arg><arg>${workingDir}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>

View File

@ -1,8 +1,17 @@
package eu.dnetlib.dhp.oa.dedup;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import static java.nio.file.Files.createTempDirectory;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.lenient;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
@ -14,162 +23,155 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import static java.nio.file.Files.createTempDirectory;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.lenient;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ExtendWith(MockitoExtension.class)
public class SparkStatsTest implements Serializable {
@Mock(serializable = true)
ISLookUpService isLookUpService;
@Mock(serializable = true)
ISLookUpService isLookUpService;
private static SparkSession spark;
private static JavaSparkContext jsc;
private static SparkSession spark;
private static JavaSparkContext jsc;
private static String testGraphBasePath;
private static String testOutputBasePath;
private static final String testActionSetId = "test-orchestrator";
private static String testGraphBasePath;
private static String testOutputBasePath;
private static final String testActionSetId = "test-orchestrator";
@BeforeAll
public static void cleanUp() throws IOException, URISyntaxException {
@BeforeAll
public static void cleanUp() throws IOException, URISyntaxException {
testGraphBasePath = Paths
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/entities").toURI())
.toFile()
.getAbsolutePath();
testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
.toAbsolutePath()
.toString();
testGraphBasePath = Paths
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/entities").toURI())
.toFile()
.getAbsolutePath();
testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
.toAbsolutePath()
.toString();
FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testOutputBasePath));
final SparkConf conf = new SparkConf();
conf.set("spark.sql.shuffle.partitions", "200");
spark = SparkSession
.builder()
.appName(SparkDedupTest.class.getSimpleName())
.master("local[*]")
.config(conf)
.getOrCreate();
final SparkConf conf = new SparkConf();
conf.set("spark.sql.shuffle.partitions", "200");
spark = SparkSession
.builder()
.appName(SparkDedupTest.class.getSimpleName())
.master("local[*]")
.config(conf)
.getOrCreate();
jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
}
jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
}
@BeforeEach
public void setUp() throws IOException, ISLookUpException {
@BeforeEach
public void setUp() throws IOException, ISLookUpException {
lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId)))
.thenReturn(
IOUtils
.toString(
SparkDedupTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml")));
lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId)))
.thenReturn(
IOUtils
.toString(
SparkDedupTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml")));
lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization")))
.thenReturn(
IOUtils
.toString(
SparkDedupTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization")))
.thenReturn(
IOUtils
.toString(
SparkDedupTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication")))
.thenReturn(
IOUtils
.toString(
SparkDedupTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json")));
lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication")))
.thenReturn(
IOUtils
.toString(
SparkDedupTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json")));
lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("software")))
.thenReturn(
IOUtils
.toString(
SparkDedupTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json")));
lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("software")))
.thenReturn(
IOUtils
.toString(
SparkDedupTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json")));
lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("dataset")))
.thenReturn(
IOUtils
.toString(
SparkDedupTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json")));
lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("dataset")))
.thenReturn(
IOUtils
.toString(
SparkDedupTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json")));
lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("otherresearchproduct")))
.thenReturn(
IOUtils
.toString(
SparkDedupTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json")));
}
lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("otherresearchproduct")))
.thenReturn(
IOUtils
.toString(
SparkDedupTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json")));
}
@Test
public void createBlockStatsTest() throws Exception {
@Test
public void createBlockStatsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCreateSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/createBlockStats_parameters.json")));
parser
.parseArgument(
new String[] {
"-i", testGraphBasePath,
"-asi", testActionSetId,
"-la", "lookupurl",
"-w", testOutputBasePath
});
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCreateSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/createBlockStats_parameters.json")));
parser
.parseArgument(
new String[] {
"-i", testGraphBasePath,
"-asi", testActionSetId,
"-la", "lookupurl",
"-w", testOutputBasePath
});
new SparkBlockStats(parser, spark).run(isLookUpService);
new SparkBlockStats(parser, spark).run(isLookUpService);
long orgs_blocks = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/organization_blockstats")
.count();
long orgs_blocks = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/organization_blockstats")
.count();
long pubs_blocks = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/publication_blockstats")
.count();
long pubs_blocks = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/publication_blockstats")
.count();
long sw_blocks = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/software_blockstats")
.count();
long sw_blocks = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/software_blockstats")
.count();
long ds_blocks = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_blockstats")
.count();
long ds_blocks = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_blockstats")
.count();
long orp_blocks = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_blockstats")
.count();
long orp_blocks = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_blockstats")
.count();
assertEquals(121, orgs_blocks);
assertEquals(110, pubs_blocks);
assertEquals(21, sw_blocks);
assertEquals(67, ds_blocks);
assertEquals(55, orp_blocks);
}
assertEquals(121, orgs_blocks);
assertEquals(110, pubs_blocks);
assertEquals(21, sw_blocks);
assertEquals(67, ds_blocks);
assertEquals(55, orp_blocks);
}
}