diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java
new file mode 100644
index 000000000..5daaceeea
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java
@@ -0,0 +1,124 @@
+package eu.dnetlib.dhp.projecttoresult;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.Gson;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static eu.dnetlib.dhp.PropagationConstant.*;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
+
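+/**
+ * Prepares the input for the project -> result propagation: for each project,
+ * the set of results reachable through one of the allowed semantic relations
+ * (the candidate new links) and the set of results it is already linked to.
+ *
+ * Invocation sketch (argument names as in
+ * input_prepareprojecttoresult_parameters.json; all values are placeholders):
+ *
+ *   --sourcePath /tmp/graph
+ *   --allowedsemrels isSupplementedBy;isSupplementTo
+ *   --hive_metastore_uris thrift://metastore-host:9083
+ *   --potentialUpdatePath /tmp/projecttoresult/potentialUpdates
+ *   --alreadyLinkedPath /tmp/projecttoresult/alreadyLinked
+ */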
source, target " + + " FROM relation " + + " WHERE datainfo.deletedbyinference = false " + + " AND relClass = '" + RELATION_RESULT_PROJECT_REL_CLASS + "'"; + + Dataset resproj_relation = spark.sql(query); + resproj_relation.createOrReplaceTempView("resproj_relation"); + + query ="SELECT projectId, collect_set(r1target) resultSet " + + "FROM (" + + " SELECT r1.source as source, r1.target as r1target, r2.target as proj " + + " FROM (SELECT source, target " + + " FROM relation " + + " WHERE datainfo.deletedbyinference = false " + + getConstraintList(" relClass = '", allowedsemrel ) + ") r1" + + " JOIN resproj_relation r2 " + + " ON r1.source = r2.source " + + " ) tmp " + + "GROUP BY proj "; + + spark.sql(query).as(Encoders.bean(ProjectResultSet.class)) + .toJavaRDD() + .map(r -> OBJECT_MAPPER.writeValueAsString(r)) + .saveAsTextFile(potentialUpdatePath, GzipCodec.class); + + + query = "SELECT target, collect_set(source) result_list " + + "FROM resproj_relation " + + "GROUP BY target"; + + spark.sql(query) + .as(Encoders.bean(ProjectResultSet.class)) + .toJavaRDD() + .map(r -> OBJECT_MAPPER.writeValueAsString(r)) + .saveAsTextFile(alreadyLinkedPath, GzipCodec.class); + + + } +} diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareResultProjectAssociation.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareResultProjectAssociation.java deleted file mode 100644 index d7c29a697..000000000 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareResultProjectAssociation.java +++ /dev/null @@ -1,69 +0,0 @@ -package eu.dnetlib.dhp.projecttoresult; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.gson.Gson; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.countrypropagation.PrepareDatasourceCountryAssociation; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.spark.SparkConf; -import org.apache.spark.TaskResultLost; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Arrays; -import java.util.List; -import java.util.Optional; - -import static eu.dnetlib.dhp.PropagationConstant.TRUE; -import static eu.dnetlib.dhp.PropagationConstant.createOutputDirs; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; - -public class PrepareResultProjectAssociation { - private static final Logger log = LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - public static void main(String[] args) throws Exception{ - - String jsonConfiguration = IOUtils.toString(PrepareDatasourceCountryAssociation.class - .getResourceAsStream("/eu/dnetlib/dhp/countrypropagation/input_projecttoresult_parameters.json")); - - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - jsonConfiguration); - - parser.parseArgument(args); - - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - String inputPath = parser.get("sourcePath"); - log.info("inputPath: {}", inputPath); - - final String outputPath = parser.get("outputPath"); - log.info("outputPath {}: ", outputPath); - - boolean writeUpdates = TRUE.equals(parser.get("writeUpdate")); - log.info("writeUpdates: {} ", writeUpdates); - - boolean saveGraph = 
+        query = "SELECT proj AS projectId, collect_set(r1target) AS resultSet " +
+                "FROM (" +
+                "      SELECT r1.source AS source, r1.target AS r1target, r2.target AS proj " +
+                "      FROM (SELECT source, target " +
+                "            FROM relation " +
+                "            WHERE datainfo.deletedbyinference = false " +
+                getConstraintList(" relClass = '", allowedsemrel) + ") r1" +
+                "      JOIN resproj_relation r2 " +
+                "      ON r1.source = r2.source " +
+                "     ) tmp " +
+                "GROUP BY proj";
+
+        spark.sql(query).as(Encoders.bean(ProjectResultSet.class))
+                .toJavaRDD()
+                .map(r -> OBJECT_MAPPER.writeValueAsString(r))
+                .saveAsTextFile(potentialUpdatePath, GzipCodec.class);
+
+        // associations already present in the graph: for each project, the set
+        // of results it is directly linked to
+        query = "SELECT target AS projectId, collect_set(source) AS resultSet " +
+                "FROM resproj_relation " +
+                "GROUP BY target";
+
+        spark.sql(query)
+                .as(Encoders.bean(ProjectResultSet.class))
+                .toJavaRDD()
+                .map(r -> OBJECT_MAPPER.writeValueAsString(r))
+                .saveAsTextFile(alreadyLinkedPath, GzipCodec.class);
+    }
+}
diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareResultProjectAssociation.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareResultProjectAssociation.java
deleted file mode 100644
index d7c29a697..000000000
--- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareResultProjectAssociation.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package eu.dnetlib.dhp.projecttoresult;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.gson.Gson;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.countrypropagation.PrepareDatasourceCountryAssociation;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.spark.SparkConf;
-import org.apache.spark.TaskResultLost;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
-
-import static eu.dnetlib.dhp.PropagationConstant.TRUE;
-import static eu.dnetlib.dhp.PropagationConstant.createOutputDirs;
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
-
-public class PrepareResultProjectAssociation {
-    private static final Logger log = LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class);
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
-    public static void main(String[] args) throws Exception{
-
-        String jsonConfiguration = IOUtils.toString(PrepareDatasourceCountryAssociation.class
-                .getResourceAsStream("/eu/dnetlib/dhp/countrypropagation/input_projecttoresult_parameters.json"));
-
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
-                jsonConfiguration);
-
-        parser.parseArgument(args);
-
-        Boolean isSparkSessionManaged = Optional
-                .ofNullable(parser.get("isSparkSessionManaged"))
-                .map(Boolean::valueOf)
-                .orElse(Boolean.TRUE);
-        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
-
-        String inputPath = parser.get("sourcePath");
-        log.info("inputPath: {}", inputPath);
-
-        final String outputPath = parser.get("outputPath");
-        log.info("outputPath {}: ", outputPath);
-
-        boolean writeUpdates = TRUE.equals(parser.get("writeUpdate"));
-        log.info("writeUpdates: {} ", writeUpdates);
-
-        boolean saveGraph = TRUE.equals(parser.get("saveGraph"));
-        log.info("saveGraph {}", saveGraph);
-
-        final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
-        log.info("allowedSemRel: {}", new Gson().toJson(allowedsemrel));
-
-        SparkConf conf = new SparkConf();
-        conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
-
-        runWithSparkHiveSession(conf, isSparkSessionManaged,
-                spark -> {
-                    createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
-
-                });
-
-
-
-    }
-}
diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/ProjectResultSet.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/ProjectResultSet.java
new file mode 100644
index 000000000..39af1f59c
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/ProjectResultSet.java
@@ -0,0 +1,19 @@
+package eu.dnetlib.dhp.projecttoresult;
+
+import java.io.Serializable;
+import java.util.List;
+
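+/**
+ * Serialization bean: a project identifier together with the set of result
+ * identifiers associated to it. The field names match the column aliases
+ * (projectId, resultSet) used in PrepareProjectResultsAssociation.
+ */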
+public class ProjectResultSet implements Serializable {
+    private String projectId;
+    private List<String> resultSet;
+
+    public String getProjectId() { return projectId; }
+    public void setProjectId(String projectId) { this.projectId = projectId; }
+    public List<String> getResultSet() { return resultSet; }
+    public void setResultSet(List<String> resultSet) { this.resultSet = resultSet; }
+}
diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_prepareassoc_parameters.json b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_prepareassoc_parameters.json
new file mode 100644
index 000000000..5d66e4edb
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_prepareassoc_parameters.json
@@ -0,0 +1,62 @@
+[
+  {
+    "paramName":"mt",
+    "paramLongName":"master",
+    "paramDescription": "should be local or yarn",
+    "paramRequired": true
+  },
+  {
+    "paramName":"s",
+    "paramLongName":"sourcePath",
+    "paramDescription": "the path of the sequential file to read",
+    "paramRequired": true
+  },
+  {
+    "paramName":"h",
+    "paramLongName":"hive_metastore_uris",
+    "paramDescription": "the hive metastore uris",
+    "paramRequired": true
+  },
+  {
+    "paramName":"wu",
+    "paramLongName":"writeUpdate",
+    "paramDescription": "true if the update must be written. No double check if the information is already present",
+    "paramRequired": false
+  },
+  {
+    "paramName":"sg",
+    "paramLongName":"saveGraph",
+    "paramDescription": "true if the new version of the graph must be saved",
+    "paramRequired": false
+  },
+  {
+    "paramName":"tn",
+    "paramLongName":"resultTableName",
+    "paramDescription": "the name of the result table we are currently working on",
+    "paramRequired": false
+  },
+  {
+    "paramName": "out",
+    "paramLongName": "outputPath",
+    "paramDescription": "the path used to store temporary output files",
+    "paramRequired": false
+  },
+  {
+    "paramName": "p",
+    "paramLongName": "preparedInfoPath",
+    "paramDescription": "the path where the prepared info has been stored",
+    "paramRequired": false
+  },
+  {
+    "paramName": "w",
+    "paramLongName": "whitelist",
+    "paramDescription": "the datasources that have a type different from the allowed ones but should be considered anyway",
+    "paramRequired": true
+  },
+  {
+    "paramName": "at",
+    "paramLongName": "allowedtypes",
+    "paramDescription": "the allowed datasource types for country propagation",
+    "paramRequired": true
+  }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/input_prepareprojecttoresult_parameters.json b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/input_prepareprojecttoresult_parameters.json
new file mode 100644
index 000000000..9c790f219
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/input_prepareprojecttoresult_parameters.json
@@ -0,0 +1,56 @@
+[
+  {
+    "paramName":"mt",
+    "paramLongName":"master",
+    "paramDescription": "should be local or yarn",
+    "paramRequired": true
+  },
+  {
+    "paramName":"s",
+    "paramLongName":"sourcePath",
+    "paramDescription": "the path of the sequential file to read",
+    "paramRequired": true
+  },
+  {
+    "paramName":"asr",
+    "paramLongName":"allowedsemrels",
+    "paramDescription": "the allowed semantic relations for propagation. Split by ;",
+    "paramRequired": true
+  },
+  {
+    "paramName":"h",
+    "paramLongName":"hive_metastore_uris",
+    "paramDescription": "the hive metastore uris",
+    "paramRequired": true
+  },
+  {
+    "paramName":"wu",
+    "paramLongName":"writeUpdate",
+    "paramDescription": "true if the update must be written. No double check if the information is already present",
+    "paramRequired": true
+  },
+  {
+    "paramName":"sg",
+    "paramLongName":"saveGraph",
+    "paramDescription": "true if the new version of the graph must be saved",
+    "paramRequired": true
+  },
+  {
+    "paramName":"pu",
+    "paramLongName":"potentialUpdatePath",
+    "paramDescription": "the path where to store the potential project -> result updates",
+    "paramRequired": true
+  },
+  {
+    "paramName":"al",
+    "paramLongName":"alreadyLinkedPath",
+    "paramDescription": "the path where to store the project -> result links already in the graph",
+    "paramRequired": true
+  },
+  {
+    "paramName":"ssm",
+    "paramLongName":"isSparkSessionManaged",
+    "paramDescription": "true if the spark session is managed, false otherwise",
+    "paramRequired": false
+  }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/countrypropagation/CountryPropagationJobTest.java b/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/countrypropagation/CountryPropagationJobTest.java
new file mode 100644
index 000000000..3c24abaf7
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/countrypropagation/CountryPropagationJobTest.java
@@ -0,0 +1,6 @@
+package eu.dnetlib.dhp.countrypropagation;
+
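+// Placeholder added with this commit: test cases for the country propagation
+// job still need to be written.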
+public class CountryPropagationJobTest {
+}
diff --git a/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/countrypropagation/PrepareDataForTest.java b/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/countrypropagation/PrepareDataForTest.java
new file mode 100644
index 000000000..4fb31bf94
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/countrypropagation/PrepareDataForTest.java
@@ -0,0 +1,4 @@
+package eu.dnetlib.dhp.countrypropagation;
+
+public class PrepareDataForTest {
+}