7d87ccef35
Fixes #98
485 lines
16 KiB
Go
485 lines
16 KiB
Go
package slack
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"reflect"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
)
|
|
|
|
// ManageConnection can be called on a Slack RTM instance returned by the
|
|
// NewRTM method. It will connect to the slack RTM API and handle all incoming
|
|
// and outgoing events. If a connection fails then it will attempt to reconnect
|
|
// and will notify any listeners through an error event on the IncomingEvents
|
|
// channel.
|
|
//
|
|
// If the connection ends and the disconnect was unintentional then this will
|
|
// attempt to reconnect.
|
|
//
|
|
// This should only be called once per slack API! Otherwise expect undefined
|
|
// behavior.
|
|
//
|
|
// The defined error events are located in websocket_internals.go.
|
|
func (rtm *RTM) ManageConnection() {
|
|
var connectionCount int
|
|
for {
|
|
connectionCount++
|
|
// start trying to connect
|
|
// the returned err is already passed onto the IncomingEvents channel
|
|
info, conn, err := rtm.connect(connectionCount, rtm.useRTMStart)
|
|
// if err != nil then the connection is sucessful - otherwise it is
|
|
// fatal
|
|
if err != nil {
|
|
rtm.Debugf("Failed to connect with RTM on try %d: %s", connectionCount, err)
|
|
return
|
|
}
|
|
rtm.info = info
|
|
rtm.IncomingEvents <- RTMEvent{"connected", &ConnectedEvent{
|
|
ConnectionCount: connectionCount,
|
|
Info: info,
|
|
}}
|
|
|
|
rtm.conn = conn
|
|
rtm.isConnected = true
|
|
|
|
rtm.Debugf("RTM connection succeeded on try %d", connectionCount)
|
|
|
|
keepRunning := make(chan bool)
|
|
// we're now connected (or have failed fatally) so we can set up
|
|
// listeners
|
|
go rtm.handleIncomingEvents(keepRunning)
|
|
|
|
// this should be a blocking call until the connection has ended
|
|
rtm.handleEvents(keepRunning, 30*time.Second)
|
|
|
|
// after being disconnected we need to check if it was intentional
|
|
// if not then we should try to reconnect
|
|
if rtm.wasIntentional {
|
|
return
|
|
}
|
|
// else continue and run the loop again to connect
|
|
}
|
|
}
|
|
|
|
// connect attempts to connect to the slack websocket API. It handles any
|
|
// errors that occur while connecting and will return once a connection
|
|
// has been successfully opened.
|
|
// If useRTMStart is false then it uses rtm.connect to create the connection,
|
|
// otherwise it uses rtm.start.
|
|
func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocket.Conn, error) {
|
|
// used to provide exponential backoff wait time with jitter before trying
|
|
// to connect to slack again
|
|
boff := &backoff{
|
|
Min: 100 * time.Millisecond,
|
|
Max: 5 * time.Minute,
|
|
Factor: 2,
|
|
Jitter: true,
|
|
}
|
|
|
|
for {
|
|
// send connecting event
|
|
rtm.IncomingEvents <- RTMEvent{"connecting", &ConnectingEvent{
|
|
Attempt: boff.attempts + 1,
|
|
ConnectionCount: connectionCount,
|
|
}}
|
|
// attempt to start the connection
|
|
info, conn, err := rtm.startRTMAndDial(useRTMStart)
|
|
if err == nil {
|
|
return info, conn, nil
|
|
}
|
|
// check for fatal errors - currently only invalid_auth
|
|
if sErr, ok := err.(*WebError); ok && (sErr.Error() == "invalid_auth" || sErr.Error() == "account_inactive") {
|
|
rtm.Debugf("Invalid auth when connecting with RTM: %s", err)
|
|
rtm.IncomingEvents <- RTMEvent{"invalid_auth", &InvalidAuthEvent{}}
|
|
return nil, nil, sErr
|
|
}
|
|
|
|
// any other errors are treated as recoverable and we try again after
|
|
// sending the event along the IncomingEvents channel
|
|
rtm.IncomingEvents <- RTMEvent{"connection_error", &ConnectionErrorEvent{
|
|
Attempt: boff.attempts,
|
|
ErrorObj: err,
|
|
}}
|
|
|
|
// check if Disconnect() has been invoked.
|
|
select {
|
|
case _ = <-rtm.disconnected:
|
|
rtm.IncomingEvents <- RTMEvent{"disconnected", &DisconnectedEvent{Intentional: true}}
|
|
return nil, nil, fmt.Errorf("disconnect received while trying to connect")
|
|
default:
|
|
}
|
|
|
|
// get time we should wait before attempting to connect again
|
|
dur := boff.Duration()
|
|
rtm.Debugf("reconnection %d failed: %s", boff.attempts+1, err)
|
|
rtm.Debugln(" -> reconnecting in", dur)
|
|
time.Sleep(dur)
|
|
}
|
|
}
|
|
|
|
// startRTMAndDial attempts to connect to the slack websocket. If useRTMStart is true,
|
|
// then it returns the full information returned by the "rtm.start" method on the
|
|
// slack API. Else it uses the "rtm.connect" method to connect
|
|
func (rtm *RTM) startRTMAndDial(useRTMStart bool) (*Info, *websocket.Conn, error) {
|
|
var info *Info
|
|
var url string
|
|
var err error
|
|
|
|
if useRTMStart {
|
|
rtm.Debugf("Starting RTM")
|
|
info, url, err = rtm.StartRTM()
|
|
} else {
|
|
rtm.Debugf("Connecting to RTM")
|
|
info, url, err = rtm.ConnectRTM()
|
|
}
|
|
if err != nil {
|
|
rtm.Debugf("Failed to start or connect to RTM: %s", err)
|
|
return nil, nil, err
|
|
}
|
|
|
|
rtm.Debugf("Dialing to websocket on url %s", url)
|
|
// Only use HTTPS for connections to prevent MITM attacks on the connection.
|
|
upgradeHeader := http.Header{}
|
|
upgradeHeader.Add("Origin", "https://api.slack.com")
|
|
conn, _, err := websocket.DefaultDialer.Dial(url, upgradeHeader)
|
|
if err != nil {
|
|
rtm.Debugf("Failed to dial to the websocket: %s", err)
|
|
return nil, nil, err
|
|
}
|
|
return info, conn, err
|
|
}
|
|
|
|
// killConnection stops the websocket connection and signals to all goroutines
|
|
// that they should cease listening to the connection for events.
|
|
//
|
|
// This should not be called directly! Instead a boolean value (true for
|
|
// intentional, false otherwise) should be sent to the killChannel on the RTM.
|
|
func (rtm *RTM) killConnection(keepRunning chan bool, intentional bool) error {
|
|
rtm.Debugln("killing connection")
|
|
if rtm.isConnected {
|
|
close(keepRunning)
|
|
}
|
|
rtm.isConnected = false
|
|
rtm.wasIntentional = intentional
|
|
err := rtm.conn.Close()
|
|
rtm.IncomingEvents <- RTMEvent{"disconnected", &DisconnectedEvent{intentional}}
|
|
return err
|
|
}
|
|
|
|
// handleEvents is a blocking function that handles all events. This sends
|
|
// pings when asked to (on rtm.forcePing) and upon every given elapsed
|
|
// interval. This also sends outgoing messages that are received from the RTM's
|
|
// outgoingMessages channel. This also handles incoming raw events from the RTM
|
|
// rawEvents channel.
|
|
func (rtm *RTM) handleEvents(keepRunning chan bool, interval time.Duration) {
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
// catch "stop" signal on channel close
|
|
case intentional := <-rtm.killChannel:
|
|
_ = rtm.killConnection(keepRunning, intentional)
|
|
return
|
|
// send pings on ticker interval
|
|
case <-ticker.C:
|
|
err := rtm.ping()
|
|
if err != nil {
|
|
_ = rtm.killConnection(keepRunning, false)
|
|
return
|
|
}
|
|
case <-rtm.forcePing:
|
|
err := rtm.ping()
|
|
if err != nil {
|
|
_ = rtm.killConnection(keepRunning, false)
|
|
return
|
|
}
|
|
// listen for messages that need to be sent
|
|
case msg := <-rtm.outgoingMessages:
|
|
rtm.sendOutgoingMessage(msg)
|
|
// listen for incoming messages that need to be parsed
|
|
case rawEvent := <-rtm.rawEvents:
|
|
rtm.handleRawEvent(rawEvent)
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleIncomingEvents monitors the RTM's opened websocket for any incoming
|
|
// events. It pushes the raw events onto the RTM channel rawEvents.
|
|
//
|
|
// This will stop executing once the RTM's keepRunning channel has been closed
|
|
// or has anything sent to it.
|
|
func (rtm *RTM) handleIncomingEvents(keepRunning <-chan bool) {
|
|
for {
|
|
// non-blocking listen to see if channel is closed
|
|
select {
|
|
// catch "stop" signal on channel close
|
|
case <-keepRunning:
|
|
return
|
|
default:
|
|
rtm.receiveIncomingEvent()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (rtm *RTM) sendWithDeadline(msg interface{}) error {
|
|
// set a write deadline on the connection
|
|
if err := rtm.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
|
|
return err
|
|
}
|
|
if err := rtm.conn.WriteJSON(msg); err != nil {
|
|
return err
|
|
}
|
|
// remove write deadline
|
|
return rtm.conn.SetWriteDeadline(time.Time{})
|
|
}
|
|
|
|
// sendOutgoingMessage sends the given OutgoingMessage to the slack websocket.
|
|
//
|
|
// It does not currently detect if a outgoing message fails due to a disconnect
|
|
// and instead lets a future failed 'PING' detect the failed connection.
|
|
func (rtm *RTM) sendOutgoingMessage(msg OutgoingMessage) {
|
|
rtm.Debugln("Sending message:", msg)
|
|
if len(msg.Text) > MaxMessageTextLength {
|
|
rtm.IncomingEvents <- RTMEvent{"outgoing_error", &MessageTooLongEvent{
|
|
Message: msg,
|
|
MaxLength: MaxMessageTextLength,
|
|
}}
|
|
return
|
|
}
|
|
|
|
if err := rtm.sendWithDeadline(msg); err != nil {
|
|
rtm.IncomingEvents <- RTMEvent{"outgoing_error", &OutgoingErrorEvent{
|
|
Message: msg,
|
|
ErrorObj: err,
|
|
}}
|
|
// TODO force ping?
|
|
}
|
|
}
|
|
|
|
// ping sends a 'PING' message to the RTM's websocket. If the 'PING' message
|
|
// fails to send then this returns an error signifying that the connection
|
|
// should be considered disconnected.
|
|
//
|
|
// This does not handle incoming 'PONG' responses but does store the time of
|
|
// each successful 'PING' send so latency can be detected upon a 'PONG'
|
|
// response.
|
|
func (rtm *RTM) ping() error {
|
|
id := rtm.idGen.Next()
|
|
rtm.Debugln("Sending PING ", id)
|
|
rtm.pings[id] = time.Now()
|
|
|
|
msg := &Ping{ID: id, Type: "ping"}
|
|
|
|
if err := rtm.sendWithDeadline(msg); err != nil {
|
|
rtm.Debugf("RTM Error sending 'PING %d': %s", id, err.Error())
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// receiveIncomingEvent attempts to receive an event from the RTM's websocket.
|
|
// This will block until a frame is available from the websocket.
|
|
func (rtm *RTM) receiveIncomingEvent() {
|
|
event := json.RawMessage{}
|
|
err := rtm.conn.ReadJSON(&event)
|
|
if err == io.EOF {
|
|
// EOF's don't seem to signify a failed connection so instead we ignore
|
|
// them here and detect a failed connection upon attempting to send a
|
|
// 'PING' message
|
|
|
|
// trigger a 'PING' to detect pontential websocket disconnect
|
|
rtm.forcePing <- true
|
|
return
|
|
} else if err != nil {
|
|
rtm.IncomingEvents <- RTMEvent{"incoming_error", &IncomingEventError{
|
|
ErrorObj: err,
|
|
}}
|
|
// force a ping here too?
|
|
return
|
|
} else if len(event) == 0 {
|
|
rtm.Debugln("Received empty event")
|
|
return
|
|
}
|
|
rtm.Debugln("Incoming Event:", string(event[:]))
|
|
rtm.rawEvents <- event
|
|
}
|
|
|
|
// handleRawEvent takes a raw JSON message received from the slack websocket
|
|
// and handles the encoded event.
|
|
func (rtm *RTM) handleRawEvent(rawEvent json.RawMessage) {
|
|
event := &Event{}
|
|
err := json.Unmarshal(rawEvent, event)
|
|
if err != nil {
|
|
rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}}
|
|
return
|
|
}
|
|
switch event.Type {
|
|
case "":
|
|
rtm.handleAck(rawEvent)
|
|
case "hello":
|
|
rtm.IncomingEvents <- RTMEvent{"hello", &HelloEvent{}}
|
|
case "pong":
|
|
rtm.handlePong(rawEvent)
|
|
case "desktop_notification":
|
|
rtm.Debugln("Received desktop notification, ignoring")
|
|
default:
|
|
rtm.handleEvent(event.Type, rawEvent)
|
|
}
|
|
}
|
|
|
|
// handleAck handles an incoming 'ACK' message.
|
|
func (rtm *RTM) handleAck(event json.RawMessage) {
|
|
ack := &AckMessage{}
|
|
if err := json.Unmarshal(event, ack); err != nil {
|
|
rtm.Debugln("RTM Error unmarshalling 'ack' event:", err)
|
|
rtm.Debugln(" -> Erroneous 'ack' event:", string(event))
|
|
return
|
|
}
|
|
|
|
if ack.Ok {
|
|
rtm.IncomingEvents <- RTMEvent{"ack", ack}
|
|
} else if ack.RTMResponse.Error != nil {
|
|
// As there is no documentation for RTM error-codes, this
|
|
// identification of a rate-limit warning is very brittle.
|
|
if ack.RTMResponse.Error.Code == -1 && ack.RTMResponse.Error.Msg == "slow down, too many messages..." {
|
|
rtm.IncomingEvents <- RTMEvent{"ack_error", &RateLimitEvent{}}
|
|
} else {
|
|
rtm.IncomingEvents <- RTMEvent{"ack_error", &AckErrorEvent{ack.Error}}
|
|
}
|
|
} else {
|
|
rtm.IncomingEvents <- RTMEvent{"ack_error", &AckErrorEvent{fmt.Errorf("ack decode failure")}}
|
|
}
|
|
}
|
|
|
|
// handlePong handles an incoming 'PONG' message which should be in response to
|
|
// a previously sent 'PING' message. This is then used to compute the
|
|
// connection's latency.
|
|
func (rtm *RTM) handlePong(event json.RawMessage) {
|
|
pong := &Pong{}
|
|
if err := json.Unmarshal(event, pong); err != nil {
|
|
rtm.Debugln("RTM Error unmarshalling 'pong' event:", err)
|
|
rtm.Debugln(" -> Erroneous 'ping' event:", string(event))
|
|
return
|
|
}
|
|
if pingTime, exists := rtm.pings[pong.ReplyTo]; exists {
|
|
latency := time.Since(pingTime)
|
|
rtm.IncomingEvents <- RTMEvent{"latency_report", &LatencyReport{Value: latency}}
|
|
delete(rtm.pings, pong.ReplyTo)
|
|
} else {
|
|
rtm.Debugln("RTM Error - unmatched 'pong' event:", string(event))
|
|
}
|
|
}
|
|
|
|
// handleEvent is the "default" response to an event that does not have a
|
|
// special case. It matches the command's name to a mapping of defined events
|
|
// and then sends the corresponding event struct to the IncomingEvents channel.
|
|
// If the event type is not found or the event cannot be unmarshalled into the
|
|
// correct struct then this sends an UnmarshallingErrorEvent to the
|
|
// IncomingEvents channel.
|
|
func (rtm *RTM) handleEvent(typeStr string, event json.RawMessage) {
|
|
v, exists := eventMapping[typeStr]
|
|
if !exists {
|
|
rtm.Debugf("RTM Error, received unmapped event %q: %s\n", typeStr, string(event))
|
|
err := fmt.Errorf("RTM Error: Received unmapped event %q: %s\n", typeStr, string(event))
|
|
rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}}
|
|
return
|
|
}
|
|
t := reflect.TypeOf(v)
|
|
recvEvent := reflect.New(t).Interface()
|
|
err := json.Unmarshal(event, recvEvent)
|
|
if err != nil {
|
|
rtm.Debugf("RTM Error, could not unmarshall event %q: %s\n", typeStr, string(event))
|
|
err := fmt.Errorf("RTM Error: Could not unmarshall event %q: %s\n", typeStr, string(event))
|
|
rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}}
|
|
return
|
|
}
|
|
rtm.IncomingEvents <- RTMEvent{typeStr, recvEvent}
|
|
}
|
|
|
|
// eventMapping holds a mapping of event names to their corresponding struct
|
|
// implementations. The structs should be instances of the unmarshalling
|
|
// target for the matching event type.
|
|
var eventMapping = map[string]interface{}{
|
|
"message": MessageEvent{},
|
|
"presence_change": PresenceChangeEvent{},
|
|
"user_typing": UserTypingEvent{},
|
|
|
|
"channel_marked": ChannelMarkedEvent{},
|
|
"channel_created": ChannelCreatedEvent{},
|
|
"channel_joined": ChannelJoinedEvent{},
|
|
"channel_left": ChannelLeftEvent{},
|
|
"channel_deleted": ChannelDeletedEvent{},
|
|
"channel_rename": ChannelRenameEvent{},
|
|
"channel_archive": ChannelArchiveEvent{},
|
|
"channel_unarchive": ChannelUnarchiveEvent{},
|
|
"channel_history_changed": ChannelHistoryChangedEvent{},
|
|
|
|
"dnd_updated": DNDUpdatedEvent{},
|
|
"dnd_updated_user": DNDUpdatedEvent{},
|
|
|
|
"im_created": IMCreatedEvent{},
|
|
"im_open": IMOpenEvent{},
|
|
"im_close": IMCloseEvent{},
|
|
"im_marked": IMMarkedEvent{},
|
|
"im_history_changed": IMHistoryChangedEvent{},
|
|
|
|
"group_marked": GroupMarkedEvent{},
|
|
"group_open": GroupOpenEvent{},
|
|
"group_joined": GroupJoinedEvent{},
|
|
"group_left": GroupLeftEvent{},
|
|
"group_close": GroupCloseEvent{},
|
|
"group_rename": GroupRenameEvent{},
|
|
"group_archive": GroupArchiveEvent{},
|
|
"group_unarchive": GroupUnarchiveEvent{},
|
|
"group_history_changed": GroupHistoryChangedEvent{},
|
|
|
|
"file_created": FileCreatedEvent{},
|
|
"file_shared": FileSharedEvent{},
|
|
"file_unshared": FileUnsharedEvent{},
|
|
"file_public": FilePublicEvent{},
|
|
"file_private": FilePrivateEvent{},
|
|
"file_change": FileChangeEvent{},
|
|
"file_deleted": FileDeletedEvent{},
|
|
"file_comment_added": FileCommentAddedEvent{},
|
|
"file_comment_edited": FileCommentEditedEvent{},
|
|
"file_comment_deleted": FileCommentDeletedEvent{},
|
|
|
|
"pin_added": PinAddedEvent{},
|
|
"pin_removed": PinRemovedEvent{},
|
|
|
|
"star_added": StarAddedEvent{},
|
|
"star_removed": StarRemovedEvent{},
|
|
|
|
"reaction_added": ReactionAddedEvent{},
|
|
"reaction_removed": ReactionRemovedEvent{},
|
|
|
|
"pref_change": PrefChangeEvent{},
|
|
|
|
"team_join": TeamJoinEvent{},
|
|
"team_rename": TeamRenameEvent{},
|
|
"team_pref_change": TeamPrefChangeEvent{},
|
|
"team_domain_change": TeamDomainChangeEvent{},
|
|
"team_migration_started": TeamMigrationStartedEvent{},
|
|
|
|
"manual_presence_change": ManualPresenceChangeEvent{},
|
|
|
|
"user_change": UserChangeEvent{},
|
|
|
|
"emoji_changed": EmojiChangedEvent{},
|
|
|
|
"commands_changed": CommandsChangedEvent{},
|
|
|
|
"email_domain_changed": EmailDomainChangedEvent{},
|
|
|
|
"bot_added": BotAddedEvent{},
|
|
"bot_changed": BotChangedEvent{},
|
|
|
|
"accounts_changed": AccountsChangedEvent{},
|
|
|
|
"reconnect_url": ReconnectUrlEvent{},
|
|
}
|