# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.## Licensed under the Apache License, Version 2.0 (the "License"). You# may not use this file except in compliance with the License. A copy of# the License is located at## https://aws.amazon.com/apache2.0/## or in the "license" file accompanying this file. This file is# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF# ANY KIND, either express or implied. See the License for the specific# language governing permissions and limitations under the License.importrefromcollectionsimportnamedtuplefromboto3.exceptionsimport(DynamoDBNeedsConditionError,DynamoDBNeedsKeyConditionError,DynamoDBOperationNotSupportedError,)ATTR_NAME_REGEX=re.compile(r'[^.\[\]]+(?![^\[]*\])')classConditionBase:expression_format=''expression_operator=''has_grouped_values=Falsedef__init__(self,*values):self._values=valuesdef__and__(self,other):ifnotisinstance(other,ConditionBase):raiseDynamoDBOperationNotSupportedError('AND',other)returnAnd(self,other)def__or__(self,other):ifnotisinstance(other,ConditionBase):raiseDynamoDBOperationNotSupportedError('OR',other)returnOr(self,other)def__invert__(self):returnNot(self)defget_expression(self):return{'format':self.expression_format,'operator':self.expression_operator,'values':self._values,}def__eq__(self,other):ifisinstance(other,type(self)):ifself._values==other._values:returnTruereturnFalsedef__ne__(self,other):returnnotself.__eq__(other)classAttributeBase:def__init__(self,name):self.name=namedef__and__(self,value):raiseDynamoDBOperationNotSupportedError('AND',self)def__or__(self,value):raiseDynamoDBOperationNotSupportedError('OR',self)def__invert__(self):raiseDynamoDBOperationNotSupportedError('NOT',self)defeq(self,value):"""Creates a condition where the attribute is equal to the value. :param value: The value that the attribute is equal to. """returnEquals(self,value)deflt(self,value):"""Creates a condition where the attribute is less than the value. :param value: The value that the attribute is less than. """returnLessThan(self,value)deflte(self,value):"""Creates a condition where the attribute is less than or equal to the value. :param value: The value that the attribute is less than or equal to. """returnLessThanEquals(self,value)defgt(self,value):"""Creates a condition where the attribute is greater than the value. :param value: The value that the attribute is greater than. """returnGreaterThan(self,value)defgte(self,value):"""Creates a condition where the attribute is greater than or equal to the value. :param value: The value that the attribute is greater than or equal to. """returnGreaterThanEquals(self,value)defbegins_with(self,value):"""Creates a condition where the attribute begins with the value. :param value: The value that the attribute begins with. """returnBeginsWith(self,value)defbetween(self,low_value,high_value):"""Creates a condition where the attribute is greater than or equal to the low value and less than or equal to the high value. :param low_value: The value that the attribute is greater than or equal to. :param high_value: The value that the attribute is less than or equal to. """returnBetween(self,low_value,high_value)def__eq__(self,other):returnisinstance(other,type(self))andself.name==other.namedef__ne__(self,other):returnnotself.__eq__(other)classConditionAttributeBase(ConditionBase,AttributeBase):"""This base class is for conditions that can have attribute methods. One example is the Size condition. To complete a condition, you need to apply another AttributeBase method like eq(). """def__init__(self,*values):ConditionBase.__init__(self,*values)# This is assuming the first value to the condition is the attribute# in which can be used to generate its attribute base.AttributeBase.__init__(self,values[0].name)def__eq__(self,other):returnConditionBase.__eq__(self,other)andAttributeBase.__eq__(self,other)def__ne__(self,other):returnnotself.__eq__(other)classComparisonCondition(ConditionBase):expression_format='{0}{operator}{1}'classEquals(ComparisonCondition):expression_operator='='classNotEquals(ComparisonCondition):expression_operator='<>'classLessThan(ComparisonCondition):expression_operator='<'classLessThanEquals(ComparisonCondition):expression_operator='<='classGreaterThan(ComparisonCondition):expression_operator='>'classGreaterThanEquals(ComparisonCondition):expression_operator='>='classIn(ComparisonCondition):expression_operator='IN'has_grouped_values=TrueclassBetween(ConditionBase):expression_operator='BETWEEN'expression_format='{0}{operator}{1} AND {2}'classBeginsWith(ConditionBase):expression_operator='begins_with'expression_format='{operator}({0}, {1})'classContains(ConditionBase):expression_operator='contains'expression_format='{operator}({0}, {1})'classSize(ConditionAttributeBase):expression_operator='size'expression_format='{operator}({0})'classAttributeType(ConditionBase):expression_operator='attribute_type'expression_format='{operator}({0}, {1})'classAttributeExists(ConditionBase):expression_operator='attribute_exists'expression_format='{operator}({0})'classAttributeNotExists(ConditionBase):expression_operator='attribute_not_exists'expression_format='{operator}({0})'classAnd(ConditionBase):expression_operator='AND'expression_format='({0}{operator}{1})'classOr(ConditionBase):expression_operator='OR'expression_format='({0}{operator}{1})'classNot(ConditionBase):expression_operator='NOT'expression_format='({operator}{0})'
[docs]classAttr(AttributeBase):"""Represents an DynamoDB item's attribute."""
[docs]defne(self,value):"""Creates a condition where the attribute is not equal to the value :param value: The value that the attribute is not equal to. """returnNotEquals(self,value)
[docs]defis_in(self,value):"""Creates a condition where the attribute is in the value, :type value: list :param value: The value that the attribute is in. """returnIn(self,value)
[docs]defexists(self):"""Creates a condition where the attribute exists."""returnAttributeExists(self)
[docs]defnot_exists(self):"""Creates a condition where the attribute does not exist."""returnAttributeNotExists(self)
[docs]defcontains(self,value):"""Creates a condition where the attribute contains the value. :param value: The value the attribute contains. """returnContains(self,value)
[docs]defsize(self):"""Creates a condition for the attribute size. Note another AttributeBase method must be called on the returned size condition to be a valid DynamoDB condition. """returnSize(self)
[docs]defattribute_type(self,value):"""Creates a condition for the attribute type. :param value: The type of the attribute. """returnAttributeType(self,value)
BuiltConditionExpression=namedtuple('BuiltConditionExpression',['condition_expression','attribute_name_placeholders','attribute_value_placeholders',],)classConditionExpressionBuilder:"""This class is used to build condition expressions with placeholders"""def__init__(self):self._name_count=0self._value_count=0self._name_placeholder='n'self._value_placeholder='v'def_get_name_placeholder(self):return'#'+self._name_placeholder+str(self._name_count)def_get_value_placeholder(self):return':'+self._value_placeholder+str(self._value_count)defreset(self):"""Resets the placeholder name and values"""self._name_count=0self._value_count=0defbuild_expression(self,condition,is_key_condition=False):"""Builds the condition expression and the dictionary of placeholders. :type condition: ConditionBase :param condition: A condition to be built into a condition expression string with any necessary placeholders. :type is_key_condition: Boolean :param is_key_condition: True if the expression is for a KeyConditionExpression. False otherwise. :rtype: (string, dict, dict) :returns: Will return a string representing the condition with placeholders inserted where necessary, a dictionary of placeholders for attribute names, and a dictionary of placeholders for attribute values. Here is a sample return value: ('#n0 = :v0', {'#n0': 'myattribute'}, {':v1': 'myvalue'}) """ifnotisinstance(condition,ConditionBase):raiseDynamoDBNeedsConditionError(condition)attribute_name_placeholders={}attribute_value_placeholders={}condition_expression=self._build_expression(condition,attribute_name_placeholders,attribute_value_placeholders,is_key_condition=is_key_condition,)returnBuiltConditionExpression(condition_expression=condition_expression,attribute_name_placeholders=attribute_name_placeholders,attribute_value_placeholders=attribute_value_placeholders,)def_build_expression(self,condition,attribute_name_placeholders,attribute_value_placeholders,is_key_condition,):expression_dict=condition.get_expression()replaced_values=[]forvalueinexpression_dict['values']:# Build the necessary placeholders for that value.# Placeholders are built for both attribute names and values.replaced_value=self._build_expression_component(value,attribute_name_placeholders,attribute_value_placeholders,condition.has_grouped_values,is_key_condition,)replaced_values.append(replaced_value)# Fill out the expression using the operator and the# values that have been replaced with placeholders.returnexpression_dict['format'].format(*replaced_values,operator=expression_dict['operator'])def_build_expression_component(self,value,attribute_name_placeholders,attribute_value_placeholders,has_grouped_values,is_key_condition,):# Continue to recurse if the value is a ConditionBase in order# to extract out all parts of the expression.ifisinstance(value,ConditionBase):returnself._build_expression(value,attribute_name_placeholders,attribute_value_placeholders,is_key_condition,)# If it is not a ConditionBase, we can recurse no further.# So we check if it is an attribute and add placeholders for# its nameelifisinstance(value,AttributeBase):ifis_key_conditionandnotisinstance(value,Key):raiseDynamoDBNeedsKeyConditionError(f'Attribute object {value.name} is of type {type(value)}. 'f'KeyConditionExpression only supports Attribute objects 'f'of type Key')returnself._build_name_placeholder(value,attribute_name_placeholders)# If it is anything else, we treat it as a value and thus placeholders# are needed for the value.else:returnself._build_value_placeholder(value,attribute_value_placeholders,has_grouped_values)def_build_name_placeholder(self,value,attribute_name_placeholders):attribute_name=value.name# Figure out which parts of the attribute name that needs replacement.attribute_name_parts=ATTR_NAME_REGEX.findall(attribute_name)# Add a temporary placeholder for each of these parts.placeholder_format=ATTR_NAME_REGEX.sub('%s',attribute_name)str_format_args=[]forpartinattribute_name_parts:name_placeholder=self._get_name_placeholder()self._name_count+=1str_format_args.append(name_placeholder)# Add the placeholder and value to dictionary of name placeholders.attribute_name_placeholders[name_placeholder]=part# Replace the temporary placeholders with the designated placeholders.returnplaceholder_format%tuple(str_format_args)def_build_value_placeholder(self,value,attribute_value_placeholders,has_grouped_values=False):# If the values are grouped, we need to add a placeholder for# each element inside of the actual value.ifhas_grouped_values:placeholder_list=[]forvinvalue:value_placeholder=self._get_value_placeholder()self._value_count+=1placeholder_list.append(value_placeholder)attribute_value_placeholders[value_placeholder]=v# Assuming the values are grouped by parenthesis.# IN is the currently the only one that uses this so it maybe# needed to be changed in future.return'('+', '.join(placeholder_list)+')'# Otherwise, treat the value as a single value that needs only# one placeholder.else:value_placeholder=self._get_value_placeholder()self._value_count+=1attribute_value_placeholders[value_placeholder]=valuereturnvalue_placeholder