WIP: Fix SWH integration WF #396

Draft
schatz wants to merge 1 commits from SWH_issue_377 into beta
7 changed files with 21 additions and 126 deletions
Showing only changes of commit 0301e4cb96 - Show all commits

View File

@ -58,6 +58,9 @@ public class PrepareSWHActionsets {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
final String hiveDbName = parser.get("hiveDbName");
log.info("hiveDbName: {}", hiveDbName);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
@ -67,9 +70,6 @@ public class PrepareSWHActionsets {
final String inputPath = parser.get("lastVisitsPath");
log.info("inputPath: {}", inputPath);
final String softwareInputPath = parser.get("softwareInputPath");
log.info("softwareInputPath: {}", softwareInputPath);
final String outputPath = parser.get("actionsetsPath");
log.info("outputPath: {}", outputPath);
@ -79,7 +79,7 @@ public class PrepareSWHActionsets {
conf,
isSparkSessionManaged,
spark -> {
JavaPairRDD<Text, Text> softwareRDD = prepareActionsets(spark, inputPath, softwareInputPath);
JavaPairRDD<Text, Text> softwareRDD = prepareActionsets(spark, inputPath, hiveDbName);
softwareRDD
.saveAsHadoopFile(
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
@ -110,24 +110,27 @@ public class PrepareSWHActionsets {
return spark.createDataFrame(swhRDD, schema);
}
private static Dataset<Row> loadGraphSoftwareData(SparkSession spark, String softwareInputPath) {
return spark
.read()
.textFile(softwareInputPath)
.map(
(MapFunction<String, Software>) t -> OBJECT_MAPPER.readValue(t, Software.class),
Encoders.bean(Software.class))
.filter(t -> t.getCodeRepositoryUrl() != null)
.select(col("id"), col("codeRepositoryUrl.value").as("repoUrl"));
private static Dataset<Row> loadGraphSoftwareData(SparkSession spark, String hiveDbName) {
String queryTemplate = "SELECT id, coderepositoryurl.value AS repoUrl" +
"FROM %s.software " +
"WHERE coderepositoryurl IS NOT NULL";
String query = String.format(queryTemplate, hiveDbName);
log.info("Hive query to fetch all software with a code repository URL: {}", query);
return spark.sql(query);
}
private static <I extends Software> JavaPairRDD<Text, Text> prepareActionsets(SparkSession spark, String inputPath,
String softwareInputPath) {
String hiveDbName) {
Dataset<Row> swhDF = loadSWHData(spark, inputPath);
// swhDF.show(false);
Dataset<Row> graphSoftwareDF = loadGraphSoftwareData(spark, softwareInputPath);
Dataset<Row> graphSoftwareDF = loadGraphSoftwareData(spark, hiveDbName);
// graphSoftwareDF.show(5);
Dataset<Row> joinedDF = graphSoftwareDF.join(swhDF, "repoUrl").select("id", "swhid");

View File

@ -16,11 +16,5 @@
"paramLongName": "actionsetsPath",
"paramDescription": "the URL path where to store actionsets",
"paramRequired": true
},
{
"paramName": "sip",
"paramLongName": "softwareInputPath",
"paramDescription": "the URL path of the software in the graph",
"paramRequired": true
}
]

View File

@ -1,12 +1,11 @@
# hive
hiveDbName=openaire_prod_20230914
hiveDbName=openaire_beta_20231208
# input/output files
softwareCodeRepositoryURLs=${workingDir}/1_code_repo_urls.csv
lastVisitsPath=${workingDir}/2_last_visits.seq
archiveRequestsPath=${workingDir}/3_archive_requests.seq
actionsetsPath=${workingDir}/4_actionsets
graphPath=/tmp/prod_provision/graph/18_graph_blacklisted
apiAccessToken=eyJhbGciOiJIUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJhMTMxYTQ1My1hM2IyLTQwMTUtODQ2Ny05MzAyZjk3MTFkOGEifQ.eyJpYXQiOjE2OTQ2MzYwMjAsImp0aSI6IjkwZjdkNTNjLTQ5YTktNGFiMy1hY2E0LTcwMTViMjEyZTNjNiIsImlzcyI6Imh0dHBzOi8vYXV0aC5zb2Z0d2FyZWhlcml0YWdlLm9yZy9hdXRoL3JlYWxtcy9Tb2Z0d2FyZUhlcml0YWdlIiwiYXVkIjoiaHR0cHM6Ly9hdXRoLnNvZnR3YXJlaGVyaXRhZ2Uub3JnL2F1dGgvcmVhbG1zL1NvZnR3YXJlSGVyaXRhZ2UiLCJzdWIiOiIzMTY5OWZkNC0xNmE0LTQxOWItYTdhMi00NjI5MDY4ZjI3OWEiLCJ0eXAiOiJPZmZsaW5lIiwiYXpwIjoic3doLXdlYiIsInNlc3Npb25fc3RhdGUiOiIzMjYzMzEwMS00ZDRkLTQwMjItODU2NC1iMzNlMTJiNTE3ZDkiLCJzY29wZSI6Im9wZW5pZCBvZmZsaW5lX2FjY2VzcyBwcm9maWxlIGVtYWlsIn0.XHj1VIZu1dZ4Ej32-oU84mFmaox9cLNjXosNxwZM0Xs
@ -16,4 +15,4 @@ requestDelay=100
softwareLimit=500
resume=collect-software-repository-urls
resumeFrom=collect-software-repository-urls

View File

@ -22,10 +22,6 @@
<name>actionsetsPath</name>
<description>The path in the HDFS to save the action sets</description>
</property>
<property>
<name>graphPath</name>
<description>The path in the HDFS to the base folder of the graph</description>
</property>
<property>
<name>maxNumberOfRetry</name>
<description>Max number of retries for failed API calls</description>
@ -170,9 +166,9 @@
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
<arg>--lastVisitsPath</arg><arg>${lastVisitsPath}</arg>
<arg>--actionsetsPath</arg><arg>${actionsetsPath}</arg>
<arg>--softwareInputPath</arg><arg>${graphPath}/software</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>

View File

@ -1,97 +0,0 @@
package eu.dnetlib.dhp.swh;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
public class PrepareSWHActionsetsTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory
.getLogger(PrepareSWHActionsetsTest.class);
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files.createTempDirectory(PrepareSWHActionsetsTest.class.getSimpleName());
log.info("Using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(PrepareSWHActionsetsTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(PrepareSWHActionsetsTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
void testRun() throws Exception {
String lastVisitsPath = getClass()
.getResource("/eu/dnetlib/dhp/swh/last_visits_data.seq")
.getPath();
String outputPath = workingDir.toString() + "/actionSet";
String softwareInputPath = getClass()
.getResource("/eu/dnetlib/dhp/swh/software.json.gz")
.getPath();
PrepareSWHActionsets
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-lastVisitsPath", lastVisitsPath,
"-softwareInputPath", softwareInputPath,
"-actionsetsPath", outputPath
});
}
}