Konu: SQLBulkCopy
Hocam, foxite.com'da yazmis oldugun bu kodun kullanimini biraz anlatabilir misin?
.NET kodunu nasil derlememiz gerekiyor?
VFP + SQL tarafinda column count ve name'lerinin ayni olmasi sarti disindaki diger kisimlari anlayamadim.
&&SqlBulkCopy is defined in System.Data.SqlClient namespace. Here is .Net code (COM interop version) + a test prg.
&& Target platform: x86
&& Register for COM interop
&& Assumptions: Table has the same columns on both VFP side and SQL server side (that is: column count and column names match. Ordinals may be different).
using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.InteropServices;
using System.Data;
using System.Data.OleDb;
using System.Data.SqlClient;
using System.ComponentModel;
namespace SbcOle
{
[ComSourceInterfaces(typeof(IcbSBCEvents))]
[ClassInterface(ClassInterfaceType.AutoDual)]
[ProgId("cbNetCOM.cbSBC")]
[ComVisible(true)]
public class cbSBC
{
// Connection string used to open the OLE DB source (see the OleDbConnection instances in this class).
public string SourceConStr = "";
// Connection string used to open the SQL Server target (see the SqlConnection instances in this class).
public string TargetConStr = "";
// Target table name; formatted into the schema query and assigned to SqlBulkCopy.DestinationTableName.
public string DestinationTable = "";
// Source table name; formatted into the count(*) and chunked select statements.
public string SourceTable = "";
// Assigned to SqlBulkCopy.NotifyAfter, i.e. how many rows are copied between progress notifications.
public int Odometer = 1000;
public int ReadAtOnce = 10000; // records count to read at once from VFP to prevent OutOfMemoryException
// Assigned to SqlBulkCopy.BulkCopyTimeout; the sample test program treats the value as seconds.
public int BulkCopyTimeout = 864000;
// Set from "select count(*)" on the source table at the start of a load.
public int RowsToCopy = 0;
// Updated from the SqlRowsCopied notification: rowsBefore plus the rows reported for the current chunk.
public int RowsCopied = 0;
// Upper row number of the chunks already uploaded; added to per-chunk counts to produce RowsCopied.
private int rowsBefore = 0;
// Signature of the progress callback: a state code plus a human-readable message.
public delegate void delFeedbackProgress(SbcState state, string message);
// Raised through RaiseEvent for every state change and progress notification.
public event delFeedbackProgress Feedbackprogress;
/// <summary>
/// Entry point for a bulk load. A <paramref name="mode"/> greater than zero
/// runs the load on a BackgroundWorker; any other value runs it on the
/// calling thread and returns only when the load has finished.
/// </summary>
/// <param name="mode">Greater than zero for background execution, otherwise synchronous.</param>
public void DoBulkLoad(int mode)
{
    if (mode <= 0)
    {
        // Synchronous path: the caller blocks until BulkLoad returns.
        BulkLoad();
        return;
    }

    // Background path: wire the three worker callbacks, then start the worker.
    BackgroundWorker asyncLoader = new BackgroundWorker();
    asyncLoader.DoWork += bgWorker_DoWork;
    asyncLoader.ProgressChanged += bgWorker_ProgressChanged;
    asyncLoader.RunWorkerCompleted += bgWorker_RunWorkerCompleted;
    asyncLoader.RunWorkerAsync();
}
/// <summary>
/// Collects the (sorted) column names of the source and target tables, inspects
/// the target schema, and dispatches to the DataTable-based loader when the
/// target has a uniqueidentifier column, otherwise to the DataReader-based loader.
/// </summary>
/// <remarks>
/// Every connection, command and reader is wrapped in <c>using</c> so that it is
/// released even when opening a connection or running a query throws. Exceptions
/// from this schema phase are not caught here and propagate to the caller.
/// </remarks>
private void BulkLoad()
{
    // Source side: only the column names are needed from this reader.
    string[] srcColumns;
    using (OleDbConnection source = new OleDbConnection(SourceConStr))
    {
        source.Open();
        using (OleDbCommand getSourceColumns =
            new OleDbCommand(String.Format("select * from {0}", SourceTable), source))
        using (OleDbDataReader rdr = getSourceColumns.ExecuteReader())
        {
            srcColumns = GetColumnNames(rdr);
        }
    }

    // Target side: "where 1=2" returns no rows, only the result-set shape.
    string[] trgColumns;
    DataTable schemeTable;
    using (SqlConnection sCon = new SqlConnection(TargetConStr))
    {
        sCon.Open();
        using (SqlCommand cmd =
            new SqlCommand(String.Format("select * from {0} where 1=2", DestinationTable), sCon))
        using (SqlDataReader sRdr = cmd.ExecuteReader())
        {
            trgColumns = GetColumnNames((IDataReader)sRdr);
            // Must be read while the reader is still open.
            schemeTable = sRdr.GetSchemaTable();
        }
    }

    bool hasGuidColumn = false;
    foreach (DataRow row in schemeTable.Rows)
    {
        if ((row["ProviderSpecificDataType"]).ToString() == "System.Data.SqlTypes.SqlGuid")
        {
            hasGuidColumn = true;
            break;
        }
    }

    if (hasGuidColumn)
    {
        DoBulkLoadWithDataTable(srcColumns, trgColumns);
    }
    else
    {
        DoBulkLoadWithDataReader(srcColumns, trgColumns);
    }
}
/// <summary>
/// Returns the field names exposed by <paramref name="rdr"/>, sorted with the
/// default string comparer.
/// </summary>
/// <param name="rdr">An open data reader whose field names are read.</param>
/// <returns>A new array holding one name per field, in sorted order.</returns>
public string[] GetColumnNames(IDataReader rdr)
{
    int fieldTotal = rdr.FieldCount;
    List<string> sortedNames = new List<string>(fieldTotal);
    int ordinal = 0;
    while (ordinal < fieldTotal)
    {
        sortedNames.Add(rdr.GetName(ordinal));
        ordinal++;
    }
    // Same default comparer as Array.Sort on a string array.
    sortedNames.Sort();
    return sortedNames.ToArray();
}
/// <summary>
/// Called when the background worker finishes. If the worker's DoWork handler
/// threw, the exception arrives in <c>e.Error</c>; it is reported through an
/// Error event so that asynchronous failures are not silently lost. The
/// RunWorkerComplete event is always raised afterwards.
/// </summary>
void bgWorker_RunWorkerCompleted(object sender, RunWorkerCompletedEventArgs e)
{
    if (e.Error != null)
    {
        RaiseEvent(SbcState.Error, e.Error.Message);
    }
    RaiseEvent(SbcState.RunWorkerComplete, "Runworker Completed");
}
/// <summary>
/// Placeholder progress handler for the background worker; it does nothing.
/// Progress is published through the Feedbackprogress event instead.
/// </summary>
void bgWorker_ProgressChanged(object sender, ProgressChangedEventArgs e)
{
    // Intentionally empty.
}
/// <summary>
/// Background worker body: runs the same load as the synchronous path.
/// </summary>
void bgWorker_DoWork(object sender, DoWorkEventArgs e)
{
    // The sender was previously cast to a local that was never used; removed.
    BulkLoad();
}
/// <summary>
/// Copies the source table to the target in chunks of <c>ReadAtOnce</c> records,
/// streaming each chunk from an OleDbDataReader straight into SqlBulkCopy.
/// </summary>
/// <param name="srcColumns">Sorted source column names.</param>
/// <param name="trgColumns">Sorted target column names.</param>
/// <remarks>
/// Failures inside the load are reported through an Error event rather than
/// thrown. The target connection and all commands are disposed when the load
/// ends, whether it succeeded or failed.
/// </remarks>
private void DoBulkLoadWithDataReader(string[] srcColumns, string[] trgColumns)
{
    // Start each run from zero so a reused instance does not report stale progress.
    rowsBefore = 0;
    RowsCopied = 0;

    OleDbConnection source = new OleDbConnection(SourceConStr);
    try
    {
        if (ReadAtOnce <= 0)
        {
            // A non-positive chunk size would never advance the loop below.
            throw new InvalidOperationException("ReadAtOnce must be greater than zero.");
        }

        using (SqlConnection sCon = new SqlConnection(TargetConStr))
        {
            sCon.Open();
            // Load from source into existing schema
            source.Open();
            using (OleDbCommand getRowCount =
                new OleDbCommand(String.Format("select count(*) from {0}", SourceTable), source))
            {
                RowsToCopy = Convert.ToInt32(getRowCount.ExecuteScalar());
            }
            RaiseEvent(SbcState.ReadingFromSource, String.Format("Rows to copy {0}", RowsToCopy));

            // NOTE(review): chunks are addressed by recno() but bounded by count(*);
            // if the provider hides deleted records, rows whose recno() exceeds
            // count(*) may never be read - confirm against the source data.
            int startRow = 0;
            while (startRow < RowsToCopy)
            {
                int chunkEnd = Math.Min(startRow + ReadAtOnce, RowsToCopy);
                using (OleDbCommand sourceCmd = new OleDbCommand(
                    String.Format("select * from {0} where recno() > {1} and recno() <= {2}",
                        SourceTable, startRow, startRow + ReadAtOnce),
                    source))
                {
                    RaiseEvent(SbcState.ReadingFromSource, "Reading from source...");
                    using (OleDbDataReader rdr = sourceCmd.ExecuteReader())
                    {
                        RaiseEvent(SbcState.ReadingFromSource,
                            String.Format("Read {0} - {1} from source...", startRow, chunkEnd));
                        UploadBlock(rdr, sCon, srcColumns, trgColumns);
                        // Chunk finished: later notifications count on top of it.
                        rowsBefore = chunkEnd;
                    }
                }
                startRow += ReadAtOnce;
            }
        }
        RaiseEvent(SbcState.Complete, "Bulk loading completed.");
    }
    catch (Exception e)
    {
        RaiseEvent(SbcState.Error, e.Message);
    }
    finally
    {
        source.Close();
    }
}
/// <summary>
/// Copies the source table to the target in chunks of <c>ReadAtOnce</c> records,
/// buffering each chunk in a DataTable shaped like the target table before
/// handing it to SqlBulkCopy. BulkLoad selects this path when the target has a
/// uniqueidentifier column.
/// </summary>
/// <param name="srcColumns">Sorted source column names.</param>
/// <param name="trgColumns">Sorted target column names.</param>
/// <remarks>
/// Failures inside the load are reported through an Error event rather than
/// thrown. The target connection, commands and readers are disposed when the
/// load ends, whether it succeeded or failed.
/// </remarks>
private void DoBulkLoadWithDataTable(string[] srcColumns, string[] trgColumns)
{
    // Start each run from zero so a reused instance does not report stale progress.
    rowsBefore = 0;
    RowsCopied = 0;

    OleDbConnection source = new OleDbConnection(SourceConStr);
    DataTable tbl = new DataTable();
    try
    {
        if (ReadAtOnce <= 0)
        {
            // A non-positive chunk size would never advance the loop below.
            throw new InvalidOperationException("ReadAtOnce must be greater than zero.");
        }

        using (SqlConnection sCon = new SqlConnection(TargetConStr))
        {
            sCon.Open();
            // Read with a false where to grab schema from SQL server - like set fmtonly on/off
            // (presumably so loaded values take the target column types - TODO confirm).
            using (SqlCommand cmd =
                new SqlCommand(String.Format("select * from {0} where 1=2", DestinationTable), sCon))
            using (SqlDataReader sRdr = cmd.ExecuteReader())
            {
                tbl.Load(sRdr);
            }

            // Load from source into existing schema
            source.Open();
            using (OleDbCommand getRowCount =
                new OleDbCommand(String.Format("select count(*) from {0}", SourceTable), source))
            {
                RowsToCopy = Convert.ToInt32(getRowCount.ExecuteScalar());
            }
            RaiseEvent(SbcState.ReadingFromSource, String.Format("Rows to copy {0}", RowsToCopy));

            // NOTE(review): chunks are addressed by recno() but bounded by count(*);
            // if the provider hides deleted records, rows whose recno() exceeds
            // count(*) may never be read - confirm against the source data.
            int startRow = 0;
            while (startRow < RowsToCopy)
            {
                int chunkEnd = Math.Min(startRow + ReadAtOnce, RowsToCopy);
                using (OleDbCommand sourceCmd = new OleDbCommand(
                    String.Format("select * from {0} where recno() > {1} and recno() <= {2}",
                        SourceTable, startRow, startRow + ReadAtOnce),
                    source))
                {
                    RaiseEvent(SbcState.ReadingFromSource, "Reading from source...");
                    using (OleDbDataReader rdr = sourceCmd.ExecuteReader())
                    {
                        tbl.Load(rdr);
                    }
                }
                RaiseEvent(SbcState.ReadingFromSource,
                    String.Format("Read {0} - {1} from source...", startRow, chunkEnd));
                UploadBlock(tbl, sCon, srcColumns, trgColumns);
                // Drop the uploaded rows but keep the schema for the next chunk.
                tbl.Clear();
                rowsBefore = chunkEnd;
                startRow += ReadAtOnce;
            }
        }
        RaiseEvent(SbcState.Complete, "Bulk loading completed.");
    }
    catch (Exception e)
    {
        RaiseEvent(SbcState.Error, e.Message);
    }
    finally
    {
        source.Close();
    }
}
/// <summary>
/// Uploads the rows buffered in <paramref name="tbl"/> to the destination table.
/// </summary>
/// <param name="tbl">Rows to upload.</param>
/// <param name="sCon">Open connection to the target server.</param>
/// <param name="srcColumns">Source column names.</param>
/// <param name="trgColumns">Target column names.</param>
private void UploadBlock(DataTable tbl, SqlConnection sCon, string[] srcColumns, string[] trgColumns)
{
    using (SqlBulkCopy bc = CreateBulkCopy(sCon, srcColumns, trgColumns))
    {
        RaiseEvent(SbcState.WritingToServer, "Writing to server...");
        bc.WriteToServer(tbl);
    }
}

/// <summary>
/// Streams the rows of <paramref name="rdr"/> to the destination table.
/// </summary>
/// <param name="rdr">Open reader positioned before the first row to upload.</param>
/// <param name="sCon">Open connection to the target server.</param>
/// <param name="srcColumns">Source column names.</param>
/// <param name="trgColumns">Target column names.</param>
private void UploadBlock(OleDbDataReader rdr, SqlConnection sCon, string[] srcColumns, string[] trgColumns)
{
    using (SqlBulkCopy bc = CreateBulkCopy(sCon, srcColumns, trgColumns))
    {
        RaiseEvent(SbcState.WritingToServer, "Writing to server...");
        bc.WriteToServer(rdr);
    }
}

/// <summary>
/// Builds a SqlBulkCopy configured identically for both UploadBlock overloads:
/// column mappings, batch size, timeout, notification interval, destination
/// table and the progress handler.
/// </summary>
/// <exception cref="InvalidOperationException">
/// A source column has no target column of the same name.
/// </exception>
private SqlBulkCopy CreateBulkCopy(SqlConnection sCon, string[] srcColumns, string[] trgColumns)
{
    SqlBulkCopy bc = new SqlBulkCopy(sCon);
    try
    {
        // Pair columns by name rather than by position in the two sorted arrays:
        // positional pairing mis-maps or overruns as soon as the column sets differ.
        foreach (string srcColumn in srcColumns)
        {
            bc.ColumnMappings.Add(srcColumn, FindTargetColumn(srcColumn, trgColumns));
        }
        bc.BatchSize = 500;
        bc.BulkCopyTimeout = BulkCopyTimeout;
        bc.NotifyAfter = Odometer;
        bc.DestinationTableName = this.DestinationTable;
        bc.SqlRowsCopied += new SqlRowsCopiedEventHandler(bc_SqlRowsCopied);
        return bc;
    }
    catch
    {
        // Do not leak the bulk copy object when configuration fails.
        bc.Close();
        throw;
    }
}

/// <summary>
/// Returns the target column whose name equals <paramref name="srcColumn"/>
/// ignoring case, spelled exactly as the target reports it.
/// </summary>
private static string FindTargetColumn(string srcColumn, string[] trgColumns)
{
    foreach (string candidate in trgColumns)
    {
        if (String.Equals(candidate, srcColumn, StringComparison.OrdinalIgnoreCase))
        {
            return candidate;
        }
    }
    throw new InvalidOperationException(
        String.Format("Source column '{0}' has no matching column in the destination table.", srcColumn));
}
/// <summary>
/// SqlBulkCopy progress callback: publishes the running total as the rows of
/// earlier chunks (rowsBefore) plus the count reported by this notification.
/// </summary>
void bc_SqlRowsCopied(object sender, SqlRowsCopiedEventArgs e)
{
    int reportedByBulkCopy = (int)e.RowsCopied;
    RowsCopied = rowsBefore + reportedByBulkCopy;

    string progressText = String.Format("Copied {0} rows", RowsCopied);
    RaiseEvent(SbcState.CopyRows, progressText);
}
/// <summary>
/// Raises Feedbackprogress with the given state and message when at least one
/// handler is attached.
/// </summary>
/// <param name="state">State code passed to the handlers.</param>
/// <param name="message">Human-readable text passed to the handlers.</param>
void RaiseEvent(SbcState state, string message)
{
    // Snapshot the delegate: events may be raised from the background worker
    // thread, and a handler detached between the null check and the call would
    // otherwise cause a NullReferenceException.
    delFeedbackProgress handler = this.Feedbackprogress;
    if (handler != null)
    {
        handler(state, message);
    }
}
/// <summary>
/// Generates a new GUID and returns it in the default string format.
/// </summary>
/// <returns>The textual form of a freshly generated GUID.</returns>
public string CreateGUID()
{
    Guid freshId = Guid.NewGuid();
    string asText = freshId.ToString();
    return asText;
}
}
// COM-visible event interface for cbSBC: it is the type named in the
// ComSourceInterfaces attribute on that class, and the sample VFP test program
// implements it (via the type library) to receive progress callbacks.
[InterfaceType(ComInterfaceType.InterfaceIsDual)]
[ComVisible(true)]
public interface IcbSBCEvents
{
// Single callback: a state code plus a human-readable message.
// The name and signature mirror the Feedbackprogress event on cbSBC.
[DispId(0)]
void Feedbackprogress(SbcState state, string message);
}
// State codes passed as the first argument of the Feedbackprogress event.
// The numeric values matter: the sample VFP handler compares against 3, 4 and 5.
[ComVisible(true)]
public enum SbcState
{
// Not raised by the code in this file; only used as a default in the commented-out SbcEventArgs.
Idle = -1,
// Raised from the catch blocks of the two loaders with the exception message.
Error = 0,
// Raised when the row count is known and around each chunk read from the source.
ReadingFromSource = 1,
// Raised by UploadBlock immediately before WriteToServer.
WritingToServer = 2,
// Raised from the SqlRowsCopied notification with the running row total.
CopyRows = 3,
// Raised after the last chunk has been uploaded.
Complete = 4,
// Raised when the background worker finishes (asynchronous mode only).
RunWorkerComplete = 5
}
//[ClassInterface(ClassInterfaceType.AutoDual)]
//[ProgId("cbNetCom.SbcEventArgs")]
//public class SbcEventArgs : EventArgs
//{
// public SbcState state = SbcState.Idle;
// public string message = "";
//}
}
* Test.prg
* Drives the cbNetCOM.cbSBC COM object: binds an SBCEvents sink to it,
* configures source/target and tuning values, then runs a synchronous load.
lcSourceDb = 'c:\MyFolder\MyDatabase.dbc'
lcTargetDb = 'MySQLDb'
lcSourceTable = 'myTableName' && w/o extension
lcTargetTable = 'myTableName'

Clear
Local o As "cbNetCOM.cbSBC"
o = Createobject("cbNetCOM.cbSBC")

* Event sink: second argument is the bulk mode (0 here), third an optional progress bar.
x = Createobject("SBCEvents", m.o, 0, .Null.)
Eventhandler(m.o, m.x)

With m.o
	.SourceConStr = 'Provider=VFPOLEDB;Data Source=' + m.lcSourceDb
	.TargetConStr = 'server=.\sqlexpress;Trusted_connection=yes;Database=' + m.lcTargetDb
	.SourceTable = m.lcSourceTable
	.DestinationTable = m.lcTargetTable
	.ReadAtOnce = 10000 && default 10000
	.Odometer = 10000 && inform per 10000 - default 1000
	.BulkCopyTimeout = 5 * 60 && if not completed in 5 minutes fail with timeout
	&& default 864000 - 10 days
Endwith

* Mode 0 = run on the calling thread.
o.DoBulkLoad(0)
* Event sink for cbNetCOM.cbSBC: implements IcbSBCEvents from the type library
* and prints progress / completion messages to the screen.
Define Class SBCEvents As Session OlePublic
	Implements IcbSBCEvents In "c:\NetCOMBase\SBCOLE\SBCOLE\BIN\DEBUG\SBCOLE.TLB"

	oHook = Null	&& the cbSBC object whose events are handled
	tStart = 0		&& Seconds() at creation, used to report elapsed time
	BulkMode = 0	&& 0 = synchronous load, otherwise background load
	pBar = Null		&& optional progress bar reference (stored only)

	* Stores the hooked object, the bulk mode and the progress bar, and starts the timer.
	Procedure Init(loHook,nMode,oBar)
		This.tStart = Seconds()
		This.oHook = loHook
		This.BulkMode = m.nMode
		This.pBar = oBar
	Endproc

	* Callback for every cbSBC event. State 3 is a row-count notification; the
	* final state is 4 for a synchronous load and 5 for a background load.
	Procedure IcbSBCEvents_Feedbackprogress(state As VARIANT, Message As String) As VOID
		Do Case
			Case state = 3
				? Textmerge('[<<this.oHook.DestinationTable>>]'+;
					' Copied <<this.oHook.RowsCopied>> of <<this.oHook.RowsToCopy>>')

			Case state = Iif(This.BulkMode=0,4,5)
				* Load finished: unbind this sink, then report the elapsed time.
				Eventhandler(This.oHook,This,.T.)
				? Textmerge('[<<this.oHook.DestinationTable>>]'+;
					' completed in <<SECONDS()-this.tStart>> seconds.')

			Otherwise
				? This.oHook.rowscopied,Textmerge('[<<this.oHook.DestinationTable>>] <<message>>')
		Endcase
	Endproc
Enddefine
Ayrica Doug Hennig'in bir kodu var, asagidaki gibi; onda da hata yok ama kaydolan bir sey de yok. Doug "errorLog ekledin mi" diye sordu; loBulkLoad.ErrorLogFile = 'xml_load.txt' satirini koda ekledim ama gene degisen bir sey olmadi.
* Demo: build a two-row cursor and bulk load it into table "test" of database
* "xml_test" on the local SQL Express instance.
SET SAFETY off
CREATE CURSOR crsTest(ad c(20),soyad c(50))
INSERT INTO crsTest values('soykan','ozcelik')
INSERT INTO crsTest values('dh ozcelik','ozcelik')
SELECT crsTest
* xml_copy2_sql reports failure through its return value (empty string = success),
* so the result must be captured and shown; discarding it hides every error.
Local lcError
lcError = xml_copy2_sql('CrsTest','test','xml_test','.\SQLEXPRESS')
If !Empty(m.lcError)
	? 'Bulk load failed: ' + m.lcError
Endif
*==============================================================================
* Function:			xml_copy2_sql (this header originally named it BulkXMLLoad)
* Purpose:			Performs a SQL Server bulk XML load
* Author:			Doug Hennig
* Last revision:	07/06/2006
* Parameters:		tcAlias    - the alias of the cursor to export
*					tcTable    - the name of the table to import into
*					tcDatabase - the database the table belongs to
*					tcServer   - the SQL Server name
*					tcUserName - the user name for the connection (optional:
*						if it isn't specified, Windows Integrated Security is
*						used)
*					tcPassword - the password for the connection (optional:
*						if it isn't specified, Windows Integrated Security is
*						used)
* Returns:			an empty string if the bulk load succeeded or the text of
*						an error message if it failed
* Environment in:	the alias specified in tcAlias must be open
*					the specified table and database must exist
*					the specified server must be accessible
*					there must be enough disk space for the XML files
* Environment out:	if an empty string is returned, the data was imported into
*						the specified table
*					import errors and messages are logged to xml_load.txt
*==============================================================================
Function xml_copy2_sql
Lparameters tcAlias, ;
	tcTable, ;
	tcDatabase, ;
	tcServer, ;
	tcUserName, ;
	tcPassword
Local lnSelect, ;
	lcSchema, ;
	lcData, ;
	lcReturn, ;
	loException As Exception, ;
	lcXSD, ;
	lcConnString, ;
	loBulkLoad

* Create the XML data and schema files.

lnSelect = Select()
Select (tcAlias)
lcSchema = Forceext(tcTable, 'xsd')
lcData   = Forceext(tcTable, 'xml')
Try
	Cursortoxml(Alias(), lcData, 1, 512 + 8, 0, lcSchema)
	lcReturn = ''
Catch To loException
	lcReturn = loException.Message
Endtry

* Convert the XSD into a format acceptable by SQL Server. Add the SQL
* namespace, convert the <xsd:choice> start and end tags to <xsd:sequence>,
* use the sql:datatype attribute for DateTime fields, and specify the table
* imported into with the sql:relation attribute.

If Empty(lcReturn)
	lcXSD = Filetostr(lcSchema)
	lcXSD = Strtran(lcXSD, ':xml-msdata">', ;
		':xml-msdata" xmlns:sql="urn:schemas-microsoft-com:mapping-schema">')
	lcXSD = Strtran(lcXSD, 'IsDataSet="true">', ;
		'IsDataSet="true" sql:is-constant="1">')
	lcXSD = Strtran(lcXSD, '<xsd:choice maxOccurs="unbounded">', ;
		'<xsd:sequence>')
	lcXSD = Strtran(lcXSD, '</xsd:choice>', ;
		'</xsd:sequence>')
	lcXSD = Strtran(lcXSD, 'type="xsd:dateTime"', ;
		'type="xsd:dateTime" sql:datatype="dateTime"')
	lcXSD = Strtran(lcXSD, 'minOccurs="0"', ;
		'sql:relation="' + Lower(tcTable) + '" minOccurs="0"')
	Strtofile(lcXSD, lcSchema)

* Instantiate the SQLXMLBulkLoad object, set its ConnectionString and other
* properties, and call Execute to perform the bulk import.

	Try
		loBulkLoad = Createobject('SQLXMLBulkLoad.SQLXMLBulkload.4.0')
		lcConnString = 'Provider=SQLOLEDB.1;Initial Catalog=' + tcDatabase + ;
			';Data Source=' + tcServer + ';Persist Security Info=False;'
		If Empty(tcUserName)
			lcConnString = lcConnString + 'Integrated Security=SSPI'
		Else
			lcConnString = lcConnString + 'User ID=' + tcUserName + ;
				';Password=' + tcPassword
		Endif Empty(tcUserName)
		loBulkLoad.ConnectionString = lcConnString

* The error log must be configured BEFORE Execute is called; assigning
* ErrorLogFile afterwards has no effect on the load that already ran.

		loBulkLoad.ErrorLogFile = 'xml_load.txt'
		loBulkLoad.KeepNulls    = .T.
		loBulkLoad.Execute(lcSchema, lcData)
		lcReturn = ''
	Catch To loException
		lcReturn = loException.Message
	Endtry

* Clean up.

	Erase (lcSchema)
	Erase (lcData)
Endif Empty(lcReturn)
Select (lnSelect)
Return lcReturn
Endfunc