#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Created on 1/12/19 by Pat Blair
"""
.. currentmodule:: elastalk.config
.. moduleauthor:: Pat Daburu <pat@daburu.net>
Make things work the way you want!
"""
from functools import lru_cache
import importlib
import json
import logging
import os
from pathlib import Path
import uuid
from typing import Iterable, Dict, Set
from dataclasses import dataclass, field
import toml
#: the module logger
__logger__: logging.Logger = logging.getLogger(__name__)
class _Defaults:
"""
Module-level default values.
"""
blob_key: str = '_blob' #: the default key for blobs
[docs]class ElastalkConfigException(Exception):
"""Raised when a configuration error is detected."""
[docs]@dataclass
class BlobConf:
"""
Define blobbing parameters.
"""
#: indicates whether or not blobbing is enabled.
enabled: bool = None
#: the excluded top-level document keys
excluded: Set[str] = field(default_factory=set)
#: the key that stores blobbed values in packed documents
key: str = None
[docs] def exclude(self, *keys: str):
"""
Add to the set of excluded document keys.
:param keys: the excluded document keys
"""
self.excluded.update(keys) # pylint: disable=no-member
[docs] @classmethod
def load(cls, dict_: Dict) -> 'BlobConf' or None:
"""
Create an instance of the class from a dictionary.
:param dict_: the dictionary
:return: the instance
"""
# If we don't get any input...
if not dict_:
# ...we give nothing back.
return None
# Get the attributes to be excluded from blobbing (if there are any).
_excluded = dict_.get('excluded')
# Compile a dictionary of constructor arguments.
cargs = {
k: v for k, v in
{
'enabled': dict_.get('enabled'),
'excluded': set(_excluded) if _excluded else None,
'key': dict_.get('key')
}.items() if v is not None
}
# Create the instance and return it.
return cls(**cargs)
[docs]@dataclass
class IndexConf:
"""
Define index-specific configuration settings.
"""
blobs: BlobConf = BlobConf() #: blobbing configuration for the index
#: the path to Elasticsearch mappings for the configuration
mappings: str = None
[docs] def mappings_document(self, root: Path = None) -> dict or None:
"""
Get the contents of the index mapping document (if one is defined).
:param root: the root path that contains the document file
:return: the index mapping document (or `None` if one isn't defined)
"""
# If no mapping document is defined...
if not self.mappings:
return None # ...that's that.
# Determine the mappings path.
mappings_path: Path = Path(self.mappings)
# Figure out what the full path to the document is.
_root = root if root else Path.cwd()
full_path = (
mappings_path
if mappings_path.is_absolute()
else (_root / mappings_path).resolve()
)
# If a mapping document *is* defined, but the file doesn't exist...
if not full_path.exists():
__logger__.warning(f"{mappings_path} does not exist.")
return None # ..there isn't much more we can do.
# Read the text in the mappings document.
return json.loads(full_path.read_text())
[docs] @classmethod
def load(cls, dict_: Dict) -> 'IndexConf' or None:
"""
Create an instance of the class from a dictionary.
:param dict_: the dictionary
:return: the instance
"""
# If we don't get any input...
if not dict_:
# ...we give nothing back.
return None
# Get the value of the 'mappings' key.
_mappings = dict_.get('mappings')
# Compile a dictionary of constructor arguments.
cargs = {
k: v for k, v in
{
'blobs': BlobConf.load(dict_.get('blobs')),
'mappings': _mappings if _mappings else None
}.items() if v is not None
}
# Create the instance and return it.
return cls(**cargs)
[docs]@dataclass
class ElastalkConf: # pylint: disable=unsubscriptable-object, unsupported-assignment-operation
"""
Configuration options for an Elastalk and the Elasticsearch client.
.. seealso:
* :py:func:`client`
* `Sniffing <https://bit.ly/2UTKyHh>`_
* `Mapping <https://bit.ly/2EAzfir>`_
"""
seeds: Iterable[str] = field(
default_factory=lambda: [
s.strip()
for s
in os.environ.get('ES_HOSTS', '127.0.0.1').split(',')
]
) #: the Elasticsearch seed hosts
sniff_on_start: bool = True #: Start sniffing on startup?
sniff_on_connection_fail: bool = True #: Sniff when the connection fails?
sniffer_timeout: int = 60 #: the sniffer timeout
maxsize: int = 10 #: the maximum number of connections
mapping_field_limit: int = 1000 #: the maximum number of mapped fields
blobs: BlobConf = BlobConf() #: global BLOB behavior configuration
indexes: Dict[str, IndexConf] = field(
default_factory=dict
) #: index-specific configurations
[docs] @lru_cache(maxsize=128)
def blobs_enabled(self, index: str = None) -> bool:
"""
Determine whether or not blobbing is enabled for an index.
:param index: the name of the index
:return: `True` if blobbing is enabled, otherwise `False`
"""
try:
# Get the value configured for the index.
idx_value = self.indexes[index].blobs.enabled
# If the index has a configured value, use it. Otherwise use the
# global version.
if idx_value is not None:
return idx_value
except KeyError:
pass
# If we got to this point, just return the configured value.
return False if self.blobs.enabled is None else self.blobs.enabled
[docs] @lru_cache(maxsize=128)
def blob_exclusions(self, index: str = None) -> Set[str]:
"""
Get the full set of top-level document properties that should be
excluded from blobs for a given index. If you don't supply the `index`
parameter, the method returns the global exclusions.
:param index: the name of the index
:return: the set of excluded property names
"""
if not index:
return self.blobs.excluded
try:
return (
self.blobs.excluded |
self.indexes[index].blobs.excluded
)
except KeyError:
return self.blobs.excluded
[docs] def blob_key(self, index: str = None) -> str:
"""
Get the configured document key for blobbed data. (If you don't
supply the index, the method returns the global configuration value.
If there is no global configuration value, the method returns the
default.)
:param index: the name of the index
:return: the blobbed data key
"""
key: str = None
try:
key = self.indexes[index].blobs.key
except KeyError:
pass
key = key if key else self.blobs.key
return key if key else _Defaults.blob_key
[docs] def from_object(self, o: str) -> 'ElastalkConf':
"""
Update the configuration from an object.
:param o: the configuration object
"""
# Split up the parts of the configuration object name.
name_parts = o.split('.')
# Import the module.
mod = importlib.import_module('.'.join(name_parts[:-1]))
# Get the configuration class.
cls = getattr(mod, name_parts[-1])
# Retrieve the hosts (if there are any).
es_hosts = getattr(cls, 'ES_HOSTS', self.seeds)
# The seeds might already be a list (if they're defined in code) or
# they might be expressed as a comma-separated list. We'll account
# for both...
self.seeds = [
h.strip() for h in es_hosts.split(',')
] if isinstance(es_hosts, str) else list(es_hosts)
# Configure other parameters.
for t in [
('ES_SNIFF_ON_START', '_sniff_on_start', bool),
(
'ES_SNIFF_ON_CONNECTION_FAIL',
'sniff_on_connection_fail',
bool
),
('ES_SNIFFER_TIMEOUT', 'sniffer_timeout', int),
('ES_MAXSIZE', 'maxsize', int),
('ES_MAPPING_FIELD_LIMIT', 'mapping_field_limit', int),
]:
o_val = getattr(cls, t[0], None)
if o_val is not None:
self_val = t[2](o_val)
setattr(self, t[1], self_val)
# Let's see if there are any blobbing directives.
blobs = BlobConf.load(getattr(o, 'ES_BLOBS', None))
# If there are...
if blobs:
# ...we'll use 'em.
self.blobs = blobs
# Look for index-specific settings.
indexes: dict = getattr(o, 'ES_INDEXES', None)
if indexes:
for index in indexes.keys():
self.indexes[index] = IndexConf.load(indexes[index])
# Return this instance to the caller (for more fluidity in the calling
# code).
return self
[docs] def from_toml(self, toml_: Path or str) -> 'ElastalkConf':
"""
Update the configuration from a TOML configuration.
:param toml_: the path to the file or the TOML configuration string
"""
# Whether we were passed a string or file path, get the TOML text and
# parse it.
_toml: dict = toml.loads(
toml_.read_text()
if isinstance(toml_, Path)
else toml_
)
# Retrieve the hosts (if there are any).
_seeds = getattr(_toml, 'seeds', self.seeds)
# The seeds might already be a list (if they're defined in code) or
# they might be expressed as a comma-separated list. We'll account
# for both...
self.seeds = [
h.strip() for h in _seeds.split(',')
] if isinstance(_seeds, str) else list(_seeds)
# Let's see if there are any blobbing directives.
blobs = BlobConf.load(_toml.get('blobs'))
# If there are...
if blobs:
# ...we'll use 'em.
self.blobs = blobs
# Look for index-specific settings.
indexes: dict = _toml.get('indexes')
if indexes:
for index in indexes.keys():
self.indexes[index] = IndexConf.load(indexes[index])
# Load the top-level configuration values.
for att in [
'sniff_on_start',
'sniff_on_connection_fail',
'sniffer_timeout',
'maxsize',
'mapping_field_limit'
]:
value = _toml.get(att)
if value is not None:
setattr(self, att, value)
# Return this instance to the caller (for more fluidity in the calling
# code).
return self
def __hash__(self):
try:
return getattr(self, '_hash')
except AttributeError:
hsh = uuid.uuid4().int
setattr(self, '_hash', hsh)
return hsh
def __eq__(self, other):
return self.__hash__() == hash(other) if other else False
def __ne__(self, other):
return not self.__eq__(other)