1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
(ns redisload
(:use clojure.contrib.sql
config
datalayer
utils)
(:require redis))
(defn redis-days
  "Returns the number of seconds in n days (n * 24 * 60 * 60).

  The previous body ignored n and always returned the seconds in one day.
  NOTE(review): the only visible use is a commented-out redis/expire call,
  so this is presumably meant as a TTL in seconds -- confirm with callers."
  [n]
  (* n 24 60 60))
;; SQL text executed by stream-tags. It joins users (as the message author),
;; messages, rooms, tags and users again (as the tagger), and yields one row
;; per tag with these columns: author, author_id, message_id,
;; message_content, is_image, message_ts, room, tagged_on, tagger, tagger_id.
;; There is no filter other than the join conditions.
(def tag-query "
SELECT
u.nick as author,
u.user_id as author_id,
m.message_id as message_id,
m.content as message_content,
m.is_image as is_image,
m.created_on as message_ts,
r.key as room,
t.created_on as tagged_on,
u2.nick as tagger,
u2.user_id as tagger_id
FROM users u, messages m, rooms r, tags t, users u2
WHERE
u.user_id = m.user_id AND
m.message_id = t.message_id AND
r.room_id = m.room_id AND
u2.user_id = t.user_id
")
;; Running count of tag rows seen by stream-tags; starts at 0 and is
;; incremented once per result row.
(def tag-counter (ref 0))
(defn stream-tags
  "Runs tag-query on a connection to *db* and walks the result rows.

  For every row, tag-counter is bumped by one inside a transaction and then
  each function in fs is called with that row, in the order given."
  [fs]
  (with-connection *db*
    (with-query-results rows [tag-query]
      (doseq [row rows]
        ;; alter applies inc to the in-transaction value of the ref.
        (dosync (alter tag-counter inc))
        (doseq [handler fs]
          (handler row))))))
;; Nested map built by update-popular: author nick -> {message_id -> count}.
;; Read by transmit-popular when writing to Redis.
(def popular-map (ref {}))
(defn update-popular
  "Records one tag for an image message in popular-map.

  When row r has a truthy :is_image, the counter stored under
  [(:author r) (:message_id r)] is incremented (starting from 0 when the
  author or message has not been seen yet) and the updated map is returned.
  Rows that are not images are ignored and the result is nil."
  [r]
  (when (:is_image r)
    (let [author (:author r)
          msg-id (:message_id r)]
      (dosync
        ;; update-in creates the per-author map on first use; a missing
        ;; count arrives as nil and is treated as 0.
        (alter popular-map update-in [author msg-id]
               (fn [cnt] (inc (or cnt 0))))))))
(defn transmit-popular
  "Writes the contents of popular-map to Redis, one key per author.

  For each [nick msgs] entry the key \"popular:<nick>\" is deleted and then
  repopulated via redis/zadd with the highest-count messages (at most
  (* num-popular-dumps 2) of them), using the count as the score. Both steps
  run inside a single redis/atomically block per author. After all authors
  are written, a summary line is printed.

  Must be called where a Redis connection is already bound (the script
  wraps the call in redis/with-server)."
  []
  (doseq [[nick msgs] @popular-map]
    ;; Sort by count, descending. A strict > is used because a boolean
    ;; comparator built on >= reports \"less than\" in both directions for
    ;; equal counts, which is not a consistent ordering.
    (let [sorted-msgs (sort-by second > msgs)
          userkey (str "popular:" nick)]
      ;; TODO: once an auto loader is scheduled, also expire the key inside
      ;; the block below: (redis/expire userkey (redis-days 2))
      (redis/atomically
        ;; Delete this author's key (previously this passed clojure.core/key
        ;; by mistake) so stale members do not survive the rebuild.
        (redis/del userkey)
        (doseq [[msg-id score] (take (* num-popular-dumps 2) sorted-msgs)]
          (redis/zadd userkey score msg-id)))))
  (println "cached popular data for" (count @popular-map) "users"))
;; Script driver: these top-level forms run, in order, when the file is loaded.
;; Step 1: feed every row returned by tag-query through update-popular,
;; which accumulates per-author image counts in popular-map.
(println "streaming tags")
(stream-tags [update-popular])
;; tag-counter was incremented once per row by stream-tags.
(println (format "processed %s tags" @tag-counter))
;; Step 2: push the accumulated counts to Redis.
;; NOTE(review): redis-server is not defined in this file; presumably it is a
;; connection spec from the config namespace -- confirm.
(redis/with-server redis-server
(transmit-popular))
|