(ns redisload (:use clojure.contrib.sql config datalayer utils) (:require redis)) (defn redis-days [n] (* 24 60 60)) (def tag-query " SELECT u.nick as author, u.user_id as author_id, m.message_id as message_id, m.content as message_content, m.is_image as is_image, m.created_on as message_ts, r.key as room, t.created_on as tagged_on, u2.nick as tagger, u2.user_id as tagger_id FROM users u, messages m, rooms r, tags t, users u2 WHERE u.user_id = m.user_id AND m.message_id = t.message_id AND r.room_id = m.room_id AND u2.user_id = t.user_id ") (def tag-counter (ref 0)) (defn stream-tags [fs] (with-connection *db* (with-query-results rs [tag-query] (doseq [r rs] (dosync (ref-set tag-counter (inc @tag-counter))) (doseq [f fs] (f r)))))) (def popular-map (ref {})) (defn update-popular [r] (if (:is_image r) (dosync (let [user-map (or (get @popular-map (:author r)) {}) msg-cnt (or (get user-map (:message_id r)) 0)] (alter popular-map assoc (:author r) (assoc user-map (:message_id r) (inc msg-cnt))))))) (defn transmit-popular [] (doseq [[nick msgs] @popular-map] (let [sorted-msgs (sort #(>= (second %1) (second %2)) msgs) userkey (str "popular:" nick)] (redis/atomically (redis/del key) (doseq [[msg-id score] (take (* num-popular-dumps 2) sorted-msgs)] (redis/zadd userkey score msg-id))))) ;; (redis/expire userkey (redis-days 2))))) -- need to schedule auto loader (println "cached popular data for" (count @popular-map) "users")) (println "streaming tags") (stream-tags [update-popular]) (println (format "processed %s tags" @tag-counter)) (redis/with-server redis-server (transmit-popular))