forked from D-Net/dnet-hadoop
WIP: worflow nodes for including Scholexplorer records in the RAW graph
This commit is contained in:
parent
c8850456e9
commit
98f37c8d81
|
@ -101,24 +101,15 @@ public class CopyHdfsOafApplication extends AbstractMigrationApplication {
|
||||||
.as(Encoders.kryo(Oaf.class));
|
.as(Encoders.kryo(Oaf.class));
|
||||||
|
|
||||||
// dispatch each entity type individually in the respective graph subdirectory in append mode
|
// dispatch each entity type individually in the respective graph subdirectory in append mode
|
||||||
for(Map.Entry<EntityType, Class> e : ModelSupport.entityTypes.entrySet()) {
|
for(Map.Entry<String, Class> e : ModelSupport.oafTypes.entrySet()) {
|
||||||
oaf
|
oaf
|
||||||
.filter((FilterFunction<Oaf>) o -> o.getClass().getSimpleName().toLowerCase().equals(e.getKey().toString()))
|
.filter((FilterFunction<Oaf>) o -> o.getClass().getSimpleName().toLowerCase().equals(e.getKey()))
|
||||||
.map((MapFunction<Oaf, String>) OBJECT_MAPPER::writeValueAsString, Encoders.bean(e.getValue()))
|
.map((MapFunction<Oaf, String>) OBJECT_MAPPER::writeValueAsString, Encoders.bean(e.getValue()))
|
||||||
.write()
|
.write()
|
||||||
.option("compression", "gzip")
|
.option("compression", "gzip")
|
||||||
.mode(SaveMode.Append)
|
.mode(SaveMode.Append)
|
||||||
.text(outputPath + "/" + e.getKey());
|
.text(outputPath + "/" + e.getKey());
|
||||||
}
|
}
|
||||||
|
|
||||||
oaf
|
|
||||||
.flatMap((FlatMapFunction<Oaf, Relation>) o -> {
|
|
||||||
Relation rel = (Relation) o;
|
|
||||||
List<Relation> rels = Lists.newArrayList();
|
|
||||||
rels.add(getInverse(rel, vocs));
|
|
||||||
|
|
||||||
return rels.iterator();
|
|
||||||
}, Encoders.bean(Relation.class));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue