forked from D-Net/dnet-hadoop
removed pid dump
This commit is contained in:
parent 0cac5436ff · commit 16c54a96f8
ResultOrganization.java
@@ -1,28 +0,0 @@

package eu.dnetlib.dhp.oa.graph.dump.pid;

import java.io.Serializable;
import java.util.List;

import eu.dnetlib.dhp.schema.oaf.StructuredProperty;

public class ResultOrganization implements Serializable {
    private String resultId;
    private List<StructuredProperty> orgPids;

    public String getResultId() {
        return resultId;
    }

    public void setResultId(String resultId) {
        this.resultId = resultId;
    }

    public List<StructuredProperty> getOrgPid() {
        return orgPids;
    }

    public void setOrgPid(List<StructuredProperty> pid) {
        this.orgPids = pid;
    }
}
ResultPidsList.java
@@ -1,43 +0,0 @@

package eu.dnetlib.dhp.oa.graph.dump.pid;

import java.io.Serializable;
import java.util.List;

import eu.dnetlib.dhp.schema.dump.oaf.KeyValue;

/**
 * Needed to create relations between pids in the result. The list of resultAllowedPids produces relations of type
 * source hasOtherMaterialization target (and vice versa), where the source is identified by one of the pids in the
 * list and the target by another: a pair of relations between every two nodes. The list of authorAllowedPids produces
 * relations of type source hasAuthor target and target isAuthorOf source for every pair of result and author nodes.
 */
public class ResultPidsList implements Serializable {
    private String resultId;
    private List<KeyValue> resultAllowedPids;
    private List<List<KeyValue>> authorAllowedPids;

    public String getResultId() {
        return resultId;
    }

    public void setResultId(String resultId) {
        this.resultId = resultId;
    }

    public List<KeyValue> getResultAllowedPids() {
        return resultAllowedPids;
    }

    public void setResultAllowedPids(List<KeyValue> resultAllowedPids) {
        this.resultAllowedPids = resultAllowedPids;
    }

    public List<List<KeyValue>> getAuthorAllowedPids() {
        return authorAllowedPids;
    }

    public void setAuthorAllowedPids(List<List<KeyValue>> authorAllowedPids) {
        this.authorAllowedPids = authorAllowedPids;
    }
}
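
A minimal sketch of the fan-out described in the comment above, assuming a hypothetical relate(source, target, semantics) helper (the module's real pair-builder is Utils.getRelationPair):

// Illustrative only: every two result pids yield one relation in each direction.
List<KeyValue> pids = resultPidsList.getResultAllowedPids();
for (int i = 0; i < pids.size() - 1; i++) {
    String source = pids.get(i).getKey() + ":" + pids.get(i).getValue();
    for (int j = i + 1; j < pids.size(); j++) {
        String target = pids.get(j).getKey() + ":" + pids.get(j).getValue();
        relate(source, target, "hasOtherMaterialization"); // hypothetical helper
        relate(target, source, "hasOtherMaterialization"); // and vice versa
    }
}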
ResultProject.java
@@ -1,35 +0,0 @@

package eu.dnetlib.dhp.oa.graph.dump.pid;

import java.io.Serializable;
import java.util.List;

public class ResultProject implements Serializable {
    private String resultId;
    private String code;
    private List<String> fundings;

    public String getResultId() {
        return resultId;
    }

    public void setResultId(String resultId) {
        this.resultId = resultId;
    }

    public String getCode() {
        return code;
    }

    public void setCode(String code) {
        this.code = code;
    }

    public List<String> getFundings() {
        return fundings;
    }

    public void setFundings(List<String> fundings) {
        this.fundings = fundings;
    }
}
SparkCollectPreparedInfo.java
@@ -1,80 +0,0 @@

package eu.dnetlib.dhp.oa.graph.dump.pid;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.dump.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Result;

public class SparkCollectPreparedInfo implements Serializable {

    private static final Logger log = LoggerFactory.getLogger(SparkCollectPreparedInfo.class);

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils
            .toString(
                SparkCollectPreparedInfo.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/oa/graph/dump_pid/input_collectandsave.json"));

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String inputPath = parser.get("preparedInfoPath");
        log.info("inputPath: {}", inputPath);

        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        SparkConf conf = new SparkConf();
        runWithSparkSession(
            conf,
            isSparkSessionManaged,
            spark -> {
                Utils.removeOutputDir(spark, outputPath);
                collectAndSave(spark, inputPath, outputPath);
            });
    }

    private static void collectAndSave(SparkSession spark, String inputPath, String outputPath) {
        Utils
            .readPath(spark, inputPath + "/publication", ResultPidsList.class)
            .union(Utils.readPath(spark, inputPath + "/dataset", ResultPidsList.class))
            .union(Utils.readPath(spark, inputPath + "/software", ResultPidsList.class))
            .union(Utils.readPath(spark, inputPath + "/otherresearchproduct", ResultPidsList.class))
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(outputPath);
    }
}
SparkDumpOrganization.java
@@ -1,87 +0,0 @@

package eu.dnetlib.dhp.oa.graph.dump.pid;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.dump.pidgraph.Entity;
import eu.dnetlib.dhp.schema.oaf.Organization;

public class SparkDumpOrganization implements Serializable {

    private static final Logger log = LoggerFactory.getLogger(SparkDumpOrganization.class);

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils
            .toString(
                SparkDumpOrganization.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/oa/graph/dump_pid/input_dump_organization.json"));

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String inputPath = parser.get("sourcePath");
        log.info("inputPath: {}", inputPath);

        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        final List<String> allowedOrgPid = new Gson().fromJson(parser.get("allowedOrganizationPids"), List.class);

        SparkConf conf = new SparkConf();
        runWithSparkSession(
            conf,
            isSparkSessionManaged,
            spark -> {
                Utils.removeOutputDir(spark, outputPath);
                dumpPidOrgs(spark, allowedOrgPid, inputPath, outputPath);
            });
    }

    private static void dumpPidOrgs(SparkSession spark, List<String> allowedOrgPid, String inputPath,
        String outputPath) {
        Dataset<Organization> organizations = Utils.readPath(spark, inputPath, Organization.class);

        organizations.flatMap((FlatMapFunction<Organization, Entity>) o -> {
            List<Entity> ret = new ArrayList<>();
            o.getPid().forEach(pid -> {
                if (allowedOrgPid.contains(pid.getQualifier().getClassid().toLowerCase())) {
                    ret.add(Entity.newInstance(pid.getQualifier().getClassid() + ":" + pid.getValue()));
                }
            });
            return ret.iterator();
        }, Encoders.bean(Entity.class))
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(outputPath + "/organization");
    }
}
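
The classid filter inside dumpPidOrgs can be read in isolation; a sketch of the same logic as a pure function, with illustrative names that are not part of the module:

// Illustrative only: keep pids whose classid is allowed, rendered as "classid:value"
// (e.g. "grid:grid.1234.5"); requires java.util.stream.Collectors.
static List<String> allowedPidStrings(List<StructuredProperty> pids, List<String> allowed) {
    return pids
        .stream()
        .filter(p -> allowed.contains(p.getQualifier().getClassid().toLowerCase()))
        .map(p -> p.getQualifier().getClassid() + ":" + p.getValue())
        .collect(Collectors.toList());
}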
SparkDumpPidAuthor.java
@@ -1,142 +0,0 @@

package eu.dnetlib.dhp.oa.graph.dump.pid;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.dump.oaf.KeyValue;
import eu.dnetlib.dhp.schema.dump.pidgraph.Entity;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Software;

public class SparkDumpPidAuthor implements Serializable {

    private static final Logger log = LoggerFactory.getLogger(SparkDumpPidAuthor.class);

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils
            .toString(
                SparkDumpPidAuthor.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/oa/graph/dump_pid/input_dump_author.json"));

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String inputPath = parser.get("sourcePath");
        log.info("inputPath: {}", inputPath);

        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        final List<String> allowedAuthorPids = new Gson().fromJson(parser.get("allowedAuthorPids"), List.class);

        SparkConf conf = new SparkConf();
        runWithSparkSession(
            conf,
            isSparkSessionManaged,
            spark -> {
                Utils.removeOutputDir(spark, outputPath);
                dumpPidAuthor(spark, inputPath, outputPath, allowedAuthorPids);
            });
    }

    private static void dumpPidAuthor(SparkSession spark, String inputPath, String outputPath, List<String> aap) {
        Dataset<Publication> publication = Utils.readPath(spark, inputPath + "/publication", Publication.class);
        Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> dataset = Utils
            .readPath(spark, inputPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
        Dataset<Software> software = Utils.readPath(spark, inputPath + "/software", Software.class);
        Dataset<OtherResearchProduct> other = Utils
            .readPath(spark, inputPath + "/otherresearchproduct", OtherResearchProduct.class);

        publication.createOrReplaceTempView("publication");
        dataset.createOrReplaceTempView("dataset");
        software.createOrReplaceTempView("software");
        other.createOrReplaceTempView("other");

        Dataset<KeyValue> pids = spark
            .sql(
                "SELECT DISTINCT apid.value value, apid.qualifier.classid key " +
                    "FROM publication " +
                    "LATERAL VIEW EXPLODE (author) a AS auth " +
                    "LATERAL VIEW EXPLODE (auth.pid) p AS apid")
            .as(Encoders.bean(KeyValue.class))
            .union(
                spark
                    .sql(
                        "SELECT DISTINCT apid.value value, apid.qualifier.classid key " +
                            "FROM dataset " +
                            "LATERAL VIEW EXPLODE (author) a AS auth " +
                            "LATERAL VIEW EXPLODE (auth.pid) p AS apid")
                    .as(Encoders.bean(KeyValue.class)))
            .union(
                spark
                    .sql(
                        "SELECT DISTINCT apid.value value, apid.qualifier.classid key " +
                            "FROM software " +
                            "LATERAL VIEW EXPLODE (author) a AS auth " +
                            "LATERAL VIEW EXPLODE (auth.pid) p AS apid")
                    .as(Encoders.bean(KeyValue.class)))
            .union(
                spark
                    .sql(
                        "SELECT DISTINCT apid.value value, apid.qualifier.classid key " +
                            "FROM other " +
                            "LATERAL VIEW EXPLODE (author) a AS auth " +
                            "LATERAL VIEW EXPLODE (auth.pid) p AS apid")
                    .as(Encoders.bean(KeyValue.class)));

        pids.createOrReplaceTempView("pids");

        spark
            .sql(
                "SELECT DISTINCT key, value " +
                    "FROM pids")
            .as(Encoders.bean(KeyValue.class))
            .filter((FilterFunction<KeyValue>) p -> aap.contains(p.getKey()))
            .map(
                (MapFunction<KeyValue, Entity>) pid -> Entity.newInstance(pid.getKey() + ":" + pid.getValue()),
                Encoders.bean(Entity.class))
            .write()
            // resultPids.flatMap((FlatMapFunction<ResultPidsList, Entity>) r -> {
            // List<Entity> ret = new ArrayList<>();
            // r.getAuthorAllowedPids().forEach(pid -> {
            // ret.addAll(pid.stream().map(p -> Entity.newInstance(p.getKey() + ":" + p.getValue())).collect(Collectors.toList()));
            // });
            // return ret.iterator();
            // }, Encoders.bean(Entity.class))
            // .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(outputPath + "/author");
    }
}
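
The four author-pid queries above differ only in the table name; a sketch of how they could be factored into one parameterised helper (hypothetical, not present in the original):

// Illustrative refactoring: one query template instead of four copies.
private static Dataset<KeyValue> authorPids(SparkSession spark, String table) {
    return spark
        .sql(
            "SELECT DISTINCT apid.value value, apid.qualifier.classid key " +
                "FROM " + table + " " +
                "LATERAL VIEW EXPLODE (author) a AS auth " +
                "LATERAL VIEW EXPLODE (auth.pid) p AS apid")
        .as(Encoders.bean(KeyValue.class));
}
// Usage: authorPids(spark, "publication").union(authorPids(spark, "dataset")) ...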
SparkDumpPidResult.java
@@ -1,82 +0,0 @@

package eu.dnetlib.dhp.oa.graph.dump.pid;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.dump.pidgraph.Entity;
import eu.dnetlib.dhp.schema.oaf.Result;

public class SparkDumpPidResult implements Serializable {

    private static final Logger log = LoggerFactory.getLogger(SparkDumpPidResult.class);

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils
            .toString(
                SparkDumpPidResult.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/oa/graph/dump_pid/input_dump_result.json"));

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String inputPath = parser.get("preparedInfoPath");
        log.info("inputPath: {}", inputPath);

        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        SparkConf conf = new SparkConf();
        runWithSparkSession(
            conf,
            isSparkSessionManaged,
            spark -> {
                Utils.removeOutputDir(spark, outputPath);
                dumpPidEntities(spark, inputPath, outputPath);
            });
    }

    private static void dumpPidEntities(SparkSession spark, String inputPath, String outputPath) {
        Dataset<ResultPidsList> resultPids = Utils.readPath(spark, inputPath, ResultPidsList.class);

        resultPids.flatMap((FlatMapFunction<ResultPidsList, Entity>) r -> {
            List<Entity> ret = new ArrayList<>();
            r.getResultAllowedPids().forEach(pid -> {
                if (StringUtils.isNoneEmpty(pid.getKey(), pid.getValue()))
                    ret.add(Entity.newInstance(pid.getKey() + ":" + pid.getValue()));
            });
            return ret.iterator();
        }, Encoders.bean(Entity.class))
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(outputPath + "/result");
    }
}
SparkDumpProject.java
@@ -1,90 +0,0 @@

package eu.dnetlib.dhp.oa.graph.dump.pid;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.Serializable;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.dump.pidgraph.Entity;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Project;

public class SparkDumpProject implements Serializable {

    private static final Logger log = LoggerFactory.getLogger(SparkDumpProject.class);

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils
            .toString(
                SparkDumpProject.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/oa/graph/dump_pid/input_dump_project.json"));

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String inputPath = parser.get("sourcePath");
        log.info("inputPath: {}", inputPath);

        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        SparkConf conf = new SparkConf();
        runWithSparkSession(
            conf,
            isSparkSessionManaged,
            spark -> {
                Utils.removeOutputDir(spark, outputPath);
                dumpProjects(spark, inputPath, outputPath);
            });
    }

    private static void dumpProjects(SparkSession spark, String inputPath, String outputPath) {
        Dataset<Project> projectDataset = Utils.readPath(spark, inputPath, Project.class);

        projectDataset.flatMap((FlatMapFunction<Project, Entity>) project -> {
            List<Entity> projs = new ArrayList<>();
            project.getFundingtree().forEach(fund -> {
                try {
                    projs.add(Utils.getEntity(fund.getValue(), project.getCode().getValue()));
                } catch (DocumentException e) {
                    e.printStackTrace();
                }
            });
            return projs.iterator();
        }, Encoders.bean(Entity.class))
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(outputPath + "/project");
    }
}
SparkDumpResultOrganizationRelation.java
@@ -1,132 +0,0 @@

package eu.dnetlib.dhp.oa.graph.dump.pid;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Constants;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.dump.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;

public class SparkDumpResultOrganizationRelation implements Serializable {

    private static final Logger log = LoggerFactory.getLogger(SparkDumpResultOrganizationRelation.class);

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils
            .toString(
                SparkDumpResultOrganizationRelation.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/oa/graph/dump_pid/input_dump_organizationrels.json"));

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String inputPath = parser.get("sourcePath");
        log.info("inputPath: {}", inputPath);

        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        final String resultPidListPath = parser.get("preparedInfoPath");

        final List<String> allowedPids = new Gson().fromJson(parser.get("allowedOrganizationPids"), List.class);

        SparkConf conf = new SparkConf();
        runWithSparkSession(
            conf,
            isSparkSessionManaged,
            spark -> {
                Utils.removeOutputDir(spark, outputPath);
                dumpResultOrganizationRelations(spark, inputPath, resultPidListPath, allowedPids, outputPath);
            });
    }

    private static void dumpResultOrganizationRelations(SparkSession spark, String inputPath, String preparedInfoPath,
        List<String> allowedPids, String outputPath) {
        Dataset<Relation> relations = Utils.readPath(spark, inputPath + "/relation", Relation.class);
        Dataset<Organization> organizations = Utils.readPath(spark, inputPath + "/organization", Organization.class);
        Dataset<ResultPidsList> resultPid = Utils.readPath(spark, preparedInfoPath, ResultPidsList.class);

        relations.createOrReplaceTempView("relation");
        organizations.createOrReplaceTempView("organization");

        Dataset<ResultOrganization> resultOrg = spark
            .sql(
                "SELECT source resultId, pid orgPids " +
                    "FROM relation r " +
                    "JOIN organization o " +
                    "ON r.target = o.id " +
                    "WHERE r.datainfo.deletedbyinference = false " +
                    "AND o.datainfo.deletedbyinference = false " +
                    "AND lower(relclass) = '" + ModelConstants.HAS_AUTHOR_INSTITUTION.toLowerCase() + "'")
            .as(Encoders.bean(ResultOrganization.class));

        resultOrg
            .joinWith(resultPid, resultOrg.col("resultId").equalTo(resultPid.col("resultId")), "left")
            .flatMap(
                (FlatMapFunction<Tuple2<ResultOrganization, ResultPidsList>, eu.dnetlib.dhp.schema.dump.oaf.graph.Relation>) value -> {
                    List<eu.dnetlib.dhp.schema.dump.oaf.graph.Relation> relList = new ArrayList<>();
                    Optional<ResultPidsList> orel = Optional.ofNullable(value._2());
                    if (orel.isPresent()) {
                        List<String> orgList = value
                            ._1()
                            .getOrgPid()
                            .stream()
                            .filter(p -> allowedPids.contains(p.getQualifier().getClassid()))
                            .map(pid -> pid.getQualifier().getClassid() + ":" + pid.getValue())
                            .collect(Collectors.toList());
                        if (orgList.size() > 0) {
                            List<KeyValue> resList = orel.get().getResultAllowedPids();
                            for (int i = 0; i < resList.size(); i++) {
                                String pid = resList.get(i).getKey() + ":" + resList.get(i).getValue();
                                for (int j = 0; j < orgList.size(); j++) {
                                    relList
                                        .addAll(
                                            Utils
                                                .getRelationPair(
                                                    pid, orgList.get(j), Constants.RESULT, Constants.ORGANIZATION,
                                                    ModelConstants.AFFILIATION, ModelConstants.HAS_AUTHOR_INSTITUTION,
                                                    ModelConstants.IS_AUTHOR_INSTITUTION_OF));
                                }
                            }
                        }
                    }
                    return relList.iterator();
                }, Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.graph.Relation.class))
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(outputPath + "/relationOrganization");
    }
}
SparkDumpResultProjectRelation.java
@@ -1,129 +0,0 @@

package eu.dnetlib.dhp.oa.graph.dump.pid;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Constants;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.dump.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;

public class SparkDumpResultProjectRelation implements Serializable {

    private static final Logger log = LoggerFactory.getLogger(SparkDumpResultProjectRelation.class);

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils
            .toString(
                SparkDumpResultProjectRelation.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/oa/graph/dump_pid/input_dump_projectrels.json"));

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String inputPath = parser.get("sourcePath");
        log.info("inputPath: {}", inputPath);

        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        final String resultPidListPath = parser.get("preparedInfoPath");

        SparkConf conf = new SparkConf();
        runWithSparkSession(
            conf,
            isSparkSessionManaged,
            spark -> {
                Utils.removeOutputDir(spark, outputPath);
                dumpResultProjectRelations(spark, inputPath, resultPidListPath, outputPath);
            });
    }

    private static void dumpResultProjectRelations(SparkSession spark, String inputPath, String preparedInfoPath,
        String outputPath) {
        Dataset<Relation> relations = Utils.readPath(spark, inputPath + "/relation", Relation.class);
        Dataset<Project> projects = Utils.readPath(spark, inputPath + "/project", Project.class);
        Dataset<ResultPidsList> resultPid = Utils.readPath(spark, preparedInfoPath, ResultPidsList.class);

        relations.createOrReplaceTempView("relation");
        projects.createOrReplaceTempView("project");

        Dataset<ResultProject> resultProj = spark
            .sql(
                "SELECT source resultId, code.value code, fundingtree.value fundings " +
                    "FROM relation r " +
                    "JOIN project p " +
                    "ON r.target = p.id " +
                    "WHERE r.datainfo.deletedbyinference = false " +
                    "AND lower(relclass) = '" + ModelConstants.IS_PRODUCED_BY.toLowerCase() + "'")
            .as(Encoders.bean(ResultProject.class));

        resultProj
            .joinWith(resultPid, resultProj.col("resultId").equalTo(resultPid.col("resultId")), "left")
            .flatMap(
                (FlatMapFunction<Tuple2<ResultProject, ResultPidsList>, eu.dnetlib.dhp.schema.dump.oaf.graph.Relation>) value -> {
                    List<eu.dnetlib.dhp.schema.dump.oaf.graph.Relation> relList = new ArrayList<>();
                    Optional<ResultPidsList> orel = Optional.ofNullable(value._2());
                    if (orel.isPresent()) {
                        List<String> projList = new ArrayList<>();
                        String code = value._1().getCode();
                        for (String fund : value._1().getFundings()) {
                            projList.add(Utils.getEntity(fund, code).getId());
                        }

                        List<KeyValue> resList = orel.get().getResultAllowedPids();
                        for (int i = 0; i < resList.size(); i++) {
                            String pid = resList.get(i).getKey() + ":" + resList.get(i).getValue();
                            for (int j = 0; j < projList.size(); j++) {
                                relList
                                    .addAll(
                                        Utils
                                            .getRelationPair(
                                                pid, projList.get(j), Constants.RESULT, Constants.PROJECT,
                                                ModelConstants.OUTCOME, ModelConstants.IS_PRODUCED_BY,
                                                ModelConstants.PRODUCES));
                            }
                        }
                    }
                    return relList.iterator();
                }, Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.graph.Relation.class))
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(outputPath + "/relationProject");
    }
}
SparkDumpResultRelation.java
@@ -1,175 +0,0 @@

package eu.dnetlib.dhp.oa.graph.dump.pid;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.Serializable;
import java.util.*;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Constants;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.oa.graph.dump.community.ResultProject;
import eu.dnetlib.dhp.schema.dump.oaf.KeyValue;
import eu.dnetlib.dhp.schema.dump.oaf.graph.Relation;
import eu.dnetlib.dhp.schema.oaf.Project;

public class SparkDumpResultRelation implements Serializable {

    private static final Logger log = LoggerFactory.getLogger(SparkDumpResultRelation.class);

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils
            .toString(
                SparkDumpResultRelation.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/oa/graph/dump_pid/input_dump_result.json"));

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String inputPath = parser.get("preparedInfoPath");
        log.info("inputPath: {}", inputPath);

        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        SparkConf conf = new SparkConf();
        runWithSparkSession(
            conf,
            isSparkSessionManaged,
            spark -> {
                Utils.removeOutputDir(spark, outputPath);
                dumpPidRelations(spark, inputPath, outputPath);
            });
    }

    private static Dataset<Relation> distinctRelations(Dataset<Relation> rels) {
        return rels
            .filter(getRelationFilterFunction())
            .groupByKey(
                (MapFunction<Relation, String>) r -> String
                    .join(
                        r.getSource().getId(), r.getTarget().getId(), r.getReltype().getName(),
                        r.getReltype().getType()),
                Encoders.STRING())
            .mapGroups(
                (MapGroupsFunction<String, Relation, Relation>) (key, relationIterator) -> relationIterator.next(),
                Encoders.bean(Relation.class));
    }

    private static FilterFunction<Relation> getRelationFilterFunction() {
        return (FilterFunction<Relation>) r -> StringUtils.isNotBlank(r.getSource().getId()) ||
            StringUtils.isNotBlank(r.getTarget().getId()) ||
            StringUtils.isNotBlank(r.getReltype().getName()) ||
            StringUtils.isNotBlank(r.getReltype().getType());
    }

    private static void dumpPidRelations(SparkSession spark, String inputPath, String outputPath) {
        Dataset<ResultPidsList> resultPids = Utils.readPath(spark, inputPath, ResultPidsList.class);

        distinctRelations(resultPids.flatMap((FlatMapFunction<ResultPidsList, Relation>) r -> {
            List<Relation> ret = new ArrayList<>();
            List<KeyValue> resPids = r.getResultAllowedPids();
            List<List<KeyValue>> authPids = r.getAuthorAllowedPids();

            for (int i = 0; i < resPids.size() - 1; i++) {
                String pid = resPids.get(i).getKey() + ":" + resPids.get(i).getValue();
                for (int j = i + 1; j < resPids.size(); j++) {
                    ret
                        .addAll(
                            Utils
                                .getRelationPair(
                                    pid, resPids.get(j).getKey() + ":" + resPids.get(j).getValue(),
                                    Constants.RESULT, Constants.RESULT, Constants.SIMILARITY,
                                    Constants.RESPID_RESPID_RELATION, Constants.RESPID_RESPID_RELATION));
                }
            }

            for (int i = 0; i < authPids.size() - 1; i++) {
                for (int j = i + 1; j < authPids.size(); j++) {
                    ret.addAll(getAuthRelations(authPids.get(i), authPids.get(j)));
                }
            }

            for (int i = 0; i < resPids.size(); i++) {
                String pid = resPids.get(i).getKey() + ":" + resPids.get(i).getValue();
                for (int j = 0; j < authPids.size(); j++) {
                    for (int k = 0; k < authPids.get(j).size(); k++) {
                        ret
                            .addAll(
                                Utils
                                    .getRelationPair(
                                        pid,
                                        authPids.get(j).get(k).getKey() + ":" + authPids.get(j).get(k).getValue(),
                                        Constants.RESULT, Constants.AUTHOR, Constants.AUTHORSHIP,
                                        Constants.RES_AUTHOR_RELATION, Constants.AUTHOR_RES_RELATION));
                    }
                }
            }
            return ret.iterator();
        }, Encoders.bean(Relation.class)))
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(outputPath + "/relation");
    }

    private static List<Relation> getAuthRelations(List<KeyValue> a1, List<KeyValue> a2) {
        List<Relation> ret = new ArrayList<>();
        if (a1.size() > 1) {
            ret.addAll(getSameAs(a1));
        }
        if (a2.size() > 1) {
            ret.addAll(getSameAs(a2));
        }
        for (int i = 0; i < a1.size(); i++) {
            String pid = a1.get(i).getKey() + ":" + a1.get(i).getValue();
            for (int j = 0; j < a2.size(); j++) {
                ret
                    .addAll(
                        Utils
                            .getRelationPair(
                                pid, a2.get(j).getKey() + ":" + a2.get(j).getValue(),
                                Constants.AUTHOR, Constants.AUTHOR, Constants.AUTHORSHIP,
                                Constants.AUTHOR_AUTHOR_RELATION, Constants.AUTHOR_AUTHOR_RELATION));
            }
        }

        return ret;
    }

    private static List<Relation> getSameAs(List<KeyValue> a1) {
        List<Relation> ret = new ArrayList<>();
        for (int i = 0; i < a1.size() - 1; i++) {
            ret
                .addAll(
                    Utils
                        .getRelationPair(
                            a1.get(i).getKey() + ":" + a1.get(i).getValue(),
                            a1.get(i + 1).getKey() + ":" + a1.get(i + 1).getValue(),
                            Constants.AUTHOR, Constants.AUTHOR, Constants.SIMILARITY,
                            Constants.SAME_AS, Constants.SAME_AS));
        }
        return ret;
    }
}
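
Note that getRelationFilterFunction passes a relation when any of the four key fields is non-blank, while the grouping key in distinctRelations joins all four; a stricter variant (an assumption about the intent, not the original behaviour) would require every field:

// Sketch: demand every field that feeds the grouping key.
private static FilterFunction<Relation> getStrictRelationFilterFunction() {
    return (FilterFunction<Relation>) r -> StringUtils.isNotBlank(r.getSource().getId()) &&
        StringUtils.isNotBlank(r.getTarget().getId()) &&
        StringUtils.isNotBlank(r.getReltype().getName()) &&
        StringUtils.isNotBlank(r.getReltype().getType());
}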
SparkPrepareResultPids.java
@@ -1,127 +0,0 @@

package eu.dnetlib.dhp.oa.graph.dump.pid;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.dump.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Result;

public class SparkPrepareResultPids implements Serializable {

    private static final Logger log = LoggerFactory.getLogger(SparkPrepareResultPids.class);

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils
            .toString(
                SparkPrepareResultPids.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/oa/graph/dump_pid/input_parameters.json"));

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String inputPath = parser.get("sourcePath");
        log.info("inputPath: {}", inputPath);

        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        final String resultClassName = parser.get("resultTableName");
        log.info("resultTableName: {}", resultClassName);

        final List<String> allowedResultPid = new Gson().fromJson(parser.get("allowedResultPids"), List.class);
        final List<String> allowedAuthorPid = new Gson().fromJson(parser.get("allowedAuthorPids"), List.class);

        final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase();
        log.info("resultType: {}", resultType);

        Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);

        SparkConf conf = new SparkConf();
        runWithSparkSession(
            conf,
            isSparkSessionManaged,
            spark -> {
                Utils.removeOutputDir(spark, outputPath);
                preparePidEntities(
                    spark, inputPath, outputPath + "/" + resultType, inputClazz, allowedResultPid, allowedAuthorPid);
            });
    }

    private static <R extends Result> void preparePidEntities(SparkSession spark, String inputPath, String outputPath,
        Class<R> inputClazz, List<String> allowedResultPid,
        List<String> allowedAuthorPid) {

        Dataset<R> result = Utils.readPath(spark, inputPath, inputClazz);

        result.map((MapFunction<R, ResultPidsList>) res -> {
            ResultPidsList ret = new ResultPidsList();
            ret.setResultId(res.getId());
            List<KeyValue> pidList = new ArrayList<>();
            Optional
                .ofNullable(res.getPid())
                .ifPresent(pids -> pids.forEach(pid -> {
                    if (allowedResultPid.contains(pid.getQualifier().getClassid().toLowerCase())) {
                        pidList.add(KeyValue.newInstance(pid.getQualifier().getClassid(), pid.getValue()));
                    }
                }));
            ret.setResultAllowedPids(pidList);
            List<List<KeyValue>> authorPidList = new ArrayList<>();
            Optional
                .ofNullable(res.getAuthor())
                .ifPresent(authors -> authors.forEach(a -> {
                    // collect all the allowed pids of one author into a single inner list
                    List<KeyValue> authorPids = new ArrayList<>();
                    Optional
                        .ofNullable(a.getPid())
                        .ifPresent(pids -> pids.forEach(p -> {
                            if (allowedAuthorPid.contains(p.getQualifier().getClassid().toLowerCase())) {
                                authorPids.add(KeyValue.newInstance(p.getQualifier().getClassid(), p.getValue()));
                            }
                        }));
                    if (authorPids.size() > 0) {
                        authorPidList.add(authorPids);
                    }
                }));
            ret.setAuthorAllowedPids(authorPidList);

            if (authorPidList.size() == 0 && pidList.size() == 0) {
                return null;
            }
            return ret;
        }, Encoders.bean(ResultPidsList.class))
            .filter(Objects::nonNull)
            .write()
            .mode(SaveMode.Append)
            .option("compression", "gzip")
            .json(outputPath);
    }
}
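
preparePidEntities drops results with no allowed pids by mapping them to null and filtering afterwards; the same idiom in miniature, with a hypothetical toPids extraction of the mapping body above:

// Illustrative only: map-to-null plus filter(Objects::nonNull) to drop empty records.
Dataset<ResultPidsList> kept = result
    .map((MapFunction<R, ResultPidsList>) res -> {
        ResultPidsList ret = toPids(res); // hypothetical helper wrapping the body above
        return (ret.getResultAllowedPids().isEmpty() && ret.getAuthorAllowedPids().isEmpty())
            ? null
            : ret;
    }, Encoders.bean(ResultPidsList.class))
    .filter((FilterFunction<ResultPidsList>) Objects::nonNull);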