forked from D-Net/dnet-hadoop
PromoteActionPayloadForGraphTableJob reads directly the content pointed by the input path, adjusted promote action tests (ISLookup mock)
This commit is contained in:
parent
ff30f99c65
commit
c439d0c6bb
|
@ -29,17 +29,21 @@ public class ISClient implements Serializable {
|
|||
|
||||
private static final String INPUT_ACTION_SET_ID_SEPARATOR = ",";
|
||||
|
||||
public static List<String> getLatestRawsetPaths(String isLookupUrl, String setIds) {
|
||||
private ISLookUpService isLookup;
|
||||
|
||||
public ISClient(String isLookupUrl) {
|
||||
isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
||||
}
|
||||
|
||||
public List<String> getLatestRawsetPaths(String setIds) {
|
||||
|
||||
ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
||||
ISClient isClient = new ISClient();
|
||||
List<String> ids = Lists.newArrayList(Splitter.on(INPUT_ACTION_SET_ID_SEPARATOR)
|
||||
.omitEmptyStrings()
|
||||
.trimResults()
|
||||
.split(setIds));
|
||||
|
||||
return ids.stream()
|
||||
.map(id -> isClient.getSet(isLookup, id))
|
||||
.map(id -> getSet(isLookup, id))
|
||||
.map(as -> as.getPathToLatest())
|
||||
.collect(Collectors.toCollection(ArrayList::new));
|
||||
}
|
||||
|
|
|
@ -41,6 +41,15 @@ public class PartitionActionSetsByPayloadTypeJob {
|
|||
StructField$.MODULE$.apply("payload", DataTypes.StringType, false, Metadata.empty())
|
||||
));
|
||||
|
||||
private ISClient isClient;
|
||||
|
||||
public PartitionActionSetsByPayloadTypeJob(String isLookupUrl) {
|
||||
this.isClient = new ISClient(isLookupUrl);
|
||||
}
|
||||
|
||||
public PartitionActionSetsByPayloadTypeJob() {
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
String jsonConfiguration = IOUtils.toString(
|
||||
PromoteActionPayloadForGraphTableJob.class
|
||||
|
@ -63,7 +72,12 @@ public class PartitionActionSetsByPayloadTypeJob {
|
|||
String isLookupUrl = parser.get("isLookupUrl");
|
||||
logger.info("isLookupUrl: {}", isLookupUrl);
|
||||
|
||||
List<String> inputActionSetPaths = ISClient.getLatestRawsetPaths(isLookupUrl, inputActionSetIds);
|
||||
new PartitionActionSetsByPayloadTypeJob(isLookupUrl).run(isSparkSessionManaged, inputActionSetIds, outputPath);
|
||||
}
|
||||
|
||||
protected void run(Boolean isSparkSessionManaged, String inputActionSetIds, String outputPath) {
|
||||
|
||||
List<String> inputActionSetPaths = getIsClient().getLatestRawsetPaths(inputActionSetIds);
|
||||
logger.info("inputActionSetPaths: {}", String.join(",", inputActionSetPaths));
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
@ -95,21 +109,15 @@ public class PartitionActionSetsByPayloadTypeJob {
|
|||
String path) {
|
||||
logger.info("Reading actions from path: {}", path);
|
||||
|
||||
List<String> files = HdfsSupport.listFiles(path, spark.sparkContext().hadoopConfiguration());
|
||||
logger.info("Found files: {}", String.join(",", files));
|
||||
|
||||
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
return files
|
||||
.stream()
|
||||
.map(file -> {
|
||||
|
||||
JavaRDD<Row> rdd = sc
|
||||
.sequenceFile(file, Text.class, Text.class)
|
||||
.sequenceFile(path, Text.class, Text.class)
|
||||
.map(x -> RowFactory.create(x._1().toString(), x._2().toString()));
|
||||
|
||||
return spark.createDataFrame(rdd, KV_SCHEMA)
|
||||
.withColumn("atomic_action", from_json(col("value"), ATOMIC_ACTION_SCHEMA))
|
||||
.select(expr("atomic_action.*"));
|
||||
})
|
||||
.reduce(spark.createDataFrame(Collections.emptyList(), ATOMIC_ACTION_SCHEMA), Dataset::union);
|
||||
}
|
||||
|
||||
private static void saveActions(Dataset<Row> actionDS,
|
||||
|
@ -121,4 +129,12 @@ public class PartitionActionSetsByPayloadTypeJob {
|
|||
.mode(SaveMode.Append)
|
||||
.parquet(path);
|
||||
}
|
||||
|
||||
public ISClient getIsClient() {
|
||||
return isClient;
|
||||
}
|
||||
|
||||
public void setIsClient(ISClient isClient) {
|
||||
this.isClient = isClient;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import eu.dnetlib.dhp.schema.common.ModelSupport;
|
|||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
|
@ -119,10 +120,17 @@ public class PromoteActionPayloadForGraphTableJob {
|
|||
String path,
|
||||
Class<G> rowClazz) {
|
||||
logger.info("Reading graph table from path: {}", path);
|
||||
|
||||
return spark.read()
|
||||
.textFile(path)
|
||||
.map((MapFunction<String, G>) value -> OBJECT_MAPPER.readValue(value, rowClazz), Encoders.bean(rowClazz));
|
||||
|
||||
/*
|
||||
return spark
|
||||
.read()
|
||||
.parquet(path)
|
||||
.as(Encoders.bean(rowClazz));
|
||||
*/
|
||||
}
|
||||
|
||||
private static <A extends Oaf> Dataset<A> readActionPayload(SparkSession spark,
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package eu.dnetlib.dhp.actionmanager.partition;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Lists;
|
||||
import eu.dnetlib.dhp.actionmanager.ISClient;
|
||||
import eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJobTest;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -15,7 +17,11 @@ import org.apache.spark.sql.Row;
|
|||
import org.apache.spark.sql.SparkSession;
|
||||
import org.apache.spark.sql.types.*;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import scala.Tuple2;
|
||||
import scala.collection.mutable.Seq;
|
||||
|
||||
|
@ -31,6 +37,7 @@ import static org.apache.spark.sql.functions.*;
|
|||
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
|
||||
import static scala.collection.JavaConversions.mutableSeqAsJavaList;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class PartitionActionSetsByPayloadTypeJobTest {
|
||||
private static final ClassLoader cl = PartitionActionSetsByPayloadTypeJobTest.class.getClassLoader();
|
||||
|
||||
|
@ -64,20 +71,29 @@ public class PartitionActionSetsByPayloadTypeJobTest {
|
|||
@Nested
|
||||
class Main {
|
||||
|
||||
@Mock
|
||||
private ISClient isClient;
|
||||
|
||||
@Test
|
||||
public void shouldPartitionActionSetsByPayloadType(@TempDir Path workingDir) throws Exception {
|
||||
// given
|
||||
Path inputActionSetsDir = workingDir.resolve("input").resolve("action_sets");
|
||||
Path inputActionSetsBaseDir = workingDir.resolve("input").resolve("action_sets");
|
||||
Path outputDir = workingDir.resolve("output");
|
||||
|
||||
Map<String, List<String>> oafsByClassName = createActionSets(inputActionSetsDir);
|
||||
Map<String, List<String>> oafsByClassName = createActionSets(inputActionSetsBaseDir);
|
||||
|
||||
List<String> inputActionSetsPaths = resolveInputActionSetPaths(inputActionSetsBaseDir);
|
||||
|
||||
// when
|
||||
PartitionActionSetsByPayloadTypeJob.main(new String[]{
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-inputActionSetPaths", inputActionSetsDir.toString(),
|
||||
"-outputPath", outputDir.toString()
|
||||
});
|
||||
Mockito.when(isClient.getLatestRawsetPaths(Mockito.anyString())).thenReturn(inputActionSetsPaths);
|
||||
|
||||
PartitionActionSetsByPayloadTypeJob job = new PartitionActionSetsByPayloadTypeJob();
|
||||
job.setIsClient(isClient);
|
||||
job.run(
|
||||
Boolean.FALSE,
|
||||
"", // it can be empty we're mocking the response from isClient to resolve the paths
|
||||
outputDir.toString()
|
||||
);
|
||||
|
||||
// then
|
||||
Files.exists(outputDir);
|
||||
|
@ -94,10 +110,19 @@ public class PartitionActionSetsByPayloadTypeJobTest {
|
|||
}
|
||||
}
|
||||
|
||||
private List<String> resolveInputActionSetPaths(Path inputActionSetsBaseDir) throws IOException {
|
||||
Path inputActionSetJsonDumpsDir = getInputActionSetJsonDumpsDir();
|
||||
return Files
|
||||
.list(inputActionSetJsonDumpsDir)
|
||||
.map(path -> {
|
||||
String inputActionSetId = path.getFileName().toString();
|
||||
return inputActionSetsBaseDir.resolve(inputActionSetId).toString();
|
||||
})
|
||||
.collect(Collectors.toCollection(ArrayList::new));
|
||||
}
|
||||
|
||||
private static Map<String, List<String>> createActionSets(Path inputActionSetsDir) throws IOException {
|
||||
Path inputActionSetJsonDumpsDir = Paths
|
||||
.get(Objects.requireNonNull(cl.getResource("eu/dnetlib/dhp/actionmanager/partition/input/"))
|
||||
.getFile());
|
||||
Path inputActionSetJsonDumpsDir = getInputActionSetJsonDumpsDir();
|
||||
|
||||
Map<String, List<String>> oafsByType = new HashMap<>();
|
||||
Files
|
||||
|
@ -138,6 +163,12 @@ public class PartitionActionSetsByPayloadTypeJobTest {
|
|||
return oafsByType;
|
||||
}
|
||||
|
||||
private static Path getInputActionSetJsonDumpsDir() {
|
||||
return Paths
|
||||
.get(Objects.requireNonNull(cl.getResource("eu/dnetlib/dhp/actionmanager/partition/input/"))
|
||||
.getFile());
|
||||
}
|
||||
|
||||
private static Dataset<String> readActionsFromJsonDump(String path) {
|
||||
return spark
|
||||
.read()
|
||||
|
|
Loading…
Reference in New Issue