This commit is contained in:
Miriam Baglioni 2021-06-21 09:14:32 +02:00
parent 0eda93b3eb
commit 2f6673e678
19 changed files with 510 additions and 109 deletions

View File

@@ -46,35 +46,14 @@ public class PrepareCrossrefSpark {
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
exec(spark, outputPath, inputPath);
selectResult(spark, inputPath, outputPath);
});
}
private static void exec(SparkSession spark, String output_path, String result_path)
throws Exception {
Dataset<Result> datacite = selectResult(
spark, result_path + "graphResultPubsWithOrcid",
output_path + "crossrefPubsWithOrcid")
.union(
selectResult(
spark, result_path + "graphResultDatsWithOrcid",
output_path + "crossrefDatsWithOrcid"))
.union(
selectResult(
spark, result_path + "graphResultSwWithOrcid",
output_path + "crossrefSwWithOrcid"))
.union(
selectResult(
spark, result_path + "graphResultOtherWithOrcid",
output_path + "crossrefOtherWithOrcid"));
}
private static Dataset<Result> selectResult(SparkSession spark, String result_path, String output_path) {
private static Dataset<Result> selectResult(SparkSession spark, String input_path, String output_path) {
Dataset<Result> res = Utils
.readPath(
spark, result_path, Result.class)
spark, input_path, Result.class)
.filter(
(FilterFunction<Result>) r -> !r.getId().startsWith("50|dedup") &&
r.getCf().stream().anyMatch(cf -> cf.getValue().equals("Crossref")));
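The hunk is cut off right after the filter; presumably the selected records are then written out as gzipped JSON, following the same pattern PrepareDataciteSpark uses below. A sketch of that assumed tail only:
// assumed continuation, mirroring the write used in PrepareDataciteSpark
res
    .write()
    .option("compression", "gzip")
    .mode(SaveMode.Overwrite)
    .json(output_path);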

View File

@@ -50,25 +50,15 @@ public class PrepareDataciteSpark {
});
}
private static void exec(SparkSession spark, String output_path, String result_path)
throws Exception {
private static void exec(SparkSession spark, String output_path, String input_path) {
Dataset<Result> datacite = selectResult(
spark, result_path + "graphResultPubsWithOrcid",
output_path + "datacitePubsWithOrcid")
.union(
selectResult(
spark, result_path + "graphResultDatsWithOrcid",
output_path + "dataciteDatsWithOrcid"))
.union(
selectResult(
spark, result_path + "graphResultSwWithOrcid",
output_path + "dataciteSwWithOrcid"))
.union(
selectResult(
spark, result_path + "graphResultOtherWithOrcid",
output_path + "dataciteOtherWithOrcid"));
Dataset<Result> datacite = Utils
.readPath(
spark, input_path, Result.class)
.filter(
(FilterFunction<Result>) r -> r.getId().startsWith("50|datacite"));
datacite.write().option("compression", "gzip").mode(SaveMode.Overwrite).json(output_path + "allDatacite");
getProviderResult(output_path, datacite, "Zenodo");
getProviderResult(output_path, datacite, "Figshare");
getProviderResult(output_path, datacite, "Dryad");
@@ -88,17 +78,4 @@ public class PrepareDataciteSpark {
.json(output_path + provider);
}
private static Dataset<Result> selectResult(SparkSession spark, String result_path, String output_path) {
Dataset<Result> res = Utils
.readPath(
spark, result_path, Result.class)
.filter(
(FilterFunction<Result>) r -> r.getId().startsWith("50|datacite"));
res.write().option("compression", "gzip").mode(SaveMode.Overwrite).json(output_path + "Datacite");
return res;
}
}
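exec now writes the full Datacite subset to output_path + "allDatacite" plus one folder per provider via getProviderResult. A downstream step could read a single provider slice back with the same helper; a minimal sketch (the datacitePath prefix is a hypothetical variable):
// read back the Zenodo slice written by getProviderResult above
Dataset<Result> zenodo = Utils.readPath(spark, datacitePath + "Zenodo", Result.class);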

View File

@@ -41,7 +41,7 @@ public class PrepareResultAllTheRestSpark {
final String outputPath = parser.get("outputPath");
final String instRepoPath = parser.get("instrepoPath");
final String instRepoPath = parser.get("instRepoPath");
final String crossrefPath = parser.get("crossrefPath");
final String datacitePath = parser.get("datacitePath");
@@ -75,22 +75,13 @@ public class PrepareResultAllTheRestSpark {
Dataset<Result> result = Utils.readPath(spark, result_path, Result.class);
Dataset<Result> inst_repo = Utils
.readPath(spark, inst_repo_path + "graphResultPubsWithOrcidInstRepos", Result.class)
.union(Utils.readPath(spark, inst_repo_path + "graphResultDatsWithOrcidInstRepos", Result.class))
.union(Utils.readPath(spark, inst_repo_path + "graphResultSwWithOrcidInstRepos", Result.class))
.union(Utils.readPath(spark, inst_repo_path + "graphResultOtherWithOrcidInstRepos", Result.class));
.readPath(spark, inst_repo_path, Result.class);
Dataset<Result> datacite = Utils
.readPath(spark, datacite_path + "datacitePubsWithOrcid", Result.class)
.union(Utils.readPath(spark, datacite_path + "dataciteDatsWithOrcid", Result.class))
.union(Utils.readPath(spark, datacite_path + "dataciteSwWithOrcid", Result.class))
.union(Utils.readPath(spark, datacite_path + "dataciteOtherWithOrcid", Result.class));
.readPath(spark, datacite_path, Result.class);
Dataset<Result> crossref = Utils
.readPath(spark, crossref_path + "crossrefPubsWithOrcid", Result.class)
.union(Utils.readPath(spark, crossref_path + "crossrefDatsWithOrcid", Result.class))
.union(Utils.readPath(spark, crossref_path + "crossrefSwWithOrcid", Result.class))
.union(Utils.readPath(spark, crossref_path + "crossrefOtherWithOrcid", Result.class));
.readPath(spark, crossref_path, Result.class);
Dataset<Result> union_dataset = inst_repo.union(datacite).union(crossref);
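The hunk stops at the union of the three prepared subsets, so the remainder of the method is not shown. One typed way to compute "all the rest" (results appearing in none of the three subsets) is Dataset.except; this is only a sketch, not necessarily the committed logic:
// sketch: results in none of the inst-repo/Datacite/Crossref subsets
Dataset<Result> allTheRest = result.except(union_dataset);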

View File

@@ -50,38 +50,16 @@ public class PrepareResultFromInstRepo {
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
exec(spark, inputPath, outputPath, datasourcePath);
selectResultFromInstRepo(spark, inputPath, outputPath, datasourcePath);
});
}
private static void exec(SparkSession spark, String input_path, String output_path, String result_path)
throws Exception {
selectResultFromInstRepo(
spark, result_path + "graphResultPubsWithOrcid",
output_path + "graphResultPubsWithOrcidInstRepos", input_path);
selectResultFromInstRepo(
spark, result_path + "graphResultDatsWithOrcid",
output_path + "graphResultDatsWithOrcidInstRepos", input_path);
selectResultFromInstRepo(
spark, result_path + "graphResultSwWithOrcid",
output_path + "graphResultSwWithOrcidInstRepos", input_path);
selectResultFromInstRepo(
spark, result_path + "graphResultOtherWithOrcid",
output_path + "graphResultOtherWithOrcidInstRepos", input_path);
}
private static void selectResultFromInstRepo(SparkSession spark, String result_path, String output_path,
String input_path) {
Dataset<Datasource> datasource = Utils.readPath(spark, input_path, Datasource.class);
private static void selectResultFromInstRepo(SparkSession spark, String inputPath, String output_path,
String datasourcePath) {
Dataset<Datasource> datasource = Utils.readPath(spark, datasourcePath, Datasource.class);
Dataset<Result> res = Utils
.readPath(
spark, result_path, Result.class)
spark, inputPath, Result.class)
.filter(
(FilterFunction<Result>) r -> !r.getId().startsWith("50|doiboost")
&& !r.getId().startsWith("50|scholix")

View File

@@ -97,9 +97,9 @@ public class PrepareResultSpark {
pid -> KeyValue
.newInstance(pid.getQualifier().getClassid(), pid.getValue()))
.collect(Collectors.toList()));
r.setName(a.getName().toLowerCase());
r.setSurname(a.getSurname().toLowerCase());
r.setFullname(a.getFullname().toLowerCase());
r.setName(a.getName());
r.setSurname(a.getSurname());
r.setFullname(a.getFullname());
r.setOid(apid.getValue());
reslist.add(r);
}
@@ -116,6 +116,4 @@ public class PrepareResultSpark {
}
}

View File

@@ -3,7 +3,9 @@ package eu.dnetlib.dhp.ircdl_extention;
import java.io.Serializable;
import java.text.Normalizer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -64,12 +66,15 @@ public class Utils implements Serializable {
.sorted()
.collect(Collectors.toList());
Orcid or = input._2();
return checkContains(res, getList(Arrays.asList(new String[] {
or.getName(), or.getSurname()
}))
.stream()
.sorted()
.collect(Collectors.toList())) ||
List<String> tmp = new ArrayList<>();
Collections.addAll(tmp, or.getName().split(" "));
Collections.addAll(tmp, or.getSurname().split(" "));
return checkContains(
res, getList(tmp)
.stream()
.sorted()
.collect(Collectors.toList()))
||
checkContains(
res, getList(Arrays.asList(or.getCreditname().split(" ")))
.stream()
@@ -127,7 +132,7 @@ public class Utils implements Serializable {
boolean found = false;
while (i < lst2.size()) {
String wordlist = lst2.get(i);
if (word == wordlist) {
if (word.equals(wordlist)) {
index = i + 1;
i = lst2.size();
found = true;
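The switch from == to equals is the real fix in this hunk: == compares String references, which only works reliably for interned literals, not for strings produced at runtime (e.g. by split). A minimal illustration:
String word = "crossref";
String wordlist = new String("crossref");
System.out.println(word == wordlist);      // false: different objects
System.out.println(word.equals(wordlist)); // true: same characters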

View File

@@ -58,7 +58,10 @@ public class Result implements Serializable {
}
public void setName(String name) {
this.name = name;
if (name != null)
this.name = name.toLowerCase();
else
this.name = "";
}
public String getSurname() {
@@ -66,7 +69,10 @@ public class Result implements Serializable {
}
public void setSurname(String surname) {
this.surname = surname;
if (surname != null)
this.surname = surname.toLowerCase();
else
this.surname = "";
}
public String getFullname() {
@@ -74,7 +80,10 @@ public class Result implements Serializable {
}
public void setFullname(String fullname) {
this.fullname = fullname;
if (fullname != null)
this.fullname = fullname.toLowerCase();
else
this.fullname = "";
}
public String getOid() {
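With the null guard and lowercasing moved into the setters, callers such as PrepareResultSpark (see the hunk above) no longer normalize the values themselves. The resulting behavior, in short:
Result r = new Result();
r.setName("Miriam");  // stored as "miriam"
r.setSurname(null);   // stored as "" rather than null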

View File

@@ -0,0 +1,35 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "ip",
"paramLongName": "inputPath",
"paramDescription": "the URL from where to get the programme file",
"paramRequired": true
},
{
"paramName": "op",
"paramLongName": "outputPath",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
},{
"paramName": "ir",
"paramLongName": "instRepoPath",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
},{
"paramName": "dp",
"paramLongName": "datacitePath",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
},{
"paramName": "cp",
"paramLongName": "crossrefPath",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
}
]
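These long parameter names are the ones passed on the command line (the short names are only aliases). A hypothetical invocation of the "all the rest" job, mirroring the argument style used in NormalizeOrcidTest later in this commit, with placeholder paths:
// all paths below are placeholders
PrepareResultAllTheRestSpark
    .main(
        new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-inputPath", "/tmp/ircdl/graphResults/",
            "-outputPath", "/tmp/ircdl/allTheRest/",
            "-instRepoPath", "/tmp/ircdl/instRepo/",
            "-datacitePath", "/tmp/ircdl/datacite/",
            "-crossrefPath", "/tmp/ircdl/crossref/"
        });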

View File

@@ -0,0 +1,26 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "ip",
"paramLongName": "inputPath",
"paramDescription": "the URL from where to get the programme file",
"paramRequired": true
},
{
"paramName": "op",
"paramLongName": "outputPath",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
},
{
"paramName": "dp",
"paramLongName": "datasourcePath",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
}
]

View File

@@ -0,0 +1,20 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "ip",
"paramLongName": "inputPath",
"paramDescription": "the URL from where to get the programme file",
"paramRequired": true
},
{
"paramName": "op",
"paramLongName": "outputPath",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
}
]

View File

@@ -0,0 +1,26 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "ip",
"paramLongName": "inputPath",
"paramDescription": "the URL from where to get the programme file",
"paramRequired": true
},
{
"paramName": "op",
"paramLongName": "outputPath",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
},
{
"paramName": "rc",
"paramLongName": "resultClass",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
}
]

View File

@@ -0,0 +1,26 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "op",
"paramLongName": "orcidPath",
"paramDescription": "the URL from where to get the programme file",
"paramRequired": true
},
{
"paramName": "op",
"paramLongName": "outputPath",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
},
{
"paramName": "ip",
"paramLongName": "inputPath",
"paramDescription": "thepath of the new ActionSet",
"paramRequired": true
}
]

View File

@@ -0,0 +1,98 @@
package eu.dnetlib.dhp.ircdl_extention;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.project.SparkAtomicActionJob;
import eu.dnetlib.dhp.actionmanager.project.SparkUpdateProjectTest;
import eu.dnetlib.dhp.ircdl_extention.model.Orcid;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.Project;
public class NormalizeOrcidTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final ClassLoader cl = eu.dnetlib.dhp.ircdl_extention.NormalizeOrcidTest.class
.getClassLoader();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory
.getLogger(eu.dnetlib.dhp.ircdl_extention.NormalizeOrcidTest.class);
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files
.createTempDirectory(eu.dnetlib.dhp.ircdl_extention.NormalizeOrcidTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(eu.dnetlib.dhp.ircdl_extention.NormalizeOrcidTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
// conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
// conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(NormalizeOrcidTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
public void normalizeOrcid() throws Exception {
PrepareNormalizedOrcid
.main(
new String[] {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-inputPath",
getClass()
.getResource(
"/eu/dnetlib/dhp/ircdl_extention/orcid_original.json")
.getPath(),
"-outputPath",
workingDir.toString() + "/orcidNormalized"
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Orcid> tmp = sc
.textFile(workingDir.toString() + "/orcidNormalized")
.map(value -> OBJECT_MAPPER.readValue(value, Orcid.class));
tmp.foreach(v -> System.out.println(OBJECT_MAPPER.writeValueAsString(v)));
}
}
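The test only prints the normalized records. Since the orcid_original.json resource (below) contains 29 profiles and normalization is presumably one record in, one record out, a simple assertion would make it self-checking (Assertions is already imported):
// assumes normalization neither drops nor duplicates profiles
Assertions.assertEquals(29, tmp.count());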

View File

@@ -0,0 +1,5 @@
package eu.dnetlib.dhp.ircdl_extention;
public class WrongOrcidTest {
}

View File

@@ -0,0 +1,29 @@
{"otherNames": [], "inception": "2017-05-22T16:38:30.236Z", "surname": "hyy37", "mode": "Direct", "creditname": "void", "orcid": "0000-0002-8748-6992", "works": false, "name": "1380"}
{"otherNames": [], "inception": "2017-05-25T12:50:48.761Z", "surname": "hyy75", "mode": "Direct", "creditname": "void", "orcid": "0000-0001-7773-1109", "works": false, "name": "2775"}
{"otherNames": [], "inception": "2017-05-28T12:07:09.154Z", "surname": "hyy13", "mode": "Direct", "creditname": "void", "orcid": "0000-0003-4728-6379", "works": false, "name": "434323"}
{"otherNames": [], "inception": "2017-08-10T07:07:23.818Z", "surname": "hyy44", "mode": "Direct", "creditname": "void", "orcid": "0000-0001-9502-3093", "works": false, "name": "58"}
{"otherNames": [], "inception": "2017-08-10T07:08:48.179Z", "surname": "hyy46", "mode": "Direct", "creditname": "void", "orcid": "0000-0003-2933-0057", "works": false, "name": "60"}
{"otherNames": ["pang x y", "pang xueyong"], "inception": "2014-10-13T03:26:21.741Z", "surname": "?", "mode": "API", "creditname": "void", "orcid": "0000-0002-7397-5824", "works": true, "name": "??"}
{"otherNames": [], "inception": "2019-08-27T07:55:06.340Z", "surname": "therasa alphonsa", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0001-7205-6036", "works": false, "name": "a"}
{"otherNames": ["minto"], "inception": "2020-08-02T06:33:18.620Z", "surname": "karim", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0001-6111-6742", "works": false, "name": "a k mohammad fazlul"}
{"otherNames": [], "inception": "2014-05-01T09:13:11.783Z", "surname": "al-sammak", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0001-6646-4295", "works": false, "name": "a-imam"}
{"otherNames": [], "inception": "2019-12-06T12:53:04.045Z", "surname": "hassan", "mode": "Direct", "creditname": "void", "orcid": "0000-0003-2957-4641", "works": false, "name": "a-s.u."}
{"otherNames": [], "inception": "2020-07-28T12:29:26.453Z", "surname": "ajakh", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0002-1081-8426", "works": false, "name": "a."}
{"otherNames": [], "inception": "2017-01-10T12:35:05.016Z", "surname": "antol\u00ednez", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0002-5451-3421", "works": false, "name": "a. (ana)"}
{"otherNames": [], "inception": "2018-08-20T05:00:15.964Z", "surname": "mahmudi", "mode": "Direct", "creditname": "void", "orcid": "0000-0003-3187-941X", "works": false, "name": "a. aviv"}
{"otherNames": [], "inception": "2017-05-13T01:03:58.949Z", "surname": "akanmu", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0001-6223-5428", "works": false, "name": "a. c."}
{"otherNames": [], "inception": "2018-01-20T02:58:05.199Z", "surname": "inci", "mode": "Direct", "creditname": "void", "orcid": "0000-0002-0427-9745", "works": true, "name": "a. can"}
{"otherNames": ["a. kim ryan"], "inception": "2014-10-24T23:06:43.544Z", "surname": "hayes", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0002-2055-8269", "works": true, "name": "a. kim"}
{"otherNames": [], "inception": "2017-08-10T13:38:29.172Z", "surname": "bahadir", "mode": "Direct", "creditname": "void", "orcid": "0000-0002-4045-0001", "works": false, "name": "a. tugba"}
{"otherNames": [], "inception": "2018-08-29T07:49:31.093Z", "surname": "rayna", "mode": "Direct", "creditname": "void", "orcid": "0000-0002-7916-2031", "works": false, "name": "a.brite"}
{"otherNames": [], "inception": "2014-07-12T08:02:39.568Z", "surname": "kalyani", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0003-2649-7126", "works": false, "name": "a.grace"}
{"otherNames": [], "inception": "2018-07-21T12:00:22.042Z", "surname": "ahmed", "mode": "Direct", "creditname": "void", "orcid": "0000-0003-0777-5848", "works": false, "name": "a.i. mahbub uddin"}
{"otherNames": [], "inception": "2018-04-11T13:58:53.355Z", "surname": "a.kathirvel murugan", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0003-2298-6301", "works": false, "name": "a.kathirvel murugan"}
{"otherNames": [], "inception": "2017-08-31T11:35:48.559Z", "surname": "dar", "mode": "Direct", "creditname": "void", "orcid": "0000-0001-8781-6309", "works": false, "name": "a.rashid"}
{"otherNames": [], "inception": "2014-08-26T00:25:30.968Z", "surname": "sayem", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0003-2461-4667", "works": false, "name": "a.s.m."}
{"otherNames": [], "inception": "2019-10-03T01:27:08.212Z", "surname": "conte", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0003-2862-6139", "works": false, "name": "aaron"}
{"otherNames": [], "inception": "2020-03-16T09:37:10.610Z", "surname": "rashmi", "mode": "Direct", "creditname": "void", "orcid": "0000-0003-4754-5465", "works": false, "name": "aarthi rashmi b"}
{"otherNames": [], "inception": "2017-02-28T19:01:59.146Z", "surname": "bhaskar", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0002-5794-1165", "works": false, "name": "aastha"}
{"otherNames": [], "inception": "2020-04-07T18:10:50.922Z", "surname": "belhabib", "mode": "Direct", "creditname": "void", "orcid": "0000-0001-6086-0588", "works": false, "name": "abdelfettah"}
{"otherNames": [], "inception": "2019-01-13T21:50:51.923Z", "surname": "laamani", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0003-2055-2593", "works": false, "name": "abdellatif"}
{"otherNames": ["fákē", "miñhō"], "inception": "2019-01-13T21:50:51.923Z", "surname": "laamani", "mode": "Member-referred", "creditname": "void", "orcid": "0000-0003-2055-2593", "works": false, "name": "abdellatif"}

View File

@@ -0,0 +1,88 @@
package eu.dnetlib.doiboost.crossref;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.BufferedOutputStream;
import java.net.URI;
import java.util.Optional;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.util.LongAccumulator;
import org.mortbay.log.Log;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class SparkExtractCrossrefRecords {
public static void main(String[] args) throws Exception {
String hdfsServerUri;
String workingPath;
String crossrefFileNameTarGz;
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkExtractCrossrefRecords.class
.getResourceAsStream(
"/eu/dnetlib/dhp/doiboost/crossref_dump_reader.json")));
parser.parseArgument(args);
hdfsServerUri = parser.get("hdfsServerUri");
workingPath = parser.get("workingPath");
crossrefFileNameTarGz = parser.get("crossrefFileNameTarGz");
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
SparkConf sparkConf = new SparkConf();
runWithSparkSession(
sparkConf,
isSparkSessionManaged,
spark -> {
LongAccumulator filesCounter = spark
.sparkContext()
.longAccumulator("filesCounter");
Path hdfsreadpath = new Path(hdfsServerUri.concat(workingPath).concat(crossrefFileNameTarGz));
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsServerUri.concat(workingPath));
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
FileSystem fs = FileSystem.get(URI.create(hdfsServerUri.concat(workingPath)), conf);
FSDataInputStream crossrefFileStream = fs.open(hdfsreadpath);
try (TarArchiveInputStream tais = new TarArchiveInputStream(
new GzipCompressorInputStream(crossrefFileStream))) {
TarArchiveEntry entry = null;
while ((entry = tais.getNextTarEntry()) != null) {
if (entry.isDirectory()) {
} else {
FSDataOutputStream out = fs
.create(new Path(workingPath.concat("filess/").concat(entry.getName())));
GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out));
try {
byte[] b = new byte[1024];
int numBytes = 0;
while ((numBytes = tais.read(b)) != -1) {
gzipOs.write(b, 0, numBytes);
}
filesCounter.add(1);
} finally {
IOUtils.closeQuietly(gzipOs);
IOUtils.closeQuietly(out);
}
}
}
}
Log.info("Crossref dump reading completed");
Log.info("Files counter: " + filesCounter.value());
});
}
}
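A slightly tighter, equivalent form of the extraction loop, offered only as a sketch: an early continue removes the empty directory branch, and try-with-resources makes the closing order of the two output streams automatic:
// sketch of the same loop with directories skipped up front
TarArchiveEntry entry;
while ((entry = tais.getNextTarEntry()) != null) {
    if (entry.isDirectory()) {
        continue; // nothing to extract for directory entries
    }
    try (FSDataOutputStream out = fs
        .create(new Path(workingPath.concat("filess/").concat(entry.getName())));
        GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) {
        byte[] b = new byte[1024];
        int numBytes;
        while ((numBytes = tais.read(b)) != -1) {
            gzipOs.write(b, 0, numBytes);
        }
        filesCounter.add(1);
    }
}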

View File

@@ -0,0 +1,42 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
</property>
<property>
<name>spark2EventLogDir</name>
<value>/user/spark/spark2ApplicationHistory</value>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
</property>
</configuration>

View File

@@ -0,0 +1,69 @@
<workflow-app name="read Crossref dump from HDFS" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>workingPath</name>
<description>the working dir base path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<start to="SparkReadCrossRefDump"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ReadCrossRefDump">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.doiboost.crossref.CrossrefDumpReader</main-class>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-w</arg><arg>/data/doiboost/crossref/</arg>
<arg>-f</arg><arg>crossref.tar.gz</arg>
<arg>-o</arg><arg>/user/enrico.ottonello/crossref/</arg>
</java>
<ok to="End"/>
<error to="Kill"/>
</action>
<action name="SparkReadCrossRefDump">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>SparkReadCrossRefDump</name>
<class>eu.dnetlib.doiboost.crossref.SparkExtractCrossrefRecords</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=20
--executor-memory=6G
--driver-memory=7G
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
</spark-opts>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-w</arg><arg>/data/doiboost/crossref/</arg>
<arg>-f</arg><arg>crossref.tar.gz</arg>
<arg>-o</arg><arg>/data/doiboost/crossref/</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>