diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java index 40cd212daa..7f715fa7d0 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java @@ -1,14 +1,21 @@ package eu.dnetlib.doiboost.orcidnodoi; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.io.IOException; -import java.util.Objects; -import java.util.Optional; - +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.doiboost.orcid.json.JsonHelper; +import eu.dnetlib.doiboost.orcid.model.AuthorData; +import eu.dnetlib.doiboost.orcidnodoi.model.WorkDataNoDoi; +import eu.dnetlib.doiboost.orcidnodoi.oaf.PublicationToOaf; +import eu.dnetlib.doiboost.orcidnodoi.similarity.AuthorMatcher; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -16,24 +23,17 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; import org.apache.spark.util.LongAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import com.google.gson.Gson; -import com.google.gson.JsonElement; -import com.google.gson.JsonParser; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.oaf.Publication; -import eu.dnetlib.doiboost.orcid.json.JsonHelper; -import eu.dnetlib.doiboost.orcid.model.AuthorData; -import eu.dnetlib.doiboost.orcidnodoi.model.WorkDataNoDoi; -import eu.dnetlib.doiboost.orcidnodoi.oaf.PublicationToOaf; -import eu.dnetlib.doiboost.orcidnodoi.similarity.AuthorMatcher; import scala.Tuple2; +import java.io.IOException; +import java.util.Objects; +import java.util.Optional; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + /** * This spark job generates one parquet file, containing orcid publications dataset */ @@ -42,6 +42,8 @@ public class SparkGenEnrichedOrcidWorks { static Logger logger = LoggerFactory.getLogger(SparkGenEnrichedOrcidWorks.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static void main(String[] args) throws IOException, Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -58,6 +60,7 @@ public class SparkGenEnrichedOrcidWorks { final String workingPath = parser.get("workingPath"); final String outputEnrichedWorksPath = parser.get("outputEnrichedWorksPath"); final String outputWorksPath = parser.get("outputWorksPath"); + final String hdfsServerUri = parser.get("hdfsServerUri"); SparkConf conf = new SparkConf(); runWithSparkSession( @@ -96,7 +99,6 @@ public class SparkGenEnrichedOrcidWorks { Encoders.tuple(Encoders.STRING(), Encoders.STRING())) .filter(Objects::nonNull) .toJavaRDD(); - enrichedWorksRDD.saveAsTextFile(workingPath + "enrichedWorksText/"); logger.info("Enriched works RDD ready."); final LongAccumulator parsedPublications = spark.sparkContext().longAccumulator("parsedPublications"); @@ -124,15 +126,17 @@ public class SparkGenEnrichedOrcidWorks { }) .filter(p -> p != null); - Dataset publicationDataset = spark - .createDataset( - oafPublicationRDD.repartition(1).rdd(), - Encoders.bean(Publication.class)); - publicationDataset - .write() - .format("parquet") - .mode(SaveMode.Overwrite) - .save(workingPath + outputEnrichedWorksPath); + oafPublicationRDD + .mapToPair( + p -> new Tuple2<>(p.getClass().toString(), + OBJECT_MAPPER.writeValueAsString(new AtomicAction<>(Publication.class, (Publication) p)))) + .mapToPair(t -> new Tuple2(new Text(t._1()), new Text(t._2()))) + .saveAsNewAPIHadoopFile( + workingPath.concat(outputEnrichedWorksPath), + Text.class, + Text.class, + SequenceFileOutputFormat.class, + sc.hadoopConfiguration()); logger.info("parsedPublications: " + parsedPublications.value().toString()); logger.info("enrichedPublications: " + enrichedPublications.value().toString());