forked from D-Net/dnet-hadoop
changed the save method
This commit is contained in:
parent
951b13ac46
commit
d60fd36046
|
@ -35,7 +35,7 @@ public class SparkResultToOrganizationFromIstRepoJob2 {
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
String jsonConfiguration = IOUtils.toString(SparkCountryPropagationJob2.class
|
String jsonConfiguration = IOUtils.toString(SparkResultToOrganizationFromIstRepoJob2.class
|
||||||
.getResourceAsStream("/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_propagationresulaffiliationfrominstrepo_parameters.json"));
|
.getResourceAsStream("/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_propagationresulaffiliationfrominstrepo_parameters.json"));
|
||||||
|
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
@ -118,9 +118,14 @@ public class SparkResultToOrganizationFromIstRepoJob2 {
|
||||||
.textFile(alreadylinked)
|
.textFile(alreadylinked)
|
||||||
.map(value -> OBJECT_MAPPER.readValue(value, ResultOrganizationSet.class),
|
.map(value -> OBJECT_MAPPER.readValue(value, ResultOrganizationSet.class),
|
||||||
Encoders.bean(ResultOrganizationSet.class)), potentialUpdates)
|
Encoders.bean(ResultOrganizationSet.class)), potentialUpdates)
|
||||||
.toJavaRDD()
|
.toJSON()
|
||||||
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
|
.write()
|
||||||
.saveAsTextFile(outputPath , GzipCodec.class);
|
.mode(SaveMode.Append)
|
||||||
|
.option("compression", "gzip")
|
||||||
|
.text(outputPath);
|
||||||
|
// .toJavaRDD()
|
||||||
|
// .map(r -> OBJECT_MAPPER.writeValueAsString(r))
|
||||||
|
// .saveAsTextFile(outputPath , GzipCodec.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue