forked from D-Net/dnet-hadoop
First implementation of the persistence of project and community relations to results having a DOI
parent 8e0e090d7a
commit 9d33f0e6d7
ASResultInfo.java
@@ -0,0 +1,47 @@
package eu.dnetlib.dhp.actionmanager.remapping;

import java.io.Serializable;
import java.util.List;
import java.util.stream.Collectors;

public class ASResultInfo implements Serializable {

    private String id;
    private String type; // result or relation
    private List<InferenceInfo> value; // the community or the project

    public String getId() {
        return id;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public List<InferenceInfo> getValue() {
        return value;
    }

    public void setValue(List<InferenceInfo> value) {
        this.value = value;
    }

    public void setId(String id) {
        this.id = id;
    }

    public static ASResultInfo copy(ASResultInfo as) {
        ASResultInfo ar = new ASResultInfo();
        ar.id = as.id;
        ar.type = as.type;
        ar.value = as.value.stream().map(InferenceInfo::copy).collect(Collectors.toList());
        return ar;
    }
}

ActionSet.java
@@ -0,0 +1,42 @@
package eu.dnetlib.dhp.actionmanager.remapping;

import java.io.Serializable;

public class ActionSet implements Serializable {

    private String name;
    private String rawset;
    private String directory;

    public String getDirectory() {
        return directory;
    }

    public void setDirectory(String directory) {
        this.directory = directory;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getRawset() {
        return rawset;
    }

    public void setRawset(String rawset) {
        this.rawset = rawset;
    }

    public static ActionSet newInstance(String name, String rawset, String directory) {
        ActionSet as = new ActionSet();
        as.name = name;
        as.rawset = rawset;
        as.directory = directory;
        return as;
    }
}

InferenceInfo.java
@@ -0,0 +1,43 @@
package eu.dnetlib.dhp.actionmanager.remapping;

import java.io.Serializable;

public class InferenceInfo implements Serializable {

    private String value;
    private String trust;
    private String inference_provenance;

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    public String getTrust() {
        return trust;
    }

    public void setTrust(String trust) {
        this.trust = trust;
    }

    public String getInference_provenance() {
        return inference_provenance;
    }

    public void setInference_provenance(String inference_provenance) {
        this.inference_provenance = inference_provenance;
    }

    public static InferenceInfo copy(InferenceInfo ii) {
        InferenceInfo iinfo = new InferenceInfo();
        iinfo.value = ii.value;
        iinfo.trust = ii.trust;
        iinfo.inference_provenance = ii.inference_provenance;
        return iinfo;
    }
}

PrepareInfo.java
@@ -0,0 +1,147 @@
package eu.dnetlib.dhp.actionmanager.remapping;

import com.google.gson.Gson;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

public class PrepareInfo implements Serializable {

    private String outputPath;
    private Boolean isSparkSessionManaged;
    private String asInputPath;
    private String inputGraphPath;
    private List<String> actionSetActive;
    private List<ActionSet> actionSetList;

    public PrepareInfo(Boolean isSparkSessionManaged,
                       String outputPath,
                       String asInputPath,
                       String inputGraphPath,
                       List<String> actionSetActive,
                       List<ActionSet> actionSetList) {
        this.isSparkSessionManaged = isSparkSessionManaged;
        this.outputPath = outputPath;
        this.inputGraphPath = inputGraphPath;
        this.asInputPath = asInputPath;
        this.actionSetActive = actionSetActive;
        this.actionSetList = actionSetList;
    }

    public void run() {

        SparkConf conf = new SparkConf();

        runWithSparkSession(
            conf,
            isSparkSessionManaged,
            spark -> {
                common.removeOutputDir(spark, outputPath);
                exec(spark, asInputPath, inputGraphPath, outputPath,
                    actionSetList.stream().map(as -> {
                        if (actionSetActive.contains(as.getName())) {
                            return as;
                        }
                        return null;
                    }).filter(Objects::nonNull)
                        .collect(Collectors.toList()));
            });
    }

    private static void exec(SparkSession spark, String asInputPath, String graphInputPath, String outputPath,
                             List<ActionSet> actionSets) {

        Dataset<Relation> relation = common.readPath(spark, graphInputPath + "/relation", Relation.class);

        actionSets.forEach(as -> resolveActionSet(spark,
            asInputPath + "/" + as.getDirectory() + "/" + as.getRawset(),
            outputPath));

        relation.createOrReplaceTempView("relation");

        spark.sql("SELECT source dedupId, collect_set(target) as merges " +
            "FROM relation " +
            "WHERE relclass = 'merges' " +
            "GROUP BY source")
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(outputPath + "/relation/");
    }

    private static void resolveActionSet(SparkSession spark, String asInputPath, String outputPath) {
        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        spark
            .createDataset(sc.sequenceFile(asInputPath, Text.class, Text.class)
                .map(a -> common.OBJECT_MAPPER.readValue(a._2().toString(), AtomicAction.class))
                .map(aa -> getAsResultInfo(aa)).filter(Objects::nonNull)
                .rdd(), Encoders.bean(ASResultInfo.class))
            .write()
            .option("compression", "gzip")
            .mode(SaveMode.Append)
            .json(outputPath + "/actionset/");
    }

    private static ASResultInfo getAsResultInfo(AtomicAction aa) {
        ASResultInfo ri = new ASResultInfo();
        if (ModelSupport.resultRelationType.get(aa.getClazz()).equals("result")) {

            Result result = (Result) aa.getPayload();
            ri.setId(result.getId());
            ri.setType("result");
            List<InferenceInfo> inferenceInfoList = new ArrayList<>();

            result.getContext().forEach(c -> {
                String id = c.getId();
                c.getDataInfo().forEach(di -> {
                    InferenceInfo ii = new InferenceInfo();
                    ii.setValue(id);
                    ii.setTrust(di.getTrust());
                    ii.setInference_provenance(di.getInferenceprovenance());
                    inferenceInfoList.add(ii);
                });
            });

            ri.setValue(inferenceInfoList);

        } else {
            Relation rel = (Relation) aa.getPayload();
            if (rel.getSource().startsWith("50|")) {
                ri.setId(rel.getSource());
                ri.setType("relation");
                InferenceInfo ii = new InferenceInfo();
                ii.setInference_provenance(rel.getDataInfo().getInferenceprovenance());
                ii.setTrust(rel.getDataInfo().getTrust());
                ii.setValue(rel.getTarget());
                ri.setValue(Arrays.asList(ii));
            } else {
                return null;
            }
        }
        return ri;
    }
}

QueryInformationSystem.java
@@ -0,0 +1,39 @@
package eu.dnetlib.dhp.actionmanager.remapping;

import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;

import java.io.Serializable;
import java.util.List;
import java.util.stream.Collectors;

public class QueryInformationSystem implements Serializable {

    private static final String ACTION_MANAGER_PATH_QUERY = "for $x in " +
        "collection('/db/DRIVER/ServiceResources/ActionManagerServiceResourceType') " +
        "return data($x//PROPERTY[./@key='basePath']/@value)";

    private static final String ACTION_SET_QUERY = "for $x in " +
        "collection('/db/DRIVER/ActionManagerSetDSResources/ActionManagerSetDSResourceType') " +
        "return concat(data($x//SET/@id),'@@',data($x//LATEST/@id),'@@',data($x//SET/@directory))";

    public static String getActionManagerPath(final String isLookupUrl)
        throws ISLookUpException {
        ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
        return isLookUp.quickSearchProfile(ACTION_MANAGER_PATH_QUERY).get(0);
    }

    public static List<ActionSet> getActionSet(final String isLookupUrl)
        throws ISLookUpException {
        ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
        return isLookUp.quickSearchProfile(ACTION_SET_QUERY)
            .stream()
            .map(res -> {
                String[] tmp = res.split("@@");
                return ActionSet.newInstance(tmp[0], tmp[1], tmp[2]);
            }).collect(Collectors.toList());
    }
}

RelationMerges.java
@@ -0,0 +1,25 @@
package eu.dnetlib.dhp.actionmanager.remapping;

import java.io.Serializable;
import java.util.List;

public class RelationMerges implements Serializable {

    private String dedupId;
    private List<String> merges;

    public String getDedupId() {
        return dedupId;
    }

    public void setDedupId(String dedupId) {
        this.dedupId = dedupId;
    }

    public List<String> getMerges() {
        return merges;
    }

    public void setMerges(List<String> merges) {
        this.merges = merges;
    }
}

ResultPid.java
@@ -0,0 +1,24 @@
package eu.dnetlib.dhp.actionmanager.remapping;

import java.io.Serializable;

public class ResultPid implements Serializable {

    private String resultId;
    private String doi;

    public String getResultId() {
        return resultId;
    }

    public void setResultId(String resultId) {
        this.resultId = resultId;
    }

    public String getDoi() {
        return doi;
    }

    public void setDoi(String doi) {
        this.doi = doi;
    }
}

SparkExpandResultInfo.java
@@ -0,0 +1,90 @@
package eu.dnetlib.dhp.actionmanager.remapping;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

public class SparkExpandResultInfo implements Serializable {

    private static final Logger log = LoggerFactory.getLogger(SparkExpandResultInfo.class);

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils
            .toString(
                SparkExpandResultInfo.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/actionmanager/remapping/input_expand_parameters.json"));

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        final String relationInputPath = parser.get("relationInputPath");

        final String asInputPath = parser.get("asInputPath");

        SparkConf conf = new SparkConf();

        runWithSparkSession(
            conf,
            isSparkSessionManaged,
            spark -> {
                common.removeOutputDir(spark, outputPath);
                exec(spark, relationInputPath, asInputPath, outputPath);
            });
    }

    private static void exec(SparkSession spark, String relationInputPath, String asInputPath, String outputPath) {
        Dataset<RelationMerges> rel = common.readPath(spark, relationInputPath, RelationMerges.class);

        Dataset<ASResultInfo> asResultInfo = common.readPath(spark, asInputPath, ASResultInfo.class);

        asResultInfo.joinWith(rel, asResultInfo.col("id").equalTo(rel.col("dedupId")), "left")
            .flatMap((FlatMapFunction<Tuple2<ASResultInfo, RelationMerges>, ASResultInfo>) value -> {
                ASResultInfo ri = value._1();
                if (Objects.isNull(value._2())) {
                    return Arrays.asList(ri).iterator();
                }

                return value._2().getMerges().stream()
                    .map(res -> {
                        ASResultInfo copy = ASResultInfo.copy(ri);
                        copy.setId(res);
                        return copy;
                    }).collect(Collectors.toList()).iterator();

            }, Encoders.bean(ASResultInfo.class))
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(outputPath);
    }
}

SparkPrepareInfo.java
@@ -0,0 +1,71 @@
package eu.dnetlib.dhp.actionmanager.remapping;

import com.google.gson.Gson;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

public class SparkPrepareInfo implements Serializable {

    private static final Logger log = LoggerFactory.getLogger(SparkPrepareInfo.class);

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils
            .toString(
                SparkPrepareInfo.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/actionmanager/remapping/input_prepare_parameters.json"));

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String isLookUpUrl = parser.get("isLookUpUrl");
        log.info("isLookUpUrl: {}", isLookUpUrl);

        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        final String inputPath = parser.get("inputPath");

        final List<String> actionSetActive = new Gson().fromJson(parser.get("actionSets"), List.class);

        final String asInputPath = QueryInformationSystem.getActionManagerPath(isLookUpUrl);

        final List<ActionSet> actionSetList = QueryInformationSystem.getActionSet(isLookUpUrl);

        PrepareInfo prepareInfo = new PrepareInfo(isSparkSessionManaged, outputPath, asInputPath, inputPath,
            actionSetActive, actionSetList);

        prepareInfo.run();
    }
}

SparkRedistributeIISResult.java
@@ -0,0 +1,176 @@
package eu.dnetlib.dhp.actionmanager.remapping;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

public class SparkRedistributeIISResult implements Serializable {

    private static final Logger log = LoggerFactory.getLogger(SparkRedistributeIISResult.class);

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils
            .toString(
                SparkRedistributeIISResult.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/actionmanager/remapping/input_redistribute_parameters.json"));

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String outputPathResult = parser.get("outputPathResult");
        log.info("outputPathResult: {}", outputPathResult);

        final String outputPathRelation = parser.get("outputPathRelation");
        log.info("outputPathRelation: {}", outputPathRelation);

        final String inputPath = parser.get("inputPath");

        final String asInputPath = parser.get("asInputPath");

        SparkConf conf = new SparkConf();

        runWithSparkSession(
            conf,
            isSparkSessionManaged,
            spark -> {
                common.removeOutputDir(spark, outputPathResult);
                Dataset<ResultPid> resultPidDataset = common.readPath(spark, inputPath + "/publication", ResultPid.class)
                    .union(common.readPath(spark, inputPath + "/dataset", ResultPid.class))
                    .union(common.readPath(spark, inputPath + "/software", ResultPid.class))
                    .union(common.readPath(spark, inputPath + "/otherresearchproduct", ResultPid.class));
                Dataset<ASResultInfo> asResultInfoDataset = common.readPath(spark, asInputPath, ASResultInfo.class);
                execResult(spark, asResultInfoDataset.filter("type = 'result'"), resultPidDataset, outputPathResult);
                execRelation(spark, asResultInfoDataset.filter("type = 'relation'"), resultPidDataset, outputPathRelation);
            });
    }
    private static void execRelation(SparkSession spark, Dataset<ASResultInfo> asResultInfoDataset,
                                     Dataset<ResultPid> resultPidDataset, String outputPathRelation) {
        resultPidDataset.joinWith(asResultInfoDataset, resultPidDataset.col("resultId").equalTo(asResultInfoDataset.col("id")), "left")
            .flatMap((FlatMapFunction<Tuple2<ResultPid, ASResultInfo>, Relation>) value -> {
                List<Relation> relationList = new ArrayList<>();
                if (Objects.nonNull(value._2())) {
                    relationList.add(getRelation(value._2(), "result"));
                    relationList.add(getRelation(value._2(), "project"));
                }
                return relationList.iterator();
            }, Encoders.bean(Relation.class))
            .filter(Objects::nonNull)
            .toJavaRDD()
            .map(p -> new AtomicAction(Relation.class, p))
            .mapToPair(
                aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
                    new Text(common.OBJECT_MAPPER.writeValueAsString(aa))))
            .saveAsHadoopFile(outputPathRelation, Text.class, Text.class, SequenceFileOutputFormat.class);
    }
    private static Relation getRelation(ASResultInfo asResultInfo, String type) {
        Relation r = new Relation();
        if (type.equals("result")) {
            r.setSource(asResultInfo.getId());
            r.setRelClass("isProducedBy");
            r.setTarget(asResultInfo.getValue().get(0).getValue());
        } else {
            r.setRelClass("produces");
            r.setSource(asResultInfo.getValue().get(0).getValue());
            r.setTarget(asResultInfo.getId());
        }

        r.setRelType("resultProject");
        r.setSubRelType("outcome");

        r.setDataInfo(getDataInfo(asResultInfo));
        return r;
    }
    private static DataInfo getDataInfo(ASResultInfo asResultInfo) {
        DataInfo di = new DataInfo();
        di.setInvisible(false);
        di.setInferred(true); // assumed: mirrors the DataInfo built in execResult below
        di.setDeletedbyinference(false);
        di.setTrust(asResultInfo.getValue().get(0).getTrust());
        di.setInferenceprovenance(asResultInfo.getValue().get(0).getInference_provenance());
        Qualifier pAction = new Qualifier();
        pAction.setClassid("iis");
        pAction.setClassname("iis");
        pAction.setSchemename("dnet:provenanceActions");
        pAction.setSchemeid("dnet:provenanceActions");
        di.setProvenanceaction(pAction);
        return di;
    }
    private static void execResult(SparkSession spark, Dataset<ASResultInfo> asResultInfoDataset,
                                   Dataset<ResultPid> resultPidDataset, String outputPath) {

        resultPidDataset.joinWith(asResultInfoDataset, resultPidDataset.col("resultId").equalTo(asResultInfoDataset.col("id")), "left")
            .map((MapFunction<Tuple2<ResultPid, ASResultInfo>, Result>) value -> {
                if (Objects.isNull(value._2())) {
                    return null;
                }
                Result r = new Result();
                ASResultInfo asResultInfo = value._2();
                r.setId(asResultInfo.getId());
                StructuredProperty pid = new StructuredProperty();
                pid.setValue(value._1().getDoi());
                Qualifier qualifier = new Qualifier();
                qualifier.setClassid("doi");
                qualifier.setClassname("doi");
                qualifier.setSchemeid("dnet:pid");
                qualifier.setSchemename("dnet:pid");
                pid.setQualifier(qualifier);
                r.setPid(Arrays.asList(pid)); // assumption: attach the DOI pid built above to the result
                r.setContext(asResultInfo.getValue().stream().map(v -> {
                    Context c = new Context();
                    c.setId(v.getValue());
                    DataInfo dataInfo = new DataInfo();
                    dataInfo.setTrust(v.getTrust());
                    dataInfo.setInferenceprovenance(v.getInference_provenance());
                    dataInfo.setDeletedbyinference(false);
                    dataInfo.setInferred(true);
                    dataInfo.setInvisible(false);
                    Qualifier pAction = new Qualifier();
                    pAction.setClassid("iis");
                    pAction.setClassname("iis");
                    pAction.setSchemeid("dnet:provenanceActions");
                    pAction.setSchemename("dnet:provenanceActions");
                    dataInfo.setProvenanceaction(pAction);
                    c.setDataInfo(Arrays.asList(dataInfo));
                    return c;
                }).collect(Collectors.toList()));
                return r;
            }, Encoders.bean(Result.class))
            .filter(Objects::nonNull)
            .toJavaRDD()
            .map(p -> new AtomicAction(Result.class, p))
            .mapToPair(
                aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
                    new Text(common.OBJECT_MAPPER.writeValueAsString(aa))))
            .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
    }
}

SparkSelectResults.java
@@ -0,0 +1,77 @@
package eu.dnetlib.dhp.actionmanager.remapping;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Result;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.Optional;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

public class SparkSelectResults implements Serializable {

    private static final Logger log = LoggerFactory.getLogger(SparkSelectResults.class);

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils
            .toString(
                SparkSelectResults.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/actionmanager/remapping/input_select_parameters.json"));

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        final String inputPath = parser.get("inputPath");

        final String resultClassName = parser.get("resultTableName");
        log.info("resultTableName: {}", resultClassName);

        Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);

        SparkConf conf = new SparkConf();

        runWithSparkSession(
            conf,
            isSparkSessionManaged,
            spark -> {
                common.removeOutputDir(spark, outputPath);
                exec(spark, inputPath, outputPath, resultClazz);
            });
    }

    private static <R extends Result> void exec(SparkSession spark, String inputPath, String outputPath, Class<R> resultClazz) {
        Dataset<R> result = common.readPath(spark, inputPath, resultClazz);

        result.createOrReplaceTempView("result");

        spark.sql("SELECT id resultId, persId.value doi " +
            "from result " +
            "lateral view explode(pid) p as persId " +
            "lateral view explode(collectedfrom) c as cf " +
            "where persId.qualifier.classid = 'doi' " +
            "and (cf.key = '10|openaire____::9e3be59865b2c1c335d32dae2fe7b254' or " +
            "cf.key = '10|openaire____::081b82f96300b6a6e3d282bad31cb6e2') " +
            "and result.id not like '50|dedup%' ")
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(outputPath);
    }
}

common.java
@@ -0,0 +1,27 @@
package eu.dnetlib.dhp.actionmanager.remapping;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.HdfsSupport;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;

public class common implements Serializable {

    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    public static <R> Dataset<R> readPath(
        SparkSession spark, String inputPath, Class<R> clazz) {
        return spark
            .read()
            .textFile(inputPath)
            .map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
    }

    public static void removeOutputDir(SparkSession spark, String path) {
        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
    }
}

PrepareInfoTest.java
@@ -0,0 +1,4 @@
package eu.dnetlib.dhp.actionmanager.remapping;

public class PrepareInfoTest {
}