forked from D-Net/dnet-hadoop
implemented new phase in doiboost to make the dataset Distinct by ID
This commit is contained in:
parent
da9d6f3887
commit
d9a0bbda7b
|
@ -7,6 +7,7 @@ import eu.dnetlib.dhp.schema.oaf.{Organization, Publication, Relation, Dataset =
|
||||||
import eu.dnetlib.doiboost.mag.ConversionUtil
|
import eu.dnetlib.doiboost.mag.ConversionUtil
|
||||||
import org.apache.commons.io.IOUtils
|
import org.apache.commons.io.IOUtils
|
||||||
import org.apache.spark.SparkConf
|
import org.apache.spark.SparkConf
|
||||||
|
import org.apache.spark.sql.expressions.Aggregator
|
||||||
import org.apache.spark.sql.functions.col
|
import org.apache.spark.sql.functions.col
|
||||||
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
|
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
|
||||||
import org.slf4j.{Logger, LoggerFactory}
|
import org.slf4j.{Logger, LoggerFactory}
|
||||||
|
@ -15,6 +16,9 @@ import scala.collection.JavaConverters._
|
||||||
|
|
||||||
object SparkGenerateDoiBoost {
|
object SparkGenerateDoiBoost {
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def main(args: Array[String]): Unit = {
|
def main(args: Array[String]): Unit = {
|
||||||
|
|
||||||
val logger: Logger = LoggerFactory.getLogger(getClass)
|
val logger: Logger = LoggerFactory.getLogger(getClass)
|
||||||
|
@ -33,6 +37,54 @@ object SparkGenerateDoiBoost {
|
||||||
val hostedByMapPath = parser.get("hostedByMapPath")
|
val hostedByMapPath = parser.get("hostedByMapPath")
|
||||||
val workingDirPath = parser.get("workingPath")
|
val workingDirPath = parser.get("workingPath")
|
||||||
|
|
||||||
|
val crossrefAggregator = new Aggregator[(String, Publication), Publication, Publication] with Serializable {
|
||||||
|
override def zero: Publication = new Publication
|
||||||
|
|
||||||
|
override def reduce(b: Publication, a: (String, Publication)): Publication = {
|
||||||
|
|
||||||
|
if (b == null) {
|
||||||
|
if (a != null && a._2 != null) {
|
||||||
|
a._2.setId(a._1)
|
||||||
|
return a._2
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
if (a != null && a._2 != null) {
|
||||||
|
b.mergeFrom(a._2)
|
||||||
|
b.setId(a._1)
|
||||||
|
val authors =AuthorMerger.mergeAuthor(b.getAuthor, a._2.getAuthor)
|
||||||
|
b.setAuthor(authors)
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
new Publication
|
||||||
|
}
|
||||||
|
|
||||||
|
override def merge(b1: Publication, b2: Publication): Publication = {
|
||||||
|
if (b1 == null) {
|
||||||
|
if (b2 != null)
|
||||||
|
return b2
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
if (b2 != null ) {
|
||||||
|
b1.mergeFrom(b2)
|
||||||
|
val authors =AuthorMerger.mergeAuthor(b1.getAuthor, b2.getAuthor)
|
||||||
|
b1.setAuthor(authors)
|
||||||
|
if (b2.getId!= null && b2.getId.nonEmpty)
|
||||||
|
b1.setId(b2.getId)
|
||||||
|
return b1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
new Publication
|
||||||
|
}
|
||||||
|
|
||||||
|
override def finish(reduction: Publication): Publication = reduction
|
||||||
|
|
||||||
|
override def bufferEncoder: Encoder[Publication] = Encoders.kryo[Publication]
|
||||||
|
|
||||||
|
override def outputEncoder: Encoder[Publication] = Encoders.kryo[Publication]
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
|
implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
|
||||||
implicit val mapEncoderOrg: Encoder[Organization] = Encoders.kryo[Organization]
|
implicit val mapEncoderOrg: Encoder[Organization] = Encoders.kryo[Organization]
|
||||||
|
@ -77,6 +129,10 @@ object SparkGenerateDoiBoost {
|
||||||
|
|
||||||
doiBoostPublication.joinWith(hostedByDataset, doiBoostPublication("_1").equalTo(hostedByDataset("_1")), "left")
|
doiBoostPublication.joinWith(hostedByDataset, doiBoostPublication("_1").equalTo(hostedByDataset("_1")), "left")
|
||||||
.map(DoiBoostMappingUtil.fixPublication)
|
.map(DoiBoostMappingUtil.fixPublication)
|
||||||
|
.map(p => (p.getId, p))
|
||||||
|
.groupByKey(_._1)
|
||||||
|
.agg(crossrefAggregator.toColumn)
|
||||||
|
.map(p => p._2)
|
||||||
.write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationFiltered")
|
.write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationFiltered")
|
||||||
|
|
||||||
val affiliationPath = parser.get("affiliationPath")
|
val affiliationPath = parser.get("affiliationPath")
|
||||||
|
@ -139,6 +195,6 @@ object SparkGenerateDoiBoost {
|
||||||
else
|
else
|
||||||
null
|
null
|
||||||
}).filter(o=> o!=null).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostOrganization")
|
}).filter(o=> o!=null).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostOrganization")
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -20,7 +20,7 @@ object SparkMapDumpIntoOAF {
|
||||||
|
|
||||||
val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass)
|
val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass)
|
||||||
val conf: SparkConf = new SparkConf()
|
val conf: SparkConf = new SparkConf()
|
||||||
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkMapDumpIntoOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json")))
|
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkMapDumpIntoOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_crossref_to_oaf_params.json")))
|
||||||
parser.parseArgument(args)
|
parser.parseArgument(args)
|
||||||
val spark: SparkSession =
|
val spark: SparkSession =
|
||||||
SparkSession
|
SparkSession
|
||||||
|
|
|
@ -18,7 +18,7 @@ object SparkMapUnpayWallToOAF {
|
||||||
|
|
||||||
val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass)
|
val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass)
|
||||||
val conf: SparkConf = new SparkConf()
|
val conf: SparkConf = new SparkConf()
|
||||||
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkMapDumpIntoOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json")))
|
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkMapDumpIntoOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_crossref_to_oaf_params.json")))
|
||||||
parser.parseArgument(args)
|
parser.parseArgument(args)
|
||||||
val spark: SparkSession =
|
val spark: SparkSession =
|
||||||
SparkSession
|
SparkSession
|
||||||
|
|
|
@ -0,0 +1,6 @@
|
||||||
|
[
|
||||||
|
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source dir path", "paramRequired": true},
|
||||||
|
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the target dir path", "paramRequired": true},
|
||||||
|
{"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
|
||||||
|
|
||||||
|
]
|
Loading…
Reference in New Issue