First implementation of the CSV dump of the subset of the graph related to a specific set of communities. The only relations considered are the citations (cites): the source must belong to the set of communities, while the target can be outside it, so we also have to map nodes not related to the communities of interest. The communities are given as a parameter.
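As a plain-Java illustration of this selection rule (not part of the commit; all ids below are hypothetical), a minimal sketch: only cites relations whose source is a community result are kept, and their targets are added to the set of nodes to dump even when they fall outside the communities.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CitesSubsetSketch {

	// hypothetical citation relation: source cites target
	static class Cites {
		final String source;
		final String target;

		Cites(String source, String target) {
			this.source = source;
			this.target = target;
		}
	}

	public static void main(String[] args) {
		// results tagged with the communities given as parameter
		Set<String> communityResults = new HashSet<>(Arrays.asList("r1", "r2"));

		List<Cites> cites = Arrays.asList(
			new Cites("r1", "r3"), // kept: r3 lies outside the communities but is cited from inside
			new Cites("r2", "r1"), // kept
			new Cites("r9", "r1")); // dropped: the source is not a community result

		List<Cites> selected = new ArrayList<>();
		Set<String> nodes = new HashSet<>(communityResults);
		for (Cites c : cites) {
			if (communityResults.contains(c.source)) {
				selected.add(c);
				// the target must be mapped too, even if it is not related to the communities
				nodes.add(c.target);
			}
		}

		System.out.println(selected.size()); // 2
		System.out.println(nodes); // r1, r2, r3
	}
}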

This commit is contained in:
Miriam Baglioni 2023-05-11 16:44:54 +02:00
parent b6e0c7d660
commit d0f144d422
18 changed files with 1445 additions and 0 deletions

View File

@@ -1,12 +1,18 @@
package eu.dnetlib.dhp.oa.graph.dump;
import java.io.BufferedWriter;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import eu.dnetlib.dhp.oa.graph.dump.csv.Constats;
import eu.dnetlib.dhp.oa.graph.dump.csv.DumpCommunities;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;
import org.xml.sax.SAXException;
@@ -71,4 +77,25 @@ public class QueryInformationSystem {
return map;
}
public List<String> getCommunityCsv(String toString) throws ISLookUpException, SAXException, DocumentException {
List<String> communities = new ArrayList<>();
for (String xml : isLookUp.quickSearchProfile(toString)) {
final Document doc;
final SAXReader reader = new SAXReader();
reader.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
doc = reader.read(new StringReader(xml));
Element root = doc.getRootElement();
StringBuilder builder = new StringBuilder();
builder.append(DHPUtils.md5(root.attribute("id").getValue()));
builder.append(Constats.SEP);
builder.append(root.attribute("label").getValue());
builder.append(Constats.SEP);
builder.append(root.attribute("id").getValue());
builder.append(Constats.SEP);
builder.append(((Node) (root.selectNodes("//description").get(0))).getText());
communities.add(builder.toString());
}
return communities;
}
}
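For reference (not part of the diff), each line produced by getCommunityCsv is tab-separated and follows the field order md5(id), label, id, description. A small sketch with hypothetical values:

import eu.dnetlib.dhp.oa.graph.dump.csv.Constats;
import eu.dnetlib.dhp.utils.DHPUtils;

public class CommunityLineSketch {
	public static void main(String[] args) {
		// hypothetical community profile values, for illustration only
		String id = "dh-ch";
		String label = "Digital Humanities and Cultural Heritage";
		String description = "Community description text";

		// same field order used by getCommunityCsv
		String line = DHPUtils.md5(id) + Constats.SEP + label + Constats.SEP + id + Constats.SEP + description;
		System.out.println(line);
	}
}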

View File

@@ -0,0 +1,93 @@
package eu.dnetlib.dhp.oa.graph.dump.csv;
import eu.dnetlib.dhp.utils.DHPUtils;
import java.io.Serializable;
/**
* @author miriam.baglioni
* @Date 05/05/23
*/
public class AuthorResult implements Serializable {
private String authorId;
private String firstName;
private String lastName;
private String fullName;
private String orcid;
private String resultId;
private String rank;
public String getFullName() {
return fullName;
}
public void setFullName(String fullName) {
this.fullName = fullName;
}
public String getAuthorId() {
return authorId;
}
public void setAuthorId(String authorId) {
this.authorId = authorId;
}
public String getResultId() {
return resultId;
}
public void setResultId(String resultId) {
this.resultId = resultId;
}
public String getRank() {
return rank;
}
public void setRank(String rank) {
this.rank = rank;
}
public String getId() {
return authorId;
}
public void setId(String id) {
this.authorId = id;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
public String getOrcid() {
return orcid;
}
public void setOrcid(String orcid) {
this.orcid = orcid;
}
public void autosetId() {
if(orcid != null){
authorId = DHPUtils.md5(orcid);
}else{
authorId = DHPUtils.md5(resultId + firstName + lastName + rank);
}
}
}
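A short usage sketch (not part of the commit; identifiers are hypothetical) showing how autosetId assigns the author identifier depending on the presence of an ORCID:

import eu.dnetlib.dhp.oa.graph.dump.csv.AuthorResult;

public class AuthorIdSketch {
	public static void main(String[] args) {
		// author with an ORCID: the id depends only on the ORCID
		AuthorResult withOrcid = new AuthorResult();
		withOrcid.setResultId("50|hypothetical_result_id");
		withOrcid.setOrcid("0000-0002-1825-0097");
		withOrcid.autosetId(); // id = md5(orcid)

		// author without an ORCID: the id depends on the result id, the name and the rank
		AuthorResult withoutOrcid = new AuthorResult();
		withoutOrcid.setResultId("50|hypothetical_result_id");
		withoutOrcid.setFirstName("Jane");
		withoutOrcid.setLastName("Doe");
		withoutOrcid.setRank("1");
		withoutOrcid.autosetId(); // id = md5(resultId + firstName + lastName + rank)

		System.out.println(withOrcid.getId());
		System.out.println(withoutOrcid.getId());
	}
}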

View File

@@ -0,0 +1,11 @@
package eu.dnetlib.dhp.oa.graph.dump.csv;
import java.io.Serializable;
/**
* @author miriam.baglioni
* @Date 10/05/23
*/
public class Constats implements Serializable {
public final static String SEP = "\t";
}

View File

@@ -0,0 +1,108 @@
package eu.dnetlib.dhp.oa.graph.dump.csv;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.QueryInformationSystem;
import eu.dnetlib.dhp.oa.graph.dump.SaveCommunityMap;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
/**
* @author miriam.baglioni
* @Date 09/05/23
*/
public class DumpCommunities implements Serializable {
private static final Logger log = LoggerFactory.getLogger(DumpCommunities.class);
private final BufferedWriter writer;
private final transient QueryInformationSystem queryInformationSystem;
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
DumpCommunities.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/input_dump_csv_step3.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String workingPath = parser.get("workingPath");
final String nameNode = parser.get("nameNode");
log.info("nameNode: {}", nameNode);
final DumpCommunities dc = new DumpCommunities(outputPath, nameNode, parser.get("isLookUp"));
dc.writeCommunity();
}
private void writeCommunity() throws IOException, ISLookUpException, DocumentException, SAXException {
for(String community : queryInformationSystem.getCommunityCsv(IOUtils.toString(
DumpCommunities.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/dump/xqueries/set_of_communities.xq"))))
{
writer
.write(
community);
writer.write("\n");
}
writer.close();
}
public DumpCommunities(String hdfsPath, String hdfsNameNode, String isLookUpUrl) throws Exception {
final Configuration conf = new Configuration();
queryInformationSystem= new QueryInformationSystem();
queryInformationSystem.setIsLookUp(Utils.getIsLookUpService(isLookUpUrl));
conf.set("fs.defaultFS", hdfsNameNode);
FileSystem fileSystem = FileSystem.get(conf);
Path hdfsWritePath = new Path(hdfsPath);
if (fileSystem.exists(hdfsWritePath)) {
fileSystem.delete(hdfsWritePath, true);
}
FSDataOutputStream fos = fileSystem.create(hdfsWritePath);
writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8));
}
}

View File

@@ -0,0 +1,288 @@
package eu.dnetlib.dhp.oa.graph.dump.csv;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVAuthor;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVPid;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVResult;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
/**
* @author miriam.baglioni
* @Date 04/05/23
*/
public class SparkDumpResults implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SparkDumpResults.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkDumpResults.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/input_dump_csv_step2.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String resultType = parser.get("resultType");
log.info("resultType: {}", resultType);
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final String workingPath = parser.get("workingPath");
Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath );
run(spark, inputPath, outputPath, inputClazz, resultType, workingPath);
});
}
private static <R extends Result> void run(SparkSession spark, String inputPath, String outputPath,
Class<R> inputClazz, String resultType, String workingPath) {
Dataset<String> resultIds = spark.read().textFile(workingPath + "/resultIds");
Dataset<R> results = Utils
.readPath(spark, inputPath + "/" + resultType, inputClazz)
.filter((FilterFunction<R>) p -> !p.getDataInfo().getDeletedbyinference() && !p.getDataInfo().getInvisible());
// map results
resultIds.joinWith(results, resultIds.col("value").equalTo(results.col("id")))
.map((MapFunction<Tuple2<String,R>, CSVResult>) t2 -> mapResultInfo(t2._2()), Encoders.bean(CSVResult.class) )
.write()
.option("compression","gzip")
.mode(SaveMode.Overwrite)
.csv(workingPath + "/" + resultType + "/result");
// map relations between pid and result
resultIds.joinWith(results, resultIds.col("value").equalTo(results.col("id")))
.flatMap((FlatMapFunction<Tuple2<String,R>, CSVPid>) t2 ->
{
List<CSVPid> pids = new ArrayList<>();
if(Optional.ofNullable(t2._2().getPid()).isPresent() && t2._2().getPid().size() > 0){
pids.addAll(mapPid(t2._2().getPid(), t2._1()));
}
return pids.iterator();
}, Encoders.bean(CSVPid.class))
.filter(Objects::nonNull)
.write()
.option("compression","gzip")
.mode(SaveMode.Overwrite)
.csv(workingPath + "/" + resultType + "/result_pid");
// map the authors of the result
// for each author in the result:
// if the author has an ORCID, the author id is derived from the ORCID (md5(orcid))
// if the author has no ORCID, the id is built as md5(result_id + author_name + author_rank);
// when the rank is missing, the author's position within the set of authors is used instead
Dataset<AuthorResult> authorResult = resultIds.joinWith(results, resultIds.col("value").equalTo(results.col("id")))
.flatMap((FlatMapFunction<Tuple2<String, R>, AuthorResult>) t2 ->
{
int count = 0;
List<AuthorResult> arl = new ArrayList<>();
for (Author a : t2._2().getAuthor()) {
count += 1;
AuthorResult ar = new AuthorResult();
ar.setResultId(t2._1());
if (Optional.ofNullable(a.getRank()).isPresent()) {
if (a.getRank() > 0) {
ar.setRank(String.valueOf(a.getRank()));
} else {
ar.setRank(String.valueOf(count));
}
}
ar.setFirstName(a.getName());
ar.setLastName(a.getSurname());
ar.setFullName(a.getFullname());
ar.setOrcid(getOrcid(a.getPid()));
ar.autosetId();
arl.add(ar);
}
return arl.iterator();
}
, Encoders.bean(AuthorResult.class));
// map the relation between author and result
authorResult.map((MapFunction<AuthorResult, String>) ar -> ar.getResultId() + Constats.SEP + ar.getAuthorId(), Encoders.STRING() )
.write()
.option("compression","gzip")
.mode(SaveMode.Overwrite)
.csv(workingPath + "/" + resultType + "/result_author");
// map the authors in the working dir. They must not be repeated
authorResult.groupByKey((MapFunction<AuthorResult, String>) ar -> ar.getAuthorId(), Encoders.STRING() )
.mapGroups((MapGroupsFunction<String, AuthorResult, CSVAuthor>) (k, it) -> getAuthorDump(it.next()) , Encoders.bean(CSVAuthor.class))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.csv(workingPath + "/" + resultType + "/author");
}
private static List<CSVPid> mapPid(List<StructuredProperty> pid, String resultId) {
return pid.stream().map(p -> {
CSVPid ret = new CSVPid();
ret.setId(DHPUtils.md5(p.getQualifier().getClassid() + p.getValue()));
ret.setResult_id(resultId);
ret.setPid(p.getValue());
ret.setType(p.getQualifier().getClassid());
return ret;
}).collect(Collectors.toList());
}
private static CSVAuthor getAuthorDump(AuthorResult ar) {
CSVAuthor ret = new CSVAuthor();
ret.setFirstname(ar.getFirstName());
ret.setId(ar.getAuthorId());
ret.setLastname(ar.getLastName());
ret.setFullname(ar.getFullName());
if(ar.getOrcid() != null){
ret.setOrcid(ar.getOrcid());
}else{
ret.setOrcid("");
}
return ret;
}
private static String getOrcid(List<StructuredProperty> pid) {
if(!Optional.ofNullable(pid).isPresent())
return null;
if(pid.size() == 0)
return null;
for(StructuredProperty p : pid){
if (p.getQualifier().getClassid().equals(ModelConstants.ORCID)){
return p.getValue();
}
}
return null;
}
private static <R extends Result> CSVResult mapResultInfo(R r) {
CSVResult ret = new CSVResult();
ret.setId(r.getId());
ret.setType(r.getResulttype().getClassid());
ret.setTitle(getTitle(r.getTitle()));
ret.setDescription(getAbstract(r.getDescription()));
ret.setAccessright(r.getBestaccessright().getClassid());
ret.setPublication_date(r.getDateofacceptance().getValue());
if (StringUtils.isNotEmpty(r.getPublisher().getValue())) {
ret.setPublisher(r.getPublisher().getValue());
} else {
ret.setPublisher("");
}
StringBuilder sbjs = new StringBuilder();
for(StructuredProperty sbj : r.getSubject()){
if(StringUtils.isNotEmpty(sbj.getValue())){
sbjs.append(sbj.getValue());
sbjs.append(",");
}
}
ret.setKeywords(sbjs.toString());
StringBuilder countries = new StringBuilder();
for(Country c: r.getCountry()){
if(StringUtils.isNotEmpty(c.getClassid())){
countries.append(c.getClassid());
}
}
ret.setCountry(countries.toString());
if(StringUtils.isNotEmpty(r.getLanguage().getClassid())){
ret.setLanguage(r.getLanguage().getClassid());
}else{
ret.setLanguage("");
}
return ret;
}
private static String getAbstract(List<Field<String>> description) {
for(Field<String> abs:description){
if(StringUtils.isNotEmpty(abs.getValue())){
return abs.getValue();
}
}
return "";
}
private static String getTitle(List<StructuredProperty> titles) {
String firstTitle = null;
for(StructuredProperty title : titles){
if(StringUtils.isEmpty(firstTitle)){
if(StringUtils.isNotEmpty(title.getValue()))
firstTitle = title.getValue();
}
if(title.getQualifier().getClassid().equals(ModelConstants.MAIN_TITLE_QUALIFIER.getClassid())){
if(StringUtils.isNotEmpty(title.getValue()))
return title.getValue();
}
}
return "";
}
}

View File

@@ -0,0 +1,109 @@
package eu.dnetlib.dhp.oa.graph.dump.csv;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
/**
* @author miriam.baglioni
* @Date 10/05/23
*/
public class SparkMoveOnSigleDir implements Serializable {
// All the products saved in different directories are put under the same one.
// For the authors a reconciliation step must also be performed, since the same author id can be saved in more than one directory
private static final Logger log = LoggerFactory.getLogger(SparkMoveOnSigleDir.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkMoveOnSigleDir.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/input_dump_csv_step2.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath );
run(spark, outputPath, workingPath);
});
}
private static <R extends Result> void run(SparkSession spark, String outputPath,
String workingPath) {
spark.read().textFile(workingPath + "/publication/result", workingPath + "/dataset/result", workingPath + "/software/result", workingPath + "/otherresearchproduct/result")
.write()
.mode(SaveMode.Overwrite)
.csv(outputPath + "/result");
spark.read().textFile(workingPath + "/publication/result_pid", workingPath + "/dataset/result_pid", workingPath + "/software/result_pid", workingPath + "/otherresearchproduct/result_pid")
.write()
.mode(SaveMode.Overwrite)
.csv(outputPath + "/result_pid");
spark.read().textFile(workingPath + "/publication/result_author", workingPath + "/dataset/result_author", workingPath + "/software/result_author", workingPath + "/otherresearchproduct/result_author")
.write()
.mode(SaveMode.Overwrite)
.csv(outputPath + "/result_author");
spark.read().textFile(workingPath + "/publication/result_author", workingPath + "/dataset/result_author", workingPath + "/software/result_author", workingPath + "/otherresearchproduct/result_author")
.groupByKey((MapFunction<String, String>) a -> a.split("\t")[0], Encoders.STRING())
.mapGroups((MapGroupsFunction<String, String, String>) (k, it) -> it.next(), Encoders.STRING() )
.write()
.mode(SaveMode.Overwrite)
.csv(outputPath + "/author");
}
}
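A minimal in-memory sketch (not part of the commit; rows are hypothetical) of the reconciliation applied above: tab-separated rows coming from the different result-type directories are grouped by their first field and a single representative per key is kept, mirroring the groupByKey/mapGroups pair.

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

public class DeduplicateRowsSketch {
	public static void main(String[] args) {
		// hypothetical rows, as they could appear in more than one directory
		List<String> rows = Arrays.asList(
			"id_1\tvalueA",
			"id_1\tvalueA", // same key written by another result-type directory
			"id_2\tvalueB");

		// group by the first tab-separated field and keep one row per key
		Collection<String> deduplicated = rows
			.stream()
			.collect(Collectors.toMap(r -> r.split("\t")[0], r -> r, (first, second) -> first))
			.values();

		deduplicated.forEach(System.out::println); // id_1\tvalueA and id_2\tvalueB
	}
}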

View File

@@ -0,0 +1,181 @@
package eu.dnetlib.dhp.oa.graph.dump.csv;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVCitation;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVRELCommunityResult;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
/**
* @author miriam.baglioni
* @Date 04/05/23
*/
public class SparkSelectResultsAndDumpRelations implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SparkSelectResultsAndDumpRelations.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkSelectResultsAndDumpRelations.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/input_dump_csv_step1.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String workingPath = parser.get("workingPath");
List<String> communityList = null;
Optional<String> communities = Optional.ofNullable(parser.get("communities"));
if(communities.isPresent()){
communityList = Arrays.asList(communities.get().split(";"));
}
SparkConf conf = new SparkConf();
List<String> finalCommunityList = communityList;
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath );
run(spark, inputPath, outputPath, workingPath, finalCommunityList);
});
}
private static <R extends Result> void run(SparkSession spark, String inputPath, String outputPath, String workingPath,
List<String> communityList) {
//select the result ids related to the set of communities considered
writeCommunityRelatedIds(spark, inputPath + "/publication", Publication.class, communityList, workingPath + "/communityResultIds");
writeCommunityRelatedIds(spark, inputPath + "/dataset", Dataset.class, communityList, workingPath + "/communityResultIds");
writeCommunityRelatedIds(spark, inputPath + "/software", Software.class, communityList, workingPath + "/communityResultIds" );
writeCommunityRelatedIds(spark, inputPath + "/otherresearchproduct", OtherResearchProduct.class, communityList, workingPath + "/communityResultIds");
// write the relations between results and communities
writeCommunityResultRelations(spark, inputPath + "/publication", Publication.class, communityList, workingPath + "/communityResultIds");
writeCommunityResultRelations(spark, inputPath + "/dataset", Dataset.class, communityList, workingPath + "/communityResultIds");
writeCommunityResultRelations(spark, inputPath + "/software", Software.class, communityList, workingPath + "/communityResultIds" );
writeCommunityResultRelations(spark, inputPath + "/otherresearchproduct", OtherResearchProduct.class, communityList, workingPath + "/communityResultIds");
//select the relations with semantics cites
org.apache.spark.sql.Dataset<Relation> relations = Utils.readPath(spark, inputPath + "/relation", Relation.class)
.filter((FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
r.getRelClass().equals(ModelConstants.CITES));
// select the results that are targets of the selected relations whose source is one of the results related to the communities
org.apache.spark.sql.Dataset<String> resultIds = spark.read().textFile(outputPath + "/communityResultIds").distinct();
resultIds.joinWith(relations, resultIds.col("value").equalTo(relations.col("source")), "left")
.flatMap((FlatMapFunction<Tuple2<String, Relation>, String>) t2 -> {
if(Optional.ofNullable(t2._2()).isPresent()){
return Arrays.asList(t2._1(), t2._2().getTarget()).iterator();
}else{
return Arrays.asList(t2._1()).iterator();
}
}, Encoders.STRING())
.distinct()
.write()
.mode(SaveMode.Overwrite)
.option("compression" , "gzip")
.text(workingPath + "/resultIds");
resultIds.joinWith(relations, resultIds.col("value").equalTo(relations.col("source")))
.map((MapFunction<Tuple2<String, Relation>, CSVCitation>) t2 -> mapToCitation(t2._2()), Encoders.bean(CSVCitation.class) )
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.csv(outputPath + "/relation");
}
private static CSVCitation mapToCitation(Relation relation) {
CSVCitation ret = new CSVCitation();
ret.setId(DHPUtils.md5(relation.getSource() + relation.getRelClass().toLowerCase() + relation.getTarget()));
ret.setResult_id_cites(relation.getSource());
ret.setResult_id_cited(relation.getTarget());
return ret;
}
private static <R extends Result> void writeCommunityResultRelations(SparkSession spark, String inputPath, Class<R> clazz, List<String> communityList, String outputPath) {
Utils
.readPath(spark, inputPath , clazz)
.filter((FilterFunction<R>) p -> !p.getDataInfo().getDeletedbyinference() &&
!p.getDataInfo().getInvisible() )
.flatMap((FlatMapFunction<R, CSVRELCommunityResult>) p-> {
List<CSVRELCommunityResult> ret = new ArrayList<>();
for(Context context : p.getContext()){
String cId = context.getId().contains("::") ? context.getId().substring(0, context.getId().indexOf("::")) : context.getId();
if (communityList.contains(cId)){
CSVRELCommunityResult crc = new CSVRELCommunityResult();
crc.setResult_id(p.getId());
crc.setCommunity_id(DHPUtils.md5(cId));
ret.add(crc);
}
}
return ret.iterator();
}, Encoders.bean(CSVRELCommunityResult.class) )
.write()
.option("compression","gzip")
.mode(SaveMode.Append)
.text(outputPath );
}
private static <R extends Result> void writeCommunityRelatedIds(SparkSession spark, String inputPath, Class<R> clazz, List<String> communityList, String outputPath) {
Utils
.readPath(spark, inputPath , clazz)
.filter((FilterFunction<R>) p -> !p.getDataInfo().getDeletedbyinference() &&
!p.getDataInfo().getInvisible() &&
isRelatedToCommunities(p, communityList))
.map((MapFunction<R, String>) p-> p.getId(), Encoders.STRING() )
.write()
.option("compression","gzip")
.mode(SaveMode.Append)
.text(outputPath );
}
private static <R extends Result> boolean isRelatedToCommunities(R p, List<String> communityList) {
return p.getContext().stream().anyMatch(c -> communityList.contains(c.getId()) ||
(c.getId().contains("::") && communityList.contains(c.getId().substring(0, c.getId().indexOf("::")))));
}
}
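A small sketch (not part of the commit; ids are hypothetical) of the context-id handling used by writeCommunityResultRelations and isRelatedToCommunities: when a context id carries a sub-community suffix, only the part before "::" is compared against the community list.

import java.util.Arrays;
import java.util.List;

public class ContextIdSketch {

	// keep only the community part of a context id, dropping any "::" suffix
	static String communityId(String contextId) {
		return contextId.contains("::")
			? contextId.substring(0, contextId.indexOf("::"))
			: contextId;
	}

	public static void main(String[] args) {
		List<String> communityList = Arrays.asList("dh-ch", "beopen");

		System.out.println(communityId("dh-ch::subcommunity")); // dh-ch
		System.out.println(communityList.contains(communityId("dh-ch::subcommunity"))); // true
		System.out.println(communityList.contains(communityId("enermaps"))); // false
	}
}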

View File

@@ -0,0 +1,57 @@
package eu.dnetlib.dhp.oa.graph.dump.csv.model;
import java.io.Serializable;
/**
* @author miriam.baglioni
* @Date 11/05/23
*/
public class CSVAuthor implements Serializable {
private String id;
private String firstname;
private String lastname;
private String fullname;
private String orcid;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getFirstname() {
return firstname;
}
public void setFirstname(String firstname) {
this.firstname = firstname;
}
public String getLastname() {
return lastname;
}
public void setLastname(String lastname) {
this.lastname = lastname;
}
public String getFullname() {
return fullname;
}
public void setFullname(String fullname) {
this.fullname = fullname;
}
public String getOrcid() {
return orcid;
}
public void setOrcid(String orcid) {
this.orcid = orcid;
}
}

View File

@@ -0,0 +1,37 @@
package eu.dnetlib.dhp.oa.graph.dump.csv.model;
import java.io.Serializable;
/**
* @author miriam.baglioni
* @Date 11/05/23
*/
public class CSVCitation implements Serializable {
private String id;
private String result_id_cites;
private String result_id_cited;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getResult_id_cites() {
return result_id_cites;
}
public void setResult_id_cites(String result_id_cites) {
this.result_id_cites = result_id_cites;
}
public String getResult_id_cited() {
return result_id_cited;
}
public void setResult_id_cited(String result_id_cited) {
this.result_id_cited = result_id_cited;
}
}

View File

@@ -0,0 +1,51 @@
package eu.dnetlib.dhp.oa.graph.dump.csv.model;
import eu.dnetlib.dhp.oa.graph.dump.csv.Constats;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.utils.DHPUtils;
import java.io.Serializable;
/**
* @author miriam.baglioni
* @Date 11/05/23
*/
public class CSVPid implements Serializable {
private String id;
private String result_id;
private String pid;
private String type;
public String getResult_id() {
return result_id;
}
public void setResult_id(String result_id) {
this.result_id = result_id;
}
public String getPid() {
return pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
}

View File

@@ -0,0 +1,28 @@
package eu.dnetlib.dhp.oa.graph.dump.csv.model;
import java.io.Serializable;
/**
* @author miriam.baglioni
* @Date 11/05/23
*/
public class CSVRELCommunityResult implements Serializable {
private String result_id;
private String community_id;
public String getResult_id() {
return result_id;
}
public void setResult_id(String result_id) {
this.result_id = result_id;
}
public String getCommunity_id() {
return community_id;
}
public void setCommunity_id(String community_id) {
this.community_id = community_id;
}
}

View File

@@ -0,0 +1,28 @@
package eu.dnetlib.dhp.oa.graph.dump.csv.model;
import java.io.Serializable;
/**
* @author miriam.baglioni
* @Date 11/05/23
*/
public class CSVRelResAut implements Serializable {
private String result_id;
private String author_id;
public String getResult_id() {
return result_id;
}
public void setResult_id(String result_id) {
this.result_id = result_id;
}
public String getAuthor_id() {
return author_id;
}
public void setAuthor_id(String author_id) {
this.author_id = author_id;
}
}

View File

@@ -0,0 +1,108 @@
package eu.dnetlib.dhp.oa.graph.dump.csv.model;
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSetter;
import eu.dnetlib.dhp.schema.oaf.Country;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import org.apache.commons.lang.StringUtils;
import java.io.Serializable;
/**
* @author miriam.baglioni
* @Date 11/05/23
*/
public class CSVResult implements Serializable {
private String id;
private String type;
private String title;
private String description;
private String accessright;
private String publication_date;
private String publisher;
private String keywords;
private String country;
private String language;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public String getAccessright() {
return accessright;
}
public void setAccessright(String accessright) {
this.accessright = accessright;
}
public String getPublication_date() {
return publication_date;
}
public void setPublication_date(String publication_date) {
this.publication_date = publication_date;
}
public String getPublisher() {
return publisher;
}
public void setPublisher(String publisher) {
this.publisher = publisher;
}
public String getKeywords() {
return keywords;
}
public void setKeywords(String keywords) {
this.keywords = keywords;
}
public String getCountry() {
return country;
}
public void setCountry(String country) {
this.country = country;
}
public String getLanguage() {
return language;
}
public void setLanguage(String language) {
this.language = language;
}
}

View File

@@ -0,0 +1,30 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hiveDbName</name>
<value>openaire</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@@ -0,0 +1,262 @@
<workflow-app name="dump_graph" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>outputPath</name>
<description>the output path</description>
</property>
<property>
<name>communities</name>
<description>the communities whose products should be dumped</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="fork_select_result" />
<fork name="fork_select_result">
<path start="select_publication"/>
<path start="select_dataset"/>
<path start="select_orp"/>
<path start="select_software"/>
</fork>
<action name="select_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>select results from publication </name>
<class>eu.dnetlib.dhp.oa.graph.dump.csv.SparkDumpResults</class>
<jar>dump-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}</arg>
<arg>--communities</arg><arg>${communities}</arg>
<arg>--resultType</arg><arg>publication</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="select_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>select results from dataset </name>
<class>eu.dnetlib.dhp.oa.graph.dump.csv.SparkDumpResults</class>
<jar>dump-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}</arg>
<arg>--communities</arg><arg>${communities}</arg>
<arg>--resultType</arg><arg>dataset</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="select_orp">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>select results from other </name>
<class>eu.dnetlib.dhp.oa.graph.dump.csv.SparkDumpResults</class>
<jar>dump-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}</arg>
<arg>--communities</arg><arg>${communities}</arg>
<arg>--resultType</arg><arg>otherresearchproduct</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="select_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>select results from software</name>
<class>eu.dnetlib.dhp.oa.graph.dump.csv.SparkDumpResults</class>
<jar>dump-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}</arg>
<arg>--communities</arg><arg>${communities}</arg>
<arg>--resultType</arg><arg>software</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="dump_project">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table project </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkDumpEntitiesJob</class>
<jar>dump-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/project</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--outputPath</arg><arg>${workingDir}/project</arg>
<arg>--communityMapPath</arg><arg>noneed</arg>
</spark>
<ok to="get_new_projects"/>
<error to="Kill"/>
</action>
<action name="get_new_projects">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table project </name>
<class>eu.dnetlib.dhp.oa.graph.dump.projectssubset.ProjectsSubsetSparkJob</class>
<jar>dump-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/project</arg>
<arg>--outputPath</arg><arg>${workingDir}/tar/project</arg>
<arg>--projectListPath</arg><arg>${projectListPath}</arg>
</spark>
<ok to="make_archive"/>
<error to="Kill"/>
</action>
<action name="make_archive">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.MakeTar</main-class>
<arg>--hdfsPath</arg><arg>${outputPath}</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--sourcePath</arg><arg>${workingDir}/tar</arg>
</java>
<ok to="send_zenodo"/>
<error to="Kill"/>
</action>
<action name="send_zenodo">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.SendToZenodoHDFS</main-class>
<arg>--hdfsPath</arg><arg>${outputPath}</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--accessToken</arg><arg>${accessToken}</arg>
<arg>--connectionUrl</arg><arg>${connectionUrl}</arg>
<arg>--metadata</arg><arg>${metadata}</arg>
<arg>--conceptRecordId</arg><arg>${conceptRecordId}</arg>
<arg>--depositionType</arg><arg>${depositionType}</arg>
<arg>--depositionId</arg><arg>${depositionId}</arg>
</java>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@@ -0,0 +1,8 @@
for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType')
where $x//CONFIGURATION/context[./@type='community' or ./@type='ri']
and ($x//context/param[./@name = 'status']/text() = 'all')
return
<community>
{$x//CONFIGURATION/context/@id}
{$x//CONFIGURATION/context/@label}
</community>

View File

@@ -0,0 +1,11 @@
for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType')
where $x//CONFIGURATION/context[./@type='community' or ./@type='ri']
and ($x//CONFIGURATION/context[./@id='dh-ch'] or $x//CONFIGURATION/context[./@id='dariah'] or $x//CONFIGURATION/context[./@id='enermaps'] or $x//CONFIGURATION/context[./@id='beopen'])
return
<community>
{$x//CONFIGURATION/context/@id}
{$x//CONFIGURATION/context/@label}
<description>
{$x//CONFIGURATION/context/param[@name='description']/text()}
</description>
</community>

View File

@@ -0,0 +1,8 @@
for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType')
where $x//CONFIGURATION/context[./@type='community' or ./@type='ri']
and $x//CONFIGURATION/context[./@id=%s]
return
<community>
{$x//CONFIGURATION/context/@id}
{$x//CONFIGURATION/context/@label}
</community>
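The query above leaves the community ids as a %s placeholder; this commit does not show how it is filled, but one hypothetical way to instantiate it from the community list, relying on the XQuery general comparison @id=('a','b'), could be:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class XQueryTemplateSketch {
	public static void main(String[] args) {
		// hypothetical: the real template is loaded from the .xq resource shown above
		String condition = "$x//CONFIGURATION/context[./@id=%s]";
		List<String> communities = Arrays.asList("dh-ch", "beopen");

		String ids = communities
			.stream()
			.map(c -> "'" + c + "'")
			.collect(Collectors.joining(",", "(", ")"));

		System.out.println(String.format(condition, ids));
		// $x//CONFIGURATION/context[./@id=('dh-ch','beopen')]
	}
}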