moved parameter file. Added 40| as prefix on projects for computing the delta
This commit is contained in:
parent
097905171a
commit
5fb58362c5
|
@ -176,7 +176,10 @@ public class QueryInformationSystem {
|
||||||
for (Object node : el.selectNodes(".//param")) {
|
for (Object node : el.selectNodes(".//param")) {
|
||||||
Node n = (Node) node;
|
Node n = (Node) node;
|
||||||
if (n.valueOf("./@name").equals("openaireId")) {
|
if (n.valueOf("./@name").equals("openaireId")) {
|
||||||
return prefix + "|" + n.getText();
|
String id = n.getText();
|
||||||
|
if (id.startsWith(prefix + "|"))
|
||||||
|
return id;
|
||||||
|
return prefix + "|" + id;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -58,7 +58,11 @@ public class ProjectsSubsetSparkJob implements Serializable {
|
||||||
String projectListPath) {
|
String projectListPath) {
|
||||||
Dataset<String> projectList = spark.read().textFile(projectListPath);
|
Dataset<String> projectList = spark.read().textFile(projectListPath);
|
||||||
Dataset<Project> projects;
|
Dataset<Project> projects;
|
||||||
projects = Utils.readPath(spark, inputPath, Project.class);
|
projects = Utils.readPath(spark, inputPath, Project.class)
|
||||||
|
.map((MapFunction<Project, Project>) p -> {
|
||||||
|
p.setId("40|" + p.getId());
|
||||||
|
return p;
|
||||||
|
}, Encoders.bean(Project.class));
|
||||||
projects
|
projects
|
||||||
.joinWith(projectList, projects.col("id").equalTo(projectList.col("value")), "left")
|
.joinWith(projectList, projects.col("id").equalTo(projectList.col("value")), "left")
|
||||||
.map((MapFunction<Tuple2<Project, String>, Project>) t2 -> {
|
.map((MapFunction<Tuple2<Project, String>, Project>) t2 -> {
|
||||||
|
|
Loading…
Reference in New Issue