Compare commits

..

3 Commits

12 changed files with 674 additions and 79 deletions

View File

@@ -42,18 +42,32 @@ public class UtilCommunityAPI {
public List<String> getCommunityCsv(List<String> comms) {
return comms.stream().map(c -> {
try {
CommunityModel community = getCommunity(c);
StringBuilder builder = new StringBuilder();
builder.append(DHPUtils.md5(community.getId()));
builder.append(Constants.SEP);
builder.append(community.getName());
builder.append(Constants.SEP);
builder.append(community.getId());
builder.append(Constants.SEP);
builder
.append(
community.getDescription());
return builder.toString();
if (c.equalsIgnoreCase("eosc")) {
StringBuilder builder = new StringBuilder();
builder.append(DHPUtils.md5("eosc"));
builder.append(Constants.SEP);
builder.append("EOSC");
builder.append(Constants.SEP);
builder.append("eosc");
builder.append(Constants.SEP);
builder
.append(
"European Open Science Cloud");
return builder.toString();
} else {
CommunityModel community = getCommunity(c);
StringBuilder builder = new StringBuilder();
builder.append(DHPUtils.md5(community.getId()));
builder.append(Constants.SEP);
builder.append(community.getName());
builder.append(Constants.SEP);
builder.append(community.getId());
builder.append(Constants.SEP);
builder
.append(
community.getDescription());
return builder.toString();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
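
For reference, the branch added above short-circuits the community lookup when "eosc" is requested and emits a fixed row. A minimal usage sketch, mirroring the test added at the bottom of this diff; the row shape in the comment assumes Constants.SEP is the column separator used throughout the CSV dump:

// Illustrative call only: prints the hard-coded EOSC row.
UtilCommunityAPI api = new UtilCommunityAPI();
api.getCommunityCsv(Arrays.asList("eosc")).forEach(System.out::println);
// Expected single row (SEP stands for Constants.SEP):
// md5("eosc") SEP EOSC SEP eosc SEP European Open Science Cloud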

View File

@@ -2,12 +2,10 @@
package eu.dnetlib.dhp.oa.graph.dump.csv;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static org.apache.commons.lang3.StringUtils.remove;
import static org.apache.commons.lang3.StringUtils.split;
import static org.apache.commons.lang3.StringUtils.*;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
@@ -19,14 +17,9 @@ import org.apache.spark.sql.Dataset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVAuthor;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVPid;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVRelResAut;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVResult;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.*;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Author;
@@ -94,6 +87,9 @@ public class SparkDumpResults implements Serializable {
.filter(
(FilterFunction<R>) p -> !p.getDataInfo().getDeletedbyinference() && !p.getDataInfo().getInvisible());
Dataset<Datasource> datasources = Utils
.readPath(spark, inputPath + "/datasource", Datasource.class)
.filter((FilterFunction<Datasource>) d -> !d.getDataInfo().getDeletedbyinference());
resultIds
.joinWith(results, resultIds.col("value").equalTo(results.col("id")))
.map((MapFunction<Tuple2<String, R>, R>) t2 -> t2._2(), Encoders.bean(inputClazz))
@@ -104,35 +100,166 @@
// map results
results = Utils.readPath(spark, workingPath + "/" + resultType + "/temp/result", inputClazz);
results
.map(
(MapFunction<R, CSVResult>) r -> mapResultInfo(r),
Encoders.bean(CSVResult.class))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(workingPath + "/" + resultType + "/result");
mapResultToCSV(resultType, workingPath, results);
// map relations between pid and result
results
.flatMap((FlatMapFunction<R, CSVPid>) r -> {
List<CSVPid> pids = new ArrayList<>();
if (Optional.ofNullable(r.getPid()).isPresent() && r.getPid().size() > 0) {
pids.addAll(mapPid(r.getPid(), r.getId()));
}
return pids.iterator();
}, Encoders.bean(CSVPid.class))
.filter(Objects::nonNull)
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(workingPath + "/" + resultType + "/result_pid");
mapPidResultRelations(resultType, workingPath, results);
// map authors from the result
// for each author in the result
// if the author has an ORCID, their id is derived from the ORCID (i.e. md5(orcid))
// if they have no ORCID, their id is built as result_id + authorrank (if no rank is available,
// their position within the set of authors is used), again hashed with md5
mapAuthorResults(spark, resultType, workingPath, results);
// map result-datasource association for collectedfrom
mapRelationResultCollectedFrom(resultType, workingPath, results, datasources);
// map result datasource association for hostedby
mapRelationResultHostedBy(resultType, workingPath, results, datasources);
// get the identifiers for the datasources to be dumped
mapDatasourceForResults(resultType, workingPath, results, datasources);
mapRelationResultFos(resultType, workingPath, results);
mapFosFromResult(resultType, workingPath, results);
}
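
The comment above describes how author identifiers are derived; a hypothetical helper sketching that rule (the actual logic lives in mapAuthorResults below, whose body is largely elided in this diff):

// Hypothetical helper sketching the author-id rule described above:
// md5(orcid) when an ORCID is present, otherwise md5(resultId + rank),
// falling back to the author's position in the author list when no rank is set.
private static String illustrativeAuthorId(String resultId, String orcid, String rank, int positionInList) {
	if (StringUtils.isNotBlank(orcid))
		return DHPUtils.md5(orcid);
	String r = StringUtils.isNotBlank(rank) ? rank : String.valueOf(positionInList);
	return DHPUtils.md5(resultId + r);
}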
private static <R extends Result> void mapFosFromResult(String resultType, String workingPath, Dataset<R> results) {
results.flatMap((FlatMapFunction<R, CSVFos>) r -> {
List<CSVFos> fosList = new ArrayList<>();
if (Optional.ofNullable(r.getSubject()).isPresent()) {
r
.getSubject()
.stream()
.filter(s -> s.getQualifier().getClassid().equalsIgnoreCase("fos"))
.forEach(
s -> fosList
.add(
CSVFos
.newInstance(
s.getValue().split(" ")[0],
s.getValue().substring(s.getValue().indexOf(" ")))));
}
return fosList.iterator();
}, Encoders.bean(CSVFos.class))
.groupByKey((MapFunction<CSVFos, String>) CSVFos::getId, Encoders.STRING())
.mapGroups((MapGroupsFunction<String, CSVFos, CSVFos>) (k, v) -> v.next(), Encoders.bean(CSVFos.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingPath + "/" + resultType + "/fos");
}
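
The split above assumes each FOS subject value carries the numeric code and the label separated by the first blank; a minimal illustration with a made-up subject value:

// Hypothetical FOS subject value; real values come from r.getSubject().
String value = "0101 mathematics";
String fosId = value.split(" ")[0];                    // "0101"
String fosLabel = value.substring(value.indexOf(" ")); // " mathematics" (leading blank kept)
CSVFos fos = CSVFos.newInstance(fosId, fosLabel);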
private static <R extends Result> void mapRelationResultFos(String resultType, String workingPath,
Dataset<R> results) {
results.flatMap((FlatMapFunction<R, CSVRelResFos>) r -> {
List<CSVRelResFos> relList = new ArrayList<>();
if (Optional.ofNullable(r.getSubject()).isPresent()) {
r
.getSubject()
.stream()
.filter(s -> s.getQualifier().getClassid().equalsIgnoreCase("fos"))
.forEach(s -> relList.add(CSVRelResFos.newInstance(r.getId(), s.getValue().split(" ")[0])));
}
return relList.iterator();
}, Encoders.bean(CSVRelResFos.class))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(workingPath + "/" + resultType + "/result_fos");
}
private static <R extends Result> void mapDatasourceForResults(String resultType, String workingPath,
Dataset<R> results, Dataset<Datasource> datasources) {
Dataset<String> datasourceId = results.flatMap((FlatMapFunction<R, String>) r -> {
List<String> ret = r
.getInstance()
.stream()
.map(i -> i.getCollectedfrom().getKey())
.collect(Collectors.toList());
ret.addAll(r.getInstance().stream().map(i -> i.getHostedby().getKey()).collect(Collectors.toList()));
return ret.iterator();
}, Encoders.STRING())
.distinct();
datasources
.joinWith(datasourceId, datasources.col("id").equalTo(datasourceId.col("value")))
.map((MapFunction<Tuple2<Datasource, String>, CSVDatasource>) t2 -> {
CSVDatasource csvd = new CSVDatasource();
csvd.setId(t2._1().getId());
csvd.setType(removeBreaks(t2._1.getDatasourcetype().getClassname()));
csvd
.setOfficialName(
removeBreaks(
Optional
.ofNullable(t2._1().getOfficialname())
.map(Field::getValue)
.orElse("")));
csvd
.setEnglish_name(
removeBreaks(Optional.ofNullable(t2._1().getEnglishname()).map(Field::getValue).orElse("")));
return csvd;
}, Encoders.bean(CSVDatasource.class))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(workingPath + "/" + resultType + "/datasource");
}
private static <R extends Result> void mapRelationResultHostedBy(String resultType, String workingPath,
Dataset<R> results, Dataset<Datasource> datasource) {
Dataset<CSVRelResDat> hostedby = results
.flatMap(
(FlatMapFunction<R, CSVRelResDat>) r -> r
.getInstance()
.stream()
.map(i -> CSVRelResDat.newInstance(r.getId(), i.getHostedby().getKey()))
.collect(Collectors.toList())
.iterator(),
Encoders.bean(CSVRelResDat.class));
getVerifiedSet(datasource, hostedby)
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(workingPath + "/" + resultType + "/result_hostedby");
}
private static Dataset<CSVRelResDat> getVerifiedSet(Dataset<Datasource> datasource, Dataset<CSVRelResDat> linkedDatasources) {
return linkedDatasources
.joinWith(datasource, linkedDatasources.col("datasource_id").equalTo(datasource.col("id")), "left")
.map(
(MapFunction<Tuple2<CSVRelResDat, Datasource>, CSVRelResDat>) t2 -> {
CSVRelResDat csvRelResDat = t2._1();
if (!Optional.ofNullable(t2._2()).isPresent())
csvRelResDat.setDatasource_id(ModelConstants.UNKNOWN_REPOSITORY.getKey());
return csvRelResDat;
},
Encoders.bean(CSVRelResDat.class))
.groupByKey((MapFunction<CSVRelResDat, String>) rel -> rel.getResult_id() + "::" + rel.getDatasource_id(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, CSVRelResDat, CSVRelResDat>) (k,v) -> v.next(), Encoders.bean(CSVRelResDat.class));
}
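
getVerifiedSet therefore keeps every result-datasource relation: when the left join finds no matching datasource in the dump, the relation is remapped to the unknown repository, and duplicates are then removed on the composite result/datasource key. A small sketch of the fallback, with hypothetical identifiers:

// Hypothetical relation pointing at a datasource that is not part of the dump.
CSVRelResDat rel = CSVRelResDat.newInstance("50|someresult::aaa", "10|somedatasource::bbb");
// With no join match the datasource id is rewritten, as above:
rel.setDatasource_id(ModelConstants.UNKNOWN_REPOSITORY.getKey());
// Deduplication key used by the groupByKey:
String key = rel.getResult_id() + "::" + rel.getDatasource_id();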
private static <R extends Result> void mapRelationResultCollectedFrom(String resultType, String workingPath,
Dataset<R> results, Dataset<Datasource> datasource) {
Dataset<CSVRelResDat> collectedfrom = results
.flatMap(
(FlatMapFunction<R, CSVRelResDat>) r -> r
.getInstance()
.stream()
.map(i -> CSVRelResDat.newInstance(r.getId(), i.getCollectedfrom().getKey()))
.collect(Collectors.toList())
.iterator(),
Encoders.bean(CSVRelResDat.class));
getVerifiedSet(datasource, collectedfrom )
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(workingPath + "/" + resultType + "/result_collectedfrom");
}
private static <R extends Result> void mapAuthorResults(SparkSession spark, String resultType, String workingPath,
Dataset<R> results) {
results
.flatMap((FlatMapFunction<R, AuthorResult>) r -> {
int count = 0;
@@ -193,7 +320,7 @@ public class SparkDumpResults implements Serializable {
.mode(SaveMode.Overwrite)
.json(workingPath + "/" + resultType + "/result_author");
// ma the authors in the working dir. I do not want to have them repeated. If I have an orcid as id, I choose
// map the authors in the working dir. I do not want to have them repeated. If I have an orcid as id, I choose
// the one from orcid if any
authorResult
.groupByKey((MapFunction<AuthorResult, String>) ar -> ar.getAuthorId(), Encoders.STRING())
@@ -214,7 +341,34 @@
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(workingPath + "/" + resultType + "/author");
}
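
A hypothetical reducer sketching the selection described in the comment above (when the same author id occurs more than once, prefer the entry built from ORCID); the fromOrcid accessor is assumed here, and the real choice happens in the elided mapGroups body:

private static AuthorResult pickOne(Iterator<AuthorResult> authors) {
	AuthorResult chosen = authors.next();
	while (authors.hasNext()) {
		AuthorResult candidate = authors.next();
		// Assumed flag: true when the entry was derived from an ORCID record.
		if (Boolean.TRUE.equals(candidate.getFromOrcid()))
			chosen = candidate;
	}
	return chosen;
}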
private static <R extends Result> void mapPidResultRelations(String resultType, String workingPath,
Dataset<R> results) {
results
.flatMap((FlatMapFunction<R, CSVPid>) r -> {
List<CSVPid> pids = new ArrayList<>();
if (Optional.ofNullable(r.getPid()).isPresent() && r.getPid().size() > 0) {
pids.addAll(mapPid(r.getPid(), r.getId()));
}
return pids.iterator();
}, Encoders.bean(CSVPid.class))
.filter(Objects::nonNull)
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(workingPath + "/" + resultType + "/result_pid");
}
private static <R extends Result> void mapResultToCSV(String resultType, String workingPath, Dataset<R> results) {
results
.map(
(MapFunction<R, CSVResult>) r -> mapResultInfo(r),
Encoders.bean(CSVResult.class))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(workingPath + "/" + resultType + "/result");
}
private static List<CSVPid> mapPid(List<StructuredProperty> pid, String resultId) {
@@ -292,12 +446,24 @@ public class SparkDumpResults implements Serializable {
ret.setPublisher(removeBreaks(getFieldValue(r.getPublisher())));
if (Optional.ofNullable(r.getSubject()).isPresent())
ret.setKeywords(String.join(", ", r.getSubject().stream().map(s -> {
if (StringUtils.isNotEmpty(s.getValue()))
return removeBreaks(s.getValue().toLowerCase());
else
return null;
}).filter(Objects::nonNull).distinct().collect(Collectors.toList())));
ret
.setKeywords(
String
.join(
", ",
r
.getSubject()
.stream()
.filter(s -> !s.getQualifier().getClassid().equalsIgnoreCase("fos"))
.map(s -> {
if (StringUtils.isNotEmpty(s.getValue()))
return removeBreaks(s.getValue().toLowerCase());
else
return null;
})
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.toList())));
else
ret.setKeywords("");

View File

@@ -20,10 +20,7 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVAuthor;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVPid;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVRelResAut;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVResult;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.*;
import eu.dnetlib.dhp.schema.oaf.*;
/**
@@ -112,7 +109,6 @@ public class SparkMoveOnSigleDir implements Serializable {
.option("delimiter", Constants.SEP)
.option("compression", "gzip")
.csv(outputPath + "/result_author");
Utils
.readPath(spark, workingPath + "/publication/author", CSVAuthor.class)
.union(Utils.readPath(spark, workingPath + "/dataset/author", CSVAuthor.class))
@@ -128,6 +124,74 @@
.option("compression", "gzip")
.csv(outputPath + "/author");
Utils
.readPath(spark, workingPath + "/publication/result_collectedfrom", CSVRelResDat.class)
.union(Utils.readPath(spark, workingPath + "/dataset/result_collectedfrom", CSVRelResDat.class))
.union(Utils.readPath(spark, workingPath + "/software/result_collectedfrom", CSVRelResDat.class))
.union(
Utils.readPath(spark, workingPath + "/otherresearchproduct/result_collectedfrom", CSVRelResDat.class))
.write()
.mode(SaveMode.Overwrite)
.option("header", "true")
.option("delimiter", Constants.SEP)
.option("compression", "gzip")
.csv(outputPath + "/result_collectedfrom");
Utils
.readPath(spark, workingPath + "/publication/result_hostedby", CSVRelResDat.class)
.union(Utils.readPath(spark, workingPath + "/dataset/result_hostedby", CSVRelResDat.class))
.union(Utils.readPath(spark, workingPath + "/software/result_hostedby", CSVRelResDat.class))
.union(Utils.readPath(spark, workingPath + "/otherresearchproduct/result_hostedby", CSVRelResDat.class))
.write()
.mode(SaveMode.Overwrite)
.option("header", "true")
.option("delimiter", Constants.SEP)
.option("compression", "gzip")
.csv(outputPath + "/result_hostedby");
Utils
.readPath(spark, workingPath + "/publication/datasource", CSVDatasource.class)
.union(Utils.readPath(spark, workingPath + "/dataset/datasource", CSVDatasource.class))
.union(Utils.readPath(spark, workingPath + "/software/datasource", CSVDatasource.class))
.union(Utils.readPath(spark, workingPath + "/otherresearchproduct/datasource", CSVDatasource.class))
.groupByKey((MapFunction<CSVDatasource, String>) r -> r.getId(), Encoders.STRING())
.mapGroups(
(MapGroupsFunction<String, CSVDatasource, CSVDatasource>) (k, it) -> it.next(),
Encoders.bean(CSVDatasource.class))
.write()
.mode(SaveMode.Overwrite)
.option("header", "true")
.option("delimiter", Constants.SEP)
.option("compression", "gzip")
.csv(outputPath + "/datasource");
Utils
.readPath(spark, workingPath + "/publication/result_fos", CSVRelResFos.class)
.union(Utils.readPath(spark, workingPath + "/dataset/result_fos", CSVRelResFos.class))
.union(Utils.readPath(spark, workingPath + "/software/result_fos", CSVRelResFos.class))
.union(Utils.readPath(spark, workingPath + "/otherresearchproduct/result_fos", CSVRelResFos.class))
.write()
.mode(SaveMode.Overwrite)
.option("header", "true")
.option("delimiter", Constants.SEP)
.option("compression", "gzip")
.csv(outputPath + "/result_fos");
Utils
.readPath(spark, workingPath + "/publication/fos", CSVFos.class)
.union(Utils.readPath(spark, workingPath + "/dataset/fos", CSVFos.class))
.union(Utils.readPath(spark, workingPath + "/software/fos", CSVFos.class))
.union(Utils.readPath(spark, workingPath + "/otherresearchproduct/fos", CSVFos.class))
.groupByKey((MapFunction<CSVFos, String>) r -> r.getId(), Encoders.STRING())
.mapGroups(
(MapGroupsFunction<String, CSVFos, CSVFos>) (k, it) -> it.next(), Encoders.bean(CSVFos.class))
.write()
.mode(SaveMode.Overwrite)
.option("header", "true")
.option("delimiter", Constants.SEP)
.option("compression", "gzip")
.csv(outputPath + "/fos");
}
}

View File

@@ -0,0 +1,52 @@
package eu.dnetlib.dhp.oa.graph.dump.csv.model;
import java.io.Serializable;
public class CSVDatasource implements Serializable {
private String id;
private String type;
private String officialName;
private String english_name;
private String interface_url;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getOfficialName() {
return officialName;
}
public void setOfficialName(String officialName) {
this.officialName = officialName;
}
public String getEnglish_name() {
return english_name;
}
public void setEnglish_name(String english_name) {
this.english_name = english_name;
}
public String getInterface_url() {
return interface_url;
}
public void setInterface_url(String interface_url) {
this.interface_url = interface_url;
}
}

View File

@@ -0,0 +1,32 @@
package eu.dnetlib.dhp.oa.graph.dump.csv.model;
import java.io.Serializable;
public class CSVFos implements Serializable {
private String id;
private String label;
public static CSVFos newInstance(String s, String s1) {
CSVFos csf = new CSVFos();
csf.id = s;
csf.label = s1;
return csf;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getLabel() {
return label;
}
public void setLabel(String label) {
this.label = label;
}
}

View File

@@ -0,0 +1,32 @@
package eu.dnetlib.dhp.oa.graph.dump.csv.model;
import java.io.Serializable;
public class CSVRelResDat implements Serializable {
private String result_id;
private String datasource_id;
public static CSVRelResDat newInstance(String result_id, String datasource_id) {
CSVRelResDat rrd = new CSVRelResDat();
rrd.result_id = result_id;
rrd.datasource_id = datasource_id;
return rrd;
}
public String getResult_id() {
return result_id;
}
public void setResult_id(String result_id) {
this.result_id = result_id;
}
public String getDatasource_id() {
return datasource_id;
}
public void setDatasource_id(String datasource_id) {
this.datasource_id = datasource_id;
}
}

View File

@@ -0,0 +1,32 @@
package eu.dnetlib.dhp.oa.graph.dump.csv.model;
import java.io.Serializable;
public class CSVRelResFos implements Serializable {
private String result_id;
private String fos_id;
public String getResult_id() {
return result_id;
}
public void setResult_id(String result_id) {
this.result_id = result_id;
}
public String getFos_id() {
return fos_id;
}
public void setFos_id(String fos_id) {
this.fos_id = fos_id;
}
public static CSVRelResFos newInstance(String result_id, String fos_id) {
CSVRelResFos rrf = new CSVRelResFos();
rrf.result_id = result_id;
rrf.fos_id = fos_id;
return rrf;
}
}

View File

@@ -3,16 +3,7 @@ package eu.dnetlib.dhp.oa.graph.dump.csv.model;
import java.io.Serializable;
import org.apache.commons.lang.StringUtils;
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSetter;
import eu.dnetlib.dhp.oa.graph.dump.csv.Constants;
import eu.dnetlib.dhp.schema.oaf.Country;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import sun.swing.StringUIClientPropertyKey;
/**
* @author miriam.baglioni

View File

@@ -110,11 +110,11 @@
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--communities</arg><arg>${communities}</arg>
</spark>
<ok to="fork_dump_result_author_pid"/>
<ok to="fork_dump_result_author_datasource_fos_pid"/>
<error to="Kill"/>
</action>
<fork name="fork_dump_result_author_pid">
<fork name="fork_dump_result_author_datasource_fos_pid">
<path start="dump_publication"/>
<path start="dump_dataset"/>
<path start="dump_other"/>

View File

@@ -7,7 +7,9 @@ import java.io.IOException;
import java.io.StringReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import org.apache.commons.io.FileUtils;
@@ -33,6 +35,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.dump.UtilCommunityAPI;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVAuthor;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVResult;
@@ -126,9 +129,7 @@ public class DumpResultTest {
Assertions.assertEquals(ModelConstants.OPEN_ACCESS_RIGHT().getClassid(), row.getAccessright());
Assertions.assertEquals("FI", row.getCountry());
Assertions.assertEquals("Lit.opg., bijl.", row.getDescription());
Assertions.assertEquals(3, split(row.getKeywords(), ", ").length);
Assertions.assertTrue(row.getKeywords().toString().contains("archeologie"));
Assertions.assertTrue(row.getKeywords().toString().contains("prospectie"));
Assertions.assertEquals(1, split(row.getKeywords(), ",").length);
Assertions.assertTrue(row.getKeywords().toString().contains("archaeology"));
Assertions.assertEquals("nl", row.getLanguage());
Assertions.assertEquals("2007-01-01", row.getPublication_date());
@@ -244,7 +245,6 @@ public class DumpResultTest {
SparkDumpResults.main(new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-outputPath", workingDir.toString() + "/output",
"-workingPath", workingDir.toString() + "/working",
"-resultType", "publication",
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
@@ -257,9 +257,9 @@
.read()
.option("header", "true")
.option("delimiter", Constants.SEP)
.csv(workingDir.toString() + "/working/publication/result_author");
.json(workingDir.toString() + "/working/publication/result_author");
Assertions.assertEquals(6, tmp.count());
Assertions.assertEquals(14, tmp.count());
Assertions.assertEquals(2, tmp.where("author_id == '" + DHPUtils.md5("0000-0003-2914-2734") + "'").count());
Assertions
@@ -295,7 +295,6 @@ public class DumpResultTest {
SparkDumpResults.main(new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-outputPath", workingDir.toString() + "/output",
"-workingPath", workingDir.toString() + "/working",
"-resultType", "publication",
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
@@ -308,10 +307,10 @@
.read()
.option("header", "true")
.option("delimiter", Constants.SEP)
.csv(workingDir.toString() + "/working/publication/result_pid");
.json(workingDir.toString() + "/working/publication/result_pid");
tmp.show(false);
Assertions.assertEquals(4, tmp.count());
Assertions.assertEquals(6, tmp.count());
Assertions
.assertEquals(2, tmp.where("result_id == '50|DansKnawCris::0224aae28af558f21768dbc6439c7a95'").count());
@@ -325,6 +324,202 @@ public class DumpResultTest {
}
@Test
public void testDumpDatasource() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/csv/input/")
.getPath();
spark
.read()
.text(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/csv/working/resultIds")
.getPath())
.write()
.text(workingDir.toString() + "/working/resultIds/");
SparkDumpResults.main(new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-workingPath", workingDir.toString() + "/working",
"-resultType", "publication",
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
"-sourcePath", sourcePath
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
Dataset<Row> tmp = spark
.read()
.option("header", "true")
.option("delimiter", Constants.SEP)
.json(workingDir.toString() + "/working/publication/datasource");
tmp.show(false);
Assertions.assertEquals(2, tmp.count());
tmp.foreach((ForeachFunction<Row>) d -> Assertions.assertEquals("Journal", d.getString(3)));
Assertions
.assertEquals(
"Reflektika",
tmp.where("id == '10|doajarticles::0f7fe9973717cb7ac2bba8685bc88a2a'").first().getString(0));
Assertions
.assertEquals(
"Reflektika",
tmp.where("id == '10|doajarticles::0f7fe9973717cb7ac2bba8685bc88a2a'").first().getString(2));
}
@Test
public void testDumpDatasourceRels() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/csv/input/")
.getPath();
spark
.read()
.text(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/csv/working/resultIds")
.getPath())
.write()
.text(workingDir.toString() + "/working/resultIds/");
SparkDumpResults.main(new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-workingPath", workingDir.toString() + "/working",
"-resultType", "publication",
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
"-sourcePath", sourcePath
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
Dataset<Row> tmp = spark
.read()
.option("header", "true")
.option("delimiter", Constants.SEP)
.json(workingDir.toString() + "/working/publication/result_collectedfrom");
tmp.show(false);
Assertions.assertEquals(5, tmp.count());
}
@Test
public void testDumpDatasourceRels2() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/csv/input/")
.getPath();
spark
.read()
.text(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/csv/working/resultIds")
.getPath())
.write()
.text(workingDir.toString() + "/working/resultIds/");
SparkDumpResults.main(new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-workingPath", workingDir.toString() + "/working",
"-resultType", "publication",
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
"-sourcePath", sourcePath
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
Dataset<Row> tmp = spark
.read()
.option("header", "true")
.option("delimiter", Constants.SEP)
.json(workingDir.toString() + "/working/publication/result_hostedby");
tmp.show(false);
Assertions.assertEquals(5, tmp.count());
}
@Test
public void testDumpTopic() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/csv/input/")
.getPath();
spark
.read()
.text(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/csv/working/resultIds")
.getPath())
.write()
.text(workingDir.toString() + "/working/resultIds/");
SparkDumpResults.main(new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-workingPath", workingDir.toString() + "/working",
"-resultType", "publication",
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
"-sourcePath", sourcePath
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
Dataset<Row> tmp = spark
.read()
.option("header", "true")
.option("delimiter", Constants.SEP)
.json(workingDir.toString() + "/working/publication/fos");
tmp.show(false);
Assertions.assertEquals(8, tmp.count());
}
@Test
public void testDumpTopicRel() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/csv/input/")
.getPath();
spark
.read()
.text(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/csv/working/resultIds")
.getPath())
.write()
.text(workingDir.toString() + "/working/resultIds/");
SparkDumpResults.main(new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-workingPath", workingDir.toString() + "/working",
"-resultType", "publication",
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
"-sourcePath", sourcePath
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
Dataset<Row> tmp = spark
.read()
.option("header", "true")
.option("delimiter", Constants.SEP)
.json(workingDir.toString() + "/working/publication/result_fos");
tmp.show(false);
Assertions.assertEquals(9, tmp.count());
}
@Test
public void prova() throws DocumentException {
String input = "<community id=\"dh-ch\" label=\"Digital Humanities and Cultural Heritage\">" +
@@ -355,4 +550,10 @@ public class DumpResultTest {
System.out.println(st.replace("\\\"", " "));
}
@Test
public void getCommunitiesFromApi() {
UtilCommunityAPI queryCommunityApi = new UtilCommunityAPI();
queryCommunityApi.getCommunityCsv(Arrays.asList("eosc")).forEach(r -> System.out.println(r));
}
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long