produces AtomicActions instead of Projects

This commit is contained in:
Miriam Baglioni 2020-05-22 15:26:57 +02:00
parent eae12a6586
commit 473c6d3a23
1 changed files with 35 additions and 36 deletions

View File

@ -9,12 +9,17 @@ import java.util.HashMap;
import java.util.Optional; import java.util.Optional;
import java.util.function.Consumer; import java.util.function.Consumer;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.rdd.SequenceFileRDDFunctions;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SaveMode;
@ -28,16 +33,11 @@ import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProgramme;
import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProject; import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProject;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Programme; import eu.dnetlib.dhp.schema.oaf.Programme;
import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.utils.DHPUtils; import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile;
import org.apache.spark.rdd.SequenceFileRDDFunctions;
import org.apache.hadoop.io.Text;
import scala.Function1; import scala.Function1;
import scala.Tuple2; import scala.Tuple2;
import scala.runtime.BoxedUnit; import scala.runtime.BoxedUnit;
@ -75,8 +75,6 @@ public class SparkAtomicActionJob {
final String programmePath = parser.get("programmePath"); final String programmePath = parser.get("programmePath");
log.info("programmePath {}: ", programmePath); log.info("programmePath {}: ", programmePath);
final String nameNode = parser.get("hdfsNameNode");
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
runWithSparkSession( runWithSparkSession(
@ -88,8 +86,7 @@ public class SparkAtomicActionJob {
spark, spark,
projectPath, projectPath,
programmePath, programmePath,
outputPath, outputPath);
nameNode);
}); });
} }
@ -99,37 +96,39 @@ public class SparkAtomicActionJob {
private static void getAtomicActions(SparkSession spark, String projectPatH, private static void getAtomicActions(SparkSession spark, String projectPatH,
String programmePath, String programmePath,
String outputPath, String outputPath) {
String nameNode) throws Exception{
Dataset<CSVProject> project = readPath(spark, projectPatH, CSVProject.class); Dataset<CSVProject> project = readPath(spark, projectPatH, CSVProject.class);
Dataset<CSVProgramme> programme = readPath(spark, programmePath, CSVProgramme.class); Dataset<CSVProgramme> programme = readPath(spark, programmePath, CSVProgramme.class);
project project
.joinWith(programme, project.col("programme").equalTo(programme.col("code")), "left") .joinWith(programme, project.col("programme").equalTo(programme.col("code")), "left")
.map(c -> { .map(c -> {
CSVProject csvProject = c._1(); CSVProject csvProject = c._1();
Optional<CSVProgramme> csvProgramme = Optional.ofNullable(c._2()); Optional<CSVProgramme> csvProgramme = Optional.ofNullable(c._2());
if (csvProgramme.isPresent()) { if (csvProgramme.isPresent()) {
Project p = new Project(); Project p = new Project();
p p
.setId( .setId(
createOpenaireId( createOpenaireId(
ModelSupport.entityIdPrefix.get("project"), ModelSupport.entityIdPrefix.get("project"),
"corda__h2020", csvProject.getId())); "corda__h2020", csvProject.getId()));
Programme pm = new Programme(); Programme pm = new Programme();
pm.setCode(csvProject.getProgramme()); pm.setCode(csvProject.getProgramme());
pm.setDescription(csvProgramme.get().getShortTitle()); pm.setDescription(csvProgramme.get().getShortTitle());
p.setProgramme(Arrays.asList(pm)); p.setProgramme(Arrays.asList(pm));
return new AtomicAction<>(Project.class, p); return p;
} }
return null; return null;
}, Encoders.bean(AtomicAction.class)) }, Encoders.bean(Project.class))
.filter(aa -> !(aa == null)) .filter(p -> !(p == null))
.toJavaRDD() .toJavaRDD()
.mapToPair(aa->new Tuple2<>(aa.getClazz().getCanonicalName(), OBJECT_MAPPER.writeValueAsString(aa))) .map(p -> new AtomicAction(Project.class, p))
.saveAsHadoopFile(outputPath, Text.class, Text.class, null); .mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
} }