From 1dbb765343bb42a22ef44cf3d89275884e1809c6 Mon Sep 17 00:00:00 2001 From: miconis Date: Tue, 26 Mar 2019 15:40:40 +0100 Subject: [PATCH] minor changes --- dnet-dedup-test/pom.xml | 18 ++++- .../src/main/java/eu/dnetlib/SparkTest.java | 9 +-- .../java/eu/dnetlib/pace/DedupTestIT.java | 80 +++++++++++++++++++ .../dnetlib/test/properties/config.properties | 2 + 4 files changed, 100 insertions(+), 9 deletions(-) create mode 100644 dnet-dedup-test/src/test/java/eu/dnetlib/pace/DedupTestIT.java create mode 100644 dnet-dedup-test/src/test/resources/eu/dnetlib/test/properties/config.properties diff --git a/dnet-dedup-test/pom.xml b/dnet-dedup-test/pom.xml index 900d214..00d1a45 100644 --- a/dnet-dedup-test/pom.xml +++ b/dnet-dedup-test/pom.xml @@ -14,14 +14,15 @@ jar + src/main/java org.apache.maven.plugins maven-deploy-plugin 2.7 - - true - + + + org.apache.maven.plugins @@ -60,6 +61,11 @@ spark-graphx_2.11 + + org.apache.spark + spark-sql_2.11 + + eu.dnetlib dnet-openaireplus-mapping-utils @@ -72,6 +78,12 @@ test + + org.apache.oozie + oozie-client + test + + \ No newline at end of file diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/SparkTest.java b/dnet-dedup-test/src/main/java/eu/dnetlib/SparkTest.java index c1a0284..aa4dcd6 100644 --- a/dnet-dedup-test/src/main/java/eu/dnetlib/SparkTest.java +++ b/dnet-dedup-test/src/main/java/eu/dnetlib/SparkTest.java @@ -10,8 +10,6 @@ import eu.dnetlib.pace.utils.PaceUtils; import eu.dnetlib.reporter.SparkCounter; import eu.dnetlib.reporter.SparkReporter; import org.apache.commons.io.IOUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -28,18 +26,17 @@ import java.util.stream.Collectors; public class SparkTest { public static SparkCounter counter ; - private static final Log log = LogFactory.getLog(SparkTest.class); public static void main(String[] args) { 
- final JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Deduplication").setMaster("local[*]")); + final JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Deduplication").setMaster("yarn")); - final URL dataset = SparkTest.class.getResource("/eu/dnetlib/pace/organization.to.fix.json"); + final URL dataset = SparkTest.class.getResource(args[1]); final JavaRDD dataRDD = context.textFile(dataset.getPath()); counter = new SparkCounter(context); //read the configuration from the classpath - final DedupConfig config = DedupConfig.load(readFromClasspath("/eu/dnetlib/pace/org.curr.conf")); + final DedupConfig config = DedupConfig.load(readFromClasspath(args[0])); BlockProcessor.constructAccumulator(config); BlockProcessor.accumulators.forEach(acc -> {
diff --git a/dnet-dedup-test/src/test/java/eu/dnetlib/pace/DedupTestIT.java b/dnet-dedup-test/src/test/java/eu/dnetlib/pace/DedupTestIT.java
new file mode 100644
index 0000000..3b9ff1b
--- /dev/null
+++ b/dnet-dedup-test/src/test/java/eu/dnetlib/pace/DedupTestIT.java
@@ -0,0 +1,145 @@
+package eu.dnetlib.pace;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.OozieClientException;
+import org.apache.oozie.client.WorkflowJob;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration test that submits the deduplication Oozie workflow to the Oozie server at
+ * OOZIE_URL, polls it while it is in progress and expects a SUCCEEDED final status.
+ */
+public class DedupTestIT {
+
+    private static final String OOZIE_URL = "http://hadoop-edge3.garr-pa1.d4science.org:11000/oozie";
+    private static final String NAME_NODE = "hdfs://hadoop-rm1.garr-pa1.d4science.org:8020";
+    private static final String JOB_TRACKER = "hadoop-rm3.garr-pa1.d4science.org:8032";
+    private static final String USER = "michele.debonis";
+    private static final String CONFIG_PROPERTIES = "/eu/dnetlib/test/properties/config.properties";
+    private static final long POLL_INTERVAL_MS = 10 * 1000L;
+
+    /**
+     * Runs the workflow with the dedup configuration and input space named in the test
+     * properties and asserts that the job ends with status SUCCEEDED.
+     *
+     * @throws OozieClientException if a call to the Oozie client fails
+     * @throws InterruptedException if the thread is interrupted while sleeping between polls
+     */
+    @Test
+    public void deduplicationTest() throws OozieClientException, InterruptedException {
+
+        // read properties to use in the oozie workflow
+        final Properties prop = readProperties(CONFIG_PROPERTIES);
+
+        /*OOZIE WORKFLOW CREATION AND LAUNCH*/
+        // get an OozieClient for the Oozie server
+        final OozieClient wc = new OozieClient(OOZIE_URL);
+
+        // create a workflow job configuration and set the workflow application path
+        final Properties conf = wc.createConfiguration();
+        conf.setProperty(OozieClient.APP_PATH, NAME_NODE + "/user/" + USER + "/oozieJob/workflow.xml");
+        conf.setProperty(OozieClient.USER_NAME, USER);
+        conf.setProperty("oozie.action.sharelib.for.spark", "spark2");
+
+        // setting workflow parameters
+        conf.setProperty("jobTracker", JOB_TRACKER);
+        conf.setProperty("nameNode", NAME_NODE);
+        conf.setProperty("dedupConfiguration", requiredProperty(prop, "dedup.configuration"));
+        conf.setProperty("inputSpace", requiredProperty(prop, "input.space"));
+
+        // submit and start the workflow job
+        final String jobId = wc.run(conf);
+        System.out.println("Workflow job submitted");
+
+        // wait until the workflow job finishes, printing the status every 10 seconds;
+        // the job info is fetched once per poll so the printed and the tested status agree
+        WorkflowJob job = wc.getJobInfo(jobId);
+        while (isInProgress(job.getStatus())) {
+            System.out.println(job);
+            Thread.sleep(POLL_INTERVAL_MS);
+            job = wc.getJobInfo(jobId);
+        }
+
+        // print the final status of the workflow job
+        System.out.println(job);
+        System.out.println("JOB LOG = " + wc.getJobLog(jobId));
+
+        assertEquals(WorkflowJob.Status.SUCCEEDED, job.getStatus());
+    }
+
+    /**
+     * Tells whether a job is still in progress. PREP is included so that a job polled
+     * before it starts running is not treated as finished.
+     */
+    private static boolean isInProgress(final WorkflowJob.Status status) {
+        return status == WorkflowJob.Status.PREP || status == WorkflowJob.Status.RUNNING;
+    }
+
+    /**
+     * Returns the value of a property that the workflow needs.
+     *
+     * @throws IllegalStateException if the key is absent, instead of the bare
+     *         NullPointerException that Properties.setProperty raises for a null value
+     */
+    private static String requiredProperty(final Properties props, final String key) {
+        final String value = props.getProperty(key);
+        if (value == null) {
+            throw new IllegalStateException("missing required property: " + key);
+        }
+        return value;
+    }
+
+    /**
+     * Loads a properties file from the classpath.
+     *
+     * @param propFile classpath location of the properties file
+     * @return the loaded properties
+     * @throws IllegalArgumentException if the resource does not exist
+     * @throws UncheckedIOException if the resource cannot be read
+     */
+    static Properties readProperties(final String propFile) {
+        final Properties prop = new Properties();
+        try (InputStream in = openResource(propFile)) {
+            prop.load(in);
+        } catch (final IOException e) {
+            throw new UncheckedIOException("cannot load properties from classpath: " + propFile, e);
+        }
+        return prop;
+    }
+
+    /**
+     * Reads a classpath resource fully into a string, decoding it as UTF-8.
+     *
+     * @param filename classpath location of the resource
+     * @return the resource content
+     * @throws IllegalArgumentException if the resource does not exist
+     * @throws UncheckedIOException if the resource cannot be read
+     */
+    static String readFromClasspath(final String filename) {
+        final StringWriter sw = new StringWriter();
+        try (InputStream in = openResource(filename)) {
+            IOUtils.copy(in, sw, "UTF-8");
+            return sw.toString();
+        } catch (final IOException e) {
+            throw new UncheckedIOException("cannot load resource from classpath: " + filename, e);
+        }
+    }
+
+    /** Opens a classpath resource, failing with a clear message when it is missing. */
+    private static InputStream openResource(final String name) {
+        final InputStream in = DedupTestIT.class.getResourceAsStream(name);
+        if (in == null) {
+            throw new IllegalArgumentException("resource not found on classpath: " + name);
+        }
+        return in;
+    }
+}
diff --git
a/dnet-dedup-test/src/test/resources/eu/dnetlib/test/properties/config.properties b/dnet-dedup-test/src/test/resources/eu/dnetlib/test/properties/config.properties new file mode 100644 index 0000000..13a0d19 --- /dev/null +++ b/dnet-dedup-test/src/test/resources/eu/dnetlib/test/properties/config.properties @@ -0,0 +1,2 @@ +input.space = /eu/dnetlib/pace/organization.to.fix.json +dedup.configuration = /eu/dnetlib/pace/org.curr.conf