diff --git a/dhp-build/dhp-build-properties-maven-plugin/test.properties b/dhp-build/dhp-build-properties-maven-plugin/test.properties
index 66aeee4..5e77cc5 100644
--- a/dhp-build/dhp-build-properties-maven-plugin/test.properties
+++ b/dhp-build/dhp-build-properties-maven-plugin/test.properties
@@ -1,2 +1,2 @@
-# Tue Nov 05 10:06:34 CET 2024
+# Wed Nov 20 10:38:53 CET 2024
 projectPropertyKey=projectPropertyValue
diff --git a/dhp-raid/pom.xml b/dhp-raid/pom.xml
index 578f7e9..e6eadb0 100644
--- a/dhp-raid/pom.xml
+++ b/dhp-raid/pom.xml
@@ -13,68 +13,46 @@
 	<artifactId>dhp-raid</artifactId>
 	<version>1.0.0-SNAPSHOT</version>
 
-	<properties>
-		1.8
-		<scala.binary.version>2.11</scala.binary.version>
-		<scala.version>2.11.0</scala.version>
-		UTF-8
-		UTF-8
-		2.2.6
-		UTF-8
-		2.7.3
-		2.3.2
-		2.8.1
-		<dhp-schemas.version>8.0.1</dhp-schemas.version>
-	</properties>
 
 	<dependencies>
 		<dependency>
 			<groupId>org.apache.spark</groupId>
 			<artifactId>spark-core_${scala.binary.version}</artifactId>
-			<version>${spark.version}</version>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.spark</groupId>
 			<artifactId>spark-sql_${scala.binary.version}</artifactId>
-			<version>${spark.version}</version>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.spark</groupId>
 			<artifactId>spark-mllib_${scala.binary.version}</artifactId>
-			<version>${spark.version}</version>
 		</dependency>
 		<dependency>
 			<groupId>org.scala-lang</groupId>
 			<artifactId>scala-library</artifactId>
-			<version>${scala.version}</version>
 		</dependency>
 		<dependency>
 			<groupId>org.junit.jupiter</groupId>
 			<artifactId>junit-jupiter</artifactId>
 		</dependency>
-		<dependency>
-			<groupId>org.junit.jupiter</groupId>
-			<artifactId>junit-jupiter</artifactId>
-			<version>RELEASE</version>
-			<scope>test</scope>
-		</dependency>
 		<dependency>
 			<groupId>com.jayway.jsonpath</groupId>
 			<artifactId>json-path</artifactId>
-			<version>2.4.0</version>
 		</dependency>
+
+		<dependency>
+			<groupId>org.json4s</groupId>
+			<artifactId>json4s-jackson_2.11</artifactId>
+		</dependency>
 		<dependency>
 			<groupId>eu.dnetlib.dhp</groupId>
 			<artifactId>dhp-schemas</artifactId>
-			<version>${dhp-schemas.version}</version>
 		</dependency>
diff --git a/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateDocuments.java b/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateDocuments.java
index a14946d..b3d9d25 100644
--- a/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateDocuments.java
+++ b/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateDocuments.java
@@ -127,7 +127,8 @@ public class SparkCreateDocuments extends AbstractSparkJob{
 
 		Dataset<Row> documents = nodeDS
 			.join(labels, nodeDS.col(LONG_ID_COL).equalTo(labels.col(LONG_ID_COL)), "left")
-			.select(col(STRING_ID_COL), col(LABELS_COL));
+			.select(col(STRING_ID_COL), col(LABELS_COL))
+			.repartition(numPartitions);
 
 		log.info("Labels generated: {}", documents.count());
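
Note on the `.repartition(numPartitions)` added in SparkCreateDocuments above: after a join, the partition count of `documents` otherwise defaults to `spark.sql.shuffle.partitions` (200 unless overridden), which can yield many tiny output files when the dataset is later written. A minimal sketch of the effect, assuming a local SparkSession; the paths and partition count are hypothetical, not from the patch:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class RepartitionSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("repartition-sketch")
                    .master("local[*]")
                    .getOrCreate();

            // Hypothetical input; stands in for the joined documents dataset.
            Dataset<Row> documents = spark.read().load("/tmp/documents");

            // Coalescing the shuffle output to an explicit partition count keeps
            // the number (and size) of written files predictable.
            int numPartitions = 42; // hypothetical value of the job parameter
            documents.repartition(numPartitions)
                    .write()
                    .save("/tmp/documents_repartitioned");

            spark.stop();
        }
    }
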
diff --git a/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateEmbeddings.java b/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateEmbeddings.java
index 4588bfc..857894a 100644
--- a/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateEmbeddings.java
+++ b/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateEmbeddings.java
@@ -1,39 +1,23 @@
 package eu.dnetlib.raid.jobs;
 
-import com.jayway.jsonpath.JsonPath;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.raid.graph.GraphUtil;
 import eu.dnetlib.raid.support.ArgumentApplicationParser;
-import eu.dnetlib.raid.support.EdgeParam;
 import eu.dnetlib.raid.support.RAiDConfig;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.graphx.Edge;
 import org.apache.spark.ml.feature.Normalizer;
 import org.apache.spark.ml.feature.Word2Vec;
-import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.*;
-import org.apache.spark.sql.catalyst.encoders.RowEncoder;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructField;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Tuple2;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Optional;
 
-import static org.apache.spark.sql.functions.*;
-
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.size;
 
 public class SparkCreateEmbeddings extends AbstractSparkJob{
 
 	private static final Logger log = LoggerFactory.getLogger(SparkCreateEmbeddings.class);
 
-	private static final String ID_PATH = "$.id";
-	private static final String DELETEDBYINFERENCE_PATH = "$.dataInfo.deletedbyinference";
-	private static final Encoder<Relation> REL_BEAN_ENC = Encoders.bean(Relation.class);
 
 	public SparkCreateEmbeddings(ArgumentApplicationParser parser, SparkSession spark) {
 		super(parser, spark);
@@ -47,7 +31,9 @@
 
 		parser.parseArgument(args);
 
-		SparkConf conf = new SparkConf();
+		SparkConf conf = new SparkConf()
+			.set("spark.kryoserializer.buffer", "64k")
+			.set("spark.kryoserializer.buffer.max", "512m");
 
 		new SparkCreateEmbeddings(
 			parser,
@@ -77,7 +63,7 @@
 
 		JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 		RAiDConfig config = loadRAiDConfig(raidConfPath);
 
-		Dataset<Row> documents = spark.read().load(documentsPath);
+		Dataset<Row> documents = spark.read().load(documentsPath).filter(size(col(LABELS_COL)).gt(0)).filter(col(LABELS_COL).isNotNull());
 
 		Dataset<Row> word2vecEmbeddings = new Word2Vec()
 			.setInputCol(LABELS_COL)
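
Note on the new filters on the documents load above: `Word2Vec.fit(...)` consumes the `LABELS_COL` array column, so rows with a null or empty label array contribute nothing to training and nulls can fail the fit. A self-contained sketch of the same predicate on toy data; the column names and rows are hypothetical:

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.size;

    import java.util.Arrays;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    public class LabelFilterSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("label-filter-sketch")
                    .master("local[*]")
                    .getOrCreate();

            StructType schema = new StructType(new StructField[]{
                    new StructField("id", DataTypes.StringType, false, Metadata.empty()),
                    new StructField("labels", DataTypes.createArrayType(DataTypes.StringType), true, Metadata.empty())
            });

            Dataset<Row> documents = spark.createDataFrame(Arrays.asList(
                    RowFactory.create("a", new String[]{"x", "y"}), // kept
                    RowFactory.create("b", new String[]{}),         // dropped: empty array
                    RowFactory.create("c", null)                    // dropped: null array
            ), schema);

            // Same predicate as the patch: size(labels) > 0, labels not null.
            // size(...) yields -1 (Spark 2.x default) or null (Spark 3) for a null
            // array, so gt(0) already excludes it; isNotNull() makes the intent explicit.
            documents
                    .filter(size(col("labels")).gt(0))
                    .filter(col("labels").isNotNull())
                    .show(); // only row "a" survives

            spark.stop();
        }
    }
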
diff --git a/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateEmbeddingsW2V.java b/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateEmbeddingsW2V.java
deleted file mode 100644
index 0aad7e1..0000000
--- a/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateEmbeddingsW2V.java
+++ /dev/null
@@ -1,213 +0,0 @@
-package eu.dnetlib.raid.jobs;
-
-import com.jayway.jsonpath.JsonPath;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.raid.graph.GraphUtil;
-import eu.dnetlib.raid.support.ArgumentApplicationParser;
-import eu.dnetlib.raid.support.EdgeParam;
-import eu.dnetlib.raid.support.RAiDConfig;
-import eu.dnetlib.raid.support.RandomWalkParam;
-import eu.dnetlib.raid.walker.RandomWalk;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.ml.feature.Normalizer;
-import org.apache.spark.ml.feature.Word2Vec;
-import org.apache.spark.ml.feature.Word2VecModel;
-
-import org.apache.spark.rdd.RDD;
-import org.apache.spark.sql.*;
-import org.apache.spark.sql.catalyst.encoders.RowEncoder;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.Metadata;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-
-import static org.apache.spark.sql.functions.col;
-import static org.apache.spark.sql.functions.lit;
-
-public class SparkCreateEmbeddingsW2V extends AbstractSparkJob{
-
-	private static final Logger log = LoggerFactory.getLogger(SparkCreateEmbeddingsW2V.class);
-
-	private static final String ID_PATH = "$.id";
-	private static final String DELETEDBYINFERENCE_PATH = "$.dataInfo.deletedbyinference";
-	private static final Encoder<Relation> REL_BEAN_ENC = Encoders.bean(Relation.class);
-
-	public SparkCreateEmbeddingsW2V(ArgumentApplicationParser parser, SparkSession spark) {
-		super(parser, spark);
-	}
-
-	public static void main(String[] args) throws Exception {
-
-		ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			readResource("/jobs/parameters/createEmbeddings_parameters.json", SparkCreateEmbeddingsW2V.class)
-		);
-
-		parser.parseArgument(args);
-
-		SparkConf conf = new SparkConf();
-
-		new SparkCreateEmbeddingsW2V(
-			parser,
-			getSparkSession(conf)
-		).run();
-	}
-
-	@Override
-	public void run() throws Exception {
-
-		// read oozie parameters
-		final String graphBasePath = parser.get("graphBasePath");
-		final String workingPath = parser.get("workingPath");
-		final String outputPath = parser.get("outputPath");
-		final String raidConfPath = parser.get("raidConfPath");
-		final int numPartitions = Optional
-			.ofNullable(parser.get("numPartitions"))
-			.map(Integer::valueOf)
-			.orElse(NUM_PARTITIONS);
-
-		log.info("graphBasePath: '{}'", graphBasePath);
-		log.info("workingPath: '{}'", workingPath);
-		log.info("outputPath: '{}'", outputPath);
-		log.info("raidConfPath: '{}'", raidConfPath);
-		log.info("numPartitions: '{}'", numPartitions);
-
-		JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-		RAiDConfig config = loadRAiDConfig(raidConfPath);
-
-		// create nodes
-		JavaRDD<String> nodes = sc.emptyRDD();
-		for (String nodeType: config.getNodes()) {
-			nodes = nodes.union(
-				sc.textFile(graphBasePath + "/" + nodeType)
-					.filter(json -> !((boolean) JsonPath.read(json, DELETEDBYINFERENCE_PATH)))
-					.map(json -> JsonPath.read(json, ID_PATH)));
-		}
-		RDD<Row> nodeRDD = nodes.zipWithIndex().map(n -> RowFactory.create(n._1(), n._2())).rdd();
-
-		Dataset<Row> nodeDS = spark.createDataset(nodeRDD, RowEncoder.apply(
-			new StructType(
-				new StructField[]{
-					new StructField(STRING_ID_COL, DataTypes.StringType, false, Metadata.empty()),
-					new StructField(LONG_ID_COL, DataTypes.LongType, false, Metadata.empty())
-				})
-		));
-		log.info("Number of nodes: {}", nodeDS.count());
-
-		Dataset<Row> relations = spark
-			.read()
-			.schema(REL_BEAN_ENC.schema())
-			.json(graphBasePath + "/relation");
-		log.info("Number of relations: {}", relations.count());
-
-		// create random walks
-		Dataset<Row> randomWalks = spark.createDataFrame(
-			new ArrayList<>(), new StructType(
-				new StructField[]{
-					new StructField(RANDOM_WALK_COL, DataTypes.createArrayType(DataTypes.IntegerType), false, Metadata.empty())
-				}));
-		for(EdgeParam edgeParam: config.getEdges()) {
-
-			log.info("Creating '{}' edges with '{}' metapath", edgeParam.getName(), edgeParam.getMetapath());
-			Dataset<Row> edges = createEdges(nodeDS, relations, edgeParam.getMetapath());
-			log.info("Number of '{}' edges: {}", edgeParam.getName(), edges.count());
-
-			RandomWalkParam randomWalkParam = config.getRandomWalks().get(edgeParam.getName());
-
-			Dataset<Row> randomWalk = new RandomWalk()
-				.setNumWalk(randomWalkParam.getNumWalks())
-				.setWalkLength(randomWalkParam.getWalkLength())
-				.setQ(randomWalkParam.getQ())
-				.setP(randomWalkParam.getP())
-				.setOutputCol(RANDOM_WALK_COL)
-				.randomWalk(edges);
-			log.info("Number of random walks for '{}' edges: {}", edgeParam.getName(), randomWalk.count());
-
-			randomWalks = randomWalks.union(randomWalk);
-
-		}
-
-		randomWalks.cache();
-
-		log.info("Creating embeddings");
-		Word2VecModel word2VecModel = new Word2Vec()
-			.setMaxSentenceLength(config.getParams().getOrDefault("maxWalkLength", 100).intValue())
-			.setMinCount(config.getParams().getOrDefault("minCount", 2).intValue())
-			.setWindowSize(config.getParams().getOrDefault("windowSize", 5).intValue())
-			.setVectorSize(config.getParams().getOrDefault("embeddingSize", 128).intValue())
-			.setMaxIter(config.getParams().getOrDefault("maxIter", 20).intValue())
-			.setStepSize(config.getParams().getOrDefault("learningRate", 0.1).doubleValue())
-			.setSeed(SEED)
-			.setNumPartitions(numPartitions)
-			.setInputCol(RANDOM_WALK_COL)
-			.setOutputCol(EMBEDDING_COL)
-			.fit(randomWalks);
-
-		Dataset<Row> embeddings = word2VecModel
-			.getVectors()
-			.toDF(LONG_ID_COL, EMBEDDING_COL);
-
-		Normalizer normalizer = new Normalizer()
-			.setInputCol(EMBEDDING_COL)
-			.setOutputCol("normalized_" + EMBEDDING_COL)
-			.setP(2.0);
-
-		embeddings = normalizer
-			.transform(embeddings)
-			.select(col(LONG_ID_COL), col("normalized_" + EMBEDDING_COL).as(EMBEDDING_COL));
-
-		Dataset<Row> result = nodeDS
-			.join(embeddings, nodeDS.col(LONG_ID_COL).equalTo(embeddings.col(LONG_ID_COL)))
-			.select(col(STRING_ID_COL), col(EMBEDDING_COL));
-
-		log.info("Number of generated embeddings: {}", result.count());
-
-		result.write().save(outputPath);
-
-	}
-
-	public Dataset<Row> createEdges(Dataset<Row> nodeDS, Dataset<Row> relations, List<String> metapath) throws Exception {
-
-		Dataset<Row> edges;
-		switch(metapath.size()) {
-			case 1:
-				edges = relations
-					.where(col("relClass").equalTo(metapath.get(0)))
-					.select(col("source").as("src"), col("target").as("dst"));
-				break;
-			case 2:
-				Dataset<Row> edges_1 = relations
-					.where(col("relClass").equalTo(metapath.get(0)))
-					.select(col("source").as("source_1"), col("target").as("target_1"));
-				Dataset<Row> edges_2 = relations
-					.where(col("relClass").equalTo(metapath.get(1)))
-					.select(col("source").as("source_2"), col("target").as("target_2"));
-
-				edges = edges_1
-					.join(edges_2, edges_1.col("target_1").equalTo(edges_2.col("source_2")))
-					.select(col("source_1").as("src"), col("target_2").as("dst"));
-				break;
-			default:
-				throw new Exception("Metapath size not allowed");
-		}
-
-		// join with nodes to get longs instead of string ids
-		edges = edges
-			.join(nodeDS, edges.col("src").equalTo(nodeDS.col(STRING_ID_COL)))
-			.select(col(LONG_ID_COL).as("src"), col("dst"))
-			.join(nodeDS, edges.col("dst").equalTo(nodeDS.col(STRING_ID_COL)))
-			.select(col("src"), col(LONG_ID_COL).as("dst"))
-			.withColumn("weight", lit(1.0));
-
-		return GraphUtil.preProcess(edges, "src", "dst", "weight");
-	}
-
-}
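
Note on the deletion above: the interesting piece of SparkCreateEmbeddingsW2V is how `createEdges` expanded a length-2 metapath, namely as a self-join of the relation table on the intermediate node. A standalone sketch of that two-hop join; the input path and relClass values are hypothetical:

    import static org.apache.spark.sql.functions.col;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class MetapathJoinSketch {

        // Two-hop metapath: follow relClass1, then relClass2, and emit the
        // (src, dst) pairs connecting the start of hop 1 to the end of hop 2.
        static Dataset<Row> twoHopEdges(Dataset<Row> relations, String relClass1, String relClass2) {
            Dataset<Row> hop1 = relations
                    .where(col("relClass").equalTo(relClass1))
                    .select(col("source").as("source_1"), col("target").as("target_1"));
            Dataset<Row> hop2 = relations
                    .where(col("relClass").equalTo(relClass2))
                    .select(col("source").as("source_2"), col("target").as("target_2"));

            // The join key is the shared intermediate node: hop 1 ends where hop 2 starts.
            return hop1
                    .join(hop2, hop1.col("target_1").equalTo(hop2.col("source_2")))
                    .select(col("source_1").as("src"), col("target_2").as("dst"));
        }

        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("metapath-join-sketch")
                    .master("local[*]")
                    .getOrCreate();

            // Hypothetical relation dump with source/relClass/target columns.
            Dataset<Row> relations = spark.read().json("/tmp/relation");
            twoHopEdges(relations, "isProducedBy", "produces").show();

            spark.stop();
        }
    }
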
.setOutputCol("normalized_" + EMBEDDING_COL) - .setP(2.0); - - embeddings = normalizer - .transform(embeddings) - .select(col(LONG_ID_COL), col("normalized_" + EMBEDDING_COL).as(EMBEDDING_COL)); - - Dataset result = nodeDS - .join(embeddings, nodeDS.col(LONG_ID_COL).equalTo(embeddings.col(LONG_ID_COL))) - .select(col(STRING_ID_COL), col(EMBEDDING_COL)); - - log.info("Number of generated embeddings: {}", result.count()); - - result.write().save(outputPath); - - } - - public Dataset createEdges(Dataset nodeDS, Dataset relations, List metapath) throws Exception { - - Dataset edges; - switch(metapath.size()) { - case 1: - edges = relations - .where(col("relClass").equalTo(metapath.get(0))) - .select(col("source").as("src"), col("target").as("dst")); - break; - case 2: - Dataset edges_1 = relations - .where(col("relClass").equalTo(metapath.get(0))) - .select(col("source").as("source_1"), col("target").as("target_1")); - Dataset edges_2 = relations - .where(col("relClass").equalTo(metapath.get(1))) - .select(col("source").as("source_2"), col("target").as("target_2")); - - edges = edges_1 - .join(edges_2, edges_1.col("target_1").equalTo(edges_2.col("source_2"))) - .select(col("source_1").as("src"), col("target_2").as("dst")); - break; - default: - throw new Exception("Metapath size not allowed"); - } - - // join with nodes to get longs instead of string ids - edges = edges - .join(nodeDS, edges.col("src").equalTo(nodeDS.col(STRING_ID_COL))) - .select(col(LONG_ID_COL).as("src"), col("dst")) - .join(nodeDS, edges.col("dst").equalTo(nodeDS.col(STRING_ID_COL))) - .select(col("src"), col(LONG_ID_COL).as("dst")) - .withColumn("weight", lit(1.0)); - - return GraphUtil.preProcess(edges, "src", "dst", "weight"); - } - -} diff --git a/dhp-raid/src/main/resources/raid/oozie_app/workflow.xml b/dhp-raid/src/main/resources/raid/oozie_app/workflow.xml index b09ffbc..756a0da 100644 --- a/dhp-raid/src/main/resources/raid/oozie_app/workflow.xml +++ b/dhp-raid/src/main/resources/raid/oozie_app/workflow.xml @@ -83,7 +83,7 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] diff --git a/pom.xml b/pom.xml index 013f321..2c55ac1 100644 --- a/pom.xml +++ b/pom.xml @@ -233,12 +233,19 @@ 2.2.2 15.0 - - 2.2.0 - - 2.6.5 + 2.4.0.cloudera2 + 2.14.2 3.3.3 [8.0.1] + 3.5.3 + + 1.8 + UTF-8 + UTF-8 + 2.2.6 + UTF-8 + 2.7.3 + 2.8.1 3.5 2.4 @@ -246,7 +253,8 @@ 1.1.3 4.9 - 2.11.8 + 2.11 + 2.11.12 false 3.6.0 @@ -275,119 +283,31 @@ - - edu.cmu - secondstring - 1.0.0 - - - org.antlr - stringtemplate - 3.2 - - - - com.fasterxml.jackson.core - jackson-databind - ${jackson.version} - provided - - - com.fasterxml.jackson.dataformat - jackson-dataformat-xml - ${jackson.version} - provided - - - com.fasterxml.jackson.module - jackson-module-jsonSchema - ${jackson.version} - provided - - - com.fasterxml.jackson.core - jackson-core - ${jackson.version} - provided - - - com.fasterxml.jackson.core - jackson-annotations - ${jackson.version} - provided - - - - org.mockito - mockito-core - ${mockito-core.version} - test - - - - org.mockito - mockito-junit-jupiter - ${mockito-core.version} - test - - - - org.apache.commons - commons-math3 - 3.6.1 - - - - com.google.guava - guava - ${google.guava.version} - - - com.google.code.gson - gson - ${google.gson.version} - - - - org.apache.commons - commons-lang3 - ${commons.lang.version} - - - - commons-io - commons-io - ${commons.io.version} - - - commons-collections - commons-collections - ${commons.collections.version} - - - commons-logging - commons-logging - 
@@ -275,119 +283,31 @@
-		<dependency>
-			<groupId>edu.cmu</groupId>
-			<artifactId>secondstring</artifactId>
-			<version>1.0.0</version>
-		</dependency>
-		<dependency>
-			<groupId>org.antlr</groupId>
-			<artifactId>stringtemplate</artifactId>
-			<version>3.2</version>
-		</dependency>
-
-		<dependency>
-			<groupId>com.fasterxml.jackson.core</groupId>
-			<artifactId>jackson-databind</artifactId>
-			<version>${jackson.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>com.fasterxml.jackson.dataformat</groupId>
-			<artifactId>jackson-dataformat-xml</artifactId>
-			<version>${jackson.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>com.fasterxml.jackson.module</groupId>
-			<artifactId>jackson-module-jsonSchema</artifactId>
-			<version>${jackson.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>com.fasterxml.jackson.core</groupId>
-			<artifactId>jackson-core</artifactId>
-			<version>${jackson.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>com.fasterxml.jackson.core</groupId>
-			<artifactId>jackson-annotations</artifactId>
-			<version>${jackson.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.mockito</groupId>
-			<artifactId>mockito-core</artifactId>
-			<version>${mockito-core.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.mockito</groupId>
-			<artifactId>mockito-junit-jupiter</artifactId>
-			<version>${mockito-core.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.commons</groupId>
-			<artifactId>commons-math3</artifactId>
-			<version>3.6.1</version>
-		</dependency>
-
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${google.guava.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>com.google.code.gson</groupId>
-			<artifactId>gson</artifactId>
-			<version>${google.gson.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.commons</groupId>
-			<artifactId>commons-lang3</artifactId>
-			<version>${commons.lang.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>commons-io</groupId>
-			<artifactId>commons-io</artifactId>
-			<version>${commons.io.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>commons-collections</groupId>
-			<artifactId>commons-collections</artifactId>
-			<version>${commons.collections.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>commons-logging</groupId>
-			<artifactId>commons-logging</artifactId>
-			<version>${commons.logging.version}</version>
-		</dependency>
 		<dependency>
 			<groupId>org.apache.spark</groupId>
-			<artifactId>spark-core_2.11</artifactId>
+			<artifactId>spark-core_${scala.binary.version}</artifactId>
 			<version>${spark.version}</version>
 			<scope>provided</scope>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.spark</groupId>
-			<artifactId>spark-graphx_2.11</artifactId>
+			<artifactId>spark-graphx_${scala.binary.version}</artifactId>
 			<version>${spark.version}</version>
 			<scope>provided</scope>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.spark</groupId>
-			<artifactId>spark-sql_2.11</artifactId>
+			<artifactId>spark-sql_${scala.binary.version}</artifactId>
 			<version>${spark.version}</version>
 			<scope>provided</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.spark</groupId>
+			<artifactId>spark-mllib_${scala.binary.version}</artifactId>
+			<version>${spark.version}</version>
+		</dependency>
+
 		<dependency>
 			<groupId>org.junit.jupiter</groupId>
 			<artifactId>junit-jupiter</artifactId>
@@ -409,13 +329,7 @@
 		<dependency>
 			<groupId>com.jayway.jsonpath</groupId>
 			<artifactId>json-path</artifactId>
-			<version>2.4.0</version>
+			<version>2.9.0</version>
 		</dependency>
-
-		<dependency>
-			<groupId>com.ibm.icu</groupId>
-			<artifactId>icu4j</artifactId>
-			<version>70.1</version>
-		</dependency>
@@ -433,7 +347,13 @@
 		<dependency>
 			<groupId>com.github.haifengl</groupId>
 			<artifactId>smile-core</artifactId>
-			<version>2.5.3</version>
+			<version>2.5.3</version>
 		</dependency>
+
+		<dependency>
+			<groupId>org.json4s</groupId>
+			<artifactId>json4s-jackson_2.11</artifactId>
+			<version>${json4s.version}</version>
+		</dependency>
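
Note on the json-path bump above (2.4.0 to 2.9.0): the `JsonPath.read` call pattern used in this module (see the deleted job earlier in the diff) is unchanged across these versions. A minimal usage sketch with a hypothetical record:

    import com.jayway.jsonpath.JsonPath;

    public class JsonPathSketch {
        public static void main(String[] args) {
            // Hypothetical OAF-style record; the real jobs read these from the graph dump.
            String json = "{\"id\":\"50|doi_________::ab12\",\"dataInfo\":{\"deletedbyinference\":false}}";

            // JsonPath.read infers the result type from the assignment context.
            String id = JsonPath.read(json, "$.id");
            boolean deleted = JsonPath.read(json, "$.dataInfo.deletedbyinference");

            if (!deleted) {
                System.out.println("keep node " + id);
            }
        }
    }
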