diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob2.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java similarity index 58% rename from dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob2.java rename to dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java index a6e49e93b..a6662b9fc 100644 --- a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob2.java +++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java @@ -22,21 +22,18 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.community.*; import eu.dnetlib.dhp.schema.oaf.*; -public class SparkBulkTagJob2 { +public class SparkBulkTagJob { - private static final Logger log = LoggerFactory.getLogger(SparkBulkTagJob2.class); - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final Logger log = LoggerFactory.getLogger(SparkBulkTagJob.class); public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils .toString( - SparkBulkTagJob2.class + SparkBulkTagJob.class .getResourceAsStream( "/eu/dnetlib/dhp/bulktag/input_bulkTag_parameters.json")); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); Boolean isSparkSessionManaged = Optional @@ -58,7 +55,6 @@ public class SparkBulkTagJob2 { log.info("outputPath: {}", outputPath); ProtoMap protoMappingParams = new Gson().fromJson(parser.get("pathMap"), ProtoMap.class); - ; log.info("pathMap: {}", new Gson().toJson(protoMappingParams)); final String resultClassName = parser.get("resultTableName"); @@ -89,45 +85,6 @@ public class SparkBulkTagJob2 { spark -> { execBulkTag(spark, inputPath, outputPath, protoMappingParams, resultClazz, cc); }); - - // runWithSparkSession(conf, isSparkSessionManaged, - // spark -> { - // if(isTest(parser)) { - // removeOutputDir(spark, outputPath); - // } - // if(saveGraph) - // execPropagation(spark, possibleUpdates, inputPath, outputPath, - // resultClazz); - // }); - // - // - // - // - // - // - // sc.textFile(inputPath + "/publication") - // .map(item -> new ObjectMapper().readValue(item, Publication.class)) - // .map(p -> resultTagger.enrichContextCriteria(p, cc, protoMappingParams)) - // .map(p -> new ObjectMapper().writeValueAsString(p)) - // .saveAsTextFile(outputPath+"/publication"); - // sc.textFile(inputPath + "/dataset") - // .map(item -> new ObjectMapper().readValue(item, Dataset.class)) - // .map(p -> resultTagger.enrichContextCriteria(p, cc, protoMappingParams)) - // .map(p -> new ObjectMapper().writeValueAsString(p)) - // .saveAsTextFile(outputPath+"/dataset"); - // sc.textFile(inputPath + "/software") - // .map(item -> new ObjectMapper().readValue(item, Software.class)) - // .map(p -> resultTagger.enrichContextCriteria(p, cc, protoMappingParams)) - // .map(p -> new ObjectMapper().writeValueAsString(p)) - // .saveAsTextFile(outputPath+"/software"); - // sc.textFile(inputPath + "/otherresearchproduct") - // .map(item -> new ObjectMapper().readValue(item, - // OtherResearchProduct.class)) - // .map(p -> resultTagger.enrichContextCriteria(p, cc, protoMappingParams)) - // .map(p -> new ObjectMapper().writeValueAsString(p)) - // .saveAsTextFile(outputPath+"/otherresearchproduct"); - // - } private static void execBulkTag( @@ -139,28 +96,23 @@ public class SparkBulkTagJob2 { CommunityConfiguration communityConfiguration) { ResultTagger resultTagger = new ResultTagger(); - Dataset result = readPathEntity(spark, inputPath, resultClazz); - result - .map( - value -> resultTagger + readPath(spark, inputPath, resultClazz) + .map((MapFunction) value -> resultTagger .enrichContextCriteria( - value, communityConfiguration, protoMappingParams), - Encoders.bean(resultClazz)) - .toJSON() + value, communityConfiguration, protoMappingParams), + Encoders.bean(resultClazz)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .text(outputPath); + .json(outputPath); } - private static org.apache.spark.sql.Dataset readPathEntity( - SparkSession spark, String inputEntityPath, Class resultClazz) { - + private static Dataset readPath( + SparkSession spark, String inputEntityPath, Class clazz) { return spark .read() - .textFile(inputEntityPath) - .map( - (MapFunction) value -> OBJECT_MAPPER.readValue(value, resultClazz), - Encoders.bean(resultClazz)); + .json(inputEntityPath) + .as(Encoders.bean(clazz)); } + } diff --git a/dhp-workflows/dhp-bulktag/src/main/resources/eu/dnetlib/dhp/bulktag/oozie_app/workflow.xml b/dhp-workflows/dhp-bulktag/src/main/resources/eu/dnetlib/dhp/bulktag/oozie_app/workflow.xml index 718ad40ec..2fea9ff41 100644 --- a/dhp-workflows/dhp-bulktag/src/main/resources/eu/dnetlib/dhp/bulktag/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-bulktag/src/main/resources/eu/dnetlib/dhp/bulktag/oozie_app/workflow.xml @@ -106,7 +106,7 @@ yarn-cluster cluster bulkTagging-publication - eu.dnetlib.dhp.bulktag.SparkBulkTagJob2 + eu.dnetlib.dhp.bulktag.SparkBulkTagJob dhp-bulktag-${projectVersion}.jar --num-executors=${sparkExecutorNumber} @@ -134,7 +134,7 @@ yarn-cluster cluster bulkTagging-dataset - eu.dnetlib.dhp.bulktag.SparkBulkTagJob2 + eu.dnetlib.dhp.bulktag.SparkBulkTagJob dhp-bulktag-${projectVersion}.jar --num-executors=${sparkExecutorNumber} @@ -162,7 +162,7 @@ yarn-cluster cluster bulkTagging-orp - eu.dnetlib.dhp.bulktag.SparkBulkTagJob2 + eu.dnetlib.dhp.bulktag.SparkBulkTagJob dhp-bulktag-${projectVersion}.jar --num-executors=${sparkExecutorNumber} @@ -190,7 +190,7 @@ yarn-cluster cluster bulkTagging-software - eu.dnetlib.dhp.bulktag.SparkBulkTagJob2 + eu.dnetlib.dhp.bulktag.SparkBulkTagJob dhp-bulktag-${projectVersion}.jar --num-executors=${sparkExecutorNumber} diff --git a/dhp-workflows/dhp-bulktag/src/test/java/eu/dnetlib/dhp/BulkTagJobTest.java b/dhp-workflows/dhp-bulktag/src/test/java/eu/dnetlib/dhp/BulkTagJobTest.java index cb5015ba1..75ecb0298 100644 --- a/dhp-workflows/dhp-bulktag/src/test/java/eu/dnetlib/dhp/BulkTagJobTest.java +++ b/dhp-workflows/dhp-bulktag/src/test/java/eu/dnetlib/dhp/BulkTagJobTest.java @@ -24,7 +24,7 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.bulktag.SparkBulkTagJob2; +import eu.dnetlib.dhp.bulktag.SparkBulkTagJob; import eu.dnetlib.dhp.schema.oaf.Dataset; import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; import eu.dnetlib.dhp.schema.oaf.Publication; @@ -84,7 +84,7 @@ public class BulkTagJobTest { @Test public void noUpdatesTest() throws Exception { - SparkBulkTagJob2 + SparkBulkTagJob .main( new String[] { "-isTest", @@ -134,7 +134,7 @@ public class BulkTagJobTest { @Test public void bulktagBySubjectNoPreviousContextTest() throws Exception { - SparkBulkTagJob2 + SparkBulkTagJob .main( new String[] { "-isTest", @@ -240,7 +240,7 @@ public class BulkTagJobTest { @Test public void bulktagBySubjectPreviousContextNoProvenanceTest() throws Exception { - SparkBulkTagJob2 + SparkBulkTagJob .main( new String[] { "-isTest", @@ -332,7 +332,7 @@ public class BulkTagJobTest { @Test public void bulktagByDatasourceTest() throws Exception { - SparkBulkTagJob2 + SparkBulkTagJob .main( new String[] { "-isTest", @@ -415,7 +415,7 @@ public class BulkTagJobTest { @Test public void bulktagByZenodoCommunityTest() throws Exception { - SparkBulkTagJob2 + SparkBulkTagJob .main( new String[] { "-isTest", @@ -548,7 +548,7 @@ public class BulkTagJobTest { @Test public void bulktagBySubjectDatasourceTest() throws Exception { - SparkBulkTagJob2 + SparkBulkTagJob .main( new String[] { "-isTest", @@ -688,7 +688,7 @@ public class BulkTagJobTest { @Test public void bulktagBySubjectDatasourceZenodoCommunityTest() throws Exception { - SparkBulkTagJob2 + SparkBulkTagJob .main( new String[] { "-isTest", @@ -796,7 +796,7 @@ public class BulkTagJobTest { @Test public void bulktagDatasourcewithConstraintsTest() throws Exception { - SparkBulkTagJob2 + SparkBulkTagJob .main( new String[] { "-isTest", diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/PropagationConstant.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/PropagationConstant.java index ebd25fa35..8d2fede82 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/PropagationConstant.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/PropagationConstant.java @@ -1,12 +1,11 @@ package eu.dnetlib.dhp; -import java.io.IOException; -import java.util.*; +import java.util.List; +import java.util.Optional; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; @@ -67,6 +66,12 @@ public class PropagationConstant { public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String cfHbforResultQuery = "select distinct r.id, inst.collectedfrom.key cf, inst.hostedby.key hb " + + + "from result r " + + "lateral view explode(instance) i as inst " + + "where r.datainfo.deletedbyinference=false"; + public static Country getCountry(String classid, String classname) { Country nc = new Country(); nc.setClassid(classid); @@ -130,13 +135,6 @@ public class PropagationConstant { return ret; } - public static void createOutputDirs(String outputPath, FileSystem fs) throws IOException { - if (fs.exists(new Path(outputPath))) { - fs.delete(new Path(outputPath), true); - } - fs.mkdirs(new Path(outputPath)); - } - public static void removeOutputDir(SparkSession spark, String path) { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } @@ -155,50 +153,17 @@ public class PropagationConstant { .orElse(Boolean.FALSE); } - public static void createCfHbforresult(SparkSession spark) { - String query; -// query = "SELECT id, inst.collectedfrom.key cf , inst.hostedby.key hb " -// + "FROM ( SELECT id, instance " -// + "FROM result " -// + " WHERE datainfo.deletedbyinference = false) ds " -// + "LATERAL VIEW EXPLODE(instance) i AS inst"; - query = "select distinct r.id, inst.collectedfrom.key cf, inst.hostedby.key hb " + - "from result r " + - "lateral view explode(instance) i as inst " + - "where r.datainfo.deletedbyinference=false"; - - org.apache.spark.sql.Dataset cfhb = spark.sql(query); + public static void createCfHbforResult(SparkSession spark) { + org.apache.spark.sql.Dataset cfhb = spark.sql(cfHbforResultQuery); cfhb.createOrReplaceTempView("cfhb"); } - public static org.apache.spark.sql.Dataset readPathEntity( - SparkSession spark, String inputEntityPath, Class resultClazz) { - - return spark - .read() - .textFile(inputEntityPath) - .map( - (MapFunction) value -> OBJECT_MAPPER.readValue(value, resultClazz), - Encoders.bean(resultClazz)); - } - - public static org.apache.spark.sql.Dataset readRelations( - SparkSession spark, String inputPath) { + public static Dataset readPath( + SparkSession spark, String inputPath, Class clazz) { return spark .read() .textFile(inputPath) - .map( - (MapFunction) value -> OBJECT_MAPPER.readValue(value, Relation.class), - Encoders.bean(Relation.class)); + .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)); } - public static org.apache.spark.sql.Dataset readResultCommunityList( - SparkSession spark, String possibleUpdatesPath) { - return spark - .read() - .textFile(possibleUpdatesPath) - .map( - value -> OBJECT_MAPPER.readValue(value, ResultCommunityList.class), - Encoders.bean(ResultCommunityList.class)); - } } diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/QueryInformationSystem.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/QueryInformationSystem.java deleted file mode 100644 index c29043a2d..000000000 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/QueryInformationSystem.java +++ /dev/null @@ -1,20 +0,0 @@ - -package eu.dnetlib.dhp; - -import java.util.List; - -import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; - -public class QueryInformationSystem { - private static final String XQUERY = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType')" - + " where $x//CONFIGURATION/context[./@type='community' or ./@type='ri']" - + " and $x//CONFIGURATION/context/param[./@name='status']/text() != 'hidden'" - + " return $x//CONFIGURATION/context/@id/string()"; - - public static List getCommunityList(final String isLookupUrl) throws ISLookUpException { - ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl); - return isLookUp.quickSearchProfile(XQUERY); - } -} diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareDatasourceCountryAssociation.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareDatasourceCountryAssociation.java index 56185eb72..e91a1e48a 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareDatasourceCountryAssociation.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareDatasourceCountryAssociation.java @@ -13,6 +13,7 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,7 +31,6 @@ import eu.dnetlib.dhp.schema.oaf.*; public class PrepareDatasourceCountryAssociation { private static final Logger log = LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static void main(String[] args) throws Exception { @@ -80,31 +80,10 @@ public class PrepareDatasourceCountryAssociation { for (String i : whitelist) { whitelisted += " OR id = '" + i + "'"; } - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - Dataset datasource = spark - .createDataset( - sc - .textFile(inputPath + "/datasource") - .map(item -> OBJECT_MAPPER.readValue(item, Datasource.class)) - .rdd(), - Encoders.bean(Datasource.class)); - - Dataset relation = spark - .createDataset( - sc - .textFile(inputPath + "/relation") - .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)) - .rdd(), - Encoders.bean(Relation.class)); - - Dataset organization = spark - .createDataset( - sc - .textFile(inputPath + "/organization") - .map(item -> OBJECT_MAPPER.readValue(item, Organization.class)) - .rdd(), - Encoders.bean(Organization.class)); + Dataset datasource = readPath(spark, inputPath + "/datasource", Datasource.class); + Dataset relation = readPath(spark, inputPath + "/relation", Relation.class); + Dataset organization = readPath(spark, inputPath + "/organization", Organization.class); datasource.createOrReplaceTempView("datasource"); relation.createOrReplaceTempView("relation"); @@ -128,14 +107,15 @@ public class PrepareDatasourceCountryAssociation { + "JOIN (SELECT id, country " + " FROM organization " + " WHERE datainfo.deletedbyinference = false " - + " AND length(country.classid)>0) o " + + " AND length(country.classid) > 0) o " + "ON o.id = rel.target"; spark .sql(query) .as(Encoders.bean(DatasourceCountry.class)) - .toJavaRDD() - .map(c -> OBJECT_MAPPER.writeValueAsString(c)) - .saveAsTextFile(outputPath, GzipCodec.class); + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(outputPath); } } diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareResultCountrySet.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareResultCountrySet.java index fec4a08ce..34b376413 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareResultCountrySet.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareResultCountrySet.java @@ -4,31 +4,31 @@ package eu.dnetlib.dhp.countrypropagation; import static eu.dnetlib.dhp.PropagationConstant.*; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; -import java.util.Arrays; -import java.util.stream.Collectors; - import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; import org.apache.spark.sql.Dataset; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.ObjectMapper; - import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.*; public class PrepareResultCountrySet { private static final Logger log = LoggerFactory.getLogger(PrepareResultCountrySet.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String RESULT_COUNTRYSET_QUERY = "SELECT id resultId, collect_set(country) countrySet " + + "FROM ( SELECT id, country " + + "FROM datasource_country JOIN cfhb ON cf = dataSourceId " + + "UNION ALL " + + "SELECT id, country FROM datasource_country " + + "JOIN cfhb ON hb = dataSourceId ) tmp " + + "GROUP BY id"; public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils .toString( - SparkCountryPropagationJob2.class + PrepareResultCountrySet.class .getResourceAsStream( "/eu/dnetlib/dhp/countrypropagation/input_prepareresultcountry_parameters.json")); @@ -42,6 +42,9 @@ public class PrepareResultCountrySet { String inputPath = parser.get("sourcePath"); log.info("inputPath: {}", inputPath); + String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + final String datasourcecountrypath = parser.get("preparedInfoPath"); log.info("preparedInfoPath: {}", datasourcecountrypath); @@ -60,75 +63,36 @@ public class PrepareResultCountrySet { getPotentialResultToUpdate( spark, inputPath, + outputPath, datasourcecountrypath, resultClazz); }); - } private static void getPotentialResultToUpdate( SparkSession spark, String inputPath, + String outputPath, String datasourcecountrypath, Class resultClazz) { - Dataset result = readPathEntity(spark, inputPath, resultClazz); + Dataset result = readPath(spark, inputPath, resultClazz); result.createOrReplaceTempView("result"); // log.info("number of results: {}", result.count()); - createCfHbforresult(spark); - Dataset datasourcecountryassoc = readAssocDatasourceCountry(spark, datasourcecountrypath); - countryPropagationAssoc(spark, datasourcecountryassoc) - .map((MapFunction) value -> { - R ret = resultClazz.newInstance(); - ret.setId(value.getResultId()); - ret - .setCountry( - value - .getCountrySet() - .stream() - .map(c -> getCountry(c.getClassid(), c.getClassname())) - .collect(Collectors.toList())); - return ret; - }, Encoders.bean(resultClazz)) + createCfHbforResult(spark); + + Dataset datasource_country = readPath(spark, datasourcecountrypath, DatasourceCountry.class); + + datasource_country.createOrReplaceTempView("datasource_country"); + // log.info("datasource_country number : {}", datasource_country.count()); + + spark + .sql(RESULT_COUNTRYSET_QUERY) + .as(Encoders.bean(ResultCountrySet.class)) .write() .option("compression", "gzip") .mode(SaveMode.Append) - .json(inputPath); + .json(outputPath); } - private static Dataset countryPropagationAssoc( - SparkSession spark, - Dataset datasource_country) { - - // Dataset datasource_country = broadcast_datasourcecountryassoc.value(); - datasource_country.createOrReplaceTempView("datasource_country"); - log.info("datasource_country number : {}", datasource_country.count()); - - String query = "SELECT id resultId, collect_set(country) countrySet " - + "FROM ( SELECT id, country " - + "FROM datasource_country " - + "JOIN cfhb " - + " ON cf = dataSourceId " - + "UNION ALL " - + "SELECT id , country " - + "FROM datasource_country " - + "JOIN cfhb " - + " ON hb = dataSourceId ) tmp " - + "GROUP BY id"; - Dataset potentialUpdates = spark - .sql(query) - .as(Encoders.bean(ResultCountrySet.class)); - // log.info("potential update number : {}", potentialUpdates.count()); - return potentialUpdates; - } - - private static Dataset readAssocDatasourceCountry( - SparkSession spark, String relationPath) { - return spark - .read() - .textFile(relationPath) - .map( - value -> OBJECT_MAPPER.readValue(value, DatasourceCountry.class), - Encoders.bean(DatasourceCountry.class)); - } } diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob3.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java similarity index 59% rename from dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob3.java rename to dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java index 8f12fcf3d..9dc17701b 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob3.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java @@ -5,17 +5,11 @@ import static eu.dnetlib.dhp.PropagationConstant.*; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.*; -import java.util.function.Function; import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; -import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; @@ -26,15 +20,13 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.Country; -import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.Result; import scala.Tuple2; -public class SparkCountryPropagationJob3 { +public class SparkCountryPropagationJob { - private static final Logger log = LoggerFactory.getLogger(SparkCountryPropagationJob3.class); + private static final Logger log = LoggerFactory.getLogger(SparkCountryPropagationJob.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @@ -42,7 +34,7 @@ public class SparkCountryPropagationJob3 { String jsonConfiguration = IOUtils .toString( - SparkCountryPropagationJob3.class + SparkCountryPropagationJob.class .getResourceAsStream( "/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json")); @@ -53,8 +45,11 @@ public class SparkCountryPropagationJob3 { Boolean isSparkSessionManaged = isSparkSessionManaged(parser); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - String inputPath = parser.get("sourcePath"); - log.info("inputPath: {}", inputPath); + String sourcePath = parser.get("sourcePath"); + log.info("sourcePath: {}", sourcePath); + + String preparedInfoPath = parser.get("preparedInfoPath"); + log.info("preparedInfoPath: {}", preparedInfoPath); final String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); @@ -76,7 +71,8 @@ public class SparkCountryPropagationJob3 { isSparkSessionManaged, spark -> execPropagation( spark, - inputPath, + sourcePath, + preparedInfoPath, outputPath, resultClazz, saveGraph)); @@ -84,21 +80,26 @@ public class SparkCountryPropagationJob3 { private static void execPropagation( SparkSession spark, - String inputPath, + String sourcePath, + String preparedInfoPath, String outputPath, Class resultClazz, boolean saveGraph) { if (saveGraph) { // updateResultTable(spark, potentialUpdates, inputPath, resultClazz, outputPath); - log.info("Reading Graph table from: {}", inputPath); + log.info("Reading Graph table from: {}", sourcePath); + Dataset res = readPath(spark, sourcePath, resultClazz); - spark + log.info("Reading prepared info: {}", preparedInfoPath); + Dataset prepared = spark .read() - .json(inputPath) - .as(Encoders.bean(resultClazz)) - .groupByKey((MapFunction) r -> r.getId(), Encoders.STRING()) - .mapGroups(getCountryMergeFn(resultClazz), Encoders.bean(resultClazz)) + .json(preparedInfoPath) + .as(Encoders.bean(ResultCountrySet.class)); + + res + .joinWith(prepared, res.col("id").equalTo(prepared.col("resultId")), "left_outer") + .map(getCountryMergeFn(), Encoders.bean(resultClazz)) .write() .option("compression", "gzip") .mode(SaveMode.Overwrite) @@ -106,37 +107,26 @@ public class SparkCountryPropagationJob3 { } } - private static MapGroupsFunction getCountryMergeFn(Class resultClazz) { - return (MapGroupsFunction) (key, values) -> { - R res = resultClazz.newInstance(); - List countries = new ArrayList<>(); - values.forEachRemaining(r -> { - res.mergeFrom(r); - countries.addAll(r.getCountry()); + private static MapFunction, R> getCountryMergeFn() { + return (MapFunction, R>) t -> { + Optional.ofNullable(t._2()).ifPresent(r -> { + t._1().getCountry().addAll(merge(t._1().getCountry(), r.getCountrySet())); }); - res - .setCountry( - countries - .stream() - .collect( - Collectors - .toMap( - Country::getClassid, - Function.identity(), - (c1, c2) -> { - if (Optional - .ofNullable( - c1.getDataInfo().getInferenceprovenance()) - .isPresent()) { - return c2; - } - return c1; - })) - .values() - .stream() - .collect(Collectors.toList())); - return res; + return t._1(); }; } + private static List merge(List c1, List c2) { + HashSet countries = c1 + .stream() + .map(c -> c.getClassid()) + .collect(Collectors.toCollection(HashSet::new)); + + return c2 + .stream() + .filter(c -> !countries.contains(c.getClassid())) + .map(c -> getCountry(c.getClassid(), c.getClassname())) + .collect(Collectors.toList()); + } + } diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java deleted file mode 100644 index 059e388db..000000000 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java +++ /dev/null @@ -1,289 +0,0 @@ - -package eu.dnetlib.dhp.countrypropagation; - -import static eu.dnetlib.dhp.PropagationConstant.*; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; -import static jdk.nashorn.internal.objects.NativeDebug.map; - -import java.util.*; -import java.util.stream.Collectors; - -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.broadcast.Broadcast; -import org.apache.spark.sql.*; -import org.apache.spark.sql.Dataset; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.oaf.*; -import scala.Tuple2; - -public class SparkCountryPropagationJob2 { - - private static final Logger log = LoggerFactory.getLogger(SparkCountryPropagationJob2.class); - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - public static void main(String[] args) throws Exception { - - String jsonConfiguration = IOUtils - .toString( - SparkCountryPropagationJob2.class - .getResourceAsStream( - "/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json")); - - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - - parser.parseArgument(args); - - Boolean isSparkSessionManaged = isSparkSessionManaged(parser); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - String inputPath = parser.get("sourcePath"); - log.info("inputPath: {}", inputPath); - - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); - - final String datasourcecountrypath = parser.get("preparedInfoPath"); - log.info("preparedInfoPath: {}", datasourcecountrypath); - - final String resultClassName = parser.get("resultTableName"); - log.info("resultTableName: {}", resultClassName); - - final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase(); - log.info("resultType: {}", resultType); - - final String possibleUpdatesPath = datasourcecountrypath - .substring(0, datasourcecountrypath.lastIndexOf("/") + 1) - + "possibleUpdates/" + resultType; - log.info("possibleUpdatesPath: {}", possibleUpdatesPath); - - final Boolean saveGraph = Optional - .ofNullable(parser.get("saveGraph")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("saveGraph: {}", saveGraph); - - Class resultClazz = (Class) Class.forName(resultClassName); - - SparkConf conf = new SparkConf(); - conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - - runWithSparkHiveSession( - conf, - isSparkSessionManaged, - spark -> { - removeOutputDir(spark, possibleUpdatesPath); - execPropagation( - spark, - datasourcecountrypath, - inputPath, - outputPath, - resultClazz, - saveGraph, possibleUpdatesPath); - }); - } - - private static void execPropagation( - SparkSession spark, - String datasourcecountrypath, - String inputPath, - String outputPath, - Class resultClazz, - boolean saveGraph, String possilbeUpdatesPath) { - // final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - - // Load file with preprocessed association datasource - country - Dataset datasourcecountryassoc = readAssocDatasourceCountry(spark, datasourcecountrypath); - // broadcasting the result of the preparation step - // Broadcast> broadcast_datasourcecountryassoc = - // sc.broadcast(datasourcecountryassoc); - - Dataset potentialUpdates = getPotentialResultToUpdate( - spark, inputPath, resultClazz, datasourcecountryassoc) - .as(Encoders.bean(ResultCountrySet.class)); - - potentialUpdates.write().option("compression", "gzip").mode(SaveMode.Overwrite).json(possilbeUpdatesPath); - - if (saveGraph) { - // updateResultTable(spark, potentialUpdates, inputPath, resultClazz, outputPath); - potentialUpdates = spark - .read() - .textFile(possilbeUpdatesPath) - .map( - (MapFunction) value -> OBJECT_MAPPER - .readValue(value, ResultCountrySet.class), - Encoders.bean(ResultCountrySet.class)); - updateResultTable(spark, potentialUpdates, inputPath, resultClazz, outputPath); - } - } - - private static void updateResultTable( - SparkSession spark, - Dataset potentialUpdates, - String inputPath, - Class resultClazz, - String outputPath) { - - log.info("Reading Graph table from: {}", inputPath); - Dataset result = readPathEntity(spark, inputPath, resultClazz); - - Dataset new_table = result - .joinWith( - potentialUpdates, result - .col("id") - .equalTo(potentialUpdates.col("resultId")), - "left_outer") - .map((MapFunction, R>) value -> { - R r = value._1(); - Optional potentialNewCountries = Optional.ofNullable(value._2()); - if (potentialNewCountries.isPresent()) { - HashSet countries = r - .getCountry() - .stream() - .map(c -> c.getClassid()) - .collect(Collectors.toCollection(HashSet::new)); - - r - .getCountry() - .addAll( - potentialNewCountries - .get() - .getCountrySet() - .stream() - .filter(c -> !countries.contains(c.getClassid())) - .map(c -> getCountry(c.getClassid(), c.getClassname())) - .collect(Collectors.toList())); - -// Result res = new Result(); -// res.setId(r.getId()); -// List countryList = new ArrayList<>(); -// for (CountrySbs country : potentialNewCountries -// .get() -// .getCountrySet()) { -// if (!countries.contains(country.getClassid())) { -// countryList -// .add( -// getCountry( -// country.getClassid(), -// country.getClassname())); -// } -// } -// res.setCountry(countryList); -// r.mergeFrom(res); - } - return r; - }, Encoders.bean(resultClazz)); -// Dataset> result_pair = result -// .map( -// r -> new Tuple2<>(r.getId(), r), -// Encoders.tuple(Encoders.STRING(), Encoders.bean(resultClazz))); -// -// Dataset new_table = result_pair -// .joinWith( -// potentialUpdates, -// result_pair.col("_1").equalTo(potentialUpdates.col("resultId")), -// "left_outer") -// .map( -// (MapFunction, ResultCountrySet>, R>) value -> { -// R r = value._1()._2(); -// Optional potentialNewCountries = Optional.ofNullable(value._2()); -// if (potentialNewCountries.isPresent()) { -// HashSet countries = new HashSet<>(); -// for (Qualifier country : r.getCountry()) { -// countries.add(country.getClassid()); -// } -// Result res = new Result(); -// res.setId(r.getId()); -// List countryList = new ArrayList<>(); -// for (CountrySbs country : potentialNewCountries -// .get() -// .getCountrySet()) { -// if (!countries.contains(country.getClassid())) { -// countryList -// .add( -// getCountry( -// country.getClassid(), -// country.getClassname())); -// } -// } -// res.setCountry(countryList); -// r.mergeFrom(res); -// } -// return r; -// }, -// Encoders.bean(resultClazz)); - - log.info("Saving graph table to path: {}", outputPath); - log.info("number of saved recordsa: {}", new_table.count()); - new_table.write().option("compression", "gzip").mode(SaveMode.Overwrite).json(outputPath); - - } - - private static Dataset getPotentialResultToUpdate( - SparkSession spark, - String inputPath, - Class resultClazz, - Dataset datasourcecountryassoc) { - - Dataset result = readPathEntity(spark, inputPath, resultClazz); - result.createOrReplaceTempView("result"); - // log.info("number of results: {}", result.count()); - createCfHbforresult(spark); - return countryPropagationAssoc(spark, datasourcecountryassoc); - } - - private static Dataset countryPropagationAssoc( - SparkSession spark, - Dataset datasource_country) { - - // Dataset datasource_country = broadcast_datasourcecountryassoc.value(); - datasource_country.createOrReplaceTempView("datasource_country"); - log.info("datasource_country number : {}", datasource_country.count()); - - String query = "SELECT id resultId, collect_set(country) countrySet " - + "FROM ( SELECT id, country " - + "FROM datasource_country " - + "JOIN cfhb " - + " ON cf = dataSourceId " - + "UNION ALL " - + "SELECT id , country " - + "FROM datasource_country " - + "JOIN cfhb " - + " ON hb = dataSourceId ) tmp " - + "GROUP BY id"; - - Dataset potentialUpdates = spark - .sql(query) - .as(Encoders.bean(ResultCountrySet.class)) - .map((MapFunction) r -> { - final ArrayList c = r - .getCountrySet() - .stream() - .limit(100) - .collect(Collectors.toCollection(ArrayList::new)); - r.setCountrySet(c); - return r; - }, Encoders.bean(ResultCountrySet.class)); - // log.info("potential update number : {}", potentialUpdates.count()); - return potentialUpdates; - } - - private static Dataset readAssocDatasourceCountry( - SparkSession spark, String relationPath) { - return spark - .read() - .textFile(relationPath) - .map( - (MapFunction) value -> OBJECT_MAPPER - .readValue(value, DatasourceCountry.class), - Encoders.bean(DatasourceCountry.class)); - } -} diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/AutoritativeAuthor.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/AutoritativeAuthor.java index c1644a589..a5fcab360 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/AutoritativeAuthor.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/AutoritativeAuthor.java @@ -2,10 +2,11 @@ package eu.dnetlib.dhp.orcidtoresultfromsemrel; public class AutoritativeAuthor { - String name; - String surname; - String fullname; - String orcid; + + private String name; + private String surname; + private String fullname; + private String orcid; public String getName() { return name; @@ -38,4 +39,5 @@ public class AutoritativeAuthor { public void setOrcid(String orcid) { this.orcid = orcid; } + } diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java index 1baec07c5..3e16b4b4b 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java @@ -13,6 +13,7 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,17 +28,14 @@ import eu.dnetlib.dhp.schema.oaf.Result; public class PrepareResultOrcidAssociationStep1 { private static final Logger log = LoggerFactory.getLogger(PrepareResultOrcidAssociationStep1.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils + String jsonConf = IOUtils .toString( - SparkOrcidToResultFromSemRelJob3.class + PrepareResultOrcidAssociationStep1.class .getResourceAsStream( "/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConf); parser.parseArgument(args); Boolean isSparkSessionManaged = isSparkSessionManaged(parser); @@ -63,6 +61,15 @@ public class PrepareResultOrcidAssociationStep1 { SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); + String inputRelationPath = inputPath + "/relation"; + log.info("inputRelationPath: {}", inputRelationPath); + + String inputResultPath = inputPath + "/" + resultType; + log.info("inputResultPath: {}", inputResultPath); + + String outputResultPath = outputPath + "/" + resultType; + log.info("outputResultPath: {}", outputResultPath); + runWithSparkHiveSession( conf, isSparkSessionManaged, @@ -71,39 +78,25 @@ public class PrepareResultOrcidAssociationStep1 { removeOutputDir(spark, outputPath); } prepareInfo( - spark, inputPath, outputPath, resultClazz, resultType, allowedsemrel); + spark, inputRelationPath, inputResultPath, outputResultPath, resultClazz, allowedsemrel); }); } private static void prepareInfo( SparkSession spark, - String inputPath, - String outputPath, + String inputRelationPath, + String inputResultPath, + String outputResultPath, Class resultClazz, - String resultType, List allowedsemrel) { - // read the relation table and the table related to the result it is using - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - org.apache.spark.sql.Dataset relation = spark - .createDataset( - sc - .textFile(inputPath + "/relation") - .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)) - .rdd(), - Encoders.bean(Relation.class)); + Dataset relation = readPath(spark, inputRelationPath, Relation.class); relation.createOrReplaceTempView("relation"); - log.info("Reading Graph table from: {}", inputPath + "/" + resultType); - Dataset result = readPathEntity(spark, inputPath + "/" + resultType, resultClazz); - + log.info("Reading Graph table from: {}", inputResultPath); + Dataset result = readPath(spark, inputResultPath, resultClazz); result.createOrReplaceTempView("result"); - getPossibleResultOrcidAssociation(spark, allowedsemrel, outputPath + "/" + resultType); - } - - private static void getPossibleResultOrcidAssociation( - SparkSession spark, List allowedsemrel, String outputPath) { String query = " select target resultId, author authorList" + " from (select id, collect_set(named_struct('name', name, 'surname', surname, 'fullname', fullname, 'orcid', orcid)) author " + " from ( " @@ -120,18 +113,13 @@ public class PrepareResultOrcidAssociationStep1 { + getConstraintList(" relclass = '", allowedsemrel) + ") rel_rel " + " on source = id"; - spark .sql(query) .as(Encoders.bean(ResultOrcidList.class)) - .toJavaRDD() - .map(r -> OBJECT_MAPPER.writeValueAsString(r)) - .saveAsTextFile(outputPath, GzipCodec.class); - // .toJSON() - // .write() - // .mode(SaveMode.Append) - // .option("compression","gzip") - // .text(outputPath) - // ; + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(outputResultPath); } + } diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java index a8380e8b9..65d8811bc 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java @@ -59,10 +59,10 @@ public class PrepareResultOrcidAssociationStep2 { private static void mergeInfo(SparkSession spark, String inputPath, String outputPath) { - Dataset resultOrcidAssoc = readAssocResultOrcidList(spark, inputPath + "/publication") - .union(readAssocResultOrcidList(spark, inputPath + "/dataset")) - .union(readAssocResultOrcidList(spark, inputPath + "/otherresearchproduct")) - .union(readAssocResultOrcidList(spark, inputPath + "/software")); + Dataset resultOrcidAssoc = readPath(spark, inputPath + "/publication", ResultOrcidList.class) + .union(readPath(spark, inputPath + "/dataset", ResultOrcidList.class)) + .union(readPath(spark, inputPath + "/otherresearchproduct", ResultOrcidList.class)) + .union(readPath(spark, inputPath + "/software", ResultOrcidList.class)); resultOrcidAssoc .toJavaRDD() @@ -77,7 +77,6 @@ public class PrepareResultOrcidAssociationStep2 { } Set orcid_set = new HashSet<>(); a.getAuthorList().stream().forEach(aa -> orcid_set.add(aa.getOrcid())); - b .getAuthorList() .stream() @@ -95,13 +94,4 @@ public class PrepareResultOrcidAssociationStep2 { .saveAsTextFile(outputPath, GzipCodec.class); } - private static Dataset readAssocResultOrcidList( - SparkSession spark, String relationPath) { - return spark - .read() - .textFile(relationPath) - .map( - value -> OBJECT_MAPPER.readValue(value, ResultOrcidList.class), - Encoders.bean(ResultOrcidList.class)); - } } diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob3.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java similarity index 81% rename from dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob3.java rename to dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java index 6214e18ca..ebb75a5a6 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob3.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java @@ -6,11 +6,11 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; import java.util.List; import java.util.Optional; -import java.util.function.Consumer; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; @@ -25,21 +25,19 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.Author; import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import scala.Tuple2; -public class SparkOrcidToResultFromSemRelJob3 { - private static final Logger log = LoggerFactory.getLogger(SparkOrcidToResultFromSemRelJob3.class); - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); +public class SparkOrcidToResultFromSemRelJob { + private static final Logger log = LoggerFactory.getLogger(SparkOrcidToResultFromSemRelJob.class); public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils .toString( - SparkOrcidToResultFromSemRelJob3.class + SparkOrcidToResultFromSemRelJob.class .getResourceAsStream( "/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_orcidtoresult_parameters.json")); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); Boolean isSparkSessionManaged = isSparkSessionManaged(parser); @@ -88,9 +86,9 @@ public class SparkOrcidToResultFromSemRelJob3 { Class resultClazz) { // read possible updates (resultId and list of possible orcid to add - Dataset possible_updates = readAssocResultOrcidList(spark, possibleUpdatesPath); + Dataset possible_updates = readPath(spark, possibleUpdatesPath, ResultOrcidList.class); // read the result we have been considering - Dataset result = readPathEntity(spark, inputPath, resultClazz); + Dataset result = readPath(spark, inputPath, resultClazz); // make join result left_outer with possible updates result @@ -98,38 +96,29 @@ public class SparkOrcidToResultFromSemRelJob3 { possible_updates, result.col("id").equalTo(possible_updates.col("resultId")), "left_outer") - .map( - value -> { - R ret = value._1(); - Optional rol = Optional.ofNullable(value._2()); - if (rol.isPresent()) { - List toenrich_author = ret.getAuthor(); - List autoritativeAuthors = rol.get().getAuthorList(); - for (Author author : toenrich_author) { - if (!containsAllowedPid(author)) { - enrichAuthor(author, autoritativeAuthors); - } - } - } - - return ret; - }, - Encoders.bean(resultClazz)) - .toJSON() + .map(authorEnrichFn(), Encoders.bean(resultClazz)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .text(outputPath); + .json(outputPath); } - private static Dataset readAssocResultOrcidList( - SparkSession spark, String relationPath) { - return spark - .read() - .textFile(relationPath) - .map( - value -> OBJECT_MAPPER.readValue(value, ResultOrcidList.class), - Encoders.bean(ResultOrcidList.class)); + private static MapFunction, R> authorEnrichFn() { + return (MapFunction, R>) value -> { + R ret = value._1(); + Optional rol = Optional.ofNullable(value._2()); + if (rol.isPresent()) { + List toenrich_author = ret.getAuthor(); + List autoritativeAuthors = rol.get().getAuthorList(); + for (Author author : toenrich_author) { + if (!containsAllowedPid(author)) { + enrichAuthor(author, autoritativeAuthors); + } + } + } + + return ret; + }; } private static void enrichAuthor(Author a, List au) { diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java index b8579156b..05dcdc692 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java @@ -25,7 +25,6 @@ import eu.dnetlib.dhp.schema.oaf.Relation; public class PrepareProjectResultsAssociation { private static final Logger log = LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static void main(String[] args) throws Exception { @@ -61,8 +60,6 @@ public class PrepareProjectResultsAssociation { conf, isSparkSessionManaged, spark -> { - // removeOutputDir(spark, potentialUpdatePath); - // removeOutputDir(spark, alreadyLinkedPath); prepareResultProjProjectResults( spark, inputPath, @@ -78,28 +75,21 @@ public class PrepareProjectResultsAssociation { String potentialUpdatePath, String alreadyLinkedPath, List allowedsemrel) { - JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - Dataset relation = spark - .createDataset( - sc - .textFile(inputPath) - .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)) - .rdd(), - Encoders.bean(Relation.class)); + Dataset relation = readPath(spark, inputPath, Relation.class); relation.createOrReplaceTempView("relation"); - String query = "SELECT source, target " + String resproj_relation_query = "SELECT source, target " + " FROM relation " + " WHERE datainfo.deletedbyinference = false " + " AND relClass = '" + RELATION_RESULT_PROJECT_REL_CLASS + "'"; - Dataset resproj_relation = spark.sql(query); + Dataset resproj_relation = spark.sql(resproj_relation_query); resproj_relation.createOrReplaceTempView("resproj_relation"); - query = "SELECT resultId, collect_set(projectId) projectSet " + String potential_update_query = "SELECT resultId, collect_set(projectId) projectSet " + "FROM ( " + "SELECT r1.target resultId, r2.target projectId " + " FROM (SELECT source, target " @@ -111,46 +101,26 @@ public class PrepareProjectResultsAssociation { + " ON r1.source = r2.source " + " ) tmp " + "GROUP BY resultId "; - // query = - // "SELECT projectId, collect_set(resId) resultSet " - // + "FROM (" - // + " SELECT r1.target resId, r2.target projectId " - // + " FROM (SELECT source, target " - // + " FROM relation " - // + " WHERE datainfo.deletedbyinference = false " - // + getConstraintList(" relClass = '", allowedsemrel) - // + ") r1" - // + " JOIN resproj_relation r2 " - // + " ON r1.source = r2.source " - // + " ) tmp " - // + "GROUP BY projectId "; spark - .sql(query) + .sql(potential_update_query) .as(Encoders.bean(ResultProjectSet.class)) - // .toJSON() - // .write() - // .mode(SaveMode.Overwrite) - // .option("compression", "gzip") - // .text(potentialUpdatePath); - .toJavaRDD() - .map(r -> OBJECT_MAPPER.writeValueAsString(r)) - .saveAsTextFile(potentialUpdatePath, GzipCodec.class); + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(potentialUpdatePath); - query = "SELECT source resultId, collect_set(target) projectSet " + String result_projectset_query = "SELECT source resultId, collect_set(target) projectSet " + "FROM resproj_relation " + "GROUP BY source"; spark - .sql(query) + .sql(result_projectset_query) .as(Encoders.bean(ResultProjectSet.class)) - // .toJSON() - // .write() - // .mode(SaveMode.Overwrite) - // .option("compression", "gzip") - // .text(alreadyLinkedPath); - .toJavaRDD() - .map(r -> OBJECT_MAPPER.writeValueAsString(r)) - .saveAsTextFile(alreadyLinkedPath, GzipCodec.class); + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(alreadyLinkedPath); } + } diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob.java new file mode 100644 index 000000000..36694b3dd --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob.java @@ -0,0 +1,147 @@ + +package eu.dnetlib.dhp.projecttoresult; + +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.sql.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.countrypropagation.PrepareDatasourceCountryAssociation; +import eu.dnetlib.dhp.schema.oaf.Relation; +import scala.Tuple2; + +public class SparkResultToProjectThroughSemRelJob { + + private static final Logger log = LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + SparkResultToProjectThroughSemRelJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}: ", outputPath); + + final String potentialUpdatePath = parser.get("potentialUpdatePath"); + log.info("potentialUpdatePath {}: ", potentialUpdatePath); + + final String alreadyLinkedPath = parser.get("alreadyLinkedPath"); + log.info("alreadyLinkedPath {}: ", alreadyLinkedPath); + + final Boolean saveGraph = Boolean.valueOf(parser.get("saveGraph")); + log.info("saveGraph: {}", saveGraph); + + SparkConf conf = new SparkConf(); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + if (isTest(parser)) { + removeOutputDir(spark, outputPath); + } + execPropagation( + spark, outputPath, alreadyLinkedPath, potentialUpdatePath, saveGraph); + }); + } + + private static void execPropagation( + SparkSession spark, + String outputPath, + String alreadyLinkedPath, + String potentialUpdatePath, + Boolean saveGraph) { + + Dataset toaddrelations = readPath(spark, potentialUpdatePath, ResultProjectSet.class); + Dataset alreadyLinked = readPath(spark, alreadyLinkedPath, ResultProjectSet.class); + + if (saveGraph) { + toaddrelations + .joinWith( + alreadyLinked, + toaddrelations.col("resultId").equalTo(alreadyLinked.col("resultId")), + "left_outer") + .flatMap(mapRelationRn(), Encoders.bean(Relation.class)) + .write() + .mode(SaveMode.Append) + .option("compression", "gzip") + .json(outputPath); + } + } + + private static FlatMapFunction, Relation> mapRelationRn() { + return (FlatMapFunction, Relation>) value -> { + List new_relations = new ArrayList<>(); + ResultProjectSet potential_update = value._1(); + Optional already_linked = Optional.ofNullable(value._2()); + if (already_linked.isPresent()) { + already_linked + .get() + .getProjectSet() + .stream() + .forEach( + (p -> { + if (potential_update + .getProjectSet() + .contains(p)) { + potential_update.getProjectSet().remove(p); + } + })); + } + String resId = potential_update.getResultId(); + potential_update + .getProjectSet() + .stream() + .forEach( + projectId -> { + new_relations + .add( + getRelation( + resId, + projectId, + RELATION_RESULT_PROJECT_REL_CLASS, + RELATION_RESULTPROJECT_REL_TYPE, + RELATION_RESULTPROJECT_SUBREL_TYPE, + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); + new_relations + .add( + getRelation( + projectId, + resId, + RELATION_PROJECT_RESULT_REL_CLASS, + RELATION_RESULTPROJECT_REL_TYPE, + RELATION_RESULTPROJECT_SUBREL_TYPE, + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); + }); + return new_relations.iterator(); + }; + } + +} diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob3.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob3.java deleted file mode 100644 index 4be072901..000000000 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob3.java +++ /dev/null @@ -1,159 +0,0 @@ - -package eu.dnetlib.dhp.projecttoresult; - -import static eu.dnetlib.dhp.PropagationConstant.*; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; - -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.sql.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.countrypropagation.PrepareDatasourceCountryAssociation; -import eu.dnetlib.dhp.schema.oaf.Relation; - -public class SparkResultToProjectThroughSemRelJob3 { - - private static final Logger log = LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - public static void main(String[] args) throws Exception { - - String jsonConfiguration = IOUtils - .toString( - SparkResultToProjectThroughSemRelJob3.class - .getResourceAsStream( - "/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json")); - - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - - parser.parseArgument(args); - - Boolean isSparkSessionManaged = isSparkSessionManaged(parser); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - final String outputPath = parser.get("outputPath"); - log.info("outputPath {}: ", outputPath); - - final String potentialUpdatePath = parser.get("potentialUpdatePath"); - log.info("potentialUpdatePath {}: ", potentialUpdatePath); - - final String alreadyLinkedPath = parser.get("alreadyLinkedPath"); - log.info("alreadyLinkedPath {}: ", alreadyLinkedPath); - - final Boolean saveGraph = Boolean.valueOf(parser.get("saveGraph")); - log.info("saveGraph: {}", saveGraph); - - SparkConf conf = new SparkConf(); - - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - if (isTest(parser)) { - removeOutputDir(spark, outputPath); - } - execPropagation( - spark, outputPath, alreadyLinkedPath, potentialUpdatePath, saveGraph); - }); - } - - private static void execPropagation( - SparkSession spark, - String outputPath, - String alreadyLinkedPath, - String potentialUpdatePath, - Boolean saveGraph) { - - Dataset toaddrelations = readAssocResultProjects(spark, potentialUpdatePath); - Dataset alreadyLinked = readAssocResultProjects(spark, alreadyLinkedPath); - - if (saveGraph) { - getNewRelations(alreadyLinked, toaddrelations) - .toJSON() - .write() - .mode(SaveMode.Append) - .option("compression", "gzip") - .text(outputPath); - } - } - - private static Dataset getNewRelations( - Dataset alreadyLinked, Dataset toaddrelations) { - - return toaddrelations - .joinWith( - alreadyLinked, - toaddrelations.col("resultId").equalTo(alreadyLinked.col("resultId")), - "left_outer") - .flatMap( - value -> { - List new_relations = new ArrayList<>(); - ResultProjectSet potential_update = value._1(); - Optional already_linked = Optional.ofNullable(value._2()); - if (already_linked.isPresent()) { - already_linked - .get() - .getProjectSet() - .stream() - .forEach( - (p -> { - if (potential_update - .getProjectSet() - .contains(p)) { - potential_update.getProjectSet().remove(p); - } - })); - } - String resId = potential_update.getResultId(); - potential_update - .getProjectSet() - .stream() - .forEach( - pId -> { - new_relations - .add( - getRelation( - resId, - pId, - RELATION_RESULT_PROJECT_REL_CLASS, - RELATION_RESULTPROJECT_REL_TYPE, - RELATION_RESULTPROJECT_SUBREL_TYPE, - PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, - PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); - new_relations - .add( - getRelation( - pId, - resId, - RELATION_PROJECT_RESULT_REL_CLASS, - RELATION_RESULTPROJECT_REL_TYPE, - RELATION_RESULTPROJECT_SUBREL_TYPE, - PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, - PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); - }); - return new_relations.iterator(); - }, - Encoders.bean(Relation.class)); - } - - private static Dataset readAssocResultProjects( - SparkSession spark, String potentialUpdatePath) { - return spark - .read() - .textFile(potentialUpdatePath) - .map( - value -> OBJECT_MAPPER.readValue(value, ResultProjectSet.class), - Encoders.bean(ResultProjectSet.class)); - } -} diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java index fbe598e89..e2d4d5687 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java @@ -8,6 +8,7 @@ import java.util.*; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,8 +23,6 @@ public class PrepareResultCommunitySet { private static final Logger log = LoggerFactory.getLogger(PrepareResultCommunitySet.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils .toString( @@ -32,7 +31,6 @@ public class PrepareResultCommunitySet { "/eu/dnetlib/dhp/resulttocommunityfromorganization/input_preparecommunitytoresult_parameters.json")); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); Boolean isSparkSessionManaged = isSparkSessionManaged(parser); @@ -69,7 +67,8 @@ public class PrepareResultCommunitySet { String inputPath, String outputPath, OrganizationMap organizationMap) { - Dataset relation = readRelations(spark, inputPath); + + Dataset relation = readPath(spark, inputPath, Relation.class); relation.createOrReplaceTempView("relation"); String query = "SELECT result_organization.source resultId, result_organization.target orgId, org_set merges " @@ -88,46 +87,44 @@ public class PrepareResultCommunitySet { + " GROUP BY source) organization_organization " + "ON result_organization.target = organization_organization.source "; - org.apache.spark.sql.Dataset result_organizationset = spark + Dataset result_organizationset = spark .sql(query) .as(Encoders.bean(ResultOrganizations.class)); result_organizationset - .map( - value -> { - String rId = value.getResultId(); - Optional> orgs = Optional.ofNullable(value.getMerges()); - String oTarget = value.getOrgId(); - Set communitySet = new HashSet<>(); - if (organizationMap.containsKey(oTarget)) { - communitySet.addAll(organizationMap.get(oTarget)); - } - if (orgs.isPresent()) - // try{ - for (String oId : orgs.get()) { - if (organizationMap.containsKey(oId)) { - communitySet.addAll(organizationMap.get(oId)); - } - } - // }catch(Exception e){ - // - // } - if (communitySet.size() > 0) { - ResultCommunityList rcl = new ResultCommunityList(); - rcl.setResultId(rId); - ArrayList communityList = new ArrayList<>(); - communityList.addAll(communitySet); - rcl.setCommunityList(communityList); - return rcl; - } - return null; - }, - Encoders.bean(ResultCommunityList.class)) - .filter(r -> r != null) - .toJSON() + .map(mapResultCommunityFn(organizationMap), Encoders.bean(ResultCommunityList.class)) + .filter(Objects::nonNull) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .text(outputPath); + .json(outputPath); + } + + private static MapFunction mapResultCommunityFn( + OrganizationMap organizationMap) { + return (MapFunction) value -> { + String rId = value.getResultId(); + Optional> orgs = Optional.ofNullable(value.getMerges()); + String oTarget = value.getOrgId(); + Set communitySet = new HashSet<>(); + if (organizationMap.containsKey(oTarget)) { + communitySet.addAll(organizationMap.get(oTarget)); + } + if (orgs.isPresent()) + for (String oId : orgs.get()) { + if (organizationMap.containsKey(oId)) { + communitySet.addAll(organizationMap.get(oId)); + } + } + if (communitySet.size() > 0) { + ResultCommunityList rcl = new ResultCommunityList(); + rcl.setResultId(rId); + ArrayList communityList = new ArrayList<>(); + communityList.addAll(communitySet); + rcl.setCommunityList(communityList); + return rcl; + } + return null; + }; } } diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java similarity index 61% rename from dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob2.java rename to dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java index 7cc3d6d59..71275cc7f 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob2.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java @@ -9,6 +9,8 @@ import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; @@ -19,17 +21,16 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.*; +import scala.Tuple2; -public class SparkResultToCommunityFromOrganizationJob2 { +public class SparkResultToCommunityFromOrganizationJob { - private static final Logger log = LoggerFactory.getLogger(SparkResultToCommunityFromOrganizationJob2.class); - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final Logger log = LoggerFactory.getLogger(SparkResultToCommunityFromOrganizationJob.class); public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils .toString( - SparkResultToCommunityFromOrganizationJob2.class + SparkResultToCommunityFromOrganizationJob.class .getResourceAsStream( "/eu/dnetlib/dhp/resulttocommunityfromorganization/input_communitytoresult_parameters.json")); @@ -81,54 +82,56 @@ public class SparkResultToCommunityFromOrganizationJob2 { String outputPath, Class resultClazz, String possibleUpdatesPath) { - org.apache.spark.sql.Dataset possibleUpdates = readResultCommunityList( - spark, possibleUpdatesPath); - org.apache.spark.sql.Dataset result = readPathEntity(spark, inputPath, resultClazz); + + Dataset possibleUpdates = readPath(spark, possibleUpdatesPath, ResultCommunityList.class); + Dataset result = readPath(spark, inputPath, resultClazz); result .joinWith( possibleUpdates, result.col("id").equalTo(possibleUpdates.col("resultId")), "left_outer") - .map( - value -> { - R ret = value._1(); - Optional rcl = Optional.ofNullable(value._2()); - if (rcl.isPresent()) { - ArrayList communitySet = rcl.get().getCommunityList(); - List contextList = ret - .getContext() - .stream() - .map(con -> con.getId()) - .collect(Collectors.toList()); - Result res = new Result(); - res.setId(ret.getId()); - List propagatedContexts = new ArrayList<>(); - for (String cId : communitySet) { - if (!contextList.contains(cId)) { - Context newContext = new Context(); - newContext.setId(cId); - newContext - .setDataInfo( - Arrays - .asList( - getDataInfo( - PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, - PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME))); - propagatedContexts.add(newContext); - } - } - res.setContext(propagatedContexts); - ret.mergeFrom(res); - } - return ret; - }, - Encoders.bean(resultClazz)) - .toJSON() + .map(resultCommunityFn(), Encoders.bean(resultClazz)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .text(outputPath); + .json(outputPath); } + + private static MapFunction, R> resultCommunityFn() { + return (MapFunction, R>) value -> { + R ret = value._1(); + Optional rcl = Optional.ofNullable(value._2()); + if (rcl.isPresent()) { + ArrayList communitySet = rcl.get().getCommunityList(); + List contextList = ret + .getContext() + .stream() + .map(con -> con.getId()) + .collect(Collectors.toList()); + Result res = new Result(); + res.setId(ret.getId()); + List propagatedContexts = new ArrayList<>(); + for (String cId : communitySet) { + if (!contextList.contains(cId)) { + Context newContext = new Context(); + newContext.setId(cId); + newContext + .setDataInfo( + Arrays + .asList( + getDataInfo( + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME))); + propagatedContexts.add(newContext); + } + } + res.setContext(propagatedContexts); + ret.mergeFrom(res); + } + return ret; + }; + } + } diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep1.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep1.java index 95fad98d7..4f5ac2552 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep1.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep1.java @@ -8,29 +8,56 @@ import java.util.Arrays; import java.util.List; import org.apache.commons.io.IOUtils; -import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; -import eu.dnetlib.dhp.QueryInformationSystem; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; public class PrepareResultCommunitySetStep1 { private static final Logger log = LoggerFactory.getLogger(PrepareResultCommunitySetStep1.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String COMMUNITY_LIST_XQUERY = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType')" + + " where $x//CONFIGURATION/context[./@type='community' or ./@type='ri']" + + " and $x//CONFIGURATION/context/param[./@name='status']/text() != 'hidden'" + + " return $x//CONFIGURATION/context/@id/string()"; + + /** + * associates to each result the set of community contexts they are associated to; associates to each target of a + * relation with allowed semantics the set of community context it could possibly inherit from the source of the + * relation + */ + // TODO + private static final String RESULT_CONTEXT_QUERY_TEMPLATE = "select target resultId, community_context " + + "from (select id, collect_set(co.id) community_context " + + " from result " + + " lateral view explode (context) c as co " + + " where datainfo.deletedbyinference = false %s group by id) p " + + " JOIN " + + " (select source, target from relation " + + " where datainfo.deletedbyinference = false %s ) r ON p.id = r.source"; + + /** + * a dataset for example could be linked to more than one publication. For each publication linked to that dataset + * the previous query will produce a row: targetId set of community context the target could possibly inherit with + * the following query there will be a single row for each result linked to more than one result of the result type + * currently being used + */ + // TODO + private static final String RESULT_COMMUNITY_LIST_QUERY = "select resultId , collect_set(co) communityList " + + "from result_context " + + "lateral view explode (community_context) c as co " + + "where length(co) > 0 " + + "group by resultId"; public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils @@ -64,7 +91,7 @@ public class PrepareResultCommunitySetStep1 { final String isLookupUrl = parser.get("isLookUpUrl"); log.info("isLookupUrl: {}", isLookupUrl); - final List communityIdList = QueryInformationSystem.getCommunityList(isLookupUrl); + final List communityIdList = getCommunityList(isLookupUrl); log.info("communityIdList: {}", new Gson().toJson(communityIdList)); final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase(); @@ -98,78 +125,43 @@ public class PrepareResultCommunitySetStep1 { Class resultClazz, String resultType, List communityIdList) { - // read the relation table and the table related to the result it is using - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - org.apache.spark.sql.Dataset relation = spark - .createDataset( - sc - .textFile(inputPath + "/relation") - .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)) - .rdd(), - Encoders.bean(Relation.class)); + + final String inputResultPath = inputPath + "/" + resultType; + log.info("Reading Graph table from: {}", inputResultPath); + + final String inputRelationPath = inputPath + "/relation"; + log.info("Reading relation table from: {}", inputResultPath); + + Dataset relation = readPath(spark, inputRelationPath, Relation.class); relation.createOrReplaceTempView("relation"); - log.info("Reading Graph table from: {}", inputPath + "/" + resultType); - Dataset result = readPathEntity(spark, inputPath + "/" + resultType, resultClazz); - + Dataset result = readPath(spark, inputResultPath, resultClazz); result.createOrReplaceTempView("result"); - getPossibleResultcommunityAssociation( - spark, allowedsemrel, outputPath + "/" + resultType, communityIdList); - } + final String outputResultPath = outputPath + "/" + resultType; + log.info("writing output results to: {}", outputResultPath); - private static void getPossibleResultcommunityAssociation( - SparkSession spark, - List allowedsemrel, - String outputPath, - List communityIdList) { + String resultContextQuery = String + .format( + RESULT_CONTEXT_QUERY_TEMPLATE, + getConstraintList(" co.id = '", communityIdList), + getConstraintList(" relClass = '", allowedsemrel)); - String communitylist = getConstraintList(" co.id = '", communityIdList); - String semrellist = getConstraintList(" relClass = '", allowedsemrel); - - /* - * associates to each result the set of community contexts they are associated to select id, collect_set(co.id) - * community_context " + " from result " + " lateral view explode (context) c as co " + - * " where datainfo.deletedbyinference = false "+ communitylist + " group by id associates to each target - * of a relation with allowed semantics the set of community context it could possibly inherit from the source - * of the relation - */ - String query = "Select target resultId, community_context " - + "from (select id, collect_set(co.id) community_context " - + " from result " - + " lateral view explode (context) c as co " - + " where datainfo.deletedbyinference = false " - + communitylist - + " group by id) p " - + "JOIN " - + "(select source, target " - + "from relation " - + "where datainfo.deletedbyinference = false " - + semrellist - + ") r " - + "ON p.id = r.source"; - - org.apache.spark.sql.Dataset result_context = spark.sql(query); + Dataset result_context = spark.sql(resultContextQuery); result_context.createOrReplaceTempView("result_context"); - // ( target, (mes, dh-ch-, ni)) - /* - * a dataset for example could be linked to more than one publication. For each publication linked to that - * dataset the previous query will produce a row: targetId set of community context the target could possibly - * inherit with the following query there will be a single row for each result linked to more than one result of - * the result type currently being used - */ - query = "select resultId , collect_set(co) communityList " - + "from result_context " - + "lateral view explode (community_context) c as co " - + "where length(co) > 0 " - + "group by resultId"; - spark - .sql(query) + .sql(RESULT_COMMUNITY_LIST_QUERY) .as(Encoders.bean(ResultCommunityList.class)) - .toJavaRDD() - .map(r -> OBJECT_MAPPER.writeValueAsString(r)) - .saveAsTextFile(outputPath, GzipCodec.class); + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(outputResultPath); } + + public static List getCommunityList(final String isLookupUrl) throws ISLookUpException { + ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl); + return isLookUp.quickSearchProfile(COMMUNITY_LIST_XQUERY); + } + } diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep2.java index cbd7e5e50..723aa8960 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep2.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep2.java @@ -62,11 +62,11 @@ public class PrepareResultCommunitySetStep2 { private static void mergeInfo(SparkSession spark, String inputPath, String outputPath) { - Dataset resultOrcidAssocCommunityList = readResultCommunityList( - spark, inputPath + "/publication") - .union(readResultCommunityList(spark, inputPath + "/dataset")) - .union(readResultCommunityList(spark, inputPath + "/otherresearchproduct")) - .union(readResultCommunityList(spark, inputPath + "/software")); + Dataset resultOrcidAssocCommunityList = readPath( + spark, inputPath + "/publication", ResultCommunityList.class) + .union(readPath(spark, inputPath + "/dataset", ResultCommunityList.class)) + .union(readPath(spark, inputPath + "/otherresearchproduct", ResultCommunityList.class)) + .union(readPath(spark, inputPath + "/software", ResultCommunityList.class)); resultOrcidAssocCommunityList .toJavaRDD() @@ -80,9 +80,7 @@ public class PrepareResultCommunitySetStep2 { return a; } Set community_set = new HashSet<>(); - a.getCommunityList().stream().forEach(aa -> community_set.add(aa)); - b .getCommunityList() .stream() @@ -100,13 +98,4 @@ public class PrepareResultCommunitySetStep2 { .saveAsTextFile(outputPath, GzipCodec.class); } - private static Dataset readResultCommunityList( - SparkSession spark, String relationPath) { - return spark - .read() - .textFile(relationPath) - .map( - value -> OBJECT_MAPPER.readValue(value, ResultCommunityList.class), - Encoders.bean(ResultCommunityList.class)); - } } diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob4.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java similarity index 61% rename from dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob4.java rename to dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java index b513ddd79..0c613d1b4 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob4.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java @@ -9,30 +9,28 @@ import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.ximpleware.extended.xpath.parser; - import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList; import eu.dnetlib.dhp.schema.oaf.*; +import scala.Tuple2; -public class SparkResultToCommunityThroughSemRelJob4 { +public class SparkResultToCommunityThroughSemRelJob { - private static final Logger log = LoggerFactory.getLogger(SparkResultToCommunityThroughSemRelJob4.class); - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final Logger log = LoggerFactory.getLogger(SparkResultToCommunityThroughSemRelJob.class); public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils .toString( - SparkResultToCommunityThroughSemRelJob4.class + SparkResultToCommunityThroughSemRelJob.class .getResourceAsStream( "/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_communitytoresult_parameters.json")); @@ -87,58 +85,59 @@ public class SparkResultToCommunityThroughSemRelJob4 { String preparedInfoPath, Class resultClazz) { - org.apache.spark.sql.Dataset possibleUpdates = readResultCommunityList( - spark, preparedInfoPath); - org.apache.spark.sql.Dataset result = readPathEntity(spark, inputPath, resultClazz); + Dataset possibleUpdates = readPath(spark, preparedInfoPath, ResultCommunityList.class); + Dataset result = readPath(spark, inputPath, resultClazz); result .joinWith( possibleUpdates, result.col("id").equalTo(possibleUpdates.col("resultId")), "left_outer") - .map( - value -> { - R ret = value._1(); - Optional rcl = Optional.ofNullable(value._2()); - if (rcl.isPresent()) { - Set context_set = new HashSet<>(); - ret.getContext().stream().forEach(c -> context_set.add(c.getId())); - List contextList = rcl - .get() - .getCommunityList() - .stream() - .map( - c -> { - if (!context_set.contains(c)) { - Context newContext = new Context(); - newContext.setId(c); - newContext - .setDataInfo( - Arrays - .asList( - getDataInfo( - PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME))); - return newContext; - } - return null; - }) - .filter(c -> c != null) - .collect(Collectors.toList()); - Result r = new Result(); - r.setId(ret.getId()); - r.setContext(contextList); - ret.mergeFrom(r); - } - - return ret; - }, - Encoders.bean(resultClazz)) - .toJSON() + .map(contextUpdaterFn(), Encoders.bean(resultClazz)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .text(outputPath); + .json(outputPath); } + + private static MapFunction, R> contextUpdaterFn() { + return (MapFunction, R>) value -> { + R ret = value._1(); + Optional rcl = Optional.ofNullable(value._2()); + if (rcl.isPresent()) { + Set context_set = new HashSet<>(); + ret.getContext().stream().forEach(c -> context_set.add(c.getId())); + List contextList = rcl + .get() + .getCommunityList() + .stream() + .map( + c -> { + if (!context_set.contains(c)) { + Context newContext = new Context(); + newContext.setId(c); + newContext + .setDataInfo( + Arrays + .asList( + getDataInfo( + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME))); + return newContext; + } + return null; + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + Result r = new Result(); + r.setId(ret.getId()); + r.setContext(contextList); + ret.mergeFrom(r); + } + + return ret; + }; + } + } diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java index 02faf0086..f8fe1668f 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java @@ -7,7 +7,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; @@ -58,8 +58,7 @@ public class PrepareResultInstRepoAssociation { isSparkSessionManaged, spark -> { readNeededResources(spark, inputPath); - prepareDatasourceOrganizationAssociations( - spark, datasourceOrganizationPath, alreadyLinkedPath); + prepareDatasourceOrganization(spark, datasourceOrganizationPath); prepareAlreadyLinkedAssociation(spark, alreadyLinkedPath); }); } @@ -77,45 +76,25 @@ public class PrepareResultInstRepoAssociation { spark .sql(query) .as(Encoders.bean(ResultOrganizationSet.class)) + // TODO retry to stick with datasets .toJavaRDD() .map(r -> OBJECT_MAPPER.writeValueAsString(r)) .saveAsTextFile(alreadyLinkedPath, GzipCodec.class); } private static void readNeededResources(SparkSession spark, String inputPath) { - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - - org.apache.spark.sql.Dataset datasource = spark - .createDataset( - sc - .textFile(inputPath + "/datasource") - .map(item -> new ObjectMapper().readValue(item, Datasource.class)) - .rdd(), - Encoders.bean(Datasource.class)); - - org.apache.spark.sql.Dataset relation = spark - .createDataset( - sc - .textFile(inputPath + "/relation") - .map(item -> new ObjectMapper().readValue(item, Relation.class)) - .rdd(), - Encoders.bean(Relation.class)); - - org.apache.spark.sql.Dataset organization = spark - .createDataset( - sc - .textFile(inputPath + "/organization") - .map(item -> new ObjectMapper().readValue(item, Organization.class)) - .rdd(), - Encoders.bean(Organization.class)); - + Dataset datasource = readPath(spark, inputPath + "/datasource", Datasource.class); datasource.createOrReplaceTempView("datasource"); + + Dataset relation = readPath(spark, inputPath + "/relation", Relation.class); relation.createOrReplaceTempView("relation"); + + Dataset organization = readPath(spark, inputPath + "/organization", Organization.class); organization.createOrReplaceTempView("organization"); } - private static void prepareDatasourceOrganizationAssociations( - SparkSession spark, String datasourceOrganizationPath, String alreadyLinkedPath) { + private static void prepareDatasourceOrganization( + SparkSession spark, String datasourceOrganizationPath) { String query = "SELECT source datasourceId, target organizationId " + "FROM ( SELECT id " @@ -135,10 +114,9 @@ public class PrepareResultInstRepoAssociation { spark .sql(query) .as(Encoders.bean(DatasourceOrganization.class)) - .toJSON() .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .text(datasourceOrganizationPath); + .json(datasourceOrganizationPath); } } diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java new file mode 100644 index 000000000..86634d43f --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java @@ -0,0 +1,193 @@ + +package eu.dnetlib.dhp.resulttoorganizationfrominstrepo; + +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; + +import java.util.*; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.*; +import org.apache.spark.sql.Dataset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.*; +import scala.Tuple2; + +public class SparkResultToOrganizationFromIstRepoJob { + + private static final Logger log = LoggerFactory.getLogger(SparkResultToOrganizationFromIstRepoJob.class); + + private static final String RESULT_ORGANIZATIONSET_QUERY = "SELECT id resultId, collect_set(organizationId) organizationSet " + + "FROM ( SELECT id, organizationId " + + "FROM rels " + + "JOIN cfhb " + + " ON cf = datasourceId " + + "UNION ALL " + + "SELECT id , organizationId " + + "FROM rels " + + "JOIN cfhb " + + " ON hb = datasourceId ) tmp " + + "GROUP BY id"; + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + SparkResultToOrganizationFromIstRepoJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_propagationresulaffiliationfrominstrepo_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + final String datasourceorganization = parser.get("datasourceOrganizationPath"); + log.info("datasourceOrganizationPath: {}", datasourceorganization); + + final String alreadylinked = parser.get("alreadyLinkedPath"); + log.info("alreadyLinkedPath: {}", alreadylinked); + + final String resultClassName = parser.get("resultTableName"); + log.info("resultTableName: {}", resultClassName); + + final Boolean saveGraph = Optional + .ofNullable(parser.get("saveGraph")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("saveGraph: {}", saveGraph); + + Class resultClazz = (Class) Class.forName(resultClassName); + + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); + + runWithSparkHiveSession( + conf, + isSparkSessionManaged, + spark -> { + if (isTest(parser)) { + removeOutputDir(spark, outputPath); + } + if (saveGraph) + execPropagation( + spark, + datasourceorganization, + alreadylinked, + inputPath, + outputPath, + resultClazz); + }); + } + + private static void execPropagation( + SparkSession spark, + String datasourceorganization, + String alreadyLinkedPath, + String inputPath, + String outputPath, + Class clazz) { + + Dataset ds_org = readPath(spark, datasourceorganization, DatasourceOrganization.class); + + Dataset potentialUpdates = getPotentialRelations(spark, inputPath, clazz, ds_org); + + Dataset alreadyLinked = readPath(spark, alreadyLinkedPath, ResultOrganizationSet.class); + + potentialUpdates + .joinWith( + alreadyLinked, + potentialUpdates.col("resultId").equalTo(alreadyLinked.col("resultId")), + "left_outer") + .flatMap(createRelationFn(), Encoders.bean(Relation.class)) + .write() + .mode(SaveMode.Append) + .option("compression", "gzip") + .json(outputPath); + } + + private static FlatMapFunction, Relation> createRelationFn() { + return (FlatMapFunction, Relation>) value -> { + List new_relations = new ArrayList<>(); + ResultOrganizationSet potential_update = value._1(); + Optional already_linked = Optional.ofNullable(value._2()); + List organization_list = potential_update.getOrganizationSet(); + if (already_linked.isPresent()) { + already_linked + .get() + .getOrganizationSet() + .stream() + .forEach( + rId -> { + if (organization_list.contains(rId)) { + organization_list.remove(rId); + } + }); + } + String resultId = potential_update.getResultId(); + organization_list + .stream() + .forEach( + orgId -> { + new_relations + .add( + getRelation( + orgId, + resultId, + RELATION_ORGANIZATION_RESULT_REL_CLASS, + RELATION_RESULTORGANIZATION_REL_TYPE, + RELATION_RESULTORGANIZATION_SUBREL_TYPE, + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, + PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); + new_relations + .add( + getRelation( + resultId, + orgId, + RELATION_RESULT_ORGANIZATION_REL_CLASS, + RELATION_RESULTORGANIZATION_REL_TYPE, + RELATION_RESULTORGANIZATION_SUBREL_TYPE, + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, + PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); + }); + return new_relations.iterator(); + }; + } + + private static Dataset getPotentialRelations( + SparkSession spark, + String inputPath, + Class resultClazz, + Dataset ds_org) { + + Dataset result = readPath(spark, inputPath, resultClazz); + result.createOrReplaceTempView("result"); + createCfHbforResult(spark); + + ds_org.createOrReplaceTempView("rels"); + + return spark + .sql(RESULT_ORGANIZATIONSET_QUERY) + .as(Encoders.bean(ResultOrganizationSet.class)); + } + +} diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob2.java deleted file mode 100644 index 72c75e8a6..000000000 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob2.java +++ /dev/null @@ -1,232 +0,0 @@ - -package eu.dnetlib.dhp.resulttoorganizationfrominstrepo; - -import static eu.dnetlib.dhp.PropagationConstant.*; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; - -import java.util.*; - -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.broadcast.Broadcast; -import org.apache.spark.sql.*; -import org.apache.spark.sql.Dataset; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.oaf.*; -import scala.Tuple2; - -public class SparkResultToOrganizationFromIstRepoJob2 { - - private static final Logger log = LoggerFactory.getLogger(SparkResultToOrganizationFromIstRepoJob2.class); - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - public static void main(String[] args) throws Exception { - - String jsonConfiguration = IOUtils - .toString( - SparkResultToOrganizationFromIstRepoJob2.class - .getResourceAsStream( - "/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_propagationresulaffiliationfrominstrepo_parameters.json")); - - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - - parser.parseArgument(args); - - Boolean isSparkSessionManaged = isSparkSessionManaged(parser); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - String inputPath = parser.get("sourcePath"); - log.info("inputPath: {}", inputPath); - - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); - - final String datasourceorganization = parser.get("datasourceOrganizationPath"); - log.info("datasourceOrganizationPath: {}", datasourceorganization); - - final String alreadylinked = parser.get("alreadyLinkedPath"); - log.info("alreadyLinkedPath: {}", alreadylinked); - - final String resultClassName = parser.get("resultTableName"); - log.info("resultTableName: {}", resultClassName); - - final Boolean saveGraph = Optional - .ofNullable(parser.get("saveGraph")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("saveGraph: {}", saveGraph); - - Class resultClazz = (Class) Class.forName(resultClassName); - - SparkConf conf = new SparkConf(); - conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - - runWithSparkHiveSession( - conf, - isSparkSessionManaged, - spark -> { - if (isTest(parser)) { - removeOutputDir(spark, outputPath); - } - if (saveGraph) - execPropagation( - spark, - datasourceorganization, - alreadylinked, - inputPath, - outputPath, - resultClazz); - }); - } - - private static void execPropagation( - SparkSession spark, - String datasourceorganization, - String alreadylinked, - String inputPath, - String outputPath, - Class resultClazz) { - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - - org.apache.spark.sql.Dataset datasourceorganizationassoc = readAssocDatasourceOrganization( - spark, datasourceorganization); - - // broadcasting the result of the preparation step - Broadcast> broadcast_datasourceorganizationassoc = sc - .broadcast(datasourceorganizationassoc); - - org.apache.spark.sql.Dataset potentialUpdates = getPotentialRelations( - spark, - inputPath, - resultClazz, - broadcast_datasourceorganizationassoc) - .as(Encoders.bean(ResultOrganizationSet.class)); - - getNewRelations( - spark - .read() - .textFile(alreadylinked) - .map( - value -> OBJECT_MAPPER - .readValue( - value, ResultOrganizationSet.class), - Encoders.bean(ResultOrganizationSet.class)), - potentialUpdates) - .toJSON() - .write() - .mode(SaveMode.Append) - .option("compression", "gzip") - .text(outputPath); - - } - - private static Dataset getNewRelations( - Dataset alreadyLinked, - Dataset potentialUpdates) { - - return potentialUpdates - .joinWith( - alreadyLinked, - potentialUpdates.col("resultId").equalTo(alreadyLinked.col("resultId")), - "left_outer") - .flatMap( - (FlatMapFunction, Relation>) value -> { - List new_relations = new ArrayList<>(); - ResultOrganizationSet potential_update = value._1(); - Optional already_linked = Optional.ofNullable(value._2()); - List organization_list = potential_update.getOrganizationSet(); - if (already_linked.isPresent()) { - already_linked - .get() - .getOrganizationSet() - .stream() - .forEach( - rId -> { - if (organization_list.contains(rId)) { - organization_list.remove(rId); - } - }); - } - String resultId = potential_update.getResultId(); - organization_list - .stream() - .forEach( - orgId -> { - new_relations - .add( - getRelation( - orgId, - resultId, - RELATION_ORGANIZATION_RESULT_REL_CLASS, - RELATION_RESULTORGANIZATION_REL_TYPE, - RELATION_RESULTORGANIZATION_SUBREL_TYPE, - PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, - PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); - new_relations - .add( - getRelation( - resultId, - orgId, - RELATION_RESULT_ORGANIZATION_REL_CLASS, - RELATION_RESULTORGANIZATION_REL_TYPE, - RELATION_RESULTORGANIZATION_SUBREL_TYPE, - PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, - PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); - }); - return new_relations.iterator(); - }, - Encoders.bean(Relation.class)); - } - - private static org.apache.spark.sql.Dataset getPotentialRelations( - SparkSession spark, - String inputPath, - Class resultClazz, - Broadcast> broadcast_datasourceorganizationassoc) { - org.apache.spark.sql.Dataset result = readPathEntity(spark, inputPath, resultClazz); - result.createOrReplaceTempView("result"); - createCfHbforresult(spark); - - return organizationPropagationAssoc(spark, broadcast_datasourceorganizationassoc); - } - - private static org.apache.spark.sql.Dataset readAssocDatasourceOrganization( - SparkSession spark, String datasourcecountryorganization) { - return spark - .read() - .textFile(datasourcecountryorganization) - .map( - value -> OBJECT_MAPPER.readValue(value, DatasourceOrganization.class), - Encoders.bean(DatasourceOrganization.class)); - } - - private static org.apache.spark.sql.Dataset organizationPropagationAssoc( - SparkSession spark, - Broadcast> broadcast_datasourceorganizationassoc) { - org.apache.spark.sql.Dataset datasourceorganization = broadcast_datasourceorganizationassoc - .value(); - datasourceorganization.createOrReplaceTempView("rels"); - String query = "SELECT id resultId, collect_set(organizationId) organizationSet " - + "FROM ( SELECT id, organizationId " - + "FROM rels " - + "JOIN cfhb " - + " ON cf = datasourceId " - + "UNION ALL " - + "SELECT id , organizationId " - + "FROM rels " - + "JOIN cfhb " - + " ON hb = datasourceId ) tmp " - + "GROUP BY id"; - return spark.sql(query).as(Encoders.bean(ResultOrganizationSet.class)); - } -} diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_prepareresultcountry_parameters.json b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_prepareresultcountry_parameters.json index 9956f3474..5efa3dbd6 100644 --- a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_prepareresultcountry_parameters.json +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_prepareresultcountry_parameters.json @@ -5,6 +5,12 @@ "paramDescription": "the path of the sequencial file to read", "paramRequired": true }, + { + "paramName":"out", + "paramLongName":"outputPath", + "paramDescription": "the output path", + "paramRequired": true + }, { "paramName":"h", "paramLongName":"hive_metastore_uris", diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml index ac0fff2c0..90b004883 100644 --- a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml @@ -19,27 +19,22 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + - - - - - - - - - - - - - + + + + + + + + @@ -50,11 +45,8 @@ - - - - + ${jobTracker} @@ -98,50 +90,6 @@ - - - ${jobTracker} - ${nameNode} - ${nameNode}/${sourcePath}/publication - ${nameNode}/${workingDir}/publication - - - - - - - - ${jobTracker} - ${nameNode} - ${nameNode}/${sourcePath}/dataset - ${nameNode}/${workingDir}/dataset - - - - - - - - ${jobTracker} - ${nameNode} - ${nameNode}/${sourcePath}/otherresearchproduct - ${nameNode}/${workingDir}/otherresearchproduct - - - - - - - - ${jobTracker} - ${nameNode} - ${nameNode}/${sourcePath}/software - ${nameNode}/${workingDir}/software - - - - - @@ -159,7 +107,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=300 + --conf spark.sql.shuffle.partitions=3840 --sourcePath${sourcePath} --whitelist${whitelist} @@ -198,7 +146,8 @@ --conf spark.hadoop.mapreduce.reduce.speculative=false --conf spark.sql.shuffle.partitions=3840 - --sourcePath${workingDir}/publication + --sourcePath${sourcePath}/publication + --outputPath${workingDir}/publication --hive_metastore_uris${hive_metastore_uris} --resultTableNameeu.dnetlib.dhp.schema.oaf.Publication --preparedInfoPath${workingDir}/preparedInfo @@ -227,7 +176,8 @@ --conf spark.hadoop.mapreduce.reduce.speculative=false --conf spark.sql.shuffle.partitions=3840 - --sourcePath${workingDir}/dataset + --sourcePath${sourcePath}/dataset + --outputPath${workingDir}/dataset --hive_metastore_uris${hive_metastore_uris} --resultTableNameeu.dnetlib.dhp.schema.oaf.Dataset --preparedInfoPath${workingDir}/preparedInfo @@ -256,7 +206,8 @@ --conf spark.hadoop.mapreduce.reduce.speculative=false --conf spark.sql.shuffle.partitions=3840 - --sourcePath${workingDir}/otherresearchproduct + --sourcePath${sourcePath}/otherresearchproduct + --outputPath${workingDir}/otherresearchproduct --hive_metastore_uris${hive_metastore_uris} --resultTableNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct --preparedInfoPath${workingDir}/preparedInfo @@ -285,7 +236,8 @@ --conf spark.hadoop.mapreduce.reduce.speculative=false --conf spark.sql.shuffle.partitions=3840 - --sourcePath${workingDir}/software + --sourcePath${sourcePath}/software + --outputPath${workingDir}/software --hive_metastore_uris${hive_metastore_uris} --resultTableNameeu.dnetlib.dhp.schema.oaf.Software --preparedInfoPath${workingDir}/preparedInfo @@ -308,7 +260,7 @@ yarn cluster countryPropagationForPublications - eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob3 + eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob dhp-propagation-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -323,7 +275,8 @@ --conf spark.hadoop.mapreduce.reduce.speculative=false --conf spark.sql.shuffle.partitions=3840 - --sourcePath${workingDir}/publication + --sourcePath${sourcePath}/publication + --preparedInfoPath${workingDir}/publication --saveGraph${saveGraph} --resultTableNameeu.dnetlib.dhp.schema.oaf.Publication --outputPath${outputPath}/publication @@ -337,7 +290,7 @@ yarn cluster countryPropagationForDataset - eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob3 + eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob dhp-propagation-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -352,7 +305,8 @@ --conf spark.hadoop.mapreduce.reduce.speculative=false --conf spark.sql.shuffle.partitions=3840 - --sourcePath${workingDir}/dataset + --sourcePath${sourcePath}/dataset + --preparedInfoPath${workingDir}/dataset --saveGraph${saveGraph} --resultTableNameeu.dnetlib.dhp.schema.oaf.Dataset --outputPath${outputPath}/dataset @@ -366,7 +320,7 @@ yarn cluster countryPropagationForORP - eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob3 + eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob dhp-propagation-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -381,7 +335,8 @@ --conf spark.hadoop.mapreduce.reduce.speculative=false --conf spark.sql.shuffle.partitions=3840 - --sourcePath${workingDir}/otherresearchproduct + --sourcePath${sourcePath}/otherresearchproduct + --preparedInfoPath${workingDir}/otherresearchproduct --saveGraph${saveGraph} --resultTableNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct --outputPath${outputPath}/otherresearchproduct @@ -395,7 +350,7 @@ yarn cluster countryPropagationForSoftware - eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob3 + eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob dhp-propagation-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -410,7 +365,8 @@ --conf spark.hadoop.mapreduce.reduce.speculative=false --conf spark.sql.shuffle.partitions=3840 - --sourcePath${workingDir}/software + --sourcePath${sourcePath}/software + --preparedInfoPath${workingDir}/software --saveGraph${saveGraph} --resultTableNameeu.dnetlib.dhp.schema.oaf.Software --outputPath${outputPath}/software diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/oozie_app/workflow.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/oozie_app/workflow.xml index 4c4b74b52..243167bd6 100644 --- a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/oozie_app/workflow.xml @@ -253,7 +253,7 @@ yarn cluster ORCIDPropagation-Publication - eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob3 + eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob dhp-propagation-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -285,7 +285,7 @@ yarn cluster ORCIDPropagation-Dataset - eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob3 + eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob dhp-propagation-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -316,7 +316,7 @@ yarn cluster ORCIDPropagation-ORP - eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob3 + eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob dhp-propagation-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -347,7 +347,7 @@ yarn cluster ORCIDPropagation-Software - eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob3 + eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob dhp-propagation-${projectVersion}.jar --executor-cores=${sparkExecutorCores} diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/workflow.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/workflow.xml index 72ced0905..850a2f498 100644 --- a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/workflow.xml @@ -166,7 +166,7 @@ yarn cluster ProjectToResultPropagation - eu.dnetlib.dhp.projecttoresult.SparkResultToProjectThroughSemRelJob3 + eu.dnetlib.dhp.projecttoresult.SparkResultToProjectThroughSemRelJob dhp-propagation-${projectVersion}.jar --executor-cores=${sparkExecutorCores} diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/resulttocommunityfromorganization/oozie_app/workflow.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/resulttocommunityfromorganization/oozie_app/workflow.xml index bf200e242..e041fc39c 100644 --- a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/resulttocommunityfromorganization/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/resulttocommunityfromorganization/oozie_app/workflow.xml @@ -127,7 +127,7 @@ yarn cluster community2resultfromorganization-Publication - eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob2 + eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob dhp-propagation-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -155,7 +155,7 @@ yarn cluster community2resultfromorganization-Dataset - eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob2 + eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob dhp-propagation-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -183,7 +183,7 @@ yarn cluster community2resultfromorganization-ORP - eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob2 + eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob dhp-propagation-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -211,7 +211,7 @@ yarn cluster community2resultfromorganization-Software - eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob2 + eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob dhp-propagation-${projectVersion}.jar --executor-cores=${sparkExecutorCores} diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/resulttocommunityfromsemrel/oozie_app/workflow.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/resulttocommunityfromsemrel/oozie_app/workflow.xml index 977e4838c..f2d406ad9 100644 --- a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/resulttocommunityfromsemrel/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/resulttocommunityfromsemrel/oozie_app/workflow.xml @@ -252,7 +252,7 @@ yarn cluster Result2CommunitySemRelPropagation-Publication - eu.dnetlib.dhp.resulttocommunityfromsemrel.SparkResultToCommunityThroughSemRelJob4 + eu.dnetlib.dhp.resulttocommunityfromsemrel.SparkResultToCommunityThroughSemRelJob dhp-propagation-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -280,7 +280,7 @@ yarn cluster Result2CommunitySemRelPropagation-Dataset - eu.dnetlib.dhp.resulttocommunityfromsemrel.SparkResultToCommunityThroughSemRelJob4 + eu.dnetlib.dhp.resulttocommunityfromsemrel.SparkResultToCommunityThroughSemRelJob dhp-propagation-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -308,7 +308,7 @@ yarn cluster Result2CommunitySemRelPropagation-ORP - eu.dnetlib.dhp.resulttocommunityfromsemrel.SparkResultToCommunityThroughSemRelJob4 + eu.dnetlib.dhp.resulttocommunityfromsemrel.SparkResultToCommunityThroughSemRelJob dhp-propagation-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -336,7 +336,7 @@ yarn cluster Result2CommunitySemRelPropagation-Software - eu.dnetlib.dhp.resulttocommunityfromsemrel.SparkResultToCommunityThroughSemRelJob4 + eu.dnetlib.dhp.resulttocommunityfromsemrel.SparkResultToCommunityThroughSemRelJob dhp-propagation-${projectVersion}.jar --executor-cores=${sparkExecutorCores} diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/oozie_app/workflow.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/oozie_app/workflow.xml index 7e124f843..6bf7d0cec 100644 --- a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/oozie_app/workflow.xml @@ -166,7 +166,7 @@ yarn cluster resultToOrganizationFromInstRepoPropagationForPublications - eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob2 + eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob dhp-propagation-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -196,7 +196,7 @@ yarn cluster resultToOrganizationFromInstRepoPropagationForDataset - eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob2 + eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob dhp-propagation-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -225,7 +225,7 @@ yarn cluster resultToOrganizationFromInstRepoPropagationForORP - eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob2 + eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob dhp-propagation-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -255,7 +255,7 @@ yarn cluster resultToOrganizationFromInstRepoPropagationForSoftware - eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob2 + eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob dhp-propagation-${projectVersion}.jar --executor-cores=${sparkExecutorCores} diff --git a/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/countrypropagation/CountryPropagationJobTest.java b/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/countrypropagation/CountryPropagationJobTest.java index 6c66606e1..10ff48cae 100644 --- a/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/countrypropagation/CountryPropagationJobTest.java +++ b/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/countrypropagation/CountryPropagationJobTest.java @@ -66,30 +66,25 @@ public class CountryPropagationJobTest { @Test public void testCountryPropagationSoftware() throws Exception { - SparkCountryPropagationJob2 + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/countrypropagation/sample/software") + .getPath(); + final String preparedInfoPath = getClass() + .getResource("/eu/dnetlib/dhp/countrypropagation/preparedInfo") + .getPath(); + SparkCountryPropagationJob .main( new String[] { - "-isSparkSessionManaged", - Boolean.FALSE.toString(), - "-sourcePath", - getClass() - .getResource("/eu/dnetlib/dhp/countrypropagation/sample/software") - .getPath(), - "-hive_metastore_uris", - "", - "-saveGraph", - "true", - "-resultTableName", - "eu.dnetlib.dhp.schema.oaf.Software", - "-outputPath", - workingDir.toString() + "/software", - "-preparedInfoPath", - getClass() - .getResource("/eu/dnetlib/dhp/countrypropagation/preparedInfo") - .getPath(), + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--sourcePath", sourcePath, + "--hive_metastore_uris", "", + "-saveGraph", "true", + "-resultTableName", Software.class.getCanonicalName(), + "-outputPath", workingDir.toString() + "/software", + "-preparedInfoPath", preparedInfoPath }); - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc .textFile(workingDir.toString() + "/software") diff --git a/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/OrcidPropagationJobTest.java b/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/OrcidPropagationJobTest.java index d18acd550..0b0ec62d1 100644 --- a/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/OrcidPropagationJobTest.java +++ b/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/OrcidPropagationJobTest.java @@ -65,33 +65,27 @@ public class OrcidPropagationJobTest { @Test public void noUpdateTest() throws Exception { - SparkOrcidToResultFromSemRelJob3 + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/orcidtoresultfromsemrel/sample/noupdate") + .getPath(); + final String possibleUpdatesPath = getClass() + .getResource( + "/eu/dnetlib/dhp/orcidtoresultfromsemrel/preparedInfo/mergedOrcidAssoc") + .getPath(); + SparkOrcidToResultFromSemRelJob .main( new String[] { - "-isTest", - Boolean.TRUE.toString(), - "-isSparkSessionManaged", - Boolean.FALSE.toString(), - "-sourcePath", - getClass() - .getResource("/eu/dnetlib/dhp/orcidtoresultfromsemrel/sample/noupdate") - .getPath(), - "-hive_metastore_uris", - "", - "-saveGraph", - "true", - "-resultTableName", - "eu.dnetlib.dhp.schema.oaf.Dataset", - "-outputPath", - workingDir.toString() + "/dataset", - "-possibleUpdatesPath", - getClass() - .getResource( - "/eu/dnetlib/dhp/orcidtoresultfromsemrel/preparedInfo/mergedOrcidAssoc") - .getPath() + "-isTest", Boolean.TRUE.toString(), + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-sourcePath", sourcePath, + "-hive_metastore_uris", "", + "-saveGraph", "true", + "-resultTableName", Dataset.class.getCanonicalName(), + "-outputPath", workingDir.toString() + "/dataset", + "-possibleUpdatesPath", possibleUpdatesPath }); - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc .textFile(workingDir.toString() + "/dataset") @@ -117,7 +111,7 @@ public class OrcidPropagationJobTest { @Test public void oneUpdateTest() throws Exception { - SparkOrcidToResultFromSemRelJob3 + SparkOrcidToResultFromSemRelJob .main( new String[] { "-isTest", @@ -182,7 +176,7 @@ public class OrcidPropagationJobTest { @Test public void twoUpdatesTest() throws Exception { - SparkOrcidToResultFromSemRelJob3 + SparkOrcidToResultFromSemRelJob .main( new String[] { "-isTest", diff --git a/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/projecttoresult/ProjectPropagationJobTest.java b/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/projecttoresult/ProjectPropagationJobTest.java index ac28e9d4b..7ed26b6b2 100644 --- a/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/projecttoresult/ProjectPropagationJobTest.java +++ b/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/projecttoresult/ProjectPropagationJobTest.java @@ -72,7 +72,7 @@ public class ProjectPropagationJobTest { @Test public void NoUpdateTest() throws Exception { - SparkResultToProjectThroughSemRelJob3 + SparkResultToProjectThroughSemRelJob .main( new String[] { "-isTest", @@ -115,7 +115,7 @@ public class ProjectPropagationJobTest { */ @Test public void UpdateTenTest() throws Exception { - SparkResultToProjectThroughSemRelJob3 + SparkResultToProjectThroughSemRelJob .main( new String[] { "-isTest", @@ -194,7 +194,7 @@ public class ProjectPropagationJobTest { */ @Test public void UpdateMixTest() throws Exception { - SparkResultToProjectThroughSemRelJob3 + SparkResultToProjectThroughSemRelJob .main( new String[] { "-isTest", diff --git a/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/resulttocommunityfromorganization/ResultToCommunityJobTest.java b/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/resulttocommunityfromorganization/ResultToCommunityJobTest.java index 0dd8c6bd4..ba8fb0831 100644 --- a/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/resulttocommunityfromorganization/ResultToCommunityJobTest.java +++ b/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/resulttocommunityfromorganization/ResultToCommunityJobTest.java @@ -67,8 +67,8 @@ public class ResultToCommunityJobTest { } @Test - public void test1() throws Exception { - SparkResultToCommunityFromOrganizationJob2 + public void testSparkResultToCommunityFromOrganizationJob() throws Exception { + SparkResultToCommunityFromOrganizationJob .main( new String[] { "-isTest", diff --git a/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/ResultToCommunityJobTest.java b/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/ResultToCommunityJobTest.java index e0ee12be6..13941b4a3 100644 --- a/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/ResultToCommunityJobTest.java +++ b/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/ResultToCommunityJobTest.java @@ -78,7 +78,7 @@ public class ResultToCommunityJobTest { @Test public void test1() throws Exception { - SparkResultToCommunityThroughSemRelJob4 + SparkResultToCommunityThroughSemRelJob .main( new String[] { "-isTest", Boolean.TRUE.toString(), diff --git a/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/Result2OrganizationJobTest.java b/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/Result2OrganizationJobTest.java index 447cf47b0..e7adb260e 100644 --- a/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/Result2OrganizationJobTest.java +++ b/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/Result2OrganizationJobTest.java @@ -39,11 +39,11 @@ public class Result2OrganizationJobTest { public static void beforeAll() throws IOException { workingDir = Files .createTempDirectory( - SparkResultToOrganizationFromIstRepoJob2.class.getSimpleName()); + SparkResultToOrganizationFromIstRepoJob.class.getSimpleName()); log.info("using work dir {}", workingDir); SparkConf conf = new SparkConf(); - conf.setAppName(SparkResultToOrganizationFromIstRepoJob2.class.getSimpleName()); + conf.setAppName(SparkResultToOrganizationFromIstRepoJob.class.getSimpleName()); conf.setMaster("local[*]"); conf.set("spark.driver.host", "localhost"); @@ -54,7 +54,7 @@ public class Result2OrganizationJobTest { spark = SparkSession .builder() - .appName(SparkResultToOrganizationFromIstRepoJob2.class.getSimpleName()) + .appName(SparkResultToOrganizationFromIstRepoJob.class.getSimpleName()) .config(conf) .getOrCreate(); } @@ -72,7 +72,7 @@ public class Result2OrganizationJobTest { */ @Test public void NoUpdateTest() throws Exception { - SparkResultToOrganizationFromIstRepoJob2 + SparkResultToOrganizationFromIstRepoJob .main( new String[] { "-isTest", @@ -123,7 +123,7 @@ public class Result2OrganizationJobTest { */ @Test public void UpdateNoMixTest() throws Exception { - SparkResultToOrganizationFromIstRepoJob2 + SparkResultToOrganizationFromIstRepoJob .main( new String[] { "-isTest", @@ -197,7 +197,7 @@ public class Result2OrganizationJobTest { @Test public void UpdateMixTest() throws Exception { - SparkResultToOrganizationFromIstRepoJob2 + SparkResultToOrganizationFromIstRepoJob .main( new String[] { "-isTest",