diff --git a/.gitignore b/.gitignore
index 8b58fa1..6efdf89 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,6 @@
 .DS_Store
 .idea/
-
+arguments.properties
 # ---> Java
 # Compiled class file
 *.class
diff --git a/execute_notebook.py b/execute_notebook.py
index 65d9750..0818682 100644
--- a/execute_notebook.py
+++ b/execute_notebook.py
@@ -5,9 +5,21 @@
 from pathlib import Path
 import re
 import subprocess
+
 regex = r"<\w*>(.*)<\/\w*>"
 
 
+def extract_argument(path):
+    """Parse a key=value properties file, skipping comments and blank lines."""
+    arguments = {}
+    with open(path) as f:
+        for line in f:
+            if line.strip() and not line.startswith("#"):
+                s = line.strip().split("=", 1)
+                arguments[s[0].strip()] = s[1].strip()
+    return arguments
+
+
 def get_jar_path():
     for current_path in glob.glob("target/*.jar"):
         return current_path
@@ -41,6 +53,18 @@ def extract_dependencies():
         check_dependency = False
     return dep_to_add
 
+def extract_class_args(args):
+    """Turn every non-reserved key into a '-key value' pair for spark2-submit."""
+    d = []
+    skip = ["user_name", "reference_class"]
+    for item in args:
+        if item not in skip:
+            d.append(f"-{item}")
+            d.append(args[item])
+
+    return " ".join(d)
+
+
 def to_path(dependency):
 
     home_dir = Path.home()
@@ -59,19 +83,11 @@ if __name__ == "__main__":
     parser = argparse.ArgumentParser(
         description="This scripts help you to publish, execute your check script in scala ")
-    parser.add_argument("user_name")
-    parser.add_argument("reference_class")
-    parser.add_argument("argument_file")
-    args = parser.parse_args()
+    parser.add_argument("path")
+    python_args = parser.parse_args()
 
-    print(f"reference_class = {args.reference_class} argument_file={args.argument_file}")
-
-    if os.path.exists(args.argument_file):
-        other_arguments = []
-        with open(args.argument_file) as f:
-            for line in f:
-                if len(line.strip()):
-                    other_arguments.append(line.strip())
+    if os.path.exists(python_args.path):
+        script_argument = extract_argument(python_args.path)
     print("Cleaning Compile application ")
     os.system('mvn clean compile package')
     main_jar_path = get_jar_path()
 
@@ -80,26 +96,28 @@ if __name__ == "__main__":
 
     print("copy on your root folder")
 
-    os.system("ssh {}@iis-cdh5-test-gw.ocean.icm.edu.pl rm -rf sandro_nb".format(args.user_name))
+    os.system("ssh {}@iis-cdh5-test-gw.ocean.icm.edu.pl rm -rf sandro_nb".format(script_argument['user_name']))
 
-    os.system("ssh {}@iis-cdh5-test-gw.ocean.icm.edu.pl mkdir sandro_nb".format(args.user_name))
+    os.system("ssh {}@iis-cdh5-test-gw.ocean.icm.edu.pl mkdir sandro_nb".format(script_argument['user_name']))
 
-    os.system(f"scp {main_jar_path} {args.user_name}@iis-cdh5-test-gw.ocean.icm.edu.pl:sandro_nb/")
+    os.system(f"scp {main_jar_path} {script_argument['user_name']}@iis-cdh5-test-gw.ocean.icm.edu.pl:sandro_nb/")
     jars = []
     for item in deps:
         name, p = to_path(item)
         if p:
             print(f"Copying dependencies {p} to lib deps")
-            os.system(f"scp {p} {args.user_name}@iis-cdh5-test-gw.ocean.icm.edu.pl:sandro_nb/")
+            os.system(f"scp {p} {script_argument['user_name']}@iis-cdh5-test-gw.ocean.icm.edu.pl:sandro_nb/")
             jars.append(name)
     j_name = ",".join(["sandro_nb/"+ item for item in jars])
     name = main_jar_path.replace("target/", "")
-    command = f"spark2-submit --master yarn --jars {j_name} --class {args.reference_class} sandro_nb/{name}"
-    print(f"executing command {command}")
-    os.system(
-        "ssh {}@iis-cdh5-test-gw.ocean.icm.edu.pl {} {}".format(args.user_name, command, " ".join(other_arguments)))
+    class_args = extract_class_args(script_argument)
+    command = f"spark2-submit --master yarn --jars {j_name} --class {script_argument['reference_class']} sandro_nb/{name} {class_args}"
+
+    print(f"executing command {command}")
+
+    os.system("ssh {}@iis-cdh5-test-gw.ocean.icm.edu.pl {}".format(script_argument['user_name'], command))
     else:
-        raise Exception(f"path not found {args.argument_file}")
+        raise Exception(f"path not found {python_args.path}")
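For reference, extract_argument expects a flat key=value properties file. A minimal sketch of what arguments.properties might look like (the file name comes from .gitignore; the keys user_name and reference_class are consumed by the launcher itself, while every remaining key is forwarded to the job as "-key value"; all values below are illustrative):

    # arguments.properties -- illustrative values
    user_name=sandro
    reference_class=eu.dnetlib.scholix.CheckRelation
    # everything below is forwarded to spark2-submit as "-key value"
    master=yarn
    path=/tmp/relations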
{command}") + + os.system("ssh {}@iis-cdh5-test-gw.ocean.icm.edu.pl {} ".format(script_argument['user_name'], command, )) else: - raise Exception(f"path not found {args.argument_file}") + raise Exception(f"path not found {python_args.path}") diff --git a/pom.xml b/pom.xml index e4ac9f1..39b659b 100644 --- a/pom.xml +++ b/pom.xml @@ -137,11 +137,18 @@ dhp-schemas 3.15.0 + + org.junit.jupiter + junit-jupiter + ${junit-jupiter.version} + test + + 5.6.1 4.0.1 2.9.6 2.11.12 diff --git a/src/main/java/com/sandro/app/SparkApp.scala b/src/main/java/com/sandro/app/SparkApp.scala new file mode 100644 index 0000000..d34cb9e --- /dev/null +++ b/src/main/java/com/sandro/app/SparkApp.scala @@ -0,0 +1,78 @@ +package com.sandro.app + +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.slf4j.Logger +import scala.collection.mutable + +trait SparkApp { + + /** Utility to parse the arguments.properties using the + * property json in the classpath identified from + * the variable propertyPath + * + * @param args the list of arguments.properties + */ + def parseArguments(args: Array[String]): mutable.Map[String, String] = { + var currentVariable: String = null + val argumentMap: mutable.Map[String, String] = mutable.Map() + + args.zipWithIndex.foreach { + case (x, i) => + if (i % 2 == 0) { + // ERROR in case the syntax is wrong + if (!x.startsWith("-")) throw new IllegalArgumentException("wrong input syntax expected -variable_name value") + + if (x.startsWith("--")) + currentVariable = x.substring(2) + else + currentVariable = x.substring(1) + } + else argumentMap += (currentVariable -> x) + } + argumentMap + } + + /** Here all the spark applications runs this method + * where the whole logic of the spark node is defined + */ + def run(): Unit +} + +abstract class AbstractScalaApplication(val args: Array[String], + log: Logger + ) extends SparkApp { + + var argumentMap: mutable.Map[String, String] = _ + + + var spark: SparkSession = _ + + def initialize(): SparkApp = { + argumentMap = parseArguments(args) + spark = createSparkSession() + spark.sparkContext.setLogLevel("WARN") + this + } + + /** Utility for creating a spark session starting from parser + * + * @return a spark Session + */ + private def createSparkSession(): SparkSession = { + require(argumentMap != null) + + val conf: SparkConf = new SparkConf() + val master = argumentMap("master") + log.info(s"Creating Spark session: Master: $master") + SparkSession + .builder() + .config(conf) + .appName(getClass.getSimpleName) + .master(master) + .getOrCreate() + } + +} + + diff --git a/src/main/java/eu/dnetlib/scholix/CheckRelation.scala b/src/main/java/eu/dnetlib/scholix/CheckRelation.scala index 4a023d7..861a73b 100644 --- a/src/main/java/eu/dnetlib/scholix/CheckRelation.scala +++ b/src/main/java/eu/dnetlib/scholix/CheckRelation.scala @@ -1,17 +1,45 @@ package eu.dnetlib.scholix import com.fasterxml.jackson.databind.ObjectMapper -import org.apache.spark.SparkConf +import com.sandro.app.AbstractScalaApplication import org.apache.spark.sql.{Encoder, Encoders, SparkSession} import org.slf4j.{Logger, LoggerFactory} import eu.dnetlib.dhp.schema.oaf.Relation +import eu.dnetlib.scholix.CheckRelation.logger +class CheckRelation( args: Array[String], log: Logger) extends AbstractScalaApplication( args: Array[String], log: Logger) { + def filterRelations(r: Relation): Boolean = { + val relClassFilter = List( + "merges", + "isMergedIn", + "HasAmongTopNSimilarDocuments", + "IsAmongTopNSimilarDocuments" + ) + if 
diff --git a/src/main/java/eu/dnetlib/scholix/CheckRelation.scala b/src/main/java/eu/dnetlib/scholix/CheckRelation.scala
index 4a023d7..861a73b 100644
--- a/src/main/java/eu/dnetlib/scholix/CheckRelation.scala
+++ b/src/main/java/eu/dnetlib/scholix/CheckRelation.scala
@@ -1,17 +1,45 @@
 package eu.dnetlib.scholix
 
 import com.fasterxml.jackson.databind.ObjectMapper
-import org.apache.spark.SparkConf
+import com.sandro.app.AbstractScalaApplication
 import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
 import org.slf4j.{Logger, LoggerFactory}
 import eu.dnetlib.dhp.schema.oaf.Relation
+import eu.dnetlib.scholix.CheckRelation.logger
+
+class CheckRelation(args: Array[String], log: Logger) extends AbstractScalaApplication(args, log) {
+
+  /** Keeps only the relations Scholexplorer actually uses:
+    * drops dedup/similarity relations and relations collected solely from OpenCitations.
+    */
+  def filterRelations(r: Relation): Boolean = {
+    val relClassFilter = List(
+      "merges",
+      "isMergedIn",
+      "HasAmongTopNSimilarDocuments",
+      "IsAmongTopNSimilarDocuments"
+    )
+    if (relClassFilter.exists(k => k.equalsIgnoreCase(r.getRelClass)))
+      false
+    else {
+      if (r.getCollectedfrom == null || r.getCollectedfrom.size() == 0)
+        false
+      else if (r.getCollectedfrom.size() > 1)
+        true
+      else if (r.getCollectedfrom.size() == 1 && r.getCollectedfrom.get(0).getValue.equalsIgnoreCase("OpenCitations"))
+        false
+      else
+        true
+    }
+  }
-
-object CheckRelation {
-
-  val logger: Logger = LoggerFactory.getLogger(CheckRelation.getClass.getName)
+
+  /** Entry point of the application logic: reads the relation path
+    * from the parsed arguments and counts the relations.
+    */
+  override def run(): Unit = {
+    val path = argumentMap("path")
+    logger.warn(s"path property is $path")
+    if (path == null || path.isEmpty)
+      throw new IllegalArgumentException("missing -path argument: check that path is defined in arguments.properties")
+    countRelation(path, spark)
+  }
 
   def countRelation(path:String, spark: SparkSession ): Unit = {
@@ -21,31 +49,29 @@ object CheckRelation {
 
     val df = spark.read.text(path).as[String]
     val mapper = new ObjectMapper()
-    val scholix_rel =df.map(s=> mapper.readValue(s, classOf[Relation])).as[Relation].filter(r => r.getDataInfo.getDeletedbyinference == false).count()
+//    val total = df.count
+//    val not_del_rel =df.map(s=> mapper.readValue(s, classOf[Relation])).as[Relation].filter(r => r.getDataInfo.getDeletedbyinference == false).count()
+//    logger.warn(s"Total number of relations not deleted by Inference: $not_del_rel/$total")
 
+    val total_rels_from_scholexplorer = df.map(s=> mapper.readValue(s, classOf[Relation]))
+      .filter(r => r.getDataInfo != null && !r.getDataInfo.getDeletedbyinference)
+      .filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50"))
+//      .filter(r => filterRelations(r))
+      .count()
 
-
-    logger.warn(s"Total number of relations: ${df.count}")
-    logger.warn(s"Total number of relations not deleted by Inference: ${scholix_rel}")
-
-
-
+    logger.warn(s"Relations used by Scholexplorer: $total_rels_from_scholexplorer")
   }
+}
 
+object CheckRelation {
+
+  val logger: Logger = LoggerFactory.getLogger(CheckRelation.getClass.getName)
 
   def main(args: Array[String]): Unit = {
-    val path = args(0)
-    val master = args(1)
-    val conf:SparkConf = new SparkConf()
-    val spark = SparkSession.builder().config(conf).master(master).getOrCreate()
-    spark.sparkContext.setLogLevel("WARN")
-    countRelation(path,spark)
-
-
-
+    new CheckRelation(args, logger).initialize().run()
   }
diff --git a/src/test/java/com/sandro/app/SparkAppTest.scala b/src/test/java/com/sandro/app/SparkAppTest.scala
new file mode 100644
index 0000000..d51956e
--- /dev/null
+++ b/src/test/java/com/sandro/app/SparkAppTest.scala
@@ -0,0 +1,30 @@
+package com.sandro.app
+
+import org.junit.jupiter.api.Test
+import org.slf4j.{Logger, LoggerFactory}
+
+class TestApp(args: Array[String], log: Logger)
+  extends AbstractScalaApplication(args, log) {
+
+  /** Entry point of the application logic:
+    * here it only prints the parsed argument map.
+    */
+  override def run(): Unit = {
+    println(argumentMap)
+  }
+}
+
+class SparkAppTest {
+
+  @Test
+  def testParsing(): Unit = {
+    val log: Logger = LoggerFactory.getLogger(this.getClass)
+    val args = "-key1 value1 --key-2 value2"
+    val t: TestApp = new TestApp(args.split(" "), log)
+    val m = t.parseArguments(args = args.split(" "))
+    assert(m("key1") == "value1")
+    assert(m("key-2") == "value2")
+  }
+
+}
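As a complement, a hedged sketch of the failure path (the class name SparkAppSyntaxTest is illustrative and not part of the patch): parseArguments is expected to throw when a token in name position is missing its "-" prefix. The explicit Executable keeps the sketch compatible with Scala 2.11, which lacks SAM conversion by default.

    package com.sandro.app

    import org.junit.jupiter.api.Assertions.assertThrows
    import org.junit.jupiter.api.Test
    import org.junit.jupiter.api.function.Executable
    import org.slf4j.{Logger, LoggerFactory}

    class SparkAppSyntaxTest {

      @Test
      def testWrongSyntax(): Unit = {
        val log: Logger = LoggerFactory.getLogger(this.getClass)
        val t = new TestApp(Array.empty[String], log)
        // "key1" lacks the leading "-", so the parser must reject it
        assertThrows(classOf[IllegalArgumentException], new Executable {
          override def execute(): Unit = t.parseArguments(Array("key1", "value1"))
        })
      }
    }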