diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/application/ArgumentApplicationParser.java b/dhp-common/src/main/java/eu/dnetlib/dhp/application/ArgumentApplicationParser.java index fc83a8d69..a4970a928 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/application/ArgumentApplicationParser.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/application/ArgumentApplicationParser.java @@ -24,12 +24,13 @@ public class ArgumentApplicationParser implements Serializable { } private void createOptionMap(final OptionsParameter[] configuration) { - Arrays.stream(configuration).map(conf -> Option.builder(conf.getParamName()) - .longOpt(conf.getParamLongName()) - .required(conf.isParamRequired()) - .desc(conf.getParamDescription()) - .hasArg() // This option has an argument. - .build()).forEach(options::addOption); + + Arrays.stream(configuration).map(conf -> { + final Option o = new Option(conf.getParamName(), true, conf.getParamDescription()); + o.setLongOpt(conf.getParamLongName()); + o.setRequired(conf.isParamRequired()); + return o; + }).forEach(options::addOption); // HelpFormatter formatter = new HelpFormatter(); // formatter.printHelp("myapp", null, options, null, true); @@ -38,7 +39,7 @@ public class ArgumentApplicationParser implements Serializable { } public void parseArgument(final String[] args) throws Exception { - CommandLineParser parser = new DefaultParser(); + CommandLineParser parser = new BasicParser(); CommandLine cmd = parser.parse(options, args); Arrays.stream(cmd.getOptions()).forEach(it -> objectMap.put(it.getLongOpt(), it.getValue())); } diff --git a/dhp-schemas/pom.xml b/dhp-schemas/pom.xml index 1f9c38b56..a8535fc93 100644 --- a/dhp-schemas/pom.xml +++ b/dhp-schemas/pom.xml @@ -17,18 +17,7 @@ - - com.fasterxml.jackson.core - jackson-core - - - com.fasterxml.jackson.core - jackson-annotations - - - com.fasterxml.jackson.core - jackson-databind - + commons-io commons-io diff --git a/dhp-schemas/src/test/java/eu/dnetlib/dhp/schema/proto/TestParseProtoJson.java b/dhp-schemas/src/test/java/eu/dnetlib/dhp/schema/proto/TestParseProtoJson.java index e16050ab2..7f5139631 100644 --- a/dhp-schemas/src/test/java/eu/dnetlib/dhp/schema/proto/TestParseProtoJson.java +++ b/dhp-schemas/src/test/java/eu/dnetlib/dhp/schema/proto/TestParseProtoJson.java @@ -5,18 +5,19 @@ import eu.dnetlib.data.proto.OafProtos; import org.apache.commons.io.IOUtils; import org.junit.Test; - public class TestParseProtoJson { - @Test public void testParse() throws Exception { final String json = IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/schema/proto/hugeRecord.json")); - OafProtos.Oaf.Builder oafBuilder =OafProtos.Oaf.newBuilder(); - JsonFormat.merge(json,oafBuilder); + final OafProtos.Oaf.Builder oafBuilder = OafProtos.Oaf.newBuilder(); - System.out.println(JsonFormat.printToString(oafBuilder.build())); + JsonFormat jf = new JsonFormat(); + jf.merge(IOUtils.toInputStream(json), oafBuilder); + OafProtos.Oaf oaf = oafBuilder.build(); + System.out.println(jf.printToString(oaf)); } + } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java index 61e8cc34f..f4da193a1 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java @@ -74,7 +74,7 @@ public class GenerateNativeStoreSparkJob { final Map ongoingMap = new HashMap<>(); final Map reportMap = new HashMap<>(); - final boolean test = parser.get("isTest") == null?false: Boolean.valueOf(parser.get("isTest")); + final boolean test = parser.get("isTest") == null ? false : Boolean.valueOf(parser.get("isTest")); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); @@ -86,7 +86,7 @@ public class GenerateNativeStoreSparkJob { final MessageManager manager = new MessageManager(parser.get("rabbitHost"), parser.get("rabbitUser"), parser.get("rabbitPassword"), false, false, null); - final JavaRDD mappeRDD = inputRDD.map(item -> parseRecord(item._2().toString(), parser.get("xpath"), parser.get("encoding"),provenance, dateOfCollection, totalItems, invalidRecords)) + final JavaRDD mappeRDD = inputRDD.map(item -> parseRecord(item._2().toString(), parser.get("xpath"), parser.get("encoding"), provenance, dateOfCollection, totalItems, invalidRecords)) .filter(Objects::nonNull).distinct(); ongoingMap.put("ongoing", "0"); diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java index c6e50343b..848fbe17d 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java @@ -17,18 +17,15 @@ public class CollectionJobTest { @Before public void setup() throws IOException { testDir = Files.createTempDirectory("dhp-collection"); - - } - @After public void teadDown() throws IOException { FileUtils.deleteDirectory(testDir.toFile()); } @Test - public void tesCollection () throws Exception { + public void tesCollection() throws Exception { Provenance provenance = new Provenance("pippo", "puppa", "ns_prefix"); GenerateNativeStoreSparkJob.main(new String[] { "-mt", "local", diff --git a/dhp-workflows/dhp-graph-mapper/pom.xml b/dhp-workflows/dhp-graph-mapper/pom.xml index 3af9b376a..4a4492cee 100644 --- a/dhp-workflows/dhp-graph-mapper/pom.xml +++ b/dhp-workflows/dhp-graph-mapper/pom.xml @@ -16,8 +16,6 @@ org.apache.spark spark-core_2.11 - - org.apache.spark spark-sql_2.11 diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoUtils.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoUtils.java index 61a401c39..41e0baa57 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoUtils.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ProtoUtils.java @@ -5,14 +5,19 @@ import eu.dnetlib.data.proto.FieldTypeProtos; import eu.dnetlib.data.proto.OafProtos; import eu.dnetlib.data.proto.ResultProtos; import eu.dnetlib.dhp.schema.oaf.*; +import org.apache.commons.io.IOUtils; +import java.io.IOException; import java.util.stream.Collectors; public class ProtoUtils { - public static OafProtos.Oaf parse(String json) throws JsonFormat.ParseException { + public static OafProtos.Oaf parse(String json) throws IOException { final OafProtos.Oaf.Builder builder = OafProtos.Oaf.newBuilder(); - JsonFormat.merge(json, builder); + + final JsonFormat jf = new JsonFormat(); + jf.merge(IOUtils.toInputStream(json), builder); + return builder.build(); } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java index a2fe08b67..9d8d384b6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java @@ -1,6 +1,5 @@ package eu.dnetlib.dhp.graph; - import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.*; import org.apache.commons.io.IOUtils; @@ -15,10 +14,8 @@ import scala.Tuple2; public class SparkGraphImporterJob { - public static void main(String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGraphImporterJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_graph_parameters.json"))); parser.parseArgument(args); final SparkSession spark = SparkSession @@ -34,7 +31,6 @@ public class SparkGraphImporterJob { final JavaRDD> inputRDD = sc.sequenceFile(inputPath, Text.class, Text.class) .map(item -> new Tuple2<>(item._1.toString(), item._2.toString())); - final JavaRDD oafRdd = inputRDD.filter(s -> !StringUtils.isBlank(s._2()) && !s._1().contains("@update")).map(Tuple2::_2).map(ProtoConverter::convert); final Encoder organizationEncoder = Encoders.bean(Organization.class); @@ -48,7 +44,6 @@ public class SparkGraphImporterJob { final Encoder relationEncoder = Encoders.bean(Relation.class); - spark.createDataset(oafRdd.filter(s -> s instanceof Organization).map(s -> (Organization) s).rdd(), organizationEncoder).write().save(outputPath + "/organizations"); spark.createDataset(oafRdd.filter(s -> s instanceof Project).map(s -> (Project) s).rdd(), projectEncoder).write().save(outputPath + "/projects"); spark.createDataset(oafRdd.filter(s -> s instanceof Datasource).map(s -> (Datasource) s).rdd(), datasourceEncoder).write().save(outputPath + "/datasources"); @@ -59,8 +54,5 @@ public class SparkGraphImporterJob { spark.createDataset(oafRdd.filter(s -> s instanceof OtherResearchProducts).map(s -> (OtherResearchProducts) s).rdd(), otherResearchProductsEncoder).write().save(outputPath + "/otherResearchProducts"); spark.createDataset(oafRdd.filter(s -> s instanceof Relation).map(s -> (Relation) s).rdd(), relationEncoder).write().save(outputPath + "/relations"); - - - } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json index bb142a0b1..50378b465 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json @@ -1,5 +1,5 @@ [ - {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, {"paramName":"i", "paramLongName":"input", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}, - {"paramName":"o", "paramLongName":"outputDir", "paramDescription": "the path where store DataFrames on HDFS", "paramRequired": true} + {"paramName":"o", "paramLongName":"outputDir", "paramDescription": "the path where store DataFrames on HDFS", "paramRequired": true} ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml index c814c07bb..a23106cd7 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml @@ -1,4 +1,4 @@ - + sourcePath @@ -8,6 +8,18 @@ targetPath the target path + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + @@ -24,14 +36,14 @@ cluster MapGraphIntoDataFrame eu.dnetlib.dhp.graph.SparkGraphImporterJob - dhp-aggregations-1.0.0-SNAPSHOT.jar - --num-executors 50 --conf -spark.extraListeners=com.cloudera.spark.lineage.NavigatorAppListener -spark.sql.queryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListener" + dhp-graph-mapper-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} -mt yarn-cluster --input${sourcePath} --outputDir${targetPath} - + diff --git a/pom.xml b/pom.xml index 09c5f23a7..31a408923 100644 --- a/pom.xml +++ b/pom.xml @@ -155,7 +155,8 @@ commons-cli commons-cli - 1.4 + 1.2 + provided @@ -208,6 +209,7 @@ provided + javax.persistence javax.persistence-api @@ -244,7 +246,7 @@ com.googlecode.protobuf-java-format protobuf-java-format - 1.2 + 1.4 @@ -450,8 +452,8 @@ cdh5.9.2 2.6.0-${dhp.cdh.version} 4.1.0-${dhp.cdh.version} - 2.2.0 - 2.6.5 + 2.4.0.cloudera2 + 2.9.6 3.5 2.11.8 2.5.0