align parmeter names, graph import procedure WIP

This commit is contained in:
Claudio Atzori 2019-11-04 17:41:01 +01:00
parent f39148dab8
commit 1e7a2ac41d
7 changed files with 91 additions and 36 deletions

View File

@ -27,8 +27,8 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
public class TransformationJobTest {
@Mock
LongAccumulator accumulator;
@ -42,9 +42,8 @@ public class TransformationJobTest {
testDir = Files.createTempDirectory("dhp-collection");
}
@After
public void teadDown() throws IOException {
public void tearDown() throws IOException {
FileUtils.deleteDirectory(testDir.toFile());
}
@ -90,11 +89,8 @@ public class TransformationJobTest {
"-rh", "",
"-ro", "",
"-rr", ""});
}
@Test
public void tryLoadFolderOnCP() throws Exception {
final String path = this.getClass().getResource("/eu/dnetlib/dhp/transform/mdstorenative").getFile();
@ -102,7 +98,6 @@ public class TransformationJobTest {
Path tempDirWithPrefix = Files.createTempDirectory("mdsotre_output");
System.out.println(tempDirWithPrefix.toFile().getAbsolutePath());
Files.deleteIfExists(tempDirWithPrefix);
@ -140,10 +135,6 @@ public class TransformationJobTest {
Node node = document.selectSingleNode("//CODE/*[local-name()='stylesheet']");
System.out.println(node.asXML());
}
}

View File

@ -0,0 +1,23 @@
package eu.dnetlib.dhp.graph;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.schema.oaf.*;
import java.util.Map;
public class GraphMappingUtils {
public final static Map<String, Class> types = Maps.newHashMap();
static {
types.put("datasource", Datasource.class);
types.put("organization", Organization.class);
types.put("project", Project.class);
types.put("dataset", Dataset.class);
types.put("otherresearchproduct", OtherResearchProduct.class);
types.put("software", Software.class);
types.put("publication", Publication.class);
types.put("relation", Relation.class);
}
}

View File

@ -1,9 +1,7 @@
package eu.dnetlib.dhp.graph;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.Text;
@ -13,8 +11,6 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.util.Map;
public class SparkGraphImporterJob {
public static void main(String[] args) throws Exception {
@ -27,8 +23,8 @@ public class SparkGraphImporterJob {
.master(parser.get("master"))
.getOrCreate();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final String inputPath = parser.get("input");
final String outputPath = parser.get("outputDir");
final String inputPath = parser.get("sourcePath");
final String outputPath = parser.get("targetPath");
final String filter = parser.get("filter");
@ -36,17 +32,7 @@ public class SparkGraphImporterJob {
final JavaRDD<Tuple2<String, String>> inputRDD = sc.sequenceFile(inputPath, Text.class, Text.class)
.map(item -> new Tuple2<>(item._1.toString(), item._2.toString()));
final Map<String, Class> types = Maps.newHashMap();
types.put("datasource", Datasource.class);
types.put("organization", Organization.class);
types.put("project", Project.class);
types.put("dataset", Dataset.class);
types.put("otherresearchproduct", OtherResearchProduct.class);
types.put("software", Software.class);
types.put("publication", Publication.class);
types.put("relation", Relation.class);
types.forEach((name, clazz) -> {
GraphMappingUtils.types.forEach((name, clazz) -> {
if (StringUtils.isNotBlank(filter) || filter.toLowerCase().contains(name)) {
spark.createDataset(inputRDD
.filter(s -> s._1().equals(clazz.getName()))

View File

@ -1,6 +1,6 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"i", "paramLongName":"input", "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
{"paramName":"f", "paramLongName":"filter", "paramDescription": "csv of typology of dataframe to be generated", "paramRequired": false},
{"paramName":"o", "paramLongName":"outputDir", "paramDescription": "the path where store DataFrames on HDFS", "paramRequired": true}
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path where store DataFrames on HDFS", "paramRequired": true}
]

View File

@ -43,8 +43,8 @@
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--input</arg><arg>${sourcePath}</arg>
<arg>--outputDir</arg><arg>${targetPath}</arg>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--targetPath</arg><arg>${targetPath}</arg>
<arg>--filter</arg><arg>${filter}</arg>
</spark>

View File

@ -0,0 +1,31 @@
package eu.dnetlib.dhp.graph;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.util.List;
import java.util.stream.Collectors;
public class SparkGraphImportCounterTest {
public static List<Tuple2<String, Long>> countEntities(final String inputPath) throws Exception {
final SparkSession spark = SparkSession
.builder()
.appName(SparkGraphImportCounterTest.class.getSimpleName())
.master("local[*]")
.getOrCreate();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
return GraphMappingUtils.types.entrySet()
.stream()
.map(entry -> {
final Long count = spark.read().load(inputPath + "/" + entry.getKey()).as(Encoders.bean(entry.getValue())).count();
return new Tuple2<String, Long>(entry.getKey(), count);
})
.collect(Collectors.toList());
}
}

View File

@ -1,15 +1,39 @@
package eu.dnetlib.dhp.graph;
import org.junit.Ignore;
import org.junit.Test;
import org.apache.commons.io.FileUtils;
import org.junit.*;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
public class SparkGraphImporterJobTest {
private static final long MAX = 1000L;
private Path testDir;
@Before
public void setup() throws IOException {
testDir = Files.createTempDirectory(getClass().getSimpleName());
}
@After
public void tearDown() throws IOException {
FileUtils.deleteDirectory(testDir.toFile());
}
@Test
@Ignore
public void testImport() throws Exception {
SparkGraphImporterJob.main(new String[]{"-mt", "local[*]","-i", "/home/sandro/part-m-02236", "-o", "/tmp/dataframes", "-f", "publication"});
SparkGraphImporterJob.main(new String[] {
"-mt", "local[*]",
"-i", getClass().getResource("/eu/dnetlib/dhp/dhp-sample/part-m-00010").getPath(),
"-o", testDir.toString()});
SparkGraphImportCounterTest.countEntities(testDir.toString()).forEach(t -> {
System.out.println(t);
//Assert.assertEquals(String.format("mapped %s must be %s", t._1(), MAX), MAX, t._2().longValue());
});
}
}