Commit 5ec2a1f4 authored by Mark Steadman's avatar Mark Steadman

Stores messages in memory rather than database

parent cbfeca40
from .models import Message, Tag
from .models import Message
from . import subscriptions
import warnings
def subscription(*channels, tags=(), once=False):
def subscription(*channels, tags=(), once=None):
if once is not None: # pragma: no cover
warnings.warn(
'\'once\' argument is deprecated',
DeprecationWarning
)
def wrapper(f):
for channel in channels:
subscriptions.register(channel, tags, f, once)
......@@ -15,16 +22,9 @@ def subscription(*channels, tags=(), once=False):
def publish(channel, name, *tags, **data):
message = Message(
channel=channel,
name=name
name=name,
data=data,
tags=tags
)
message.set_data(data)
message.full_clean()
message.save()
for tag in tags:
t = Tag(message=message, slug=tag)
t.full_clean()
t.save()
message.publish()
# Generated by Django 2.1.2 on 2018-10-14 11:29
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='Message',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('channel', models.CharField(db_index=True, max_length=100)),
('name', models.SlugField(max_length=100)),
('data', models.TextField()),
('published', models.DateTimeField(auto_now_add=True)),
],
),
migrations.CreateModel(
name='Tag',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('slug', models.SlugField(max_length=100)),
('message', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='tags', to='pubsub.Message')),
],
),
migrations.AlterUniqueTogether(
name='tag',
unique_together={('slug', 'message')},
),
]
from django.db import models
from . import subscriptions
import base64
import pickle
class Message(models.Model):
channel = models.CharField(max_length=100, db_index=True)
name = models.SlugField(max_length=100, db_index=True)
data = models.TextField()
published = models.DateTimeField(auto_now_add=True)
class TagList(list):
def filter(self, *tags):
filtered = []
def set_data(self, value):
self.data = base64.b64encode(
pickle.dumps(value, 0)
).decode('utf-8')
for tag in tags:
if tag in self:
filtered.append(tag)
def get_data(self):
return pickle.loads(base64.b64decode(self.data))
return TagList(filtered)
def exists(self):
return any(self)
def publish(self):
subscriptions.publish(self)
@property
def similar_messages(self):
return type(self).objects.filter(
channel=self.channel,
name=self.name,
tags__slug__in=self.tags.values_list('slug', flat=True),
data=self.data
).exclude(
pk=self.pk
class Message(object):
def __init__(self, **kwargs):
self.channel = kwargs.pop('channel')
self.name = kwargs.pop('name')
self.data = kwargs.pop('data', {})
self.tags = TagList(
sorted(
set(
kwargs.pop('tags', [])
)
)
)
for key in kwargs.keys(): # pragma: no cover
raise TypeError(
'__init__() got an unexpected keyword argument \'%s\'' % (
key
)
)
def publish(self):
subscriptions.publish(self)
def call(self, func):
return func(
self.channel,
self.name,
*sorted(set(self.tags.values_list('slug', flat=True))),
**self.get_data()
*self.tags,
**self.data
)
class Tag(models.Model):
message = models.ForeignKey(
Message,
related_name='tags',
on_delete=models.CASCADE
)
slug = models.SlugField(max_length=100)
def __str__(self): # pragma: no cover
return self.slug
class Meta:
unique_together = ('slug', 'message')
from .exceptions import AlreadyRegisteredError
import logging
import re
import warnings
class SubscriptionList(object):
......@@ -8,7 +9,13 @@ class SubscriptionList(object):
self._subs = []
self._logger = logging.getLogger('pubsub')
def register(self, channel, tags, func, once=False):
def register(self, channel, tags, func, once=None):
if once is not None: # pragma: no cover
warnings.warn(
'\'once\' argument is deprecated',
DeprecationWarning
)
key = '^' + channel.replace(
'.', '\\.'
).replace('*', '([a-z0-9]+)') + '$'
......@@ -39,24 +46,10 @@ class SubscriptionList(object):
continue
if any(listening_tags):
if not message.tags.filter(
slug__in=listening_tags
).exists():
if not message.tags.filter(*listening_tags).exists():
continue
if once and message.similar_messages.exists():
self._logger.debug(
(
'Not sending to %s as this message has already been '
'broadcast'
) % func.__name__
)
continue
self._logger.debug(
'Sending to %s' % func.__name__
)
self._logger.debug('Sending to %s' % func.__name__)
try:
message.call(func)
......@@ -68,7 +61,7 @@ class SubscriptionList(object):
extra={
'channel_name': message.channel,
'msg_name': message.name,
'msg_data': message.get_data(),
'msg_data': message.data,
'msg_func': func.__name__
},
exc_info=True
......
......@@ -8,10 +8,8 @@ local = {}
logging.disable(logging.CRITICAL)
@channels.subscription(
'myapp.test_message', tags=('static', 'dynamic'), once=True
)
def test_message(channel, name, *tags, **data):
@channels.subscription('myapp.test_message', tags=('static', 'dynamic'), once=True)
def _test_message(channel, name, *tags, **data):
local['name'] = name
local['tags'] = tags
local['data'] = data
......@@ -56,12 +54,12 @@ class PublishTests(TestCase):
subscriptions.register(
'myapp.test_message',
(),
test_message
_test_message
)
self.assertEqual(
ctx.exception.args[0],
'test_message is already registered.'
'_test_message is already registered.'
)
def test_ignored_channel(self):
......@@ -80,21 +78,3 @@ class PublishTests(TestCase):
)
self.assertFalse(any(local))
def test_more_than_once(self):
channels.publish(
'myapp.test_message',
'test_message',
'static'
)
self.assertTrue(any(local))
local.clear()
channels.publish(
'myapp.test_message',
'test_message',
'static'
)
self.assertFalse(any(local))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment