forked from D-Net/dnet-hadoop

commit adcbf0e29a (parent 0e447add66)

    refactoring
@@ -0,0 +1,7 @@
+#sandboxName when not provided explicitly will be generated
+sandboxName=${sandboxName}
+sandboxDir=/user/${dhp.hadoop.frontend.user.name}/${sandboxName}
+workingDir=${sandboxDir}/working_dir
+oozie.wf.application.path = ${nameNode}${sandboxDir}/${oozieAppDir}
+oozieTopWfApplicationPath = ${oozie.wf.application.path}
+
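Review note on the new job.properties: the entries chain into one another. sandboxDir is built from the frontend user name and the sandbox name, workingDir hangs off sandboxDir, and oozie.wf.application.path points Oozie at the deployed application under the sandbox. A minimal sketch of that substitution follows; the user and sandbox names are hypothetical, and in practice Oozie performs this ${...} EL resolution itself:

```java
import java.util.Map;

// Illustrative only: mirrors the ${...} substitution Oozie applies to
// job.properties. "jdoe" and "bulktag_sandbox" are made-up values.
public class SandboxPathDemo {
    public static void main(String[] args) {
        Map<String, String> props = Map.of(
                "dhp.hadoop.frontend.user.name", "jdoe",
                "sandboxName", "bulktag_sandbox");
        String sandboxDir = "/user/" + props.get("dhp.hadoop.frontend.user.name")
                + "/" + props.get("sandboxName");
        String workingDir = sandboxDir + "/working_dir";
        System.out.println(sandboxDir);  // /user/jdoe/bulktag_sandbox
        System.out.println(workingDir);  // /user/jdoe/bulktag_sandbox/working_dir
    }
}
```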
@@ -1,4 +1,4 @@
-package eu.dnetlib.dhp;
+package eu.dnetlib.dhp.bulktag;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
@@ -28,7 +28,7 @@ public class SparkBulkTagJob2 {
         String jsonConfiguration =
                 IOUtils.toString(
                         SparkBulkTagJob2.class.getResourceAsStream(
-                                "/eu/dnetlib/dhp/bulktag/input_bulktag_parameters.json"));
+                                "/eu/dnetlib/dhp/bulktag/input_bulkTag_parameters.json"));
 
         final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 
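Review note: the rename input_bulktag_parameters.json -> input_bulkTag_parameters.json only changes case, and Class.getResourceAsStream is case-sensitive; it returns null rather than throwing on a miss, so a stale name only surfaces later as a NullPointerException inside IOUtils.toString. A hedged sketch of a fail-fast guard (readResource is a hypothetical helper, not part of this commit):

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import org.apache.commons.io.IOUtils;

public class ResourceGuard {
    // Hypothetical helper: fail fast with the offending path instead of a
    // bare NullPointerException deep inside IOUtils.toString.
    public static String readResource(Class<?> owner, String path) throws IOException {
        InputStream in = owner.getResourceAsStream(path);
        if (in == null) {
            throw new IllegalStateException("classpath resource not found: " + path);
        }
        return IOUtils.toString(in, StandardCharsets.UTF_8);
    }
}
```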
@@ -65,13 +65,16 @@ public class ResultTagger implements Serializable {
 
         // tagging for Subject
         final Set<String> subjects = new HashSet<>();
-        result.getSubject().stream()
-                .map(subject -> subject.getValue())
-                .filter(StringUtils::isNotBlank)
-                .map(String::toLowerCase)
-                .map(String::trim)
-                .collect(Collectors.toCollection(HashSet::new))
-                .forEach(s -> subjects.addAll(conf.getCommunityForSubjectValue(s)));
+        Optional<List<StructuredProperty>> oresultsubj = Optional.ofNullable(result.getSubject());
+        if (oresultsubj.isPresent()) {
+            oresultsubj.get().stream()
+                    .map(subject -> subject.getValue())
+                    .filter(StringUtils::isNotBlank)
+                    .map(String::toLowerCase)
+                    .map(String::trim)
+                    .collect(Collectors.toCollection(HashSet::new))
+                    .forEach(s -> subjects.addAll(conf.getCommunityForSubjectValue(s)));
+        }
 
         communities.addAll(subjects);
 
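Review note: this hunk fixes a NullPointerException on records that carry no subject list at all, since the old code dereferenced result.getSubject() unconditionally. A minimal standalone sketch of the same idea; the orElse(emptyList()) form shown here is an equivalent, slightly flatter alternative to the isPresent()/get() block above:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class NullSafeStreamDemo {
    public static void main(String[] args) {
        List<String> subjects = null; // a record that carries no subjects
        Set<String> lowered = new HashSet<>();
        // Guard the nullable list before streaming, as the diff does.
        Optional.ofNullable(subjects)
                .orElse(Collections.emptyList())
                .stream()
                .map(String::toLowerCase)
                .map(String::trim)
                .forEach(lowered::add);
        System.out.println(lowered); // prints [] instead of throwing an NPE
    }
}
```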
@@ -79,32 +82,43 @@ public class ResultTagger implements Serializable {
         final Set<String> datasources = new HashSet<>();
         final Set<String> tmp = new HashSet<>();
 
-        for (Instance i : result.getInstance()) {
-            tmp.add(StringUtils.substringAfter(i.getCollectedfrom().getKey(), "|"));
-            tmp.add(StringUtils.substringAfter(i.getHostedby().getKey(), "|"));
-        }
+        Optional<List<Instance>> oresultinstance = Optional.ofNullable(result.getInstance());
+        if (oresultinstance.isPresent()) {
+            for (Instance i : oresultinstance.get()) {
+                tmp.add(StringUtils.substringAfter(i.getCollectedfrom().getKey(), "|"));
+                tmp.add(StringUtils.substringAfter(i.getHostedby().getKey(), "|"));
+            }
 
-        result.getInstance().stream()
-                .map(i -> new Pair<>(i.getCollectedfrom().getKey(), i.getHostedby().getKey()))
-                .flatMap(p -> Stream.of(p.getFst(), p.getSnd()))
-                .map(s -> StringUtils.substringAfter(s, "|"))
-                .collect(Collectors.toCollection(HashSet::new))
-                .forEach(dsId -> datasources.addAll(conf.getCommunityForDatasource(dsId, param)));
+            oresultinstance.get().stream()
+                    .map(i -> new Pair<>(i.getCollectedfrom().getKey(), i.getHostedby().getKey()))
+                    .flatMap(p -> Stream.of(p.getFst(), p.getSnd()))
+                    .map(s -> StringUtils.substringAfter(s, "|"))
+                    .collect(Collectors.toCollection(HashSet::new))
+                    .forEach(
+                            dsId ->
+                                    datasources.addAll(
+                                            conf.getCommunityForDatasource(dsId, param)));
+        }
 
         communities.addAll(datasources);
 
         /*Tagging for Zenodo Communities*/
         final Set<String> czenodo = new HashSet<>();
-        result.getContext().stream()
-                .filter(c -> c.getId().contains(ZENODO_COMMUNITY_INDICATOR))
-                .collect(Collectors.toList())
-                .forEach(
-                        c ->
-                                czenodo.addAll(
-                                        conf.getCommunityForZenodoCommunityValue(
-                                                c.getId()
-                                                        .substring(c.getId().lastIndexOf("/") + 1)
-                                                        .trim())));
+
+        Optional<List<Context>> oresultcontext = Optional.ofNullable(result.getContext());
+        if (oresultcontext.isPresent()) {
+            oresultcontext.get().stream()
+                    .filter(c -> c.getId().contains(ZENODO_COMMUNITY_INDICATOR))
+                    .collect(Collectors.toList())
+                    .forEach(
+                            c ->
+                                    czenodo.addAll(
+                                            conf.getCommunityForZenodoCommunityValue(
+                                                    c.getId()
+                                                            .substring(
+                                                                    c.getId().lastIndexOf("/") + 1)
+                                                            .trim())));
+        }
 
         communities.addAll(czenodo);
 
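Review note: the same isPresent()/get() guard now appears three times in this class (subjects, instances, contexts). An equivalent, slightly tighter form uses ifPresent and drops the explicit get() calls; a sketch follows, with a plain String list standing in for the diff's Instance objects and commons-lang3 assumed for StringUtils. The substringAfter(..., "|") calls appear to strip the entity-type prefix from OpenAIRE-style identifiers (the part before the pipe in ids such as 10|...).

```java
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;

public class IfPresentDemo {
    public static void main(String[] args) {
        // Stand-in for result.getInstance(); may legitimately be null.
        List<String> collectedFromKeys = null;
        Set<String> tmp = new HashSet<>();
        // Same null guard as the diff, without the isPresent()/get() pair.
        Optional.ofNullable(collectedFromKeys)
                .ifPresent(keys -> keys.forEach(
                        k -> tmp.add(StringUtils.substringAfter(k, "|"))));
        System.out.println(tmp); // [] when the instance list is absent
    }
}
```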
@@ -67,8 +67,8 @@
             <name-node>${nameNode}</name-node>
             <master>yarn-cluster</master>
             <mode>cluster</mode>
-            <name>bulkTagging</name>
-            <class>eu.dnetlib.dhp.SparkBulkTagJob2</class>
+            <name>bulkTagging-publication</name>
+            <class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob2</class>
             <jar>dhp-bulktag-${projectVersion}.jar</jar>
             <spark-opts>
                 --num-executors=${sparkExecutorNumber}
@@ -96,8 +96,8 @@
             <name-node>${nameNode}</name-node>
             <master>yarn-cluster</master>
             <mode>cluster</mode>
-            <name>bulkTagging</name>
-            <class>eu.dnetlib.dhp.SparkBulkTagJob2</class>
+            <name>bulkTagging-dataset</name>
+            <class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob2</class>
             <jar>dhp-bulktag-${projectVersion}.jar</jar>
             <spark-opts>
                 --num-executors=${sparkExecutorNumber}
@@ -125,8 +125,8 @@
             <name-node>${nameNode}</name-node>
             <master>yarn-cluster</master>
             <mode>cluster</mode>
-            <name>bulkTagging</name>
-            <class>eu.dnetlib.dhp.SparkBulkTagJob2</class>
+            <name>bulkTagging-orp</name>
+            <class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob2</class>
             <jar>dhp-bulktag-${projectVersion}.jar</jar>
             <spark-opts>
                 --num-executors=${sparkExecutorNumber}
@@ -154,8 +154,8 @@
             <name-node>${nameNode}</name-node>
             <master>yarn-cluster</master>
             <mode>cluster</mode>
-            <name>bulkTagging</name>
-            <class>eu.dnetlib.dhp.SparkBulkTagJob2</class>
+            <name>bulkTagging-software</name>
+            <class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob2</class>
             <jar>dhp-bulktag-${projectVersion}.jar</jar>
             <spark-opts>
                 --num-executors=${sparkExecutorNumber}
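Review note on the four workflow hunks above: giving each Spark action a distinct <name> (bulkTagging-publication, -dataset, -orp, -software) makes the per-result-type jobs distinguishable in the Oozie console and the Spark UI, and the <class> element has to track the package move to eu.dnetlib.dhp.bulktag, otherwise submission would fail with a ClassNotFoundException.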
@@ -3,6 +3,7 @@ package eu.dnetlib.dhp;
 import static eu.dnetlib.dhp.community.TagginConstants.ZENODO_COMMUNITY_INDICATOR;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.bulktag.SparkBulkTagJob2;
 import eu.dnetlib.dhp.schema.oaf.Dataset;
 import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
 import eu.dnetlib.dhp.schema.oaf.Publication;
@@ -141,4 +141,15 @@ public class CommunityConfigurationFactoryTest {
 
         System.out.println(cc.toJson());
     }
+
+    @Test
+    public void temporaneo() throws Exception {
+        String xml =
+                IOUtils.toString(
+                        getClass()
+                                .getResourceAsStream(
+                                        "/eu/dnetlib/dhp/communityconfiguration/tagging_conf.xml"));
+        final CommunityConfiguration cc = CommunityConfigurationFactory.newInstance(xml);
+        System.out.println(cc.toJson());
+    }
 }
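Review note: "temporaneo" is Italian for "temporary", so this reads as a scratch test; it only prints the parsed configuration and asserts nothing, meaning CI cannot catch a parsing regression. A hedged sketch of the same test with minimal assertions; the method name is hypothetical and JUnit 4's Assert is assumed here to match the @Test style in the file (adjust if the project is on JUnit 5):

```java
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;

@Test
public void loadsTaggingConfiguration() throws Exception {
    String xml =
            IOUtils.toString(
                    getClass()
                            .getResourceAsStream(
                                    "/eu/dnetlib/dhp/communityconfiguration/tagging_conf.xml"));
    final CommunityConfiguration cc = CommunityConfigurationFactory.newInstance(xml);
    assertNotNull(cc);                  // configuration parsed
    assertFalse(cc.toJson().isEmpty()); // serialization yields content
}
```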
File diff suppressed because one or more lines are too long
File diff suppressed because it is too large