WIP: promote job implementation snapshot

This commit is contained in:
Przemysław Jacewicz 2020-03-11 17:02:06 +01:00
parent 69540f6f78
commit cc63cdc9e6
2 changed files with 246 additions and 175 deletions

View File

@ -1,7 +1,7 @@
package eu.dnetlib.dhp.actionmanager; package eu.dnetlib.dhp.actionmanager;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.data.proto.OafProtos;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
@ -10,15 +10,17 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*; import org.apache.spark.sql.types.*;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import static eu.dnetlib.dhp.actionmanager.PromoteActionSetFromHDFSFunctions.*;
import static org.apache.spark.sql.functions.*; import static org.apache.spark.sql.functions.*;
public class PromoteActionSetFromHDFSJob { public class PromoteActionSetFromHDFSJob {
@ -29,128 +31,132 @@ public class PromoteActionSetFromHDFSJob {
.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json"))); .getResourceAsStream("/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json")));
parser.parseArgument(args); parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
String inputGraphPath = parser.get("inputGraphPath"); String inputGraphPath = parser.get("inputGraphPath");
List<String> inputActionSetPaths = Arrays.asList(parser.get("inputActionSetPaths").split(",")); String inputActionSetPaths = parser.get("inputActionSetPaths");
String graphTableName = parser.get("graphTableName");
String outputGraphPath = parser.get("outputGraphPath"); String outputGraphPath = parser.get("outputGraphPath");
OafMergeAndGet.Strategy strategy = OafMergeAndGet.Strategy.valueOf(parser.get("mergeAndGetStrategy"));
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
conf.setMaster(parser.get("master"));
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(new Class[]{
Author.class,
Context.class,
Country.class,
DataInfo.class,
eu.dnetlib.dhp.schema.oaf.Dataset.class,
Datasource.class,
ExternalReference.class,
ExtraInfo.class,
Field.class,
GeoLocation.class,
Instance.class,
Journal.class,
KeyValue.class,
Oaf.class,
OafEntity.class,
OAIProvenance.class,
Organization.class,
OriginDescription.class,
OtherResearchProduct.class,
Project.class,
Publication.class,
Qualifier.class,
Relation.class,
Result.class,
Software.class,
StructuredProperty.class
});
try (SparkSession spark = SparkSession.builder().config(conf).getOrCreate()) { SparkSession spark = null;
try {
spark = SparkSession.builder().config(conf).getOrCreate();
// ----- READ ----- //TODO make graph table generic using enums
// dataset switch (graphTableName) {
Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasetDS = readGraphTable( case "dataset":
spark, String.format("%s/dataset", inputGraphPath), eu.dnetlib.dhp.schema.oaf.Dataset.class) processWith(spark,
.cache(); String.format("%s/%s", inputGraphPath, graphTableName),
datasetDS.printSchema(); inputActionSetPaths,
datasetDS.show(); outputGraphPath,
graphTableName,
// datasource strategy,
Dataset<Datasource> datasourceDS = eu.dnetlib.dhp.schema.oaf.Dataset.class);
readGraphTable(spark, String.format("%s/datasource", inputGraphPath), Datasource.class) break;
.cache(); case "datasource":
datasourceDS.printSchema(); processWith(spark,
datasourceDS.show(); String.format("%s/%s", inputGraphPath, graphTableName),
inputActionSetPaths,
// organization outputGraphPath,
Dataset<Organization> organizationDS = graphTableName,
readGraphTable(spark, String.format("%s/organization", inputGraphPath), Organization.class) strategy,
.cache(); Datasource.class);
organizationDS.printSchema(); break;
organizationDS.show(); case "organization":
processWith(spark,
// otherresearchproduct String.format("%s/%s", inputGraphPath, graphTableName),
Dataset<OtherResearchProduct> otherResearchProductDS = inputActionSetPaths,
readGraphTable(spark, String.format("%s/otherresearchproduct", inputGraphPath), OtherResearchProduct.class) outputGraphPath,
.cache(); graphTableName,
otherResearchProductDS.printSchema(); strategy,
otherResearchProductDS.show(); Organization.class);
break;
// project case "otherresearchproduct":
Dataset<Project> projectDS = processWith(spark,
readGraphTable(spark, String.format("%s/project", inputGraphPath), Project.class) String.format("%s/%s", inputGraphPath, graphTableName),
.cache(); inputActionSetPaths,
projectDS.printSchema(); outputGraphPath,
projectDS.show(); graphTableName,
strategy,
// publication OtherResearchProduct.class);
Dataset<Publication> publicationDS = break;
readGraphTable(spark, String.format("%s/publication", inputGraphPath), Publication.class) case "project":
.cache(); processWith(spark,
publicationDS.printSchema(); String.format("%s/%s", inputGraphPath, graphTableName),
publicationDS.show(); inputActionSetPaths,
outputGraphPath,
// relation graphTableName,
Dataset<Relation> relationDS = strategy,
readGraphTable(spark, String.format("%s/relation", inputGraphPath), Relation.class) Project.class);
.cache(); break;
relationDS.printSchema(); case "publication":
relationDS.show(); processWith(spark,
String.format("%s/%s", inputGraphPath, graphTableName),
// software inputActionSetPaths,
Dataset<Software> softwareDS = outputGraphPath,
readGraphTable(spark, String.format("%s/software", inputGraphPath), Software.class) graphTableName,
.cache(); strategy,
softwareDS.printSchema(); Publication.class);
softwareDS.show(); break;
case "relation":
// actions processWith(spark,
Dataset<String> actionPayloadDS = inputActionSetPaths.stream() String.format("%s/%s", inputGraphPath, graphTableName),
.map(inputActionSetPath -> readActionSetPayload(spark, inputActionSetPath)) inputActionSetPaths,
.reduce(Dataset::union) outputGraphPath,
.get() graphTableName,
.cache(); strategy,
actionPayloadDS.printSchema(); Relation.class);
actionPayloadDS.show(); break;
System.out.println(String.join("\n", actionPayloadDS.takeAsList(20))); case "software":
processWith(spark,
Dataset<String> relationActionPayloadDS = filterActionPayloadForRelations(actionPayloadDS) String.format("%s/%s", inputGraphPath, graphTableName),
.cache(); inputActionSetPaths,
relationActionPayloadDS.printSchema(); outputGraphPath,
relationActionPayloadDS.show(); graphTableName,
strategy,
Dataset<String> entityActionPayloadDS = filterActionPayloadForEntity(actionPayloadDS) Software.class);
.cache(); break;
entityActionPayloadDS.printSchema(); default:
entityActionPayloadDS.show(); throw new RuntimeException("error processing table: " + graphTableName);
}
// ----- LOGIC ----- } finally {
Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> processedDatasetDS = if (Objects.nonNull(spark) && isSparkSessionManaged) {
processEntityDS(datasetDS, entityActionPayloadDS, eu.dnetlib.dhp.schema.oaf.Dataset.class); spark.stop();
Dataset<Datasource> processedDatasourceDS = }
processEntityDS(datasourceDS, entityActionPayloadDS, Datasource.class);
Dataset<Organization> processedOrganizationDS =
processEntityDS(organizationDS, entityActionPayloadDS, Organization.class);
Dataset<OtherResearchProduct> processedOtherResearchProductDS =
processEntityDS(otherResearchProductDS, entityActionPayloadDS, OtherResearchProduct.class);
Dataset<Project> processedProjectDS =
processEntityDS(projectDS, entityActionPayloadDS, Project.class);
Dataset<Publication> processedPublicationDS =
processEntityDS(publicationDS, entityActionPayloadDS, Publication.class);
Dataset<Relation> processedRelationDS =
processRelationDS(relationDS, relationActionPayloadDS);
Dataset<Software> processedSoftwareDS =
processEntityDS(softwareDS, entityActionPayloadDS, Software.class);
// ----- SAVE -----
processedDatasetDS.write()
.save(String.format("%s/dataset", outputGraphPath));
processedDatasourceDS.write()
.save(String.format("%s/datasource", outputGraphPath));
processedOrganizationDS.write()
.save(String.format("%s/organization", outputGraphPath));
processedOtherResearchProductDS.write()
.save(String.format("%s/otherresearchproduct", outputGraphPath));
processedProjectDS.write()
.save(String.format("%s/project", outputGraphPath));
processedPublicationDS.write()
.save(String.format("%s/publication", outputGraphPath));
processedRelationDS.write()
.save(String.format("%s/relation", outputGraphPath));
processedSoftwareDS.write()
.save(String.format("%s/software", outputGraphPath));
} }
} }
@ -160,7 +166,7 @@ public class PromoteActionSetFromHDFSJob {
StructField$.MODULE$.apply("value", DataTypes.StringType, false, Metadata.empty()) StructField$.MODULE$.apply("value", DataTypes.StringType, false, Metadata.empty())
)); ));
private static <T extends Oaf> Dataset<T> readGraphTable(SparkSession spark, String path, Class<T> clazz) { private static <T> Dataset<T> readGraphTable(SparkSession spark, String path, Class<T> clazz) {
JavaRDD<Row> rows = JavaSparkContext JavaRDD<Row> rows = JavaSparkContext
.fromSparkContext(spark.sparkContext()) .fromSparkContext(spark.sparkContext())
.sequenceFile(path, Text.class, Text.class) .sequenceFile(path, Text.class, Text.class)
@ -168,7 +174,15 @@ public class PromoteActionSetFromHDFSJob {
return spark.createDataFrame(rows, KV_SCHEMA) return spark.createDataFrame(rows, KV_SCHEMA)
.map((MapFunction<Row, T>) row -> new ObjectMapper().readValue(row.<String>getAs("value"), clazz), .map((MapFunction<Row, T>) row -> new ObjectMapper().readValue(row.<String>getAs("value"), clazz),
Encoders.bean(clazz)); Encoders.kryo(clazz));
}
private static Dataset<String> readActionSetPayloads(SparkSession spark, String inputActionSetPaths) {
return Arrays
.stream(inputActionSetPaths.split(","))
.map(inputActionSetPath -> readActionSetPayload(spark, inputActionSetPath))
.reduce(Dataset::union)
.get();
} }
private static Dataset<String> readActionSetPayload(SparkSession spark, String inputActionSetPath) { private static Dataset<String> readActionSetPayload(SparkSession spark, String inputActionSetPath) {
@ -183,54 +197,99 @@ public class PromoteActionSetFromHDFSJob {
.as(Encoders.STRING()); .as(Encoders.STRING());
} }
private static Dataset<String> filterActionPayloadForRelations(Dataset<String> actionPayloadDS) { private static <T extends Oaf> void processWith(SparkSession spark,
return actionPayloadDS String inputGraphTablePath,
.where(get_json_object(col("target_value_json"), "$.kind").equalTo("relation")); String inputActionSetPaths,
String outputGraphPath,
String graphTableName,
OafMergeAndGet.Strategy strategy,
Class<T> clazz) {
// System.out.println("===== tableDS =====");
Dataset<T> tableDS = readGraphTable(spark, inputGraphTablePath, clazz)
.cache();
// tableDS.printSchema();
// tableDS.show();
// tableDS.explain();
// System.out.println("DEBUG: tableDS.partitions=" + tableDS.rdd().getNumPartitions());
// System.out.println("===== actionPayloadDS =====");
Dataset<String> actionPayloadDS = readActionSetPayloads(spark, inputActionSetPaths)
.cache();
// actionPayloadDS.printSchema();
// actionPayloadDS.show();
// actionPayloadDS.explain();
// System.out.println("DEBUG: actionPayloadDS.partitions=" + actionPayloadDS.rdd().getNumPartitions());
// System.out.println("===== processed =====");
Dataset<T> processed = processGraphTable(tableDS, actionPayloadDS, strategy, clazz);
// processed.printSchema();
// processed.show();
// processed.explain();
// System.out.println("DEBUG: processed.partitions=" + processed.rdd().getNumPartitions());
// System.out.println("===== result =====");
Dataset<T> result = processed
.map((MapFunction<T, T>) value -> value, Encoders.bean(clazz));
// result.printSchema();
// result.show();
// result.explain();
// System.out.println("DEBUG: result.partitions=" + result.rdd().getNumPartitions());
String outputGraphTablePath = String.format("%s/%s", outputGraphPath, graphTableName);
result.write()
.format("parquet")
.save(outputGraphTablePath);
} }
private static Dataset<String> filterActionPayloadForEntity(Dataset<String> actionPayloadDS) { private static <T extends Oaf> Dataset<T> processGraphTable(Dataset<T> tableDS,
return actionPayloadDS Dataset<String> actionPayloadDS,
.where(get_json_object(col("target_value_json"), "$.kind").equalTo("entity")); OafMergeAndGet.Strategy strategy,
}
private static <T extends OafEntity> Dataset<T> processEntityDS(Dataset<T> entityDS,
Dataset<String> actionPayloadDS,
Class<T> clazz) {
Dataset<T> groupedAndMerged = groupEntitiesByIdAndMerge(entityDS, clazz);
Dataset<T> joinedAndMerged = joinEntitiesWithActionPayloadAndMerge(groupedAndMerged,
actionPayloadDS,
PromoteActionSetFromHDFSJob::entityToActionPayloadJoinExpr,
PromoteActionSetFromHDFSJob::actionPayloadToEntity,
clazz);
return groupEntitiesByIdAndMerge(joinedAndMerged, clazz);
}
private static <T extends OafEntity> Column entityToActionPayloadJoinExpr(Dataset<T> left,
Dataset<String> right) {
return left.col("id").equalTo(
get_json_object(right.col("target_value_json"), "$.entity.id"));
}
public static <T extends OafEntity> T actionPayloadToEntity(String actionPayload,
Class<T> clazz) { Class<T> clazz) {
try { SerializableSupplier<Function<T, String>> oafIdFn = () -> x -> {
OafProtos.Oaf oldEntity = new ObjectMapper().readValue(actionPayload, OafProtos.Oaf.class); if (x instanceof Relation) {
return entityOldToNew(oldEntity, clazz); Relation r = (Relation) x;
} catch (IOException e) { return Optional.ofNullable(r.getSource())
throw new RuntimeException(e); .map(source -> Optional.ofNullable(r.getTarget())
} .map(target -> Optional.ofNullable(r.getRelType())
} .map(relType -> Optional.ofNullable(r.getSubRelType())
.map(subRelType -> Optional.ofNullable(r.getRelClass())
.map(relClass -> String.join(source, target, relType, subRelType, relClass))
.orElse(String.join(source, target, relType, subRelType))
)
.orElse(String.join(source, target, relType))
)
.orElse(String.join(source, target))
)
.orElse(source)
)
.orElse(null);
}
return ((OafEntity) x).getId();
};
//TODO SerializableSupplier<BiFunction<T, T, T>> mergeAndGetFn = OafMergeAndGet.functionFor(strategy);
private static <T extends OafEntity> T entityOldToNew(OafProtos.Oaf old,
Class<T> clazz) {
return null;
}
//TODO Dataset<T> joinedAndMerged = PromoteActionSetFromHDFSFunctions
private static Dataset<Relation> processRelationDS(Dataset<Relation> relationDS, .joinOafEntityWithActionPayloadAndMerge(
Dataset<String> actionPayloadDS) { tableDS,
return null; actionPayloadDS,
oafIdFn,
() -> (json, c) -> {
try {
return new ObjectMapper().disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES).readValue(json, clazz);
} catch (IOException e) {
return null;
}
},
mergeAndGetFn,
clazz);
return PromoteActionSetFromHDFSFunctions
.groupOafByIdAndMerge(
joinedAndMerged,
oafIdFn,
mergeAndGetFn,
clazz
);
} }
} }

View File

@ -1,26 +1,38 @@
[ [
{ {
"paramName": "mt", "paramName": "issm",
"paramLongName": "master", "paramLongName": "isSparkSessionManaged",
"paramDescription": "should be local or yarn", "paramDescription": "TODO",
"paramRequired": true "paramRequired": false
}, },
{ {
"paramName": "ig", "paramName": "igp",
"paramLongName": "inputGraphPath", "paramLongName": "inputGraphPath",
"paramDescription": "#TODO: input graph path", "paramDescription": "TODO: input graph path",
"paramRequired": true "paramRequired": true
}, },
{ {
"paramName": "ia", "paramName": "iasp",
"paramLongName": "inputActionSetPaths", "paramLongName": "inputActionSetPaths",
"paramDescription": "#TODO: comma separated list of paths to input action sets", "paramDescription": "TODO: comma separated list of paths to input action sets",
"paramRequired": true "paramRequired": true
}, },
{ {
"paramName": "og", "paramName": "gtn",
"paramLongName": "graphTableName",
"paramDescription": "TODO",
"paramRequired": true
},
{
"paramName": "ogp",
"paramLongName": "outputGraphPath", "paramLongName": "outputGraphPath",
"paramDescription": "#TODO: the path of the result DataFrame on HDFS", "paramDescription": "TODO: the path of the result DataFrame on HDFS",
"paramRequired": true
},
{
"paramName": "mags",
"paramLongName": "mergeAndGetStrategy",
"paramDescription": "TODO",
"paramRequired": true "paramRequired": true
} }
] ]