Implementation of author extractor and LDA threshold analysis
This commit is contained in:
parent f4c7fc1c15
commit 88451e3832
@@ -1,2 +1,2 @@
# Sun Apr 02 15:21:44 CEST 2023
# Thu Apr 13 16:22:22 CEST 2023
projectPropertyKey=projectPropertyValue
@@ -1,10 +1,21 @@
workingPath = /tmp/lda_working_dir
#LDA TUNING
#workingPath = /user/michele.debonis/lda_experiments/lda_working_dir2
#numPartitions = 1000
#entitiesPath = /tmp/publications_with_pid_pubmed
#inputFieldJPath = $.description[0].value
#vocabularyPath = /user/michele.debonis/lda_experiments/dewey_vocabulary
#vocabularyType = file
#trainRatio = 0.6
#numTopics = 5,10,15,20,25,30,35,40,45,50,55,60,65,70,75,80,85,90,95,100
#maxIterations = 300
#outputModelPath = /user/michele.debonis/lda_experiments/lda_dewey2.model

#LDA INFERENCE
numPartitions = 1000
entitiesPath = /tmp/publications_with_pid_pubmed
inputFieldJPath = $.description[0].value
vocabularyPath = /tmp/lda_working_dir/essential_science_vocabulary
vocabularyType = file
trainRatio = 0.6
numTopics = 5,10,15,20,25,30,35,40,45,50,55,60,65,70,75,80,85,90,95,100
maxIterations = 300
outputModelPath = /tmp/lda_working_dir/bestLdaModel
vocabularyPath = /user/michele.debonis/lda_experiments/dewey_vocabulary
entitiesPath = /tmp/publications_with_pid_pubmed
workingPath = /user/michele.debonis/lda_experiments/lda_inference_working_dir
ldaInferencePath = /user/michele.debonis/lda_experiments/publications_pubmed_topics
ldaModelPath = /user/michele.debonis/lda_experiments/lda_dewey.model
authorsPath = /user/michele.debonis/lda_experiments/authors_pubmed
@@ -131,17 +131,23 @@
<artifactId>json-path</artifactId>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-schemas</artifactId>
</dependency>

<!-- <dependency>-->
<!-- <groupId>org.mockito</groupId>-->
<!-- <artifactId>mockito-core</artifactId>-->
<!-- <scope>test</scope>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.mockito</groupId>-->
<!-- <artifactId>mockito-junit-jupiter</artifactId>-->
<!-- <scope>test</scope>-->
<!-- </dependency>-->

</dependencies>

<profiles>
@@ -0,0 +1,77 @@
package eu.dnetlib.jobs;

import eu.dnetlib.support.ArgumentApplicationParser;
import eu.dnetlib.support.Author;
import eu.dnetlib.support.AuthorsFactory;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.linalg.DenseVector;
import org.apache.spark.sql.SparkSession;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.io.IOException;
import java.util.Optional;

public class SparkAuthorExtractor extends AbstractSparkJob {

    private static final Logger log = LoggerFactory.getLogger(SparkAuthorExtractor.class);

    public SparkAuthorExtractor(ArgumentApplicationParser parser, SparkSession spark) {
        super(parser, spark);
    }

    public static void main(String[] args) throws Exception {

        ArgumentApplicationParser parser = new ArgumentApplicationParser(
                readResource("/jobs/parameters/authorExtractor_parameters.json", SparkTokenizer.class)
        );

        parser.parseArgument(args);

        SparkConf conf = new SparkConf();

        new SparkAuthorExtractor(
                parser,
                getSparkSession(conf)
        ).run();
    }

    @Override
    public void run() throws IOException {
        // read oozie parameters
        final String topicsPath = parser.get("topicsPath");
        final String entitiesPath = parser.get("entitiesPath");
        final String workingPath = parser.get("workingPath");
        final String outputPath = parser.get("outputPath");
        final int numPartitions = Optional
                .ofNullable(parser.get("numPartitions"))
                .map(Integer::valueOf)
                .orElse(NUM_PARTITIONS);

        log.info("entitiesPath: '{}'", entitiesPath);
        log.info("topicsPath: '{}'", topicsPath);
        log.info("workingPath: '{}'", workingPath);
        log.info("outputPath: '{}'", outputPath);
        log.info("numPartitions: '{}'", numPartitions);

        //join publications with topics
        JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());

        JavaRDD<String> entities = context.textFile(entitiesPath);

        JavaPairRDD<String, DenseVector> topics = spark.read().load(topicsPath).toJavaRDD()
                .mapToPair(t -> new Tuple2<>(t.getString(0), (DenseVector) t.get(1)));

        JavaRDD<Author> authors = AuthorsFactory.extractAuthorsFromPublications(entities, topics);

        authors
                .map(a -> new ObjectMapper().writeValueAsString(a))
                .saveAsTextFile(outputPath, GzipCodec.class);

    }
}
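Note: the extractor serializes each Author as one JSON line and writes the result gzip-compressed. A minimal, illustrative sketch (not part of the commit, using a placeholder output path) of how that output can be loaded back, mirroring what SparkLDAAnalysis does further down:

// Illustrative only: reload the authors written by SparkAuthorExtractor.
import eu.dnetlib.support.Author;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.codehaus.jackson.map.ObjectMapper;

public class ReadAuthorsSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("read-authors").getOrCreate();
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        JavaRDD<Author> authors = sc
                .textFile("/tmp/lda_working_dir/authors")   // placeholder path; gzip is decompressed transparently
                .map(json -> new ObjectMapper().readValue(json, Author.class));

        System.out.println("authors extracted: " + authors.count());
        spark.stop();
    }
}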
@@ -0,0 +1,135 @@
package eu.dnetlib.jobs;

import com.clearspring.analytics.util.Lists;
import eu.dnetlib.featureextraction.Utilities;
import eu.dnetlib.support.ArgumentApplicationParser;
import eu.dnetlib.support.Author;
import eu.dnetlib.support.AuthorsFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class SparkLDAAnalysis extends AbstractSparkJob {

    private static final Logger log = LoggerFactory.getLogger(SparkLDAAnalysis.class);

    public SparkLDAAnalysis(ArgumentApplicationParser parser, SparkSession spark) {
        super(parser, spark);
    }

    public static void main(String[] args) throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(
                readResource("/jobs/parameters/ldaAnalysis_parameters.json", SparkLDATuning.class)
        );

        parser.parseArgument(args);

        SparkConf conf = new SparkConf();

        new SparkLDAAnalysis(
                parser,
                getSparkSession(conf)
        ).run();
    }

    @Override
    void run() throws IOException {
        // read oozie parameters
        final String authorsPath = parser.get("authorsPath");
        final String workingPath = parser.get("workingPath");
        final int numPartitions = Optional
                .ofNullable(parser.get("numPartitions"))
                .map(Integer::valueOf)
                .orElse(NUM_PARTITIONS);

        log.info("authorsPath: '{}'", authorsPath);
        log.info("workingPath: '{}'", workingPath);
        log.info("numPartitions: '{}'", numPartitions);

        JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());

        JavaRDD<Author> authors = context
                .textFile(authorsPath)
                .map(s -> new ObjectMapper().readValue(s, Author.class))
                .filter(a -> !a.getOrcid().isEmpty()); //don't need authors without orcid for the threshold analysis

        JavaRDD<Tuple2<Boolean, Double>> groundTruthThreshold = authors
                .mapToPair(a -> new Tuple2<>(AuthorsFactory.getLNFI(a), a))
                .flatMapToPair(a -> a._1().stream().map(k -> new Tuple2<>(k, a._2())).collect(Collectors.toList()).iterator())
                .groupByKey()
                .flatMap(a -> thresholdAnalysis(a._2()));

        JavaDoubleRDD groundTruthTrue = groundTruthThreshold.filter(Tuple2::_1).mapToDouble(Tuple2::_2);
        long totalPositives = groundTruthTrue.count();
        JavaDoubleRDD groundTruthFalse = groundTruthThreshold.filter(x -> !x._1()).mapToDouble(Tuple2::_2);
        long totalNegatives = groundTruthFalse.count();

        double[] thresholds = new double[]{0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0};

        List<String> stats = new ArrayList<>();
        stats.add("th,fp,fn,tp,tn,total_positives,total_negatives");
        for(double threshold: thresholds) {
            long truePositive = groundTruthTrue.filter(d -> d >= threshold).count();
            long falsePositive = groundTruthFalse.filter(d -> d >= threshold).count();
            long trueNegative = groundTruthFalse.filter(d -> d < threshold).count();
            long falseNegative = groundTruthTrue.filter(d -> d < threshold).count();

            stats.add(threshold + "," + falsePositive + "," + falseNegative + "," + truePositive + "," + trueNegative + "," + totalPositives + "," + totalNegatives);
        }

        Utilities.writeLinesToHDFSFile(stats, workingPath + "/threshold_analysis.csv");

    }

    public Iterator<Tuple2<Boolean, Double>> thresholdAnalysis(Iterable<Author> a) {
        List<Author> authors = Lists.newArrayList(a);

        List<Tuple2<Boolean, Double>> results = new ArrayList<>();
        int i = 0;
        int j = 1;
        while(i < authors.size()) {

            while(j < authors.size()) {
                boolean bRes;
                if(authors.get(i).getOrcid().isEmpty() || authors.get(j).getOrcid().isEmpty())
                    bRes = false;
                else {
                    bRes = authors.get(i).getOrcid().equals(authors.get(j).getOrcid());
                }
                results.add(new Tuple2<>(bRes, cosineSimilarity(authors.get(i).getTopics(), authors.get(j).getTopics())));
                j++;
            }
            i++;
            j=i+1;
        }

        return results.iterator();
    }

    double cosineSimilarity(double[] a, double[] b) {
        double dotProduct = 0;
        double normASum = 0;
        double normBSum = 0;

        for(int i = 0; i < a.length; i ++) {
            dotProduct += a[i] * b[i];
            normASum += a[i] * a[i];
            normBSum += b[i] * b[i];
        }

        double eucledianDist = Math.sqrt(normASum) * Math.sqrt(normBSum);
        return dotProduct / eucledianDist;
    }
}
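Note: the job above only persists raw confusion-matrix counts per cosine-similarity threshold; precision and recall are left to post-processing. A minimal, illustrative sketch (not part of the commit) of deriving them from one row of threshold_analysis.csv; the row values below are made-up placeholders:

// Illustrative post-processing sketch: parse one CSV row (th,fp,fn,tp,tn,total_positives,total_negatives)
// and derive precision/recall/F1 for that threshold.
public class ThresholdStatsSketch {
    public static void main(String[] args) {
        String row = "0.5,120,80,900,4000,980,4120";   // placeholder counts
        String[] f = row.split(",");
        double threshold = Double.parseDouble(f[0]);
        long fp = Long.parseLong(f[1]);
        long fn = Long.parseLong(f[2]);
        long tp = Long.parseLong(f[3]);

        double precision = (double) tp / (tp + fp);   // share of pairs above the threshold that share an ORCID
        double recall = (double) tp / (tp + fn);      // share of same-ORCID pairs that end up above the threshold
        double f1 = 2 * precision * recall / (precision + recall);

        System.out.printf("th=%.1f precision=%.3f recall=%.3f f1=%.3f%n", threshold, precision, recall, f1);
    }
}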
@@ -1,7 +1,7 @@
package eu.dnetlib.jobs;

import eu.dnetlib.featureextraction.FeatureTransformer;
import eu.dnetlib.util.Utilities;
import eu.dnetlib.featureextraction.Utilities;
import eu.dnetlib.support.ArgumentApplicationParser;
import org.apache.spark.SparkConf;
import org.apache.spark.ml.clustering.LDAModel;
@@ -1,7 +1,7 @@
package eu.dnetlib.jobs;

import eu.dnetlib.featureextraction.FeatureTransformer;
import eu.dnetlib.util.Utilities;
import eu.dnetlib.featureextraction.Utilities;
import eu.dnetlib.support.ArgumentApplicationParser;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
@@ -0,0 +1,32 @@
[
  {
    "paramName": "w",
    "paramLongName": "workingPath",
    "paramDescription": "location of the working directory",
    "paramRequired": true
  },
  {
    "paramName": "e",
    "paramLongName": "entitiesPath",
    "paramDescription": "location of the input entities",
    "paramRequired": true
  },
  {
    "paramName": "np",
    "paramLongName": "numPartitions",
    "paramDescription": "number of partitions for the similarity relations intermediate phases",
    "paramRequired": false
  },
  {
    "paramName": "o",
    "paramLongName": "outputPath",
    "paramDescription": "location of the output author extracted",
    "paramRequired": false
  },
  {
    "paramName": "t",
    "paramLongName": "topicsPath",
    "paramDescription": "location of the lda topics",
    "paramRequired": false
  }
]
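Note: the short paramName values above become the command-line flags the author extractor parses, as also exercised by LDAAnalysisTest further down. An illustrative sketch with placeholder paths and a hypothetical class name, not part of the commit:

// Illustrative only: wiring the flags to the parameter file, mirroring the test setup.
import eu.dnetlib.support.ArgumentApplicationParser;
import org.apache.commons.io.IOUtils;

public class AuthorExtractorArgsSketch {
    public static void main(String[] args) throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(
                IOUtils.toString(AuthorExtractorArgsSketch.class
                        .getResourceAsStream("/jobs/parameters/authorExtractor_parameters.json")));

        parser.parseArgument(new String[]{
                "-e", "/tmp/publications.json",        // entitiesPath (placeholder)
                "-t", "/tmp/lda_working_dir/topics",   // topicsPath, i.e. the LDA inference output (placeholder)
                "-o", "/tmp/lda_working_dir/authors",  // outputPath (placeholder)
                "-w", "/tmp/lda_working_dir",          // workingPath (placeholder)
                "-np", "20"                            // numPartitions
        });

        System.out.println("outputPath = " + parser.get("outputPath"));
    }
}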
@@ -0,0 +1,20 @@
[
  {
    "paramName": "w",
    "paramLongName": "workingPath",
    "paramDescription": "path of the working directory",
    "paramRequired": true
  },
  {
    "paramName": "np",
    "paramLongName": "numPartitions",
    "paramDescription": "number of partitions for the similarity relations intermediate phases",
    "paramRequired": false
  },
  {
    "paramName": "i",
    "paramLongName": "authorsPath",
    "paramDescription": "location of the authors",
    "paramRequired": true
  }
]
@@ -1,4 +1,4 @@
<workflow-app name="LDA Tuning WF" xmlns="uri:oozie:workflow:0.5">
<workflow-app name="LDA Inference WF" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>entitiesPath</name>
@@ -13,7 +13,7 @@
<description>number of partitions for the spark files</description>
</property>
<property>
<name>outputPath</name>
<name>ldaInferencePath</name>
<description>the output path to store the inference result</description>
</property>
<property>
@@ -24,6 +24,10 @@
<name>ldaModelPath</name>
<description>location of the LDA model</description>
</property>
<property>
<name>authorsPath</name>
<description>location of the authors</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
@@ -164,10 +168,64 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--outputPath</arg><arg>${ldaInferencePath}</arg>
<arg>--ldaModelPath</arg><arg>${ldaModelPath}</arg>
<arg>--numPartitions</arg><arg>${numPartitions}</arg>
</spark>
<ok to="AuthorExtraction"/>
<error to="Kill"/>
</action>

<action name="AuthorExtraction">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>LDA Inference</name>
<class>eu.dnetlib.jobs.SparkAuthorExtractor</class>
<jar>dnet-and-test-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--entitiesPath</arg><arg>${entitiesPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--outputPath</arg><arg>${authorsPath}</arg>
<arg>--numPartitions</arg><arg>${numPartitions}</arg>
<arg>--topicsPath</arg><arg>${ldaInferencePath}</arg>
</spark>
<ok to="ThresholdAnalysis"/>
<error to="Kill"/>
</action>

<action name="ThresholdAnalysis">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>LDA Threshold Analysis</name>
<class>eu.dnetlib.jobs.SparkLDAAnalysis</class>
<jar>dnet-and-test-${projectVersion}.jar</jar>
<spark-opts>
--num-executors=32
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.dynamicAllocation.enabled=false
</spark-opts>
<arg>--authorsPath</arg><arg>${authorsPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--numPartitions</arg><arg>${numPartitions}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
@@ -6,15 +6,12 @@ import org.apache.commons.io.IOUtils;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Paths;

@ExtendWith(MockitoExtension.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class LDAAnalysisTest {
@@ -25,7 +22,8 @@ public class LDAAnalysisTest {
    final static String tokensPath = workingPath + "/tokens";
    final static String vocabularyPath = workingPath + "/vocabulary";
    final static String bestLDAModelPath = workingPath + "/bestLDAmodel";
    final static String outputPath = workingPath + "/ldaInferenceOutput";
    final static String topicsPath = workingPath + "/ldaInferenceOutput";
    final static String authorsPath = workingPath + "/authors";
    final static String numPartitions = "20";
    final String inputDataPath = Paths
            .get(getClass().getResource("/eu/dnetlib/jobs/examples/publications.subset.json").toURI())
@@ -151,7 +149,7 @@ public class LDAAnalysisTest {
                new String[]{
                        "-w", workingPath,
                        "-np", numPartitions,
                        "-o", outputPath,
                        "-o", topicsPath,
                        "-m", bestLDAModelPath
                });
@@ -161,6 +159,47 @@
        ).run();
    }

    @Test
    @Order(6)
    public void authorExtractorTest() throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(readResource("/jobs/parameters/authorExtractor_parameters.json", SparkLDAInference.class));

        parser.parseArgument(
                new String[]{
                        "-e", inputDataPath,
                        "-o", authorsPath,
                        "-t", topicsPath,
                        "-w", workingPath,
                        "-np", numPartitions
                });

        new SparkAuthorExtractor(
                parser,
                spark
        ).run();
    }

    @Test
    @Order(7)
    public void ldaAnalysis() throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(readResource("/jobs/parameters/ldaAnalysis_parameters.json", SparkLDAAnalysis.class));

        parser.parseArgument(
                new String[]{
                        "-i", authorsPath,
                        "-w", workingPath,
                        "-np", numPartitions
                });

        new SparkLDAAnalysis(
                parser,
                spark
        ).run();

        Thread.sleep(1000000000);

    }

    public static String readResource(String path, Class<? extends AbstractSparkJob> clazz) throws IOException {
        return IOUtils.toString(clazz.getResourceAsStream(path));
    }
File diff suppressed because one or more lines are too long
@@ -35,19 +35,22 @@
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
</dependency>

<dependency>
<groupId>org.deeplearning4j</groupId>
<artifactId>dl4j-spark-parameterserver_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.deeplearning4j</groupId>
<artifactId>dl4j-spark_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.nd4j</groupId>
<artifactId>${nd4j.backend}</artifactId>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-schemas</artifactId>
</dependency>

<dependency>
<groupId>com.ibm.icu</groupId>
<artifactId>icu4j</artifactId>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
@@ -1,7 +1,9 @@
package eu.dnetlib.featureextraction.util;
package eu.dnetlib.featureextraction;

import com.google.common.collect.Lists;
import com.jayway.jsonpath.JsonPath;
import net.minidev.json.JSONArray;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -15,6 +17,7 @@ import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import com.ibm.icu.text.Transliterator;

import java.io.BufferedReader;
import java.io.IOException;
@@ -22,11 +25,15 @@ import java.io.InputStreamReader;
import java.io.Serializable;
import java.text.Normalizer;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class Utilities implements Serializable {

    public static String DATA_ID_FIELD = "$.id";
    private static final String aliases_from = "⁰¹²³⁴⁵⁶⁷⁸⁹⁺⁻⁼⁽⁾ⁿ₀₁₂₃₄₅₆₇₈₉₊₋₌₍₎àáâäæãåāèéêëēėęəîïíīįìôöòóœøōõûüùúūßśšłžźżçćčñń";
    private static final String aliases_to = "0123456789+-=()n0123456789+-=()aaaaaaaaeeeeeeeeiiiiiioooooooouuuuussslzzzcccnn";

    static StructType inputSchema = new StructType(new StructField[]{
            new StructField("id", DataTypes.StringType, false, Metadata.empty()),
@@ -64,13 +71,13 @@ public class Utilities implements Serializable {
        }
    }

    public static String normalize(final String s) {
        return Normalizer.normalize(s, Normalizer.Form.NFD)
                .replaceAll("[^\\w\\s-]", "") // Remove all non-word, non-space or non-dash characters
                .replace('-', ' ') // Replace dashes with spaces
                .trim() // trim leading/trailing whitespace (including what used to be leading/trailing dashes)
                .toLowerCase(); // Lowercase the final results
    }
    // public static String normalize(final String s) {
    // return Normalizer.normalize(s, Normalizer.Form.NFD)
    // .replaceAll("[^\\w\\s-]", "") // Remove all non-word, non-space or non-dash characters
    // .replace('-', ' ') // Replace dashes with spaces
    // .trim() // trim leading/trailing whitespace (including what used to be leading/trailing dashes)
    // .toLowerCase(); // Lowercase the final results
    // }

    public static void writeLinesToHDFSFile(List<String> lines, String filePath) throws IOException {
        Configuration conf = new Configuration();
@@ -103,15 +110,37 @@ public class Utilities implements Serializable {
        }
    }

    protected static String readFileFromHDFS(String filePath) throws IOException {
    public static String unicodeNormalization(final String s) {

        Path path=new Path(filePath);
        FileSystem fs = FileSystem.get(new Configuration());
        BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(path)));
        try {
            return String.join("", br.lines().collect(Collectors.toList()));
        } finally {
            br.close();
        Matcher m = Pattern.compile("\\\\u(\\p{XDigit}{4})").matcher(s);
        StringBuffer buf = new StringBuffer(s.length());
        while (m.find()) {
            String ch = String.valueOf((char) Integer.parseInt(m.group(1), 16));
            m.appendReplacement(buf, Matcher.quoteReplacement(ch));
        }
        m.appendTail(buf);
        return buf.toString();
    }


    public static String normalize(final String s) {
        return fixAliases(Normalizer.normalize(unicodeNormalization(s), Normalizer.Form.NFD))
                // do not compact the regexes in a single expression, would cause StackOverflowError in case of large input strings
                .replaceAll("[^ \\w]+", "")
                .replaceAll("(\\p{InCombiningDiacriticalMarks})+", "")
                .replaceAll("(\\p{Punct})+", " ")
                .replaceAll("(\\d)+", " ")
                .replaceAll("(\\n)+", " ")
                .toLowerCase()
                .trim();
    }

    protected static String fixAliases(final String s) {
        final StringBuilder sb = new StringBuilder();
        for (final char ch : Lists.charactersOf(s)) {
            final int i = StringUtils.indexOf(aliases_from, ch);
            sb.append(i >= 0 ? aliases_to.charAt(i) : ch);
        }
        return sb.toString();
    }
}
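Note: the reworked normalize() now chains unicode-escape expansion, NFD decomposition, alias folding and the regex passes above. An illustrative sketch of the expected effect (the sample inputs and the expected outputs in the comments are mine, not taken from the commit):

// Illustrative only: expected behaviour of the reworked normalize().
import eu.dnetlib.featureextraction.Utilities;

public class NormalizeSketch {
    public static void main(String[] args) {
        // diacritics are decomposed and stripped, punctuation removed, output lowercased and trimmed
        System.out.println(Utilities.normalize("cicciÒ pasticcio"));   // expected: "ciccio pasticcio"
        System.out.println(Utilities.normalize("De Bonis, Michele"));  // expected: "de bonis michele"
    }
}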
@@ -1,2 +0,0 @@
package eu.dnetlib.gnn;public class GNNModels {
}

@@ -1,2 +0,0 @@
package eu.dnetlib.gnn;public class MatrixProduct {
}

@@ -1,2 +0,0 @@
package eu.dnetlib.gnn.dataprocessing;public class DataProcessor {
}
@@ -0,0 +1,103 @@
package eu.dnetlib.support;

import org.apache.spark.ml.linalg.DenseVector;
import org.codehaus.jackson.annotate.JsonIgnore;

import java.io.Serializable;
import java.util.List;

public class Author implements Serializable {

    public String fullname;
    public String firstname;
    public String lastname;
    public List<CoAuthor> coAuthors;
    public double[] topics;
    public String orcid;
    public String id;

    public String pubId;

    public Author() {
    }

    public Author(String fullname, String firstname, String lastname, List<CoAuthor> coAuthors, double[] topics, String id, String pubId, String orcid) {
        this.fullname = fullname;
        this.firstname = firstname;
        this.lastname = lastname;
        this.coAuthors = coAuthors;
        this.topics = topics;
        this.id = id;
        this.pubId = pubId;
        this.orcid = orcid;
    }

    public String getFullname() {
        return fullname;
    }

    public void setFullname(String fullname) {
        this.fullname = fullname;
    }

    public String getFirstname() {
        return firstname;
    }

    public void setFirstname(String firstname) {
        this.firstname = firstname;
    }

    public String getLastname() {
        return lastname;
    }

    public void setLastname(String lastname) {
        this.lastname = lastname;
    }

    public List<CoAuthor> getCoAuthors() {
        return coAuthors;
    }

    public void setCoAuthors(List<CoAuthor> coAuthors) {
        this.coAuthors = coAuthors;
    }

    public double[] getTopics() {
        return topics;
    }

    public void setTopics(double[] topics) {
        this.topics = topics;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPubId() {
        return pubId;
    }

    public void setPubId(String pubId) {
        this.pubId = pubId;
    }

    public String getOrcid() {
        return orcid;
    }

    public void setOrcid(String orcid) {
        this.orcid = orcid;
    }

    @JsonIgnore
    public boolean isAccurate() {
        return ((this.firstname != null) && (this.lastname != null) && !firstname.isEmpty() && !lastname.isEmpty());
    }
}
@@ -0,0 +1,117 @@
package eu.dnetlib.support;

import com.clearspring.analytics.util.Lists;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.featureextraction.Utilities;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.linalg.DenseVector;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
import scala.Tuple2;

import javax.rmi.CORBA.Util;
import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

public class AuthorsFactory {

    public static JavaRDD<Author> extractAuthorsFromPublications(JavaRDD<String> entities, JavaPairRDD<String, DenseVector> topics) {

        JavaPairRDD<Publication, DenseVector> publicationWithTopics = entities.map(x -> new ObjectMapper().configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false).readValue(x, Publication.class))
                .mapToPair(p -> new Tuple2<>(p.getId(), p))
                .join(topics)
                .mapToPair(Tuple2::_2);

        return publicationWithTopics.flatMap(p -> createAuthors(p));

    }

    public static Iterator<Author> createAuthors(Tuple2<Publication, DenseVector> publicationWithTopic){
        List<CoAuthor> baseCoAuthors = publicationWithTopic._1()
                .getAuthor()
                .stream()
                .map(a -> new CoAuthor(a.getFullname(), a.getName()!=null?a.getName():"", a.getSurname()!=null?a.getSurname():"", a.getPid().size()>0? a.getPid().get(0).getValue():""))
                .collect(Collectors.toList());

        List<Author> authors = new ArrayList<>();
        for(eu.dnetlib.dhp.schema.oaf.Author a : publicationWithTopic._1().getAuthor()) {

            //prepare orcid
            String orcid = a.getPid().size()>0? a.getPid().get(0).getValue() : "";
            //prepare coauthors
            List<CoAuthor> coAuthors = Lists.newArrayList(baseCoAuthors);
            coAuthors.remove(new CoAuthor(a.getFullname(), a.getName() != null ? a.getName() : "", a.getSurname() != null ? a.getSurname() : "", a.getPid().size() > 0 ? a.getPid().get(0).getValue() : ""));

            //prepare raw author id
            String id = "author::" + getMd5(a.getFullname().concat(publicationWithTopic._1().getId()));

            authors.add(new Author(a.getFullname(), a.getName(), a.getSurname(), coAuthors, publicationWithTopic._2().toArray(), id, publicationWithTopic._1().getId(), orcid));
        }

        return authors.iterator();
    }

    public static String getMd5(String input)
    {
        try {

            // Static getInstance method is called with hashing MD5
            MessageDigest md = MessageDigest.getInstance("MD5");

            // digest() method is called to calculate message digest
            // of an input digest() return array of byte
            byte[] messageDigest = md.digest(input.getBytes());

            // Convert byte array into signum representation
            BigInteger no = new BigInteger(1, messageDigest);

            // Convert message digest into hex value
            String hashtext = no.toString(16);
            while (hashtext.length() < 32) {
                hashtext = "0" + hashtext;
            }
            return hashtext;
        }

        // For specifying wrong message digest algorithms
        catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e);
        }
    }

    public static List<String> getLNFI(Author a) {
        final List<String> res = Lists.newArrayList();

        if (a.isAccurate()) {
            String lastName = Utilities.normalize(a.getLastname());
            String firstName = Utilities.normalize(a.getFirstname());
            String firstInitial = firstName.length()>0? firstName.substring(0,1) : "";

            res.add(firstInitial.concat(lastName));
        }
        else { // is not accurate, meaning it has no defined name and surname
            List<String> fullname = Arrays.asList(Utilities.normalize(a.getFullname()).split(" "));
            if (fullname.size() == 1) {
                res.add(Utilities.normalize(a.getFullname()).toLowerCase());
            }
            else if (fullname.size() == 2) {
                res.add(fullname.get(0).substring(0,1).concat(fullname.get(1)).toLowerCase());
                res.add(fullname.get(1).substring(0,1).concat(fullname.get(0)).toLowerCase());
            }
            else {
                res.add(fullname.get(0).substring(0,1).concat(fullname.get(fullname.size()-1)).toLowerCase());
                res.add(fullname.get(fullname.size()-1).substring(0,1).concat(fullname.get(0)).toLowerCase());
            }
        }

        return res.stream().map(k -> k.replaceAll(" ","")).collect(Collectors.toList());
    }

}
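Note: getLNFI builds the last-name-first-initial blocking keys that SparkLDAAnalysis groups authors on before the pairwise comparison. An illustrative sketch of its behaviour (the sample authors and the expected keys in the comments are mine, not taken from the commit):

// Illustrative only: expected blocking keys from getLNFI.
import eu.dnetlib.support.Author;
import eu.dnetlib.support.AuthorsFactory;
import java.util.ArrayList;

public class LnfiSketch {
    public static void main(String[] args) {
        // accurate author (both firstname and lastname set): single key = first initial + last name
        Author accurate = new Author("Doe, John", "John", "Doe", new ArrayList<>(), new double[]{0.5, 0.5},
                "author::id1", "pub::id1", "0000-0000-0000-0000");   // placeholder ORCID
        System.out.println(AuthorsFactory.getLNFI(accurate));   // expected: [jdoe]

        // inaccurate author (only the fullname is known): both orderings of the two tokens are produced
        Author inaccurate = new Author("John Doe", "", "", new ArrayList<>(), new double[]{0.5, 0.5},
                "author::id2", "pub::id2", "");
        System.out.println(AuthorsFactory.getLNFI(inaccurate)); // expected: [jdoe, djohn]
    }
}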
@@ -0,0 +1,69 @@
package eu.dnetlib.support;

import java.io.Serializable;

public class CoAuthor implements Serializable {

    public String fullname;
    public String lastname;
    public String firstname;
    public String orcid;

    public CoAuthor(String fullname, String firstname, String lastname, String orcid) {
        this.fullname = fullname;
        this.lastname = lastname;
        this.firstname = firstname;
        this.orcid = orcid;
    }

    public CoAuthor() {
    }

    public String getFullname() {
        return fullname;
    }

    public void setFullname(String fullname) {
        this.fullname = fullname;
    }

    public String getLastname() {
        return lastname;
    }

    public void setLastname(String lastname) {
        this.lastname = lastname;
    }

    public String getFirstname() {
        return firstname;
    }

    public void setFirstname(String firstname) {
        this.firstname = firstname;
    }

    public String getOrcid() {
        return orcid;
    }

    public void setOrcid(String orcid) {
        this.orcid = orcid;
    }

    @Override
    public boolean equals(Object o) {

        if (o == this)
            return true;

        if (!(o instanceof CoAuthor))
            return false;

        CoAuthor c = (CoAuthor) o;

        return this.fullname.concat(this.firstname).concat(this.lastname).concat(this.orcid)
                .equals(c.fullname.concat(c.firstname).concat(c.lastname).concat(c.orcid));

    }
}
@@ -0,0 +1,32 @@
import eu.dnetlib.featureextraction.Utilities;
import eu.dnetlib.support.Author;
import eu.dnetlib.support.AuthorsFactory;
import eu.dnetlib.support.CoAuthor;
import org.apache.avro.TestAnnotation;
import org.junit.jupiter.api.Test;

import java.lang.annotation.Target;
import java.util.ArrayList;

public class UtilityTest {

    @Test
    public void normalizeTest(){
        String cicciopasticcio = Utilities.normalize("cicciÒ pasticcio");

        System.out.println("cicciopasticcio = " + cicciopasticcio);
    }

    @Test
    public void equalTest(){
        System.out.println("true = " + "".equals(""));
    }

    @Test
    public void lnfiTest() throws Exception {
        Author a = new Author("De Bonis, Michele", "Æ", "De Bonis", new ArrayList<CoAuthor>(), new double[]{0.0, 1.0}, "author::id", "pub::id", "orcid");
        System.out.println("a = " + a.isAccurate());
        System.out.println(AuthorsFactory.getLNFI(a));

    }
}
pom.xml (55 changed lines)
@@ -192,24 +192,24 @@
</executions>
</plugin>

<!-- <plugin>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-plugin-plugin</artifactId>-->
<!-- <version>3.7.1</version>-->
<!-- <configuration>-->
<!-- <!– see http://jira.codehaus.org/browse/MNG-5346 –>-->
<!-- <skipErrorNoDescriptorsFound>true</skipErrorNoDescriptorsFound>-->
<!-- </configuration>-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-plugin-plugin</artifactId>
<version>3.7.1</version>
<configuration>
<!-- see http://jira.codehaus.org/browse/MNG-5346 -->
<skipErrorNoDescriptorsFound>true</skipErrorNoDescriptorsFound>
</configuration>

<!-- <executions>-->
<!-- <execution>-->
<!-- <id>mojo-descriptor</id>-->
<!-- <goals>-->
<!-- <goal>descriptor</goal>-->
<!-- </goals>-->
<!-- </execution>-->
<!-- </executions>-->
<!-- </plugin>-->
<executions>
<execution>
<id>mojo-descriptor</id>
<goals>
<goal>descriptor</goal>
</goals>
</execution>
</executions>
</plugin>

</plugins>
</pluginManagement>
@@ -433,15 +433,24 @@
<artifactId>json-path</artifactId>
<version>2.4.0</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.ibm.icu</groupId>-->
<!-- <artifactId>icu4j</artifactId>-->
<!-- <version>70.1</version>-->
<!-- </dependency>-->

<dependency>
<groupId>com.ibm.icu</groupId>
<artifactId>icu4j</artifactId>
<version>70.1</version>
</dependency>

<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-schemas</artifactId>
<version>2.10.29</version>
</dependency>

</dependencies>

</dependencyManagement>

<profiles>
</profiles>
</project>
</project>