reimplementation and optimization of the procedure to create documents
This commit is contained in:
parent
6202c3e108
commit
b56066ea5a
|
|
@ -0,0 +1,8 @@
|
|||
<component name="InspectionProjectProfileManager">
|
||||
<profile version="1.0">
|
||||
<option name="myName" value="Project Default" />
|
||||
<inspection_tool class="AutoCloseableResource" enabled="true" level="WARNING" enabled_by_default="true">
|
||||
<option name="METHOD_MATCHER_CONFIG" value="java.util.Formatter,format,java.io.Writer,append,com.google.common.base.Preconditions,checkNotNull,org.hibernate.Session,close,java.io.PrintWriter,printf,java.io.PrintStream,printf,org.apache.spark.api.java.JavaSparkContext,fromSparkContext" />
|
||||
</inspection_tool>
|
||||
</profile>
|
||||
</component>
|
||||
|
|
@ -5,17 +5,31 @@
|
|||
</component>
|
||||
<component name="ChangeListManager">
|
||||
<list default="true" id="67528cef-c396-4288-9c84-8cedea88df9c" name="Changes" comment="">
|
||||
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateDocuments.java" afterDir="false" />
|
||||
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/resources/jobs/parameters/createDocuments_parameters.json" afterDir="false" />
|
||||
<change afterPath="$PROJECT_DIR$/.idea/inspectionProfiles/Project_Default.xml" afterDir="false" />
|
||||
<change afterPath="$PROJECT_DIR$/README.markdown" afterDir="false" />
|
||||
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateDocumentsGraphFrames.java" afterDir="false" />
|
||||
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateDocumentsTesting.java" afterDir="false" />
|
||||
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/support/ClusteringAlgorithms.java" afterDir="false" />
|
||||
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/support/DataPoint.java" afterDir="false" />
|
||||
<change afterPath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/examples/graph/organization" afterDir="false" />
|
||||
<change afterPath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/examples/graph/project" afterDir="false" />
|
||||
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
|
||||
<change beforePath="$PROJECT_DIR$/dhp-build/dhp-build-properties-maven-plugin/test.properties" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-build/dhp-build-properties-maven-plugin/test.properties" afterDir="false" />
|
||||
<change beforePath="$PROJECT_DIR$/dhp-raid/job-override.properties" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/job-override.properties" afterDir="false" />
|
||||
<change beforePath="$PROJECT_DIR$/dhp-raid/pom.xml" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/pom.xml" afterDir="false" />
|
||||
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/AbstractSparkJob.java" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/AbstractSparkJob.java" afterDir="false" />
|
||||
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateDocuments.java" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateDocuments.java" afterDir="false" />
|
||||
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateEmbeddings.java" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateEmbeddings.java" afterDir="false" />
|
||||
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkRAiDClustering.java" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkRAiDClustering.java" afterDir="false" />
|
||||
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/resources/jobs/parameters/createEmbeddings_parameters.json" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/resources/jobs/parameters/createEmbeddings_parameters.json" afterDir="false" />
|
||||
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkRAiDClustering.java" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkClustering.java" afterDir="false" />
|
||||
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/support/RAiDConfig.java" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/support/RAiDConfig.java" afterDir="false" />
|
||||
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/resources/raid/oozie_app/workflow.xml" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/resources/raid/oozie_app/workflow.xml" afterDir="false" />
|
||||
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/scala/eu/dnetlib/raid/graph/GraphUtil.scala" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/scala/eu/dnetlib/raid/graph/GraphUtil.scala" afterDir="false" />
|
||||
<change beforePath="$PROJECT_DIR$/dhp-raid/src/test/java/eu/dnetlib/raid/RAiDInferenceTest.java" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/test/java/eu/dnetlib/raid/RAiDInferenceTest.java" afterDir="false" />
|
||||
<change beforePath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/config/raid.conf.json" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/config/raid.conf.json" afterDir="false" />
|
||||
<change beforePath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/examples/graph/dataset" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/examples/graph/dataset" afterDir="false" />
|
||||
<change beforePath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/examples/graph/publication" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/examples/graph/publication" afterDir="false" />
|
||||
<change beforePath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/examples/graph/software" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/examples/graph/software" afterDir="false" />
|
||||
<change beforePath="$PROJECT_DIR$/pom.xml" beforeDir="false" afterPath="$PROJECT_DIR$/pom.xml" afterDir="false" />
|
||||
</list>
|
||||
<option name="SHOW_DIALOG" value="false" />
|
||||
<option name="HIGHLIGHT_CONFLICTS" value="true" />
|
||||
|
|
@ -65,6 +79,7 @@
|
|||
</component>
|
||||
<component name="PropertiesComponent">{
|
||||
"keyToString": {
|
||||
"ASKED_ADD_EXTERNAL_FILES": "true",
|
||||
"RunOnceActivity.OpenProjectViewOnStart": "true",
|
||||
"RunOnceActivity.ShowReadmeOnStart": "true",
|
||||
"SHARE_PROJECT_CONFIGURATION_FILES": "true",
|
||||
|
|
@ -92,19 +107,6 @@
|
|||
</key>
|
||||
</component>
|
||||
<component name="RunManager" selected="JUnit.RAiDInferenceTest">
|
||||
<configuration name="Main" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
|
||||
<option name="MAIN_CLASS_NAME" value="org.example.Main" />
|
||||
<module name="dhp-raid" />
|
||||
<extension name="coverage">
|
||||
<pattern>
|
||||
<option name="PATTERN" value="org.example.*" />
|
||||
<option name="ENABLED" value="true" />
|
||||
</pattern>
|
||||
</extension>
|
||||
<method v="2">
|
||||
<option name="Make" enabled="true" />
|
||||
</method>
|
||||
</configuration>
|
||||
<configuration name="RAiDInferenceTest" type="JUnit" factoryName="JUnit" temporary="true" nameIsGenerated="true">
|
||||
<module name="dhp-raid" />
|
||||
<extension name="coverage">
|
||||
|
|
@ -136,6 +138,38 @@
|
|||
<option name="Make" enabled="true" />
|
||||
</method>
|
||||
</configuration>
|
||||
<configuration name="RAiDInferenceTest.createDocuments2Test" type="JUnit" factoryName="JUnit" temporary="true" nameIsGenerated="true">
|
||||
<module name="dhp-raid" />
|
||||
<extension name="coverage">
|
||||
<pattern>
|
||||
<option name="PATTERN" value="eu.dnetlib.raid.*" />
|
||||
<option name="ENABLED" value="true" />
|
||||
</pattern>
|
||||
</extension>
|
||||
<option name="PACKAGE_NAME" value="eu.dnetlib.raid" />
|
||||
<option name="MAIN_CLASS_NAME" value="eu.dnetlib.raid.RAiDInferenceTest" />
|
||||
<option name="METHOD_NAME" value="createDocuments2Test" />
|
||||
<option name="TEST_OBJECT" value="method" />
|
||||
<method v="2">
|
||||
<option name="Make" enabled="true" />
|
||||
</method>
|
||||
</configuration>
|
||||
<configuration name="RAiDInferenceTest.createDocumentsTest" type="JUnit" factoryName="JUnit" temporary="true" nameIsGenerated="true">
|
||||
<module name="dhp-raid" />
|
||||
<extension name="coverage">
|
||||
<pattern>
|
||||
<option name="PATTERN" value="eu.dnetlib.raid.*" />
|
||||
<option name="ENABLED" value="true" />
|
||||
</pattern>
|
||||
</extension>
|
||||
<option name="PACKAGE_NAME" value="eu.dnetlib.raid" />
|
||||
<option name="MAIN_CLASS_NAME" value="eu.dnetlib.raid.RAiDInferenceTest" />
|
||||
<option name="METHOD_NAME" value="createDocumentsTest" />
|
||||
<option name="TEST_OBJECT" value="method" />
|
||||
<method v="2">
|
||||
<option name="Make" enabled="true" />
|
||||
</method>
|
||||
</configuration>
|
||||
<configuration name="RAiDInferenceTest.createRAiDEmbeddingsTest" type="JUnit" factoryName="JUnit" temporary="true" nameIsGenerated="true">
|
||||
<module name="dhp-raid" />
|
||||
<extension name="coverage">
|
||||
|
|
@ -155,9 +189,10 @@
|
|||
<recent_temporary>
|
||||
<list>
|
||||
<item itemvalue="JUnit.RAiDInferenceTest" />
|
||||
<item itemvalue="JUnit.RAiDInferenceTest.createDocumentsTest" />
|
||||
<item itemvalue="JUnit.RAiDInferenceTest.createDocuments2Test" />
|
||||
<item itemvalue="JUnit.RAiDInferenceTest.createClusters" />
|
||||
<item itemvalue="JUnit.RAiDInferenceTest.createRAiDEmbeddingsTest" />
|
||||
<item itemvalue="Application.Main" />
|
||||
</list>
|
||||
</recent_temporary>
|
||||
</component>
|
||||
|
|
@ -172,11 +207,23 @@
|
|||
<workItem from="1729084634987" duration="21063000" />
|
||||
<workItem from="1729257864565" duration="95390000" />
|
||||
<workItem from="1730275959282" duration="35702000" />
|
||||
<workItem from="1731482199949" duration="11193000" />
|
||||
<workItem from="1731482199949" duration="87224000" />
|
||||
<workItem from="1732531061543" duration="76487000" />
|
||||
</task>
|
||||
<servers />
|
||||
</component>
|
||||
<component name="TypeScriptGeneratedFilesManager">
|
||||
<option name="version" value="3" />
|
||||
</component>
|
||||
<component name="XDebuggerManager">
|
||||
<breakpoint-manager>
|
||||
<breakpoints>
|
||||
<line-breakpoint enabled="true" type="java-line">
|
||||
<url>file://$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateDocumentsGraphFrames.java</url>
|
||||
<line>130</line>
|
||||
<option name="timeStamp" value="1" />
|
||||
</line-breakpoint>
|
||||
</breakpoints>
|
||||
</breakpoint-manager>
|
||||
</component>
|
||||
</project>
|
||||
|
|
@ -0,0 +1,24 @@
|
|||
# RAiD Inference
|
||||
|
||||
The Research Activity ID is inferred by taking advantage of relationships in the graph.
|
||||
The process is configured through the JSON configuration file (es: the file `raid.conf.json` in `dhp-raid/test/resources`).
|
||||
|
||||
The workflow is composed by three steps:
|
||||
|
||||
### 1. Documents creation
|
||||
The documents are created using graph nodes and relations. The purpose is to associate to each node a list of labels inherited from the nodes linked with it.
|
||||
|
||||
*(possible lacks:)* Approximated Cross Join to create metapaths. Connected Components on each relationship type to create list of labels.
|
||||
|
||||
### 2. Embeddings creation
|
||||
The embeddings are created using the documents of the previous step. The implementation uses a Word2Vec algorithm normalized in order to make vectors of length equal to 1 (to fit with the clustering needs).
|
||||
|
||||
*(possible lacks:)* Word2Vec creates vectors using cosine similarity.
|
||||
|
||||
### 3. Clustering
|
||||
The clustering is done in parallel on different partitions obtained via a preliminary K-Means algorithm. The clustering adopted for each partition is the DBSCAN algorithm.
|
||||
|
||||
*(possible lacks:)* DBSCAN is not much scalable and it strongly depends on the creation of the partitions.
|
||||
|
||||
### 4. (optional) Disambiguation-like processing
|
||||
The clustering keys created by the previous step can be used to group nodes and create similarity relationships between them following a JSON configuration (similar to FDup, engineered to group together nodes in the same Research Activity).
|
||||
|
|
@ -1,2 +1 @@
|
|||
# Wed Nov 20 10:38:53 CET 2024
|
||||
projectPropertyKey=projectPropertyValue
|
||||
# Fri Dec 06 16:14:34 CET 2024
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
graphBasePath = /tmp/raid_subgraph_test/
|
||||
graphBasePath = /tmp/raid_community_subgraph
|
||||
workingPath = /user/michele.debonis/raid_inference_test/working_dir
|
||||
clustersPath = /user/michele.debonis/raid_inference_test/clusters
|
||||
embeddingsPath = /user/michele.debonis/raid_inference_test/embeddings
|
||||
documentsPath = /user/michele.debonis/raid_inference_test/documents
|
||||
clustersPath = /user/michele.debonis/raid_inference_community_test/clusters_apache
|
||||
embeddingsPath = /user/michele.debonis/raid_inference_community_test/embeddings
|
||||
documentsPath = /user/michele.debonis/raid_inference_community_test/documents
|
||||
raidConfPath = /user/michele.debonis/raid.conf.json
|
||||
numPartitions = 300
|
||||
numPartitions = 500
|
||||
|
|
@ -65,6 +65,11 @@
|
|||
<artifactId>smile-core</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>graphframes</groupId>
|
||||
<artifactId>graphframes</artifactId>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
|||
|
|
@ -1,5 +1,7 @@
|
|||
package eu.dnetlib.raid.jobs;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.raid.support.ArgumentApplicationParser;
|
||||
import eu.dnetlib.raid.support.RAiDConfig;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
|
|
@ -7,9 +9,10 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.apache.spark.sql.*;
|
||||
import org.apache.spark.sql.types.DataTypes;
|
||||
import org.apache.spark.sql.types.StructField;
|
||||
import org.apache.spark.sql.types.StructType;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
|
|
@ -22,12 +25,24 @@ public abstract class AbstractSparkJob implements Serializable {
|
|||
protected static final int NUM_PARTITIONS = 1000;
|
||||
protected static final int SEED = 42;
|
||||
protected static final String EMBEDDING_COL = "embedding";
|
||||
protected static final String STRING_ID_COL = "id";
|
||||
protected static final String LONG_ID_COL = "longId";
|
||||
protected static final String RANDOM_WALK_COL = "random_walk";
|
||||
protected static final String STRING_ID_COL = "openaireId";
|
||||
protected static final String LONG_ID_COL = "id";
|
||||
protected static final String TYPE_COL = "type";
|
||||
protected static final String CLUSTER_ID_COL = "cluster";
|
||||
protected static final String DELETEDBYINFERENCE_COL = "deletedbyinference";
|
||||
protected static final String LABEL_ID_COL = "labelId";
|
||||
protected static final String LABELS_COL = "labels";
|
||||
protected static final String PARTITION_COL = "partition";
|
||||
protected static final Encoder<Relation> REL_BEAN_ENC = Encoders.bean(Relation.class);
|
||||
|
||||
protected static final Encoder<Result> RESULT_BEAN_ENC = Encoders.bean(Result.class);
|
||||
|
||||
protected static final StructType CLUSTERS_DS_STRUCT = new StructType(
|
||||
new StructField[]{
|
||||
DataTypes.createStructField(STRING_ID_COL, DataTypes.StringType, false),
|
||||
DataTypes.createStructField(CLUSTER_ID_COL, DataTypes.IntegerType, false)
|
||||
}
|
||||
);
|
||||
|
||||
public ArgumentApplicationParser parser; // parameters for the spark action
|
||||
public SparkSession spark; // the spark session
|
||||
|
|
|
|||
|
|
@ -0,0 +1,94 @@
|
|||
package eu.dnetlib.raid.jobs;
|
||||
|
||||
import eu.dnetlib.raid.support.ArgumentApplicationParser;
|
||||
import eu.dnetlib.raid.support.ClusteringAlgorithms;
|
||||
import eu.dnetlib.raid.support.RAiDConfig;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.ml.clustering.BisectingKMeans;
|
||||
import org.apache.spark.ml.clustering.KMeans;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.apache.spark.sql.functions.col;
|
||||
|
||||
public class SparkClustering extends AbstractSparkJob {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(SparkClustering.class);
|
||||
|
||||
public SparkClustering(ArgumentApplicationParser parser, SparkSession spark) {
|
||||
super(parser, spark);
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
readResource("/jobs/parameters/createClusters_parameters.json", SparkClustering.class)
|
||||
);
|
||||
|
||||
parser.parseArgument(args);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
new SparkClustering(
|
||||
parser,
|
||||
getSparkSession(conf)
|
||||
).run();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() throws Exception {
|
||||
|
||||
// read oozie parameters
|
||||
final String inputPath = parser.get("inputPath");
|
||||
final String workingPath = parser.get("workingPath");
|
||||
final String outputPath = parser.get("outputPath");
|
||||
final String raidConfPath = parser.get("raidConfPath");
|
||||
final int numPartitions = Optional
|
||||
.ofNullable(parser.get("numPartitions"))
|
||||
.map(Integer::valueOf)
|
||||
.orElse(NUM_PARTITIONS);
|
||||
|
||||
log.info("inputPath: '{}'", inputPath);
|
||||
log.info("workingPath: '{}'", workingPath);
|
||||
log.info("outputPath: '{}'", outputPath);
|
||||
log.info("raidConfPath: '{}'", raidConfPath);
|
||||
log.info("numPartitions: '{}'", numPartitions);
|
||||
|
||||
RAiDConfig config = loadRAiDConfig(raidConfPath);
|
||||
|
||||
final Dataset<Row> embeddings = spark.read().load(inputPath);
|
||||
|
||||
Dataset<Row> clusteredData = new KMeans() //use BisectingKMeans for more balanced clusters
|
||||
.setK(numPartitions)
|
||||
.setSeed(SEED)
|
||||
.setFeaturesCol(EMBEDDING_COL)
|
||||
.setPredictionCol(PARTITION_COL)
|
||||
.setMaxIter(config.getParams().getOrDefault("partitioningMaxIter", 10).intValue())
|
||||
.fit(embeddings)
|
||||
.transform(embeddings)
|
||||
.repartition(col(PARTITION_COL))
|
||||
.mapPartitions(
|
||||
ClusteringAlgorithms.getApacheDBSCAN(
|
||||
PARTITION_COL,
|
||||
EMBEDDING_COL,
|
||||
STRING_ID_COL,
|
||||
config.getParams().getOrDefault("minPts", 2).intValue(),
|
||||
config.getParams().getOrDefault("epsilon", 0.2).doubleValue(),
|
||||
"euclidean"
|
||||
), Encoders.kryo(Row.class));
|
||||
|
||||
Dataset<Row> result = spark.createDataFrame(clusteredData.rdd(), CLUSTERS_DS_STRUCT);
|
||||
|
||||
result.show(false);
|
||||
|
||||
result.write().save(outputPath);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,22 +1,18 @@
|
|||
package eu.dnetlib.raid.jobs;
|
||||
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.raid.graph.GraphUtil;
|
||||
import eu.dnetlib.raid.support.ArgumentApplicationParser;
|
||||
import eu.dnetlib.raid.support.EdgeParam;
|
||||
import eu.dnetlib.raid.support.RAiDConfig;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.graphx.Edge;
|
||||
import org.apache.spark.ml.feature.Normalizer;
|
||||
import org.apache.spark.ml.feature.Word2Vec;
|
||||
import org.apache.spark.rdd.RDD;
|
||||
import org.apache.spark.sql.*;
|
||||
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
|
||||
import org.apache.spark.sql.types.DataTypes;
|
||||
import org.apache.spark.sql.types.StructField;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.apache.spark.sql.expressions.Window;
|
||||
import org.apache.spark.sql.expressions.WindowSpec;
|
||||
import org.apache.spark.sql.functions;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import scala.Tuple2;
|
||||
|
|
@ -31,9 +27,6 @@ import static org.apache.spark.sql.functions.*;
|
|||
public class SparkCreateDocuments extends AbstractSparkJob{
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(SparkCreateDocuments.class);
|
||||
private static final String ID_PATH = "$.id";
|
||||
private static final String DELETEDBYINFERENCE_PATH = "$.dataInfo.deletedbyinference";
|
||||
private static final Encoder<Relation> REL_BEAN_ENC = Encoders.bean(Relation.class);
|
||||
|
||||
public SparkCreateDocuments(ArgumentApplicationParser parser, SparkSession spark) {
|
||||
super(parser, spark);
|
||||
|
|
@ -47,7 +40,8 @@ public class SparkCreateDocuments extends AbstractSparkJob{
|
|||
|
||||
parser.parseArgument(args);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
SparkConf conf = new SparkConf()
|
||||
.set("spark.driver.maxResultSize", "4g");
|
||||
|
||||
new SparkCreateDocuments(
|
||||
parser,
|
||||
|
|
@ -57,7 +51,6 @@ public class SparkCreateDocuments extends AbstractSparkJob{
|
|||
|
||||
@Override
|
||||
public void run() throws Exception {
|
||||
|
||||
// read oozie parameters
|
||||
final String graphBasePath = parser.get("graphBasePath");
|
||||
final String workingPath = parser.get("workingPath");
|
||||
|
|
@ -74,156 +67,114 @@ public class SparkCreateDocuments extends AbstractSparkJob{
|
|||
log.info("raidConfPath: '{}'", raidConfPath);
|
||||
log.info("numPartitions: '{}'", numPartitions);
|
||||
|
||||
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
RAiDConfig config = loadRAiDConfig(raidConfPath);
|
||||
spark.sparkContext().setCheckpointDir(workingPath + "/checkpoint");
|
||||
|
||||
Dataset<Row> nodeDS = prepareNodes(sc, graphBasePath, config, numPartitions);
|
||||
log.info("Number of nodes: {}", nodeDS.count());
|
||||
|
||||
RDD<Tuple2<Object, String>> vertices = createVertices(nodeDS);
|
||||
|
||||
Dataset<Row> relations = spark
|
||||
.read()
|
||||
.schema(REL_BEAN_ENC.schema())
|
||||
.json(graphBasePath + "/relation")
|
||||
.repartition(numPartitions);
|
||||
log.info("Number of relations: {}", relations.count());
|
||||
|
||||
Dataset<Row> labelDS = spark.createDataFrame(
|
||||
new ArrayList<>(), DataTypes.createStructType(
|
||||
new StructField[]{
|
||||
DataTypes.createStructField(LONG_ID_COL, DataTypes.LongType, false),
|
||||
DataTypes.createStructField(LABEL_ID_COL, DataTypes.StringType, false)
|
||||
}));
|
||||
final Dataset<Row> vertices = createVertices(graphBasePath, config, numPartitions).cache();
|
||||
final Dataset<Row> edges = createEdges(graphBasePath, vertices, config, numPartitions).cache();
|
||||
final RDD<Tuple2<Object, String>> nodes = vertices.toJavaRDD().map(v -> new Tuple2<>(v.get(0), v.getString(1))).rdd().cache();
|
||||
|
||||
List<Dataset<Row>> connectedComponentsList = new ArrayList<>();
|
||||
WindowSpec windowSpec = Window.partitionBy(col(LABEL_ID_COL));
|
||||
for(EdgeParam edgeParam: config.getEdges()) {
|
||||
final RDD<Edge<String>> metapathEdges = applyMetapath(edges, edgeParam.getMetapath(), edgeParam.getName());
|
||||
|
||||
RDD<Edge<String>> edges = createEdges(
|
||||
nodeDS,
|
||||
relations,
|
||||
edgeParam.getMetapath(),
|
||||
edgeParam.getName(),
|
||||
config.getParams().getOrDefault("joinMaxIter", 20).intValue(),
|
||||
config.getParams().getOrDefault("joinSample", 0.1).doubleValue());
|
||||
|
||||
Dataset<Row> connectedComponents = GraphUtil.createConnectedComponents(spark, vertices, edges, 1);
|
||||
Dataset<Row> connectedComponents = GraphUtil.createConnectedComponents(spark, nodes, metapathEdges, config.getParams().getOrDefault("maxIter", 20).intValue());
|
||||
|
||||
//this is getting rid of those connected components of a single element (they are not meaningful ?)
|
||||
Dataset<Row> counts = connectedComponents
|
||||
.groupBy(LABEL_ID_COL)
|
||||
.agg(functions.count(LABEL_ID_COL).alias("count"))
|
||||
.filter(functions.col("count").gt(1));
|
||||
connectedComponents = connectedComponents
|
||||
.join(counts, connectedComponents.col(LABEL_ID_COL).equalTo(counts.col(LABEL_ID_COL)), "inner")
|
||||
.select(connectedComponents.col(LONG_ID_COL), connectedComponents.col(LABEL_ID_COL));
|
||||
|
||||
labelDS = labelDS.union(connectedComponents);
|
||||
.withColumn("count", functions.count("*").over(windowSpec))
|
||||
.filter("count >= 2")
|
||||
.drop("count");
|
||||
|
||||
connectedComponentsList.add(connectedComponents);
|
||||
}
|
||||
|
||||
Dataset<Row> labels = labelDS
|
||||
final Dataset<Row> labelDS = connectedComponentsList.stream().reduce(Dataset::union).orElseThrow(RuntimeException::new);
|
||||
nodes.unpersist(false);
|
||||
edges.unpersist(false);
|
||||
|
||||
Dataset<Row> documents = labelDS
|
||||
.groupBy(col(LONG_ID_COL))
|
||||
.agg(collect_list(labelDS.col(LABEL_ID_COL)).as(LABELS_COL));
|
||||
.agg(collect_list(labelDS.col(LABEL_ID_COL)).as(LABELS_COL))
|
||||
.withColumn(LABELS_COL, expr("transform(" + LABELS_COL +", x -> cast(x as string))"));
|
||||
|
||||
Dataset<Row> documents = nodeDS
|
||||
.join(labels, nodeDS.col(LONG_ID_COL).equalTo(labels.col(LONG_ID_COL)), "left")
|
||||
.select(col(STRING_ID_COL), col(LABELS_COL))
|
||||
.repartition(numPartitions);
|
||||
|
||||
log.info("Labels generated: {}", documents.count());
|
||||
documents = vertices
|
||||
.where(vertices.col(TYPE_COL).isInCollection(config.getEntities()))
|
||||
.join(documents, vertices.col(LONG_ID_COL).equalTo(documents.col(LONG_ID_COL)))
|
||||
.select(vertices.col(STRING_ID_COL), documents.col(LABELS_COL));
|
||||
|
||||
vertices.unpersist(false);
|
||||
documents.write().save(outputPath);
|
||||
|
||||
}
|
||||
|
||||
public Dataset<Row> prepareNodes(JavaSparkContext sc, String graphBasePath, RAiDConfig config, int numPartitions) {
|
||||
// create nodes
|
||||
JavaRDD<String> nodes = sc.emptyRDD();
|
||||
for (String nodeType: config.getNodes()) {
|
||||
nodes = nodes.union(
|
||||
sc.textFile(graphBasePath + "/" + nodeType)
|
||||
// .filter(json -> !((boolean) JsonPath.read(json, DELETEDBYINFERENCE_PATH)))
|
||||
.map(json -> JsonPath.read(json, ID_PATH)));
|
||||
}
|
||||
public Dataset<Row> createVertices(String graphBasePath, RAiDConfig config, int numPartitions) {
|
||||
|
||||
return spark
|
||||
.read()
|
||||
.schema(RESULT_BEAN_ENC.schema())
|
||||
.json(config.getNodes().stream().map(nodeType -> graphBasePath + "/" + nodeType).toArray(String[]::new))
|
||||
.select(col("id").as(STRING_ID_COL), col("resulttype.classname").as(TYPE_COL), lit(false).as(DELETEDBYINFERENCE_COL)) //col("dataInfo.deletedbyinference").as(DELETEDBYINFERENCE_COL))
|
||||
.filter(col(DELETEDBYINFERENCE_COL).equalTo(false))
|
||||
.withColumn(LONG_ID_COL, monotonically_increasing_id())
|
||||
.select(LONG_ID_COL, STRING_ID_COL, TYPE_COL)
|
||||
.repartition(numPartitions, col(STRING_ID_COL)); //reduces the join execution time
|
||||
|
||||
return spark.createDataset(
|
||||
nodes.zipWithIndex().map(n -> RowFactory.create(n._1(), n._2())).rdd(),
|
||||
RowEncoder.apply(
|
||||
DataTypes.createStructType(
|
||||
new StructField[]{
|
||||
DataTypes.createStructField(STRING_ID_COL, DataTypes.StringType, false),
|
||||
DataTypes.createStructField(LONG_ID_COL, DataTypes.LongType, false)
|
||||
})
|
||||
))
|
||||
.repartition(numPartitions);
|
||||
}
|
||||
|
||||
public RDD<Tuple2<Object,String>> createVertices(Dataset<Row> nodeDS) {
|
||||
return nodeDS
|
||||
.toJavaRDD()
|
||||
.map(row -> new Tuple2<>(row.get(1), row.getString(0)))
|
||||
.rdd();
|
||||
public Dataset<Row> createEdges(String graphBasePath, Dataset<Row> vertices, RAiDConfig config, int numPartitions) {
|
||||
|
||||
Dataset<Row> relationsDS = spark
|
||||
.read()
|
||||
.schema(REL_BEAN_ENC.schema())
|
||||
.json(graphBasePath + "/relation")
|
||||
.select(col("source"), col("target"), col("relClass"))
|
||||
.repartition(numPartitions, col("source"), col("target")); //reduces the join execution time
|
||||
|
||||
return relationsDS
|
||||
.join(vertices, relationsDS.col("source").equalTo(vertices.col(STRING_ID_COL)))
|
||||
.select(col(LONG_ID_COL).as("source"), col("target"), col("relClass"))
|
||||
.join(vertices, relationsDS.col("target").equalTo(vertices.col(STRING_ID_COL)))
|
||||
.select(col("source"), col(LONG_ID_COL).as("target"), col("relClass"));
|
||||
}
|
||||
|
||||
public RDD<Edge<String>> createEdges(Dataset<Row> nodeDS, Dataset<Row> relations, List<String> metapath, String edgeName, int maxIter, double fraction) throws Exception {
|
||||
public RDD<Edge<String>> applyMetapath(Dataset<Row> edges, List<String> metapath, String metapathEdgeName) throws Exception {
|
||||
|
||||
Dataset<Row> edgesRow = spark.createDataFrame(
|
||||
new ArrayList<>(), DataTypes.createStructType(
|
||||
new StructField[]{
|
||||
DataTypes.createStructField("src", DataTypes.StringType, false),
|
||||
DataTypes.createStructField("dst", DataTypes.StringType, false),
|
||||
DataTypes.createStructField("relClass", DataTypes.StringType, false)
|
||||
}
|
||||
)
|
||||
);
|
||||
switch(metapath.size()) {
|
||||
int pathLength = metapath.size();
|
||||
switch(pathLength) {
|
||||
case 1:
|
||||
edgesRow = relations
|
||||
.where(col("relClass").equalTo(metapath.get(0)))
|
||||
.select(col("source").as("src"), col("target").as("dst"), col("relClass"));
|
||||
edges = edges.where(col("relClass").equalTo(metapath.get(0)));
|
||||
break;
|
||||
case 2:
|
||||
Dataset<Row> edges_1 = relations
|
||||
Dataset<Row> edges1 = edges
|
||||
.where(col("relClass").equalTo(metapath.get(0)))
|
||||
.select(col("source").as("source_1"), col("target").as("target_1"));
|
||||
Dataset<Row> edges_2 = relations
|
||||
.withColumn("targetCount", count("*").over(Window.partitionBy("target")))
|
||||
.filter("targetCount >= 2")
|
||||
.drop("targetCount");
|
||||
|
||||
Dataset<Row> edges2 = edges
|
||||
.where(col("relClass").equalTo(metapath.get(1)))
|
||||
.select(col("source").as("source_2"), col("target").as("target_2"));
|
||||
|
||||
for (int i = 0; i < maxIter; i++) {
|
||||
Dataset<Row> edges_sample1 = edges_1.sample(fraction);
|
||||
Dataset<Row> edges_sample2 = edges_2.sample(fraction);
|
||||
Dataset<Row> joined = edges_sample1.join(
|
||||
edges_sample2,
|
||||
edges_sample1.col("target_1").equalTo(edges_sample2.col("source_2")),
|
||||
"inner"
|
||||
)
|
||||
.select(col("source_1").as("src"), col("target_2").as("dst"))
|
||||
.withColumn("relClass", lit(edgeName));
|
||||
|
||||
edgesRow = edgesRow.union(joined);
|
||||
}
|
||||
|
||||
edgesRow.distinct();
|
||||
.withColumn("sourceCount", count("*").over(Window.partitionBy("source")))
|
||||
.filter("sourceCount >= 2")
|
||||
.drop("sourceCount");
|
||||
|
||||
edges = edges1.union(edges2);
|
||||
break;
|
||||
default:
|
||||
throw new Exception("Metapath size not allowed");
|
||||
}
|
||||
// join with nodes to get longs instead of string ids
|
||||
return edgesRow
|
||||
.join(nodeDS, edgesRow.col("src").equalTo(nodeDS.col(STRING_ID_COL)))
|
||||
.select(col(LONG_ID_COL).as("src"), col("dst"), col("relClass"))
|
||||
.join(nodeDS, edgesRow.col("dst").equalTo(nodeDS.col(STRING_ID_COL)))
|
||||
.select(col("src"), col(LONG_ID_COL).as("dst"), col("relClass"))
|
||||
|
||||
return edges
|
||||
.toJavaRDD()
|
||||
.map(row ->
|
||||
new Edge<>(
|
||||
row.getLong(0),
|
||||
row.getLong(1),
|
||||
row.getString(2)
|
||||
metapathEdgeName
|
||||
)
|
||||
)
|
||||
.rdd();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,202 @@
|
|||
package eu.dnetlib.raid.jobs;
|
||||
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.raid.graph.GraphUtil;
|
||||
import eu.dnetlib.raid.support.ArgumentApplicationParser;
|
||||
import eu.dnetlib.raid.support.EdgeParam;
|
||||
import eu.dnetlib.raid.support.RAiDConfig;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.*;
|
||||
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
|
||||
import org.apache.spark.sql.types.DataTypes;
|
||||
import org.apache.spark.sql.types.StructField;
|
||||
import org.graphframes.GraphFrame;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.apache.spark.sql.functions.*;
|
||||
|
||||
|
||||
public class SparkCreateDocumentsGraphFrames extends AbstractSparkJob{
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(SparkCreateDocumentsGraphFrames.class);
|
||||
private static final String ID_PATH = "$.id";
|
||||
private static final String DELETEDBYINFERENCE_PATH = "$.dataInfo.deletedbyinference";
|
||||
private static final Encoder<Relation> REL_BEAN_ENC = Encoders.bean(Relation.class);
|
||||
private static final String[] METAPATH_QUERY = new String[]{
|
||||
"(a)-[e]->(c)",
|
||||
"(a)-[e1]->(b); (b)-[e2]->(c)"
|
||||
};
|
||||
|
||||
public SparkCreateDocumentsGraphFrames(ArgumentApplicationParser parser, SparkSession spark) {
|
||||
super(parser, spark);
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
readResource("/jobs/parameters/createDocuments_parameters.json", SparkCreateDocumentsGraphFrames.class)
|
||||
);
|
||||
|
||||
parser.parseArgument(args);
|
||||
|
||||
SparkConf conf = new SparkConf()
|
||||
.set("spark.driver.maxResultSize", "4g");
|
||||
|
||||
|
||||
new SparkCreateDocumentsGraphFrames(
|
||||
parser,
|
||||
getSparkSession(conf)
|
||||
).run();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() throws Exception {
|
||||
|
||||
// read oozie parameters
|
||||
final String graphBasePath = parser.get("graphBasePath");
|
||||
final String workingPath = parser.get("workingPath");
|
||||
final String outputPath = parser.get("outputPath");
|
||||
final String raidConfPath = parser.get("raidConfPath");
|
||||
final int numPartitions = Optional
|
||||
.ofNullable(parser.get("numPartitions"))
|
||||
.map(Integer::valueOf)
|
||||
.orElse(NUM_PARTITIONS);
|
||||
|
||||
log.info("graphBasePath: '{}'", graphBasePath);
|
||||
log.info("workingPath: '{}'", workingPath);
|
||||
log.info("outputPath: '{}'", outputPath);
|
||||
log.info("raidConfPath: '{}'", raidConfPath);
|
||||
log.info("numPartitions: '{}'", numPartitions);
|
||||
|
||||
RAiDConfig config = loadRAiDConfig(raidConfPath);
|
||||
spark.sparkContext().setCheckpointDir(workingPath + "/checkpoint");
|
||||
|
||||
Dataset<Row> vertices = createVertices(graphBasePath, config, numPartitions);
|
||||
log.info("Number of nodes: {}", vertices.count());
|
||||
|
||||
Dataset<Row> edges = createEdges(graphBasePath, config, numPartitions);
|
||||
log.info("Number of relations: {}", edges.count());
|
||||
|
||||
GraphFrame graph = GraphFrame.apply(vertices, edges);
|
||||
|
||||
Dataset<Row> labelDS = spark.createDataFrame(
|
||||
new ArrayList<>(), DataTypes.createStructType(
|
||||
new StructField[]{
|
||||
DataTypes.createStructField("id", DataTypes.StringType, false),
|
||||
DataTypes.createStructField("component", DataTypes.StringType, false)
|
||||
}
|
||||
)
|
||||
);
|
||||
|
||||
for(EdgeParam edgeParam: config.getEdges()) {
|
||||
|
||||
GraphFrame metapathGraph = getMetapathGraph(
|
||||
graph,
|
||||
edgeParam.getMetapath(),
|
||||
edgeParam.getName()
|
||||
);
|
||||
|
||||
Dataset<Row> connectedComponents = GraphUtil.createConnectedComponents(
|
||||
spark,
|
||||
metapathGraph.toGraphX(),
|
||||
config.getParams().getOrDefault("maxIter", 20).intValue()
|
||||
);
|
||||
|
||||
//this is getting rid of those connected components of a single element (they are not meaningful ?)
|
||||
Dataset<Row> counts = connectedComponents
|
||||
.groupBy("component")
|
||||
.agg(functions.count("component").alias("count"))
|
||||
.filter(functions.col("count").gt(1));
|
||||
connectedComponents = connectedComponents
|
||||
.join(counts, connectedComponents.col("component").equalTo(counts.col("component")), "inner")
|
||||
.select(connectedComponents.col("id"), connectedComponents.col("component"));
|
||||
|
||||
labelDS = labelDS.union(connectedComponents);
|
||||
}
|
||||
|
||||
Dataset<Row> documents = labelDS
|
||||
.groupBy(col("id"))
|
||||
.agg(collect_list(labelDS.col("component")).as(LABELS_COL));
|
||||
|
||||
log.info("Labels generated: {}", documents.count());
|
||||
|
||||
documents.show();
|
||||
|
||||
documents.write().save(outputPath);
|
||||
|
||||
}
|
||||
|
||||
public Dataset<Row> createVertices(String graphBasePath, RAiDConfig config, int numPartitions) {
|
||||
// create nodes
|
||||
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
JavaRDD<String> nodes = sc.emptyRDD();
|
||||
for (String nodeType: config.getNodes()) {
|
||||
nodes = nodes.union(
|
||||
sc.textFile(graphBasePath + "/" + nodeType)
|
||||
// .filter(json -> !((boolean) JsonPath.read(json, DELETEDBYINFERENCE_PATH)))
|
||||
.map(json -> JsonPath.read(json, ID_PATH)));
|
||||
}
|
||||
return spark.createDataset(
|
||||
nodes.map(RowFactory::create).rdd(),
|
||||
RowEncoder.apply(
|
||||
DataTypes.createStructType(
|
||||
new StructField[]{
|
||||
DataTypes.createStructField(LONG_ID_COL, DataTypes.StringType, false)
|
||||
}
|
||||
)
|
||||
)
|
||||
)
|
||||
.repartition(numPartitions);
|
||||
}
|
||||
|
||||
public Dataset<Row> createEdges(String graphBasePath, RAiDConfig config, int numPartitions) {
|
||||
|
||||
Dataset<Row> relations = spark
|
||||
.read()
|
||||
.schema(REL_BEAN_ENC.schema())
|
||||
.json(graphBasePath + "/relation")
|
||||
.repartition(numPartitions);
|
||||
|
||||
return relations
|
||||
.select(col("source").as("src"), col("target").as("dst"), col("relClass"));
|
||||
}
|
||||
|
||||
public GraphFrame getMetapathGraph(GraphFrame graph, List<String> metapath, String metapathEdgeName) throws Exception {
|
||||
|
||||
int pathLength = metapath.size();
|
||||
Dataset<Row> metapathEdges;
|
||||
Dataset<Row> motIf;
|
||||
switch(metapath.size()) {
|
||||
case 1:
|
||||
motIf = graph
|
||||
.filterEdges(String.format("relClass == '%s'", metapath.get(0)))
|
||||
.find(METAPATH_QUERY[pathLength-1]);
|
||||
metapathEdges = motIf
|
||||
.filter(col("e.relClass").equalTo(metapath.get(0)));
|
||||
break;
|
||||
case 2:
|
||||
motIf = graph
|
||||
.filterEdges(String.format("relClass == '%s' OR relClass == '%s'", metapath.get(0), metapath.get(1)))
|
||||
.find(METAPATH_QUERY[pathLength-1]);
|
||||
metapathEdges = motIf
|
||||
.filter(col("e1.relClass").equalTo(metapath.get(0)).and(col("e2.relClass").equalTo(metapath.get(1))));
|
||||
break;
|
||||
default:
|
||||
throw new Exception("Metapath size not allowed");
|
||||
}
|
||||
metapathEdges = metapathEdges
|
||||
.select(col("a.id").as("src"), col("c.id").as("dst"))
|
||||
.withColumn("relClass", lit(metapathEdgeName));
|
||||
|
||||
return GraphFrame.apply(graph.vertices(), metapathEdges);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,227 @@
|
|||
package eu.dnetlib.raid.jobs;
|
||||
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.raid.graph.GraphUtil;
|
||||
import eu.dnetlib.raid.support.ArgumentApplicationParser;
|
||||
import eu.dnetlib.raid.support.EdgeParam;
|
||||
import eu.dnetlib.raid.support.RAiDConfig;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.graphx.Edge;
|
||||
import org.apache.spark.rdd.RDD;
|
||||
import org.apache.spark.sql.*;
|
||||
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
|
||||
import org.apache.spark.sql.types.DataTypes;
|
||||
import org.apache.spark.sql.types.StructField;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.apache.spark.sql.functions.*;
|
||||
|
||||
|
||||
public class SparkCreateDocumentsTesting extends AbstractSparkJob{
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(SparkCreateDocumentsTesting.class);
|
||||
private static final String ID_PATH = "$.id";
|
||||
private static final String DELETEDBYINFERENCE_PATH = "$.dataInfo.deletedbyinference";
|
||||
private static final Encoder<Relation> REL_BEAN_ENC = Encoders.bean(Relation.class);
|
||||
|
||||
public SparkCreateDocumentsTesting(ArgumentApplicationParser parser, SparkSession spark) {
|
||||
super(parser, spark);
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
readResource("/jobs/parameters/createDocuments_parameters.json", SparkCreateDocumentsTesting.class)
|
||||
);
|
||||
|
||||
parser.parseArgument(args);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
new SparkCreateDocumentsTesting(
|
||||
parser,
|
||||
getSparkSession(conf)
|
||||
).run();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() throws Exception {
|
||||
|
||||
// read oozie parameters
|
||||
final String graphBasePath = parser.get("graphBasePath");
|
||||
final String workingPath = parser.get("workingPath");
|
||||
final String outputPath = parser.get("outputPath");
|
||||
final String raidConfPath = parser.get("raidConfPath");
|
||||
final int numPartitions = Optional
|
||||
.ofNullable(parser.get("numPartitions"))
|
||||
.map(Integer::valueOf)
|
||||
.orElse(NUM_PARTITIONS);
|
||||
|
||||
log.info("graphBasePath: '{}'", graphBasePath);
|
||||
log.info("workingPath: '{}'", workingPath);
|
||||
log.info("outputPath: '{}'", outputPath);
|
||||
log.info("raidConfPath: '{}'", raidConfPath);
|
||||
log.info("numPartitions: '{}'", numPartitions);
|
||||
|
||||
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
RAiDConfig config = loadRAiDConfig(raidConfPath);
|
||||
|
||||
Dataset<Row> nodeDS = prepareNodes(sc, graphBasePath, config, numPartitions);
|
||||
log.info("Number of nodes: {}", nodeDS.count());
|
||||
|
||||
RDD<Tuple2<Object, String>> vertices = createVertices(nodeDS);
|
||||
|
||||
Dataset<Row> relations = spark
|
||||
.read()
|
||||
.schema(REL_BEAN_ENC.schema())
|
||||
.json(graphBasePath + "/relation")
|
||||
.repartition(numPartitions);
|
||||
log.info("Number of relations: {}", relations.count());
|
||||
|
||||
Dataset<Row> labelDS = spark.createDataFrame(
|
||||
new ArrayList<>(), DataTypes.createStructType(
|
||||
new StructField[]{
|
||||
DataTypes.createStructField(LONG_ID_COL, DataTypes.LongType, false),
|
||||
DataTypes.createStructField(LABEL_ID_COL, DataTypes.StringType, false)
|
||||
}));
|
||||
|
||||
for(EdgeParam edgeParam: config.getEdges()) {
|
||||
|
||||
RDD<Edge<String>> edges = createEdges(
|
||||
nodeDS,
|
||||
relations,
|
||||
edgeParam.getMetapath(),
|
||||
edgeParam.getName(),
|
||||
config.getParams().getOrDefault("joinMaxIter", 20).intValue(),
|
||||
config.getParams().getOrDefault("joinSample", 0.1).doubleValue());
|
||||
|
||||
Dataset<Row> connectedComponents = GraphUtil.createConnectedComponents(spark, vertices, edges, 1);
|
||||
|
||||
//this is getting rid of those connected components of a single element (they are not meaningful ?)
|
||||
Dataset<Row> counts = connectedComponents
|
||||
.groupBy(LABEL_ID_COL)
|
||||
.agg(functions.count(LABEL_ID_COL).alias("count"))
|
||||
.filter(functions.col("count").gt(1));
|
||||
connectedComponents = connectedComponents
|
||||
.join(counts, connectedComponents.col(LABEL_ID_COL).equalTo(counts.col(LABEL_ID_COL)), "inner")
|
||||
.select(connectedComponents.col(LONG_ID_COL), connectedComponents.col(LABEL_ID_COL));
|
||||
|
||||
labelDS = labelDS.union(connectedComponents);
|
||||
|
||||
}
|
||||
|
||||
Dataset<Row> labels = labelDS
|
||||
.groupBy(col(LONG_ID_COL))
|
||||
.agg(collect_list(labelDS.col(LABEL_ID_COL)).as(LABELS_COL));
|
||||
|
||||
Dataset<Row> documents = nodeDS
|
||||
.join(labels, nodeDS.col(LONG_ID_COL).equalTo(labels.col(LONG_ID_COL)), "left")
|
||||
.select(col(STRING_ID_COL), col(LABELS_COL))
|
||||
.repartition(numPartitions);
|
||||
|
||||
log.info("Labels generated: {}", documents.count());
|
||||
|
||||
documents.write().save(outputPath);
|
||||
|
||||
}
|
||||
|
||||
public Dataset<Row> prepareNodes(JavaSparkContext sc, String graphBasePath, RAiDConfig config, int numPartitions) {
|
||||
// create nodes
|
||||
JavaRDD<String> nodes = sc.emptyRDD();
|
||||
for (String nodeType: config.getNodes()) {
|
||||
nodes = nodes.union(
|
||||
sc.textFile(graphBasePath + "/" + nodeType)
|
||||
// .filter(json -> !((boolean) JsonPath.read(json, DELETEDBYINFERENCE_PATH)))
|
||||
.map(json -> (String) JsonPath.read(json, ID_PATH)));
|
||||
}
|
||||
|
||||
return spark.createDataset(
|
||||
nodes.zipWithIndex().map(n -> RowFactory.create(n._1(), n._2())).rdd(),
|
||||
RowEncoder.apply(
|
||||
DataTypes.createStructType(
|
||||
new StructField[]{
|
||||
DataTypes.createStructField(STRING_ID_COL, DataTypes.StringType, false),
|
||||
DataTypes.createStructField(LONG_ID_COL, DataTypes.LongType, false)
|
||||
})
|
||||
))
|
||||
.repartition(numPartitions);
|
||||
}
|
||||
|
||||
public RDD<Tuple2<Object,String>> createVertices(Dataset<Row> nodeDS) {
|
||||
return nodeDS
|
||||
.toJavaRDD()
|
||||
.map(row -> new Tuple2<>(row.get(1), row.getString(0)))
|
||||
.rdd();
|
||||
}
|
||||
|
||||
public RDD<Edge<String>> createEdges(Dataset<Row> nodeDS, Dataset<Row> relations, List<String> metapath, String edgeName, int maxIter, double fraction) throws Exception {
|
||||
|
||||
Dataset<Row> edgesRow = spark.createDataFrame(
|
||||
new ArrayList<>(), DataTypes.createStructType(
|
||||
new StructField[]{
|
||||
DataTypes.createStructField("src", DataTypes.StringType, false),
|
||||
DataTypes.createStructField("dst", DataTypes.StringType, false),
|
||||
DataTypes.createStructField("relClass", DataTypes.StringType, false)
|
||||
}
|
||||
)
|
||||
);
|
||||
switch(metapath.size()) {
|
||||
case 1:
|
||||
edgesRow = relations
|
||||
.where(col("relClass").equalTo(metapath.get(0)))
|
||||
.select(col("source").as("src"), col("target").as("dst"), col("relClass"));
|
||||
break;
|
||||
case 2:
|
||||
Dataset<Row> edges_1 = relations
|
||||
.where(col("relClass").equalTo(metapath.get(0)))
|
||||
.select(col("source").as("source_1"), col("target").as("target_1"));
|
||||
Dataset<Row> edges_2 = relations
|
||||
.where(col("relClass").equalTo(metapath.get(1)))
|
||||
.select(col("source").as("source_2"), col("target").as("target_2"));
|
||||
|
||||
for (int i = 0; i < maxIter; i++) {
|
||||
Dataset<Row> edges_sample1 = edges_1.sample(fraction);
|
||||
Dataset<Row> edges_sample2 = edges_2.sample(fraction);
|
||||
Dataset<Row> joined = edges_sample1.join(
|
||||
edges_sample2,
|
||||
edges_sample1.col("target_1").equalTo(edges_sample2.col("source_2")),
|
||||
"inner"
|
||||
)
|
||||
.select(col("source_1").as("src"), col("target_2").as("dst"))
|
||||
.withColumn("relClass", lit(edgeName));
|
||||
|
||||
edgesRow = edgesRow.union(joined);
|
||||
}
|
||||
|
||||
edgesRow.distinct();
|
||||
|
||||
break;
|
||||
default:
|
||||
throw new Exception("Metapath size not allowed");
|
||||
}
|
||||
// join with nodes to get longs instead of string ids
|
||||
return edgesRow
|
||||
.join(nodeDS, edgesRow.col("src").equalTo(nodeDS.col(STRING_ID_COL)))
|
||||
.select(col(LONG_ID_COL).as("src"), col("dst"), col("relClass"))
|
||||
.join(nodeDS, edgesRow.col("dst").equalTo(nodeDS.col(STRING_ID_COL)))
|
||||
.select(col("src"), col(LONG_ID_COL).as("dst"), col("relClass"))
|
||||
.toJavaRDD()
|
||||
.map(row ->
|
||||
new Edge<>(
|
||||
row.getLong(0),
|
||||
row.getLong(1),
|
||||
row.getString(2)
|
||||
)
|
||||
)
|
||||
.rdd();
|
||||
}
|
||||
}
|
||||
|
|
@ -65,9 +65,9 @@ public class SparkCreateEmbeddings extends AbstractSparkJob{
|
|||
|
||||
Dataset<Row> documents = spark.read().load(documentsPath).filter(size(col(LABELS_COL)).gt(0)).filter(col(LABELS_COL).isNotNull());
|
||||
|
||||
Dataset<Row> word2vecEmbeddings = new Word2Vec()
|
||||
Dataset<Row> embeddings = new Word2Vec()
|
||||
.setInputCol(LABELS_COL)
|
||||
.setOutputCol("word2vec_" + EMBEDDING_COL)
|
||||
.setOutputCol(EMBEDDING_COL)
|
||||
.setVectorSize(config.getParams().getOrDefault("embeddingSize", 128).intValue())
|
||||
.setStepSize(config.getParams().getOrDefault("learningRate", 0.01).doubleValue())
|
||||
.setMinCount(config.getParams().getOrDefault("minCount", 1).intValue())
|
||||
|
|
@ -75,15 +75,17 @@ public class SparkCreateEmbeddings extends AbstractSparkJob{
|
|||
.fit(documents)
|
||||
.transform(documents);
|
||||
|
||||
Dataset<Row> embeddings = new Normalizer()
|
||||
.setInputCol("word2vec_" + EMBEDDING_COL)
|
||||
.setOutputCol(EMBEDDING_COL)
|
||||
.setP(2.0)
|
||||
.transform(word2vecEmbeddings)
|
||||
.select(col(STRING_ID_COL), col(EMBEDDING_COL));
|
||||
// embeddings = new Normalizer()
|
||||
// .setInputCol("word2vec" + EMBEDDING_COL)
|
||||
// .setOutputCol(EMBEDDING_COL)
|
||||
// .setP(2.0)
|
||||
// .transform(embeddings)
|
||||
// .select(col("id"), col(EMBEDDING_COL));
|
||||
|
||||
log.info("Embeddings generated: {}", embeddings.count());
|
||||
|
||||
embeddings.show();
|
||||
|
||||
embeddings.write().save(outputPath);
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,131 +0,0 @@
|
|||
package eu.dnetlib.raid.jobs;
|
||||
|
||||
import com.google.common.collect.Iterators;
|
||||
import eu.dnetlib.raid.support.ArgumentApplicationParser;
|
||||
import eu.dnetlib.raid.support.RAiDConfig;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.MapPartitionsFunction;
|
||||
import org.apache.spark.ml.clustering.KMeans;
|
||||
import org.apache.spark.ml.linalg.DenseVector;
|
||||
import org.apache.spark.sql.*;
|
||||
import org.apache.spark.sql.types.DataTypes;
|
||||
import org.apache.spark.sql.types.StructField;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import smile.clustering.DBSCAN;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.apache.spark.sql.functions.col;
|
||||
|
||||
public class SparkRAiDClustering extends AbstractSparkJob {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(SparkRAiDClustering.class);
|
||||
|
||||
public SparkRAiDClustering(ArgumentApplicationParser parser, SparkSession spark) {
|
||||
super(parser, spark);
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
readResource("/jobs/parameters/createClusters_parameters.json", SparkRAiDClustering.class)
|
||||
);
|
||||
|
||||
parser.parseArgument(args);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
new SparkRAiDClustering(
|
||||
parser,
|
||||
getSparkSession(conf)
|
||||
).run();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() throws Exception {
|
||||
|
||||
// read oozie parameters
|
||||
final String inputPath = parser.get("inputPath");
|
||||
final String workingPath = parser.get("workingPath");
|
||||
final String outputPath = parser.get("outputPath");
|
||||
final String raidConfPath = parser.get("raidConfPath");
|
||||
final int numPartitions = Optional
|
||||
.ofNullable(parser.get("numPartitions"))
|
||||
.map(Integer::valueOf)
|
||||
.orElse(NUM_PARTITIONS);
|
||||
|
||||
log.info("inputPath: '{}'", inputPath);
|
||||
log.info("workingPath: '{}'", workingPath);
|
||||
log.info("outputPath: '{}'", outputPath);
|
||||
log.info("raidConfPath: '{}'", raidConfPath);
|
||||
log.info("numPartitions: '{}'", numPartitions);
|
||||
|
||||
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
RAiDConfig config = loadRAiDConfig(raidConfPath);
|
||||
|
||||
MapPartitionsFunction<Row, Row> dbScan = partition -> {
|
||||
|
||||
List<double[]> points = new ArrayList<>();
|
||||
List<String> ids = new ArrayList<>();
|
||||
int index = 0; //partition index
|
||||
|
||||
while (partition.hasNext()) {
|
||||
Row row = partition.next();
|
||||
index = row.getAs(PARTITION_COL);
|
||||
double[] embedding = ((DenseVector) row.getAs(EMBEDDING_COL)).toArray();
|
||||
ids.add(row.getString(row.fieldIndex(STRING_ID_COL)));
|
||||
points.add(embedding);
|
||||
}
|
||||
|
||||
if (points.size() > 0 ) {
|
||||
|
||||
DBSCAN<double[]> dbscan = DBSCAN.fit(
|
||||
points.toArray(new double[0][]),
|
||||
config.getParams().getOrDefault("minPts", 1).intValue(),
|
||||
config.getParams().getOrDefault("epsilon", 0.1).doubleValue()
|
||||
);
|
||||
|
||||
List<Row> results = new ArrayList<>();
|
||||
for (int i = 0; i < dbscan.y.length; i++) {
|
||||
int clusterId = dbscan.y[i] == Integer.MAX_VALUE ? -1 : index * 1000 + dbscan.y[i];
|
||||
Row resultRow = RowFactory.create(ids.get(i), clusterId);
|
||||
results.add(resultRow);
|
||||
}
|
||||
|
||||
return results.iterator();
|
||||
|
||||
}
|
||||
else {
|
||||
return Iterators.emptyIterator();
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
final Dataset<Row> embeddings = spark.read().load(inputPath);
|
||||
|
||||
Dataset<Row> clusteredData = new KMeans()
|
||||
.setK(numPartitions)
|
||||
.setSeed(SEED)
|
||||
.setFeaturesCol(EMBEDDING_COL)
|
||||
.setPredictionCol(PARTITION_COL)
|
||||
.setMaxIter(config.getParams().getOrDefault("partitioningMaxIter", 10).intValue())
|
||||
.fit(embeddings)
|
||||
.transform(embeddings)
|
||||
.repartition(col(PARTITION_COL))
|
||||
.mapPartitions(dbScan, Encoders.kryo(Row.class));
|
||||
|
||||
Dataset<Row> result = spark
|
||||
.createDataFrame(clusteredData.rdd(), DataTypes.createStructType(new StructField[]{DataTypes.createStructField(STRING_ID_COL, DataTypes.StringType, false), DataTypes.createStructField(LABEL_ID_COL, DataTypes.IntegerType, false)}));
|
||||
|
||||
// result.show(false);
|
||||
|
||||
result.write().save(outputPath);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,121 @@
|
|||
package eu.dnetlib.raid.support;
|
||||
|
||||
import com.google.common.collect.Iterators;
|
||||
import org.apache.commons.math3.linear.ArrayRealVector;
|
||||
import org.apache.commons.math3.linear.RealVector;
|
||||
import org.apache.commons.math3.ml.clustering.Cluster;
|
||||
import org.apache.commons.math3.ml.clustering.DBSCANClusterer;
|
||||
import org.apache.spark.api.java.function.MapPartitionsFunction;
|
||||
import org.apache.spark.ml.linalg.DenseVector;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.RowFactory;
|
||||
import smile.clustering.DBSCAN;
|
||||
import smile.math.distance.Distance;
|
||||
import smile.math.distance.EuclideanDistance;
|
||||
|
||||
import javax.validation.constraints.Pattern;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Spliterators;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
public class ClusteringAlgorithms {
|
||||
|
||||
private static final Distance<DataPoint> euclideanDistance = (d1, d2) -> new EuclideanDistance().d(d1.getPoint(), d2.getPoint());
|
||||
private static final Distance<DataPoint> cosineDistance = (d1, d2) -> cosineSimilarity(d1.getPoint(), d2.getPoint());
|
||||
public static MapPartitionsFunction<Row, Row> getSmileDBSCAN(String partitionCol, String embeddingCol, String idCol, int minPts, double epsilon, @Pattern(regexp = "cosine|euclidean")String distanceFunction) {
|
||||
|
||||
return partition -> {
|
||||
|
||||
DataPoint[] points = StreamSupport.stream(Spliterators.spliteratorUnknownSize(partition, 0), false)
|
||||
.map(row -> new DataPoint(
|
||||
((DenseVector) row.getAs(embeddingCol)).toArray(),
|
||||
row.getString(row.fieldIndex(idCol)),
|
||||
row.getAs(partitionCol))
|
||||
).toArray(DataPoint[]::new);
|
||||
|
||||
if (points.length > 0 ) {
|
||||
DBSCAN<DataPoint> dbscan = DBSCAN.fit(
|
||||
points,
|
||||
distanceFunction.equals("cosine")? cosineDistance : euclideanDistance,
|
||||
minPts,
|
||||
epsilon
|
||||
);
|
||||
|
||||
return IntStream.range(0, points.length)
|
||||
.mapToObj(i -> RowFactory.create(points[i].getId(), dbscan.y[i] == Integer.MAX_VALUE ? -1 : points[i].getPartitionIndex() * 1000 + dbscan.y[i]))
|
||||
.iterator();
|
||||
|
||||
}
|
||||
else {
|
||||
return Iterators.emptyIterator();
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
public static MapPartitionsFunction<Row, Row> getApacheDBSCAN(String partitionCol, String embeddingCol, String idCol, int minPts, double epsilon, @Pattern(regexp = "cosine|euclidean") String distanceFunction) {
|
||||
return partition -> {
|
||||
|
||||
DBSCANClusterer<DataPoint> clusterer;
|
||||
switch(distanceFunction) {
|
||||
case "cosine":
|
||||
clusterer = new DBSCANClusterer<>(
|
||||
epsilon,
|
||||
minPts,
|
||||
ClusteringAlgorithms::cosineSimilarity
|
||||
);
|
||||
break;
|
||||
case "euclidean":
|
||||
clusterer = new DBSCANClusterer<>(
|
||||
epsilon,
|
||||
minPts
|
||||
);
|
||||
break;
|
||||
default:
|
||||
throw new NoSuchMethodException();
|
||||
}
|
||||
|
||||
List<DataPoint> points = StreamSupport.stream(Spliterators.spliteratorUnknownSize(partition, 0), false)
|
||||
.map(row -> new DataPoint(
|
||||
((DenseVector) row.getAs(embeddingCol)).toArray(),
|
||||
row.getString(row.fieldIndex(idCol)),
|
||||
row.getAs(partitionCol))
|
||||
)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
if (points.size()>0){
|
||||
|
||||
List<Cluster<DataPoint>> clusters = clusterer.cluster(points);
|
||||
|
||||
int nCluster = 0;
|
||||
List<Row> rows = new ArrayList<>();
|
||||
for(Cluster<DataPoint> cluster: clusters) {
|
||||
for(DataPoint dp: cluster.getPoints()) {
|
||||
rows.add(RowFactory.create(dp.getId(), nCluster + dp.getPartitionIndex() * 2000));
|
||||
}
|
||||
nCluster+=1;
|
||||
}
|
||||
|
||||
return rows.iterator();
|
||||
}
|
||||
else {
|
||||
return Iterators.emptyIterator();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static double cosineSimilarity(double[] a, double[] b) {
|
||||
RealVector v1 = new ArrayRealVector(a);
|
||||
RealVector v2 = new ArrayRealVector(b);
|
||||
|
||||
double dotProduct = v1.dotProduct(v2);
|
||||
double normA = v1.getNorm();
|
||||
double normB = v2.getNorm();
|
||||
|
||||
return 1 - dotProduct / (normA * normB);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,30 @@
|
|||
package eu.dnetlib.raid.support;
|
||||
|
||||
import org.apache.commons.math3.ml.clustering.Clusterable;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public class DataPoint implements Clusterable, Serializable {
|
||||
private final double[] point;
|
||||
private final String id;
|
||||
private final int partitionIndex;
|
||||
|
||||
public DataPoint(double[] point, String id, int partitionIndex) {
|
||||
this.point = point;
|
||||
this.id = id;
|
||||
this.partitionIndex = partitionIndex;
|
||||
}
|
||||
|
||||
@Override
|
||||
public double[] getPoint() {
|
||||
return point;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public int getPartitionIndex() {
|
||||
return partitionIndex;
|
||||
}
|
||||
}
|
||||
|
|
@ -11,6 +11,7 @@ import java.util.Map;
|
|||
public class RAiDConfig implements Serializable {
|
||||
|
||||
private List<String> nodes; // list of nodes to be considered
|
||||
private List<String> entities; //list of nodes to be clustered
|
||||
|
||||
private List<EdgeParam> edges; //list of edges to be created
|
||||
|
||||
|
|
@ -22,8 +23,9 @@ public class RAiDConfig implements Serializable {
|
|||
|
||||
}
|
||||
|
||||
public RAiDConfig(List<String> nodes, List<EdgeParam> edges, Map<String, RandomWalkParam> randomWalks, Map<String, Number> params) {
|
||||
public RAiDConfig(List<String> nodes, List<String> entities, List<EdgeParam> edges, Map<String, RandomWalkParam> randomWalks, Map<String, Number> params) {
|
||||
this.nodes = nodes;
|
||||
this.entities = entities;
|
||||
this.edges = edges;
|
||||
this.randomWalks = randomWalks;
|
||||
this.params = params;
|
||||
|
|
@ -37,6 +39,14 @@ public class RAiDConfig implements Serializable {
|
|||
this.nodes = nodes;
|
||||
}
|
||||
|
||||
public List<String> getEntities() {
|
||||
return entities;
|
||||
}
|
||||
|
||||
public void setEntities(List<String> entities) {
|
||||
this.entities = entities;
|
||||
}
|
||||
|
||||
public List<EdgeParam> getEdges() {
|
||||
return edges;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -83,12 +83,23 @@
|
|||
</configuration>
|
||||
</global>
|
||||
|
||||
<start to="CreateClusters"/>
|
||||
<start to="resetWorkingPath"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="resetWorkingPath">
|
||||
<fs>
|
||||
<!-- <delete path="${workingPath}"/>-->
|
||||
<!-- <delete path="${documentsPath}"/>-->
|
||||
<delete path="${clustersPath}"/>
|
||||
<!-- <delete path="${embeddingsPath}"/>-->
|
||||
</fs>
|
||||
<ok to="CreateClusters"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="CreateDocuments">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
|
|
@ -152,7 +163,7 @@
|
|||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Create Clusters</name>
|
||||
<class>eu.dnetlib.raid.jobs.SparkRAiDClustering</class>
|
||||
<class>eu.dnetlib.raid.jobs.SparkClustering</class>
|
||||
<jar>dhp-raid-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--num-executors=32
|
||||
|
|
|
|||
|
|
@ -45,9 +45,20 @@ object GraphUtil {
|
|||
val cc : RDD[Row] = graph.connectedComponents(maxIter).vertices.map(tuple => Row(tuple._1, tuple._2)).rdd
|
||||
|
||||
spark.createDataFrame(cc.rdd, StructType(Seq(
|
||||
StructField("longId", LongType, nullable = false),
|
||||
StructField("id", LongType, nullable = false),
|
||||
StructField("labelId", LongType, nullable = false)
|
||||
)))
|
||||
}
|
||||
|
||||
def createConnectedComponents(spark: SparkSession, graph: Graph[Row,Row], maxIter: Int): DataFrame = {
|
||||
val vertices : RDD[(VertexId,Row)] = graph.vertices.rdd
|
||||
val cc : RDD[(VertexId, VertexId)] = graph.connectedComponents(maxIter).vertices.rdd
|
||||
val res : RDD[Row] = vertices.rdd.join(cc).map(tuple => Row(tuple._2._1.get(0), tuple._2._2))
|
||||
spark.createDataFrame(res, StructType(Seq(
|
||||
StructField("id", StringType, nullable = false),
|
||||
StructField("component", LongType, nullable = false)
|
||||
)))
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,10 +1,6 @@
|
|||
package eu.dnetlib.raid;
|
||||
|
||||
import com.sun.media.jfxmedia.logging.Logger;
|
||||
import eu.dnetlib.raid.jobs.AbstractSparkJob;
|
||||
import eu.dnetlib.raid.jobs.SparkCreateDocuments;
|
||||
import eu.dnetlib.raid.jobs.SparkCreateEmbeddings;
|
||||
import eu.dnetlib.raid.jobs.SparkRAiDClustering;
|
||||
import eu.dnetlib.raid.jobs.*;
|
||||
import eu.dnetlib.raid.support.ArgumentApplicationParser;
|
||||
import eu.dnetlib.raid.support.RAiDConfig;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
|
|
@ -42,7 +38,7 @@ public class RAiDInferenceTest {
|
|||
final static String clustersPath = "/tmp/clusters";
|
||||
final static String documentsPath = "/tmp/documents";
|
||||
|
||||
final static String numPartitions = "3";
|
||||
final static String numPartitions = "2";
|
||||
|
||||
final String raidConfPath = Paths
|
||||
.get(RAiDInferenceTest.class.getResource("/eu/dnetlib/raid/config/raid.conf.json").toURI())
|
||||
|
|
@ -73,6 +69,7 @@ public class RAiDInferenceTest {
|
|||
.master("local[*]")
|
||||
.getOrCreate();
|
||||
context = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
context.setCheckpointDir(workingPath + "/checkpoint");
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -98,6 +95,7 @@ public class RAiDInferenceTest {
|
|||
clazz.getResourceAsStream(path));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
@Order(1)
|
||||
public void createDocumentsTest() throws Exception {
|
||||
|
|
@ -122,7 +120,7 @@ public class RAiDInferenceTest {
|
|||
|
||||
@Test
|
||||
@Order(2)
|
||||
public void createRAiDEmbeddingsTest() throws Exception {
|
||||
public void createEmbeddingsTest() throws Exception {
|
||||
|
||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(readResource("/jobs/parameters/createEmbeddings_parameters.json", SparkCreateEmbeddings.class));
|
||||
|
||||
|
|
@ -146,7 +144,7 @@ public class RAiDInferenceTest {
|
|||
@Order(3)
|
||||
public void createClustersTest() throws Exception {
|
||||
|
||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(readResource("/jobs/parameters/createClusters_parameters.json", SparkRAiDClustering.class));
|
||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(readResource("/jobs/parameters/createClusters_parameters.json", SparkClustering.class));
|
||||
|
||||
parser.parseArgument(
|
||||
new String[] {
|
||||
|
|
@ -158,7 +156,7 @@ public class RAiDInferenceTest {
|
|||
}
|
||||
);
|
||||
|
||||
new SparkRAiDClustering(
|
||||
new SparkClustering(
|
||||
parser,
|
||||
spark
|
||||
).run();
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
{
|
||||
"nodes": ["publication", "software", "dataset"],
|
||||
"nodes": ["publication", "software", "dataset", "organization", "project"],
|
||||
"entities": ["publication", "software", "dataset"],
|
||||
"edges": [
|
||||
{
|
||||
"name": "coproduced",
|
||||
|
|
@ -31,12 +32,10 @@
|
|||
},
|
||||
"params": {
|
||||
"embeddingSize": 4,
|
||||
"maxIter": 10,
|
||||
"joinMaxIter": 20,
|
||||
"joinSample": 0.3,
|
||||
"maxIter": 20,
|
||||
"partitioningMaxIter": 3,
|
||||
"learningRate": 0.01,
|
||||
"epsilon": 0.1,
|
||||
"minPts": 1
|
||||
"epsilon": 0.15,
|
||||
"minPts": 2
|
||||
}
|
||||
}
|
||||
|
|
@ -1,3 +1,3 @@
|
|||
{"dataInfo": {"deletedbyinference": false}, "author": [],"description": [ { "dataInfo": { "deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": { "classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions" }, "trust": "0.9" }, "value": "Description of dataset 1" }],"id": "50|dataset::1","pid": [],"subject": [],"title": [ { "dataInfo": { "deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": { "classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions" }, "trust": "0.9" }, "qualifier": { "classid": "main title", "classname": "main title", "schemeid": "dnet:dataCite_title", "schemename": "dnet:dataCite_title" }, "value": "Dataset 1" }]}
|
||||
{"dataInfo": {"deletedbyinference": false}, "author": [],"description": [ { "dataInfo": { "deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": { "classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions" }, "trust": "0.9" }, "value": "Description of dataset 2" }],"id": "50|dataset::2","pid": [],"subject": [],"title": [ { "dataInfo": { "deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": { "classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions" }, "trust": "0.9" }, "qualifier": { "classid": "main title", "classname": "main title", "schemeid": "dnet:dataCite_title", "schemename": "dnet:dataCite_title" }, "value": "Dataset 2" }]}
|
||||
{"dataInfo": {"deletedbyinference": false}, "author": [],"description": [ { "dataInfo": { "deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": { "classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions" }, "trust": "0.9" }, "value": "Description of dataset 3" }],"id": "50|dataset::3","pid": [],"subject": [],"title": [ { "dataInfo": { "deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": { "classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions" }, "trust": "0.9" }, "qualifier": { "classid": "main title", "classname": "main title", "schemeid": "dnet:dataCite_title", "schemename": "dnet:dataCite_title" }, "value": "Dataset 3" }]}
|
||||
{"dataInfo": {"deletedbyinference": false}, "resulttype": {"classname": "dataset"}, "author": [],"description": [ { "dataInfo": { "deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": { "classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions" }, "trust": "0.9" }, "value": "Description of dataset 1" }],"id": "50|dataset::1","pid": [],"subject": [],"title": [ { "dataInfo": { "deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": { "classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions" }, "trust": "0.9" }, "qualifier": { "classid": "main title", "classname": "main title", "schemeid": "dnet:dataCite_title", "schemename": "dnet:dataCite_title" }, "value": "Dataset 1" }]}
|
||||
{"dataInfo": {"deletedbyinference": false}, "resulttype": {"classname": "dataset"}, "author": [],"description": [ { "dataInfo": { "deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": { "classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions" }, "trust": "0.9" }, "value": "Description of dataset 2" }],"id": "50|dataset::2","pid": [],"subject": [],"title": [ { "dataInfo": { "deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": { "classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions" }, "trust": "0.9" }, "qualifier": { "classid": "main title", "classname": "main title", "schemeid": "dnet:dataCite_title", "schemename": "dnet:dataCite_title" }, "value": "Dataset 2" }]}
|
||||
{"dataInfo": {"deletedbyinference": false}, "resulttype": {"classname": "dataset"}, "author": [],"description": [ { "dataInfo": { "deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": { "classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions" }, "trust": "0.9" }, "value": "Description of dataset 3" }],"id": "50|dataset::3","pid": [],"subject": [],"title": [ { "dataInfo": { "deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": { "classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions" }, "trust": "0.9" }, "qualifier": { "classid": "main title", "classname": "main title", "schemeid": "dnet:dataCite_title", "schemename": "dnet:dataCite_title" }, "value": "Dataset 3" }]}
|
||||
|
|
@ -0,0 +1 @@
|
|||
{"dataInfo": {"deletedbyinference": false}, "author": [], "description": [{"dataInfo": {"deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "trust": "0.9" }, "value": "Description of organization 1"}], "id": "20|organization::1", "pid": [], "title": [ { "dataInfo": { "deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "trust": "0.9"}, "qualifier": { "classid": "main title", "classname": "main title", "schemeid": "dnet:dataCite_title", "schemename": "dnet:dataCite_title"},"value": "Organization 1"}]}
|
||||
|
|
@ -0,0 +1,3 @@
|
|||
{"dataInfo": {"deletedbyinference": false}, "author": [], "description": [{"dataInfo": {"deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "trust": "0.9" }, "value": "Description of project 1"}], "id": "40|project::1", "pid": [], "title": [ { "dataInfo": { "deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "trust": "0.9"}, "qualifier": { "classid": "main title", "classname": "main title", "schemeid": "dnet:dataCite_title", "schemename": "dnet:dataCite_title"},"value": "Project 1"}]}
|
||||
{"dataInfo": {"deletedbyinference": false}, "author": [], "description": [{"dataInfo": {"deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "trust": "0.9" }, "value": "Description of project 2"}], "id": "40|project::2", "pid": [], "title": [ { "dataInfo": { "deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "trust": "0.9"}, "qualifier": { "classid": "main title", "classname": "main title", "schemeid": "dnet:dataCite_title", "schemename": "dnet:dataCite_title"},"value": "Project 2"}]}
|
||||
{"dataInfo": {"deletedbyinference": false}, "author": [], "description": [{"dataInfo": {"deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "trust": "0.9" }, "value": "Description of project 3"}], "id": "40|project::3", "pid": [], "title": [ { "dataInfo": { "deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "trust": "0.9"}, "qualifier": { "classid": "main title", "classname": "main title", "schemeid": "dnet:dataCite_title", "schemename": "dnet:dataCite_title"},"value": "Project 3"}]}
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
{"dataInfo": {"deletedbyinference": false}, "author": [], "description": [{"dataInfo": {"deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "trust": "0.9" }, "value": "Description of publication 1"}], "id": "50|publication::1", "pid": [], "title": [ { "dataInfo": { "deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "trust": "0.9"}, "qualifier": { "classid": "main title", "classname": "main title", "schemeid": "dnet:dataCite_title", "schemename": "dnet:dataCite_title"},"value": "Publication 1"}]}
|
||||
{"dataInfo": {"deletedbyinference": false}, "author": [], "description": [{"dataInfo": {"deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "trust": "0.9" }, "value": "Description of publication 2"}], "id": "50|publication::2", "pid": [], "title": [ { "dataInfo": { "deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "trust": "0.9"}, "qualifier": { "classid": "main title", "classname": "main title", "schemeid": "dnet:dataCite_title", "schemename": "dnet:dataCite_title"},"value": "Publication 2"}]}
|
||||
{"dataInfo": {"deletedbyinference": false}, "author": [], "description": [{"dataInfo": {"deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "trust": "0.9" }, "value": "Description of publication 3"}], "id": "50|publication::3", "pid": [], "title": [ { "dataInfo": { "deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "trust": "0.9"}, "qualifier": { "classid": "main title", "classname": "main title", "schemeid": "dnet:dataCite_title", "schemename": "dnet:dataCite_title"},"value": "Publication 3"}]}
|
||||
{"dataInfo": {"deletedbyinference": false}, "author": [], "description": [{"dataInfo": {"deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "trust": "0.9" }, "value": "Description of publication 4"}], "id": "50|publication::4", "pid": [], "title": [ { "dataInfo": { "deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "trust": "0.9"}, "qualifier": { "classid": "main title", "classname": "main title", "schemeid": "dnet:dataCite_title", "schemename": "dnet:dataCite_title"},"value": "Publication 4"}]}
|
||||
{"dataInfo": {"deletedbyinference": false}, "resulttype": {"classname": "publication"}, "author": [], "description": [{"dataInfo": {"deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "trust": "0.9" }, "value": "Description of publication 1"}], "id": "50|publication::1", "pid": [], "title": [ { "dataInfo": { "deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "trust": "0.9"}, "qualifier": { "classid": "main title", "classname": "main title", "schemeid": "dnet:dataCite_title", "schemename": "dnet:dataCite_title"},"value": "Publication 1"}]}
|
||||
{"dataInfo": {"deletedbyinference": false}, "resulttype": {"classname": "publication"}, "author": [], "description": [{"dataInfo": {"deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "trust": "0.9" }, "value": "Description of publication 2"}], "id": "50|publication::2", "pid": [], "title": [ { "dataInfo": { "deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "trust": "0.9"}, "qualifier": { "classid": "main title", "classname": "main title", "schemeid": "dnet:dataCite_title", "schemename": "dnet:dataCite_title"},"value": "Publication 2"}]}
|
||||
{"dataInfo": {"deletedbyinference": false}, "resulttype": {"classname": "publication"}, "author": [], "description": [{"dataInfo": {"deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "trust": "0.9" }, "value": "Description of publication 3"}], "id": "50|publication::3", "pid": [], "title": [ { "dataInfo": { "deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "trust": "0.9"}, "qualifier": { "classid": "main title", "classname": "main title", "schemeid": "dnet:dataCite_title", "schemename": "dnet:dataCite_title"},"value": "Publication 3"}]}
|
||||
{"dataInfo": {"deletedbyinference": false}, "resulttype": {"classname": "publication"}, "author": [], "description": [{"dataInfo": {"deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "trust": "0.9" }, "value": "Description of publication 4"}], "id": "50|publication::4", "pid": [], "title": [ { "dataInfo": { "deletedbyinference": false, "inferred": false, "invisible": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "trust": "0.9"}, "qualifier": { "classid": "main title", "classname": "main title", "schemeid": "dnet:dataCite_title", "schemename": "dnet:dataCite_title"},"value": "Publication 4"}]}
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
{"dataInfo": {"deletedbyinference": false}, "author": [],"description": [{"dataInfo": {"deletedbyinference": false,"inferenceprovenance": "","inferred": false,"invisible": false,"provenanceaction": {"classid": "sysimport:crosswalk:datasetarchive","classname": "Harvested","schemeid": "dnet:provenanceActions","schemename": "dnet:provenanceActions"},"trust": "0.9"},"value": "Description of software 1"}],"id": "50|software::1","originalId": ["amphoranet","50|__bioTools__::393e61aebe3ef9bc8701c8dc843e08f2"],"pid": [],"source": [],"subject": [],"title": [{"dataInfo": {"deletedbyinference": false,"inferenceprovenance": "","inferred": false,"invisible": false,"provenanceaction": {"classid": "sysimport:crosswalk:datasetarchive","classname": "Harvested","schemeid": "dnet:provenanceActions","schemename": "dnet:provenanceActions"},"trust": "0.9"},"qualifier": {"classid": "main title","classname": "main title","schemeid": "dnet:dataCite_title","schemename": "dnet:dataCite_title"},"value": "Software 1"}]}
|
||||
{"dataInfo": {"deletedbyinference": false}, "author": [],"description": [{"dataInfo": {"deletedbyinference": false,"inferenceprovenance": "","inferred": false,"invisible": false,"provenanceaction": {"classid": "sysimport:crosswalk:datasetarchive","classname": "Harvested","schemeid": "dnet:provenanceActions","schemename": "dnet:provenanceActions"},"trust": "0.9"},"value": "Description of software 2"}],"id": "50|software::2","originalId": ["amphoranet","50|__bioTools__::393e61aebe3ef9bc8701c8dc843e08f2"],"pid": [],"source": [],"subject": [],"title": [{"dataInfo": {"deletedbyinference": false,"inferenceprovenance": "","inferred": false,"invisible": false,"provenanceaction": {"classid": "sysimport:crosswalk:datasetarchive","classname": "Harvested","schemeid": "dnet:provenanceActions","schemename": "dnet:provenanceActions"},"trust": "0.9"},"qualifier": {"classid": "main title","classname": "main title","schemeid": "dnet:dataCite_title","schemename": "dnet:dataCite_title"},"value": "Software 2"}]}
|
||||
{"dataInfo": {"deletedbyinference": false}, "author": [],"description": [{"dataInfo": {"deletedbyinference": false,"inferenceprovenance": "","inferred": false,"invisible": false,"provenanceaction": {"classid": "sysimport:crosswalk:datasetarchive","classname": "Harvested","schemeid": "dnet:provenanceActions","schemename": "dnet:provenanceActions"},"trust": "0.9"},"value": "Description of software 3"}],"id": "50|software::3","originalId": ["amphoranet","50|__bioTools__::393e61aebe3ef9bc8701c8dc843e08f2"],"pid": [],"source": [],"subject": [],"title": [{"dataInfo": {"deletedbyinference": false,"inferenceprovenance": "","inferred": false,"invisible": false,"provenanceaction": {"classid": "sysimport:crosswalk:datasetarchive","classname": "Harvested","schemeid": "dnet:provenanceActions","schemename": "dnet:provenanceActions"},"trust": "0.9"},"qualifier": {"classid": "main title","classname": "main title","schemeid": "dnet:dataCite_title","schemename": "dnet:dataCite_title"},"value": "Software 3"}]}
|
||||
{"dataInfo": {"deletedbyinference": false}, "author": [],"description": [{"dataInfo": {"deletedbyinference": false,"inferenceprovenance": "","inferred": false,"invisible": false,"provenanceaction": {"classid": "sysimport:crosswalk:datasetarchive","classname": "Harvested","schemeid": "dnet:provenanceActions","schemename": "dnet:provenanceActions"},"trust": "0.9"},"value": "Description of software 4"}],"id": "50|software::4","originalId": ["amphoranet","50|__bioTools__::393e61aebe3ef9bc8701c8dc843e08f2"],"pid": [],"source": [],"subject": [],"title": [{"dataInfo": {"deletedbyinference": false,"inferenceprovenance": "","inferred": false,"invisible": false,"provenanceaction": {"classid": "sysimport:crosswalk:datasetarchive","classname": "Harvested","schemeid": "dnet:provenanceActions","schemename": "dnet:provenanceActions"},"trust": "0.9"},"qualifier": {"classid": "main title","classname": "main title","schemeid": "dnet:dataCite_title","schemename": "dnet:dataCite_title"},"value": "Software 4"}]}
|
||||
{"dataInfo": {"deletedbyinference": false}, "resulttype": {"classname": "software"}, "author": [],"description": [{"dataInfo": {"deletedbyinference": false,"inferenceprovenance": "","inferred": false,"invisible": false,"provenanceaction": {"classid": "sysimport:crosswalk:datasetarchive","classname": "Harvested","schemeid": "dnet:provenanceActions","schemename": "dnet:provenanceActions"},"trust": "0.9"},"value": "Description of software 1"}],"id": "50|software::1","originalId": ["amphoranet","50|__bioTools__::393e61aebe3ef9bc8701c8dc843e08f2"],"pid": [],"source": [],"subject": [],"title": [{"dataInfo": {"deletedbyinference": false,"inferenceprovenance": "","inferred": false,"invisible": false,"provenanceaction": {"classid": "sysimport:crosswalk:datasetarchive","classname": "Harvested","schemeid": "dnet:provenanceActions","schemename": "dnet:provenanceActions"},"trust": "0.9"},"qualifier": {"classid": "main title","classname": "main title","schemeid": "dnet:dataCite_title","schemename": "dnet:dataCite_title"},"value": "Software 1"}]}
|
||||
{"dataInfo": {"deletedbyinference": false}, "resulttype": {"classname": "software"}, "author": [],"description": [{"dataInfo": {"deletedbyinference": false,"inferenceprovenance": "","inferred": false,"invisible": false,"provenanceaction": {"classid": "sysimport:crosswalk:datasetarchive","classname": "Harvested","schemeid": "dnet:provenanceActions","schemename": "dnet:provenanceActions"},"trust": "0.9"},"value": "Description of software 2"}],"id": "50|software::2","originalId": ["amphoranet","50|__bioTools__::393e61aebe3ef9bc8701c8dc843e08f2"],"pid": [],"source": [],"subject": [],"title": [{"dataInfo": {"deletedbyinference": false,"inferenceprovenance": "","inferred": false,"invisible": false,"provenanceaction": {"classid": "sysimport:crosswalk:datasetarchive","classname": "Harvested","schemeid": "dnet:provenanceActions","schemename": "dnet:provenanceActions"},"trust": "0.9"},"qualifier": {"classid": "main title","classname": "main title","schemeid": "dnet:dataCite_title","schemename": "dnet:dataCite_title"},"value": "Software 2"}]}
|
||||
{"dataInfo": {"deletedbyinference": false}, "resulttype": {"classname": "software"}, "author": [],"description": [{"dataInfo": {"deletedbyinference": false,"inferenceprovenance": "","inferred": false,"invisible": false,"provenanceaction": {"classid": "sysimport:crosswalk:datasetarchive","classname": "Harvested","schemeid": "dnet:provenanceActions","schemename": "dnet:provenanceActions"},"trust": "0.9"},"value": "Description of software 3"}],"id": "50|software::3","originalId": ["amphoranet","50|__bioTools__::393e61aebe3ef9bc8701c8dc843e08f2"],"pid": [],"source": [],"subject": [],"title": [{"dataInfo": {"deletedbyinference": false,"inferenceprovenance": "","inferred": false,"invisible": false,"provenanceaction": {"classid": "sysimport:crosswalk:datasetarchive","classname": "Harvested","schemeid": "dnet:provenanceActions","schemename": "dnet:provenanceActions"},"trust": "0.9"},"qualifier": {"classid": "main title","classname": "main title","schemeid": "dnet:dataCite_title","schemename": "dnet:dataCite_title"},"value": "Software 3"}]}
|
||||
{"dataInfo": {"deletedbyinference": false}, "resulttype": {"classname": "software"}, "author": [],"description": [{"dataInfo": {"deletedbyinference": false,"inferenceprovenance": "","inferred": false,"invisible": false,"provenanceaction": {"classid": "sysimport:crosswalk:datasetarchive","classname": "Harvested","schemeid": "dnet:provenanceActions","schemename": "dnet:provenanceActions"},"trust": "0.9"},"value": "Description of software 4"}],"id": "50|software::4","originalId": ["amphoranet","50|__bioTools__::393e61aebe3ef9bc8701c8dc843e08f2"],"pid": [],"source": [],"subject": [],"title": [{"dataInfo": {"deletedbyinference": false,"inferenceprovenance": "","inferred": false,"invisible": false,"provenanceaction": {"classid": "sysimport:crosswalk:datasetarchive","classname": "Harvested","schemeid": "dnet:provenanceActions","schemename": "dnet:provenanceActions"},"trust": "0.9"},"qualifier": {"classid": "main title","classname": "main title","schemeid": "dnet:dataCite_title","schemename": "dnet:dataCite_title"},"value": "Software 4"}]}
|
||||
18
pom.xml
18
pom.xml
|
|
@ -101,6 +101,16 @@
|
|||
<enabled>false</enabled>
|
||||
</snapshots>
|
||||
</repository>
|
||||
<repository>
|
||||
<id>bintray</id>
|
||||
<name>Bintray Repository</name>
|
||||
<url>https://repos.spark-packages.org</url>
|
||||
</repository>
|
||||
|
||||
<repository>
|
||||
<id>alitouka-repo</id>
|
||||
<url>http://alitouka-public.s3-website-us-east-1.amazonaws.com/</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
<build>
|
||||
<directory>target</directory>
|
||||
|
|
@ -347,7 +357,7 @@
|
|||
<dependency>
|
||||
<groupId>com.github.haifengl</groupId>
|
||||
<artifactId>smile-core</artifactId>
|
||||
<version>2.5.3</version>
|
||||
<version>3.1.1</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
|
|
@ -356,6 +366,12 @@
|
|||
<version>${json4s.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>graphframes</groupId>
|
||||
<artifactId>graphframes</artifactId>
|
||||
<version>0.8.1-spark2.4-s_2.11</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
</dependencyManagement>
|
||||
|
|
|
|||
Loading…
Reference in New Issue