[ECclassification] added new classes

Miriam Baglioni 2023-03-01 15:29:11 +01:00
parent 4f2df876cd
commit c1f9848953
11 changed files with 373 additions and 81 deletions

View File

@@ -266,11 +266,15 @@ public class PrepareProgramme {
             String code = csvProgramme.getCode();
             if (!code.endsWith(".") && !code.contains("Euratom")
-                && !code.equals("H2020-EC"))
+                && !code.equals("H2020-EC") && !code.equals("H2020") &&
+                !code.equals("H2020-Topics"))
                 code += ".";
-            csvProgramme.setClassification(map.get(code)._1());
-            csvProgramme.setClassification_short(map.get(code)._2());
+            if (map.containsKey(code)) {
+                csvProgramme.setClassification(map.get(code)._1());
+                csvProgramme.setClassification_short(map.get(code)._2());
+            } else
+                log.info("WARNING: No entry in map for code " + code);
             return csvProgramme;
         }).collect();
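Note: the new guard matters because map.get(code) returns null for codes that have no classification entry (the newly special-cased roots "H2020", "H2020-Topics" and "H2020-EC" among them), so the previous unconditional ._1() call could end in a NullPointerException. A minimal, self-contained sketch of the pattern, with a plain String[] standing in for the Tuple2 used above and invented classification values:

import java.util.HashMap;
import java.util.Map;

public class GuardedLookupSketch {
    public static void main(String[] args) {
        // invented sample entry; the real map is built from the EC programme classification
        Map<String, String[]> map = new HashMap<>();
        map.put("H2020-EU.1.1.", new String[] { "Excellent science | ERC", "ERC" });

        for (String code : new String[] { "H2020-EU.1.1.", "H2020" }) {
            if (map.containsKey(code)) { // without this guard, map.get("H2020")[0] would NPE
                System.out.println(code + " -> " + map.get(code)[0]);
            } else {
                System.out.println("WARNING: No entry in map for code " + code);
            }
        }
    }
}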

View File

@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.actionmanager.project.utils.model.CSVProgramme;
 import eu.dnetlib.dhp.actionmanager.project.utils.model.CSVProject;
 import eu.dnetlib.dhp.actionmanager.project.utils.model.EXCELTopic;
+import eu.dnetlib.dhp.actionmanager.project.utils.model.JsonTopic;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
 import eu.dnetlib.dhp.schema.action.AtomicAction;

@@ -110,7 +111,7 @@ public class SparkAtomicActionJob {
         Dataset<CSVProject> project = readPath(spark, projectPatH, CSVProject.class);
         Dataset<CSVProgramme> programme = readPath(spark, programmePath, CSVProgramme.class);
-        Dataset<EXCELTopic> topic = readPath(spark, topicPath, EXCELTopic.class);
+        Dataset<JsonTopic> topic = readPath(spark, topicPath, JsonTopic.class);

         Dataset<Project> aaproject = project
             .joinWith(programme, project.col("programme").equalTo(programme.col("code")), "left")

@@ -144,9 +145,9 @@
             .filter(Objects::nonNull);

         aaproject
-            .joinWith(topic, aaproject.col("h2020topiccode").equalTo(topic.col("code")), "left")
-            .map((MapFunction<Tuple2<Project, EXCELTopic>, Project>) p -> {
-                Optional<EXCELTopic> op = Optional.ofNullable(p._2());
+            .joinWith(topic, aaproject.col("id").equalTo(topic.col("projectId")), "left")
+            .map((MapFunction<Tuple2<Project, JsonTopic>, Project>) p -> {
+                Optional<JsonTopic> op = Optional.ofNullable(p._2());
                 Project rp = p._1();
                 op.ifPresent(excelTopic -> rp.setH2020topicdescription(excelTopic.getTitle()));
                 return rp;
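Note: the topic join changes key here: topics are now matched to projects by identifier (aaproject.id against the JSON topic's projectId) instead of by H2020 topic code, and the topic title still fills h2020topicdescription. A self-contained sketch of the same join shape, with hypothetical Proj/Topic beans standing in for the module's Project and JsonTopic classes (the real model spells the field projectID; Spark resolves the column case-insensitively by default):

import java.io.Serializable;
import java.util.Arrays;
import java.util.Optional;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class TopicJoinSketch {

    public static class Proj implements Serializable {
        private String id;
        private String h2020topicdescription;

        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public String getH2020topicdescription() { return h2020topicdescription; }
        public void setH2020topicdescription(String d) { this.h2020topicdescription = d; }
    }

    public static class Topic implements Serializable {
        private String projectId;
        private String title;

        public String getProjectId() { return projectId; }
        public void setProjectId(String p) { this.projectId = p; }
        public String getTitle() { return title; }
        public void setTitle(String t) { this.title = t; }
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate();

        Proj p = new Proj();
        p.setId("corda__h2020::1234"); // invented id
        Topic t = new Topic();
        t.setProjectId("corda__h2020::1234");
        t.setTitle("Some topic title"); // invented title

        Dataset<Proj> projects = spark.createDataset(Arrays.asList(p), Encoders.bean(Proj.class));
        Dataset<Topic> topics = spark.createDataset(Arrays.asList(t), Encoders.bean(Topic.class));

        // left join by project id, then copy the topic title onto the project
        projects
            .joinWith(topics, projects.col("id").equalTo(topics.col("projectId")), "left")
            .map((MapFunction<Tuple2<Proj, Topic>, Proj>) tp -> {
                Proj rp = tp._1();
                Optional.ofNullable(tp._2()).ifPresent(topic -> rp.setH2020topicdescription(topic.getTitle()));
                return rp;
            }, Encoders.bean(Proj.class))
            .show(false);

        spark.stop();
    }
}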

View File

@@ -1,12 +1,13 @@
 package eu.dnetlib.dhp.actionmanager.project.utils;

-import java.io.*;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Optional;
+import java.util.zip.GZIPOutputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;

@@ -30,7 +31,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
  * @author miriam.baglioni
  * @Date 28/02/23
  */
-public class ExtractProjects implements Serializable {
+public class ExtractFromZip implements Serializable {

     private static final Logger log = LoggerFactory.getLogger(PrepareProjects.class);
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

@@ -40,17 +41,17 @@ public class ExtractProjects implements Serializable {
             .toString(
                 PrepareProjects.class
                     .getResourceAsStream(
-                        "/eu/dnetlib/dhp/actionmanager/project/extract_project_parameters.json"));
+                        "/eu/dnetlib/dhp/actionmanager/project/extract_fromzip_parameters.json"));

         final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
         parser.parseArgument(args);

-        final String projectPath = parser.get("projectPath");
-        log.info("projectPath {}: ", projectPath);
+        final String inputPath = parser.get("inputPath");
+        log.info("inputPath {}: ", inputPath);

-        final String workingPath = parser.get("workingPath");
-        log.info("workingPath {}: ", workingPath);
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath {}: ", outputPath);

         final String hdfsNameNode = parser.get("hdfsNameNode");
         log.info("hdfsNameNode {}", hdfsNameNode);

@@ -60,8 +61,8 @@ public class ExtractProjects implements Serializable {
         FileSystem fs = FileSystem.get(conf);

-        doExtract(projectPath, workingPath, fs);
-        readProjects(workingPath + "json/project.json", workingPath + "projects/h2020_projects_nld.json", fs);
+        doExtract(inputPath, outputPath, fs);
     }

     private static void doExtract(String inputFile, String workingPath, FileSystem fileSystem)

@@ -97,30 +98,4 @@ public class ExtractProjects implements Serializable {
     }

-    private static void readProjects(String inputFile, String workingPath, FileSystem fs) throws IOException {
-        Path hdfsreadpath = new Path(inputFile);
-
-        FSDataInputStream inputStream = fs.open(hdfsreadpath);
-
-        ArrayList<Project> projects = OBJECT_MAPPER
-            .readValue(
-                IOUtils.toString(inputStream, "UTF-8"),
-                new TypeReference<List<Project>>() {
-                });
-
-        Path hdfsWritePath = new Path(workingPath);
-
-        if (fs.exists(hdfsWritePath)) {
-            fs.delete(hdfsWritePath, false);
-        }
-        FSDataOutputStream fos = fs.create(hdfsWritePath);
-
-        try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8))) {
-            for (Project p : projects) {
-                writer.write(OBJECT_MAPPER.writeValueAsString(p));
-                writer.newLine();
-            }
-        }
-    }
 }
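Note: the body of doExtract sits outside these hunks. Judging from the ZipInputStream/ZipEntry imports, it unpacks the downloaded archive entry by entry onto HDFS; a hypothetical self-contained sketch of that idea follows (not the committed implementation, class name and buffer size are invented):

import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UnzipToHdfsSketch {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", args[0]); // e.g. hdfs://nameservice1
        doExtract(args[1], args[2], FileSystem.get(conf));
    }

    private static void doExtract(String inputFile, String workingPath, FileSystem fileSystem)
        throws IOException {
        try (ZipInputStream zis = new ZipInputStream(fileSystem.open(new Path(inputFile)))) {
            ZipEntry entry;
            byte[] buffer = new byte[4096];
            while ((entry = zis.getNextEntry()) != null) {
                if (entry.isDirectory())
                    continue;
                // replicate the entry's path under the output folder
                Path target = new Path(workingPath + entry.getName());
                try (FSDataOutputStream out = fileSystem.create(target, true)) {
                    int n;
                    while ((n = zis.read(buffer)) != -1)
                        out.write(buffer, 0, n);
                }
            }
        }
    }
}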

View File

@@ -6,7 +6,9 @@ import java.util.Optional;

 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;

 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.collection.GetCSV;

@@ -40,8 +42,11 @@ public class ReadCSV {
         conf.set("fs.defaultFS", hdfsNameNode);

         FileSystem fileSystem = FileSystem.get(conf);
+        FSDataInputStream inputStream = fileSystem.open(new Path(fileURL));
+
         BufferedReader reader = new BufferedReader(
-            new InputStreamReader(new HttpConnector2().getInputSourceAsStream(fileURL)));
+            new InputStreamReader(inputStream));

         GetCSV.getCsv(fileSystem, reader, hdfsPath, classForName, del);
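Note: fileURL is now interpreted as an HDFS location (the programme CSV that ExtractFromZip unpacks from the downloaded archive) rather than an HTTP URL, so the HttpConnector2 fetch gives way to a plain FileSystem.open. A minimal sketch of the new read path; the defaultFS value, class name, and file location are illustrative only:

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsCsvReadSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://nameservice1"); // illustrative namenode
        FileSystem fileSystem = FileSystem.get(conf);
        // same pattern as ReadCSV: open the extracted CSV directly from HDFS
        try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(fileSystem.open(new Path("/tmp/downloadedProgramme/csv/programme.csv"))))) {
            System.out.println(reader.readLine()); // first line, i.e. the CSV header
        }
    }
}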

View File

@@ -0,0 +1,90 @@
+package eu.dnetlib.dhp.actionmanager.project.utils;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.actionmanager.project.PrepareProjects;
+import eu.dnetlib.dhp.actionmanager.project.utils.model.Project;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+/**
+ * @author miriam.baglioni
+ * @Date 28/02/23
+ */
+public class ReadProjects implements Serializable {
+    private static final Logger log = LoggerFactory.getLogger(ReadProjects.class);
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public static void main(String[] args) throws Exception {
+        String jsonConfiguration = IOUtils
+            .toString(
+                PrepareProjects.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/actionmanager/project/read_parameters.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+
+        parser.parseArgument(args);
+
+        final String inputPath = parser.get("inputPath");
+        log.info("inputPath {}: ", inputPath);
+
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath {}: ", outputPath);
+
+        final String hdfsNameNode = parser.get("hdfsNameNode");
+        log.info("hdfsNameNode {}", hdfsNameNode);
+
+        Configuration conf = new Configuration();
+        conf.set("fs.defaultFS", hdfsNameNode);
+
+        FileSystem fs = FileSystem.get(conf);
+
+        readProjects(inputPath, outputPath, fs);
+    }
+
+    private static void readProjects(String inputFile, String workingPath, FileSystem fs) throws IOException {
+        Path hdfsreadpath = new Path(inputFile);
+
+        FSDataInputStream inputStream = fs.open(hdfsreadpath);
+
+        ArrayList<Project> projects = OBJECT_MAPPER
+            .readValue(
+                IOUtils.toString(inputStream, "UTF-8"),
+                new TypeReference<List<Project>>() {
+                });
+
+        Path hdfsWritePath = new Path(workingPath);
+
+        if (fs.exists(hdfsWritePath)) {
+            fs.delete(hdfsWritePath, false);
+        }
+        FSDataOutputStream fos = fs.create(hdfsWritePath);
+
+        try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8))) {
+            for (Project p : projects) {
+                writer.write(OBJECT_MAPPER.writeValueAsString(p));
+                writer.newLine();
+            }
+        }
+    }
+}
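Note: ReadProjects (and its twin ReadTopics below) converts the single JSON array shipped in the zip into newline-delimited JSON, one object per line, the shape a line-oriented reader such as the readPath used in SparkAtomicActionJob can consume. The conversion in miniature, using Jackson directly on an in-memory string (the field name is hypothetical; the real model is eu.dnetlib.dhp.actionmanager.project.utils.model.Project):

import java.util.List;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonLinesSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // one JSON array in...
        String array = "[{\"id\":\"101000000\"},{\"id\":\"101000001\"}]";
        List<Object> projects = mapper.readValue(array, new TypeReference<List<Object>>() {
        });
        // ...one JSON object per line out
        for (Object p : projects)
            System.out.println(mapper.writeValueAsString(p));
    }
}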

View File

@@ -0,0 +1,92 @@
+package eu.dnetlib.dhp.actionmanager.project.utils;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.actionmanager.project.PrepareProjects;
+import eu.dnetlib.dhp.actionmanager.project.utils.model.JsonTopic;
+import eu.dnetlib.dhp.actionmanager.project.utils.model.Project;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+/**
+ * @author miriam.baglioni
+ * @Date 28/02/23
+ */
+public class ReadTopics implements Serializable {
+    private static final Logger log = LoggerFactory.getLogger(ReadTopics.class);
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public static void main(String[] args) throws Exception {
+        String jsonConfiguration = IOUtils
+            .toString(
+                PrepareProjects.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/actionmanager/project/read_parameters.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+
+        parser.parseArgument(args);
+
+        final String inputPath = parser.get("inputPath");
+        log.info("inputPath {}: ", inputPath);
+
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath {}: ", outputPath);
+
+        final String hdfsNameNode = parser.get("hdfsNameNode");
+        log.info("hdfsNameNode {}", hdfsNameNode);
+
+        Configuration conf = new Configuration();
+        conf.set("fs.defaultFS", hdfsNameNode);
+
+        FileSystem fs = FileSystem.get(conf);
+
+        readTopics(inputPath, outputPath, fs);
+    }
+
+    private static void readTopics(String inputFile, String workingPath, FileSystem fs) throws IOException {
+        Path hdfsreadpath = new Path(inputFile);
+
+        FSDataInputStream inputStream = fs.open(hdfsreadpath);
+
+        ArrayList<JsonTopic> topics = OBJECT_MAPPER
+            .readValue(
+                IOUtils.toString(inputStream, "UTF-8"),
+                new TypeReference<List<JsonTopic>>() {
+                });
+
+        Path hdfsWritePath = new Path(workingPath);
+
+        if (fs.exists(hdfsWritePath)) {
+            fs.delete(hdfsWritePath, false);
+        }
+        FSDataOutputStream fos = fs.create(hdfsWritePath);
+
+        try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8))) {
+            for (JsonTopic p : topics) {
+                writer.write(OBJECT_MAPPER.writeValueAsString(p));
+                writer.newLine();
+            }
+        }
+    }
+}

View File

@@ -0,0 +1,38 @@
+package eu.dnetlib.dhp.actionmanager.project.utils.model;
+
+import java.io.Serializable;
+
+/**
+ * @author miriam.baglioni
+ * @Date 28/02/23
+ */
+public class JsonTopic implements Serializable {
+    private String projectID;
+    private String title;
+    private String topic;
+
+    public String getProjectID() {
+        return projectID;
+    }
+
+    public void setProjectID(String projectID) {
+        this.projectID = projectID;
+    }
+
+    public String getTitle() {
+        return title;
+    }
+
+    public void setTitle(String title) {
+        this.title = title;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+}
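Note: for illustration, a record shaped like the following binds to JsonTopic under Jackson's default bean naming (getProjectID yields the JSON property "projectID"); the values here are invented:

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.actionmanager.project.utils.model.JsonTopic;

public class JsonTopicSketch {
    public static void main(String[] args) throws Exception {
        // invented sample record; note the capital D in "projectID", matching the getter
        String json = "{\"projectID\":\"101000000\",\"title\":\"Some topic title\",\"topic\":\"SOME-TOPIC-CODE\"}";
        JsonTopic t = new ObjectMapper().readValue(json, JsonTopic.class);
        System.out.println(t.getProjectID() + " -> " + t.getTitle());
    }
}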

View File

@@ -1,16 +1,16 @@
 [
     {
-        "paramName": "pjp",
-        "paramLongName": "projectPath",
+        "paramName": "ip",
+        "paramLongName": "inputPath",
         "paramDescription": "the path where the projects are stored ",
         "paramRequired": true
     },
     {
-        "paramName": "wp",
-        "paramLongName": "workingPath",
+        "paramName": "op",
+        "paramLongName": "outputPath",
         "paramDescription": "the path for the extracted folder",
         "paramRequired": true
     },

View File

@@ -0,0 +1,3 @@
+#!/bin/bash
+hdfs dfs -rm $2
+curl -LSs $1 | hdfs dfs -put - $2
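Note: the script streams the download straight into HDFS: curl -LSs follows redirects and writes the archive to stdout, and hdfs dfs -put - reads stdin (the "-"), so no local temp file is needed. The preceding hdfs dfs -rm clears any previous copy; since the script does not run with set -e, a failed rm on the first run does not abort it, and the shell action's exit status is that of the final pipeline. In the workflow below it is invoked with the download URL and the target HDFS path as its two arguments.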

View File

@@ -14,7 +14,6 @@
         </property>
         <property>
             <name>outputPath</name>
-            <value>noneed</value>
             <description>path where to store the action set</description>
         </property>
         <property>

@@ -35,42 +34,104 @@
             <delete path='${workingDir}'/>
             <mkdir path='${workingDir}'/>
         </fs>
-        <ok to="fork_get_projects"/>
+        <ok to="fork_download_info"/>
         <error to="Kill"/>
     </action>

-    <fork name="fork_get_info">
+    <fork name="fork_download_info">
         <path start="fork_get_projects"/>
-        <path start="get_programme_file"/>
-        <path start="get_topic_file"/>
+        <path start="download_programme_file"/>
+<!--        <path start="read_topic_file"/>-->
     </fork>

     <fork name="fork_get_projects">
-        <path start="extract_projects"/>
-        <path start="read_projects"/>
+        <path start="download_projects"/>
+        <path start="read_projects_from_db"/>
     </fork>

-    <action name="extract_projects">
-        <java>
-            <main-class>eu.dnetlib.dhp.actionmanager.project.utils.ExtractProjects</main-class>
-            <arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
-            <arg>--projectPath</arg><arg>/tmp/miriam/cordis-h2020projects-json_.zip</arg>
-<!--            <arg>&#45;&#45;workingPath</arg><arg>/tmp/miriam/cordis_h2020/</arg>-->
-<!--            <arg>&#45;&#45;projectPath</arg><arg>${projectPath}</arg>-->
-            <arg>--workingPath</arg><arg>${workingDir}/</arg>
-        </java>
-        <ok to="wait_projects"/>
-<!--        <ok to="End"/>-->
+    <action name="download_projects">
+        <shell xmlns="uri:oozie:shell-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+            </configuration>
+            <exec>download.sh</exec>
+            <argument>${downloadH2020Projects}</argument>
+            <argument>${projectPath}</argument>
+            <env-var>HADOOP_USER_NAME=${wf:user()}</env-var>
+            <file>download.sh</file>
+            <capture-output/>
+        </shell>
+        <ok to="extract_projects"/>
         <error to="Kill"/>
     </action>

-    <action name="get_programme_file">
+    <action name="extract_projects">
+        <java>
+            <main-class>eu.dnetlib.dhp.actionmanager.project.utils.ExtractFromZip</main-class>
+            <arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
+            <arg>--inputPath</arg><arg>${projectPath}</arg>
+            <arg>--outputPath</arg><arg>${workingDir}/</arg>
+        </java>
+        <ok to="read_from_folder"/>
+        <error to="Kill"/>
+    </action>
+
+    <fork name="read_from_folder">
+        <path start="read_projects"/>
+        <path start="read_topic_file"/>
+    </fork>
+
+    <action name="read_projects">
+        <java>
+            <main-class>eu.dnetlib.dhp.actionmanager.project.utils.ReadProjects</main-class>
+            <arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
+            <arg>--inputPath</arg><arg>${workingDir}/json/project.json</arg>
+            <arg>--outputPath</arg><arg>${workingDir}/projects</arg>
+        </java>
+        <ok to="wait_read_from_folder"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="download_programme_file">
+        <shell xmlns="uri:oozie:shell-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+            </configuration>
+            <exec>download.sh</exec>
+            <argument>${downloadH2020Programme}</argument>
+            <argument>${programmePath}</argument>
+            <env-var>HADOOP_USER_NAME=${wf:user()}</env-var>
+            <file>download.sh</file>
+            <capture-output/>
+        </shell>
+        <ok to="extract_programme"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="extract_programme">
+        <java>
+            <main-class>eu.dnetlib.dhp.actionmanager.project.utils.ExtractFromZip</main-class>
+            <arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
+            <arg>--inputPath</arg><arg>${programmePath}</arg>
+            <arg>--outputPath</arg><arg>${workingDir}/downloadedProgramme/</arg>
+        </java>
+        <ok to="read_programme"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="read_programme">
         <java>
             <main-class>eu.dnetlib.dhp.actionmanager.project.utils.ReadCSV</main-class>
             <arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
-            <arg>--fileURL</arg><arg>${programmeFileURL}</arg>
+            <arg>--fileURL</arg><arg>${workingDir}/downloadedProgramme/csv/programme.csv</arg>
             <arg>--hdfsPath</arg><arg>${workingDir}/programme</arg>
             <arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.utils.model.CSVProgramme</arg>
         </java>

@@ -78,20 +139,18 @@
         <error to="Kill"/>
     </action>

-    <action name="get_topic_file">
+    <action name="read_topic_file">
         <java>
-            <main-class>eu.dnetlib.dhp.actionmanager.project.utils.ReadExcel</main-class>
+            <main-class>eu.dnetlib.dhp.actionmanager.project.utils.ReadTopics</main-class>
             <arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
-            <arg>--fileURL</arg><arg>${topicFileURL}</arg>
-            <arg>--hdfsPath</arg><arg>${workingDir}/topic</arg>
-            <arg>--sheetName</arg><arg>${sheetName}</arg>
-            <arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.utils.model.EXCELTopic</arg>
+            <arg>--inputPath</arg><arg>${workingDir}/json/topics.json</arg>
+            <arg>--outputPath</arg><arg>${workingDir}/topic</arg>
         </java>
-        <ok to="wait"/>
+        <ok to="wait_read_from_folder"/>
        <error to="Kill"/>
     </action>

-    <action name="read_projects">
+    <action name="read_projects_from_db">
         <java>
             <main-class>eu.dnetlib.dhp.actionmanager.project.ReadProjectsFromDB</main-class>
             <arg>--hdfsPath</arg><arg>${workingDir}/dbProjects</arg>

@@ -125,9 +184,11 @@
             <arg>--outputPath</arg><arg>${workingDir}/preparedProgramme</arg>
         </spark>
         <ok to="wait"/>
-<!--        <ok to="End"/>-->
         <error to="Kill"/>
     </action>

+    <join name="wait_read_from_folder" to="wait_projects"/>
+
     <join name="wait" to="create_updates"/>

     <join name="wait_projects" to="prepare_project"/>

@@ -154,8 +215,8 @@
             <arg>--outputPath</arg><arg>${workingDir}/preparedProjects</arg>
             <arg>--dbProjectPath</arg><arg>${workingDir}/dbProjects</arg>
         </spark>
-<!--        <ok to="wait"/>-->
-        <ok to="End"/>
+        <ok to="wait"/>
+<!--        <ok to="End"/>-->
         <error to="Kill"/>
     </action>
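Note: taken together, the reworked workflow expects at least the following configuration properties; a hypothetical job.properties sketch, with names taken from the actions above and placeholder values:

# placeholder values -- adjust to the target cluster and data source
downloadH2020Projects=<URL of the H2020 projects JSON zip>
projectPath=/tmp/h2020/cordis-h2020projects-json.zip
downloadH2020Programme=<URL of the H2020 programme zip>
programmePath=/tmp/h2020/programme.zip
queueName=default
outputPath=/tmp/h2020/actionset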

View File

@@ -0,0 +1,23 @@
+[
+    {
+        "paramName": "ip",
+        "paramLongName": "inputPath",
+        "paramDescription": "the path where the projects are stored ",
+        "paramRequired": true
+    },
+    {
+        "paramName": "op",
+        "paramLongName": "outputPath",
+        "paramDescription": "the path for the extracted folder",
+        "paramRequired": true
+    },
+    {
+        "paramName": "hnn",
+        "paramLongName": "hdfsNameNode",
+        "paramDescription": "the hdfs namenode",
+        "paramRequired": true
+    }
+]