1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
(ns redisload
(:use clojure.contrib.sql
clojure.contrib.str-utils
config
datalayer
utils)
(:require redis))
(defn redis-days [n]
(* 24 60 60))
;;;; Tag counters
(def tag-query "
SELECT
u.nick as author,
u.user_id as author_id,
m.message_id as message_id,
m.content as message_content,
m.is_image as is_image,
m.created_on as message_ts,
r.key as room,
r.admin_only as admin_only,
t.created_on as tagged_on,
u2.nick as tagger,
u2.user_id as tagger_id
FROM users u, messages m, rooms r, tags t, users u2
WHERE
u.user_id = m.user_id AND
m.message_id = t.message_id AND
r.room_id = m.room_id AND
u2.user_id = t.user_id AND
r.admin_only = false
")
(def tag-counter (ref 0))
(defn stream-tags [fs]
(with-connection *db*
(with-query-results rs [tag-query]
(doseq [r rs]
(if (:admin_only r)
(println r))
(dosync
(ref-set tag-counter (inc @tag-counter)))
(doseq [f fs]
(f r))))))
(def hall-map (ref {}))
(def popular-map (ref {}))
(def score-map (ref {}))
(def daily-hall-map (ref {}))
(defn update-popular [r]
(dosync
(let [author (:author r)
msg-id (:message_id r)
msg-dt (format-yyyymmdd (:message_ts r))
user-map (get @popular-map author {})
msg-cnt (get user-map msg-id 0)
hall-cnt (get @hall-map msg-id 0)
usr-cnt (get @score-map author 0)]
(alter score-map assoc author (inc usr-cnt))
(alter hall-map assoc msg-id (inc hall-cnt))
(when (:is_image r)
(alter daily-hall-map
update-in [msg-dt msg-id] #(inc (or % 0)))
(alter popular-map assoc author
(assoc user-map msg-id (inc msg-cnt)))))))
(defn transmit-popular []
(doseq [[nick msgs] @popular-map]
(let [sorted-msgs (sort #(>= (second %1) (second %2)) msgs)
userkey (redis-popular-key nick)]
(redis/atomically
(redis/del key)
(doseq [[msg-id score] (take (* num-popular-dumps 2)
sorted-msgs)]
(redis/zadd userkey score msg-id)))))
(println "cached popular data for" (count @popular-map) "users"))
(defn transmit-favscores []
(redis/atomically
(redis/del redis-favscores-key)
(doseq [[nick score] @score-map]
(redis/zadd redis-favscores-key score (lower-case nick))))
(println "cached favscores for " (count @score-map) "users"))
(defn transmit-hall []
(let [scores (take (* 2 num-hall-dumps)
(sort #(>= (second %1) (second %2)) @hall-map))]
(redis/atomically
(redis/del redis-hall-key)
(doseq [[msg-id score] scores]
(redis/zadd redis-hall-key score msg-id)))
(println "cached hall-of-fame")))
(defn transmit-daily-hall []
(doseq [[dt msgs] (sort-by first @daily-hall-map)]
(let [datekey (redis-daily-hall-key dt)]
(redis/atomically
(redis/del datekey)
(doseq [[msg-id score] (take 200 (sort-by #(- (second %)) msgs))]
(redis/zadd datekey score msg-id))
(println "inserted daily hall into" datekey)))))
(println "streaming tags")
(stream-tags [update-popular])
(println (format "processed %s tags" @tag-counter))
(redis/with-server redis-server
(transmit-favscores)
(transmit-popular)
(transmit-hall)
(transmit-daily-hall))
|