diff --git a/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/PrepareSWHActionsets.java b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/PrepareSWHActionsets.java
index 2691d4b7e..5b85c29fb 100644
--- a/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/PrepareSWHActionsets.java
+++ b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/PrepareSWHActionsets.java
@@ -58,6 +58,9 @@ public class PrepareSWHActionsets {
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 		parser.parseArgument(args);
 
+		final String hiveDbName = parser.get("hiveDbName");
+		log.info("hiveDbName: {}", hiveDbName);
+
 		final Boolean isSparkSessionManaged = Optional
 			.ofNullable(parser.get("isSparkSessionManaged"))
 			.map(Boolean::valueOf)
@@ -67,9 +70,6 @@ public class PrepareSWHActionsets {
 		final String inputPath = parser.get("lastVisitsPath");
 		log.info("inputPath: {}", inputPath);
 
-		final String softwareInputPath = parser.get("softwareInputPath");
-		log.info("softwareInputPath: {}", softwareInputPath);
-
 		final String outputPath = parser.get("actionsetsPath");
 		log.info("outputPath: {}", outputPath);
 
@@ -79,7 +79,7 @@ public class PrepareSWHActionsets {
 			conf,
 			isSparkSessionManaged,
 			spark -> {
-				JavaPairRDD<Text, Text> softwareRDD = prepareActionsets(spark, inputPath, softwareInputPath);
+				JavaPairRDD<Text, Text> softwareRDD = prepareActionsets(spark, inputPath, hiveDbName);
 				softwareRDD
 					.saveAsHadoopFile(
 						outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
@@ -110,24 +110,27 @@ public class PrepareSWHActionsets {
 		return spark.createDataFrame(swhRDD, schema);
 	}
 
-	private static Dataset<Row> loadGraphSoftwareData(SparkSession spark, String softwareInputPath) {
-		return spark
-			.read()
-			.textFile(softwareInputPath)
-			.map(
-				(MapFunction<String, Software>) t -> OBJECT_MAPPER.readValue(t, Software.class),
-				Encoders.bean(Software.class))
-			.filter(t -> t.getCodeRepositoryUrl() != null)
-			.select(col("id"), col("codeRepositoryUrl.value").as("repoUrl"));
+	private static Dataset<Row> loadGraphSoftwareData(SparkSession spark, String hiveDbName) {
+
+		String queryTemplate = "SELECT id, coderepositoryurl.value AS repoUrl " +
+			"FROM %s.software " +
+			"WHERE coderepositoryurl IS NOT NULL";
+
+		String query = String.format(queryTemplate, hiveDbName);
+
+		log.info("Hive query to fetch all software with a code repository URL: {}", query);
+
+		return spark.sql(query);
+
 	}
 
 	private static JavaPairRDD<Text, Text> prepareActionsets(SparkSession spark, String inputPath,
-		String softwareInputPath) {
+		String hiveDbName) {
 
 		Dataset<Row> swhDF = loadSWHData(spark, inputPath);
 //		swhDF.show(false);
 
-		Dataset<Row> graphSoftwareDF = loadGraphSoftwareData(spark, softwareInputPath);
+		Dataset<Row> graphSoftwareDF = loadGraphSoftwareData(spark, hiveDbName);
 //		graphSoftwareDF.show(5);
 
 		Dataset<Row> joinedDF = graphSoftwareDF.join(swhDF, "repoUrl").select("id", "swhid");
diff --git a/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/input_prepare_swh_actionsets.json b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/input_prepare_swh_actionsets.json
index 07ab0b1f4..a63b7388d 100644
--- a/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/input_prepare_swh_actionsets.json
+++ b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/input_prepare_swh_actionsets.json
@@ -16,11 +16,11 @@
     "paramLongName": "actionsetsPath",
     "paramDescription": "the URL path where to store actionsets",
     "paramRequired": true
   },
   {
-    "paramName": "sip",
-    "paramLongName": "softwareInputPath",
-    "paramDescription": "the URL path of the software in the graph",
+    "paramName": "db",
+    "paramLongName": "hiveDbName",
+    "paramDescription": "the target hive database name",
     "paramRequired": true
   }
 ]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/job.properties b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/job.properties
index 35c068286..9c3a50d5d 100644
--- a/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/job.properties
+++ b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/job.properties
@@ -1,12 +1,11 @@
 # hive
-hiveDbName=openaire_prod_20230914
+hiveDbName=openaire_beta_20231208
 
 # input/output files
 softwareCodeRepositoryURLs=${workingDir}/1_code_repo_urls.csv
 lastVisitsPath=${workingDir}/2_last_visits.seq
 archiveRequestsPath=${workingDir}/3_archive_requests.seq
 actionsetsPath=${workingDir}/4_actionsets
-graphPath=/tmp/prod_provision/graph/18_graph_blacklisted
 
 apiAccessToken=eyJhbGciOiJIUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJhMTMxYTQ1My1hM2IyLTQwMTUtODQ2Ny05MzAyZjk3MTFkOGEifQ.eyJpYXQiOjE2OTQ2MzYwMjAsImp0aSI6IjkwZjdkNTNjLTQ5YTktNGFiMy1hY2E0LTcwMTViMjEyZTNjNiIsImlzcyI6Imh0dHBzOi8vYXV0aC5zb2Z0d2FyZWhlcml0YWdlLm9yZy9hdXRoL3JlYWxtcy9Tb2Z0d2FyZUhlcml0YWdlIiwiYXVkIjoiaHR0cHM6Ly9hdXRoLnNvZnR3YXJlaGVyaXRhZ2Uub3JnL2F1dGgvcmVhbG1zL1NvZnR3YXJlSGVyaXRhZ2UiLCJzdWIiOiIzMTY5OWZkNC0xNmE0LTQxOWItYTdhMi00NjI5MDY4ZjI3OWEiLCJ0eXAiOiJPZmZsaW5lIiwiYXpwIjoic3doLXdlYiIsInNlc3Npb25fc3RhdGUiOiIzMjYzMzEwMS00ZDRkLTQwMjItODU2NC1iMzNlMTJiNTE3ZDkiLCJzY29wZSI6Im9wZW5pZCBvZmZsaW5lX2FjY2VzcyBwcm9maWxlIGVtYWlsIn0.XHj1VIZu1dZ4Ej32-oU84mFmaox9cLNjXosNxwZM0Xs
 
@@ -16,4 +15,4 @@
 requestDelay=100
 softwareLimit=500
 
-resume=collect-software-repository-urls
+resumeFrom=collect-software-repository-urls
diff --git a/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/oozie_app/workflow.xml b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/oozie_app/workflow.xml
index c625fcb5b..33691193f 100644
--- a/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/oozie_app/workflow.xml
@@ -22,10 +22,6 @@
             <name>actionsetsPath</name>
             <description>The path in the HDFS to save the action sets</description>
         </property>
-        <property>
-            <name>graphPath</name>
-            <description>The path in the HDFS to the base folder of the graph</description>
-        </property>
         <property>
             <name>maxNumberOfRetry</name>
             <description>Max number of retries for failed API calls</description>
@@ -170,9 +166,9 @@
                 --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
             </spark-opts>
+            <arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
             <arg>--lastVisitsPath</arg><arg>${lastVisitsPath}</arg>
             <arg>--actionsetsPath</arg><arg>${actionsetsPath}</arg>
-            <arg>--softwareInputPath</arg><arg>${graphPath}/software</arg>
         </spark>
         <ok to="End"/>
         <error to="Kill"/>
         </action>
diff --git a/dhp-workflows/dhp-swh/src/test/java/eu/dnetlib/dhp/swh/PrepareSWHActionsetsTest.java b/dhp-workflows/dhp-swh/src/test/java/eu/dnetlib/dhp/swh/PrepareSWHActionsetsTest.java
deleted file mode 100644
index ffcb7aaee..000000000
--- a/dhp-workflows/dhp-swh/src/test/java/eu/dnetlib/dhp/swh/PrepareSWHActionsetsTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-
-package eu.dnetlib.dhp.swh;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.io.Text;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import eu.dnetlib.dhp.schema.action.AtomicAction;
-import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
-import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
-
-public class PrepareSWHActionsetsTest {
-
-	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
-	private static SparkSession spark;
-
-	private static Path workingDir;
-
-	private static final Logger log = LoggerFactory
-		.getLogger(PrepareSWHActionsetsTest.class);
-
-	@BeforeAll
-	public static void beforeAll() throws IOException {
-		workingDir = Files.createTempDirectory(PrepareSWHActionsetsTest.class.getSimpleName());
-
-		log.info("Using work dir {}", workingDir);
-
-		SparkConf conf = new SparkConf();
-		conf.setAppName(PrepareSWHActionsetsTest.class.getSimpleName());
-
-		conf.setMaster("local[*]");
-		conf.set("spark.driver.host", "localhost");
-		conf.set("hive.metastore.local", "true");
-		conf.set("spark.ui.enabled", "false");
-		conf.set("spark.sql.warehouse.dir", workingDir.toString());
-		conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
-
-		spark = SparkSession
-			.builder()
-			.appName(PrepareSWHActionsetsTest.class.getSimpleName())
-			.config(conf)
-			.getOrCreate();
-	}
-
-	@AfterAll
-	public static void afterAll() throws IOException {
-		FileUtils.deleteDirectory(workingDir.toFile());
-		spark.stop();
-	}
-
-	@Test
-	void testRun() throws Exception {
-
-		String lastVisitsPath = getClass()
-			.getResource("/eu/dnetlib/dhp/swh/last_visits_data.seq")
-			.getPath();
-
-		String outputPath = workingDir.toString() + "/actionSet";
-
-		String softwareInputPath = getClass()
-			.getResource("/eu/dnetlib/dhp/swh/software.json.gz")
-			.getPath();
-
-		PrepareSWHActionsets
-			.main(
-				new String[] {
-					"-isSparkSessionManaged", Boolean.FALSE.toString(),
-					"-lastVisitsPath", lastVisitsPath,
-					"-softwareInputPath", softwareInputPath,
-					"-actionsetsPath", outputPath
-				});
-
-	}
-}
diff --git a/dhp-workflows/dhp-swh/src/test/resources/eu/dnetlib/dhp/swh/last_visits_data.seq b/dhp-workflows/dhp-swh/src/test/resources/eu/dnetlib/dhp/swh/last_visits_data.seq
deleted file mode 100644
index 683fc0e69..000000000
Binary files a/dhp-workflows/dhp-swh/src/test/resources/eu/dnetlib/dhp/swh/last_visits_data.seq and /dev/null differ
diff --git a/dhp-workflows/dhp-swh/src/test/resources/eu/dnetlib/dhp/swh/software.json.gz b/dhp-workflows/dhp-swh/src/test/resources/eu/dnetlib/dhp/swh/software.json.gz
deleted file mode 100644
index 3a62c0615..000000000
Binary files a/dhp-workflows/dhp-swh/src/test/resources/eu/dnetlib/dhp/swh/software.json.gz and /dev/null differ
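
Note on test coverage: the commit deletes PrepareSWHActionsetsTest.java together with its fixtures (last_visits_data.seq, software.json.gz) without adding a Hive-based replacement. The sketch below, which is not part of the commit, shows one way the deleted test could be reworked against the new hiveDbName parameter: it registers a local software table shaped like the one loadGraphSoftwareData queries and checks that rows with a null coderepositoryurl are filtered out. The database name swh_test_db, the id values, and the use of a parquet-backed table instead of a real Hive metastore are assumptions, not code from the repository.

package eu.dnetlib.dhp.swh;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Test;

public class PrepareSWHActionsetsHiveTest {

	@Test
	void testLoadGraphSoftwareData() throws IOException {
		Path warehouse = Files.createTempDirectory("swh-test-warehouse");

		SparkSession spark = SparkSession
			.builder()
			.appName("PrepareSWHActionsetsHiveTest")
			.master("local[*]")
			.config("spark.driver.host", "localhost")
			.config("spark.ui.enabled", "false")
			.config("spark.sql.warehouse.dir", warehouse.toString())
			.getOrCreate();

		// Mimic the graph layout assumed by loadGraphSoftwareData: a `software`
		// table whose coderepositoryurl column is a struct with a `value` field.
		// `USING parquet` keeps the test independent of a Hive metastore.
		spark.sql("CREATE DATABASE IF NOT EXISTS swh_test_db");
		spark
			.sql(
				"CREATE TABLE swh_test_db.software (id STRING, coderepositoryurl STRUCT<value: STRING>) USING parquet");
		spark
			.sql(
				"INSERT INTO swh_test_db.software " +
					"SELECT '50|sw1' AS id, named_struct('value', 'https://github.com/example/repo') AS coderepositoryurl");
		spark
			.sql(
				"INSERT INTO swh_test_db.software " +
					"SELECT '50|sw2' AS id, CAST(NULL AS STRUCT<value: STRING>) AS coderepositoryurl");

		// The same query PrepareSWHActionsets builds from its queryTemplate
		long count = spark
			.sql(
				"SELECT id, coderepositoryurl.value AS repoUrl " +
					"FROM swh_test_db.software " +
					"WHERE coderepositoryurl IS NOT NULL")
			.count();

		// only the row with a non-null code repository URL survives the filter
		assertEquals(1, count);
		spark.stop();
	}
}

Driving PrepareSWHActionsets.main with -hiveDbName swh_test_db and a local last-visits sequence file on top of such a table would restore the end-to-end coverage the deleted testRun provided.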