HEX
Server: Apache/2.4.41 (Amazon) OpenSSL/1.0.2k-fips PHP/5.6.40
System: Linux ip-172-31-40-18 4.14.146-93.123.amzn1.x86_64 #1 SMP Tue Sep 24 00:45:23 UTC 2019 x86_64
User: apache (48)
PHP: 5.6.40
Disabled: NONE
Upload Files
File: //proc/thread-self/root/usr/lib/python2.7/dist-packages/cfnbootstrap/sqs_client.py
#==============================================================================
# Copyright 2011 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#==============================================================================
"""
SQS client-related classes

Classes:
SQSClient - an HTTP client that makes API calls against SQS
Message  - a message from an SQS queue

"""
from cfnbootstrap import aws_client
from cfnbootstrap.aws_client import V4Signer
from cfnbootstrap.util import retry_on_failure, timeout
from cfnbootstrap import endpoint_tool
from xml.etree import ElementTree
import StringIO
import logging
import re

log = logging.getLogger("cfn.client")

class SQSClient(aws_client.Client):
    """
    Makes API calls against SQS

    Notes:
    - Public methods of this class have a 1-to-1 equivalence to published SQS APIs.
    - Calls are retried internally when appropriate; callers should not retry.

    Attributes:
    _apiVersion - the SQS API version
    _xmlns      - the XML namespace for the SQS API version in use

    """

    _apiVersion = "2012-11-05"
    _xmlns='http://queue.amazonaws.com/doc/%s/' % _apiVersion

    def __init__(self, credentials, url=None, region='us-east-1', proxyinfo=None):
        if not url:
            endpoint = SQSClient.endpointForRegion(region)
        else:
            endpoint = self._fix_endpoint(url)

        if not region:
            region = SQSClient.regionForEndpoint(endpoint)

        if not region:
            raise ValueError('Region is required for AWS V4 Signatures')

        signer = V4Signer(region, 'sqs')

        super(SQSClient, self).__init__(credentials, False, endpoint, signer=signer, xmlns='http://queue.amazonaws.com/doc/%s/' % SQSClient._apiVersion, proxyinfo=proxyinfo)
        log.debug("SQS client initialized with endpoint %s", endpoint)

    # SQS SSL certificates have CNs based on sqs.<region>.amazonaws.com
    # Python2.6 will fail to verify the hostname of the certificate
    # Due to http://bugs.python.org/issue13034 only being fixed in 2.7 and 3.2
    def _fix_endpoint(self, unparsed_url):
        # Some endpoints with older certificates still use <region>.queue.amazonaws.com instead
        m = re.match(endpoint_tool.SQS_PATTERN, unparsed_url) or re.match(endpoint_tool.SQS_LEGACY_PATTERN, unparsed_url)
        if m:
            for endpoint in endpoint_tool.get_endpoints_for_service("AmazonSQS"):
                if endpoint.region == m.group(1):
                    fixed_hostname = endpoint.hostname
                    if fixed_hostname.endswith('.cn') and m.group(2).startswith('.cn'):
                        fixed_hostname = fixed_hostname[:-3]
                    return 'https://%s%s' % (fixed_hostname, m.group(2))
            return 'https://' + endpoint_tool.SQS_FORMAT % (m.group(1), m.group(2))
        return unparsed_url

    @classmethod
    def endpointForRegion(cls, region):
        for endpoint in endpoint_tool.get_endpoints_for_service("AmazonSQS"):
            if endpoint.region == region:
                return 'https://%s' % endpoint.hostname
        return 'https://' + endpoint_tool.SQS_FORMAT % (region, '.cn' if region.startswith('cn') else '')

    @classmethod
    def regionForEndpoint(cls, parsed_url):
        for endpoint in endpoint_tool.get_endpoints_for_service("AmazonSQS"):
            if parsed_url == 'https://%s' % endpoint.hostname:
                return endpoint.region
        m = re.match(endpoint_tool.SQS_PATTERN, parsed_url) or re.match(endpoint_tool.SQS_LEGACY_PATTERN, parsed_url)
        if m:
            return m.group(1)
        return None

    @retry_on_failure(http_error_extractor=aws_client.Client._get_xml_extractor(_xmlns))
    @timeout(60)
    def receive_message(self, queue_url, attributes=None, max_messages=1, visibility_timeout=None,
                              request_credentials=None, wait_time=None):
        """
        Calls ReceiveMessage and returns a list of Message objects

        Throws an IOError on failure.
        """
        if not attributes: attributes = ['All']
        queue_url = self._fix_endpoint(queue_url)
        log.debug("Receiving messages for queue %s", queue_url)

        params = { "Action" : "ReceiveMessage", "Version" : SQSClient._apiVersion, "MaxNumberOfMessages" : str(max_messages) }
        for i in range(len(attributes)):
            params['AttributeName.%s' % (i + 1)]=attributes[i]
        if visibility_timeout:
            params['VisibilityTimeout'] = str(visibility_timeout)
        if wait_time:
            params['WaitTimeSeconds'] = str(wait_time)

        response_content = self._call(params, queue_url, request_credentials,
                                      timeout=wait_time + 3 if wait_time else None).content
        return Message._parse_list(StringIO.StringIO(response_content), self._xmlns)

    @retry_on_failure(max_tries=25, http_error_extractor=aws_client.Client._get_xml_extractor(_xmlns))
    @timeout()
    def send_message(self, queue_url, message_body, delay_seconds=None, request_credentials=None):
        """
        Calls SendMessage and returns a tuple of (MessageId, MD5OfMessageBody)

        Throws an IOError on failure.
        """
        queue_url = self._fix_endpoint(queue_url)
        log.debug("Sending %s to queue %s", message_body, queue_url)

        params = { "Action" : "SendMessage", "Version" : SQSClient._apiVersion, "MessageBody" : message_body}

        if delay_seconds:
            params["DelaySeconds"] = delay_seconds

        root = ElementTree.ElementTree(file=StringIO.StringIO(self._call(params, queue_url, request_credentials, verb='POST').content))
        message_id = root.findtext('{%s}SendMessageResult/{%s}MessageId' % (self._xmlns, self._xmlns))
        md5_of_body = root.findtext('{%s}SendMessageResult/{%s}MD5OfMessageBody' % (self._xmlns, self._xmlns))

        return (message_id, md5_of_body)

    @retry_on_failure(http_error_extractor=aws_client.Client._get_xml_extractor(_xmlns))
    @timeout()
    def delete_message(self, queue_url, receipt_handle, request_credentials=None):
        """
        Calls DeleteMessage on a specified receipt handle

        Throws an IOError on failure.
        """
        queue_url = self._fix_endpoint(queue_url)
        log.debug("Deleting %s from queue %s", receipt_handle, queue_url)

        params = { "Action" : "DeleteMessage", "Version" : SQSClient._apiVersion, "ReceiptHandle" : receipt_handle}

        self._call(params, queue_url, request_credentials)

class Message(object):
    """A message off of an SQS queue"""

    @classmethod
    def _parse_list(cls, data, xmlns):
        if not data:
            return []
        root = ElementTree.ElementTree(file=data)
        msgs = root.findall('{%s}ReceiveMessageResult/{%s}Message' % (xmlns, xmlns))
        return [cls._from_elem(elem, xmlns) for elem in msgs]

    @classmethod
    def _from_elem(cls, elem, xmlns):
        attribs = {}
        for attrib in elem.findall('{%s}Attribute' % xmlns):
            attribs[attrib.findtext('{%s}Name' % xmlns)] = attrib.findtext('{%s}Value' % xmlns)

        return Message(elem.findtext('{%s}MessageId' % xmlns),
                       elem.findtext('{%s}ReceiptHandle' % xmlns),
                       elem.findtext('{%s}MD5OfBody' % xmlns),
                       elem.findtext('{%s}Body' % xmlns),
                       attribs)

    def __init__(self, msg_id, handle, md5, body, attribs):
        self._message_id = msg_id
        self._receipt_handle = handle
        self._md5_of_body = md5
        self._body = body
        self._attributes = dict(attribs)

    def __repr__(self):
        return self.__str__()

    def __str__(self):
        return 'Message[MessageId:%s, ReceiptHandle:%s, MD5OfBody:%s, Body:%s, Attributes:%s]' % (self.message_id,
                                                                                                  self.receipt_handle,
                                                                                                  self.md5_of_body,
                                                                                                  self.body,
                                                                                                  self.attributes)

    @property
    def message_id(self):
        return self._message_id

    @property
    def receipt_handle(self):
        return self._receipt_handle

    @property
    def md5_of_body(self):
        return self._md5_of_body

    @property
    def body(self):
        return self._body

    @property
    def attributes(self):
        return self._attributes