diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java
index 3d8226f4d..78aed1a69 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java
@@ -57,7 +57,7 @@ public class PrepareProjects {
         final String outputPath = parser.get("outputPath");
         log.info("outputPath {}: ", outputPath);
 
-        final String dbProjectPath = parser.get("dbProjectPath");
+        final String dbProjectPath = parser.get("dbProjectPath");
         log.info("dbProjectPath {}: ", dbProjectPath);
 
         SparkConf conf = new SparkConf();
@@ -75,56 +75,39 @@ public class PrepareProjects {
         HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
     }
 
-    private static void exec(SparkSession spark, String progjectPath, String dbProjectPath, String outputPath) {
-        Dataset<CSVProject> project = readPath(spark, progjectPath, CSVProject.class);
+    private static void exec(SparkSession spark, String projectPath, String dbProjectPath, String outputPath) {
+        Dataset<CSVProject> project = readPath(spark, projectPath, CSVProject.class);
         Dataset<ProjectSubset> dbProjects = readPath(spark, dbProjectPath, ProjectSubset.class);
 
-        dbProjects.joinWith(project, dbProjects.col("code").equalTo(project.col("id")), "left")
-            .flatMap((FlatMapFunction<Tuple2<ProjectSubset, CSVProject>, CSVProject>) value -> {
-                Optional<CSVProject> csvProject = Optional.ofNullable(value._2());
-                if(! csvProject.isPresent()){
-                    return null;
-                }
-                List<CSVProject> csvProjectList = new ArrayList<>();
-                String[] programme = csvProject.get().getProgramme().split(";");
-                Arrays
-                    .stream(programme)
-                    .forEach(p -> {
-                        CSVProject proj = new CSVProject();
-                        proj.setProgramme(p);
-                        proj.setId(csvProject.get().getId());
-                        csvProjectList.add(proj);
-                    });
+        dbProjects
+            .joinWith(project, dbProjects.col("code").equalTo(project.col("id")), "left")
+            .flatMap(getTuple2CSVProjectFlatMapFunction(), Encoders.bean(CSVProject.class))
+            .filter(Objects::nonNull)
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(outputPath);
 
-                return csvProjectList.iterator();
-            }, Encoders.bean(CSVProject.class))
-            .write()
-            .mode(SaveMode.Overwrite)
-            .option("compression", "gzip")
-            .json(outputPath);
-//
-//            .map(value -> {
-//                Optional<CSVProject> csvProject = Optional.ofNullable(value._2());
-//            }, Encoders.bean(CSVProject.class))
-//            .filter(Objects::nonNull)
-//            .toJavaRDD()
-//            .flatMap(p -> {
-//                List<CSVProject> csvProjectList = new ArrayList<>();
-//                String[] programme = p.getProgramme().split(";");
-//                Arrays
-//                    .stream(programme)
-//                    .forEach(value -> {
-//                        CSVProject csvProject = new CSVProject();
-//                        csvProject.setProgramme(value);
-//                        csvProject.setId(p.getId());
-//                        csvProjectList.add(csvProject);
-//                    });
-//
-//                return csvProjectList.iterator();
-//            })
-//            .map(p -> OBJECT_MAPPER.writeValueAsString(p))
-//            .saveAsTextFile(outputPath);
+    }
 
+    private static FlatMapFunction<Tuple2<ProjectSubset, CSVProject>, CSVProject> getTuple2CSVProjectFlatMapFunction() {
+        return (FlatMapFunction<Tuple2<ProjectSubset, CSVProject>, CSVProject>) value -> {
+            Optional<CSVProject> csvProject = Optional.ofNullable(value._2());
+            List<CSVProject> csvProjectList = new ArrayList<>();
+            if (csvProject.isPresent()) {
+
+                String[] programme = csvProject.get().getProgramme().split(";");
+                Arrays
+                    .stream(programme)
+                    .forEach(p -> {
+                        CSVProject proj = new CSVProject();
+                        proj.setProgramme(p);
+                        proj.setId(csvProject.get().getId());
+                        csvProjectList.add(proj);
+                    });
+            }
+            return csvProjectList.iterator();
+        };
     }
 
     public static <R> Dataset<R> readPath(
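Note on the refactoring above: extracting the lambda into getTuple2CSVProjectFlatMapFunction() leaves the behaviour unchanged. Each project matched by the left join is still fanned out into one CSVProject per ';'-separated programme code, while unmatched rows now yield an empty iterator (with .filter(Objects::nonNull) as a further safety net) instead of the old code's return null. A minimal standalone sketch of that fan-out logic, using a simplified stand-in bean and invented programme codes (the project code 894593 is taken from the dbProject test resource added below):

    import java.util.ArrayList;
    import java.util.List;

    public class ProgrammeFanOutSketch {

        // Simplified stand-in for CSVProject: only the two fields the flatMap touches.
        static class CSVProject {
            String id;
            String programme;

            CSVProject(String id, String programme) {
                this.id = id;
                this.programme = programme;
            }
        }

        // One matched project whose "programme" field holds N ';'-separated codes
        // becomes N output records, each carrying a single programme code.
        static List<CSVProject> fanOut(CSVProject in) {
            List<CSVProject> out = new ArrayList<>();
            for (String p : in.programme.split(";")) {
                out.add(new CSVProject(in.id, p));
            }
            return out;
        }

        public static void main(String[] args) {
            fanOut(new CSVProject("894593", "H2020-EU.3.1.;H2020-EU.3.1.7."))
                .forEach(p -> System.out.println(p.id + " -> " + p.programme));
            // 894593 -> H2020-EU.3.1.
            // 894593 -> H2020-EU.3.1.7.
        }
    }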
parser.get("postgresPassword"); + final String hdfsPath = parser.get("hdfsPath"); + final String hdfsNameNode = parser.get("hdfsNameNode"); - try (final ReadProjectsFromDB rbl = new ReadProjectsFromDB(hdfsPath, hdfsNameNode, dbUrl, dbUser, - dbPassword)) { + try (final ReadProjectsFromDB rbl = new ReadProjectsFromDB(hdfsPath, hdfsNameNode, dbUrl, dbUser, + dbPassword)) { - log.info("Processing blacklist..."); - rbl.execute(query, rbl::processProjectsEntry); + log.info("Processing projects..."); + rbl.execute(query, rbl::processProjectsEntry); - } - } - public void execute(final String sql, final Function> producer) throws Exception { + } + } - final Consumer consumer = rs -> producer.apply(rs).forEach(r -> writeProject(r)); + public void execute(final String sql, final Function> producer) throws Exception { - dbClient.processResults(sql, consumer); - } + final Consumer consumer = rs -> producer.apply(rs).forEach(r -> writeProject(r)); - public List processProjectsEntry(ResultSet rs) { - try { - ProjectSubset p = new ProjectSubset(); - p.setCode(rs.getString("code")); + dbClient.processResults(sql, consumer); + } - return Arrays.asList(p); + public List processProjectsEntry(ResultSet rs) { + try { + ProjectSubset p = new ProjectSubset(); + p.setCode(rs.getString("code")); - } catch (final Exception e) { - throw new RuntimeException(e); - } - } + return Arrays.asList(p); - protected void writeProject(final ProjectSubset r) { - try { - writer.write(OBJECT_MAPPER.writeValueAsString(r)); - writer.newLine(); - } catch (final Exception e) { - throw new RuntimeException(e); - } - } + } catch (final Exception e) { + throw new RuntimeException(e); + } + } - public ReadProjectsFromDB( - final String hdfsPath, String hdfsNameNode, final String dbUrl, final String dbUser, final String dbPassword) - throws Exception { + protected void writeProject(final ProjectSubset r) { + try { + writer.write(OBJECT_MAPPER.writeValueAsString(r)); + writer.newLine(); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } - this.dbClient = new DbClient(dbUrl, dbUser, dbPassword); - this.conf = new Configuration(); - this.conf.set("fs.defaultFS", hdfsNameNode); - FileSystem fileSystem = FileSystem.get(this.conf); - Path hdfsWritePath = new Path(hdfsPath); - FSDataOutputStream fsDataOutputStream = null; - if (fileSystem.exists(hdfsWritePath)) { - fileSystem.delete(hdfsWritePath, false); - } - fsDataOutputStream = fileSystem.create(hdfsWritePath); + public ReadProjectsFromDB( + final String hdfsPath, String hdfsNameNode, final String dbUrl, final String dbUser, final String dbPassword) + throws Exception { + this.dbClient = new DbClient(dbUrl, dbUser, dbPassword); + this.conf = new Configuration(); + this.conf.set("fs.defaultFS", hdfsNameNode); + FileSystem fileSystem = FileSystem.get(this.conf); + Path hdfsWritePath = new Path(hdfsPath); + FSDataOutputStream fsDataOutputStream = null; + if (fileSystem.exists(hdfsWritePath)) { + fileSystem.delete(hdfsWritePath, false); + } + fsDataOutputStream = fileSystem.create(hdfsWritePath); - this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8)); - } + this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8)); + } - @Override - public void close() throws IOException { - dbClient.close(); - writer.close(); - } + @Override + public void close() throws IOException { + dbClient.close(); + writer.close(); + } } - diff --git 
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/workflow.xml
index ca0a73b97..1e3445675 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/workflow.xml
@@ -96,7 +96,7 @@
             <master>yarn</master>
             <mode>cluster</mode>
-            <name>PrepareProgramme</name>
+            <name>PrepareProjects</name>
             <class>eu.dnetlib.dhp.actionmanager.project.PrepareProjects</class>
             <jar>dhp-aggregation-${projectVersion}.jar</jar>
@@ -111,6 +111,7 @@
             <arg>--projectPath</arg><arg>${workingDir}/projects</arg>
             <arg>--outputPath</arg><arg>${workingDir}/preparedProjects</arg>
+            <arg>--dbProjectPath</arg><arg>${workingDir}/dbProjects</arg>
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjectTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjectTest.java
index 73bedb741..5ff88e46f 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjectTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjectTest.java
@@ -67,7 +67,7 @@
     }
 
     @Test
-    public void numberDistinctProgrammeTest() throws Exception {
+    public void numberDistinctProjectTest() throws Exception {
         PrepareProjects
             .main(
                 new String[] {
@@ -76,7 +76,10 @@
                     "-projectPath",
                     getClass().getResource("/eu/dnetlib/dhp/actionmanager/project/projects_subset.json").getPath(),
                     "-outputPath",
-                    workingDir.toString() + "/preparedProjects"
+                    workingDir.toString() + "/preparedProjects",
+                    "-dbProjectPath",
+                    getClass().getResource("/eu/dnetlib/dhp/actionmanager/project/dbProject").getPath(),
+
                 });
 
         final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
@@ -85,7 +88,7 @@
         JavaRDD<CSVProject> tmp = sc
             .textFile(workingDir.toString() + "/preparedProjects")
             .map(item -> OBJECT_MAPPER.readValue(item, CSVProject.class));
 
-        Assertions.assertEquals(20, tmp.count());
+        Assertions.assertEquals(8, tmp.count());
 
         Dataset<CSVProject> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(CSVProject.class));
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/SparkUpdateProjectTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/SparkUpdateProjectTest.java
index 64c6ac32f..4d3ec140b 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/SparkUpdateProjectTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/SparkUpdateProjectTest.java
@@ -5,7 +5,6 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 
-import eu.dnetlib.dhp.schema.action.AtomicAction;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.spark.SparkConf;
@@ -21,6 +20,7 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import eu.dnetlib.dhp.schema.action.AtomicAction;
 import eu.dnetlib.dhp.schema.oaf.Project;
 
 public class SparkUpdateProjectTest {
@@ -84,15 +84,12 @@
         final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
 
         JavaRDD<Project> tmp = sc
-            .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
+            .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
             .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
-            .map(aa -> ((Project)aa.getPayload()))
-            ;
+            .map(aa -> ((Project) aa.getPayload()));
 
         Assertions.assertEquals(14, tmp.count());
 
-//        Dataset<CSVProgramme> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(CSVProgramme.class));
-//
-//        Assertions.assertEquals(0, verificationDataset.filter("shortTitle =''").count());
+
     }
 }
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/dbProject b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/dbProject
new file mode 100644
index 000000000..f8e3c4589
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/dbProject
@@ -0,0 +1,8 @@
+{"code":"894593"}
+{"code":"897004"}
+{"code":"896300"}
+{"code":"892890"}
+{"code":"886828"}
+{"code":"8867767"}
+{"code":"101003374"}
+{"code":"886776"}
\ No newline at end of file