enrichment steps #38

Merged
claudio.atzori merged 334 commits from miriam.baglioni/dnet-hadoop:master into enrichment_wfs 2020-08-11 16:40:26 +02:00
2 changed files with 44 additions and 28 deletions
Showing only changes of commit 4589c428b1 - Show all commits

View File

@ -3,11 +3,16 @@ package eu.dnetlib.dhp.actionmanager.project;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Optional;
import java.util.function.Consumer;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
@ -28,6 +33,15 @@ import eu.dnetlib.dhp.schema.oaf.Programme;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile;
import org.apache.spark.rdd.SequenceFileRDDFunctions;
import org.apache.hadoop.io.Text;
import scala.Function1;
import scala.Tuple2;
import scala.runtime.BoxedUnit;
public class SparkAtomicActionJob {
private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@ -61,6 +75,8 @@ public class SparkAtomicActionJob {
final String programmePath = parser.get("programmePath");
log.info("programmePath {}: ", programmePath);
final String nameNode = parser.get("hdfsNameNode");
SparkConf conf = new SparkConf();
runWithSparkSession(
@ -72,7 +88,8 @@ public class SparkAtomicActionJob {
spark,
projectPath,
programmePath,
outputPath);
outputPath,
nameNode);
});
}
@ -82,38 +99,38 @@ public class SparkAtomicActionJob {
private static void getAtomicActions(SparkSession spark, String projectPatH,
String programmePath,
String outputPath) {
String outputPath,
String nameNode) throws Exception{
Dataset<CSVProject> project = readPath(spark, projectPatH, CSVProject.class);
Dataset<CSVProgramme> programme = readPath(spark, programmePath, CSVProgramme.class);
project
.joinWith(programme, project.col("programme").equalTo(programme.col("code")), "left")
.map(c -> {
CSVProject csvProject = c._1();
Optional<CSVProgramme> csvProgramme = Optional.ofNullable(c._2());
if (csvProgramme.isPresent()) {
Project p = new Project();
p
.setId(
createOpenaireId(
ModelSupport.entityIdPrefix.get("project"),
"corda__h2020", csvProject.getId()));
Programme pm = new Programme();
pm.setCode(csvProject.getProgramme());
pm.setDescription(csvProgramme.get().getShortTitle());
p.setProgramme(Arrays.asList(pm));
return p;
}
.joinWith(programme, project.col("programme").equalTo(programme.col("code")), "left")
.map(c -> {
CSVProject csvProject = c._1();
Optional<CSVProgramme> csvProgramme = Optional.ofNullable(c._2());
if (csvProgramme.isPresent()) {
Project p = new Project();
p
.setId(
createOpenaireId(
ModelSupport.entityIdPrefix.get("project"),
"corda__h2020", csvProject.getId()));
Programme pm = new Programme();
pm.setCode(csvProject.getProgramme());
pm.setDescription(csvProgramme.get().getShortTitle());
p.setProgramme(Arrays.asList(pm));
return new AtomicAction<>(Project.class, p);
}
return null;
}, Encoders.bean(AtomicAction.class))
.filter(aa -> !(aa == null))
.toJavaRDD()
.mapToPair(aa->new Tuple2<>(aa.getClazz().getCanonicalName(), OBJECT_MAPPER.writeValueAsString(aa)))
.saveAsHadoopFile(outputPath, Text.class, Text.class, null);
return null;
}, Encoders.bean(Project.class))
.filter(p -> !(p == null))
// .map(p -> new AtomicAction<>(Project.class, p), Encoders.bean(AtomicAction.class))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);
}
public static <R> Dataset<R> readPath(

View File

@ -24,7 +24,6 @@
<fs>
<delete path='${outputPath}'/>
<mkdir path='${outputPath}'/>
<delete path="/tmp/h2020programme"/>
</fs>
<ok to="get_project_file"/>
<error to="Kill"/>