added code for ircdl_extention

This commit is contained in:
Miriam Baglioni 2021-05-31 10:59:58 +02:00
parent 9d617a0a58
commit 5d8257b288
14 changed files with 1194 additions and 0 deletions

View File

@@ -0,0 +1,33 @@
package eu.dnetlib.dhp.ircdl_extention;
import java.io.Serializable;
public class KeyValue implements Serializable {
private String key;
private String value;
public static KeyValue newInstance(String key, String value) {
KeyValue kv = new KeyValue();
kv.key = key;
kv.value = value;
return kv;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
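For reference, the jobs below carry a result's collectedfrom entries (cf) and pids as lists of these pairs. A minimal usage sketch of the static factory, with invented values:

KeyValue pid = KeyValue.newInstance("doi", "10.5281/zenodo.123456"); // invented DOI, shaped like the values matched in PrepareDataciteSpark
KeyValue cf = KeyValue.newInstance("10|fake::datasource", "Crossref"); // invented datasource id; the value "Crossref" is what PrepareCrossrefSpark filters on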

View File

@@ -0,0 +1,79 @@
package eu.dnetlib.dhp.ircdl_extention;
import java.io.Serializable;
import java.util.List;
public class Orcid implements Serializable {
private List<String> otherNames;
private String inception;
private String surname;
private String mode;
private String creditname;
private String oid;
private Boolean works;
private String name;
public List<String> getOtherNames() {
return otherNames;
}
public void setOtherNames(List<String> otherNames) {
this.otherNames = otherNames;
}
public String getInception() {
return inception;
}
public void setInception(String inception) {
this.inception = inception;
}
public String getSurname() {
return surname;
}
public void setSurname(String surname) {
this.surname = surname;
}
public String getMode() {
return mode;
}
public void setMode(String mode) {
this.mode = mode;
}
public String getCreditname() {
return creditname;
}
public void setCreditname(String creditname) {
this.creditname = creditname;
}
public String getOid() {
return oid;
}
public void setOid(String oid) {
this.oid = oid;
}
public Boolean getWorks() {
return works;
}
public void setWorks(Boolean works) {
this.works = works;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}

View File

@@ -0,0 +1,77 @@
package eu.dnetlib.dhp.ircdl_extention;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.HdfsSupport;
public class PrepareCrossrefSpark {
final static String OUTPUT_PATH = "/tmp/miriam/IRCDL_Extention/ResultFromDOIMinter/Crossref/";
final static String RESULT_PATH = "/tmp/miriam/IRCDL_Extention/ResultWithOrcid/Normalized/";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
Boolean isSparkSessionManaged = Boolean.TRUE;
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, OUTPUT_PATH);
exec(spark, OUTPUT_PATH, RESULT_PATH);
});
}
private static void exec(SparkSession spark, String output_path, String result_path)
throws Exception {
Dataset<Result> crossref = selectResult(
spark, result_path + "graphResultPubsWithOrcid",
output_path + "crossrefPubsWithOrcid")
.union(
selectResult(
spark, result_path + "graphResultDatsWithOrcid",
output_path + "crossrefDatsWithOrcid"))
.union(
selectResult(
spark, result_path + "graphResultSwWithOrcid",
output_path + "crossrefSwWithOrcid"))
.union(
selectResult(
spark, result_path + "graphResultOtherWithOrcid",
output_path + "crossrefOtherWithOrcid"));
}
private static Dataset<Result> selectResult(SparkSession spark, String result_path, String output_path) {
Dataset<Result> res = Utils.readPath(
spark, result_path, Result.class)
.filter(
(FilterFunction<Result>) r -> !r.getId().startsWith("50|dedup") &&
r.getCf().stream().anyMatch(cf -> cf.getValue().equals("Crossref")));
res.write().option("compression", "gzip").mode(SaveMode.Overwrite).json(output_path);
return res;
}
}

View File

@@ -0,0 +1,94 @@
package eu.dnetlib.dhp.ircdl_extention;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.HdfsSupport;
public class PrepareDataciteSpark {
final static String OUTPUT_PATH = "/tmp/miriam/IRCDL_Extention/ResultFromDOIMinter/Datacite/";
final static String RESULT_PATH = "/tmp/miriam/IRCDL_Extention/ResultWithOrcid/Normalized/";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
Boolean isSparkSessionManaged = Boolean.TRUE;
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, OUTPUT_PATH);
exec(spark, OUTPUT_PATH, RESULT_PATH);
});
}
private static void exec(SparkSession spark, String output_path, String result_path)
throws Exception {
Dataset<Result> datacite = selectResult(
spark, result_path + "graphResultPubsWithOrcid",
output_path + "datacitePubsWithOrcid")
.union(
selectResult(
spark, result_path + "graphResultDatsWithOrcid",
output_path + "dataciteDatsWithOrcid"))
.union(
selectResult(
spark, result_path + "graphResultSwWithOrcid",
output_path + "dataciteSwWithOrcid"))
.union(
selectResult(
spark, result_path + "graphResultOtherWithOrcid",
output_path + "dataciteOtherWithOrcid"));
getProviderResult(output_path, datacite, "Zenodo");
getProviderResult(output_path, datacite, "Figshare");
getProviderResult(output_path, datacite, "Dryad");
}
private static void getProviderResult(String output_path, Dataset<Result> datacite, String provider) {
datacite
.filter(
(FilterFunction<Result>) r -> r
.getPid()
.stream()
.anyMatch(p -> p.getKey().equals("doi") && p.getValue().contains(provider.toLowerCase())))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(output_path + provider);
}
private static Dataset<Result> selectResult(SparkSession spark, String result_path, String output_path) {
Dataset<eu.dnetlib.dhp.ircdl_extention.Result> res = Utils.readPath(
spark, result_path, eu.dnetlib.dhp.ircdl_extention.Result.class)
.filter(
(FilterFunction<eu.dnetlib.dhp.ircdl_extention.Result>) r -> r.getId().startsWith("50|datacite"));
res.write().option("compression", "gzip").mode(SaveMode.Overwrite).json(output_path);
return res;
}
}

View File

@@ -0,0 +1,55 @@
package eu.dnetlib.dhp.ircdl_extention;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
public class PrepareNormalizedOrcid {
final static String OUTPUT_PATH = "/tmp/miriam/IRCDL_Extention/ORCID/entrySetMayNormalized/";
final static String INPUT_PATH = "/tmp/IRCDL_Extention/entrySetMay/";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
Boolean isSparkSessionManaged = Boolean.TRUE;
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, OUTPUT_PATH);
execNormalize(spark, OUTPUT_PATH, INPUT_PATH);
});
}
private static void execNormalize(SparkSession spark, String outputPath, String inputPath) {
Dataset<Orcid> orcid = Utils.readPath(spark, inputPath, Orcid.class);
orcid.map((MapFunction<Orcid, Orcid>) o ->{
o.setName(Utils.normalizeString(Utils.replace_chars(o.getName())));
o.setSurname(Utils.normalizeString(Utils.replace_chars(o.getSurname())));
o.setCreditname(Utils.normalizeString(Utils.replace_chars(o.getCreditname())));
o.setOtherNames(o.getOtherNames().stream()
.map(on -> Utils.normalizeString(Utils.replace_chars(on))).collect(Collectors.toList()));
return o;
}, Encoders.bean(Orcid.class))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);
}
}

View File

@@ -0,0 +1,52 @@
package eu.dnetlib.dhp.ircdl_extention;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.oaf.ResultTypeComparator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
public class PrepareNormalizedResultSpark {
final static String OUTPUT_PATH = "/tmp/miriam/IRCDL_Extention/ResultWithOrcid/Normalized/";
final static String INPUT_PATH = "/tmp/IRCDL_Extention/entrySetMay/";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
Boolean isSparkSessionManaged = Boolean.TRUE;
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, OUTPUT_PATH);
execNormalize(spark, OUTPUT_PATH, INPUT_PATH);
});
}
private static void execNormalize(SparkSession spark, String outputPath, String inputPath) {
Dataset<Result> result = Utils.readPath(spark, inputPath, Result.class);
result.map((MapFunction<Result, Result>) r ->{
r.setName(Utils.normalizeString(Utils.replace_chars(r.getName())));
r.setSurname(Utils.normalizeString(Utils.replace_chars(r.getSurname())));
r.setFullname(Utils.normalizeString(Utils.replace_chars(r.getFullname())));
return r;
}, Encoders.bean(Result.class))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);
}
}

View File

@@ -0,0 +1,100 @@
package eu.dnetlib.dhp.ircdl_extention;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.Objects;
import java.util.Optional;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.HdfsSupport;
import scala.Tuple2;
public class PrepareResultAllTheRestSpark {
final static String RESULT_PATH = "/tmp/miriam/IRCDL_Extention/ResultWithOrcid/Normalized/";
final static String INSTREPO_PATH = "/tmp/miriam/IRCDL_Extention/ResultFromInstReposWithOrcid/";
final static String DATACITE_PATH = "/tmp/miriam/IRCDL_Extention/ResultFromDOIMinter/Datacite/";
final static String CROSSREF_PATH = "/tmp/miriam/IRCDL_Extention/ResultFromDOIMinter/Crossref/";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
Boolean isSparkSessionManaged = Boolean.TRUE;
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, RESULT_PATH + "CollectedNotDataciteNotCrossrefNotFromInstrepos");
exec(
spark, RESULT_PATH + "CollectedNotDataciteNotCrossrefNotFromInstrepos",
RESULT_PATH + "graphCollectedWithOrcid", INSTREPO_PATH,
DATACITE_PATH, CROSSREF_PATH);
});
}
/**
* Reads all the results from Crossref, Datacite and those associated with institutional repositories,
* reads all the collected results, performs a left join between the collected results and the ones
* read in the previous step, and keeps only the collected results that have no match in the join.
* @param spark the Spark session
* @param output_path path where the selected results are written
* @param result_path path of the collected (normalized) results
* @param inst_repo_path path of the results associated with institutional repositories
* @param datacite_path path of the results minted by Datacite
* @param crossref_path path of the results minted by Crossref
*/
private static void exec(SparkSession spark, String output_path, String result_path, String inst_repo_path,
String datacite_path, String crossref_path) {
Dataset<Result> result = Utils.readPath(spark, result_path, Result.class);
Dataset<Result> inst_repo = Utils.readPath(spark, inst_repo_path + "graphResultPubsWithOrcidInstRepos", Result.class)
.union(Utils.readPath(spark, inst_repo_path + "graphResultDatsWithOrcidInstRepos", Result.class))
.union(Utils.readPath(spark, inst_repo_path + "graphResultSwWithOrcidInstRepos", Result.class))
.union(Utils.readPath(spark, inst_repo_path + "graphResultOtherWithOrcidInstRepos", Result.class));
Dataset<Result> datacite = Utils.readPath(spark, datacite_path + "datacitePubsWithOrcid", Result.class)
.union(Utils.readPath(spark, datacite_path + "dataciteDatsWithOrcid", Result.class))
.union(Utils.readPath(spark, datacite_path + "dataciteSwWithOrcid", Result.class))
.union(Utils.readPath(spark, datacite_path + "dataciteOtherWithOrcid", Result.class));
Dataset<Result> crossref = Utils.readPath(spark, crossref_path + "crossrefPubsWithOrcid", Result.class)
.union(Utils.readPath(spark, crossref_path + "crossrefDatsWithOrcid", Result.class))
.union(Utils.readPath(spark, crossref_path + "crossrefSwWithOrcid", Result.class))
.union(Utils.readPath(spark, crossref_path + "crossrefOtherWithOrcid", Result.class));
Dataset<Result> union_dataset = inst_repo.union(datacite).union(crossref);
result
.joinWith(union_dataset, result.col("id").equalTo(union_dataset.col("id")), "left")
.map((MapFunction<Tuple2<Result, Result>, Result>) t2 -> {
if (!Optional.ofNullable(t2._2()).isPresent())
return t2._1();
return null;
}, Encoders.bean(Result.class))
.filter(Objects::nonNull)
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(output_path);
}
}

View File

@@ -0,0 +1,106 @@
package eu.dnetlib.dhp.ircdl_extention;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import javax.xml.crypto.Data;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.Datasource;
public class PrepareResultFromInstRepo {
final static String OUTPUT_PATH = "/tmp/miriam/IRCDL_Extention/ResultFromInstReposWithOrcid/";
final static String INPUT_PATH = "/tmp/miriam/graph_20210503/datasource";
final static String RESULT_PATH = "/tmp/miriam/IRCDL_Extention/ResultWithOrcid/Normalized/";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
Boolean isSparkSessionManaged = Boolean.TRUE;
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, OUTPUT_PATH);
exec(spark, INPUT_PATH, OUTPUT_PATH, RESULT_PATH);
});
}
private static void exec(SparkSession spark, String input_path, String output_path, String result_path)
throws Exception {
selectResultFromInstRepo(
spark, result_path + "graphResultPubsWithOrcid",
output_path + "graphResultPubsWithOrcidInstRepos", input_path);
selectResultFromInstRepo(
spark, result_path + "graphResultDatsWithOrcid",
output_path + "graphResultDatsWithOrcidInstRepos", input_path);
selectResultFromInstRepo(
spark, result_path + "graphResultSwWithOrcid",
output_path + "graphResultSwWithOrcidInstRepos", input_path);
selectResultFromInstRepo(
spark, result_path + "graphResultOtherWithOrcid",
output_path + "graphResultOtherWithOrcidInstRepos", input_path);
}
private static void selectResultFromInstRepo(SparkSession spark, String result_path, String output_path,
String input_path) {
Dataset<Datasource> datasource = Utils.readPath(spark, input_path, Datasource.class);
Dataset<eu.dnetlib.dhp.ircdl_extention.Result> res = Utils.readPath(
spark, result_path, eu.dnetlib.dhp.ircdl_extention.Result.class)
.filter(
(FilterFunction<eu.dnetlib.dhp.ircdl_extention.Result>) r -> !r.getId().startsWith("50|doiboost")
&& !r.getId().startsWith("50|scholix")
&& !r.getId().startsWith("50|datacite")
&& !r.getId().startsWith("50|dedup"));
datasource.createOrReplaceTempView("datasource");
res.createOrReplaceTempView("result");
spark
.sql(
"SELECT t.id, t.deletedbyinference, t.name, t.surname, t.cf, t.fullname, t.pid, t.oid " +
"FROM " +
"(Select * " +
"from result " +
"LATERAL VIEW explode(cf.key) c as cfromkey) as t " +
"join " +
"datasource d " +
"on " +
"d.id = t.cfromkey " +
"and d.datasourcetype.classid = 'pubsrepository::institutional'")
.as(Encoders.bean(Result.class))
.write()
.option("compressio", "gzip")
.mode(SaveMode.Overwrite)
.json(output_path);
}
}

View File

@@ -0,0 +1,146 @@
package eu.dnetlib.dhp.ircdl_extention;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.project.PrepareProgramme;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelConstants;
public class PrepareResultSpark {
private static final Logger log = LoggerFactory.getLogger(PrepareResultSpark.class);
public static void main(String[] args) throws Exception {
final String OUTPUT_PATH = "/tmp/miriam/IRCDL_Extention/ResultWithOrcid/";
final String INPUT_PATH = "/tmp/miriam/graph_20210503/";
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
Boolean.TRUE,
spark -> {
Utils.removeOutputDir(spark, OUTPUT_PATH);
exec(spark, INPUT_PATH, OUTPUT_PATH);
});
}
private static <R extends eu.dnetlib.dhp.schema.oaf.Result> Dataset<Result> mapToResult(SparkSession spark,
String input_path,
Class<R> resultClazz, String output_path) {
Dataset<R> publicationDataset = Utils.readPath(spark, input_path, resultClazz);
Dataset<Result> result = publicationDataset.filter((FilterFunction<R>) p -> {
if (p.getAuthor() == null)
return false;
if (p.getAuthor().size() == 0)
return false;
return true;
})
.flatMap((FlatMapFunction<R, Result>) p -> {
List<Result> reslist = new ArrayList<>();
p.getAuthor().forEach(a -> {
a.getPid().forEach(apid -> {
if (apid.getQualifier().getClassid().equals(ModelConstants.ORCID)
|| apid.getQualifier().getClassid().equals(ModelConstants.ORCID_PENDING)) {
Result r = new Result();
r.setDeletedbyinference(p.getDataInfo().getDeletedbyinference());
r.setId(p.getId());
r
.setCf(
p
.getCollectedfrom()
.stream()
.map(cf -> KeyValue.newInstance(cf.getKey(), cf.getValue()))
.collect(Collectors.toList()));
r
.setPid(
p
.getPid()
.stream()
.map(
pid -> KeyValue
.newInstance(pid.getQualifier().getClassid(), pid.getValue()))
.collect(Collectors.toList()));
r.setName(a.getName());
r.setSurname(a.getSurname());
r.setFullname(a.getFullname());
r.setOid(apid.getValue());
reslist.add(r);
}
});
});
return reslist.iterator();
}, Encoders.bean(Result.class));
result
.write()
.option("compressio", "gzip")
.mode(SaveMode.Overwrite)
.json(output_path);
return result;
}
private static void exec(SparkSession spark, String input_path, String output_path) throws Exception {
Dataset<Result> result = mapToResult(
spark, input_path + "publication",
(Class<? extends eu.dnetlib.dhp.schema.oaf.Result>) Class.forName("eu.dnetlib.dhp.schema.oaf.Publication"),
output_path + "graphResultPubsWithOrcid")
.union(
mapToResult(
spark, input_path + "dataset",
(Class<? extends eu.dnetlib.dhp.schema.oaf.Result>) Class
.forName("eu.dnetlib.dhp.schema.oaf.Dataset"),
output_path + "graphResultDatsWithOrcid"))
.union(
mapToResult(
spark, input_path + "software",
(Class<? extends eu.dnetlib.dhp.schema.oaf.Result>) Class
.forName("eu.dnetlib.dhp.schema.oaf.Software"),
output_path + "graphResultSwWithOrcid"))
.union(
mapToResult(
spark, input_path + "otherresearchproduct",
(Class<? extends eu.dnetlib.dhp.schema.oaf.Result>) Class
.forName("eu.dnetlib.dhp.schema.oaf.OtherResearchProduct"),
output_path + "graphResultOtherWithOrcid"));
result
.filter((FilterFunction<Result>) r -> !r.getId().startsWith("50|dedup"))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(output_path + "graphCollectedWithOrcid");
result
.filter((FilterFunction<Result>) r -> !r.getDeletedbyinference())
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(output_path + "graphNotDeletedbyinferenceWithOrcid");
}
}

View File

@@ -0,0 +1,80 @@
package eu.dnetlib.dhp.ircdl_extention;
import java.io.Serializable;
import java.util.List;
public class Result implements Serializable {
private Boolean deletedbyinference;
private String id;
private List<KeyValue> cf;
private List<KeyValue> pid;
private String name;
private String surname;
private String fullname;
private String oid;
public Boolean getDeletedbyinference() {
return deletedbyinference;
}
public void setDeletedbyinference(Boolean deletedbyinference) {
this.deletedbyinference = deletedbyinference;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public List<KeyValue> getCf() {
return cf;
}
public void setCf(List<KeyValue> cf) {
this.cf = cf;
}
public List<KeyValue> getPid() {
return pid;
}
public void setPid(List<KeyValue> pid) {
this.pid = pid;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getSurname() {
return surname;
}
public void setSurname(String surname) {
this.surname = surname;
}
public String getFullname() {
return fullname;
}
public void setFullname(String fullname) {
this.fullname = fullname;
}
public String getOid() {
return oid;
}
public void setOid(String oid) {
this.oid = oid;
}
}

View File

@@ -0,0 +1,65 @@
package eu.dnetlib.dhp.ircdl_extention;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.HdfsSupport;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import java.io.Serializable;
import java.text.Normalizer;
public class Utils implements Serializable {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static String normalizeString(String input){
if(input == null)
return new String();
return Normalizer
.normalize(input, Normalizer.Form.NFKD)
.replaceAll("[^\\p{ASCII}]", "");
}
public static String replace_chars(String input) {
    // blanks out punctuation/separator characters and collapses whitespace; the original chain
    // also replaced several non-ASCII punctuation characters that are not reproducible in this rendering
    String tmp = input
        .replace("@", " ")
        .replace(".", " ")
        .replace("=", " ")
        .replace(",", " ")
        .replace("'", " ")
        .replace("+", " ")
        .replace("-", " ")
        .replace("\"", " ")
        .replace("/", " ")
        .replace("[", " ")
        .replace("]", " ")
        .replace("(", " ")
        .replace(")", " ")
        .replaceAll("\\s+", " ");
    return tmp.trim();
}
public static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
public static <R> Dataset<R> readPath(
SparkSession spark, String inputPath, Class<R> clazz) {
return spark
.read()
.textFile(inputPath)
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
}
}
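The two helpers above are applied in sequence by the normalization jobs (PrepareNormalizedOrcid, PrepareNormalizedResultSpark): replace_chars first, then normalizeString. A minimal sketch of the expected behaviour on an invented name:

// illustrative only: the input string is invented, the helpers are the ones defined above
String cleaned = Utils.normalizeString(Utils.replace_chars("José  O'Connor-Smith"));
// replace_chars blanks the apostrophe and the hyphen and collapses the whitespace,
// normalizeString then strips the diacritic, leaving "Jose O Connor Smith"
System.out.println(cleaned);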

View File

@@ -0,0 +1,86 @@
package eu.dnetlib.dhp.ircdl_extention;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.project.PrepareProgramme;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
public class WrongSpark {
/**
* Takes as input the normalized ORCID records and the normalized result entries to be checked
* against ORCID, and returns a lower bound of the wrong attributions.
*/
//
// final static String OUTPUT_PATH = "/tmp/miriam/IRCDL_Extention/Wrong/";
// final static String ORCID_PATH = "/tmp/miriam/IRCDL_Extention/ORCID/entrySetMayNormalized/";
// final static String RESULT_PATH = "/tmp/miriam/IRCDL_Extention/ResultWithOrcid/";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
PrepareProgramme.class
.getResourceAsStream(
"/eu/dnetlib/dhp/ircdl_extention/wrong_orcid_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
final String orcidPath = parser.get("orcidPath");
final String outputPath = parser.get("outputPath");
final String resultPath = parser.get("resultPath");
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
findWrong(spark, orcidPath, outputPath, resultPath);
});
}
private static void findWrong(SparkSession spark, String orcidPath, String outputPath, String resultPath) {
Dataset<Orcid> orcidDataset = Utils.readPath(spark, orcidPath, Orcid.class);
Dataset<Result> resultDataset = Utils.readPath(spark, resultPath, Result.class);
resultDataset.joinWith(orcidDataset, resultDataset.col("oid")
.equalTo(orcidDataset.col("oid")), "left")
.filter((FilterFunction<Tuple2<Result, Orcid>>) tp -> isWrong(tp))
.write()
.option("compression","gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);
}
private static boolean isWrong(Tuple2<Result, Orcid> tp) {
    // The original method body is not visible in this view. Assumed minimal sketch: an attribution
    // is counted as wrong when the joined ORCID record exists but none of its name variants
    // (name + surname, credit name, other names) matches the author name carried by the result
    // (both sides are already normalized upstream).
    Result r = tp._1();
    Orcid o = tp._2();
    if (o == null)
        return false;
    String resultName = (nvl(r.getName()) + " " + nvl(r.getSurname())).trim();
    if (resultName.equalsIgnoreCase((nvl(o.getName()) + " " + nvl(o.getSurname())).trim())
        || resultName.equalsIgnoreCase(nvl(o.getCreditname()).trim()))
        return false;
    return o.getOtherNames() == null
        || o.getOtherNames().stream().noneMatch(on -> resultName.equalsIgnoreCase(nvl(on).trim()));
}
private static String nvl(String s) {
    // null-safe helper used only by the sketch above (not part of the original file)
    return s == null ? "" : s;
}
}

View File

@@ -0,0 +1,58 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
<property>
<name>sparkExecutorNumber</name>
<value>4</value>
</property>
<property>
<name>spark2EventLogDir</name>
<value>/user/spark/spark2ApplicationHistory</value>
</property>
<property>
<name>sparkDriverMemory</name>
<value>15G</value>
</property>
<property>
<name>sparkExecutorMemory</name>
<value>6G</value>
</property>
<property>
<name>sparkExecutorCores</name>
<value>1</value>
</property>
</configuration>

View File

@@ -0,0 +1,163 @@
<workflow-app name="H2020Programme" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>projectFileURL</name>
<description>the url where to get the projects file</description>
</property>
<property>
<name>programmeFileURL</name>
<description>the url where to get the programme file</description>
</property>
<property>
<name>topicFileURL</name>
<description>the url where to get the topic file</description>
</property>
<property>
<name>outputPath</name>
<description>path where to store the action set</description>
</property>
</parameters>
<start to="deleteoutputpath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="deleteoutputpath">
<fs>
<delete path='${outputPath}'/>
<mkdir path='${outputPath}'/>
<delete path='${workingDir}'/>
<mkdir path='${workingDir}'/>
</fs>
<ok to="get_project_file"/>
<error to="Kill"/>
</action>
<action name="get_project_file">
<java>
<main-class>eu.dnetlib.dhp.actionmanager.project.utils.ReadCSV</main-class>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--fileURL</arg><arg>${projectFileURL}</arg>
<arg>--hdfsPath</arg><arg>${workingDir}/projects</arg>
<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.utils.CSVProject</arg>
</java>
<ok to="get_programme_file"/>
<error to="Kill"/>
</action>
<action name="get_programme_file">
<java>
<main-class>eu.dnetlib.dhp.actionmanager.project.utils.ReadCSV</main-class>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--fileURL</arg><arg>${programmeFileURL}</arg>
<arg>--hdfsPath</arg><arg>${workingDir}/programme</arg>
<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.utils.CSVProgramme</arg>
</java>
<ok to="get_topic_file"/>
<error to="Kill"/>
</action>
<action name="get_topic_file">
<java>
<main-class>eu.dnetlib.dhp.actionmanager.project.utils.ReadExcel</main-class>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--fileURL</arg><arg>${topicFileURL}</arg>
<arg>--hdfsPath</arg><arg>${workingDir}/topic</arg>
<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.utils.EXCELTopic</arg>
</java>
<ok to="read_projects"/>
<error to="Kill"/>
</action>
<action name="read_projects">
<java>
<main-class>eu.dnetlib.dhp.actionmanager.project.ReadProjectsFromDB</main-class>
<arg>--hdfsPath</arg><arg>${workingDir}/dbProjects</arg>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
</java>
<ok to="prepare_programme"/>
<error to="Kill"/>
</action>
<action name="prepare_programme">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareProgramme</name>
<class>eu.dnetlib.dhp.actionmanager.project.PrepareProgramme</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--programmePath</arg><arg>${workingDir}/programme</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedProgramme</arg>
</spark>
<ok to="prepare_project"/>
<error to="Kill"/>
</action>
<action name="prepare_project">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareProjects</name>
<class>eu.dnetlib.dhp.actionmanager.project.PrepareProjects</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--projectPath</arg><arg>${workingDir}/projects</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedProjects</arg>
<arg>--dbProjectPath</arg><arg>${workingDir}/dbProjects</arg>
</spark>
<ok to="create_updates"/>
<error to="Kill"/>
</action>
<action name="create_updates">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ProjectProgrammeAS</name>
<class>eu.dnetlib.dhp.actionmanager.project.SparkAtomicActionJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--projectPath</arg><arg>${workingDir}/preparedProjects</arg>
<arg>--programmePath</arg><arg>${workingDir}/preparedProgramme</arg>
<arg>--topicPath</arg><arg>${workingDir}/topic</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>