implementation of sampled join and split of create embedding job
parent c4eb3211cc
commit 3fd4350213
@@ -5,65 +5,17 @@
</component>
<component name="ChangeListManager">
<list default="true" id="67528cef-c396-4288-9c84-8cedea88df9c" name="Changes" comment="">
<change afterPath="$PROJECT_DIR$/.gitignore" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/codeStyles/Project.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/codeStyles/codeStyleConfig.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/encodings.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/misc.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/packagesearch.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/scala_compiler.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/uiDesigner.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/vcs.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-build/dhp-build-assembly-resources/README.markdown" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-build/dhp-build-assembly-resources/pom.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-build/dhp-build-assembly-resources/src/main/resources/assemblies/oozie-installer.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-build/dhp-build-assembly-resources/src/main/resources/assemblies/tests.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-build/dhp-build-assembly-resources/src/main/resources/commands/get_working_dir.sh" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-build/dhp-build-assembly-resources/src/main/resources/commands/print_working_dir.sh" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-build/dhp-build-assembly-resources/src/main/resources/commands/readme.markdown" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-build/dhp-build-assembly-resources/src/main/resources/commands/run_workflow.sh" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-build/dhp-build-assembly-resources/src/main/resources/commands/upload_workflow.sh" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-build/dhp-build-assembly-resources/src/main/resources/project-default.properties" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-build/dhp-build-properties-maven-plugin/README.markdown" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-build/dhp-build-properties-maven-plugin/pom.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-build/dhp-build-properties-maven-plugin/src/main/java/eu/dnetlib/maven/plugin/properties/GenerateOoziePropertiesMojo.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-build/dhp-build-properties-maven-plugin/src/main/java/eu/dnetlib/maven/plugin/properties/WritePredefinedProjectProperties.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-build/dhp-build-properties-maven-plugin/src/test/java/eu/dnetlib/maven/plugin/properties/GenerateOoziePropertiesMojoTest.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-build/dhp-build-properties-maven-plugin/src/test/java/eu/dnetlib/maven/plugin/properties/WritePredefinedProjectPropertiesTest.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-build/dhp-build-properties-maven-plugin/src/test/resources/eu/dnetlib/maven/plugin/properties/included.properties" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-build/dhp-build-properties-maven-plugin/test.properties" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-build/dhp-code-style/pom.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-build/dhp-code-style/src/main/resources/eclipse/formatter_aosp.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-build/dhp-code-style/src/main/resources/eclipse/formatter_dnet.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-build/dhp-code-style/src/main/resources/eclipse/formatter_google.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-build/pom.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/pom.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/AbstractSparkJob.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateEmbeddings.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateEmbeddingsW2V.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkRAiDClustering.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/support/ArgumentApplicationParser.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/support/EdgeParam.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/support/OptionsParameter.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/support/RAiDConfig.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/support/RandomWalkParam.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/resources/jobs/parameters/createClusters_parameters.json" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/resources/jobs/parameters/createEmbeddings_parameters.json" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/resources/raid/oozie_app/config-default.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/resources/raid/oozie_app/workflow.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/scala/eu/dnetlib/raid/graph/AliasOps.scala" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/scala/eu/dnetlib/raid/graph/GraphOps.scala" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/scala/eu/dnetlib/raid/graph/GraphUtil.scala" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/scala/eu/dnetlib/raid/graph/package.scala" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/scala/eu/dnetlib/raid/walker/RandomWalk.scala" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/test/java/eu/dnetlib/raid/RAiDInferenceTest.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/config/raid.conf.json" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/examples/graph/dataset" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/examples/graph/publication" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/examples/graph/relation" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/examples/graph/software" afterDir="false" />
<change afterPath="$PROJECT_DIR$/pom.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateDocuments.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/resources/jobs/parameters/createDocuments_parameters.json" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/job-override.properties" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/job-override.properties" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/pom.xml" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/pom.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateEmbeddings.java" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateEmbeddings.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkRAiDClustering.java" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkRAiDClustering.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/resources/jobs/parameters/createEmbeddings_parameters.json" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/resources/jobs/parameters/createEmbeddings_parameters.json" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/resources/raid/oozie_app/workflow.xml" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/resources/raid/oozie_app/workflow.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/test/java/eu/dnetlib/raid/RAiDInferenceTest.java" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/test/java/eu/dnetlib/raid/RAiDInferenceTest.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/config/raid.conf.json" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/config/raid.conf.json" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
@@ -220,6 +172,7 @@
<workItem from="1729084634987" duration="21063000" />
<workItem from="1729257864565" duration="95390000" />
<workItem from="1730275959282" duration="35702000" />
<workItem from="1731482199949" duration="11193000" />
</task>
<servers />
</component>

@@ -1,6 +1,7 @@
graphBasePath = /tmp/beta_provision/graph/06_graph_inferred
graphBasePath = /tmp/raid_subgraph_test/
workingPath = /user/michele.debonis/raid_inference_test/working_dir
clustersPath = /user/michele.debonis/raid_inference_test/clusters
embeddingsPath = /user/michele.debonis/raid_inference_test/embeddings
documentsPath = /user/michele.debonis/raid_inference_test/documents
raidConfPath = /user/michele.debonis/raid.conf.json
numPartitions = 1000
numPartitions = 300

@@ -11,7 +11,7 @@
    <relativePath>../pom.xml</relativePath>
</parent>
<artifactId>dhp-raid</artifactId>
<version>1.0-SNAPSHOT</version>
<version>1.0.0-SNAPSHOT</version>

<properties>
    <java.version>1.8</java.version>

@@ -0,0 +1,228 @@
package eu.dnetlib.raid.jobs;

import com.jayway.jsonpath.JsonPath;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.raid.graph.GraphUtil;
import eu.dnetlib.raid.support.ArgumentApplicationParser;
import eu.dnetlib.raid.support.EdgeParam;
import eu.dnetlib.raid.support.RAiDConfig;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.graphx.Edge;
import org.apache.spark.ml.feature.Normalizer;
import org.apache.spark.ml.feature.Word2Vec;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import static org.apache.spark.sql.functions.*;


public class SparkCreateDocuments extends AbstractSparkJob{

    private static final Logger log = LoggerFactory.getLogger(SparkCreateDocuments.class);
    private static final String ID_PATH = "$.id";
    private static final String DELETEDBYINFERENCE_PATH = "$.dataInfo.deletedbyinference";
    private static final Encoder<Relation> REL_BEAN_ENC = Encoders.bean(Relation.class);

    public SparkCreateDocuments(ArgumentApplicationParser parser, SparkSession spark) {
        super(parser, spark);
    }

    public static void main(String[] args) throws Exception {

        ArgumentApplicationParser parser = new ArgumentApplicationParser(
                readResource("/jobs/parameters/createDocuments_parameters.json", SparkCreateDocuments.class)
        );

        parser.parseArgument(args);

        SparkConf conf = new SparkConf();

        new SparkCreateDocuments(
                parser,
                getSparkSession(conf)
        ).run();
    }

    @Override
    public void run() throws Exception {

        // read oozie parameters
        final String graphBasePath = parser.get("graphBasePath");
        final String workingPath = parser.get("workingPath");
        final String outputPath = parser.get("outputPath");
        final String raidConfPath = parser.get("raidConfPath");
        final int numPartitions = Optional
                .ofNullable(parser.get("numPartitions"))
                .map(Integer::valueOf)
                .orElse(NUM_PARTITIONS);

        log.info("graphBasePath: '{}'", graphBasePath);
        log.info("workingPath: '{}'", workingPath);
        log.info("outputPath: '{}'", outputPath);
        log.info("raidConfPath: '{}'", raidConfPath);
        log.info("numPartitions: '{}'", numPartitions);

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        RAiDConfig config = loadRAiDConfig(raidConfPath);

        Dataset<Row> nodeDS = prepareNodes(sc, graphBasePath, config, numPartitions);
        log.info("Number of nodes: {}", nodeDS.count());

        RDD<Tuple2<Object, String>> vertices = createVertices(nodeDS);

        Dataset<Row> relations = spark
                .read()
                .schema(REL_BEAN_ENC.schema())
                .json(graphBasePath + "/relation")
                .repartition(numPartitions);
        log.info("Number of relations: {}", relations.count());

        Dataset<Row> labelDS = spark.createDataFrame(
                new ArrayList<>(), DataTypes.createStructType(
                        new StructField[]{
                                DataTypes.createStructField(LONG_ID_COL, DataTypes.LongType, false),
                                DataTypes.createStructField(LABEL_ID_COL, DataTypes.StringType, false)
                        }));

        for(EdgeParam edgeParam: config.getEdges()) {

            RDD<Edge<String>> edges = createEdges(
                    nodeDS,
                    relations,
                    edgeParam.getMetapath(),
                    edgeParam.getName(),
                    config.getParams().getOrDefault("joinMaxIter", 20).intValue(),
                    config.getParams().getOrDefault("joinSample", 0.1).doubleValue());

            Dataset<Row> connectedComponents = GraphUtil.createConnectedComponents(spark, vertices, edges, 1);

            //this is getting rid of those connected components of a single element (they are not meaningful ?)
            Dataset<Row> counts = connectedComponents
                    .groupBy(LABEL_ID_COL)
                    .agg(functions.count(LABEL_ID_COL).alias("count"))
                    .filter(functions.col("count").gt(1));
            connectedComponents = connectedComponents
                    .join(counts, connectedComponents.col(LABEL_ID_COL).equalTo(counts.col(LABEL_ID_COL)), "inner")
                    .select(connectedComponents.col(LONG_ID_COL), connectedComponents.col(LABEL_ID_COL));

            labelDS = labelDS.union(connectedComponents);

        }

        Dataset<Row> labels = labelDS
                .groupBy(col(LONG_ID_COL))
                .agg(collect_list(labelDS.col(LABEL_ID_COL)).as(LABELS_COL));

        Dataset<Row> documents = nodeDS
                .join(labels, nodeDS.col(LONG_ID_COL).equalTo(labels.col(LONG_ID_COL)), "left")
                .select(col(STRING_ID_COL), col(LABELS_COL));

        log.info("Labels generated: {}", documents.count());

        documents.write().save(outputPath);

    }

    public Dataset<Row> prepareNodes(JavaSparkContext sc, String graphBasePath, RAiDConfig config, int numPartitions) {
        // create nodes
        JavaRDD<String> nodes = sc.emptyRDD();
        for (String nodeType: config.getNodes()) {
            nodes = nodes.union(
                    sc.textFile(graphBasePath + "/" + nodeType)
//                            .filter(json -> !((boolean) JsonPath.read(json, DELETEDBYINFERENCE_PATH)))
                            .map(json -> JsonPath.read(json, ID_PATH)));
        }

        return spark.createDataset(
                nodes.zipWithIndex().map(n -> RowFactory.create(n._1(), n._2())).rdd(),
                RowEncoder.apply(
                        DataTypes.createStructType(
                                new StructField[]{
                                        DataTypes.createStructField(STRING_ID_COL, DataTypes.StringType, false),
                                        DataTypes.createStructField(LONG_ID_COL, DataTypes.LongType, false)
                                })
                ))
                .repartition(numPartitions);
    }

    public RDD<Tuple2<Object,String>> createVertices(Dataset<Row> nodeDS) {
        return nodeDS
                .toJavaRDD()
                .map(row -> new Tuple2<>(row.get(1), row.getString(0)))
                .rdd();
    }

    public RDD<Edge<String>> createEdges(Dataset<Row> nodeDS, Dataset<Row> relations, List<String> metapath, String edgeName, int maxIter, double fraction) throws Exception {

        Dataset<Row> edgesRow = spark.createDataFrame(
                new ArrayList<>(), DataTypes.createStructType(
                        new StructField[]{
                                DataTypes.createStructField("src", DataTypes.StringType, false),
                                DataTypes.createStructField("dst", DataTypes.StringType, false),
                                DataTypes.createStructField("relClass", DataTypes.StringType, false)
                        }
                )
        );
        switch(metapath.size()) {
            case 1:
                edgesRow = relations
                        .where(col("relClass").equalTo(metapath.get(0)))
                        .select(col("source").as("src"), col("target").as("dst"), col("relClass"));
                break;
            case 2:
                Dataset<Row> edges_1 = relations
                        .where(col("relClass").equalTo(metapath.get(0)))
                        .select(col("source").as("source_1"), col("target").as("target_1"));
                Dataset<Row> edges_2 = relations
                        .where(col("relClass").equalTo(metapath.get(1)))
                        .select(col("source").as("source_2"), col("target").as("target_2"));

                for (int i = 0; i < maxIter; i++) {
                    Dataset<Row> edges_sample1 = edges_1.sample(fraction);
                    Dataset<Row> edges_sample2 = edges_2.sample(fraction);
                    Dataset<Row> joined = edges_sample1.join(
                            edges_sample2,
                            edges_sample1.col("target_1").equalTo(edges_sample2.col("source_2")),
                            "inner"
                    )
                            .select(col("source_1").as("src"), col("target_2").as("dst"))
                            .withColumn("relClass", lit(edgeName));

                    edgesRow = edgesRow.union(joined);
                }

                // distinct() returns a new Dataset, so its result must be reassigned for the
                // deduplication of the unioned sampled joins to take effect
                edgesRow = edgesRow.distinct();

                break;
            default:
                throw new Exception("Metapath size not allowed");
        }
        // join with nodes to get longs instead of string ids
        return edgesRow
                .join(nodeDS, edgesRow.col("src").equalTo(nodeDS.col(STRING_ID_COL)))
                .select(col(LONG_ID_COL).as("src"), col("dst"), col("relClass"))
                .join(nodeDS, edgesRow.col("dst").equalTo(nodeDS.col(STRING_ID_COL)))
                .select(col("src"), col(LONG_ID_COL).as("dst"), col("relClass"))
                .toJavaRDD()
                .map(row ->
                        new Edge<>(
                                row.getLong(0),
                                row.getLong(1),
                                row.getString(2)
                        )
                )
                .rdd();
    }
}

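Editor's note: createEdges above replaces a single full join between the two legs of a 2-hop metapath with a union of joins over repeated random samples. The following is a minimal, self-contained sketch of that sampled-join idea, not part of the commit; the class name, the toy relations and the "produces" edge label are illustrative assumptions, and only spark-sql is needed to run it locally.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.Arrays;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

public class SampledJoinSketch {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("sampled-join-sketch").getOrCreate();

        StructType relSchema = DataTypes.createStructType(new StructField[]{
                DataTypes.createStructField("source", DataTypes.StringType, false),
                DataTypes.createStructField("target", DataTypes.StringType, false)});

        // two legs of a 2-hop metapath, e.g. publication -> project -> publication
        Dataset<Row> hop1 = spark.createDataFrame(Arrays.asList(
                RowFactory.create("pub1", "prj1"),
                RowFactory.create("pub2", "prj1"),
                RowFactory.create("pub3", "prj2")), relSchema)
                .select(col("source").as("source_1"), col("target").as("target_1"));
        Dataset<Row> hop2 = spark.createDataFrame(Arrays.asList(
                RowFactory.create("prj1", "pub4"),
                RowFactory.create("prj2", "pub5")), relSchema)
                .select(col("source").as("source_2"), col("target").as("target_2"));

        int maxIter = 10;      // plays the role of "joinMaxIter"
        double fraction = 0.5; // plays the role of "joinSample"

        StructType edgeSchema = DataTypes.createStructType(new StructField[]{
                DataTypes.createStructField("src", DataTypes.StringType, false),
                DataTypes.createStructField("dst", DataTypes.StringType, false),
                DataTypes.createStructField("relClass", DataTypes.StringType, false)});
        Dataset<Row> edges = spark.createDataFrame(new ArrayList<Row>(), edgeSchema);

        // instead of one full hop1-hop2 join, union maxIter joins over small samples of both sides
        for (int i = 0; i < maxIter; i++) {
            Dataset<Row> s1 = hop1.sample(fraction);
            Dataset<Row> s2 = hop2.sample(fraction);
            Dataset<Row> joined = s1
                    .join(s2, s1.col("target_1").equalTo(s2.col("source_2")), "inner")
                    .select(col("source_1").as("src"), col("target_2").as("dst"))
                    .withColumn("relClass", lit("produces"));
            edges = edges.union(joined);
        }

        // duplicates accumulated across the iterations are removed once at the end
        edges.distinct().show(false);

        spark.stop();
    }
}

Each iteration only joins two small samples, so every individual shuffle stays cheap; the union plus the final distinct recovers most of the full metapath join (a rough coverage estimate is sketched after the raid.conf.json hunk at the end of this diff).
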
@@ -59,7 +59,7 @@ public class SparkCreateEmbeddings extends AbstractSparkJob{
    public void run() throws Exception {

        // read oozie parameters
        final String graphBasePath = parser.get("graphBasePath");
        final String documentsPath = parser.get("documentsPath");
        final String workingPath = parser.get("workingPath");
        final String outputPath = parser.get("outputPath");
        final String raidConfPath = parser.get("raidConfPath");
@@ -68,7 +68,7 @@ public class SparkCreateEmbeddings extends AbstractSparkJob{
                .map(Integer::valueOf)
                .orElse(NUM_PARTITIONS);

        log.info("graphBasePath: '{}'", graphBasePath);
        log.info("documentsPath: '{}'", documentsPath);
        log.info("workingPath: '{}'", workingPath);
        log.info("outputPath: '{}'", outputPath);
        log.info("raidConfPath: '{}'", raidConfPath);
@@ -77,52 +77,7 @@ public class SparkCreateEmbeddings extends AbstractSparkJob{
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        RAiDConfig config = loadRAiDConfig(raidConfPath);

        Dataset<Row> nodeDS = prepareNodes(sc, graphBasePath, config);
        log.info("Number of nodes: {}", nodeDS.count());

        RDD<Tuple2<Object, String>> vertices = createVertices(nodeDS);

        Dataset<Row> relations = spark
                .read()
                .schema(REL_BEAN_ENC.schema())
                .json(graphBasePath + "/relation");
        log.info("Number of relations: {}", relations.count());

        Dataset<Row> labelDS = spark.createDataFrame(
                new ArrayList<>(), DataTypes.createStructType(
                        new StructField[]{
                                DataTypes.createStructField(LONG_ID_COL, DataTypes.LongType, false),
                                DataTypes.createStructField(LABEL_ID_COL, DataTypes.StringType, false)
                        }));

        for(EdgeParam edgeParam: config.getEdges()) {

            RDD<Edge<String>> edges = createEdges(nodeDS, relations, edgeParam.getMetapath(), edgeParam.getName());

            Dataset<Row> connectedComponents = GraphUtil.createConnectedComponents(spark, vertices, edges, 1);

            //this is getting rid of those connected components of a single element (they are not meaningful ?)
            Dataset<Row> counts = connectedComponents
                    .groupBy(LABEL_ID_COL)
                    .agg(functions.count(LABEL_ID_COL).alias("count"))
                    .filter(functions.col("count").gt(1));
            connectedComponents = connectedComponents
                    .join(counts, connectedComponents.col(LABEL_ID_COL).equalTo(counts.col(LABEL_ID_COL)), "inner")
                    .select(connectedComponents.col(LONG_ID_COL), connectedComponents.col(LABEL_ID_COL));

            labelDS = labelDS.union(connectedComponents);

        }

        Dataset<Row> labels = labelDS
                .groupBy(col(LONG_ID_COL))
                .agg(collect_list(labelDS.col(LABEL_ID_COL)).as(LABELS_COL));

        Dataset<Row> result = nodeDS
                .join(labels, nodeDS.col(LONG_ID_COL).equalTo(labels.col(LONG_ID_COL)), "left")
                .select(col(STRING_ID_COL), col(LABELS_COL));

        log.info("Labels generated: {}", result.count());
        Dataset<Row> documents = spark.read().load(documentsPath);

        Dataset<Row> word2vecEmbeddings = new Word2Vec()
                .setInputCol(LABELS_COL)
@@ -131,8 +86,8 @@ public class SparkCreateEmbeddings extends AbstractSparkJob{
                .setStepSize(config.getParams().getOrDefault("learningRate", 0.01).doubleValue())
                .setMinCount(config.getParams().getOrDefault("minCount", 1).intValue())
                .setMaxIter(config.getParams().getOrDefault("maxIter", 10).intValue())
                .fit(result)
                .transform(result);
                .fit(documents)
                .transform(documents);

        Dataset<Row> embeddings = new Normalizer()
                .setInputCol("word2vec_" + EMBEDDING_COL)
@@ -141,77 +96,10 @@ public class SparkCreateEmbeddings extends AbstractSparkJob{
                .transform(word2vecEmbeddings)
                .select(col(STRING_ID_COL), col(EMBEDDING_COL));

        log.info("Embeddings generated: {}", embeddings.count());

        embeddings.write().save(outputPath);

    }

    public Dataset<Row> prepareNodes(JavaSparkContext sc, String graphBasePath, RAiDConfig config) {
        // create nodes
        JavaRDD<String> nodes = sc.emptyRDD();
        for (String nodeType: config.getNodes()) {
            nodes = nodes.union(
                    sc.textFile(graphBasePath + "/" + nodeType)
                            .filter(json -> !((boolean) JsonPath.read(json, DELETEDBYINFERENCE_PATH)))
                            .map(json -> JsonPath.read(json, ID_PATH)));
        }

        return spark.createDataset(
                nodes.zipWithIndex().map(n -> RowFactory.create(n._1(), n._2())).rdd(),
                RowEncoder.apply(
                        DataTypes.createStructType(
                                new StructField[]{
                                        DataTypes.createStructField(STRING_ID_COL, DataTypes.StringType, false),
                                        DataTypes.createStructField(LONG_ID_COL, DataTypes.LongType, false)
                                })
                ));
    }

    public RDD<Tuple2<Object,String>> createVertices(Dataset<Row> nodeDS) {
        return nodeDS
                .toJavaRDD()
                .map(row -> new Tuple2<>(row.get(1), row.getString(0)))
                .rdd();
    }

    public RDD<Edge<String>> createEdges(Dataset<Row> nodeDS, Dataset<Row> relations, List<String> metapath, String edgeName) throws Exception {

        Dataset<Row> edgesRow;
        switch(metapath.size()) {
            case 1:
                edgesRow = relations
                        .where(col("relClass").equalTo(metapath.get(0)))
                        .select(col("source").as("src"), col("target").as("dst"), col("relClass"));
                break;
            case 2:
                Dataset<Row> edges_1 = relations
                        .where(col("relClass").equalTo(metapath.get(0)))
                        .select(col("source").as("source_1"), col("target").as("target_1"));
                Dataset<Row> edges_2 = relations
                        .where(col("relClass").equalTo(metapath.get(1)))
                        .select(col("source").as("source_2"), col("target").as("target_2"));

                edgesRow = edges_1
                        .join(edges_2, edges_1.col("target_1").equalTo(edges_2.col("source_2")))
                        .select(col("source_1").as("src"), col("target_2").as("dst"))
                        .withColumn("relClass", lit(edgeName));
                break;
            default:
                throw new Exception("Metapath size not allowed");
        }
        // join with nodes to get longs instead of string ids
        return edgesRow
                .join(nodeDS, edgesRow.col("src").equalTo(nodeDS.col(STRING_ID_COL)))
                .select(col(LONG_ID_COL).as("src"), col("dst"), col("relClass"))
                .join(nodeDS, edgesRow.col("dst").equalTo(nodeDS.col(STRING_ID_COL)))
                .select(col("src"), col(LONG_ID_COL).as("dst"), col("relClass"))
                .toJavaRDD()
                .map(row ->
                        new Edge<>(
                                row.getLong(0),
                                row.getLong(1),
                                row.getString(2)
                        )
                )
                .rdd();
    }
}

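Editor's note: the hunks above show the "split" of the embedding job: document construction moved to SparkCreateDocuments, and SparkCreateEmbeddings now only loads what was written to documentsPath and runs Word2Vec followed by L2 normalisation. The sketch below shows that reduced flow in isolation; it is not part of the commit, and the column names "id", "labels", "word2vec_embedding" and "embedding" are stand-ins for the STRING_ID_COL, LABELS_COL and EMBEDDING_COL constants of AbstractSparkJob, which are not visible in this diff.

import org.apache.spark.ml.feature.Normalizer;
import org.apache.spark.ml.feature.Word2Vec;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;

public class EmbeddingsSketch {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("embeddings-sketch").getOrCreate();

        // toy stand-in for spark.read().load(documentsPath): one row per entity with the list of
        // connected-component labels collected by the documents job
        StructType schema = DataTypes.createStructType(new StructField[]{
                DataTypes.createStructField("id", DataTypes.StringType, false),
                DataTypes.createStructField("labels", DataTypes.createArrayType(DataTypes.StringType), false)});
        Dataset<Row> documents = spark.createDataFrame(Arrays.asList(
                RowFactory.create("pub1", new String[]{"cites_0", "produces_3"}),
                RowFactory.create("pub2", new String[]{"cites_0", "produces_3", "cites_1"})), schema);

        // Word2Vec over the label "sentences"; the values mirror the defaults read from raid.conf.json
        Dataset<Row> word2vec = new Word2Vec()
                .setInputCol("labels")
                .setOutputCol("word2vec_embedding")
                .setVectorSize(4)
                .setStepSize(0.01)
                .setMinCount(1)
                .setMaxIter(10)
                .fit(documents)
                .transform(documents);

        // L2-normalise the raw Word2Vec vectors, as the job does before writing to embeddingsPath
        Dataset<Row> embeddings = new Normalizer()
                .setInputCol("word2vec_embedding")
                .setOutputCol("embedding")
                .setP(2.0)
                .transform(word2vec)
                .select("id", "embedding");

        embeddings.show(false);
        spark.stop();
    }
}
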
@@ -3,6 +3,7 @@ package eu.dnetlib.raid.jobs;
import com.google.common.collect.Iterators;
import eu.dnetlib.raid.support.ArgumentApplicationParser;
import eu.dnetlib.raid.support.RAiDConfig;
import org.apache.log4j.Level;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapPartitionsFunction;
@@ -32,7 +33,7 @@ public class SparkRAiDClustering extends AbstractSparkJob {
    public static void main(String[] args) throws Exception {

        ArgumentApplicationParser parser = new ArgumentApplicationParser(
                readResource("/jobs/parameters/createClusters_parameters.json", SparkCreateEmbeddingsW2V.class)
                readResource("/jobs/parameters/createClusters_parameters.json", SparkRAiDClustering.class)
        );

        parser.parseArgument(args);
@@ -121,6 +122,8 @@ public class SparkRAiDClustering extends AbstractSparkJob {
        Dataset<Row> result = spark
                .createDataFrame(clusteredData.rdd(), DataTypes.createStructType(new StructField[]{DataTypes.createStructField(STRING_ID_COL, DataTypes.StringType, false), DataTypes.createStructField(LABEL_ID_COL, DataTypes.IntegerType, false)}));

        // result.show(false);

        result.write().save(outputPath);

    }

@@ -0,0 +1,32 @@
[
  {
    "paramName": "g",
    "paramLongName": "graphBasePath",
    "paramDescription": "the input graph base path",
    "paramRequired": true
  },
  {
    "paramName": "w",
    "paramLongName": "workingPath",
    "paramDescription": "working directory for the computations",
    "paramRequired": true
  },
  {
    "paramName": "o",
    "paramLongName": "outputPath",
    "paramDescription": "output path",
    "paramRequired": true
  },
  {
    "paramName": "rc",
    "paramLongName": "raidConfPath",
    "paramDescription": "RAiD configuration path",
    "paramRequired": true
  },
  {
    "paramName": "np",
    "paramLongName": "numPartitions",
    "paramDescription": "number of partitions for the spark job",
    "paramRequired": false
  }
]

@@ -1,8 +1,8 @@
[
  {
    "paramName": "g",
    "paramLongName": "graphBasePath",
    "paramDescription": "the input graph base path",
    "paramName": "d",
    "paramLongName": "documentsPath",
    "paramDescription": "the input documents path",
    "paramRequired": true
  },
  {

@@ -4,6 +4,10 @@
        <name>graphBasePath</name>
        <description>path for the input graph</description>
    </property>
    <property>
        <name>documentsPath</name>
        <description>path for the input documents</description>
    </property>
    <property>
        <name>raidConfPath</name>
        <description>configuration for RAiD inference</description>
@@ -79,12 +83,41 @@
        </configuration>
    </global>

    <start to="CreateEmbeddings"/>
    <start to="CreateDocuments"/>

    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>

    <action name="CreateDocuments">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <master>yarn</master>
            <mode>cluster</mode>
            <name>Create Documents</name>
            <class>eu.dnetlib.raid.jobs.SparkCreateDocuments</class>
            <jar>dhp-raid-${projectVersion}.jar</jar>
            <spark-opts>
                --num-executors=32
                --executor-memory=8G
                --executor-cores=6
                --driver-memory=8G
                --conf spark.extraListeners=${spark2ExtraListeners}
                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                --conf spark.sql.shuffle.partitions=3840
                --conf spark.dynamicAllocation.enabled=true
            </spark-opts>
            <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
            <arg>--workingPath</arg><arg>${workingPath}</arg>
            <arg>--outputPath</arg><arg>${documentsPath}</arg>
            <arg>--raidConfPath</arg><arg>${raidConfPath}</arg>
            <arg>--numPartitions</arg><arg>${numPartitions}</arg>
        </spark>
        <ok to="CreateEmbeddings"/>
        <error to="Kill"/>
    </action>

    <action name="CreateEmbeddings">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <master>yarn</master>
@@ -94,8 +127,8 @@
            <jar>dhp-raid-${projectVersion}.jar</jar>
            <spark-opts>
                --num-executors=32
                --executor-memory=16G
                --executor-cores=4
                --executor-memory=8G
                --executor-cores=6
                --driver-memory=8G
                --conf spark.extraListeners=${spark2ExtraListeners}
                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@@ -104,7 +137,7 @@
                --conf spark.sql.shuffle.partitions=3840
                --conf spark.dynamicAllocation.enabled=true
            </spark-opts>
            <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
            <arg>--documentsPath</arg><arg>${documentsPath}</arg>
            <arg>--workingPath</arg><arg>${workingPath}</arg>
            <arg>--outputPath</arg><arg>${embeddingsPath}</arg>
            <arg>--raidConfPath</arg><arg>${raidConfPath}</arg>
@@ -123,8 +156,8 @@
            <jar>dhp-raid-${projectVersion}.jar</jar>
            <spark-opts>
                --num-executors=32
                --executor-memory=16G
                --executor-cores=4
                --executor-memory=8G
                --executor-cores=6
                --driver-memory=8G
                --conf spark.extraListeners=${spark2ExtraListeners}
                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}

@@ -1,6 +1,8 @@
package eu.dnetlib.raid;

import com.sun.media.jfxmedia.logging.Logger;
import eu.dnetlib.raid.jobs.AbstractSparkJob;
import eu.dnetlib.raid.jobs.SparkCreateDocuments;
import eu.dnetlib.raid.jobs.SparkCreateEmbeddings;
import eu.dnetlib.raid.jobs.SparkRAiDClustering;
import eu.dnetlib.raid.support.ArgumentApplicationParser;
@@ -38,6 +40,7 @@ public class RAiDInferenceTest {
    final static String workingPath = "/tmp/working_dir";
    final static String embeddingsPath = "/tmp/embeddings";
    final static String clustersPath = "/tmp/clusters";
    final static String documentsPath = "/tmp/documents";

    final static String numPartitions = "3";

@@ -54,6 +57,7 @@ public class RAiDInferenceTest {
        FileUtils.deleteDirectory(new File(workingPath));
        FileUtils.deleteDirectory(new File(clustersPath));
        FileUtils.deleteDirectory(new File(embeddingsPath));
        FileUtils.deleteDirectory(new File(documentsPath));
    }

    @BeforeAll
@@ -96,13 +100,35 @@ public class RAiDInferenceTest {

    @Test
    @Order(1)
    public void createDocumentsTest() throws Exception {

        ArgumentApplicationParser parser = new ArgumentApplicationParser(readResource("/jobs/parameters/createDocuments_parameters.json", SparkCreateDocuments.class));

        parser.parseArgument(
                new String[] {
                        "-g", graphBasePath,
                        "-rc", raidConfPath,
                        "-o", documentsPath,
                        "-w", workingPath,
                        "-np", numPartitions
                }
        );

        new SparkCreateDocuments(
                parser,
                spark
        ).run();
    }

    @Test
    @Order(2)
    public void createRAiDEmbeddingsTest() throws Exception {

        ArgumentApplicationParser parser = new ArgumentApplicationParser(readResource("/jobs/parameters/createEmbeddings_parameters.json", SparkCreateEmbeddings.class));

        parser.parseArgument(
                new String[] {
                        "-g", graphBasePath,
                        "-d", documentsPath,
                        "-rc", raidConfPath,
                        "-o", embeddingsPath,
                        "-w", workingPath,
@@ -117,8 +143,8 @@ public class RAiDInferenceTest {
    }

    @Test
    @Order(2)
    public void createClusters() throws Exception {
    @Order(3)
    public void createClustersTest() throws Exception {

        ArgumentApplicationParser parser = new ArgumentApplicationParser(readResource("/jobs/parameters/createClusters_parameters.json", SparkRAiDClustering.class));

@@ -30,11 +30,13 @@
    "supplement": {"p": 1.0, "q": 0.25, "walkLength": 3, "numWalks": 3}
  },
  "params": {
    "embeddingSize": 128,
    "maxIter": 20,
    "partitioningMaxIter": 50,
    "embeddingSize": 4,
    "maxIter": 10,
    "joinMaxIter": 20,
    "joinSample": 0.3,
    "partitioningMaxIter": 3,
    "learningRate": 0.01,
    "epsilon": 0.5,
    "minPts": 5
    "epsilon": 0.1,
    "minPts": 1
  }
}

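Editor's note: assuming Dataset.sample(fraction) behaves like independent per-row Bernoulli sampling, the new joinSample and joinMaxIter values can be related to how much of the full metapath join the sampled union is expected to recover. The class below is only a back-of-the-envelope check, not part of the commit.

public class SampledJoinCoverage {
    public static void main(String[] args) {
        double fraction = 0.3; // "joinSample" in the updated test configuration
        int maxIter = 20;      // "joinMaxIter" in the updated test configuration
        // a given 2-hop pair survives one iteration with probability fraction^2, so the expected
        // share of the full metapath join recovered after maxIter unions is:
        double coverage = 1.0 - Math.pow(1.0 - fraction * fraction, maxIter);
        System.out.printf("expected coverage: %.2f%n", coverage); // prints roughly 0.85
    }
}

With joinSample = 0.3 and joinMaxIter = 20 the sampled join is therefore expected to reproduce roughly 85% of the edges of a single full join, in exchange for never having to materialise that full join in one shuffle.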