1
0
Fork 0

added oozie wf

This commit is contained in:
Sandro La Bruzzo 2019-03-18 10:46:07 +01:00
parent c0da3da4c4
commit 49d8cc716e
4 changed files with 20 additions and 106 deletions

3
.gitignore vendored
View File

@ -5,4 +5,5 @@
/*/target
/target
/*/build
/build
/build
spark-warehouse

View File

@ -1,79 +0,0 @@
package eu.dnetlib.collection;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.apache.spark.sql.functions.array_contains;
public class GenerateNativeStoreSparkJob {
public static void main(String[] args) {
final SparkSession spark = SparkSession
.builder()
.appName("GenerateNativeStoreSparkJob")
.master("local[*]")
.getOrCreate();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaPairRDD<IntWritable, Text> f = sc.sequenceFile("/home/sandro/Downloads/mdstore_oai", IntWritable.class, Text.class);
String first = f.map(a -> a._2().toString()).first();
final List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
fields.add(DataTypes.createStructField("format", DataTypes.StringType, false));
fields.add(DataTypes.createStructField("formatName", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("body", DataTypes.StringType, true));
JavaRDD<Row> mdRdd = f.map((Function<Tuple2<IntWritable, Text>, Row>) item -> RowFactory.create("" + item._1().get(), "xml", null, item._2().toString()));
final StructType schema = DataTypes.createStructType(fields);
Dataset<Row> ds = spark.createDataFrame(mdRdd, schema);
// ds.write().save("/home/sandro/Downloads/test.parquet");
Publication p2 = new Publication();
p2.setDates(Collections.singletonList("2018-09-09"));
p2.setTitles(Collections.singletonList("Titolo 2"));
p2.setIdentifiers(Collections.singletonList(new PID("pmID", "1234567")));
Publication p1 = new Publication();
p1.setDates(Collections.singletonList("2018-09-09"));
p1.setTitles(Collections.singletonList("Titolo 1"));
p1.setIdentifiers(Collections.singletonList(new PID("doi", "1234567")));
Encoder<Publication> encoder = Encoders.bean(Publication.class);
Dataset<Publication> dp = spark.createDataset(Arrays.asList(p1,p2), encoder);
long count = dp.where(array_contains(new Column("identifiers.schema"), "doi")).count();
System.out.println("count = " + count);
System.out.println(ds.count());
}
}

View File

@ -1,22 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>dhp-wf</artifactId>
<packaging>pom</packaging>
<modules>
</modules>
</project>

22
pom.xml
View File

@ -21,10 +21,8 @@
<modules>
<!--<module>dhp-build</module>-->
<!--<module>dhp-common</module>-->
<!--<module>dhp-schemas</module>-->
<!--<module>dhp-wf</module>-->
<!--<module>dhp-collector-worker</module>-->
<module>dhp-common</module>
<module>dhp-workflows</module>
</modules>
<issueManagement>
@ -104,6 +102,7 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${dhp.hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@ -118,6 +117,20 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${dhp.commons.lang.version}</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.9</version>
</dependency>
</dependencies>
</dependencyManagement>
@ -239,6 +252,7 @@
<dhp.cdh.version>cdh5.9.2</dhp.cdh.version>
<dhp.hadoop.version>2.6.0-${dhp.cdh.version}</dhp.hadoop.version>
<dhp.spark.version>2.2.0</dhp.spark.version>
<dhp.commons.lang.version>3.5</dhp.commons.lang.version>
<scala.version>2.11.8</scala.version>
</properties>
</project>