forked from D-Net/dnet-hadoop
added preparation classes
This commit is contained in:
parent
f0f14caf99
commit
9447d78ef3
|
@ -1,53 +0,0 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.actionmanager.project;
|
|
||||||
|
|
||||||
import java.io.*;
|
|
||||||
import java.net.URL;
|
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
|
||||||
|
|
||||||
public class GetFile {
|
|
||||||
|
|
||||||
private static final Log log = LogFactory.getLog(GetFile.class);
|
|
||||||
|
|
||||||
public static void main(final String[] args) throws Exception {
|
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
|
||||||
IOUtils
|
|
||||||
.toString(
|
|
||||||
GetFile.class
|
|
||||||
.getResourceAsStream(
|
|
||||||
"/eu/dnetlib/dhp/actionmanager/project/parameters.json")));
|
|
||||||
|
|
||||||
Configuration conf = new Configuration();
|
|
||||||
|
|
||||||
parser.parseArgument(args);
|
|
||||||
|
|
||||||
final String fileURL = parser.get("fileURL");
|
|
||||||
final String hdfsPath = parser.get("hdfsPath");
|
|
||||||
final String hdfsNameNode = parser.get("hdfsNameNode");
|
|
||||||
|
|
||||||
conf.set("fs.defaultFS", hdfsNameNode);
|
|
||||||
FileSystem fileSystem = FileSystem.get(conf);
|
|
||||||
Path hdfsWritePath = new Path(hdfsPath);
|
|
||||||
FSDataOutputStream fsDataOutputStream = null;
|
|
||||||
if (fileSystem.exists(hdfsWritePath)) {
|
|
||||||
fsDataOutputStream = fileSystem.append(hdfsWritePath);
|
|
||||||
} else {
|
|
||||||
fsDataOutputStream = fileSystem.create(hdfsWritePath);
|
|
||||||
}
|
|
||||||
|
|
||||||
InputStream is = new BufferedInputStream(new URL(fileURL).openStream());
|
|
||||||
|
|
||||||
org.apache.hadoop.io.IOUtils.copyBytes(is, fsDataOutputStream, 4096, true);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -0,0 +1,124 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.actionmanager.project;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
import org.apache.spark.sql.*;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProgramme;
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
public class PrepareProgramme {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(PrepareProgramme.class);
|
||||||
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
private static final HashMap<String, CSVProgramme> programmeMap = new HashMap<>();
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
|
String jsonConfiguration = IOUtils
|
||||||
|
.toString(
|
||||||
|
PrepareProgramme.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/actionmanager/project/prepare_programme_parameters.json"));
|
||||||
|
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||||
|
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
Boolean isSparkSessionManaged = Optional
|
||||||
|
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||||
|
.map(Boolean::valueOf)
|
||||||
|
.orElse(Boolean.TRUE);
|
||||||
|
|
||||||
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
|
final String programmePath = parser.get("programmePath");
|
||||||
|
log.info("programmePath {}: ", programmePath);
|
||||||
|
|
||||||
|
final String outputPath = parser.get("outputPath");
|
||||||
|
log.info("outputPath {}: ", outputPath);
|
||||||
|
|
||||||
|
SparkConf conf = new SparkConf();
|
||||||
|
|
||||||
|
runWithSparkSession(
|
||||||
|
conf,
|
||||||
|
isSparkSessionManaged,
|
||||||
|
spark -> {
|
||||||
|
removeOutputDir(spark, outputPath);
|
||||||
|
exec(spark, programmePath, outputPath);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void removeOutputDir(SparkSession spark, String path) {
|
||||||
|
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void exec(SparkSession spark, String programmePath, String outputPath) {
|
||||||
|
Dataset<CSVProgramme> programme = readPath(spark, programmePath, CSVProgramme.class);
|
||||||
|
|
||||||
|
programme
|
||||||
|
.toJavaRDD()
|
||||||
|
.filter(p -> !p.getCode().contains("FP7"))
|
||||||
|
.mapToPair(csvProgramme -> new Tuple2<>(csvProgramme.getCode(), csvProgramme))
|
||||||
|
.reduceByKey((a, b) -> {
|
||||||
|
if (StringUtils.isEmpty(a.getShortTitle())) {
|
||||||
|
if (StringUtils.isEmpty(b.getShortTitle())) {
|
||||||
|
if (StringUtils.isEmpty(a.getTitle())) {
|
||||||
|
if (StringUtils.isNotEmpty(b.getTitle())) {
|
||||||
|
a.setShortTitle(b.getTitle());
|
||||||
|
a.setLanguage(b.getLanguage());
|
||||||
|
}
|
||||||
|
} else {// notIsEmpty a.getTitle
|
||||||
|
if (StringUtils.isEmpty(b.getTitle())) {
|
||||||
|
a.setShortTitle(a.getTitle());
|
||||||
|
} else {
|
||||||
|
if (b.getLanguage().equalsIgnoreCase("en")) {
|
||||||
|
a.setShortTitle(b.getTitle());
|
||||||
|
a.setLanguage(b.getLanguage());
|
||||||
|
} else {
|
||||||
|
a.setShortTitle(a.getTitle());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {// not isEmpty b.getShortTitle
|
||||||
|
a.setShortTitle(b.getShortTitle());
|
||||||
|
// a.setLanguage(b.getLanguage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return a;
|
||||||
|
|
||||||
|
})
|
||||||
|
.map(p -> {
|
||||||
|
CSVProgramme csvProgramme = p._2();
|
||||||
|
if (StringUtils.isEmpty(csvProgramme.getShortTitle())) {
|
||||||
|
csvProgramme.setShortTitle(csvProgramme.getTitle());
|
||||||
|
}
|
||||||
|
return OBJECT_MAPPER.writeValueAsString(csvProgramme);
|
||||||
|
})
|
||||||
|
.saveAsTextFile(outputPath);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <R> Dataset<R> readPath(
|
||||||
|
SparkSession spark, String inputPath, Class<R> clazz) {
|
||||||
|
return spark
|
||||||
|
.read()
|
||||||
|
.textFile(inputPath)
|
||||||
|
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -1,139 +1,108 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.actionmanager.project;
|
package eu.dnetlib.dhp.actionmanager.project;
|
||||||
|
|
||||||
import java.io.BufferedWriter;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
import java.io.Closeable;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.OutputStreamWriter;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
|
||||||
import java.sql.ResultSet;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import org.apache.commons.csv.CSVParser;
|
import java.util.ArrayList;
|
||||||
import org.apache.commons.csv.CSVFormat;
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.spark.sql.Dataset;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProgramme;
|
||||||
|
import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProject;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||||
import eu.dnetlib.dhp.schema.common.RelationInverse;
|
import scala.Tuple2;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
|
||||||
|
|
||||||
public class PrepareProjects implements Closeable {
|
public class PrepareProjects {
|
||||||
private static final Log log = LogFactory.getLog(PrepareProjects.class);
|
|
||||||
private final Configuration conf;
|
|
||||||
private final BufferedWriter writer;
|
|
||||||
private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
|
||||||
private final HttpConnector httpConnector;
|
|
||||||
|
|
||||||
public static void main(final String[] args) throws Exception {
|
private static final Logger log = LoggerFactory.getLogger(PrepareProgramme.class);
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
IOUtils
|
private static final HashMap<String, CSVProgramme> programmeMap = new HashMap<>();
|
||||||
.toString(
|
|
||||||
PrepareProjects.class
|
public static void main(String[] args) throws Exception {
|
||||||
.getResourceAsStream(
|
|
||||||
"/eu/dnetlib/dhp/actionmanager/project/parameters.json")));
|
String jsonConfiguration = IOUtils
|
||||||
|
.toString(
|
||||||
|
PrepareProjects.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/actionmanager/project/prepare_project_parameters.json"));
|
||||||
|
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||||
|
|
||||||
parser.parseArgument(args);
|
parser.parseArgument(args);
|
||||||
|
|
||||||
final String fileURL = parser.get("fileURL");
|
Boolean isSparkSessionManaged = Optional
|
||||||
final String hdfsPath = parser.get("hdfsPath");
|
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||||
final String hdfsNameNode = parser.get("hdfsNameNode");
|
.map(Boolean::valueOf)
|
||||||
|
.orElse(Boolean.TRUE);
|
||||||
|
|
||||||
try (final PrepareProjects prepareProjects = new PrepareProjects(hdfsPath, hdfsNameNode)) {
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
log.info("Getting projects...");
|
final String projectPath = parser.get("projectPath");
|
||||||
prepareProjects.execute(fileURL);
|
log.info("projectPath {}: ", projectPath);
|
||||||
|
|
||||||
}
|
final String outputPath = parser.get("outputPath");
|
||||||
|
log.info("outputPath {}: ", outputPath);
|
||||||
|
|
||||||
|
SparkConf conf = new SparkConf();
|
||||||
|
|
||||||
|
runWithSparkSession(
|
||||||
|
conf,
|
||||||
|
isSparkSessionManaged,
|
||||||
|
spark -> {
|
||||||
|
removeOutputDir(spark, outputPath);
|
||||||
|
exec(spark, projectPath, outputPath);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void execute(final String fileURL) throws Exception {
|
private static void removeOutputDir(SparkSession spark, String path) {
|
||||||
|
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
||||||
String projects = httpConnector.getInputSource(fileURL);
|
|
||||||
final CSVFormat format = CSVFormat.EXCEL
|
|
||||||
.withHeader()
|
|
||||||
.withDelimiter(';')
|
|
||||||
.withQuote('"')
|
|
||||||
.withTrim();
|
|
||||||
final CSVParser parser = CSVParser.parse(projects, format);
|
|
||||||
final Set<String> headers = parser.getHeaderMap().keySet();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<Relation> processBlacklistEntry(ResultSet rs) {
|
private static void exec(SparkSession spark, String progjectPath, String outputPath) {
|
||||||
try {
|
Dataset<CSVProject> project = readPath(spark, progjectPath, CSVProject.class);
|
||||||
Relation direct = new Relation();
|
|
||||||
Relation inverse = new Relation();
|
|
||||||
|
|
||||||
String source_prefix = ModelSupport.entityIdPrefix.get(rs.getString("source_type"));
|
project
|
||||||
String target_prefix = ModelSupport.entityIdPrefix.get(rs.getString("target_type"));
|
.toJavaRDD()
|
||||||
|
.flatMap(p -> {
|
||||||
|
List<CSVProject> csvProjectList = new ArrayList<>();
|
||||||
|
String[] programme = p.getProgramme().split(";");
|
||||||
|
if (programme.length > 1) {
|
||||||
|
for (int i = 0; i < programme.length; i++) {
|
||||||
|
CSVProject csvProject = new CSVProject();
|
||||||
|
csvProject.setProgramme(programme[i]);
|
||||||
|
csvProjectList.add(csvProject);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
csvProjectList.add(p);
|
||||||
|
}
|
||||||
|
|
||||||
String source_direct = source_prefix + "|" + rs.getString("source");
|
return csvProjectList.iterator();
|
||||||
direct.setSource(source_direct);
|
})
|
||||||
inverse.setTarget(source_direct);
|
.map(p -> OBJECT_MAPPER.writeValueAsString(p))
|
||||||
|
.saveAsTextFile(outputPath);
|
||||||
String target_direct = target_prefix + "|" + rs.getString("target");
|
|
||||||
direct.setTarget(target_direct);
|
|
||||||
inverse.setSource(target_direct);
|
|
||||||
|
|
||||||
String encoding = rs.getString("relationship");
|
|
||||||
RelationInverse ri = ModelSupport.relationInverseMap.get(encoding);
|
|
||||||
direct.setRelClass(ri.getRelation());
|
|
||||||
inverse.setRelClass(ri.getInverse());
|
|
||||||
direct.setRelType(ri.getRelType());
|
|
||||||
inverse.setRelType(ri.getRelType());
|
|
||||||
direct.setSubRelType(ri.getSubReltype());
|
|
||||||
inverse.setSubRelType(ri.getSubReltype());
|
|
||||||
|
|
||||||
return Arrays.asList(direct, inverse);
|
|
||||||
|
|
||||||
} catch (final Exception e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() throws IOException {
|
|
||||||
writer.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
public PrepareProjects(
|
|
||||||
final String hdfsPath, String hdfsNameNode)
|
|
||||||
throws Exception {
|
|
||||||
|
|
||||||
this.conf = new Configuration();
|
|
||||||
this.conf.set("fs.defaultFS", hdfsNameNode);
|
|
||||||
this.httpConnector = new HttpConnector();
|
|
||||||
FileSystem fileSystem = FileSystem.get(this.conf);
|
|
||||||
Path hdfsWritePath = new Path(hdfsPath);
|
|
||||||
FSDataOutputStream fsDataOutputStream = null;
|
|
||||||
if (fileSystem.exists(hdfsWritePath)) {
|
|
||||||
fsDataOutputStream = fileSystem.append(hdfsWritePath);
|
|
||||||
} else {
|
|
||||||
fsDataOutputStream = fileSystem.create(hdfsWritePath);
|
|
||||||
}
|
|
||||||
|
|
||||||
this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void writeRelation(final Relation r) {
|
public static <R> Dataset<R> readPath(
|
||||||
try {
|
SparkSession spark, String inputPath, Class<R> clazz) {
|
||||||
writer.write(OBJECT_MAPPER.writeValueAsString(r));
|
return spark
|
||||||
writer.newLine();
|
.read()
|
||||||
} catch (final Exception e) {
|
.textFile(inputPath)
|
||||||
throw new RuntimeException(e);
|
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,22 +3,35 @@ package eu.dnetlib.dhp.actionmanager.project;
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.SaveMode;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProgramme;
|
||||||
|
import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProject;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Programme;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||||
|
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||||
|
|
||||||
public class SparkAtomicActionJob {
|
public class SparkAtomicActionJob {
|
||||||
private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionJob.class);
|
private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionJob.class);
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
private static final HashMap<String, String> programmeMap = new HashMap<>();
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
|
@ -67,8 +80,54 @@ public class SparkAtomicActionJob {
|
||||||
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void getAtomicActions(SparkSession spark, String projectPatj, String programmePath,
|
private static void getAtomicActions(SparkSession spark, String projectPatH,
|
||||||
|
String programmePath,
|
||||||
String outputPath) {
|
String outputPath) {
|
||||||
|
|
||||||
|
Dataset<CSVProject> project = readPath(spark, projectPatH, CSVProject.class);
|
||||||
|
Dataset<CSVProgramme> programme = readPath(spark, programmePath, CSVProgramme.class);
|
||||||
|
|
||||||
|
project
|
||||||
|
.joinWith(programme, project.col("programme").equalTo(programme.col("code")), "left")
|
||||||
|
.map(c -> {
|
||||||
|
CSVProject csvProject = c._1();
|
||||||
|
Optional<CSVProgramme> csvProgramme = Optional.ofNullable(c._2());
|
||||||
|
if (csvProgramme.isPresent()) {
|
||||||
|
Project p = new Project();
|
||||||
|
p
|
||||||
|
.setId(
|
||||||
|
createOpenaireId(
|
||||||
|
ModelSupport.entityIdPrefix.get("project"),
|
||||||
|
"corda__h2020", csvProject.getId()));
|
||||||
|
Programme pm = new Programme();
|
||||||
|
pm.setCode(csvProject.getProgramme());
|
||||||
|
pm.setDescription(csvProgramme.get().getShortTitle());
|
||||||
|
p.setProgramme(Arrays.asList(pm));
|
||||||
|
return p;
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}, Encoders.bean(Project.class))
|
||||||
|
.filter(p -> !(p == null))
|
||||||
|
// .map(p -> new AtomicAction<>(Project.class, p), Encoders.bean(AtomicAction.class))
|
||||||
|
.write()
|
||||||
|
.option("compression", "gzip")
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.json(outputPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <R> Dataset<R> readPath(
|
||||||
|
SparkSession spark, String inputPath, Class<R> clazz) {
|
||||||
|
return spark
|
||||||
|
.read()
|
||||||
|
.textFile(inputPath)
|
||||||
|
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String createOpenaireId(
|
||||||
|
final String prefix, final String nsPrefix, final String id) {
|
||||||
|
|
||||||
|
return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(id));
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,37 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.actionmanager.project.csvutils;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.commons.csv.CSVFormat;
|
||||||
|
import org.apache.commons.csv.CSVRecord;
|
||||||
|
import org.apache.commons.lang.reflect.FieldUtils;
|
||||||
|
|
||||||
|
public class CSVParser {
|
||||||
|
|
||||||
|
public <R> List<R> parse(String csvFile, String classForName)
|
||||||
|
throws ClassNotFoundException, IOException, IllegalAccessException, InstantiationException {
|
||||||
|
final CSVFormat format = CSVFormat.EXCEL
|
||||||
|
.withHeader()
|
||||||
|
.withDelimiter(';')
|
||||||
|
.withQuote('"')
|
||||||
|
.withTrim();
|
||||||
|
List<R> ret = new ArrayList<>();
|
||||||
|
final org.apache.commons.csv.CSVParser parser = org.apache.commons.csv.CSVParser.parse(csvFile, format);
|
||||||
|
final Set<String> headers = parser.getHeaderMap().keySet();
|
||||||
|
Class<?> clazz = Class.forName(classForName);
|
||||||
|
for (CSVRecord csvRecord : parser.getRecords()) {
|
||||||
|
final Object cc = clazz.newInstance();
|
||||||
|
for (String header : headers) {
|
||||||
|
FieldUtils.writeField(cc, header, csvRecord.get(header), true);
|
||||||
|
|
||||||
|
}
|
||||||
|
ret.add((R) cc);
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,9 +1,9 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.actionmanager.project;
|
package eu.dnetlib.dhp.actionmanager.project.csvutils;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
||||||
public class Programme implements Serializable {
|
public class CSVProgramme implements Serializable {
|
||||||
private String rcn;
|
private String rcn;
|
||||||
private String code;
|
private String code;
|
||||||
private String title;
|
private String title;
|
|
@ -1,9 +1,9 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.actionmanager.project;
|
package eu.dnetlib.dhp.actionmanager.project.csvutils;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
||||||
public class Project implements Serializable {
|
public class CSVProject implements Serializable {
|
||||||
private String rcn;
|
private String rcn;
|
||||||
private String id;
|
private String id;
|
||||||
private String acronym;
|
private String acronym;
|
||||||
|
@ -193,4 +193,5 @@ public class Project implements Serializable {
|
||||||
public void setSubjects(String subjects) {
|
public void setSubjects(String subjects) {
|
||||||
this.subjects = subjects;
|
this.subjects = subjects;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -0,0 +1,98 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.actionmanager.project.csvutils;
|
||||||
|
|
||||||
|
import java.io.BufferedWriter;
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStreamWriter;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.actionmanager.project.httpconnector.HttpConnector;
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
|
||||||
|
public class ReadCSV implements Closeable {
|
||||||
|
private static final Log log = LogFactory.getLog(ReadCSV.class);
|
||||||
|
private final Configuration conf;
|
||||||
|
private final BufferedWriter writer;
|
||||||
|
private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
private String csvFile;
|
||||||
|
|
||||||
|
public static void main(final String[] args) throws Exception {
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
ReadCSV.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/actionmanager/project/parameters.json")));
|
||||||
|
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
final String fileURL = parser.get("fileURL");
|
||||||
|
final String hdfsPath = parser.get("hdfsPath");
|
||||||
|
final String hdfsNameNode = parser.get("hdfsNameNode");
|
||||||
|
final String classForName = parser.get("classForName");
|
||||||
|
|
||||||
|
try (final ReadCSV readCSV = new ReadCSV(hdfsPath, hdfsNameNode, fileURL)) {
|
||||||
|
|
||||||
|
log.info("Getting CSV file...");
|
||||||
|
readCSV.execute(classForName);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void execute(final String classForName) throws Exception {
|
||||||
|
CSVParser csvParser = new CSVParser();
|
||||||
|
csvParser
|
||||||
|
.parse(csvFile, classForName)
|
||||||
|
.stream()
|
||||||
|
.forEach(p -> write(p));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
writer.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
public ReadCSV(
|
||||||
|
final String hdfsPath,
|
||||||
|
final String hdfsNameNode,
|
||||||
|
final String fileURL)
|
||||||
|
throws Exception {
|
||||||
|
this.conf = new Configuration();
|
||||||
|
this.conf.set("fs.defaultFS", hdfsNameNode);
|
||||||
|
HttpConnector httpConnector = new HttpConnector();
|
||||||
|
FileSystem fileSystem = FileSystem.get(this.conf);
|
||||||
|
Path hdfsWritePath = new Path(hdfsPath);
|
||||||
|
FSDataOutputStream fsDataOutputStream = null;
|
||||||
|
if (fileSystem.exists(hdfsWritePath)) {
|
||||||
|
fsDataOutputStream = fileSystem.append(hdfsWritePath);
|
||||||
|
} else {
|
||||||
|
fsDataOutputStream = fileSystem.create(hdfsWritePath);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
|
||||||
|
this.csvFile = httpConnector.getInputSource(fileURL);
|
||||||
|
;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void write(final Object p) {
|
||||||
|
try {
|
||||||
|
writer.write(OBJECT_MAPPER.writeValueAsString(p));
|
||||||
|
writer.newLine();
|
||||||
|
} catch (final Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -1,5 +1,5 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.actionmanager.project;
|
package eu.dnetlib.dhp.actionmanager.project.httpconnector;
|
||||||
|
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.actionmanager.project;
|
package eu.dnetlib.dhp.actionmanager.project.httpconnector;
|
||||||
|
|
||||||
public class CollectorServiceException extends Exception {
|
public class CollectorServiceException extends Exception {
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.actionmanager.project;
|
package eu.dnetlib.dhp.actionmanager.project.httpconnector;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
|
@ -0,0 +1,26 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"paramName": "issm",
|
||||||
|
"paramLongName": "isSparkSessionManaged",
|
||||||
|
"paramDescription": "when true will stop SparkSession after job execution",
|
||||||
|
"paramRequired": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "pjp",
|
||||||
|
"paramLongName": "projectPath",
|
||||||
|
"paramDescription": "the URL from where to get the projects file",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "pp",
|
||||||
|
"paramLongName": "programmePath",
|
||||||
|
"paramDescription": "the URL from where to get the programme file",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "o",
|
||||||
|
"paramLongName": "outputPath",
|
||||||
|
"paramDescription": "the path of the new ActionSet",
|
||||||
|
"paramRequired": true
|
||||||
|
}
|
||||||
|
]
|
|
@ -0,0 +1,20 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"paramName": "issm",
|
||||||
|
"paramLongName": "isSparkSessionManaged",
|
||||||
|
"paramDescription": "when true will stop SparkSession after job execution",
|
||||||
|
"paramRequired": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "pp",
|
||||||
|
"paramLongName": "programmePath",
|
||||||
|
"paramDescription": "the URL from where to get the programme file",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "o",
|
||||||
|
"paramLongName": "outputPath",
|
||||||
|
"paramDescription": "the path of the new ActionSet",
|
||||||
|
"paramRequired": true
|
||||||
|
}
|
||||||
|
]
|
|
@ -0,0 +1,43 @@
|
||||||
|
package eu.dnetlib.dhp.actionmanager.project;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
|
||||||
|
public class ReadCSVTest {
|
||||||
|
|
||||||
|
private static Path workingDir;
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
public static void beforeAll() throws IOException {
|
||||||
|
workingDir = Files.createTempDirectory(eu.dnetlib.dhp.actionmanager.project.ReadCSVTest.class.getSimpleName());
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
public void readProgrammeTest() throws Exception {
|
||||||
|
|
||||||
|
String programmecsv = IOUtils.toString(getClass()
|
||||||
|
.getClassLoader().getResourceAsStream("eu/dnetlib/dhp/actionmanager/project/programme.csv"));
|
||||||
|
ReadCSV
|
||||||
|
.main(
|
||||||
|
new String[] {
|
||||||
|
"-fileURL",
|
||||||
|
"http://cordis.europa.eu/data/reference/cordisref-H2020programmes.csv",
|
||||||
|
"-outputPath",
|
||||||
|
workingDir.toString() + "/project",
|
||||||
|
"-hdfsPath",
|
||||||
|
getClass().getResource("/eu/dnetlib/dhp/blacklist/blacklist").getPath(),
|
||||||
|
"-mergesPath",
|
||||||
|
getClass().getResource("/eu/dnetlib/dhp/blacklist/mergesRelOneMerge").getPath(),
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,4 @@
|
||||||
|
package eu.dnetlib.dhp.actionmanager.project;
|
||||||
|
|
||||||
|
public class PrepareProgrammeTest {
|
||||||
|
}
|
|
@ -0,0 +1,4 @@
|
||||||
|
package eu.dnetlib.dhp.actionmanager.project;
|
||||||
|
|
||||||
|
public class SparkUpdateProjectSet {
|
||||||
|
}
|
|
@ -0,0 +1,4 @@
|
||||||
|
package eu.dnetlib.dhp.actionmanager.project.httpconnector;
|
||||||
|
|
||||||
|
public class HttpConnectorTest {
|
||||||
|
}
|
|
Loading…
Reference in New Issue