forked from D-Net/dnet-hadoop
Compare commits
No commits in common. "53b9d87fecfbdae99c964a2be71d73aedd7b556c" and "9610224671febc599a02fc094e85f4febfc2b4ad" have entirely different histories.
53b9d87fec
...
9610224671
|
@ -143,6 +143,7 @@ public class PrepareProgramme {
|
||||||
|
|
||||||
JavaRDD<CSVProgramme> h2020Programmes = programme
|
JavaRDD<CSVProgramme> h2020Programmes = programme
|
||||||
.toJavaRDD()
|
.toJavaRDD()
|
||||||
|
.filter(p -> p.getFrameworkProgramme().trim().equalsIgnoreCase("H2020"))
|
||||||
.mapToPair(csvProgramme -> new Tuple2<>(csvProgramme.getCode(), csvProgramme))
|
.mapToPair(csvProgramme -> new Tuple2<>(csvProgramme.getCode(), csvProgramme))
|
||||||
.reduceByKey((a, b) -> {
|
.reduceByKey((a, b) -> {
|
||||||
if (!a.getLanguage().equals("en")) {
|
if (!a.getLanguage().equals("en")) {
|
||||||
|
|
|
@ -32,7 +32,7 @@ public class PrepareProjects {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(PrepareProgramme.class);
|
private static final Logger log = LoggerFactory.getLogger(PrepareProgramme.class);
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
//private static final HashMap<String, CSVProgramme> programmeMap = new HashMap<>();
|
private static final HashMap<String, CSVProgramme> programmeMap = new HashMap<>();
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
|
|
|
@ -120,6 +120,7 @@ public class SparkAtomicActionJob {
|
||||||
.map((MapFunction<Tuple2<CSVProject, CSVProgramme>, Project>) c -> {
|
.map((MapFunction<Tuple2<CSVProject, CSVProgramme>, Project>) c -> {
|
||||||
|
|
||||||
CSVProject csvProject = c._1();
|
CSVProject csvProject = c._1();
|
||||||
|
Optional<CSVProgramme> ocsvProgramme = Optional.ofNullable(c._2());
|
||||||
|
|
||||||
return Optional
|
return Optional
|
||||||
.ofNullable(c._2())
|
.ofNullable(c._2())
|
||||||
|
@ -134,9 +135,9 @@ public class SparkAtomicActionJob {
|
||||||
H2020Programme pm = new H2020Programme();
|
H2020Programme pm = new H2020Programme();
|
||||||
H2020Classification h2020classification = new H2020Classification();
|
H2020Classification h2020classification = new H2020Classification();
|
||||||
pm.setCode(csvProject.getProgramme());
|
pm.setCode(csvProject.getProgramme());
|
||||||
h2020classification.setClassification(csvProgramme.getClassification());
|
h2020classification.setClassification(ocsvProgramme.get().getClassification());
|
||||||
h2020classification.setH2020Programme(pm);
|
h2020classification.setH2020Programme(pm);
|
||||||
setLevelsandProgramme(h2020classification, csvProgramme.getClassification_short());
|
setLevelsandProgramme(h2020classification, ocsvProgramme.get().getClassification_short());
|
||||||
// setProgramme(h2020classification, ocsvProgramme.get().getClassification());
|
// setProgramme(h2020classification, ocsvProgramme.get().getClassification());
|
||||||
pp.setH2020classification(Arrays.asList(h2020classification));
|
pp.setH2020classification(Arrays.asList(h2020classification));
|
||||||
|
|
||||||
|
@ -144,11 +145,10 @@ public class SparkAtomicActionJob {
|
||||||
})
|
})
|
||||||
.orElse(null);
|
.orElse(null);
|
||||||
|
|
||||||
}, Encoders.bean(Project.class))
|
}, Encoders.bean(Project.class));
|
||||||
.filter(Objects::nonNull);
|
|
||||||
|
|
||||||
aaproject
|
aaproject
|
||||||
.joinWith(topic, aaproject.col("h2020topiccode").equalTo(topic.col("code")), "left")
|
.joinWith(topic, aaproject.col("h2020topiccode").equalTo(topic.col("code")))
|
||||||
.map((MapFunction<Tuple2<Project, EXCELTopic>, Project>) p -> {
|
.map((MapFunction<Tuple2<Project, EXCELTopic>, Project>) p -> {
|
||||||
Optional<EXCELTopic> op = Optional.ofNullable(p._2());
|
Optional<EXCELTopic> op = Optional.ofNullable(p._2());
|
||||||
Project rp = p._1();
|
Project rp = p._1();
|
||||||
|
|
|
@ -7,7 +7,14 @@ import java.io.Serializable;
|
||||||
* The model for the programme csv file
|
* The model for the programme csv file
|
||||||
*/
|
*/
|
||||||
public class CSVProgramme implements Serializable {
|
public class CSVProgramme implements Serializable {
|
||||||
|
private String parentProgramme;
|
||||||
|
private String frameworkProgramme;
|
||||||
|
private String startDate;
|
||||||
|
private String endDate;
|
||||||
|
private String objective;
|
||||||
|
private String subjects;
|
||||||
|
private String legalBasis;
|
||||||
|
private String call;
|
||||||
private String rcn;
|
private String rcn;
|
||||||
private String code;
|
private String code;
|
||||||
|
|
||||||
|
@ -73,5 +80,67 @@ public class CSVProgramme implements Serializable {
|
||||||
this.language = language;
|
this.language = language;
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
public String getParentProgramme() {
|
||||||
|
return parentProgramme;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setParentProgramme(String parentProgramme) {
|
||||||
|
this.parentProgramme = parentProgramme;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getFrameworkProgramme() {
|
||||||
|
return frameworkProgramme;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setFrameworkProgramme(String frameworkProgramme) {
|
||||||
|
this.frameworkProgramme = frameworkProgramme;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getStartDate() {
|
||||||
|
return startDate;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setStartDate(String startDate) {
|
||||||
|
this.startDate = startDate;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getEndDate() {
|
||||||
|
return endDate;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEndDate(String endDate) {
|
||||||
|
this.endDate = endDate;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getObjective() {
|
||||||
|
return objective;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setObjective(String objective) {
|
||||||
|
this.objective = objective;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getSubjects() {
|
||||||
|
return subjects;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSubjects(String subjects) {
|
||||||
|
this.subjects = subjects;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getLegalBasis() {
|
||||||
|
return legalBasis;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLegalBasis(String legalBasis) {
|
||||||
|
this.legalBasis = legalBasis;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getCall() {
|
||||||
|
return call;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCall(String call) {
|
||||||
|
this.call = call;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
<workflow-app name="H2020Classification" xmlns="uri:oozie:workflow:0.5">
|
<workflow-app name="H2020Programme" xmlns="uri:oozie:workflow:0.5">
|
||||||
<parameters>
|
<parameters>
|
||||||
<property>
|
<property>
|
||||||
<name>projectFileURL</name>
|
<name>projectFileURL</name>
|
||||||
|
@ -35,17 +35,10 @@
|
||||||
<delete path='${workingDir}'/>
|
<delete path='${workingDir}'/>
|
||||||
<mkdir path='${workingDir}'/>
|
<mkdir path='${workingDir}'/>
|
||||||
</fs>
|
</fs>
|
||||||
<ok to="fork_get_info"/>
|
<ok to="get_project_file"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
|
|
||||||
<fork name="fork_get_info">
|
|
||||||
<path start="get_project_file"/>
|
|
||||||
<path start="get_programme_file"/>
|
|
||||||
<path start="get_topic_file"/>
|
|
||||||
<path start="read_projects"/>
|
|
||||||
</fork>
|
|
||||||
<action name="get_project_file">
|
<action name="get_project_file">
|
||||||
<java>
|
<java>
|
||||||
<main-class>eu.dnetlib.dhp.actionmanager.project.utils.ReadCSV</main-class>
|
<main-class>eu.dnetlib.dhp.actionmanager.project.utils.ReadCSV</main-class>
|
||||||
|
@ -54,7 +47,7 @@
|
||||||
<arg>--hdfsPath</arg><arg>${workingDir}/projects</arg>
|
<arg>--hdfsPath</arg><arg>${workingDir}/projects</arg>
|
||||||
<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.utils.CSVProject</arg>
|
<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.utils.CSVProject</arg>
|
||||||
</java>
|
</java>
|
||||||
<ok to="wait"/>
|
<ok to="get_programme_file"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
|
@ -66,7 +59,7 @@
|
||||||
<arg>--hdfsPath</arg><arg>${workingDir}/programme</arg>
|
<arg>--hdfsPath</arg><arg>${workingDir}/programme</arg>
|
||||||
<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.utils.CSVProgramme</arg>
|
<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.utils.CSVProgramme</arg>
|
||||||
</java>
|
</java>
|
||||||
<ok to="wait"/>
|
<ok to="get_topic_file"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
|
@ -79,7 +72,7 @@
|
||||||
<arg>--sheetName</arg><arg>${sheetName}</arg>
|
<arg>--sheetName</arg><arg>${sheetName}</arg>
|
||||||
<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.utils.EXCELTopic</arg>
|
<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.utils.EXCELTopic</arg>
|
||||||
</java>
|
</java>
|
||||||
<ok to="wait"/>
|
<ok to="read_projects"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
|
@ -92,12 +85,10 @@
|
||||||
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
|
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
|
||||||
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
||||||
</java>
|
</java>
|
||||||
<ok to="wait"/>
|
<ok to="prepare_programme"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
<join name="wait" to="prepare_programme"/>
|
|
||||||
|
|
||||||
<action name="prepare_programme">
|
<action name="prepare_programme">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn</master>
|
<master>yarn</master>
|
||||||
|
|
Loading…
Reference in New Issue