forked from D-Net/dnet-hadoop
implemented filtering step
This commit is contained in:
parent
244f6e50cf
commit
2408083566
|
@ -82,10 +82,11 @@ public class AuthorMerger {
|
||||||
.map(ba -> new Tuple2<>(sim(ba, a._2()), ba))
|
.map(ba -> new Tuple2<>(sim(ba, a._2()), ba))
|
||||||
.max(Comparator.comparing(Tuple2::_1));
|
.max(Comparator.comparing(Tuple2::_1));
|
||||||
|
|
||||||
if(simAuthor.isPresent()) {
|
if (simAuthor.isPresent()) {
|
||||||
double th = THRESHOLD;
|
double th = THRESHOLD;
|
||||||
//increase the threshold if the surname is too short
|
// increase the threshold if the surname is too short
|
||||||
if (simAuthor.get()._2().getSurname() != null && simAuthor.get()._2().getSurname().length()<=3)
|
if (simAuthor.get()._2().getSurname() != null
|
||||||
|
&& simAuthor.get()._2().getSurname().length() <= 3)
|
||||||
th = 0.99;
|
th = 0.99;
|
||||||
|
|
||||||
if (simAuthor.get()._1() > th) {
|
if (simAuthor.get()._1() > th) {
|
||||||
|
@ -100,8 +101,9 @@ public class AuthorMerger {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String pidToComparableString(StructuredProperty pid) {
|
public static String pidToComparableString(StructuredProperty pid) {
|
||||||
return (pid.getQualifier() != null ?
|
return (pid.getQualifier() != null
|
||||||
pid.getQualifier().getClassid() != null ? pid.getQualifier().getClassid().toLowerCase() : "" : "")
|
? pid.getQualifier().getClassid() != null ? pid.getQualifier().getClassid().toLowerCase() : ""
|
||||||
|
: "")
|
||||||
+ (pid.getValue() != null ? pid.getValue().toLowerCase() : "");
|
+ (pid.getValue() != null ? pid.getValue().toLowerCase() : "");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -123,14 +125,13 @@ public class AuthorMerger {
|
||||||
final Person pa = parse(a);
|
final Person pa = parse(a);
|
||||||
final Person pb = parse(b);
|
final Person pb = parse(b);
|
||||||
|
|
||||||
//if both are accurate (e.g. they have name and surname)
|
// if both are accurate (e.g. they have name and surname)
|
||||||
if (pa.isAccurate() & pb.isAccurate()) {
|
if (pa.isAccurate() & pb.isAccurate()) {
|
||||||
return
|
return new JaroWinkler().score(normalize(pa.getSurnameString()), normalize(pb.getSurnameString())) * 0.5
|
||||||
new JaroWinkler().score(normalize(pa.getSurnameString()), normalize(pb.getSurnameString()))*0.5
|
+ new JaroWinkler().score(normalize(pa.getNameString()), normalize(pb.getNameString())) * 0.5;
|
||||||
+ new JaroWinkler().score(normalize(pa.getNameString()), normalize(pb.getNameString()))*0.5;
|
|
||||||
} else {
|
} else {
|
||||||
return
|
return new JaroWinkler()
|
||||||
new JaroWinkler().score(normalize(pa.getNormalisedFullname()), normalize(pb.getNormalisedFullname()));
|
.score(normalize(pa.getNormalisedFullname()), normalize(pb.getNormalisedFullname()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,13 +1,22 @@
|
||||||
package eu.dnetlib.doiboost
|
package eu.dnetlib.doiboost
|
||||||
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.{DataInfo, Dataset, Field, KeyValue, Qualifier, Result, StructuredProperty}
|
import eu.dnetlib.dhp.schema.oaf.{DataInfo, Dataset, Field, Instance, KeyValue, Publication, Qualifier, Result, StructuredProperty}
|
||||||
import eu.dnetlib.dhp.utils.DHPUtils
|
import eu.dnetlib.dhp.utils.DHPUtils
|
||||||
|
import org.json4s
|
||||||
|
import org.json4s.DefaultFormats
|
||||||
|
import org.json4s.jackson.JsonMethods.parse
|
||||||
|
|
||||||
|
import scala.collection.JavaConverters._
|
||||||
|
import scala.io.Source
|
||||||
|
|
||||||
|
|
||||||
|
case class HostedByItemType(id: String, officialName: String, issn: String, eissn: String, lissn: String, openAccess: Boolean) {}
|
||||||
|
|
||||||
object DoiBoostMappingUtil {
|
object DoiBoostMappingUtil {
|
||||||
|
|
||||||
//STATIC STRING
|
//STATIC STRING
|
||||||
val MAG = "microsoft"
|
val MAG = "microsoft"
|
||||||
val MAG_NAME= "Microsoft Academic Graph"
|
val MAG_NAME = "Microsoft Academic Graph"
|
||||||
val ORCID = "ORCID"
|
val ORCID = "ORCID"
|
||||||
val CROSSREF = "Crossref"
|
val CROSSREF = "Crossref"
|
||||||
val UNPAYWALL = "UnpayWall"
|
val UNPAYWALL = "UnpayWall"
|
||||||
|
@ -19,112 +28,234 @@ object DoiBoostMappingUtil {
|
||||||
val DNET_LANGUAGES = "dnet:languages"
|
val DNET_LANGUAGES = "dnet:languages"
|
||||||
val PID_TYPES = "dnet:pid_types"
|
val PID_TYPES = "dnet:pid_types"
|
||||||
|
|
||||||
|
val invalidName = List(",", "none none", "none, none", "none &na;", "(:null)", "test test test", "test test", "test", "&na; &na;")
|
||||||
|
|
||||||
|
|
||||||
def generateDataInfo(): DataInfo = {
|
def retrieveHostedByMap(): Map[String, HostedByItemType] = {
|
||||||
generateDataInfo("0.9")
|
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
|
||||||
|
val jsonMap = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/hbMap.json")).mkString
|
||||||
|
lazy val json: json4s.JValue = parse(jsonMap)
|
||||||
|
json.extract[Map[String, HostedByItemType]]
|
||||||
}
|
}
|
||||||
|
|
||||||
def generateDataInfo(trust:String): DataInfo = {
|
def retrieveHostedByItem(issn: String, eissn: String, lissn: String, hostedByMap: Map[String, HostedByItemType]): HostedByItemType = {
|
||||||
|
if (issn != null && issn.nonEmpty && hostedByMap.contains(issn))
|
||||||
|
return hostedByMap(issn)
|
||||||
|
|
||||||
|
if (eissn != null && eissn.nonEmpty && hostedByMap.contains(eissn))
|
||||||
|
return hostedByMap(eissn)
|
||||||
|
|
||||||
|
if (lissn != null && lissn.nonEmpty && hostedByMap.contains(lissn))
|
||||||
|
return hostedByMap(lissn)
|
||||||
|
|
||||||
|
null
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def fixPublication(publication: Publication, hostedByMap: Map[String, HostedByItemType]): Publication = {
|
||||||
|
if (publication.getJournal == null)
|
||||||
|
return publication
|
||||||
|
|
||||||
|
val issn = publication.getJournal.getIssnPrinted
|
||||||
|
val eissn = publication.getJournal.getIssnOnline
|
||||||
|
val lissn = publication.getJournal.getIssnLinking
|
||||||
|
|
||||||
|
val item = retrieveHostedByItem(issn, eissn, lissn, hostedByMap)
|
||||||
|
if (item!= null) {
|
||||||
|
val l = publication.getInstance().asScala.map(i =>{
|
||||||
|
val hb = new KeyValue
|
||||||
|
hb.setValue (item.officialName)
|
||||||
|
hb.setKey (s"10|${item.id}" )
|
||||||
|
i.setHostedby(hb)
|
||||||
|
if(item.openAccess)
|
||||||
|
i.setAccessright(createQualifier("Open", "dnet:access_modes"))
|
||||||
|
i
|
||||||
|
}).asJava
|
||||||
|
|
||||||
|
publication.setInstance(l)
|
||||||
|
}
|
||||||
|
publication
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def generateDataInfo (): DataInfo = {
|
||||||
|
generateDataInfo ("0.9")
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def filterPublication (publication: Publication): Boolean = {
|
||||||
|
|
||||||
|
//Case empty publication
|
||||||
|
if (publication == null)
|
||||||
|
return false
|
||||||
|
|
||||||
|
//Case publication with no title
|
||||||
|
if (publication.getTitle == null || publication.getTitle.size == 0)
|
||||||
|
return false
|
||||||
|
|
||||||
|
|
||||||
|
val s = publication.getTitle.asScala.count (p => p.getValue != null
|
||||||
|
&& p.getValue.nonEmpty && ! p.getValue.equalsIgnoreCase ("[NO TITLE AVAILABLE]") )
|
||||||
|
|
||||||
|
if (s == 0)
|
||||||
|
return false
|
||||||
|
|
||||||
|
// fixes #4360 (test publisher)
|
||||||
|
val publisher = if (publication.getPublisher != null) publication.getPublisher.getValue else null
|
||||||
|
|
||||||
|
if (publisher != null && (publisher.equalsIgnoreCase ("Test accounts") || publisher.equalsIgnoreCase ("CrossRef Test Account") ) ) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
//Publication with no Author
|
||||||
|
if (publication.getAuthor == null || publication.getAuthor.size () == 0)
|
||||||
|
return false
|
||||||
|
|
||||||
|
|
||||||
|
//filter invalid author
|
||||||
|
val authors = publication.getAuthor.asScala.map (s => {
|
||||||
|
if (s.getFullname.nonEmpty) {
|
||||||
|
s.getFullname
|
||||||
|
}
|
||||||
|
else
|
||||||
|
s"${
|
||||||
|
s.getName
|
||||||
|
} ${
|
||||||
|
s.getSurname
|
||||||
|
}"
|
||||||
|
})
|
||||||
|
|
||||||
|
val c = authors.count (isValidAuthorName)
|
||||||
|
if (c == 0)
|
||||||
|
return false
|
||||||
|
|
||||||
|
// fixes #4368
|
||||||
|
if (authors.count (s => s.equalsIgnoreCase ("Addie Jackson") ) > 0 && "Elsevier BV".equalsIgnoreCase (publication.getPublisher.getValue) )
|
||||||
|
return false
|
||||||
|
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def isValidAuthorName (fullName: String): Boolean = {
|
||||||
|
if (fullName == null || fullName.isEmpty)
|
||||||
|
return false
|
||||||
|
if (invalidName.contains (fullName.toLowerCase.trim) )
|
||||||
|
return false
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def generateDataInfo (trust: String): DataInfo = {
|
||||||
val di = new DataInfo
|
val di = new DataInfo
|
||||||
di.setDeletedbyinference(false)
|
di.setDeletedbyinference (false)
|
||||||
di.setInferred(false)
|
di.setInferred (false)
|
||||||
di.setInvisible(false)
|
di.setInvisible (false)
|
||||||
di.setTrust(trust)
|
di.setTrust (trust)
|
||||||
di.setProvenanceaction(createQualifier("sysimport:actionset", "dnet:provenanceActions"))
|
di.setProvenanceaction (createQualifier ("sysimport:actionset", "dnet:provenanceActions") )
|
||||||
di
|
di
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def createSP(value: String, classId: String, schemeId: String): StructuredProperty = {
|
def createSP (value: String, classId: String, schemeId: String): StructuredProperty = {
|
||||||
val sp = new StructuredProperty
|
val sp = new StructuredProperty
|
||||||
sp.setQualifier(createQualifier(classId, schemeId))
|
sp.setQualifier (createQualifier (classId, schemeId) )
|
||||||
sp.setValue(value)
|
sp.setValue (value)
|
||||||
sp
|
sp
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def createSP(value: String, classId: String, schemeId: String, dataInfo: DataInfo): StructuredProperty = {
|
def createSP (value: String, classId: String, schemeId: String, dataInfo: DataInfo): StructuredProperty = {
|
||||||
val sp = new StructuredProperty
|
val sp = new StructuredProperty
|
||||||
sp.setQualifier(createQualifier(classId, schemeId))
|
sp.setQualifier (createQualifier (classId, schemeId) )
|
||||||
sp.setValue(value)
|
sp.setValue (value)
|
||||||
sp.setDataInfo(dataInfo)
|
sp.setDataInfo (dataInfo)
|
||||||
sp
|
sp
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def createCrossrefCollectedFrom(): KeyValue = {
|
def createCrossrefCollectedFrom (): KeyValue = {
|
||||||
|
|
||||||
val cf = new KeyValue
|
val cf = new KeyValue
|
||||||
cf.setValue(CROSSREF)
|
cf.setValue (CROSSREF)
|
||||||
cf.setKey("10|" + OPENAIRE_PREFIX + SEPARATOR + DHPUtils.md5(CROSSREF.toLowerCase))
|
cf.setKey ("10|" + OPENAIRE_PREFIX + SEPARATOR + DHPUtils.md5 (CROSSREF.toLowerCase) )
|
||||||
cf
|
cf
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def createUnpayWallCollectedFrom(): KeyValue = {
|
def createUnpayWallCollectedFrom (): KeyValue = {
|
||||||
|
|
||||||
val cf = new KeyValue
|
val cf = new KeyValue
|
||||||
cf.setValue(UNPAYWALL)
|
cf.setValue (UNPAYWALL)
|
||||||
cf.setKey("10|" + OPENAIRE_PREFIX + SEPARATOR + DHPUtils.md5(UNPAYWALL.toLowerCase))
|
cf.setKey ("10|" + OPENAIRE_PREFIX + SEPARATOR + DHPUtils.md5 (UNPAYWALL.toLowerCase) )
|
||||||
cf
|
cf
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def createORIDCollectedFrom(): KeyValue = {
|
def createORIDCollectedFrom (): KeyValue = {
|
||||||
|
|
||||||
val cf = new KeyValue
|
val cf = new KeyValue
|
||||||
cf.setValue(ORCID)
|
cf.setValue (ORCID)
|
||||||
cf.setKey("10|" + OPENAIRE_PREFIX + SEPARATOR + DHPUtils.md5(ORCID.toLowerCase))
|
cf.setKey ("10|" + OPENAIRE_PREFIX + SEPARATOR + DHPUtils.md5 (ORCID.toLowerCase) )
|
||||||
cf
|
cf
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def generateIdentifier (oaf: Result, doi: String): String = {
|
||||||
def generateIdentifier(oaf: Result, doi: String): String = {
|
val id = DHPUtils.md5 (doi.toLowerCase)
|
||||||
val id = DHPUtils.md5(doi.toLowerCase)
|
if (oaf.isInstanceOf[Dataset] )
|
||||||
if (oaf.isInstanceOf[Dataset])
|
return s"60|${
|
||||||
return s"60|${doiBoostNSPREFIX}${SEPARATOR}${id}"
|
doiBoostNSPREFIX
|
||||||
s"50|${doiBoostNSPREFIX}${SEPARATOR}${id}"
|
}${
|
||||||
}
|
SEPARATOR
|
||||||
|
}${
|
||||||
|
id
|
||||||
|
}"
|
||||||
|
s"50|${
|
||||||
|
doiBoostNSPREFIX
|
||||||
|
}${
|
||||||
|
SEPARATOR
|
||||||
|
}${
|
||||||
|
id
|
||||||
|
}"
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def createMAGCollectedFrom (): KeyValue = {
|
||||||
|
|
||||||
|
|
||||||
def createMAGCollectedFrom(): KeyValue = {
|
|
||||||
|
|
||||||
val cf = new KeyValue
|
val cf = new KeyValue
|
||||||
cf.setValue(MAG)
|
cf.setValue (MAG)
|
||||||
cf.setKey("10|" + OPENAIRE_PREFIX + SEPARATOR + DHPUtils.md5(MAG))
|
cf.setKey ("10|" + OPENAIRE_PREFIX + SEPARATOR + DHPUtils.md5 (MAG) )
|
||||||
cf
|
cf
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def createQualifier(clsName: String, clsValue: String, schName: String, schValue: String): Qualifier = {
|
def createQualifier (clsName: String, clsValue: String, schName: String, schValue: String): Qualifier = {
|
||||||
val q = new Qualifier
|
val q = new Qualifier
|
||||||
q.setClassid(clsName)
|
q.setClassid (clsName)
|
||||||
q.setClassname(clsValue)
|
q.setClassname (clsValue)
|
||||||
q.setSchemeid(schName)
|
q.setSchemeid (schName)
|
||||||
q.setSchemename(schValue)
|
q.setSchemename (schValue)
|
||||||
q
|
q
|
||||||
}
|
}
|
||||||
|
|
||||||
def createQualifier(cls: String, sch: String): Qualifier = {
|
def createQualifier (cls: String, sch: String): Qualifier = {
|
||||||
createQualifier(cls, cls, sch, sch)
|
createQualifier (cls, cls, sch, sch)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def asField[T](value: T): Field[T] = {
|
def asField[T] (value: T): Field[T] = {
|
||||||
val tmp = new Field[T]
|
val tmp = new Field[T]
|
||||||
tmp.setValue(value)
|
tmp.setValue (value)
|
||||||
tmp
|
tmp
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,11 +32,11 @@ object SparkGenerateDoiBoost {
|
||||||
|
|
||||||
|
|
||||||
logger.info("Phase 1) repartition and move all the dataset in a same working folder")
|
logger.info("Phase 1) repartition and move all the dataset in a same working folder")
|
||||||
spark.read.load(crossrefPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefPublication")
|
// spark.read.load(crossrefPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefPublication")
|
||||||
spark.read.load(crossrefDatasetPath).as(Encoders.bean(classOf[OafDataset])).map(s => s)(Encoders.kryo[OafDataset]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefDataset")
|
// spark.read.load(crossrefDatasetPath).as(Encoders.bean(classOf[OafDataset])).map(s => s)(Encoders.kryo[OafDataset]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefDataset")
|
||||||
spark.read.load(uwPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/uwPublication")
|
// spark.read.load(uwPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/uwPublication")
|
||||||
spark.read.load(orcidPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/orcidPublication")
|
// spark.read.load(orcidPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/orcidPublication")
|
||||||
spark.read.load(magPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/magPublication")
|
// spark.read.load(magPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/magPublication")
|
||||||
|
|
||||||
implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
|
implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
|
||||||
implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset]
|
implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset]
|
||||||
|
@ -67,14 +67,13 @@ object SparkGenerateDoiBoost {
|
||||||
logger.info("Phase 3) Join Result with MAG")
|
logger.info("Phase 3) Join Result with MAG")
|
||||||
val sj: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/secondJoin").as[Publication].map(p => (p.getId, p))
|
val sj: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/secondJoin").as[Publication].map(p => (p.getId, p))
|
||||||
|
|
||||||
sj.where(sj("_1").like())
|
|
||||||
|
|
||||||
val magPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/magPublication").as[Publication].map(p => (p.getId, p))
|
val magPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/magPublication").as[Publication].map(p => (p.getId, p))
|
||||||
sj.joinWith(magPublication, sj("_1").equalTo(magPublication("_1")), "left").map(applyMerge).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublication")
|
sj.joinWith(magPublication, sj("_1").equalTo(magPublication("_1")), "left").map(applyMerge).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublication")
|
||||||
|
|
||||||
|
|
||||||
|
val doiBoostPublication: Dataset[Publication] = spark.read.load(s"$workingDirPath/doiBoostPublication").as[Publication]
|
||||||
|
|
||||||
|
doiBoostPublication.filter(p=>DoiBoostMappingUtil.filterPublication(p)).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationFiltered")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
File diff suppressed because one or more lines are too long
|
@ -0,0 +1,16 @@
|
||||||
|
package eu.dnetlib.dhp.doiboost
|
||||||
|
|
||||||
|
|
||||||
|
import eu.dnetlib.doiboost.DoiBoostMappingUtil
|
||||||
|
import org.junit.jupiter.api.Test
|
||||||
|
|
||||||
|
class DoiBoostHostedByMapTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def testLoadMap(): Unit = {
|
||||||
|
println(DoiBoostMappingUtil.retrieveHostedByMap().keys.size)
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue