forked from D-Net/dnet-hadoop
Compare commits
12 Commits
9d617a0a58
...
066e1dc772
Author | SHA1 | Date |
---|---|---|
Miriam Baglioni | 066e1dc772 | |
Miriam Baglioni | 00dfaff973 | |
Miriam Baglioni | 464ac6301c | |
Miriam Baglioni | c07f820c21 | |
Miriam Baglioni | 2740b95f99 | |
Miriam Baglioni | ca7e10b3c0 | |
Miriam Baglioni | 2f6673e678 | |
Miriam Baglioni | 0eda93b3eb | |
Miriam Baglioni | 72771a1254 | |
Miriam Baglioni | 6cdc4d3bf3 | |
Miriam Baglioni | a106353cee | |
Miriam Baglioni | 5d8257b288 |
|
@ -77,7 +77,10 @@
|
|||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-compress</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-text</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
</dependencies>
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
|
||||
package eu.dnetlib.dhp.ircdl_extention;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.ircdl_extention.model.Result;
|
||||
|
||||
public class PrepareCrossrefSpark {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
PrepareCrossrefSpark.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/ircdl_extention/prepare_parameters.json"));
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
|
||||
final String inputPath = parser.get("inputPath");
|
||||
|
||||
final String outputPath = parser.get("outputPath");
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
|
||||
|
||||
runWithSparkHiveSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
Utils.removeOutputDir(spark, outputPath);
|
||||
selectResult(spark, inputPath, outputPath);
|
||||
});
|
||||
}
|
||||
|
||||
private static Dataset<Result> selectResult(SparkSession spark, String input_path, String output_path) {
|
||||
Dataset<Result> res = Utils
|
||||
.readPath(
|
||||
spark, input_path, Result.class)
|
||||
.filter(
|
||||
(FilterFunction<Result>) r -> !r.getId().startsWith("50|dedup") &&
|
||||
r.getCf().stream().anyMatch(cf -> cf.getValue().equals("Crossref")));
|
||||
|
||||
res.write().option("compression", "gzip").mode(SaveMode.Overwrite).json(output_path);
|
||||
return res;
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,81 @@
|
|||
|
||||
package eu.dnetlib.dhp.ircdl_extention;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.ircdl_extention.model.Result;
|
||||
|
||||
public class PrepareDataciteSpark {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
PrepareDataciteSpark.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/ircdl_extention/prepare_parameters.json"));
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
|
||||
final String inputPath = parser.get("inputPath");
|
||||
|
||||
final String outputPath = parser.get("outputPath");
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
|
||||
|
||||
runWithSparkHiveSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
Utils.removeOutputDir(spark, outputPath);
|
||||
exec(spark, outputPath, inputPath);
|
||||
});
|
||||
}
|
||||
|
||||
private static void exec(SparkSession spark, String output_path, String input_path) {
|
||||
|
||||
Dataset<Result> datacite = Utils
|
||||
.readPath(
|
||||
spark, input_path, Result.class)
|
||||
.filter(
|
||||
(FilterFunction<Result>) r -> r.getId().startsWith("50|datacite"));
|
||||
|
||||
datacite.write().option("compression", "gzip").mode(SaveMode.Overwrite).json(output_path + "allDatacite");
|
||||
getProviderResult(output_path, datacite, "Zenodo");
|
||||
getProviderResult(output_path, datacite, "Figshare");
|
||||
getProviderResult(output_path, datacite, "Dryad");
|
||||
|
||||
}
|
||||
|
||||
private static void getProviderResult(String output_path, Dataset<Result> datacite, String provider) {
|
||||
datacite
|
||||
.filter(
|
||||
(FilterFunction<Result>) r -> r
|
||||
.getPid()
|
||||
.stream()
|
||||
.anyMatch(p -> p.getKey().equals("doi") && p.getValue().contains(provider.toLowerCase())))
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(output_path + provider);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,75 @@
|
|||
|
||||
package eu.dnetlib.dhp.ircdl_extention;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.ircdl_extention.model.Orcid;
|
||||
|
||||
public class PrepareNormalizedOrcid {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
PrepareNormalizedOrcid.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/ircdl_extention/prepare_parameters.json"));
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
|
||||
final String inputPath = parser.get("inputPath");
|
||||
|
||||
final String outputPath = parser.get("outputPath");
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
Utils.removeOutputDir(spark, outputPath);
|
||||
execNormalize(spark, outputPath, inputPath);
|
||||
});
|
||||
}
|
||||
|
||||
private static void execNormalize(SparkSession spark, String outputPath, String inputPath) {
|
||||
Dataset<Orcid> orcid = Utils.readPath(spark, inputPath, Orcid.class);
|
||||
orcid.map((MapFunction<Orcid, Orcid>) o -> {
|
||||
o.setName(Utils.normalizeString(o.getName()));
|
||||
o.setSurname(Utils.normalizeString(o.getSurname()));
|
||||
o.setCreditname(Utils.normalizeString(o.getCreditname()));
|
||||
o
|
||||
.setOtherNames(
|
||||
o
|
||||
.getOtherNames()
|
||||
.stream()
|
||||
.map(on -> Utils.normalizeString(on))
|
||||
.collect(Collectors.toList()));
|
||||
return o;
|
||||
}, Encoders.bean(Orcid.class))
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(outputPath);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,88 @@
|
|||
|
||||
package eu.dnetlib.dhp.ircdl_extention;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.ircdl_extention.model.Result;
|
||||
|
||||
public class PrepareNormalizedResultSpark {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
PrepareNormalizedResultSpark.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/ircdl_extention/prepare_parameters.json"));
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
|
||||
final String inputPath = parser.get("inputPath");
|
||||
|
||||
final String outputPath = parser.get("outputPath");
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
|
||||
|
||||
runWithSparkHiveSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
Utils.removeOutputDir(spark, outputPath);
|
||||
execNormalize(spark, outputPath, inputPath);
|
||||
});
|
||||
}
|
||||
|
||||
private static void execNormalize(SparkSession spark, String outputPath, String inputPath) {
|
||||
Dataset<Result> normalized_result = Utils
|
||||
.readPath(spark, inputPath + "publicationsWithOrcid", Result.class)
|
||||
.union(Utils.readPath(spark, inputPath + "datasetWithOrcid", Result.class))
|
||||
.union(Utils.readPath(spark, inputPath + "softwareWithOrcid", Result.class))
|
||||
.union(Utils.readPath(spark, inputPath + "otherWithOrcid", Result.class))
|
||||
.map((MapFunction<Result, Result>) r -> {
|
||||
r.setName(Utils.normalizeString(r.getName()));
|
||||
r.setSurname(Utils.normalizeString(r.getSurname()));
|
||||
r.setFullname(Utils.normalizeString(r.getFullname()));
|
||||
return r;
|
||||
}, Encoders.bean(Result.class));
|
||||
|
||||
normalized_result
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(outputPath + "ResultWithOrcid");
|
||||
|
||||
normalized_result
|
||||
.filter((FilterFunction<Result>) r -> !r.getId().startsWith("50|dedup"))
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(outputPath + "collectedResultWithOrcid");
|
||||
|
||||
normalized_result
|
||||
.filter((FilterFunction<Result>) r -> !r.getDeletedbyinference())
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(outputPath + "notDeletedByInferenceResultWithOrcid");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,103 @@
|
|||
|
||||
package eu.dnetlib.dhp.ircdl_extention;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.ircdl_extention.model.Result;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class PrepareResultAllTheRestSpark {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
PrepareResultAllTheRestSpark.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/ircdl_extention/prepare_alltherest_parameters.json"));
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
|
||||
final String inputPath = parser.get("inputPath");
|
||||
|
||||
final String outputPath = parser.get("outputPath");
|
||||
|
||||
final String instRepoPath = parser.get("instRepoPath");
|
||||
final String crossrefPath = parser.get("crossrefPath");
|
||||
final String datacitePath = parser.get("datacitePath");
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
|
||||
|
||||
runWithSparkHiveSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
Utils.removeOutputDir(spark, outputPath + "allTheRest");
|
||||
exec(
|
||||
spark, outputPath + "allTheRest",
|
||||
inputPath, instRepoPath,
|
||||
datacitePath, crossrefPath);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Leggo tutti i result di crossref, datacite ed associati agli institutional repositories
|
||||
* Leggo tutti i result collezionati
|
||||
* faccio una left join tra i result collezionati e quelli letti al passo precedente
|
||||
* prendo quelli che non hanno un match nella join
|
||||
* @param spark
|
||||
* @param output_path
|
||||
* @param result_path
|
||||
*/
|
||||
private static void exec(SparkSession spark, String output_path, String result_path, String inst_repo_path,
|
||||
String datacite_path, String crossref_path) {
|
||||
|
||||
Dataset<Result> result = Utils.readPath(spark, result_path, Result.class);
|
||||
|
||||
Dataset<Result> inst_repo = Utils
|
||||
.readPath(spark, inst_repo_path, Result.class);
|
||||
|
||||
Dataset<Result> datacite = Utils
|
||||
.readPath(spark, datacite_path, Result.class);
|
||||
|
||||
Dataset<Result> crossref = Utils
|
||||
.readPath(spark, crossref_path, Result.class);
|
||||
|
||||
Dataset<Result> union_dataset = inst_repo.union(datacite).union(crossref);
|
||||
|
||||
result
|
||||
.joinWith(union_dataset, result.col("id").equalTo(union_dataset.col("id")), "left")
|
||||
.map((MapFunction<Tuple2<Result, Result>, Result>) t2 -> {
|
||||
if (!Optional.ofNullable(t2._2()).isPresent())
|
||||
return t2._1();
|
||||
return null;
|
||||
}, Encoders.bean(Result.class))
|
||||
.filter(Objects::nonNull)
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(output_path);
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,92 @@
|
|||
|
||||
package eu.dnetlib.dhp.ircdl_extention;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.ircdl_extention.model.Result;
|
||||
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||
|
||||
public class PrepareResultFromInstRepo {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
PrepareResultFromInstRepo.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/ircdl_extention/prepare_instrepo_parameters.json"));
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
|
||||
final String inputPath = parser.get("inputPath");
|
||||
|
||||
final String outputPath = parser.get("outputPath");
|
||||
|
||||
final String datasourcePath = parser.get("datasourcePath");
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
|
||||
|
||||
runWithSparkHiveSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
Utils.removeOutputDir(spark, outputPath);
|
||||
selectResultFromInstRepo(spark, inputPath, outputPath, datasourcePath);
|
||||
});
|
||||
}
|
||||
|
||||
private static void selectResultFromInstRepo(SparkSession spark, String inputPath, String output_path,
|
||||
String datasourcePath) {
|
||||
Dataset<Datasource> datasource = Utils.readPath(spark, datasourcePath, Datasource.class);
|
||||
Dataset<Result> res = Utils
|
||||
.readPath(
|
||||
spark, inputPath, Result.class)
|
||||
.filter(
|
||||
(FilterFunction<Result>) r -> !r.getId().startsWith("50|doiboost")
|
||||
&& !r.getId().startsWith("50|scholix")
|
||||
&& !r.getId().startsWith("50|datacite")
|
||||
&& !r.getId().startsWith("50|dedup"));
|
||||
|
||||
datasource.createOrReplaceTempView("datasource");
|
||||
res.createOrReplaceTempView("result");
|
||||
|
||||
spark
|
||||
.sql(
|
||||
"SELECT t.id, t.deletedbyinference, t.name, t.surname, t.cf, t.fullname, t.pid, t.oid " +
|
||||
"FROM " +
|
||||
"(Select * " +
|
||||
"from result " +
|
||||
"LATERAL VIEW explode(cf.key) c as cfromkey) as t " +
|
||||
"join " +
|
||||
"datasource d " +
|
||||
"on " +
|
||||
"d.id = t.cfromkey " +
|
||||
"and d.datasourcetype.classid = 'pubsrepository::institutional'")
|
||||
.as(Encoders.bean(Result.class))
|
||||
.write()
|
||||
.option("compressio", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(output_path);
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,119 @@
|
|||
|
||||
package eu.dnetlib.dhp.ircdl_extention;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.ircdl_extention.model.KeyValue;
|
||||
import eu.dnetlib.dhp.ircdl_extention.model.Result;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
|
||||
public class PrepareResultSpark {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
PrepareResultSpark.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/ircdl_extention/prepare_result_parameters.json"));
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
|
||||
parser.parseArgument(args);
|
||||
|
||||
final String resultClassName = parser.get("resultClass");
|
||||
|
||||
Class<? extends eu.dnetlib.dhp.schema.oaf.Result> resultClazz = (Class<? extends eu.dnetlib.dhp.schema.oaf.Result>) Class
|
||||
.forName(resultClassName);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
|
||||
final String inputPath = parser.get("inputPath");
|
||||
|
||||
final String outputPath = parser.get("outputPath");
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
Utils.removeOutputDir(spark, outputPath);
|
||||
mapToResult(spark, inputPath, resultClazz, outputPath);
|
||||
});
|
||||
}
|
||||
|
||||
private static <R extends eu.dnetlib.dhp.schema.oaf.Result> void mapToResult(SparkSession spark,
|
||||
String input_path,
|
||||
Class<R> resultClazz, String output_path) {
|
||||
Dataset<R> publicationDataset = Utils.readPath(spark, input_path, resultClazz);
|
||||
Dataset<Result> result = publicationDataset.filter((FilterFunction<R>) p -> {
|
||||
if (p.getAuthor() == null)
|
||||
return false;
|
||||
if (p.getAuthor().size() == 0)
|
||||
return false;
|
||||
return true;
|
||||
})
|
||||
.flatMap((FlatMapFunction<R, Result>) p -> {
|
||||
List<Result> reslist = new ArrayList<>();
|
||||
p.getAuthor().forEach(a -> {
|
||||
a.getPid().forEach(apid -> {
|
||||
if (apid.getQualifier().getClassid().equals(ModelConstants.ORCID)
|
||||
|| apid.getQualifier().getClassid().equals(ModelConstants.ORCID_PENDING)) {
|
||||
Result r = new Result();
|
||||
r.setDeletedbyinference(p.getDataInfo().getDeletedbyinference());
|
||||
r.setId(p.getId());
|
||||
r
|
||||
.setCf(
|
||||
p
|
||||
.getCollectedfrom()
|
||||
.stream()
|
||||
.map(cf -> KeyValue.newInstance(cf.getKey(), cf.getValue()))
|
||||
.collect(Collectors.toList()));
|
||||
r
|
||||
.setPid(
|
||||
p
|
||||
.getPid()
|
||||
.stream()
|
||||
.map(
|
||||
pid -> KeyValue
|
||||
.newInstance(pid.getQualifier().getClassid(), pid.getValue()))
|
||||
.collect(Collectors.toList()));
|
||||
r.setName(a.getName());
|
||||
r.setSurname(a.getSurname());
|
||||
r.setFullname(a.getFullname());
|
||||
r.setOid(apid.getValue());
|
||||
reslist.add(r);
|
||||
}
|
||||
});
|
||||
|
||||
});
|
||||
return reslist.iterator();
|
||||
}, Encoders.bean(Result.class));
|
||||
result
|
||||
.write()
|
||||
.option("compressio", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(output_path);
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,199 @@
|
|||
|
||||
package eu.dnetlib.dhp.ircdl_extention;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.text.Normalizer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.text.similarity.CosineDistance;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.wcohen.ss.JaroWinkler;
|
||||
|
||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
import eu.dnetlib.dhp.ircdl_extention.model.Orcid;
|
||||
import eu.dnetlib.dhp.ircdl_extention.model.Result;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class Utils implements Serializable {
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
public static String normalizeString(String input) {
|
||||
if (input == null || input.equals("void"))
|
||||
return new String();
|
||||
String tmp = Normalizer
|
||||
.normalize(input, Normalizer.Form.NFKD)
|
||||
.replaceAll("[^\\p{ASCII}]", "");
|
||||
tmp = tmp
|
||||
.replaceAll("[^\\p{Alpha}]+", " ")
|
||||
.replaceAll("\\s+", " ")
|
||||
.trim();
|
||||
return tmp;
|
||||
|
||||
}
|
||||
|
||||
public static void removeOutputDir(SparkSession spark, String path) {
|
||||
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
||||
}
|
||||
|
||||
public static <R> Dataset<R> readPath(
|
||||
SparkSession spark, String inputPath, Class<R> clazz) {
|
||||
return spark
|
||||
.read()
|
||||
.textFile(inputPath)
|
||||
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
|
||||
}
|
||||
|
||||
private static List<String> getList(List<String> input) {
|
||||
return input.stream().map(st -> st.trim()).filter(st -> st.length() > 0).collect(Collectors.toList());
|
||||
|
||||
}
|
||||
|
||||
public static boolean filterFunction(Tuple2<Result, Orcid> input) {
|
||||
|
||||
List<String> res = getList(Arrays.asList(input._1().getFullname().split(" ")))
|
||||
.stream()
|
||||
.sorted()
|
||||
.collect(Collectors.toList());
|
||||
Orcid or = input._2();
|
||||
List<String> tmp = new ArrayList<>();
|
||||
Collections.addAll(tmp, or.getName().split(" "));
|
||||
Collections.addAll(tmp, or.getSurname().split(" "));
|
||||
return checkContains(
|
||||
res, getList(tmp)
|
||||
.stream()
|
||||
.sorted()
|
||||
.collect(Collectors.toList()))
|
||||
||
|
||||
checkContains(
|
||||
res, getList(Arrays.asList(or.getCreditname().split(" ")))
|
||||
.stream()
|
||||
.sorted()
|
||||
.collect(Collectors.toList()))
|
||||
||
|
||||
or
|
||||
.getOtherNames()
|
||||
.stream()
|
||||
.anyMatch(
|
||||
on -> checkContains(
|
||||
res, getList(Arrays.asList(on.split(" ")))
|
||||
.stream()
|
||||
.sorted()
|
||||
.collect(Collectors.toList())));
|
||||
}
|
||||
|
||||
private static boolean checkContains(List<String> result, List<String> orcid) {
|
||||
if (result.size() == 0 || orcid.size() == 0) {
|
||||
return true;
|
||||
}
|
||||
String[][] input = {
|
||||
{
|
||||
"1", StringUtils.joinWith(" ", result)
|
||||
},
|
||||
{
|
||||
"2", StringUtils.joinWith(" ", orcid)
|
||||
}
|
||||
};
|
||||
// exact match word by word
|
||||
Double cosineDistance = new CosineDistance().apply(input[0][1], input[1][1]);
|
||||
if (Math.round((1 - cosineDistance) * 100) == 100) {
|
||||
return true;
|
||||
}
|
||||
// check containment one list can be greater than the other, and also composition of words to create the name
|
||||
// e.g. pengli yan = li peng yan
|
||||
if (orcid.size() < result.size()) {
|
||||
if (isIn(orcid, result))
|
||||
return true;
|
||||
} else {
|
||||
if (isIn(result, orcid))
|
||||
return true;
|
||||
}
|
||||
// apply JaroWinkler distance
|
||||
double score = new JaroWinkler()
|
||||
.score(StringUtils.joinWith(" ", result), StringUtils.joinWith(" ", orcid));
|
||||
return score > 0.95;
|
||||
}
|
||||
|
||||
private static boolean isIn(List<String> lst1, List<String> lst2) {
|
||||
|
||||
int index = 0;
|
||||
for (String word : lst1) {
|
||||
int i = index;
|
||||
boolean found = false;
|
||||
while (i < lst2.size()) {
|
||||
String wordlist = lst2.get(i);
|
||||
if (word.equals(wordlist)) {
|
||||
index = i + 1;
|
||||
i = lst2.size();
|
||||
found = true;
|
||||
} else {
|
||||
if (word.charAt(0) < wordlist.charAt(0)) {
|
||||
if (!checkComposition(word, lst2)) {
|
||||
return false;
|
||||
} else {
|
||||
index = 0;
|
||||
i = lst2.size();
|
||||
found = true;
|
||||
}
|
||||
} else {
|
||||
if (word.length() == 1 || wordlist.length() == 1) {
|
||||
if (word.charAt(0) == wordlist.charAt(0)) {
|
||||
index = i + 1;
|
||||
i = lst2.size();
|
||||
found = true;
|
||||
} else {
|
||||
i++;
|
||||
}
|
||||
} else {
|
||||
i++;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
if (!found) {
|
||||
if (!checkComposition(word, lst2)) {
|
||||
return false;
|
||||
} else {
|
||||
index = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private static boolean checkComposition(String word, List<String> lst2) {
|
||||
for (int i = 0; i < lst2.size(); i++) {
|
||||
for (int j = 0; j < lst2.size(); j++) {
|
||||
if (i != j) {
|
||||
String w = lst2.get(i) + lst2.get(j);
|
||||
if (word.equals(w)) {
|
||||
if (i > j) {
|
||||
lst2.remove(i);
|
||||
lst2.remove(j);
|
||||
} else {
|
||||
lst2.remove(j);
|
||||
lst2.remove(i);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,154 @@
|
|||
|
||||
package eu.dnetlib.dhp.ircdl_extention;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.api.java.function.FlatMapGroupsFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.ircdl_extention.model.Orcid;
|
||||
import eu.dnetlib.dhp.ircdl_extention.model.Result;
|
||||
import eu.dnetlib.dhp.ircdl_extention.model.ShuffleInfo;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class WrongSpark {
|
||||
/**
|
||||
* takes as input the orcid normalized and the entry normalized to be checked against orcid
|
||||
* returns the lower bound of wrong attribution
|
||||
*/
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
WrongSpark.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/ircdl_extention/wrong_orcid_parameters.json"));
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
|
||||
final String orcidPath = parser.get("orcidPath");
|
||||
|
||||
final String outputPath = parser.get("outputPath");
|
||||
|
||||
final String resultPath = parser.get("inputPath");
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
|
||||
|
||||
runWithSparkHiveSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
Utils.removeOutputDir(spark, outputPath);
|
||||
findWrong(spark, orcidPath, outputPath + "/wrong", resultPath);
|
||||
findShuffle(spark, orcidPath, outputPath + "/shuffle", resultPath);
|
||||
});
|
||||
}
|
||||
|
||||
private static void findShuffle(SparkSession spark, String orcidPath, String outputPath, String resultPath) {
|
||||
|
||||
Utils
|
||||
.readPath(spark, resultPath, Result.class)
|
||||
.map(
|
||||
(MapFunction<Result, ShuffleInfo>) r -> ShuffleInfo
|
||||
.newInstance(r.getName(), r.getSurname(), r.getFullname(), r.getId()),
|
||||
Encoders.bean(ShuffleInfo.class))
|
||||
.union(
|
||||
getWrong(spark, orcidPath, resultPath)
|
||||
.map((MapFunction<Tuple2<Result, Orcid>, ShuffleInfo>) t2 ->
|
||||
|
||||
ShuffleInfo
|
||||
.newInstance(
|
||||
t2._1().getName(), t2._1().getSurname(), t2._1().getFullname(),
|
||||
t2._1().getId(), t2._2().getName(), t2._2().getSurname(),
|
||||
t2._2().getCreditname(), t2._2().getOtherNames()),
|
||||
Encoders.bean(ShuffleInfo.class)))
|
||||
.groupByKey((MapFunction<ShuffleInfo, String>) si -> si.getId(), Encoders.STRING())
|
||||
.flatMapGroups((FlatMapGroupsFunction<String, ShuffleInfo, ShuffleInfo>) (s, it) -> {
|
||||
List<ShuffleInfo> shuffleInfoList = new ArrayList();
|
||||
List<ShuffleInfo> ret = new ArrayList<>();
|
||||
shuffleInfoList.add(it.next());
|
||||
it.forEachRemaining(e -> shuffleInfoList.add(e));
|
||||
shuffleInfoList
|
||||
.stream()
|
||||
.filter(e -> Optional.ofNullable(e.getOrcid()).isPresent())
|
||||
.forEach(e -> {
|
||||
if (checkShuffle(e, shuffleInfoList))
|
||||
ret.add(e);
|
||||
});
|
||||
return ret.iterator();
|
||||
}, Encoders.bean(ShuffleInfo.class))
|
||||
.filter(Objects::nonNull);
|
||||
/*
|
||||
* def checkShuffle(x): alis = [a for a in x[1]] dic = {} count = 0 for entry in alis: if entry['orcid'] != '':
|
||||
* dic[entry['orcid']] = entry for orcid in dic: name = dic[orcid]['oname'] surname = dic[orcid]['osurname'] for
|
||||
* author in alis: if author['aname'] == "" or author['asurname'] == "": if checkContains([author['afullname']],
|
||||
* addInListAll([], name + " " + surname)): count += 1 break else: if checkContains([author['aname'] + " " +
|
||||
* author['asurname']], addInListAll([], name + " " + surname)): count += 1 break return count
|
||||
*/
|
||||
// candidate_shuffle = zenodo_normalized.map(lambda x: (x['id'], {'aname':x['name'], 'asurname': x['surname'],
|
||||
// 'afullname': x['fullname'], 'oname':"", 'osurname':"", 'orcid':''})). union (
|
||||
// join_orcid_filtered.map(lambda e: (e['id'], {'aname':e['nameg'], 'asurname':e['surnameg'],
|
||||
// 'afullname':e['fullnameg'], 'oname':e['name'],
|
||||
// 'osurname':e['surname'],'orcid':e['orcid']}))).groupByKey().filter(toBeChecked)
|
||||
}
|
||||
|
||||
private static boolean checkShuffle(ShuffleInfo e, List<ShuffleInfo> shuffleInfoList) {
|
||||
|
||||
return shuffleInfoList
|
||||
.stream()
|
||||
.anyMatch(
|
||||
si -> Utils
|
||||
.filterFunction(
|
||||
new Tuple2<>(Result.newInstance(si.getAfullname()),
|
||||
Orcid
|
||||
.newInstance(
|
||||
si.getOname(), si.getOsurname(), si.getOcreditName(), si.getoOtherNames()))));
|
||||
}
|
||||
|
||||
private static Dataset<Tuple2<Result, Orcid>> getWrong(SparkSession spark, String orcidPath, String resultPath) {
|
||||
Dataset<Orcid> orcidDataset = Utils
|
||||
.readPath(spark, orcidPath, Orcid.class)
|
||||
.filter((FilterFunction<Orcid>) o -> !o.getName().contains("deactivated"));
|
||||
Dataset<Result> resultDataset = Utils.readPath(spark, resultPath, Result.class);
|
||||
|
||||
return resultDataset
|
||||
.joinWith(
|
||||
orcidDataset, resultDataset
|
||||
.col("oid")
|
||||
.equalTo(orcidDataset.col("orcid")),
|
||||
"inner")
|
||||
.filter((FilterFunction<Tuple2<Result, Orcid>>) t2 -> !Utils.filterFunction(t2));
|
||||
}
|
||||
|
||||
private static void findWrong(SparkSession spark, String orcidPath, String outputPath, String resultPath) {
|
||||
getWrong(spark, orcidPath, resultPath)
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(outputPath);
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
|
||||
package eu.dnetlib.dhp.ircdl_extention.model;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public class KeyValue implements Serializable {

	// plain serializable string pair used as a Spark bean
	private String key;
	private String value;

	/**
	 * Builds a pair from its two components.
	 *
	 * @param key   the key component
	 * @param value the value component
	 * @return the populated instance
	 */
	public static KeyValue newInstance(String key, String value) {
		final KeyValue instance = new KeyValue();
		instance.setKey(key);
		instance.setValue(value);
		return instance;
	}

	public String getKey() {
		return key;
	}

	public void setKey(String key) {
		this.key = key;
	}

	public String getValue() {
		return value;
	}

	public void setValue(String value) {
		this.value = value;
	}
}
|
|
@ -0,0 +1,90 @@
|
|||
|
||||
package eu.dnetlib.dhp.ircdl_extention.model;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
|
||||
public class Orcid implements Serializable {

	// naming information of an ORCID profile, plus bookkeeping fields
	private List<String> otherNames;
	private String inception;
	private String surname;
	private String mode;
	private String creditname;
	private String orcid;
	private Boolean works;
	private String name;

	/**
	 * Builds an Orcid carrying only the naming information; the remaining
	 * fields (inception, mode, orcid, works) are left unset.
	 *
	 * @param oname          the given name
	 * @param osurname       the family name
	 * @param ocreditName    the credit name
	 * @param getoOtherNames the list of alternative names
	 * @return the populated instance
	 */
	public static Orcid newInstance(String oname, String osurname, String ocreditName, List<String> getoOtherNames) {
		final Orcid instance = new Orcid();
		instance.setName(oname);
		instance.setSurname(osurname);
		instance.setCreditname(ocreditName);
		instance.setOtherNames(getoOtherNames);
		return instance;
	}

	public List<String> getOtherNames() {
		return otherNames;
	}

	public void setOtherNames(List<String> otherNames) {
		this.otherNames = otherNames;
	}

	public String getInception() {
		return inception;
	}

	public void setInception(String inception) {
		this.inception = inception;
	}

	public String getSurname() {
		return surname;
	}

	public void setSurname(String surname) {
		this.surname = surname;
	}

	public String getMode() {
		return mode;
	}

	public void setMode(String mode) {
		this.mode = mode;
	}

	public String getCreditname() {
		return creditname;
	}

	public void setCreditname(String creditname) {
		this.creditname = creditname;
	}

	public String getOrcid() {
		return orcid;
	}

	public void setOrcid(String oid) {
		this.orcid = oid;
	}

	public Boolean getWorks() {
		return works;
	}

	public void setWorks(Boolean works) {
		this.works = works;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}
}
|
|
@ -0,0 +1,96 @@
|
|||
|
||||
package eu.dnetlib.dhp.ircdl_extention.model;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
|
||||
public class Result implements Serializable {
|
||||
private Boolean deletedbyinference;
|
||||
private String id;
|
||||
private List<KeyValue> cf;
|
||||
private List<KeyValue> pid;
|
||||
private String name;
|
||||
private String surname;
|
||||
private String fullname;
|
||||
private String oid;
|
||||
|
||||
public static Result newInstance(String afullname) {
|
||||
Result r = new Result();
|
||||
r.fullname = afullname;
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
public Boolean getDeletedbyinference() {
|
||||
return deletedbyinference;
|
||||
}
|
||||
|
||||
public void setDeletedbyinference(Boolean deletedbyinference) {
|
||||
this.deletedbyinference = deletedbyinference;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setId(String id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public List<KeyValue> getCf() {
|
||||
return cf;
|
||||
}
|
||||
|
||||
public void setCf(List<KeyValue> cf) {
|
||||
this.cf = cf;
|
||||
}
|
||||
|
||||
public List<KeyValue> getPid() {
|
||||
return pid;
|
||||
}
|
||||
|
||||
public void setPid(List<KeyValue> pid) {
|
||||
this.pid = pid;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
if (name != null)
|
||||
this.name = name.toLowerCase();
|
||||
else
|
||||
this.name = new String();
|
||||
}
|
||||
|
||||
public String getSurname() {
|
||||
return surname;
|
||||
}
|
||||
|
||||
public void setSurname(String surname) {
|
||||
if (surname != null)
|
||||
this.surname = surname.toLowerCase();
|
||||
else
|
||||
this.surname = new String();
|
||||
}
|
||||
|
||||
public String getFullname() {
|
||||
return fullname;
|
||||
}
|
||||
|
||||
public void setFullname(String fullname) {
|
||||
if (fullname != null)
|
||||
this.fullname = fullname.toLowerCase();
|
||||
else
|
||||
this.fullname = new String();
|
||||
}
|
||||
|
||||
public String getOid() {
|
||||
return oid;
|
||||
}
|
||||
|
||||
public void setOid(String oid) {
|
||||
this.oid = oid;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,114 @@
|
|||
|
||||
package eu.dnetlib.dhp.ircdl_extention.model;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
|
||||
public class ShuffleInfo implements Serializable {
|
||||
|
||||
private String aname;
|
||||
private String asurname;
|
||||
private String afullname;
|
||||
private String oname;
|
||||
private String osurname;
|
||||
private String ocreditName;
|
||||
private List<String> oOtherNames;
|
||||
private String orcid;
|
||||
private String id;
|
||||
|
||||
public String getAname() {
|
||||
return aname;
|
||||
}
|
||||
|
||||
public void setAname(String aname) {
|
||||
this.aname = aname;
|
||||
}
|
||||
|
||||
public String getAsurname() {
|
||||
return asurname;
|
||||
}
|
||||
|
||||
public void setAsurname(String asurname) {
|
||||
this.asurname = asurname;
|
||||
}
|
||||
|
||||
public String getAfullname() {
|
||||
return afullname;
|
||||
}
|
||||
|
||||
public void setAfullname(String afullname) {
|
||||
this.afullname = afullname;
|
||||
}
|
||||
|
||||
public String getOname() {
|
||||
return oname;
|
||||
}
|
||||
|
||||
public void setOname(String oname) {
|
||||
this.oname = oname;
|
||||
}
|
||||
|
||||
public String getOsurname() {
|
||||
return osurname;
|
||||
}
|
||||
|
||||
public void setOsurname(String osurname) {
|
||||
this.osurname = osurname;
|
||||
}
|
||||
|
||||
public String getOcreditName() {
|
||||
return ocreditName;
|
||||
}
|
||||
|
||||
public void setOcreditName(String ocreditName) {
|
||||
this.ocreditName = ocreditName;
|
||||
}
|
||||
|
||||
public List<String> getoOtherNames() {
|
||||
return oOtherNames;
|
||||
}
|
||||
|
||||
public void setoOtherNames(List<String> oOtherNames) {
|
||||
this.oOtherNames = oOtherNames;
|
||||
}
|
||||
|
||||
public String getOrcid() {
|
||||
return orcid;
|
||||
}
|
||||
|
||||
public void setOrcid(String orcid) {
|
||||
this.orcid = orcid;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setId(String id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public static ShuffleInfo newInstance(String aname, String asurname, String afullname, String id) {
|
||||
ShuffleInfo si = new ShuffleInfo();
|
||||
si.afullname = afullname;
|
||||
si.aname = aname;
|
||||
si.asurname = asurname;
|
||||
si.id = id;
|
||||
return si;
|
||||
}
|
||||
|
||||
public static ShuffleInfo newInstance(String aname, String asurname, String afullname, String id, String oname,
|
||||
String osurname, String ocredtname, List<String> oOthername) {
|
||||
ShuffleInfo si = new ShuffleInfo();
|
||||
si.afullname = afullname;
|
||||
si.aname = aname;
|
||||
si.asurname = asurname;
|
||||
si.id = id;
|
||||
si.oname = oname;
|
||||
si.osurname = osurname;
|
||||
si.ocreditName = ocredtname;
|
||||
si.oOtherNames = oOthername;
|
||||
return si;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,58 @@
|
|||
<configuration>
|
||||
<property>
|
||||
<name>jobTracker</name>
|
||||
<value>yarnRM</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>nameNode</name>
|
||||
<value>hdfs://nameservice1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>hive_metastore_uris</name>
|
||||
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2YarnHistoryServerAddress</name>
|
||||
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2ExtraListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2SqlQueryExecutionListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.launcher.mapreduce.user.classpath.first</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorNumber</name>
|
||||
<value>4</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2EventLogDir</name>
|
||||
<value>/user/spark/spark2ApplicationHistory</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<value>15G</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<value>6G</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<value>1</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -0,0 +1,479 @@
|
|||
<workflow-app name="IRCDL Extention" xmlns="uri:oozie:workflow:0.5">
|
||||
|
||||
|
||||
<start to="deleteoutputpath"/>
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
<action name="deleteoutputpath">
|
||||
<fs>
|
||||
<delete path='${outputPath}'/>
|
||||
<mkdir path='${outputPath}'/>
|
||||
<delete path='${workingDir}'/>
|
||||
<mkdir path='${workingDir}'/>
|
||||
</fs>
|
||||
<ok to="fork_prepare"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<fork name="fork_prepare">
|
||||
<path start="fork_prepare_result"/>
|
||||
<path start="prepare_orcid"/>
|
||||
</fork>
|
||||
|
||||
|
||||
<action name="prepare_orcid">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>PrepareOrcid</name>
|
||||
<class>eu.dnetlib.dhp.ircdl_extention.PrepareNormalizedOrcid</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${orcidInputPath}</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
|
||||
</spark>
|
||||
<ok to="join_fork"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<fork name="fork_prepare_result">
|
||||
<path start="prepare_publication"/>
|
||||
<path start="prepare_dataset"/>
|
||||
<path start="prepare_software"/>
|
||||
<path start="prepare_other"/>
|
||||
</fork>
|
||||
|
||||
<action name="prepare_publication">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>PrepareResult</name>
|
||||
<class>eu.dnetlib.dhp.ircdl_extention.PrepareResultSpark</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${inputPath}/publication</arg>
|
||||
<arg>--resultClass</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/GRAPH/publicationsWithOrcid</arg>
|
||||
</spark>
|
||||
<ok to="wait_prepare_result"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="prepare_dataset">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>PrepareResult</name>
|
||||
<class>eu.dnetlib.dhp.ircdl_extention.PrepareResultSpark</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${inputPath}/dataset</arg>
|
||||
<arg>--resultClass</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/GRAPH/datasetWithOrcid</arg>
|
||||
</spark>
|
||||
<ok to="wait_prepare_result"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="prepare_software">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>PrepareResult</name>
|
||||
<class>eu.dnetlib.dhp.ircdl_extention.PrepareResultSpark</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${inputPath}/software</arg>
|
||||
<arg>--resultClass</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/GRAPH/softwareWithOrcid</arg>
|
||||
</spark>
|
||||
<ok to="wait_prepare_result"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="prepare_other">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>PrepareResult</name>
|
||||
<class>eu.dnetlib.dhp.ircdl_extention.PrepareResultSpark</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${inputPath}/otherresearchproduct</arg>
|
||||
<arg>--resultClass</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/GRAPH/otherWithOrcid</arg>
|
||||
</spark>
|
||||
<ok to="wait_prepare_result"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<join name="wait_prepare_result" to="normalize_result"/>
|
||||
|
||||
<action name="normalize_result">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>PrepareResult</name>
|
||||
<class>eu.dnetlib.dhp.ircdl_extention.PrepareNormalizedResultSpark</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/GRAPH/Normalized/</arg>
|
||||
</spark>
|
||||
<ok to="fork_get_result_info"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<fork name="fork_get_result_info">
|
||||
<path start="get_result_instrepo"/>
|
||||
<path start="get_result_datacite"/>
|
||||
<path start="get_result_crossref"/>
|
||||
</fork>
|
||||
|
||||
|
||||
<action name="get_result_instrepo">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>GetResultInstRepo</name>
|
||||
<class>eu.dnetlib.dhp.ircdl_extention.PrepareResultFromInstRepo</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/Normalized/ResultWithOrcid/</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/GRAPH/InstRepo/</arg>
|
||||
<arg>--datasourcePath</arg><arg>${datasourcePath}</arg>
|
||||
</spark>
|
||||
<ok to="wait_res_info"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="get_result_datacite">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>GetResultDatacite</name>
|
||||
<class>eu.dnetlib.dhp.ircdl_extention.PrepareDataciteSpark</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/Normalized/ResultWithOrcid/</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/GRAPH/Datacite/</arg>
|
||||
</spark>
|
||||
<ok to="wait_res_info"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="get_result_crossref">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>GetResultCrossref</name>
|
||||
<class>eu.dnetlib.dhp.ircdl_extention.PrepareCrossrefSpark</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/Normalized/ResultWithOrcid/</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/GRAPH/Crossref/</arg>
|
||||
</spark>
|
||||
<ok to="wait_res_info"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<join name="wait_res_info" to="get_result_alltherest"/>
|
||||
|
||||
<action name="get_result_alltherest">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>GetResultAllTheRest</name>
|
||||
<class>eu.dnetlib.dhp.ircdl_extention.PrepareResultAllTheRestSpark</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/Normalized/ResultWithOrcid/</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/GRAPH/AllTheRest/</arg>
|
||||
<arg>--instRepoPath</arg><arg>${workingDir}/GRAPH/InstRepo/</arg>
|
||||
<arg>--datacitePath</arg><arg>${workingDir}/GRAPH/Datacite/</arg>
|
||||
<arg>--crossrefPath</arg><arg>${workingDir}/GRAPH/Crossref/</arg>
|
||||
</spark>
|
||||
<ok to="join_fork"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<join name="join_fork" to="fork_get_wrong"/>
|
||||
|
||||
<fork name="fork_get_wrong">
|
||||
<path start="get_wrong_instrepo"/>
|
||||
<path start="get_wrong_datacite"/>
|
||||
<path start="get_wrong_crossref"/>
|
||||
<path start="get_wrong_alltherest"/>
|
||||
<path start="get_wrong_zenodo"/>
|
||||
<path start="get_wrong_figshare"/>
|
||||
<path start="get_wrong_dryad"/>
|
||||
</fork>
|
||||
|
||||
<action name="get_wrong_instrepo">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>GetResultInstRepo</name>
|
||||
<class>eu.dnetlib.dhp.ircdl_extention.WrongSpark</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/InstRepo/</arg>
|
||||
<arg>--outputPath</arg><arg>${outputPath}/InstRepo/</arg>
|
||||
<arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
|
||||
</spark>
|
||||
<ok to="jojn_wrong"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
<action name="get_wrong_datacite">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>GetResultInstRepo</name>
|
||||
<class>eu.dnetlib.dhp.ircdl_extention.WrongSpark</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/Datacite/allDatacite/</arg>
|
||||
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
||||
<arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
|
||||
</spark>
|
||||
<ok to="jojn_wrong"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="get_wrong_crossref">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>GetResultInstRepo</name>
|
||||
<class>eu.dnetlib.dhp.ircdl_extention.WrongSpark</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/Crossref/</arg>
|
||||
<arg>--outputPath</arg><arg>${outputPath}/Crossref/</arg>
|
||||
<arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
|
||||
</spark>
|
||||
<ok to="jojn_wrong"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="get_wrong_alltherest">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>GetResultInstRepo</name>
|
||||
<class>eu.dnetlib.dhp.ircdl_extention.WrongSpark</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/AllTheRest/</arg>
|
||||
<arg>--outputPath</arg><arg>${outputPath}/AllTheRest/</arg>
|
||||
<arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
|
||||
</spark>
|
||||
<ok to="jojn_wrong"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="get_wrong_zenodo">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>GetResultInstRepo</name>
|
||||
<class>eu.dnetlib.dhp.ircdl_extention.WrongSpark</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/Datacite/Zenodo/</arg>
|
||||
<arg>--outputPath</arg><arg>${outputPath}/Zenodo/</arg>
|
||||
<arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
|
||||
</spark>
|
||||
<ok to="jojn_wrong"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
<action name="get_wrong_figshare">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>GetResultInstRepo</name>
|
||||
<class>eu.dnetlib.dhp.ircdl_extention.WrongSpark</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/Datacite/Figshare/</arg>
|
||||
<arg>--outputPath</arg><arg>${outputPath}/Figshare/</arg>
|
||||
<arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
|
||||
</spark>
|
||||
<ok to="jojn_wrong"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
<action name="get_wrong_dryad">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>GetResultInstRepo</name>
|
||||
<class>eu.dnetlib.dhp.ircdl_extention.WrongSpark</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/Datacite/Dryad/</arg>
|
||||
<arg>--outputPath</arg><arg>${outputPath}/Dryad/</arg>
|
||||
<arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
|
||||
</spark>
|
||||
<ok to="jojn_wrong"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
<join name="jojn_wrong" to="End"/>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -0,0 +1,35 @@
|
|||
[
|
||||
{
|
||||
"paramName": "issm",
|
||||
"paramLongName": "isSparkSessionManaged",
|
||||
"paramDescription": "when true will stop SparkSession after job execution",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "ip",
|
||||
"paramLongName": "inputPath",
|
||||
"paramDescription": "the URL from where to get the programme file",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "op",
|
||||
"paramLongName": "outputPath",
|
||||
"paramDescription": "the path of the new ActionSet",
|
||||
"paramRequired": true
|
||||
},{
|
||||
"paramName": "ir",
|
||||
"paramLongName": "instRepoPath",
|
||||
"paramDescription": "the path of the new ActionSet",
|
||||
"paramRequired": true
|
||||
},{
|
||||
"paramName": "dp",
|
||||
"paramLongName": "datacitePath",
|
||||
"paramDescription": "the path of the new ActionSet",
|
||||
"paramRequired": true
|
||||
},{
|
||||
"paramName": "cp",
|
||||
"paramLongName": "crossrefPath",
|
||||
"paramDescription": "the path of the new ActionSet",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -0,0 +1,26 @@
|
|||
[
|
||||
{
|
||||
"paramName": "issm",
|
||||
"paramLongName": "isSparkSessionManaged",
|
||||
"paramDescription": "when true will stop SparkSession after job execution",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "ip",
|
||||
"paramLongName": "inputPath",
|
||||
"paramDescription": "the URL from where to get the programme file",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "op",
|
||||
"paramLongName": "outputPath",
|
||||
"paramDescription": "the path of the new ActionSet",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "dp",
|
||||
"paramLongName": "datasourcePath",
|
||||
"paramDescription": "the path of the new ActionSet",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -0,0 +1,20 @@
|
|||
[
|
||||
{
|
||||
"paramName": "issm",
|
||||
"paramLongName": "isSparkSessionManaged",
|
||||
"paramDescription": "when true will stop SparkSession after job execution",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "ip",
|
||||
"paramLongName": "inputPath",
|
||||
"paramDescription": "the URL from where to get the programme file",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "op",
|
||||
"paramLongName": "outputPath",
|
||||
"paramDescription": "the path of the new ActionSet",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -0,0 +1,26 @@
|
|||
[
|
||||
{
|
||||
"paramName": "issm",
|
||||
"paramLongName": "isSparkSessionManaged",
|
||||
"paramDescription": "when true will stop SparkSession after job execution",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "ip",
|
||||
"paramLongName": "inputPath",
|
||||
"paramDescription": "the URL from where to get the programme file",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "op",
|
||||
"paramLongName": "outputPath",
|
||||
"paramDescription": "the path of the new ActionSet",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "rc",
|
||||
"paramLongName": "resultClass",
|
||||
"paramDescription": "the path of the new ActionSet",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -0,0 +1,26 @@
|
|||
[
|
||||
{
|
||||
"paramName": "issm",
|
||||
"paramLongName": "isSparkSessionManaged",
|
||||
"paramDescription": "when true will stop SparkSession after job execution",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "op",
|
||||
"paramLongName": "orcidPath",
|
||||
"paramDescription": "the URL from where to get the programme file",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "op",
|
||||
"paramLongName": "outputPath",
|
||||
"paramDescription": "the path of the new ActionSet",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "ip",
|
||||
"paramLongName": "inputPath",
|
||||
"paramDescription": "thepath of the new ActionSet",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -0,0 +1,98 @@
|
|||
|
||||
package eu.dnetlib.dhp.ircdl_extention;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.actionmanager.project.SparkAtomicActionJob;
|
||||
import eu.dnetlib.dhp.actionmanager.project.SparkUpdateProjectTest;
|
||||
import eu.dnetlib.dhp.ircdl_extention.model.Orcid;
|
||||
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||
|
||||
public class NormalizeOrcidTest {
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
private static final ClassLoader cl = eu.dnetlib.dhp.ircdl_extention.NormalizeOrcidTest.class
|
||||
.getClassLoader();
|
||||
|
||||
private static SparkSession spark;
|
||||
|
||||
private static Path workingDir;
|
||||
private static final Logger log = LoggerFactory
|
||||
.getLogger(eu.dnetlib.dhp.ircdl_extention.NormalizeOrcidTest.class);
|
||||
|
||||
@BeforeAll
|
||||
public static void beforeAll() throws IOException {
|
||||
workingDir = Files
|
||||
.createTempDirectory(eu.dnetlib.dhp.ircdl_extention.NormalizeOrcidTest.class.getSimpleName());
|
||||
log.info("using work dir {}", workingDir);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.setAppName(eu.dnetlib.dhp.ircdl_extention.NormalizeOrcidTest.class.getSimpleName());
|
||||
|
||||
conf.setMaster("local[*]");
|
||||
conf.set("spark.driver.host", "localhost");
|
||||
// conf.set("hive.metastore.local", "true");
|
||||
conf.set("spark.ui.enabled", "false");
|
||||
conf.set("spark.sql.warehouse.dir", workingDir.toString());
|
||||
// conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
|
||||
|
||||
spark = SparkSession
|
||||
.builder()
|
||||
.appName(NormalizeOrcidTest.class.getSimpleName())
|
||||
.config(conf)
|
||||
.getOrCreate();
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void afterAll() throws IOException {
|
||||
FileUtils.deleteDirectory(workingDir.toFile());
|
||||
spark.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void normalizeOrcid() throws Exception {
|
||||
PrepareNormalizedOrcid
|
||||
.main(
|
||||
new String[] {
|
||||
"-isSparkSessionManaged",
|
||||
Boolean.FALSE.toString(),
|
||||
"-inputPath",
|
||||
getClass()
|
||||
.getResource(
|
||||
"/eu/dnetlib/dhp/ircdl_extention/orcid_original.json")
|
||||
.getPath(),
|
||||
"-outputPath",
|
||||
workingDir.toString() + "/orcidNormalized"
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Orcid> tmp = sc
|
||||
.textFile(workingDir.toString() + "/orcidNormalized")
|
||||
.map(value -> OBJECT_MAPPER.readValue(value, Orcid.class));
|
||||
|
||||
tmp.foreach(v -> System.out.println(OBJECT_MAPPER.writeValueAsString(v)));
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,95 @@
|
|||
|
||||
package eu.dnetlib.dhp.ircdl_extention;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.neethi.Assertion;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import eu.dnetlib.dhp.ircdl_extention.model.Orcid;
|
||||
import eu.dnetlib.dhp.ircdl_extention.model.Result;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class WrongOrcidTest {
|
||||
|
||||
@Test
|
||||
public void wrongOrcidFalse() throws Exception {
|
||||
Assertions
|
||||
.assertTrue(
|
||||
Utils
|
||||
.filterFunction(
|
||||
new Tuple2<>(Result.newInstance("veigas pires cristina"),
|
||||
Orcid
|
||||
.newInstance(
|
||||
"cristina", "veiga pires", "c veiga pires",
|
||||
Arrays.asList("c c veiga pires")))));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void wrongOrcidFalse2() throws Exception {
|
||||
Assertions
|
||||
.assertTrue(
|
||||
Utils
|
||||
.filterFunction(
|
||||
new Tuple2<>(Result.newInstance("yushkevich p"),
|
||||
Orcid
|
||||
.newInstance(
|
||||
"paul", "yushkevich", "paul a yushkevich",
|
||||
new ArrayList<>()))));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void wrongOrcidFalse3() throws Exception {
|
||||
Assertions
|
||||
.assertTrue(
|
||||
Utils
|
||||
.filterFunction(
|
||||
new Tuple2<>(Result.newInstance("ghosh ss"),
|
||||
Orcid
|
||||
.newInstance(
|
||||
"satrajit", "ghosh",
|
||||
"satra",
|
||||
Arrays.asList("satra","satrajit s ghosh")))));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void wrongOrcidTrue() throws Exception {
|
||||
Assertions
|
||||
.assertFalse(
|
||||
Utils
|
||||
.filterFunction(
|
||||
new Tuple2<>(Result.newInstance("kluft lukas"),
|
||||
Orcid
|
||||
.newInstance(
|
||||
"satrajit", "ghosh",
|
||||
"satra",
|
||||
Arrays.asList("satra","satrajit s ghosh")))));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void wrongOrcidFalse4() throws Exception {
|
||||
Assertions
|
||||
.assertTrue(
|
||||
Utils
|
||||
.filterFunction(
|
||||
new Tuple2<>(Result.newInstance("schulz s a"),
|
||||
Orcid
|
||||
.newInstance(
|
||||
"sebastian", "schulz",
|
||||
"sebastian a schulz",
|
||||
new ArrayList<>()))));
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
{"otherNames": [], "inception": "2017-05-22T16:38:30.236Z", "surname": "hyy37", "mode": "Direct", "creditname": "void", "orcid": "0000-0002-8748-6992", "works": false, "name": "1380"}
|
||||
{"otherNames": [], "inception": "2017-05-25T12:50:48.761Z", "surname": "hyy75", "mode": "Direct", "creditname": "void", "orcid": "0000-0001-7773-1109", "works": false, "name": "2775"}
|
||||
{"otherNames": [], "inception": "2017-05-28T12:07:09.154Z", "surname": "hyy13", "mode": "Direct", "creditname": "void", "orcid": "0000-0003-4728-6379", "works": false, "name": "434323"}
|
||||
{"otherNames": [], "inception": "2017-08-10T07:07:23.818Z", "surname": "hyy44", "mode": "Direct", "creditname": "void", "orcid": "0000-0001-9502-3093", "works": false, "name": "58"}
|
||||
{"otherNames": [], "inception": "2017-08-10T07:08:48.179Z", "surname": "hyy46", "mode": "Direct", "creditname": "void", "orcid": "0000-0003-2933-0057", "works": false, "name": "60"}
|
||||
{"otherNames": ["pang x y", "pang xueyong"], "inception": "2014-10-13T03:26:21.741Z", "surname": "?", "mode": "API", "creditname": "void", "orcid": "0000-0002-7397-5824", "works": true, "name": "??"}
|
||||
{"otherNames": [], "inception": "2019-08-27T07:55:06.340Z", "surname": "therasa alphonsa", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0001-7205-6036", "works": false, "name": "a"}
|
||||
{"otherNames": ["minto"], "inception": "2020-08-02T06:33:18.620Z", "surname": "karim", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0001-6111-6742", "works": false, "name": "a k mohammad fazlul"}
|
||||
{"otherNames": [], "inception": "2014-05-01T09:13:11.783Z", "surname": "al-sammak", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0001-6646-4295", "works": false, "name": "a-imam"}
|
||||
{"otherNames": [], "inception": "2019-12-06T12:53:04.045Z", "surname": "hassan", "mode": "Direct", "creditname": "void", "orcid": "0000-0003-2957-4641", "works": false, "name": "a-s.u."}
|
||||
{"otherNames": [], "inception": "2020-07-28T12:29:26.453Z", "surname": "ajakh", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0002-1081-8426", "works": false, "name": "a."}
|
||||
{"otherNames": [], "inception": "2017-01-10T12:35:05.016Z", "surname": "antol\u00ednez", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0002-5451-3421", "works": false, "name": "a. (ana)"}
|
||||
{"otherNames": [], "inception": "2018-08-20T05:00:15.964Z", "surname": "mahmudi", "mode": "Direct", "creditname": "void", "orcid": "0000-0003-3187-941X", "works": false, "name": "a. aviv"}
|
||||
{"otherNames": [], "inception": "2017-05-13T01:03:58.949Z", "surname": "akanmu", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0001-6223-5428", "works": false, "name": "a. c."}
|
||||
{"otherNames": [], "inception": "2018-01-20T02:58:05.199Z", "surname": "inci", "mode": "Direct", "creditname": "void", "orcid": "0000-0002-0427-9745", "works": true, "name": "a. can"}
|
||||
{"otherNames": ["a. kim ryan"], "inception": "2014-10-24T23:06:43.544Z", "surname": "hayes", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0002-2055-8269", "works": true, "name": "a. kim"}
|
||||
{"otherNames": [], "inception": "2017-08-10T13:38:29.172Z", "surname": "bahadir", "mode": "Direct", "creditname": "void", "orcid": "0000-0002-4045-0001", "works": false, "name": "a. tugba"}
|
||||
{"otherNames": [], "inception": "2018-08-29T07:49:31.093Z", "surname": "rayna", "mode": "Direct", "creditname": "void", "orcid": "0000-0002-7916-2031", "works": false, "name": "a.brite"}
|
||||
{"otherNames": [], "inception": "2014-07-12T08:02:39.568Z", "surname": "kalyani", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0003-2649-7126", "works": false, "name": "a.grace"}
|
||||
{"otherNames": [], "inception": "2018-07-21T12:00:22.042Z", "surname": "ahmed", "mode": "Direct", "creditname": "void", "orcid": "0000-0003-0777-5848", "works": false, "name": "a.i. mahbub uddin"}
|
||||
{"otherNames": [], "inception": "2018-04-11T13:58:53.355Z", "surname": "a.kathirvel murugan", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0003-2298-6301", "works": false, "name": "a.kathirvel murugan"}
|
||||
{"otherNames": [], "inception": "2017-08-31T11:35:48.559Z", "surname": "dar", "mode": "Direct", "creditname": "void", "orcid": "0000-0001-8781-6309", "works": false, "name": "a.rashid"}
|
||||
{"otherNames": [], "inception": "2014-08-26T00:25:30.968Z", "surname": "sayem", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0003-2461-4667", "works": false, "name": "a.s.m."}
|
||||
{"otherNames": [], "inception": "2019-10-03T01:27:08.212Z", "surname": "conte", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0003-2862-6139", "works": false, "name": "aaron"}
|
||||
{"otherNames": [], "inception": "2020-03-16T09:37:10.610Z", "surname": "rashmi", "mode": "Direct", "creditname": "void", "orcid": "0000-0003-4754-5465", "works": false, "name": "aarthi rashmi b"}
|
||||
{"otherNames": [], "inception": "2017-02-28T19:01:59.146Z", "surname": "bhaskar", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0002-5794-1165", "works": false, "name": "aastha"}
|
||||
{"otherNames": [], "inception": "2020-04-07T18:10:50.922Z", "surname": "belhabib", "mode": "Direct", "creditname": "void", "orcid": "0000-0001-6086-0588", "works": false, "name": "abdelfettah"}
|
||||
{"otherNames": [], "inception": "2019-01-13T21:50:51.923Z", "surname": "laamani", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0003-2055-2593", "works": false, "name": "abdellatif"}
|
||||
{"otherNames": ["fákē", "miñhō"], "inception": "2019-01-13T21:50:51.923Z", "surname": "laamani", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0003-2055-2593", "works": false, "name": "abdellatif"}
|
|
@ -0,0 +1,65 @@
|
|||
|
||||
package eu.dnetlib.doiboost.crossref;
|
||||
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.net.URI;
|
||||
import java.util.zip.GZIPOutputStream;
|
||||
|
||||
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
|
||||
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
|
||||
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.mortbay.log.Log;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
|
||||
public class ExtractCrossrefRecords {
|
||||
public static void main(String[] args) throws Exception {
|
||||
String hdfsServerUri;
|
||||
String workingPath;
|
||||
String crossrefFileNameTarGz;
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
ExtractCrossrefRecords.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/doiboost/crossref_dump_reader.json")));
|
||||
parser.parseArgument(args);
|
||||
hdfsServerUri = parser.get("hdfsServerUri");
|
||||
workingPath = parser.get("workingPath");
|
||||
crossrefFileNameTarGz = parser.get("crossrefFileNameTarGz");
|
||||
|
||||
Path hdfsreadpath = new Path(hdfsServerUri.concat(workingPath).concat(crossrefFileNameTarGz));
|
||||
Configuration conf = new Configuration();
|
||||
conf.set("fs.defaultFS", hdfsServerUri.concat(workingPath));
|
||||
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
|
||||
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
|
||||
FileSystem fs = FileSystem.get(URI.create(hdfsServerUri.concat(workingPath)), conf);
|
||||
FSDataInputStream crossrefFileStream = fs.open(hdfsreadpath);
|
||||
try (TarArchiveInputStream tais = new TarArchiveInputStream(
|
||||
new GzipCompressorInputStream(crossrefFileStream))) {
|
||||
TarArchiveEntry entry = null;
|
||||
while ((entry = tais.getNextTarEntry()) != null) {
|
||||
if (entry.isDirectory()) {
|
||||
} else {
|
||||
try (
|
||||
FSDataOutputStream out = fs
|
||||
.create(new Path(workingPath.concat("filess/").concat(entry.getName()).concat(".gz")));
|
||||
GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) {
|
||||
|
||||
IOUtils.copy(tais, gzipOs);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
Log.info("Crossref dump reading completed");
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,7 @@
|
|||
[
|
||||
{"paramName":"n", "paramLongName":"hdfsServerUri", "paramDescription": "the server uri", "paramRequired": true},
|
||||
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the default work path", "paramRequired": true},
|
||||
{"paramName":"f", "paramLongName":"crossrefFileNameTarGz", "paramDescription": "the name of the activities orcid file", "paramRequired": true},
|
||||
{"paramName":"issm", "paramLongName":"isSparkSessionManaged", "paramDescription": "the name of the activities orcid file", "paramRequired": false}
|
||||
|
||||
]
|
|
@ -0,0 +1,42 @@
|
|||
<configuration>
|
||||
<property>
|
||||
<name>jobTracker</name>
|
||||
<value>yarnRM</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>nameNode</name>
|
||||
<value>hdfs://nameservice1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.launcher.mapreduce.user.classpath.first</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>hive_metastore_uris</name>
|
||||
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2YarnHistoryServerAddress</name>
|
||||
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2EventLogDir</name>
|
||||
<value>/user/spark/spark2ApplicationHistory</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2ExtraListeners</name>
|
||||
<value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2SqlQueryExecutionListeners</name>
|
||||
<value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -0,0 +1,68 @@
|
|||
<workflow-app name="read Crossref dump from HDFS" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<!-- <property>-->
|
||||
<!-- <name>workingPath</name>-->
|
||||
<!-- <description>the working dir base path</description>-->
|
||||
<!-- </property>-->
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<description>memory for individual executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
</property>
|
||||
|
||||
</parameters>
|
||||
|
||||
<start to="ReadCrossRefDump"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="ReadCrossRefDump">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.doiboost.crossref.ExtractCrossrefRecords</main-class>
|
||||
<arg>--hdfsServerUri</arg><arg>${nameNode}</arg>
|
||||
<arg>--workingPath</arg><arg>/data/doiboost/crossref/</arg>
|
||||
<arg>--crossrefFileNameTarGz</arg><arg>crossref.tar.gz</arg>
|
||||
</java>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="SparkReadCrossRefDump">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>SparkReadCrossRefDump</name>
|
||||
<class>eu.dnetlib.doiboost.crossref.ExtractCrossrefRecords</class>
|
||||
<jar>dhp-doiboost-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--conf spark.dynamicAllocation.enabled=true
|
||||
--conf spark.dynamicAllocation.maxExecutors=20
|
||||
--executor-memory=6G
|
||||
--driver-memory=7G
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
</spark-opts>
|
||||
<arg>--hdfsServerUri</arg><arg>${nameNode}</arg>
|
||||
<arg>--workingPath</arg><arg>/data/doiboost/crossref/</arg>
|
||||
<arg>--crossrefFileNameTarGz</arg><arg>crossref.tar.gz</arg>
|
||||
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -4,6 +4,11 @@ package eu.dnetlib.dhp.resulttoorganizationfrominstrepo;
|
|||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.io.compress.GzipCodec;
|
||||
import org.apache.spark.SparkConf;
|
||||
|
@ -22,11 +27,6 @@ import eu.dnetlib.dhp.schema.oaf.Datasource;
|
|||
import eu.dnetlib.dhp.schema.oaf.Organization;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
public class PrepareResultInstRepoAssociation {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(PrepareResultInstRepoAssociation.class);
|
||||
|
@ -56,9 +56,10 @@ public class PrepareResultInstRepoAssociation {
|
|||
final String alreadyLinkedPath = parser.get("alreadyLinkedPath");
|
||||
log.info("alreadyLinkedPath {}: ", alreadyLinkedPath);
|
||||
|
||||
List<String> blacklist = Optional.ofNullable(parser.get("blacklist"))
|
||||
.map(v -> Arrays.asList(v.split(";")))
|
||||
.orElse(new ArrayList<>());
|
||||
List<String> blacklist = Optional
|
||||
.ofNullable(parser.get("blacklist"))
|
||||
.map(v -> Arrays.asList(v.split(";")))
|
||||
.orElse(new ArrayList<>());
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
|
||||
|
@ -91,21 +92,20 @@ public class PrepareResultInstRepoAssociation {
|
|||
private static void prepareDatasourceOrganization(
|
||||
SparkSession spark, String datasourceOrganizationPath, List<String> blacklist) {
|
||||
String blacklisted = "";
|
||||
if(blacklist.size() > 0 ){
|
||||
blacklisted = " AND d.id != '" + blacklist.get(0) + "'";
|
||||
if (blacklist.size() > 0) {
|
||||
blacklisted = " AND ds.id != '" + blacklist.get(0) + "'";
|
||||
for (int i = 1; i < blacklist.size(); i++) {
|
||||
blacklisted += " AND d.id != '" + blacklist.get(i) + "'";
|
||||
blacklisted += " AND ds.id != '" + blacklist.get(i) + "'";
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
String query = "SELECT source datasourceId, target organizationId "
|
||||
+ "FROM ( SELECT id "
|
||||
+ "FROM datasource "
|
||||
+ "WHERE datasourcetype.classid = '"
|
||||
+ "FROM datasource ds "
|
||||
+ "WHERE ds.datasourcetype.classid = '"
|
||||
+ INSTITUTIONAL_REPO_TYPE
|
||||
+ "' "
|
||||
+ "AND datainfo.deletedbyinference = false " + blacklisted + " ) d "
|
||||
+ "AND ds.datainfo.deletedbyinference = false " + blacklisted + " ) d "
|
||||
+ "JOIN ( SELECT source, target "
|
||||
+ "FROM relation "
|
||||
+ "WHERE lower(relclass) = '"
|
||||
|
|
Loading…
Reference in New Issue