From 958f0693d61fe51606b1e1c08e1a91e5c14866a2 Mon Sep 17 00:00:00 2001 From: pjacewicz Date: Mon, 17 Feb 2020 18:18:24 +0100 Subject: [PATCH] WIP: logic for promoting action sets added --- dhp-workflows/dhp-actionmanager/pom.xml | 29 ++- .../PromoteActionSetFromHDFSFunctions.java | 45 ++++ .../PromoteActionSetFromHDFSJob.java | 240 ++++++++++++++---- .../actionmanager_input_parameters.json | 18 +- ...PromoteActionSetFromHDFSFunctionsTest.java | 169 ++++++++++++ .../PromoteActionSetFromHDFSJobTest.java | 54 ++-- 6 files changed, 477 insertions(+), 78 deletions(-) create mode 100644 dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSFunctions.java create mode 100644 dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSFunctionsTest.java diff --git a/dhp-workflows/dhp-actionmanager/pom.xml b/dhp-workflows/dhp-actionmanager/pom.xml index be76db755..281bcda4a 100644 --- a/dhp-workflows/dhp-actionmanager/pom.xml +++ b/dhp-workflows/dhp-actionmanager/pom.xml @@ -5,7 +5,7 @@ eu.dnetlib.dhp dhp-workflows - 1.0.5-SNAPSHOT + 1.1.6-SNAPSHOT dhp-actionmanager @@ -52,11 +52,34 @@ 2.25.0 test + eu.dnetlib.dhp dhp-schemas - 1.0.5-SNAPSHOT - compile + ${project.version} + + + + eu.dnetlib + dnet-actionmanager-common + [6.0.0, 7.0.0) + + + + apache + commons-logging + + + org.apache.hadoop + hadoop-common + + + + + + eu.dnetlib + dnet-openaire-data-protos + [3.0.0, 4.0.0) diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSFunctions.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSFunctions.java new file mode 100644 index 000000000..c1f2e4c11 --- /dev/null +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSFunctions.java @@ -0,0 +1,45 @@ +package eu.dnetlib.dhp.actionmanager; + +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.ReduceFunction; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import scala.Tuple2; + +import java.util.Optional; +import java.util.function.BiFunction; + +public class PromoteActionSetFromHDFSFunctions { + + public static Dataset groupEntitiesByIdAndMerge(Dataset entityDS, + Class clazz) { + return entityDS + .groupByKey((MapFunction) OafEntity::getId, Encoders.STRING()) + .reduceGroups((ReduceFunction) (x1, x2) -> { + x1.mergeFrom(x2); + return x1; + }) + .map((MapFunction, T>) pair -> pair._2, Encoders.bean(clazz)); + } + + public static Dataset joinEntitiesWithActionPayloadAndMerge(Dataset entityDS, + Dataset actionPayloadDS, + BiFunction, Dataset, Column> entityToActionPayloadJoinExpr, + BiFunction, T> actionPayloadToEntityFn, + Class clazz) { + return entityDS + .joinWith(actionPayloadDS, entityToActionPayloadJoinExpr.apply(entityDS, actionPayloadDS), "left_outer") + .map((MapFunction, T>) pair -> Optional + .ofNullable(pair._2()) + .map(x -> { + T entity = actionPayloadToEntityFn.apply(x, clazz); + pair._1().mergeFrom(entity); + return pair._1(); + }) + .orElse(pair._1()), Encoders.bean(clazz)); + } + + +} diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJob.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJob.java index e21dd2ace..099b0b720 
100644 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJob.java +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJob.java @@ -1,24 +1,24 @@ package eu.dnetlib.dhp.actionmanager; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.data.proto.OafProtos; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.oaf.OafEntity; -import eu.dnetlib.dhp.schema.oaf.Software; +import eu.dnetlib.dhp.schema.oaf.*; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.api.java.function.ReduceFunction; import org.apache.spark.sql.*; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.types.*; -import scala.Tuple2; import java.io.IOException; -import java.util.Collections; +import java.util.Arrays; +import java.util.List; +import static eu.dnetlib.dhp.actionmanager.PromoteActionSetFromHDFSFunctions.*; import static org.apache.spark.sql.functions.*; public class PromoteActionSetFromHDFSJob { @@ -28,73 +28,209 @@ public class PromoteActionSetFromHDFSJob { PromoteActionSetFromHDFSJob.class .getResourceAsStream("/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json"))); parser.parseArgument(args); - String inputActionSetPath = parser.get("input"); - String outputPath = parser.get("output"); - final SparkConf conf = new SparkConf(); + String inputGraphPath = parser.get("inputGraphPath"); + List inputActionSetPaths = Arrays.asList(parser.get("inputActionSetPaths").split(",")); + String outputGraphPath = parser.get("outputGraphPath"); + + SparkConf conf = new SparkConf(); conf.setMaster(parser.get("master")); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); try (SparkSession spark = SparkSession.builder().config(conf).getOrCreate()) { - // reading actions as RDD - JavaRDD actionsRDD = JavaSparkContext - .fromSparkContext(spark.sparkContext()) - .sequenceFile(inputActionSetPath, Text.class, Text.class) - .map(x -> RowFactory.create(x._2().toString())); - // converting actions to DataFrame and deserializing content of TargetValue - // using unbase64 on TargetValue content to get String representation - StructType rowSchema = StructType$.MODULE$.apply( - Collections.singletonList( - StructField$.MODULE$.apply("value", DataTypes.StringType, false, Metadata.empty()) - )); - Dataset deserializedTargetValue = spark.createDataFrame(actionsRDD, rowSchema) - .withColumn("TargetValue", get_json_object(col("value"), "$.TargetValue")) - .select(unbase64(col("TargetValue")).cast(DataTypes.StringType).as("target_value_json")) + // ----- READ ----- + // dataset + Dataset datasetDS = readGraphTable( + spark, String.format("%s/dataset", inputGraphPath), eu.dnetlib.dhp.schema.oaf.Dataset.class) .cache(); + datasetDS.printSchema(); + datasetDS.show(); - // printing: only for testing - deserializedTargetValue.printSchema(); - deserializedTargetValue.show(); - System.out.println(deserializedTargetValue.first().toString()); + // datasource + Dataset datasourceDS = + readGraphTable(spark, String.format("%s/datasource", inputGraphPath), Datasource.class) + .cache(); + datasourceDS.printSchema(); + datasourceDS.show(); - // grouping and merging: should 
be generic - Dataset softwareDS = deserializedTargetValue - .map((MapFunction) PromoteActionSetFromHDFSJob::rowToOafEntity, Encoders.kryo(Software.class)) - .groupByKey((MapFunction) OafEntity::getId, Encoders.STRING()) - .reduceGroups((ReduceFunction) (software1, software2) -> { - software1.mergeFrom(software2); - return software1; - }) - .map((MapFunction, Software>) pair -> pair._2, Encoders.kryo(Software.class)); + // organization + Dataset organizationDS = + readGraphTable(spark, String.format("%s/organization", inputGraphPath), Organization.class) + .cache(); + organizationDS.printSchema(); + organizationDS.show(); + // otherresearchproduct + Dataset otherResearchProductDS = + readGraphTable(spark, String.format("%s/otherresearchproduct", inputGraphPath), OtherResearchProduct.class) + .cache(); + otherResearchProductDS.printSchema(); + otherResearchProductDS.show(); + + // project + Dataset projectDS = + readGraphTable(spark, String.format("%s/project", inputGraphPath), Project.class) + .cache(); + projectDS.printSchema(); + projectDS.show(); + + // publication + Dataset publicationDS = + readGraphTable(spark, String.format("%s/publication", inputGraphPath), Publication.class) + .cache(); + publicationDS.printSchema(); + publicationDS.show(); + + // relation + Dataset relationDS = + readGraphTable(spark, String.format("%s/relation", inputGraphPath), Relation.class) + .cache(); + relationDS.printSchema(); + relationDS.show(); + + // software + Dataset softwareDS = + readGraphTable(spark, String.format("%s/software", inputGraphPath), Software.class) + .cache(); softwareDS.printSchema(); softwareDS.show(); - // save -// softwareDS.toDF() -// .write() -// .partitionBy("id") -// .save(outputPath); + // actions + Dataset actionPayloadDS = inputActionSetPaths.stream() + .map(inputActionSetPath -> readActionSetPayload(spark, inputActionSetPath)) + .reduce(Dataset::union) + .get() + .cache(); + actionPayloadDS.printSchema(); + actionPayloadDS.show(); + System.out.println(String.join("\n", actionPayloadDS.takeAsList(20))); - // another approach: using only DataFrames i.e. 
DataSet, not DataSets + Dataset relationActionPayloadDS = filterActionPayloadForRelations(actionPayloadDS) + .cache(); + relationActionPayloadDS.printSchema(); + relationActionPayloadDS.show(); + + Dataset entityActionPayloadDS = filterActionPayloadForEntity(actionPayloadDS) + .cache(); + entityActionPayloadDS.printSchema(); + entityActionPayloadDS.show(); + + // ----- LOGIC ----- + Dataset processedDatasetDS = + processEntityDS(datasetDS, entityActionPayloadDS, eu.dnetlib.dhp.schema.oaf.Dataset.class); + Dataset processedDatasourceDS = + processEntityDS(datasourceDS, entityActionPayloadDS, Datasource.class); + Dataset processedOrganizationDS = + processEntityDS(organizationDS, entityActionPayloadDS, Organization.class); + Dataset processedOtherResearchProductDS = + processEntityDS(otherResearchProductDS, entityActionPayloadDS, OtherResearchProduct.class); + Dataset processedProjectDS = + processEntityDS(projectDS, entityActionPayloadDS, Project.class); + Dataset processedPublicationDS = + processEntityDS(publicationDS, entityActionPayloadDS, Publication.class); + Dataset processedRelationDS = + processRelationDS(relationDS, relationActionPayloadDS); + Dataset processedSoftwareDS = + processEntityDS(softwareDS, entityActionPayloadDS, Software.class); + + // ----- SAVE ----- + processedDatasetDS.write() + .save(String.format("%s/dataset", outputGraphPath)); + processedDatasourceDS.write() + .save(String.format("%s/datasource", outputGraphPath)); + processedOrganizationDS.write() + .save(String.format("%s/organization", outputGraphPath)); + processedOtherResearchProductDS.write() + .save(String.format("%s/otherresearchproduct", outputGraphPath)); + processedProjectDS.write() + .save(String.format("%s/project", outputGraphPath)); + processedPublicationDS.write() + .save(String.format("%s/publication", outputGraphPath)); + processedRelationDS.write() + .save(String.format("%s/relation", outputGraphPath)); + processedSoftwareDS.write() + .save(String.format("%s/software", outputGraphPath)); } } - private static Software rowToOafEntity(Row row) { - // converts row with JSON into Software object: should be generic - // currently extracts only "entity.id" field from JSON - ObjectMapper objectMapper = new ObjectMapper(); + private static final StructType KV_SCHEMA = StructType$.MODULE$.apply( + Arrays.asList( + StructField$.MODULE$.apply("key", DataTypes.StringType, false, Metadata.empty()), + StructField$.MODULE$.apply("value", DataTypes.StringType, false, Metadata.empty()) + )); + + private static Dataset readGraphTable(SparkSession spark, String path, Class clazz) { + JavaRDD rows = JavaSparkContext + .fromSparkContext(spark.sparkContext()) + .sequenceFile(path, Text.class, Text.class) + .map(x -> RowFactory.create(x._1().toString(), x._2().toString())); + + return spark.createDataFrame(rows, KV_SCHEMA) + .map((MapFunction) row -> new ObjectMapper().readValue(row.getAs("value"), clazz), + Encoders.bean(clazz)); + } + + private static Dataset readActionSetPayload(SparkSession spark, String inputActionSetPath) { + JavaRDD actionsRDD = JavaSparkContext + .fromSparkContext(spark.sparkContext()) + .sequenceFile(inputActionSetPath, Text.class, Text.class) + .map(x -> RowFactory.create(x._1().toString(), x._2().toString())); + + return spark.createDataFrame(actionsRDD, KV_SCHEMA) + .select(unbase64(get_json_object(col("value"), "$.TargetValue")) + .cast(DataTypes.StringType).as("target_value_json")) + .as(Encoders.STRING()); + } + + private static Dataset filterActionPayloadForRelations(Dataset 
actionPayloadDS) { + return actionPayloadDS + .where(get_json_object(col("target_value_json"), "$.kind").equalTo("relation")); + } + + private static Dataset filterActionPayloadForEntity(Dataset actionPayloadDS) { + return actionPayloadDS + .where(get_json_object(col("target_value_json"), "$.kind").equalTo("entity")); + } + + + private static Dataset processEntityDS(Dataset entityDS, + Dataset actionPayloadDS, + Class clazz) { + Dataset groupedAndMerged = groupEntitiesByIdAndMerge(entityDS, clazz); + Dataset joinedAndMerged = joinEntitiesWithActionPayloadAndMerge(groupedAndMerged, + actionPayloadDS, + PromoteActionSetFromHDFSJob::entityToActionPayloadJoinExpr, + PromoteActionSetFromHDFSJob::actionPayloadToEntity, + clazz); + return groupEntitiesByIdAndMerge(joinedAndMerged, clazz); + } + + private static Column entityToActionPayloadJoinExpr(Dataset left, + Dataset right) { + return left.col("id").equalTo( + get_json_object(right.col("target_value_json"), "$.entity.id")); + } + + public static T actionPayloadToEntity(String actionPayload, + Class clazz) { try { - JsonNode jsonNode = objectMapper.readTree(row.getString(0)); - String id = jsonNode.at("/entity/id").asText(); - Software software = new Software(); - software.setId(id); - return software; + OafProtos.Oaf oldEntity = new ObjectMapper().readValue(actionPayload, OafProtos.Oaf.class); + return entityOldToNew(oldEntity, clazz); } catch (IOException e) { - e.printStackTrace(); throw new RuntimeException(e); } } + //TODO + private static T entityOldToNew(OafProtos.Oaf old, + Class clazz) { + return null; + } + + //TODO + private static Dataset processRelationDS(Dataset relationDS, + Dataset actionPayloadDS) { + return null; + } } diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json index 3b95c90d3..3dc02ef8c 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json @@ -6,15 +6,21 @@ "paramRequired": true }, { - "paramName": "i", - "paramLongName": "input", - "paramDescription": "the path of the input sequential file to read", + "paramName": "ig", + "paramLongName": "inputGraphPath", + "paramDescription": "#TODO: input graph path", "paramRequired": true }, { - "paramName": "o", - "paramLongName": "output", - "paramDescription": "the path of the result DataFrame on HDFS", + "paramName": "ia", + "paramLongName": "inputActionSetPaths", + "paramDescription": "#TODO: comma separated list of paths to input action sets", + "paramRequired": true + }, + { + "paramName": "og", + "paramLongName": "outputGraphPath", + "paramDescription": "#TODO: the path of the result DataFrame on HDFS", "paramRequired": true } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSFunctionsTest.java b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSFunctionsTest.java new file mode 100644 index 000000000..f5db61347 --- /dev/null +++ b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSFunctionsTest.java @@ -0,0 +1,169 @@ +package eu.dnetlib.dhp.actionmanager; + +import 
com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +import static org.apache.spark.sql.functions.get_json_object; +import static org.junit.Assert.assertEquals; + +public class PromoteActionSetFromHDFSFunctionsTest { + + private static SparkSession spark; + + @BeforeClass + public static void beforeClass() { + SparkConf conf = new SparkConf(); + conf.setMaster("local"); + conf.setAppName(PromoteActionSetFromHDFSFunctionsTest.class.getSimpleName()); + conf.set("spark.driver.host", "localhost"); + spark = SparkSession.builder().config(conf).getOrCreate(); + } + + @AfterClass + public static void afterClass() { + spark.stop(); + } + + @Test + public void shouldGroupOafEntitiesByIdAndMergeWithinGroup() { + // given + String id1 = "id1"; + String id2 = "id2"; + String id3 = "id3"; + List entityData = Arrays.asList( + createOafEntityImpl(id1), + createOafEntityImpl(id2), createOafEntityImpl(id2), + createOafEntityImpl(id3), createOafEntityImpl(id3), createOafEntityImpl(id3) + ); + Dataset entityDS = spark.createDataset(entityData, Encoders.bean(OafEntityImpl.class)); + + // when + List results = PromoteActionSetFromHDFSFunctions + .groupEntitiesByIdAndMerge(entityDS, OafEntityImpl.class) + .collectAsList(); + System.out.println(results.stream().map(x -> String.format("%s:%d", x.getId(), x.merged)).collect(Collectors.joining(","))); + + // then + assertEquals(3, results.size()); + results.forEach(result -> { + switch (result.getId()) { + case "id1": + assertEquals(1, result.merged); + break; + case "id2": + assertEquals(2, result.merged); + break; + case "id3": + assertEquals(3, result.merged); + break; + } + }); + } + + @Test + public void shouldJoinWithActionPayloadUsingIdAndMerge() { + // given + String id1 = "id1"; + String id2 = "id2"; + String id3 = "id3"; + String id4 = "id4"; + List entityData = Arrays.asList( + createOafEntityImpl(id1), createOafEntityImpl(id2), createOafEntityImpl(id3), createOafEntityImpl(id4) + ); + Dataset entityDS = spark.createDataset(entityData, Encoders.bean(OafEntityImpl.class)); + + List actionPayloadData = Arrays.asList( + actionPayload(id1), + actionPayload(id2), actionPayload(id2), + actionPayload(id3), actionPayload(id3), actionPayload(id3) + ); + Dataset actionPayloadDS = spark.createDataset(actionPayloadData, Encoders.STRING()); + + BiFunction, Dataset, Column> entityToActionPayloadJoinExpr = (left, right) -> + left.col("id").equalTo(get_json_object(right.col("value"), "$.id")); + BiFunction, OafEntityImpl> actionPayloadToEntityFn = + (BiFunction, OafEntityImpl> & Serializable) (s, clazz) -> { + try { + JsonNode jsonNode = new ObjectMapper().readTree(s); + String id = jsonNode.at("/id").asText(); + OafEntityImpl x = new OafEntityImpl(); + x.setId(id); + return x; + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + + // when + List results = PromoteActionSetFromHDFSFunctions + .joinEntitiesWithActionPayloadAndMerge(entityDS, + actionPayloadDS, + entityToActionPayloadJoinExpr, + 
actionPayloadToEntityFn, + OafEntityImpl.class) + .collectAsList(); + System.out.println(results.stream().map(x -> String.format("%s:%d", x.getId(), x.merged)).collect(Collectors.joining(","))); + + // then + assertEquals(7, results.size()); + results.forEach(result -> { + switch (result.getId()) { + case "id1": + assertEquals(2, result.merged); + break; + case "id2": + assertEquals(2, result.merged); + break; + case "id3": + assertEquals(2, result.merged); + break; + case "id4": + assertEquals(1, result.merged); + break; + } + }); + } + + public static class OafEntityImpl extends OafEntity { + private int merged = 1; + + @Override + public void mergeFrom(OafEntity e) { + merged += ((OafEntityImpl) e).merged; + } + + public int getMerged() { + return merged; + } + + public void setMerged(int merged) { + this.merged = merged; + } + } + + private static OafEntityImpl createOafEntityImpl(String id) { + OafEntityImpl x = new OafEntityImpl(); + x.setId(id); + return x; + } + + private static String actionPayload(String id) { + return String.format("{\"id\":\"%s\"}", id); + } +} diff --git a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJobTest.java b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJobTest.java index 3020d7b31..50b191ee2 100644 --- a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJobTest.java +++ b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJobTest.java @@ -1,7 +1,5 @@ package eu.dnetlib.dhp.actionmanager; -import org.apache.commons.io.FileUtils; -import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -14,42 +12,64 @@ import java.util.Objects; public class PromoteActionSetFromHDFSJobTest { private ClassLoader cl = getClass().getClassLoader(); private Path workingDir; - private Path inputActionSetDir; + private Path inputDir; private Path outputDir; @Before public void before() throws IOException { workingDir = Files.createTempDirectory("promote_action_set"); - inputActionSetDir = workingDir.resolve("input"); + inputDir = workingDir.resolve("input"); outputDir = workingDir.resolve("output"); } - @After - public void after() throws IOException { - FileUtils.deleteDirectory(workingDir.toFile()); - } +// @After +// public void after() throws IOException { +// FileUtils.deleteDirectory(workingDir.toFile()); +// } @Test public void shouldReadAtomicActionsFromHDFSAndWritePartitionedAsParquetFiles() throws Exception { // given // NOTE: test resource should contain atomic actions in a human readable form, probably as json files; here the // files should be converted to a serialized format and written out to workingDir/input - // for current testing: actions from software export, given as sequence file are copied to workingDir/input/ - Path exportedActionSetDir = Paths.get(Objects.requireNonNull(cl.getResource("entities/entities_software")).getFile()); - Path inputDir = inputActionSetDir.resolve("entities_software"); - Files.createDirectories(inputDir); - copyFiles(exportedActionSetDir, inputDir); + // for current testing: actions from iis export, given as sequence file are copied to workingDir/input/ + + //graph + Path inputGraphDir = inputDir.resolve("graph"); + Files.createDirectories(inputGraphDir); + copyFiles(Paths.get(Objects.requireNonNull(cl.getResource("graph")).getFile()), inputGraphDir); + + //actions + Path inputActionsDir = 
inputDir.resolve("actions"); + Files.createDirectories(inputActionsDir); + + Path inputEntitiesPatentDir = inputActionsDir.resolve("entities_patent"); + Files.createDirectories(inputEntitiesPatentDir); + copyFiles(Paths.get(Objects.requireNonNull(cl.getResource("actions/entities_patent")).getFile()), inputEntitiesPatentDir); + + Path inputEntitiesSoftwareDir = inputActionsDir.resolve("entities_software"); + Files.createDirectories(inputEntitiesSoftwareDir); + copyFiles(Paths.get(Objects.requireNonNull(cl.getResource("actions/entities_software")).getFile()), inputEntitiesSoftwareDir); + + String inputActionSetPaths = String.join(",", inputEntitiesSoftwareDir.toString()); //inputEntitiesPatentDir.toString(), + PromoteActionSetFromHDFSJob.main(new String[]{ - "-mt", "local[*]", - "-i", inputDir.toString(), - "-o", outputDir.toString() + "-master", "local[*]", + "-inputGraphPath", inputGraphDir.toString(), + "-inputActionSetPaths", inputActionSetPaths, + "-outputGraphPath", outputDir.toString() }); } private static void copyFiles(Path source, Path target) throws IOException { Files.list(source).forEach(f -> { try { - Files.copy(f, target.resolve(f.getFileName())); + if (Files.isDirectory(f)) { + Path subTarget = Files.createDirectories(target.resolve(f.getFileName())); + copyFiles(f, subTarget); + } else { + Files.copy(f, target.resolve(f.getFileName())); + } } catch (IOException e) { e.printStackTrace(); throw new RuntimeException(e);