diff --git a/dhp-workflows/dhp-actionmanager/pom.xml b/dhp-workflows/dhp-actionmanager/pom.xml new file mode 100644 index 0000000000..be76db7558 --- /dev/null +++ b/dhp-workflows/dhp-actionmanager/pom.xml @@ -0,0 +1,62 @@ + + + 4.0.0 + + eu.dnetlib.dhp + dhp-workflows + 1.0.5-SNAPSHOT + + dhp-actionmanager + + + + org.apache.spark + spark-core_2.11 + + + + org.apache.spark + spark-sql_2.11 + + + + eu.dnetlib.dhp + dhp-common + ${project.version} + + + + net.sf.saxon + Saxon-HE + + + + dom4j + dom4j + + + + xml-apis + xml-apis + + + + jaxen + jaxen + + + + org.mockito + mockito-core + 2.25.0 + test + + + eu.dnetlib.dhp + dhp-schemas + 1.0.5-SNAPSHOT + compile + + + diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJob.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJob.java new file mode 100644 index 0000000000..e21dd2acef --- /dev/null +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJob.java @@ -0,0 +1,100 @@ +package eu.dnetlib.dhp.actionmanager; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.Software; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.ReduceFunction; +import org.apache.spark.sql.*; +import org.apache.spark.sql.types.*; +import scala.Tuple2; + +import java.io.IOException; +import java.util.Collections; + +import static org.apache.spark.sql.functions.*; + +public class PromoteActionSetFromHDFSJob { + + public static void main(String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString( + PromoteActionSetFromHDFSJob.class + .getResourceAsStream("/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json"))); + parser.parseArgument(args); + String inputActionSetPath = parser.get("input"); + String outputPath = parser.get("output"); + + final SparkConf conf = new SparkConf(); + conf.setMaster(parser.get("master")); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + + try (SparkSession spark = SparkSession.builder().config(conf).getOrCreate()) { + // reading actions as RDD + JavaRDD actionsRDD = JavaSparkContext + .fromSparkContext(spark.sparkContext()) + .sequenceFile(inputActionSetPath, Text.class, Text.class) + .map(x -> RowFactory.create(x._2().toString())); + + // converting actions to DataFrame and deserializing content of TargetValue + // using unbase64 on TargetValue content to get String representation + StructType rowSchema = StructType$.MODULE$.apply( + Collections.singletonList( + StructField$.MODULE$.apply("value", DataTypes.StringType, false, Metadata.empty()) + )); + Dataset deserializedTargetValue = spark.createDataFrame(actionsRDD, rowSchema) + .withColumn("TargetValue", get_json_object(col("value"), "$.TargetValue")) + .select(unbase64(col("TargetValue")).cast(DataTypes.StringType).as("target_value_json")) + .cache(); + + // printing: only for testing + deserializedTargetValue.printSchema(); + deserializedTargetValue.show(); + System.out.println(deserializedTargetValue.first().toString()); + + // grouping and merging: should be generic + Dataset softwareDS = deserializedTargetValue + .map((MapFunction) PromoteActionSetFromHDFSJob::rowToOafEntity, Encoders.kryo(Software.class)) + .groupByKey((MapFunction) OafEntity::getId, Encoders.STRING()) + .reduceGroups((ReduceFunction) (software1, software2) -> { + software1.mergeFrom(software2); + return software1; + }) + .map((MapFunction, Software>) pair -> pair._2, Encoders.kryo(Software.class)); + + softwareDS.printSchema(); + softwareDS.show(); + + // save +// softwareDS.toDF() +// .write() +// .partitionBy("id") +// .save(outputPath); + + // another approach: using only DataFrames i.e. DataSet, not DataSets + } + } + + private static Software rowToOafEntity(Row row) { + // converts row with JSON into Software object: should be generic + // currently extracts only "entity.id" field from JSON + ObjectMapper objectMapper = new ObjectMapper(); + try { + JsonNode jsonNode = objectMapper.readTree(row.getString(0)); + String id = jsonNode.at("/entity/id").asText(); + Software software = new Software(); + software.setId(id); + return software; + } catch (IOException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + +} diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json new file mode 100644 index 0000000000..3b95c90d38 --- /dev/null +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json @@ -0,0 +1,20 @@ +[ + { + "paramName": "mt", + "paramLongName": "master", + "paramDescription": "should be local or yarn", + "paramRequired": true + }, + { + "paramName": "i", + "paramLongName": "input", + "paramDescription": "the path of the input sequential file to read", + "paramRequired": true + }, + { + "paramName": "o", + "paramLongName": "output", + "paramDescription": "the path of the result DataFrame on HDFS", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJobTest.java b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJobTest.java new file mode 100644 index 0000000000..3020d7b310 --- /dev/null +++ b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJobTest.java @@ -0,0 +1,59 @@ +package eu.dnetlib.dhp.actionmanager; + +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Objects; + +public class PromoteActionSetFromHDFSJobTest { + private ClassLoader cl = getClass().getClassLoader(); + private Path workingDir; + private Path inputActionSetDir; + private Path outputDir; + + @Before + public void before() throws IOException { + workingDir = Files.createTempDirectory("promote_action_set"); + inputActionSetDir = workingDir.resolve("input"); + outputDir = workingDir.resolve("output"); + } + + @After + public void after() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + } + + @Test + public void shouldReadAtomicActionsFromHDFSAndWritePartitionedAsParquetFiles() throws Exception { + // given + // NOTE: test resource should contain atomic actions in a human readable form, probably as json files; here the + // files should be converted to a serialized format and written out to workingDir/input + // for current testing: actions from software export, given as sequence file are copied to workingDir/input/ + Path exportedActionSetDir = Paths.get(Objects.requireNonNull(cl.getResource("entities/entities_software")).getFile()); + Path inputDir = inputActionSetDir.resolve("entities_software"); + Files.createDirectories(inputDir); + copyFiles(exportedActionSetDir, inputDir); + PromoteActionSetFromHDFSJob.main(new String[]{ + "-mt", "local[*]", + "-i", inputDir.toString(), + "-o", outputDir.toString() + }); + } + + private static void copyFiles(Path source, Path target) throws IOException { + Files.list(source).forEach(f -> { + try { + Files.copy(f, target.resolve(f.getFileName())); + } catch (IOException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + }); + } +} \ No newline at end of file diff --git a/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/_SUCCESS b/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/_SUCCESS new file mode 100644 index 0000000000..e69de29bb2 diff --git a/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00000 b/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00000 new file mode 100644 index 0000000000..a5bfdac140 Binary files /dev/null and b/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00000 differ diff --git a/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00001 b/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00001 new file mode 100644 index 0000000000..1af1d3a5e4 Binary files /dev/null and b/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00001 differ diff --git a/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00002 b/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00002 new file mode 100644 index 0000000000..92e0460698 Binary files /dev/null and b/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00002 differ diff --git a/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00003 b/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00003 new file mode 100644 index 0000000000..82b051a5d3 Binary files /dev/null and b/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00003 differ diff --git a/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00004 b/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00004 new file mode 100644 index 0000000000..c027243a7c Binary files /dev/null and b/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00004 differ diff --git a/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00005 b/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00005 new file mode 100644 index 0000000000..23e7ae70a9 Binary files /dev/null and b/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00005 differ diff --git a/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00006 b/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00006 new file mode 100644 index 0000000000..6c5a6c657e Binary files /dev/null and b/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00006 differ diff --git a/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00007 b/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00007 new file mode 100644 index 0000000000..159a5a43f4 Binary files /dev/null and b/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00007 differ diff --git a/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00008 b/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00008 new file mode 100644 index 0000000000..9cf258c189 Binary files /dev/null and b/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00008 differ diff --git a/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00009 b/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00009 new file mode 100644 index 0000000000..5429aab272 Binary files /dev/null and b/dhp-workflows/dhp-actionmanager/src/test/resources/entities/entities_software/part-r-00009 differ diff --git a/dhp-workflows/pom.xml b/dhp-workflows/pom.xml index cf71190a43..b5324fcc67 100644 --- a/dhp-workflows/pom.xml +++ b/dhp-workflows/pom.xml @@ -18,6 +18,7 @@ dhp-distcp dhp-graph-mapper dhp-dedup + dhp-actionmanager