Miriam Baglioni 2021-06-09 13:25:19 +02:00
parent 6cdc4d3bf3
commit 72771a1254
9 changed files with 591 additions and 324 deletions

View File

@@ -3,28 +3,40 @@ package eu.dnetlib.dhp.ircdl_extention;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.ircdl_extention.model.Result;
public class PrepareCrossrefSpark {
final static String OUTPUT_PATH = "/tmp/miriam/IRCDL_Extention/ResultFromDOIMinter/Crossref/";
final static String RESULT_PATH = "/tmp/miriam/IRCDL_Extention/ResultWithOrcid/Normalized/";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
Boolean isSparkSessionManaged = Boolean.TRUE;
String jsonConfiguration = IOUtils
.toString(
PrepareCrossrefSpark.class
.getResourceAsStream(
"/eu/dnetlib/dhp/ircdl_extention/prepare_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
final String inputPath = parser.get("inputPath");
final String outputPath = parser.get("outputPath");
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
@@ -33,13 +45,11 @@ public class PrepareCrossrefSpark {
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, OUTPUT_PATH);
exec(spark, OUTPUT_PATH, RESULT_PATH);
Utils.removeOutputDir(spark, outputPath);
exec(spark, outputPath, inputPath);
});
}
private static void exec(SparkSession spark, String output_path, String result_path)
throws Exception {
@@ -62,16 +72,16 @@ public class PrepareCrossrefSpark {
}
private static Dataset<Result> selectResult(SparkSession spark, String result_path, String output_path) {
Dataset<Result> res = Utils.readPath(
spark, result_path, Result.class)
.filter(
(FilterFunction<Result>) r -> !r.getId().startsWith("50|dedup") &&
r.getCf().stream().anyMatch(cf -> cf.getValue().equals("Crossref")));
Dataset<Result> res = Utils
.readPath(
spark, result_path, Result.class)
.filter(
(FilterFunction<Result>) r -> !r.getId().startsWith("50|dedup") &&
r.getCf().stream().anyMatch(cf -> cf.getValue().equals("Crossref")));
res.write().option("compression", "gzip").mode(SaveMode.Overwrite).json(output_path);
return res;
}
}

View File

@@ -3,28 +3,40 @@ package eu.dnetlib.dhp.ircdl_extention;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.ircdl_extention.model.Result;
public class PrepareDataciteSpark {
final static String OUTPUT_PATH = "/tmp/miriam/IRCDL_Extention/ResultFromDOIMinter/Datacite/";
final static String RESULT_PATH = "/tmp/miriam/IRCDL_Extention/ResultWithOrcid/Normalized/";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
Boolean isSparkSessionManaged = Boolean.TRUE;
String jsonConfiguration = IOUtils
.toString(
PrepareDataciteSpark.class
.getResourceAsStream(
"/eu/dnetlib/dhp/ircdl_extention/prepare_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
final String inputPath = parser.get("inputPath");
final String outputPath = parser.get("outputPath");
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
@@ -33,13 +45,11 @@ public class PrepareDataciteSpark {
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, OUTPUT_PATH);
exec(spark, OUTPUT_PATH, RESULT_PATH);
Utils.removeOutputDir(spark, outputPath);
exec(spark, outputPath, inputPath);
});
}
private static void exec(SparkSession spark, String output_path, String result_path)
throws Exception {
@@ -80,15 +90,15 @@ public class PrepareDataciteSpark {
private static Dataset<Result> selectResult(SparkSession spark, String result_path, String output_path) {
Dataset<eu.dnetlib.dhp.ircdl_extention.Result> res = Utils.readPath(
spark, result_path, eu.dnetlib.dhp.ircdl_extention.Result.class)
.filter(
(FilterFunction<eu.dnetlib.dhp.ircdl_extention.Result>) r -> r.getId().startsWith("50|datacite"));
Dataset<Result> res = Utils
.readPath(
spark, result_path, Result.class)
.filter(
(FilterFunction<Result>) r -> r.getId().startsWith("50|datacite"));
res.write().option("compression", "gzip").mode(SaveMode.Overwrite).json(output_path);
res.write().option("compression", "gzip").mode(SaveMode.Overwrite).json(output_path + "Datacite");
return res;
}
}

View File

@@ -1,6 +1,12 @@
package eu.dnetlib.dhp.ircdl_extention;
import com.fasterxml.jackson.databind.ObjectMapper;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
@@ -8,48 +14,62 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.ircdl_extention.model.Orcid;
public class PrepareNormalizedOrcid {
final static String OUTPUT_PATH = "/tmp/miriam/IRCDL_Extention/ORCID/entrySetMayNormalized/";
final static String INPUT_PATH = "/tmp/IRCDL_Extention/entrySetMay/";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
PrepareNormalizedOrcid.class
.getResourceAsStream(
"/eu/dnetlib/dhp/ircdl_extention/prepare_parameters.json"));
Boolean isSparkSessionManaged = Boolean.TRUE;
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
parser.parseArgument(args);
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, OUTPUT_PATH);
execNormalize(spark, OUTPUT_PATH, INPUT_PATH);
});
}
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
private static void execNormalize(SparkSession spark, String outputPath, String inputPath) {
Dataset<Orcid> orcid = Utils.readPath(spark, inputPath, Orcid.class);
orcid.map((MapFunction<Orcid, Orcid>) o ->{
o.setName(Utils.normalizeString(Utils.replace_chars(o.getName())));
o.setSurname(Utils.normalizeString(Utils.replace_chars(o.getSurname())));
o.setCreditname(Utils.normalizeString(Utils.replace_chars(o.getCreditname())));
o.setOtherNames(o.getOtherNames().stream()
.map(on -> Utils.normalizeString(Utils.replace_chars(on))).collect(Collectors.toList()));
return o;
}, Encoders.bean(Orcid.class))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);
}
final String inputPath = parser.get("inputPath");
final String outputPath = parser.get("outputPath");
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
execNormalize(spark, outputPath, inputPath);
});
}
private static void execNormalize(SparkSession spark, String outputPath, String inputPath) {
Dataset<Orcid> orcid = Utils.readPath(spark, inputPath, Orcid.class);
orcid.map((MapFunction<Orcid, Orcid>) o -> {
o.setName(Utils.normalizeString(o.getName()));
o.setSurname(Utils.normalizeString(o.getSurname()));
o.setCreditname(Utils.normalizeString(o.getCreditname()));
o
.setOtherNames(
o
.getOtherNames()
.stream()
.map(on -> Utils.normalizeString(on))
.collect(Collectors.toList()));
return o;
}, Encoders.bean(Orcid.class))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);
}
}

View File

@@ -1,52 +1,88 @@
package eu.dnetlib.dhp.ircdl_extention;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.oaf.ResultTypeComparator;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.ircdl_extention.model.Result;
public class PrepareNormalizedResultSpark {
final static String OUTPUT_PATH = "/tmp/miriam/IRCDL_Extention/ResultWithOrcid/Normalized/";
final static String INPUT_PATH = "/tmp/IRCDL_Extention/entrySetMay/";
public static void main(String[] args) throws Exception {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
String jsonConfiguration = IOUtils
.toString(
PrepareNormalizedResultSpark.class
.getResourceAsStream(
"/eu/dnetlib/dhp/ircdl_extention/prepare_parameters.json"));
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
Boolean isSparkSessionManaged = Boolean.TRUE;
parser.parseArgument(args);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, OUTPUT_PATH);
execNormalize(spark, OUTPUT_PATH, INPUT_PATH);
});
}
final String inputPath = parser.get("inputPath");
private static void execNormalize(SparkSession spark, String outputPath, String inputPath) {
Dataset<Result> result = Utils.readPath(spark, inputPath, Result.class);
result.map((MapFunction<Result, Result>) r ->{
r.setName(Utils.normalizeString(Utils.replace_chars(r.getName())));
r.setSurname(Utils.normalizeString(Utils.replace_chars(r.getSurname())));
r.setFullname(Utils.normalizeString(Utils.replace_chars(r.getFullname())));
return r;
}, Encoders.bean(Result.class))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);
}
final String outputPath = parser.get("outputPath");
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
execNormalize(spark, outputPath, inputPath);
});
}
private static void execNormalize(SparkSession spark, String outputPath, String inputPath) {
Dataset<Result> normalized_result = Utils
.readPath(spark, inputPath + "publicationsWithOrcid", Result.class)
.union(Utils.readPath(spark, inputPath + "datasetWithOrcid", Result.class))
.union(Utils.readPath(spark, inputPath + "softwareWithOrcid", Result.class))
.union(Utils.readPath(spark, inputPath + "otherWithOrcid", Result.class))
.map((MapFunction<Result, Result>) r -> {
r.setName(Utils.normalizeString(r.getName()));
r.setSurname(Utils.normalizeString(r.getSurname()));
r.setFullname(Utils.normalizeString(r.getFullname()));
return r;
}, Encoders.bean(Result.class));
normalized_result
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath + "ResultWithOrcid");
normalized_result
.filter((FilterFunction<Result>) r -> !r.getId().startsWith("50|dedup"))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath + "collectedResultWithOrcid");
normalized_result
.filter((FilterFunction<Result>) r -> !r.getDeletedbyinference())
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath + "notDeletedByInferenceResultWithOrcid");
}
}

View File

@@ -6,31 +6,44 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.ircdl_extention.model.Result;
import scala.Tuple2;
public class PrepareResultAllTheRestSpark {
final static String RESULT_PATH = "/tmp/miriam/IRCDL_Extention/ResultWithOrcid/Normalized/";
final static String INSTREPO_PATH = "/tmp/miriam/IRCDL_Extention/ResultFromInstReposWithOrcid/";
final static String DATACITE_PATH = "/tmp/miriam/IRCDL_Extention/ResultFromDOIMinter/Datacite/";
final static String CROSSREF_PATH = "/tmp/miriam/IRCDL_Extention/ResultFromDOIMinter/Crossref/";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
Boolean isSparkSessionManaged = Boolean.TRUE;
String jsonConfiguration = IOUtils
.toString(
PrepareResultAllTheRestSpark.class
.getResourceAsStream(
"/eu/dnetlib/dhp/ircdl_extention/prepare_alltherest_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
final String inputPath = parser.get("inputPath");
final String outputPath = parser.get("outputPath");
final String instRepoPath = parser.get("instrepoPath");
final String crossrefPath = parser.get("crossrefPath");
final String datacitePath = parser.get("datacitePath");
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
@@ -39,16 +52,14 @@ public class PrepareResultAllTheRestSpark {
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, RESULT_PATH + "CollectedNotDataciteNotCrossrefNotFromInstrepos");
Utils.removeOutputDir(spark, outputPath + "allTheRest");
exec(
spark, RESULT_PATH + "CollectedNotDataciteNotCrossrefNotFromInstrepos",
RESULT_PATH + "graphCollectedWithOrcid", INSTREPO_PATH,
DATACITE_PATH, CROSSREF_PATH);
spark, outputPath + "allTheRest",
inputPath, instRepoPath,
datacitePath, crossrefPath);
});
}
/**
* Read all the results from Crossref, Datacite and those associated with institutional repositories
* Read all the collected results
* The collected results that appear in none of those sets are kept; a sketch of that exclusion follows this file's diff.
@@ -63,17 +74,20 @@ public class PrepareResultAllTheRestSpark {
Dataset<Result> result = Utils.readPath(spark, result_path, Result.class);
Dataset<Result> inst_repo = Utils.readPath(spark, inst_repo_path + "graphResultPubsWithOrcidInstRepos", Result.class)
Dataset<Result> inst_repo = Utils
.readPath(spark, inst_repo_path + "graphResultPubsWithOrcidInstRepos", Result.class)
.union(Utils.readPath(spark, inst_repo_path + "graphResultDatsWithOrcidInstRepos", Result.class))
.union(Utils.readPath(spark, inst_repo_path + "graphResultSwWithOrcidInstRepos", Result.class))
.union(Utils.readPath(spark, inst_repo_path + "graphResultOtherWithOrcidInstRepos", Result.class));
Dataset<Result> datacite = Utils.readPath(spark, datacite_path + "datacitePubsWithOrcid", Result.class)
Dataset<Result> datacite = Utils
.readPath(spark, datacite_path + "datacitePubsWithOrcid", Result.class)
.union(Utils.readPath(spark, datacite_path + "dataciteDatsWithOrcid", Result.class))
.union(Utils.readPath(spark, datacite_path + "dataciteSwWithOrcid", Result.class))
.union(Utils.readPath(spark, datacite_path + "dataciteOtherWithOrcid", Result.class));
Dataset<Result> crossref = Utils.readPath(spark, crossref_path + "crossrefPubsWithOrcid", Result.class)
Dataset<Result> crossref = Utils
.readPath(spark, crossref_path + "crossrefPubsWithOrcid", Result.class)
.union(Utils.readPath(spark, crossref_path + "crossrefDatsWithOrcid", Result.class))
.union(Utils.readPath(spark, crossref_path + "crossrefSwWithOrcid", Result.class))
.union(Utils.readPath(spark, crossref_path + "crossrefOtherWithOrcid", Result.class));
@@ -95,6 +109,4 @@ public class PrepareResultAllTheRestSpark {
}
}
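The exclusion itself (keeping only the collected results that appear in none of the three sets) lives in the unchanged tail of this method and is not shown in the hunks above. A minimal sketch of one way to express it, assuming the match is done on the result id and reusing the result, inst_repo, datacite and crossref datasets built above; alreadyChosen is an illustrative local name, not part of the commit:

	Dataset<Result> alreadyChosen = inst_repo.union(datacite).union(crossref);
	result
		.joinWith(alreadyChosen, result.col("id").equalTo(alreadyChosen.col("id")), "left")
		// keep only the collected results with no counterpart among Crossref, Datacite or the institutional repositories
		.filter((FilterFunction<Tuple2<Result, Result>>) t2 -> t2._2() == null)
		.map((MapFunction<Tuple2<Result, Result>, Result>) t2 -> t2._1(), Encoders.bean(Result.class))
		.write()
		.option("compression", "gzip")
		.mode(SaveMode.Overwrite)
		.json(output_path);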

View File

@@ -3,35 +3,44 @@ package eu.dnetlib.dhp.ircdl_extention;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import javax.xml.crypto.Data;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.ircdl_extention.model.Result;
import eu.dnetlib.dhp.schema.oaf.Datasource;
public class PrepareResultFromInstRepo {
final static String OUTPUT_PATH = "/tmp/miriam/IRCDL_Extention/ResultFromInstReposWithOrcid/";
final static String INPUT_PATH = "/tmp/miriam/graph_20210503/datasource";
final static String RESULT_PATH = "/tmp/miriam/IRCDL_Extention/ResultWithOrcid/Normalized/";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
Boolean isSparkSessionManaged = Boolean.TRUE;
String jsonConfiguration = IOUtils
.toString(
PrepareResultFromInstRepo.class
.getResourceAsStream(
"/eu/dnetlib/dhp/ircdl_extention/prepare_instrepo_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
final String inputPath = parser.get("inputPath");
final String outputPath = parser.get("outputPath");
final String datasourcePath = parser.get("datasourcePath");
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
@@ -40,12 +49,11 @@ public class PrepareResultFromInstRepo {
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, OUTPUT_PATH);
exec(spark, INPUT_PATH, OUTPUT_PATH, RESULT_PATH);
Utils.removeOutputDir(spark, outputPath);
exec(spark, inputPath, outputPath, datasourcePath);
});
}
private static void exec(SparkSession spark, String input_path, String output_path, String result_path)
throws Exception {
@@ -71,13 +79,14 @@ public class PrepareResultFromInstRepo {
private static void selectResultFromInstRepo(SparkSession spark, String result_path, String output_path,
String input_path) {
Dataset<Datasource> datasource = Utils.readPath(spark, input_path, Datasource.class);
Dataset<eu.dnetlib.dhp.ircdl_extention.Result> res = Utils.readPath(
spark, result_path, eu.dnetlib.dhp.ircdl_extention.Result.class)
.filter(
(FilterFunction<eu.dnetlib.dhp.ircdl_extention.Result>) r -> !r.getId().startsWith("50|doiboost")
&& !r.getId().startsWith("50|scholix")
&& !r.getId().startsWith("50|datacite")
&& !r.getId().startsWith("50|dedup"));
Dataset<Result> res = Utils
.readPath(
spark, result_path, Result.class)
.filter(
(FilterFunction<Result>) r -> !r.getId().startsWith("50|doiboost")
&& !r.getId().startsWith("50|scholix")
&& !r.getId().startsWith("50|datacite")
&& !r.getId().startsWith("50|dedup"));
datasource.createOrReplaceTempView("datasource");
res.createOrReplaceTempView("result");
@@ -102,5 +111,4 @@ public class PrepareResultFromInstRepo {
}
}

View File

@@ -8,45 +8,60 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.project.PrepareProgramme;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.ircdl_extention.model.KeyValue;
import eu.dnetlib.dhp.ircdl_extention.model.Result;
import eu.dnetlib.dhp.schema.common.ModelConstants;
public class PrepareResultSpark {
private static final Logger log = LoggerFactory.getLogger(PrepareProgramme.class);
public static void main(String[] args) throws Exception {
final String OUTPUT_PATH = "/tmp/miriam/IRCDL_Extention/ResultWithOrcid/";
final String INPUT_PATH = "/tmp/miriam/graph_20210503/";
String jsonConfiguration = IOUtils
.toString(
PrepareResultSpark.class
.getResourceAsStream(
"/eu/dnetlib/dhp/ircdl_extention/prepare_result_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
final String resultClassName = parser.get("resultClass");
Class<? extends eu.dnetlib.dhp.schema.oaf.Result> resultClazz = (Class<? extends eu.dnetlib.dhp.schema.oaf.Result>) Class
.forName(resultClassName);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
final String inputPath = parser.get("inputPath");
final String outputPath = parser.get("outputPath");
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
Boolean.TRUE,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, OUTPUT_PATH);
exec(spark, INPUT_PATH, OUTPUT_PATH);
Utils.removeOutputDir(spark, outputPath);
mapToResult(spark, inputPath, resultClazz, outputPath);
});
}
private static <R extends eu.dnetlib.dhp.schema.oaf.Result> Dataset<Result> mapToResult(SparkSession spark,
private static <R extends eu.dnetlib.dhp.schema.oaf.Result> void mapToResult(SparkSession spark,
String input_path,
Class<R> resultClazz, String output_path) {
Dataset<R> publicationDataset = Utils.readPath(spark, input_path, resultClazz);
@@ -82,9 +97,9 @@ public class PrepareResultSpark {
pid -> KeyValue
.newInstance(pid.getQualifier().getClassid(), pid.getValue()))
.collect(Collectors.toList()));
r.setName(a.getName());
r.setSurname(a.getSurname());
r.setFullname(a.getFullname());
r.setName(a.getName().toLowerCase());
r.setSurname(a.getSurname().toLowerCase());
r.setFullname(a.getFullname().toLowerCase());
r.setOid(apid.getValue());
reslist.add(r);
}
@@ -98,46 +113,6 @@ public class PrepareResultSpark {
.option("compressio", "gzip")
.mode(SaveMode.Overwrite)
.json(output_path);
return result;
}
private static void exec(SparkSession spark, String input_path, String output_path) throws Exception {
Dataset<Result> result = mapToResult(
spark, input_path + "publication",
(Class<? extends eu.dnetlib.dhp.schema.oaf.Result>) Class.forName("eu.dnetlib.dhp.schema.oaf.Publication"),
output_path + "graphResultPubsWithOrcid")
.union(
mapToResult(
spark, input_path + "dataset",
(Class<? extends eu.dnetlib.dhp.schema.oaf.Result>) Class
.forName("eu.dnetlib.dhp.schema.oaf.Dataset"),
output_path + "graphResultDatsWithOrcid"))
.union(
mapToResult(
spark, input_path + "software",
(Class<? extends eu.dnetlib.dhp.schema.oaf.Result>) Class
.forName("eu.dnetlib.dhp.schema.oaf.Software"),
output_path + "graphResultSwWithOrcid"))
.union(
mapToResult(
spark, input_path + "otherresearchproduct",
(Class<? extends eu.dnetlib.dhp.schema.oaf.Result>) Class
.forName("eu.dnetlib.dhp.schema.oaf.OtherResearchProduct"),
output_path + "graphResultOtherWithOrcid"));
result
.filter((FilterFunction<Result>) r -> !r.getId().startsWith("50|dedup"))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(output_path + "graphCollectedWithOrcid");
result
.filter((FilterFunction<Result>) r -> !r.getDeletedbyinference())
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(output_path + "graphNotDeletedbyinferenceWithOrcid");
}

View File

@@ -1,65 +1,194 @@
package eu.dnetlib.dhp.ircdl_extention;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.HdfsSupport;
import java.io.Serializable;
import java.text.Normalizer;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.similarity.CosineDistance;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import java.io.Serializable;
import java.text.Normalizer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wcohen.ss.JaroWinkler;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.ircdl_extention.model.Orcid;
import eu.dnetlib.dhp.ircdl_extention.model.Result;
import scala.Tuple2;
public class Utils implements Serializable {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static String normalizeString(String input){
if(input == null)
return new String();
return Normalizer
.normalize(input, Normalizer.Form.NFKD)
.replaceAll("[^\\p{ASCII}]", "");
public static String normalizeString(String input) {
if (input == null || input.equals("void"))
return new String();
String tmp = Normalizer
.normalize(input, Normalizer.Form.NFKD)
.replaceAll("[^\\p{ASCII}]", "");
tmp = tmp
.replaceAll("[^\\p{Alpha}]+", " ")
.replaceAll("\\s+", " ")
.trim();
return tmp;
}
}
public static String replace_chars(String input){
String tmp = input.replaceAll("@", " ")
.replaceAll(".", " ")
.replaceAll("=", " ")
.replaceAll(",", " ")
.replaceAll("", " ")
.replaceAll("", " ")
.replaceAll("'", " ")
.replaceAll("\\+", " ")
.replaceAll("-", " ")
.replaceAll("", " ")
.replaceAll("", " ")
.replaceAll("", " ")
.replaceAll("", " ")
.replaceAll("", " ")
.replace("\"", " ")
.replaceAll("/" , " ")
.replace("", " ")
.replaceAll("\\[" ," ")
.replaceAll("]" , " ")
.replaceAll("\\(", " ")
.replaceAll("\\)", " ").replaceAll("s+", " ");
return tmp.trim();
}
public static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
public static <R> Dataset<R> readPath(
SparkSession spark, String inputPath, Class<R> clazz) {
return spark
.read()
.textFile(inputPath)
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
}
public static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
private static List<String> getList(List<String> input) {
return input.stream().map(st -> st.trim()).filter(st -> st.length() > 0).collect(Collectors.toList());
public static <R> Dataset<R> readPath(
SparkSession spark, String inputPath, Class<R> clazz) {
return spark
.read()
.textFile(inputPath)
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
}
}
public static boolean filterFunction(Tuple2<Result, Orcid> input) {
List<String> res = getList(Arrays.asList(input._1().getFullname().split(" ")))
.stream()
.sorted()
.collect(Collectors.toList());
Orcid or = input._2();
return checkContains(res, getList(Arrays.asList(new String[] {
or.getName(), or.getSurname()
}))
.stream()
.sorted()
.collect(Collectors.toList())) ||
checkContains(
res, getList(Arrays.asList(or.getCreditname().split(" ")))
.stream()
.sorted()
.collect(Collectors.toList()))
||
or
.getOtherNames()
.stream()
.anyMatch(
on -> checkContains(
res, getList(Arrays.asList(on.split(" ")))
.stream()
.sorted()
.collect(Collectors.toList())));
}
private static boolean checkContains(List<String> result, List<String> orcid) {
if (result.size() == 0 || orcid.size() == 0) {
return true;
}
String[][] input = {
{
"1", StringUtils.joinWith(" ", result)
},
{
"2", StringUtils.joinWith(" ", orcid)
}
};
// exact match word by word
Double cosineDistance = new CosineDistance().apply(input[0][1], input[1][1]);
if (Math.round((1 - cosineDistance) * 100) == 100) {
return true;
}
// check containment: one list can be longer than the other, and words may be composed to form a name,
// e.g. pengli yan = li peng yan (see the usage sketch after this class)
if (orcid.size() < result.size()) {
if (isIn(orcid, result))
return true;
} else {
if (isIn(result, orcid))
return true;
}
// apply JaroWinkler distance
double score = new JaroWinkler()
.score(StringUtils.joinWith(" ", result), StringUtils.joinWith(" ", orcid));
return score > 0.95;
}
private static boolean isIn(List<String> lst1, List<String> lst2) {
int index = 0;
for (String word : lst1) {
int i = index;
boolean found = false;
while (i < lst2.size()) {
String wordlist = lst2.get(i);
if (word.equals(wordlist)) {
index = i + 1;
i = lst2.size();
found = true;
} else {
if (word.charAt(0) < wordlist.charAt(0)) {
if (!checkComposition(word, lst2)) {
return false;
} else {
index = 0;
i = lst2.size();
found = true;
}
} else {
if (word.length() == 1 || wordlist.length() == 1) {
if (word.charAt(0) == wordlist.charAt(0)) {
index = i + 1;
i = lst2.size();
found = true;
} else {
i++;
}
} else {
i++;
}
}
}
}
if (!found) {
if (!checkComposition(word, lst2)) {
return false;
} else {
index = 0;
}
}
}
return true;
}
private static boolean checkComposition(String word, List<String> lst2) {
for (int i = 0; i < lst2.size(); i++) {
for (int j = 0; j < lst2.size(); j++) {
if (i != j) {
String w = lst2.get(i) + lst2.get(j);
if (word.equals(w)) {
if (i > j) {
lst2.remove(i);
lst2.remove(j);
} else {
lst2.remove(j);
lst2.remove(i);
}
return true;
}
}
}
}
return false;
}
}
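A small usage sketch of the helpers above, to make the matching rules concrete. UtilsUsageSketch and the sample names are illustrative only; the Result.newInstance and Orcid.newInstance factories are assumed to have the signatures with which WrongSpark calls them elsewhere in this commit:

	package eu.dnetlib.dhp.ircdl_extention;

	import java.util.Arrays;

	import eu.dnetlib.dhp.ircdl_extention.model.Orcid;
	import eu.dnetlib.dhp.ircdl_extention.model.Result;

	import scala.Tuple2;

	public class UtilsUsageSketch {

		public static void main(String[] args) {
			// diacritics, punctuation and repeated blanks are stripped; null and "void" map to the empty string
			String cleaned = Utils.normalizeString("Müller-Lüdenscheidt, J."); // -> "Muller Ludenscheidt J"
			String empty = Utils.normalizeString("void"); // -> ""

			// pair an author found on a result with an ORCID record, as WrongSpark does, and run the name check
			Result author = Result.newInstance(Utils.normalizeString("li peng yan"));
			Orcid orcid = Orcid.newInstance("pengli", "yan", "", Arrays.asList()); // name, surname, credit name, other names
			boolean compatible = Utils.filterFunction(new Tuple2<>(author, orcid));
			// per the comments in checkContains, the check is meant to accept exact token matches, containment and
			// composition cases such as "pengli" vs "peng" + "li", and near matches with a JaroWinkler score above 0.95
		}
	}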

View File

@@ -1,87 +1,154 @@
package eu.dnetlib.dhp.ircdl_extention;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.project.PrepareProgramme;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapGroupsFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.ircdl_extention.model.Orcid;
import eu.dnetlib.dhp.ircdl_extention.model.Result;
import eu.dnetlib.dhp.ircdl_extention.model.ShuffleInfo;
import scala.Tuple2;
public class WrongSpark {
/**
* takes as input the orcid normalized and the entry normalized to be checked against orcid
* returns the lower bound of wrong attribution
*/
/**
* takes as input the orcid normalized and the entry normalized to be checked against orcid
* returns the lower bound of wrong attribution
*/
//
// final static String OUTPUT_PATH = "/tmp/miriam/IRCDL_Extention/Wrong/";
// final static String ORCID_PATH = "/tmp/miriam/IRCDL_Extention/ORCID/entrySetMayNormalized/";
// final static String RESULT_PATH = "/tmp/miriam/IRCDL_Extention/ResultWithOrcid/";
public static void main(String[] args) throws Exception {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
String jsonConfiguration = IOUtils
.toString(
WrongSpark.class
.getResourceAsStream(
"/eu/dnetlib/dhp/ircdl_extention/wrong_orcid_parameters.json"));
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
String jsonConfiguration = IOUtils
.toString(
PrepareProgramme.class
.getResourceAsStream(
"/eu/dnetlib/dhp/ircdl_extention/wrong_orcid_parameters.json"));
parser.parseArgument(args);
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
parser.parseArgument(args);
final String orcidPath = parser.get("orcidPath");
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
final String outputPath = parser.get("outputPath");
final String resultPath = parser.get("inputPath");
final String orcidPath = parser.get("orcidPath");
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
findWrong(spark, orcidPath, outputPath + "/wrong", resultPath);
findShuffle(spark, orcidPath, outputPath + "/shuffle", resultPath);
});
}
final String outputPath = parser.get("outputPath");
private static void findShuffle(SparkSession spark, String orcidPath, String outputPath, String resultPath) {
final String resultPath = parser.get("resultPath");
Utils
.readPath(spark, resultPath, Result.class)
.map(
(MapFunction<Result, ShuffleInfo>) r -> ShuffleInfo
.newInstance(r.getName(), r.getSurname(), r.getFullname(), r.getId()),
Encoders.bean(ShuffleInfo.class))
.union(
getWrong(spark, orcidPath, resultPath)
.map((MapFunction<Tuple2<Result, Orcid>, ShuffleInfo>) t2 ->
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
ShuffleInfo
.newInstance(
t2._1().getName(), t2._1().getSurname(), t2._1().getFullname(),
t2._1().getId(), t2._2().getName(), t2._2().getSurname(),
t2._2().getCreditname(), t2._2().getOtherNames()),
Encoders.bean(ShuffleInfo.class)))
.groupByKey((MapFunction<ShuffleInfo, String>) si -> si.getId(), Encoders.STRING())
.flatMapGroups((FlatMapGroupsFunction<String, ShuffleInfo, ShuffleInfo>) (s, it) -> {
List<ShuffleInfo> shuffleInfoList = new ArrayList();
List<ShuffleInfo> ret = new ArrayList<>();
shuffleInfoList.add(it.next());
it.forEachRemaining(e -> shuffleInfoList.add(e));
shuffleInfoList
.stream()
.filter(e -> Optional.ofNullable(e.getOrcid()).isPresent())
.forEach(e -> {
if (checkShuffle(e, shuffleInfoList))
ret.add(e);
});
return ret.iterator();
}, Encoders.bean(ShuffleInfo.class))
.filter(Objects::nonNull);
/*
* def checkShuffle(x): alis = [a for a in x[1]] dic = {} count = 0 for entry in alis: if entry['orcid'] != '':
* dic[entry['orcid']] = entry for orcid in dic: name = dic[orcid]['oname'] surname = dic[orcid]['osurname'] for
* author in alis: if author['aname'] == "" or author['asurname'] == "": if checkContains([author['afullname']],
* addInListAll([], name + " " + surname)): count += 1 break else: if checkContains([author['aname'] + " " +
* author['asurname']], addInListAll([], name + " " + surname)): count += 1 break return count
*/
// candidate_shuffle = zenodo_normalized.map(lambda x: (x['id'], {'aname':x['name'], 'asurname': x['surname'],
// 'afullname': x['fullname'], 'oname':"", 'osurname':"", 'orcid':''})). union (
// join_orcid_filtered.map(lambda e: (e['id'], {'aname':e['nameg'], 'asurname':e['surnameg'],
// 'afullname':e['fullnameg'], 'oname':e['name'],
// 'osurname':e['surname'],'orcid':e['orcid']}))).groupByKey().filter(toBeChecked)
}
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
findWrong(spark, orcidPath, outputPath, resultPath);
});
}
private static boolean checkShuffle(ShuffleInfo e, List<ShuffleInfo> shuffleInfoList) {
private static void findWrong(SparkSession spark, String orcidPath, String outputPath, String resultPath) {
return shuffleInfoList
.stream()
.anyMatch(
si -> Utils
.filterFunction(
new Tuple2<>(Result.newInstance(si.getAfullname()),
Orcid
.newInstance(
si.getOname(), si.getOsurname(), si.getOcreditName(), si.getoOtherNames()))));
}
Dataset<Orcid> orcidDataset = Utils.readPath(spark, orcidPath, Orcid.class);
Dataset<Result> resultDataset = Utils.readPath(spark, resultPath, Result.class);
private static Dataset<Tuple2<Result, Orcid>> getWrong(SparkSession spark, String orcidPath, String resultPath) {
Dataset<Orcid> orcidDataset = Utils
.readPath(spark, orcidPath, Orcid.class)
.filter((FilterFunction<Orcid>) o -> !o.getName().contains("deactivated"));
Dataset<Result> resultDataset = Utils.readPath(spark, resultPath, Result.class);
resultDataset.joinWith(orcidDataset, resultDataset.col("oid")
.equalTo(orcidDataset.col("oid")), "left")
.filter((FilterFunction<Tuple2<Result, Orcid>>) tp -> isWrong(tp))
.write()
.option("compression","gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);
return resultDataset
.joinWith(
orcidDataset, resultDataset
.col("oid")
.equalTo(orcidDataset.col("orcid")),
"inner")
.filter((FilterFunction<Tuple2<Result, Orcid>>) t2 -> !Utils.filterFunction(t2));
}
}
private static void findWrong(SparkSession spark, String orcidPath, String outputPath, String resultPath) {
getWrong(spark, orcidPath, resultPath)
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);
private static boolean isWrong(Tuple2<Result, Orcid> tp) {
return false;
}
}
}
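Note that findShuffle builds the shuffle candidates but, as committed, never materializes them: the chained dataset above is discarded after the final filter. A minimal sketch of the missing write step, assuming the chain is first assigned to a Dataset<ShuffleInfo> named candidates and reusing the gzip/JSON convention of the other jobs in this commit:

	candidates
		.write()
		.option("compression", "gzip")
		.mode(SaveMode.Overwrite)
		.json(outputPath);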