[Enrichment Step] get rid of hive
This commit is contained in:
parent
7501e823ed
commit
30e0f60ac8
|
@ -80,7 +80,7 @@ public class PrepareResultOrcidAssociationStep1 {
|
||||||
conf,
|
conf,
|
||||||
isSparkSessionManaged,
|
isSparkSessionManaged,
|
||||||
spark -> {
|
spark -> {
|
||||||
removeOutputDir(spark, outputPath);
|
// removeOutputDir(spark, outputPath);
|
||||||
prepareInfo(
|
prepareInfo(
|
||||||
spark, inputPath, outputPath, resultType, resultClazz, allowedsemrel, allowedPids);
|
spark, inputPath, outputPath, resultType, resultClazz, allowedsemrel, allowedPids);
|
||||||
});
|
});
|
||||||
|
@ -96,14 +96,6 @@ public class PrepareResultOrcidAssociationStep1 {
|
||||||
List<String> allowedPids) {
|
List<String> allowedPids) {
|
||||||
|
|
||||||
final String inputResultPath = inputPath + "/" + resultType;
|
final String inputResultPath = inputPath + "/" + resultType;
|
||||||
readPath(spark, inputPath + "/relation", Relation.class)
|
|
||||||
.filter(
|
|
||||||
(FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference()
|
|
||||||
&& allowedsemrel.contains(r.getRelClass().toLowerCase()))
|
|
||||||
.write()
|
|
||||||
.mode(SaveMode.Overwrite)
|
|
||||||
.option("compression", "gzip")
|
|
||||||
.json(outputPath + "/" + resultType + "/relationSubset");
|
|
||||||
|
|
||||||
Dataset<Relation> relation = readPath(spark, outputPath + "/relationSubset", Relation.class);
|
Dataset<Relation> relation = readPath(spark, outputPath + "/relationSubset", Relation.class);
|
||||||
|
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
</property>
|
</property>
|
||||||
</parameters>
|
</parameters>
|
||||||
|
|
||||||
<start to="fork_prepare_assoc_step1"/>
|
<start to="prepare_relations"/>
|
||||||
|
|
||||||
<kill name="Kill">
|
<kill name="Kill">
|
||||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
|
|
Loading…
Reference in New Issue