Module kibicara.platforms.twitter.bot

Expand source code
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
#
# SPDX-License-Identifier: 0BSD

from asyncio import gather, sleep, CancelledError
from kibicara.config import config
from kibicara.platformapi import Censor, Message, Spawner
from kibicara.platforms.twitter.model import Twitter
from logging import getLogger
from peony import PeonyClient, exceptions


# Module-level logger, named after this module.
logger = getLogger(__name__)


class TwitterBot(Censor):
    """Platform bot that connects a hood to one Twitter account.

    Direct messages and mentions of the account are polled periodically and
    published to the hood. Messages received from the hood are tweeted, or
    retweeted when they carry a ``twitter_mention_id``.

    Args:
        twitter_model: Twitter model row. Its ``hood``, ``enabled``,
            ``verified``, OAuth tokens and ``*_since_id`` fields are read.

    Attributes:
        twitter_model: The model passed to the constructor.
        enabled: Copy of ``twitter_model.enabled``.
        polling_interval_sec: Seconds to sleep between two polling rounds.
        mentions_since_id: Id of the newest mention seen so far.
        dms_since_id: Id of the newest direct message seen so far.
    """

    def __init__(self, twitter_model):
        super().__init__(twitter_model.hood)
        self.twitter_model = twitter_model
        self.enabled = self.twitter_model.enabled
        self.polling_interval_sec = 60
        self.mentions_since_id = self.twitter_model.mentions_since_id
        self.dms_since_id = self.twitter_model.dms_since_id

    @classmethod
    async def destroy_hood(cls, hood):
        """Removes all its database entries."""
        for twitter in await Twitter.objects.filter(hood=hood).all():
            await twitter.delete()

    async def run(self):
        """Set up the Twitter client and run the poll and push loops.

        The bot disables itself in the database (``enabled=False``) when the
        OAuth handshake has not been completed, when the configuration lacks
        the consumer keys, or when Twitter rejects the credentials. A
        cancellation is logged and swallowed.
        """
        try:
            if not self.twitter_model.verified:
                raise ValueError('Oauth Handshake not completed')
            self.client = PeonyClient(
                consumer_key=config['twitter']['consumer_key'],
                consumer_secret=config['twitter']['consumer_secret'],
                access_token=self.twitter_model.access_token,
                access_token_secret=self.twitter_model.access_token_secret,
            )
            # On the very first start there is no since_id yet. Poll once and
            # discard the result so that only the newest id is remembered.
            if self.twitter_model.mentions_since_id is None:
                logger.debug('since_id is None in model, fetch newest mention id')
                await self._poll_mentions()
            if self.twitter_model.dms_since_id is None:
                logger.debug('since_id is None in model, fetch newest dm id')
                await self._poll_direct_messages()
            user = await self.client.user
            if user.screen_name:
                await self.twitter_model.update(username=user.screen_name)
            # NOTE(review): the model's __dict__ may contain the access token
            # and secret; consider logging only non-sensitive fields.
            logger.debug('Starting Twitter bot: %s' % self.twitter_model.__dict__)
            # NOTE(review): if one of the two coroutines raises, gather()
            # propagates the error but does not cancel the other one.
            await gather(self.poll(), self.push())
        except CancelledError:
            logger.debug(f'Bot {self.twitter_model.hood.name} received Cancellation.')
        except exceptions.Unauthorized:
            logger.debug(f'Bot {self.twitter_model.hood.name} has invalid auth token.')
            await self.twitter_model.update(enabled=False)
            self.enabled = self.twitter_model.enabled
        except (KeyError, ValueError, exceptions.NotAuthenticated) as error:
            if isinstance(error, ValueError):
                # Raised above for an unfinished OAuth handshake; reporting
                # it as a missing configuration key would be misleading.
                logger.warning(
                    'Bot %s disabled: %s', self.twitter_model.hood.name, error
                )
            else:
                logger.warning(
                    'Missing consumer_keys for Twitter in your configuration.'
                )
            await self.twitter_model.update(enabled=False)
            self.enabled = self.twitter_model.enabled
        finally:
            logger.debug(f'Bot {self.twitter_model.hood.name} stopped.')

    async def poll(self):
        """Poll Twitter forever and publish every new message to the hood.

        Each round fetches direct messages and mentions, stores the updated
        since ids in the database, publishes the texts and then sleeps for
        ``polling_interval_sec`` seconds.
        """
        while True:
            dms = await self._poll_direct_messages()
            logger.debug('Polled dms (%s): %s', self.twitter_model.hood.name, dms)
            mentions = await self._poll_mentions()
            logger.debug(
                'Polled mentions (%s): %s', self.twitter_model.hood.name, mentions
            )
            # The since ids are persisted before publishing.
            await self.twitter_model.update(
                dms_since_id=self.dms_since_id, mentions_since_id=self.mentions_since_id
            )
            for message in dms:
                await self.publish(Message(message))
            for message_id, message in mentions:
                await self.publish(Message(message, twitter_mention_id=message_id))
            await sleep(self.polling_interval_sec)

    async def _poll_direct_messages(self):
        """Fetch direct messages that are newer than ``dms_since_id``.

        Updates ``dms_since_id`` to the id of the first returned event.

        Returns:
            list: Filtered, non-empty texts of the new direct messages.
        """
        response = await self.client.api.direct_messages.events.list.get()
        events = response.events
        # TODO check for next_cursor (see twitter api)
        new_events = []
        if events:
            # Collect events up to (excluding) the one seen last time.
            # Presumably the API returns the newest event first -- confirm.
            for event in events:
                if int(event.id) == self.dms_since_id:
                    break
                new_events.append(event)
            self.dms_since_id = int(events[0].id)
        messages = []
        for event in new_events:
            data = event.message_create.message_data
            filtered_text = await self._filter_text(data.entities, data.text)
            if filtered_text:
                messages.append(filtered_text)
        return messages

    async def _poll_mentions(self):
        """Fetch mentions that are newer than ``mentions_since_id``.

        Updates ``mentions_since_id`` to the id of the first returned mention.

        Returns:
            list: ``(mention_id, filtered_text)`` tuples of all mentions whose
            filtered text is not empty.
        """
        mentions = await self.client.api.statuses.mentions_timeline.get(
            since_id=self.mentions_since_id
        )
        if mentions:
            self.mentions_since_id = mentions[0].id
        messages = []
        for mention in mentions:
            filtered_text = await self._filter_text(mention.entities, mention.text)
            if filtered_text:
                messages.append((mention.id, filtered_text))
        return messages

    async def _filter_text(self, entities, text):
        """Strip user mentions, URLs and symbols from a message text.

        Args:
            entities: Entity container with ``user_mentions``, ``urls`` and
                ``symbols``; each entry has an ``indices`` pair.
            text: The raw message text.

        Returns:
            str: ``text`` without the characters covered by the entities and
            without leading or trailing whitespace.
        """
        remove_indices = set()
        for group in (entities.user_mentions, entities.urls, entities.symbols):
            for entity in group:
                # NOTE(review): the `+ 1` also removes the character directly
                # after the entity (usually the separating space) -- confirm
                # this is intended, the Twitter end index is exclusive.
                remove_indices.update(
                    range(entity.indices[0], entity.indices[1] + 1)
                )
        kept = (
            character
            for index, character in enumerate(text)
            if index not in remove_indices
        )
        # join() avoids rebuilding the string for every kept character.
        return ''.join(kept).strip()

    async def push(self):
        """Forward every message received from the hood to Twitter.

        Messages carrying a ``twitter_mention_id`` are retweeted, all others
        are posted as a new tweet.
        """
        while True:
            message = await self.receive()
            logger.debug(
                'Received message from censor (%s): %s',
                self.twitter_model.hood.name,
                message.text,
            )
            if hasattr(message, 'twitter_mention_id'):
                await self._retweet(message.twitter_mention_id)
            else:
                await self._post_tweet(message.text)

    async def _post_tweet(self, message):
        """Post ``message`` as a new status and return the API response."""
        return await self.client.api.statuses.update.post(status=message)

    async def _retweet(self, message_id):
        """Retweet the status with id ``message_id`` and return the response."""
        return await self.client.api.statuses.retweet.post(id=message_id)


# Pairs the Twitter model with the TwitterBot class in a module-level Spawner.
# NOTE(review): presumably the Spawner starts one bot per model row -- confirm
# against kibicara.platformapi.
spawner = Spawner(Twitter, TwitterBot)

Classes

class TwitterBot (twitter_model)

The superclass for a platform bot.

Censor is the superclass of every platform bot. It distributes a message to all other bots of the same hood if the message passes the message filter. It provides methods to start and stop the bot, and an overridable stub for a startup routine.

Examples

# Skeleton of a platform bot built on Censor; the `...` lines and the XXX
# comments mark the places a real platform has to fill in.
class XYZPlatform(Censor):
    def __init__(self, xyz_model):
        # Censor is initialised with the hood the model belongs to.
        super().__init__(xyz_model.hood)
    ...
    async def run(self):
        # Run the polling loop and the pushing loop concurrently.
        await gather(self.poll(), self.push())
    ...
    async def poll(self):
        while True:
            # XXX get text message from platform xyz
            await self.publish(Message(text))
    ...
    async def push(self):
        while True:
            message = await self.receive()
            # XXX send message.text to platform xyz

Args

hood : Hood
A Hood Model object

Attributes

hood : Hood
A Hood Model object
Expand source code
class TwitterBot(Censor):
    """Platform bot that connects a hood to one Twitter account.

    Direct messages and mentions of the account are polled periodically and
    published to the hood. Messages received from the hood are tweeted, or
    retweeted when they carry a ``twitter_mention_id``.

    Args:
        twitter_model: Twitter model row. Its ``hood``, ``enabled``,
            ``verified``, OAuth tokens and ``*_since_id`` fields are read.

    Attributes:
        twitter_model: The model passed to the constructor.
        enabled: Copy of ``twitter_model.enabled``.
        polling_interval_sec: Seconds to sleep between two polling rounds.
        mentions_since_id: Id of the newest mention seen so far.
        dms_since_id: Id of the newest direct message seen so far.
    """

    def __init__(self, twitter_model):
        super().__init__(twitter_model.hood)
        self.twitter_model = twitter_model
        self.enabled = self.twitter_model.enabled
        self.polling_interval_sec = 60
        self.mentions_since_id = self.twitter_model.mentions_since_id
        self.dms_since_id = self.twitter_model.dms_since_id

    @classmethod
    async def destroy_hood(cls, hood):
        """Removes all its database entries."""
        for twitter in await Twitter.objects.filter(hood=hood).all():
            await twitter.delete()

    async def run(self):
        """Set up the Twitter client and run the poll and push loops.

        The bot disables itself in the database (``enabled=False``) when the
        OAuth handshake has not been completed, when the configuration lacks
        the consumer keys, or when Twitter rejects the credentials. A
        cancellation is logged and swallowed.
        """
        try:
            if not self.twitter_model.verified:
                raise ValueError('Oauth Handshake not completed')
            self.client = PeonyClient(
                consumer_key=config['twitter']['consumer_key'],
                consumer_secret=config['twitter']['consumer_secret'],
                access_token=self.twitter_model.access_token,
                access_token_secret=self.twitter_model.access_token_secret,
            )
            # On the very first start there is no since_id yet. Poll once and
            # discard the result so that only the newest id is remembered.
            if self.twitter_model.mentions_since_id is None:
                logger.debug('since_id is None in model, fetch newest mention id')
                await self._poll_mentions()
            if self.twitter_model.dms_since_id is None:
                logger.debug('since_id is None in model, fetch newest dm id')
                await self._poll_direct_messages()
            user = await self.client.user
            if user.screen_name:
                await self.twitter_model.update(username=user.screen_name)
            # NOTE(review): the model's __dict__ may contain the access token
            # and secret; consider logging only non-sensitive fields.
            logger.debug('Starting Twitter bot: %s' % self.twitter_model.__dict__)
            # NOTE(review): if one of the two coroutines raises, gather()
            # propagates the error but does not cancel the other one.
            await gather(self.poll(), self.push())
        except CancelledError:
            logger.debug(f'Bot {self.twitter_model.hood.name} received Cancellation.')
        except exceptions.Unauthorized:
            logger.debug(f'Bot {self.twitter_model.hood.name} has invalid auth token.')
            await self.twitter_model.update(enabled=False)
            self.enabled = self.twitter_model.enabled
        except (KeyError, ValueError, exceptions.NotAuthenticated) as error:
            if isinstance(error, ValueError):
                # Raised above for an unfinished OAuth handshake; reporting
                # it as a missing configuration key would be misleading.
                logger.warning(
                    'Bot %s disabled: %s', self.twitter_model.hood.name, error
                )
            else:
                logger.warning(
                    'Missing consumer_keys for Twitter in your configuration.'
                )
            await self.twitter_model.update(enabled=False)
            self.enabled = self.twitter_model.enabled
        finally:
            logger.debug(f'Bot {self.twitter_model.hood.name} stopped.')

    async def poll(self):
        """Poll Twitter forever and publish every new message to the hood.

        Each round fetches direct messages and mentions, stores the updated
        since ids in the database, publishes the texts and then sleeps for
        ``polling_interval_sec`` seconds.
        """
        while True:
            dms = await self._poll_direct_messages()
            logger.debug('Polled dms (%s): %s', self.twitter_model.hood.name, dms)
            mentions = await self._poll_mentions()
            logger.debug(
                'Polled mentions (%s): %s', self.twitter_model.hood.name, mentions
            )
            # The since ids are persisted before publishing.
            await self.twitter_model.update(
                dms_since_id=self.dms_since_id, mentions_since_id=self.mentions_since_id
            )
            for message in dms:
                await self.publish(Message(message))
            for message_id, message in mentions:
                await self.publish(Message(message, twitter_mention_id=message_id))
            await sleep(self.polling_interval_sec)

    async def _poll_direct_messages(self):
        """Fetch direct messages that are newer than ``dms_since_id``.

        Updates ``dms_since_id`` to the id of the first returned event.

        Returns:
            list: Filtered, non-empty texts of the new direct messages.
        """
        response = await self.client.api.direct_messages.events.list.get()
        events = response.events
        # TODO check for next_cursor (see twitter api)
        new_events = []
        if events:
            # Collect events up to (excluding) the one seen last time.
            # Presumably the API returns the newest event first -- confirm.
            for event in events:
                if int(event.id) == self.dms_since_id:
                    break
                new_events.append(event)
            self.dms_since_id = int(events[0].id)
        messages = []
        for event in new_events:
            data = event.message_create.message_data
            filtered_text = await self._filter_text(data.entities, data.text)
            if filtered_text:
                messages.append(filtered_text)
        return messages

    async def _poll_mentions(self):
        """Fetch mentions that are newer than ``mentions_since_id``.

        Updates ``mentions_since_id`` to the id of the first returned mention.

        Returns:
            list: ``(mention_id, filtered_text)`` tuples of all mentions whose
            filtered text is not empty.
        """
        mentions = await self.client.api.statuses.mentions_timeline.get(
            since_id=self.mentions_since_id
        )
        if mentions:
            self.mentions_since_id = mentions[0].id
        messages = []
        for mention in mentions:
            filtered_text = await self._filter_text(mention.entities, mention.text)
            if filtered_text:
                messages.append((mention.id, filtered_text))
        return messages

    async def _filter_text(self, entities, text):
        """Strip user mentions, URLs and symbols from a message text.

        Args:
            entities: Entity container with ``user_mentions``, ``urls`` and
                ``symbols``; each entry has an ``indices`` pair.
            text: The raw message text.

        Returns:
            str: ``text`` without the characters covered by the entities and
            without leading or trailing whitespace.
        """
        remove_indices = set()
        for group in (entities.user_mentions, entities.urls, entities.symbols):
            for entity in group:
                # NOTE(review): the `+ 1` also removes the character directly
                # after the entity (usually the separating space) -- confirm
                # this is intended, the Twitter end index is exclusive.
                remove_indices.update(
                    range(entity.indices[0], entity.indices[1] + 1)
                )
        kept = (
            character
            for index, character in enumerate(text)
            if index not in remove_indices
        )
        # join() avoids rebuilding the string for every kept character.
        return ''.join(kept).strip()

    async def push(self):
        """Forward every message received from the hood to Twitter.

        Messages carrying a ``twitter_mention_id`` are retweeted, all others
        are posted as a new tweet.
        """
        while True:
            message = await self.receive()
            logger.debug(
                'Received message from censor (%s): %s',
                self.twitter_model.hood.name,
                message.text,
            )
            if hasattr(message, 'twitter_mention_id'):
                await self._retweet(message.twitter_mention_id)
            else:
                await self._post_tweet(message.text)

    async def _post_tweet(self, message):
        """Post ``message`` as a new status and return the API response."""
        return await self.client.api.statuses.update.post(status=message)

    async def _retweet(self, message_id):
        """Retweet the status with id ``message_id`` and return the response."""
        return await self.client.api.statuses.retweet.post(id=message_id)

Ancestors

Static methods

async def destroy_hood(hood)

Removes all its database entries.

Expand source code
@classmethod
async def destroy_hood(cls, hood):
    """Delete every Twitter database entry that belongs to ``hood``."""
    entries = await Twitter.objects.filter(hood=hood).all()
    for entry in entries:
        await entry.delete()

Methods

async def poll(self)
Expand source code
async def poll(self):
    """Endlessly fetch new direct messages and mentions and publish them.

    Every round stores the current since ids in the model, publishes the
    fetched texts and then sleeps for ``polling_interval_sec`` seconds.
    Mentions are published together with their ``twitter_mention_id``.
    """
    while True:
        direct_texts = await self._poll_direct_messages()
        dm_summary = 'Polled dms (%s): %s' % (
            self.twitter_model.hood.name,
            str(direct_texts),
        )
        logger.debug(dm_summary)

        mention_pairs = await self._poll_mentions()
        mention_summary = 'Polled mentions (%s): %s' % (
            self.twitter_model.hood.name,
            str(mention_pairs),
        )
        logger.debug(mention_summary)

        # Persist the newest seen ids before anything is published.
        since_ids = {
            'dms_since_id': self.dms_since_id,
            'mentions_since_id': self.mentions_since_id,
        }
        await self.twitter_model.update(**since_ids)

        for text in direct_texts:
            await self.publish(Message(text))
        for mention_id, text in mention_pairs:
            await self.publish(Message(text, twitter_mention_id=mention_id))

        await sleep(self.polling_interval_sec)
async def push(self)
Expand source code
async def push(self):
    """Endlessly take messages from ``receive()`` and send them to Twitter.

    A message with a ``twitter_mention_id`` attribute is retweeted; any
    other message is posted as a new tweet with its text.
    """
    while True:
        incoming = await self.receive()
        hood_name = self.twitter_model.hood.name
        logger.debug(
            'Received message from censor (%s): %s' % (hood_name, incoming.text)
        )
        if not hasattr(incoming, 'twitter_mention_id'):
            await self._post_tweet(incoming.text)
            continue
        await self._retweet(incoming.twitter_mention_id)

Inherited members