
new classes to execute the dump of the products associated with a community, enrich each result with project information, and assign each result to every community it belongs to

This commit is contained in:
Miriam Baglioni 2020-06-09 15:39:03 +02:00
parent 5121cbaf6a
commit 6bbe27587f
10 changed files with 510 additions and 42 deletions
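The files below add four Spark steps plus their support classes. A minimal sketch of how they are intended to chain, assuming the classes in this commit are on the classpath; the DumpPipelineSketch class, the lookup URL and the printed output are illustrative placeholders, not part of the commit:

package eu.dnetlib.dhp.oa.graph.dump;

import java.util.Map;

// Illustrative sketch of the intended order of the jobs added in this commit:
//   1. SparkDumpCommunityProducts - maps each graph result to the dump model (Mapper.map),
//      keeping only the contexts that appear in the community map
//   2. SparkPrepareResultProject  - joins 'produces' relations with projects into one
//      ResultProject record per result id
//   3. SparkUpdateProjectInfo     - left-joins the dumped results with the prepared
//      ResultProject records and sets the projects list on each result
//   4. SparkSplitForCommunity     - appends each result to one directory per community
//      it belongs to
public class DumpPipelineSketch {
    public static void main(String[] args) throws Exception {
        // The community map (community id -> label) drives the filtering in steps 1 and 4.
        Map<String, String> communityMap =
            QueryInformationSystem.getCommunityMap("http://lookup.example.org/isLookUp"); // placeholder URL
        communityMap.forEach((id, label) -> System.out.println(id + " -> " + label));
    }
}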

View File

@@ -1,4 +1,7 @@
package eu.dnetlib.dhp.oa.graph.dump;
public class CommunityMap {
import java.util.HashMap;
public class CommunityMap extends HashMap<String, String> {
}

View File

@@ -1,4 +1,18 @@
package eu.dnetlib.dhp.oa.graph.dump;
import com.google.common.collect.Maps;
import java.util.Map;
public class Constants {
public static final Map<String, String> accessRightsCoarMap = Maps.newHashMap();
static {
accessRightsCoarMap.put("OPEN", "http://purl.org/coar/access_right/c_abf2");
accessRightsCoarMap.put("RESTRICTED", "http://purl.org/coar/access_right/c_16ec");
accessRightsCoarMap.put("OPEN SOURCE", "http://purl.org/coar/access_right/c_abf2");
accessRightsCoarMap.put("CLOSED", "http://purl.org/coar/access_right/c_14cb //metadataonly for coar");
accessRightsCoarMap.put("EMBARGO", "http://purl.org/coar/access_right/c_f1cf");
}
}

View File

@@ -1,4 +1,223 @@
package eu.dnetlib.dhp.oa.graph.dump;
public class Mapper {
import eu.dnetlib.dhp.schema.dump.oaf.*;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import scala.collection.immutable.Stream;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
public class Mapper implements Serializable {
public static <I extends eu.dnetlib.dhp.schema.oaf.Result, O extends eu.dnetlib.dhp.schema.dump.oaf.Result> O map(
I input, Map<String,String> communityMap){
O out = null;
switch (input.getResulttype().getClassid()){
case "publication":
out = (O)new Publication();
Optional<Journal> journal = Optional.ofNullable(((eu.dnetlib.dhp.schema.oaf.Publication) input).getJournal());
if(journal.isPresent()){
Journal j = journal.get();
Container c = new Container();
c.setConferencedate(j.getConferencedate());
c.setConferenceplace(j.getConferenceplace());
c.setEdition(j.getEdition());
c.setEp(j.getEp());
c.setIss(j.getIss());
c.setIssnLinking(j.getIssnLinking());
c.setIssnOnline(j.getIssnOnline());
c.setIssnPrinted(j.getIssnPrinted());
c.setName(j.getName());
c.setSp(j.getSp());
c.setVol(j.getVol());
out.setContainer(c);
}
break;
case "dataset":
Dataset d = new Dataset();
eu.dnetlib.dhp.schema.oaf.Dataset id = (eu.dnetlib.dhp.schema.oaf.Dataset)input;
d.setSize(id.getSize().getValue());
d.setVersion(id.getVersion().getValue());
List<eu.dnetlib.dhp.schema.oaf.GeoLocation> igl = id.getGeolocation();
d.setGeolocation(igl.stream()
.filter(Objects::nonNull)
.map(gli -> {
GeoLocation gl = new GeoLocation();
gl.setBox(gli.getBox());
gl.setPlace(gli.getPlace());
gl.setPoint(gli.getPoint());
return gl;
}).collect(Collectors.toList()));
out = (O)d;
break;
case "software":
Software s = new Software();
eu.dnetlib.dhp.schema.oaf.Software is = (eu.dnetlib.dhp.schema.oaf.Software)input;
s.setCodeRepositoryUrl(is.getCodeRepositoryUrl().getValue());
s.setDocumentationUrl(is.getDocumentationUrl()
.stream()
.map(du -> du.getValue()).collect(Collectors.toList()));
s.setProgrammingLanguage(is.getProgrammingLanguage().getClassid());
out = (O) s;
break;
case "otherresearchproduct":
OtherResearchProduct or = new OtherResearchProduct();
eu.dnetlib.dhp.schema.oaf.OtherResearchProduct ir = (eu.dnetlib.dhp.schema.oaf.OtherResearchProduct)input;
or.setContactgroup(ir.getContactgroup().stream().map(cg -> cg.getValue()).collect(Collectors.toList()));
or.setContactperson(ir.getContactperson().stream().map(cp->cp.getValue()).collect(Collectors.toList()));
or.setTool(ir.getTool().stream().map(t -> t.getValue()).collect(Collectors.toList()));
out = (O) or;
break;
}
out.setAuthor(input.getAuthor()
.stream()
.map(oa -> {
Author a = new Author();
a.setAffiliation(oa.getAffiliation().stream().map(aff -> aff.getValue()).collect(Collectors.toList()));
a.setFullname(oa.getFullname());
a.setName(oa.getName());
a.setSurname(oa.getSurname());
a.setRank(oa.getRank());
a.setPid(oa.getPid().stream().map(p -> {
ControlledField cf = new ControlledField();
cf.setScheme( p.getQualifier().getClassid());
cf.setValue( p.getValue());
return cf;
}).collect(Collectors.toList()));
return a;
}).collect(Collectors.toList()));
//I do not map Access Right UNKNOWN or OTHER
if (Constants.accessRightsCoarMap.containsKey(input.getBestaccessright().getClassid())){
AccessRight ar = new AccessRight();
ar.setSchema(Constants.accessRightsCoarMap.get(input.getBestaccessright().getClassid()));
ar.setCode(input.getBestaccessright().getClassid());
ar.setLabel(input.getBestaccessright().getClassname());
out.setBestaccessright(ar);
}
out.setCollectedfrom(input.getCollectedfrom().stream().map(cf -> KeyValue.newInstance(cf.getKey(), cf.getValue()))
.collect(Collectors.toList()));
Set<String> communities = communityMap.keySet();
List<Context> contextList = input.getContext()
.stream()
.map(c -> {
if(communities.contains(c.getId())){
Context context = new Context();
context.setCode(c.getId());
context.setLabel(communityMap.get(c.getId()));
Optional<List<DataInfo>> dataInfo = Optional.ofNullable(c.getDataInfo());
if(dataInfo.isPresent()){
context.setProvenance(dataInfo.get().stream()
.map(di -> {
if (di.getInferred()){
return di.getProvenanceaction().getClassid();
}
return null;
}).filter(Objects::nonNull)
.collect(Collectors.toList()));
}
return context;
}
return null;
}
).filter(Objects::nonNull)
.collect(Collectors.toList());
if(contextList.size() > 0){
out.setContext(contextList);
}
out.setContributor(input.getContributor()
.stream()
.map(c -> c.getValue()).collect(Collectors.toList()));
out.setCountry(input.getCountry()
.stream()
.map(c -> {
Country country = new Country();
country.setCode(c.getClassid());
country.setLabel(c.getClassname());
Optional<DataInfo> dataInfo = Optional.ofNullable(c.getDataInfo());
if(dataInfo.isPresent()){
country.setProvenance(dataInfo.get().getProvenanceaction().getClassid());
}
return country;
}).collect(Collectors.toList()));
out.setCoverage(input.getCoverage().stream().map(c->c.getValue()).collect(Collectors.toList()));
out.setDateofcollection(input.getDateofcollection());
out.setDescription(input.getDescription().stream().map(d->d.getValue()).collect(Collectors.toList()));
out.setEmbargoenddate(input.getEmbargoenddate().getValue());
out.setFormat(input.getFormat().stream().map(f->f.getValue()).collect(Collectors.toList()));
out.setId(input.getId());
out.setOriginalId(input.getOriginalId());
out.setInstance(input.getInstance()
.stream()
.map(i -> {
Instance instance = new Instance();
AccessRight ar = new AccessRight();
ar.setCode(i.getAccessright().getClassid());
ar.setLabel(i.getAccessright().getClassname());
if(Constants.accessRightsCoarMap.containsKey(i.getAccessright().getClassid())){
ar.setSchema(Constants.accessRightsCoarMap.get(i.getAccessright().getClassid()));
}
instance.setAccessright(ar);
instance.setCollectedfrom(KeyValue.newInstance(i.getCollectedfrom().getKey(), i.getCollectedfrom().getValue()));
instance.setHostedby(KeyValue.newInstance(i.getHostedby().getKey(),i.getHostedby().getValue()));
instance.setLicense(i.getLicense().getValue());
instance.setPublicationdata(i.getDateofacceptance().getValue());
instance.setRefereed(i.getRefereed().getValue());
instance.setType(i.getInstancetype().getClassid());
instance.setUrl(i.getUrl());
return instance;
}).collect(Collectors.toList()));
out.setLanguage(Qualifier.newInstance(input.getLanguage().getClassid(), input.getLanguage().getClassname()));
out.setLastupdatetimestamp(input.getLastupdatetimestamp());
Optional<List<StructuredProperty>> otitle = Optional.ofNullable(input.getTitle());
if(otitle.isPresent()){
List<StructuredProperty> iTitle = otitle.get()
.stream()
.filter(t -> t.getQualifier().getClassid().equalsIgnoreCase("main title"))
.collect(Collectors.toList());
if(iTitle.size() > 0 ){
out.setMaintitle(iTitle.get(0).getValue());
}
iTitle = otitle.get()
.stream()
.filter(t -> t.getQualifier().getClassid().equalsIgnoreCase("subtitle"))
.collect(Collectors.toList());
if(iTitle.size() > 0){
out.setSubtitle(iTitle.get(0).getValue());
}
}
out.setPid(input.getPid().stream().map(p -> {
ControlledField pid = new ControlledField();
pid.setScheme(p.getQualifier().getClassid());
pid.setValue(p.getValue());
return pid;
}).collect(Collectors.toList()));
out.setPublicationdata(input.getDateofacceptance().getValue());
out.setPublisher(input.getPublisher().getValue());
out.setSource(input.getSource().stream().map(s -> s.getValue()).collect(Collectors.toList()));
out.setSubject(input.getSubject().stream().map(s->{
ControlledField subject = new ControlledField();
subject.setScheme(s.getQualifier().getClassid());
subject.setValue(s.getValue());
return subject;
}).collect(Collectors.toList()));
out.setType(input.getResulttype().getClassid());
return out;
}
}
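A hedged sketch of how Mapper.map is expected to be invoked; the MapperUsageSketch class, the community id/label pair and the helper that supplies the input are placeholders. Note that the mapper dereferences most fields of the input without null checks, so the graph record has to be fully populated:

package eu.dnetlib.dhp.oa.graph.dump;

import java.util.HashMap;
import java.util.Map;

public class MapperUsageSketch {
    public static void main(String[] args) {
        Map<String, String> communityMap = new HashMap<>();
        communityMap.put("egi", "EGI Federation"); // placeholder community id -> label

        // In the real job the input comes from the deserialized graph on HDFS.
        eu.dnetlib.dhp.schema.oaf.Publication input = readFullyPopulatedPublication();

        // The resulttype classid ("publication") selects the concrete dump class.
        eu.dnetlib.dhp.schema.dump.oaf.Result dumped = Mapper.map(input, communityMap);
        System.out.println(dumped.getMaintitle());
    }

    private static eu.dnetlib.dhp.schema.oaf.Publication readFullyPopulatedPublication() {
        // placeholder: load a publication from a JSON fixture or build one in a test
        throw new UnsupportedOperationException("supply a populated publication here");
    }
}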

View File

@@ -3,9 +3,14 @@ package eu.dnetlib.dhp.oa.graph.dump;
import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
@@ -17,7 +22,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class PrepareCommunityMap {
public class QueryInformationSystem {
private static final String XQUERY = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType') " +
" where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] " +
" return " +
@@ -26,6 +31,8 @@ public class PrepareCommunityMap {
"{$x//CONFIGURATION/context/@label}" +
"</community>";
public static Map<String,String> getCommunityMap(final String isLookupUrl)
throws ISLookUpException, DocumentException {
ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
@@ -51,4 +58,8 @@ public class PrepareCommunityMap {
}
}

View File

@@ -1,4 +1,27 @@
package eu.dnetlib.dhp.oa.graph.dump;
public class ResultProject {
import eu.dnetlib.dhp.schema.dump.oaf.Projects;
import java.io.Serializable;
import java.util.List;
public class ResultProject implements Serializable {
private String resultId;
private List<Projects> projectsList;
public String getResultId() {
return resultId;
}
public void setResultId(String resultId) {
this.resultId = resultId;
}
public List<Projects> getProjectsList() {
return projectsList;
}
public void setProjectsList(List<Projects> projectsList) {
this.projectsList = projectsList;
}
}

View File

@@ -6,11 +6,9 @@ import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.Context;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
@@ -18,22 +16,20 @@ import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Result;
public class DumpCommunityProducts implements Serializable {
public class SparkDumpCommunityProducts implements Serializable {
private static final Logger log = LoggerFactory.getLogger(DumpCommunityProducts.class);
private static final Logger log = LoggerFactory.getLogger(SparkDumpCommunityProducts.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
DumpCommunityProducts.class
SparkDumpCommunityProducts.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/input_parameters.json"));

View File

@@ -1,4 +1,99 @@
package eu.dnetlib.dhp.oa.graph.dump;
public class SparkPrepareResultProject {
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.dump.oaf.Projects;
import eu.dnetlib.dhp.schema.dump.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Relation;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
public class SparkPrepareResultProject implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SparkPrepareResultProject.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkPrepareResultProject.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/project_prepare_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
prepareResultProjectList(spark, inputPath, outputPath);
});
}
private static void prepareResultProjectList(SparkSession spark, String inputPath, String outputPath) {
Dataset<Relation> relation = Utils.readPath(spark, inputPath + "/relation" , Relation.class)
.filter("dataInfo.deletedbyinference = false and relClass = 'produces'");
Dataset<Project> projects = Utils.readPath(spark, inputPath + "/project" , Project.class);
projects.joinWith(relation, projects.col("id").equalTo(relation.col("source")))
.groupByKey((MapFunction<Tuple2<Project,Relation>,String>)value -> value._2().getTarget(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Tuple2<Project,Relation>, ResultProject>) (s, it) ->
{
Tuple2<Project, Relation> first = it.next();
ResultProject rp = new ResultProject();
rp.setResultId(first._2().getTarget());
Project p = first._1();
Projects ps = Projects.newInstance(p.getId(), p.getCode().getValue(), p.getAcronym().getValue(),
p.getTitle().getValue(), p.getFundingtree()
.stream()
.map(ft -> ft.getValue()).collect(Collectors.toList()));
List<Projects> projList = new ArrayList<>(Arrays.asList(ps)); // Arrays.asList alone is fixed-size and would break the add below
rp.setProjectsList(projList);
it.forEachRemaining(c -> {
Project op = c._1();
projList.add(Projects.newInstance(op.getId(), op.getCode().getValue(),
op.getAcronym().getValue(), op.getTitle().getValue(),
op.getFundingtree().stream().map(ft -> ft.getValue()).collect(Collectors.toList())));
});
return rp;
} ,Encoders.bean(ResultProject.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
}
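A hedged, local-mode sketch of driving this preparation step. The option names are assumed to mirror the parser.get(...) keys used above (project_prepare_parameters.json is not part of this diff), and the class name and paths are placeholders:

package eu.dnetlib.dhp.oa.graph.dump;

public class PrepareResultProjectLocalRun {
    public static void main(String[] args) throws Exception {
        // Run Spark locally; SparkConf picks spark.* system properties up automatically.
        System.setProperty("spark.master", "local[*]");

        SparkPrepareResultProject.main(new String[] {
            "-isSparkSessionManaged", Boolean.TRUE.toString(),
            "-sourcePath", "/tmp/graph",             // expects /relation and /project underneath
            "-outputPath", "/tmp/dump/preparedInfo"  // one ResultProject record per result id
        });
    }
}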

View File

@@ -1,4 +1,98 @@
package eu.dnetlib.dhp.oa.graph.dump;
public class SparkSplitForCommunity {
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.dump.oaf.Result;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
public class SparkSplitForCommunity implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SparkSplitForCommunity.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkSplitForCommunity.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/split_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final String isLookUpUrl = parser.get("isLookUpUrl");
log.info("isLookUpUrl: {}", isLookUpUrl);
Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf();
Map<String,String>
communityMap = QueryInformationSystem.getCommunityMap(isLookUpUrl);
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
execSplit(spark, inputPath, outputPath , communityMap.keySet(), inputClazz);
});
}
private static <R extends Result> void execSplit(SparkSession spark, String inputPath, String outputPath, Set<String> communities
, Class<R> inputClazz) {
Dataset<R> result = Utils.readPath(spark, inputPath, inputClazz);
communities.stream()
.forEach(c -> printResult(c, result, outputPath));
}
private static <R extends Result> void printResult(String c, Dataset<R> result, String outputPath) {
result.filter(r -> containsCommunity(r, c))
.write()
.option("compression","gzip")
.mode(SaveMode.Append)
.json(outputPath + "/" + c);
}
private static <R extends Result> boolean containsCommunity(R r, String c) {
if(Optional.ofNullable(r.getContext()).isPresent()) {
return r.getContext().stream().filter(con -> con.getCode().equals(c)).collect(Collectors.toList()).size() > 0;
}
return false;
}
}
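For illustration, execSplit loops over all community ids and appends a result under outputPath/<communityId> for every community whose code appears among the result's contexts. A small self-contained sketch of that layout logic (ids and paths are placeholders, SplitLayoutSketch is not part of the commit):

package eu.dnetlib.dhp.oa.graph.dump;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class SplitLayoutSketch {
    public static void main(String[] args) {
        Set<String> communities = new HashSet<>(Arrays.asList("egi", "dh-ch")); // keys of the community map
        List<String> contextsOfOneResult = Arrays.asList("egi", "beopen");      // context codes of a dumped result
        String outputPath = "/tmp/dump/community";

        // Directories this result would be appended to, mirroring printResult above:
        List<String> targets = communities.stream()
            .filter(contextsOfOneResult::contains)
            .map(c -> outputPath + "/" + c)
            .collect(Collectors.toList());
        System.out.println(targets); // [/tmp/dump/community/egi]
    }
}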

View File

@@ -12,26 +12,26 @@ import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.Serializable;
import java.util.Map;
import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
public class UpdateProjectInfo implements Serializable {
public class SparkUpdateProjectInfo implements Serializable {
private static final Logger log = LoggerFactory.getLogger(UpdateProjectInfo.class);
private static final Logger log = LoggerFactory.getLogger(SparkUpdateProjectInfo.class);
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
UpdateProjectInfo.class
SparkUpdateProjectInfo.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/project_input_parameters.json"));
@@ -53,8 +53,9 @@ public class UpdateProjectInfo implements Serializable {
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final String resultType = parser.get("resultType");
log.info("resultType: {}", resultType);
final String preparedInfoPath = parser.get("preparedInfoPath");
log.info("preparedInfoPath: {}", preparedInfoPath);
Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);
@@ -65,7 +66,7 @@ public class UpdateProjectInfo implements Serializable {
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
extend(spark, inputPath, outputPath , resultType, inputClazz);
extend(spark, inputPath, outputPath , preparedInfoPath, inputClazz);
});
}
@@ -73,31 +74,24 @@ public class UpdateProjectInfo implements Serializable {
SparkSession spark,
String inputPath,
String outputPath,
String resultType,
String preparedInfoPath,
Class<R> inputClazz) {
Dataset<R> result = Utils.readPath(spark, inputPath + "/" + resultType, inputClazz);
Dataset<Relation> relation = Utils.readPath(spark, inputPath + "/relation", Relation.class)
.filter("dataInfo.deletedbyinference = false and relClass = 'produces'");
Dataset<Project> project = Utils.readPath(spark,inputPath + "/project", Project.class);
relation.joinWith(project, relation.col("source").equalTo(project.col("id")))
result.joinWith(relation, result.col("id").equalTo(relation.col("target")), "left")
.groupByKey(
(MapFunction<Tuple2<R,Relation>, String>) p -> p._1().getId(),
Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Tuple2<R, Relation>, R>)(c, it) -> {
Tuple2<R, Relation> first = it.next();
}, Encoders.bean(inputClazz));
.mapGroups((MapGroupsFunction<String, Project, Project>) (s, it) -> {
Project first = it.next();
it.forEachRemaining(p -> {
first.mergeFrom(p);
Dataset<R> result = Utils.readPath(spark, inputPath , inputClazz);
Dataset<ResultProject> resultProject = Utils.readPath(spark, preparedInfoPath, ResultProject.class);
result.joinWith(resultProject, result.col("id").equalTo(resultProject.col("resultId")),
"left")
.map(value -> {
R r = value._1();
Optional.ofNullable(value._2()).ifPresent(rp -> {
r.setProjects(rp.getProjectsList());
});
return first;
}
return r;
},Encoders.bean(inputClazz))
.write()
.option("compression","gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);
}

View File

@@ -1,4 +1,23 @@
package eu.dnetlib.dhp.oa.graph.dump;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.HdfsSupport;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
public class Utils {
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
public static <R> Dataset<R> readPath(
SparkSession spark, String inputPath, Class<R> clazz) {
return spark
.read()
.textFile(inputPath)
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
}
}