Compare commits
3 Commits
Author | SHA1 | Date |
---|---|---|
Miriam Baglioni | 552169ecd0 | |
Miriam Baglioni | 712704ac77 | |
Miriam Baglioni | b6d4ff8e69 |
|
@ -42,18 +42,32 @@ public class UtilCommunityAPI {
|
|||
public List<String> getCommunityCsv(List<String> comms) {
|
||||
return comms.stream().map(c -> {
|
||||
try {
|
||||
CommunityModel community = getCommunity(c);
|
||||
StringBuilder builder = new StringBuilder();
|
||||
builder.append(DHPUtils.md5(community.getId()));
|
||||
builder.append(Constants.SEP);
|
||||
builder.append(community.getName());
|
||||
builder.append(Constants.SEP);
|
||||
builder.append(community.getId());
|
||||
builder.append(Constants.SEP);
|
||||
builder
|
||||
.append(
|
||||
community.getDescription());
|
||||
return builder.toString();
|
||||
if (c.equalsIgnoreCase("eosc")) {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
builder.append(DHPUtils.md5("eosc"));
|
||||
builder.append(Constants.SEP);
|
||||
builder.append("EOSC");
|
||||
builder.append(Constants.SEP);
|
||||
builder.append("eosc");
|
||||
builder.append(Constants.SEP);
|
||||
builder
|
||||
.append(
|
||||
"European Open Science Cloud");
|
||||
return builder.toString();
|
||||
} else {
|
||||
CommunityModel community = getCommunity(c);
|
||||
StringBuilder builder = new StringBuilder();
|
||||
builder.append(DHPUtils.md5(community.getId()));
|
||||
builder.append(Constants.SEP);
|
||||
builder.append(community.getName());
|
||||
builder.append(Constants.SEP);
|
||||
builder.append(community.getId());
|
||||
builder.append(Constants.SEP);
|
||||
builder
|
||||
.append(
|
||||
community.getDescription());
|
||||
return builder.toString();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
|
|
@ -2,12 +2,10 @@
|
|||
package eu.dnetlib.dhp.oa.graph.dump.csv;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
import static org.apache.commons.lang3.StringUtils.remove;
|
||||
import static org.apache.commons.lang3.StringUtils.split;
|
||||
import static org.apache.commons.lang3.StringUtils.*;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collector;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
|
@ -19,14 +17,9 @@ import org.apache.spark.sql.Dataset;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.oa.graph.dump.Utils;
|
||||
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVAuthor;
|
||||
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVPid;
|
||||
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVRelResAut;
|
||||
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVResult;
|
||||
import eu.dnetlib.dhp.oa.graph.dump.csv.model.*;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.schema.oaf.Author;
|
||||
|
@ -94,6 +87,9 @@ public class SparkDumpResults implements Serializable {
|
|||
.filter(
|
||||
(FilterFunction<R>) p -> !p.getDataInfo().getDeletedbyinference() && !p.getDataInfo().getInvisible());
|
||||
|
||||
Dataset<Datasource> datasources = Utils
|
||||
.readPath(spark, inputPath + "/datasource", Datasource.class)
|
||||
.filter((FilterFunction<Datasource>) d -> !d.getDataInfo().getDeletedbyinference());
|
||||
resultIds
|
||||
.joinWith(results, resultIds.col("value").equalTo(results.col("id")))
|
||||
.map((MapFunction<Tuple2<String, R>, R>) t2 -> t2._2(), Encoders.bean(inputClazz))
|
||||
|
@ -104,35 +100,166 @@ public class SparkDumpResults implements Serializable {
|
|||
|
||||
// map results
|
||||
results = Utils.readPath(spark, workingPath + "/" + resultType + "/temp/result", inputClazz);
|
||||
results
|
||||
.map(
|
||||
(MapFunction<R, CSVResult>) r -> mapResultInfo(r),
|
||||
Encoders.bean(CSVResult.class))
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(workingPath + "/" + resultType + "/result");
|
||||
mapResultToCSV(resultType, workingPath, results);
|
||||
|
||||
// map relations between pid and result
|
||||
results
|
||||
.flatMap((FlatMapFunction<R, CSVPid>) r -> {
|
||||
List<CSVPid> pids = new ArrayList<>();
|
||||
if (Optional.ofNullable(r.getPid()).isPresent() && r.getPid().size() > 0) {
|
||||
pids.addAll(mapPid(r.getPid(), r.getId()));
|
||||
}
|
||||
return pids.iterator();
|
||||
}, Encoders.bean(CSVPid.class))
|
||||
.filter(Objects::nonNull)
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(workingPath + "/" + resultType + "/result_pid");
|
||||
mapPidResultRelations(resultType, workingPath, results);
|
||||
|
||||
// map authors from the result
|
||||
// for each author in the result:
|
||||
// if the author has an orcid, the author id depends on the orcid (i.e. md5(orcid))
|
||||
// if the author has no orcid, the id is built as result_id + author rank (when the rank is missing,
|
||||
// the position of the author in the author list is used instead), always hashed with md5
|
||||
mapAuthorResults(spark, resultType, workingPath, results);
|
||||
|
||||
// map result-datasource association for collectedfrom
|
||||
mapRelationResultCollectedFrom(resultType, workingPath, results, datasources);
|
||||
|
||||
// map result datasource association for hostedby
|
||||
mapRelationResultHostedBy(resultType, workingPath, results, datasources);
|
||||
|
||||
// get the identifiers for the datasources to be dumped
|
||||
mapDatasourceForResults(resultType, workingPath, results, datasources);
|
||||
|
||||
mapRelationResultFos(resultType, workingPath, results);
|
||||
|
||||
mapFosFromResult(resultType, workingPath, results);
|
||||
}
|
||||
|
||||
private static <R extends Result> void mapFosFromResult(String resultType, String workingPath, Dataset<R> results) {
|
||||
results.flatMap((FlatMapFunction<R, CSVFos>) r -> {
|
||||
List<CSVFos> fosList = new ArrayList<>();
|
||||
if (Optional.ofNullable(r.getSubject()).isPresent()) {
|
||||
r
|
||||
.getSubject()
|
||||
.stream()
|
||||
.filter(s -> s.getQualifier().getClassid().equalsIgnoreCase("fos"))
|
||||
.forEach(
|
||||
s -> fosList
|
||||
.add(
|
||||
CSVFos
|
||||
.newInstance(
|
||||
s.getValue().split(" ")[0],
|
||||
s.getValue().substring(s.getValue().indexOf(" ")))));
|
||||
}
|
||||
return fosList.iterator();
|
||||
}, Encoders.bean(CSVFos.class))
|
||||
.groupByKey((MapFunction<CSVFos, String>) CSVFos::getId, Encoders.STRING())
|
||||
.mapGroups((MapGroupsFunction<String, CSVFos, CSVFos>) (k, v) -> v.next(), Encoders.bean(CSVFos.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingPath + "/" + resultType + "/fos");
|
||||
}
|
||||
|
||||
private static <R extends Result> void mapRelationResultFos(String resultType, String workingPath,
|
||||
Dataset<R> results) {
|
||||
results.flatMap((FlatMapFunction<R, CSVRelResFos>) r -> {
|
||||
List<CSVRelResFos> relList = new ArrayList<>();
|
||||
if (Optional.ofNullable(r.getSubject()).isPresent()) {
|
||||
r
|
||||
.getSubject()
|
||||
.stream()
|
||||
.filter(s -> s.getQualifier().getClassid().equalsIgnoreCase("fos"))
|
||||
.forEach(s -> relList.add(CSVRelResFos.newInstance(r.getId(), s.getValue().split(" ")[0])));
|
||||
}
|
||||
return relList.iterator();
|
||||
}, Encoders.bean(CSVRelResFos.class))
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(workingPath + "/" + resultType + "/result_fos");
|
||||
}
|
||||
|
||||
private static <R extends Result> void mapDatasourceForResults(String resultType, String workingPath,
|
||||
Dataset<R> results, Dataset<Datasource> datasources) {
|
||||
Dataset<String> datasourceId = results.flatMap((FlatMapFunction<R, String>) r -> {
|
||||
List<String> ret = r
|
||||
.getInstance()
|
||||
.stream()
|
||||
.map(i -> i.getCollectedfrom().getKey())
|
||||
.collect(Collectors.toList());
|
||||
ret.addAll(r.getInstance().stream().map(i -> i.getHostedby().getKey()).collect(Collectors.toList()));
|
||||
return ret.iterator();
|
||||
}, Encoders.STRING())
|
||||
.distinct();
|
||||
datasources
|
||||
.joinWith(datasourceId, datasources.col("id").equalTo(datasourceId.col("value")))
|
||||
.map((MapFunction<Tuple2<Datasource, String>, CSVDatasource>) t2 -> {
|
||||
CSVDatasource csvd = new CSVDatasource();
|
||||
csvd.setId(t2._1().getId());
|
||||
csvd.setType(removeBreaks(t2._1.getDatasourcetype().getClassname()));
|
||||
csvd
|
||||
.setOfficialName(
|
||||
removeBreaks(
|
||||
Optional
|
||||
.ofNullable(t2._1().getOfficialname())
|
||||
.map(Field::getValue)
|
||||
.orElse("")));
|
||||
csvd
|
||||
.setEnglish_name(
|
||||
removeBreaks(Optional.ofNullable(t2._1().getEnglishname()).map(Field::getValue).orElse("")));
|
||||
return csvd;
|
||||
}, Encoders.bean(CSVDatasource.class))
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(workingPath + "/" + resultType + "/datasource");
|
||||
}
|
||||
|
||||
private static <R extends Result> void mapRelationResultHostedBy(String resultType, String workingPath,
|
||||
Dataset<R> results, Dataset<Datasource> datasource) {
|
||||
Dataset<CSVRelResDat> hostedby = results
|
||||
.flatMap(
|
||||
(FlatMapFunction<R, CSVRelResDat>) r -> r
|
||||
.getInstance()
|
||||
.stream()
|
||||
.map(i -> CSVRelResDat.newInstance(r.getId(), i.getHostedby().getKey()))
|
||||
.collect(Collectors.toList())
|
||||
.iterator(),
|
||||
Encoders.bean(CSVRelResDat.class));
|
||||
getVerifiedSet(datasource, hostedby)
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(workingPath + "/" + resultType + "/result_hostedby");
|
||||
}
|
||||
|
||||
private static Dataset<CSVRelResDat> getVerifiedSet(Dataset<Datasource> datasource, Dataset<CSVRelResDat> linkedDatasources) {
|
||||
return linkedDatasources
|
||||
.joinWith(datasource, linkedDatasources.col("datasource_id").equalTo(datasource.col("id")), "left")
|
||||
.map(
|
||||
(MapFunction<Tuple2<CSVRelResDat, Datasource>, CSVRelResDat>) t2 -> {
|
||||
CSVRelResDat csvRelResDat = t2._1();
|
||||
if (!Optional.ofNullable(t2._2()).isPresent())
|
||||
csvRelResDat.setDatasource_id(ModelConstants.UNKNOWN_REPOSITORY.getKey());
|
||||
return csvRelResDat;
|
||||
},
|
||||
Encoders.bean(CSVRelResDat.class))
|
||||
.groupByKey((MapFunction<CSVRelResDat, String>) rel -> rel.getResult_id() + "::" + rel.getDatasource_id(), Encoders.STRING())
|
||||
.mapGroups((MapGroupsFunction<String, CSVRelResDat, CSVRelResDat>) (k,v) -> v.next(), Encoders.bean(CSVRelResDat.class));
|
||||
}
|
||||
|
||||
private static <R extends Result> void mapRelationResultCollectedFrom(String resultType, String workingPath,
|
||||
Dataset<R> results, Dataset<Datasource> datasource) {
|
||||
Dataset<CSVRelResDat> collectedfrom = results
|
||||
.flatMap(
|
||||
(FlatMapFunction<R, CSVRelResDat>) r -> r
|
||||
.getInstance()
|
||||
.stream()
|
||||
.map(i -> CSVRelResDat.newInstance(r.getId(), i.getCollectedfrom().getKey()))
|
||||
.collect(Collectors.toList())
|
||||
.iterator(),
|
||||
Encoders.bean(CSVRelResDat.class));
|
||||
getVerifiedSet(datasource, collectedfrom )
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(workingPath + "/" + resultType + "/result_collectedfrom");
|
||||
}
|
||||
|
||||
private static <R extends Result> void mapAuthorResults(SparkSession spark, String resultType, String workingPath,
|
||||
Dataset<R> results) {
|
||||
results
|
||||
.flatMap((FlatMapFunction<R, AuthorResult>) r -> {
|
||||
int count = 0;
|
||||
|
@ -193,7 +320,7 @@ public class SparkDumpResults implements Serializable {
|
|||
.mode(SaveMode.Overwrite)
|
||||
.json(workingPath + "/" + resultType + "/result_author");
|
||||
|
||||
// ma the authors in the working dir. I do not want to have them repeated. If I have an orcid as id, I choose
|
||||
// map the authors in the working dir. I do not want to have them repeated. If I have an orcid as id, I choose
|
||||
// the one from orcid if any
|
||||
authorResult
|
||||
.groupByKey((MapFunction<AuthorResult, String>) ar -> ar.getAuthorId(), Encoders.STRING())
|
||||
|
@ -214,7 +341,34 @@ public class SparkDumpResults implements Serializable {
|
|||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(workingPath + "/" + resultType + "/author");
|
||||
}
|
||||
|
||||
private static <R extends Result> void mapPidResultRelations(String resultType, String workingPath,
|
||||
Dataset<R> results) {
|
||||
results
|
||||
.flatMap((FlatMapFunction<R, CSVPid>) r -> {
|
||||
List<CSVPid> pids = new ArrayList<>();
|
||||
if (Optional.ofNullable(r.getPid()).isPresent() && r.getPid().size() > 0) {
|
||||
pids.addAll(mapPid(r.getPid(), r.getId()));
|
||||
}
|
||||
return pids.iterator();
|
||||
}, Encoders.bean(CSVPid.class))
|
||||
.filter(Objects::nonNull)
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(workingPath + "/" + resultType + "/result_pid");
|
||||
}
|
||||
|
||||
private static <R extends Result> void mapResultToCSV(String resultType, String workingPath, Dataset<R> results) {
|
||||
results
|
||||
.map(
|
||||
(MapFunction<R, CSVResult>) r -> mapResultInfo(r),
|
||||
Encoders.bean(CSVResult.class))
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(workingPath + "/" + resultType + "/result");
|
||||
}
|
||||
|
||||
private static List<CSVPid> mapPid(List<StructuredProperty> pid, String resultId) {
|
||||
|
@ -292,12 +446,24 @@ public class SparkDumpResults implements Serializable {
|
|||
ret.setPublisher(removeBreaks(getFieldValue(r.getPublisher())));
|
||||
|
||||
if (Optional.ofNullable(r.getSubject()).isPresent())
|
||||
ret.setKeywords(String.join(", ", r.getSubject().stream().map(s -> {
|
||||
if (StringUtils.isNotEmpty(s.getValue()))
|
||||
return removeBreaks(s.getValue().toLowerCase());
|
||||
else
|
||||
return null;
|
||||
}).filter(Objects::nonNull).distinct().collect(Collectors.toList())));
|
||||
ret
|
||||
.setKeywords(
|
||||
String
|
||||
.join(
|
||||
", ",
|
||||
r
|
||||
.getSubject()
|
||||
.stream()
|
||||
.filter(s -> !s.getQualifier().getClassid().equalsIgnoreCase("fos"))
|
||||
.map(s -> {
|
||||
if (StringUtils.isNotEmpty(s.getValue()))
|
||||
return removeBreaks(s.getValue().toLowerCase());
|
||||
else
|
||||
return null;
|
||||
})
|
||||
.filter(Objects::nonNull)
|
||||
.distinct()
|
||||
.collect(Collectors.toList())));
|
||||
else
|
||||
ret.setKeywords("");
|
||||
|
||||
|
|
|
@ -20,10 +20,7 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.oa.graph.dump.Utils;
|
||||
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVAuthor;
|
||||
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVPid;
|
||||
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVRelResAut;
|
||||
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVResult;
|
||||
import eu.dnetlib.dhp.oa.graph.dump.csv.model.*;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
|
||||
/**
|
||||
|
@ -112,7 +109,6 @@ public class SparkMoveOnSigleDir implements Serializable {
|
|||
.option("delimiter", Constants.SEP)
|
||||
.option("compression", "gzip")
|
||||
.csv(outputPath + "/result_author");
|
||||
|
||||
Utils
|
||||
.readPath(spark, workingPath + "/publication/author", CSVAuthor.class)
|
||||
.union(Utils.readPath(spark, workingPath + "/dataset/author", CSVAuthor.class))
|
||||
|
@ -128,6 +124,74 @@ public class SparkMoveOnSigleDir implements Serializable {
|
|||
.option("compression", "gzip")
|
||||
.csv(outputPath + "/author");
|
||||
|
||||
Utils
|
||||
.readPath(spark, workingPath + "/publication/result_collectedfrom", CSVRelResDat.class)
|
||||
.union(Utils.readPath(spark, workingPath + "/dataset/result_collectedfrom", CSVRelResDat.class))
|
||||
.union(Utils.readPath(spark, workingPath + "/software/result_collectedfrom", CSVRelResDat.class))
|
||||
.union(
|
||||
Utils.readPath(spark, workingPath + "/otherresearchproduct/result_collectedfrom", CSVRelResDat.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("header", "true")
|
||||
.option("delimiter", Constants.SEP)
|
||||
.option("compression", "gzip")
|
||||
.csv(outputPath + "/result_collectedfrom");
|
||||
|
||||
Utils
|
||||
.readPath(spark, workingPath + "/publication/result_hostedby", CSVRelResDat.class)
|
||||
.union(Utils.readPath(spark, workingPath + "/dataset/result_hostedby", CSVRelResDat.class))
|
||||
.union(Utils.readPath(spark, workingPath + "/software/result_hostedby", CSVRelResDat.class))
|
||||
.union(Utils.readPath(spark, workingPath + "/otherresearchproduct/result_hostedby", CSVRelResDat.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("header", "true")
|
||||
.option("delimiter", Constants.SEP)
|
||||
.option("compression", "gzip")
|
||||
.csv(outputPath + "/result_hostedby");
|
||||
|
||||
Utils
|
||||
.readPath(spark, workingPath + "/publication/datasource", CSVDatasource.class)
|
||||
.union(Utils.readPath(spark, workingPath + "/dataset/datasource", CSVDatasource.class))
|
||||
.union(Utils.readPath(spark, workingPath + "/software/datasource", CSVDatasource.class))
|
||||
.union(Utils.readPath(spark, workingPath + "/otherresearchproduct/datasource", CSVDatasource.class))
|
||||
.groupByKey((MapFunction<CSVDatasource, String>) r -> r.getId(), Encoders.STRING())
|
||||
.mapGroups(
|
||||
(MapGroupsFunction<String, CSVDatasource, CSVDatasource>) (k, it) -> it.next(),
|
||||
Encoders.bean(CSVDatasource.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("header", "true")
|
||||
.option("delimiter", Constants.SEP)
|
||||
.option("compression", "gzip")
|
||||
.csv(outputPath + "/datasource");
|
||||
|
||||
Utils
|
||||
.readPath(spark, workingPath + "/publication/result_fos", CSVRelResFos.class)
|
||||
.union(Utils.readPath(spark, workingPath + "/dataset/result_fos", CSVRelResFos.class))
|
||||
.union(Utils.readPath(spark, workingPath + "/software/result_fos", CSVRelResFos.class))
|
||||
.union(Utils.readPath(spark, workingPath + "/otherresearchproduct/result_fos", CSVRelResFos.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("header", "true")
|
||||
.option("delimiter", Constants.SEP)
|
||||
.option("compression", "gzip")
|
||||
.csv(outputPath + "/result_fos");
|
||||
|
||||
Utils
|
||||
.readPath(spark, workingPath + "/publication/fos", CSVFos.class)
|
||||
.union(Utils.readPath(spark, workingPath + "/dataset/fos", CSVFos.class))
|
||||
.union(Utils.readPath(spark, workingPath + "/software/fos", CSVFos.class))
|
||||
.union(Utils.readPath(spark, workingPath + "/otherresearchproduct/fos", CSVFos.class))
|
||||
.groupByKey((MapFunction<CSVFos, String>) r -> r.getId(), Encoders.STRING())
|
||||
.mapGroups(
|
||||
(MapGroupsFunction<String, CSVFos, CSVFos>) (k, it) -> it.next(), Encoders.bean(CSVFos.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("header", "true")
|
||||
.option("delimiter", Constants.SEP)
|
||||
.option("compression", "gzip")
|
||||
.csv(outputPath + "/fos");
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,52 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.dump.csv.model;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
 * Bean describing a datasource in the CSV dump.
 * <p>
 * It is a plain mutable holder exposing one getter/setter pair per property.
 */
public class CSVDatasource implements Serializable {

	/** Identifier of the datasource. */
	private String id;

	/** Type of the datasource. */
	private String type;

	/** Official name of the datasource. */
	private String officialName;

	/** English name of the datasource. */
	private String english_name;

	/** Interface url of the datasource. */
	private String interface_url;

	// ---------------------------------------------------------------- getters

	public String getId() {
		return this.id;
	}

	public String getType() {
		return this.type;
	}

	public String getOfficialName() {
		return this.officialName;
	}

	public String getEnglish_name() {
		return this.english_name;
	}

	public String getInterface_url() {
		return this.interface_url;
	}

	// ---------------------------------------------------------------- setters

	public void setId(String id) {
		this.id = id;
	}

	public void setType(String type) {
		this.type = type;
	}

	public void setOfficialName(String officialName) {
		this.officialName = officialName;
	}

	public void setEnglish_name(String english_name) {
		this.english_name = english_name;
	}

	public void setInterface_url(String interface_url) {
		this.interface_url = interface_url;
	}
}
|
|
@ -0,0 +1,32 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.dump.csv.model;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
 * Bean describing a Field of Science (FOS) entry in the CSV dump: an identifier and its label.
 */
public class CSVFos implements Serializable {

	/** Identifier of the FOS entry. */
	private String id;

	/** Label of the FOS entry. */
	private String label;

	public String getId() {
		return this.id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getLabel() {
		return this.label;
	}

	public void setLabel(String label) {
		this.label = label;
	}

	/**
	 * Creates a new FOS entry.
	 *
	 * @param s  the identifier of the entry
	 * @param s1 the label of the entry
	 * @return a new instance holding the given identifier and label
	 */
	public static CSVFos newInstance(String s, String s1) {
		final CSVFos fos = new CSVFos();
		fos.setId(s);
		fos.setLabel(s1);
		return fos;
	}
}
|
|
@ -0,0 +1,32 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.dump.csv.model;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
 * Bean describing the association between a result and a datasource in the CSV dump.
 */
public class CSVRelResDat implements Serializable {

	/** Identifier of the result. */
	private String result_id;

	/** Identifier of the datasource. */
	private String datasource_id;

	public String getResult_id() {
		return this.result_id;
	}

	public void setResult_id(String result_id) {
		this.result_id = result_id;
	}

	public String getDatasource_id() {
		return this.datasource_id;
	}

	public void setDatasource_id(String datasource_id) {
		this.datasource_id = datasource_id;
	}

	/**
	 * Creates a new result-datasource association.
	 *
	 * @param result_id     the identifier of the result
	 * @param datasource_id the identifier of the datasource
	 * @return a new instance holding the given identifiers
	 */
	public static CSVRelResDat newInstance(String result_id, String datasource_id) {
		final CSVRelResDat relation = new CSVRelResDat();
		relation.setResult_id(result_id);
		relation.setDatasource_id(datasource_id);
		return relation;
	}
}
|
|
@ -0,0 +1,32 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.dump.csv.model;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
 * Bean describing the association between a result and a Field of Science (FOS) in the CSV dump.
 */
public class CSVRelResFos implements Serializable {

	/** Identifier of the result. */
	private String result_id;

	/** Identifier of the FOS entry. */
	private String fos_id;

	/**
	 * Creates a new result-FOS association.
	 *
	 * @param result_id the identifier of the result
	 * @param fos_id    the identifier of the FOS entry
	 * @return a new instance holding the given identifiers
	 */
	public static CSVRelResFos newInstance(String result_id, String fos_id) {
		final CSVRelResFos relation = new CSVRelResFos();
		relation.setResult_id(result_id);
		relation.setFos_id(fos_id);
		return relation;
	}

	public String getResult_id() {
		return this.result_id;
	}

	public void setResult_id(String result_id) {
		this.result_id = result_id;
	}

	public String getFos_id() {
		return this.fos_id;
	}

	public void setFos_id(String fos_id) {
		this.fos_id = fos_id;
	}
}
|
|
@ -3,16 +3,7 @@ package eu.dnetlib.dhp.oa.graph.dump.csv.model;
|
|||
|
||||
import java.io.Serializable;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonGetter;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonSetter;
|
||||
|
||||
import eu.dnetlib.dhp.oa.graph.dump.csv.Constants;
|
||||
import eu.dnetlib.dhp.schema.oaf.Country;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
import sun.swing.StringUIClientPropertyKey;
|
||||
|
||||
/**
|
||||
* @author miriam.baglioni
|
||||
|
|
|
@ -110,11 +110,11 @@
|
|||
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
||||
<arg>--communities</arg><arg>${communities}</arg>
|
||||
</spark>
|
||||
<ok to="fork_dump_result_author_pid"/>
|
||||
<ok to="fork_dump_result_author_datasource_fos_pid"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<fork name="fork_dump_result_author_pid">
|
||||
<fork name="fork_dump_result_author_datasource_fos_pid">
|
||||
<path start="dump_publication"/>
|
||||
<path start="dump_dataset"/>
|
||||
<path start="dump_other"/>
|
||||
|
|
|
@ -7,7 +7,9 @@ import java.io.IOException;
|
|||
import java.io.StringReader;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
|
@ -33,6 +35,7 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.oa.graph.dump.UtilCommunityAPI;
|
||||
import eu.dnetlib.dhp.oa.graph.dump.Utils;
|
||||
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVAuthor;
|
||||
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVResult;
|
||||
|
@ -126,9 +129,7 @@ public class DumpResultTest {
|
|||
Assertions.assertEquals(ModelConstants.OPEN_ACCESS_RIGHT().getClassid(), row.getAccessright());
|
||||
Assertions.assertEquals("FI", row.getCountry());
|
||||
Assertions.assertEquals("Lit.opg., bijl.", row.getDescription());
|
||||
Assertions.assertEquals(3, split(row.getKeywords(), ", ").length);
|
||||
Assertions.assertTrue(row.getKeywords().toString().contains("archeologie"));
|
||||
Assertions.assertTrue(row.getKeywords().toString().contains("prospectie"));
|
||||
Assertions.assertEquals(1, split(row.getKeywords(), ",").length);
|
||||
Assertions.assertTrue(row.getKeywords().toString().contains("archaeology"));
|
||||
Assertions.assertEquals("nl", row.getLanguage());
|
||||
Assertions.assertEquals("2007-01-01", row.getPublication_date());
|
||||
|
@ -244,7 +245,6 @@ public class DumpResultTest {
|
|||
|
||||
SparkDumpResults.main(new String[] {
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-outputPath", workingDir.toString() + "/output",
|
||||
"-workingPath", workingDir.toString() + "/working",
|
||||
"-resultType", "publication",
|
||||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
|
||||
|
@ -257,9 +257,9 @@ public class DumpResultTest {
|
|||
.read()
|
||||
.option("header", "true")
|
||||
.option("delimiter", Constants.SEP)
|
||||
.csv(workingDir.toString() + "/working/publication/result_author");
|
||||
.json(workingDir.toString() + "/working/publication/result_author");
|
||||
|
||||
Assertions.assertEquals(6, tmp.count());
|
||||
Assertions.assertEquals(14, tmp.count());
|
||||
|
||||
Assertions.assertEquals(2, tmp.where("author_id == '" + DHPUtils.md5("0000-0003-2914-2734") + "'").count());
|
||||
Assertions
|
||||
|
@ -295,7 +295,6 @@ public class DumpResultTest {
|
|||
|
||||
SparkDumpResults.main(new String[] {
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-outputPath", workingDir.toString() + "/output",
|
||||
"-workingPath", workingDir.toString() + "/working",
|
||||
"-resultType", "publication",
|
||||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
|
||||
|
@ -308,10 +307,10 @@ public class DumpResultTest {
|
|||
.read()
|
||||
.option("header", "true")
|
||||
.option("delimiter", Constants.SEP)
|
||||
.csv(workingDir.toString() + "/working/publication/result_pid");
|
||||
.json(workingDir.toString() + "/working/publication/result_pid");
|
||||
|
||||
tmp.show(false);
|
||||
Assertions.assertEquals(4, tmp.count());
|
||||
Assertions.assertEquals(6, tmp.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(2, tmp.where("result_id == '50|DansKnawCris::0224aae28af558f21768dbc6439c7a95'").count());
|
||||
|
@ -325,6 +324,202 @@ public class DumpResultTest {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDumpDatasource() throws Exception {
|
||||
|
||||
final String sourcePath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/oa/graph/dump/csv/input/")
|
||||
.getPath();
|
||||
|
||||
spark
|
||||
.read()
|
||||
.text(
|
||||
getClass()
|
||||
.getResource("/eu/dnetlib/dhp/oa/graph/dump/csv/working/resultIds")
|
||||
.getPath())
|
||||
.write()
|
||||
.text(workingDir.toString() + "/working/resultIds/");
|
||||
|
||||
SparkDumpResults.main(new String[] {
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-workingPath", workingDir.toString() + "/working",
|
||||
"-resultType", "publication",
|
||||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
|
||||
"-sourcePath", sourcePath
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
Dataset<Row> tmp = spark
|
||||
.read()
|
||||
.option("header", "true")
|
||||
.option("delimiter", Constants.SEP)
|
||||
.json(workingDir.toString() + "/working/publication/datasource");
|
||||
|
||||
tmp.show(false);
|
||||
Assertions.assertEquals(2, tmp.count());
|
||||
|
||||
tmp.foreach((ForeachFunction<Row>) d -> Assertions.assertEquals("Journal", d.getString(3)));
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"Reflektika",
|
||||
tmp.where("id == '10|doajarticles::0f7fe9973717cb7ac2bba8685bc88a2a'").first().getString(0));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"Reflektika",
|
||||
tmp.where("id == '10|doajarticles::0f7fe9973717cb7ac2bba8685bc88a2a'").first().getString(2));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDumpDatasourceRels() throws Exception {
|
||||
|
||||
final String sourcePath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/oa/graph/dump/csv/input/")
|
||||
.getPath();
|
||||
|
||||
spark
|
||||
.read()
|
||||
.text(
|
||||
getClass()
|
||||
.getResource("/eu/dnetlib/dhp/oa/graph/dump/csv/working/resultIds")
|
||||
.getPath())
|
||||
.write()
|
||||
.text(workingDir.toString() + "/working/resultIds/");
|
||||
|
||||
SparkDumpResults.main(new String[] {
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-workingPath", workingDir.toString() + "/working",
|
||||
"-resultType", "publication",
|
||||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
|
||||
"-sourcePath", sourcePath
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
Dataset<Row> tmp = spark
|
||||
.read()
|
||||
.option("header", "true")
|
||||
.option("delimiter", Constants.SEP)
|
||||
.json(workingDir.toString() + "/working/publication/result_collectedfrom");
|
||||
|
||||
tmp.show(false);
|
||||
Assertions.assertEquals(5, tmp.count());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDumpDatasourceRels2() throws Exception {
|
||||
|
||||
final String sourcePath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/oa/graph/dump/csv/input/")
|
||||
.getPath();
|
||||
|
||||
spark
|
||||
.read()
|
||||
.text(
|
||||
getClass()
|
||||
.getResource("/eu/dnetlib/dhp/oa/graph/dump/csv/working/resultIds")
|
||||
.getPath())
|
||||
.write()
|
||||
.text(workingDir.toString() + "/working/resultIds/");
|
||||
|
||||
SparkDumpResults.main(new String[] {
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-workingPath", workingDir.toString() + "/working",
|
||||
"-resultType", "publication",
|
||||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
|
||||
"-sourcePath", sourcePath
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
Dataset<Row> tmp = spark
|
||||
.read()
|
||||
.option("header", "true")
|
||||
.option("delimiter", Constants.SEP)
|
||||
.json(workingDir.toString() + "/working/publication/result_hostedby");
|
||||
|
||||
tmp.show(false);
|
||||
Assertions.assertEquals(5, tmp.count());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDumpTopic() throws Exception {
|
||||
|
||||
final String sourcePath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/oa/graph/dump/csv/input/")
|
||||
.getPath();
|
||||
|
||||
spark
|
||||
.read()
|
||||
.text(
|
||||
getClass()
|
||||
.getResource("/eu/dnetlib/dhp/oa/graph/dump/csv/working/resultIds")
|
||||
.getPath())
|
||||
.write()
|
||||
.text(workingDir.toString() + "/working/resultIds/");
|
||||
|
||||
SparkDumpResults.main(new String[] {
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-workingPath", workingDir.toString() + "/working",
|
||||
"-resultType", "publication",
|
||||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
|
||||
"-sourcePath", sourcePath
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
Dataset<Row> tmp = spark
|
||||
.read()
|
||||
.option("header", "true")
|
||||
.option("delimiter", Constants.SEP)
|
||||
.json(workingDir.toString() + "/working/publication/fos");
|
||||
|
||||
tmp.show(false);
|
||||
Assertions.assertEquals(8, tmp.count());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDumpTopicRel() throws Exception {
|
||||
|
||||
final String sourcePath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/oa/graph/dump/csv/input/")
|
||||
.getPath();
|
||||
|
||||
spark
|
||||
.read()
|
||||
.text(
|
||||
getClass()
|
||||
.getResource("/eu/dnetlib/dhp/oa/graph/dump/csv/working/resultIds")
|
||||
.getPath())
|
||||
.write()
|
||||
.text(workingDir.toString() + "/working/resultIds/");
|
||||
|
||||
SparkDumpResults.main(new String[] {
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-workingPath", workingDir.toString() + "/working",
|
||||
"-resultType", "publication",
|
||||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
|
||||
"-sourcePath", sourcePath
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
Dataset<Row> tmp = spark
|
||||
.read()
|
||||
.option("header", "true")
|
||||
.option("delimiter", Constants.SEP)
|
||||
.json(workingDir.toString() + "/working/publication/result_fos");
|
||||
|
||||
tmp.show(false);
|
||||
Assertions.assertEquals(9, tmp.count());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void prova() throws DocumentException {
|
||||
String input = "<community id=\"dh-ch\" label=\"Digital Humanities and Cultural Heritage\">" +
|
||||
|
@ -355,4 +550,10 @@ public class DumpResultTest {
|
|||
|
||||
System.out.println(st.replace("\\\"", " "));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getCommunitiesFromApi() {
|
||||
UtilCommunityAPI queryCommunityApi = new UtilCommunityAPI();
|
||||
queryCommunityApi.getCommunityCsv(Arrays.asList("eosc")).forEach(r -> System.out.println(r));
|
||||
}
|
||||
}
|
||||
|
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue