Changed to dump the whole result graph by using the classes already implemented for communities. Added a class to also dump organizations.

This commit is contained in:
Miriam Baglioni 2020-07-20 17:54:28 +02:00
parent e47ea9349c
commit 08dbd99455
13 changed files with 320 additions and 203 deletions
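In short: the community-specific driver and the new whole-graph driver now share the same DumpProducts class and differ only in the final boolean graph flag of run(...). A minimal usage sketch follows (the DumpUsageSketch class, the paths, the Publication table and the empty CommunityMap are illustrative assumptions, not part of this commit):

import eu.dnetlib.dhp.oa.graph.dump.DumpProducts;
import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
import eu.dnetlib.dhp.schema.oaf.Publication;

public class DumpUsageSketch {
    public static void main(String[] args) {
        // Normally the CommunityMap is fetched via QueryInformationSystem from the IS lookup service;
        // an empty map is used here only to keep the sketch self-contained.
        CommunityMap communityMap = new CommunityMap();
        DumpProducts dump = new DumpProducts();

        // Community dump (as in SparkDumpCommunityProducts): graph = false, so results are
        // filtered on the community contexts contained in communityMap.
        dump.run(Boolean.TRUE, "/tmp/graph/publication", "/tmp/dump/community/publication",
            communityMap, Publication.class, false);

        // Whole-graph dump (as in SparkDumpJob): graph = true, so the context filter is skipped
        // and every record of the input table is dumped.
        dump.run(Boolean.TRUE, "/tmp/graph/publication", "/tmp/dump/graph/publication",
            communityMap, Publication.class, true);
    }
}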

View File

@@ -1,8 +1,11 @@
-package eu.dnetlib.dhp.oa.graph.dump.community;
+package eu.dnetlib.dhp.oa.graph.dump;
 import eu.dnetlib.dhp.oa.graph.dump.ResultMapper;
 import eu.dnetlib.dhp.oa.graph.dump.Utils;
+import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
 import eu.dnetlib.dhp.schema.oaf.Context;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
 import eu.dnetlib.dhp.schema.oaf.Result;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
@@ -21,7 +24,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 public class DumpProducts implements Serializable {
-    public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, CommunityMap communityMap, Class<? extends Result> inputClazz, boolean graph) {
+    public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, CommunityMap communityMap, Class<? extends OafEntity> inputClazz, boolean graph) {
         SparkConf conf = new SparkConf();
@@ -35,7 +38,7 @@ public class DumpProducts implements Serializable {
             });
     }
-    public static <I extends Result, O extends eu.dnetlib.dhp.schema.dump.oaf.Result> void execDump(SparkSession spark,
+    public static <I extends OafEntity, O extends eu.dnetlib.dhp.schema.dump.oaf.Result> void execDump(SparkSession spark,
         String inputPath,
         String outputPath,
         CommunityMap communityMap,
@@ -54,14 +57,14 @@ public class DumpProducts implements Serializable {
     }
-    private static <I extends Result> eu.dnetlib.dhp.schema.dump.oaf.Result execMap(I value,
+    private static <I extends OafEntity> eu.dnetlib.dhp.schema.dump.oaf.Result execMap(I value,
         CommunityMap communityMap,
         boolean graph) {
         if (!graph) {
             Set<String> communities = communityMap.keySet();
-            Optional<List<Context>> inputContext = Optional.ofNullable(value.getContext());
+            Optional<List<Context>> inputContext = Optional.ofNullable(((eu.dnetlib.dhp.schema.oaf.Result)value).getContext());
             if (!inputContext.isPresent()) {
                 return null;
             }

View File

@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.oa.graph.dump.community;
+package eu.dnetlib.dhp.oa.graph.dump;
 import java.io.StringReader;
 import java.util.List;

View File

@@ -12,12 +12,13 @@ import eu.dnetlib.dhp.schema.oaf.Field;
 import eu.dnetlib.dhp.schema.oaf.Journal;
 import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
-public class Mapper implements Serializable {
+public class ResultMapper implements Serializable {
-    public static <I extends eu.dnetlib.dhp.schema.oaf.Result> Result map(
-        I input, Map<String, String> communityMap) {
+    public static <I extends eu.dnetlib.dhp.schema.oaf.OafEntity> Result map(
+        I in, Map<String, String> communityMap) {
         final Result out = new Result();
+        eu.dnetlib.dhp.schema.oaf.Result input = (eu.dnetlib.dhp.schema.oaf.Result)in;
         Optional<eu.dnetlib.dhp.schema.oaf.Qualifier> ort = Optional.ofNullable(input.getResulttype());
         if (ort.isPresent()) {
             switch (ort.get().getClassid()) {
@@ -152,10 +153,11 @@ public class Mapper implements Serializable {
             .map(cf -> KeyValue.newInstance(cf.getKey(), cf.getValue()))
             .collect(Collectors.toList()));
         Set<String> communities = communityMap.keySet();
-        List<Context> contextList = input
-            .getContext()
-            .stream()
+        List<Context> contextList = Optional.ofNullable(input
+            .getContext())
+            .map(value -> value.stream()
             .map(c -> {
                 String community_id = c.getId();
                 if (community_id.indexOf("::") > 0) {
@@ -167,7 +169,7 @@
                     context.setLabel(communityMap.get(community_id));
                     Optional<List<DataInfo>> dataInfo = Optional.ofNullable(c.getDataInfo());
                     if (dataInfo.isPresent()) {
-                        List<String> provenance = new ArrayList<>();
+                        List<Provenance> provenance = new ArrayList<>();
                         provenance
                             .addAll(
                                 dataInfo
@@ -175,7 +177,7 @@
                                     .stream()
                                     .map(di -> {
                                         if (di.getInferred()) {
-                                            return di.getProvenanceaction().getClassname();
+                                            return Provenance.newInstance(di.getProvenanceaction().getClassname(), di.getTrust());
                                         }
                                         return null;
                                     })
@@ -188,7 +190,9 @@
                 return null;
             })
             .filter(Objects::nonNull)
-            .collect(Collectors.toList());
+            .collect(Collectors.toList()))
+            .orElse(new ArrayList<>());
         if (contextList.size() > 0) {
             out.setContext(contextList);
         }
@@ -214,9 +218,9 @@
             .ifPresent(
                 provenance -> country
                     .setProvenance(
-                        provenance
+                        Provenance.newInstance(provenance
                             .getProvenanceaction()
-                            .getClassname()));
+                            .getClassname(), c.getDataInfo().getTrust())));
         countryList
             .add(country);
     }));
@@ -378,9 +382,9 @@
     private static Subject getSubject(StructuredProperty s){
         Subject subject = new Subject();
         subject.setSubject(ControlledField.newInstance(s.getQualifier().getClassid(), s.getValue()));
-        Optional<DataInfo> di = Optional.of(s.getDataInfo());
+        Optional<DataInfo> di = Optional.ofNullable(s.getDataInfo());
-        Provenance p = new Provenance();
         if (di.isPresent()){
+            Provenance p = new Provenance();
             p.setProvenance(di.get().getProvenanceaction().getClassname());
             p.setTrust(di.get().getTrust());
             subject.setProvenance(p);

View File

@@ -4,11 +4,11 @@ package eu.dnetlib.dhp.oa.graph.dump;
 import java.io.Serializable;
 import java.util.List;
-import eu.dnetlib.dhp.schema.dump.oaf.Projects;
+import eu.dnetlib.dhp.schema.dump.oaf.community.Project;
 public class ResultProject implements Serializable {
     private String resultId;
-    private List<Projects> projectsList;
+    private List<Project> projectsList;
     public String getResultId() {
         return resultId;
@@ -18,11 +18,11 @@ public class ResultProject implements Serializable {
         this.resultId = resultId;
     }
-    public List<Projects> getProjectsList() {
+    public List<Project> getProjectsList() {
         return projectsList;
     }
-    public void setProjectsList(List<Projects> projectsList) {
+    public void setProjectsList(List<Project> projectsList) {
         this.projectsList = projectsList;
     }
 }

View File

@@ -3,10 +3,8 @@ package eu.dnetlib.dhp.oa.graph.dump;
 import java.io.File;
 import java.io.Serializable;
-import java.util.Arrays;
-import javax.management.Query;
+import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -16,10 +14,7 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
-import com.google.gson.Gson;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.oa.graph.dump.zenodo.*;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 public class SendToZenodo implements Serializable {

View File

@@ -8,6 +8,9 @@ import java.io.StringReader;
 import java.util.*;
 import java.util.stream.Collectors;
+import eu.dnetlib.dhp.schema.dump.oaf.community.Project;
+import eu.dnetlib.dhp.schema.dump.oaf.Provenance;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
@@ -18,20 +21,13 @@ import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 import org.dom4j.Document;
 import org.dom4j.DocumentException;
-import org.dom4j.Element;
 import org.dom4j.Node;
 import org.dom4j.io.SAXReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.dump.oaf.Funder;
-import eu.dnetlib.dhp.schema.dump.oaf.Projects;
-import eu.dnetlib.dhp.schema.dump.oaf.Result;
-import eu.dnetlib.dhp.schema.oaf.Field;
-import eu.dnetlib.dhp.schema.oaf.Project;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import scala.Tuple2;
@@ -75,48 +71,45 @@ public class SparkPrepareResultProject implements Serializable {
         Dataset<Relation> relation = Utils
             .readPath(spark, inputPath + "/relation", Relation.class)
             .filter("dataInfo.deletedbyinference = false and relClass = 'produces'");
-        Dataset<Project> projects = Utils.readPath(spark, inputPath + "/project", Project.class);
+        Dataset<eu.dnetlib.dhp.schema.oaf.Project> projects = Utils.readPath(spark, inputPath + "/project", eu.dnetlib.dhp.schema.oaf.Project.class);
         projects
             .joinWith(relation, projects.col("id").equalTo(relation.col("source")))
             .groupByKey(
-                (MapFunction<Tuple2<Project, Relation>, String>) value -> value._2().getTarget(), Encoders.STRING())
-            .mapGroups((MapGroupsFunction<String, Tuple2<Project, Relation>, ResultProject>) (s, it) -> {
+                (MapFunction<Tuple2<eu.dnetlib.dhp.schema.oaf.Project, Relation>, String>) value -> value._2().getTarget(), Encoders.STRING())
+            .mapGroups((MapGroupsFunction<String, Tuple2<eu.dnetlib.dhp.schema.oaf.Project, Relation>, ResultProject>) (s, it) -> {
                 Set<String> projectSet = new HashSet<>();
-                Tuple2<Project, Relation> first = it.next();
+                Tuple2<eu.dnetlib.dhp.schema.oaf.Project, Relation> first = it.next();
                 ResultProject rp = new ResultProject();
                 rp.setResultId(first._2().getTarget());
-                Project p = first._1();
+                eu.dnetlib.dhp.schema.oaf.Project p = first._1();
                 projectSet.add(p.getId());
-                Projects ps = Projects
-                    .newInstance(
-                        p.getId(), p.getCode().getValue(),
-                        Optional
-                            .ofNullable(p.getAcronym())
-                            .map(a -> a.getValue())
-                            .orElse(null),
-                        Optional
-                            .ofNullable(p.getTitle())
-                            .map(v -> v.getValue())
-                            .orElse(null),
-                        Optional
-                            .ofNullable(p.getFundingtree())
-                            .map(
-                                value -> value
-                                    .stream()
-                                    .map(ft -> getFunder(ft.getValue()))
-                                    .collect(Collectors.toList())
-                                    .get(0))
-                            .orElse(null));
-                List<Projects> projList = new ArrayList<>();
+                Project ps = getProject(p);
+
+                List<Project> projList = new ArrayList<>();
                 projList.add(ps);
                 rp.setProjectsList(projList);
                 it.forEachRemaining(c -> {
-                    Project op = c._1();
+                    eu.dnetlib.dhp.schema.oaf.Project op = c._1();
                     if (!projectSet.contains(op.getId())) {
                         projList
-                            .add(
-                                Projects
+                            .add(getProject(op));
+                        projectSet.add(op.getId());
+                    }
+                });
+                return rp;
+            }, Encoders.bean(ResultProject.class))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(outputPath);
+    }
+
+    private static Project getProject(eu.dnetlib.dhp.schema.oaf.Project op) {
+        Project p = Project
                             .newInstance(
                                 op.getId(),
                                 op.getCode().getValue(),
@@ -136,18 +129,18 @@ public class SparkPrepareResultProject implements Serializable {
                                 .map(ft -> getFunder(ft.getValue()))
                                 .collect(Collectors.toList())
                                 .get(0))
-                            .orElse(null)));
-                        projectSet.add(op.getId());
+                            .orElse(null));
+        Optional<DataInfo> di = Optional.ofNullable(op.getDataInfo());
+        Provenance provenance = new Provenance();
+        if (di.isPresent()) {
+            provenance.setProvenance(di.get().getProvenanceaction().getClassname());
+            provenance.setTrust(di.get().getTrust());
+            p.setProvenance(provenance);
         }
-                });
-                return rp;
-            }, Encoders.bean(ResultProject.class))
-            .write()
-            .mode(SaveMode.Overwrite)
-            .option("compression", "gzip")
-            .json(outputPath);
+        return p;
     }
private static Funder getFunder(String fundingtree) { private static Funder getFunder(String fundingtree) {

View File

@@ -1,6 +1,8 @@
 package eu.dnetlib.dhp.oa.graph.dump;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -24,4 +26,8 @@ public class Utils {
             .textFile(inputPath)
             .map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
     }
+
+    public static ISLookUpService getIsLookUpService(String isLookUpUrl) {
+        return ISLookupClientFactory.getLookUpService(isLookUpUrl);
+    }
 }

View File

@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.oa.graph.dump;
+package eu.dnetlib.dhp.oa.graph.dump.community;
 import java.io.Serializable;
 import java.util.HashMap;

View File

@@ -1,30 +1,18 @@
-package eu.dnetlib.dhp.oa.graph.dump;
+package eu.dnetlib.dhp.oa.graph.dump.community;
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 import java.io.Serializable;
 import java.util.*;
-import java.util.stream.Collectors;
-import javax.management.Query;
+import eu.dnetlib.dhp.oa.graph.dump.DumpProducts;
+import eu.dnetlib.dhp.oa.graph.dump.QueryInformationSystem;
+import eu.dnetlib.dhp.oa.graph.dump.Utils;
 import org.apache.commons.io.IOUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import com.google.gson.Gson;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.oaf.Context;
 import eu.dnetlib.dhp.schema.oaf.Result;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 public class SparkDumpCommunityProducts implements Serializable {
@@ -63,74 +51,18 @@ public class SparkDumpCommunityProducts implements Serializable {
         final Optional<String> cm = Optional.ofNullable(parser.get("communityMap"));
         Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);
-        SparkConf conf = new SparkConf();
-        CommunityMap communityMap;
-        if (!isLookUpUrl.equals("BASEURL:8280/is/services/isLookUp")) {
         queryInformationSystem = new QueryInformationSystem();
-            queryInformationSystem.setIsLookUp(getIsLookUpService(isLookUpUrl));
-            communityMap = queryInformationSystem.getCommunityMap();
-        } else {
-            communityMap = new Gson().fromJson(cm.get(), CommunityMap.class);
-        }
+        queryInformationSystem.setIsLookUp(Utils.getIsLookUpService(isLookUpUrl));
+        CommunityMap communityMap = queryInformationSystem.getCommunityMap();
-        runWithSparkSession(
-            conf,
-            isSparkSessionManaged,
-            spark -> {
-                Utils.removeOutputDir(spark, outputPath);
-                execDump(spark, inputPath, outputPath, communityMap, inputClazz);// , dumpClazz);
-            });
+        DumpProducts dump = new DumpProducts();
+        dump.run(isSparkSessionManaged, inputPath, outputPath, communityMap, inputClazz, false);
     }
-    public static ISLookUpService getIsLookUpService(String isLookUpUrl) {
-        return ISLookupClientFactory.getLookUpService(isLookUpUrl);
-    }
-    public static <I extends Result, O extends eu.dnetlib.dhp.schema.dump.oaf.Result> void execDump(SparkSession spark,
-        String inputPath,
-        String outputPath,
-        CommunityMap communityMap,
-        Class<I> inputClazz) {// Class<O> dumpClazz) {
-        // Set<String> communities = communityMap.keySet();
-        Dataset<I> tmp = Utils.readPath(spark, inputPath, inputClazz);
-        tmp
-            .map(value -> execMap(value, communityMap), Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.Result.class))
-            .filter(Objects::nonNull)
-            .write()
-            .mode(SaveMode.Overwrite)
-            .option("compression", "gzip")
-            .json(outputPath);
-    }
-    private static <I extends Result> eu.dnetlib.dhp.schema.dump.oaf.Result execMap(I value,
-        CommunityMap communityMap) {
-        {
-            Set<String> communities = communityMap.keySet();
-            Optional<List<Context>> inputContext = Optional.ofNullable(value.getContext());
-            if (!inputContext.isPresent()) {
-                return null;
-            }
-            List<String> toDumpFor = inputContext.get().stream().map(c -> {
-                if (communities.contains(c.getId())) {
-                    return c.getId();
-                }
-                if (c.getId().contains("::") && communities.contains(c.getId().substring(0, c.getId().indexOf("::")))) {
-                    return c.getId().substring(0, 3);
-                }
-                return null;
-            }).filter(Objects::nonNull).collect(Collectors.toList());
-            if (toDumpFor.size() == 0) {
-                return null;
-            }
-            return Mapper.map(value, communityMap);
-        }
-    }
 }

View File

@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.oa.graph.dump;
+package eu.dnetlib.dhp.oa.graph.dump.community;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
@@ -8,6 +8,8 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
+import eu.dnetlib.dhp.oa.graph.dump.QueryInformationSystem;
+import eu.dnetlib.dhp.oa.graph.dump.Utils;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;

View File

@@ -1,4 +1,46 @@
 package eu.dnetlib.dhp.oa.graph.dump.graph;
-public class DumpOrganization {
+import eu.dnetlib.dhp.oa.graph.dump.Utils;
+import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import eu.dnetlib.dhp.schema.oaf.Organization;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+
+import java.io.Serializable;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+public class DumpOrganization implements Serializable {
+
+    public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath) {
+        SparkConf conf = new SparkConf();
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                Utils.removeOutputDir(spark, outputPath);
+                execDump(spark, inputPath, outputPath);
+            });
+    }
+
+    private void execDump(SparkSession spark, String inputPath, String outputPath) {
+        Utils.readPath(spark, inputPath, Organization.class)
+            .map(org -> OrganizationMapper.map(org), Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.graph.Organization.class))
+            .write()
+            .option("compression", "gzip")
+            .mode(SaveMode.Overwrite)
+            .json(outputPath);
+    }
 }

View File

@@ -1,4 +1,15 @@
 package eu.dnetlib.dhp.oa.graph.dump.graph;
-public class OrganizationMapper {
+import eu.dnetlib.dhp.schema.dump.oaf.ControlledField;
+import eu.dnetlib.dhp.schema.dump.oaf.Country;
+import eu.dnetlib.dhp.schema.dump.oaf.graph.Organization;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class OrganizationMapper implements Serializable {
 }
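Note that OrganizationMapper is still an empty stub here, while DumpOrganization above already calls OrganizationMapper.map(org). A plausible shape for that method, a sketch only that mirrors the inline map() added to SparkDumpJob in the next file rather than the author's actual implementation, would be:

import eu.dnetlib.dhp.schema.dump.oaf.ControlledField;
import eu.dnetlib.dhp.schema.dump.oaf.Country;
import eu.dnetlib.dhp.schema.dump.oaf.graph.Organization;

import java.io.Serializable;
import java.util.Optional;
import java.util.stream.Collectors;

public class OrganizationMapper implements Serializable {

    // Sketch: maps an internal Organization onto the dump model, null-guarding every optional field.
    public static Organization map(eu.dnetlib.dhp.schema.oaf.Organization org) {
        Organization organization = new Organization();
        Optional.ofNullable(org.getLegalshortname())
            .ifPresent(value -> organization.setLegalshortname(value.getValue()));
        Optional.ofNullable(org.getLegalname())
            .ifPresent(value -> organization.setLegalname(value.getValue()));
        Optional.ofNullable(org.getWebsiteurl())
            .ifPresent(value -> organization.setWebsiteurl(value.getValue()));
        Optional.ofNullable(org.getCountry())
            .ifPresent(value -> organization.setCountry(Country.newInstance(value.getClassid(), value.getClassname(), null)));
        Optional.ofNullable(org.getPid())
            .ifPresent(value -> organization.setPid(
                value.stream()
                    .map(p -> ControlledField.newInstance(p.getQualifier().getClassid(), p.getValue()))
                    .collect(Collectors.toList())));
        return organization;
    }
}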

View File

@@ -1,4 +1,133 @@
 package eu.dnetlib.dhp.oa.graph.dump.graph;
-public class SparkDumpJob {
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.graph.dump.DumpProducts;
+import eu.dnetlib.dhp.oa.graph.dump.QueryInformationSystem;
+import eu.dnetlib.dhp.oa.graph.dump.ResultMapper;
+import eu.dnetlib.dhp.oa.graph.dump.Utils;
+import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
+import eu.dnetlib.dhp.oa.graph.dump.community.SparkDumpCommunityProducts;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.dump.oaf.ControlledField;
+import eu.dnetlib.dhp.schema.dump.oaf.Country;
+import eu.dnetlib.dhp.schema.dump.oaf.Result;
+import eu.dnetlib.dhp.schema.dump.oaf.graph.Organization;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+public class SparkDumpJob implements Serializable {
+
+    private static final Logger log = LoggerFactory.getLogger(SparkDumpCommunityProducts.class);
+
+    public static void main(String[] args) throws Exception {
+        String jsonConfiguration = IOUtils
+            .toString(
+                SparkDumpJob.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/oa/graph/dump/input_graphdump_parameters.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String inputPath = parser.get("sourcePath");
+        log.info("inputPath: {}", inputPath);
+
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath: {}", outputPath);
+
+        final String resultClassName = parser.get("resultTableName");
+        log.info("resultTableName: {}", resultClassName);
+
+        final String isLookUpUrl = parser.get("isLookUpUrl");
+        log.info("isLookUpUrl: {}", isLookUpUrl);
+
+        Class<? extends OafEntity> inputClazz = (Class<? extends OafEntity>) Class.forName(resultClassName);
+
+        QueryInformationSystem queryInformationSystem = new QueryInformationSystem();
+        queryInformationSystem.setIsLookUp(Utils.getIsLookUpService(isLookUpUrl));
+        CommunityMap communityMap = queryInformationSystem.getCommunityMap();
+
+        switch (ModelSupport.idPrefixMap.get(inputClazz)) {
+            case "50":
+                DumpProducts d = new DumpProducts();
+                d.run(isSparkSessionManaged, inputPath, outputPath, communityMap, inputClazz, true);
+                break;
+            case "40":
+                break;
+            case "20":
+                SparkConf conf = new SparkConf();
+                runWithSparkSession(
+                    conf,
+                    isSparkSessionManaged,
+                    spark -> {
+                        Utils.removeOutputDir(spark, outputPath);
+                        organizationMap(spark, inputPath, outputPath);
+                    });
+                break;
+        }
+    }
+
+    private static void organizationMap(SparkSession spark, String inputPath, String outputPath) {
+        Utils.readPath(spark, inputPath, eu.dnetlib.dhp.schema.oaf.Organization.class)
+            .map(o -> map(o), Encoders.bean(Organization.class))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(outputPath);
+    }
+
+    private static Organization map(eu.dnetlib.dhp.schema.oaf.Organization org) {
+        Organization organization = new Organization();
+
+        Optional.ofNullable(org.getLegalshortname())
+            .ifPresent(value -> organization.setLegalshortname(value.getValue()));
+
+        Optional.ofNullable(org.getLegalname())
+            .ifPresent(value -> organization.setLegalname(value.getValue()));
+
+        Optional.ofNullable(org.getWebsiteurl())
+            .ifPresent(value -> organization.setWebsiteurl(value.getValue()));
+
+        Optional.ofNullable(org.getAlternativeNames())
+            .ifPresent(value -> organization.setAlternativenames(value.stream()
+                .map(v -> v.getValue()).collect(Collectors.toList())));
+
+        Optional.ofNullable(org.getCountry())
+            .ifPresent(value -> organization.setCountry(Country.newInstance(value.getClassid(), value.getClassname(), null)));
+
+        Optional.ofNullable(org.getId())
+            .ifPresent(value -> organization.setId(value));
+
+        Optional.ofNullable(org.getPid())
+            .ifPresent(value -> organization.setPid(
+                value.stream().map(p -> ControlledField.newInstance(p.getQualifier().getClassid(), p.getValue())).collect(Collectors.toList())));
+
+        return organization;
+    }
 }