dnet-and/dnet-and-test/src/main/java/eu/dnetlib/jobs/SparkTokenizer.java

package eu.dnetlib.jobs;

import eu.dnetlib.featureextraction.FeatureTransformer;
import eu.dnetlib.featureextraction.util.Utilities;
import eu.dnetlib.support.ArgumentApplicationParser;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Optional;
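
/**
 * Spark job that reads entity records as JSON text, extracts the field addressed by the
 * {@code inputFieldJPath} JSONPath expression, tokenizes it via
 * {@link FeatureTransformer#tokenizeData} and writes the resulting token dataset under
 * {@code workingPath/tokens}. Parameters are supplied by the calling Oozie workflow.
 */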
public class SparkTokenizer extends AbstractSparkJob {

    private static final Logger log = LoggerFactory.getLogger(SparkTokenizer.class);

    public SparkTokenizer(ArgumentApplicationParser parser, SparkSession spark) {
        super(parser, spark);
    }

    public static void main(String[] args) throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(
                readResource("/jobs/parameters/tokenizer_parameters.json", SparkTokenizer.class)
        );
        parser.parseArgument(args);

        SparkConf conf = new SparkConf();

        new SparkTokenizer(
                parser,
                getSparkSession(conf)
        ).run();
    }

    @Override
    public void run() throws IOException {
        // read oozie parameters
        final String entitiesPath = parser.get("entitiesPath");
        final String workingPath = parser.get("workingPath");
        final String inputFieldJPath = parser.get("inputFieldJPath");
        final int numPartitions = Optional
                .ofNullable(parser.get("numPartitions"))
                .map(Integer::valueOf)
                .orElse(NUM_PARTITIONS);

        log.info("entitiesPath: '{}'", entitiesPath);
        log.info("workingPath: '{}'", workingPath);
        log.info("inputFieldJPath: '{}'", inputFieldJPath);
        log.info("numPartitions: '{}'", numPartitions);
        JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
        SQLContext sqlContext = SQLContext.getOrCreate(spark.sparkContext());
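
        // read the raw entities, repartition them, and keep only the field
        // addressed by inputFieldJPath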
        Dataset<Row> inputDS = Utilities.prepareDataset(
                sqlContext,
                context.textFile(entitiesPath).repartition(numPartitions),
                inputFieldJPath);
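
        // split the extracted field into tokens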
        Dataset<Row> tokensDS = FeatureTransformer.tokenizeData(inputDS);
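
        // persist the token dataset, replacing any previous run's output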
        tokensDS
                .write()
                .mode(SaveMode.Overwrite)
                .save(workingPath + "/tokens");
    }
}