Module kibicara.platforms.twitter.webapi
Expand source code
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
#
# SPDX-License-Identifier: 0BSD
from fastapi import APIRouter, Depends, HTTPException, Response, status
from kibicara.config import config
from kibicara.platforms.twitter.bot import spawner
from kibicara.platforms.twitter.model import Twitter
from kibicara.webapi.hoods import get_hood, get_hood_unauthorized
from logging import getLogger
from sqlite3 import IntegrityError
from ormantic.exceptions import NoMatch
from peony.oauth_dance import get_oauth_token, get_access_token
from peony.exceptions import NotAuthenticated
from pydantic import BaseModel
logger = getLogger(__name__)
class BodyTwitterPublic(BaseModel):
username: str
async def get_twitter(twitter_id: int, hood=Depends(get_hood)):
try:
return await Twitter.objects.get(id=twitter_id, hood=hood)
except NoMatch:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
router = APIRouter()
twitter_callback_router = APIRouter()
@router.get(
'/public',
# TODO response_model,
operation_id='get_twitters_public',
)
async def twitter_read_all_public(hood=Depends(get_hood_unauthorized)):
twitterbots = await Twitter.objects.filter(hood=hood).all()
return [
BodyTwitterPublic(username=twitterbot.username)
for twitterbot in twitterbots
if twitterbot.verified == 1 and twitterbot.enabled == 1 and twitterbot.username
]
@router.get(
'/',
# TODO response_model,
operation_id='get_twitters',
)
async def twitter_read_all(hood=Depends(get_hood)):
return await Twitter.objects.filter(hood=hood).all()
@router.get(
'/{twitter_id}',
# TODO response_model
operation_id='get_twitter',
)
async def twitter_read(twitter=Depends(get_twitter)):
return twitter
@router.delete(
'/{twitter_id}',
status_code=status.HTTP_204_NO_CONTENT,
# TODO response_model
operation_id='delete_twitter',
)
async def twitter_delete(twitter=Depends(get_twitter)):
spawner.stop(twitter)
await twitter.delete()
return Response(status_code=status.HTTP_204_NO_CONTENT)
@router.get(
'/{twitter_id}/status',
status_code=status.HTTP_200_OK,
# TODO response_model
operation_id='status_twitter',
)
async def twitter_status(twitter=Depends(get_twitter)):
return {'status': spawner.get(twitter).status.name}
@router.post(
'/{twitter_id}/start',
status_code=status.HTTP_200_OK,
# TODO response_model
operation_id='start_twitter',
)
async def twitter_start(twitter=Depends(get_twitter)):
await twitter.update(enabled=True)
spawner.get(twitter).start()
return {}
@router.post(
'/{twitter_id}/stop',
status_code=status.HTTP_200_OK,
# TODO response_model
operation_id='stop_twitter',
)
async def twitter_stop(twitter=Depends(get_twitter)):
await twitter.update(enabled=False)
spawner.get(twitter).stop()
return {}
@router.post(
'/',
status_code=status.HTTP_201_CREATED,
# TODO response_model
operation_id='create_twitter',
)
async def twitter_create(response: Response, hood=Depends(get_hood)):
"""
`https://api.twitter.com/oauth/authorize?oauth_token=`
"""
try:
# Purge Twitter corpses
for corpse in await Twitter.objects.filter(hood=hood, verified=False).all():
await corpse.delete()
# Create Twitter
request_token = await get_oauth_token(
config['twitter']['consumer_key'],
config['twitter']['consumer_secret'],
callback_uri=f""""""
f"""{config["frontend_url"]}/dashboard/twitter-callback?hood={hood.id}""",
)
if request_token['oauth_callback_confirmed'] != 'true':
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE)
twitter = await Twitter.objects.create(
hood=hood,
access_token=request_token['oauth_token'],
access_token_secret=request_token['oauth_token_secret'],
)
response.headers['Location'] = '%d' % twitter.id
return twitter
except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
except (KeyError, ValueError, NotAuthenticated):
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
@twitter_callback_router.get(
'/callback',
# TODO response_model
operation_id='callback_twitter',
)
async def twitter_read_callback(
oauth_token: str, oauth_verifier: str, hood=Depends(get_hood)
):
try:
twitter = await Twitter.objects.filter(access_token=oauth_token).get()
access_token = await get_access_token(
config['twitter']['consumer_key'],
config['twitter']['consumer_secret'],
twitter.access_token,
twitter.access_token_secret,
oauth_verifier,
)
await twitter.update(
access_token=access_token['oauth_token'],
access_token_secret=access_token['oauth_token_secret'],
verified=True,
enabled=True,
)
spawner.start(twitter)
return {}
except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
except NoMatch:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
except (KeyError, ValueError, NotAuthenticated):
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
Functions
async def get_twitter(twitter_id: int, hood=Depends(get_hood))
-
Expand source code
async def get_twitter(twitter_id: int, hood=Depends(get_hood)): try: return await Twitter.objects.get(id=twitter_id, hood=hood) except NoMatch: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
async def twitter_create(response: starlette.responses.Response, hood=Depends(get_hood))
-
https://api.twitter.com/oauth/authorize?oauth_token=
Expand source code
@router.post( '/', status_code=status.HTTP_201_CREATED, # TODO response_model operation_id='create_twitter', ) async def twitter_create(response: Response, hood=Depends(get_hood)): """ `https://api.twitter.com/oauth/authorize?oauth_token=` """ try: # Purge Twitter corpses for corpse in await Twitter.objects.filter(hood=hood, verified=False).all(): await corpse.delete() # Create Twitter request_token = await get_oauth_token( config['twitter']['consumer_key'], config['twitter']['consumer_secret'], callback_uri=f"""""" f"""{config["frontend_url"]}/dashboard/twitter-callback?hood={hood.id}""", ) if request_token['oauth_callback_confirmed'] != 'true': raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE) twitter = await Twitter.objects.create( hood=hood, access_token=request_token['oauth_token'], access_token_secret=request_token['oauth_token_secret'], ) response.headers['Location'] = '%d' % twitter.id return twitter except IntegrityError: raise HTTPException(status_code=status.HTTP_409_CONFLICT) except (KeyError, ValueError, NotAuthenticated): raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
async def twitter_delete(twitter=Depends(get_twitter))
-
Expand source code
@router.delete( '/{twitter_id}', status_code=status.HTTP_204_NO_CONTENT, # TODO response_model operation_id='delete_twitter', ) async def twitter_delete(twitter=Depends(get_twitter)): spawner.stop(twitter) await twitter.delete() return Response(status_code=status.HTTP_204_NO_CONTENT)
async def twitter_read(twitter=Depends(get_twitter))
-
Expand source code
@router.get( '/{twitter_id}', # TODO response_model operation_id='get_twitter', ) async def twitter_read(twitter=Depends(get_twitter)): return twitter
async def twitter_read_all(hood=Depends(get_hood))
-
Expand source code
@router.get( '/', # TODO response_model, operation_id='get_twitters', ) async def twitter_read_all(hood=Depends(get_hood)): return await Twitter.objects.filter(hood=hood).all()
async def twitter_read_all_public(hood=Depends(get_hood_unauthorized))
-
Expand source code
@router.get( '/public', # TODO response_model, operation_id='get_twitters_public', ) async def twitter_read_all_public(hood=Depends(get_hood_unauthorized)): twitterbots = await Twitter.objects.filter(hood=hood).all() return [ BodyTwitterPublic(username=twitterbot.username) for twitterbot in twitterbots if twitterbot.verified == 1 and twitterbot.enabled == 1 and twitterbot.username ]
async def twitter_read_callback(oauth_token: str, oauth_verifier: str, hood=Depends(get_hood))
-
Expand source code
@twitter_callback_router.get( '/callback', # TODO response_model operation_id='callback_twitter', ) async def twitter_read_callback( oauth_token: str, oauth_verifier: str, hood=Depends(get_hood) ): try: twitter = await Twitter.objects.filter(access_token=oauth_token).get() access_token = await get_access_token( config['twitter']['consumer_key'], config['twitter']['consumer_secret'], twitter.access_token, twitter.access_token_secret, oauth_verifier, ) await twitter.update( access_token=access_token['oauth_token'], access_token_secret=access_token['oauth_token_secret'], verified=True, enabled=True, ) spawner.start(twitter) return {} except IntegrityError: raise HTTPException(status_code=status.HTTP_409_CONFLICT) except NoMatch: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) except (KeyError, ValueError, NotAuthenticated): raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
async def twitter_start(twitter=Depends(get_twitter))
-
Expand source code
@router.post( '/{twitter_id}/start', status_code=status.HTTP_200_OK, # TODO response_model operation_id='start_twitter', ) async def twitter_start(twitter=Depends(get_twitter)): await twitter.update(enabled=True) spawner.get(twitter).start() return {}
async def twitter_status(twitter=Depends(get_twitter))
-
Expand source code
@router.get( '/{twitter_id}/status', status_code=status.HTTP_200_OK, # TODO response_model operation_id='status_twitter', ) async def twitter_status(twitter=Depends(get_twitter)): return {'status': spawner.get(twitter).status.name}
async def twitter_stop(twitter=Depends(get_twitter))
-
Expand source code
@router.post( '/{twitter_id}/stop', status_code=status.HTTP_200_OK, # TODO response_model operation_id='stop_twitter', ) async def twitter_stop(twitter=Depends(get_twitter)): await twitter.update(enabled=False) spawner.get(twitter).stop() return {}
Classes
class BodyTwitterPublic (**data: Any)
-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class BodyTwitterPublic(BaseModel): username: str
Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var username : str