摘 要: 實時數據分發系統指使用實時數據分發服務中間件的分布式系統。實時數據分發服務中間件采用實時發布-訂閱協議,通過在數據發布方和訂閱方之間配置的服務質量參數,可以實現不同的數據需求和傳輸方式。該中間件能夠跨平臺操作,屏蔽底層操作系統的差異,這有利于系統靈活動態地擴展和升級。文中闡述了數據分發服務的思想和模型,以RTI公司的數據分發服務中間件DDS為例,具體研究了該中間件的軟件設計和實現。
關鍵詞: 發布-訂閱協議; 實時分布式系統; 數據分發服務
實時數據分發系統指采用數據分發服務DDS(Data Distribution Service)中間件的實時分布式系統。OMG的數據分發服務正逐漸成為采用實時發布-訂閱RTPS(Real Time Publish-Subscribe)協議的分布式系統的標準,此標準的目的是提供一種通用的應用層接口,能夠清楚地定義數據分發的服務,提供一種在實時分布式計算環境中以數據為中心的通信規范,定義一個可擴展、與平臺無關、與位置無關的基礎服務模型,以連接發布者和訂閱者,支持服務質量(QoS)和屬性。該通信模型在空間上(節點可以在網絡中的任何地方)、時間上(發布后立即或稍微有些延遲后傳輸)、傳輸上(可根據帶寬流量控制可靠性,每個用戶可以管理自己數據傳輸的狀態)、功能上(多源的數據可以很容易地添加和刪除)都是松散耦合的。
1 數據分發服務
OMG的數據分發服務(DDS)的核心是以數據為中心的發布-訂閱DCPS(Data-Centric Publish-Subscribe),即發布者能高效地將正確的信息傳遞給適當的訂閱者。DCPS定義了在實時分布式系統中運行在異構平臺上的標準接口的應用程序能從系統的全局數據空間讀/寫數據。具有以下特點:
(1)強制類型的全局數據空間
DDS創建一個全局性的數據空間,任何參與者使用數據類型接口高效自然地讀/寫數據,讓每個訂閱者都可以訪問到“最新”的數據。它還建立了一個名稱空間,讓參與者查找和共享數據的對象。為了提高可擴展性,主題(Topic)可以包含多個獨立的數據通道稱為“關鍵字”(key)。這使得節點可以一次性訂閱到很多,可能是數以千計的相似的數據流。當數據到達時,中間件根據關鍵字排序,提供有效的處理。
(2)每個數據流可控的服務質量
DDS允許控制“每個數據流”的服務質量(QoS)。每對發布者——訂閱者之間可以建立獨立的QoS協議。 DDS也具有獨特的允許應用設計支持極其復雜的、靈活的數據流需求。QoS參數幾乎能控制DDS模型和基本的通信機制的每個方面。
(3)自動發現和管理數據流
DDS能設計成自動發現發布者和訂閱者的每一個主題,根據它們之間設置的QoS參數自動建立數據流。DDS模型提供了快速透明的定位。這使得它非常適合用于系統動態配置的變化。它能迅速發現新的節點,新的節點上的新的參與者,新參與者的新的數據主題。能及時地清除掉過時的、失敗的節點和數據流。
(4)不可靠的無連接傳輸
DDS以不可靠的傳輸方式工作,如UDP或無線網絡。沒有任何設備需要中央服務器或特殊節點。高效、直接、點對點的通信、甚至多播,可以實現模型的每一個功能。
2 DDS的對象模型
圖1顯示了典型的DDS分布式應用的關鍵對象,域(Domain)、域成員(DomainParticipant)、數據發送器(Data writer) 和發布者(publisher)、 訂閱者(Subscriber)和數據接收器(Data reader)、主題(Topic) 組成。域成員對象指的是域內參與應用的成員,參與者可以在同一個域上進行通信。
發布者負責數據的發布,可以發布不同類型的數據。域成員通過發布者的數據發送器交流數據和改變數據類型。一旦新的數據值傳達給發布者,就由發布者決定何時執行數據的發布,實際數據發布的執行取決于發布者的服務質量。
訂閱者接收發布者的數據,并提供給域成員。訂閱者可接收和轉發不同的數據類型。要訪問接收到的數據,域成員必須通過和訂閱者相關的數據接收器。
數據發送器對象與數據接收器對象通過主題聯系。主題和數據類型以及數據本身的服務質量(QoS)關聯。DomainParticipantFactory(域成員廠)用于創建和刪除域成員,域成員通過DomainParticipant::get_domain_id()綁定到域上,域成員可以創建和刪除主題、發布者和訂閱者;發布者用于強制創建和刪除數據發送器先前創建的主題的類型;訂閱者用于強制創建和刪除數據接收器先前創建的主題的類型; DataWriter::write() 用來使應用程序更新數據值;DataReader::take()用來從DDS中間件中接收數據樣本。DDS的編程模型如圖2所示。
抽象的實體接口定義了域成員和域實體(DomainEntities)(即主題、發布者、數據發送器、訂閱者和數據接收器)支持的通用行為。get_qos() 和set_qos()用來檢索和修改服務質量;get_listener()可被DDS中間件用來探測和安裝用戶聽眾的具體地位的條件。確切的服務質量參數列表是針對每個具體的實體類型。
3 應用軟件的設計和實現
RTI公司的DDS中間件是執行數據分發服務標準的一個典型代表。下面采用該公司的DDS中間件并結合Demo程序(發布-訂閱的主題用圓形、三角形和方形的幾何圖形形象的表示),研究實時數據分發系統的軟件設計和實現。Demo程序的運行界面如圖3所示。
3.1 應用軟件的設計
一般情況下,一個應用程序可以屬于多個域(如圖4所示),創建若干域成員,三個應用程序在域A交流命令/控制數據,其他三個應用程序在域B中交流狀態數據,其他兩個應用程序是域C內通信。創建多個域,可以提供有效的數據隔離。
DDS的主題是強制的數據類型。DDS中間件支持多語言的綁定,在OMG的接口描述語言(IDL)中,定義類型TopicInfo。TopicInfo包含一個graphics變量——確定圖形的形狀和value變量——圖形的顏色。把graphics字段表記為關鍵字,讓DDS中間件區分不同圖形所更新的數據。
struct TopicInfo {
int graphics; //key
int value;};
鑒于IDL類型定義,DDS的中間件提供的代碼生成器用來生成特定語言代碼來處理數據類型。生成的代碼將在目標語言中包含類型定義(在本文中使用C++語言)、特定標準類型的API、中間件執行的特定代碼。對于TopicInfo數據類型,生成下列標準類:TopicInfoTypeSupport、TopicInfoDataWriter、TopicInfoDataReader。
3.2 應用軟件的實現
(1)創建DomainParticipant綁定到域上
factory = DDSDomainParticipantFactory::get_instance();
factory->get_default_participant_qos(participant_qos);
participant = factory->create_participant(domain_id, participant_qos, NULL /* participant_listener*/);
(2)注冊用戶的數據類型
TopicInfoTypeSupport::register_type()用來注冊數據類型,可以在DomainParticipant中使用。
(3)創建主題(Topics)
使用標準的DDS API創建命名主題綁定到注冊用戶的數據類型上。主題會自動地在DataWriters和DataReaders之間建立一個數據流。
participant->get_default_topic_qos(topic_qos);
participant->create_topic("Square","TopicInfo",topic_qos, NULL/* topic_listener*/);
(4)創建Publishers和DataWriters
在給定的DomainParticipant上,創建Publisher。一個Publishr可以有不同主題的DataWriters。DataWriter提供發送數據樣本的方法,管理輸入數據通道的生命周期。
participant->get_default_publisher_qos(publisher_qos);
publisher=participant->create_publisher( publisher_qos,NULL /* publisher_listener */);
publisher->get_default_datawriter_qos(writer_qos);
writer = publisher->create_datawriter( topic, writer_qos,NULL /* writer_listener */);
(5)發送數據更新
對于給定的writer,使用::write()方法將新的數據值發布給subscribers。
while (1) {
retcode = writer->write(topic_info, topic _instance_handle);}
(6)創建Subscribers和DataReaders
在給定的DomainParticipant,創建Subscriber,一個Subscriber可以有不同主題的DataReaders。通過調用on_data_available()方法處理接收到的數據樣本。
participant->get_default_subscriber_qos(subscriber_qos);
subscriber=participant->create_subscriber(subscriber_qos,NULL /* subscriber_listener*/);
DDSDataReaderListener* reader_listener = new MyReaderListener();
subscriber->get_default_datareader_qos(reader_qos);
reader = subscriber->create_datareader(topic, reader_qos, reader_listener);
(7)接收數據樣本
當應用程序檢索到數據樣本時,調用::on_data_available()函數接收數據;接收到topic_info數據時,調用::take()函數匹配DDS中間件里的數據狀態標準。接收完數據樣本后,應用程序通過調用::return_loan()函數返回。
void MyReaderListener::on_data_available(DDSDataReader* reader) {
TopicInfoDataReader *myReader = (TopicInfoDataReader *)reader;
retcode = myReader->take( data_seq, info_seq,
DDS_LENGTH_UNLIMITED,
DDS_ANY_SAMPLE_STATE,
DDS_ANY_VIEW_STATE,
DDS_ANY_INSTANCE_STATE);
for (int i = 0; i < data_seq.length(); ++i) {
/* display_ graphics _measurement(&data_seq[i]); */ }
myReader->return_loan(data_seq, info_seq);}
(8)調整服務質量(QoS)
在DataReader和DataWriter對象之間調整數據流的QoS參數設置,也可以通過get_qos()/set_qos()指定參數設置。
DataReaders可以要求不同程度的可靠性交付,從快速但不可靠到高可靠性順序交付。這為每個數據流提供了可靠性控制。
中間件可以通過QoS的OwnershipStrength參數自動判斷多個DataWriters的同一主題所代表的信息。根據唯一性,一個通道上的一個主題只能由單一的DataWriter更新。DataReaders從DataWriter接收具有最高權值的數據,并提供了自動故障排除功能。如果一個高優先級的DataWriter失敗,所有DataReaders立即接收較弱的備份 DataWriter的更新數據。DataWriter也可以指定持續時間,通過QoS參數Lifespan(預期生存期限)標示樣本是否過時。DDS可以定義許多QoS參數。每對相關的實體都可以在此基礎上調整各自的行為。
(9)狀態改變的響應
每個DDS實體的狀態可以通過get_*_status()函數更新。使用偵聽者,當狀態發生變化后中間件會自動通知或等待條件變量觸發特定的狀態變化。
應用程序可以對特定狀態的變化做出響應。例如,應用程序可以得到通知,當DataWriter不再有效或其活力發生了變化;或當另一具有相同名稱的主題存在,但具有不同的特征;或當QoS參數不符合提供的數據;或當錯過了生存期限;或者當發現了一個新的與主題匹配的DataReader(或DataWriter);或當數據有效;或當數據樣本丟失或被拒絕。應用程序響應狀態變化可在每個實例的基礎上調整。
OMG的DDS創建了一個簡單的數據通信結構,實現了復雜的數據模式。采用DDS中間件發送和接收數據,每個實體通過調整QoS參數支持復雜的數據通信模式配置,減輕了開發者擔心的低層次的網絡編程任務。該中間件能夠跨平臺操作,便于應用程序的移植和維護,易于組建和實現一個擴展的、易維護的實時分布式系統。
參考文獻
[1] Middleware solutions for automation applications-case RTPS [R]. Helsinki University of Technology Information and Computer Systems in Automation Espoo 2003.
[2] RTPS wire protocol specification[S]. Version 1.0, November 2001,Real-Time Innovations.
[3] Interface for distributed automation architecture description and specification[S]. Revision 1.1,November 2002.
[4] Data distribution service for real-time systems version 1.2[S]. OMG Available Specification,January 2007.
[5] REVILL G. Designing and debugging real-time distributed systems[J]. Safety Critical Embedded Systems,2008(2):27-31.