diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java index 5b038b49a..990e50abd 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java @@ -9,12 +9,17 @@ import java.util.HashMap; import java.util.Optional; import java.util.function.Consumer; -import eu.dnetlib.dhp.schema.action.AtomicAction; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.rdd.SequenceFileRDDFunctions; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; @@ -28,16 +33,11 @@ import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProgramme; import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProject; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.Programme; import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.utils.DHPUtils; - -import org.apache.hadoop.conf.Configuration; - -import org.apache.hadoop.io.SequenceFile; -import org.apache.spark.rdd.SequenceFileRDDFunctions; -import org.apache.hadoop.io.Text; import scala.Function1; import scala.Tuple2; import scala.runtime.BoxedUnit; @@ -75,8 +75,6 @@ public class SparkAtomicActionJob { final String programmePath = parser.get("programmePath"); log.info("programmePath {}: ", programmePath); - final String nameNode = parser.get("hdfsNameNode"); - SparkConf conf = new SparkConf(); runWithSparkSession( @@ -88,8 +86,7 @@ public class SparkAtomicActionJob { spark, projectPath, programmePath, - outputPath, - nameNode); + outputPath); }); } @@ -99,37 +96,39 @@ public class SparkAtomicActionJob { private static void getAtomicActions(SparkSession spark, String projectPatH, String programmePath, - String outputPath, - String nameNode) throws Exception{ + String outputPath) { Dataset project = readPath(spark, projectPatH, CSVProject.class); Dataset programme = readPath(spark, programmePath, CSVProgramme.class); project - .joinWith(programme, project.col("programme").equalTo(programme.col("code")), "left") - .map(c -> { - CSVProject csvProject = c._1(); - Optional csvProgramme = Optional.ofNullable(c._2()); - if (csvProgramme.isPresent()) { - Project p = new Project(); - p - .setId( - createOpenaireId( - ModelSupport.entityIdPrefix.get("project"), - "corda__h2020", csvProject.getId())); - Programme pm = new Programme(); - pm.setCode(csvProject.getProgramme()); - pm.setDescription(csvProgramme.get().getShortTitle()); - p.setProgramme(Arrays.asList(pm)); - return new AtomicAction<>(Project.class, p); - } + .joinWith(programme, project.col("programme").equalTo(programme.col("code")), "left") + .map(c -> { + CSVProject csvProject = c._1(); + Optional csvProgramme = Optional.ofNullable(c._2()); + if (csvProgramme.isPresent()) { + Project p = new Project(); + p + .setId( + createOpenaireId( + ModelSupport.entityIdPrefix.get("project"), + "corda__h2020", csvProject.getId())); + Programme pm = new Programme(); + pm.setCode(csvProject.getProgramme()); + pm.setDescription(csvProgramme.get().getShortTitle()); + p.setProgramme(Arrays.asList(pm)); + return p; + } - return null; - }, Encoders.bean(AtomicAction.class)) - .filter(aa -> !(aa == null)) - .toJavaRDD() - .mapToPair(aa->new Tuple2<>(aa.getClazz().getCanonicalName(), OBJECT_MAPPER.writeValueAsString(aa))) - .saveAsHadoopFile(outputPath, Text.class, Text.class, null); + return null; + }, Encoders.bean(Project.class)) + .filter(p -> !(p == null)) + .toJavaRDD() + .map(p -> new AtomicAction(Project.class, p)) + .mapToPair( + aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); }