From 869a53040e17cf03b92c4188d250e11c161e613b Mon Sep 17 00:00:00 2001 From: Enrico Ottonello Date: Thu, 21 May 2020 00:41:21 +0200 Subject: [PATCH] save to text file format --- .../orcid/SparkGenerateDoiAuthorList.java | 90 +++++++++++++++---- .../doiboost/orcid/model/AuthorData.java | 4 +- .../doiboost/orcid/model/WorkData.java | 4 +- .../oozie_app/workflow.xml | 2 +- 4 files changed, 78 insertions(+), 22 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkGenerateDoiAuthorList.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkGenerateDoiAuthorList.java index 2f476a73f9..b4239bba29 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkGenerateDoiAuthorList.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkGenerateDoiAuthorList.java @@ -2,43 +2,38 @@ package eu.dnetlib.doiboost.orcid; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import static org.apache.spark.sql.functions.col; -import static org.apache.spark.sql.functions.collect_list; import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.Date; +import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.RelationalGroupedDataset; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.TypedColumn; -import org.apache.spark.util.LongAccumulator; -import org.mortbay.log.Log; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.esotericsoftware.minlog.Log; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.JsonElement; import com.google.gson.JsonParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.scholexplorer.DLIRelation; import eu.dnetlib.doiboost.orcid.model.AuthorData; -import eu.dnetlib.doiboost.orcid.model.DownloadedRecordData; import eu.dnetlib.doiboost.orcid.model.WorkData; import scala.Tuple2; @@ -86,13 +81,70 @@ public class SparkGenerateDoiAuthorList { activitiesRDD.map(seq -> loadWorkFromJson(seq._1(), seq._2())).rdd(), Encoders.bean(WorkData.class)); - RelationalGroupedDataset group = activitiesDataset - .where("oid='0000-0002-9710-779X'") + Function, Tuple2>> toAuthorListFunction = data -> { + try { + String doi = data._1(); + if (doi == null) { + return null; + } + AuthorData author = data._2(); + if (author == null) { + return null; + } + List toAuthorList = Arrays.asList(author); + return new Tuple2<>(doi, toAuthorList); + } catch (Exception e) { + Log.error("toAuthorListFunction ERROR", e); + return null; + } + }; + + JavaRDD>> doisRDD = activitiesDataset .joinWith( summariesDataset, activitiesDataset.col("oid").equalTo(summariesDataset.col("oid")), "inner") - .groupBy(col("doi")); + .map( + (MapFunction, Tuple2>) value -> { + WorkData w = value._1; + AuthorData a = value._2; + return new Tuple2<>(w.getDoi(), a); + }, + Encoders.tuple(Encoders.STRING(), Encoders.bean(AuthorData.class))) + .filter(Objects::nonNull) + .toJavaRDD() + .map(toAuthorListFunction); + JavaPairRDD + .fromJavaRDD(doisRDD) + .reduceByKey((d1, d2) -> { + try { + if (d1 != null && d2 != null) { + Stream mergedStream = Stream + .concat( + d1.stream(), + d2.stream()); + List mergedAuthors = mergedStream.collect(Collectors.toList()); + return mergedAuthors; + } + if (d1 != null) { + return d1; + } + if (d2 != null) { + return d2; + } + } catch (Exception e) { + Log.error("mergeAuthorsFunction ERROR", e); + return null; + } + return null; + }) + .mapToPair( + s -> { + ObjectMapper mapper = new ObjectMapper(); + return new Tuple2<>(s._1(), mapper.writeValueAsString(s._2())); + }) + .repartition(10) + .saveAsTextFile(workingPath + outputDoiAuthorListPath); }); } @@ -119,7 +171,7 @@ public class SparkGenerateDoiAuthorList { if (jElement.getAsJsonObject().has(property)) { JsonElement name = null; name = jElement.getAsJsonObject().get(property); - if (name != null && name.isJsonObject()) { + if (name != null && !name.isJsonNull()) { return name.getAsString(); } } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/model/AuthorData.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/model/AuthorData.java index 1e1ef5c1de..29551c347a 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/model/AuthorData.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/model/AuthorData.java @@ -1,7 +1,9 @@ package eu.dnetlib.doiboost.orcid.model; -public class AuthorData { +import java.io.Serializable; + +public class AuthorData implements Serializable { private String oid; private String name; diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/model/WorkData.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/model/WorkData.java index edd565686c..db1728a9b4 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/model/WorkData.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/model/WorkData.java @@ -1,7 +1,9 @@ package eu.dnetlib.doiboost.orcid.model; -public class WorkData { +import java.io.Serializable; + +public class WorkData implements Serializable { private String oid; private String doi; diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_doi_author_list/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_doi_author_list/oozie_app/workflow.xml index ed4f920479..21d092a83e 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_doi_author_list/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_doi_author_list/oozie_app/workflow.xml @@ -42,7 +42,7 @@ Gen_Doi_Author_List eu.dnetlib.doiboost.orcid.SparkGenerateDoiAuthorList dhp-doiboost-1.2.1-SNAPSHOT.jar - --num-executors 5 --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} + --num-executors 10 --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} -w${workingPath}/ -odoi_author_list/