diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala index 11f9828db4..462434bbbe 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala @@ -7,6 +7,7 @@ import eu.dnetlib.dhp.schema.oaf.{Organization, Publication, Relation, Dataset = import eu.dnetlib.doiboost.mag.ConversionUtil import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf +import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.functions.col import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} import org.slf4j.{Logger, LoggerFactory} @@ -15,6 +16,9 @@ import scala.collection.JavaConverters._ object SparkGenerateDoiBoost { + + + def main(args: Array[String]): Unit = { val logger: Logger = LoggerFactory.getLogger(getClass) @@ -33,6 +37,54 @@ object SparkGenerateDoiBoost { val hostedByMapPath = parser.get("hostedByMapPath") val workingDirPath = parser.get("workingPath") + val crossrefAggregator = new Aggregator[(String, Publication), Publication, Publication] with Serializable { + override def zero: Publication = new Publication + + override def reduce(b: Publication, a: (String, Publication)): Publication = { + + if (b == null) { + if (a != null && a._2 != null) { + a._2.setId(a._1) + return a._2 + } + } + else { + if (a != null && a._2 != null) { + b.mergeFrom(a._2) + b.setId(a._1) + val authors =AuthorMerger.mergeAuthor(b.getAuthor, a._2.getAuthor) + b.setAuthor(authors) + return b + } + } + new Publication + } + + override def merge(b1: Publication, b2: Publication): Publication = { + if (b1 == null) { + if (b2 != null) + return b2 + } + else { + if (b2 != null ) { + b1.mergeFrom(b2) + val authors =AuthorMerger.mergeAuthor(b1.getAuthor, b2.getAuthor) + b1.setAuthor(authors) + if (b2.getId!= null && b2.getId.nonEmpty) + b1.setId(b2.getId) + return b1 + } + } + new Publication + } + + override def finish(reduction: Publication): Publication = reduction + + override def bufferEncoder: Encoder[Publication] = Encoders.kryo[Publication] + + override def outputEncoder: Encoder[Publication] = Encoders.kryo[Publication] + } + implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication] implicit val mapEncoderOrg: Encoder[Organization] = Encoders.kryo[Organization] @@ -77,6 +129,10 @@ object SparkGenerateDoiBoost { doiBoostPublication.joinWith(hostedByDataset, doiBoostPublication("_1").equalTo(hostedByDataset("_1")), "left") .map(DoiBoostMappingUtil.fixPublication) + .map(p => (p.getId, p)) + .groupByKey(_._1) + .agg(crossrefAggregator.toColumn) + .map(p => p._2) .write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationFiltered") val affiliationPath = parser.get("affiliationPath") @@ -139,6 +195,6 @@ object SparkGenerateDoiBoost { else null }).filter(o=> o!=null).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostOrganization") - } + } -} +} \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala index 0036459bfe..01bf9dd62f 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala @@ -20,7 +20,7 @@ object SparkMapDumpIntoOAF { val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass) val conf: SparkConf = new SparkConf() - val parser = new ArgumentApplicationParser(IOUtils.toString(SparkMapDumpIntoOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json"))) + val parser = new ArgumentApplicationParser(IOUtils.toString(SparkMapDumpIntoOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_crossref_to_oaf_params.json"))) parser.parseArgument(args) val spark: SparkSession = SparkSession diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala index 9ac6a08381..83205e3451 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala @@ -18,7 +18,7 @@ object SparkMapUnpayWallToOAF { val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass) val conf: SparkConf = new SparkConf() - val parser = new ArgumentApplicationParser(IOUtils.toString(SparkMapDumpIntoOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json"))) + val parser = new ArgumentApplicationParser(IOUtils.toString(SparkMapDumpIntoOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_crossref_to_oaf_params.json"))) parser.parseArgument(args) val spark: SparkSession = SparkSession diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_crossref_to_oaf_params.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_crossref_to_oaf_params.json new file mode 100644 index 0000000000..297452465a --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_crossref_to_oaf_params.json @@ -0,0 +1,6 @@ +[ + {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source dir path", "paramRequired": true}, + {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the target dir path", "paramRequired": true}, + {"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true} + +] \ No newline at end of file