diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java index 0261e3887..3fbba47ba 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java @@ -6,6 +6,8 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.Dataset; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -14,6 +16,7 @@ import org.apache.spark.sql.*; import scala.Tuple2; import java.io.File; +import java.io.IOException; import java.util.*; import static eu.dnetlib.dhp.PropagationConstant.*; @@ -34,45 +37,43 @@ public class SparkCountryPropagationJob { final String inputPath = parser.get("sourcePath"); final String outputPath = "/tmp/provision/propagation/countrytoresultfrominstitutionalrepositories"; - File directory = new File(outputPath); + createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); - if(!directory.exists()){ - directory.mkdirs(); - } List whitelist = Arrays.asList(parser.get("whitelist").split(";")); List allowedtypes = Arrays.asList(parser.get("allowedtypes").split(";")); - JavaPairRDD organizations = sc.sequenceFile(inputPath + "/organization", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Organization.class)) + JavaPairRDD organizations = sc.textFile(inputPath + "/organization") + .map(item -> new ObjectMapper().readValue(item, Organization.class)) .filter(org -> !org.getDataInfo().getDeletedbyinference()) .map(org -> new TypedRow().setSourceId(org.getId()).setValue(org.getCountry().getClassid())) .mapToPair(toPair()); - JavaPairRDD organization_datasource = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class)) + JavaPairRDD organization_datasource = + sc.textFile(inputPath + "/relation") + .map(item -> new ObjectMapper().readValue(item, Relation.class)) .filter(r -> !r.getDataInfo().getDeletedbyinference()) .filter(r -> RELATION_DATASOURCEORGANIZATION_REL_TYPE.equals(r.getRelClass()) && RELATION_ORGANIZATION_DATASOURCE_REL_CLASS.equals(r.getRelType())) .map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget())) .mapToPair(toPair()); //id is the organization identifier - JavaPairRDD datasources = sc.sequenceFile(inputPath + "/datasource", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Datasource.class)) + JavaPairRDD datasources = sc.textFile(inputPath + "/datasource") + .map(item -> new ObjectMapper().readValue(item, Datasource.class)) .filter(ds -> whitelist.contains(ds.getId()) || allowedtypes.contains(ds.getDatasourcetype().getClassid())) .map(ds -> new TypedRow().setSourceId(ds.getId())) .mapToPair(toPair()); - JavaRDD publications = sc.sequenceFile(inputPath + "/publication", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class)); - JavaRDD datasets = sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class)); - JavaRDD software = sc.sequenceFile(inputPath + "/software", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class)); - JavaRDD other = sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class)); + JavaRDD publications = sc.textFile(inputPath + "/publication") + .map(item -> new ObjectMapper().readValue(item, Publication.class)); + JavaRDD datasets = sc.textFile(inputPath + "/dataset") + .map(item -> new ObjectMapper().readValue(item, Dataset.class)); + JavaRDD software = sc.textFile(inputPath + "/software") + .map(item -> new ObjectMapper().readValue(item, Software.class)); + JavaRDD other = sc.textFile(inputPath + "/otherresearchproduct") + .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class)); JavaPairRDD datasource_results = publications .map(oaf -> getTypedRowsDatasourceResult(oaf)) @@ -147,6 +148,8 @@ public class SparkCountryPropagationJob { } + + private static void updateResult(JavaPairRDD results, JavaPairRDD toupdateresult, String outputPath, String type) { results.leftOuterJoin(toupdateresult) .map(c -> { @@ -177,33 +180,6 @@ public class SparkCountryPropagationJob { - private static JavaPairRDD getResults(JavaSparkContext sc , String inputPath){ - - return - sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class)) - .filter(ds -> !ds.getDataInfo().getDeletedbyinference()) - .map(oaf -> new TypedRow().setType("dataset").setSourceId(oaf.getId())) - .mapToPair(toPair()) - .union(sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class)) - .filter(o -> !o.getDataInfo().getDeletedbyinference()) - .map(oaf -> new TypedRow().setType("otherresearchproduct").setSourceId(oaf.getId())) - .mapToPair(toPair())) - .union(sc.sequenceFile(inputPath + "/software", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class)) - .filter(s -> !s.getDataInfo().getDeletedbyinference()) - .map(oaf -> new TypedRow().setType("software").setSourceId(oaf.getId())) - .mapToPair(toPair())) - .union(sc.sequenceFile(inputPath + "/publication", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class)) - .filter(p -> !p.getDataInfo().getDeletedbyinference()) - .map(oaf -> new TypedRow().setType("publication").setSourceId(oaf.getId())) - .mapToPair(toPair())); - - - } -