forked from antonis.lempesis/dnet-hadoop
Merge branch 'master' of code-repo.d4science.org:D-Net/dnet-hadoop
This commit is contained in:
commit
01ea7721f3
|
@ -29,17 +29,21 @@ public class ISClient implements Serializable {
|
||||||
|
|
||||||
private static final String INPUT_ACTION_SET_ID_SEPARATOR = ",";
|
private static final String INPUT_ACTION_SET_ID_SEPARATOR = ",";
|
||||||
|
|
||||||
public static List<String> getLatestRawsetPaths(String isLookupUrl, String setIds) {
|
private ISLookUpService isLookup;
|
||||||
|
|
||||||
|
public ISClient(String isLookupUrl) {
|
||||||
|
isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<String> getLatestRawsetPaths(String setIds) {
|
||||||
|
|
||||||
ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
|
||||||
ISClient isClient = new ISClient();
|
|
||||||
List<String> ids = Lists.newArrayList(Splitter.on(INPUT_ACTION_SET_ID_SEPARATOR)
|
List<String> ids = Lists.newArrayList(Splitter.on(INPUT_ACTION_SET_ID_SEPARATOR)
|
||||||
.omitEmptyStrings()
|
.omitEmptyStrings()
|
||||||
.trimResults()
|
.trimResults()
|
||||||
.split(setIds));
|
.split(setIds));
|
||||||
|
|
||||||
return ids.stream()
|
return ids.stream()
|
||||||
.map(id -> isClient.getSet(isLookup, id))
|
.map(id -> getSet(isLookup, id))
|
||||||
.map(as -> as.getPathToLatest())
|
.map(as -> as.getPathToLatest())
|
||||||
.collect(Collectors.toCollection(ArrayList::new));
|
.collect(Collectors.toCollection(ArrayList::new));
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,6 +41,15 @@ public class PartitionActionSetsByPayloadTypeJob {
|
||||||
StructField$.MODULE$.apply("payload", DataTypes.StringType, false, Metadata.empty())
|
StructField$.MODULE$.apply("payload", DataTypes.StringType, false, Metadata.empty())
|
||||||
));
|
));
|
||||||
|
|
||||||
|
private ISClient isClient;
|
||||||
|
|
||||||
|
public PartitionActionSetsByPayloadTypeJob(String isLookupUrl) {
|
||||||
|
this.isClient = new ISClient(isLookupUrl);
|
||||||
|
}
|
||||||
|
|
||||||
|
public PartitionActionSetsByPayloadTypeJob() {
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
String jsonConfiguration = IOUtils.toString(
|
String jsonConfiguration = IOUtils.toString(
|
||||||
PromoteActionPayloadForGraphTableJob.class
|
PromoteActionPayloadForGraphTableJob.class
|
||||||
|
@ -63,7 +72,12 @@ public class PartitionActionSetsByPayloadTypeJob {
|
||||||
String isLookupUrl = parser.get("isLookupUrl");
|
String isLookupUrl = parser.get("isLookupUrl");
|
||||||
logger.info("isLookupUrl: {}", isLookupUrl);
|
logger.info("isLookupUrl: {}", isLookupUrl);
|
||||||
|
|
||||||
List<String> inputActionSetPaths = ISClient.getLatestRawsetPaths(isLookupUrl, inputActionSetIds);
|
new PartitionActionSetsByPayloadTypeJob(isLookupUrl).run(isSparkSessionManaged, inputActionSetIds, outputPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void run(Boolean isSparkSessionManaged, String inputActionSetIds, String outputPath) {
|
||||||
|
|
||||||
|
List<String> inputActionSetPaths = getIsClient().getLatestRawsetPaths(inputActionSetIds);
|
||||||
logger.info("inputActionSetPaths: {}", String.join(",", inputActionSetPaths));
|
logger.info("inputActionSetPaths: {}", String.join(",", inputActionSetPaths));
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
SparkConf conf = new SparkConf();
|
||||||
|
@ -95,21 +109,15 @@ public class PartitionActionSetsByPayloadTypeJob {
|
||||||
String path) {
|
String path) {
|
||||||
logger.info("Reading actions from path: {}", path);
|
logger.info("Reading actions from path: {}", path);
|
||||||
|
|
||||||
List<String> files = HdfsSupport.listFiles(path, spark.sparkContext().hadoopConfiguration());
|
|
||||||
logger.info("Found files: {}", String.join(",", files));
|
|
||||||
|
|
||||||
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||||
return files
|
|
||||||
.stream()
|
JavaRDD<Row> rdd = sc
|
||||||
.map(file -> {
|
.sequenceFile(path, Text.class, Text.class)
|
||||||
JavaRDD<Row> rdd = sc
|
.map(x -> RowFactory.create(x._1().toString(), x._2().toString()));
|
||||||
.sequenceFile(file, Text.class, Text.class)
|
|
||||||
.map(x -> RowFactory.create(x._1().toString(), x._2().toString()));
|
return spark.createDataFrame(rdd, KV_SCHEMA)
|
||||||
return spark.createDataFrame(rdd, KV_SCHEMA)
|
.withColumn("atomic_action", from_json(col("value"), ATOMIC_ACTION_SCHEMA))
|
||||||
.withColumn("atomic_action", from_json(col("value"), ATOMIC_ACTION_SCHEMA))
|
.select(expr("atomic_action.*"));
|
||||||
.select(expr("atomic_action.*"));
|
|
||||||
})
|
|
||||||
.reduce(spark.createDataFrame(Collections.emptyList(), ATOMIC_ACTION_SCHEMA), Dataset::union);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void saveActions(Dataset<Row> actionDS,
|
private static void saveActions(Dataset<Row> actionDS,
|
||||||
|
@ -121,4 +129,12 @@ public class PartitionActionSetsByPayloadTypeJob {
|
||||||
.mode(SaveMode.Append)
|
.mode(SaveMode.Append)
|
||||||
.parquet(path);
|
.parquet(path);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ISClient getIsClient() {
|
||||||
|
return isClient;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setIsClient(ISClient isClient) {
|
||||||
|
this.isClient = isClient;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,7 @@ import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.function.FilterFunction;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
import org.apache.spark.sql.Encoders;
|
import org.apache.spark.sql.Encoders;
|
||||||
|
@ -119,10 +120,17 @@ public class PromoteActionPayloadForGraphTableJob {
|
||||||
String path,
|
String path,
|
||||||
Class<G> rowClazz) {
|
Class<G> rowClazz) {
|
||||||
logger.info("Reading graph table from path: {}", path);
|
logger.info("Reading graph table from path: {}", path);
|
||||||
|
|
||||||
|
return spark.read()
|
||||||
|
.textFile(path)
|
||||||
|
.map((MapFunction<String, G>) value -> OBJECT_MAPPER.readValue(value, rowClazz), Encoders.bean(rowClazz));
|
||||||
|
|
||||||
|
/*
|
||||||
return spark
|
return spark
|
||||||
.read()
|
.read()
|
||||||
.parquet(path)
|
.parquet(path)
|
||||||
.as(Encoders.bean(rowClazz));
|
.as(Encoders.bean(rowClazz));
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <A extends Oaf> Dataset<A> readActionPayload(SparkSession spark,
|
private static <A extends Oaf> Dataset<A> readActionPayload(SparkSession spark,
|
||||||
|
|
|
@ -118,6 +118,9 @@
|
||||||
|
|
||||||
<action name="SkipPromoteDatasetActionPayloadForDatasetTable">
|
<action name="SkipPromoteDatasetActionPayloadForDatasetTable">
|
||||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||||
|
<prepare>
|
||||||
|
<delete path="${workingDir}/dataset"/>
|
||||||
|
</prepare>
|
||||||
<arg>-pb</arg>
|
<arg>-pb</arg>
|
||||||
<arg>${inputGraphRootPath}/dataset</arg>
|
<arg>${inputGraphRootPath}/dataset</arg>
|
||||||
<arg>${workingDir}/dataset</arg>
|
<arg>${workingDir}/dataset</arg>
|
||||||
|
@ -166,6 +169,9 @@
|
||||||
|
|
||||||
<action name="SkipPromoteResultActionPayloadForDatasetTable">
|
<action name="SkipPromoteResultActionPayloadForDatasetTable">
|
||||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||||
|
<prepare>
|
||||||
|
<delete path="${outputGraphRootPath}/dataset"/>
|
||||||
|
</prepare>
|
||||||
<arg>-pb</arg>
|
<arg>-pb</arg>
|
||||||
<arg>${workingDir}/dataset</arg>
|
<arg>${workingDir}/dataset</arg>
|
||||||
<arg>${outputGraphRootPath}/dataset</arg>
|
<arg>${outputGraphRootPath}/dataset</arg>
|
||||||
|
|
|
@ -113,6 +113,9 @@
|
||||||
|
|
||||||
<action name="SkipPromoteDatasourceActionPayloadForDatasourceTable">
|
<action name="SkipPromoteDatasourceActionPayloadForDatasourceTable">
|
||||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||||
|
<prepare>
|
||||||
|
<delete path="${outputGraphRootPath}/datasource"/>
|
||||||
|
</prepare>
|
||||||
<arg>-pb</arg>
|
<arg>-pb</arg>
|
||||||
<arg>${inputGraphRootPath}/datasource</arg>
|
<arg>${inputGraphRootPath}/datasource</arg>
|
||||||
<arg>${outputGraphRootPath}/datasource</arg>
|
<arg>${outputGraphRootPath}/datasource</arg>
|
||||||
|
|
|
@ -113,6 +113,9 @@
|
||||||
|
|
||||||
<action name="SkipPromoteOrganizationActionPayloadForOrganizationTable">
|
<action name="SkipPromoteOrganizationActionPayloadForOrganizationTable">
|
||||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||||
|
<prepare>
|
||||||
|
<delete path="${outputGraphRootPath}/organization"/>
|
||||||
|
</prepare>
|
||||||
<arg>-pb</arg>
|
<arg>-pb</arg>
|
||||||
<arg>${inputGraphRootPath}/organization</arg>
|
<arg>${inputGraphRootPath}/organization</arg>
|
||||||
<arg>${outputGraphRootPath}/organization</arg>
|
<arg>${outputGraphRootPath}/organization</arg>
|
||||||
|
|
|
@ -117,6 +117,9 @@
|
||||||
|
|
||||||
<action name="SkipPromoteOtherResearchProductActionPayloadForOtherResearchProductTable">
|
<action name="SkipPromoteOtherResearchProductActionPayloadForOtherResearchProductTable">
|
||||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||||
|
<prepare>
|
||||||
|
<delete path="${workingDir}/otherresearchproduct"/>
|
||||||
|
</prepare>
|
||||||
<arg>-pb</arg>
|
<arg>-pb</arg>
|
||||||
<arg>${inputGraphRootPath}/otherresearchproduct</arg>
|
<arg>${inputGraphRootPath}/otherresearchproduct</arg>
|
||||||
<arg>${workingDir}/otherresearchproduct</arg>
|
<arg>${workingDir}/otherresearchproduct</arg>
|
||||||
|
@ -165,6 +168,9 @@
|
||||||
|
|
||||||
<action name="SkipPromoteResultActionPayloadForOtherResearchProductTable">
|
<action name="SkipPromoteResultActionPayloadForOtherResearchProductTable">
|
||||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||||
|
<prepare>
|
||||||
|
<delete path="${outputGraphRootPath}/otherresearchproduct"/>
|
||||||
|
</prepare>
|
||||||
<arg>-pb</arg>
|
<arg>-pb</arg>
|
||||||
<arg>${workingDir}/otherresearchproduct</arg>
|
<arg>${workingDir}/otherresearchproduct</arg>
|
||||||
<arg>${outputGraphRootPath}/otherresearchproduct</arg>
|
<arg>${outputGraphRootPath}/otherresearchproduct</arg>
|
||||||
|
|
|
@ -113,6 +113,9 @@
|
||||||
|
|
||||||
<action name="SkipPromoteProjectActionPayloadForProjectTable">
|
<action name="SkipPromoteProjectActionPayloadForProjectTable">
|
||||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||||
|
<prepare>
|
||||||
|
<delete path="${outputGraphRootPath}/project"/>
|
||||||
|
</prepare>
|
||||||
<arg>-pb</arg>
|
<arg>-pb</arg>
|
||||||
<arg>${inputGraphRootPath}/project</arg>
|
<arg>${inputGraphRootPath}/project</arg>
|
||||||
<arg>${outputGraphRootPath}/project</arg>
|
<arg>${outputGraphRootPath}/project</arg>
|
||||||
|
|
|
@ -118,6 +118,9 @@
|
||||||
|
|
||||||
<action name="SkipPromotePublicationActionPayloadForPublicationTable">
|
<action name="SkipPromotePublicationActionPayloadForPublicationTable">
|
||||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||||
|
<prepare>
|
||||||
|
<delete path="${workingDir}/publication"/>
|
||||||
|
</prepare>
|
||||||
<arg>-pb</arg>
|
<arg>-pb</arg>
|
||||||
<arg>${inputGraphRootPath}/publication</arg>
|
<arg>${inputGraphRootPath}/publication</arg>
|
||||||
<arg>${workingDir}/publication</arg>
|
<arg>${workingDir}/publication</arg>
|
||||||
|
@ -166,6 +169,9 @@
|
||||||
|
|
||||||
<action name="SkipPromoteResultActionPayloadForPublicationTable">
|
<action name="SkipPromoteResultActionPayloadForPublicationTable">
|
||||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||||
|
<prepare>
|
||||||
|
<delete path="${outputGraphRootPath}/publication"/>
|
||||||
|
</prepare>
|
||||||
<arg>-pb</arg>
|
<arg>-pb</arg>
|
||||||
<arg>${workingDir}/publication</arg>
|
<arg>${workingDir}/publication</arg>
|
||||||
<arg>${outputGraphRootPath}/publication</arg>
|
<arg>${outputGraphRootPath}/publication</arg>
|
||||||
|
|
|
@ -114,6 +114,9 @@
|
||||||
|
|
||||||
<action name="SkipPromoteRelationActionPayloadForRelationTable">
|
<action name="SkipPromoteRelationActionPayloadForRelationTable">
|
||||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||||
|
<prepare>
|
||||||
|
<delete path="${outputGraphRootPath}/relation"/>
|
||||||
|
</prepare>
|
||||||
<arg>-pb</arg>
|
<arg>-pb</arg>
|
||||||
<arg>${inputGraphRootPath}/relation</arg>
|
<arg>${inputGraphRootPath}/relation</arg>
|
||||||
<arg>${outputGraphRootPath}/relation</arg>
|
<arg>${outputGraphRootPath}/relation</arg>
|
||||||
|
|
|
@ -117,6 +117,9 @@
|
||||||
|
|
||||||
<action name="SkipPromoteSoftwareActionPayloadForSoftwareTable">
|
<action name="SkipPromoteSoftwareActionPayloadForSoftwareTable">
|
||||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||||
|
<prepare>
|
||||||
|
<delete path="${workingDir}/software"/>
|
||||||
|
</prepare>
|
||||||
<arg>-pb</arg>
|
<arg>-pb</arg>
|
||||||
<arg>${inputGraphRootPath}/software</arg>
|
<arg>${inputGraphRootPath}/software</arg>
|
||||||
<arg>${workingDir}/software</arg>
|
<arg>${workingDir}/software</arg>
|
||||||
|
@ -165,6 +168,9 @@
|
||||||
|
|
||||||
<action name="SkipPromoteResultActionPayloadForSoftwareTable">
|
<action name="SkipPromoteResultActionPayloadForSoftwareTable">
|
||||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||||
|
<prepare>
|
||||||
|
<delete path="${outputGraphRootPath}/software"/>
|
||||||
|
</prepare>
|
||||||
<arg>-pb</arg>
|
<arg>-pb</arg>
|
||||||
<arg>${workingDir}/software</arg>
|
<arg>${workingDir}/software</arg>
|
||||||
<arg>${outputGraphRootPath}/software</arg>
|
<arg>${outputGraphRootPath}/software</arg>
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
package eu.dnetlib.dhp.actionmanager.partition;
|
package eu.dnetlib.dhp.actionmanager.partition;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import eu.dnetlib.dhp.actionmanager.ISClient;
|
||||||
import eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJobTest;
|
import eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJobTest;
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -15,7 +17,11 @@ import org.apache.spark.sql.Row;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
import org.apache.spark.sql.types.*;
|
import org.apache.spark.sql.types.*;
|
||||||
import org.junit.jupiter.api.*;
|
import org.junit.jupiter.api.*;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
import org.junit.jupiter.api.io.TempDir;
|
import org.junit.jupiter.api.io.TempDir;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
import scala.collection.mutable.Seq;
|
import scala.collection.mutable.Seq;
|
||||||
|
|
||||||
|
@ -31,6 +37,7 @@ import static org.apache.spark.sql.functions.*;
|
||||||
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
|
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
|
||||||
import static scala.collection.JavaConversions.mutableSeqAsJavaList;
|
import static scala.collection.JavaConversions.mutableSeqAsJavaList;
|
||||||
|
|
||||||
|
@ExtendWith(MockitoExtension.class)
|
||||||
public class PartitionActionSetsByPayloadTypeJobTest {
|
public class PartitionActionSetsByPayloadTypeJobTest {
|
||||||
private static final ClassLoader cl = PartitionActionSetsByPayloadTypeJobTest.class.getClassLoader();
|
private static final ClassLoader cl = PartitionActionSetsByPayloadTypeJobTest.class.getClassLoader();
|
||||||
|
|
||||||
|
@ -64,20 +71,29 @@ public class PartitionActionSetsByPayloadTypeJobTest {
|
||||||
@Nested
|
@Nested
|
||||||
class Main {
|
class Main {
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private ISClient isClient;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldPartitionActionSetsByPayloadType(@TempDir Path workingDir) throws Exception {
|
public void shouldPartitionActionSetsByPayloadType(@TempDir Path workingDir) throws Exception {
|
||||||
// given
|
// given
|
||||||
Path inputActionSetsDir = workingDir.resolve("input").resolve("action_sets");
|
Path inputActionSetsBaseDir = workingDir.resolve("input").resolve("action_sets");
|
||||||
Path outputDir = workingDir.resolve("output");
|
Path outputDir = workingDir.resolve("output");
|
||||||
|
|
||||||
Map<String, List<String>> oafsByClassName = createActionSets(inputActionSetsDir);
|
Map<String, List<String>> oafsByClassName = createActionSets(inputActionSetsBaseDir);
|
||||||
|
|
||||||
|
List<String> inputActionSetsPaths = resolveInputActionSetPaths(inputActionSetsBaseDir);
|
||||||
|
|
||||||
// when
|
// when
|
||||||
PartitionActionSetsByPayloadTypeJob.main(new String[]{
|
Mockito.when(isClient.getLatestRawsetPaths(Mockito.anyString())).thenReturn(inputActionSetsPaths);
|
||||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
|
||||||
"-inputActionSetPaths", inputActionSetsDir.toString(),
|
PartitionActionSetsByPayloadTypeJob job = new PartitionActionSetsByPayloadTypeJob();
|
||||||
"-outputPath", outputDir.toString()
|
job.setIsClient(isClient);
|
||||||
});
|
job.run(
|
||||||
|
Boolean.FALSE,
|
||||||
|
"", // it can be empty we're mocking the response from isClient to resolve the paths
|
||||||
|
outputDir.toString()
|
||||||
|
);
|
||||||
|
|
||||||
// then
|
// then
|
||||||
Files.exists(outputDir);
|
Files.exists(outputDir);
|
||||||
|
@ -94,10 +110,19 @@ public class PartitionActionSetsByPayloadTypeJobTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<String> resolveInputActionSetPaths(Path inputActionSetsBaseDir) throws IOException {
|
||||||
|
Path inputActionSetJsonDumpsDir = getInputActionSetJsonDumpsDir();
|
||||||
|
return Files
|
||||||
|
.list(inputActionSetJsonDumpsDir)
|
||||||
|
.map(path -> {
|
||||||
|
String inputActionSetId = path.getFileName().toString();
|
||||||
|
return inputActionSetsBaseDir.resolve(inputActionSetId).toString();
|
||||||
|
})
|
||||||
|
.collect(Collectors.toCollection(ArrayList::new));
|
||||||
|
}
|
||||||
|
|
||||||
private static Map<String, List<String>> createActionSets(Path inputActionSetsDir) throws IOException {
|
private static Map<String, List<String>> createActionSets(Path inputActionSetsDir) throws IOException {
|
||||||
Path inputActionSetJsonDumpsDir = Paths
|
Path inputActionSetJsonDumpsDir = getInputActionSetJsonDumpsDir();
|
||||||
.get(Objects.requireNonNull(cl.getResource("eu/dnetlib/dhp/actionmanager/partition/input/"))
|
|
||||||
.getFile());
|
|
||||||
|
|
||||||
Map<String, List<String>> oafsByType = new HashMap<>();
|
Map<String, List<String>> oafsByType = new HashMap<>();
|
||||||
Files
|
Files
|
||||||
|
@ -138,6 +163,12 @@ public class PartitionActionSetsByPayloadTypeJobTest {
|
||||||
return oafsByType;
|
return oafsByType;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static Path getInputActionSetJsonDumpsDir() {
|
||||||
|
return Paths
|
||||||
|
.get(Objects.requireNonNull(cl.getResource("eu/dnetlib/dhp/actionmanager/partition/input/"))
|
||||||
|
.getFile());
|
||||||
|
}
|
||||||
|
|
||||||
private static Dataset<String> readActionsFromJsonDump(String path) {
|
private static Dataset<String> readActionsFromJsonDump(String path) {
|
||||||
return spark
|
return spark
|
||||||
.read()
|
.read()
|
||||||
|
|
|
@ -64,6 +64,7 @@ public class DispatchEntitiesApplication {
|
||||||
|
|
||||||
log.info(String.format("Processing entities (%s) in file: %s", type, sourcePath));
|
log.info(String.format("Processing entities (%s) in file: %s", type, sourcePath));
|
||||||
|
|
||||||
|
/*
|
||||||
spark.read()
|
spark.read()
|
||||||
.textFile(sourcePath)
|
.textFile(sourcePath)
|
||||||
.filter((FilterFunction<String>) value -> isEntityType(value, type))
|
.filter((FilterFunction<String>) value -> isEntityType(value, type))
|
||||||
|
@ -73,14 +74,13 @@ public class DispatchEntitiesApplication {
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.parquet(targetPath + "/" + type);
|
.parquet(targetPath + "/" + type);
|
||||||
|
|
||||||
/*
|
*/
|
||||||
|
|
||||||
JavaSparkContext.fromSparkContext(spark.sparkContext())
|
JavaSparkContext.fromSparkContext(spark.sparkContext())
|
||||||
.textFile(sourcePath)
|
.textFile(sourcePath)
|
||||||
.filter(l -> isEntityType(l, type))
|
.filter(l -> isEntityType(l, type))
|
||||||
.map(l -> StringUtils.substringAfter(l, "|"))
|
.map(l -> StringUtils.substringAfter(l, "|"))
|
||||||
.saveAsTextFile(targetPath + "/" + type, GzipCodec.class); // use repartition(XXX) ???
|
.saveAsTextFile(targetPath + "/" + type, GzipCodec.class); // use repartition(XXX) ???
|
||||||
|
|
||||||
*/
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean isEntityType(final String line, final String type) {
|
private static boolean isEntityType(final String line, final String type) {
|
||||||
|
|
|
@ -100,16 +100,24 @@ public class MergeClaimsApplication {
|
||||||
return opRaw.isPresent() ? opRaw.get()._2() : opClaim.isPresent() ? opClaim.get()._2() : null;
|
return opRaw.isPresent() ? opRaw.get()._2() : opClaim.isPresent() ? opClaim.get()._2() : null;
|
||||||
}, Encoders.bean(clazz))
|
}, Encoders.bean(clazz))
|
||||||
.filter(Objects::nonNull)
|
.filter(Objects::nonNull)
|
||||||
|
.map((MapFunction<T, String>) value -> OBJECT_MAPPER.writeValueAsString(value), Encoders.STRING())
|
||||||
.write()
|
.write()
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.parquet(outPath);
|
.option("compression", "gzip")
|
||||||
|
.text(outPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <T extends Oaf> Dataset<T> readFromPath(SparkSession spark, String path, Class<T> clazz) {
|
private static <T extends Oaf> Dataset<T> readFromPath(SparkSession spark, String path, Class<T> clazz) {
|
||||||
|
return spark.read()
|
||||||
|
.textFile(path)
|
||||||
|
.map((MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz))
|
||||||
|
.filter((FilterFunction<T>) value -> Objects.nonNull(idFn().apply(value)));
|
||||||
|
/*
|
||||||
return spark.read()
|
return spark.read()
|
||||||
.load(path)
|
.load(path)
|
||||||
.as(Encoders.bean(clazz))
|
.as(Encoders.bean(clazz))
|
||||||
.filter((FilterFunction<T>) value -> Objects.nonNull(idFn().apply(value)));
|
.filter((FilterFunction<T>) value -> Objects.nonNull(idFn().apply(value)));
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void removeOutputDir(SparkSession spark, String path) {
|
private static void removeOutputDir(SparkSession spark, String path) {
|
||||||
|
|
|
@ -10,6 +10,10 @@
|
||||||
<value>false</value>
|
<value>false</value>
|
||||||
<description>should import content from the aggregator or reuse a previous version</description>
|
<description>should import content from the aggregator or reuse a previous version</description>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>contentPath</name>
|
||||||
|
<description>path location to store (or reuse) content from the aggregator</description>
|
||||||
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>postgresURL</name>
|
<name>postgresURL</name>
|
||||||
<description>the postgres URL to access to the database</description>
|
<description>the postgres URL to access to the database</description>
|
||||||
|
@ -108,10 +112,10 @@
|
||||||
<action name="ImportDB_claims">
|
<action name="ImportDB_claims">
|
||||||
<java>
|
<java>
|
||||||
<prepare>
|
<prepare>
|
||||||
<delete path="${workingDir}/db_claims"/>
|
<delete path="${contentPath}/db_claims"/>
|
||||||
</prepare>
|
</prepare>
|
||||||
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication</main-class>
|
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication</main-class>
|
||||||
<arg>-p</arg><arg>${workingDir}/db_claims</arg>
|
<arg>-p</arg><arg>${contentPath}/db_claims</arg>
|
||||||
<arg>-pgurl</arg><arg>${postgresURL}</arg>
|
<arg>-pgurl</arg><arg>${postgresURL}</arg>
|
||||||
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
||||||
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
||||||
|
@ -124,10 +128,10 @@
|
||||||
<action name="ImportODF_claims">
|
<action name="ImportODF_claims">
|
||||||
<java>
|
<java>
|
||||||
<prepare>
|
<prepare>
|
||||||
<delete path="${workingDir}/odf_claims"/>
|
<delete path="${contentPath}/odf_claims"/>
|
||||||
</prepare>
|
</prepare>
|
||||||
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication</main-class>
|
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication</main-class>
|
||||||
<arg>-p</arg><arg>${workingDir}/odf_claims</arg>
|
<arg>-p</arg><arg>${contentPath}/odf_claims</arg>
|
||||||
<arg>-mongourl</arg><arg>${mongoURL}</arg>
|
<arg>-mongourl</arg><arg>${mongoURL}</arg>
|
||||||
<arg>-mongodb</arg><arg>${mongoDb}</arg>
|
<arg>-mongodb</arg><arg>${mongoDb}</arg>
|
||||||
<arg>-f</arg><arg>ODF</arg>
|
<arg>-f</arg><arg>ODF</arg>
|
||||||
|
@ -141,10 +145,10 @@
|
||||||
<action name="ImportOAF_claims">
|
<action name="ImportOAF_claims">
|
||||||
<java>
|
<java>
|
||||||
<prepare>
|
<prepare>
|
||||||
<delete path="${workingDir}/oaf_claims"/>
|
<delete path="${contentPath}/oaf_claims"/>
|
||||||
</prepare>
|
</prepare>
|
||||||
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication</main-class>
|
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication</main-class>
|
||||||
<arg>-p</arg><arg>${workingDir}/oaf_claims</arg>
|
<arg>-p</arg><arg>${contentPath}/oaf_claims</arg>
|
||||||
<arg>-mongourl</arg><arg>${mongoURL}</arg>
|
<arg>-mongourl</arg><arg>${mongoURL}</arg>
|
||||||
<arg>-mongodb</arg><arg>${mongoDb}</arg>
|
<arg>-mongodb</arg><arg>${mongoDb}</arg>
|
||||||
<arg>-f</arg><arg>OAF</arg>
|
<arg>-f</arg><arg>OAF</arg>
|
||||||
|
@ -158,10 +162,10 @@
|
||||||
<action name="ImportDB">
|
<action name="ImportDB">
|
||||||
<java>
|
<java>
|
||||||
<prepare>
|
<prepare>
|
||||||
<delete path="${workingDir}/db_records"/>
|
<delete path="${contentPath}/db_records"/>
|
||||||
</prepare>
|
</prepare>
|
||||||
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication</main-class>
|
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication</main-class>
|
||||||
<arg>-p</arg><arg>${workingDir}/db_records</arg>
|
<arg>-p</arg><arg>${contentPath}/db_records</arg>
|
||||||
<arg>-pgurl</arg><arg>${postgresURL}</arg>
|
<arg>-pgurl</arg><arg>${postgresURL}</arg>
|
||||||
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
||||||
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
||||||
|
@ -173,10 +177,10 @@
|
||||||
<action name="ImportODF">
|
<action name="ImportODF">
|
||||||
<java>
|
<java>
|
||||||
<prepare>
|
<prepare>
|
||||||
<delete path="${workingDir}/odf_records"/>
|
<delete path="${contentPath}/odf_records"/>
|
||||||
</prepare>
|
</prepare>
|
||||||
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication</main-class>
|
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication</main-class>
|
||||||
<arg>-p</arg><arg>${workingDir}/odf_records</arg>
|
<arg>-p</arg><arg>${contentPath}/odf_records</arg>
|
||||||
<arg>-mongourl</arg><arg>${mongoURL}</arg>
|
<arg>-mongourl</arg><arg>${mongoURL}</arg>
|
||||||
<arg>-mongodb</arg><arg>${mongoDb}</arg>
|
<arg>-mongodb</arg><arg>${mongoDb}</arg>
|
||||||
<arg>-f</arg><arg>ODF</arg>
|
<arg>-f</arg><arg>ODF</arg>
|
||||||
|
@ -190,10 +194,10 @@
|
||||||
<action name="ImportOAF">
|
<action name="ImportOAF">
|
||||||
<java>
|
<java>
|
||||||
<prepare>
|
<prepare>
|
||||||
<delete path="${workingDir}/oaf_records"/>
|
<delete path="${contentPath}/oaf_records"/>
|
||||||
</prepare>
|
</prepare>
|
||||||
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication</main-class>
|
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication</main-class>
|
||||||
<arg>-p</arg><arg>${workingDir}/oaf_records</arg>
|
<arg>-p</arg><arg>${contentPath}/oaf_records</arg>
|
||||||
<arg>-mongourl</arg><arg>${mongoURL}</arg>
|
<arg>-mongourl</arg><arg>${mongoURL}</arg>
|
||||||
<arg>-mongodb</arg><arg>${mongoDb}</arg>
|
<arg>-mongodb</arg><arg>${mongoDb}</arg>
|
||||||
<arg>-f</arg><arg>OAF</arg>
|
<arg>-f</arg><arg>OAF</arg>
|
||||||
|
@ -227,7 +231,7 @@
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>-s</arg><arg>${workingDir}/db_claims,${workingDir}/oaf_claims,${workingDir}/odf_claims</arg>
|
<arg>-s</arg><arg>${contentPath}/db_claims,${contentPath}/oaf_claims,${contentPath}/odf_claims</arg>
|
||||||
<arg>-t</arg><arg>${workingDir}/entities_claim</arg>
|
<arg>-t</arg><arg>${workingDir}/entities_claim</arg>
|
||||||
<arg>-pgurl</arg><arg>${postgresURL}</arg>
|
<arg>-pgurl</arg><arg>${postgresURL}</arg>
|
||||||
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
||||||
|
@ -276,7 +280,7 @@
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>-s</arg><arg>${workingDir}/db_records,${workingDir}/oaf_records,${workingDir}/odf_records</arg>
|
<arg>-s</arg><arg>${contentPath}/db_records,${contentPath}/oaf_records,${contentPath}/odf_records</arg>
|
||||||
<arg>-t</arg><arg>${workingDir}/entities</arg>
|
<arg>-t</arg><arg>${workingDir}/entities</arg>
|
||||||
<arg>-pgurl</arg><arg>${postgresURL}</arg>
|
<arg>-pgurl</arg><arg>${postgresURL}</arg>
|
||||||
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
||||||
|
|
Loading…
Reference in New Issue