Source code for elastalk.connect

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Created on 1/12/19 by Pat Blair
"""
.. currentmodule:: elastalk.connect
.. moduleauthor:: Pat Daburu <pat@daburu.net>

Start a conversation with Elasticsearch!
"""
import base64
import binascii
import json
import logging
from typing import Any, Dict, Optional

import elasticsearch

from .config import ElastalkConf, ElastalkConfigException


__logger__: logging.Logger = logging.getLogger(__name__)  #: the module logger


class ElastalkConnection(object):
    """
    Defines an Elasticsearch environment.

    The connection holds a configuration object and creates the Elasticsearch
    client from it lazily, the first time :py:attr:`client` is read.  It also
    converts documents to and from their "blobbed" (packed) form.
    """

    def __init__(self, config: Optional[ElastalkConf] = None):
        """
        :param config: the configuration *(a new, default
            :py:class:`ElastalkConf` is created when this is omitted or
            falsy)*
        """
        self._config: ElastalkConf = (
            config if config else ElastalkConf()
        )  #: the configuration
        # The Elasticsearch client will be created on demand.
        self._client: Optional[elasticsearch.Elasticsearch] = None

    @property
    def config(self) -> ElastalkConf:
        """
        Get the connection configuration.

        :return: the connection configuration
        """
        return self._config

    @config.setter
    def config(self, value: ElastalkConf):
        """
        Set the configuration object.  (Setting this property will
        :py:func:`reset <ElastalkConnection.reset>` the connection.)

        :param value: the new configuration
        """
        self._config = value
        # Discard any client that was built from the previous configuration.
        self.reset()

    @property
    def client(self) -> elasticsearch.Elasticsearch:
        """
        Get the Elasticsearch client.

        The client is created on first access and then cached until the
        connection is :py:func:`reset <ElastalkConnection.reset>`.

        :return: the Elasticsearch client
        :raises ElastalkConfigException: if no seed hosts are defined in the
            current configuration
        """
        # If we've already created the client, just send it back.
        if self._client is not None:
            return self._client
        # We can't create a client without at least one seed host.
        if not self.config.seeds:
            raise ElastalkConfigException(
                'No seed hosts have been defined.'
            )
        # Otherwise we need to create it.
        self._client = elasticsearch.Elasticsearch(
            list(self.config.seeds),
            sniff_on_start=self.config.sniff_on_start,
            sniff_on_connection_fail=self.config.sniff_on_connection_fail,
            sniffer_timeout=self.config.sniffer_timeout,
            maxsize=self.config.maxsize
        )
        return self._client

    def reset(self):
        """
        Reset the connection.

        The cached client is dropped, so the next read of :py:attr:`client`
        creates a new one from the current configuration.
        """
        self._client = None

    def pack(self, doc: Dict, index: Optional[str] = None) -> Dict[str, Any]:
        """
        Convert a document object into a BLOB document.

        Every top-level key that is not listed in the configured blob
        exclusions is moved into a single encoded value stored under the
        configured blob key.  Excluded keys are kept as they are.

        :param doc: the original document object
        :param index: the name of the index *(This is optional, but if you
            supply it the behavior configured for the index can be used.)*
        :return: the BLOB document *(or the original document, unchanged, if
            blobbing is not enabled)*
        """
        # NOTE(review): with no index, ``None`` is handed to the configuration
        # lookups below, as ``unpack`` already does for ``blob_key`` -- confirm
        # that ``blobs_enabled`` and ``blob_exclusions`` accept it as well.
        # If blobbing isn't enabled for this index we don't need to do
        # anything further.
        if not self.config.blobs_enabled(index=index):
            return doc
        # Get the key used to hold blobbed data.
        blob_key = self.config.blob_key(index=index)
        # Get the list of document properties that should be excluded from
        # blobbed data.
        blob_exclusions = self.config.blob_exclusions(index=index)
        blob_ = {}  # the blob-able keys
        noblob_ = {}  # the excluded keys
        # Sort out which keys will go in the blob and which are excluded.
        for k, v in doc.items():
            if k in blob_exclusions:
                noblob_[k] = v
            else:
                blob_[k] = v
        # Recombine the blobbed data with the un-blobbed data.
        return {
            **noblob_,
            blob_key: _encode(blob_)
        }

    def unpack(self, doc: Dict, index: Optional[str] = None) -> Dict:
        """
        Convert a :py:func:`packed <pack>` document to its original form.

        :param doc: the packed document
        :param index: the name of the index from which the packed document
            came
        :return: the unpacked document *(or the original document, unchanged,
            if it holds no blob)*
        """
        # Get the key we should use to store blobbed data.
        blob_key = self.config.blob_key(index=index)
        # Try to get the blob from the document.
        blob = doc.get(blob_key)
        # If there is no blob in the document we can simply return the
        # original document.
        if not blob:
            return doc
        # Decode the blob and combine it with the rest of the document.  The
        # decoded keys are merged last, so they win over any non-blob key of
        # the same name.
        return {
            **{k: v for k, v in doc.items() if k != blob_key},
            **_decode(blob)
        }

    @staticmethod
    def default(cnx: 'ElastalkConnection' = None) -> 'ElastalkConnection':
        """
        Set and/or retrieve the default connection object.

        :param cnx: Provide a new connection object if you want to change
            the default.  Otherwise, leave this argument out to retrieve the
            current object.
        :return: the default connection object
        """
        # A caller-supplied connection becomes the new default.
        if cnx is not None:
            setattr(ElastalkConnection, '__default__', cnx)
            return cnx
        try:
            return getattr(ElastalkConnection, '__default__')
        except AttributeError:
            # No default exists yet, so create one, remember it on the class
            # and hand it back.
            _cnx = ElastalkConnection()
            setattr(ElastalkConnection, '__default__', _cnx)
            return _cnx
def _encode(doc: Dict) -> str:
    """
    Encode a dictionary (document) as a base64-encoded string.

    The document is serialized to JSON and the UTF-8 bytes of that JSON are
    put through base64 twice (once with :py:func:`base64.b64encode` and once
    with :py:func:`binascii.b2a_base64`).  :py:func:`_decode` reverses both
    passes, so the two functions must be kept in step.

    :param doc: the dictionary (document) object
    :return: the base64-encoded string
    """
    # Serialize the document and get the UTF-8 bytes of the JSON text.
    json_bytes: bytes = json.dumps(doc).encode('utf-8')
    # First base64 pass.
    inner: bytes = base64.b64encode(json_bytes)
    # Second base64 pass (no trailing newline is appended).
    outer: bytes = binascii.b2a_base64(inner, newline=False)
    # The result is pure ASCII, so it can be returned as a string.
    return outer.decode('utf-8')


def _decode(encoded: bytes) -> Dict:
    """
    Decode a dictionary (document) that was encoded by :py:func:`_encode`.

    :param encoded: the base64-encoded string
    :return: the dictionary (document) object
    """
    # Undo the outer base64 pass...
    inner: bytes = binascii.a2b_base64(encoded)
    # ...then the inner one, which leaves the UTF-8 bytes of the JSON text.
    json_bytes: bytes = base64.b64decode(inner)
    # Parse the JSON text back into a dictionary.
    return json.loads(json_bytes.decode('utf-8'))
class ElastalkMixin:
    """
    Mix this into your class to get easy access to the Elasticsearch client.
    """

    @property
    def es_cnx(self) -> ElastalkConnection:
        """
        Get the Elastalk connection object.

        If no connection has been stored on this instance yet, the
        :py:func:`default <ElastalkConnection.default>` connection is
        retrieved, stored on the instance and returned.

        :return: the Elastalk connection object
        """
        # A connection that was already stored on the instance wins.
        if hasattr(self, '__elastalk_connection__'):
            return getattr(self, '__elastalk_connection__')
        # Otherwise fall back to the default connection and remember it.
        fallback = ElastalkConnection.default()
        setattr(self, '__elastalk_connection__', fallback)
        return fallback

    @es_cnx.setter
    def es_cnx(self, cnx: ElastalkConnection):
        """
        Set the mixin's Elastalk connection object.

        :param cnx: the connection object
        """
        setattr(self, '__elastalk_connection__', cnx)

    @property
    def es(self) -> elasticsearch.Elasticsearch:
        """
        Get the Elasticsearch client.

        :return: the client of the current :py:attr:`es_cnx` connection
        """
        connection = self.es_cnx
        return connection.client