Use

As typical with Design Patterns there are many ways to implement it. Here we focus on the “Topic” approach. It decouples modules by defining a generic interface and act as a kind of “man-in-the-middle”.

Both the Publisher and the Subscribers share a common Topic instance:

from pubsub import Topic
t = Topic("Just a demo")

Publishers

Publishing a value is very simple; assuming t is a pubsub.Topic instance:

t.publish("Hello World")

Subscribers

Each subscriber should register a callback, which will be called “automagical” when a new value is available:

t.subscribe(demo_cb)

Where t is topic (an instance of pubsub.Topic) and demo_cb is the callback. This can a function or other kind of callable. Multiple subscriptions are possible, by registering another:

oo = Demo()
t.subscribe(oo.demo_oo_cb)

callbacks

A callback is a callable, that should process the new value. In essence, it is just a function (or method) with the correct signature. A trivial example is:

def demo_cb(value, topic):
   print("Function-Demo:: Topic: %s has got the value: %s" %(topic, value))

It can also be a method when you prefer an OO-style:

class Demo:

  def demo_oo_cb(self, val, topic):
      print("Method-demo: I (%s) got '%s' from topic %s" %(self, val, topic))

Threads

You might be wondering about threads: are they needed, essential of even possible?
The simple answer is: It’s an “(I) don’t care!”

It is also depending on the implementation. The shown implementation does not need, nor use threads. Remember, the (main) goal is to decouple (modules) and make it a scalable solution. Effectively, the Publisher is calling the callback of the Subscribers (in a loop); like in a conventional, direct call solution.
That callback will run in the same thread as the Publisher, though it can schedule some work on another thread. For example, with a ThreadPoolExecutor.

Notwithstanding, it might be beneficial to include a ThreadPoolExecutor (or any other concurrency concept) within the implementation of Topic. Then, the runtime of t.publish() can be controlled; even become RealTime.

Questionnaire

  1. Why can the runtime of t.publish be unbound?
    Give an example. (in this implementation).

  2. Why isn’t a threading implementation not always better?
    Give an example of on when t.publish() with threads is slower as the current one

Comments

comments powered by Disqus