minor changes

This commit is contained in:
Michele De Bonis 2025-03-17 10:03:35 +01:00
parent 764936a201
commit f1e44b4765
3 changed files with 34 additions and 122 deletions

View File

@ -5,56 +5,9 @@
</component>
<component name="ChangeListManager">
<list default="true" id="67528cef-c396-4288-9c84-8cedea88df9c" name="Changes" comment="">
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkEmbeddings.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkEntityInference.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkNodeEmbeddings.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkPartitionedClustering.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkProximityClustering.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkRandomWalks.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/support/AttributeParam.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/support/AttributeType.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/support/LLMModel.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/support/RAiDEntity.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/support/StrategyType.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/resources/jobs/parameters/createRandomWalks_parameters.json" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/resources/jobs/parameters/entityInference_parameters.json" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/resources/jobs/parameters/proximityClustering_parameters.json" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/resources/llm/grammar.gbnf" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/resources/raid_deepwalk/oozie_app/config-default.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/main/resources/raid_deepwalk/oozie_app/workflow.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/test/java/eu/dnetlib/raid/SimplifiedDocumentsRAiDInferenceTest.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/test/java/eu/dnetlib/raid/SupportTest.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/config/cluster.refine.tree.conf.json" afterDir="false" />
<change afterPath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/config/raid.deepwalk.conf.json" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/job-override.properties" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/job-override.properties" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/pom.xml" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/pom.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/AbstractSparkJob.java" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/AbstractSparkJob.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkClustering.java" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateDocuments.java" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateDocuments.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateDocumentsGraphFrames.java" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateDocumentsTesting.java" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkCreateEmbeddings.java" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/support/ClusteringAlgorithms.java" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/support/ClusteringAlgorithms.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/support/DataPoint.java" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/support/DataPoint.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/support/EdgeParam.java" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/support/EdgeParam.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/support/RAiDConfig.java" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/support/RAiDConfig.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/resources/jobs/parameters/createClusters_parameters.json" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/resources/jobs/parameters/createClusters_parameters.json" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/resources/jobs/parameters/createDocuments_parameters.json" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/resources/jobs/parameters/createDocuments_parameters.json" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/resources/jobs/parameters/createEmbeddings_parameters.json" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/resources/jobs/parameters/createEmbeddings_parameters.json" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/resources/raid/oozie_app/workflow.xml" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/resources/raid/oozie_app/workflow.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/scala/eu/dnetlib/raid/graph/GraphOps.scala" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/scala/eu/dnetlib/raid/graph/GraphOps.scala" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/scala/eu/dnetlib/raid/graph/GraphUtil.scala" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/scala/eu/dnetlib/raid/graph/GraphUtil.scala" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/scala/eu/dnetlib/raid/walker/RandomWalk.scala" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/test/java/eu/dnetlib/raid/RAiDInferenceTest.java" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/test/java/eu/dnetlib/raid/DeepWalkRAiDInferenceTest.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/config/raid.conf.json" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/config/raid.conf.json" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/examples/graph/dataset" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/examples/graph/dataset" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/examples/graph/organization" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/examples/graph/organization" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/examples/graph/project" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/examples/graph/project" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/examples/graph/publication" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/examples/graph/publication" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/examples/graph/relation" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/examples/graph/relation" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/examples/graph/software" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/test/resources/eu/dnetlib/raid/examples/graph/software" afterDir="false" />
<change beforePath="$PROJECT_DIR$/pom.xml" beforeDir="false" afterPath="$PROJECT_DIR$/pom.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkEntityInference.java" beforeDir="false" afterPath="$PROJECT_DIR$/dhp-raid/src/main/java/eu/dnetlib/raid/jobs/SparkEntityInference.java" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
@ -244,7 +197,7 @@
<workItem from="1736416293067" duration="16994000" />
<workItem from="1736494459106" duration="119612000" />
<workItem from="1737360024245" duration="260125000" />
<workItem from="1738702901208" duration="171759000" />
<workItem from="1738702901208" duration="175880000" />
</task>
<servers />
</component>

View File

@ -1,6 +1,6 @@
# general
graphBasePath = /tmp/prod_provision/graph/12_graph_blacklisted
numPartitions = 200
numPartitions = 400
modelPath = /user/michele.debonis/Meta-Llama-3.1-8B-Instruct-Q6_K.gguf
# documents raid
raidConfPath = /user/michele.debonis/raid.conf.json

View File

@ -1,21 +1,15 @@
package eu.dnetlib.raid.jobs;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.kherud.llama.LlamaModel;
import eu.dnetlib.pace.util.MapDocumentUtil;
import eu.dnetlib.raid.support.ArgumentApplicationParser;
import eu.dnetlib.raid.support.LLMModel;
import eu.dnetlib.raid.support.RAiDConfig;
import eu.dnetlib.raid.support.RAiDEntity;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
@ -26,16 +20,18 @@ import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.spark.sql.functions.*;
import java.io.File;
import java.io.IOException;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.spark.sql.functions.*;
public class SparkEntityInference extends AbstractSparkJob{
private static final Logger log = LoggerFactory.getLogger(SparkEntityInference.class);
@ -94,31 +90,31 @@ public class SparkEntityInference extends AbstractSparkJob{
final RAiDConfig config = loadRAiDConfig(raidConfPath);
final ObjectMapper mapper = new ObjectMapper();
// Dataset<Row> entities = prepareEntities(graphBasePath, config);
// Dataset<Row> clusters = spark.read().load(clustersPath)
// .withColumn("count", count("*").over(Window.partitionBy(CLUSTER_ID_COL)))
// .filter(
// col("count").between(
// config.getParams().getOrDefault("raidMinSize", 2).intValue(),
// config.getParams().getOrDefault("raidMaxSize", 5).intValue()
// )
// )
// .drop("count");
//
// Dataset<Row> rawEntities = clusters
// .join(entities, clusters.col(STRING_ID_COL).equalTo(entities.col(STRING_ID_COL)))
// .groupBy(clusters.col(CLUSTER_ID_COL))
// .agg(
// collect_list(entities.col(STRING_ID_COL)).alias("ids"),
// collect_list(entities.col("description")).alias("descriptions"),
// collect_list(entities.col("title")).alias("titles"),
// collect_list(entities.col("date")).alias("dates")
// )
// .select(CLUSTER_ID_COL, "ids", "titles", "descriptions", "dates")
// .cache();
//
// // checkpoint
// rawEntities.write().save(workingPath + "/raw_raids");
Dataset<Row> entities = prepareEntities(graphBasePath, config);
Dataset<Row> clusters = spark.read().load(clustersPath)
.withColumn("count", count("*").over(Window.partitionBy(CLUSTER_ID_COL)))
.filter(
col("count").between(
config.getParams().getOrDefault("raidMinSize", 2).intValue(),
config.getParams().getOrDefault("raidMaxSize", 5).intValue()
)
)
.drop("count");
Dataset<Row> rawEntities = clusters
.join(entities, clusters.col(STRING_ID_COL).equalTo(entities.col(STRING_ID_COL)))
.groupBy(clusters.col(CLUSTER_ID_COL))
.agg(
collect_list(entities.col(STRING_ID_COL)).alias("ids"),
collect_list(entities.col("description")).alias("descriptions"),
collect_list(entities.col("title")).alias("titles"),
collect_list(entities.col("date")).alias("dates")
)
.select(CLUSTER_ID_COL, "ids", "titles", "descriptions", "dates")
.cache();
// checkpoint
rawEntities.write().save(workingPath + "/raw_raids");
JavaRDD<RAiDEntity> raidEntities = spark.read().load(workingPath + "/raw_raids")
.withColumn("titles", expr("slice(titles, 1, 10)")) // limit the number of titles //TODO pick the most relevant
@ -202,29 +198,6 @@ public class SparkEntityInference extends AbstractSparkJob{
}
// public static String preparePrompt(List<String> titles, List<String> descriptions){
//
// // limit the lists to prevent OOM //TODO possible improvement, pick the most relevant instead of the firsts
// titles = titles.size()>5? titles.subList(0,4) : titles;
// titles = titles.stream()
// .map(title -> title.length() > 50 ? title.substring(0, 50) + "." : title)
// .collect(Collectors.toList());
//
// descriptions = descriptions.size()>3? descriptions.subList(0,2) : descriptions;
// descriptions = descriptions.stream()
// .map(desc -> desc.length() > 200 ? desc.substring(0, 200) + "." : desc)
// .collect(Collectors.toList());
//
// String prompt = "Titles: [ ";
// prompt += String.join(", ", titles);
// prompt += "]\nDescriptions: [ ";
// prompt += String.join(", ", descriptions);
// prompt += "]\n";
//
// return prompt;
//
// }
public Dataset<Row> prepareEntities(String graphBasePath, RAiDConfig config) {
List<Dataset<Row>> entitiesList = new ArrayList<>();
@ -267,18 +240,4 @@ public class SparkEntityInference extends AbstractSparkJob{
.collect(Collectors.toList());
}
private static String downloadModelFromHDFS(String hdfsPath) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
String fileName = new Path(hdfsPath).getName();
String localModelPath = "/tmp/" + fileName;
File file = new File(localModelPath);
if (!file.exists()) {
fs.copyToLocalFile(new Path(hdfsPath), new Path(localModelPath));
}
return localModelPath;
}
}