forked from D-Net/dnet-hadoop
Merge branch 'beta' into datasource_model_eosc_beta
commit
931f430129
@ -1,2 +1,2 @@
|
||||
# dnet-hadoop
|
||||
Dnet-hadoop is a tool for
|
||||
Dnet-hadoop is the project that defined all the OOZIE workflows for the OpenAIRE Graph construction, processing, provisioning.
|
@ -1,14 +0,0 @@
|
||||
|
||||
package eu.dnetlib.dhp.application;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
public class ApplicationUtils {
|
||||
|
||||
}
|
@ -1,5 +1,5 @@
|
||||
|
||||
package eu.dnetlib.dhp.collection;
|
||||
package eu.dnetlib.dhp.common.collection;
|
||||
|
||||
public class CollectorException extends Exception {
|
||||
|
@ -0,0 +1,56 @@
|
||||
|
||||
package eu.dnetlib.dhp.common.collection;
|
||||
|
||||
import java.io.*;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.opencsv.bean.CsvToBeanBuilder;
|
||||
|
||||
public class GetCSV {
|
||||
|
||||
public static final char DEFAULT_DELIMITER = ',';
|
||||
|
||||
private GetCSV() {
|
||||
}
|
||||
|
||||
public static void getCsv(FileSystem fileSystem, BufferedReader reader, String hdfsPath,
|
||||
String modelClass) throws IOException, ClassNotFoundException {
|
||||
getCsv(fileSystem, reader, hdfsPath, modelClass, DEFAULT_DELIMITER);
|
||||
}
|
||||
|
||||
public static void getCsv(FileSystem fileSystem, Reader reader, String hdfsPath,
|
||||
String modelClass, char delimiter) throws IOException, ClassNotFoundException {
|
||||
|
||||
Path hdfsWritePath = new Path(hdfsPath);
|
||||
FSDataOutputStream fsDataOutputStream = null;
|
||||
if (fileSystem.exists(hdfsWritePath)) {
|
||||
fileSystem.delete(hdfsWritePath, false);
|
||||
}
|
||||
fsDataOutputStream = fileSystem.create(hdfsWritePath);
|
||||
|
||||
try (BufferedWriter writer = new BufferedWriter(
|
||||
new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8))) {
|
||||
|
||||
final ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
final List lines = new CsvToBeanBuilder(reader)
|
||||
.withType(Class.forName(modelClass))
|
||||
.withSeparator(delimiter)
|
||||
.build()
|
||||
.parse();
|
||||
|
||||
for (Object line : lines) {
|
||||
writer.write(mapper.writeValueAsString(line));
|
||||
writer.newLine();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -1,5 +1,5 @@
|
||||
|
||||
package eu.dnetlib.dhp.collection;
|
||||
package eu.dnetlib.dhp.common.collection;
|
||||
|
||||
/**
|
||||
* Bundles the http connection parameters driving the client behaviour.
|
@ -1,40 +0,0 @@
|
||||
|
||||
package eu.dnetlib.dhp.actionmanager.project.utils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.csv.CSVFormat;
|
||||
import org.apache.commons.csv.CSVRecord;
|
||||
import org.apache.commons.lang.reflect.FieldUtils;
|
||||
|
||||
/**
|
||||
* Reads a generic csv and maps it into classes that mirror its schema
|
||||
*/
|
||||
public class CSVParser {
|
||||
|
||||
public <R> List<R> parse(String csvFile, String classForName)
|
||||
throws ClassNotFoundException, IOException, IllegalAccessException, InstantiationException {
|
||||
final CSVFormat format = CSVFormat.EXCEL
|
||||
.withHeader()
|
||||
.withDelimiter(';')
|
||||
.withQuote('"')
|
||||
.withTrim();
|
||||
List<R> ret = new ArrayList<>();
|
||||
final org.apache.commons.csv.CSVParser parser = org.apache.commons.csv.CSVParser.parse(csvFile, format);
|
||||
final Set<String> headers = parser.getHeaderMap().keySet();
|
||||
Class<?> clazz = Class.forName(classForName);
|
||||
for (CSVRecord csvRecord : parser.getRecords()) {
|
||||
final Object cc = clazz.newInstance();
|
||||
for (String header : headers) {
|
||||
FieldUtils.writeField(cc, header, csvRecord.get(header), true);
|
||||
|
||||
}
|
||||
ret.add((R) cc);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
}
|
@ -1,200 +0,0 @@
|
||||
|
||||
package eu.dnetlib.dhp.actionmanager.project.utils;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* the mmodel for the projects csv file
|
||||
*/
|
||||
public class CSVProject implements Serializable {
|
||||
private String rcn;
|
||||
private String id;
|
||||
private String acronym;
|
||||
private String status;
|
||||
private String programme;
|
||||
private String topics;
|
||||
private String frameworkProgramme;
|
||||
private String title;
|
||||
private String startDate;
|
||||
private String endDate;
|
||||
private String projectUrl;
|
||||
private String objective;
|
||||
private String totalCost;
|
||||
private String ecMaxContribution;
|
||||
private String call;
|
||||
private String fundingScheme;
|
||||
private String coordinator;
|
||||
private String coordinatorCountry;
|
||||
private String participants;
|
||||
private String participantCountries;
|
||||
private String subjects;
|
||||
|
||||
public String getRcn() {
|
||||
return rcn;
|
||||
}
|
||||
|
||||
public void setRcn(String rcn) {
|
||||
this.rcn = rcn;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setId(String id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public String getAcronym() {
|
||||
return acronym;
|
||||
}
|
||||
|
||||
public void setAcronym(String acronym) {
|
||||
this.acronym = acronym;
|
||||
}
|
||||
|
||||
public String getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
public void setStatus(String status) {
|
||||
this.status = status;
|
||||
}
|
||||
|
||||
public String getProgramme() {
|
||||
return programme;
|
||||
}
|
||||
|
||||
public void setProgramme(String programme) {
|
||||
this.programme = programme;
|
||||
}
|
||||
|
||||
public String getTopics() {
|
||||
return topics;
|
||||
}
|
||||
|
||||
public void setTopics(String topics) {
|
||||
this.topics = topics;
|
||||
}
|
||||
|
||||
public String getFrameworkProgramme() {
|
||||
return frameworkProgramme;
|
||||
}
|
||||
|
||||
public void setFrameworkProgramme(String frameworkProgramme) {
|
||||
this.frameworkProgramme = frameworkProgramme;
|
||||
}
|
||||
|
||||
public String getTitle() {
|
||||
return title;
|
||||
}
|
||||
|
||||
public void setTitle(String title) {
|
||||
this.title = title;
|
||||
}
|
||||
|
||||
public String getStartDate() {
|
||||
return startDate;
|
||||
}
|
||||
|
||||
public void setStartDate(String startDate) {
|
||||
this.startDate = startDate;
|
||||
}
|
||||
|
||||
public String getEndDate() {
|
||||
return endDate;
|
||||
}
|
||||
|
||||
public void setEndDate(String endDate) {
|
||||
this.endDate = endDate;
|
||||
}
|
||||
|
||||
public String getProjectUrl() {
|
||||
return projectUrl;
|
||||
}
|
||||
|
||||
public void setProjectUrl(String projectUrl) {
|
||||
this.projectUrl = projectUrl;
|
||||
}
|
||||
|
||||
public String getObjective() {
|
||||
return objective;
|
||||
}
|
||||
|
||||
public void setObjective(String objective) {
|
||||
this.objective = objective;
|
||||
}
|
||||
|
||||
public String getTotalCost() {
|
||||
return totalCost;
|
||||
}
|
||||
|
||||
public void setTotalCost(String totalCost) {
|
||||
this.totalCost = totalCost;
|
||||
}
|
||||
|
||||
public String getEcMaxContribution() {
|
||||
return ecMaxContribution;
|
||||
}
|
||||
|
||||
public void setEcMaxContribution(String ecMaxContribution) {
|
||||
this.ecMaxContribution = ecMaxContribution;
|
||||
}
|
||||
|
||||
public String getCall() {
|
||||
return call;
|
||||
}
|
||||
|
||||
public void setCall(String call) {
|
||||
this.call = call;
|
||||
}
|
||||
|
||||
public String getFundingScheme() {
|
||||
return fundingScheme;
|
||||
}
|
||||
|
||||
public void setFundingScheme(String fundingScheme) {
|
||||
this.fundingScheme = fundingScheme;
|
||||
}
|
||||
|
||||
public String getCoordinator() {
|
||||
return coordinator;
|
||||
}
|
||||
|
||||
public void setCoordinator(String coordinator) {
|
||||
this.coordinator = coordinator;
|
||||
}
|
||||
|
||||
public String getCoordinatorCountry() {
|
||||
return coordinatorCountry;
|
||||
}
|
||||
|
||||
public void setCoordinatorCountry(String coordinatorCountry) {
|
||||
this.coordinatorCountry = coordinatorCountry;
|
||||
}
|
||||
|
||||
public String getParticipants() {
|
||||
return participants;
|
||||
}
|
||||
|
||||
public void setParticipants(String participants) {
|
||||
this.participants = participants;
|
||||
}
|
||||
|
||||
public String getParticipantCountries() {
|
||||
return participantCountries;
|
||||
}
|
||||
|
||||
public void setParticipantCountries(String participantCountries) {
|
||||
this.participantCountries = participantCountries;
|
||||
}
|
||||
|
||||
public String getSubjects() {
|
||||
return subjects;
|
||||
}
|
||||
|
||||
public void setSubjects(String subjects) {
|
||||
this.subjects = subjects;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,46 @@
|
||||
|
||||
package eu.dnetlib.dhp.actionmanager.project.utils.model;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
import com.opencsv.bean.CsvBindByName;
|
||||
|
||||
/**
|
||||
* the mmodel for the projects csv file
|
||||
*/
|
||||
public class CSVProject implements Serializable {
|
||||
|
||||
@CsvBindByName(column = "id")
|
||||
private String id;
|
||||
|
||||
@CsvBindByName(column = "programme")
|
||||
private String programme;
|
||||
|
||||
@CsvBindByName(column = "topics")
|
||||
private String topics;
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setId(String id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public String getProgramme() {
|
||||
return programme;
|
||||
}
|
||||
|
||||
public void setProgramme(String programme) {
|
||||
this.programme = programme;
|
||||
}
|
||||
|
||||
public String getTopics() {
|
||||
return topics;
|
||||
}
|
||||
|
||||
public void setTopics(String topics) {
|
||||
this.topics = topics;
|
||||
}
|
||||
|
||||
}
|
@ -1,5 +1,5 @@
|
||||
|
||||
package eu.dnetlib.dhp.actionmanager.project.utils;
|
||||
package eu.dnetlib.dhp.actionmanager.project.utils.model;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
@ -0,0 +1,73 @@
|
||||
package eu.dnetlib.dhp.actionmanager.scholix
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||
import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result}
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.sql._
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
import scala.io.Source
|
||||
|
||||
object SparkCreateActionset {
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
val log: Logger = LoggerFactory.getLogger(getClass)
|
||||
val conf: SparkConf = new SparkConf()
|
||||
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/actionset/generate_actionset.json")).mkString)
|
||||
parser.parseArgument(args)
|
||||
|
||||
|
||||
val spark: SparkSession =
|
||||
SparkSession
|
||||
.builder()
|
||||
.config(conf)
|
||||
.appName(getClass.getSimpleName)
|
||||
.master(parser.get("master")).getOrCreate()
|
||||
|
||||
|
||||
val sourcePath = parser.get("sourcePath")
|
||||
log.info(s"sourcePath -> $sourcePath")
|
||||
|
||||
val targetPath = parser.get("targetPath")
|
||||
log.info(s"targetPath -> $targetPath")
|
||||
|
||||
val workingDirFolder = parser.get("workingDirFolder")
|
||||
log.info(s"workingDirFolder -> $workingDirFolder")
|
||||
|
||||
implicit val oafEncoders: Encoder[Oaf] = Encoders.kryo[Oaf]
|
||||
implicit val resultEncoders: Encoder[Result] = Encoders.kryo[Result]
|
||||
implicit val relationEncoders: Encoder[Relation] = Encoders.kryo[Relation]
|
||||
|
||||
import spark.implicits._
|
||||
|
||||
val relation = spark.read.load(s"$sourcePath/relation").as[Relation]
|
||||
|
||||
relation.filter(r => (r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
|
||||
.flatMap(r => List(r.getSource, r.getTarget)).distinct().write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/id_relation")
|
||||
|
||||
|
||||
val idRelation = spark.read.load(s"$workingDirFolder/id_relation").as[String]
|
||||
|
||||
log.info("extract source and target Identifier involved in relations")
|
||||
|
||||
|
||||
log.info("save relation filtered")
|
||||
|
||||
relation.filter(r => (r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
|
||||
.write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/actionSetOaf")
|
||||
|
||||
log.info("saving entities")
|
||||
|
||||
val entities: Dataset[(String, Result)] = spark.read.load(s"$sourcePath/entities/*").as[Result].map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING, resultEncoders))
|
||||
|
||||
|
||||
entities.filter(r => r.isInstanceOf[Result]).map(r => r.asInstanceOf[Result])
|
||||
entities
|
||||
.joinWith(idRelation, entities("_1").equalTo(idRelation("value")))
|
||||
.map(p => p._1._2)
|
||||
.write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf")
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,86 @@
|
||||
package eu.dnetlib.dhp.actionmanager.scholix
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||
import eu.dnetlib.dhp.schema.action.AtomicAction
|
||||
import eu.dnetlib.dhp.schema.oaf.{Oaf, Dataset => OafDataset,Publication, Software, OtherResearchProduct, Relation}
|
||||
import org.apache.hadoop.io.Text
|
||||
import org.apache.hadoop.io.compress.GzipCodec
|
||||
import org.apache.hadoop.mapred.SequenceFileOutputFormat
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
import scala.io.Source
|
||||
|
||||
object SparkSaveActionSet {
|
||||
|
||||
|
||||
def toActionSet(item: Oaf): (String, String) = {
|
||||
val mapper = new ObjectMapper()
|
||||
|
||||
item match {
|
||||
case dataset: OafDataset =>
|
||||
val a: AtomicAction[OafDataset] = new AtomicAction[OafDataset]
|
||||
a.setClazz(classOf[OafDataset])
|
||||
a.setPayload(dataset)
|
||||
(dataset.getClass.getCanonicalName, mapper.writeValueAsString(a))
|
||||
case publication: Publication =>
|
||||
val a: AtomicAction[Publication] = new AtomicAction[Publication]
|
||||
a.setClazz(classOf[Publication])
|
||||
a.setPayload(publication)
|
||||
(publication.getClass.getCanonicalName, mapper.writeValueAsString(a))
|
||||
case software: Software =>
|
||||
val a: AtomicAction[Software] = new AtomicAction[Software]
|
||||
a.setClazz(classOf[Software])
|
||||
a.setPayload(software)
|
||||
(software.getClass.getCanonicalName, mapper.writeValueAsString(a))
|
||||
case orp: OtherResearchProduct =>
|
||||
val a: AtomicAction[OtherResearchProduct] = new AtomicAction[OtherResearchProduct]
|
||||
a.setClazz(classOf[OtherResearchProduct])
|
||||
a.setPayload(orp)
|
||||
(orp.getClass.getCanonicalName, mapper.writeValueAsString(a))
|
||||
|
||||
case relation: Relation =>
|
||||
val a: AtomicAction[Relation] = new AtomicAction[Relation]
|
||||
a.setClazz(classOf[Relation])
|
||||
a.setPayload(relation)
|
||||
(relation.getClass.getCanonicalName, mapper.writeValueAsString(a))
|
||||
case _ =>
|
||||
null
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
val log: Logger = LoggerFactory.getLogger(getClass)
|
||||
val conf: SparkConf = new SparkConf()
|
||||
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/actionset/save_actionset.json")).mkString)
|
||||
parser.parseArgument(args)
|
||||
|
||||
|
||||
val spark: SparkSession =
|
||||
SparkSession
|
||||
.builder()
|
||||
.config(conf)
|
||||
.appName(getClass.getSimpleName)
|
||||
.master(parser.get("master")).getOrCreate()
|
||||
|
||||
|
||||
val sourcePath = parser.get("sourcePath")
|
||||
log.info(s"sourcePath -> $sourcePath")
|
||||
|
||||
val targetPath = parser.get("targetPath")
|
||||
log.info(s"targetPath -> $targetPath")
|
||||
|
||||
implicit val oafEncoders: Encoder[Oaf] = Encoders.kryo[Oaf]
|
||||
implicit val tEncoder: Encoder[(String, String)] = Encoders.tuple(Encoders.STRING, Encoders.STRING)
|
||||
|
||||
spark.read.load(sourcePath).as[Oaf]
|
||||
.map(o => toActionSet(o))
|
||||
.filter(o => o != null)
|
||||
.rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$targetPath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text, Text]], classOf[GzipCodec])
|
||||
|
||||
}
|
||||
|
||||
}
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue