integrating pull #109, H2020Classification

This commit is contained in:
Claudio Atzori 2021-05-27 12:22:47 +02:00
commit d512062b58
9 changed files with 44 additions and 95 deletions

View File

@ -143,7 +143,6 @@ public class PrepareProgramme {
JavaRDD<CSVProgramme> h2020Programmes = programme JavaRDD<CSVProgramme> h2020Programmes = programme
.toJavaRDD() .toJavaRDD()
.filter(p -> p.getFrameworkProgramme().trim().equalsIgnoreCase("H2020"))
.mapToPair(csvProgramme -> new Tuple2<>(csvProgramme.getCode(), csvProgramme)) .mapToPair(csvProgramme -> new Tuple2<>(csvProgramme.getCode(), csvProgramme))
.reduceByKey((a, b) -> { .reduceByKey((a, b) -> {
if (!a.getLanguage().equals("en")) { if (!a.getLanguage().equals("en")) {

View File

@ -18,7 +18,6 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.project.utils.CSVProgramme;
import eu.dnetlib.dhp.actionmanager.project.utils.CSVProject; import eu.dnetlib.dhp.actionmanager.project.utils.CSVProject;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
@ -32,7 +31,6 @@ public class PrepareProjects {
private static final Logger log = LoggerFactory.getLogger(PrepareProgramme.class); private static final Logger log = LoggerFactory.getLogger(PrepareProgramme.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final HashMap<String, CSVProgramme> programmeMap = new HashMap<>();
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {

View File

@ -120,7 +120,6 @@ public class SparkAtomicActionJob {
.map((MapFunction<Tuple2<CSVProject, CSVProgramme>, Project>) c -> { .map((MapFunction<Tuple2<CSVProject, CSVProgramme>, Project>) c -> {
CSVProject csvProject = c._1(); CSVProject csvProject = c._1();
Optional<CSVProgramme> ocsvProgramme = Optional.ofNullable(c._2());
return Optional return Optional
.ofNullable(c._2()) .ofNullable(c._2())
@ -135,9 +134,9 @@ public class SparkAtomicActionJob {
H2020Programme pm = new H2020Programme(); H2020Programme pm = new H2020Programme();
H2020Classification h2020classification = new H2020Classification(); H2020Classification h2020classification = new H2020Classification();
pm.setCode(csvProject.getProgramme()); pm.setCode(csvProject.getProgramme());
h2020classification.setClassification(ocsvProgramme.get().getClassification()); h2020classification.setClassification(csvProgramme.getClassification());
h2020classification.setH2020Programme(pm); h2020classification.setH2020Programme(pm);
setLevelsandProgramme(h2020classification, ocsvProgramme.get().getClassification_short()); setLevelsandProgramme(h2020classification, csvProgramme.getClassification_short());
// setProgramme(h2020classification, ocsvProgramme.get().getClassification()); // setProgramme(h2020classification, ocsvProgramme.get().getClassification());
pp.setH2020classification(Arrays.asList(h2020classification)); pp.setH2020classification(Arrays.asList(h2020classification));
@ -145,10 +144,11 @@ public class SparkAtomicActionJob {
}) })
.orElse(null); .orElse(null);
}, Encoders.bean(Project.class)); }, Encoders.bean(Project.class))
.filter(Objects::nonNull);
aaproject aaproject
.joinWith(topic, aaproject.col("h2020topiccode").equalTo(topic.col("code"))) .joinWith(topic, aaproject.col("h2020topiccode").equalTo(topic.col("code")), "left")
.map((MapFunction<Tuple2<Project, EXCELTopic>, Project>) p -> { .map((MapFunction<Tuple2<Project, EXCELTopic>, Project>) p -> {
Optional<EXCELTopic> op = Optional.ofNullable(p._2()); Optional<EXCELTopic> op = Optional.ofNullable(p._2());
Project rp = p._1(); Project rp = p._1();

View File

@ -7,14 +7,7 @@ import java.io.Serializable;
* The model for the programme csv file * The model for the programme csv file
*/ */
public class CSVProgramme implements Serializable { public class CSVProgramme implements Serializable {
private String parentProgramme;
private String frameworkProgramme;
private String startDate;
private String endDate;
private String objective;
private String subjects;
private String legalBasis;
private String call;
private String rcn; private String rcn;
private String code; private String code;
@ -80,67 +73,5 @@ public class CSVProgramme implements Serializable {
this.language = language; this.language = language;
} }
public String getParentProgramme() { //
return parentProgramme;
}
public void setParentProgramme(String parentProgramme) {
this.parentProgramme = parentProgramme;
}
public String getFrameworkProgramme() {
return frameworkProgramme;
}
public void setFrameworkProgramme(String frameworkProgramme) {
this.frameworkProgramme = frameworkProgramme;
}
public String getStartDate() {
return startDate;
}
public void setStartDate(String startDate) {
this.startDate = startDate;
}
public String getEndDate() {
return endDate;
}
public void setEndDate(String endDate) {
this.endDate = endDate;
}
public String getObjective() {
return objective;
}
public void setObjective(String objective) {
this.objective = objective;
}
public String getSubjects() {
return subjects;
}
public void setSubjects(String subjects) {
this.subjects = subjects;
}
public String getLegalBasis() {
return legalBasis;
}
public void setLegalBasis(String legalBasis) {
this.legalBasis = legalBasis;
}
public String getCall() {
return call;
}
public void setCall(String call) {
this.call = call;
}
} }

View File

@ -26,7 +26,6 @@ public class EXCELParser {
throws ClassNotFoundException, IOException, IllegalAccessException, InstantiationException, throws ClassNotFoundException, IOException, IllegalAccessException, InstantiationException,
InvalidFormatException { InvalidFormatException {
// OPCPackage pkg = OPCPackage.open(httpConnector.getInputSourceAsStream(URL));
OPCPackage pkg = OPCPackage.open(file); OPCPackage pkg = OPCPackage.open(file);
XSSFWorkbook wb = new XSSFWorkbook(pkg); XSSFWorkbook wb = new XSSFWorkbook(pkg);
@ -58,7 +57,6 @@ public class EXCELParser {
for (int i = 0; i < headers.size(); i++) { for (int i = 0; i < headers.size(); i++) {
Cell cell = row.getCell(i); Cell cell = row.getCell(i);
String value = dataFormatter.formatCellValue(cell);
FieldUtils.writeField(cc, headers.get(i), dataFormatter.formatCellValue(cell), true); FieldUtils.writeField(cc, headers.get(i), dataFormatter.formatCellValue(cell), true);
} }

View File

@ -1,4 +1,4 @@
<workflow-app name="H2020Programme" xmlns="uri:oozie:workflow:0.5"> <workflow-app name="H2020Classification" xmlns="uri:oozie:workflow:0.5">
<parameters> <parameters>
<property> <property>
<name>projectFileURL</name> <name>projectFileURL</name>
@ -18,6 +18,10 @@
<name>outputPath</name> <name>outputPath</name>
<description>path where to store the action set</description> <description>path where to store the action set</description>
</property> </property>
<property>
<name>sheetName</name>
<description>the name of the sheet to read</description>
</property>
</parameters> </parameters>
<start to="deleteoutputpath"/> <start to="deleteoutputpath"/>
@ -31,10 +35,23 @@
<delete path='${workingDir}'/> <delete path='${workingDir}'/>
<mkdir path='${workingDir}'/> <mkdir path='${workingDir}'/>
</fs> </fs>
<ok to="get_project_file"/> <ok to="fork_get_info"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<fork name="fork_get_info">
<path start="fork_get_projects"/>
<path start="get_programme_file"/>
<path start="get_topic_file"/>
</fork>
<fork name="fork_get_projects">
<path start="get_project_file"/>
<path start="read_projects"/>
</fork>
<action name="get_project_file"> <action name="get_project_file">
<java> <java>
<main-class>eu.dnetlib.dhp.actionmanager.project.utils.ReadCSV</main-class> <main-class>eu.dnetlib.dhp.actionmanager.project.utils.ReadCSV</main-class>
@ -43,7 +60,7 @@
<arg>--hdfsPath</arg><arg>${workingDir}/projects</arg> <arg>--hdfsPath</arg><arg>${workingDir}/projects</arg>
<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.utils.CSVProject</arg> <arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.utils.CSVProject</arg>
</java> </java>
<ok to="get_programme_file"/> <ok to="wait_projects"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
@ -55,7 +72,7 @@
<arg>--hdfsPath</arg><arg>${workingDir}/programme</arg> <arg>--hdfsPath</arg><arg>${workingDir}/programme</arg>
<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.utils.CSVProgramme</arg> <arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.utils.CSVProgramme</arg>
</java> </java>
<ok to="get_topic_file"/> <ok to="prepare_programme"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
@ -68,7 +85,7 @@
<arg>--sheetName</arg><arg>${sheetName}</arg> <arg>--sheetName</arg><arg>${sheetName}</arg>
<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.utils.EXCELTopic</arg> <arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.utils.EXCELTopic</arg>
</java> </java>
<ok to="read_projects"/> <ok to="wait"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
@ -81,7 +98,7 @@
<arg>--postgresUser</arg><arg>${postgresUser}</arg> <arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg> <arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
</java> </java>
<ok to="prepare_programme"/> <ok to="wait_projects"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
@ -105,10 +122,15 @@
<arg>--programmePath</arg><arg>${workingDir}/programme</arg> <arg>--programmePath</arg><arg>${workingDir}/programme</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedProgramme</arg> <arg>--outputPath</arg><arg>${workingDir}/preparedProgramme</arg>
</spark> </spark>
<ok to="prepare_project"/> <ok to="wait"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<join name="wait" to="create_updates"/>
<join name="wait_projects" to="prepare_project"/>
<action name="prepare_project"> <action name="prepare_project">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
@ -130,7 +152,7 @@
<arg>--outputPath</arg><arg>${workingDir}/preparedProjects</arg> <arg>--outputPath</arg><arg>${workingDir}/preparedProjects</arg>
<arg>--dbProjectPath</arg><arg>${workingDir}/dbProjects</arg> <arg>--dbProjectPath</arg><arg>${workingDir}/dbProjects</arg>
</spark> </spark>
<ok to="create_updates"/> <ok to="wait"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>

View File

@ -20,8 +20,8 @@ import eu.dnetlib.dhp.collection.HttpConnector2;
public class EXCELParserTest { public class EXCELParserTest {
private static Path workingDir; private static Path workingDir;
private final HttpConnector2 httpConnector = new HttpConnector2(); private HttpConnector2 httpConnector = new HttpConnector2();
private static final String URL = "http://cordis.europa.eu/data/reference/cordisref-H2020topics.xlsx"; private static final String URL = "https://cordis.europa.eu/data/reference/cordisref-h2020topics.xlsx";
@BeforeAll @BeforeAll
public static void beforeAll() throws IOException { public static void beforeAll() throws IOException {
@ -35,11 +35,12 @@ public class EXCELParserTest {
EXCELParser excelParser = new EXCELParser(); EXCELParser excelParser = new EXCELParser();
final String classForName = "eu.dnetlib.dhp.actionmanager.project.utils.ExcelTopic"; List<Object> pl = excelParser
final String sheetName = "Topics"; .parse(
List<Object> pl = excelParser.parse(httpConnector.getInputSourceAsStream(URL), classForName, sheetName); httpConnector.getInputSourceAsStream(URL), "eu.dnetlib.dhp.actionmanager.project.utils.EXCELTopic",
"Topics");
Assertions.assertEquals(3837, pl.size()); Assertions.assertEquals(3878, pl.size());
} }
} }