[scala-refactor] Module dhp-graph-mapper:

Moved all scala source into src/main/scala and src/test/scala
This commit is contained in:
Sandro La Bruzzo 2021-12-06 13:57:41 +01:00
parent 81bf604059
commit bf880e2508
23 changed files with 219 additions and 257 deletions

View File

@ -1,8 +1,8 @@
package eu.dnetlib.dhp.oa.graph.hostedbymap
import eu.dnetlib.dhp.oa.graph.hostedbymap.model.EntityInfo
import org.apache.spark.sql.{Dataset, Encoder, Encoders, TypedColumn}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Dataset, Encoder, Encoders, TypedColumn}
case class HostedByItemType(id: String, officialname: String, issn: String, eissn: String, lissn: String, openAccess: Boolean) {}

View File

@ -2,13 +2,12 @@ package eu.dnetlib.dhp.oa.graph.hostedbymap
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.oa.graph.hostedbymap.SparkApplyHostedByMapToResult.{applyHBtoPubs, getClass}
import eu.dnetlib.dhp.oa.graph.hostedbymap.model.EntityInfo
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.{Datasource, Publication}
import eu.dnetlib.dhp.schema.oaf.Datasource
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.apache.spark.sql._
import org.json4s.DefaultFormats
import org.slf4j.{Logger, LoggerFactory}

View File

@ -5,16 +5,14 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.oa.graph.hostedbymap.model.EntityInfo
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils
import eu.dnetlib.dhp.schema.oaf.{Datasource, Instance, OpenAccessRoute, Publication}
import eu.dnetlib.dhp.schema.oaf.{Instance, OpenAccessRoute, Publication}
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.apache.spark.sql._
import org.json4s.DefaultFormats
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
object SparkApplyHostedByMapToResult {
def applyHBtoPubs(join: Dataset[EntityInfo], pubs: Dataset[Publication]) = {
@ -25,7 +23,7 @@ object SparkApplyHostedByMapToResult {
val ei: EntityInfo = t2._2
val i = p.getInstance().asScala
if (i.size == 1) {
val inst: Instance = i(0)
val inst: Instance = i.head
inst.getHostedby.setKey(ei.getHostedById)
inst.getHostedby.setValue(ei.getName)
if (ei.getOpenAccess) {
@ -39,6 +37,7 @@ object SparkApplyHostedByMapToResult {
p
})(Encoders.bean(classOf[Publication]))
}
def main(args: Array[String]): Unit = {

View File

@ -3,18 +3,15 @@ package eu.dnetlib.dhp.oa.graph.hostedbymap
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.oa.graph.hostedbymap.model.EntityInfo
import eu.dnetlib.dhp.schema.oaf.{Journal, Publication}
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.apache.spark.sql._
import org.json4s
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import org.slf4j.{Logger, LoggerFactory}
object SparkPrepareHostedByInfoToApply {
implicit val mapEncoderPInfo: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])

View File

@ -1,22 +1,20 @@
package eu.dnetlib.dhp.oa.graph.hostedbymap
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.oa.graph.hostedbymap.model.{DOAJModel, UnibiGoldModel}
import eu.dnetlib.dhp.schema.oaf.Datasource
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.json4s.DefaultFormats
import org.slf4j.{Logger, LoggerFactory}
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import java.io.PrintWriter
import org.apache.hadoop.io.compress.GzipCodec
object SparkProduceHostedByMap {
@ -52,7 +50,6 @@ object SparkProduceHostedByMap {
}
def getHostedByItemType(id: String, officialname: String, issn: String, eissn: String, issnl: String, oa: Boolean): HostedByItemType = {
if (issn != null) {
if (eissn != null) {
@ -163,7 +160,6 @@ object SparkProduceHostedByMap {
}
def writeToHDFS(input: Array[String], outputPath: String, hdfsNameNode: String): Unit = {
val conf = new Configuration()
@ -182,7 +178,6 @@ object SparkProduceHostedByMap {
}
def main(args: Array[String]): Unit = {
val logger: Logger = LoggerFactory.getLogger(getClass)

View File

@ -4,17 +4,11 @@ import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.common.HdfsSupport
import eu.dnetlib.dhp.schema.common.ModelSupport
import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.utils.DHPUtils
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.HttpClients
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
import scala.io.Source

View File

@ -2,9 +2,8 @@ package eu.dnetlib.dhp.oa.graph.resolution
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.common.HdfsSupport
import eu.dnetlib.dhp.schema.common.EntityType
import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Result, Software, Dataset => OafDataset}
import eu.dnetlib.dhp.schema.oaf.{Dataset => OafDataset,_}
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf

View File

@ -3,7 +3,7 @@ package eu.dnetlib.dhp.oa.graph.resolution
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.common.HdfsSupport
import eu.dnetlib.dhp.schema.oaf.{Relation, Result}
import eu.dnetlib.dhp.schema.oaf.Relation
import eu.dnetlib.dhp.utils.DHPUtils
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.{FileSystem, Path}

View File

@ -18,7 +18,6 @@ object SparkDataciteToOAF {
.config(conf)
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
import spark.implicits._
val sc = spark.sparkContext

View File

@ -2,7 +2,7 @@ package eu.dnetlib.dhp.sx.graph
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, OtherResearchProduct, Publication, Result, Software, Dataset => OafDataset}
import eu.dnetlib.dhp.schema.oaf.Result
import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkConf

View File

@ -5,10 +5,10 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.sx.scholix.Scholix
import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary
import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import org.apache.hadoop.io.compress._
object SparkConvertObjectToJson {

View File

@ -2,11 +2,12 @@ package eu.dnetlib.dhp.sx.graph
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Relation, Result, Software, Dataset => OafDataset}
import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Relation, Software,Dataset => OafDataset}
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
object SparkConvertRDDtoDataset {
def main(args: Array[String]): Unit = {

View File

@ -1,14 +1,12 @@
package eu.dnetlib.dhp.sx.graph
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, OtherResearchProduct, Publication, Relation, Result, Software, Dataset => OafDataset}
import eu.dnetlib.dhp.schema.oaf.{Dataset => OafDataset,_}
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}
object SparkCreateInputGraph {
def main(args: Array[String]): Unit = {
@ -41,9 +39,6 @@ object SparkCreateInputGraph {
implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
val sourcePath = parser.get("sourcePath")
log.info(s"sourcePath -> $sourcePath")
val targetPath = parser.get("targetPath")

View File

@ -9,7 +9,7 @@ import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils.RelatedEntities
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.count
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}
object SparkCreateScholix {

View File

@ -6,7 +6,7 @@ import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary
import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}
object SparkCreateSummaryObject {

View File

@ -5,6 +5,7 @@ import org.apache.spark.sql.{Encoder, Encoders}
import org.json4s
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import java.util.regex.Pattern
import scala.language.postfixOps
import scala.xml.{Elem, Node, XML}

View File

@ -2,11 +2,11 @@ package eu.dnetlib.dhp.sx.graph.pangaea
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
import scala.io.Source
object SparkGeneratePanagaeaDataset {
@ -46,7 +46,4 @@ object SparkGeneratePanagaeaDataset {
}
}

View File

@ -1,6 +1,5 @@
package eu.dnetlib.dhp.sx.graph.scholix
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Result, StructuredProperty}
import eu.dnetlib.dhp.schema.sx.scholix._
import eu.dnetlib.dhp.schema.sx.summary.{CollectedFromType, SchemeValue, ScholixSummary, Typology}
@ -10,10 +9,8 @@ import org.apache.spark.sql.{Encoder, Encoders}
import org.json4s
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import scala.collection.JavaConverters._
import scala.io.Source
import scala.language.postfixOps
object ScholixUtils {
@ -21,6 +18,7 @@ object ScholixUtils {
val DNET_IDENTIFIER_SCHEMA: String = "DNET Identifier"
val DATE_RELATION_KEY: String = "RelationDate"
case class RelationVocabulary(original: String, inverse: String) {}
case class RelatedEntities(id: String, relatedDataset: Long, relatedPublication: Long) {}
@ -66,7 +64,6 @@ object ScholixUtils {
}
val statsAggregator: Aggregator[(String, String, Long), RelatedEntities, RelatedEntities] = new Aggregator[(String, String, Long), RelatedEntities, RelatedEntities] with Serializable {
override def zero: RelatedEntities = null
@ -85,8 +82,7 @@ object ScholixUtils {
if (b1 != null && b2 != null)
RelatedEntities(b1.id, b1.relatedDataset + b2.relatedDataset, b1.relatedPublication + b2.relatedPublication)
else
if (b1!= null)
else if (b1 != null)
b1
else
b2
@ -144,7 +140,6 @@ object ScholixUtils {
s
}
@ -203,8 +198,7 @@ object ScholixUtils {
if (summaryObject.getDate != null && !summaryObject.getDate.isEmpty)
r.setPublicationDate(summaryObject.getDate.get(0))
if (summaryObject.getPublisher!= null && !summaryObject.getPublisher.isEmpty)
{
if (summaryObject.getPublisher != null && !summaryObject.getPublisher.isEmpty) {
val plist: List[ScholixEntityId] = summaryObject.getPublisher.asScala.map(p => new ScholixEntityId(p, null)).toList
if (plist.nonEmpty)
@ -228,9 +222,6 @@ object ScholixUtils {
}
def scholixFromSource(relation: Relation, source: ScholixSummary): Scholix = {
if (relation == null || source == null)

View File

@ -3,13 +3,9 @@ package eu.dnetlib.dhp.oa.graph.hostedbymap
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.oa.graph.hostedbymap.SparkPrepareHostedByInfoToApply.{joinResHBM, prepareResultInfo, toEntityInfo}
import eu.dnetlib.dhp.oa.graph.hostedbymap.model.EntityInfo
import eu.dnetlib.dhp.schema.oaf.{Datasource, OpenAccessRoute, Publication}
import javax.management.openmbean.OpenMBeanAttributeInfo
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.json4s
import org.json4s.DefaultFormats
import eu.dnetlib.dhp.schema.common.ModelConstants
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.Test

View File

@ -4,10 +4,9 @@ import eu.dnetlib.dhp.schema.oaf.Datasource
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.json4s.DefaultFormats
import org.junit.jupiter.api.Assertions.{assertNotNull, assertTrue}
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Assertions._
import org.json4s.jackson.Serialization.write
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
class TestPreprocess extends java.io.Serializable{