Boto Python

You need a valid access and secret key. The examples below assume that you have exported them to your environment, as follows:

bash$ export AWS_ACCESS_KEY_ID=<your access key>
bash$ export AWS_SECRET_ACCESS_KEY=<your secret key>
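
Boto picks these variables up automatically. If exporting them is inconvenient, a minimal sketch (assuming the layer2 constructors forward credential keyword arguments to the underlying SWF connection) is to pass the keys explicitly:

# credentials_example.py -- a sketch; passing keys explicitly is optional
import boto.swf.layer2 as swf

# Assumption: aws_access_key_id / aws_secret_access_key are forwarded to the
# underlying connection instead of being read from the environment.
domain = swf.Domain(name='boto_tutorial',
                    aws_access_key_id='<your access key>',
                    aws_secret_access_key='<your secret key>')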

Before workflows and activities can be used, they have to be registered with the SWF service:

# register.py
import boto.swf.layer2 as swf
from boto.swf.exceptions import SWFTypeAlreadyExistsError, SWFDomainAlreadyExistsError
DOMAIN = 'boto_tutorial'
VERSION = '1.0'

registerables = []
registerables.append(swf.Domain(name=DOMAIN))
for workflow_type in ('HelloWorkflow', 'SerialWorkflow', 'ParallelWorkflow', 'SubWorkflow'):
    registerables.append(swf.WorkflowType(domain=DOMAIN, name=workflow_type, version=VERSION, task_list='default'))

for activity_type in ('HelloWorld', 'ActivityA', 'ActivityB', 'ActivityC'):
    registerables.append(swf.ActivityType(domain=DOMAIN, name=activity_type, version=VERSION, task_list='default'))

for swf_entity in registerables:
    try:
        swf_entity.register()
        print swf_entity.name, 'registered successfully'
    except (SWFDomainAlreadyExistsError, SWFTypeAlreadyExistsError):
        print swf_entity.__class__.__name__, swf_entity.name, 'already exists'

Execution of the above should produce no errors; entities that were registered on a previous run are simply reported as already existing:

bash$ python -i register.py
Domain boto_tutorial already exists
WorkflowType HelloWorkflow already exists
SerialWorkflow registered successfully
ParallelWorkflow registered successfully
ActivityType HelloWorld already exists
ActivityA registered successfully
ActivityB registered successfully
ActivityC registered successfully
>>>

HelloWorld

This example is an implementation of a minimal Hello World workflow. Its execution should unfold as follows:

  1. A workflow execution is started.
  2. The SWF service schedules the initial decision task.
  3. A decider polls for decision tasks and receives one.
  4. The decider requests scheduling of an activity task.
  5. The SWF service schedules the greeting activity task.
  6. An activity worker polls for an activity task and receives one.
  7. The worker completes the greeting activity.
  8. The SWF service schedules a decision task to report the outcome of the work.
  9. The decider polls and receives a new decision task.
  10. The decider schedules workflow completion.
  11. The workflow execution finishes.

Workflow logic is encoded in the decider:

# hello_decider.py
import boto.swf.layer2 as swf

DOMAIN = 'boto_tutorial'
ACTIVITY = 'HelloWorld'
VERSION = '1.0'
TASKLIST = 'default'

class HelloDecider(swf.Decider):

    domain = DOMAIN
    task_list = TASKLIST
    version = VERSION

    def run(self):
        history = self.poll()
        if 'events' in history:
            # Find workflow events not related to decision scheduling.
            workflow_events = [e for e in history['events']
                if not e['eventType'].startswith('Decision')]
            last_event = workflow_events[-1]

            decisions = swf.Layer1Decisions()
            if last_event['eventType'] == 'WorkflowExecutionStarted':
                decisions.schedule_activity_task('saying_hi', ACTIVITY, VERSION, task_list=TASKLIST)
            elif last_event['eventType'] == 'ActivityTaskCompleted':
                decisions.complete_workflow_execution()
            self.complete(decisions=decisions)
            return True

The activity worker is responsible for printing the greeting message when the activity task is dispatched to it by the service:

# hello_worker.py
import boto.swf.layer2 as swf

DOMAIN = 'boto_tutorial'
VERSION = '1.0'
TASKLIST = 'default'

class HelloWorker(swf.ActivityWorker):

    domain = DOMAIN
    version = VERSION
    task_list = TASKLIST

    def run(self):
        activity_task = self.poll()
        if 'activityId' in activity_task:
            print 'Hello, World!'
            self.complete()
            return True

With the actors implemented, we can spin up a workflow execution:

$ python
>>> import boto.swf.layer2 as swf
>>> execution = swf.WorkflowType(name='HelloWorkflow', domain='boto_tutorial', version='1.0', task_list='default').start()
>>>

From separate terminals, run an instance of the worker and of the decider to carry out the workflow execution (the worker and the decider may run on two independent machines).

$ python -i hello_decider.py
>>> while HelloDecider().run(): pass
...
$ python -i hello_worker.py
>>> while HelloWorker().run(): pass
...
Hello, World!

Great. Now, to see what just happened, go back to the original terminal from which the execution was started, and read its history.

>>> execution.history()
[{'eventId': 1,
  'eventTimestamp': 1381095173.2539999,
  'eventType': 'WorkflowExecutionStarted',
  'workflowExecutionStartedEventAttributes': {'childPolicy': 'TERMINATE',
                                              'executionStartToCloseTimeout': '3600',
                                              'parentInitiatedEventId': 0,
                                              'taskList': {'name': 'default'},
                                              'taskStartToCloseTimeout': '300',
                                              'workflowType': {'name': 'HelloWorkflow',
                                                               'version': '1.0'}}},
 {'decisionTaskScheduledEventAttributes': {'startToCloseTimeout': '300',
                                           'taskList': {'name': 'default'}},
  'eventId': 2,
  'eventTimestamp': 1381095173.2539999,
  'eventType': 'DecisionTaskScheduled'},
 {'decisionTaskStartedEventAttributes': {'scheduledEventId': 2},
  'eventId': 3,
  'eventTimestamp': 1381095177.5439999,
  'eventType': 'DecisionTaskStarted'},
 {'decisionTaskCompletedEventAttributes': {'scheduledEventId': 2,
                                           'startedEventId': 3},
  'eventId': 4,
  'eventTimestamp': 1381095177.855,
  'eventType': 'DecisionTaskCompleted'},
 {'activityTaskScheduledEventAttributes': {'activityId': 'saying_hi',
                                           'activityType': {'name': 'HelloWorld',
                                                            'version': '1.0'},
                                           'decisionTaskCompletedEventId': 4,
                                           'heartbeatTimeout': '600',
                                           'scheduleToCloseTimeout': '3900',
                                           'scheduleToStartTimeout': '300',
                                           'startToCloseTimeout': '3600',
                                           'taskList': {'name': 'default'}},
  'eventId': 5,
  'eventTimestamp': 1381095177.855,
  'eventType': 'ActivityTaskScheduled'},
 {'activityTaskStartedEventAttributes': {'scheduledEventId': 5},
  'eventId': 6,
  'eventTimestamp': 1381095179.427,
  'eventType': 'ActivityTaskStarted'},
 {'activityTaskCompletedEventAttributes': {'scheduledEventId': 5,
                                           'startedEventId': 6},
  'eventId': 7,
  'eventTimestamp': 1381095179.6989999,
  'eventType': 'ActivityTaskCompleted'},
 {'decisionTaskScheduledEventAttributes': {'startToCloseTimeout': '300',
                                           'taskList': {'name': 'default'}},
  'eventId': 8,
  'eventTimestamp': 1381095179.6989999,
  'eventType': 'DecisionTaskScheduled'},
 {'decisionTaskStartedEventAttributes': {'scheduledEventId': 8},
  'eventId': 9,
  'eventTimestamp': 1381095179.7420001,
  'eventType': 'DecisionTaskStarted'},
 {'decisionTaskCompletedEventAttributes': {'scheduledEventId': 8,
                                           'startedEventId': 9},
  'eventId': 10,
  'eventTimestamp': 1381095180.026,
  'eventType': 'DecisionTaskCompleted'},
 {'eventId': 11,
  'eventTimestamp': 1381095180.026,
  'eventType': 'WorkflowExecutionCompleted',
  'workflowExecutionCompletedEventAttributes': {'decisionTaskCompletedEventId': 10}}]
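
The history is just a list of plain dictionaries, so it is easy to post-process. A small helper like the one below (a sketch, not part of boto) prints a one-line summary per event using the eventId, eventType and eventTimestamp keys shown above:

# history_summary.py -- a hypothetical helper for readability
def summarize(events):
    """Print one line per history event: id, type and timestamp."""
    for event in events:
        print '%2d  %-30s %.3f' % (event['eventId'],
                                   event['eventType'],
                                   event['eventTimestamp'])

# Usage from the interactive session above:
# summarize(execution.history())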

Serial Activity Execution

The following example implements a basic workflow with activities executed one after another.

The business logic, i.e. the serial execution of activities, is encoded in the decider:

# serial_decider.py
import time
import boto.swf.layer2 as swf

class SerialDecider(swf.Decider):

    domain = 'boto_tutorial'
    task_list = 'default_tasks'
    version = '1.0'

    def run(self):
        history = self.poll()
        if 'events' in history:
            # Get a list of non-decision events to see what event came in last.
            workflow_events = [e for e in history['events']
                               if not e['eventType'].startswith('Decision')]
            decisions = swf.Layer1Decisions()
            # Record latest non-decision event.
            last_event = workflow_events[-1]
            last_event_type = last_event['eventType']
            if last_event_type == 'WorkflowExecutionStarted':
                # Schedule the first activity.
                decisions.schedule_activity_task('%s-%i' % ('ActivityA', time.time()),
                   'ActivityA', self.version, task_list='a_tasks')
            elif last_event_type == 'ActivityTaskCompleted':
                # Make a decision based on the name of the activity that has just completed.
                # 1) Locate the corresponding ActivityTaskScheduled event (eventIds are 1-based).
                last_event_attrs = last_event['activityTaskCompletedEventAttributes']
                completed_activity_id = last_event_attrs['scheduledEventId'] - 1
                # 2) Extract its name.
                activity_data = history['events'][completed_activity_id]
                activity_attrs = activity_data['activityTaskScheduledEventAttributes']
                activity_name = activity_attrs['activityType']['name']
                # 3) Optionally, get the result from the activity.
                result = last_event['activityTaskCompletedEventAttributes'].get('result')

                # Take the decision.
                if activity_name == 'ActivityA':
                    decisions.schedule_activity_task('%s-%i' % ('ActivityB', time.time()),
                        'ActivityB', self.version, task_list='b_tasks', input=result)
                elif activity_name == 'ActivityB':
                    decisions.schedule_activity_task('%s-%i' % ('ActivityC', time.time()),
                        'ActivityC', self.version, task_list='c_tasks', input=result)
                elif activity_name == 'ActivityC':
                    # Final activity completed. We're done.
                    decisions.complete_workflow_execution()

            self.complete(decisions=decisions)
            return True

The workers only need to know which task lists to poll.

# serial_worker.py
import time
import boto.swf.layer2 as swf

class MyBaseWorker(swf.ActivityWorker):

    domain = 'boto_tutorial'
    version = '1.0'
    task_list = None

    def run(self):
        activity_task = self.poll()
        if 'activityId' in activity_task:
            # Dispatch to the activity method defined by the subclass,
            # passing along any input that came with the task.
            try:
                print 'working on activity from tasklist %s at %i' % (self.task_list, time.time())
                self.activity(activity_task.get('input'))
            except Exception as error:
                self.fail(reason=str(error))
                raise error

            return True

    def activity(self, activity_input):
        raise NotImplementedError

class WorkerA(MyBaseWorker):
    task_list = 'a_tasks'
    def activity(self, activity_input):
        self.complete(result="Now don't be givin him sambuca!")

class WorkerB(MyBaseWorker):
    task_list = 'b_tasks'
    def activity(self, activity_input):
        self.complete()

class WorkerC(MyBaseWorker):
    task_list = 'c_tasks'
    def activity(self, activity_input):
        self.complete()

Spin up a workflow execution and run the decider:

$ python
>>> import boto.swf.layer2 as swf
>>> execution = swf.WorkflowType(name='SerialWorkflow', domain='boto_tutorial', version='1.0', task_list='default_tasks').start()
>>>
$ python -i serial_decider.py
>>> while SerialDecider().run(): pass
...

Run the workers. The activities will be executed in order:

$ python -i serial_worker.py
>>> while WorkerA().run(): pass
...
working on activity from tasklist a_tasks at 1382046291
$ python -i serial_worker.py
>>> while WorkerB().run(): pass
...
working on activity from tasklist b_tasks at 1382046541
$ python -i serial_worker.py
>>> while WorkerC().run(): pass
...
working on activity from tasklist c_tasks at 1382046560

Looks good. Now, do the following to inspect the state and history of the execution:

>>> execution.describe()
{'executionConfiguration': {'childPolicy': 'TERMINATE',
  'executionStartToCloseTimeout': '3600',
  'taskList': {'name': 'default_tasks'},
  'taskStartToCloseTimeout': '300'},
 'executionInfo': {'cancelRequested': False,
  'closeStatus': 'COMPLETED',
  'closeTimestamp': 1382046560.901,
  'execution': {'runId': '12fQ1zSaLmI5+lLXB8ux+8U+hLOnnXNZCY9Zy+ZvXgzhE=',
   'workflowId': 'SerialWorkflow-1.0-1382046514'},
  'executionStatus': 'CLOSED',
  'startTimestamp': 1382046514.994,
  'workflowType': {'name': 'SerialWorkflow', 'version': '1.0'}},
 'latestActivityTaskTimestamp': 1382046560.632,
 'openCounts': {'openActivityTasks': 0,
  'openChildWorkflowExecutions': 0,
  'openDecisionTasks': 0,
  'openTimers': 0}}
>>> execution.history()
...

Parallel Activity Execution

When activities are independent of one another, their execution may be scheduled in parallel.

The decider schedules all activities at once and then tracks progress until all of them have completed, at which point it completes the workflow.

# parallel_decider.py

import boto.swf.layer2 as swf
import time

SCHED_COUNT = 5

class ParallelDecider(swf.Decider):

    domain = 'boto_tutorial'
    task_list = 'default'

    def run(self):
        decision_task = self.poll()
        if 'events' in decision_task:
            decisions = swf.Layer1Decisions()
            # Decision* events are irrelevant here and can be ignored.
            workflow_events = [e for e in decision_task['events']
                               if not e['eventType'].startswith('Decision')]
            # Record latest non-decision event.
            last_event = workflow_events[-1]
            last_event_type = last_event['eventType']
            if last_event_type == 'WorkflowExecutionStarted':
                # At start, kick off SCHED_COUNT activities in parallel.
                for i in range(SCHED_COUNT):
                    decisions.schedule_activity_task('activity%i' % i, 'ActivityA', '1.0',
                                                     task_list=self.task_list)
            elif last_event_type == 'ActivityTaskCompleted':
                # Monitor progress. When all activities complete, complete workflow.
                completed_count = sum([1 for a in decision_task['events']
                                       if a['eventType'] == 'ActivityTaskCompleted'])
                print '%i/%i' % (completed_count, SCHED_COUNT)
                if completed_count == SCHED_COUNT:
                    decisions.complete_workflow_execution()
            self.complete(decisions=decisions)
            return True

Again, the only bit of information a worker needs is which task list to poll.

# parallel_worker.py
import time
import boto.swf.layer2 as swf

class ParallelWorker(swf.ActivityWorker):

    domain = 'boto_tutorial'
    task_list = 'default'

    def run(self):
        """Report current time."""
        activity_task = self.poll()
        if 'activityId' in activity_task:
            print 'working on', activity_task['activityId']
            self.complete(result=str(time.time()))
            return True

Spin up a workflow execution and run the decider:

$ python -i parallel_decider.py
>>> execution = swf.WorkflowType(name='ParallelWorkflow', domain='boto_tutorial', version='1.0', task_list='default').start()
>>> while ParallelDecider().run(): pass
...
1/5
2/5
4/5
5/5

Run two or more workers to see how the service partitions the work among them in parallel.

$ python -i parallel_worker.py
>>> while ParallelWorker().run(): pass
...
working on activity1
working on activity3
working on activity4
$ python -i parallel_worker.py
>>> while ParallelWorker().run(): pass
...
working on activity2
working on activity0

As seen above, the work was partitioned between the two running workers.

Sub-Workflows

Sometimes it’s desirable or necessary to break a process up into multiple workflows.

Since the decider is stateless, it’s up to you to determine which workflow is being used and which action you would like to take.

import boto.swf.layer2 as swf

class SubWorkflowDecider(swf.Decider):

    domain = 'boto_tutorial'
    task_list = 'default'
    version = '1.0'

    def run(self):
        history = self.poll()
        events = []
        if 'events' in history:
            events = history['events']
            # Collect the entire history if it spans multiple pages.
            while 'nextPageToken' in history:
                history = self.poll(next_page_token=history['nextPageToken'])
                if 'events' in history:
                    events = events + history['events']

            workflow_type = history['workflowType']['name']

            # Get all of the relevant events that have happened since the last decision task was started
            workflow_events = [e for e in events
                    if e['eventId'] > history['previousStartedEventId'] and
                    not e['eventType'].startswith('Decision')]

            decisions = swf.Layer1Decisions()

            for event in workflow_events:
                last_event_type = event['eventType']
                if last_event_type == 'WorkflowExecutionStarted':
                    if workflow_type == 'SerialWorkflow':
                        decisions.start_child_workflow_execution('SubWorkflow', self.version,
                            "subworkflow_1", task_list=self.task_list, input="sub_1")
                    elif workflow_type == 'SubWorkflow':
                        for i in range(2):
                            decisions.schedule_activity_task("activity_%d" % i, 'ActivityA', self.version, task_list='a_tasks')
                    else:
                        decisions.fail_workflow_execution(reason="Unknown workflow %s" % workflow_type)
                        break

                elif last_event_type == 'ChildWorkflowExecutionCompleted':
                    decisions.schedule_activity_task("activity_2", 'ActivityB', self.version, task_list='b_tasks')

                elif last_event_type == 'ActivityTaskCompleted':
                    attrs = event['activityTaskCompletedEventAttributes']
                    activity = events[attrs['scheduledEventId'] - 1]
                    activity_name = activity['activityTaskScheduledEventAttributes']['activityType']['name']

                    if activity_name == 'ActivityA':
                        completed_count = sum([1 for a in events if a['eventType'] == 'ActivityTaskCompleted'])
                        if completed_count == 2:
                            # Complete the child workflow
                            decisions.complete_workflow_execution()
                    elif activity_name == 'ActivityB':
                        # Complete the parent workflow
                        decisions.complete_workflow_execution()

            self.complete(decisions=decisions)
        return True
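
To exercise this decider (saved here as subworkflow_decider.py, a file name of my choosing), start a SerialWorkflow execution on the default task list that the decider polls. The ActivityA and ActivityB tasks it schedules go to the a_tasks and b_tasks lists, so WorkerA and WorkerB from the serial example can serve them:

$ python -i subworkflow_decider.py
>>> execution = swf.WorkflowType(name='SerialWorkflow', domain='boto_tutorial', version='1.0', task_list='default').start()
>>> while SubWorkflowDecider().run(): pass
...
$ python -i serial_worker.py
>>> while WorkerA().run(): pass
...
$ python -i serial_worker.py
>>> while WorkerB().run(): pass
...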

Why is SWF useful?

The Amazon Simple Workflow service defines an interface for workflow orchestration and provides state persistence for workflow executions.

Amazon SWF applications involve communication between the following entities:
  • The Amazon Simple Workflow Service – provides centralized orchestration and workflow state persistence.
  • Workflow Executors – some entity that starts workflow executions, typically through an action taken by a user or from a cronjob.
  • Deciders – a program codifying the business logic, i.e. a set of instructions and decisions. Deciders make decisions based on the initial set of conditions and on the outcomes of activities.
  • Activity Workers – their objective is very straightforward: take inputs, execute the tasks and return a result to the service.

The Workflow Executor contacts the SWF service and requests instantiation of a workflow. A new workflow execution is created and its state is stored in the service. The next time a decider contacts the SWF service to ask for a decision task, it is informed that a new workflow execution is taking place and is asked to advise the service on what the next steps should be. The decider then instructs the service to dispatch specific tasks to activity workers. At the next activity worker poll, the task is dispatched, executed, and the results are reported back to SWF, which passes them on to the decider. This exchange repeats until the decider is satisfied and instructs the service to complete the execution.
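
In terms of the boto classes defined earlier, that exchange boils down to three small, independent pieces; the sketch below simply restates the HelloWorld example in one place:

# The workflow executor: starts an execution; SWF stores its state.
import boto.swf.layer2 as swf
execution = swf.WorkflowType(name='HelloWorkflow', domain='boto_tutorial',
                             version='1.0', task_list='default').start()

# The decider (separate process): polls for decision tasks and tells the
# service what to do next.
#   while HelloDecider().run(): pass

# The activity worker (separate process): polls for activity tasks, does the
# work and reports the result back to the service.
#   while HelloWorker().run(): pass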

SWF Workflow

This tutorial focuses on boto’s interface to the AWS Simple Workflow service.

What is a workflow?

A workflow is a sequence of multiple activities aimed at accomplishing a well-defined objective. For instance, booking an airline ticket as a workflow may encompass multiple activities, such as selection of itinerary, submission of personal details, payment validation and booking confirmation.

Except for the start and completion of a workflow, each step has a well-defined predecessor and successor. With that:
  • on successful completion of an activity, the workflow can progress with its execution;
  • when one of the workflow’s activities fails, it can be retried;
  • and when it keeps failing repeatedly, the workflow may regress to the previous step to gather alternative inputs, or it may simply fail at that stage (a sketch of a retrying decider follows this list).
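
The deciders above only handle the happy path. As a sketch of the retry idea (not part of the original tutorial; the retry-counting scheme and MAX_ATTEMPTS are my own assumptions), the HelloWorld decider could be extended to reschedule a failed or timed-out activity a few times before failing the workflow:

# retry_decider.py -- a sketch, assuming the HelloWorld types registered earlier
import boto.swf.layer2 as swf

DOMAIN = 'boto_tutorial'
ACTIVITY = 'HelloWorld'
VERSION = '1.0'
TASKLIST = 'default'
MAX_ATTEMPTS = 3

class RetryingDecider(swf.Decider):

    domain = DOMAIN
    task_list = TASKLIST
    version = VERSION

    def run(self):
        history = self.poll()
        if 'events' in history:
            events = history['events']
            workflow_events = [e for e in events
                               if not e['eventType'].startswith('Decision')]
            last_event_type = workflow_events[-1]['eventType']
            # Count how many times the activity has been scheduled so far.
            attempts = sum(1 for e in events
                           if e['eventType'] == 'ActivityTaskScheduled')

            decisions = swf.Layer1Decisions()
            if last_event_type == 'WorkflowExecutionStarted':
                decisions.schedule_activity_task('try-1', ACTIVITY, VERSION,
                                                 task_list=TASKLIST)
            elif last_event_type in ('ActivityTaskFailed', 'ActivityTaskTimedOut'):
                if attempts < MAX_ATTEMPTS:
                    # Retry the same activity under a new activity id.
                    decisions.schedule_activity_task('try-%d' % (attempts + 1),
                                                     ACTIVITY, VERSION,
                                                     task_list=TASKLIST)
                else:
                    decisions.fail_workflow_execution(
                        reason='%s failed %d times' % (ACTIVITY, attempts))
            elif last_event_type == 'ActivityTaskCompleted':
                decisions.complete_workflow_execution()
            self.complete(decisions=decisions)
            return True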

Why use workflows?

Modelling an application on a workflow provides a useful abstraction layer for writing highly-reliable programs for distributed systems, as individual responsibilities can be delegated to a set of redundant, independent and non-critical processing units.

Amazon SWF Doc

The Amazon Simple Workflow Service (Amazon SWF) makes it easy to build applications that coordinate work across distributed components. In Amazon SWF, a task represents a logical unit of work that is performed by a component of your application. Coordinating tasks across the application involves managing intertask dependencies, scheduling, and concurrency in accordance with the logical flow of the application. Amazon SWF gives you full control over implementing tasks and coordinating them without worrying about underlying complexities such as tracking their progress and maintaining their state.

When using Amazon SWF, you implement workers to perform tasks. These workers can run either on cloud infrastructure, such as Amazon Elastic Compute Cloud (Amazon EC2), or on your own premises. You can create tasks that are long-running, or that may fail, time out, or require restarts—or that may complete with varying throughput and latency. Amazon SWF stores tasks and assigns them to workers when they are ready, tracks their progress, and maintains their state, including details on their completion. To coordinate tasks, you write a program that gets the latest state of each task from Amazon SWF and uses it to initiate subsequent tasks. Amazon SWF maintains an application’s execution state durably so that the application is resilient to failures in individual components. With Amazon SWF, you can implement, deploy, scale, and modify these application components independently.

Amazon SWF offers capabilities to support a variety of application requirements. It is suitable for a range of use cases that require coordination of tasks, including media processing, web application back-ends, business process workflows, and analytics pipelines.

SWF Intro

What is Amazon SWF?

Amazon SWF helps developers build, run, and scale background jobs that have parallel or sequential steps. You can think of Amazon SWF as a fully-managed state tracker and task coordinator in the Cloud.

If your app’s steps take more than 500 milliseconds to complete, if you need to track the state of processing, or if you need to recover or retry when a task fails, Amazon SWF can help you.

RabbitMQ Bluemix

RabbitMQ is an open source message broker. It receives and delivers messages from and to your applications. A message broker (unlike a database or key-value store) is purpose-built to deliver information between your applications efficiently and safely. Messages can be sent across languages, platforms and operating systems; this way of handling messages decouples your processes and creates a highly scalable system.

CloudAMQP today operates and supports the largest fleet of RabbitMQ clusters in the world! The CloudAMQP service is available in the Bluemix catalog.

[figure: CloudAMQP overview in Bluemix]

If you are new to RabbitMQ, we recommend you read the guide RabbitMQ for beginners before continuing.

Step 1. Create a space and an app within that space

Let’s get started with a managed RabbitMQ server hosted in Bluemix!

  • Register an account and log in to Bluemix
  • Create the space for your organization
  • Create an app within the space

Step 2. Create a CloudAMQP service instance

  • Open the dashboard for your app and select “ADD A SERVICE OR API”
  • Find CloudAMQP in the list of services (direct link) and select a plan. We offer seven different plans, including dedicated clusters, individual servers and vhosts on shared clusters. Dedicated plans provide guaranteed isolation between instances, whereas shared plans provide great performance but limit the number of connections and the number of messages sent per month.

Step 3. Cloud Foundry command line interface

Use the Cloud Foundry command line interface to deploy and modify applications and service instances. See Getting Started with the cf CLI as required.

  • Go to the Bluemix dashboard for your app and press the “Start coding” button
  • Follow the guide and download your “starter code”
  • Go to the local application directory (the files you downloaded)
  • Connect and log into Bluemix from the command line.
    $ cf api https://api.eu-gb.bluemix.net
    $ cf login -u username -o organization -s space
    

Step 4. Start coding

All AMQP client libraries work with CloudAMQP and there are AMQP client libraries for almost every platform. Sample code, links to recommended libraries and further information about the client libraries can be found in the CloudAMQP documentation.

  • Add the client library to your dependencies file. The library will be downloaded automatically when you deploy your app. CloudAMQP recommended client libraries for different languages can be found here: Ruby, Python, Celery, node.js, PHP, Java, Clojure, Go, Android, .NET, Perl and C. For example, in Ruby, open the Gemfile and add gem ‘bunny’.
    gem 'bunny'
    
  • Copy code from the same language-specific link as above. For example, in Ruby:
    require "bunny" # don't forget to put gem "bunny" in your Gemfile 
    
    services = JSON.parse(ENV['VCAP_SERVICES'])
    cloudamqp_conf = services["cloudamqp"].first
    uri = cloudamqp_conf["credentials"]["uri"]
    
    b = Bunny.new uri
          
    b.start # start a communication session with the amqp server
          
    ch = b.create_channel # create a channel
    
    q = ch.queue("test") # declare a queue
          
    # declare a default direct exchange, which is bound to all queues
    e = ch.exchange("")
    
    # publish a message to the exchange, the message will be routed to the queue "test"
    e.publish("Hello, everybody!", routing_key: "test")
          
    # subscribe from the queue
    q.subscribe(block: true) do |delivery_info, properties, payload|
       puts "This is your message: " + payload + "\n\n"
       b.stop # close the connection
    end
    
    
  • Set up the connection environment variable: the connection and credential information for CloudAMQP is available in the Bluemix VCAP_SERVICES environment variable. It is a nested JSON object with all service credentials; a Python sketch of reading it follows below.
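
In Python, the same lookup might look like the sketch below (the "cloudamqp" service name and the credentials/uri fields mirror the Ruby snippet above):

import json, os

# VCAP_SERVICES is a nested JSON object with one entry per bound service.
services = json.loads(os.environ['VCAP_SERVICES'])
cloudamqp_conf = services['cloudamqp'][0]
uri = cloudamqp_conf['credentials']['uri']  # pass this URI to your AMQP client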

Step 5. Push the App to Bluemix

When the code has been added, it is time to deploy the app. Deploying the application to Bluemix is easy; just run this command from the app directory:

$ cf push your_app

Replace your_app with the application name you chose earlier. Once the application has finished deploying, it will be connecting, publishing and subscribing via CloudAMQP:

http://your_app.mybluemix.net

Your application can write to its log, and you can tail that log via cf with:

$ cf logs your_app 

Useful tools

Once the service is added to the space, a link to the CloudAMQP Control Panel will appear within Bluemix. Open the Bluemix Dashboard, press Services, and then “OPEN CLOUDAMQP DASHBOARD” to get to your CloudAMQP instance.

[figure: from Bluemix to the CloudAMQP dashboard]

The instance details, such as the connection URL, metrics and logs, can be seen on the details page. You can also go to the RabbitMQ management interface from the details page.

From the management interface it is possible to view, handle and delete connections, queues, messages, etc.

[figure: RabbitMQ management interface]

A full product overview, showing the different tools and features that simplify the usage of your RabbitMQ instances, can be found on the product overview page.

Various monitoring tools are also available. These tools will address performance issues promptly and automatically, before they impact your business. For more details, see CloudAMQP: Monitoring Tools.

CloudAMQP Python

The recommended library for Python to access RabbitMQ servers is Pika.

Put pika==0.9.14 in your requirements.txt file.

The following code connects to CloudAMQP, declares a queue, publishes a message to it, sets up a subscription and prints messages arriving on the queue.

Note: Pika’s DEFAULT_SOCKET_TIMEOUT is 0.25 s; we recommend raising this parameter to about 5 s to avoid connection timeouts (params.socket_timeout = 5). Other connection parameter options for Pika can be found here: Connection Parameters.

The full code can be seen at github.com/cloudamqp/python-amqp-example.

import pika, os, logging
logging.basicConfig()

# Access the CLOUDAMQP_URL environment variable and parse it (fall back to localhost)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:guest@localhost/%2f')
params = pika.URLParameters(url)
params.socket_timeout = 5
connection = pika.BlockingConnection(params) # Connect to CloudAMQP
channel = connection.channel() # start a channel
channel.queue_declare(queue='hello') # Declare a queue
# send a message
channel.basic_publish(exchange='', routing_key='hello', body='Hello CloudAMQP!')
print " [x] Sent 'Hello World!'"

# create a function which is called on incoming messages
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body)

# set up subscription on the queue
channel.basic_consume(callback,
    queue='hello',
    no_ack=True)

channel.start_consuming() # start consuming (blocks until stop_consuming is called)

connection.close()

CloudAMQP

CloudAMQP provides managed RabbitMQ servers in the cloud – hosted message queues that let you pass messages between processes and other systems. Messages are published to a queue by a producer; consumers can then take the messages off the queue when they are ready to handle them. In between, RabbitMQ can route, buffer and persist the messages according to rules you give it.

Messages can be sent across languages, platforms and operating systems; this way of handling messages decouples your processes and creates a highly scalable system.

[figure: CloudAMQP overview]

RabbitMQ is a high performance message broker, built in Erlang, which implements the AMQP protocol. All AMQP client libraries work with CloudAMQP and there are AMQP client libraries for almost every platform out there, including: Ruby, Node.js, Java, Python, Clojure and Erlang.

RabbitMQ

RabbitMQ is an open source message broker. It receives and delivers messages from and to your applications. A message broker (unlike a database or key-value store) is purpose-built to deliver information between your applications efficiently and safely. Our RabbitMQ for beginners guide is an excellent introduction to messaging.