Source code for keygrabber.Retrieve

from . import Version
Version.append ('$Revision: 95206 $')
del Version


import datetime
import time

from . import Lookup
from . import SQL


def columnar (keywords, begin=None, end=None, accept=None, mangle=False):
    ''' Retrieve a data set and render it in columnar format, with one
        row for each keyword broadcast. Rows will be collapsed to the
        nearest millisecond. If *accept* is set, it should be a function
        that accepts a row as its sole argument, and returns True if the
        row should be included as a valid data point. If *mangle* is set
        to True, the timestamps in the first column will be pre-mangled
        so that gnuplot (which only interprets UNIX timestamps as UTC)
        will render the times in the local time zone. The results from
        :func:`columnar` are straightforward to feed into a
        :func:`csv.writer` instance.
    '''

    results = retrieve (keywords, begin, end)

    if len (results) == 0:
        return ()

    if mangle == True:
        last = results[-1]['time']
        utc_datetime = datetime.datetime.utcfromtimestamp (last)
        local_datetime = datetime.datetime.fromtimestamp (last)
        utc_offset = utc_datetime - local_datetime
        utc_offset = utc_offset.days * 86400 + utc_offset.seconds
    else:
        utc_offset = 0

    output = []
    columns = {}
    last = {}
    last_time = None

    if mangle == True:
        columns[0] = 'mangled_time'
        columns['mangled_time'] = 0
    else:
        columns[0] = 'time'
        columns['time'] = 0

    column = 1
    column_count = 0
    row = [columns[0]]

    for service in keywords.keys ():
        for keyword in keywords[service]:
            full_name = service + '.' + keyword.upper ()
            columns[full_name] = column
            columns[column] = full_name
            row.append (full_name)
            column += 1
            column_count += 1

    output.append (row)

    for row in results:
        if accept != None and accept (row) == False:
            continue

        service = row['service']
        keyword = row['keyword']
        value = row['ascvalue']

        full_name = service + '.' + keyword
        this_time = "%.3f" % (row['time'] - utc_offset)

        # Initial conditions: don't append to the output if we haven't
        # seen all of the columns yet.

        if len (last) < column_count:
            last[full_name] = value
            last_time = this_time
            continue

        if this_time != last_time:
            output_row = [None,] * (column_count + 1)
            output_row[0] = last_time

            for last_keyword in last.keys ():
                index = columns[last_keyword]
                output_row[index] = last[last_keyword]

            output.append (output_row)

        last[full_name] = value
        last_time = this_time

    return output
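
# A minimal usage sketch for columnar(), following the docstring's
# suggestion of feeding the rows to a csv.writer instance. The service
# name, keyword names, and time range are hypothetical, not part of
# this module.

def _example_columnar ():
    import csv
    import sys

    keywords = {'dcs': ['AIRMASS', 'AZ', 'EL']}     # hypothetical keywords
    begin = time.time () - 3600                     # the last hour

    # mangle=True shifts the timestamps so that gnuplot renders them
    # in the local time zone.
    rows = columnar (keywords, begin, mangle=True)

    writer = csv.writer (sys.stdout)
    for row in rows:
        writer.writerow (row)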
dictresult_columns = ('time', 'service', 'keyword', 'binvalue', 'ascvalue', 'repeated', 'discarded')

def dictresult (result):
    ''' Take a single result and convert it to a dictionary as described
        in the commentary for :func:`retrieve`.
    '''

    return dict (zip (dictresult_columns, result))
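
# For illustration, a raw result row and its dictresult() mapping; the
# row values here are hypothetical.
#
#     row = (1300000000.123, 'dcs', 'AIRMASS', 1.02, '1.02', 0, 0)
#     dictresult (row)
#     # {'time': 1300000000.123, 'service': 'dcs', 'keyword': 'AIRMASS',
#     #  'binvalue': 1.02, 'ascvalue': '1.02', 'repeated': 0, 'discarded': 0}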
def retrieve (keywords, begin=None, end=None, prime=True, dict=True, snap=False):
    ''' Retrieve all available records corresponding to the requested
        *keywords* as of a given time range. *keywords* is expected to
        be a dictionary, with KTL service names as keys, and the keyed
        value a list of keyword names.

        The *begin* and *end* arguments are UNIX timestamps, and will be
        extended by one millisecond to avoid problems with floating
        point inequalities. If *begin* is not specified, all records up
        to *end* will be returned; if *end* is not specified, all
        records after *begin* will be returned. If neither *begin* nor
        *end* is specified, all records for that service+keyword
        combination will be returned.

        If *prime* is True (the default) and *begin* is specified, the
        most recent record before *begin* will be included in the
        results.

        If *dict* is False, the results will be returned as a list of
        lists, as is typical for the :func:`fetchall` method of a pgdb
        cursor instance. The columns are as follows: service, keyword,
        time, binvalue, ascvalue, repeated, discarded. If *dict* is True
        (the default), each result in the sequential list will be a
        dictionary instead of a list, with the column names as keys.

        Results are sorted first by timestamp, then by service, and
        lastly by keyword.
    '''

    # Offset by a millisecond to avoid unexpected behavior when using
    # inequality operators against recorded timestamps.

    if begin != None:
        begin = float (begin)
        begin = begin - 0.001

    if end != None:
        end = float (end)
        end = end + 0.001

    # See how many handlers will be required. Each service could be
    # on a discrete database host.

    handlers = {}
    hosts = {}

    for service in keywords:
        try:
            datahost = SQL.override['hostname']
        except KeyError:
            datahost = Lookup.datahost (service)

        try:
            hosts[datahost][service] = True
        except KeyError:
            hosts[datahost] = {service: True}
            handlers[datahost] = SQL.Handler (hostname=datahost)

    # For each requested service, confirm that the database host has a
    # table with records for that service. If it does not, return no
    # results. The alternative would be to raise an exception, which
    # is what the database layer would normally do.

    for datahost in hosts.keys ():
        targets = hosts[datahost].keys ()
        valid = Lookup.services (handlers[datahost])

        for target in targets:
            if target in valid:
                pass
            else:
                del keywords[target]
                del hosts[datahost][target]

        if len (hosts[datahost]) == 0:
            del hosts[datahost]
            del handlers[datahost]

    # Generate a per-service SELECT query; these will be UNION'd
    # together on a per-handler basis when executed.

    chunks = {}

    for service in keywords:
        chunks[service] = []
        names = keywords[service]

        if prime == True and begin != None:
            for name in names:
                name = name.upper ()
                chunk = prime_query % (service, service, service, name, begin, name)
                chunks[service].append (chunk)

            # If making a snapshot, no further queries are required to
            # satisfy the request. Move on to the next service.

            if snap == True:
                continue

        names = "','".join (names)
        names = "'%s'" % (names)
        names = names.upper ()

        if begin != None and end != None:
            chunk = bounded_query % (service, service, begin, end, names)
        elif begin == None and end == None:
            chunk = unbounded_query % (service, service, names)
        elif begin != None:
            chunk = no_end_query % (service, service, begin, names)
        elif end != None:
            chunk = no_begin_query % (service, service, end, names)

        chunks[service].append (chunk)

    for service in chunks.keys ():
        if len (chunks[service]) == 0:
            del chunks[service]

    if len (chunks) == 0:
        # Nothing to do. Return an empty result set.
        return []

    # OK, enough building. Execute the queries.

    results = []

    for datahost in handlers:
        handler = handlers[datahost]
        relevant_chunks = []

        for service in hosts[datahost]:
            relevant_chunks.extend (chunks[service])

        query = '\nUNION\n'.join (relevant_chunks)
        query = "%s\nORDER BY time,service,keyword" % (query)

        handler.select (query, background=True)

    for handler in handlers.values ():
        results.extend (handler.wait ())

    if len (handlers) > 1:
        # When comparing lists, Python will compare list elements
        # in order until an inequality is reached. The first element
        # in each row is the timestamp; the second is the service,
        # then the keyword, etc. The default Python behavior will
        # correctly sort the concatenated result lists from multiple
        # queries.

        # Letting Python handle this internally is an order of magnitude
        # faster than implementing a merge of multiple ordered lists
        # in pure Python; to do any better would require a Python/C
        # implementation.

        results.sort ()

    if dict == True:
        results = map (dictresult, results)

    return results
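
# A minimal usage sketch for retrieve(); the service and keyword names
# are hypothetical. With dict=True (the default), each row arrives as a
# dictionary keyed by the dictresult_columns tuple above.

def _example_retrieve ():
    keywords = {'dcs': ['AIRMASS']}     # hypothetical service/keyword
    end = time.time ()
    begin = end - 600                   # the last ten minutes

    for row in retrieve (keywords, begin, end):
        print ("%.3f %s.%s = %s" % (row['time'], row['service'], row['keyword'], row['ascvalue']))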
def snapshot (keywords, time, dict=True):
    ''' Retrieve a snapshot of keyword values corresponding to the
        timestamp *time*. See :func:`retrieve` for an explanation of
        the arguments.
    '''

    return retrieve (keywords, begin=time, snap=True, dict=dict)
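
# A usage sketch for snapshot(); the keyword names are hypothetical.
# Because snap=True limits retrieve() to its priming queries, each
# keyword contributes at most one row: its most recent broadcast
# before *time*.

def _example_snapshot ():
    keywords = {'dcs': ['AIRMASS', 'AZ']}       # hypothetical keywords
    an_hour_ago = time.time () - 3600

    for row in snapshot (keywords, an_hour_ago):
        print ("%s.%s = %s" % (row['service'], row['keyword'], row['ascvalue']))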
def tally (keywords, begin, end=None, binning=None):
    ''' Return a per-keyword tally of how long each individual keyword
        in the specified *keywords* had a particular value. The results
        will be returned as a dictionary, with each keyword as a key;
        each value in that dictionary will likewise be a dictionary,
        but instead keyed by the value of the keyword itself.

        If *binning* is specified, numeric results will be binned to
        the nearest round value; for example, if 0.1 is specified,
        numeric values will be binned to the nearest 0.1; if 10, the
        nearest 10.

        The remainder of the arguments are interpreted the same way as
        they are for :func:`retrieve`.
    '''

    if end == None:
        end = time.time ()

    if end < begin:
        raise ValueError ('end time must be later than begin time')

    results = retrieve (keywords, begin, end)

    # Prepare the dictionary to receive values.
    # Track the most recent keyword value for comparison
    # when tallying.

    final = {}
    states = {}

    for service in keywords:
        list = keywords[service]
        totals = {}
        final[service] = totals
        states[service] = {}

        for keyword in list:
            totals[keyword] = {}

    for row in results:
        service = row['service']
        keyword = row['keyword']
        value = row['ascvalue']
        last = row['time']

        try:
            previous_time, previous_value = states[service][keyword]
        except KeyError:
            states[service][keyword] = (begin, value)
            continue

        states[service][keyword] = (last, value)
        duration = last - previous_time

        totals = final[service][keyword]

        try:
            totals[previous_value] += duration
        except KeyError:
            totals[previous_value] = duration

    # The last recorded value needs to have its tallied time
    # augmented by the time of its broadcast up to the end
    # of the reporting window.

    for service in states:
        for keyword in states[service]:
            totals = final[service][keyword]
            last, value = states[service][keyword]
            duration = end - last

            try:
                totals[value] += duration
            except KeyError:
                totals[value] = duration

    # Apply rounding/binning, if requested.

    if binning != None:
        for service in states:
            for keyword in states[service].keys ():
                totals = final[service][keyword]
                binned_totals = {}

                for value in totals.keys ():
                    duration = totals[value]

                    try:
                        number = float (value)
                    except (TypeError, ValueError):
                        # Non-numeric. No binning here.
                        binned_totals[value] = duration
                        continue

                    # Preserve the number of decimal places
                    # in the ascii representation.

                    sides = value.split ('.')

                    if len (sides) == 2:
                        precision = len (sides[1])
                    else:
                        precision = 0

                    number = round (number / binning)
                    number = number * binning
                    value = "%.*f" % (precision, number)

                    try:
                        binned_totals[value] += duration
                    except KeyError:
                        binned_totals[value] = duration

                final[service][keyword] = binned_totals

    return final
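
# A usage sketch for tally(); the keyword name and the binning value
# are hypothetical. With binning=0.1, an ascii value of '1.04' would
# be tallied under '1.00': rounded to the nearest 0.1, but rendered
# with its original two decimal places.

def _example_tally ():
    keywords = {'dcs': ['AIRMASS']}     # hypothetical keyword
    begin = time.time () - 86400        # the last day

    totals = tally (keywords, begin, binning=0.1)

    for value in totals['dcs']['AIRMASS']:
        seconds = totals['dcs']['AIRMASS'][value]
        print ("%s: %.1f seconds" % (value, seconds))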
# The following queries are used in retrieve(). All five share the same
# column order, which must match dictresult_columns above.

bounded_query = '''
SELECT time,'%s' as service,keyword,binvalue,ascvalue,repeated,discarded
FROM %s
WHERE time BETWEEN %.3f AND %.3f AND keyword IN ( %s )
'''

no_begin_query = '''
SELECT time,'%s' as service,keyword,binvalue,ascvalue,repeated,discarded
FROM %s
WHERE time < %.3f AND keyword IN ( %s )
'''

no_end_query = '''
SELECT time,'%s' as service,keyword,binvalue,ascvalue,repeated,discarded
FROM %s
WHERE time > %.3f AND keyword IN ( %s )
'''

prime_query = '''
SELECT time,'%s' as service,keyword,binvalue,ascvalue,repeated,discarded
FROM %s
WHERE time = ( SELECT MAX (time) from %s WHERE keyword='%s' AND time < %.3f ) AND keyword='%s'
'''

unbounded_query = '''
SELECT time,'%s' as service,keyword,binvalue,ascvalue,repeated,discarded
FROM %s
WHERE keyword IN ( %s )
'''

# vim: set expandtab tabstop=8 softtabstop=4 shiftwidth=4 autoindent: