我對于Memcached的接觸,還是在去年看了CSDN的一系列國外大型網站架構設計而開始的。最初的時候只是簡單的封裝了Memcached Java版的客戶端,主要是對于配置的簡化以及Memcached多點備份作了一些工作,然后就作為ASF的組件一部分提供給其他Team使用。其實看過Memcached Java客戶端代碼的人就會了解其實客戶端的事情很簡單,就是要有一套高性能的Socket通信框架以及對Memcached的私有協議實現的接口,自己去做這些事情也是很簡單的,不過既然有可以滿足自己需求的開源部分,那么就去實現自己需要的但沒有實現的。這里我用的是Whalin的客戶端版本,這里為什么還要提出來講這個,后面會提到。
在對Java客戶端作了簡單封裝和擴展以后,由于其他Team使用的沒有什么特殊需求,也就沒有再去做太多的修改,直到最近自己的服務集成平臺需要做服務訪問控制,才重新豐富了Cache組件,也就是這個過程中對于Memcached的一些特性和小的細節有了一些新的認識。
作為服務集成平臺需要對服務有所監控,包括訪問頻率控制以及訪問次數控制。頻率控制其實很類似于硬件方面的頻率控制,例如硬件可以對IP的高頻率訪問視為攻擊,列入黑名單。而作為服務的訪問,對于服務訪問者的控制其實涉及到了業務參數,那么硬件就不是很適合去做這方面的控制,為此我也考慮了很久,最開始打算在Apache上做一個模塊控制,但是最后覺得還是放在后面的業務框架上做這件事情。當然后面我說說的方案可能并不好,但是也算是一種想法。要把頻繁的訪問數據記錄下來同時分析,那么數據庫肯定是不行的,最簡單的方式就是采用Cache,又因為是集群范圍內的控制,那么集中式Cache就非Memcached莫數了(分布式的Cache傳播本身損耗太大,集中式Cache本來的最大缺點就是單點,但作簡單的備份操作就可以基本解決此類問題)。
作為解決這個問題的方法來說只需要實現兩部分工作:訪問計數器,定時任務。定時任務在我做日志分析框架的時候都是采用了Jdk5的Concurrent包里面的ScheduledExecutorService,這個作簡單的循環任務足夠用了,同時也是有很好的多線程異步支持,復雜一點么用Quartz。計數器就要靠Memcached來實現了,本來一般的Cache最大的問題就是高并發下的事務保證,如果采用Get+Set來完成計數的話,那么高并發下計數器就會出現讀寫不一致性的問題,幸好Memcached提供了計數累加功能,讓這種累加動作能夠在服務端一次做好,服務端控制并發寫入,保證數據的一致性。
下面就看看以下幾個方法:
boolean storeCounter(String key, long count):存儲key的計數器,值為count。
long getCounter(String key):獲取key的計數器,如果不存在返回-1。
long addOrDecr(String key, long decr):計數器值減去decr,如果計數器不存在,保存decr作為計數器值
long addOrIncr(String key, long inc):計數器值增加inc,如果計數器不存在,保存inc作為計數器值
long decr(String key, long decr):與addOrDecr不同的是在計數器不存在的時候不保存任何值,返回-1
long incr(String key, long inc) :與addOrIncr不同的是在計數器不存在的時候不保存任何值,返回-1
這里需要說明幾點:
storeCounter和普通的set方法不同,如果通過set方式置入key:value的話,getCounter等其他四個方法都認為技術器不存在。所以Counter的存儲方式是和普通內容存儲不同的。
在不同的場景要慎用addOrXXXX和XXXX的方法,兩者還是有比較大的區別的。
計數器沒有提供移除特殊方法,使用delete方法可以移除計數器,但是頻繁的delete和addOrXXXX有時候會出現一些奇怪的問題(例如同名的計數器就沒有辦法再次被創建,不過這個還需要進一步的去研究一下看看)。一般情況下如果計數器的key不是很多,同時也會被復用,那么可以通過置為0或者減去已經分析過的數量來復位。
有上面的一套計數器機制就可以很方便的實現Memcached的計數功能,但是又一個問題出現了,如何讓定時任務去遍歷計數器,分析計數器是否到了閥值,觸發創建黑名單記錄的工作。早先我同事希望我能夠提供封裝好的keySet接口,但是我自己覺得其實作為Cache來說簡單就是最重要的,Cache不需要去遍歷。首先使用Cache的角色就應該知道Key,然后去Cache里面找,找不到就去后臺例如DB里面去搜索,然后將搜索的結果在考慮更新到Cache里面,這樣才是最高效并且最可靠的,Cache靠不住阿,隨時都可能會丟失或者崩潰,因此作為類似于一級緩存或者這類數據完整性要求不高,性能要求很高的場景使用最合適。當時就沒有提供這樣的接口,直到今天自己需要了,才考慮如何去做這件事情。
開始考慮是否能夠將key都記錄在另外的Cache中或者是Memcached中,首先在高并發下更新操作就是一大問題,再者Memcached的內存分配回收機制以及Value的大小限制都不能滿足這樣的需求,如果使用數據庫,那么頻繁更新操作勢必不可行,采用異步緩存刷新又有一個時間間隔期,同時更新也不是很方便。最后考慮如果能夠讓Memcached實現Keyset那么就是最好的解決方案,網上搜索了一下,找到一種策略,然后自己優化了一下,優化后的代碼如下:

Code
@SuppressWarnings("unchecked")
public Set keySet(int limit,boolean fast)
{
Set<String> keys = new HashSet<String>();
Map<String,Integer> dumps = new HashMap<String,Integer>();
Map slabs = getCacheClient().statsItems();
if (slabs != null && slabs.keySet() != null)
{
Iterator itemsItr = slabs.keySet().iterator();
while(itemsItr.hasNext())
{
String server = itemsItr.next().toString();
Map itemNames = (Map) slabs.get(server);
Iterator itemNameItr = itemNames.keySet().iterator();
while(itemNameItr.hasNext())
{
String itemName = itemNameItr.next().toString();
// itemAtt[0] = itemname
// itemAtt[1] = number
// itemAtt[2] = field
String[] itemAtt = itemName.split(":");
if (itemAtt[2].startsWith("number"))
dumps.put(itemAtt[1], Integer.parseInt(itemAtt[1]));
}
}
if (!dumps.values().isEmpty())
{
Iterator<Integer> dumpIter = dumps.values().iterator();
while(dumpIter.hasNext())
{
int dump = dumpIter.next();
Map cacheDump = statsCacheDump(dump,limit);
Iterator entryIter = cacheDump.values().iterator();
while (entryIter.hasNext())
{
Map items = (Map)entryIter.next();
Iterator ks = items.keySet().iterator();
while(ks.hasNext())
{
String k = (String)ks.next();
try
{
k = URLDecoder.decode(k,"UTF-8");
}
catch(Exception ex)
{
Logger.error(ex);
}
if (k != null && !k.trim().equals(""))
{
if (fast)
keys.add(k);
else
if (containsKey(k))
keys.add(k);
}
}
}
}
}
}
return keys;
}
對于上面代碼的了解需要從Memcached內存分配和回收機制開始,以前接觸Memcached的時候只是了解,這部分代碼寫了以后就有些知道怎么回事了。Memcached為了提高內存的分配和回收效率,采用了slab和dump分區的概念。Memcached一大優勢就是能夠充分利用Memory資源,將同機器或者不同機器的Memcached服務端組合成為對客戶端看似統一的存儲空間,Memcached可以在一臺機器上開多個端口作為服務端多個實例,也可以在多臺機器上開多個服務實例,而slab就是Memcached的服務端。下面是我封裝后的Cache配置:
配置
<?xml version="1.0" encoding="UTF-8"?>
<memcached>
<client name="mclient0" compressEnable="true" defaultEncoding="UTF-8" socketpool="pool0">
<!--errorHandler></errorHandler-->
</client>
<client name="mclient1" compressEnable="true" defaultEncoding="UTF-8" socketpool="pool1">
<!--errorHandler></errorHandler-->
</client>
<client name="mclient11" compressEnable="true" defaultEncoding="UTF-8" socketpool="pool11">
<!--errorHandler></errorHandler-->
</client>
<socketpool name="pool0" failover="true" initConn="10" minConn="5" maxConn="250" maintSleep="0"
nagle="false" socketTO="3000" aliveCheck="true">
<servers>10.2.225.210:13000,10.2.225.210:13001,10.2.225.210:13002</servers>
</socketpool>
<socketpool name="pool1" failover="true" initConn="10" minConn="5" maxConn="250" maintSleep="0"
nagle="false" socketTO="3000" aliveCheck="true">
<servers>10.2.225.210:13000</servers>
</socketpool>
<socketpool name="pool11" failover="true" initConn="10" minConn="5" maxConn="250" maintSleep="0"
nagle="false" socketTO="3000" aliveCheck="true">
<servers>10.2.225.210:13000</servers>
</socketpool>
<cluster name="cluster1">
<memCachedClients>mclient1,mclient11</memCachedClients>
</cluster>
</memcached>
可以看到其實pool才是最終連接服務端的配置,看看pool0,它會連接10.2.225.210:13000,10.2.225.210:13001,10.2.225.210:13002這些機器和他們的端口,但是對于使用pool0的mclient0來說它僅僅只是知道有一個叫做mclient0的cache可以保存數據。此時slab就有三個:10.2.225.210:13000和10.2.225.210:13001和10.2.225.210:13002。
當一個key:value要被放入到Memcached中,首先Memcached會根據key的hash算法獲取到hash值來選擇被分配的slab,然后根據value選擇適合的dump區。所謂dump區其實就是根據value的大小來將內存按照存儲單元內容大小分頁。這個是可以配置Memcached的,例如Memcached將slab中的內存劃分成4個dump,第一dump區存儲0-50k大小的數據,第二dump區存儲50-100k的數據,第三dump區存儲100-500k的數據,第四dump區存儲500-1000K的數據。那么當key:value需要被寫入的時候,很容易定位到value所處的dump,分配內存給value。這種分dump模式簡化內存管理,加速了內存回收和分配。但是這里需要注意的幾點就是,首先當你的應用場景中保存的數據大小離散度很高,那么就不是很適合Memcached的這種分配模式,容易造成浪費,例如第一dump區已經滿了,第二第三dump區都還是只有一個數據,那么第二第三dump區不會被回收,第二第三dump區的空間就浪費了。同時Memcached對于value的大小支持到1M,大于1M的內容不適合Memcached存儲。其實在Cache的設計中這樣的情況發生本來就證明設計有問題,Cache只是加速,一般保存都是較小的id或者小對象,用來驗證以及為數據定位作精準細化,而大數據量的內容還是在數據庫等存儲中。
知道了基本的分配機制以后再回過頭來看看代碼:
Map slabs = getCacheClient().statsItems();//獲取所有的slab
//用來收集所有slab的dump號
while(itemsItr.hasNext())
{
String server = itemsItr.next().toString();
Map itemNames = (Map) slabs.get(server);
Iterator itemNameItr = itemNames.keySet().iterator();
while(itemNameItr.hasNext())
{
String itemName = itemNameItr.next().toString();
// itemAtt[0] = itemname
// itemAtt[1] = number
// itemAtt[2] = field
String[] itemAtt = itemName.split(":");
// 如果是itemName中是:number來表示,那么證明是一個存儲數據的dump,還有一些是age的部分
if (itemAtt[2].startsWith("number"))
dumps.put(itemAtt[1], Integer.parseInt(itemAtt[1]));
}
}
//根據收集到的dump來獲取keys
if (!dumps.values().isEmpty())
{
Iterator<Integer> dumpIter = dumps.values().iterator();
while(dumpIter.hasNext())
{
int dump = dumpIter.next();
// statsCacheDump支持三個參數String[],int,int,第一個參數可以省略,默認填入null,表示從那些slab中獲取dump號為第二個參數的keys,如果是null就從當前所有的slab中獲取。第二個參數表示dump號,第三個參數表示返回最多多少個結果。
Map cacheDump = statsCacheDump(dump,limit);
Iterator entryIter = cacheDump.values().iterator();
while (entryIter.hasNext())
{
Map items = (Map)entryIter.next();
Iterator ks = items.keySet().iterator();
while(ks.hasNext())
{
String k = (String)ks.next();
try
{
//這里為什么要作decode,因為其實在我使用的這個Java客戶端存儲的時候,默認會把key都作encoding一次,所以必須要做,不然會出現問題。
k = URLDecoder.decode(k,"UTF-8");
}
catch(Exception ex)
{
Logger.error(ex);
}
if (k != null && !k.trim().equals(""))
{
//這里的fast參數是在方法參數中傳入,作用是什么,其實采用這種搜索slab以及dump的方式獲取keys會發現返回的可能還有一些已經移除的內容的keys,如果覺得需要準確的keys,就在做一次contains的檢查,不過速度就會有一定的影響。
if (fast)
keys.add(k);
else
if (containsKey(k))
keys.add(k);
}
}
}
}
}
NET技術:Memcached使用點滴,轉載需保留來源!
鄭重聲明:本文版權歸原作者所有,轉載文章僅為傳播更多信息之目的,如作者信息標記有誤,請第一時間聯系我們修改或刪除,多謝。