From 9447d78ef38abc918e6f0b7bb4f338ebe9f02c86 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Tue, 19 May 2020 18:42:50 +0200
Subject: [PATCH] added preparation classes

---
 .../dhp/actionmanager/project/GetFile.java         |  53 -----
 .../project/PrepareProgramme.java                  | 124 ++++++++++++
 .../project/PrepareProjects.java                   | 189 ++++++++----------
 .../project/SparkAtomicActionJob.java              |  61 +++++-
 .../project/csvutils/CSVParser.java                |  37 ++++
 .../CSVProgramme.java}                             |   4 +-
 .../CSVProject.java}                               |   5 +-
 .../project/csvutils/ReadCSV.java                  |  98 +++++++++
 .../CollectorPluginErrorLogList.java               |   2 +-
 .../CollectorServiceException.java                 |   2 +-
 .../{ => httpconnector}/HttpConnector.java         |   2 +-
 .../project/prepare_programme_parameters.json      |  26 +++
 .../project/prepare_project_parameters.json        |  20 ++
 .../actionmanager/project/CSVParserTest.java       |  43 ++++
 .../project/PrepareProgrammeTest.java              |   4 +
 .../project/SparkUpdateProjectTest.java            |   4 +
 .../httpconnector/HttpConnectorTest.java           |   4 +
 .../preparedProgramme_whole.json                   |   0
 .../dhp/actionmanager/project/programme.csv        |   0
 .../dhp/actionmanager/projects_subset.json         |   0
 .../dhp/actionmanager/whole_programme.json         |   0
 21 files changed, 507 insertions(+), 171 deletions(-)
 delete mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/GetFile.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVParser.java
 rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/{Programme.java => csvutils/CSVProgramme.java} (87%)
 rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/{Project.java => csvutils/CSVProject.java} (97%)
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/ReadCSV.java
 rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/{ => httpconnector}/CollectorPluginErrorLogList.java (86%)
 rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/{ => httpconnector}/CollectorServiceException.java (86%)
 rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/{ => httpconnector}/HttpConnector.java (99%)
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/prepare_programme_parameters.json
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/prepare_project_parameters.json
 create mode 100644 dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/CSVParserTest.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgrammeTest.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/SparkUpdateProjectTest.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/preparedProgramme_whole.json
 create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/programme.csv
 create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/projects_subset.json
 create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/whole_programme.json

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/GetFile.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/GetFile.java
deleted file mode 100644
index bbf59a20f..000000000
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/GetFile.java
+++ /dev/null
@@ -1,53 +0,0 @@
-
-package eu.dnetlib.dhp.actionmanager.project;
-
-import java.io.*;
-import java.net.URL;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-
-public class GetFile {
-
-    private static final Log log = LogFactory.getLog(GetFile.class);
-
-    public static void main(final String[] args) throws Exception {
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
-            IOUtils
-                .toString(
-                    GetFile.class
-                        .getResourceAsStream(
-                            "/eu/dnetlib/dhp/actionmanager/project/parameters.json")));
-
-        Configuration conf = new Configuration();
-
-        parser.parseArgument(args);
-
-        final String fileURL = parser.get("fileURL");
-        final String hdfsPath = parser.get("hdfsPath");
-        final String hdfsNameNode = parser.get("hdfsNameNode");
-
-        conf.set("fs.defaultFS", hdfsNameNode);
-        FileSystem fileSystem = FileSystem.get(conf);
-        Path hdfsWritePath = new Path(hdfsPath);
-        FSDataOutputStream fsDataOutputStream = null;
-        if (fileSystem.exists(hdfsWritePath)) {
-            fsDataOutputStream = fileSystem.append(hdfsWritePath);
-        } else {
-            fsDataOutputStream = fileSystem.create(hdfsWritePath);
-        }
-
-        InputStream is = new BufferedInputStream(new URL(fileURL).openStream());
-
-        org.apache.hadoop.io.IOUtils.copyBytes(is, fsDataOutputStream, 4096, true);
-
-    }
-
-}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java
new file mode 100644
index 000000000..a5abb9ea7
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java
@@ -0,0 +1,124 @@
+
+package eu.dnetlib.dhp.actionmanager.project;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.HashMap;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProgramme;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import scala.Tuple2;
+
+public class PrepareProgramme {
+
+    private static final Logger log = LoggerFactory.getLogger(PrepareProgramme.class);
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final HashMap<String, CSVProgramme> programmeMap = new HashMap<>();
+
+    public static void main(String[] args) throws Exception {
+
+        String jsonConfiguration = IOUtils
+            .toString(
+                PrepareProgramme.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/actionmanager/project/prepare_programme_parameters.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String programmePath = parser.get("programmePath");
+        log.info("programmePath: {}", programmePath);
+
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath: {}", outputPath);
+
+        SparkConf conf = new SparkConf();
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                removeOutputDir(spark, outputPath);
+                exec(spark, programmePath, outputPath);
+            });
+    }
+
+    private static void removeOutputDir(SparkSession spark, String path) {
+        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+    }
+
+    private static void exec(SparkSession spark, String programmePath, String outputPath) {
+        Dataset<CSVProgramme> programme = readPath(spark, programmePath, CSVProgramme.class);
+
+        programme
+            .toJavaRDD()
+            .filter(p -> !p.getCode().contains("FP7"))
+            .mapToPair(csvProgramme -> new Tuple2<>(csvProgramme.getCode(), csvProgramme))
+            .reduceByKey((a, b) -> {
+                if (StringUtils.isEmpty(a.getShortTitle())) {
+                    if (StringUtils.isEmpty(b.getShortTitle())) {
+                        if (StringUtils.isEmpty(a.getTitle())) {
+                            if (StringUtils.isNotEmpty(b.getTitle())) {
+                                a.setShortTitle(b.getTitle());
+                                a.setLanguage(b.getLanguage());
+                            }
+                        } else { // a.getTitle is not empty
+                            if (StringUtils.isEmpty(b.getTitle())) {
+                                a.setShortTitle(a.getTitle());
+                            } else {
+                                if (b.getLanguage().equalsIgnoreCase("en")) {
+                                    a.setShortTitle(b.getTitle());
+                                    a.setLanguage(b.getLanguage());
+                                } else {
+                                    a.setShortTitle(a.getTitle());
+                                }
+                            }
+                        }
+                    } else { // b.getShortTitle is not empty
+                        a.setShortTitle(b.getShortTitle());
+                        // a.setLanguage(b.getLanguage());
+                    }
+                }
+                return a;
+
+            })
+            .map(p -> {
+                CSVProgramme csvProgramme = p._2();
+                if (StringUtils.isEmpty(csvProgramme.getShortTitle())) {
+                    csvProgramme.setShortTitle(csvProgramme.getTitle());
+                }
+                return OBJECT_MAPPER.writeValueAsString(csvProgramme);
+            })
+            .saveAsTextFile(outputPath);
+
+    }
+
+    public static <R> Dataset<R> readPath(
+        SparkSession spark, String inputPath, Class<R> clazz) {
+        return spark
+            .read()
+            .textFile(inputPath)
+            .map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
+    }
+
+}
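Editor's note: the reduceByKey above collapses duplicate programme codes with a "keep an existing shortTitle, otherwise prefer an English title" policy. A minimal standalone sketch of that branch follows; the codes, titles and languages are invented for illustration, and only the CSVProgramme bean added by this patch is assumed:

    import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProgramme;

    public class ProgrammeMergeExample {

        public static void main(String[] args) {
            CSVProgramme a = new CSVProgramme();
            a.setCode("H2020-EU.1.1.");
            a.setTitle("Excellent science"); // no shortTitle yet
            a.setLanguage("de");

            CSVProgramme b = new CSVProgramme();
            b.setCode("H2020-EU.1.1.");
            b.setTitle("Excellent Science");
            b.setLanguage("en");

            // mirrors the innermost branch above: neither row has a shortTitle,
            // both have titles, and b is English -> the English title wins
            if (b.getLanguage().equalsIgnoreCase("en")) {
                a.setShortTitle(b.getTitle());
                a.setLanguage(b.getLanguage());
            }

            System.out.println(a.getShortTitle()); // prints: Excellent Science
        }
    }
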
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java
index 8955edeb4..1c98199f8 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java
@@ -1,139 +1,108 @@
 
 package eu.dnetlib.dhp.actionmanager.project;
 
-import java.io.BufferedWriter;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.charset.StandardCharsets;
-import java.sql.ResultSet;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
-import org.apache.commons.csv.CSVParser;
-import org.apache.commons.csv.CSVFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProgramme;
+import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProject;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.schema.common.RelationInverse;
-import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import scala.Tuple2;
 
-public class PrepareProjects implements Closeable {
-    private static final Log log = LogFactory.getLog(PrepareProjects.class);
-    private final Configuration conf;
-    private final BufferedWriter writer;
-    private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-    private final HttpConnector httpConnector;
+public class PrepareProjects {
 
-    public static void main(final String[] args) throws Exception {
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
-            IOUtils
-                .toString(
-                    PrepareProjects.class
-                        .getResourceAsStream(
-                            "/eu/dnetlib/dhp/actionmanager/project/parameters.json")));
+    private static final Logger log = LoggerFactory.getLogger(PrepareProjects.class);
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final HashMap<String, CSVProgramme> programmeMap = new HashMap<>();
+
+    public static void main(String[] args) throws Exception {
+
+        String jsonConfiguration = IOUtils
+            .toString(
+                PrepareProjects.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/actionmanager/project/prepare_project_parameters.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 
         parser.parseArgument(args);
 
-        final String fileURL = parser.get("fileURL");
-        final String hdfsPath = parser.get("hdfsPath");
-        final String hdfsNameNode = parser.get("hdfsNameNode");
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
 
-        try (final PrepareProjects prepareProjects = new PrepareProjects(hdfsPath, hdfsNameNode)) {
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
 
-            log.info("Getting projects...");
-            prepareProjects.execute(fileURL);
+        final String projectPath = parser.get("projectPath");
+        log.info("projectPath: {}", projectPath);
 
-        }
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath: {}", outputPath);
+
+        SparkConf conf = new SparkConf();
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                removeOutputDir(spark, outputPath);
+                exec(spark, projectPath, outputPath);
+            });
     }
 
-    public void execute(final String fileURL) throws Exception {
-
-        String projects = httpConnector.getInputSource(fileURL);
-        final CSVFormat format = CSVFormat.EXCEL
-            .withHeader()
-            .withDelimiter(';')
-            .withQuote('"')
-            .withTrim();
-        final CSVParser parser = CSVParser.parse(projects, format);
-        final Set<String> headers = parser.getHeaderMap().keySet();
+    private static void removeOutputDir(SparkSession spark, String path) {
+        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
     }
 
-    public List<Relation> processBlacklistEntry(ResultSet rs) {
-        try {
-            Relation direct = new Relation();
-            Relation inverse = new Relation();
+    private static void exec(SparkSession spark, String projectPath, String outputPath) {
+        Dataset<CSVProject> project = readPath(spark, projectPath, CSVProject.class);
 
-            String source_prefix = ModelSupport.entityIdPrefix.get(rs.getString("source_type"));
-            String target_prefix = ModelSupport.entityIdPrefix.get(rs.getString("target_type"));
+        project
+            .toJavaRDD()
+            .flatMap(p -> {
+                List<CSVProject> csvProjectList = new ArrayList<>();
+                String[] programme = p.getProgramme().split(";");
+                if (programme.length > 1) {
+                    for (int i = 0; i < programme.length; i++) {
+                        // carry the project id over: without it the split rows
+                        // cannot be joined back to the programme in the next step
+                        CSVProject csvProject = new CSVProject();
+                        csvProject.setId(p.getId());
+                        csvProject.setProgramme(programme[i]);
+                        csvProjectList.add(csvProject);
+                    }
+                } else {
+                    csvProjectList.add(p);
+                }
 
-            String source_direct = source_prefix + "|" + rs.getString("source");
-            direct.setSource(source_direct);
-            inverse.setTarget(source_direct);
-
-            String target_direct = target_prefix + "|" + rs.getString("target");
-            direct.setTarget(target_direct);
-            inverse.setSource(target_direct);
-
-            String encoding = rs.getString("relationship");
-            RelationInverse ri = ModelSupport.relationInverseMap.get(encoding);
-            direct.setRelClass(ri.getRelation());
-            inverse.setRelClass(ri.getInverse());
-            direct.setRelType(ri.getRelType());
-            inverse.setRelType(ri.getRelType());
-            direct.setSubRelType(ri.getSubReltype());
-            inverse.setSubRelType(ri.getSubReltype());
-
-            return Arrays.asList(direct, inverse);
-
-        } catch (final Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void close() throws IOException {
-        writer.close();
-    }
-
-    public PrepareProjects(
-        final String hdfsPath, String hdfsNameNode)
-        throws Exception {
-
-        this.conf = new Configuration();
-        this.conf.set("fs.defaultFS", hdfsNameNode);
-        this.httpConnector = new HttpConnector();
-        FileSystem fileSystem = FileSystem.get(this.conf);
-        Path hdfsWritePath = new Path(hdfsPath);
-        FSDataOutputStream fsDataOutputStream = null;
-        if (fileSystem.exists(hdfsWritePath)) {
-            fsDataOutputStream = fileSystem.append(hdfsWritePath);
-        } else {
-            fsDataOutputStream = fileSystem.create(hdfsWritePath);
-        }
-
-        this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
+                return csvProjectList.iterator();
+            })
+            .map(p -> OBJECT_MAPPER.writeValueAsString(p))
+            .saveAsTextFile(outputPath);
     }
 
-    protected void writeRelation(final Relation r) {
-        try {
-            writer.write(OBJECT_MAPPER.writeValueAsString(r));
-            writer.newLine();
-        } catch (final Exception e) {
-            throw new RuntimeException(e);
-        }
+    public static <R> Dataset<R> readPath(
+        SparkSession spark, String inputPath, Class<R> clazz) {
+        return spark
+            .read()
+            .textFile(inputPath)
+            .map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
     }
-
 }
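Editor's note: the flatMap above fans each project row out into one row per programme code. A standalone sketch of that behaviour; the grant number and codes are invented, and only the CSVProject bean from this patch is assumed:

    import java.util.ArrayList;
    import java.util.List;

    import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProject;

    public class SplitProgrammeExample {

        public static void main(String[] args) {
            CSVProject p = new CSVProject();
            p.setId("727658"); // invented grant number
            p.setProgramme("H2020-EU.2.1.1.;H2020-EU.1.1.");

            List<CSVProject> out = new ArrayList<>();
            for (String code : p.getProgramme().split(";")) {
                CSVProject copy = new CSVProject();
                copy.setId(p.getId()); // keep the id, as the job does
                copy.setProgramme(code);
                out.add(copy);
            }

            System.out.println(out.size()); // 2 rows, same id, one programme each
        }
    }
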
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java
index b8703378e..61bd952db 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java
@@ -3,22 +3,35 @@ package eu.dnetlib.dhp.actionmanager.project;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProgramme;
+import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProject;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.Programme;
+import eu.dnetlib.dhp.schema.oaf.Project;
+import eu.dnetlib.dhp.utils.DHPUtils;
 
 public class SparkAtomicActionJob {
     private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionJob.class);
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final HashMap<String, CSVProgramme> programmeMap = new HashMap<>();
 
     public static void main(String[] args) throws Exception {
 
@@ -67,8 +80,54 @@ public class SparkAtomicActionJob {
         HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
     }
 
-    private static void getAtomicActions(SparkSession spark, String projectPatj, String programmePath,
+    private static void getAtomicActions(SparkSession spark, String projectPath,
+        String programmePath,
         String outputPath) {
+        Dataset<CSVProject> project = readPath(spark, projectPath, CSVProject.class);
+        Dataset<CSVProgramme> programme = readPath(spark, programmePath, CSVProgramme.class);
+
+        project
+            .joinWith(programme, project.col("programme").equalTo(programme.col("code")), "left")
+            .map(c -> {
+                CSVProject csvProject = c._1();
+                Optional<CSVProgramme> csvProgramme = Optional.ofNullable(c._2());
+                if (csvProgramme.isPresent()) {
+                    Project p = new Project();
+                    p
+                        .setId(
+                            createOpenaireId(
+                                ModelSupport.entityIdPrefix.get("project"),
+                                "corda__h2020", csvProject.getId()));
+                    Programme pm = new Programme();
+                    pm.setCode(csvProject.getProgramme());
+                    pm.setDescription(csvProgramme.get().getShortTitle());
+                    p.setProgramme(Arrays.asList(pm));
+                    return p;
+                }
+
+                return null;
+            }, Encoders.bean(Project.class))
+            .filter(p -> p != null)
+            // .map(p -> new AtomicAction<>(Project.class, p), Encoders.bean(AtomicAction.class))
+            .write()
+            .option("compression", "gzip")
+            .mode(SaveMode.Overwrite)
+            .json(outputPath);
+    }
+
+    public static <R> Dataset<R> readPath(
+        SparkSession spark, String inputPath, Class<R> clazz) {
+        return spark
+            .read()
+            .textFile(inputPath)
+            .map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
+    }
+
+    public static String createOpenaireId(
+        final String prefix, final String nsPrefix, final String id) {
+
+        return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(id));
+    }
 }
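Editor's note: createOpenaireId above assembles the identifier from an entity prefix, a namespace prefix and the MD5 of the grant number. A quick illustration; the "40" project prefix is an assumption about what ModelSupport.entityIdPrefix returns, the grant number is invented, and DHPUtils.md5 is assumed to return the hex digest:

    import eu.dnetlib.dhp.actionmanager.project.SparkAtomicActionJob;

    public class OpenaireIdExample {

        public static void main(String[] args) {
            String id = SparkAtomicActionJob.createOpenaireId("40", "corda__h2020", "727658");
            System.out.println(id); // 40|corda__h2020::<32-char md5 of "727658">
        }
    }
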
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVParser.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVParser.java
new file mode 100644
index 000000000..ef29a6b6a
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVParser.java
@@ -0,0 +1,37 @@
+
+package eu.dnetlib.dhp.actionmanager.project.csvutils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.commons.lang.reflect.FieldUtils;
+
+public class CSVParser {
+
+    public <R> List<R> parse(String csvFile, String classForName)
+        throws ClassNotFoundException, IOException, IllegalAccessException, InstantiationException {
+        final CSVFormat format = CSVFormat.EXCEL
+            .withHeader()
+            .withDelimiter(';')
+            .withQuote('"')
+            .withTrim();
+        List<R> ret = new ArrayList<>();
+        final org.apache.commons.csv.CSVParser parser = org.apache.commons.csv.CSVParser.parse(csvFile, format);
+        final Set<String> headers = parser.getHeaderMap().keySet();
+        Class<?> clazz = Class.forName(classForName);
+        for (CSVRecord csvRecord : parser.getRecords()) {
+            final Object cc = clazz.newInstance();
+            for (String header : headers) {
+                FieldUtils.writeField(cc, header, csvRecord.get(header), true);
+
+            }
+            ret.add((R) cc);
+        }
+
+        return ret;
+    }
+}
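Editor's note: a minimal usage sketch of the reflection-based parser above. The CSV content is invented; the header names must match the target bean's field names, since FieldUtils.writeField writes the fields directly:

    import java.util.List;

    import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVParser;
    import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProgramme;

    public class CSVParserExample {

        public static void main(String[] args) throws Exception {
            // semicolon-delimited, quoted, with a header row, as configured in CSVParser
            String csv = "rcn;code;title;shortTitle;language\n"
                + "\"664\";\"H2020-EU.1.1.\";\"Excellent science\";\"\";\"en\"";

            List<CSVProgramme> rows = new CSVParser()
                .parse(csv, "eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProgramme");

            System.out.println(rows.get(0).getCode()); // prints: H2020-EU.1.1.
        }
    }
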
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/Programme.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVProgramme.java
similarity index 87%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/Programme.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVProgramme.java
index 20877b1a1..a9069e510 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/Programme.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVProgramme.java
@@ -1,9 +1,9 @@
 
-package eu.dnetlib.dhp.actionmanager.project;
+package eu.dnetlib.dhp.actionmanager.project.csvutils;
 
 import java.io.Serializable;
 
-public class Programme implements Serializable {
+public class CSVProgramme implements Serializable {
 	private String rcn;
 	private String code;
 	private String title;
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/Project.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVProject.java
similarity index 97%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/Project.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVProject.java
index abee7f861..ff18c6260 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/Project.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVProject.java
@@ -1,9 +1,9 @@
 
-package eu.dnetlib.dhp.actionmanager.project;
+package eu.dnetlib.dhp.actionmanager.project.csvutils;
 
 import java.io.Serializable;
 
-public class Project implements Serializable {
+public class CSVProject implements Serializable {
 	private String rcn;
 	private String id;
 	private String acronym;
@@ -193,4 +193,5 @@ public class Project implements Serializable {
 	public void setSubjects(String subjects) {
 		this.subjects = subjects;
 	}
+
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/ReadCSV.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/ReadCSV.java
new file mode 100644
index 000000000..905194232
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/ReadCSV.java
@@ -0,0 +1,98 @@
+
+package eu.dnetlib.dhp.actionmanager.project.csvutils;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.actionmanager.project.httpconnector.HttpConnector;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+public class ReadCSV implements Closeable {
+    private static final Log log = LogFactory.getLog(ReadCSV.class);
+    private final Configuration conf;
+    private final BufferedWriter writer;
+    private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private String csvFile;
+
+    public static void main(final String[] args) throws Exception {
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+            IOUtils
+                .toString(
+                    ReadCSV.class
+                        .getResourceAsStream(
+                            "/eu/dnetlib/dhp/actionmanager/project/parameters.json")));
+
+        parser.parseArgument(args);
+
+        final String fileURL = parser.get("fileURL");
+        final String hdfsPath = parser.get("hdfsPath");
+        final String hdfsNameNode = parser.get("hdfsNameNode");
+        final String classForName = parser.get("classForName");
+
+        try (final ReadCSV readCSV = new ReadCSV(hdfsPath, hdfsNameNode, fileURL)) {
+
+            log.info("Getting CSV file...");
+            readCSV.execute(classForName);
+
+        }
+    }
+
+    public void execute(final String classForName) throws Exception {
+        CSVParser csvParser = new CSVParser();
+        csvParser
+            .parse(csvFile, classForName)
+            .stream()
+            .forEach(p -> write(p));
+
+    }
+
+    @Override
+    public void close() throws IOException {
+        writer.close();
+    }
+
+    public ReadCSV(
+        final String hdfsPath,
+        final String hdfsNameNode,
+        final String fileURL)
+        throws Exception {
+        this.conf = new Configuration();
+        this.conf.set("fs.defaultFS", hdfsNameNode);
+        HttpConnector httpConnector = new HttpConnector();
+        FileSystem fileSystem = FileSystem.get(this.conf);
+        Path hdfsWritePath = new Path(hdfsPath);
+        FSDataOutputStream fsDataOutputStream = null;
+        if (fileSystem.exists(hdfsWritePath)) {
+            fsDataOutputStream = fileSystem.append(hdfsWritePath);
+        } else {
+            fsDataOutputStream = fileSystem.create(hdfsWritePath);
+        }
+
+        this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
+        this.csvFile = httpConnector.getInputSource(fileURL);
+    }
+
+    protected void write(final Object p) {
+        try {
+            writer.write(OBJECT_MAPPER.writeValueAsString(p));
+            writer.newLine();
+        } catch (final Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}
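Editor's note: ReadCSV downloads a CSV through HttpConnector, parses it with the CSVParser above, and writes one JSON record per line to HDFS. A hypothetical invocation; the namenode URL and HDFS path are invented, the flag style follows the test in this patch, and the CORDIS URL is the one appearing in CSVParserTest below:

    public class ReadCSVExample {

        public static void main(String[] args) throws Exception {
            eu.dnetlib.dhp.actionmanager.project.csvutils.ReadCSV
                .main(
                    new String[] {
                        "-fileURL", "http://cordis.europa.eu/data/reference/cordisref-H2020programmes.csv",
                        "-hdfsPath", "/tmp/h2020/programme.json",
                        "-hdfsNameNode", "hdfs://namenode:8020",
                        "-classForName", "eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProgramme"
                    });
        }
    }
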
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/CollectorPluginErrorLogList.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorPluginErrorLogList.java
similarity index 86%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/CollectorPluginErrorLogList.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorPluginErrorLogList.java
index bc00e4604..9d3f88265 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/CollectorPluginErrorLogList.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorPluginErrorLogList.java
@@ -1,5 +1,5 @@
 
-package eu.dnetlib.dhp.actionmanager.project;
+package eu.dnetlib.dhp.actionmanager.project.httpconnector;
 
 import java.util.LinkedList;
 
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/CollectorServiceException.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorServiceException.java
similarity index 86%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/CollectorServiceException.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorServiceException.java
index a417de50d..9167d97b4 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/CollectorServiceException.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorServiceException.java
@@ -1,5 +1,5 @@
 
-package eu.dnetlib.dhp.actionmanager.project;
+package eu.dnetlib.dhp.actionmanager.project.httpconnector;
 
 public class CollectorServiceException extends Exception {
 
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/HttpConnector.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnector.java
similarity index 99%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/HttpConnector.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnector.java
index 63f67f145..e20518b55 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/HttpConnector.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnector.java
@@ -1,5 +1,5 @@
 
-package eu.dnetlib.dhp.actionmanager.project;
+package eu.dnetlib.dhp.actionmanager.project.httpconnector;
 
 import java.io.IOException;
 import java.io.InputStream;
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/prepare_programme_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/prepare_programme_parameters.json
new file mode 100644
index 000000000..a0856e10e
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/prepare_programme_parameters.json
@@ -0,0 +1,26 @@
+[
+  {
+    "paramName": "issm",
+    "paramLongName": "isSparkSessionManaged",
+    "paramDescription": "when true will stop SparkSession after job execution",
+    "paramRequired": false
+  },
+  {
+    "paramName": "pjp",
+    "paramLongName": "projectPath",
+    "paramDescription": "the path of the prepared projects file",
+    "paramRequired": true
+  },
+  {
+    "paramName": "pp",
+    "paramLongName": "programmePath",
+    "paramDescription": "the path of the programme file",
+    "paramRequired": true
+  },
+  {
+    "paramName": "o",
+    "paramLongName": "outputPath",
+    "paramDescription": "the path of the new ActionSet",
+    "paramRequired": true
+  }
+]
\ No newline at end of file
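Editor's note: the parameter specification above implies an invocation of PrepareProgramme along these lines. A sketch only: the paths are invented, projectPath is passed because the spec marks it required even though this job does not read it, and a local spark.master is assumed to be supplied by the environment or an existing SparkSession:

    public class PrepareProgrammeInvocation {

        public static void main(String[] args) throws Exception {
            eu.dnetlib.dhp.actionmanager.project.PrepareProgramme
                .main(
                    new String[] {
                        "-isSparkSessionManaged", "true",
                        "-projectPath", "/tmp/h2020/projects.json",
                        "-programmePath", "/tmp/h2020/programme.json",
                        "-outputPath", "/tmp/h2020/preparedProgramme"
                    });
        }
    }
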
the programme file", +"paramRequired": true +}, +{ +"paramName": "o", +"paramLongName": "outputPath", +"paramDescription": "the path of the new ActionSet", +"paramRequired": true +} +] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/prepare_project_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/prepare_project_parameters.json new file mode 100644 index 000000000..54083e108 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/prepare_project_parameters.json @@ -0,0 +1,20 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false +}, +{ +"paramName": "pp", +"paramLongName": "programmePath", +"paramDescription": "the URL from where to get the programme file", +"paramRequired": true +}, +{ +"paramName": "o", +"paramLongName": "outputPath", +"paramDescription": "the path of the new ActionSet", +"paramRequired": true +} +] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/CSVParserTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/CSVParserTest.java new file mode 100644 index 000000000..d344f3118 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/CSVParserTest.java @@ -0,0 +1,43 @@ +package eu.dnetlib.dhp.actionmanager.project; + +import org.apache.commons.io.IOUtils; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +public class ReadCSVTest { + + private static Path workingDir; + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(eu.dnetlib.dhp.actionmanager.project.ReadCSVTest.class.getSimpleName()); + + + } + @Test + public void readProgrammeTest() throws Exception { + + String programmecsv = IOUtils.toString(getClass() + .getClassLoader().getResourceAsStream("eu/dnetlib/dhp/actionmanager/project/programme.csv")); + ReadCSV + .main( + new String[] { + "-fileURL", + "http://cordis.europa.eu/data/reference/cordisref-H2020programmes.csv", + "-outputPath", + workingDir.toString() + "/project", + "-hdfsPath", + getClass().getResource("/eu/dnetlib/dhp/blacklist/blacklist").getPath(), + "-mergesPath", + getClass().getResource("/eu/dnetlib/dhp/blacklist/mergesRelOneMerge").getPath(), + }); + + + + + } +} diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgrammeTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgrammeTest.java new file mode 100644 index 000000000..b22e6bd6d --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgrammeTest.java @@ -0,0 +1,4 @@ +package eu.dnetlib.dhp.actionmanager.project; + +public class PrepareProgrammeTest { +} diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/SparkUpdateProjectTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/SparkUpdateProjectTest.java new file mode 100644 index 000000000..f7d271fe0 --- /dev/null +++ 
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/SparkUpdateProjectTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/SparkUpdateProjectTest.java
new file mode 100644
index 000000000..f7d271fe0
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/SparkUpdateProjectTest.java
@@ -0,0 +1,4 @@
+package eu.dnetlib.dhp.actionmanager.project;
+
+public class SparkUpdateProjectTest {
+}
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java
new file mode 100644
index 000000000..29e9a6cce
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java
@@ -0,0 +1,4 @@
+package eu.dnetlib.dhp.actionmanager.project.httpconnector;
+
+public class HttpConnectorTest {
+}
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/preparedProgramme_whole.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/preparedProgramme_whole.json
new file mode 100644
index 000000000..e69de29bb
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/programme.csv b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/programme.csv
new file mode 100644
index 000000000..e69de29bb
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/projects_subset.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/projects_subset.json
new file mode 100644
index 000000000..e69de29bb
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/whole_programme.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/whole_programme.json
new file mode 100644
index 000000000..e69de29bb