implementation of the LDA pipeline

Michele De Bonis 2023-04-11 09:01:19 +02:00
parent 2355f8f65e
commit 4268891d3a
19 changed files with 371 additions and 46 deletions

View File

@@ -4,7 +4,7 @@ entitiesPath = /tmp/publications_with_pid_pubmed
inputFieldJPath = $.description[0].value
vocabularyPath = /tmp/lda_working_dir/essential_science_vocabulary
vocabularyType = file
trainRatio = 0.8
numTopics = 5,10,15,20,25,30,35,40,45,50
maxIterations = 200
outputModelPath = /tmp/lda_working_dir/bestLdaModel
trainRatio = 0.6
numTopics = 5,10,15,20,25,30,35,40,45,50,55,60,65,70,75,80,85,90,95,100
maxIterations = 300
outputModelPath = /tmp/lda_working_dir/bestLdaModel
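
Note: the widened grid above (trainRatio 0.6, topic counts up to 100, 300 iterations) is consumed by the SparkLDATuning job further down in this commit. The selection logic itself is not part of this hunk; the following is a minimal sketch, assuming the job splits the count vectors at trainRatio, fits one model per entry of numTopics, and keeps the model with the lowest held-out perplexity. Class and method names here are illustrative, not repository code.

import org.apache.spark.ml.clustering.LDA;
import org.apache.spark.ml.clustering.LDAModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class LdaTuningSketch {

    // Fits one LDA model per topic count and returns the one with the lowest
    // perplexity on the held-out split (hypothetical helper, not repository code).
    public static LDAModel pickBestModel(
            Dataset<Row> features, String numTopicsCsv, double trainRatio, int maxIterations) {
        Dataset<Row>[] splits = features.randomSplit(new double[]{trainRatio, 1.0 - trainRatio}, 42L);
        Dataset<Row> train = splits[0];
        Dataset<Row> test = splits[1];

        LDAModel best = null;
        double bestPerplexity = Double.MAX_VALUE;
        for (String k : numTopicsCsv.split(",")) {
            LDAModel model = new LDA()
                    .setK(Integer.parseInt(k.trim()))
                    .setMaxIter(maxIterations)
                    .setFeaturesCol("features")   // assumed name of the count-vector column
                    .fit(train);
            double perplexity = model.logPerplexity(test);   // lower is better
            if (perplexity < bestPerplexity) {
                bestPerplexity = perplexity;
                best = model;
            }
        }
        return best;
    }
}

The winning model would then be persisted to outputModelPath, e.g. via model.write().overwrite().save(outputModelPath).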

View File

@@ -46,18 +46,6 @@ public abstract class AbstractSparkJob implements Serializable {
dataset.write().option("compression", "gzip").mode(mode).json(outPath);
}
protected static String readFileFromHDFS(String filePath) throws IOException {
Path path=new Path(filePath);
FileSystem fs = FileSystem.get(new Configuration());
BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(path)));
try {
return String.join("", br.lines().collect(Collectors.toList()));
} finally {
br.close();
}
}
public static String readResource(String path, Class<? extends AbstractSparkJob> clazz) throws IOException {
return IOUtils.toString(clazz.getResourceAsStream(path));
}

View File

@@ -26,7 +26,7 @@ public class SparkCountVectorizer extends AbstractSparkJob{
public static void main(String[] args) throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
readResource("/jobs/parameters/countVectorizer_parameters.json", SparkTokenizer.class)
readResource("/jobs/parameters/countVectorizer_parameters.json", SparkCountVectorizer.class)
);
parser.parseArgument(args);

View File

@@ -28,7 +28,7 @@ public class SparkCreateVocabulary extends AbstractSparkJob{
public static void main(String[] args) throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
readResource("/jobs/parameters/createVocabulary_parameters.json", SparkTokenizer.class)
readResource("/jobs/parameters/createVocabulary_parameters.json", SparkCreateVocabulary.class)
);
parser.parseArgument(args);
@@ -75,14 +75,7 @@ public class SparkCreateVocabulary extends AbstractSparkJob{
Dataset<Row> inputTokensDS = spark.read().load(workingPath + "/tokens").repartition(numPartitions);
CountVectorizerModel vocabulary;
if (vocabularyType.equals("file")) {
try {
vocabulary = FeatureTransformer.createVocabularyFromFile(Paths
.get(getClass().getResource("/eu/dnetlib/jobs/support/vocabulary_words.txt").toURI())
.toFile()
.getAbsolutePath());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
vocabulary = FeatureTransformer.createVocabularyFromFile();
}
else {
vocabulary = FeatureTransformer.createVocabularyFromTokens(inputTokensDS, minDF, minTF, vocabSize);
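
Note: the new parameterless FeatureTransformer.createVocabularyFromFile() is not shown in this diff. A hedged sketch of what it could do, assuming it loads the same vocabulary_words.txt word list referenced by the removed code (one term per line) as a classpath resource and wraps it in a fixed CountVectorizerModel; resource location and column names are assumptions.

import org.apache.spark.ml.feature.CountVectorizerModel;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

public class VocabularyFromFileSketch {

    // Builds a fixed-vocabulary CountVectorizerModel from a bundled word list
    // (sketch only; the real resource path inside FeatureTransformer may differ).
    public static CountVectorizerModel createVocabularyFromFile() throws IOException {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                VocabularyFromFileSketch.class.getResourceAsStream("/eu/dnetlib/jobs/support/vocabulary_words.txt"),
                StandardCharsets.UTF_8))) {
            List<String> words = reader.lines()
                    .map(String::trim)
                    .filter(w -> !w.isEmpty())
                    .distinct()
                    .collect(Collectors.toList());
            return new CountVectorizerModel(words.toArray(new String[0]))
                    .setInputCol("tokens")      // assumed tokens column
                    .setOutputCol("features");  // assumed output column
        }
    }
}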

View File

@@ -0,0 +1,73 @@
package eu.dnetlib.jobs;
import eu.dnetlib.featureextraction.FeatureTransformer;
import eu.dnetlib.support.ArgumentApplicationParser;
import org.apache.spark.SparkConf;
import org.apache.spark.ml.clustering.LDAModel;
import org.apache.spark.ml.clustering.LocalLDAModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
public class SparkLDAInference extends AbstractSparkJob{
private static final Logger log = LoggerFactory.getLogger(SparkLDAInference.class);
public SparkLDAInference(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
}
public static void main(String[] args) throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
readResource("/jobs/parameters/ldaInference_parameters.json", SparkLDAInference.class)
);
parser.parseArgument(args);
SparkConf conf = new SparkConf();
new SparkLDAInference(
parser,
getSparkSession(conf)
).run();
}
@Override
public void run() throws IOException {
// read oozie parameters
final String workingPath = parser.get("workingPath");
final String outputPath = parser.get("outputPath");
final String ldaModelPath = parser.get("ldaModelPath");
final int numPartitions = Optional
.ofNullable(parser.get("numPartitions"))
.map(Integer::valueOf)
.orElse(NUM_PARTITIONS);
log.info("workingPath: '{}'", workingPath);
log.info("numPartitions: '{}'", numPartitions);
log.info("outputPath: '{}'", outputPath);
log.info("ldaModelPath: '{}'", ldaModelPath);
Dataset<Row> inputFeaturesDS = spark.read().load(workingPath + "/countVectorized");
LDAModel ldaModel = LocalLDAModel.load(ldaModelPath);
Dataset<Row> ldaTopicsDS = FeatureTransformer.ldaInference(inputFeaturesDS, ldaModel);
ldaTopicsDS
.write()
.mode(SaveMode.Overwrite)
.save(outputPath);
}
}
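
Note: FeatureTransformer.ldaInference is defined outside this diff. In Spark ML the inference step boils down to LDAModel.transform, which appends a topicDistribution vector per document; a minimal sketch of what the helper could do (column names are assumptions):

import org.apache.spark.ml.clustering.LDAModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class LdaInferenceSketch {

    // Scores count-vectorized documents against a trained LDA model and keeps
    // the identifier together with the per-topic probability vector.
    public static Dataset<Row> ldaInference(Dataset<Row> inputFeaturesDS, LDAModel ldaModel) {
        return ldaModel.transform(inputFeaturesDS)      // adds the "topicDistribution" column
                .select("id", "topicDistribution");     // "id" is an assumed column name
    }
}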

View File

@@ -1,7 +1,7 @@
package eu.dnetlib.jobs;
import eu.dnetlib.featureextraction.FeatureTransformer;
import eu.dnetlib.featureextraction.util.Utilities;
import eu.dnetlib.util.Utilities;
import eu.dnetlib.support.ArgumentApplicationParser;
import org.apache.spark.SparkConf;
import org.apache.spark.ml.clustering.LDAModel;
@@ -14,7 +14,6 @@ import scala.Tuple2;
import java.io.IOException;
import java.util.*;
import java.util.stream.Stream;
public class SparkLDATuning extends AbstractSparkJob{
@@ -27,7 +26,7 @@ public class SparkLDATuning extends AbstractSparkJob{
public static void main(String[] args) throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
readResource("/jobs/parameters/ldaTuning_parameters.json", SparkTokenizer.class)
readResource("/jobs/parameters/ldaTuning_parameters.json", SparkLDATuning.class)
);
parser.parseArgument(args);

View File

@@ -1,16 +1,14 @@
package eu.dnetlib.jobs;
import eu.dnetlib.featureextraction.FeatureTransformer;
import eu.dnetlib.featureextraction.util.Utilities;
import eu.dnetlib.util.Utilities;
import eu.dnetlib.support.ArgumentApplicationParser;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.core.Feature;
import java.io.IOException;
import java.util.Optional;

View File

@@ -0,0 +1,26 @@
[
{
"paramName": "w",
"paramLongName": "workingPath",
"paramDescription": "path of the working directory",
"paramRequired": true
},
{
"paramName": "np",
"paramLongName": "numPartitions",
"paramDescription": "number of partitions for the similarity relations intermediate phases",
"paramRequired": false
},
{
"paramName": "o",
"paramLongName": "outputPath",
"paramDescription": "location to store the output LDA inference",
"paramRequired": true
},
{
"paramName": "m",
"paramLongName": "ldaModelPath",
"paramDescription": "the LDA model to be used for the inference",
"paramRequired": true
}
]

View File

@@ -0,0 +1,18 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@@ -0,0 +1,176 @@
<workflow-app name="LDA Tuning WF" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>entitiesPath</name>
<description>the input entity path</description>
</property>
<property>
<name>workingPath</name>
<description>path for the working directory</description>
</property>
<property>
<name>numPartitions</name>
<description>number of partitions for the spark files</description>
</property>
<property>
<name>outputPath</name>
<description>the output path to store the inference result</description>
</property>
<property>
<name>vocabularyPath</name>
<description>location of the vocabulary</description>
</property>
<property>
<name>ldaModelPath</name>
<description>location of the LDA model</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="resetWorkingPath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="resetWorkingPath">
<fs>
<delete path="${workingPath}"/>
</fs>
<ok to="TokenizeData"/>
<error to="Kill"/>
</action>
<action name="TokenizeData">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Tokenize Data</name>
<class>eu.dnetlib.jobs.SparkTokenizer</class>
<jar>dnet-and-test-${projectVersion}.jar</jar>
<spark-opts>
--num-executors=32
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.dynamicAllocation.enabled=false
</spark-opts>
<arg>--entitiesPath</arg><arg>${entitiesPath}</arg>
<arg>--inputFieldJPath</arg><arg>${inputFieldJPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--numPartitions</arg><arg>${numPartitions}</arg>
</spark>
<ok to="CreateCountVectors"/>
<error to="Kill"/>
</action>
<action name="CreateCountVectors">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Create Count Vectors</name>
<class>eu.dnetlib.jobs.SparkCountVectorizer</class>
<jar>dnet-and-test-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--vocabularyPath</arg><arg>${vocabularyPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--numPartitions</arg><arg>${numPartitions}</arg>
</spark>
<ok to="LDAInference"/>
<error to="Kill"/>
</action>
<action name="LDAInference">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>LDA Inference</name>
<class>eu.dnetlib.jobs.SparkLDAInference</class>
<jar>dnet-and-test-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--ldaModelPath</arg><arg>${ldaModelPath}</arg>
<arg>--numPartitions</arg><arg>${numPartitions}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
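
Note: end to end, the three actions above correspond to a simple local chain: tokenize the selected input field, apply the vocabulary to obtain count vectors, then score the vectors with the stored LDA model. A hedged sketch using the FeatureTransformer helpers referenced in this commit; loading the vocabulary as a persisted CountVectorizerModel is an assumption about what SparkCountVectorizer does internally.

import eu.dnetlib.featureextraction.FeatureTransformer;
import org.apache.spark.ml.clustering.LDAModel;
import org.apache.spark.ml.clustering.LocalLDAModel;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class LdaWorkflowSketch {

    // Local equivalent of TokenizeData -> CreateCountVectors -> LDAInference.
    public static void run(Dataset<Row> entities, String vocabularyPath,
                           String ldaModelPath, String outputPath) {
        // 1. TokenizeData: split the selected input field into tokens.
        Dataset<Row> tokens = FeatureTransformer.tokenizeData(entities);

        // 2. CreateCountVectors: apply the stored vocabulary to obtain count vectors
        //    (assumption: the vocabulary is persisted as a CountVectorizerModel).
        CountVectorizerModel vocabulary = CountVectorizerModel.load(vocabularyPath);
        Dataset<Row> countVectors = vocabulary.transform(tokens);

        // 3. LDAInference: score the documents with the tuned model and persist the topics.
        LDAModel ldaModel = LocalLDAModel.load(ldaModelPath);
        FeatureTransformer.ldaInference(countVectors, ldaModel)
                .write()
                .mode(SaveMode.Overwrite)
                .save(outputPath);
    }
}

On the cluster the same chain is driven by this workflow-app, typically submitted with oozie job -config job.properties -run.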

View File

@@ -169,7 +169,7 @@
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Compute Statistics</name>
<name>Create Count Vectors</name>
<class>eu.dnetlib.jobs.SparkCountVectorizer</class>
<jar>dnet-and-test-${projectVersion}.jar</jar>
<spark-opts>

View File

@@ -25,6 +25,7 @@ public class LDAAnalysisTest {
final static String tokensPath = workingPath + "/tokens";
final static String vocabularyPath = workingPath + "/vocabulary";
final static String bestLDAModelPath = workingPath + "/bestLDAmodel";
final static String outputPath = workingPath + "/ldaInferenceOutput";
final static String numPartitions = "20";
final String inputDataPath = Paths
.get(getClass().getResource("/eu/dnetlib/jobs/examples/publications.subset.json").toURI())
@@ -52,10 +53,10 @@ public class LDAAnalysisTest {
context = JavaSparkContext.fromSparkContext(spark.sparkContext());
}
// @AfterAll
// public static void finalCleanUp() throws IOException {
// cleanup();
// }
@AfterAll
public static void finalCleanUp() throws IOException {
cleanup();
}
@Test
@Order(1)
@@ -139,11 +140,52 @@
parser,
spark
).run();
}
@Test
@Order(5)
public void ldaInferenceTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(readResource("/jobs/parameters/ldaInference_parameters.json", SparkLDAInference.class));
parser.parseArgument(
new String[]{
"-w", workingPath,
"-np", numPartitions,
"-o", outputPath,
"-m", bestLDAModelPath
});
new SparkLDAInference(
parser,
spark
).run();
}
public static String readResource(String path, Class<? extends AbstractSparkJob> clazz) throws IOException {
return IOUtils.toString(clazz.getResourceAsStream(path));
}
// @Test
// public void createVocabulary() {
//
// StructType inputSchema = new StructType(new StructField[]{
// new StructField("id", DataTypes.StringType, false, Metadata.empty()),
// new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
// });
//
// JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
// JavaRDD<Row> rows = sc.textFile("/Users/miconis/Desktop/dewey").map(s -> s.substring(4)).map(s -> Utilities.normalize(s).replaceAll(" ", " ")).filter(s -> !s.contains("unassigned")).map(s -> RowFactory.create("id", s));
//
// Dataset<Row> dataFrame = spark.createDataFrame(rows, inputSchema);
//
// dataFrame = FeatureTransformer.tokenizeData(dataFrame);
//
// JavaRDD<String> map = dataFrame.toJavaRDD().map(r -> r.getList(1)).flatMap(l -> l.iterator()).map(s -> s.toString()).distinct();
//
// map.coalesce(1).saveAsTextFile("/tmp/vocab_raw");
// System.out.println("map = " + map.count());
// System.out.println("dataFrame = " + map.first());
// }
}

View File

@@ -1,9 +0,0 @@
package eu.dnetlib.featureextraction.lda;
public class LDAModeler {
public static void main(String[] args) {
System.out.println("prova");
}
}

View File

@@ -0,0 +1,2 @@
package eu.dnetlib.gnn;public class GNNModels {
}

View File

@@ -0,0 +1,2 @@
package eu.dnetlib.gnn;public class MatrixProduct {
}

View File

@@ -0,0 +1,2 @@
package eu.dnetlib.gnn.dataprocessing;public class DataProcessor {
}

View File

@@ -16,10 +16,13 @@ import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.text.Normalizer;
import java.util.List;
import java.util.stream.Collectors;
public class Utilities implements Serializable {
@@ -99,4 +102,16 @@ public class Utilities implements Serializable {
e.printStackTrace();
}
}
protected static String readFileFromHDFS(String filePath) throws IOException {
Path path=new Path(filePath);
FileSystem fs = FileSystem.get(new Configuration());
BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(path)));
try {
return String.join("", br.lines().collect(Collectors.toList()));
} finally {
br.close();
}
}
}