Author: Jesús Arias Fisteus
This document describes how you can develop applications that consume events, produce events (sensors) or act as streaming servers. Servers must use the Python libraries we provide as part of Ztreamy. Consumers and producers can be programmed in any programming language: they just need to communicate with the streaming server over HTTP. Nevertheless, we also provide a library that helps programming those kinds of applications in Python.
Ztreamy is built on top of the Tornado Web Server. Tornado's architecture is based on an event loop called the IOLoop, which is in control of the process. Input/output operations (e.g. for sending and receiving data through sockets) are non-blocking: they return immediately instead of waiting for the operation to complete (new data to arrive, or outgoing data to be effectively dispatched). Instead, the program registers callback functions that the system calls when the operation completes.
The examples of this guide are distributed inside the examples directory of the source code of Ztreamy. Assuming you have installed Ztreamy, you can run them from a console.
For running consumers, you need to have the example server running:
$ python server.py
Then, in another console, run the client you want to try:
$ python client_async.py
If you want to try the event producer, you need to run the server and a client as shown above, and then, in another console:
$ python publisher_async.py
If your application needs to consume events, you have to decide whether to program your application asynchronously (on top of Tornado's IOLoop) or synchronously. The key functions of the asynchronous library are non-blocking. By contrast, all the operations of the synchronous library are blocking. Asynchronous consumers are developed by using the ztreamy.client.Client class, whereas synchronous consumers use the ztreamy.client.SynchronousClient class.
Streams are identified by HTTP URIs. Because the same stream server can serve more than one stream, a stream has an associated path. For example, the following stream has the path /stream1 at a stream server installed at port 9000 of example.com:
http://example.com:9000/stream1
When clients want to connect to a stream, they also need to specify an access mode, which can be one of the following:

- Long-lived requests: the client keeps the connection open and the server pushes each event as soon as it is available.
- Long polling: the client sends a request, the server answers it with the pending events, and the client then sends a new request for the following events.

The access mode is communicated to the server by appending to the path of the URI one of the following components:

- /stream: long-lived requests mode, without compression.
- /compressed: long-lived requests mode, with ZLIB compression.
- /long-polling: long polling mode.
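As a sketch of how these URIs are formed, the following stand-alone helper (hypothetical, not part of the Ztreamy API) appends an access-mode component to a stream URI:

```python
# Hypothetical helper, not part of Ztreamy: it just illustrates how
# access-mode URIs are formed by appending a path component.
ACCESS_MODES = ('stream', 'compressed', 'long-polling')

def stream_uri(base_uri, mode):
    """Return the stream URI with the given access mode appended."""
    if mode not in ACCESS_MODES:
        raise ValueError('unknown access mode: ' + mode)
    return base_uri.rstrip('/') + '/' + mode

print(stream_uri('http://example.com:9000/stream1', 'compressed'))
# http://example.com:9000/stream1/compressed
```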
The following program connects to two streams and prints the events that come from them:
from __future__ import print_function

from ztreamy import Client


def event_received(event):
    print(str(event))
    print()

def error(message, http_error=None):
    if http_error is not None:
        print('[Error] ' + message + ': ' + str(http_error))
    else:
        print('[Error] ' + message)

streams = [
    'http://localhost:9000/stream1/compressed',
    'http://localhost:9000/stream2/compressed',
]
client = Client(streams, event_callback=event_received, error_callback=error)
try:
    # Start receiving events and block on the IOLoop
    client.start(loop=True)
except KeyboardInterrupt:
    # Ctrl-c finishes the program
    pass
finally:
    client.stop()
Using the client requires three main steps:

- Create a Client object with the list of stream URIs and the callback functions for received events and errors.
- Call its start method. With loop=True, the call blocks on the IOLoop and the program keeps receiving events.
- Call its stop method when finished (in the example, when the user presses Ctrl-c).
The callback function for received events is called by the library whenever an event arrives. It receives the event (an instance of ztreamy.Event or of one of its subclasses) as a parameter.
Note that the URIs of the streams must specify a long-lived requests access mode: the stream name in the path must be followed either by /compressed or by /stream. The difference between the two is that the first one uses ZLIB compression. Your program does not need to be aware of compression, because Ztreamy decompresses the data internally. Connecting to the compressed stream should normally be the preferred option, due to the amount of traffic it saves.
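To get an intuition of why the compressed mode saves traffic, here is a small stand-alone sketch (independent of Ztreamy) that compresses a batch of repetitive event-like text with ZLIB; the recurring header names compress very well:

```python
import zlib

# Stand-alone illustration (not Ztreamy code): serialized events repeat
# the same header names, so a stream of them compresses well with ZLIB.
events = ''.join(
    'Event-Id: id-%d\nSource-Id: src\nSyntax: text/plain\nBody-Length: 5\n\nhello' % i
    for i in range(100)
)
raw = events.encode('utf-8')
compressed = zlib.compress(raw)
print(len(raw), len(compressed))  # the compressed size is much smaller
```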
The following program connects to a stream with the synchronous API and prints the events that come from it:
from __future__ import print_function

from ztreamy import SynchronousClient

stream = 'http://localhost:9000/stream1/long-polling'
client = SynchronousClient(stream)
try:
    while not client.stream_finished:
        events = client.receive_events()
        for event in events:
            print(str(event))
except KeyboardInterrupt:
    # Ctrl-c finishes the program
    pass
There are two main steps:

- Create a SynchronousClient object with the URI of the stream.
- Call its receive_events method in a loop. Each call blocks until new events arrive and returns them as a list.
Note that the URI of the stream must instruct the server to use the long polling requests mode: the stream name in the path must be followed by /long-polling, like in the example.
You can publish events from the stream server itself, or from a remote client that sends the events to the stream server through HTTP. The former is useful when you want the producer of the events to act as a server for its own events. The latter is useful for situations in which sensors (or event producers of any kind) are scattered in the network and separate from the stream server that serves the events they produce.
When publishing events through a remote stream server, the producer of the events needs to know the URI of the stream the events are to be published to. The special path component /publish must be appended to the URI of the stream. For example:
http://example.com:9000/stream1/publish
The EventPublisher and SynchronousEventPublisher classes automatically append /publish to the URI they receive if it does not already contain it.
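That appending behaviour can be sketched roughly like this (a simplified stand-alone illustration, not the actual library code):

```python
def publish_uri(stream_uri):
    """Append '/publish' to a stream URI unless it is already present.

    Simplified sketch of the behaviour described above; not the code
    the EventPublisher classes actually use.
    """
    if stream_uri.rstrip('/').endswith('/publish'):
        return stream_uri
    return stream_uri.rstrip('/') + '/publish'

print(publish_uri('http://example.com:9000/stream1'))
# http://example.com:9000/stream1/publish
```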
The following example sets up a stream server that serves two streams, and publishes periodical events on them:
from __future__ import print_function

import time
import random

import tornado.ioloop

import ztreamy

# Create a server with two streams
server = ztreamy.StreamServer(9000)

# Create the streams; stream1 allows remote producers to publish through HTTP
stream1 = ztreamy.Stream('/stream1', allow_publish=True)
stream2 = ztreamy.Stream('/stream2')
server.add_stream(stream1)
server.add_stream(stream2)

# Create two publisher objects
publisher1 = ztreamy.LocalEventPublisher(stream1)
publisher2 = ztreamy.LocalEventPublisher(stream2)
source_id = ztreamy.random_id()
application_ids = ['ztreamy-example-a', 'ztreamy-example-b']

# Publish events periodically
def publish_hi():
    print('Publishing "hi"')
    app_id = random.choice(application_ids)
    event = ztreamy.Event(source_id, 'text/plain', 'Hi',
                          application_id=app_id)
    publisher1.publish(event)

def publish_there():
    print('Publishing "there"')
    app_id = random.choice(application_ids)
    event = ztreamy.Event(source_id, 'text/plain', 'there!',
                          application_id=app_id)
    publisher2.publish(event)

tornado.ioloop.PeriodicCallback(publish_hi, 10000).start()
time.sleep(5)
tornado.ioloop.PeriodicCallback(publish_there, 10000).start()

try:
    print('Starting the server')
    server.start(loop=True)
except KeyboardInterrupt:
    # Allow ctrl-c to close the server
    pass
finally:
    server.stop()
The key aspects to take into account in the previous example are that:

- A StreamServer object is created for a given port, and Stream objects are registered on it with its add_stream method. Each stream gets its own path.
- stream1 is created with allow_publish=True, so remote producers are allowed to publish events on it through HTTP.
- Events are published locally, without going through HTTP, by means of LocalEventPublisher objects.
- Tornado's PeriodicCallback is used to schedule the periodic creation of events, and server.start(loop=True) blocks on the IOLoop so that those callbacks run.
The following example publishes periodic events using the asynchronous API:
from __future__ import print_function

import tornado.ioloop

import ztreamy

# Create a publisher object
stream = 'http://localhost:9000/stream1'
publisher = ztreamy.EventPublisher(stream)
source_id = ztreamy.random_id()

# Publish events periodically
def publish():
    print('Publishing')
    event = ztreamy.Event(source_id, 'text/plain', 'This is a new event')
    publisher.publish(event)

tornado.ioloop.PeriodicCallback(publish, 10000).start()

try:
    # Block on the ioloop
    tornado.ioloop.IOLoop.instance().start()
except KeyboardInterrupt:
    # Allow ctrl-c to finish the program
    pass
finally:
    publisher.close()
The program creates an EventPublisher object and publishes a new event every 10 seconds by using its publish method. Note that the program needs to block on Tornado's ioloop at the end in order to work. Because of that, the ioloop's timer (PeriodicCallback) is used for scheduling the creation of events.
The following example publishes periodic events using the synchronous API:
import time

import ztreamy

# Create a publisher object
stream = 'http://localhost:9000/stream1'
publisher = ztreamy.SynchronousEventPublisher(stream)
source_id = ztreamy.random_id()

try:
    while True:
        time.sleep(10)
        event = ztreamy.Event(source_id, 'text/plain', 'This is a new event')
        publisher.publish(event)
except KeyboardInterrupt:
    # Allow ctrl-c to finish the program
    pass
finally:
    publisher.close()
The main difference from the previous example is that now the program does not block on the ioloop, but uses sleep to control the rate at which the events are published.
Ztreamy serializes events as a series of headers and a body. A header is similar to an HTTP header. It contains a name and a value. The body contains the main data of the event. There is no assumption on the kind of data that the body stores. This is an example serialization of an event:
Event-Id: 1100254f-f4ba-49aa-8c47-605e3110169e
Source-Id: 83a4c888-c395-4bb7-a635-c5b864d6bd06
Syntax: text/n3
Application-Id: identi.ca dataset
Timestamp: 2012-10-25T13:31:24+02:00
Body-Length: 843

@prefix dc: <http://purl.org/dc/elements/1.1/> .
@prefix foaf: <http://xmlns.com/foaf/0.1/> .
@prefix geo: <http://www.w3.org/2003/01/geo/wgs84_pos#> .
@prefix webtlab: <http://webtlab.it.uc3m.es/ns/> .

<http://identi.ca/notice/97535534> dc:creator "http://identi.ca/user/94360";
    dc:date "2012-10-25T11:28:51+00:00";
    webtlab:content "Completed registrations for #wmbangalore !Wikimedia DevCamp Banglalore: 2430 applications, 130 invitations sent http://is.gd/FtXMhT";
    webtlab:conversation "http://identi.ca/conversation/96703048";
    webtlab:hashtag "wmbangalore";
    webtlab:location [ a geo:Place;
        geo:lat "13.018";
        geo:long "77.568" ] .

"http://identi.ca/user/94360" foaf:based_near [ a geo:Place;
        geo:lat "52.392";
        geo:long "4.899" ];
    foaf:name "S....... M......" .
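As an illustration of this layout, a serialized event can be split into its headers and its body by reading headers up to the first empty line and then taking as many characters as Body-Length indicates. This is a simplified stand-alone sketch, not the deserializer Ztreamy actually uses:

```python
def parse_event(text):
    """Split a serialized event into a headers dict and a body string.

    Simplified stand-alone sketch: the real Ztreamy deserializer also
    handles charsets, multiple events per buffer and error cases.
    """
    header_part, _, rest = text.partition('\n\n')
    headers = {}
    for line in header_part.splitlines():
        name, _, value = line.partition(':')
        headers[name.strip()] = value.strip()
    # The body length is given (in the real format, in bytes) by Body-Length
    body = rest[:int(headers['Body-Length'])]
    return headers, body

serialized = ('Event-Id: 1234\n'
              'Source-Id: abcd\n'
              'Syntax: text/plain\n'
              'Body-Length: 5\n'
              '\n'
              'hello')
headers, body = parse_event(serialized)
print(headers['Syntax'], body)  # text/plain hello
```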
Ztreamy provides an API for representing events as objects, and for serializing and deserializing them. The Event class is the base class for all the events. Classes for specific types of events, such as RDFEvent, which is used for events whose body is RDF, subclass Event.
You can create an event directly using the Event class or one of its subclasses. This is an example of a generic event:
import ztreamy

source_id = ztreamy.random_id()
event = ztreamy.Event(source_id, 'text/plain', 'This is a new event')
If there is an appropriate class for representing a type of event, events should be created with the constructor of that class (see the example for RDF events below).
In order to access the contents of an event object, you can use its attributes: event_id, source_id, syntax, application_id, aggregator_id, event_type, timestamp, extra_headers (a dictionary with the application-specific headers) and body.
All the attributes above can also be accessed through the dictionary that the method as_dictionary returns:
dictionary = event.as_dictionary()
print(dictionary['Source-Id'])
In addition, you can obtain a textual representation of its body with the method serialize_body:
print(event.serialize_body())
Ztreamy uses internally the rdflib library to work with RDF data. The events whose body is represented as RDF are represented as objects of the RDFEvent class. This is an example of an RDF event, in which an RDF graph is used for the body of the event:
import ztreamy
from rdflib import Graph, Namespace, Literal

source_id = ztreamy.random_id()
graph = Graph()
ns_example = Namespace('http://example.com/ns/')
graph.add((ns_example['dog'], ns_example['eats'], Literal('10')))
event = ztreamy.RDFEvent(source_id, 'text/n3', graph)
RDFEvent objects also return the body of the event as an rdflib Graph object.
In order to create a custom event type, you must create a class that extends Event. It should have a constructor as well as code for serializing and deserializing the body of the events. The constructor must receive the parameters source_id, syntax, body and **kwargs, and call the constructor of its superclass.
Then, you need to register in the system the MIME types it handles. This way, when the platform finds an event with one of those types, it automatically creates the event object using the custom class.
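The constructor pattern can be sketched as follows. Note that the base class below is a minimal stand-in written for this illustration only: the real base class is ztreamy's Event, which has more machinery, and MIME-type registration is done through Ztreamy's own mechanism (see the RDFEvent source below). The JSONEvent class is hypothetical:

```python
import json

class Event(object):
    """Minimal stand-in for ztreamy's Event base class (illustration only)."""
    def __init__(self, source_id, syntax, body, **kwargs):
        self.source_id = source_id
        self.syntax = syntax
        self.body = body

class JSONEvent(Event):
    """Hypothetical custom event type whose body is a JSON object."""
    supported_syntaxes = ['application/json']  # MIME types this class handles

    def __init__(self, source_id, syntax, body, **kwargs):
        if syntax not in JSONEvent.supported_syntaxes:
            raise ValueError('Unsupported syntax in JSONEvent')
        if isinstance(body, str):
            body = json.loads(body)  # deserialize a textual body
        super(JSONEvent, self).__init__(source_id, syntax, body, **kwargs)

    def serialize_body(self):
        # Produce the textual representation of the body
        return json.dumps(self.body)

event = JSONEvent('some-source-id', 'application/json', '{"temp": 21}')
print(event.serialize_body())  # {"temp": 21}
```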
As an example, this is the source code of the implementation of RDFEvent in ztreamy:
.. include:: ../ztreamy/rdfevents.py
   :literal:
The ztreamy.filters module provides a base class for filtering events, called Filter, and several subclasses that implement some built-in filters. If you need to select just a subset of the events, you can use one of those built-in filters or program your own filter by subclassing the Filter class.
The built-in filters currently available in Ztreamy include, among others:

- SourceFilter: selects events according to their source id.
- ApplicationFilter: selects events according to their application id.
For implementing a custom filter class, just create a class that extends Filter and implements the method filter_event(self, event). Remember to receive a callback function in the constructor of your class, and to invoke the constructor of Filter with that callback.
As an example, look how the built-in filters are programmed. For example, this is the implementation of SourceFilter:
class SourceFilter(Filter):
    def __init__(self, callback, source_id=None, source_ids=[]):
        """Creates a filter for source ids.

        'source_id' must be only one id, whereas 'source_ids' must be
        a list of ids. If both are present, 'source_id' is appended to
        the list of ids.

        """
        super(SourceFilter, self).__init__(callback)
        self.source_ids = set()
        if source_id is not None:
            self.source_ids.add(source_id)
        for source in source_ids:
            self.source_ids.add(source)

    def filter_event(self, event):
        if event.source_id in self.source_ids:
            self.callback(event)
Filter objects can act as event handlers. You can filter events when using a client object by passing an instance of the filter in the place of the event callback. This is an example:
filter_ = ApplicationFilter(callback, application_id='ztreamy-example-a')
client = Client(streams, event_callback=filter_, error_callback=error)
In the example, the filter receives the callback function and some filter-specific configuration parameters (in this case, the application id to filter on). The program then creates the client and passes the filter object as the event callback.
This is a complete example that you can find in the examples directory under the name of consumer_filter.py:
.. include:: ../examples/consumer_filter.py
   :literal:
In order to try it, run the example server of Publishing events from the stream server and, in another terminal, run the consumer above. You can compare the output of this consumer with the one described at Developing a consumer asynchronously, which does not apply filters.
You can also apply filters in the RelayStream class, which implements a stream that relays the events of other streams. Pass a filter object with the keyword parameter filter_ to the constructor of RelayStream. Only the events that match the filter will be relayed. Note that RelayStream will overwrite the callback of the filter with its own internal code.