diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/create_MAG_denormalized_table_properites.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/create_MAG_denormalized_table_properites.json
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/config-default.xml
new file mode 100644
index 0000000000..dd3c32c620
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/config-default.xml
@@ -0,0 +1,23 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+
+    <property>
+        <name>oozie.launcher.mapreduce.user.classpath.first</name>
+        <value>true</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/workflow.xml
new file mode 100644
index 0000000000..bcb049a06b
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/workflow.xml
@@ -0,0 +1,95 @@
+<workflow-app name="Create_MAG_Denormalized_Table" xmlns="uri:oozie:workflow:0.5">
+    <parameters>
+        <property>
+            <name>crossrefPath</name>
+            <description>the path of the native Crossref DUMP</description>
+        </property>
+        <property>
+            <name>magBasePath</name>
+            <description>The base path of MAG DUMP CSV Tables</description>
+        </property>
+        <property>
+            <name>workingPath</name>
+            <description>The working path</description>
+        </property>
+        <property>
+            <name>resume_from</name>
+            <description>start Node</description>
+        </property>
+    </parameters>
+
+
+
+    <start to="resume_from"/>
+
+
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+
+    <decision name="resume_from">
+        <switch>
+            <case to="generateTable">${wf:conf('resumeFrom') eq 'generateTable'}</case>
+            <default to="generateTable"/>
+        </switch>
+    </decision>
+
+
+    <action name="generateTable">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>Generate ORCID Tables</name>
+            <class>eu.dnetlib.dhp.collection.mag.SparkCreateMagDenormalizedTable</class>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=2g
+                --conf spark.sql.shuffle.partitions=3000
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--crossrefPath</arg><arg>${crossrefPath}</arg>
+            <arg>--magBasePath</arg><arg>${magBasePath}</arg>
+            <arg>--workingPath</arg><arg>${workingPath}</arg>
+            <arg>--master</arg><arg>yarn</arg>
+        </spark>
+        <ok to="generateTable2"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="generateTable2">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>Generate ORCID Tables</name>
+            <class>eu.dnetlib.dhp.collection.mag.SparkCreateMagDenormalizedTable</class>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=2g
+                --conf spark.sql.shuffle.partitions=3000
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--crossrefPath</arg><arg>${crossrefPath}</arg>
+            <arg>--magBasePath</arg><arg>${magBasePath}</arg>
+            <arg>--workingPath</arg><arg>${workingPath}</arg>
+            <arg>--master</arg><arg>yarn</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala
index 412089893f..42cee7354b 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala
@@ -6,10 +6,67 @@ import org.json4s
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
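+
+/** A single denormalized MAG record, as written by SparkCreateMagDenormalizedTable:
+  * one row per paper, with journal/conference metadata and the nested lists of
+  * authors (MAGAuthor) and fields of study (MAGFieldOfStudy).
+  */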
+case class MAGPaper(
+ paperId: Option[Long],
+ rank: Option[Int],
+ doi: Option[String],
+ docType: Option[String],
+ paperTitle: Option[String],
+ originalTitle: Option[String],
+ bookTitle: Option[String],
+ year: Option[Int],
+ date: Option[String],
+ onlineDate: Option[String],
+ publisher: Option[String],
+ // Journal or Conference information (one will be populated)
+ journalId: Option[Long],
+ journalName: Option[String],
+ journalIssn: Option[String],
+ journalPublisher: Option[String],
+ journalWebpage: Option[String],
+ conferenceSeriesId: Option[Long],
+ conferenceInstanceId: Option[Long],
+ conferenceName: Option[String],
+ conferenceLocation: Option[String],
+ conferenceStartDate: Option[String],
+ conferenceEndDate: Option[String],
+ volume: Option[String],
+ issue: Option[String],
+ firstPage: Option[String],
+ lastPage: Option[String],
+ referenceCount: Option[Long],
+ citationCount: Option[Long],
+ estimatedCitation: Option[Long],
+ originalVenue: Option[String],
+ familyId: Option[Long],
+ familyRank: Option[Int],
+ docSubTypes: Option[String],
+ createdDate: Option[String],
+ abstractText: Option[String],
+ // List of authors
+ authors: Option[List[MAGAuthor]],
+ // List of Fields of Study
+ fos: Option[List[MAGFieldOfStudy]]
+)
+
+case class MAGAuthor(
+ AffiliationId: Option[Long],
+ AuthorSequenceNumber: Option[Int],
+ AffiliationName: Option[String],
+ AuthorName: Option[String],
+ AuthorId: Option[Long],
+ GridId: Option[String]
+)
+
+case class MAGFieldOfStudy(
+ FieldOfStudyId: Option[Long],
+ DisplayName: Option[String],
+ MainType: Option[String],
+ Score: Option[Double]
+)
object MagUtility extends Serializable {
-
val datatypedict = Map(
"bool" -> BooleanType,
"int" -> IntegerType,
@@ -251,22 +308,22 @@ object MagUtility extends Serializable {
def getSchema(streamName: String): StructType = {
var schema = new StructType()
val d: Seq[String] = stream(streamName)._2
- d.foreach { case t =>
- val currentType = t.split(":")
- val fieldName: String = currentType.head
- var fieldType: String = currentType.last
- val nullable: Boolean = fieldType.endsWith("?")
- if (nullable)
- fieldType = fieldType.replace("?", "")
- schema = schema.add(StructField(fieldName, datatypedict(fieldType), nullable))
+ d.foreach {
+ case t =>
+ val currentType = t.split(":")
+ val fieldName: String = currentType.head
+ var fieldType: String = currentType.last
+ val nullable: Boolean = fieldType.endsWith("?")
+ if (nullable)
+ fieldType = fieldType.replace("?", "")
+ schema = schema.add(StructField(fieldName, datatypedict(fieldType), nullable))
}
schema
}
-
- def loadMagEntity(spark:SparkSession, entity:String, basePath:String):Dataset[Row] = {
+ def loadMagEntity(spark: SparkSession, entity: String, basePath: String): Dataset[Row] = {
if (stream.contains(entity)) {
- val s =getSchema(entity)
+ val s = getSchema(entity)
val pt = stream(entity)._1
spark.read
.option("header", "false")
@@ -278,6 +335,7 @@ object MagUtility extends Serializable {
null
}
+
def convertInvertedIndexString(json_input: String): String = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: json4s.JValue = parse(json_input)
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDataset.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDataset.scala
deleted file mode 100644
index 70073cbe60..0000000000
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDataset.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-package eu.dnetlib.dhp.collection.mag
-
-import eu.dnetlib.dhp.application.AbstractScalaApplication
-import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.slf4j.Logger
-
-class SparkCreateMagDataset (propertyPath: String, args: Array[String], log: Logger)
- extends AbstractScalaApplication(propertyPath, args, log: Logger) {
-
-
-
- /** Here all the spark applications runs this method
- * where the whole logic of the spark node is defined
- */
- override def run(): Unit = {
-
- }
-
-
- private def loadAndFilterPapers(spark:SparkSession, crossrefPath:String, magBasePath:String, workingPath:String): Unit = {
-
- import spark.implicits._
- val schema:StructType= StructType(StructField("DOI", StringType)::Nil)
-
- //Filter all the MAG Papers that intersect with a Crossref DOI
- val crId= spark.read.schema(schema).json(crossrefPath).withColumn("crId", lower(col("DOI"))).distinct.select("crId")
- val magPapers = MagUtility.loadMagEntity(spark, "Papers", magBasePath)
- .withColumn("Doi", lower(col("Doi")))
- .where(col("Doi").isNotNull)
-
-
- val intersectedPapers:Dataset[Row] =magPapers.join(crId, magPapers("Doi").equalTo(crId("crId")), "leftsemi").dropDuplicates("Doi")
- intersectedPapers.cache()
- intersectedPapers.count()
- //log.info("Create current abstract")
-
- //Abstract is an inverted list, we define a function that convert in string the abstract and recreate
- // a table(PaperId, Abstract)
- val paperAbstract = MagUtility.loadMagEntity(spark, "PaperAbstractsInvertedIndex", magBasePath)
- .map(s => (s.getLong(0),MagUtility.convertInvertedIndexString(s.getString(1))))
- .withColumnRenamed("_1","PaperId")
- .withColumnRenamed("_2","Abstract")
-
- //We define Step0 as the result of the Join between PaperIntersection and the PaperAbstract
-
- val step0 =intersectedPapers
- .join(paperAbstract, intersectedPapers("PaperId") === paperAbstract("PaperId"), "left")
- .select(intersectedPapers("*"),paperAbstract("Abstract")).cache()
-
- step0.count()
-
- intersectedPapers.unpersist()
-
- // We have three table Author, Affiliation, and PaperAuthorAffiliation, in the
- //next step we create a table containing
- val authors = MagUtility.loadMagEntity(spark, "Authors", magBasePath)
- val affiliations= MagUtility.loadMagEntity(spark, "Affiliations", magBasePath)
- val paaf= MagUtility.loadMagEntity(spark, "PaperAuthorAffiliations", magBasePath)
-
- val paperAuthorAffiliations =paaf.join(step0,paaf("PaperId") === step0("PaperId"),"leftsemi")
-
- val j1 = paperAuthorAffiliations.join(authors,paperAuthorAffiliations("AuthorId") === authors("AuthorId"), "inner")
- .select(col("PaperId"), col("AffiliationId"),col("AuthorSequenceNumber"), authors("DisplayName").alias("AuthorName"), authors("AuthorId"))
-
- val paperAuthorAffiliationNormalized = j1.join(affiliations, j1("AffiliationId")=== affiliations("AffiliationId"), "left")
- .select(j1("*"), affiliations("DisplayName").alias("AffiliationName"), affiliations("GridId"))
- .groupBy("PaperId")
- .agg(collect_list(struct("AffiliationId","AuthorSequenceNumber","AffiliationName","AuthorName","AuthorId","GridId")).alias("authors"))
- val step1 =step0.join(paperAuthorAffiliationNormalized, step0("PaperId")=== paperAuthorAffiliationNormalized("PaperId"), "left")
- .select(step0("*"),paperAuthorAffiliationNormalized("authors"))
- .cache()
- step1.count()
-
- step0.unpersist()
-
-
- val conference = MagUtility.loadMagEntity(spark, "ConferenceInstances", magBasePath).select(
- $"ConferenceInstanceId",
- $"DisplayName".as("conferenceName"),
- $"Location".as("conferenceLocation"),
- $"StartDate".as("conferenceStartDate"),
- $"EndDate".as("conferenceEndDate")
- )
-
- val step2 =step1.join(conference, step1("ConferenceInstanceId")=== conference("ConferenceInstanceId"),"left").select(
- step1("*"), conference("conferenceName"),
- conference("conferenceLocation"),
- conference("conferenceStartDate"),
- conference("conferenceEndDate")).cache()
- step2.count()
- step1.unpersist()
-
- val fos = MagUtility.loadMagEntity(spark, "FieldsOfStudy", magBasePath)
- .select($"FieldOfStudyId".alias("fos"), $"DisplayName", $"MainType")
-
- val paperFieldsOfStudy = MagUtility.loadMagEntity(spark, "PaperFieldsOfStudy", magBasePath)
- .select($"FieldOfStudyId", $"Score", $"PaperId")
-
- val paperFoS = paperFieldsOfStudy.join(broadcast(fos),fos("fos")===paperFieldsOfStudy("FieldOfStudyId")).groupBy("PaperId")
- .agg(collect_set(struct("FieldOfStudyId","DisplayName","MainType","Score")).as("FoS"))
-
- val step3=step2.join(paperFoS, step2("PaperId")===paperFoS("PaperId"), "left")
- .select(step2("*"), paperFoS("FoS")).cache()
- step3.count()
-
- step2.unpersist()
-
- val journals= MagUtility.loadMagEntity(spark, "Journals", magBasePath)
- .select(
- $"JournalId",
- $"DisplayName".as("journalName"),
- $"Issn".as("journalIssn"),
- $"Publisher".as("journalPublisher"),
- $"Webpage".as("journalWebpage")
- )
- step3.join(journals, step3("JournalId")===journals("JournalId"), "left").
- select(step3("*"),
- journals("journalName"),
- journals("journalIssn"),
- journals("journalPublisher"),
- journals("journalWebpage")
- ).write.mode("OverWrite")
- .save(s"$workingPath/generatedMAG")
- step3.unpersist()
- }
-}
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDenormalizedTable.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDenormalizedTable.scala
new file mode 100644
index 0000000000..7843d796a1
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkCreateMagDenormalizedTable.scala
@@ -0,0 +1,222 @@
+package eu.dnetlib.dhp.collection.mag
+
+import eu.dnetlib.dhp.application.AbstractScalaApplication
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.slf4j.{Logger, LoggerFactory}
+
+class SparkCreateMagDenormalizedTable(propertyPath: String, args: Array[String], log: Logger)
+ extends AbstractScalaApplication(propertyPath, args, log: Logger) {
+
+ /** All Spark applications run this method,
+ * where the whole logic of the Spark node is defined
+ */
+ override def run(): Unit = {
+ val crossrefPath: String = parser.get("crossrefPath")
+ log.info("found parameter crossrefPath: {}", crossrefPath)
+ val magBasePath: String = parser.get("magBasePath")
+ log.info("found parameter magBasePath: {}", magBasePath)
+ val workingPath: String = parser.get("workingPath")
+ log.info("found parameter workingPath: {}", workingPath)
+ generatedDenormalizedMAGTable(spark, crossrefPath, magBasePath, workingPath)
+ }
+
+ private def generatedDenormalizedMAGTable(
+ spark: SparkSession,
+ crossrefPath: String,
+ magBasePath: String,
+ workingPath: String
+ ): Unit = {
+
+ import spark.implicits._
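+ // read only the DOI column of the Crossref dump: a single-field schema avoids parsing the rest of each record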
+ val schema: StructType = StructType(StructField("DOI", StringType) :: Nil)
+
+ //Filter all the MAG Papers that intersect with a Crossref DOI
+ val crId =
+ spark.read.schema(schema).json(crossrefPath).withColumn("crId", lower(col("DOI"))).distinct.select("crId")
+ val magPapers = MagUtility
+ .loadMagEntity(spark, "Papers", magBasePath)
+ .withColumn("Doi", lower(col("Doi")))
+ .where(col("Doi").isNotNull)
+
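+ // the leftsemi join keeps only the MAG papers whose lowercased DOI also appears in Crossref;
+ // cache() + count() materializes the intersection once before it is reused below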
+ val intersectedPapers: Dataset[Row] =
+ magPapers.join(crId, magPapers("Doi").equalTo(crId("crId")), "leftsemi").dropDuplicates("Doi")
+ intersectedPapers.cache()
+ intersectedPapers.count()
+ //log.info("Create current abstract")
+
+ //The abstract is stored as an inverted index; convert it back to a plain string and recreate
+ // a table (PaperId, Abstract)
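+ // the MAG dump stores each abstract as a word -> positions map (assumed format), e.g.
+ //   {"IndexLength": 3, "InvertedIndex": {"Malaria": [0], "is": [1], "endemic": [2]}}
+ // and convertInvertedIndexString rebuilds the plain text "Malaria is endemic" from it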
+ val paperAbstract = MagUtility
+ .loadMagEntity(spark, "PaperAbstractsInvertedIndex", magBasePath)
+ .map(s => (s.getLong(0), MagUtility.convertInvertedIndexString(s.getString(1))))
+ .withColumnRenamed("_1", "PaperId")
+ .withColumnRenamed("_2", "Abstract")
+
+ //We define step0 as the join between the intersected papers and the paper abstracts
+
+ val step0 = intersectedPapers
+ .join(paperAbstract, intersectedPapers("PaperId") === paperAbstract("PaperId"), "left")
+ .select(intersectedPapers("*"), paperAbstract("Abstract"))
+ .cache()
+
+ step0.count()
+
+ intersectedPapers.unpersist()
+
+ // We have three tables (Authors, Affiliations and PaperAuthorAffiliations); in the
+ //next step we create, for each paper, the list of its authors with their affiliation and sequence number
+ val authors = MagUtility.loadMagEntity(spark, "Authors", magBasePath)
+ val affiliations = MagUtility.loadMagEntity(spark, "Affiliations", magBasePath)
+ val paaf = MagUtility.loadMagEntity(spark, "PaperAuthorAffiliations", magBasePath)
+
+ val paperAuthorAffiliations = paaf.join(step0, paaf("PaperId") === step0("PaperId"), "leftsemi")
+
+ val j1 = paperAuthorAffiliations
+ .join(authors, paperAuthorAffiliations("AuthorId") === authors("AuthorId"), "inner")
+ .select(
+ col("PaperId"),
+ col("AffiliationId"),
+ col("AuthorSequenceNumber"),
+ authors("DisplayName").alias("AuthorName"),
+ authors("AuthorId")
+ )
+
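+ // attach the affiliation name and GRID id when available, then collect one struct per author
+ // so that each paper carries a single "authors" array (same shape as MAGAuthor)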
+ val paperAuthorAffiliationNormalized = j1
+ .join(affiliations, j1("AffiliationId") === affiliations("AffiliationId"), "left")
+ .select(j1("*"), affiliations("DisplayName").alias("AffiliationName"), affiliations("GridId"))
+ .groupBy("PaperId")
+ .agg(
+ collect_list(
+ struct("AffiliationId", "AuthorSequenceNumber", "AffiliationName", "AuthorName", "AuthorId", "GridId")
+ ).alias("authors")
+ )
+ val step1 = step0
+ .join(paperAuthorAffiliationNormalized, step0("PaperId") === paperAuthorAffiliationNormalized("PaperId"), "left")
+ .select(step0("*"), paperAuthorAffiliationNormalized("authors"))
+ .cache()
+ step1.count()
+
+ step0.unpersist()
+
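+ // enrich with conference-instance metadata; the left join keeps papers that have no conference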
+ val conference = MagUtility
+ .loadMagEntity(spark, "ConferenceInstances", magBasePath)
+ .select(
+ $"ConferenceInstanceId",
+ $"DisplayName".as("conferenceName"),
+ $"Location".as("conferenceLocation"),
+ $"StartDate".as("conferenceStartDate"),
+ $"EndDate".as("conferenceEndDate")
+ )
+
+ val step2 = step1
+ .join(conference, step1("ConferenceInstanceId") === conference("ConferenceInstanceId"), "left")
+ .select(
+ step1("*"),
+ conference("conferenceName"),
+ conference("conferenceLocation"),
+ conference("conferenceStartDate"),
+ conference("conferenceEndDate")
+ )
+ .cache()
+ step2.count()
+ step1.unpersist()
+
+ val fos = MagUtility
+ .loadMagEntity(spark, "FieldsOfStudy", magBasePath)
+ .select($"FieldOfStudyId".alias("fos"), $"DisplayName", $"MainType")
+
+ val paperFieldsOfStudy = MagUtility
+ .loadMagEntity(spark, "PaperFieldsOfStudy", magBasePath)
+ .select($"FieldOfStudyId", $"Score", $"PaperId")
+
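+ // FieldsOfStudy is a small dimension table, so broadcast it to avoid shuffling PaperFieldsOfStudy;
+ // collect_set then nests all fields of study of a paper into a single array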
+ val paperFoS = paperFieldsOfStudy
+ .join(broadcast(fos), fos("fos") === paperFieldsOfStudy("FieldOfStudyId"))
+ .groupBy("PaperId")
+ .agg(collect_set(struct("FieldOfStudyId", "DisplayName", "MainType", "Score")).as("FoS"))
+
+ val step3 = step2
+ .join(paperFoS, step2("PaperId") === paperFoS("PaperId"), "left")
+ .select(step2("*"), paperFoS("FoS"))
+ .cache()
+ step3.count()
+
+ step2.unpersist()
+
+ val journals = MagUtility
+ .loadMagEntity(spark, "Journals", magBasePath)
+ .select(
+ $"JournalId",
+ $"DisplayName".as("journalName"),
+ $"Issn".as("journalIssn"),
+ $"Publisher".as("journalPublisher"),
+ $"Webpage".as("journalWebpage")
+ )
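+ // final step: left join with Journals, rename every column to the corresponding MAGPaper
+ // field name and write the denormalized table to $workingPath/mag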
+ step3
+ .join(journals, step3("JournalId") === journals("JournalId"), "left")
+ .select(
+ step3("*"),
+ journals("journalName"),
+ journals("journalIssn"),
+ journals("journalPublisher"),
+ journals("journalWebpage")
+ )
+ .select(
+ $"PaperId".as("paperId"),
+ $"Rank".as("rank"),
+ $"Doi".as("doi"),
+ $"DocType".as("docType"),
+ $"PaperTitle".as("paperTitle"),
+ $"OriginalTitle".as("originalTitle"),
+ $"BookTitle".as("bookTitle"),
+ $"Year".as("year"),
+ $"Date".as("date"),
+ $"OnlineDate".as("onlineDate"),
+ $"Publisher".as("publisher"),
+ $"JournalId".as("journalId"),
+ $"ConferenceSeriesId".as("conferenceSeriesId"),
+ $"ConferenceInstanceId".as("conferenceInstanceId"),
+ $"Volume".as("volume"),
+ $"Issue".as("issue"),
+ $"FirstPage".as("firstPage"),
+ $"LastPage".as("lastPage"),
+ $"ReferenceCount".as("referenceCount"),
+ $"CitationCount".as("citationCount"),
+ $"EstimatedCitation".as("estimatedCitation"),
+ $"OriginalVenue".as("originalVenue"),
+ $"FamilyId".as("familyId"),
+ $"FamilyRank".as("familyRank"),
+ $"DocSubTypes".as("docSubTypes"),
+ $"CreatedDate".as("createdDate"),
+ $"Abstract".as("abstractText"),
+ $"authors".as("authors"),
+ $"conferenceName".as("conferenceName"),
+ $"conferenceLocation".as("conferenceLocation"),
+ $"conferenceStartDate".as("conferenceStartDate"),
+ $"conferenceEndDate".as("conferenceEndDate"),
+ $"FoS".as("fos"),
+ $"journalName".as("journalName"),
+ $"journalIssn".as("journalIssn"),
+ $"journalPublisher".as("journalPublisher"),
+ $"journalWebpage".as("journalWebpage")
+ )
+ .write
+ .mode("OverWrite")
+ .save(s"$workingPath/mag")
+ step3.unpersist()
+ }
+}
+
+object SparkCreateMagDenormalizedTable {
+
+ val log: Logger = LoggerFactory.getLogger(SparkCreateMagDenormalizedTable.getClass)
+
+ def main(args: Array[String]): Unit = {
+ new SparkCreateMagDenormalizedTable(
+ "/eu/dnetlib/dhp/collection/mag/create_MAG_denormalized_table_properites.json",
+ args,
+ log
+ ).initialize().run()
+ }
+}