# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# https://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import re
from collections import namedtuple
from boto3.exceptions import (
DynamoDBNeedsConditionError,
DynamoDBNeedsKeyConditionError,
DynamoDBOperationNotSupportedError,
)
# Matches each replaceable segment of an attribute path: a run of characters
# other than '.', '[' and ']' that is NOT located inside a bracketed index.
# The negative lookahead rejects a run that is followed by a ']' with no '['
# in between. For example, in 'a.b[0]' the matches are 'a' and 'b'; the '0'
# inside the brackets is left alone.
ATTR_NAME_REGEX = re.compile(r'[^.\[\]]+(?![^\[]*\])')
class ConditionBase:
    """Root class of every condition node.

    A condition remembers the positional operands it was created with and
    describes how it is rendered through three pieces of data: a format
    template, an operator token and the operand tuple (see
    :meth:`get_expression`). Conditions can be combined with ``&``, ``|``
    and ``~``, which produce ``And``, ``Or`` and ``Not`` nodes.
    """

    # Template filled in with the rendered operands and the operator.
    expression_format = ''
    # Operator token substituted for ``{operator}`` in the template.
    expression_operator = ''
    # Subclasses set this to True when the value operand is a collection
    # whose elements each need their own placeholder.
    has_grouped_values = False

    def __init__(self, *values):
        # Operands are kept as the tuple they were passed in as.
        self._values = values

    def __and__(self, other):
        """Return ``And(self, other)``.

        :raises DynamoDBOperationNotSupportedError: if ``other`` is not a
            condition.
        """
        if isinstance(other, ConditionBase):
            return And(self, other)
        raise DynamoDBOperationNotSupportedError('AND', other)

    def __or__(self, other):
        """Return ``Or(self, other)``.

        :raises DynamoDBOperationNotSupportedError: if ``other`` is not a
            condition.
        """
        if isinstance(other, ConditionBase):
            return Or(self, other)
        raise DynamoDBOperationNotSupportedError('OR', other)

    def __invert__(self):
        """Return ``Not(self)``."""
        return Not(self)

    def get_expression(self):
        """Describe this condition as a dictionary.

        :rtype: dict
        :returns: A dictionary with the keys ``format``, ``operator`` and
            ``values``.
        """
        return dict(
            format=self.expression_format,
            operator=self.expression_operator,
            values=self._values,
        )

    def __eq__(self, other):
        # Equal only to instances of the same class (or a subclass of it)
        # that carry equal operands.
        if not isinstance(other, type(self)):
            return False
        return bool(self._values == other._values)

    def __ne__(self, other):
        is_equal = self.__eq__(other)
        return not is_equal
class AttributeBase:
    """Named attribute from which comparison conditions are created.

    The logical operators ``&``, ``|`` and ``~`` are rejected on a bare
    attribute; one of the condition-producing methods has to be called
    first.
    """

    def __init__(self, name):
        self.name = name

    def __and__(self, value):
        raise DynamoDBOperationNotSupportedError('AND', self)

    def __or__(self, value):
        raise DynamoDBOperationNotSupportedError('OR', self)

    def __invert__(self):
        raise DynamoDBOperationNotSupportedError('NOT', self)

    def eq(self, value):
        """Build an ``Equals`` condition for this attribute.

        :param value: The value the attribute must be equal to.
        """
        return Equals(self, value)

    def lt(self, value):
        """Build a ``LessThan`` condition for this attribute.

        :param value: The value the attribute must be less than.
        """
        return LessThan(self, value)

    def lte(self, value):
        """Build a ``LessThanEquals`` condition for this attribute.

        :param value: The value the attribute must be less than or equal to.
        """
        return LessThanEquals(self, value)

    def gt(self, value):
        """Build a ``GreaterThan`` condition for this attribute.

        :param value: The value the attribute must be greater than.
        """
        return GreaterThan(self, value)

    def gte(self, value):
        """Build a ``GreaterThanEquals`` condition for this attribute.

        :param value: The value the attribute must be greater than or
            equal to.
        """
        return GreaterThanEquals(self, value)

    def begins_with(self, value):
        """Build a ``BeginsWith`` condition for this attribute.

        :param value: The value the attribute must begin with.
        """
        return BeginsWith(self, value)

    def between(self, low_value, high_value):
        """Build a ``Between`` condition for this attribute.

        :param low_value: The value the attribute must be greater than or
            equal to.
        :param high_value: The value the attribute must be less than or
            equal to.
        """
        return Between(self, low_value, high_value)

    def __eq__(self, other):
        # Same class (or a subclass of it) and the same attribute name.
        if not isinstance(other, type(self)):
            return False
        return self.name == other.name

    def __ne__(self, other):
        is_equal = self.__eq__(other)
        return not is_equal
class ConditionAttributeBase(ConditionBase, AttributeBase):
    """Condition that also offers the attribute methods.

    ``Size`` is one example: on its own it is incomplete, and a further
    ``AttributeBase`` method such as ``eq()`` has to be applied to it to
    obtain a full condition.
    """

    def __init__(self, *values):
        ConditionBase.__init__(self, *values)
        # The first operand is assumed to be the attribute; its name is
        # reused as the name of this object.
        attribute = values[0]
        AttributeBase.__init__(self, attribute.name)

    def __eq__(self, other):
        # Both the condition operands and the attribute name must agree.
        same_condition = ConditionBase.__eq__(self, other)
        if not same_condition:
            return same_condition
        return AttributeBase.__eq__(self, other)

    def __ne__(self, other):
        is_equal = self.__eq__(other)
        return not is_equal
class ComparisonCondition(ConditionBase):
    """Two-operand condition rendered as ``<left> <operator> <right>``."""

    expression_format = '{0} {operator} {1}'
class Equals(ComparisonCondition):
    """Comparison rendered with the ``=`` operator."""

    expression_operator = '='
class NotEquals(ComparisonCondition):
    """Comparison rendered with the ``<>`` operator."""

    expression_operator = '<>'
class LessThan(ComparisonCondition):
    """Comparison rendered with the ``<`` operator."""

    expression_operator = '<'
class LessThanEquals(ComparisonCondition):
    """Comparison rendered with the ``<=`` operator."""

    expression_operator = '<='
class GreaterThan(ComparisonCondition):
    """Comparison rendered with the ``>`` operator."""

    expression_operator = '>'
class GreaterThanEquals(ComparisonCondition):
    """Comparison rendered with the ``>=`` operator."""

    expression_operator = '>='
class In(ComparisonCondition):
    """Comparison rendered with the ``IN`` operator.

    The value operand is a collection: the expression builder gives every
    element its own placeholder and wraps the list in parentheses.
    """

    has_grouped_values = True
    expression_operator = 'IN'
class Between(ConditionBase):
    """Condition rendered as ``<operand> BETWEEN <low> AND <high>``."""

    expression_format = '{0} {operator} {1} AND {2}'
    expression_operator = 'BETWEEN'
class BeginsWith(ConditionBase):
    """Condition rendered as ``begins_with(<operand>, <value>)``."""

    expression_format = '{operator}({0}, {1})'
    expression_operator = 'begins_with'
class Contains(ConditionBase):
    """Condition rendered as ``contains(<operand>, <value>)``."""

    expression_format = '{operator}({0}, {1})'
    expression_operator = 'contains'
class Size(ConditionAttributeBase):
    """Condition rendered as ``size(<operand>)``.

    Being a ``ConditionAttributeBase``, it also exposes the attribute
    methods (``eq()``, ``lt()``, ...) so the size can be compared to a
    value.
    """

    expression_format = '{operator}({0})'
    expression_operator = 'size'
class AttributeType(ConditionBase):
    """Condition rendered as ``attribute_type(<operand>, <value>)``."""

    expression_format = '{operator}({0}, {1})'
    expression_operator = 'attribute_type'
class AttributeExists(ConditionBase):
    """Condition rendered as ``attribute_exists(<operand>)``."""

    expression_format = '{operator}({0})'
    expression_operator = 'attribute_exists'
class AttributeNotExists(ConditionBase):
    """Condition rendered as ``attribute_not_exists(<operand>)``."""

    expression_format = '{operator}({0})'
    expression_operator = 'attribute_not_exists'
class And(ConditionBase):
    """Conjunction of two operands, rendered as ``(<left> AND <right>)``."""

    expression_format = '({0} {operator} {1})'
    expression_operator = 'AND'
class Or(ConditionBase):
    """Disjunction of two operands, rendered as ``(<left> OR <right>)``."""

    expression_format = '({0} {operator} {1})'
    expression_operator = 'OR'
class Not(ConditionBase):
    """Negation of a single operand, rendered as ``(NOT <operand>)``."""

    expression_format = '({operator} {0})'
    expression_operator = 'NOT'
class Key(AttributeBase):
    """Attribute type accepted in key condition expressions.

    Adds nothing to ``AttributeBase``; the distinct type exists so that
    ``ConditionExpressionBuilder`` can reject any attribute that is not a
    ``Key`` when it builds an expression with ``is_key_condition=True``.
    """
class Attr(AttributeBase):
    """Represents a DynamoDB item's attribute.

    Extends ``AttributeBase`` with the conditions defined below:
    inequality, membership, existence, containment, size and type.
    """

    def ne(self, value):
        """Creates a condition where the attribute is not equal to the value.

        :param value: The value that the attribute is not equal to.
        """
        return NotEquals(self, value)

    def is_in(self, value):
        """Creates a condition where the attribute is in the value.

        :type value: list
        :param value: The value that the attribute is in.
        """
        return In(self, value)

    def exists(self):
        """Creates a condition where the attribute exists."""
        return AttributeExists(self)

    def not_exists(self):
        """Creates a condition where the attribute does not exist."""
        return AttributeNotExists(self)

    def contains(self, value):
        """Creates a condition where the attribute contains the value.

        :param value: The value the attribute contains.
        """
        return Contains(self, value)

    def size(self):
        """Creates a condition for the attribute size.

        Note another AttributeBase method must be called on the returned
        size condition to be a valid DynamoDB condition.
        """
        return Size(self)

    def attribute_type(self, value):
        """Creates a condition for the attribute type.

        :param value: The type of the attribute.
        """
        return AttributeType(self, value)
# Result record returned by ConditionExpressionBuilder.build_expression:
# the rendered expression string followed by the attribute-name and
# attribute-value placeholder dictionaries, in that order.
BuiltConditionExpression = namedtuple(
    'BuiltConditionExpression',
    (
        'condition_expression',
        'attribute_name_placeholders',
        'attribute_value_placeholders',
    ),
)
class ConditionExpressionBuilder:
    """This class is used to build condition expressions with placeholders.

    Attribute names are replaced with ``#n<N>`` placeholders and values
    with ``:v<N>`` placeholders. The two counters are not reset by
    :meth:`build_expression`; call :meth:`reset` to start numbering from
    zero again.
    """

    def __init__(self):
        self._name_count = 0
        self._value_count = 0
        self._name_placeholder = 'n'
        self._value_placeholder = 'v'

    def _get_name_placeholder(self):
        """Return the name placeholder for the current name counter."""
        return '#' + self._name_placeholder + str(self._name_count)

    def _get_value_placeholder(self):
        """Return the value placeholder for the current value counter."""
        return ':' + self._value_placeholder + str(self._value_count)

    def reset(self):
        """Resets the placeholder name and values"""
        self._name_count = 0
        self._value_count = 0

    def build_expression(self, condition, is_key_condition=False):
        """Builds the condition expression and the dictionary of placeholders.

        :type condition: ConditionBase
        :param condition: A condition to be built into a condition expression
            string with any necessary placeholders.

        :type is_key_condition: Boolean
        :param is_key_condition: True if the expression is for a
            KeyConditionExpression. False otherwise.

        :rtype: (string, dict, dict)
        :returns: Will return a string representing the condition with
            placeholders inserted where necessary, a dictionary of
            placeholders for attribute names, and a dictionary of
            placeholders for attribute values. Here is a sample return value:

            ('#n0 = :v0', {'#n0': 'myattribute'}, {':v0': 'myvalue'})

        :raises DynamoDBNeedsConditionError: if ``condition`` is not a
            ``ConditionBase``.
        :raises DynamoDBNeedsKeyConditionError: if ``is_key_condition`` is
            True and the condition references an attribute that is not a
            ``Key``.
        """
        if not isinstance(condition, ConditionBase):
            raise DynamoDBNeedsConditionError(condition)
        # Both dictionaries are filled in as a side effect of the
        # recursive build below.
        attribute_name_placeholders = {}
        attribute_value_placeholders = {}
        condition_expression = self._build_expression(
            condition,
            attribute_name_placeholders,
            attribute_value_placeholders,
            is_key_condition=is_key_condition,
        )
        return BuiltConditionExpression(
            condition_expression=condition_expression,
            attribute_name_placeholders=attribute_name_placeholders,
            attribute_value_placeholders=attribute_value_placeholders,
        )

    def _build_expression(
        self,
        condition,
        attribute_name_placeholders,
        attribute_value_placeholders,
        is_key_condition,
    ):
        """Render one condition, recursing into nested conditions."""
        expression_dict = condition.get_expression()
        # Build the necessary placeholders for each operand, in order.
        # Placeholders are built for both attribute names and values.
        replaced_values = [
            self._build_expression_component(
                value,
                attribute_name_placeholders,
                attribute_value_placeholders,
                condition.has_grouped_values,
                is_key_condition,
            )
            for value in expression_dict['values']
        ]
        # Fill out the expression using the operator and the
        # values that have been replaced with placeholders.
        return expression_dict['format'].format(
            *replaced_values, operator=expression_dict['operator']
        )

    def _build_expression_component(
        self,
        value,
        attribute_name_placeholders,
        attribute_value_placeholders,
        has_grouped_values,
        is_key_condition,
    ):
        """Render a single operand of a condition.

        The ``ConditionBase`` check must come first: an object such as
        ``Size`` is both a condition and an attribute and has to be
        rendered as a condition.
        """
        # Continue to recurse if the value is a ConditionBase in order
        # to extract out all parts of the expression.
        if isinstance(value, ConditionBase):
            return self._build_expression(
                value,
                attribute_name_placeholders,
                attribute_value_placeholders,
                is_key_condition,
            )
        # If it is not a ConditionBase, we can recurse no further.
        # So we check if it is an attribute and add placeholders for
        # its name.
        if isinstance(value, AttributeBase):
            if is_key_condition and not isinstance(value, Key):
                raise DynamoDBNeedsKeyConditionError(
                    f'Attribute object {value.name} is of type {type(value)}. '
                    f'KeyConditionExpression only supports Attribute objects '
                    f'of type Key'
                )
            return self._build_name_placeholder(
                value, attribute_name_placeholders
            )
        # If it is anything else, we treat it as a value and thus
        # placeholders are needed for the value.
        return self._build_value_placeholder(
            value, attribute_value_placeholders, has_grouped_values
        )

    def _build_name_placeholder(self, value, attribute_name_placeholders):
        """Replace every segment of ``value.name`` with a name placeholder.

        Each part matched by ``ATTR_NAME_REGEX`` gets its own placeholder,
        which is recorded in ``attribute_name_placeholders``; everything the
        regex does not match (dots, bracketed indexes) is kept verbatim.
        """

        def _substitute(match):
            name_placeholder = self._get_name_placeholder()
            self._name_count += 1
            # Add the placeholder and value to dictionary of name
            # placeholders.
            attribute_name_placeholders[name_placeholder] = match.group(0)
            return name_placeholder

        # A single substitution pass with a callback. Unlike building a
        # '%s' template and %-formatting it, this cannot fail when the
        # untouched parts of the name happen to contain a '%' character.
        return ATTR_NAME_REGEX.sub(_substitute, value.name)

    def _build_value_placeholder(
        self, value, attribute_value_placeholders, has_grouped_values=False
    ):
        """Register ``value`` and return the placeholder text for it.

        With ``has_grouped_values`` set, ``value`` is iterated and every
        element receives its own placeholder; the result is the
        parenthesised, comma-separated list of those placeholders.
        """
        # Otherwise, treat the value as a single value that needs only
        # one placeholder.
        if not has_grouped_values:
            value_placeholder = self._get_value_placeholder()
            self._value_count += 1
            attribute_value_placeholders[value_placeholder] = value
            return value_placeholder

        # If the values are grouped, we need to add a placeholder for
        # each element inside of the actual value.
        placeholder_list = []
        for v in value:
            value_placeholder = self._get_value_placeholder()
            self._value_count += 1
            placeholder_list.append(value_placeholder)
            attribute_value_placeholders[value_placeholder] = v
        # Assuming the values are grouped by parenthesis.
        # IN is currently the only one that uses this, so it may
        # need to be changed in the future.
        return '(' + ', '.join(placeholder_list) + ')'