Remove API test, as it will be tested in ckanext-dgu

This commit is contained in:
Adrià Mercader 2011-03-15 14:13:49 +00:00
parent 102bbeceba
commit b2320a9f25
1 changed files with 0 additions and 206 deletions

View File

@ -1,206 +0,0 @@
from ckan.tests.functional.api.base import BaseModelApiTestCase
from ckan.tests.functional.api.base import Api1TestCase as Version1TestCase
from ckan.tests.functional.api.base import Api2TestCase as Version2TestCase
from ckan.tests.functional.api.base import ApiUnversionedTestCase as UnversionedTestCase
# Todo: Remove this ckan.model stuff.
import ckan.model as model
from ckanext.harvest.model import HarvestSource
from ckanext.harvest.model import HarvestingJob
from ckanext.harvest.model import HarvestedDocument
class HarvestingTestCase(BaseModelApiTestCase):
commit_changesets = False
reuse_common_fixtures = True
def setup(self):
#model.repo.init_db()
super(HarvestingTestCase, self).setup()
self.source = None
self.source1 = None
self.source2 = None
self.source3 = None
self.source4 = None
self.source5 = None
self.job = None
self.job1 = None
self.job2 = None
self.job3 = None
def teardown(self):
model.repo.delete_all()
def init_extra_environ(self):
self.user = model.User.by_name(self.user_name)
self.extra_environ={'Authorization' : config.get('ckan.harvesting.api_key')}
def _create_harvest_source_fixture(self, **kwds):
source = HarvestSource(**kwds)
model.Session.add(source)
model.Session.commit()
assert source.id
return source
def _create_harvesting_job_fixture(self, **kwds):
if not kwds.get('user_ref'):
kwds['user_ref'] = u'c_publisher_user'
job = HarvestingJob(**kwds)
model.Session.add(job)
model.Session.commit()
assert job.id
return job
def test_harvestsource_entity_get_ok(self):
# Setup harvest source fixture.
fixture_url = u'http://localhost/'
self.source = self._create_harvest_source_fixture(url=fixture_url)
offset = self.offset('/rest/harvestsource/%s' % self.source.id)
res = self.app.get(offset, status=[200])
source_data = self.data_from_res(res)
assert 'url' in source_data, "No 'id' in changeset data: %s" % source_data
self.assert_equal(source_data.get('url'), fixture_url)
def test_harvestsource_entity_get_not_found(self):
offset = self.offset('/rest/harvestsource/%s' % "notasource")
self.app.get(offset, status=[404])
def test_publisher_harvestsource_register_get_ok(self):
# Setup harvest source fixtures.
fixture_url = u'http://localhost/'
self.source1 = self._create_harvest_source_fixture(url=fixture_url+'1', publisher_ref=u'pub1')
self.source2 = self._create_harvest_source_fixture(url=fixture_url+'2', publisher_ref=u'pub1')
self.source3 = self._create_harvest_source_fixture(url=fixture_url+'3', publisher_ref=u'pub1')
self.source4 = self._create_harvest_source_fixture(url=fixture_url+'4', publisher_ref=u'pub2')
self.source5 = self._create_harvest_source_fixture(url=fixture_url+'5', publisher_ref=u'pub2')
offset = self.offset('/rest/harvestsource/publisher/pub1')
res = self.app.get(offset, status=[200])
source_data = self.data_from_res(res)
self.assert_equal(len(source_data), 3)
offset = self.offset('/rest/harvestsource/publisher/pub2')
res = self.app.get(offset, status=[200])
source_data = self.data_from_res(res)
self.assert_equal(len(source_data), 2)
def test_harvestingjob_entity_get_ok(self):
# Setup harvesting job fixture.
fixture_url = u'http://localhost/6'
self.source = self._create_harvest_source_fixture(url=fixture_url)
self.job = self._create_harvesting_job_fixture(source_id=self.source.id)
offset = self.offset('/rest/harvestingjob/%s' % self.job.id)
res = self.app.get(offset, status=[200])
job_data = self.data_from_res(res)
self.assert_equal(job_data.get('source_id'), self.source.id)
def test_harvestingjob_entity_get_not_found(self):
# Setup harvesting job fixture.
offset = self.offset('/rest/harvestingjob/%s' % "notajob")
self.app.get(offset, status=[404])
def test_harvestingjob_register_post_ok(self):
# Setup harvest source fixture.
fixture_url = u'http://localhost/7'
self.source = self._create_harvest_source_fixture(url=fixture_url)
# Prepare and send POST request to register.
offset = self.offset('/rest/harvestingjob')
# - invalid example.
job_details = {
'source_id': 'made-up-source-id',
'user_ref': u'a_publisher_user',
}
assert not HarvestingJob.get(u'a_publisher_user', default=None, attr='user_ref')
response = self.post(offset, job_details, status=400)
job_error = self.data_from_res(response)
assert "does not exist" in job_error
assert not HarvestingJob.get(u'a_publisher_user', default=None, attr='user_ref')
# - invalid example.
job_details = {
'source_id': self.source.id,
'user_ref': u'',
}
assert not HarvestingJob.get(u'a_publisher_user', None, 'user_ref')
response = self.post(offset, job_details, status=400)
job_error = self.data_from_res(response)
assert "You must supply a user_ref" in job_error
assert not HarvestingJob.get(self.source.id, default=None, attr='source_id')
# - valid example.
job_details = {
'source_id': self.source.id,
'user_ref': u'a_publisher_user',
}
assert not HarvestingJob.get(u'a_publisher_user', None, 'user_ref')
response = self.post(offset, job_details)
new_job = self.data_from_res(response)
assert new_job['id']
self.assert_equal(new_job['source_id'], self.source.id)
self.assert_equal(new_job['user_ref'], u'a_publisher_user')
self.job = HarvestingJob.get(self.source.id, attr='source_id')
HarvestingJob.get(u'a_publisher_user', attr='user_ref')
def test_harvestingjob_register_get_filter_by_status(self):
# Setup harvest source fixture.
fixture_url = u'http://localhost/8'
self.source = self._create_harvest_source_fixture(url=fixture_url)
self.job = self._create_harvesting_job_fixture(source_id=self.source.id)
register_offset = self.offset('/rest/harvestingjob')
self.assert_equal(self.job.status, 'New')
filter_offset = '/status/new'
offset = register_offset + filter_offset
res = self.get(offset)
data = self.data_from_res(res)
self.assert_equal(data, [self.job.id])
filter_offset = '/status/error'
offset = register_offset + filter_offset
res = self.get(offset)
data = self.data_from_res(res)
self.assert_equal(data, [])
self.job.status = u'Error'
self.job.save()
res = self.get(offset)
data = self.data_from_res(res)
self.assert_equal(data, [self.job.id])
filter_offset = '/status/new'
offset = register_offset + filter_offset
res = self.get(offset)
data = self.data_from_res(res)
self.assert_equal(data, [])
filter_offset = '/status/error'
offset = register_offset + filter_offset
res = self.get(offset)
data = self.data_from_res(res)
self.assert_equal(data, [self.job.id])
def test_harvestingjob_entity_delete_ok(self):
# Setup harvesting job fixture.
fixture_url = u'http://localhost/6'
self.source = self._create_harvest_source_fixture(url=fixture_url)
self.job = self._create_harvesting_job_fixture(source_id=self.source.id)
offset = self.offset('/rest/harvestingjob/%s' % self.job.id)
self.get(offset, status=[200])
res = self.app_delete(offset, status=[200])
self.get(offset, status=[404])
def test_harvestingjob_entity_delete_denied(self):
self.send_authorization_header = False
# Setup harvesting job fixture.
fixture_url = u'http://localhost/6'
self.source = self._create_harvest_source_fixture(url=fixture_url)
self.job = self._create_harvesting_job_fixture(source_id=self.source.id)
offset = self.offset('/rest/harvestingjob/%s' % self.job.id)
self.get(offset, status=[200])
self.app_delete(offset, status=[403])
def test_harvestingjob_entity_delete_not_found(self):
# Setup harvesting job fixture.
offset = self.offset('/rest/harvestingjob/%s' % "notajob")
self.get(offset, status=[404])
class TestHarvestingVersion1(Version1TestCase, HarvestingTestCase): pass
class TestHarvestingVersion2(Version2TestCase, HarvestingTestCase): pass
class TestHarvestingUnversioned(UnversionedTestCase, HarvestingTestCase): pass