summaryrefslogtreecommitdiff
path: root/src/redisload.clj
blob: 1ca7815f5246a38cfd57d313284cab66e5a40286 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
(ns redisload
  (:use clojure.contrib.sql
        clojure.contrib.str-utils
        config
        datalayer
        utils)
  (:require redis))

(defn redis-days [n]
  (* 24 60 60))

;;;; Tag counters

(def tag-query "
SELECT
    u.nick as author,
    u.user_id as author_id,
    m.message_id as message_id,
    m.content as message_content,
    m.is_image as is_image,
    m.created_on as message_ts,
    r.key as room,
    r.admin_only as admin_only,
    t.created_on as tagged_on,
    u2.nick as tagger,
    u2.user_id as tagger_id
FROM users u, messages m, rooms r, tags t, users u2
WHERE
    u.user_id = m.user_id AND
    m.message_id = t.message_id AND
    r.room_id = m.room_id AND
    u2.user_id = t.user_id AND
    r.admin_only = false
")

(def tag-counter (ref 0))

(defn stream-tags [fs]
  (with-connection *db*
    (with-query-results rs [tag-query]
      (doseq [r rs]
        (if (:admin_only r)
          (println r))
        (dosync
         (ref-set tag-counter (inc @tag-counter)))
        (doseq [f fs]
          (f r))))))

(def hall-map (ref {}))
(def popular-map (ref {}))
(def score-map (ref {}))
(def daily-hall-map (ref {}))

(defn update-popular [r]
  (dosync
   (let [author      (:author r)
         msg-id      (:message_id r)
         msg-dt      (format-yyyymmdd (:message_ts r))
         user-map    (get @popular-map author {})
         msg-cnt     (get user-map msg-id 0)
         hall-cnt    (get @hall-map msg-id 0)
         usr-cnt     (get @score-map author 0)]
     (alter score-map assoc author (inc usr-cnt))
     (alter hall-map assoc msg-id (inc hall-cnt))
     (when (:is_image r)
       (alter daily-hall-map
              update-in [msg-dt msg-id] #(inc (or % 0)))
       (alter popular-map assoc author
              (assoc user-map msg-id (inc msg-cnt)))))))

(defn transmit-popular []
  (doseq [[nick msgs] @popular-map]
    (let [sorted-msgs (sort #(>= (second %1) (second %2)) msgs)
          userkey     (redis-popular-key nick)]
      (redis/atomically
       (redis/del key)
       (doseq [[msg-id score] (take (* num-popular-dumps 2)
                                    sorted-msgs)]
         (redis/zadd userkey score msg-id)))))
  (println "cached popular data for" (count @popular-map) "users"))

(defn transmit-favscores []
  (redis/atomically
   (redis/del redis-favscores-key)
   (doseq [[nick score] @score-map]
     (redis/zadd redis-favscores-key score (lower-case nick))))
  (println "cached favscores for " (count @score-map) "users"))

(defn transmit-hall []
  (let [scores (take (* 2 num-hall-dumps)
                     (sort #(>= (second %1) (second %2)) @hall-map))]
    (redis/atomically
     (redis/del redis-hall-key)
     (doseq [[msg-id score] scores]
       (redis/zadd redis-hall-key score msg-id)))
    (println "cached hall-of-fame")))

(defn transmit-daily-hall []
  (doseq [[dt msgs] (sort-by first @daily-hall-map)]
    (let [datekey (redis-daily-hall-key dt)]
      (redis/atomically
       (redis/del datekey)
       (doseq [[msg-id score] (take 200 (sort-by #(- (second %)) msgs))]
         (redis/zadd datekey score msg-id))
       (println "inserted daily hall into" datekey)))))

(println "streaming tags")
(stream-tags [update-popular])
(println (format "processed %s tags" @tag-counter))

(redis/with-server redis-server
  (transmit-favscores)
  (transmit-popular)
  (transmit-hall)
  (transmit-daily-hall))