Requirements
In order to interact with the TAP interface of stella.aip.de you only require
Python 3 and pyvo 1.0 or newer.
pip install "pyvo>=1.0"
Importing PyVO and checking the version
It is useful to always print the version of pyvo you are using: most non-working scripts fail because of an outdated version of pyvo.
from pkg_resources import parse_version
import pyvo

# Verify the version of pyvo
if parse_version(pyvo.__version__) < parse_version('1.0'):
    raise ImportError('pyvo version must be at least 1.0')

print('\npyvo version %s \n' % (pyvo.__version__,))
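On Python 3.8 or newer, where pkg_resources is deprecated, the same check can also be written with the standard library plus the packaging package (a minimal sketch; packaging may need to be installed separately):
# Alternative version check without pkg_resources (Python 3.8+)
from importlib.metadata import version
from packaging.version import Version

if Version(version('pyvo')) < Version('1.0'):
    raise ImportError('pyvo version must be at least 1.0')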
Authentication
After registration, you can access your API Token by clicking on your user name on the right-hand side of the menu bar and selecting API Token.
You will see a long alphanumeric string. In the following examples, please replace the placeholder <your-token> with your actual token.
The API Token identifies you and provides access to the result tables of your queries.
The connection to the TAP service can then be established as follows:
import requests
import pyvo

service_name = "Stella@AIP"
url = "https://stella.aip.de/tap"
token = 'Token <your-token>'

# Carry the authorization token in the headers of a requests session
tap_session = requests.Session()
tap_session.headers['Authorization'] = token

tap_service = pyvo.dal.TAPService(url, session=tap_session)
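To check that the connection works, you can for instance list the tables the service exposes (a minimal sketch using pyvo's VOSI tables endpoint):
# List the names of the tables exposed by the service
for table in tap_service.tables:
    print(table.name)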
Short queries
Many queries take less than a few seconds; we call them short queries. These can be executed with synchronous jobs.
lang = "ADQL"
query = '''
-- Show spectral types for all objects
SELECT objname,sptype FROM "stella_vpnep".objects;
'''
tap_result = tap_service.run_sync(query, language=lang)
Remark: the lang parameter can take two values, either PostgreSQL or ADQL. This allows access to some features present in one or the other language. For more details about the differences between the two, please refer to the ADQL documentation or to the IVOA docs.
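For illustration, the same query could be submitted with the PostgreSQL dialect instead (a minimal sketch; ADQL-specific functions would then not be available):
# Same short query, submitted with the PostgreSQL dialect instead of ADQL
tap_result = tap_service.run_sync(query, language="PostgreSQL")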
The result tap_result is a so-called TAPResults object that is essentially a wrapper around an Astropy votable.Table.
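For a quick look at the returned rows you can iterate over the result directly (a minimal sketch; the column names are those selected in the query above):
# Quick inspection of the synchronous result
print('%d rows returned' % len(tap_result))
for row in tap_result:
    print(row['objname'], row['sptype'])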
Asynchronous jobs (longer queries)
For slightly longer queries, typically counts or larger selections (>10000 objects), a synchronous job will fail because of timeouts (from the HTTP protocol or server settings). This is why we provide the possibility to submit asynchronous jobs. These types of jobs run on the server side and store their results, so that you can retrieve them at a later time. Choose one of the two queues:
- 1 min queue
- 1 hour queue
Most of the asynchronous queries will require less than a minute. Therefore this queue is the default and should be preferred.
query_name = "subset_teff_logg"
lang = 'ADQL'
query = '''
-- Select targets with specific temperature and log g value
SELECT t1.objname,t1.sptype,t1.simbad,t1.vmag,t1.teff,t1.logg
FROM stella_vpnep.objects AS t1
WHERE t1.teff < 7500 AND t1.logg > 3.5
'''
job = tap_service.submit_job(query, language=lang, runid=query_name, queue="1m")
job.run()
# Wait to be completed (or an error occurs)
job.wait(phases=["COMPLETED", "ERROR", "ABORTED"], timeout=60.0)
print('JOB %s: %s' % (job.job.runid, job.phase))
# Fetch the results
job.raise_if_error()
print('\nfetching the results...')
tap_results = job.fetch_result()
print('...DONE\n')
As with synchronous jobs, the result is a TAPResults object.
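If you want to keep a local copy of such a result, the underlying Astropy table can be written to disk, for example as CSV (a sketch; the filename is arbitrary):
# Persist the fetched result locally as a CSV file
tap_results.to_table().write('subset_teff_logg.csv', format='ascii.csv')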
Submitting a job and storing the job URL for later retrieval
We first submit the query as an async job to the 1h queue,
and store the URL of the newly created job in a file job_url.txt.
With this URL we are able to retrieve the results (once the job has finished) at any later time.
# Submit the query as an async job
query_name = "stella_teff_objects"
lang = 'ADQL'
query = '''
-- Limiting targets to those observed with STELLA and teff>4500
SELECT t1.objname,t2.sn
FROM stella_vpnep.objects AS t1
JOIN stella_vpnep.observations AS t2 ON t1.objname = t2.objname
JOIN stella_vpnep.stella_obs AS t3 ON t2.obsid = t3.obsid
WHERE t1.teff > 4500
'''
job = tap_service.submit_job(query, language=lang, runid=query_name, queue="1h")
job.run()
print('JOB %s: SUBMITTED' % (job.job.runid,))
print('JOB %s: %s' % (job.job.runid, job.phase))
# Save the job's url in a file to later retrieve results.
print('URL: %s' % (job.url,))
with open('job_url.txt', 'w') as fd:
fd.write(job.url)
Retrieve the results at a later time
In order to retrieve the results, we first recreate the job from the URL stored in the job_url.txt
file and verify that the job has finished by asking for its current phase. If the job has finished, we retrieve the results as usual.
import sys

# Recreate the job from its url
with open('job_url.txt', 'r') as fd:
    job_url = fd.readline()

job = pyvo.dal.AsyncTAPJob(job_url, session=tap_session)

# Check the job status
print('JOB %s: %s' % (job.job.runid, job.phase))

# if still running --> exit
if job.phase not in ("COMPLETED", "ERROR", "ABORTED"):
    sys.exit(0)

# Fetch the results
job.raise_if_error()
print('\nfetching the results...')
tap_results = job.fetch_result()
print('\n...DONE\n')
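If, instead of exiting, you prefer the script to block until the job has finished, the same wait() call used earlier works on the recreated job as well:
# Alternative to exiting: block until the job reaches a final phase (here up to one hour)
job.wait(phases=["COMPLETED", "ERROR", "ABORTED"], timeout=3600.0)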
Thanks to this method you can submit a job, go for a coffee, write a paper and retrieve the results when it suits you. The job and its results are stored on the server side under your user account.
Convert results to various Python types
The result obtained via the fetch_result() method is a so-called TAPResults object. The latter is essentially a votable.
In case you are not familiar with votables, here are a few tricks to get back to some more general pythonic types.
- Print the data:
  tap_results.to_table().pprint(max_lines=10)
  It is important to notice the max_lines keyword: printing too many lines may crash a low-memory machine.
- Show as HTML (in a browser):
  tap_results.to_table().show_in_browser(max_lines=10)
  Here again, the max_lines keyword protects low-memory machines.
- Show in a notebook (ipython, jupyter or jupyterlab):
  tap_results.to_table().show_in_notebook(display_length=10)
  Here the display_length keyword plays the same role.
- Get a numpy array:
  np_array = tap_results.to_table().as_array()
- Get a Pandas DataFrame:
  df = tap_results.to_table().to_pandas()
- Get the head of the DataFrame:
  df.head()
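As a small end-to-end illustration (assuming the subset_teff_logg result from above), the DataFrame can then be filtered further with ordinary pandas operations:
# Filter the converted DataFrame with plain pandas (columns from the query above)
df = tap_results.to_table().to_pandas()
hot = df[df['teff'] > 6000]
print(hot[['objname', 'teff', 'logg']].head())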
Archiving your jobs
If you submit several large queries you may exceed your quota, which is set to 10 GB. To avoid going over quota you may consider archiving your jobs. Archiving removes the data from the server side but keeps the SQL query. This allows you to resubmit a query at a later time.
Deleting (archiving) a job with pyvo can simply be done this way:
job.delete()
Archiving all COMPLETED jobs
A nice feature of the TAP service is the possibility to retrieve all jobs that are marked as COMPLETED and archive them at once. This can be done as follows:
# Archiving all COMPLETED jobs
completed_job_descriptions = tap_service.get_job_list(phases='COMPLETED')
for job_description in completed_job_descriptions:
    jobid = job_description.jobid
    job_url = tap_service.baseurl + '/async/' + jobid
    job = pyvo.dal.AsyncTAPJob(job_url, session=tap_session)
    print('Archiving: {url}'.format(url=job_url))
    job.delete()  # archive job
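Since archiving keeps the SQL query but removes the results, one option for resubmitting later is to save the query text on your side before archiving. A minimal sketch (the filename query.txt is arbitrary, and job.query is pyvo's accessor for the job's query string):
# Save the ADQL text of a job before archiving it
with open('query.txt', 'w') as fd:
    fd.write(job.query)
job.delete()  # archive the job; its results are removed from the server

# ...later: resubmit the stored query, e.g. to the short queue
with open('query.txt', 'r') as fd:
    saved_query = fd.read()
new_job = tap_service.submit_job(saved_query, language='ADQL', queue='1m')
new_job.run()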