Compare commits

...

12 Commits

Author SHA1 Message Date
Miriam Baglioni 066e1dc772 added new dependency ag common-text 2021-06-21 09:26:40 +02:00
Miriam Baglioni 00dfaff973 refactoring and changed query that was wrong 2021-06-21 09:23:55 +02:00
Miriam Baglioni 464ac6301c added the reader for the dump of crossref 2021-06-21 09:22:29 +02:00
Miriam Baglioni c07f820c21 - 2021-06-21 09:16:31 +02:00
Miriam Baglioni 2740b95f99 - 2021-06-21 09:16:05 +02:00
Miriam Baglioni ca7e10b3c0 - 2021-06-21 09:15:43 +02:00
Miriam Baglioni 2f6673e678 - 2021-06-21 09:14:32 +02:00
Miriam Baglioni 0eda93b3eb - 2021-06-09 13:25:35 +02:00
Miriam Baglioni 72771a1254 - 2021-06-09 13:25:19 +02:00
Miriam Baglioni 6cdc4d3bf3 - 2021-05-31 11:07:24 +02:00
Miriam Baglioni a106353cee - 2021-05-31 11:00:30 +02:00
Miriam Baglioni 5d8257b288 added code for ircdl_extention 2021-05-31 10:59:58 +02:00
29 changed files with 2403 additions and 16 deletions

View File

@@ -77,7 +77,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
</dependency>
</dependencies>

View File

@@ -0,0 +1,66 @@
package eu.dnetlib.dhp.ircdl_extention;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.ircdl_extention.model.Result;
public class PrepareCrossrefSpark {
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
PrepareCrossrefSpark.class
.getResourceAsStream(
"/eu/dnetlib/dhp/ircdl_extention/prepare_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
final String inputPath = parser.get("inputPath");
final String outputPath = parser.get("outputPath");
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
selectResult(spark, inputPath, outputPath);
});
}
private static Dataset<Result> selectResult(SparkSession spark, String input_path, String output_path) {
Dataset<Result> res = Utils
.readPath(
spark, input_path, Result.class)
.filter(
(FilterFunction<Result>) r -> !r.getId().startsWith("50|dedup") &&
r.getCf().stream().anyMatch(cf -> cf.getValue().equals("Crossref")));
res.write().option("compression", "gzip").mode(SaveMode.Overwrite).json(output_path);
return res;
}
}

View File

@@ -0,0 +1,81 @@
package eu.dnetlib.dhp.ircdl_extention;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.ircdl_extention.model.Result;
public class PrepareDataciteSpark {
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
PrepareDataciteSpark.class
.getResourceAsStream(
"/eu/dnetlib/dhp/ircdl_extention/prepare_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
final String inputPath = parser.get("inputPath");
final String outputPath = parser.get("outputPath");
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
exec(spark, outputPath, inputPath);
});
}
private static void exec(SparkSession spark, String output_path, String input_path) {
Dataset<Result> datacite = Utils
.readPath(
spark, input_path, Result.class)
.filter(
(FilterFunction<Result>) r -> r.getId().startsWith("50|datacite"));
datacite.write().option("compression", "gzip").mode(SaveMode.Overwrite).json(output_path + "allDatacite");
getProviderResult(output_path, datacite, "Zenodo");
getProviderResult(output_path, datacite, "Figshare");
getProviderResult(output_path, datacite, "Dryad");
}
private static void getProviderResult(String output_path, Dataset<Result> datacite, String provider) {
datacite
.filter(
(FilterFunction<Result>) r -> r
.getPid()
.stream()
.anyMatch(p -> p.getKey().equals("doi") && p.getValue().contains(provider.toLowerCase())))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(output_path + provider);
}
}

View File

@@ -0,0 +1,75 @@
package eu.dnetlib.dhp.ircdl_extention;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.ircdl_extention.model.Orcid;
public class PrepareNormalizedOrcid {
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
PrepareNormalizedOrcid.class
.getResourceAsStream(
"/eu/dnetlib/dhp/ircdl_extention/prepare_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
final String inputPath = parser.get("inputPath");
final String outputPath = parser.get("outputPath");
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
execNormalize(spark, outputPath, inputPath);
});
}
private static void execNormalize(SparkSession spark, String outputPath, String inputPath) {
Dataset<Orcid> orcid = Utils.readPath(spark, inputPath, Orcid.class);
orcid.map((MapFunction<Orcid, Orcid>) o -> {
o.setName(Utils.normalizeString(o.getName()));
o.setSurname(Utils.normalizeString(o.getSurname()));
o.setCreditname(Utils.normalizeString(o.getCreditname()));
o
.setOtherNames(
o
.getOtherNames()
.stream()
.map(on -> Utils.normalizeString(on))
.collect(Collectors.toList()));
return o;
}, Encoders.bean(Orcid.class))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);
}
}

View File

@@ -0,0 +1,88 @@
package eu.dnetlib.dhp.ircdl_extention;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.ircdl_extention.model.Result;
public class PrepareNormalizedResultSpark {
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
PrepareNormalizedResultSpark.class
.getResourceAsStream(
"/eu/dnetlib/dhp/ircdl_extention/prepare_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
final String inputPath = parser.get("inputPath");
final String outputPath = parser.get("outputPath");
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
execNormalize(spark, outputPath, inputPath);
});
}
private static void execNormalize(SparkSession spark, String outputPath, String inputPath) {
Dataset<Result> normalized_result = Utils
.readPath(spark, inputPath + "publicationsWithOrcid", Result.class)
.union(Utils.readPath(spark, inputPath + "datasetWithOrcid", Result.class))
.union(Utils.readPath(spark, inputPath + "softwareWithOrcid", Result.class))
.union(Utils.readPath(spark, inputPath + "otherWithOrcid", Result.class))
.map((MapFunction<Result, Result>) r -> {
r.setName(Utils.normalizeString(r.getName()));
r.setSurname(Utils.normalizeString(r.getSurname()));
r.setFullname(Utils.normalizeString(r.getFullname()));
return r;
}, Encoders.bean(Result.class));
normalized_result
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath + "ResultWithOrcid");
normalized_result
.filter((FilterFunction<Result>) r -> !r.getId().startsWith("50|dedup"))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath + "collectedResultWithOrcid");
normalized_result
.filter((FilterFunction<Result>) r -> !r.getDeletedbyinference())
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath + "notDeletedByInferenceResultWithOrcid");
}
}

View File

@@ -0,0 +1,103 @@
package eu.dnetlib.dhp.ircdl_extention;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.ircdl_extention.model.Result;
import scala.Tuple2;
public class PrepareResultAllTheRestSpark {
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
PrepareResultAllTheRestSpark.class
.getResourceAsStream(
"/eu/dnetlib/dhp/ircdl_extention/prepare_alltherest_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
final String inputPath = parser.get("inputPath");
final String outputPath = parser.get("outputPath");
final String instRepoPath = parser.get("instRepoPath");
final String crossrefPath = parser.get("crossrefPath");
final String datacitePath = parser.get("datacitePath");
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath + "allTheRest");
exec(
spark, outputPath + "allTheRest",
inputPath, instRepoPath,
datacitePath, crossrefPath);
});
}
/**
* Reads all the results from Crossref, Datacite and the ones associated with institutional repositories.
* Reads all the collected results.
* Performs a left join between the collected results and the ones read in the previous step,
* keeping only the results without a match in the join.
* @param spark the Spark session
* @param output_path the path where to store the results not belonging to any of the other subsets
* @param result_path the path of the collected results
* @param inst_repo_path the path of the results collected from institutional repositories
* @param datacite_path the path of the results collected from Datacite
* @param crossref_path the path of the results associated with Crossref
*/
private static void exec(SparkSession spark, String output_path, String result_path, String inst_repo_path,
String datacite_path, String crossref_path) {
Dataset<Result> result = Utils.readPath(spark, result_path, Result.class);
Dataset<Result> inst_repo = Utils
.readPath(spark, inst_repo_path, Result.class);
Dataset<Result> datacite = Utils
.readPath(spark, datacite_path, Result.class);
Dataset<Result> crossref = Utils
.readPath(spark, crossref_path, Result.class);
Dataset<Result> union_dataset = inst_repo.union(datacite).union(crossref);
result
.joinWith(union_dataset, result.col("id").equalTo(union_dataset.col("id")), "left")
.map((MapFunction<Tuple2<Result, Result>, Result>) t2 -> {
if (!Optional.ofNullable(t2._2()).isPresent())
return t2._1();
return null;
}, Encoders.bean(Result.class))
.filter(Objects::nonNull)
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(output_path);
}
}
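
The exec method above emulates an anti join: it left-joins the collected results with the union of the institutional repository, Datacite and Crossref subsets and keeps only the tuples without a match. As a minimal sketch (not part of this changeset, assuming the same imports and the Utils and Result helpers of this module), the same selection can be expressed with Spark's built-in left_anti join type:

// Hypothetical alternative to exec(), reusing the helpers of this module:
// a left_anti join keeps only the rows of the left dataset with no match on the right.
private static void execAntiJoin(SparkSession spark, String output_path, String result_path,
    String inst_repo_path, String datacite_path, String crossref_path) {
    Dataset<Result> result = Utils.readPath(spark, result_path, Result.class);
    Dataset<Result> others = Utils
        .readPath(spark, inst_repo_path, Result.class)
        .union(Utils.readPath(spark, datacite_path, Result.class))
        .union(Utils.readPath(spark, crossref_path, Result.class));
    result
        .join(others, result.col("id").equalTo(others.col("id")), "left_anti")
        .as(Encoders.bean(Result.class))
        .write()
        .option("compression", "gzip")
        .mode(SaveMode.Overwrite)
        .json(output_path);
}

This variant avoids materializing the null tuples and the extra non-null filter, at the cost of dropping the typed joinWith API.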

View File

@@ -0,0 +1,92 @@
package eu.dnetlib.dhp.ircdl_extention;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.ircdl_extention.model.Result;
import eu.dnetlib.dhp.schema.oaf.Datasource;
public class PrepareResultFromInstRepo {
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
PrepareResultFromInstRepo.class
.getResourceAsStream(
"/eu/dnetlib/dhp/ircdl_extention/prepare_instrepo_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
final String inputPath = parser.get("inputPath");
final String outputPath = parser.get("outputPath");
final String datasourcePath = parser.get("datasourcePath");
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
selectResultFromInstRepo(spark, inputPath, outputPath, datasourcePath);
});
}
private static void selectResultFromInstRepo(SparkSession spark, String inputPath, String output_path,
String datasourcePath) {
Dataset<Datasource> datasource = Utils.readPath(spark, datasourcePath, Datasource.class);
Dataset<Result> res = Utils
.readPath(
spark, inputPath, Result.class)
.filter(
(FilterFunction<Result>) r -> !r.getId().startsWith("50|doiboost")
&& !r.getId().startsWith("50|scholix")
&& !r.getId().startsWith("50|datacite")
&& !r.getId().startsWith("50|dedup"));
datasource.createOrReplaceTempView("datasource");
res.createOrReplaceTempView("result");
spark
.sql(
"SELECT t.id, t.deletedbyinference, t.name, t.surname, t.cf, t.fullname, t.pid, t.oid " +
"FROM " +
"(Select * " +
"from result " +
"LATERAL VIEW explode(cf.key) c as cfromkey) as t " +
"join " +
"datasource d " +
"on " +
"d.id = t.cfromkey " +
"and d.datasourcetype.classid = 'pubsrepository::institutional'")
.as(Encoders.bean(Result.class))
.write()
.option("compressio", "gzip")
.mode(SaveMode.Overwrite)
.json(output_path);
}
}

View File

@@ -0,0 +1,119 @@
package eu.dnetlib.dhp.ircdl_extention;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.ircdl_extention.model.KeyValue;
import eu.dnetlib.dhp.ircdl_extention.model.Result;
import eu.dnetlib.dhp.schema.common.ModelConstants;
public class PrepareResultSpark {
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
PrepareResultSpark.class
.getResourceAsStream(
"/eu/dnetlib/dhp/ircdl_extention/prepare_result_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
final String resultClassName = parser.get("resultClass");
Class<? extends eu.dnetlib.dhp.schema.oaf.Result> resultClazz = (Class<? extends eu.dnetlib.dhp.schema.oaf.Result>) Class
.forName(resultClassName);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
final String inputPath = parser.get("inputPath");
final String outputPath = parser.get("outputPath");
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
mapToResult(spark, inputPath, resultClazz, outputPath);
});
}
private static <R extends eu.dnetlib.dhp.schema.oaf.Result> void mapToResult(SparkSession spark,
String input_path,
Class<R> resultClazz, String output_path) {
Dataset<R> publicationDataset = Utils.readPath(spark, input_path, resultClazz);
Dataset<Result> result = publicationDataset.filter((FilterFunction<R>) p -> {
if (p.getAuthor() == null)
return false;
if (p.getAuthor().size() == 0)
return false;
return true;
})
.flatMap((FlatMapFunction<R, Result>) p -> {
List<Result> reslist = new ArrayList<>();
p.getAuthor().forEach(a -> {
a.getPid().forEach(apid -> {
if (apid.getQualifier().getClassid().equals(ModelConstants.ORCID)
|| apid.getQualifier().getClassid().equals(ModelConstants.ORCID_PENDING)) {
Result r = new Result();
r.setDeletedbyinference(p.getDataInfo().getDeletedbyinference());
r.setId(p.getId());
r
.setCf(
p
.getCollectedfrom()
.stream()
.map(cf -> KeyValue.newInstance(cf.getKey(), cf.getValue()))
.collect(Collectors.toList()));
r
.setPid(
p
.getPid()
.stream()
.map(
pid -> KeyValue
.newInstance(pid.getQualifier().getClassid(), pid.getValue()))
.collect(Collectors.toList()));
r.setName(a.getName());
r.setSurname(a.getSurname());
r.setFullname(a.getFullname());
r.setOid(apid.getValue());
reslist.add(r);
}
});
});
return reslist.iterator();
}, Encoders.bean(Result.class));
result
.write()
.option("compressio", "gzip")
.mode(SaveMode.Overwrite)
.json(output_path);
}
}

View File

@@ -0,0 +1,199 @@
package eu.dnetlib.dhp.ircdl_extention;
import java.io.Serializable;
import java.text.Normalizer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.similarity.CosineDistance;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wcohen.ss.JaroWinkler;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.ircdl_extention.model.Orcid;
import eu.dnetlib.dhp.ircdl_extention.model.Result;
import scala.Tuple2;
public class Utils implements Serializable {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static String normalizeString(String input) {
if (input == null || input.equals("void"))
return new String();
String tmp = Normalizer
.normalize(input, Normalizer.Form.NFKD)
.replaceAll("[^\\p{ASCII}]", "");
tmp = tmp
.replaceAll("[^\\p{Alpha}]+", " ")
.replaceAll("\\s+", " ")
.trim();
return tmp;
}
public static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
public static <R> Dataset<R> readPath(
SparkSession spark, String inputPath, Class<R> clazz) {
return spark
.read()
.textFile(inputPath)
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
}
private static List<String> getList(List<String> input) {
return input.stream().map(st -> st.trim()).filter(st -> st.length() > 0).collect(Collectors.toList());
}
public static boolean filterFunction(Tuple2<Result, Orcid> input) {
List<String> res = getList(Arrays.asList(input._1().getFullname().split(" ")))
.stream()
.sorted()
.collect(Collectors.toList());
Orcid or = input._2();
List<String> tmp = new ArrayList<>();
Collections.addAll(tmp, or.getName().split(" "));
Collections.addAll(tmp, or.getSurname().split(" "));
return checkContains(
res, getList(tmp)
.stream()
.sorted()
.collect(Collectors.toList()))
||
checkContains(
res, getList(Arrays.asList(or.getCreditname().split(" ")))
.stream()
.sorted()
.collect(Collectors.toList()))
||
or
.getOtherNames()
.stream()
.anyMatch(
on -> checkContains(
res, getList(Arrays.asList(on.split(" ")))
.stream()
.sorted()
.collect(Collectors.toList())));
}
private static boolean checkContains(List<String> result, List<String> orcid) {
if (result.size() == 0 || orcid.size() == 0) {
return true;
}
String[][] input = {
{
"1", StringUtils.joinWith(" ", result)
},
{
"2", StringUtils.joinWith(" ", orcid)
}
};
// exact match word by word
Double cosineDistance = new CosineDistance().apply(input[0][1], input[1][1]);
if (Math.round((1 - cosineDistance) * 100) == 100) {
return true;
}
// containment check: one list can be longer than the other; also try composing adjacent words to rebuild a name
// e.g. pengli yan = li peng yan
if (orcid.size() < result.size()) {
if (isIn(orcid, result))
return true;
} else {
if (isIn(result, orcid))
return true;
}
// apply JaroWinkler distance
double score = new JaroWinkler()
.score(StringUtils.joinWith(" ", result), StringUtils.joinWith(" ", orcid));
return score > 0.95;
}
private static boolean isIn(List<String> lst1, List<String> lst2) {
int index = 0;
for (String word : lst1) {
int i = index;
boolean found = false;
while (i < lst2.size()) {
String wordlist = lst2.get(i);
if (word.equals(wordlist)) {
index = i + 1;
i = lst2.size();
found = true;
} else {
if (word.charAt(0) < wordlist.charAt(0)) {
if (!checkComposition(word, lst2)) {
return false;
} else {
index = 0;
i = lst2.size();
found = true;
}
} else {
if (word.length() == 1 || wordlist.length() == 1) {
if (word.charAt(0) == wordlist.charAt(0)) {
index = i + 1;
i = lst2.size();
found = true;
} else {
i++;
}
} else {
i++;
}
}
}
}
if (!found) {
if (!checkComposition(word, lst2)) {
return false;
} else {
index = 0;
}
}
}
return true;
}
private static boolean checkComposition(String word, List<String> lst2) {
for (int i = 0; i < lst2.size(); i++) {
for (int j = 0; j < lst2.size(); j++) {
if (i != j) {
String w = lst2.get(i) + lst2.get(j);
if (word.equals(w)) {
if (i > j) {
lst2.remove(i);
lst2.remove(j);
} else {
lst2.remove(j);
lst2.remove(i);
}
return true;
}
}
}
}
return false;
}
}
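
The comments in checkContains above outline a three-step comparison: an exact token-set match via cosine distance, a containment and composition check that recombines split name parts (e.g. "pengli yan" vs "li peng yan"), and a Jaro-Winkler fallback with a 0.95 threshold. The sketch below is a hypothetical JUnit-style illustration of the composition case, written in the style of the WrongOrcidTest class further down and assuming its imports; the names are invented:

@Test
public void composedNameIsAccepted() throws Exception {
    // Hypothetical example: the author string "pengli yan" matches an ORCID record whose
    // tokens are "li", "peng" and "yan", because checkComposition() rebuilds "pengli" from
    // "peng" + "li" during the containment step.
    Assertions
        .assertTrue(
            Utils
                .filterFunction(
                    new Tuple2<>(Result.newInstance("pengli yan"),
                        Orcid.newInstance("li", "peng yan", "li peng yan", Arrays.asList("yan pengli")))));
}

Because filterFunction sorts the token lists before comparing them, word order never matters; only missing or genuinely different tokens push the comparison down to the Jaro-Winkler fallback.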

View File

@@ -0,0 +1,154 @@
package eu.dnetlib.dhp.ircdl_extention;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapGroupsFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.ircdl_extention.model.Orcid;
import eu.dnetlib.dhp.ircdl_extention.model.Result;
import eu.dnetlib.dhp.ircdl_extention.model.ShuffleInfo;
import scala.Tuple2;
public class WrongSpark {
/**
* Takes as input the normalized ORCID records and the normalized entries to be checked against ORCID.
* Returns the lower bound of the wrong attributions.
*/
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
WrongSpark.class
.getResourceAsStream(
"/eu/dnetlib/dhp/ircdl_extention/wrong_orcid_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
final String orcidPath = parser.get("orcidPath");
final String outputPath = parser.get("outputPath");
final String resultPath = parser.get("inputPath");
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
findWrong(spark, orcidPath, outputPath + "/wrong", resultPath);
findShuffle(spark, orcidPath, outputPath + "/shuffle", resultPath);
});
}
private static void findShuffle(SparkSession spark, String orcidPath, String outputPath, String resultPath) {
Utils
.readPath(spark, resultPath, Result.class)
.map(
(MapFunction<Result, ShuffleInfo>) r -> ShuffleInfo
.newInstance(r.getName(), r.getSurname(), r.getFullname(), r.getId()),
Encoders.bean(ShuffleInfo.class))
.union(
getWrong(spark, orcidPath, resultPath)
.map((MapFunction<Tuple2<Result, Orcid>, ShuffleInfo>) t2 ->
ShuffleInfo
.newInstance(
t2._1().getName(), t2._1().getSurname(), t2._1().getFullname(),
t2._1().getId(), t2._2().getName(), t2._2().getSurname(),
t2._2().getCreditname(), t2._2().getOtherNames()),
Encoders.bean(ShuffleInfo.class)))
.groupByKey((MapFunction<ShuffleInfo, String>) si -> si.getId(), Encoders.STRING())
.flatMapGroups((FlatMapGroupsFunction<String, ShuffleInfo, ShuffleInfo>) (s, it) -> {
List<ShuffleInfo> shuffleInfoList = new ArrayList<>();
List<ShuffleInfo> ret = new ArrayList<>();
shuffleInfoList.add(it.next());
it.forEachRemaining(e -> shuffleInfoList.add(e));
shuffleInfoList
.stream()
.filter(e -> Optional.ofNullable(e.getOrcid()).isPresent())
.forEach(e -> {
if (checkShuffle(e, shuffleInfoList))
ret.add(e);
});
return ret.iterator();
}, Encoders.bean(ShuffleInfo.class))
.filter(Objects::nonNull);
/*
* def checkShuffle(x): alis = [a for a in x[1]] dic = {} count = 0 for entry in alis: if entry['orcid'] != '':
* dic[entry['orcid']] = entry for orcid in dic: name = dic[orcid]['oname'] surname = dic[orcid]['osurname'] for
* author in alis: if author['aname'] == "" or author['asurname'] == "": if checkContains([author['afullname']],
* addInListAll([], name + " " + surname)): count += 1 break else: if checkContains([author['aname'] + " " +
* author['asurname']], addInListAll([], name + " " + surname)): count += 1 break return count
*/
// candidate_shuffle = zenodo_normalized.map(lambda x: (x['id'], {'aname':x['name'], 'asurname': x['surname'],
// 'afullname': x['fullname'], 'oname':"", 'osurname':"", 'orcid':''})). union (
// join_orcid_filtered.map(lambda e: (e['id'], {'aname':e['nameg'], 'asurname':e['surnameg'],
// 'afullname':e['fullnameg'], 'oname':e['name'],
// 'osurname':e['surname'],'orcid':e['orcid']}))).groupByKey().filter(toBeChecked)
}
private static boolean checkShuffle(ShuffleInfo e, List<ShuffleInfo> shuffleInfoList) {
return shuffleInfoList
.stream()
.anyMatch(
si -> Utils
.filterFunction(
new Tuple2<>(Result.newInstance(si.getAfullname()),
Orcid
.newInstance(
si.getOname(), si.getOsurname(), si.getOcreditName(), si.getoOtherNames()))));
}
private static Dataset<Tuple2<Result, Orcid>> getWrong(SparkSession spark, String orcidPath, String resultPath) {
Dataset<Orcid> orcidDataset = Utils
.readPath(spark, orcidPath, Orcid.class)
.filter((FilterFunction<Orcid>) o -> !o.getName().contains("deactivated"));
Dataset<Result> resultDataset = Utils.readPath(spark, resultPath, Result.class);
return resultDataset
.joinWith(
orcidDataset, resultDataset
.col("oid")
.equalTo(orcidDataset.col("orcid")),
"inner")
.filter((FilterFunction<Tuple2<Result, Orcid>>) t2 -> !Utils.filterFunction(t2));
}
private static void findWrong(SparkSession spark, String orcidPath, String outputPath, String resultPath) {
getWrong(spark, orcidPath, resultPath)
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);
}
}
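
As the class comment states, WrongSpark reads the normalized ORCID records and the normalized results and derives a lower bound of the wrong attributions (findWrong), while findShuffle collects candidate cases where an ORCID iD was attached to the wrong author of the right record, following the logic of the commented-out Python sketch above. Since Utils.filterFunction compares sorted tokens, an author whose name and surname are merely swapped is not reported as wrong, which is exactly why the shuffle check re-evaluates the ORCID names against all the authors of a record. A hypothetical, test-style illustration with invented names, again assuming the WrongOrcidTest imports:

@Test
public void swappedOrderIsNotWrong() throws Exception {
    // Hypothetical values: "rossi mario" vs ORCID name="mario", surname="rossi".
    // Token sorting makes the comparison order-insensitive, so this pair is NOT
    // flagged by getWrong(); it can only surface as a shuffle candidate.
    Assertions
        .assertTrue(
            Utils
                .filterFunction(
                    new Tuple2<>(Result.newInstance("rossi mario"),
                        Orcid.newInstance("mario", "rossi", "mario rossi", Arrays.asList()))));
}

@Test
public void differentPersonIsWrong() throws Exception {
    // Hypothetical values: completely unrelated names fail the cosine, containment and
    // Jaro-Winkler checks, so getWrong() would count this pair as a wrong attribution.
    Assertions
        .assertFalse(
            Utils
                .filterFunction(
                    new Tuple2<>(Result.newInstance("john smith"),
                        Orcid.newInstance("maria", "garcia", "m garcia", Arrays.asList("maria garcia lopez")))));
}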

View File

@@ -0,0 +1,33 @@
package eu.dnetlib.dhp.ircdl_extention.model;
import java.io.Serializable;
public class KeyValue implements Serializable {
private String key;
private String value;
public static KeyValue newInstance(String key, String value) {
KeyValue kv = new KeyValue();
kv.key = key;
kv.value = value;
return kv;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}

View File

@@ -0,0 +1,90 @@
package eu.dnetlib.dhp.ircdl_extention.model;
import java.io.Serializable;
import java.util.List;
public class Orcid implements Serializable {
private List<String> otherNames;
private String inception;
private String surname;
private String mode;
private String creditname;
private String orcid;
private Boolean works;
private String name;
public static Orcid newInstance(String oname, String osurname, String ocreditName, List<String> getoOtherNames) {
Orcid o = new Orcid();
o.name = oname;
o.surname = osurname;
o.creditname = ocreditName;
o.otherNames = getoOtherNames;
return o;
}
public List<String> getOtherNames() {
return otherNames;
}
public void setOtherNames(List<String> otherNames) {
this.otherNames = otherNames;
}
public String getInception() {
return inception;
}
public void setInception(String inception) {
this.inception = inception;
}
public String getSurname() {
return surname;
}
public void setSurname(String surname) {
this.surname = surname;
}
public String getMode() {
return mode;
}
public void setMode(String mode) {
this.mode = mode;
}
public String getCreditname() {
return creditname;
}
public void setCreditname(String creditname) {
this.creditname = creditname;
}
public String getOrcid() {
return orcid;
}
public void setOrcid(String oid) {
this.orcid = oid;
}
public Boolean getWorks() {
return works;
}
public void setWorks(Boolean works) {
this.works = works;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}

View File

@@ -0,0 +1,96 @@
package eu.dnetlib.dhp.ircdl_extention.model;
import java.io.Serializable;
import java.util.List;
public class Result implements Serializable {
private Boolean deletedbyinference;
private String id;
private List<KeyValue> cf;
private List<KeyValue> pid;
private String name;
private String surname;
private String fullname;
private String oid;
public static Result newInstance(String afullname) {
Result r = new Result();
r.fullname = afullname;
return r;
}
public Boolean getDeletedbyinference() {
return deletedbyinference;
}
public void setDeletedbyinference(Boolean deletedbyinference) {
this.deletedbyinference = deletedbyinference;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public List<KeyValue> getCf() {
return cf;
}
public void setCf(List<KeyValue> cf) {
this.cf = cf;
}
public List<KeyValue> getPid() {
return pid;
}
public void setPid(List<KeyValue> pid) {
this.pid = pid;
}
public String getName() {
return name;
}
public void setName(String name) {
if (name != null)
this.name = name.toLowerCase();
else
this.name = new String();
}
public String getSurname() {
return surname;
}
public void setSurname(String surname) {
if (surname != null)
this.surname = surname.toLowerCase();
else
this.surname = new String();
}
public String getFullname() {
return fullname;
}
public void setFullname(String fullname) {
if (fullname != null)
this.fullname = fullname.toLowerCase();
else
this.fullname = new String();
}
public String getOid() {
return oid;
}
public void setOid(String oid) {
this.oid = oid;
}
}

View File

@@ -0,0 +1,114 @@
package eu.dnetlib.dhp.ircdl_extention.model;
import java.io.Serializable;
import java.util.List;
public class ShuffleInfo implements Serializable {
private String aname;
private String asurname;
private String afullname;
private String oname;
private String osurname;
private String ocreditName;
private List<String> oOtherNames;
private String orcid;
private String id;
public String getAname() {
return aname;
}
public void setAname(String aname) {
this.aname = aname;
}
public String getAsurname() {
return asurname;
}
public void setAsurname(String asurname) {
this.asurname = asurname;
}
public String getAfullname() {
return afullname;
}
public void setAfullname(String afullname) {
this.afullname = afullname;
}
public String getOname() {
return oname;
}
public void setOname(String oname) {
this.oname = oname;
}
public String getOsurname() {
return osurname;
}
public void setOsurname(String osurname) {
this.osurname = osurname;
}
public String getOcreditName() {
return ocreditName;
}
public void setOcreditName(String ocreditName) {
this.ocreditName = ocreditName;
}
public List<String> getoOtherNames() {
return oOtherNames;
}
public void setoOtherNames(List<String> oOtherNames) {
this.oOtherNames = oOtherNames;
}
public String getOrcid() {
return orcid;
}
public void setOrcid(String orcid) {
this.orcid = orcid;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public static ShuffleInfo newInstance(String aname, String asurname, String afullname, String id) {
ShuffleInfo si = new ShuffleInfo();
si.afullname = afullname;
si.aname = aname;
si.asurname = asurname;
si.id = id;
return si;
}
public static ShuffleInfo newInstance(String aname, String asurname, String afullname, String id, String oname,
String osurname, String ocredtname, List<String> oOthername) {
ShuffleInfo si = new ShuffleInfo();
si.afullname = afullname;
si.aname = aname;
si.asurname = asurname;
si.id = id;
si.oname = oname;
si.osurname = osurname;
si.ocreditName = ocredtname;
si.oOtherNames = oOthername;
return si;
}
}

View File

@@ -0,0 +1,58 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
<property>
<name>sparkExecutorNumber</name>
<value>4</value>
</property>
<property>
<name>spark2EventLogDir</name>
<value>/user/spark/spark2ApplicationHistory</value>
</property>
<property>
<name>sparkDriverMemory</name>
<value>15G</value>
</property>
<property>
<name>sparkExecutorMemory</name>
<value>6G</value>
</property>
<property>
<name>sparkExecutorCores</name>
<value>1</value>
</property>
</configuration>

View File

@@ -0,0 +1,479 @@
<workflow-app name="IRCDL Extention" xmlns="uri:oozie:workflow:0.5">
<start to="deleteoutputpath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="deleteoutputpath">
<fs>
<delete path='${outputPath}'/>
<mkdir path='${outputPath}'/>
<delete path='${workingDir}'/>
<mkdir path='${workingDir}'/>
</fs>
<ok to="fork_prepare"/>
<error to="Kill"/>
</action>
<fork name="fork_prepare">
<path start="fork_prepare_result"/>
<path start="prepare_orcid"/>
</fork>
<action name="prepare_orcid">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareResult</name>
<class>eu.dnetlib.dhp.ircdl_extention.PrepareNormalizedOrcid</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${orcidInputPath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
</spark>
<ok to="join_fork"/>
<error to="Kill"/>
</action>
<fork name="fork_prepare_result">
<path start="prepare_publication"/>
<path start="prepare_dataset"/>
<path start="prepare_software"/>
<path start="prepare_other"/>
</fork>
<action name="prepare_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareResult</name>
<class>eu.dnetlib.dhp.ircdl_extention.PrepareResultSpark</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/publication</arg>
<arg>--resultClass</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/GRAPH/publicationsWithOrcid</arg>
</spark>
<ok to="wait_prepare_result"/>
<error to="Kill"/>
</action>
<action name="prepare_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareResult</name>
<class>eu.dnetlib.dhp.ircdl_extention.PrepareResultSpark</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/dataset</arg>
<arg>--resultClass</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/GRAPH/datasetWithOrcid</arg>
</spark>
<ok to="wait_prepare_result"/>
<error to="Kill"/>
</action>
<action name="prepare_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareResult</name>
<class>eu.dnetlib.dhp.ircdl_extention.PrepareResultSpark</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/software</arg>
<arg>--resultClass</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/GRAPH/softwareWithOrcid</arg>
</spark>
<ok to="wait_prepare_result"/>
<error to="Kill"/>
</action>
<action name="prepare_other">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareResult</name>
<class>eu.dnetlib.dhp.ircdl_extention.PrepareResultSpark</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/otherresearchproduct</arg>
<arg>--resultClass</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/GRAPH/otherWithOrcid</arg>
</spark>
<ok to="wait_prepare_result"/>
<error to="Kill"/>
</action>
<join name="wait_prepare_result" to="normalize_result"/>
<action name="normalize_result">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareResult</name>
<class>eu.dnetlib.dhp.ircdl_extention.PrepareNormalizedResultSpark</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/</arg>
<arg>--outputPath</arg><arg>${workingDir}/GRAPH/Normalized/</arg>
</spark>
<ok to="fork_get_result_info"/>
<error to="Kill"/>
</action>
<fork name="fork_get_result_info">
<path start="get_result_instrepo"/>
<path start="get_result_datacite"/>
<path start="get_result_crossref"/>
</fork>
<action name="get_result_instrepo">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GetResultInstRepo</name>
<class>eu.dnetlib.dhp.ircdl_extention.PrepareResultFromInstRepo</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/Normalized/ResultWithOrcid/</arg>
<arg>--outputPath</arg><arg>${workingDir}/GRAPH/InstRepo/</arg>
<arg>--datasourcePath</arg><arg>${datasourcePath}</arg>
</spark>
<ok to="wait_res_info"/>
<error to="Kill"/>
</action>
<action name="get_result_datacite">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GetResultInstRepo</name>
<class>eu.dnetlib.dhp.ircdl_extention.PrepareDataciteSpark</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/Normalized/ResultWithOrcid/</arg>
<arg>--outputPath</arg><arg>${workingDir}/GRAPH/Datacite/</arg>
</spark>
<ok to="wait_res_info"/>
<error to="Kill"/>
</action>
<action name="get_result_crossref">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GetResultInstRepo</name>
<class>eu.dnetlib.dhp.ircdl_extention.PrepareCrossrefSpark</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/Normalized/ResultWithOrcid/</arg>
<arg>--outputPath</arg><arg>${workingDir}/GRAPH/Crossref/</arg>
</spark>
<ok to="wait_res_info"/>
<error to="Kill"/>
</action>
<join name="wait_res_info" to="get_result_alltherest"/>
<action name="get_result_alltherest">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GetResultInstRepo</name>
<class>eu.dnetlib.dhp.ircdl_extention.PrepareResultAllTheRestSpark</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/Normalized/ResultWithOrcid/</arg>
<arg>--outputPath</arg><arg>${workingDir}/GRAPH/AllTheRest/</arg>
<arg>--instRepoPath</arg><arg>${workingDir}/GRAPH/InstRepo/</arg>
<arg>--datacitePath</arg><arg>${workingDir}/GRAPH/Datacite/</arg>
<arg>--crossrefPath</arg><arg>${workingDir}/GRAPH/Crossref/</arg>
</spark>
<ok to="join_fork"/>
<error to="Kill"/>
</action>
<join name="join_fork" to="fork_get_wrong"/>
<fork name="fork_get_wrong">
<path start="get_wrong_instrepo"/>
<path start="get_wrong_datacite"/>
<path start="get_wrong_crossref"/>
<path start="get_wrong_alltherest"/>
<path start="get_wrong_zenodo"/>
<path start="get_wrong_figshare"/>
<path start="get_wrong_dryad"/>
</fork>
<action name="get_wrong_instrepo">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GetResultInstRepo</name>
<class>eu.dnetlib.dhp.ircdl_extention.WrongSpark</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/InstRepo/</arg>
<arg>--outputPath</arg><arg>${outputPath}/InstRepo/</arg>
<arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
</spark>
<ok to="jojn_wrong"/>
<error to="Kill"/>
</action>
<action name="get_wrong_datacite">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GetResultInstRepo</name>
<class>eu.dnetlib.dhp.ircdl_extention.WrongSpark</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/Datacite/allDatacite/</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
</spark>
<ok to="jojn_wrong"/>
<error to="Kill"/>
</action>
<action name="get_wrong_crossref">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GetResultInstRepo</name>
<class>eu.dnetlib.dhp.ircdl_extention.WrongSpark</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/Crossref/</arg>
<arg>--outputPath</arg><arg>${outputPath}/Crossref/</arg>
<arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
</spark>
<ok to="jojn_wrong"/>
<error to="Kill"/>
</action>
<action name="get_wrong_alltherest">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GetResultInstRepo</name>
<class>eu.dnetlib.dhp.ircdl_extention.WrongSpark</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/AllTheRest/</arg>
<arg>--outputPath</arg><arg>${outputPath}/AllTheRest/</arg>
<arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
</spark>
<ok to="jojn_wrong"/>
<error to="Kill"/>
</action>
<action name="get_wrong_zenodo">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GetResultInstRepo</name>
<class>eu.dnetlib.dhp.ircdl_extention.WrongSpark</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/Datacite/Zenodo/</arg>
<arg>--outputPath</arg><arg>${outputPath}/Zenodo/</arg>
<arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
</spark>
<ok to="jojn_wrong"/>
<error to="Kill"/>
</action>
<action name="get_wrong_figshare">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GetResultInstRepo</name>
<class>eu.dnetlib.dhp.ircdl_extention.WrongSpark</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/Datacite/Figshare/</arg>
<arg>--outputPath</arg><arg>${outputPath}/Figshare/</arg>
<arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
</spark>
<ok to="jojn_wrong"/>
<error to="Kill"/>
</action>
<action name="get_wrong_dryad">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GetResultInstRepo</name>
<class>eu.dnetlib.dhp.ircdl_extention.WrongSpark</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/Datacite/Dryad/</arg>
<arg>--outputPath</arg><arg>${outputPath}/Dryad/</arg>
<arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
</spark>
<ok to="jojn_wrong"/>
<error to="Kill"/>
</action>
<join name="jojn_wrong" to="End"/>
<end name="End"/>
</workflow-app>

View File

@@ -0,0 +1,35 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "ip",
"paramLongName": "inputPath",
"paramDescription": "the path of the normalized results with ORCID",
"paramRequired": true
},
{
"paramName": "op",
"paramLongName": "outputPath",
"paramDescription": "the path where to store the output",
"paramRequired": true
},
{
"paramName": "ir",
"paramLongName": "instRepoPath",
"paramDescription": "the path of the results collected from institutional repositories",
"paramRequired": true
},
{
"paramName": "dp",
"paramLongName": "datacitePath",
"paramDescription": "the path of the results collected from Datacite",
"paramRequired": true
},
{
"paramName": "cp",
"paramLongName": "crossrefPath",
"paramDescription": "the path of the results associated with Crossref",
"paramRequired": true
}
]

View File

@@ -0,0 +1,26 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "ip",
"paramLongName": "inputPath",
"paramDescription": "the URL from where to get the programme file",
"paramRequired": true
},
{
"paramName": "op",
"paramLongName": "outputPath",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
},
{
"paramName": "dp",
"paramLongName": "datasourcePath",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
}
]

View File

@@ -0,0 +1,20 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "ip",
"paramLongName": "inputPath",
"paramDescription": "the URL from where to get the programme file",
"paramRequired": true
},
{
"paramName": "op",
"paramLongName": "outputPath",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
}
]

View File

@@ -0,0 +1,26 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "ip",
"paramLongName": "inputPath",
"paramDescription": "the URL from where to get the programme file",
"paramRequired": true
},
{
"paramName": "op",
"paramLongName": "outputPath",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
},
{
"paramName": "rc",
"paramLongName": "resultClass",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
}
]

View File

@@ -0,0 +1,26 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "op",
"paramLongName": "orcidPath",
"paramDescription": "the URL from where to get the programme file",
"paramRequired": true
},
{
"paramName": "op",
"paramLongName": "outputPath",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
},
{
"paramName": "ip",
"paramLongName": "inputPath",
"paramDescription": "thepath of the new ActionSet",
"paramRequired": true
}
]

View File

@@ -0,0 +1,98 @@
package eu.dnetlib.dhp.ircdl_extention;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.ircdl_extention.model.Orcid;
public class NormalizeOrcidTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final ClassLoader cl = eu.dnetlib.dhp.ircdl_extention.NormalizeOrcidTest.class
.getClassLoader();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory
.getLogger(eu.dnetlib.dhp.ircdl_extention.NormalizeOrcidTest.class);
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files
.createTempDirectory(eu.dnetlib.dhp.ircdl_extention.NormalizeOrcidTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(eu.dnetlib.dhp.ircdl_extention.NormalizeOrcidTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
// conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
// conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(NormalizeOrcidTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
public void normalizeOrcid() throws Exception {
PrepareNormalizedOrcid
.main(
new String[] {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-inputPath",
getClass()
.getResource(
"/eu/dnetlib/dhp/ircdl_extention/orcid_original.json")
.getPath(),
"-outputPath",
workingDir.toString() + "/orcidNormalized"
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Orcid> tmp = sc
.textFile(workingDir.toString() + "/orcidNormalized")
.map(value -> OBJECT_MAPPER.readValue(value, Orcid.class));
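// the normalized records are only printed here for manual inspection; no assertions are made on the output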
tmp.foreach(v -> System.out.println(OBJECT_MAPPER.writeValueAsString(v)));
}
}

View File

@ -0,0 +1,95 @@
package eu.dnetlib.dhp.ircdl_extention;
import java.util.ArrayList;
import java.util.Arrays;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import eu.dnetlib.dhp.ircdl_extention.model.Orcid;
import eu.dnetlib.dhp.ircdl_extention.model.Result;
import scala.Tuple2;
public class WrongOrcidTest {
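// based on the cases below, Utils.filterFunction is expected to return true when the result's author name
// is compatible with the ORCID record (association kept) and false when the association looks wrong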
@Test
public void wrongOrcidFalse() throws Exception {
Assertions
.assertTrue(
Utils
.filterFunction(
new Tuple2<>(Result.newInstance("veigas pires cristina"),
Orcid
.newInstance(
"cristina", "veiga pires", "c veiga pires",
Arrays.asList("c c veiga pires")))));
}
@Test
public void wrongOrcidFalse2() throws Exception {
Assertions
.assertTrue(
Utils
.filterFunction(
new Tuple2<>(Result.newInstance("yushkevich p"),
Orcid
.newInstance(
"paul", "yushkevich", "paul a yushkevich",
new ArrayList<>()))));
}
@Test
public void wrongOrcidFalse3() throws Exception {
Assertions
.assertTrue(
Utils
.filterFunction(
new Tuple2<>(Result.newInstance("ghosh ss"),
Orcid
.newInstance(
"satrajit", "ghosh",
"satra",
Arrays.asList("satra","satrajit s ghosh")))));
}
@Test
public void wrongOrcidTrue() throws Exception {
Assertions
.assertFalse(
Utils
.filterFunction(
new Tuple2<>(Result.newInstance("kluft lukas"),
Orcid
.newInstance(
"satrajit", "ghosh",
"satra",
Arrays.asList("satra","satrajit s ghosh")))));
}
@Test
public void wrongOrcidFalse4() throws Exception {
Assertions
.assertTrue(
Utils
.filterFunction(
new Tuple2<>(Result.newInstance("schulz s a"),
Orcid
.newInstance(
"sebastian", "schulz",
"sebastian a schulz",
new ArrayList<>()))));
}
}

View File

@ -0,0 +1,29 @@
{"otherNames": [], "inception": "2017-05-22T16:38:30.236Z", "surname": "hyy37", "mode": "Direct", "creditname": "void", "orcid": "0000-0002-8748-6992", "works": false, "name": "1380"}
{"otherNames": [], "inception": "2017-05-25T12:50:48.761Z", "surname": "hyy75", "mode": "Direct", "creditname": "void", "orcid": "0000-0001-7773-1109", "works": false, "name": "2775"}
{"otherNames": [], "inception": "2017-05-28T12:07:09.154Z", "surname": "hyy13", "mode": "Direct", "creditname": "void", "orcid": "0000-0003-4728-6379", "works": false, "name": "434323"}
{"otherNames": [], "inception": "2017-08-10T07:07:23.818Z", "surname": "hyy44", "mode": "Direct", "creditname": "void", "orcid": "0000-0001-9502-3093", "works": false, "name": "58"}
{"otherNames": [], "inception": "2017-08-10T07:08:48.179Z", "surname": "hyy46", "mode": "Direct", "creditname": "void", "orcid": "0000-0003-2933-0057", "works": false, "name": "60"}
{"otherNames": ["pang x y", "pang xueyong"], "inception": "2014-10-13T03:26:21.741Z", "surname": "?", "mode": "API", "creditname": "void", "orcid": "0000-0002-7397-5824", "works": true, "name": "??"}
{"otherNames": [], "inception": "2019-08-27T07:55:06.340Z", "surname": "therasa alphonsa", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0001-7205-6036", "works": false, "name": "a"}
{"otherNames": ["minto"], "inception": "2020-08-02T06:33:18.620Z", "surname": "karim", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0001-6111-6742", "works": false, "name": "a k mohammad fazlul"}
{"otherNames": [], "inception": "2014-05-01T09:13:11.783Z", "surname": "al-sammak", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0001-6646-4295", "works": false, "name": "a-imam"}
{"otherNames": [], "inception": "2019-12-06T12:53:04.045Z", "surname": "hassan", "mode": "Direct", "creditname": "void", "orcid": "0000-0003-2957-4641", "works": false, "name": "a-s.u."}
{"otherNames": [], "inception": "2020-07-28T12:29:26.453Z", "surname": "ajakh", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0002-1081-8426", "works": false, "name": "a."}
{"otherNames": [], "inception": "2017-01-10T12:35:05.016Z", "surname": "antol\u00ednez", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0002-5451-3421", "works": false, "name": "a. (ana)"}
{"otherNames": [], "inception": "2018-08-20T05:00:15.964Z", "surname": "mahmudi", "mode": "Direct", "creditname": "void", "orcid": "0000-0003-3187-941X", "works": false, "name": "a. aviv"}
{"otherNames": [], "inception": "2017-05-13T01:03:58.949Z", "surname": "akanmu", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0001-6223-5428", "works": false, "name": "a. c."}
{"otherNames": [], "inception": "2018-01-20T02:58:05.199Z", "surname": "inci", "mode": "Direct", "creditname": "void", "orcid": "0000-0002-0427-9745", "works": true, "name": "a. can"}
{"otherNames": ["a. kim ryan"], "inception": "2014-10-24T23:06:43.544Z", "surname": "hayes", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0002-2055-8269", "works": true, "name": "a. kim"}
{"otherNames": [], "inception": "2017-08-10T13:38:29.172Z", "surname": "bahadir", "mode": "Direct", "creditname": "void", "orcid": "0000-0002-4045-0001", "works": false, "name": "a. tugba"}
{"otherNames": [], "inception": "2018-08-29T07:49:31.093Z", "surname": "rayna", "mode": "Direct", "creditname": "void", "orcid": "0000-0002-7916-2031", "works": false, "name": "a.brite"}
{"otherNames": [], "inception": "2014-07-12T08:02:39.568Z", "surname": "kalyani", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0003-2649-7126", "works": false, "name": "a.grace"}
{"otherNames": [], "inception": "2018-07-21T12:00:22.042Z", "surname": "ahmed", "mode": "Direct", "creditname": "void", "orcid": "0000-0003-0777-5848", "works": false, "name": "a.i. mahbub uddin"}
{"otherNames": [], "inception": "2018-04-11T13:58:53.355Z", "surname": "a.kathirvel murugan", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0003-2298-6301", "works": false, "name": "a.kathirvel murugan"}
{"otherNames": [], "inception": "2017-08-31T11:35:48.559Z", "surname": "dar", "mode": "Direct", "creditname": "void", "orcid": "0000-0001-8781-6309", "works": false, "name": "a.rashid"}
{"otherNames": [], "inception": "2014-08-26T00:25:30.968Z", "surname": "sayem", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0003-2461-4667", "works": false, "name": "a.s.m."}
{"otherNames": [], "inception": "2019-10-03T01:27:08.212Z", "surname": "conte", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0003-2862-6139", "works": false, "name": "aaron"}
{"otherNames": [], "inception": "2020-03-16T09:37:10.610Z", "surname": "rashmi", "mode": "Direct", "creditname": "void", "orcid": "0000-0003-4754-5465", "works": false, "name": "aarthi rashmi b"}
{"otherNames": [], "inception": "2017-02-28T19:01:59.146Z", "surname": "bhaskar", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0002-5794-1165", "works": false, "name": "aastha"}
{"otherNames": [], "inception": "2020-04-07T18:10:50.922Z", "surname": "belhabib", "mode": "Direct", "creditname": "void", "orcid": "0000-0001-6086-0588", "works": false, "name": "abdelfettah"}
{"otherNames": [], "inception": "2019-01-13T21:50:51.923Z", "surname": "laamani", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0003-2055-2593", "works": false, "name": "abdellatif"}
{"otherNames": ["fákē", "miñhō"], "inception": "2019-01-13T21:50:51.923Z", "surname": "laamani", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0003-2055-2593", "works": false, "name": "abdellatif"}

View File

@ -0,0 +1,65 @@
package eu.dnetlib.doiboost.crossref;
import java.io.BufferedOutputStream;
import java.net.URI;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.mortbay.log.Log;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class ExtractCrossrefRecords {
public static void main(String[] args) throws Exception {
String hdfsServerUri;
String workingPath;
String crossrefFileNameTarGz;
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
ExtractCrossrefRecords.class
.getResourceAsStream(
"/eu/dnetlib/dhp/doiboost/crossref_dump_reader.json")));
parser.parseArgument(args);
hdfsServerUri = parser.get("hdfsServerUri");
workingPath = parser.get("workingPath");
crossrefFileNameTarGz = parser.get("crossrefFileNameTarGz");
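// full HDFS path of the Crossref dump tar.gz archive to be extracted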
Path hdfsreadpath = new Path(hdfsServerUri.concat(workingPath).concat(crossrefFileNameTarGz));
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsServerUri.concat(workingPath));
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
FileSystem fs = FileSystem.get(URI.create(hdfsServerUri.concat(workingPath)), conf);
FSDataInputStream crossrefFileStream = fs.open(hdfsreadpath);
try (TarArchiveInputStream tais = new TarArchiveInputStream(
new GzipCompressorInputStream(crossrefFileStream))) {
TarArchiveEntry entry = null;
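// each non-directory entry of the tar archive is re-compressed with gzip and written to HDFS as a separate file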
while ((entry = tais.getNextTarEntry()) != null) {
if (!entry.isDirectory()) {
try (
FSDataOutputStream out = fs
.create(new Path(workingPath.concat("filess/").concat(entry.getName()).concat(".gz")));
GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) {
IOUtils.copy(tais, gzipOs);
}
}
}
}
Log.info("Crossref dump reading completed");
}
}

View File

@ -0,0 +1,7 @@
[
{"paramName":"n", "paramLongName":"hdfsServerUri", "paramDescription": "the server uri", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the default work path", "paramRequired": true},
{"paramName":"f", "paramLongName":"crossrefFileNameTarGz", "paramDescription": "the name of the activities orcid file", "paramRequired": true},
{"paramName":"issm", "paramLongName":"isSparkSessionManaged", "paramDescription": "the name of the activities orcid file", "paramRequired": false}
]

View File

@ -0,0 +1,42 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
</property>
<property>
<name>spark2EventLogDir</name>
<value>/user/spark/spark2ApplicationHistory</value>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
</property>
</configuration>

View File

@ -0,0 +1,68 @@
<workflow-app name="read Crossref dump from HDFS" xmlns="uri:oozie:workflow:0.5">
<parameters>
<!-- <property>-->
<!-- <name>workingPath</name>-->
<!-- <description>the working dir base path</description>-->
<!-- </property>-->
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<start to="ReadCrossRefDump"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ReadCrossRefDump">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.doiboost.crossref.ExtractCrossrefRecords</main-class>
<arg>--hdfsServerUri</arg><arg>${nameNode}</arg>
<arg>--workingPath</arg><arg>/data/doiboost/crossref/</arg>
<arg>--crossrefFileNameTarGz</arg><arg>crossref.tar.gz</arg>
</java>
<ok to="End"/>
<error to="Kill"/>
</action>
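<!-- NOTE: the SparkReadCrossRefDump action below is not referenced by any transition in this workflow;
only ReadCrossRefDump is executed between start and end -->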
<action name="SparkReadCrossRefDump">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>SparkReadCrossRefDump</name>
<class>eu.dnetlib.doiboost.crossref.ExtractCrossrefRecords</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=20
--executor-memory=6G
--driver-memory=7G
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
</spark-opts>
<arg>--hdfsServerUri</arg><arg>${nameNode}</arg>
<arg>--workingPath</arg><arg>/data/doiboost/crossref/</arg>
<arg>--crossrefFileNameTarGz</arg><arg>crossref.tar.gz</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -4,6 +4,11 @@ package eu.dnetlib.dhp.resulttoorganizationfrominstrepo;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
@ -22,11 +27,6 @@ import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Relation;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
public class PrepareResultInstRepoAssociation {
private static final Logger log = LoggerFactory.getLogger(PrepareResultInstRepoAssociation.class);
@ -56,9 +56,10 @@ public class PrepareResultInstRepoAssociation {
final String alreadyLinkedPath = parser.get("alreadyLinkedPath");
log.info("alreadyLinkedPath {}: ", alreadyLinkedPath);
List<String> blacklist = Optional.ofNullable(parser.get("blacklist"))
.map(v -> Arrays.asList(v.split(";")))
.orElse(new ArrayList<>());
List<String> blacklist = Optional
.ofNullable(parser.get("blacklist"))
.map(v -> Arrays.asList(v.split(";")))
.orElse(new ArrayList<>());
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
@ -91,21 +92,20 @@ public class PrepareResultInstRepoAssociation {
private static void prepareDatasourceOrganization(
SparkSession spark, String datasourceOrganizationPath, List<String> blacklist) {
String blacklisted = "";
if(blacklist.size() > 0 ){
blacklisted = " AND d.id != '" + blacklist.get(0) + "'";
if (blacklist.size() > 0) {
blacklisted = " AND ds.id != '" + blacklist.get(0) + "'";
for (int i = 1; i < blacklist.size(); i++) {
blacklisted += " AND d.id != '" + blacklist.get(i) + "'";
blacklisted += " AND ds.id != '" + blacklist.get(i) + "'";
}
}
String query = "SELECT source datasourceId, target organizationId "
+ "FROM ( SELECT id "
+ "FROM datasource "
+ "WHERE datasourcetype.classid = '"
+ "FROM datasource ds "
+ "WHERE ds.datasourcetype.classid = '"
+ INSTITUTIONAL_REPO_TYPE
+ "' "
+ "AND datainfo.deletedbyinference = false " + blacklisted + " ) d "
+ "AND ds.datainfo.deletedbyinference = false " + blacklisted + " ) d "
+ "JOIN ( SELECT source, target "
+ "FROM relation "
+ "WHERE lower(relclass) = '"