Fixing issues with channel stats

This commit is contained in:
Brian Buller 2015-10-29 14:16:45 -05:00
parent d5947f745f
commit aef1feb64f
5 changed files with 152 additions and 89 deletions

View File

@ -18,8 +18,9 @@ func (p *generalProcessor) GetHelp() string {
return "" return ""
} }
func (p *generalProcessor) ProcessAdminMessage(slack *Slack, m *Message) {}
func (p *generalProcessor) ProcessMessage(slack *Slack, m *Message) {} func (p *generalProcessor) ProcessMessage(slack *Slack, m *Message) {}
func (p *generalProcessor) ProcessBotMessage(slack *Slack, m *Message) {}
func (p *generalProcessor) ProcessAdminMessage(slack *Slack, m *Message) {}
func (p *generalProcessor) ProcessUserMessage(slack *Slack, m *Message) {} func (p *generalProcessor) ProcessUserMessage(slack *Slack, m *Message) {}
func (p *generalProcessor) ProcessAdminUserMessage(slack *Slack, m *Message) { func (p *generalProcessor) ProcessAdminUserMessage(slack *Slack, m *Message) {
@ -68,9 +69,11 @@ func (p *generalProcessor) ProcessAdminUserMessage(slack *Slack, m *Message) {
} }
} }
} }
func (p *generalProcessor) ProcessBotUserMessage(slack *Slack, m *Message) {}
func (p *generalProcessor) ProcessChannelMessage(slack *Slack, m *Message) {} func (p *generalProcessor) ProcessChannelMessage(slack *Slack, m *Message) {}
func (p *generalProcessor) ProcessAdminChannelMessage(slack *Slack, m *Message) {} func (p *generalProcessor) ProcessAdminChannelMessage(slack *Slack, m *Message) {}
func (p *generalProcessor) ProcessBotChannelMessage(slack *Slack, m *Message) {}
/* /*
*General Statistics Processor *General Statistics Processor
@ -96,15 +99,18 @@ func (p *generalStatProcessor) ProcessMessage(m *Message) {
incrementUserStat(m.User, "message-dow-"+m.Time.Format("Mon")) incrementUserStat(m.User, "message-dow-"+m.Time.Format("Mon"))
incrementUserStat(m.User, "message-dom-"+m.Time.Format("_2")) incrementUserStat(m.User, "message-dom-"+m.Time.Format("_2"))
} }
func (p *generalStatProcessor) ProcessBotMessage(m *Message) {}
func (p *generalStatProcessor) ProcessUserMessage(m *Message) { func (p *generalStatProcessor) ProcessUserMessage(m *Message) {
incrementUserStat(m.User, "bot-message") incrementUserStat(m.User, "bot-message")
} }
func (p *generalStatProcessor) ProcessBotUserMessage(m *Message) {}
func (p *generalStatProcessor) ProcessChannelMessage(m *Message) { func (p *generalStatProcessor) ProcessChannelMessage(m *Message) {
incrementUserStat(m.User, "channel-message") incrementUserStat(m.User, "channel-message")
incrementChannelStat(m.User, "message-hour-"+m.Time.Format("15")) incrementChannelStat(m.Channel, "message-hour-"+m.Time.Format("15"))
incrementChannelStat(m.User, "message-dow-"+m.Time.Format("Mon")) incrementChannelStat(m.Channel, "message-dow-"+m.Time.Format("Mon"))
incrementChannelStat(m.User, "message-dom-"+m.Time.Format("_2")) incrementChannelStat(m.Channel, "message-dom-"+m.Time.Format("_2"))
} }
func (p *generalStatProcessor) ProcessBotChannelMessage(m *Message) {}

View File

@ -13,8 +13,12 @@ func (p *levelUpStatProcessor) GetStatKeys() []string {
} }
func (p *levelUpStatProcessor) ProcessMessage(m *Message) {} func (p *levelUpStatProcessor) ProcessMessage(m *Message) {}
func (p *levelUpStatProcessor) ProcessAdminMessage(m *Message) {}
func (p *levelUpStatProcessor) ProcessBotMessage(m *Message) {}
func (p *levelUpStatProcessor) ProcessUserMessage(m *Message) {} func (p *levelUpStatProcessor) ProcessUserMessage(m *Message) {}
func (p *levelUpStatProcessor) ProcessAdminUserMessage(m *Message) {}
func (p *levelUpStatProcessor) ProcessBotUserMessage(m *Message) {}
func (p *levelUpStatProcessor) ProcessChannelMessage(m *Message) { func (p *levelUpStatProcessor) ProcessChannelMessage(m *Message) {
// levelup XP, for now, is awarded like this: // levelup XP, for now, is awarded like this:
@ -33,3 +37,5 @@ func (p *levelUpStatProcessor) ProcessChannelMessage(m *Message) {
} }
} }
} }
func (p *levelUpStatProcessor) ProcessAdminChannelMessage(m *Message) {}
func (p *levelUpStatProcessor) ProcessBotChannelMessage(m *Message) {}

View File

@ -5,8 +5,6 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"strconv"
"strings"
"sync/atomic" "sync/atomic"
"time" "time"
@ -86,11 +84,7 @@ func (s *Slack) getMessage() (Message, error) {
var m Message var m Message
err := websocket.JSON.Receive(s.socket, &m) err := websocket.JSON.Receive(s.socket, &m)
txtArr := strings.Split(m.Ts, ".") m.Time = convertSlackTimestamp(m.Ts)
if t, err := strconv.Atoi(txtArr[0]); err == nil {
rawts := int64(t)
m.Time = time.Unix(0, rawts*1000000000)
}
return m, err return m, err
} }
@ -153,6 +147,9 @@ func (s *Slack) getChannelInfo(cid string) (*Channel, error) {
err = fmt.Errorf("Slack error: %s", respObj.Error) err = fmt.Errorf("Slack error: %s", respObj.Error)
return nil, err return nil, err
} }
respObj.Channel.LastRead = convertSlackTimestamp(respObj.Channel.LastReadRaw)
return respObj.Channel, nil return respObj.Channel, nil
} }

View File

@ -1,7 +1,6 @@
package main package main
import ( import (
"encoding/json"
"fmt" "fmt"
"os" "os"
"strings" "strings"
@ -15,10 +14,13 @@ type messageProcessor interface {
GetHelp() string GetHelp() string
ProcessMessage(s *Slack, m *Message) ProcessMessage(s *Slack, m *Message)
ProcessAdminMessage(s *Slack, m *Message) ProcessAdminMessage(s *Slack, m *Message)
ProcessBotMessage(s *Slack, m *Message)
ProcessUserMessage(s *Slack, m *Message) ProcessUserMessage(s *Slack, m *Message)
ProcessAdminUserMessage(s *Slack, m *Message) ProcessAdminUserMessage(s *Slack, m *Message)
ProcessBotUserMessage(s *Slack, m *Message)
ProcessChannelMessage(s *Slack, m *Message) ProcessChannelMessage(s *Slack, m *Message)
ProcessAdminChannelMessage(s *Slack, m *Message) ProcessAdminChannelMessage(s *Slack, m *Message)
ProcessBotChannelMessage(s *Slack, m *Message)
} }
var messageProcessors []messageProcessor var messageProcessors []messageProcessor
@ -27,8 +29,11 @@ type statProcessor interface {
GetName() string GetName() string
GetStatKeys() []string GetStatKeys() []string
ProcessMessage(m *Message) ProcessMessage(m *Message)
ProcessBotMessage(m *Message)
ProcessUserMessage(m *Message) ProcessUserMessage(m *Message)
ProcessBotUserMessage(m *Message)
ProcessChannelMessage(m *Message) ProcessChannelMessage(m *Message)
ProcessBotChannelMessage(m *Message)
} }
var statProcessors []statProcessor var statProcessors []statProcessor
@ -77,43 +82,68 @@ func statBotMain(slack *Slack) {
} }
func processMessage(slack *Slack, m *Message) { func processMessage(slack *Slack, m *Message) {
/*
if mb, me := json.Marshal(m); me == nil { if mb, me := json.Marshal(m); me == nil {
// Write the JSON representation to the log // Write the JSON representation to the log
writeToLog(string(mb) + "\n") writeToLog(string(mb) + "\n")
} }
*/
// TODO: Handle reaction_added messages // TODO: Handle reaction_added messages
// TODO: Handle reaction_removed messages // TODO: Handle reaction_removed messages
if m.Type == "message" { if m.Type == "message" {
for _, proc := range statProcessors { var err error
proc.ProcessMessage(m) var usr *User
}
for _, proc := range messageProcessors {
if isAdmin(m.User) {
proc.ProcessAdminMessage(slack, m)
}
proc.ProcessMessage(slack, m)
}
// Check if we know who the user is // Check if we know who the user is
usr, err := getUserInfo(m.User) usr, err = getUserInfo(m.User)
// If the user information hasn't been updated in the last day, update it. // If the user information hasn't been updated in the last day, update it.
if err != nil || usr.LastUpdated.IsZero() || time.Since(usr.LastUpdated) > (time.Hour*24) { if err != nil || usr.LastUpdated.IsZero() || time.Since(usr.LastUpdated) > (time.Hour*24) {
if u, ue := slack.getUserInfo(m.User); ue == nil { if u, ue := slack.getUserInfo(m.User); ue == nil {
saveUserInfo(u) saveUserInfo(u)
} }
} }
for _, stats := range statProcessors {
if err == nil {
if usr.IsBot {
stats.ProcessBotMessage(m)
} else {
stats.ProcessMessage(m)
}
}
}
for _, proc := range messageProcessors {
if isAdmin(m.User) {
proc.ProcessAdminMessage(slack, m)
}
if usr.IsBot {
proc.ProcessBotMessage(slack, m)
} else {
proc.ProcessMessage(slack, m)
}
}
if m.Channel != "" { if m.Channel != "" {
// Check if we know what the channel is // Check if we know what the channel is
chnl, err := getChannelInfo(m.Channel) chnl, err := getChannelInfo(m.Channel)
var isDirectMessage bool
// If the channel information hasn't been updated in the last day, update it. // If the channel information hasn't been updated in the last day, update it.
if err != nil || chnl.LastUpdated.IsZero() || time.Since(chnl.LastUpdated) > (time.Hour*24) { if err != nil || chnl.LastUpdated.IsZero() || time.Since(chnl.LastUpdated) > (time.Hour*24) {
// Either we don't have this channel, or it's a direct message // Either we don't have this channel, or it's a direct message
if c, ce := slack.getChannelInfo(m.Channel); ce != nil { if c, ce := slack.getChannelInfo(m.Channel); ce == nil {
// Save channel info
saveChannelInfo(c)
} else {
isDirectMessage = true
}
}
if isDirectMessage {
// Invalid channel, save as a direct message // Invalid channel, save as a direct message
saveUserMessage(m.User, m) saveUserMessage(m.User, m)
for _, proc := range statProcessors { for _, stats := range statProcessors {
proc.ProcessUserMessage(m) stats.ProcessUserMessage(m)
} }
for _, proc := range messageProcessors { for _, proc := range messageProcessors {
if isAdmin(m.User) { if isAdmin(m.User) {
@ -122,10 +152,9 @@ func processMessage(slack *Slack, m *Message) {
proc.ProcessUserMessage(slack, m) proc.ProcessUserMessage(slack, m)
} }
} else { } else {
// Save channel info
saveChannelInfo(c)
// And save the channel message // And save the channel message
saveChannelMessage(m.Channel, m) err = saveChannelMessage(m.Channel, m)
for _, proc := range statProcessors { for _, proc := range statProcessors {
proc.ProcessChannelMessage(m) proc.ProcessChannelMessage(m)
} }
@ -136,7 +165,6 @@ func processMessage(slack *Slack, m *Message) {
} }
} }
} }
}
func registerMessageProcessor(b messageProcessor) { func registerMessageProcessor(b messageProcessor) {
// Register a Message Processor // Register a Message Processor

View File

@ -3,6 +3,7 @@ package main
import ( import (
"fmt" "fmt"
"strconv" "strconv"
"strings"
"time" "time"
"github.com/boltdb/bolt" "github.com/boltdb/bolt"
@ -69,6 +70,23 @@ func initDatabase() error {
return err return err
} }
func isBot(uid string) (bool, error) {
openDatabase()
var ret bool
err := db.View(func(tx *bolt.Tx) error {
var b, uB *bolt.Bucket
var err error
b = tx.Bucket([]byte("users"))
if b == nil {
return fmt.Errorf("Unable to open 'users' bucket")
}
uB = b.Bucket([]byte(uid))
ret, err = bktGetBool(uB, "is_bot")
return err
})
return ret, err
}
func isAdmin(uid string) bool { func isAdmin(uid string) bool {
var foundUser bool var foundUser bool
openDatabase() openDatabase()
@ -165,7 +183,7 @@ type Channel struct {
IsArchived bool `json:"is_archived"` IsArchived bool `json:"is_archived"`
IsGeneral bool `json:"is_general"` IsGeneral bool `json:"is_general"`
IsMember bool `json:"is_member"` IsMember bool `json:"is_member"`
LastReadRaw int `json:"last_read"` LastReadRaw string `json:"last_read"`
LastRead time.Time LastRead time.Time
Latest *Message `json:"latest"` Latest *Message `json:"latest"`
UnreadCount int `json:"unread_count"` UnreadCount int `json:"unread_count"`
@ -327,6 +345,7 @@ func saveChannelInfo(chnl *Channel) error {
if err = bktPutString(chMembersB, idxKey, mm); err != nil { if err = bktPutString(chMembersB, idxKey, mm); err != nil {
return err return err
} }
mbIdx++
} }
if chnl.Topic != nil { if chnl.Topic != nil {
if err = bktPutString(chIB, "topic_value", chnl.Topic.Value); err != nil { if err = bktPutString(chIB, "topic_value", chnl.Topic.Value); err != nil {
@ -357,56 +376,59 @@ func saveChannelInfo(chnl *Channel) error {
return err return err
} }
func saveChannelStat(channel string, key string, val string) error { func saveChannelStat(channel string, key string, val int) error {
openDatabase() openDatabase()
err := db.Update(func(tx *bolt.Tx) error { err := db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("channels")) var b, chB, chSB *bolt.Bucket
if chB, err := b.CreateBucketIfNotExists([]byte(channel)); err == nil { var err error
chB.Put([]byte(key), []byte(val))
b = tx.Bucket([]byte("channels"))
if chB, err = b.CreateBucketIfNotExists([]byte(channel)); err == nil {
if chSB, err = chB.CreateBucketIfNotExists([]byte("stats")); err == nil {
err = bktPutInt(chSB, key, val)
} }
return nil }
return err
}) })
closeDatabase() closeDatabase()
return err return err
} }
func incrementChannelStat(channel string, key string) error { func addChannelStat(channel string, key string, addVal int) error {
openDatabase() openDatabase()
strRet, err := getChannelStat(channel, key) v, err := getChannelStat(channel, key)
if err == nil { err = saveChannelStat(channel, key, v+addVal)
if i, err := strconv.Atoi(strRet); err == nil {
i++
return saveChannelStat(channel, key, fmt.Sprintf("%d", i))
}
return err
}
closeDatabase() closeDatabase()
return err return err
} }
func incrementChannelStat(channel string, key string) error {
return addChannelStat(channel, key, 1)
}
func decrementChannelStat(channel string, key string) error { func decrementChannelStat(channel string, key string) error {
openDatabase() return addChannelStat(channel, key, -1)
strRet, err := getChannelStat(channel, key)
if err == nil {
if i, err := strconv.Atoi(strRet); err == nil {
i--
return saveChannelStat(channel, key, fmt.Sprintf("%d", i))
}
return err
}
closeDatabase()
return err
} }
func getChannelStat(channel string, key string) (string, error) { func getChannelStat(channel string, key string) (int, error) {
openDatabase() openDatabase()
var ret string var ret int
err := db.View(func(tx *bolt.Tx) error { err := db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("channels")) var b, chB, chSB *bolt.Bucket
if chB := b.Bucket([]byte(channel)); chB != nil { var err error
ret = string(chB.Get([]byte(key)))
b = tx.Bucket([]byte("channels"))
if b == nil {
return fmt.Errorf("Unable to open 'channels' bucket")
} }
return nil if chB, err = b.CreateBucketIfNotExists([]byte(channel)); err == nil {
if chSB, err = chB.CreateBucketIfNotExists([]byte("stats")); err == nil {
ret, err = bktGetInt(chSB, key)
return err
}
}
return err
}) })
closeDatabase() closeDatabase()
return ret, err return ret, err
@ -641,7 +663,7 @@ func saveUserMessage(user string, message *Message) error {
func getUserStat(user string, key string) (int, error) { func getUserStat(user string, key string) (int, error) {
openDatabase() openDatabase()
var ret int var ret int
err := db.Update(func(tx *bolt.Tx) error { err := db.View(func(tx *bolt.Tx) error {
var b, uB, uSB *bolt.Bucket var b, uB, uSB *bolt.Bucket
var err error var err error
@ -651,10 +673,6 @@ func getUserStat(user string, key string) (int, error) {
} }
if uB, err = b.CreateBucketIfNotExists([]byte(user)); err == nil { if uB, err = b.CreateBucketIfNotExists([]byte(user)); err == nil {
if uSB, err = uB.CreateBucketIfNotExists([]byte("stats")); err == nil { if uSB, err = uB.CreateBucketIfNotExists([]byte("stats")); err == nil {
uSB.ForEach(func(k, v []byte) error {
return nil
})
ret, err = bktGetInt(uSB, key) ret, err = bktGetInt(uSB, key)
return err return err
} }
@ -674,12 +692,10 @@ func saveUserStat(user string, key string, val int) error {
b = tx.Bucket([]byte("users")) b = tx.Bucket([]byte("users"))
if uB, err = b.CreateBucketIfNotExists([]byte(user)); err == nil { if uB, err = b.CreateBucketIfNotExists([]byte(user)); err == nil {
if uSB, err = uB.CreateBucketIfNotExists([]byte("stats")); err == nil { if uSB, err = uB.CreateBucketIfNotExists([]byte("stats")); err == nil {
if err = bktPutInt(uSB, key, val); err != nil { err = bktPutInt(uSB, key, val)
}
}
return err return err
}
}
}
return nil
}) })
closeDatabase() closeDatabase()
return err return err
@ -763,6 +779,16 @@ func bktPutTime(b *bolt.Bucket, key string, val time.Time) error {
return b.Put([]byte(key), []byte(bs)) return b.Put([]byte(key), []byte(bs))
} }
func convertSlackTimestamp(ts string) time.Time {
var ret time.Time
txtArr := strings.Split(ts, ".")
if t, err := strconv.Atoi(txtArr[0]); err == nil {
rawts := int64(t)
ret = time.Unix(0, rawts*1000000000)
}
return ret
}
// All user stats are saved in the db like so: // All user stats are saved in the db like so:
// users (bucket) // users (bucket)
// |- username1 (bucket) // |- username1 (bucket)