Fixed a problem with JavaRDD Union

This commit is contained in:
Michele Artini 2020-02-25 15:59:21 +01:00
parent 4c94e74a84
commit 93665773ea
2 changed files with 49 additions and 14 deletions

View File

@ -1,10 +1,16 @@
package eu.dnetlib.dhp.migration; package eu.dnetlib.dhp.migration;
import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
@ -23,6 +29,8 @@ import scala.Tuple2;
public class ExtractEntitiesFromHDFSJob { public class ExtractEntitiesFromHDFSJob {
private static final Log log = LogFactory.getLog(ExtractEntitiesFromHDFSJob.class);
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils.toString(MigrateMongoMdstoresApplication.class IOUtils.toString(MigrateMongoMdstoresApplication.class
@ -35,10 +43,11 @@ public class ExtractEntitiesFromHDFSJob {
.master(parser.get("master")) .master(parser.get("master"))
.getOrCreate(); .getOrCreate();
final List<String> sourcePaths = Arrays.asList(parser.get("sourcePaths").split(","));
final String targetPath = parser.get("graphRawPath");
try (final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) { try (final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) {
final List<String> sourcePaths = Arrays.stream(parser.get("sourcePaths").split(",")).filter(p -> exists(sc, p)).collect(Collectors.toList());
final String targetPath = parser.get("graphRawPath");
processEntity(sc, Publication.class, sourcePaths, targetPath); processEntity(sc, Publication.class, sourcePaths, targetPath);
processEntity(sc, Dataset.class, sourcePaths, targetPath); processEntity(sc, Dataset.class, sourcePaths, targetPath);
processEntity(sc, Software.class, sourcePaths, targetPath); processEntity(sc, Software.class, sourcePaths, targetPath);
@ -53,16 +62,33 @@ public class ExtractEntitiesFromHDFSJob {
private static void processEntity(final JavaSparkContext sc, final Class<?> clazz, final List<String> sourcePaths, final String targetPath) { private static void processEntity(final JavaSparkContext sc, final Class<?> clazz, final List<String> sourcePaths, final String targetPath) {
final String type = clazz.getSimpleName().toLowerCase(); final String type = clazz.getSimpleName().toLowerCase();
final JavaRDD<String> inputRdd = sc.emptyRDD(); log.info(String.format("Processing entities (%s) in files:", type));
sourcePaths.forEach(sourcePath -> inputRdd.union(sc.sequenceFile(sourcePath, Text.class, Text.class) sourcePaths.forEach(log::info);
.map(k -> new Tuple2<>(k._1().toString(), k._2().toString()))
.filter(k -> isEntityType(k._1(), type)) JavaRDD<String> inputRdd = sc.emptyRDD();
.map(Tuple2::_2)));
for (final String sp : sourcePaths) {
inputRdd = inputRdd.union(sc.sequenceFile(sp, Text.class, Text.class)
.map(k -> new Tuple2<>(k._1().toString(), k._2().toString()))
.filter(k -> isEntityType(k._1(), type))
.map(Tuple2::_2));
}
inputRdd.saveAsTextFile(targetPath + "/" + type); inputRdd.saveAsTextFile(targetPath + "/" + type);
} }
private static boolean isEntityType(final String item, final String entity) { private static boolean isEntityType(final String item, final String type) {
return StringUtils.substringAfter(item, ":").equalsIgnoreCase(entity); return StringUtils.substringAfter(item, ":").equalsIgnoreCase(type);
}
private static boolean exists(final JavaSparkContext context, final String pathToFile) {
try {
final FileSystem hdfs = org.apache.hadoop.fs.FileSystem.get(context.hadoopConfiguration());
final Path path = new Path(pathToFile);
return hdfs.exists(path);
} catch (final IOException e) {
throw new RuntimeException(e);
}
} }
} }

View File

@ -43,7 +43,7 @@
</property> </property>
</parameters> </parameters>
<start to="ResetWorkingPath"/> <start to="ResetWorkingPath"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -87,7 +87,7 @@
<arg>-dbpasswd</arg><arg>${postgresPassword}</arg> <arg>-dbpasswd</arg><arg>${postgresPassword}</arg>
<arg>-a</arg><arg>claims</arg> <arg>-a</arg><arg>claims</arg>
</java> </java>
<ok to="ImportODFEntitiesFromMongoDB"/> <ok to="ImportODFEntitiesFromMongoDB"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
@ -171,11 +171,20 @@
<arg>-pguser</arg><arg>${postgresUser}</arg> <arg>-pguser</arg><arg>${postgresUser}</arg>
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg> <arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
</java> </java>
<ok to="ResetGraphRawPath"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="ResetGraphRawPath">
<fs>
<delete path='${graphRawPath}'/>
<mkdir path='${graphRawPath}'/>
</fs>
<ok to="ExtractEntitiesInGraphRawPath"/>
<error to="Kill"/>
</action>
<action name="ExtractEntitiesInGraphRawPath">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker> <job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node> <name-node>${nameNode}</name-node>