diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java
index 61bd952db2..5b038b49ae 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java
@@ -3,11 +3,16 @@ package eu.dnetlib.dhp.actionmanager.project;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Optional;
+import java.util.function.Consumer;
 
+import eu.dnetlib.dhp.schema.action.AtomicAction;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
@@ -28,6 +33,15 @@ import eu.dnetlib.dhp.schema.oaf.Programme;
 import eu.dnetlib.dhp.schema.oaf.Project;
 import eu.dnetlib.dhp.utils.DHPUtils;
 
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.spark.rdd.SequenceFileRDDFunctions;
+import org.apache.hadoop.io.Text;
+import scala.Function1;
+import scala.Tuple2;
+import scala.runtime.BoxedUnit;
+
 public class SparkAtomicActionJob {
     private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionJob.class);
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -61,6 +75,8 @@ public class SparkAtomicActionJob {
         final String programmePath = parser.get("programmePath");
         log.info("programmePath {}: ", programmePath);
 
+        final String nameNode = parser.get("hdfsNameNode");
+
         SparkConf conf = new SparkConf();
 
         runWithSparkSession(
@@ -72,7 +88,8 @@ public class SparkAtomicActionJob {
                     spark,
                     projectPath,
                     programmePath,
-                    outputPath);
+                    outputPath,
+                    nameNode);
             });
     }
 
@@ -82,38 +99,38 @@ public class SparkAtomicActionJob {
     private static void getAtomicActions(SparkSession spark, String projectPatH,
         String programmePath,
-        String outputPath) {
+        String outputPath,
+        String nameNode) throws Exception {
         Dataset<CSVProject> project = readPath(spark, projectPatH, CSVProject.class);
         Dataset<CSVProgramme> programme = readPath(spark, programmePath, CSVProgramme.class);
 
         project
-            .joinWith(programme, project.col("programme").equalTo(programme.col("code")), "left")
-            .map(c -> {
-                CSVProject csvProject = c._1();
-                Optional<CSVProgramme> csvProgramme = Optional.ofNullable(c._2());
-                if (csvProgramme.isPresent()) {
-                    Project p = new Project();
-                    p
-                        .setId(
-                            createOpenaireId(
-                                ModelSupport.entityIdPrefix.get("project"),
-                                "corda__h2020", csvProject.getId()));
-                    Programme pm = new Programme();
-                    pm.setCode(csvProject.getProgramme());
-                    pm.setDescription(csvProgramme.get().getShortTitle());
-                    p.setProgramme(Arrays.asList(pm));
-                    return p;
-                }
+            .joinWith(programme, project.col("programme").equalTo(programme.col("code")), "left")
+            .map(c -> {
+                CSVProject csvProject = c._1();
+                Optional<CSVProgramme> csvProgramme = Optional.ofNullable(c._2());
+                if (csvProgramme.isPresent()) {
+                    Project p = new Project();
+                    p
+                        .setId(
+                            createOpenaireId(
+                                ModelSupport.entityIdPrefix.get("project"),
+                                "corda__h2020", csvProject.getId()));
+                    Programme pm = new Programme();
+                    pm.setCode(csvProject.getProgramme());
+                    pm.setDescription(csvProgramme.get().getShortTitle());
+                    p.setProgramme(Arrays.asList(pm));
+                    return new AtomicAction<>(Project.class, p);
+                }
+
+                return null;
+            }, Encoders.bean(AtomicAction.class))
+            .filter(aa -> !(aa == null))
+            .toJavaRDD()
+            .mapToPair(aa -> new Tuple2<>(aa.getClazz().getCanonicalName(), OBJECT_MAPPER.writeValueAsString(aa)))
+            .saveAsHadoopFile(outputPath, Text.class, Text.class, null);
 
-                return null;
-            }, Encoders.bean(Project.class))
-            .filter(p -> !(p == null))
-            // .map(p -> new AtomicAction<>(Project.class, p), Encoders.bean(AtomicAction.class))
-            .write()
-            .option("compression", "gzip")
-            .mode(SaveMode.Overwrite)
-            .json(outputPath);
     }
 
     public static <R> Dataset<R> readPath(
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/workflow.xml
index cd4d79ab77..ba99fb3143 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/workflow.xml
@@ -24,7 +24,6 @@
-
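Review note on the new save step: as written, mapToPair produces (String, String) pairs, while saveAsHadoopFile declares Text.class for both key and value and passes null as the OutputFormat class; the old-API saveAsHadoopFile requires a concrete OutputFormat, so this call would fail at runtime. A minimal sketch of a variant that should run, assuming the intent is a SequenceFile of Text pairs (the Text wrapping and SequenceFileOutputFormat are assumptions, not taken from this patch):

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.SequenceFileOutputFormat;
    import scala.Tuple2;

    // ... same pipeline as in the patch, down to the serialization step ...
        .toJavaRDD()
        .mapToPair(aa -> new Tuple2<>(
            new Text(aa.getClazz().getCanonicalName()),      // key: payload class of the action
            new Text(OBJECT_MAPPER.writeValueAsString(aa)))) // value: the action serialized as JSON
        .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);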
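The patch also threads a new hdfsNameNode argument down to getAtomicActions and imports Configuration, SequenceFile, Path, IntWritable and Consumer, none of which the shown hunks use yet, presumably as groundwork for writing the action set through the Hadoop SequenceFile API directly. A sketch of what that could look like, purely an assumption read off the imports (the fs.defaultFS wiring and the writer options are standard Hadoop, not taken from the patch; jsonPayload is a hypothetical serialized action):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    // Point the Hadoop client at the cluster named by the new parameter.
    Configuration hadoopConf = new Configuration();
    hadoopConf.set("fs.defaultFS", nameNode);

    // Open a SequenceFile writer on HDFS and append one (class name, JSON) record per action.
    try (SequenceFile.Writer writer = SequenceFile.createWriter(
        hadoopConf,
        SequenceFile.Writer.file(new Path(outputPath)),
        SequenceFile.Writer.keyClass(Text.class),
        SequenceFile.Writer.valueClass(Text.class))) {
        writer.append(new Text(Project.class.getCanonicalName()), new Text(jsonPayload));
    }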
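For a quick check that the job wrote readable actions, the output can be read back with Spark's standard sequenceFile API and deserialized with the same ObjectMapper; a sketch, assuming the SequenceFile variant above:

    import org.apache.hadoop.io.Text;
    import org.apache.spark.api.java.JavaSparkContext;
    import eu.dnetlib.dhp.schema.action.AtomicAction;

    JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
    long n = sc
        .sequenceFile(outputPath, Text.class, Text.class)
        // Text instances are reused by the reader, so copy to String before deserializing.
        .map(t -> OBJECT_MAPPER.readValue(t._2().toString(), AtomicAction.class))
        .count();
    log.info("atomic actions written: {}", n);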