diff --git a/dhp-workflows/dhp-actionmanager/pom.xml b/dhp-workflows/dhp-actionmanager/pom.xml
index be76db7558..281bcda4a3 100644
--- a/dhp-workflows/dhp-actionmanager/pom.xml
+++ b/dhp-workflows/dhp-actionmanager/pom.xml
@@ -5,7 +5,7 @@
eu.dnetlib.dhp
dhp-workflows
- 1.0.5-SNAPSHOT
+ 1.1.6-SNAPSHOT
dhp-actionmanager
@@ -52,11 +52,34 @@
2.25.0
test
+
eu.dnetlib.dhp
dhp-schemas
- 1.0.5-SNAPSHOT
- compile
+ ${project.version}
+
+
+
+ eu.dnetlib
+ dnet-actionmanager-common
+ [6.0.0, 7.0.0)
+
+
+
+ apache
+ commons-logging
+
+
+ org.apache.hadoop
+ hadoop-common
+
+
+
+
+
+ eu.dnetlib
+ dnet-openaire-data-protos
+ [3.0.0, 4.0.0)
diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSFunctions.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSFunctions.java
new file mode 100644
index 0000000000..c1f2e4c117
--- /dev/null
+++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSFunctions.java
@@ -0,0 +1,45 @@
+package eu.dnetlib.dhp.actionmanager;
+
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.ReduceFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import scala.Tuple2;
+
+import java.util.Optional;
+import java.util.function.BiFunction;
+
+public class PromoteActionSetFromHDFSFunctions { // Static Spark Dataset helpers for merging OafEntity rows and action payloads.
+ // Groups entities by getId() and folds every group into a single entity with mergeFrom(); the result is bean-encoded with clazz.
+ public static Dataset groupEntitiesByIdAndMerge(Dataset entityDS,
+ Class clazz) {
+ return entityDS
+ .groupByKey((MapFunction) OafEntity::getId, Encoders.STRING())
+ .reduceGroups((ReduceFunction) (x1, x2) -> {
+ x1.mergeFrom(x2); // x1 is mutated in place and reused as the accumulator
+ return x1;
+ })
+ .map((MapFunction, T>) pair -> pair._2, Encoders.bean(clazz)); // drop the grouping key, keep only the merged entity
+ }
+ // Left-outer-joins entities with action payloads using the supplied join expression and merges each matched payload into its entity.
+ public static Dataset joinEntitiesWithActionPayloadAndMerge(Dataset entityDS,
+ Dataset actionPayloadDS,
+ BiFunction, Dataset, Column> entityToActionPayloadJoinExpr, // builds the join condition from (entityDS, actionPayloadDS)
+ BiFunction, T> actionPayloadToEntityFn, // converts one payload into an entity of type clazz
+ Class clazz) {
+ return entityDS
+ .joinWith(actionPayloadDS, entityToActionPayloadJoinExpr.apply(entityDS, actionPayloadDS), "left_outer") // one output row per (entity, matching payload) pair, so ids may repeat
+ .map((MapFunction, T>) pair -> Optional
+ .ofNullable(pair._2()) // the right side is null when no payload matched the entity
+ .map(x -> {
+ T entity = actionPayloadToEntityFn.apply(x, clazz);
+ pair._1().mergeFrom(entity); // the joined entity is mutated in place
+ return pair._1();
+ })
+ .orElse(pair._1()), Encoders.bean(clazz)); // unmatched entities pass through unchanged
+ }
+
+
+}
diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJob.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJob.java
index e21dd2acef..099b0b720d 100644
--- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJob.java
+++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJob.java
@@ -1,24 +1,24 @@
package eu.dnetlib.dhp.actionmanager;
-import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.data.proto.OafProtos;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.oaf.OafEntity;
-import eu.dnetlib.dhp.schema.oaf.Software;
+import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*;
+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.types.*;
-import scala.Tuple2;
import java.io.IOException;
-import java.util.Collections;
+import java.util.Arrays;
+import java.util.List;
+import static eu.dnetlib.dhp.actionmanager.PromoteActionSetFromHDFSFunctions.*;
import static org.apache.spark.sql.functions.*;
public class PromoteActionSetFromHDFSJob {
@@ -28,73 +28,209 @@ public class PromoteActionSetFromHDFSJob {
PromoteActionSetFromHDFSJob.class
.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json")));
parser.parseArgument(args);
- String inputActionSetPath = parser.get("input");
- String outputPath = parser.get("output");
- final SparkConf conf = new SparkConf();
+ String inputGraphPath = parser.get("inputGraphPath");
+ List inputActionSetPaths = Arrays.asList(parser.get("inputActionSetPaths").split(","));
+ String outputGraphPath = parser.get("outputGraphPath");
+
+ SparkConf conf = new SparkConf();
conf.setMaster(parser.get("master"));
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
try (SparkSession spark = SparkSession.builder().config(conf).getOrCreate()) {
- // reading actions as RDD
- JavaRDD actionsRDD = JavaSparkContext
- .fromSparkContext(spark.sparkContext())
- .sequenceFile(inputActionSetPath, Text.class, Text.class)
- .map(x -> RowFactory.create(x._2().toString()));
- // converting actions to DataFrame and deserializing content of TargetValue
- // using unbase64 on TargetValue content to get String representation
- StructType rowSchema = StructType$.MODULE$.apply(
- Collections.singletonList(
- StructField$.MODULE$.apply("value", DataTypes.StringType, false, Metadata.empty())
- ));
- Dataset deserializedTargetValue = spark.createDataFrame(actionsRDD, rowSchema)
- .withColumn("TargetValue", get_json_object(col("value"), "$.TargetValue"))
- .select(unbase64(col("TargetValue")).cast(DataTypes.StringType).as("target_value_json"))
+ // ----- READ -----
+ // dataset
+ Dataset datasetDS = readGraphTable(
+ spark, String.format("%s/dataset", inputGraphPath), eu.dnetlib.dhp.schema.oaf.Dataset.class)
.cache();
+ datasetDS.printSchema();
+ datasetDS.show();
- // printing: only for testing
- deserializedTargetValue.printSchema();
- deserializedTargetValue.show();
- System.out.println(deserializedTargetValue.first().toString());
+ // datasource
+ Dataset datasourceDS =
+ readGraphTable(spark, String.format("%s/datasource", inputGraphPath), Datasource.class)
+ .cache();
+ datasourceDS.printSchema();
+ datasourceDS.show();
- // grouping and merging: should be generic
- Dataset softwareDS = deserializedTargetValue
- .map((MapFunction) PromoteActionSetFromHDFSJob::rowToOafEntity, Encoders.kryo(Software.class))
- .groupByKey((MapFunction) OafEntity::getId, Encoders.STRING())
- .reduceGroups((ReduceFunction) (software1, software2) -> {
- software1.mergeFrom(software2);
- return software1;
- })
- .map((MapFunction, Software>) pair -> pair._2, Encoders.kryo(Software.class));
+ // organization
+ Dataset organizationDS =
+ readGraphTable(spark, String.format("%s/organization", inputGraphPath), Organization.class)
+ .cache();
+ organizationDS.printSchema();
+ organizationDS.show();
+ // otherresearchproduct
+ Dataset otherResearchProductDS =
+ readGraphTable(spark, String.format("%s/otherresearchproduct", inputGraphPath), OtherResearchProduct.class)
+ .cache();
+ otherResearchProductDS.printSchema();
+ otherResearchProductDS.show();
+
+ // project
+ Dataset projectDS =
+ readGraphTable(spark, String.format("%s/project", inputGraphPath), Project.class)
+ .cache();
+ projectDS.printSchema();
+ projectDS.show();
+
+ // publication
+ Dataset publicationDS =
+ readGraphTable(spark, String.format("%s/publication", inputGraphPath), Publication.class)
+ .cache();
+ publicationDS.printSchema();
+ publicationDS.show();
+
+ // relation
+ Dataset relationDS =
+ readGraphTable(spark, String.format("%s/relation", inputGraphPath), Relation.class)
+ .cache();
+ relationDS.printSchema();
+ relationDS.show();
+
+ // software
+ Dataset softwareDS =
+ readGraphTable(spark, String.format("%s/software", inputGraphPath), Software.class)
+ .cache();
softwareDS.printSchema();
softwareDS.show();
- // save
-// softwareDS.toDF()
-// .write()
-// .partitionBy("id")
-// .save(outputPath);
+ // actions
+ Dataset actionPayloadDS = inputActionSetPaths.stream()
+ .map(inputActionSetPath -> readActionSetPayload(spark, inputActionSetPath))
+ .reduce(Dataset::union)
+ .get()
+ .cache();
+ actionPayloadDS.printSchema();
+ actionPayloadDS.show();
+ System.out.println(String.join("\n", actionPayloadDS.takeAsList(20)));
- // another approach: using only DataFrames i.e. DataSet, not DataSets
+ Dataset relationActionPayloadDS = filterActionPayloadForRelations(actionPayloadDS)
+ .cache();
+ relationActionPayloadDS.printSchema();
+ relationActionPayloadDS.show();
+
+ Dataset entityActionPayloadDS = filterActionPayloadForEntity(actionPayloadDS)
+ .cache();
+ entityActionPayloadDS.printSchema();
+ entityActionPayloadDS.show();
+
+ // ----- LOGIC -----
+ Dataset processedDatasetDS =
+ processEntityDS(datasetDS, entityActionPayloadDS, eu.dnetlib.dhp.schema.oaf.Dataset.class);
+ Dataset processedDatasourceDS =
+ processEntityDS(datasourceDS, entityActionPayloadDS, Datasource.class);
+ Dataset processedOrganizationDS =
+ processEntityDS(organizationDS, entityActionPayloadDS, Organization.class);
+ Dataset processedOtherResearchProductDS =
+ processEntityDS(otherResearchProductDS, entityActionPayloadDS, OtherResearchProduct.class);
+ Dataset processedProjectDS =
+ processEntityDS(projectDS, entityActionPayloadDS, Project.class);
+ Dataset processedPublicationDS =
+ processEntityDS(publicationDS, entityActionPayloadDS, Publication.class);
+ Dataset processedRelationDS =
+ processRelationDS(relationDS, relationActionPayloadDS);
+ Dataset processedSoftwareDS =
+ processEntityDS(softwareDS, entityActionPayloadDS, Software.class);
+
+ // ----- SAVE -----
+ processedDatasetDS.write()
+ .save(String.format("%s/dataset", outputGraphPath));
+ processedDatasourceDS.write()
+ .save(String.format("%s/datasource", outputGraphPath));
+ processedOrganizationDS.write()
+ .save(String.format("%s/organization", outputGraphPath));
+ processedOtherResearchProductDS.write()
+ .save(String.format("%s/otherresearchproduct", outputGraphPath));
+ processedProjectDS.write()
+ .save(String.format("%s/project", outputGraphPath));
+ processedPublicationDS.write()
+ .save(String.format("%s/publication", outputGraphPath));
+ processedRelationDS.write()
+ .save(String.format("%s/relation", outputGraphPath));
+ processedSoftwareDS.write()
+ .save(String.format("%s/software", outputGraphPath));
}
}
- private static Software rowToOafEntity(Row row) {
- // converts row with JSON into Software object: should be generic
- // currently extracts only "entity.id" field from JSON
- ObjectMapper objectMapper = new ObjectMapper();
+ private static final StructType KV_SCHEMA = StructType$.MODULE$.apply( // row schema used for (key, value) pairs read from sequence files
+ Arrays.asList(
+ StructField$.MODULE$.apply("key", DataTypes.StringType, false, Metadata.empty()), // non-nullable string column
+ StructField$.MODULE$.apply("value", DataTypes.StringType, false, Metadata.empty()) // non-nullable string column
+ ));
+
+ private static Dataset readGraphTable(SparkSession spark, String path, Class clazz) { // reads a Text/Text sequence file at path into a bean-encoded Dataset of clazz
+ JavaRDD rows = JavaSparkContext
+ .fromSparkContext(spark.sparkContext())
+ .sequenceFile(path, Text.class, Text.class)
+ .map(x -> RowFactory.create(x._1().toString(), x._2().toString())); // (key, value) pair -> Row matching KV_SCHEMA
+ // Each "value" is parsed as JSON into clazz; NOTE(review): a new ObjectMapper is created for every row.
+ return spark.createDataFrame(rows, KV_SCHEMA)
+ .map((MapFunction) row -> new ObjectMapper().readValue(row.getAs("value"), clazz),
+ Encoders.bean(clazz));
+ }
+
+ private static Dataset readActionSetPayload(SparkSession spark, String inputActionSetPath) { // reads one action set and returns its decoded TargetValue payloads as strings
+ JavaRDD actionsRDD = JavaSparkContext
+ .fromSparkContext(spark.sparkContext())
+ .sequenceFile(inputActionSetPath, Text.class, Text.class)
+ .map(x -> RowFactory.create(x._1().toString(), x._2().toString())); // (key, value) pair -> Row matching KV_SCHEMA
+ // "value" is JSON; its TargetValue field is base64-decoded and exposed as the string column "target_value_json".
+ return spark.createDataFrame(actionsRDD, KV_SCHEMA)
+ .select(unbase64(get_json_object(col("value"), "$.TargetValue"))
+ .cast(DataTypes.StringType).as("target_value_json"))
+ .as(Encoders.STRING());
+ }
+
+ private static Dataset filterActionPayloadForRelations(Dataset actionPayloadDS) { // keeps payloads whose JSON field "kind" equals "relation"
+ return actionPayloadDS
+ .where(get_json_object(col("target_value_json"), "$.kind").equalTo("relation")); // relies on the column name set by readActionSetPayload
+ }
+
+ private static Dataset filterActionPayloadForEntity(Dataset actionPayloadDS) { // keeps payloads whose JSON field "kind" equals "entity"
+ return actionPayloadDS
+ .where(get_json_object(col("target_value_json"), "$.kind").equalTo("entity")); // relies on the column name set by readActionSetPayload
+ }
+
+
+ private static Dataset processEntityDS(Dataset entityDS, // promotes entity action payloads into one graph table
+ Dataset actionPayloadDS,
+ Class clazz) {
+ Dataset groupedAndMerged = groupEntitiesByIdAndMerge(entityDS, clazz); // step 1: collapse entities sharing an id
+ Dataset joinedAndMerged = joinEntitiesWithActionPayloadAndMerge(groupedAndMerged, // step 2: merge matching payloads (left outer join on id)
+ actionPayloadDS,
+ PromoteActionSetFromHDFSJob::entityToActionPayloadJoinExpr,
+ PromoteActionSetFromHDFSJob::actionPayloadToEntity,
+ clazz);
+ return groupEntitiesByIdAndMerge(joinedAndMerged, clazz); // step 3: the join emits one row per matching payload, so collapse by id again
+ }
+
+ private static Column entityToActionPayloadJoinExpr(Dataset left, // left: entity table, right: action payloads
+ Dataset right) {
+ return left.col("id").equalTo( // join condition: entity id equals "$.entity.id" inside the payload JSON
+ get_json_object(right.col("target_value_json"), "$.entity.id"));
+ }
+
+ public static T actionPayloadToEntity(String actionPayload, // parses one action payload JSON and converts it to an entity of type clazz
+ Class clazz) { // an IOException from parsing is rethrown wrapped in a RuntimeException
 try {
- JsonNode jsonNode = objectMapper.readTree(row.getString(0));
- String id = jsonNode.at("/entity/id").asText();
- Software software = new Software();
- software.setId(id);
- return software;
+ OafProtos.Oaf oldEntity = new ObjectMapper().readValue(actionPayload, OafProtos.Oaf.class); // NOTE(review): OafProtos.Oaf looks protobuf-generated — confirm Jackson can bind it
+ return entityOldToNew(oldEntity, clazz); // entityOldToNew is still a stub returning null
 } catch (IOException e) {
- e.printStackTrace();
 throw new RuntimeException(e);
 }
 }
+ // TODO: convert the old OafProtos.Oaf representation into an instance of clazz; currently a stub that always returns null.
+ private static T entityOldToNew(OafProtos.Oaf old,
+ Class clazz) {
+ return null; // NOTE(review): this result reaches mergeFrom() in joinEntitiesWithActionPayloadAndMerge — confirm null is tolerated there
+ }
+
+ // TODO: merge the relation action payloads (actionPayloadDS) into relationDS; the merge logic is not implemented yet.
+ private static Dataset processRelationDS(Dataset relationDS,
+ Dataset actionPayloadDS) {
+ return relationDS; // pass the table through unchanged: returning null made main() throw a NullPointerException on processedRelationDS.write()
+ }
}
diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json
index 3b95c90d38..3dc02ef8c3 100644
--- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json
+++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json
@@ -6,15 +6,21 @@
"paramRequired": true
},
{
- "paramName": "i",
- "paramLongName": "input",
- "paramDescription": "the path of the input sequential file to read",
+ "paramName": "ig",
+ "paramLongName": "inputGraphPath",
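+ "paramDescription": "the path of the input graph: a directory with one sequence file table per type (dataset, datasource, organization, otherresearchproduct, project, publication, relation, software)",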
"paramRequired": true
},
{
- "paramName": "o",
- "paramLongName": "output",
- "paramDescription": "the path of the result DataFrame on HDFS",
+ "paramName": "ia",
+ "paramLongName": "inputActionSetPaths",
+ "paramDescription": "comma separated list of paths to the input action sets, each given as a sequence file",
+ "paramRequired": true
+ },
+ {
+ "paramName": "og",
+ "paramLongName": "outputGraphPath",
+ "paramDescription": "the path of the output graph; one table per type is written under it",
"paramRequired": true
}
]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSFunctionsTest.java b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSFunctionsTest.java
new file mode 100644
index 0000000000..f5db613475
--- /dev/null
+++ b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSFunctionsTest.java
@@ -0,0 +1,169 @@
+package eu.dnetlib.dhp.actionmanager;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+import static org.apache.spark.sql.functions.get_json_object;
+import static org.junit.Assert.assertEquals;
+
+public class PromoteActionSetFromHDFSFunctionsTest { // Tests for PromoteActionSetFromHDFSFunctions, run against a local SparkSession.
+
+ private static SparkSession spark; // shared by all tests: created in beforeClass(), stopped in afterClass()
+
+ @BeforeClass
+ public static void beforeClass() {
+ SparkConf conf = new SparkConf();
+ conf.setMaster("local");
+ conf.setAppName(PromoteActionSetFromHDFSFunctionsTest.class.getSimpleName());
+ conf.set("spark.driver.host", "localhost");
+ spark = SparkSession.builder().config(conf).getOrCreate();
+ }
+
+ @AfterClass
+ public static void afterClass() {
+ spark.stop();
+ }
+
+ @Test
+ public void shouldGroupOafEntitiesByIdAndMergeWithinGroup() {
+ // given: one entity with id1, two with id2 and three with id3
+ String id1 = "id1";
+ String id2 = "id2";
+ String id3 = "id3";
+ List entityData = Arrays.asList(
+ createOafEntityImpl(id1),
+ createOafEntityImpl(id2), createOafEntityImpl(id2),
+ createOafEntityImpl(id3), createOafEntityImpl(id3), createOafEntityImpl(id3)
+ );
+ Dataset entityDS = spark.createDataset(entityData, Encoders.bean(OafEntityImpl.class));
+
+ // when
+ List results = PromoteActionSetFromHDFSFunctions
+ .groupEntitiesByIdAndMerge(entityDS, OafEntityImpl.class)
+ .collectAsList();
+ System.out.println(results.stream().map(x -> String.format("%s:%d", x.getId(), x.merged)).collect(Collectors.joining(",")));
+
+ // then: one result per id, whose 'merged' counter equals the number of input entities sharing that id
+ assertEquals(3, results.size());
+ results.forEach(result -> {
+ switch (result.getId()) { // no default branch: an unexpected id is not checked here
+ case "id1":
+ assertEquals(1, result.merged);
+ break;
+ case "id2":
+ assertEquals(2, result.merged);
+ break;
+ case "id3":
+ assertEquals(3, result.merged);
+ break;
+ }
+ });
+ }
+
+ @Test
+ public void shouldJoinWithActionPayloadUsingIdAndMerge() {
+ // given: four entities (id1..id4) and one, two and three payloads for id1, id2 and id3; none for id4
+ String id1 = "id1";
+ String id2 = "id2";
+ String id3 = "id3";
+ String id4 = "id4";
+ List entityData = Arrays.asList(
+ createOafEntityImpl(id1), createOafEntityImpl(id2), createOafEntityImpl(id3), createOafEntityImpl(id4)
+ );
+ Dataset entityDS = spark.createDataset(entityData, Encoders.bean(OafEntityImpl.class));
+
+ List actionPayloadData = Arrays.asList(
+ actionPayload(id1),
+ actionPayload(id2), actionPayload(id2),
+ actionPayload(id3), actionPayload(id3), actionPayload(id3)
+ );
+ Dataset actionPayloadDS = spark.createDataset(actionPayloadData, Encoders.STRING());
+
+ BiFunction, Dataset, Column> entityToActionPayloadJoinExpr = (left, right) ->
+ left.col("id").equalTo(get_json_object(right.col("value"), "$.id")); // "value" is the column the payload strings are read from in this Dataset
+ BiFunction, OafEntityImpl> actionPayloadToEntityFn =
+ (BiFunction, OafEntityImpl> & Serializable) (s, clazz) -> { // intersection cast makes the lambda Serializable, presumably so Spark can ship it to tasks
+ try {
+ JsonNode jsonNode = new ObjectMapper().readTree(s);
+ String id = jsonNode.at("/id").asText();
+ OafEntityImpl x = new OafEntityImpl();
+ x.setId(id);
+ return x;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ // when
+ List results = PromoteActionSetFromHDFSFunctions
+ .joinEntitiesWithActionPayloadAndMerge(entityDS,
+ actionPayloadDS,
+ entityToActionPayloadJoinExpr,
+ actionPayloadToEntityFn,
+ OafEntityImpl.class)
+ .collectAsList();
+ System.out.println(results.stream().map(x -> String.format("%s:%d", x.getId(), x.merged)).collect(Collectors.joining(",")));
+
+ // then: the left outer join yields one row per (entity, payload) match plus the unmatched id4, i.e. 1 + 2 + 3 + 1 = 7 rows
+ assertEquals(7, results.size());
+ results.forEach(result -> {
+ switch (result.getId()) { // every matched row was merged with exactly one payload, hence merged == 2
+ case "id1":
+ assertEquals(2, result.merged);
+ break;
+ case "id2":
+ assertEquals(2, result.merged);
+ break;
+ case "id3":
+ assertEquals(2, result.merged);
+ break;
+ case "id4":
+ assertEquals(1, result.merged); // no payload matched, so the entity is returned unmerged
+ break;
+ }
+ });
+ }
+
+ public static class OafEntityImpl extends OafEntity { // test entity whose mergeFrom() only sums 'merged' counters, so tests can count merges
+ private int merged = 1; // a fresh entity counts as one
+
+ @Override
+ public void mergeFrom(OafEntity e) {
+ merged += ((OafEntityImpl) e).merged;
+ }
+ // getter/setter pair below is presumably required for Encoders.bean to carry 'merged' — TODO confirm
+ public int getMerged() {
+ return merged;
+ }
+
+ public void setMerged(int merged) {
+ this.merged = merged;
+ }
+ }
+
+ private static OafEntityImpl createOafEntityImpl(String id) { // builds a test entity with the given id and merged == 1
+ OafEntityImpl x = new OafEntityImpl();
+ x.setId(id);
+ return x;
+ }
+
+ private static String actionPayload(String id) { // builds a minimal JSON payload of the form {"id":"<id>"}
+ return String.format("{\"id\":\"%s\"}", id);
+ }
+}
diff --git a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJobTest.java b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJobTest.java
index 3020d7b310..50b191ee25 100644
--- a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJobTest.java
+++ b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJobTest.java
@@ -1,7 +1,5 @@
package eu.dnetlib.dhp.actionmanager;
-import org.apache.commons.io.FileUtils;
-import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -14,42 +12,64 @@ import java.util.Objects;
public class PromoteActionSetFromHDFSJobTest {
private ClassLoader cl = getClass().getClassLoader();
private Path workingDir;
- private Path inputActionSetDir;
+ private Path inputDir;
private Path outputDir;
@Before
public void before() throws IOException {
workingDir = Files.createTempDirectory("promote_action_set");
- inputActionSetDir = workingDir.resolve("input");
+ inputDir = workingDir.resolve("input");
outputDir = workingDir.resolve("output");
}
- @After
- public void after() throws IOException {
- FileUtils.deleteDirectory(workingDir.toFile());
- }
+// @After
+// public void after() throws IOException {
+// FileUtils.deleteDirectory(workingDir.toFile());
+// }
@Test
public void shouldReadAtomicActionsFromHDFSAndWritePartitionedAsParquetFiles() throws Exception {
// given
// NOTE: test resource should contain atomic actions in a human readable form, probably as json files; here the
// files should be converted to a serialized format and written out to workingDir/input
- // for current testing: actions from software export, given as sequence file are copied to workingDir/input/
- Path exportedActionSetDir = Paths.get(Objects.requireNonNull(cl.getResource("entities/entities_software")).getFile());
- Path inputDir = inputActionSetDir.resolve("entities_software");
- Files.createDirectories(inputDir);
- copyFiles(exportedActionSetDir, inputDir);
+ // for current testing: the graph and the actions from the iis export, given as sequence files, are copied to workingDir/input/
+
+ //graph
+ Path inputGraphDir = inputDir.resolve("graph");
+ Files.createDirectories(inputGraphDir);
+ copyFiles(Paths.get(Objects.requireNonNull(cl.getResource("graph")).getFile()), inputGraphDir);
+
+ //actions
+ Path inputActionsDir = inputDir.resolve("actions");
+ Files.createDirectories(inputActionsDir);
+
+ Path inputEntitiesPatentDir = inputActionsDir.resolve("entities_patent");
+ Files.createDirectories(inputEntitiesPatentDir);
+ copyFiles(Paths.get(Objects.requireNonNull(cl.getResource("actions/entities_patent")).getFile()), inputEntitiesPatentDir);
+
+ Path inputEntitiesSoftwareDir = inputActionsDir.resolve("entities_software");
+ Files.createDirectories(inputEntitiesSoftwareDir);
+ copyFiles(Paths.get(Objects.requireNonNull(cl.getResource("actions/entities_software")).getFile()), inputEntitiesSoftwareDir);
+
+ String inputActionSetPaths = String.join(",", inputEntitiesSoftwareDir.toString()); //inputEntitiesPatentDir.toString(),
+
PromoteActionSetFromHDFSJob.main(new String[]{
- "-mt", "local[*]",
- "-i", inputDir.toString(),
- "-o", outputDir.toString()
+ "-master", "local[*]",
+ "-inputGraphPath", inputGraphDir.toString(),
+ "-inputActionSetPaths", inputActionSetPaths,
+ "-outputGraphPath", outputDir.toString()
});
}
private static void copyFiles(Path source, Path target) throws IOException {
Files.list(source).forEach(f -> {
try {
- Files.copy(f, target.resolve(f.getFileName()));
+ if (Files.isDirectory(f)) {
+ Path subTarget = Files.createDirectories(target.resolve(f.getFileName()));
+ copyFiles(f, subTarget);
+ } else {
+ Files.copy(f, target.resolve(f.getFileName()));
+ }
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);