diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/ISClient.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/ISClient.java
index 1cb1eb4bc..994108b1c 100644
--- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/ISClient.java
+++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/ISClient.java
@@ -29,17 +29,21 @@ public class ISClient implements Serializable {
 
     private static final String INPUT_ACTION_SET_ID_SEPARATOR = ",";
 
-    public static List<String> getLatestRawsetPaths(String isLookupUrl, String setIds) {
+    private ISLookUpService isLookup;
+
+    public ISClient(String isLookupUrl) {
+        isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl);
+    }
+
+    public List<String> getLatestRawsetPaths(String setIds) {
 
-        ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl);
-        ISClient isClient = new ISClient();
         List<String> ids = Lists.newArrayList(Splitter.on(INPUT_ACTION_SET_ID_SEPARATOR)
                 .omitEmptyStrings()
                 .trimResults()
                 .split(setIds));
 
         return ids.stream()
-                .map(id -> isClient.getSet(isLookup, id))
+                .map(id -> getSet(isLookup, id))
                .map(as -> as.getPathToLatest())
                .collect(Collectors.toCollection(ArrayList::new));
     }
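Reviewer note: ISClient turns from a static utility into a stateful collaborator that owns the ISLookUpService, which is what enables the constructor/setter injection in the job below and the Mockito stubbing in the test. A minimal usage sketch, assuming `isLookupUrl` comes from the caller's configuration; the action set ids shown are purely illustrative:

    // Construct once against the IS lookup endpoint, then resolve
    // comma-separated action set ids to their latest rawset paths.
    ISClient isClient = new ISClient(isLookupUrl);
    List<String> rawsetPaths = isClient.getLatestRawsetPaths("set-a,set-b");
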
diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJob.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJob.java
index 8ba331626..31a4da190 100644
--- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJob.java
+++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJob.java
@@ -41,6 +41,15 @@ public class PartitionActionSetsByPayloadTypeJob {
             StructField$.MODULE$.apply("payload", DataTypes.StringType, false, Metadata.empty())
     ));
 
+    private ISClient isClient;
+
+    public PartitionActionSetsByPayloadTypeJob(String isLookupUrl) {
+        this.isClient = new ISClient(isLookupUrl);
+    }
+
+    public PartitionActionSetsByPayloadTypeJob() {
+    }
+
     public static void main(String[] args) throws Exception {
         String jsonConfiguration = IOUtils.toString(
                 PromoteActionPayloadForGraphTableJob.class
@@ -63,7 +72,12 @@ public class PartitionActionSetsByPayloadTypeJob {
         String isLookupUrl = parser.get("isLookupUrl");
         logger.info("isLookupUrl: {}", isLookupUrl);
 
-        List<String> inputActionSetPaths = ISClient.getLatestRawsetPaths(isLookupUrl, inputActionSetIds);
+        new PartitionActionSetsByPayloadTypeJob(isLookupUrl).run(isSparkSessionManaged, inputActionSetIds, outputPath);
+    }
+
+    protected void run(Boolean isSparkSessionManaged, String inputActionSetIds, String outputPath) {
+
+        List<String> inputActionSetPaths = getIsClient().getLatestRawsetPaths(inputActionSetIds);
         logger.info("inputActionSetPaths: {}", String.join(",", inputActionSetPaths));
 
         SparkConf conf = new SparkConf();
@@ -95,21 +109,15 @@ public class PartitionActionSetsByPayloadTypeJob {
                                             String path) {
         logger.info("Reading actions from path: {}", path);
 
-        List<String> files = HdfsSupport.listFiles(path, spark.sparkContext().hadoopConfiguration());
-        logger.info("Found files: {}", String.join(",", files));
-
         JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-        return files
-                .stream()
-                .map(file -> {
-                    JavaRDD<Row> rdd = sc
-                            .sequenceFile(file, Text.class, Text.class)
-                            .map(x -> RowFactory.create(x._1().toString(), x._2().toString()));
-                    return spark.createDataFrame(rdd, KV_SCHEMA)
-                            .withColumn("atomic_action", from_json(col("value"), ATOMIC_ACTION_SCHEMA))
-                            .select(expr("atomic_action.*"));
-                })
-                .reduce(spark.createDataFrame(Collections.emptyList(), ATOMIC_ACTION_SCHEMA), Dataset::union);
+
+        JavaRDD<Row> rdd = sc
+                .sequenceFile(path, Text.class, Text.class)
+                .map(x -> RowFactory.create(x._1().toString(), x._2().toString()));
+
+        return spark.createDataFrame(rdd, KV_SCHEMA)
+                .withColumn("atomic_action", from_json(col("value"), ATOMIC_ACTION_SCHEMA))
+                .select(expr("atomic_action.*"));
     }
 
     private static void saveActions(Dataset<Row> actionDS,
@@ -121,4 +129,12 @@ public class PartitionActionSetsByPayloadTypeJob {
                 .mode(SaveMode.Append)
                 .parquet(path);
     }
+
+    public ISClient getIsClient() {
+        return isClient;
+    }
+
+    public void setIsClient(ISClient isClient) {
+        this.isClient = isClient;
+    }
 }
diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java
index 68bb35c2b..20b75842c 100644
--- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java
+++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java
@@ -8,6 +8,7 @@ import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.*;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -119,10 +120,17 @@ public class PromoteActionPayloadForGraphTableJob {
                                                              String path,
                                                              Class<G> rowClazz) {
         logger.info("Reading graph table from path: {}", path);
+
+        return spark.read()
+                .textFile(path)
+                .map((MapFunction<String, G>) value -> OBJECT_MAPPER.readValue(value, rowClazz), Encoders.bean(rowClazz));
+
+        /*
         return spark
                 .read()
                 .parquet(path)
                 .as(Encoders.bean(rowClazz));
+        */
     }
 
     private static Dataset<Row> readActionPayload(SparkSession spark,
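Reviewer note: readGraphTable now parses each input line as JSON into the table's bean class, and the previous parquet read survives only inside a block comment. If both input formats are meant to stay selectable, dispatching on an explicit flag would avoid the dead code; a sketch under that assumption, where `format` is a hypothetical parameter not present in this diff:

    // Hypothetical format switch for readGraphTable; only the "json" branch
    // reflects what this diff actually enables, "format" is assumed.
    switch (format) {
        case "json":
            return spark.read()
                    .textFile(path)
                    .map((MapFunction<String, G>) value -> OBJECT_MAPPER.readValue(value, rowClazz),
                            Encoders.bean(rowClazz));
        case "parquet":
            return spark.read()
                    .parquet(path)
                    .as(Encoders.bean(rowClazz));
        default:
            throw new IllegalArgumentException("unsupported graph table format: " + format);
    }

Also worth noting: the newly added org.apache.spark.api.java.function.FilterFunction import is not used by any hunk shown in this diff.
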
diff --git a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJobTest.java b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJobTest.java
index 2761a7c89..bd5dc9a5d 100644
--- a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJobTest.java
+++ b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJobTest.java
@@ -1,6 +1,8 @@
 package eu.dnetlib.dhp.actionmanager.partition;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import eu.dnetlib.dhp.actionmanager.ISClient;
 import eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJobTest;
 import eu.dnetlib.dhp.schema.oaf.*;
 import org.apache.hadoop.conf.Configuration;
@@ -15,7 +17,11 @@
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.*;
 import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
 import scala.Tuple2;
 import scala.collection.mutable.Seq;
@@ -31,6 +37,7 @@
 import static org.apache.spark.sql.functions.*;
 import static org.junit.jupiter.api.Assertions.assertIterableEquals;
 import static scala.collection.JavaConversions.mutableSeqAsJavaList;
 
+@ExtendWith(MockitoExtension.class)
 public class PartitionActionSetsByPayloadTypeJobTest {
     private static final ClassLoader cl = PartitionActionSetsByPayloadTypeJobTest.class.getClassLoader();
@@ -64,20 +71,29 @@ public class PartitionActionSetsByPayloadTypeJobTest {
     @Nested
     class Main {
 
+        @Mock
+        private ISClient isClient;
+
         @Test
         public void shouldPartitionActionSetsByPayloadType(@TempDir Path workingDir) throws Exception {
             // given
-            Path inputActionSetsDir = workingDir.resolve("input").resolve("action_sets");
+            Path inputActionSetsBaseDir = workingDir.resolve("input").resolve("action_sets");
             Path outputDir = workingDir.resolve("output");
 
-            Map<String, List<String>> oafsByClassName = createActionSets(inputActionSetsDir);
+            Map<String, List<String>> oafsByClassName = createActionSets(inputActionSetsBaseDir);
+
+            List<String> inputActionSetsPaths = resolveInputActionSetPaths(inputActionSetsBaseDir);
 
             // when
-            PartitionActionSetsByPayloadTypeJob.main(new String[]{
-                    "-isSparkSessionManaged", Boolean.FALSE.toString(),
-                    "-inputActionSetPaths", inputActionSetsDir.toString(),
-                    "-outputPath", outputDir.toString()
-            });
+            Mockito.when(isClient.getLatestRawsetPaths(Mockito.anyString())).thenReturn(inputActionSetsPaths);
+
+            PartitionActionSetsByPayloadTypeJob job = new PartitionActionSetsByPayloadTypeJob();
+            job.setIsClient(isClient);
+            job.run(
+                    Boolean.FALSE,
+                    "", // may be empty: the mocked isClient resolves the input paths
+                    outputDir.toString()
+            );
 
             // then
             Files.exists(outputDir);
@@ -94,10 +110,19 @@ public class PartitionActionSetsByPayloadTypeJobTest {
         }
     }
 
+    private List<String> resolveInputActionSetPaths(Path inputActionSetsBaseDir) throws IOException {
+        Path inputActionSetJsonDumpsDir = getInputActionSetJsonDumpsDir();
+        return Files
+                .list(inputActionSetJsonDumpsDir)
+                .map(path -> {
+                    String inputActionSetId = path.getFileName().toString();
+                    return inputActionSetsBaseDir.resolve(inputActionSetId).toString();
+                })
+                .collect(Collectors.toCollection(ArrayList::new));
+    }
+
     private static Map<String, List<String>> createActionSets(Path inputActionSetsDir) throws IOException {
-        Path inputActionSetJsonDumpsDir = Paths
-                .get(Objects.requireNonNull(cl.getResource("eu/dnetlib/dhp/actionmanager/partition/input/"))
-                        .getFile());
+        Path inputActionSetJsonDumpsDir = getInputActionSetJsonDumpsDir();
 
         Map<String, List<String>> oafsByType = new HashMap<>();
         Files
@@ -138,6 +163,12 @@ public class PartitionActionSetsByPayloadTypeJobTest {
         return oafsByType;
     }
 
+    private static Path getInputActionSetJsonDumpsDir() {
+        return Paths
+                .get(Objects.requireNonNull(cl.getResource("eu/dnetlib/dhp/actionmanager/partition/input/"))
+                        .getFile());
+    }
+
     private static Dataset<String> readActionsFromJsonDump(String path) {
         return spark
                 .read()
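
Reviewer note: the @ExtendWith(MockitoExtension.class) / @Mock wiring above keeps the test hermetic, since the job no longer contacts a live IS lookup service. A condensed sketch of the stub-and-verify pattern the test relies on; the paths and the trailing verify call are illustrative additions, not part of this diff:

    // Stub the only collaborator that would touch the network, run the job,
    // and optionally assert the stub was consulted; values are illustrative.
    Mockito.when(isClient.getLatestRawsetPaths(Mockito.anyString()))
            .thenReturn(Lists.newArrayList("/data/action_sets/as-1", "/data/action_sets/as-2"));

    PartitionActionSetsByPayloadTypeJob job = new PartitionActionSetsByPayloadTypeJob();
    job.setIsClient(isClient);
    job.run(Boolean.FALSE, "", outputDir.toString());

    Mockito.verify(isClient).getLatestRawsetPaths(Mockito.anyString());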