[AffiliationFromPublisher] refactoring
This commit is contained in:
parent
675de07138
commit
7df492fa34
|
@ -89,49 +89,51 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
isSparkSessionManaged,
|
isSparkSessionManaged,
|
||||||
spark -> {
|
spark -> {
|
||||||
Constants.removeOutputDir(spark, outputPath);
|
Constants.removeOutputDir(spark, outputPath);
|
||||||
|
createActionSet(spark, crossrefInputPath, pubmedInputPath, openapcInputPath, dataciteInputPath, webcrawlInputPath, publisherlInputPath, outputPath);
|
||||||
List<KeyValue> collectedFromCrossref = OafMapperUtils
|
|
||||||
.listKeyValues(ModelConstants.CROSSREF_ID, "Crossref");
|
|
||||||
JavaPairRDD<Text, Text> crossrefRelations = prepareAffiliationRelations(
|
|
||||||
spark, crossrefInputPath, collectedFromCrossref);
|
|
||||||
|
|
||||||
List<KeyValue> collectedFromPubmed = OafMapperUtils
|
|
||||||
.listKeyValues(ModelConstants.PUBMED_CENTRAL_ID, "Pubmed");
|
|
||||||
JavaPairRDD<Text, Text> pubmedRelations = prepareAffiliationRelations(
|
|
||||||
spark, pubmedInputPath, collectedFromPubmed);
|
|
||||||
|
|
||||||
List<KeyValue> collectedFromOpenAPC = OafMapperUtils
|
|
||||||
.listKeyValues(ModelConstants.OPEN_APC_ID, "OpenAPC");
|
|
||||||
JavaPairRDD<Text, Text> openAPCRelations = prepareAffiliationRelations(
|
|
||||||
spark, openapcInputPath, collectedFromOpenAPC);
|
|
||||||
|
|
||||||
List<KeyValue> collectedFromDatacite = OafMapperUtils
|
|
||||||
.listKeyValues(ModelConstants.DATACITE_ID, "Datacite");
|
|
||||||
JavaPairRDD<Text, Text> dataciteRelations = prepareAffiliationRelations(
|
|
||||||
spark, dataciteInputPath, collectedFromDatacite);
|
|
||||||
|
|
||||||
List<KeyValue> collectedFromWebCrawl = OafMapperUtils
|
|
||||||
.listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME);
|
|
||||||
JavaPairRDD<Text, Text> webCrawlRelations = prepareAffiliationRelations(
|
|
||||||
spark, webcrawlInputPath, collectedFromWebCrawl);
|
|
||||||
|
|
||||||
List<KeyValue> collectedfromPublisher = OafMapperUtils
|
|
||||||
.listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME);
|
|
||||||
JavaPairRDD<Text, Text> publisherRelations = prepareAffiliationRelationFromPublisher(
|
|
||||||
spark, publisherlInputPath, collectedfromPublisher);
|
|
||||||
|
|
||||||
crossrefRelations
|
|
||||||
.union(pubmedRelations)
|
|
||||||
.union(openAPCRelations)
|
|
||||||
.union(dataciteRelations)
|
|
||||||
.union(webCrawlRelations)
|
|
||||||
.union(publisherRelations)
|
|
||||||
.saveAsHadoopFile(
|
|
||||||
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class);
|
|
||||||
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void createActionSet(SparkSession spark, String crossrefInputPath, String pubmedInputPath, String openapcInputPath, String dataciteInputPath, String webcrawlInputPath, String publisherlInputPath, String outputPath) {
|
||||||
|
List<KeyValue> collectedFromCrossref = OafMapperUtils
|
||||||
|
.listKeyValues(ModelConstants.CROSSREF_ID, "Crossref");
|
||||||
|
JavaPairRDD<Text, Text> crossrefRelations = prepareAffiliationRelations(
|
||||||
|
spark, crossrefInputPath, collectedFromCrossref);
|
||||||
|
|
||||||
|
List<KeyValue> collectedFromPubmed = OafMapperUtils
|
||||||
|
.listKeyValues(ModelConstants.PUBMED_CENTRAL_ID, "Pubmed");
|
||||||
|
JavaPairRDD<Text, Text> pubmedRelations = prepareAffiliationRelations(
|
||||||
|
spark, pubmedInputPath, collectedFromPubmed);
|
||||||
|
|
||||||
|
List<KeyValue> collectedFromOpenAPC = OafMapperUtils
|
||||||
|
.listKeyValues(ModelConstants.OPEN_APC_ID, "OpenAPC");
|
||||||
|
JavaPairRDD<Text, Text> openAPCRelations = prepareAffiliationRelations(
|
||||||
|
spark, openapcInputPath, collectedFromOpenAPC);
|
||||||
|
|
||||||
|
List<KeyValue> collectedFromDatacite = OafMapperUtils
|
||||||
|
.listKeyValues(ModelConstants.DATACITE_ID, "Datacite");
|
||||||
|
JavaPairRDD<Text, Text> dataciteRelations = prepareAffiliationRelations(
|
||||||
|
spark, dataciteInputPath, collectedFromDatacite);
|
||||||
|
|
||||||
|
List<KeyValue> collectedFromWebCrawl = OafMapperUtils
|
||||||
|
.listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME);
|
||||||
|
JavaPairRDD<Text, Text> webCrawlRelations = prepareAffiliationRelations(
|
||||||
|
spark, webcrawlInputPath, collectedFromWebCrawl);
|
||||||
|
|
||||||
|
List<KeyValue> collectedfromPublisher = OafMapperUtils
|
||||||
|
.listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME);
|
||||||
|
JavaPairRDD<Text, Text> publisherRelations = prepareAffiliationRelationFromPublisher(
|
||||||
|
spark, publisherlInputPath, collectedfromPublisher);
|
||||||
|
|
||||||
|
crossrefRelations
|
||||||
|
.union(pubmedRelations)
|
||||||
|
.union(openAPCRelations)
|
||||||
|
.union(dataciteRelations)
|
||||||
|
.union(webCrawlRelations)
|
||||||
|
.union(publisherRelations)
|
||||||
|
.saveAsHadoopFile(
|
||||||
|
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class);
|
||||||
|
}
|
||||||
|
|
||||||
private static JavaPairRDD<Text,Text> prepareAffiliationRelationFromPublisher(SparkSession spark, String inputPath,
|
private static JavaPairRDD<Text,Text> prepareAffiliationRelationFromPublisher(SparkSession spark, String inputPath,
|
||||||
List<KeyValue> collectedfrom){
|
List<KeyValue> collectedfrom){
|
||||||
Dataset<Row> df = spark
|
Dataset<Row> df = spark
|
||||||
|
|
Loading…
Reference in New Issue