from . import Version
Version.append ('$Revision: 95206 $')
del Version
import datetime
import time
from . import Lookup
from . import SQL
def columnar (keywords, begin=None, end=None, accept=None, mangle=False):
    ''' Retrieve a data set and render it in columnar format,
        with one row for each keyword broadcast. Rows will be
        collapsed to the nearest millisecond.

        If *accept* is set, it should be a function that accepts
        a row as its sole argument, and returns True if the row
        should be included as a valid data point.

        If *mangle* is set to True, the timestamps in the first column
        will be pre-mangled so that gnuplot (which only interprets UNIX
        timestamps as UTC) will render the times in the local time zone.

        The results from :func:`columnar` are straightforward
        to feed into a :func:`csv.writer` instance.
    '''

    results = retrieve (keywords, begin, end)

    if len (results) == 0:
        return ()

    if mangle == True:
        # Establish the UTC offset in effect at the time of the most
        # recent record; subtracting it below shifts the reported
        # timestamps into the local time zone for gnuplot's benefit.
        last = results[-1]['time']
        utc_datetime = datetime.datetime.utcfromtimestamp (last)
        local_datetime = datetime.datetime.fromtimestamp (last)
        utc_offset = utc_datetime - local_datetime
        utc_offset = utc_offset.days * 86400 + utc_offset.seconds
    else:
        utc_offset = 0

    output = []
    columns = {}
    last = {}
    last_time = None

    # The first column is the (possibly mangled) timestamp. The same
    # dictionary maps both column index -> name and name -> index.

    if mangle == True:
        columns[0] = 'mangled_time'
        columns['mangled_time'] = 0
    else:
        columns[0] = 'time'
        columns['time'] = 0

    column = 1
    column_count = 0
    row = [columns[0]]

    for service in keywords.keys ():
        for keyword in keywords[service]:
            full_name = service + '.' + keyword.upper ()
            columns[full_name] = column
            columns[column] = full_name
            row.append (full_name)
            column += 1
            column_count += 1

    # The header row is always the first row of output.
    output.append (row)

    for row in results:
        if accept is not None and accept (row) == False:
            continue

        service = row['service']
        keyword = row['keyword']
        value = row['ascvalue']
        full_name = service + '.' + keyword

        # Collapse timestamps to the nearest millisecond.
        this_time = "%.3f" % (row['time'] - utc_offset)

        # Initial conditions: don't append to the output if we haven't
        # seen all of the columns yet.

        if len (last) < column_count:
            last[full_name] = value
            last_time = this_time
            continue

        if this_time != last_time:
            output_row = [None,] * (column_count + 1)
            output_row[0] = last_time

            for last_keyword in last.keys ():
                index = columns[last_keyword]
                output_row[index] = last[last_keyword]

            output.append (output_row)

        last[full_name] = value
        last_time = this_time

    # Flush the final accumulated row. Previously the loop only emitted
    # a row when a newer timestamp arrived, which silently dropped the
    # data for the most recent millisecond.

    if last_time is not None and len (last) >= column_count:
        output_row = [None,] * (column_count + 1)
        output_row[0] = last_time

        for last_keyword in last.keys ():
            index = columns[last_keyword]
            output_row[index] = last[last_keyword]

        output.append (output_row)

    return output
# Column names, in the order the retrieval queries emit them.
dictresult_columns = ('time', 'service', 'keyword', 'binvalue', 'ascvalue', 'repeated', 'discarded')

def dictresult (result):
    ''' Convert a single raw result row into a dictionary keyed by
        column name, as described in the commentary for
        :func:`retrieve`.
    '''
    pairs = zip (dictresult_columns, result)
    return dict (pairs)
def retrieve (keywords, begin=None, end=None, prime=True, dict=True, snap=False):
    ''' Retrieve all available records corresponding to the requested
        *keywords* as of a given time range. *keywords* is expected
        to be a dictionary, with KTL service names as keys, and the
        keyed value a list of keyword names. The *begin* and *end*
        arguments are UNIX timestamps, and will be extended by one
        millisecond to avoid problems with floating point inequalities.

        If *begin* is not specified, all records up to *end* will be
        returned; if *end* is not specified, all records after *begin*
        will be returned. If neither *begin* nor *end* is specified,
        all records for that service+keyword combination will be returned.

        If *prime* is True (the default) and *begin* is specified, the
        most recent record before *begin* will be included in the results.

        If *dict* is False, the results will be returned as a list of
        lists, as is typical for the :func:`fetchall` method of a pgdb
        cursor instance. The columns are as follows: time, service,
        keyword, binvalue, ascvalue, repeated, discarded. If *dict* is
        True (the default), each result in the sequential list will be a
        dictionary instead of a list, with the column names as keys.

        Results are sorted first by timestamp, then by service, and
        lastly by keyword.
    '''

    # Offset by a millisecond to avoid unexpected behavior when using
    # inequality operators against recorded timestamps.

    if begin is not None:
        begin = float (begin)
        begin = begin - 0.001

    if end is not None:
        end = float (end)
        end = end + 0.001

    # See how many handlers will be required. Each service could be
    # on a discrete database host.

    handlers = {}
    hosts = {}

    for service in keywords:
        try:
            datahost = SQL.override['hostname']
        except KeyError:
            datahost = Lookup.datahost (service)

        try:
            hosts[datahost][service] = True
        except KeyError:
            hosts[datahost] = {service: True}

        # One handler per database host is sufficient; re-creating it
        # for every service on the same host would only waste handles.
        if datahost not in handlers:
            handlers[datahost] = SQL.Handler (hostname=datahost)

    # For each requested service, confirm that the database host has a
    # table with records for that service. If it does not, return no
    # results. The alternative would be to raise an exception, which
    # is what the database layer would normally do.
    #
    # Iterate over snapshots (list ()) of the keys: entries are deleted
    # during iteration, and mutating a dictionary while iterating its
    # live view raises RuntimeError in Python 3.

    for datahost in list (hosts.keys ()):
        valid = Lookup.services (handlers[datahost])

        for target in list (hosts[datahost].keys ()):
            if target not in valid:
                del keywords[target]
                del hosts[datahost][target]

        if len (hosts[datahost]) == 0:
            del hosts[datahost]
            del handlers[datahost]

    # Generate a per-service SELECT query; these will be UNION'd together
    # on a per-handler basis when executed.

    chunks = {}

    for service in keywords:
        chunks[service] = []
        names = keywords[service]

        if prime == True and begin is not None:
            for name in names:
                name = name.upper ()
                chunk = prime_query % (service, service, service, name, begin, name)
                chunks[service].append (chunk)

        # If making a snapshot, no further queries are required to
        # satisfy the request. Move on to the next service.

        if snap == True:
            continue

        names = "','".join (names)
        names = "'%s'" % (names)
        names = names.upper ()

        if begin is not None and end is not None:
            chunk = bounded_query % (service, service, begin, end, names)
        elif begin is None and end is None:
            chunk = unbounded_query % (service, service, names)
        elif begin is not None:
            chunk = no_end_query % (service, service, begin, names)
        elif end is not None:
            chunk = no_begin_query % (service, service, end, names)

        chunks[service].append (chunk)

    # Drop services that generated no queries at all. Again, iterate a
    # snapshot of the keys since entries may be deleted.

    for service in list (chunks.keys ()):
        if len (chunks[service]) == 0:
            del chunks[service]

    if len (chunks) == 0:
        # Nothing to do. Return an empty result set.
        return []

    # OK, enough building. Execute the queries.

    results = []

    for datahost in handlers:
        handler = handlers[datahost]
        relevant_chunks = []

        for service in hosts[datahost]:
            relevant_chunks.extend (chunks[service])

        query = '\nUNION\n'.join (relevant_chunks)
        query = "%s\nORDER BY time,service,keyword" % (query)
        handler.select (query, background=True)

    for handler in handlers.values ():
        results.extend (handler.wait ())

    if len (handlers) > 1:
        # When comparing lists, Python will compare list elements
        # in order until an inequality is reached. The first element
        # in each row is the timestamp; the second is the service,
        # then the keyword, etc. The default Python behavior will
        # correctly sort the concatenated result lists from multiple
        # queries.

        # Letting Python handle this internally is an order of magnitude
        # faster than implementing a merge of multiple ordered lists
        # in pure Python; to do any better would require a Python/C
        # implementation.

        results.sort ()

    if dict == True:
        # map () returns a lazy iterator in Python 3; materialize it so
        # callers can use len () and indexing on the documented list.
        results = list (map (dictresult, results))

    return results
def snapshot (keywords, time, dict=True):
    ''' Retrieve a snapshot of keyword values corresponding to the
        timestamp *time*. See :func:`retrieve` for an explanation
        of the arguments.
    '''

    # Forward *dict* explicitly: previously it was accepted here but
    # never passed along, so dict=False silently had no effect.
    return retrieve (keywords, begin=time, snap=True, dict=dict)
def tally (keywords, begin, end=None, binning=None):
    ''' Return a per-keyword tally of how long each individual keyword
        in the specified *keywords* had a particular value. The results
        will be returned as a dictionary, with each keyword as a key;
        each value in that dictionary will likewise be a dictionary,
        but instead keyed by the value of the keyword itself. If *binning*
        is specified, numeric results will be binned to the nearest
        round value; for example, if 0.1 is specified, numeric values
        will be binned to the nearest 0.1; if 10, the nearest 10. The
        remainder of the arguments are interpreted the same way as they
        are for :func:`retrieve`.
    '''

    if end is None:
        end = time.time ()

    if end < begin:
        raise ValueError ('end time must be later than begin time')

    results = retrieve (keywords, begin, end)

    # Prepare the dictionary to receive values.
    # Track the most recent keyword value for comparison
    # when tallying.

    final = {}
    states = {}

    for service in keywords:
        # 'names' was previously called 'list', shadowing the builtin.
        names = keywords[service]
        totals = {}
        final[service] = totals
        states[service] = {}

        for keyword in names:
            totals[keyword] = {}

    for row in results:
        service = row['service']
        keyword = row['keyword']
        value = row['ascvalue']
        last = row['time']

        try:
            previous_time, previous_value = states[service][keyword]
        except KeyError:
            # First record seen for this keyword: treat its value as
            # having been in effect since *begin* (the primed record
            # from retrieve () is the value as of *begin*).
            states[service][keyword] = (begin, value)
            continue

        states[service][keyword] = (last, value)
        duration = last - previous_time
        totals = final[service][keyword]

        try:
            totals[previous_value] += duration
        except KeyError:
            totals[previous_value] = duration

    # The last recorded value needs to have its tallied time
    # augmented by the time of its broadcast up to the end
    # of the reporting window.

    for service in states:
        for keyword in states[service]:
            totals = final[service][keyword]
            last, value = states[service][keyword]
            duration = end - last

            try:
                totals[value] += duration
            except KeyError:
                totals[value] = duration

    # Apply rounding/binning, if requested.

    if binning is not None:
        for service in states:
            for keyword in states[service].keys ():
                totals = final[service][keyword]
                binned_totals = {}

                for value in totals.keys ():
                    duration = totals[value]

                    try:
                        number = float (value)
                    except (TypeError, ValueError):
                        # Non-numeric. No binning here.
                        binned_totals[value] = duration
                        continue

                    # Preserve the number of decimal places
                    # in the ascii representation.
                    sides = value.split ('.')

                    if len (sides) == 2:
                        precision = len (sides[1])
                    else:
                        precision = 0

                    number = round (number / binning)
                    number = number * binning
                    value = "%.*f" % (precision, number)

                    try:
                        binned_totals[value] += duration
                    except KeyError:
                        binned_totals[value] = duration

                final[service][keyword] = binned_totals

    return final
# The following queries are used in retrieve(). Every SELECT must emit
# its columns in the order expected by dictresult() and by the final
# results.sort(): time, service, keyword, binvalue, ascvalue, repeated,
# discarded.

bounded_query = '''
SELECT
    time,'%s' as service,keyword,binvalue,ascvalue,repeated,discarded
FROM
    %s
WHERE
    time BETWEEN %.3f AND %.3f
AND
    keyword IN ( %s )
'''

no_begin_query = '''
SELECT
    time,'%s' as service,keyword,binvalue,ascvalue,repeated,discarded
FROM
    %s
WHERE
    time < %.3f
AND
    keyword IN ( %s )
'''

no_end_query = '''
SELECT
    time,'%s' as service,keyword,binvalue,ascvalue,repeated,discarded
FROM
    %s
WHERE
    time > %.3f
AND
    keyword IN ( %s )
'''

prime_query = '''
SELECT
    time,'%s' as service,keyword,binvalue,ascvalue,repeated,discarded
FROM
    %s
WHERE
    time = ( SELECT MAX (time) from %s WHERE keyword='%s' AND time < %.3f )
AND
    keyword='%s'
'''

# Previously this query emitted service before time, unlike the other
# four queries. That positional mismatch mis-mapped every field in
# dictresult() and made the multi-host results.sort() order by service
# instead of timestamp.
unbounded_query = '''
SELECT
    time,'%s' as service,keyword,binvalue,ascvalue,repeated,discarded
FROM
    %s
WHERE
    keyword IN ( %s )
'''
# vim: set expandtab tabstop=8 softtabstop=4 shiftwidth=4 autoindent: