[AffiliationIngestion]refactoring

This commit is contained in:
Miriam Baglioni 2024-06-29 18:35:49 +02:00
parent 236b64d830
commit 9cbe966b4a
3 changed files with 6 additions and 7 deletions

View File

@ -106,15 +106,15 @@ public class PrepareAffiliationRelations implements Serializable {
spark, dataciteInputPath, collectedFromDatacite); spark, dataciteInputPath, collectedFromDatacite);
List<KeyValue> collectedFromWebCrawl = OafMapperUtils List<KeyValue> collectedFromWebCrawl = OafMapperUtils
.listKeyValues(Constants.WEB_CRAWL_ID, Constants.WEB_CRAWL_NAME); .listKeyValues(Constants.WEB_CRAWL_ID, Constants.WEB_CRAWL_NAME);
JavaPairRDD<Text, Text> webCrawlRelations = prepareAffiliationRelations( JavaPairRDD<Text, Text> webCrawlRelations = prepareAffiliationRelations(
spark, webcrawlInputPath, collectedFromWebCrawl); spark, webcrawlInputPath, collectedFromWebCrawl);
crossrefRelations crossrefRelations
.union(pubmedRelations) .union(pubmedRelations)
.union(openAPCRelations) .union(openAPCRelations)
.union(dataciteRelations) .union(dataciteRelations)
.union(webCrawlRelations) .union(webCrawlRelations)
.saveAsHadoopFile( .saveAsHadoopFile(
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class); outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class);

View File

@ -1,15 +1,12 @@
package eu.dnetlib.dhp.actionmanager.webcrawl; package eu.dnetlib.dhp.actionmanager.webcrawl;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable; import java.io.Serializable;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import eu.dnetlib.dhp.actionmanager.Constants;
import io.netty.util.Constant;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.io.compress.GzipCodec;
@ -24,6 +21,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.Constants;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
@ -32,6 +30,7 @@ import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner; import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner;
import eu.dnetlib.dhp.schema.oaf.utils.PidType; import eu.dnetlib.dhp.schema.oaf.utils.PidType;
import io.netty.util.Constant;
import scala.Tuple2; import scala.Tuple2;
/** /**

View File

@ -88,7 +88,7 @@ public class PrepareAffiliationRelationsTest {
"-pubmedInputPath", crossrefAffiliationRelationPath, "-pubmedInputPath", crossrefAffiliationRelationPath,
"-openapcInputPath", crossrefAffiliationRelationPath, "-openapcInputPath", crossrefAffiliationRelationPath,
"-dataciteInputPath", crossrefAffiliationRelationPath, "-dataciteInputPath", crossrefAffiliationRelationPath,
"-webCrawlInputPath", crossrefAffiliationRelationPath, "-webCrawlInputPath", crossrefAffiliationRelationPath,
"-outputPath", outputPath "-outputPath", outputPath
}); });