minor changes

miconis 2019-03-26 15:40:40 +01:00
parent f87790f701
commit 1dbb765343
4 changed files with 100 additions and 9 deletions

View File

@@ -14,14 +14,15 @@
  <packaging>jar</packaging>
  <build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-deploy-plugin</artifactId>
        <version>2.7</version>
-       <configuration>
-         <skip>true</skip>
-       </configuration>
+       <!--<configuration>-->
+       <!--<skip>true</skip>-->
+       <!--</configuration>-->
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
@@ -60,6 +61,11 @@
      <artifactId>spark-graphx_2.11</artifactId>
    </dependency>
+   <dependency>
+     <groupId>org.apache.spark</groupId>
+     <artifactId>spark-sql_2.11</artifactId>
+   </dependency>
    <dependency>
      <groupId>eu.dnetlib</groupId>
      <artifactId>dnet-openaireplus-mapping-utils</artifactId>
@@ -72,6 +78,12 @@
      <scope>test</scope>
    </dependency>
+   <dependency>
+     <groupId>org.apache.oozie</groupId>
+     <artifactId>oozie-client</artifactId>
+     <scope>test</scope>
+   </dependency>
  </dependencies>
</project>
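
Note on the new spark-sql_2.11 dependency: it brings the SparkSession/Dataset API into scope alongside the spark-graphx artifact already declared. A minimal sketch of what it enables, assuming a line-delimited JSON input like the one used by the tests (the class name and input path below are hypothetical, not part of this commit):

// Hedged sketch, not part of this commit: illustrates the Dataset API made
// available by the new spark-sql_2.11 dependency. The input path is hypothetical.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSqlSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Deduplication")
                .master("local[*]")
                .getOrCreate();
        // Each line of the input file is one JSON-serialized entity.
        Dataset<Row> entities = spark.read().json("/tmp/organizations.json");
        entities.printSchema();
        spark.stop();
    }
}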

View File

@@ -10,8 +10,6 @@ import eu.dnetlib.pace.utils.PaceUtils;
import eu.dnetlib.reporter.SparkCounter;
import eu.dnetlib.reporter.SparkReporter;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
@@ -28,18 +26,17 @@ import java.util.stream.Collectors;
public class SparkTest {
    public static SparkCounter counter;
-   private static final Log log = LogFactory.getLog(SparkTest.class);
    public static void main(String[] args) {
-       final JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Deduplication").setMaster("local[*]"));
+       final JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Deduplication").setMaster("yarn"));
-       final URL dataset = SparkTest.class.getResource("/eu/dnetlib/pace/organization.to.fix.json");
+       final URL dataset = SparkTest.class.getResource(args[1]);
        final JavaRDD<String> dataRDD = context.textFile(dataset.getPath());
        counter = new SparkCounter(context);
        //read the configuration from the classpath
-       final DedupConfig config = DedupConfig.load(readFromClasspath("/eu/dnetlib/pace/org.curr.conf"));
+       final DedupConfig config = DedupConfig.load(readFromClasspath(args[0]));
        BlockProcessor.constructAccumulator(config);
        BlockProcessor.accumulators.forEach(acc -> {
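
Side note on the hunk above: SparkTest now runs on YARN and takes its inputs from the command line, args[0] being the dedup configuration resource and args[1] the dataset resource. Class.getResource returns null when the resource is missing, so dataset.getPath() would then fail with a NullPointerException; a hedged sketch of a defensive lookup (hypothetical, not part of this commit):

// Hypothetical guard, not in the commit: resolve the dataset resource the same
// way SparkTest does, but fail with a clear message instead of an NPE.
import java.net.URL;

public class ResourceCheck {
    public static void main(String[] args) {
        final URL dataset = ResourceCheck.class.getResource(args[1]);
        if (dataset == null) {
            throw new IllegalArgumentException("resource not found on classpath: " + args[1]);
        }
        System.out.println("dataset resolved to " + dataset.getPath());
    }
}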

View File

@@ -0,0 +1,80 @@
package eu.dnetlib.pace;

import org.apache.commons.io.IOUtils;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowJob;
import org.junit.Test;

import java.io.IOException;
import java.io.StringWriter;
import java.util.Properties;

import static junit.framework.Assert.assertEquals;

public class DedupTestIT {
    @Test
    public void deduplicationTest() throws OozieClientException, InterruptedException {

        //read properties to use in the oozie workflow
        Properties prop = readProperties("/eu/dnetlib/test/properties/config.properties");

        /*OOZIE WORKFLOW CREATION AND LAUNCH*/
        // get an OozieClient for the remote Oozie server
        OozieClient wc = new OozieClient("http://hadoop-edge3.garr-pa1.d4science.org:11000/oozie");

        // create a workflow job configuration and set the workflow application path
        Properties conf = wc.createConfiguration();
        conf.setProperty(OozieClient.APP_PATH, "hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/michele.debonis/oozieJob/workflow.xml");
        conf.setProperty(OozieClient.USER_NAME, "michele.debonis");
        conf.setProperty("oozie.action.sharelib.for.spark", "spark2");

        // setting workflow parameters
        conf.setProperty("jobTracker", "hadoop-rm3.garr-pa1.d4science.org:8032");
        conf.setProperty("nameNode", "hdfs://hadoop-rm1.garr-pa1.d4science.org:8020");
        conf.setProperty("dedupConfiguration", prop.getProperty("dedup.configuration"));
        conf.setProperty("inputSpace", prop.getProperty("input.space"));
        // conf.setProperty("inputDir", "/usr/tucu/inputdir");
        // conf.setProperty("outputDir", "/usr/tucu/outputdir");

        // submit and start the workflow job
        String jobId = wc.run(conf);
        System.out.println("Workflow job submitted");

        // wait until the workflow job finishes, printing the status every 10 seconds
        while (wc.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) {
            System.out.println(wc.getJobInfo(jobId));
            Thread.sleep(10 * 1000);
        }

        // print the final status of the workflow job
        System.out.println(wc.getJobInfo(jobId));
        System.out.println("JOB LOG = " + wc.getJobLog(jobId));

        assertEquals(WorkflowJob.Status.SUCCEEDED, wc.getJobInfo(jobId).getStatus());
    }
    static Properties readProperties(final String propFile) {
        Properties prop = new Properties();
        try {
            prop.load(DedupTestIT.class.getResourceAsStream(propFile));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return prop;
    }

    static String readFromClasspath(final String filename) {
        final StringWriter sw = new StringWriter();
        try {
            IOUtils.copy(DedupTestIT.class.getResourceAsStream(filename), sw);
            return sw.toString();
        } catch (final IOException e) {
            throw new RuntimeException("cannot load resource from classpath: " + filename);
        }
    }
}
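
One caveat on the test above: the status-polling loop is unbounded, so a stuck workflow blocks the build indefinitely. A hedged sketch of a bounded wait, assuming the same OozieClient API used above (the helper and its timeout are illustrative, not part of this commit):

// Hypothetical helper, not in the commit: poll an Oozie job with a deadline
// so a stuck workflow fails the test instead of blocking it forever.
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowJob;

public final class OozieWait {
    public static WorkflowJob.Status await(OozieClient wc, String jobId, long timeoutMillis)
            throws OozieClientException, InterruptedException {
        final long deadline = System.currentTimeMillis() + timeoutMillis;
        while (wc.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) {
            if (System.currentTimeMillis() > deadline) {
                throw new IllegalStateException("Oozie job " + jobId + " still RUNNING after timeout");
            }
            Thread.sleep(10 * 1000);
        }
        return wc.getJobInfo(jobId).getStatus();
    }
}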

View File

@@ -0,0 +1,2 @@
input.space = /eu/dnetlib/pace/organization.to.fix.json
dedup.configuration = /eu/dnetlib/pace/org.curr.conf
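
These two keys close the loop: DedupTestIT reads them from config.properties and forwards them to the workflow as the inputSpace and dedupConfiguration parameters, which presumably reach SparkTest as the dataset and configuration arguments (args[1] and args[0] respectively).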