reading from newline delimited json textfiles instead of sequence files
This commit is contained in:
parent
af835f2f98
commit
aeb01fa353
|
@ -226,9 +226,8 @@ public class GraphJoiner implements Serializable {
|
|||
* @return the JavaPairRDD<String, TypedRow> indexed by entity identifier
|
||||
*/
|
||||
private JavaPairRDD<String, TypedRow> readPathEntity(final JavaSparkContext sc, final String inputPath, final String type) {
|
||||
return sc.sequenceFile(inputPath + "/" + type, Text.class, Text.class)
|
||||
.mapToPair((PairFunction<Tuple2<Text, Text>, String, TypedRow>) item -> {
|
||||
final String s = item._2().toString();
|
||||
return sc.textFile(inputPath + "/" + type)
|
||||
.mapToPair((PairFunction<String, String, TypedRow>) s -> {
|
||||
final DocumentContext json = JsonPath.parse(s);
|
||||
final String id = json.read("$.id");
|
||||
return new Tuple2<>(id, new TypedRow()
|
||||
|
@ -247,9 +246,8 @@ public class GraphJoiner implements Serializable {
|
|||
* @return the JavaRDD<TypedRow> containing all the relationships
|
||||
*/
|
||||
private JavaRDD<TypedRow> readPathRelation(final JavaSparkContext sc, final String inputPath) {
|
||||
return sc.sequenceFile(inputPath + "/relation", Text.class, Text.class)
|
||||
.map(item -> {
|
||||
final String s = item._2().toString();
|
||||
return sc.textFile(inputPath + "/relation")
|
||||
.map(s -> {
|
||||
final DocumentContext json = JsonPath.parse(s);
|
||||
return new TypedRow()
|
||||
.setSourceId(json.read("$.source"))
|
||||
|
|
Loading…
Reference in New Issue