Implemented a standard ScalaApp from which all the other classes should inherit
parent dc39761130
commit 11634d1cc5
.gitignore
@@ -1,6 +1,6 @@
 .DS_Store
 .idea/
+arguments.properties
 # ---> Java
 # Compiled class file
 *.class
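Note: the newly ignored arguments.properties is the key=value file that the deploy script below reads with extract_argument (lines starting with # are skipped). A hypothetical example — the keys user_name, reference_class, master and path all appear later in this commit, while the values are made up:

    # one key = value pair per line; lines starting with # are comments
    user_name = sandro
    reference_class = eu.dnetlib.scholix.CheckRelation
    master = yarn
    path = /user/sandro/relations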
@@ -5,9 +5,20 @@ from pathlib import Path
 import re
 import subprocess
 
 
 regex = r"<\w*>(.*)<\/\w*>"
 
 
+def extract_argument(path):
+    arguments = {}
+    with open(path) as f:
+        for line in f:
+            if not line.startswith("#"):
+                s = line.strip().split("=")
+                arguments[s[0].strip()] = s[1].strip()
+    return arguments
+
+
 def get_jar_path():
     for current_path in glob.glob("target/*.jar"):
         return current_path

@@ -41,6 +52,17 @@ def extract_dependencies():
     check_dependency = False
     return dep_to_add
 
+
+def extracting_class_args(args):
+    d = []
+    skip = ["user_name", "reference_class"]
+    for item in args:
+        if item not in skip:
+            d.append(f"-{item}")
+            d.append(args[item])
+
+    return " ".join(d)
+
 
 def to_path(dependency):
     home_dir = Path.home()

@@ -59,19 +81,12 @@ if __name__ == "__main__":
     parser = argparse.ArgumentParser(
         description="This scripts help you to publish, execute your check script in scala ")
-    parser.add_argument("user_name")
-    parser.add_argument("reference_class")
-    parser.add_argument("argument_file")
-    args = parser.parse_args()
+    parser.add_argument("path")
+    python_args = parser.parse_args()
 
-    print(f"reference_class = {args.reference_class} argument_file={args.argument_file}")
-    if os.path.exists(args.argument_file):
+    if os.path.exists(python_args.path):
+        script_argument = extract_argument(python_args.path)
         other_arguments = []
-        with open(args.argument_file) as f:
-            for line in f:
-                if len(line.strip()):
-                    other_arguments.append(line.strip())
         print("Cleaning Compile application ")
         os.system('mvn clean compile package')
         main_jar_path = get_jar_path()

@@ -80,26 +95,28 @@ if __name__ == "__main__":
 
         print("copy on your root folder")
 
-        os.system("ssh {}@iis-cdh5-test-gw.ocean.icm.edu.pl rm -rf sandro_nb".format(args.user_name))
+        os.system("ssh {}@iis-cdh5-test-gw.ocean.icm.edu.pl rm -rf sandro_nb".format(script_argument['user_name']))
 
-        os.system("ssh {}@iis-cdh5-test-gw.ocean.icm.edu.pl mkdir sandro_nb".format(args.user_name))
+        os.system("ssh {}@iis-cdh5-test-gw.ocean.icm.edu.pl mkdir sandro_nb".format(script_argument['user_name']))
 
-        os.system(f"scp {main_jar_path} {args.user_name}@iis-cdh5-test-gw.ocean.icm.edu.pl:sandro_nb/")
+        os.system(f"scp {main_jar_path} {script_argument['user_name']}@iis-cdh5-test-gw.ocean.icm.edu.pl:sandro_nb/")
         jars = []
         for item in deps:
             name, p = to_path(item)
             if p:
                 print(f"Copying dependencies {p} to lib deps")
-                os.system(f"scp {p} {args.user_name}@iis-cdh5-test-gw.ocean.icm.edu.pl:sandro_nb/")
+                os.system(f"scp {p} {script_argument['user_name']}@iis-cdh5-test-gw.ocean.icm.edu.pl:sandro_nb/")
                 jars.append(name)
 
         j_name = ",".join(["sandro_nb/"+ item for item in jars])
         name = main_jar_path.replace("target/", "")
-        command = f"spark2-submit --master yarn --jars {j_name} --class {args.reference_class} sandro_nb/{name}"
+        class_args = extracting_class_args(script_argument)
+        command = f"spark2-submit --master yarn --jars {j_name} --class {script_argument['reference_class']} sandro_nb/{name} {class_args}"
 
         print(f"executing command {command}")
 
-        os.system(
-            "ssh {}@iis-cdh5-test-gw.ocean.icm.edu.pl {} {}".format(args.user_name, command, " ".join(other_arguments)))
+        os.system("ssh {}@iis-cdh5-test-gw.ocean.icm.edu.pl {} ".format(script_argument['user_name'], command, ))
 
 
     else:
-        raise Exception(f"path not found {args.argument_file}")
+        raise Exception(f"path not found {python_args.path}")
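Note: given a properties file like the sketch above, extracting_class_args drops the user_name and reference_class keys and flattens the remaining entries into "-key value" flags, so {"master": "yarn", "path": "/user/sandro/relations"} becomes "-master yarn -path /user/sandro/relations". The assembled command then looks roughly like this (the jar names here are hypothetical):

    spark2-submit --master yarn --jars sandro_nb/dep-a.jar,sandro_nb/dep-b.jar --class eu.dnetlib.scholix.CheckRelation sandro_nb/app-1.0.jar -master yarn -path /user/sandro/relations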
pom.xml
@@ -137,11 +137,18 @@
         <artifactId>dhp-schemas</artifactId>
         <version>3.15.0</version>
     </dependency>
+    <dependency>
+        <groupId>org.junit.jupiter</groupId>
+        <artifactId>junit-jupiter</artifactId>
+        <version>${junit-jupiter.version}</version>
+        <scope>test</scope>
+    </dependency>
 
 
 </dependencies>
 
 <properties>
+    <junit-jupiter.version>5.6.1</junit-jupiter.version>
     <net.alchim31.maven.version>4.0.1</net.alchim31.maven.version>
     <dhp.jackson.version>2.9.6</dhp.jackson.version>
     <scala.version>2.11.12</scala.version>
@@ -0,0 +1,78 @@
+package com.sandro.app
+
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.slf4j.Logger
+
+import scala.collection.mutable
+
+
+trait SparkApp {
+
+  /** Utility to parse the arguments.properties using the
+    * property json in the classpath identified from
+    * the variable propertyPath
+    *
+    * @param args the list of arguments.properties
+    */
+  def parseArguments(args: Array[String]): mutable.Map[String, String] = {
+    var currentVariable: String = null
+    val argumentMap: mutable.Map[String, String] = mutable.Map()
+
+    args.zipWithIndex.foreach {
+      case (x, i) =>
+        if (i % 2 == 0) {
+          // ERROR in case the syntax is wrong
+          if (!x.startsWith("-")) throw new IllegalArgumentException("wrong input syntax expected -variable_name value")
+
+          if (x.startsWith("--"))
+            currentVariable = x.substring(2)
+          else
+            currentVariable = x.substring(1)
+        }
+        else argumentMap += (currentVariable -> x)
+    }
+    argumentMap
+  }
+
+  /** Here all the spark applications runs this method
+    * where the whole logic of the spark node is defined
+    */
+  def run(): Unit
+}
+
+
+abstract class AbstractScalaApplication(val args: Array[String],
+                                        log: Logger
+                                       ) extends SparkApp {
+
+  var argumentMap: mutable.Map[String, String] = _
+
+  var spark: SparkSession = _
+
+  def initialize(): SparkApp = {
+    argumentMap = parseArguments(args)
+    spark = createSparkSession()
+    spark.sparkContext.setLogLevel("WARN")
+    this
+  }
+
+  /** Utility for creating a spark session starting from parser
+    *
+    * @return a spark Session
+    */
+  private def createSparkSession(): SparkSession = {
+    require(argumentMap != null)
+
+    val conf: SparkConf = new SparkConf()
+    val master = argumentMap("master")
+    log.info(s"Creating Spark session: Master: $master")
+    SparkSession
+      .builder()
+      .config(conf)
+      .appName(getClass.getSimpleName)
+      .master(master)
+      .getOrCreate()
+  }
+
+}
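Note: the trait and base class above give every Spark job the same life cycle: construct with (args, log), call initialize() to parse the "-key value" arguments and build the SparkSession, then call run(). A minimal sketch of a concrete application — the class name, the "path" argument and the job body are illustrative; only the AbstractScalaApplication API is taken from the code above:

    package com.sandro.app

    import org.slf4j.{Logger, LoggerFactory}

    class LineCountApp(args: Array[String], log: Logger)
      extends AbstractScalaApplication(args, log) {

      // argumentMap and spark are populated by initialize()
      override def run(): Unit = {
        val path = argumentMap("path")
        log.info(s"$path has ${spark.read.textFile(path).count()} lines")
      }
    }

    object LineCountApp {
      val logger: Logger = LoggerFactory.getLogger(classOf[LineCountApp])

      def main(args: Array[String]): Unit =
        new LineCountApp(args, logger).initialize().run()
    }

Because createSparkSession reads argumentMap("master"), such a job must be launched with at least a -master flag, e.g. "-master local[*] -path /tmp/input.txt".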
@@ -1,17 +1,45 @@
 package eu.dnetlib.scholix
 
 import com.fasterxml.jackson.databind.ObjectMapper
-import org.apache.spark.SparkConf
+import com.sandro.app.AbstractScalaApplication
 import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
 import org.slf4j.{Logger, LoggerFactory}
 import eu.dnetlib.dhp.schema.oaf.Relation
+import eu.dnetlib.scholix.CheckRelation.logger
+
+class CheckRelation( args: Array[String], log: Logger) extends AbstractScalaApplication( args: Array[String], log: Logger) {
+
+  def filterRelations(r: Relation): Boolean = {
+    val relClassFilter = List(
+      "merges",
+      "isMergedIn",
+      "HasAmongTopNSimilarDocuments",
+      "IsAmongTopNSimilarDocuments"
+    )
+    if (relClassFilter.exists(k => k.equalsIgnoreCase(r.getRelClass)))
+      false
+    else {
+      if (r.getCollectedfrom == null || r.getCollectedfrom.size() == 0)
+        false
+      else if (r.getCollectedfrom.size() > 1)
+        true
+      else if (r.getCollectedfrom.size() == 1 && r.getCollectedfrom.get(0).getValue.equalsIgnoreCase("OpenCitations"))
+        false
+      else
+        true
+    }
+  }
 
-object CheckRelation {
-
-  val logger: Logger = LoggerFactory.getLogger(CheckRelation.getClass.getName)
+  /** Here all the spark applications runs this method
+    * where the whole logic of the spark node is defined
+    */
+  override def run(): Unit = {
+    val path = argumentMap("path")
+    logger.warn(s"path properties is $path")
+    if (path == null || path.isEmpty)
+      throw new IllegalArgumentException("missing path arguments.properties -path when launch file, check if it is inside the arguments.properties")
+    countRelation(path, spark)
+  }
 
   def countRelation(path:String, spark: SparkSession ): Unit = {

@@ -21,31 +49,29 @@ object CheckRelation {
     val df = spark.read.text(path).as[String]
 
     val mapper = new ObjectMapper()
-    val scholix_rel =df.map(s=> mapper.readValue(s, classOf[Relation])).as[Relation].filter(r => r.getDataInfo.getDeletedbyinference == false).count()
+    // val total = df.count
+    // val not_del_rel =df.map(s=> mapper.readValue(s, classOf[Relation])).as[Relation].filter(r => r.getDataInfo.getDeletedbyinference == false).count()
+    // logger.warn(s"Total number of relations not deleted by Inference: $not_del_rel/$total")
+
+    val total_rels_from_scholexplorer = df.map(s=> mapper.readValue(s, classOf[Relation]))
+      .filter(r => r.getDataInfo != null && !r.getDataInfo.getDeletedbyinference)
+      .filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50"))
+      // .filter(r => filterRelations(r))
+      .count()
+
+    logger.warn(s"Relation used by Scholexplorer $total_rels_from_scholexplorer")
-    logger.warn(s"Total number of relations: ${df.count}")
-    logger.warn(s"Total number of relations not deleted by Inference: ${scholix_rel}")
   }
+}
+
+object CheckRelation {
+
+  val logger: Logger = LoggerFactory.getLogger(CheckRelation.getClass.getName)
 
   def main(args: Array[String]): Unit = {
-    val path = args(0)
-    val master = args(1)
-    val conf:SparkConf = new SparkConf()
-    val spark = SparkSession.builder().config(conf).master(master).getOrCreate()
-    spark.sparkContext.setLogLevel("WARN")
-    countRelation(path,spark)
+    new CheckRelation(args,logger).initialize().run()
   }
 }
@@ -0,0 +1,30 @@
+package com.sandro.app
+
+
+import org.junit.jupiter.api.Test
+import org.slf4j.{Logger, LoggerFactory}
+
+class TestApp ( args: Array[String], log: Logger)
+  extends AbstractScalaApplication( args: Array[String], log: Logger) {
+  /** Here all the spark applications runs this method
+    * where the whole logic of the spark node is defined
+    */
+  override def run(): Unit = {
+    println(argumentMap)
+
+
+  }
+}
+
+class SparkAppTest {
+
+  @Test
+  def testParsing() :Unit = {
+    val log: Logger = LoggerFactory.getLogger(this.getClass.getClass)
+    val args = "-key1 value1 --key-2 value2"
+    val t:TestApp=new TestApp(args.split(" "),log)
+    val m =t.parseArguments(args = args.split(" "))
+    println(m)
+  }
+
+}
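Note: working parseArguments by hand on the test input, the even-positioned tokens ("-key1", "--key-2") lose their leading dashes and become keys, while the odd-positioned tokens become values, so the test should print Map(key1 -> value1, key-2 -> value2) (up to mutable.Map ordering).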