Reorganized. Message handling now in theory works

This commit is contained in:
Jacob Henry 2019-03-03 00:44:37 -05:00
parent b6f08df6eb
commit 8191231649
13 changed files with 688 additions and 661 deletions

382
client.py Normal file
View File

@ -0,0 +1,382 @@
from __future__ import annotations
import asyncio
import json
import sys
import traceback
from typing import List, Any, AsyncGenerator, Dict, Coroutine, TypeVar
from typing import Optional
from aiohttp import web
from slackclient import SlackClient
import hooks
import slack_util
# Enable to do single-threaded and have better exceptions
DEBUG_MODE = False
"""
Objects to wrap slack connections
"""
# Read the API token
api_file = open("apitoken.txt", 'r')
SLACK_API = next(api_file).strip()
api_file.close()
class ClientWrapper(object):
"""
Essentially the main state object.
We only ever expect one of these per api token.
Holds a slack client, and handles messsages.
"""
def __init__(self, api_token):
# Init slack
self.slack = SlackClient(api_token)
# Hooks go regex -> callback on (slack, msg, match)
self.hooks: List[hooks.AbsHook] = []
# Periodicals are just wrappers around an iterable, basically
self.passives: List[hooks.Passive] = []
# Cache users and channels
self.users: Dict[str, slack_util.User] = {}
self.conversations: Dict[str, slack_util.Conversation] = {}
# Scheduled/passive events handling
def add_passive(self, per: hooks.Passive) -> None:
self.passives.append(per)
async def run_passives(self) -> None:
"""
Run all currently added passives
"""
awaitables = [p.run() for p in self.passives]
await asyncio.gather(*awaitables)
# Incoming slack hook handling
def add_hook(self, hook: hooks.AbsHook) -> None:
self.hooks.append(hook)
async def handle_events(self) -> None:
"""
Asynchronous tasks that eternally reads and responds to messages.
"""
# Create a queue
queue = asyncio.Queue()
# Create a task to put rtm events to the queue
rtm_task = asyncio.create_task(self.rtm_event_feed(queue))
# Create a task to put http events to the queue
http_task = asyncio.create_task(self.http_event_feed(queue))
# Create a task to handle all other tasks
async def handle_task_loop():
async for t3 in self.spool_tasks(queue):
sys.stdout.flush()
if DEBUG_MODE:
await t3
# Handle them all
await asyncio.gather(rtm_task, http_task, handle_task_loop())
async def rtm_event_feed(self, msg_queue: asyncio.Queue) -> None:
"""
Async wrapper around the message feed.
Yields messages awaitably forever.
"""
# Create the msg feed
feed = slack_util.message_stream(self.slack)
# Create a simple callable that gets one message from the feed
def get_one():
return next(feed)
# Continuously yield async threaded tasks that poll the feed
while True:
next_event = await asyncio.get_running_loop().run_in_executor(None, get_one)
await msg_queue.put(next_event)
async def http_event_feed(self, event_queue: asyncio.Queue) -> None:
# Create a callback to convert requests to events
async def interr(request: web.Request):
if request.can_read_body:
# Get the payload
post_params = await request.post()
payload = json.loads(post_params["payload"])
print("Interaction received: {}".format(payload))
# Handle each action separately
if "actions" in payload:
for action in payload["actions"]:
# Start building the event
ev = slack_util.Event()
# Get the user who clicked the button
ev.user = slack_util.UserContext(payload["user"]["id"])
# Get the channel it was clicked in
ev.conversation = slack_util.ConversationContext(payload["channel"]["id"])
# Get the message this button/action was attached to
ev.interaction = slack_util.InteractiveContext(payload["response_url"],
payload["trigger_id"],
action["block_id"],
action["action_id"],
action.get("value"))
# Put it in the queue
await event_queue.put(ev)
# Respond that everything is fine
return web.Response(status=200)
else:
# If we can't read it, get mad
return web.Response(status=400)
# Create the server
app = web.Application()
app.add_routes([web.post('/bothttpcallback', interr)])
# Asynchronously serve that boy up
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, port=31019)
await site.start()
print("Server up")
# while True:
# await asyncio.sleep(30)
async def spool_tasks(self, event_queue: asyncio.Queue) -> AsyncGenerator[asyncio.Task, Any]:
"""
Read in from async event feed, and spool them out as async tasks
"""
while True:
event: slack_util.Event = await event_queue.get()
# Find which hook, if any, satisfies
for hook in list(self.hooks): # Note that we do list(self.hooks) to avoid edit-while-iterating issues
# Try invoking each
try:
# Try to make a coroutine handling the message
coro = hook.try_apply(event)
# If we get a coro back, then task it up and set consumption appropriately
if coro is not None:
print("Spawned task. Now {} running total.".format(len(asyncio.all_tasks())))
yield asyncio.create_task(_exception_printing_task(coro))
if hook.consumes:
break
except hooks.HookDeath:
# If a hook wants to die, let it.
self.hooks.remove(hook)
# Data getting/sending
def get_conversation(self, conversation_id: str) -> Optional[slack_util.Conversation]:
return self.conversations.get(conversation_id)
def get_conversation_by_name(self, conversation_identifier: str) -> Optional[slack_util.Conversation]:
# If looking for a direct message, first lookup user, then fetch
if conversation_identifier[0] == "@":
user_name = conversation_identifier
# Find the user by their name
raise NotImplementedError("There wasn't a clear use case for this yet, so we've opted to just not use it")
# If looking for a channel, just lookup normally
elif conversation_identifier[0] == "#":
channel_name = conversation_identifier
# Find the channel in the dict
for channel in self.conversations.values():
if channel.name == channel_name:
return channel
# If it doesn't fit the above, we don't know how to process
else:
raise ValueError("Please give either an #channel-name or @user-name")
# If we haven't returned already, give up and return None
return None
def get_user(self, user_id: str) -> Optional[slack_util.User]:
return self.users.get(user_id)
def get_user_by_name(self, user_name: str) -> Optional[slack_util.User]:
raise NotImplementedError()
def api_call(self, api_method, **kwargs):
return self.slack.api_call(api_method, **kwargs)
# Simpler wrappers around message sending/replying
def reply(self, event: slack_util.Event, text: str, in_thread: bool = True) -> dict:
"""
Replies to a message.
Message must have a channel and message context.
Returns the JSON response.
"""
# Ensure we're actually replying to a valid message
assert (event.conversation and event.message) is not None
# Send in a thread by default
if in_thread:
# Figure otu what thread to send it to
thread = event.message.ts
if event.thread:
thread = event.thread.thread_ts
return self.send_message(text, event.conversation.conversation_id, thread=thread)
else:
return self.send_message(text, event.conversation.conversation_id)
def _send_core(self, api_method: str, text: str, channel_id: str, thread: str, broadcast: bool,
blocks: List[dict]) -> dict:
"""
Copy of the internal send message function of slack, with some helpful options.
Returns the JSON response.
"""
kwargs = {"channel": channel_id, "text": text}
if thread:
kwargs["thread_ts"] = thread
if broadcast:
kwargs["reply_broadcast"] = True
if blocks:
kwargs["blocks"] = blocks
return self.api_call(api_method, **kwargs)
def send_message(self,
text: str,
channel_id: str,
thread: str = None,
broadcast: bool = False,
blocks: List[dict] = None) -> dict:
"""
Wraps _send_core for normal messages
"""
return self._send_core("chat.postMessage", text, channel_id, thread, broadcast, blocks)
def send_ephemeral(self,
text: str,
channel_id: str,
thread: str = None,
blocks: List[dict] = None) -> dict:
"""
Wraps _send_core for ephemeral messages
"""
return self._send_core("chat.postEphemeral", text, channel_id, thread, False, blocks)
# Update slack data
def update_channels(self):
"""
Queries the slack API for all current channels
"""
# Necessary because of pagination
cursor = None
# Make a new dict to use
new_dict = {}
# Iterate over results
while True:
# Set args depending on if a cursor exists
args = {"limit": 1000, "types": "public_channel,private_channel,mpim,im"}
if cursor:
args["cursor"] = cursor
channel_dicts = self.api_call("conversations.list", **args)
# If the response is good, put its results to the dict
if channel_dicts["ok"]:
for channel_dict in channel_dicts["channels"]:
if channel_dict["is_im"]:
new_channel = slack_util.DirectMessage(id=channel_dict["id"],
user_id="@" + channel_dict["user"])
else:
new_channel = slack_util.Channel(id=channel_dict["id"],
name="#" + channel_dict["name"])
new_dict[new_channel.id] = new_channel
# Fetch the cursor
cursor = channel_dicts.get("response_metadata").get("next_cursor")
# If cursor is blank, we're done new channels, just give it up
if cursor == "":
break
else:
print("Warning: failed to retrieve channels. Message: {}".format(channel_dicts))
break
self.conversations = new_dict
def update_users(self):
"""
Queries the slack API for all current users
"""
# Necessary because of pagination
cursor = None
while True:
# Set args depending on if a cursor exists
args = {"limit": 1000}
if cursor:
args["cursor"] = cursor
user_dicts = self.api_call("users.list", **args)
# Make a new dict to use
new_dict = {}
# If the response is good:
if user_dicts["ok"]:
for user_dict in user_dicts["members"]:
new_user = slack_util.User(id=user_dict.get("id"),
name=user_dict.get("name"),
real_name=user_dict.get("real_name"),
email=user_dict.get("profile").get("email"))
new_dict[new_user.id] = new_user
# Fetch the cursor
cursor = user_dicts.get("response_metadata").get("next_cursor")
# If cursor is blank, we're done new channels, just give it up
if cursor == "":
break
else:
print("Warning: failed to retrieve users")
break
self.users = new_dict
# Create a single instance of the client wrapper
_singleton = ClientWrapper(SLACK_API)
def get_slack() -> ClientWrapper:
return _singleton
"""
Miscellania
"""
A, B, C = TypeVar("A"), TypeVar("B"), TypeVar("C")
# Prints exceptions instead of silently dropping them in async tasks
async def _exception_printing_task(c: Coroutine[A, B, C]) -> Coroutine[A, B, C]:
# Print exceptions as they pass through
try:
return await c
except Exception:
traceback.print_exc()
raise

153
hooks.py Normal file
View File

@ -0,0 +1,153 @@
from __future__ import annotations
import re
from time import time
from typing import Match, Any, Coroutine, Callable, Optional, Union, List
import slack_util
# Return type of an event callback
MsgAction = Coroutine[Any, Any, None]
# Type signature of an event callback function
Callback = Callable[[slack_util.Event, Match], MsgAction]
"""
Hooks
"""
# Signal exception to be raised when a hook has died
class HookDeath(Exception):
pass
# Abstract hook parent class
class AbsHook(object):
def __init__(self, consumes_applicable: bool):
# Whether or not messages that yield a coroutine should not be checked further
self.consumes = consumes_applicable
def try_apply(self, event: slack_util.Event) -> Optional[MsgAction]:
raise NotImplementedError()
class ChannelHook(AbsHook):
"""
Hook that handles messages in a variety of channels
"""
def __init__(self,
callback: Callback,
patterns: Union[str, List[str]],
channel_whitelist: Optional[List[str]] = None,
channel_blacklist: Optional[List[str]] = None,
consumer: bool = True,
allow_dms: bool = True):
super(ChannelHook, self).__init__(consumer)
# Save all
if not isinstance(patterns, list):
patterns = [patterns]
self.patterns = patterns
self.channel_whitelist = channel_whitelist
self.channel_blacklist = channel_blacklist
self.callback = callback
self.allows_dms = allow_dms
# Remedy some sensible defaults
if self.channel_blacklist is None:
self.channel_blacklist = ["#general"]
elif self.channel_whitelist is None:
pass # We leave as none to show no whitelisting in effect
else:
raise ValueError("Cannot whitelist and blacklist")
def try_apply(self, event: slack_util.Event) -> Optional[MsgAction]:
"""
Returns whether a message should be handled by this dict, returning a Match if so, or None
"""
# Ensure that this is an event in a specific channel, with a text component
if not (event.conversation and event.message):
return None
# Fail if pattern invalid
match = None
for p in self.patterns:
match = re.match(p, event.message.text.strip(), flags=re.IGNORECASE)
if match is not None:
break
if match is None:
return None
# Get the channel name
if isinstance(event.conversation.get_conversation(), slack_util.Channel):
channel_name = event.conversation.get_conversation().name
elif self.allows_dms:
channel_name = "DIRECT_MSG"
else:
return None
# Fail if whitelist defined, and we aren't there
if self.channel_whitelist is not None and channel_name not in self.channel_whitelist:
return None
# Fail if blacklist defined, and we are there
if self.channel_blacklist is not None and channel_name in self.channel_blacklist:
return None
return self.callback(event, match)
class ReplyWaiter(AbsHook):
"""
A special hook that only cares about replies to a given message.
"""
def __init__(self, callback: Callback, pattern: str, thread_ts: str, lifetime: float):
super().__init__(True)
self.callback = callback
self.pattern = pattern
self.thread_ts = thread_ts
self.lifetime = lifetime
self.start_time = time()
self.dead = False
def try_apply(self, event: slack_util.Event) -> Optional[MsgAction]:
# First check: are we dead of age yet?
time_alive = time() - self.start_time
should_expire = time_alive > self.lifetime
# If so, give up the ghost
if self.dead or should_expire:
raise HookDeath()
# Next make sure we're actually a message
if not (event.message and event.thread):
return None
# Otherwise proceed normally
# Is the msg the one we care about? If not, ignore
if event.thread.thread_ts != self.thread_ts:
return None
# Does it match the regex? if not, ignore
match = re.match(self.pattern, event.message.text.strip(), flags=re.IGNORECASE)
if match:
self.dead = True
return self.callback(event, match)
else:
return None
class Passive(object):
"""
Base class for Periodical tasks, such as reminders and stuff
"""
async def run(self) -> None:
# Run this passive routed through the specified slack client.
raise NotImplementedError()

15
main.py
View File

@ -2,17 +2,14 @@ import asyncio
import textwrap
from typing import Match
import identifier
import job_commands
import management_commands
import periodicals
import scroll_util
import hooks
from plugins import identifier, job_commands, management_commands, periodicals, scroll_util, slavestothemachine
import client
import slack_util
import slavestothemachine
def main() -> None:
wrap = slack_util.get_slack()
wrap = client.get_slack()
# Add scroll handling
wrap.add_hook(scroll_util.scroll_hook)
@ -39,7 +36,7 @@ def main() -> None:
wrap.add_hook(job_commands.refresh_hook)
# Add help
wrap.add_hook(slack_util.ChannelHook(help_callback, patterns=[r"help", r"bot\s+help"]))
wrap.add_hook(hooks.ChannelHook(help_callback, patterns=[r"help", r"bot\s+help"]))
# Add boozebot
# wrap.add_passive(periodicals.ItsTenPM())
@ -80,7 +77,7 @@ def main() -> None:
# noinspection PyUnusedLocal
async def help_callback(event: slack_util.Event, match: Match) -> None:
slack_util.get_slack().reply(event, textwrap.dedent("""
client.get_slack().reply(event, textwrap.dedent("""
Commands are as follows. Note that some only work in certain channels.
"my scroll is number" : Registers your slack account to have a certain scroll, for the purpose of automatic dm's.
"@person has scroll number" : same as above, but for other users. Helpful if they are being obstinate.

View File

@ -1,25 +0,0 @@
from typing import Match, List
import slack_util
def list_hooks_callback_gen(hooks: List[slack_util.ChannelHook]) -> slack_util.Callback:
# noinspection PyUnusedLocal
async def callback(event: slack_util.Event, match: Match) -> None:
slack_util.get_slack().reply(event, "\n".join(hook.patterns for hook in hooks))
return callback
# Gracefully reboot to reload code changes
# noinspection PyUnusedLocal
async def reboot_callback(event: slack_util.Event, match: Match) -> None:
response = "Ok. Rebooting..."
slack_util.get_slack().reply(event, response)
exit(0)
# Make hooks
reboot_hook = slack_util.ChannelHook(reboot_callback,
patterns=r"reboot",
channel_whitelist=["#command-center"])

0
plugins/__init__.py Normal file
View File

View File

@ -4,7 +4,7 @@ from datetime import date, timedelta
from typing import Tuple, List, Optional, Any
import google_api
import scroll_util
from plugins import scroll_util
SHEET_ID = "1f9p4H7TWPm8rAM4v_qr2Vc6lBiFNEmR-quTY9UtxEBI"

View File

@ -5,7 +5,9 @@ import asyncio
import shelve
from typing import List, Match
import scroll_util
import hooks
from plugins import scroll_util
import client
import slack_util
# The following db maps SLACK_USER_ID -> SCROLL_INTEGER
@ -36,7 +38,7 @@ async def identify_callback(event: slack_util.Event, match: Match):
result = "Bad scroll: {}".format(query)
# Respond
slack_util.get_slack().reply(event, result)
client.get_slack().reply(event, result)
async def identify_other_callback(event: slack_util.Event, match: Match):
@ -60,7 +62,7 @@ async def identify_other_callback(event: slack_util.Event, match: Match):
result = "Bad scroll: {}".format(scroll_txt)
# Respond
slack_util.get_slack().reply(event, result)
client.get_slack().reply(event, result)
# noinspection PyUnusedLocal
@ -76,7 +78,7 @@ async def check_callback(event: slack_util.Event, match: Match):
result = "You are currently registered with scroll {}".format(scroll)
except KeyError:
result = NON_REG_MSG
slack_util.get_slack().reply(event, result)
client.get_slack().reply(event, result)
# noinspection PyUnusedLocal
@ -97,7 +99,7 @@ async def name_callback(event: slack_util.Event, match: Match):
result = NON_REG_MSG
# Respond
slack_util.get_slack().reply(event, result)
client.get_slack().reply(event, result)
async def lookup_slackid_brother(slack_id: str) -> scroll_util.Brother:
@ -133,7 +135,7 @@ async def lookup_brother_userids(brother: scroll_util.Brother) -> List[str]:
return result
identify_hook = slack_util.ChannelHook(identify_callback, patterns=r"my scroll is (.*)")
identify_other_hook = slack_util.ChannelHook(identify_other_callback, patterns=r"<@(.*)>\s+has scroll\s+(.*)")
check_hook = slack_util.ChannelHook(check_callback, patterns=r"what is my scroll")
name_hook = slack_util.ChannelHook(name_callback, patterns=r"what is my name")
identify_hook = hooks.ChannelHook(identify_callback, patterns=r"my scroll is (.*)")
identify_other_hook = hooks.ChannelHook(identify_other_callback, patterns=r"<@(.*)>\s+has scroll\s+(.*)")
check_hook = hooks.ChannelHook(check_callback, patterns=r"what is my scroll")
name_hook = hooks.ChannelHook(name_callback, patterns=r"what is my name")

View File

@ -3,9 +3,9 @@ from typing import List, Match, Callable, TypeVar, Optional, Iterable, Any, Coro
from fuzzywuzzy import fuzz
import house_management
import identifier
import scroll_util
import hooks
from plugins import identifier, house_management, scroll_util
import client
import slack_util
SHEET_ID = "1lPj9GjB00BuIq9GelOWh5GmiGsheLlowPnHLnWBvMOM"
@ -20,7 +20,7 @@ async def alert_user(brother: scroll_util.Brother, saywhat: str) -> None:
# We do this as a for loop just in case multiple people reg. to same scroll for some reason (e.g. dup accounts)
succ = False
for slack_id in await identifier.lookup_brother_userids(brother):
slack_util.get_slack().send_message(saywhat, slack_id)
client.get_slack().send_message(saywhat, slack_id)
succ = True
# Warn if we never find
@ -112,7 +112,7 @@ async def _mod_jobs(event: slack_util.Event,
if len(closest_assigns) == 0:
if no_job_msg is None:
no_job_msg = "Unable to find any jobs to apply this command to. Try again with better spelling or whatever."
slack_util.get_slack().reply(event, no_job_msg)
client.get_slack().reply(event, no_job_msg)
# If theres only one job, sign it off
elif len(closest_assigns) == 1:
@ -122,9 +122,9 @@ async def _mod_jobs(event: slack_util.Event,
else:
# Say we need more info
job_list = "\n".join("{}: {}".format(i, a.job.pretty_fmt()) for i, a in enumerate(closest_assigns))
slack_util.get_slack().reply(event, "Multiple relevant job listings found.\n"
"Please enter the number corresponding to the job "
"you wish to modify:\n{}".format(job_list))
client.get_slack().reply(event, "Multiple relevant job listings found.\n"
"Please enter the number corresponding to the job "
"you wish to modify:\n{}".format(job_list))
# Establish a follow up command pattern
pattern = r"\d+"
@ -140,13 +140,13 @@ async def _mod_jobs(event: slack_util.Event,
await success_callback(closest_assigns[index])
else:
# They gave a bad index, or we were unable to find the assignment again.
slack_util.get_slack().reply(_event, "Invalid job index / job unable to be found.")
client.get_slack().reply(_event, "Invalid job index / job unable to be found.")
# Make a listener hook
new_hook = slack_util.ReplyWaiter(foc, pattern, event.message.ts, 120)
new_hook = hooks.ReplyWaiter(foc, pattern, event.message.ts, 120)
# Register it
slack_util.get_slack().add_hook(new_hook)
client.get_slack().add_hook(new_hook)
async def signoff_callback(event: slack_util.Event, match: Match) -> None:
@ -168,8 +168,8 @@ async def signoff_callback(event: slack_util.Event, match: Match) -> None:
context.assign.signer = context.signer
# Say we did it wooo!
slack_util.get_slack().reply(event, "Signed off {} for {}".format(context.assign.assignee.name,
context.assign.job.name))
client.get_slack().reply(event, "Signed off {} for {}".format(context.assign.assignee.name,
context.assign.job.name))
await alert_user(context.assign.assignee, "{} signed you off for {}.".format(context.assign.signer.name,
context.assign.job.pretty_fmt()))
@ -196,8 +196,8 @@ async def undo_callback(event: slack_util.Event, match: Match) -> None:
context.assign.signer = None
# Say we did it wooo!
slack_util.get_slack().reply(event, "Undid signoff of {} for {}".format(context.assign.assignee.name,
context.assign.job.name))
client.get_slack().reply(event, "Undid signoff of {} for {}".format(context.assign.assignee.name,
context.assign.job.name))
await alert_user(context.assign.assignee, "{} undid your signoff off for {}.\n"
"Must have been a mistake".format(context.assign.signer.name,
context.assign.job.pretty_fmt()))
@ -225,9 +225,9 @@ async def late_callback(event: slack_util.Event, match: Match) -> None:
context.assign.late = not context.assign.late
# Say we did it
slack_util.get_slack().reply(event, "Toggled lateness of {}.\n"
"Now marked as late: {}".format(context.assign.job.pretty_fmt(),
context.assign.late))
client.get_slack().reply(event, "Toggled lateness of {}.\n"
"Now marked as late: {}".format(context.assign.job.pretty_fmt(),
context.assign.late))
# Fire it off
await _mod_jobs(event, scorer, modifier)
@ -260,7 +260,7 @@ async def reassign_callback(event: slack_util.Event, match: Match) -> None:
reassign_msg = "Job {} reassigned from {} to {}".format(context.assign.job.pretty_fmt(),
from_bro,
to_bro)
slack_util.get_slack().reply(event, reassign_msg)
client.get_slack().reply(event, reassign_msg)
# Tell the people
reassign_msg = "Job {} reassigned from {} to {}".format(context.assign.job.pretty_fmt(),
@ -296,7 +296,7 @@ async def reset_callback(event: slack_util.Event, match: Match) -> None:
house_management.apply_house_points(points, await house_management.import_assignments())
house_management.export_points(headers, points)
slack_util.get_slack().reply(event, "Reset scores and signoffs")
client.get_slack().reply(event, "Reset scores and signoffs")
# noinspection PyUnusedLocal
@ -304,17 +304,17 @@ async def refresh_callback(event: slack_util.Event, match: Match) -> None:
headers, points = await house_management.import_points()
house_management.apply_house_points(points, await house_management.import_assignments())
house_management.export_points(headers, points)
slack_util.get_slack().reply(event, "Force updated point values")
client.get_slack().reply(event, "Force updated point values")
async def nag_callback(event: slack_util.Event, match: Match) -> None:
# Get the day
day = match.group(1).lower().strip()
if not await nag_jobs(day):
slack_util.get_slack().reply(event,
"No jobs found. Check that the day is spelled correctly, with no extra symbols.\n"
"It is possible that all jobs have been signed off, as well.",
in_thread=True)
client.get_slack().reply(event,
"No jobs found. Check that the day is spelled correctly, with no extra symbols.\n"
"It is possible that all jobs have been signed off, as well.",
in_thread=True)
# Wrapper so we can auto-call this as well
@ -350,57 +350,57 @@ async def nag_jobs(day_of_week: str) -> bool:
response += "(scroll missing. Please register for @ pings!)"
response += "\n"
general_id = slack_util.get_slack().get_conversation_by_name("#general").id
slack_util.get_slack().send_message(response, general_id)
general_id = client.get_slack().get_conversation_by_name("#general").id
client.get_slack().send_message(response, general_id)
return True
signoff_hook = slack_util.ChannelHook(signoff_callback,
patterns=[
r"signoff\s+(.*)",
r"sign off\s+(.*)",
],
channel_whitelist=["#housejobs"])
signoff_hook = hooks.ChannelHook(signoff_callback,
patterns=[
r"signoff\s+(.*)",
r"sign off\s+(.*)",
],
channel_whitelist=["#housejobs"])
undo_hook = slack_util.ChannelHook(undo_callback,
patterns=[
r"unsignoff\s+(.*)",
r"undosignoff\s+(.*)",
r"undo signoff\s+(.*)",
],
channel_whitelist=["#housejobs"])
late_hook = slack_util.ChannelHook(late_callback,
patterns=[
r"marklate\s+(.*)",
r"mark late\s+(.*)",
],
channel_whitelist=["#housejobs"])
reset_hook = slack_util.ChannelHook(reset_callback,
patterns=[
r"reset signoffs",
r"reset sign offs",
],
channel_whitelist=["#command-center"])
nag_hook = slack_util.ChannelHook(nag_callback,
patterns=[
r"nagjobs\s+(.*)",
r"nag jobs\s+(.*)"
undo_hook = hooks.ChannelHook(undo_callback,
patterns=[
r"unsignoff\s+(.*)",
r"undosignoff\s+(.*)",
r"undo signoff\s+(.*)",
],
channel_whitelist=["#command-center"])
channel_whitelist=["#housejobs"])
reassign_hook = slack_util.ChannelHook(reassign_callback,
patterns=r"reassign\s+(.*?)-&gt;\s+(.+)",
channel_whitelist=["#housejobs"])
late_hook = hooks.ChannelHook(late_callback,
patterns=[
r"marklate\s+(.*)",
r"mark late\s+(.*)",
],
channel_whitelist=["#housejobs"])
refresh_hook = slack_util.ChannelHook(refresh_callback,
patterns=[
"refresh points",
"update points"
],
channel_whitelist=["#command-center"])
reset_hook = hooks.ChannelHook(reset_callback,
patterns=[
r"reset signoffs",
r"reset sign offs",
],
channel_whitelist=["#command-center"])
nag_hook = hooks.ChannelHook(nag_callback,
patterns=[
r"nagjobs\s+(.*)",
r"nag jobs\s+(.*)"
],
channel_whitelist=["#command-center"])
reassign_hook = hooks.ChannelHook(reassign_callback,
patterns=r"reassign\s+(.*?)-&gt;\s+(.+)",
channel_whitelist=["#housejobs"])
refresh_hook = hooks.ChannelHook(refresh_callback,
patterns=[
"refresh points",
"update points"
],
channel_whitelist=["#command-center"])
block_action = """
[

View File

@ -0,0 +1,27 @@
from typing import Match, List
import hooks
import client
import slack_util
def list_hooks_callback_gen(hooks: List[hooks.ChannelHook]) -> hooks.Callback:
# noinspection PyUnusedLocal
async def callback(event: slack_util.Event, match: Match) -> None:
client.get_slack().reply(event, "\n".join(hook.patterns for hook in hooks))
return callback
# Gracefully reboot to reload code changes
# noinspection PyUnusedLocal
async def reboot_callback(event: slack_util.Event, match: Match) -> None:
response = "Ok. Rebooting..."
client.get_slack().reply(event, response)
exit(0)
# Make hooks
reboot_hook = hooks.ChannelHook(reboot_callback,
patterns=r"reboot",
channel_whitelist=["#command-center"])

View File

@ -2,10 +2,9 @@ import asyncio
from datetime import datetime
from typing import Optional, List
import house_management
import identifier
import job_commands
import slack_util
import hooks
from plugins import identifier, job_commands, house_management
import client
def seconds_until(target: datetime) -> float:
@ -14,7 +13,7 @@ def seconds_until(target: datetime) -> float:
return delta.seconds
class ItsTenPM(slack_util.Passive):
class ItsTenPM(hooks.Passive):
async def run(self) -> None:
while True:
# Get 10PM
@ -25,7 +24,9 @@ class ItsTenPM(slack_util.Passive):
await asyncio.sleep(delay)
# Crow like a rooster
slack_util.get_slack().send_message("IT'S 10 PM!", slack_util.get_slack().get_conversation_by_name("#random").id)
client.get_slack().send_message("IT'S 10 PM!", client
.get_slack()
.get_conversation_by_name("#random").id)
# Wait a while before trying it again, to prevent duplicates
await asyncio.sleep(60)
@ -66,7 +67,7 @@ class JobNotifier:
return True
class NotifyJobs(slack_util.Passive, JobNotifier):
class NotifyJobs(hooks.Passive, JobNotifier):
async def run(self) -> None:
while True:
# Get the "Start" of the current day (Say, 10AM)
@ -83,7 +84,7 @@ class NotifyJobs(slack_util.Passive, JobNotifier):
await asyncio.sleep(10)
class RemindJobs(slack_util.Passive, JobNotifier):
class RemindJobs(hooks.Passive, JobNotifier):
async def run(self) -> None:
while True:
# Get the end of the current day (Say, 10PM)
@ -110,7 +111,7 @@ class RemindJobs(slack_util.Passive, JobNotifier):
for slack_id in assignee_ids:
msg = "{}, you still need to do {}".format(a.assignee.name, a.job.pretty_fmt())
success = True
slack_util.get_slack().send_message(msg, slack_id)
client.get_slack().send_message(msg, slack_id)
# Warn on failure
if not success:
@ -120,11 +121,11 @@ class RemindJobs(slack_util.Passive, JobNotifier):
await asyncio.sleep(10)
class Updatinator(slack_util.Passive):
class Updatinator(hooks.Passive):
"""
Periodically updates the channels and users in the slack
"""
def __init__(self, wrapper_to_update: slack_util.ClientWrapper, interval_seconds: int):
def __init__(self, wrapper_to_update: client.ClientWrapper, interval_seconds: int):
self.wrapper_target = wrapper_to_update
self.interval = interval_seconds

View File

@ -1,3 +1,5 @@
from __future__ import annotations
"""
This file contains util for scroll polling
Only really kept separate for neatness sake.
@ -9,6 +11,8 @@ from typing import List, Optional, Match
from fuzzywuzzy import process
import hooks
import client
import slack_util
# Use this if we can't figure out who a brother actually is
@ -56,7 +60,7 @@ async def scroll_callback(event: slack_util.Event, match: Match) -> None:
result = "Couldn't find brother {}".format(query)
# Respond
slack_util.get_slack().reply(event, result)
client.get_slack().reply(event, result)
def find_by_scroll(scroll: int) -> Optional[Brother]:
@ -104,4 +108,4 @@ async def find_by_name(name: str, threshold: Optional[float] = None) -> Brother:
raise BrotherNotFound(msg)
scroll_hook = slack_util.ChannelHook(scroll_callback, patterns=r"scroll\s+(.*)")
scroll_hook = hooks.ChannelHook(scroll_callback, patterns=r"scroll\s+(.*)")

View File

@ -2,9 +2,11 @@ import re
import textwrap
from typing import Match
import house_management
import hooks
from plugins import house_management
import client
import slack_util
from scroll_util import Brother
from plugins.scroll_util import Brother
counted_data = ["flaked", "rolled", "replaced", "washed", "dried"]
lookup_format = "{}\s+(\d+)"
@ -42,7 +44,7 @@ async def count_work_callback(event: slack_util.Event, match: Match) -> None:
# Three: check if we found anything
if len(new_work) == 0:
if re.search(r'\s\d\s', text) is not None:
slack_util.get_slack().reply(event,
client.get_slack().reply(event,
"If you were trying to record work, it was not recognized.\n"
"Use words {} or work will not be recorded".format(counted_data))
return
@ -59,7 +61,7 @@ async def count_work_callback(event: slack_util.Event, match: Match) -> None:
fmt_work_dict(new_work),
contribution_count,
new_total))
slack_util.get_slack().reply(event, congrats)
client.get_slack().reply(event, congrats)
async def record_towel_contribution(for_brother: Brother, contribution_count: int) -> int:
@ -89,7 +91,7 @@ async def record_towel_contribution(for_brother: Brother, contribution_count: in
# Make dem HOOKs
count_work_hook = slack_util.ChannelHook(count_work_callback,
patterns=".*",
channel_whitelist=["#slavestothemachine"],
consumer=False)
count_work_hook = hooks.ChannelHook(count_work_callback,
patterns=".*",
channel_whitelist=["#slavestothemachine"],
consumer=False)

View File

@ -1,24 +1,16 @@
from __future__ import annotations
import json
import urllib.parse
from aiohttp import web
import asyncio
import re
import sys
import traceback
from dataclasses import dataclass
from time import sleep, time
from typing import List, Any, AsyncGenerator, Coroutine, TypeVar, Dict
from typing import Optional, Generator, Match, Callable, Union, Awaitable
from time import sleep
from typing import Optional, Generator, Callable, Union, Awaitable
from typing import TypeVar
from slackclient import SlackClient
from slackclient.client import SlackNotConnected
# Enable to do single-threaded and have better exceptions
import identifier
import scroll_util
import plugins
import client
DEBUG_MODE = False
@ -34,11 +26,11 @@ class User:
real_name: Optional[str]
email: Optional[str]
async def get_brother(self) -> Optional[scroll_util.Brother]:
async def get_brother(self) -> Optional[plugins.scroll_util.Brother]:
"""
Try to find the brother corresponding to this user.
"""
return await identifier.lookup_slackid_brother(self.id)
return await plugins.identifier.lookup_slackid_brother(self.id)
@dataclass
@ -56,7 +48,7 @@ class DirectMessage:
"""
Lookup the user to which this DM corresponds.
"""
return get_slack().get_user(self.user_id)
return client.get_slack().get_user(self.user_id)
Conversation = Union[Channel, DirectMessage]
@ -82,7 +74,7 @@ class ConversationContext:
conversation_id: str
def get_conversation(self) -> Optional[Conversation]:
return get_slack().get_conversation(self.conversation_id)
return client.get_slack().get_conversation(self.conversation_id)
# If there is a specific user associated with this event
@ -91,7 +83,7 @@ class UserContext:
user_id: str
def as_user(self) -> Optional[User]:
return get_slack().get_user(self.user_id)
return client.get_slack().get_user(self.user_id)
# Same but for bots
@ -189,497 +181,6 @@ def message_dict_to_event(update: dict) -> Event:
return event
"""
Objects to wrap slack connections
"""
# Read the API token
api_file = open("apitoken.txt", 'r')
SLACK_API = next(api_file).strip()
api_file.close()
class ClientWrapper(object):
"""
Essentially the main state object.
We only ever expect one of these per api token.
Holds a slack client, and handles messsages.
"""
def __init__(self, api_token):
# Init slack
self.slack = SlackClient(api_token)
# Hooks go regex -> callback on (slack, msg, match)
self.hooks: List[AbsHook] = []
# Periodicals are just wrappers around an iterable, basically
self.passives: List[Passive] = []
# Cache users and channels
self.users: Dict[str, User] = {}
self.conversations: Dict[str, Conversation] = {}
# Scheduled/passive events handling
def add_passive(self, per: Passive) -> None:
self.passives.append(per)
async def run_passives(self) -> None:
"""
Run all currently added passives
"""
awaitables = [p.run() for p in self.passives]
await asyncio.gather(*awaitables)
# Incoming slack hook handling
def add_hook(self, hook: AbsHook) -> None:
self.hooks.append(hook)
async def handle_events(self) -> None:
"""
Asynchronous tasks that eternally reads and responds to messages.
"""
# Create a queue
queue = asyncio.Queue()
# Create a task to put rtm events to the queue
rtm_task = asyncio.create_task(self.rtm_event_feed(queue))
# Create a task to put http events to the queue
http_task = asyncio.create_task(self.http_event_feed(queue))
# Create a task to handle all other tasks
async def handle_task_loop():
async for t3 in self.spool_tasks(queue):
sys.stdout.flush()
if DEBUG_MODE:
await t3
# Handle them all
await asyncio.gather(rtm_task, http_task, handle_task_loop())
async def rtm_event_feed(self, msg_queue: asyncio.Queue) -> None:
"""
Async wrapper around the message feed.
Yields messages awaitably forever.
"""
# Create the msg feed
feed = message_stream(self.slack)
# Create a simple callable that gets one message from the feed
def get_one():
return next(feed)
# Continuously yield async threaded tasks that poll the feed
while True:
next_event = await asyncio.get_running_loop().run_in_executor(None, get_one)
await msg_queue.put(next_event)
async def http_event_feed(self, event_queue: asyncio.Queue) -> None:
# Create a callback to convert requests to events
async def interr(request: web.Request):
if request.can_read_body:
# Get the payload
post_params = await request.post()
payload = json.loads(post_params["payload"])
print("Interaction received: {}".format(payload))
# Handle each action separately
if "actions" in payload:
for action in payload["actions"]:
# Start building the event
ev = Event()
# Get the user who clicked the button
ev.user = UserContext(payload["user"]["id"])
# Get the channel it was clicked in
ev.conversation = ConversationContext(payload["channel"]["id"])
# Get the message this button/action was attached to
ev.interaction = InteractiveContext(payload["response_url"],
payload["trigger_id"],
action["block_id"],
action["action_id"],
action.get("value"))
# Put it in the queue
await event_queue.put(ev)
# Respond that everything is fine
return web.Response(status=200)
else:
# If we can't read it, get mad
return web.Response(status=400)
# Create the server
app = web.Application()
app.add_routes([web.post('/bothttpcallback', interr)])
# Asynchronously serve that boy up
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, port=31019)
await site.start()
print("Server up")
# while True:
# await asyncio.sleep(30)
async def spool_tasks(self, event_queue: asyncio.Queue) -> AsyncGenerator[asyncio.Task, Any]:
"""
Read in from async event feed, and spool them out as async tasks
"""
while True:
event: Event = await event_queue.get()
# Find which hook, if any, satisfies
for hook in list(self.hooks): # Note that we do list(self.hooks) to avoid edit-while-iterating issues
# Try invoking each
try:
# Try to make a coroutine handling the message
coro = hook.try_apply(event)
# If we get a coro back, then task it up and set consumption appropriately
if coro is not None:
print("Spawned task. Now {} running total.".format(len(asyncio.all_tasks())))
yield asyncio.create_task(_exception_printing_task(coro))
if hook.consumes:
break
except DeadHook:
# If a hook wants to die, let it.
self.hooks.remove(hook)
# Data getting/sending
def get_conversation(self, conversation_id: str) -> Optional[Conversation]:
return self.conversations.get(conversation_id)
def get_conversation_by_name(self, conversation_identifier: str) -> Optional[Conversation]:
# If looking for a direct message, first lookup user, then fetch
if conversation_identifier[0] == "@":
user_name = conversation_identifier
# Find the user by their name
raise NotImplementedError("There wasn't a clear use case for this yet, so we've opted to just not use it")
# If looking for a channel, just lookup normally
elif conversation_identifier[0] == "#":
channel_name = conversation_identifier
# Find the channel in the dict
for channel in self.conversations.values():
if channel.name == channel_name:
return channel
# If it doesn't fit the above, we don't know how to process
else:
raise ValueError("Please give either an #channel-name or @user-name")
# If we haven't returned already, give up and return None
return None
def get_user(self, user_id: str) -> Optional[User]:
return self.users.get(user_id)
def get_user_by_name(self, user_name: str) -> Optional[User]:
raise NotImplementedError()
def api_call(self, api_method, **kwargs):
return self.slack.api_call(api_method, **kwargs)
# Simpler wrappers around message sending/replying
def reply(self, event: Event, text: str, in_thread: bool = True) -> dict:
"""
Replies to a message.
Message must have a channel and message context.
Returns the JSON response.
"""
# Ensure we're actually replying to a valid message
assert (event.conversation and event.message) is not None
# Send in a thread by default
if in_thread:
# Figure otu what thread to send it to
thread = event.message.ts
if event.thread:
thread = event.thread.thread_ts
return self.send_message(text, event.conversation.conversation_id, thread=thread)
else:
return self.send_message(text, event.conversation.conversation_id)
def _send_core(self, api_method: str, text: str, channel_id: str, thread: str, broadcast: bool,
blocks: List[dict]) -> dict:
"""
Copy of the internal send message function of slack, with some helpful options.
Returns the JSON response.
"""
kwargs = {"channel": channel_id, "text": text}
if thread:
kwargs["thread_ts"] = thread
if broadcast:
kwargs["reply_broadcast"] = True
if blocks:
kwargs["blocks"] = blocks
return self.api_call(api_method, **kwargs)
def send_message(self,
text: str,
channel_id: str,
thread: str = None,
broadcast: bool = False,
blocks: List[dict] = None) -> dict:
"""
Wraps _send_core for normal messages
"""
return self._send_core("chat.postMessage", text, channel_id, thread, broadcast, blocks)
def send_ephemeral(self,
text: str,
channel_id: str,
thread: str = None,
blocks: List[dict] = None) -> dict:
"""
Wraps _send_core for ephemeral messages
"""
return self._send_core("chat.postEphemeral", text, channel_id, thread, False, blocks)
# Update slack data
def update_channels(self):
"""
Queries the slack API for all current channels
"""
# Necessary because of pagination
cursor = None
# Make a new dict to use
new_dict = {}
# Iterate over results
while True:
# Set args depending on if a cursor exists
args = {"limit": 1000, "types": "public_channel,private_channel,mpim,im"}
if cursor:
args["cursor"] = cursor
channel_dicts = self.api_call("conversations.list", **args)
# If the response is good, put its results to the dict
if channel_dicts["ok"]:
for channel_dict in channel_dicts["channels"]:
if channel_dict["is_im"]:
new_channel = DirectMessage(id=channel_dict["id"],
user_id="@" + channel_dict["user"])
else:
new_channel = Channel(id=channel_dict["id"],
name="#" + channel_dict["name"])
new_dict[new_channel.id] = new_channel
# Fetch the cursor
cursor = channel_dicts.get("response_metadata").get("next_cursor")
# If cursor is blank, we're done new channels, just give it up
if cursor == "":
break
else:
print("Warning: failed to retrieve channels. Message: {}".format(channel_dicts))
break
self.conversations = new_dict
def update_users(self):
"""
Queries the slack API for all current users
"""
# Necessary because of pagination
cursor = None
while True:
# Set args depending on if a cursor exists
args = {"limit": 1000}
if cursor:
args["cursor"] = cursor
user_dicts = self.api_call("users.list", **args)
# Make a new dict to use
new_dict = {}
# If the response is good:
if user_dicts["ok"]:
for user_dict in user_dicts["members"]:
new_user = User(id=user_dict.get("id"),
name=user_dict.get("name"),
real_name=user_dict.get("real_name"),
email=user_dict.get("profile").get("email"))
new_dict[new_user.id] = new_user
# Fetch the cursor
cursor = user_dicts.get("response_metadata").get("next_cursor")
# If cursor is blank, we're done new channels, just give it up
if cursor == "":
break
else:
print("Warning: failed to retrieve users")
break
self.users = new_dict
# Create a single instance of the client wrapper
_singleton = ClientWrapper(SLACK_API)
def get_slack() -> ClientWrapper:
return _singleton
# Return type of an event callback
MsgAction = Coroutine[Any, Any, None]
# Type signature of an event callback function
Callback = Callable[[Event, Match], MsgAction]
"""
Hooks
"""
# Signal exception to be raised when a hook has died
class DeadHook(Exception):
pass
# Abstract hook parent class
class AbsHook(object):
def __init__(self, consumes_applicable: bool):
# Whether or not messages that yield a coroutine should not be checked further
self.consumes = consumes_applicable
def try_apply(self, event: Event) -> Optional[MsgAction]:
raise NotImplementedError()
class ChannelHook(AbsHook):
"""
Hook that handles messages in a variety of channels
"""
def __init__(self,
callback: Callback,
patterns: Union[str, List[str]],
channel_whitelist: Optional[List[str]] = None,
channel_blacklist: Optional[List[str]] = None,
consumer: bool = True,
allow_dms: bool = True):
super(ChannelHook, self).__init__(consumer)
# Save all
if not isinstance(patterns, list):
patterns = [patterns]
self.patterns = patterns
self.channel_whitelist = channel_whitelist
self.channel_blacklist = channel_blacklist
self.callback = callback
self.allows_dms = allow_dms
# Remedy some sensible defaults
if self.channel_blacklist is None:
self.channel_blacklist = ["#general"]
elif self.channel_whitelist is None:
pass # We leave as none to show no whitelisting in effect
else:
raise ValueError("Cannot whitelist and blacklist")
def try_apply(self, event: Event) -> Optional[MsgAction]:
"""
Returns whether a message should be handled by this dict, returning a Match if so, or None
"""
# Ensure that this is an event in a specific channel, with a text component
if not (event.conversation and event.message):
return None
# Fail if pattern invalid
match = None
for p in self.patterns:
match = re.match(p, event.message.text.strip(), flags=re.IGNORECASE)
if match is not None:
break
if match is None:
return None
# Get the channel name
if isinstance(event.conversation.get_conversation(), Channel):
channel_name = event.conversation.get_conversation().name
elif self.allows_dms:
channel_name = "DIRECT_MSG"
else:
return None
# Fail if whitelist defined, and we aren't there
if self.channel_whitelist is not None and channel_name not in self.channel_whitelist:
return None
# Fail if blacklist defined, and we are there
if self.channel_blacklist is not None and channel_name in self.channel_blacklist:
return None
return self.callback(event, match)
class ReplyWaiter(AbsHook):
"""
A special hook that only cares about replies to a given message.
"""
def __init__(self, callback: Callback, pattern: str, thread_ts: str, lifetime: float):
super().__init__(True)
self.callback = callback
self.pattern = pattern
self.thread_ts = thread_ts
self.lifetime = lifetime
self.start_time = time()
self.dead = False
def try_apply(self, event: Event) -> Optional[MsgAction]:
# First check: are we dead of age yet?
time_alive = time() - self.start_time
should_expire = time_alive > self.lifetime
# If so, give up the ghost
if self.dead or should_expire:
raise DeadHook()
# Next make sure we're actually a message
if not (event.message and event.thread):
return None
# Otherwise proceed normally
# Is the msg the one we care about? If not, ignore
if event.thread.thread_ts != self.thread_ts:
return None
# Does it match the regex? if not, ignore
match = re.match(self.pattern, event.message.text.strip(), flags=re.IGNORECASE)
if match:
self.dead = True
return self.callback(event, match)
else:
return None
class Passive(object):
"""
Base class for Periodical tasks, such as reminders and stuff
"""
async def run(self) -> None:
# Run this passive routed through the specified slack client.
raise NotImplementedError()
"""
@ -702,22 +203,5 @@ class VerboseWrapper(Callable):
try:
return await awt
except Exception as e:
get_slack().reply(self.event, "Error: {}".format(str(e)), True)
client.get_slack().reply(self.event, "Error: {}".format(str(e)), True)
raise e
"""
Miscellania
"""
A, B, C = TypeVar("A"), TypeVar("B"), TypeVar("C")
# Prints exceptions instead of silently dropping them in async tasks
async def _exception_printing_task(c: Coroutine[A, B, C]) -> Coroutine[A, B, C]:
# Print exceptions as they pass through
try:
return await c
except Exception:
traceback.print_exc()
raise