diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJob.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJob.java index 099b0b720d..daaf047d14 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJob.java +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJob.java @@ -1,7 +1,7 @@ package eu.dnetlib.dhp.actionmanager; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.data.proto.OafProtos; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.*; import org.apache.commons.io.IOUtils; @@ -10,15 +10,17 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.*; import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.*; import org.apache.spark.sql.types.*; import java.io.IOException; import java.util.Arrays; -import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.function.BiFunction; +import java.util.function.Function; -import static eu.dnetlib.dhp.actionmanager.PromoteActionSetFromHDFSFunctions.*; import static org.apache.spark.sql.functions.*; public class PromoteActionSetFromHDFSJob { @@ -29,128 +31,132 @@ public class PromoteActionSetFromHDFSJob { .getResourceAsStream("/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json"))); parser.parseArgument(args); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); String inputGraphPath = parser.get("inputGraphPath"); - List inputActionSetPaths = Arrays.asList(parser.get("inputActionSetPaths").split(",")); + String inputActionSetPaths = parser.get("inputActionSetPaths"); + String graphTableName = parser.get("graphTableName"); String outputGraphPath = parser.get("outputGraphPath"); + OafMergeAndGet.Strategy strategy = OafMergeAndGet.Strategy.valueOf(parser.get("mergeAndGetStrategy")); SparkConf conf = new SparkConf(); - conf.setMaster(parser.get("master")); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(new Class[]{ + Author.class, + Context.class, + Country.class, + DataInfo.class, + eu.dnetlib.dhp.schema.oaf.Dataset.class, + Datasource.class, + ExternalReference.class, + ExtraInfo.class, + Field.class, + GeoLocation.class, + Instance.class, + Journal.class, + KeyValue.class, + Oaf.class, + OafEntity.class, + OAIProvenance.class, + Organization.class, + OriginDescription.class, + OtherResearchProduct.class, + Project.class, + Publication.class, + Qualifier.class, + Relation.class, + Result.class, + Software.class, + StructuredProperty.class + }); - try (SparkSession spark = SparkSession.builder().config(conf).getOrCreate()) { + SparkSession spark = null; + try { + spark = SparkSession.builder().config(conf).getOrCreate(); - // ----- READ ----- - // dataset - Dataset datasetDS = readGraphTable( - spark, String.format("%s/dataset", inputGraphPath), eu.dnetlib.dhp.schema.oaf.Dataset.class) - .cache(); - datasetDS.printSchema(); - datasetDS.show(); - - // datasource - Dataset datasourceDS = - readGraphTable(spark, String.format("%s/datasource", inputGraphPath), Datasource.class) - .cache(); - datasourceDS.printSchema(); - datasourceDS.show(); - - // organization - Dataset organizationDS = - readGraphTable(spark, String.format("%s/organization", inputGraphPath), Organization.class) - .cache(); - organizationDS.printSchema(); - organizationDS.show(); - - // otherresearchproduct - Dataset otherResearchProductDS = - readGraphTable(spark, String.format("%s/otherresearchproduct", inputGraphPath), OtherResearchProduct.class) - .cache(); - otherResearchProductDS.printSchema(); - otherResearchProductDS.show(); - - // project - Dataset projectDS = - readGraphTable(spark, String.format("%s/project", inputGraphPath), Project.class) - .cache(); - projectDS.printSchema(); - projectDS.show(); - - // publication - Dataset publicationDS = - readGraphTable(spark, String.format("%s/publication", inputGraphPath), Publication.class) - .cache(); - publicationDS.printSchema(); - publicationDS.show(); - - // relation - Dataset relationDS = - readGraphTable(spark, String.format("%s/relation", inputGraphPath), Relation.class) - .cache(); - relationDS.printSchema(); - relationDS.show(); - - // software - Dataset softwareDS = - readGraphTable(spark, String.format("%s/software", inputGraphPath), Software.class) - .cache(); - softwareDS.printSchema(); - softwareDS.show(); - - // actions - Dataset actionPayloadDS = inputActionSetPaths.stream() - .map(inputActionSetPath -> readActionSetPayload(spark, inputActionSetPath)) - .reduce(Dataset::union) - .get() - .cache(); - actionPayloadDS.printSchema(); - actionPayloadDS.show(); - System.out.println(String.join("\n", actionPayloadDS.takeAsList(20))); - - Dataset relationActionPayloadDS = filterActionPayloadForRelations(actionPayloadDS) - .cache(); - relationActionPayloadDS.printSchema(); - relationActionPayloadDS.show(); - - Dataset entityActionPayloadDS = filterActionPayloadForEntity(actionPayloadDS) - .cache(); - entityActionPayloadDS.printSchema(); - entityActionPayloadDS.show(); - - // ----- LOGIC ----- - Dataset processedDatasetDS = - processEntityDS(datasetDS, entityActionPayloadDS, eu.dnetlib.dhp.schema.oaf.Dataset.class); - Dataset processedDatasourceDS = - processEntityDS(datasourceDS, entityActionPayloadDS, Datasource.class); - Dataset processedOrganizationDS = - processEntityDS(organizationDS, entityActionPayloadDS, Organization.class); - Dataset processedOtherResearchProductDS = - processEntityDS(otherResearchProductDS, entityActionPayloadDS, OtherResearchProduct.class); - Dataset processedProjectDS = - processEntityDS(projectDS, entityActionPayloadDS, Project.class); - Dataset processedPublicationDS = - processEntityDS(publicationDS, entityActionPayloadDS, Publication.class); - Dataset processedRelationDS = - processRelationDS(relationDS, relationActionPayloadDS); - Dataset processedSoftwareDS = - processEntityDS(softwareDS, entityActionPayloadDS, Software.class); - - // ----- SAVE ----- - processedDatasetDS.write() - .save(String.format("%s/dataset", outputGraphPath)); - processedDatasourceDS.write() - .save(String.format("%s/datasource", outputGraphPath)); - processedOrganizationDS.write() - .save(String.format("%s/organization", outputGraphPath)); - processedOtherResearchProductDS.write() - .save(String.format("%s/otherresearchproduct", outputGraphPath)); - processedProjectDS.write() - .save(String.format("%s/project", outputGraphPath)); - processedPublicationDS.write() - .save(String.format("%s/publication", outputGraphPath)); - processedRelationDS.write() - .save(String.format("%s/relation", outputGraphPath)); - processedSoftwareDS.write() - .save(String.format("%s/software", outputGraphPath)); + //TODO make graph table generic using enums + switch (graphTableName) { + case "dataset": + processWith(spark, + String.format("%s/%s", inputGraphPath, graphTableName), + inputActionSetPaths, + outputGraphPath, + graphTableName, + strategy, + eu.dnetlib.dhp.schema.oaf.Dataset.class); + break; + case "datasource": + processWith(spark, + String.format("%s/%s", inputGraphPath, graphTableName), + inputActionSetPaths, + outputGraphPath, + graphTableName, + strategy, + Datasource.class); + break; + case "organization": + processWith(spark, + String.format("%s/%s", inputGraphPath, graphTableName), + inputActionSetPaths, + outputGraphPath, + graphTableName, + strategy, + Organization.class); + break; + case "otherresearchproduct": + processWith(spark, + String.format("%s/%s", inputGraphPath, graphTableName), + inputActionSetPaths, + outputGraphPath, + graphTableName, + strategy, + OtherResearchProduct.class); + break; + case "project": + processWith(spark, + String.format("%s/%s", inputGraphPath, graphTableName), + inputActionSetPaths, + outputGraphPath, + graphTableName, + strategy, + Project.class); + break; + case "publication": + processWith(spark, + String.format("%s/%s", inputGraphPath, graphTableName), + inputActionSetPaths, + outputGraphPath, + graphTableName, + strategy, + Publication.class); + break; + case "relation": + processWith(spark, + String.format("%s/%s", inputGraphPath, graphTableName), + inputActionSetPaths, + outputGraphPath, + graphTableName, + strategy, + Relation.class); + break; + case "software": + processWith(spark, + String.format("%s/%s", inputGraphPath, graphTableName), + inputActionSetPaths, + outputGraphPath, + graphTableName, + strategy, + Software.class); + break; + default: + throw new RuntimeException("error processing table: " + graphTableName); + } + } finally { + if (Objects.nonNull(spark) && isSparkSessionManaged) { + spark.stop(); + } } } @@ -160,7 +166,7 @@ public class PromoteActionSetFromHDFSJob { StructField$.MODULE$.apply("value", DataTypes.StringType, false, Metadata.empty()) )); - private static Dataset readGraphTable(SparkSession spark, String path, Class clazz) { + private static Dataset readGraphTable(SparkSession spark, String path, Class clazz) { JavaRDD rows = JavaSparkContext .fromSparkContext(spark.sparkContext()) .sequenceFile(path, Text.class, Text.class) @@ -168,7 +174,15 @@ public class PromoteActionSetFromHDFSJob { return spark.createDataFrame(rows, KV_SCHEMA) .map((MapFunction) row -> new ObjectMapper().readValue(row.getAs("value"), clazz), - Encoders.bean(clazz)); + Encoders.kryo(clazz)); + } + + private static Dataset readActionSetPayloads(SparkSession spark, String inputActionSetPaths) { + return Arrays + .stream(inputActionSetPaths.split(",")) + .map(inputActionSetPath -> readActionSetPayload(spark, inputActionSetPath)) + .reduce(Dataset::union) + .get(); } private static Dataset readActionSetPayload(SparkSession spark, String inputActionSetPath) { @@ -183,54 +197,99 @@ public class PromoteActionSetFromHDFSJob { .as(Encoders.STRING()); } - private static Dataset filterActionPayloadForRelations(Dataset actionPayloadDS) { - return actionPayloadDS - .where(get_json_object(col("target_value_json"), "$.kind").equalTo("relation")); + private static void processWith(SparkSession spark, + String inputGraphTablePath, + String inputActionSetPaths, + String outputGraphPath, + String graphTableName, + OafMergeAndGet.Strategy strategy, + Class clazz) { +// System.out.println("===== tableDS ====="); + Dataset tableDS = readGraphTable(spark, inputGraphTablePath, clazz) + .cache(); +// tableDS.printSchema(); +// tableDS.show(); +// tableDS.explain(); +// System.out.println("DEBUG: tableDS.partitions=" + tableDS.rdd().getNumPartitions()); + +// System.out.println("===== actionPayloadDS ====="); + Dataset actionPayloadDS = readActionSetPayloads(spark, inputActionSetPaths) + .cache(); +// actionPayloadDS.printSchema(); +// actionPayloadDS.show(); +// actionPayloadDS.explain(); +// System.out.println("DEBUG: actionPayloadDS.partitions=" + actionPayloadDS.rdd().getNumPartitions()); + +// System.out.println("===== processed ====="); + Dataset processed = processGraphTable(tableDS, actionPayloadDS, strategy, clazz); +// processed.printSchema(); +// processed.show(); +// processed.explain(); +// System.out.println("DEBUG: processed.partitions=" + processed.rdd().getNumPartitions()); + +// System.out.println("===== result ====="); + Dataset result = processed + .map((MapFunction) value -> value, Encoders.bean(clazz)); +// result.printSchema(); +// result.show(); +// result.explain(); +// System.out.println("DEBUG: result.partitions=" + result.rdd().getNumPartitions()); + + String outputGraphTablePath = String.format("%s/%s", outputGraphPath, graphTableName); + result.write() + .format("parquet") + .save(outputGraphTablePath); } - private static Dataset filterActionPayloadForEntity(Dataset actionPayloadDS) { - return actionPayloadDS - .where(get_json_object(col("target_value_json"), "$.kind").equalTo("entity")); - } - - - private static Dataset processEntityDS(Dataset entityDS, - Dataset actionPayloadDS, - Class clazz) { - Dataset groupedAndMerged = groupEntitiesByIdAndMerge(entityDS, clazz); - Dataset joinedAndMerged = joinEntitiesWithActionPayloadAndMerge(groupedAndMerged, - actionPayloadDS, - PromoteActionSetFromHDFSJob::entityToActionPayloadJoinExpr, - PromoteActionSetFromHDFSJob::actionPayloadToEntity, - clazz); - return groupEntitiesByIdAndMerge(joinedAndMerged, clazz); - } - - private static Column entityToActionPayloadJoinExpr(Dataset left, - Dataset right) { - return left.col("id").equalTo( - get_json_object(right.col("target_value_json"), "$.entity.id")); - } - - public static T actionPayloadToEntity(String actionPayload, + private static Dataset processGraphTable(Dataset tableDS, + Dataset actionPayloadDS, + OafMergeAndGet.Strategy strategy, Class clazz) { - try { - OafProtos.Oaf oldEntity = new ObjectMapper().readValue(actionPayload, OafProtos.Oaf.class); - return entityOldToNew(oldEntity, clazz); - } catch (IOException e) { - throw new RuntimeException(e); - } - } + SerializableSupplier> oafIdFn = () -> x -> { + if (x instanceof Relation) { + Relation r = (Relation) x; + return Optional.ofNullable(r.getSource()) + .map(source -> Optional.ofNullable(r.getTarget()) + .map(target -> Optional.ofNullable(r.getRelType()) + .map(relType -> Optional.ofNullable(r.getSubRelType()) + .map(subRelType -> Optional.ofNullable(r.getRelClass()) + .map(relClass -> String.join(source, target, relType, subRelType, relClass)) + .orElse(String.join(source, target, relType, subRelType)) + ) + .orElse(String.join(source, target, relType)) + ) + .orElse(String.join(source, target)) + ) + .orElse(source) + ) + .orElse(null); + } + return ((OafEntity) x).getId(); + }; - //TODO - private static T entityOldToNew(OafProtos.Oaf old, - Class clazz) { - return null; - } + SerializableSupplier> mergeAndGetFn = OafMergeAndGet.functionFor(strategy); - //TODO - private static Dataset processRelationDS(Dataset relationDS, - Dataset actionPayloadDS) { - return null; + Dataset joinedAndMerged = PromoteActionSetFromHDFSFunctions + .joinOafEntityWithActionPayloadAndMerge( + tableDS, + actionPayloadDS, + oafIdFn, + () -> (json, c) -> { + try { + return new ObjectMapper().disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES).readValue(json, clazz); + } catch (IOException e) { + return null; + } + }, + mergeAndGetFn, + clazz); + + return PromoteActionSetFromHDFSFunctions + .groupOafByIdAndMerge( + joinedAndMerged, + oafIdFn, + mergeAndGetFn, + clazz + ); } } diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json index 3dc02ef8c3..8face9e856 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json @@ -1,26 +1,38 @@ [ { - "paramName": "mt", - "paramLongName": "master", - "paramDescription": "should be local or yarn", - "paramRequired": true + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "TODO", + "paramRequired": false }, { - "paramName": "ig", + "paramName": "igp", "paramLongName": "inputGraphPath", - "paramDescription": "#TODO: input graph path", + "paramDescription": "TODO: input graph path", "paramRequired": true }, { - "paramName": "ia", + "paramName": "iasp", "paramLongName": "inputActionSetPaths", - "paramDescription": "#TODO: comma separated list of paths to input action sets", + "paramDescription": "TODO: comma separated list of paths to input action sets", "paramRequired": true }, { - "paramName": "og", + "paramName": "gtn", + "paramLongName": "graphTableName", + "paramDescription": "TODO", + "paramRequired": true + }, + { + "paramName": "ogp", "paramLongName": "outputGraphPath", - "paramDescription": "#TODO: the path of the result DataFrame on HDFS", + "paramDescription": "TODO: the path of the result DataFrame on HDFS", + "paramRequired": true + }, + { + "paramName": "mags", + "paramLongName": "mergeAndGetStrategy", + "paramDescription": "TODO", "paramRequired": true } ] \ No newline at end of file