actionmanager implementation prototyping

This commit is contained in:
Przemysław Jacewicz 2020-02-06 19:14:41 +01:00
parent 24219d1204
commit 86b60268bb
16 changed files with 242 additions and 0 deletions

View File

@@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-workflows</artifactId>
<version>1.0.5-SNAPSHOT</version>
</parent>
<artifactId>dhp-actionmanager</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>net.sf.saxon</groupId>
<artifactId>Saxon-HE</artifactId>
</dependency>
<dependency>
<groupId>dom4j</groupId>
<artifactId>dom4j</artifactId>
</dependency>
<dependency>
<groupId>xml-apis</groupId>
<artifactId>xml-apis</artifactId>
</dependency>
<dependency>
<groupId>jaxen</groupId>
<artifactId>jaxen</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.25.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-schemas</artifactId>
<version>1.0.5-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,100 @@
package eu.dnetlib.dhp.actionmanager;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Software;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;
import scala.Tuple2;
import java.io.IOException;
import java.util.Collections;
import static org.apache.spark.sql.functions.*;
public class PromoteActionSetFromHDFSJob {
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(
PromoteActionSetFromHDFSJob.class
.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json")));
parser.parseArgument(args);
String inputActionSetPath = parser.get("input");
String outputPath = parser.get("output");
final SparkConf conf = new SparkConf();
conf.setMaster(parser.get("master"));
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
try (SparkSession spark = SparkSession.builder().config(conf).getOrCreate()) {
// reading actions as RDD
JavaRDD<Row> actionsRDD = JavaSparkContext
.fromSparkContext(spark.sparkContext())
.sequenceFile(inputActionSetPath, Text.class, Text.class)
.map(x -> RowFactory.create(x._2().toString()));
// converting actions to DataFrame and deserializing content of TargetValue
// using unbase64 on TargetValue content to get String representation
StructType rowSchema = StructType$.MODULE$.apply(
Collections.singletonList(
StructField$.MODULE$.apply("value", DataTypes.StringType, false, Metadata.empty())
));
Dataset<Row> deserializedTargetValue = spark.createDataFrame(actionsRDD, rowSchema)
.withColumn("TargetValue", get_json_object(col("value"), "$.TargetValue"))
.select(unbase64(col("TargetValue")).cast(DataTypes.StringType).as("target_value_json"))
.cache();
// printing: only for testing
deserializedTargetValue.printSchema();
deserializedTargetValue.show();
System.out.println(deserializedTargetValue.first().toString());
// grouping and merging: should be generic
Dataset<Software> softwareDS = deserializedTargetValue
.map((MapFunction<Row, Software>) PromoteActionSetFromHDFSJob::rowToOafEntity, Encoders.kryo(Software.class))
.groupByKey((MapFunction<Software, String>) OafEntity::getId, Encoders.STRING())
.reduceGroups((ReduceFunction<Software>) (software1, software2) -> {
software1.mergeFrom(software2);
return software1;
})
.map((MapFunction<Tuple2<String, Software>, Software>) pair -> pair._2, Encoders.kryo(Software.class));
softwareDS.printSchema();
softwareDS.show();
// save
// softwareDS.toDF()
// .write()
// .partitionBy("id")
// .save(outputPath);
// another approach: using only DataFrames, i.e. Dataset<Row>, not Dataset<Software>
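// a possible sketch of that DataFrame-only variant (an illustration, not part of this
// prototype; assumes the entity id sits at $.entity.id of the decoded JSON, as in
// rowToOafEntity below):
// deserializedTargetValue
//         .withColumn("id", get_json_object(col("target_value_json"), "$.entity.id"))
//         .write()
//         .partitionBy("id")
//         .mode(SaveMode.Overwrite)
//         .parquet(outputPath);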
}
}
private static Software rowToOafEntity(Row row) {
// converts a row holding the decoded JSON into a Software object: should be generic
// currently extracts only the "entity.id" field from the JSON
ObjectMapper objectMapper = new ObjectMapper();
try {
JsonNode jsonNode = objectMapper.readTree(row.getString(0));
String id = jsonNode.at("/entity/id").asText();
Software software = new Software();
software.setId(id);
return software;
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
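
The rowToOafEntity helper above is flagged as "should be generic". A minimal sketch of one possible generalisation, assuming the "/entity" node of the decoded TargetValue JSON maps directly onto the requested OAF bean (an assumption this prototype does not verify), passes the target class as a parameter:

private static <T extends OafEntity> T rowToOafEntity(Row row, Class<T> clazz) {
    ObjectMapper objectMapper = new ObjectMapper();
    try {
        // read the decoded TargetValue JSON and bind its /entity node to the requested type
        JsonNode entityNode = objectMapper.readTree(row.getString(0)).at("/entity");
        return objectMapper.treeToValue(entityNode, clazz);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

With such a helper, the Software-specific map call could become .map((MapFunction<Row, Software>) r -> rowToOafEntity(r, Software.class), Encoders.kryo(Software.class)).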

View File

@@ -0,0 +1,20 @@
[
{
"paramName": "mt",
"paramLongName": "master",
"paramDescription": "should be local or yarn",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "input",
"paramDescription": "the path of the input sequential file to read",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "output",
"paramDescription": "the path of the result DataFrame on HDFS",
"paramRequired": true
}
]
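
For reference, the three parameters above map onto the short switches used by the test below; a hypothetical local invocation (the paths are placeholders, not values from this commit) would look like:

PromoteActionSetFromHDFSJob.main(new String[]{
        "-mt", "local[*]",
        "-i", "/tmp/actionsets/input/entities_software",
        "-o", "/tmp/actionsets/output"
});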

View File

@@ -0,0 +1,59 @@
package eu.dnetlib.dhp.actionmanager;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Objects;
public class PromoteActionSetFromHDFSJobTest {
private ClassLoader cl = getClass().getClassLoader();
private Path workingDir;
private Path inputActionSetDir;
private Path outputDir;
@Before
public void before() throws IOException {
workingDir = Files.createTempDirectory("promote_action_set");
inputActionSetDir = workingDir.resolve("input");
outputDir = workingDir.resolve("output");
}
@After
public void after() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
}
@Test
public void shouldReadAtomicActionsFromHDFSAndWritePartitionedAsParquetFiles() throws Exception {
// given
// NOTE: the test resources should contain atomic actions in a human-readable form, probably as JSON files; here the
// files should be converted to the serialized format and written out to workingDir/input
// for current testing: actions from a software export, given as a sequence file, are copied to workingDir/input/
Path exportedActionSetDir = Paths.get(Objects.requireNonNull(cl.getResource("entities/entities_software")).getFile());
Path inputDir = inputActionSetDir.resolve("entities_software");
Files.createDirectories(inputDir);
copyFiles(exportedActionSetDir, inputDir);
PromoteActionSetFromHDFSJob.main(new String[]{
"-mt", "local[*]",
"-i", inputDir.toString(),
"-o", outputDir.toString()
});
}
private static void copyFiles(Path source, Path target) throws IOException {
Files.list(source).forEach(f -> {
try {
Files.copy(f, target.resolve(f.getFileName()));
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
});
}
}
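
The NOTE at the top of the test mentions converting human-readable JSON actions into the serialized input instead of copying a pre-built sequence file. A possible sketch of that conversion, assuming each JSON resource holds the decoded TargetValue payload (with the entity under "/entity") and that wrapping its base64-encoded form in a TargetValue field is enough for this job; it relies on the usual Hadoop Configuration, SequenceFile and Text imports plus java.util.Base64 and java.nio.file.DirectoryStream:

private static void writeActionsAsSequenceFile(Path jsonDir, Path targetFile) throws IOException {
    Configuration conf = new Configuration();
    try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
            SequenceFile.Writer.file(new org.apache.hadoop.fs.Path(targetFile.toString())),
            SequenceFile.Writer.keyClass(Text.class),
            SequenceFile.Writer.valueClass(Text.class));
         DirectoryStream<Path> jsonFiles = Files.newDirectoryStream(jsonDir)) {
        for (Path json : jsonFiles) {
            // base64-encode the entity JSON so the job's unbase64 step can decode it again
            String payload = Base64.getEncoder().encodeToString(Files.readAllBytes(json));
            writer.append(new Text(json.getFileName().toString()),
                    new Text("{\"TargetValue\":\"" + payload + "\"}"));
        }
    }
}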

View File

@@ -18,6 +18,7 @@
<module>dhp-distcp</module>
<module>dhp-graph-mapper</module>
<module>dhp-dedup</module>
<module>dhp-actionmanager</module>
</modules>
<pluginRepositories>